Source code for streamad.model.rrcf_Detector
from collections import deque
import numpy as np
import rrcf
from streamad.base import BaseDetector
from copy import deepcopy
[docs]class RrcfDetector(BaseDetector):
[docs] def __init__(self, window_len=100, num_trees=40, tree_size=256):
"""Rrcf detector :cite:`DBLP:conf/icml/GuhaMRS16`.
Args:
window_len (int, optional): Length of sliding window. Defaults to 100.
num_trees (int, optional): Number of trees. Defaults to 40.
tree_size (int, optional): Size of each tree. Defaults to 256.
"""
super().__init__()
self.num_trees = num_trees
self.window_len = window_len
self.tree_size = tree_size
self.forest = []
for _ in range(num_trees):
tree = rrcf.RCTree()
self.forest.append(tree)
self.avg_codisp = {}
self.shingle = deque(maxlen=int(np.sqrt(window_len)))
self.shingle.extend([0] * int(np.sqrt(window_len)))
def fit(self, X: np.ndarray):
self.shingle.append(X[0])
# if self.index < self.window_len:
# return self
for tree in self.forest:
if len(tree.leaves) > self.tree_size:
tree.forget_point(self.index - self.tree_size)
tree.insert_point(self.shingle, self.index)
return self
def score(self, X: np.ndarray) -> float:
score_list = []
shingle = deepcopy(self.shingle)
shingle.pop()
shingle.append(X[0])
for tree in self.forest:
try:
query_idx = tree.query(X[0])
score_list.append(tree.codisp(query_idx))
except:
tree.insert_point(shingle, "tmp")
score_list.append(tree.codisp("tmp"))
tree.forget_point("tmp")
score = np.mean(score_list)
return float(score)