Source code for streamad.model.KNN_Detector

from collections import deque
from copy import deepcopy

import numpy as np
from scipy.spatial.distance import cdist
from streamad.base import BaseDetector


[docs]class KNNDetector(BaseDetector):
[docs] def __init__(self, k_neighbor: int = 5, **kwargs): """Univariate KNN-CAD model with mahalanobis distance :cite:`DBLP:journals/corr/BurnaevI16`. Args: k_neighbor (int, optional): The number of neighbors to cumulate distances. Defaults to 5. """ super().__init__(data_type="univariate", **kwargs) self.window = deque(maxlen=int(np.sqrt(self.window_len))) self.buffer = deque(maxlen=self.window_len - self.window.maxlen) assert ( k_neighbor < self.buffer.maxlen ), "k_neighbor must be less than the length of buffer" self.k = k_neighbor
def fit(self, X: np.ndarray, timestamp: int = None): self.window.append(X[0]) if len(self.window) == self.window.maxlen: self.buffer.append(deepcopy(self.window)) return self def score(self, X: np.ndarray, timestamp: int = None) -> float: window = deepcopy(self.window) window.pop() window.append(X[0]) try: dist = cdist(np.array([window]), self.buffer, metric="mahalanobis")[ 0 ] except: dist = cdist( np.array([window]), self.buffer, metric="mahalanobis", VI=np.linalg.pinv(self.buffer), )[0] score = np.sum(np.partition(np.array(dist), self.k + 1)[1 : self.k + 1]) return float(score)