Source code for streamad.model.xStream_Detector

from streamad.base import BaseDetector
import numpy as np
import mmh3
from math import floor


[docs]class xStreamDetector(BaseDetector):
[docs] def __init__( self, n_components: int = 8, n_chains: int = 8, depth: int = 8, **kwargs, ): """Multivariate xStreamDetector :cite:`DBLP:conf/kdd/ManzoorLA18`. Args: n_components (int, optional): Number of streamhash projection, similar to feature numbers. Defaults to 50. n_chains (int, optional): Number of half-space chains. Defaults to 8. depth (int, optional): Maximum depth for each chain. Defaults to 8. """ super().__init__(data_type="multivariate", **kwargs) self.projector = StreamhashProjector( num_components=n_components, density=1 / 3.0 ) self.cur_window = [] self.ref_window = [] delta = np.ones(n_components) * 0.5 self.hs_chains = _hsChains( deltamax=delta, n_chains=n_chains, depth=depth )
def fit(self, X: np.ndarray, timestamp: int = None): projected_X = self.projector.transform(X) self.cur_window.append(projected_X) self.hs_chains.fit(projected_X) self.ref_window = self.cur_window self.cur_window = [] deltamax = np.ptp(self.ref_window, axis=0) / 2.0 deltamax[np.abs(deltamax) <= 0.0001] = 1.0 self.hs_chains.set_deltamax(deltamax=deltamax) return self def score(self, X: np.ndarray, timestamp: int = None): projected_X = self.projector.transform(X) score = -1.0 * self.hs_chains.score_chains(projected_X) return score
class _Chain: def __init__(self, deltamax, depth): self.depth = depth self.deltamax = deltamax self.rand = np.random.rand(len(deltamax)) self.rand_shift = self.rand * deltamax self.cmsketch_ref = [{} for _ in range(depth)] * depth self.is_first_window = True self.fs = [np.random.randint(0, len(deltamax)) for _ in range(depth)] @staticmethod def float_to_int(x): return x // 1 def bincount(self, X): scores = np.zeros(self.depth) prebins = np.zeros(X.shape[0], dtype=float) depthcount = np.zeros(len(self.deltamax), dtype=int) for depth in range(self.depth): f = self.fs[depth] depthcount[f] += 1 if depthcount[f] == 1: prebins[f] = X[f] + self.rand_shift[f] / self.deltamax[f] else: prebins[f] = ( 2.0 * prebins[f] - self.rand_shift[f] / self.deltamax[f] ) cmsketch = self.cmsketch_ref[depth] l = tuple(map(floor, prebins)) if l in cmsketch: scores[depth] = cmsketch[l] else: scores[depth] = 0.0 return scores def score(self, X): scores = self.bincount(X) depths = np.arange(1, self.depth + 1) scores = np.log2(1.0 + scores) + depths return np.min(scores) def fit(self, X): prebins = np.zeros(X.shape, dtype=float) depthcount = np.zeros(len(self.deltamax), dtype=int) for depth in range(self.depth): f = self.fs[depth] depthcount[f] += 1 if depthcount[f] == 1: prebins[f] = (X[f] + self.rand_shift[f]) / self.deltamax[f] else: prebins[f] = ( 2.0 * prebins[f] - self.rand_shift[f] / self.deltamax[f] ) if self.is_first_window: cmsketch = self.cmsketch_ref[depth] l = tuple(map(floor, prebins)) if l not in cmsketch: cmsketch[l] = 0 cmsketch[l] += 1 self.cmsketch_ref[depth] = cmsketch else: cmsketch = self.cmsketch_ref[depth] l = tuple(map(floor, prebins)) if l not in cmsketch: cmsketch[l] = 0 cmsketch[l] += 1 self.cmsketch_ref[depth] = cmsketch return self class _hsChains: def __init__(self, deltamax, n_chains: int = 100, depth: int = 25) -> None: self.nchains = n_chains self.depth = depth self.chains = [_Chain(deltamax, depth) for _ in range(n_chains)] def score_chains(self, X): scores = 0 for chain in self.chains: scores += chain.score(X) scores = float(scores) / float(self.nchains) return scores def fit(self, X): # for chain in self.chains: # chain.fit(X) list(map(lambda x: x.fit(X), self.chains)) def set_deltamax(self, deltamax): # list(map(lambda x: x.deltamax = deltamax, self.chains)) # list(map(lambda x: x.rand_shift = x.rand * deltamax, self.chains)) for chain in self.chains: chain.deltamax = deltamax chain.rand_shift = chain.rand * deltamax class StreamhashProjector: def __init__(self, num_components, density=1 / 3.0): self.keys = np.arange(0, num_components, 1) self.constant = np.sqrt(1.0 / density) / np.sqrt(num_components) self.density = density self.n_components = num_components def transform(self, X): """Projects particular (next) timestep's vector to (possibly) lower dimensional space. Args: X (float array of shape (num_features,)): Input feature vector. Returns: projected_X (float array of shape (num_components,)): Projected feature vector. """ ndim = X.shape[0] feature_names = [str(i) for i in range(ndim)] R = np.array( [ [self._hash_string(k, f) for f in feature_names] for k in self.keys ] ) Y = np.dot(X, R.T).squeeze() return Y def _hash_string(self, k, s): hash_value = int(mmh3.hash(s, signed=False, seed=k)) / (2.0**32 - 1) s = self.density if hash_value <= s / 2.0: return -1 * self.constant elif hash_value <= s: return self.constant else: return 0 if __name__ == "__main__": import cProfile from line_profiler import LineProfiler lp = LineProfiler() model = xStreamDetector() lp.add_function(_Chain.fit) lp.add_function(_Chain.score) lp.add_function(_Chain.bincount) # lp.add_function(model.fit) # lp.add_function(model.score) lp_wrapper = lp(model.fit_score) for i in range(1500): # lp_wrapper(np.array([i])) model.fit_score(np.array([i])) lp.print_stats()