# Source code for streamad.model.rshash_Detector

import numpy as np
from streamad.base import BaseDetector
from streamad.util import StreamStatistic
from collections import deque


class RShashDetector(BaseDetector):
    """Streaming RS-Hash anomaly detector for multivariate data.

    Every component ``r`` has a random bucket width ``self.f[r]`` and a
    random per-feature shift ``self.alpha[r]``.  A min-max normalised sample
    is mapped to one integer grid cell per component, and a time-decayed
    count for that cell is kept in each dictionary of ``self.cmsketches``.
    """

    def __init__(
        self, decay=0.015, components_num=10, hash_num: int = 10, **kwargs
    ):
        """Multivariate RSHashDetector :cite:`DBLP:conf/icdm/SatheA16`.

        Args:
            decay (float, optional): Decay ratio. Defaults to 0.015.
            components_num (int, optional): Number of components.
                Defaults to 10.
            hash_num (int, optional): Number of hash functions, i.e. the
                number of count dictionaries created. Defaults to 10.
            **kwargs: Forwarded unchanged to ``BaseDetector.__init__``.
                The upstream docs list ``window_len`` (length of data to
                burn in/init, stated default 50) here; that parameter is
                defined outside this class.
        """
        super().__init__(data_type="multivariate", **kwargs)

        self.decay = decay
        self.hash_num = hash_num
        self.components_num = components_num

        self.data_stats = StreamStatistic()
        # One count table per hash function; values are (timestamp, weight).
        self.cmsketches = [dict() for _ in range(hash_num)]
        # Per-component shifts are drawn lazily in ``fit`` once the
        # dimensionality of the data is known.
        self.alpha = None

        # Effective sample size implied by the decay, never below 1000.
        self.effective_s = max(1000, 1.0 / (1 - np.power(2, -self.decay)))

        # One bucket width per component, uniform in
        # [1/sqrt(s), 1 - 1/sqrt(s)).
        inv_root = 1.0 / np.sqrt(self.effective_s)
        self.f = np.random.uniform(
            low=inv_root,
            high=1 - inv_root,
            size=self.components_num,
        )

    def _rshash_normalize(self, values: np.ndarray) -> np.ndarray:
        """Min-max scale ``values`` using the running statistics.

        Positions where the running max equals the running min keep the
        value 0, and any infinite result is reset to 0.

        Args:
            values (np.ndarray): A single sample or a stack of samples.

        Returns:
            np.ndarray: Float array with the same shape as ``values``.
        """
        # The bounds are read once and reused; this assumes get_min() /
        # get_max() are side-effect-free accessors.
        lowest = self.data_stats.get_min()
        span = self.data_stats.get_max() - lowest

        scaled = np.divide(
            values - lowest,
            span,
            out=np.zeros_like(values).astype(float),
            where=span != 0,
        )
        scaled[np.isinf(scaled)] = 0
        return scaled

    def _rshash_key(self, component: int, scaled_row: np.ndarray) -> tuple:
        """Build the dictionary key of ``scaled_row`` for one component.

        Args:
            component (int): Index of the component.
            scaled_row (np.ndarray): One normalised sample.

        Returns:
            tuple: The component index followed by the integer cell
            coordinates of the shifted, width-scaled sample.
        """
        cell = np.floor(
            (scaled_row + np.array(self.alpha[component]))
            / self.f[component]
        )
        return tuple(np.concatenate(([component], cell)).astype(int))

    def _burn_in(self):
        """Seed every count table from the samples buffered in the window.

        Each buffered row adds 1 to the weight of its cell in every table;
        the stored timestamp of those entries is 0.
        """
        buffer = np.array(self.window)
        scaled = self._rshash_normalize(buffer)

        for component in range(self.components_num):
            for row_idx in range(buffer.shape[0]):
                key = self._rshash_key(component, scaled[row_idx, :])

                for table_idx in range(self.hash_num):
                    table = self.cmsketches[table_idx]
                    seen = table.get(key, (0, 0))[1]
                    table[key] = (0, seen + 1)

    def fit(self, X: np.ndarray, timestamp: int = None):
        """Update running statistics and the burn-in window with ``X``.

        Args:
            X (np.ndarray): Current observation.
            timestamp (int, optional): Not used by this method.

        Returns:
            RShashDetector: ``self``.
        """
        # ``self.index`` and ``self.window`` are not defined in this class;
        # presumably they are maintained by BaseDetector.
        if self.index == 0:
            # First sample: draw one shift vector per component, each
            # component bounded by its own bucket width.
            shifts = []
            for component in range(self.components_num):
                shifts.append(
                    np.random.uniform(
                        low=0, high=self.f[component], size=len(X)
                    )
                )
            self.alpha = shifts

        self.data_stats.update(X)

        # NOTE(review): the burn-in runs before the append below, so the
        # sample passed in this call is not in the buffer it reads --
        # confirm this is intended.
        if self.index == self.window.maxlen - 1:
            self._burn_in()

        if len(self.window) < self.window.maxlen:
            self.window.append(X)

        return self

    def score(self, X: np.ndarray, timestamp: int = None) -> float:
        """Score ``X`` and fold it into the count tables.

        For each component the stored weight of the sample's cell is
        decayed by ``2 ** (-decay * elapsed)`` in every table, the smallest
        decayed weight is taken, and ``log(1 + weight)`` is accumulated.
        Each table entry is then rewritten with the current index and the
        decayed weight plus one.

        Args:
            X (np.ndarray): Current observation.
            timestamp (int, optional): Not used by this method.

        Returns:
            float: Mean of the per-component log terms.
        """
        scaled = self._rshash_normalize(X)
        total = 0

        for component in range(self.components_num):
            key = self._rshash_key(component, scaled)
            decayed_weights = []

            for table in self.cmsketches:
                # An unseen cell starts with zero weight at the current index.
                stamp, weight = table.get(key, (self.index, 0))
                decayed = weight * np.power(
                    2, -self.decay * (self.index - stamp)
                )
                decayed_weights.append(decayed)
                table[key] = (self.index, decayed + 1)

            total += np.log(1 + min(decayed_weights))

        return float(total / self.components_num)