Source code for streamad.model.rrcf_Detector

from collections import deque

import numpy as np
import rrcf
from streamad.base import BaseDetector
from copy import deepcopy


class RrcfDetector(BaseDetector):
    """Streaming anomaly detector backed by a forest of ``rrcf.RCTree`` objects."""

    def __init__(self, num_trees=10, tree_size=12, **kwargs):
        """Rrcf detector :cite:`DBLP:conf/icml/GuhaMRS16`.

        Args:
            num_trees (int, optional): Number of trees in the forest.
                Defaults to 10.
            tree_size (int, optional): Size of each tree; used in ``fit`` to
                decide when, and which, previously inserted point is
                forgotten. Defaults to 12.
            **kwargs: Forwarded unchanged to ``BaseDetector.__init__``.
                NOTE(review): the original documentation lists ``window_len``
                (sliding window length, default 50) here; it is presumably
                consumed by ``BaseDetector`` -- confirm against the base class.
        """
        super().__init__(data_type="multivariate", **kwargs)

        self.num_trees = num_trees
        self.tree_size = tree_size
        self.forest = [rrcf.RCTree() for _ in range(num_trees)]
        self.avg_codisp = {}

        # The shingle holds the most recent observations; its capacity is the
        # integer part of sqrt(window_len). ``self.window_len`` is not set in
        # this class -- presumably provided by BaseDetector.
        self.shingle = deque(maxlen=int(np.sqrt(self.window_len)))

    def fit(self, X: np.ndarray, timestamp: int = None):
        """Append ``X`` to the shingle and update every tree in the forest.

        Args:
            X (np.ndarray): Latest observation; ``X.shape[0]`` is used as the
                initial tree dimensionality.
            timestamp (int, optional): Not used by this method.

        Returns:
            RrcfDetector: ``self``.
        """
        self.shingle.append(X)

        # Initialise the dimensionality of all trees while the first tree
        # still has a falsy ``ndim``.
        if not self.forest[0].ndim:
            n_features = X.shape[0]
            for tree in self.forest:
                tree.ndim = n_features

        # Nothing is inserted until the shingle has reached full capacity.
        if len(self.shingle) != self.shingle.maxlen:
            return self

        # ``self.index`` is not maintained here -- presumably the running
        # observation counter kept by BaseDetector.
        if self.index > (self.shingle.maxlen + self.tree_size):
            # Drop the point inserted ``tree_size`` steps ago from every tree
            # before adding the new one.
            for tree in self.forest:
                tree.forget_point(self.index - self.tree_size)

        for tree in self.forest:
            tree.insert_point(self.shingle, self.index)

        return self

    def score(self, X: np.ndarray, timestamp: int = None):
        """Return the forest-averaged ``codisp`` of the point at ``self.index``.

        Args:
            X (np.ndarray): Not used by this method; the point is looked up
                in the trees by ``self.index``.
            timestamp (int, optional): Not used by this method.

        Returns:
            float: Sum of each tree's ``codisp(self.index)`` divided by
            ``num_trees``.
        """
        codisp_total = sum([tree.codisp(self.index) for tree in self.forest])
        return float(codisp_total / self.num_trees)
if __name__ == "__main__":
    # Ad-hoc profiling harness: feeds 1500 one-element observations through
    # the detector and prints per-line timings for ``fit`` and ``score``.
    from line_profiler import LineProfiler

    profiler = LineProfiler()
    detector = RrcfDetector()

    # Register the methods whose line-by-line timings should be reported.
    profiler.add_function(detector.fit)
    profiler.add_function(detector.score)

    # ``fit_score`` is not defined in this module -- presumably inherited
    # from BaseDetector; wrapping it drives both registered methods.
    profiled_fit_score = profiler(detector.fit_score)

    for i in range(1500):
        profiled_fit_score(np.array([i]))

    profiler.print_stats()