Source code for streamad.model.KNN_Detector

from collections import deque
from copy import deepcopy
from typing import Optional

import numpy as np
from scipy.spatial.distance import cdist

from streamad.base import BaseDetector


class KNNDetector(BaseDetector):
    """Univariate KNN-CAD model with mahalanobis distance.

    :cite:`DBLP:journals/corr/BurnaevI16`.
    See `KNN-CAD <https://arxiv.org/abs/1608.04585>`_

    Each call to :meth:`fit` pushes one observation into a sliding window.
    Once the window is full, a snapshot of it is stored in a bounded
    reference buffer on every call. Once the buffer is full too, every new
    window is scored by the summed mahalanobis distance to its ``k`` nearest
    snapshots in the buffer. :meth:`score` then compares the latest of those
    sums against the ones recorded before it.

    Attributes:
        data_type (str): Always ``"univariate"``.
        buffer (deque): The last ``buffer_len`` window snapshots.
        window (deque): The last ``window_len`` observations.
        scores (deque): The last ``window_len`` neighbor-distance sums.
        k (int): Number of neighbors whose distances are summed.
    """

    # A score is only reported as anomalous when it lies more than this many
    # standard deviations above the mean of the previously recorded scores.
    _SIGMA_THRESHOLD = 3

    def __init__(
        self, window_len: int = 100, buffer_len: int = 200, k_neighbor: int = 3
    ):
        """KNN anomaly detector with mahalanobis distance.

        Args:
            window_len (int, optional): The length of window. Defaults to 100.
            buffer_len (int, optional): The length of references. Defaults to 200.
            k_neighbor (int, optional): The number of neighbors to cumulate
                distances. Defaults to 3.

        Raises:
            AssertionError: If ``k_neighbor`` is not smaller than ``buffer_len``.
        """
        # Kept as an assertion so existing callers observe the same exception.
        assert (
            k_neighbor < buffer_len
        ), "k_neighbor must be less than buffer_len"

        self.data_type = "univariate"
        self.buffer = deque(maxlen=buffer_len)
        self.window = deque(maxlen=window_len)
        self.scores = deque(maxlen=window_len)
        self.k = k_neighbor

    def _knn_distance(self, dist) -> float:
        """Sum the distances to the ``k`` nearest neighbors, skipping the closest.

        The closest entry is skipped because every scored window is itself
        stored in the buffer, so the smallest distance is the zero distance
        of the window to its own snapshot.

        Args:
            dist (np.ndarray): 1-D distances from one window to every
                snapshot in the buffer.

        Returns:
            float: Sum of the 2nd to (k + 1)-th smallest distances.
        """
        # np.partition leaves the order inside each partition undefined, so
        # index 0 is only guaranteed to hold the minimum when 0 is one of the
        # pivots. Pivoting on both 0 and k puts the minimum at position 0 and
        # the next k smallest values (in any order) at positions 1..k.
        nearest = np.partition(np.asarray(dist), [0, self.k])
        return np.sum(nearest[1 : self.k + 1])

    def fit(self, X: np.ndarray):
        """Record and analyse the current observation from the stream.

        The detector first collects initial data and only starts recording
        neighbor-distance scores once both the window and the buffer are full.

        Args:
            X (np.ndarray): Current observation. Only ``X[0]`` is used.

        Returns:
            KNNDetector: The detector itself, so calls can be chained.
        """
        self.window.append(X[0])

        # Warm-up stage 1: the window is not full yet.
        if len(self.window) < self.window.maxlen:
            return self

        # Store a copy: the live window keeps mutating on later calls.
        self.buffer.append(deepcopy(self.window))

        # Warm-up stage 2: not enough reference snapshots yet.
        if len(self.buffer) < self.buffer.maxlen:
            return self

        reference = np.array(self.buffer)
        if not self.scores:
            # First time the buffer is full: score every stored snapshot
            # against all the others to seed the score history.
            all_dist = cdist(reference, reference, metric="mahalanobis")
            self.scores.extend(self._knn_distance(row) for row in all_dist)
        else:
            dist = cdist(
                np.array([self.window]), reference, metric="mahalanobis"
            )[0]
            self.scores.append(self._knn_distance(dist))

        return self

    def score(self, X) -> Optional[float]:
        """Score the current observation.

        The result is derived from the state left by the last :meth:`fit`
        call; ``X`` itself is not read.

        Args:
            X (np.ndarray): Current observation (unused).

        Returns:
            Optional[float]: ``None`` during the init period. Otherwise
            ``0.0`` unless the latest neighbor-distance sum lies more than
            three standard deviations above the mean of the earlier recorded
            sums, in which case its excess over that mean divided by the
            excess of the largest earlier sum is returned. If the earlier
            sums have no spread, ``1.0`` is returned when the latest sum is
            larger than them and ``0.0`` otherwise.
        """
        if (
            len(self.window) + len(self.buffer)
            < self.window.maxlen + self.buffer.maxlen
        ):
            return None

        scores = np.array(self.scores)
        latest, reference = scores[-1], scores[:-1]
        if reference.size == 0:
            # Nothing to compare against (window_len == 1).
            return 0.0

        score_mean = np.mean(reference)
        score_std = np.std(reference)
        excess = latest - score_mean

        if score_std > 0:
            is_outlier = excess / score_std > self._SIGMA_THRESHOLD
        else:
            # A z-score is undefined without spread; fall back to asking
            # whether the latest score exceeds the flat reference at all.
            is_outlier = excess > 0
        if not is_outlier:
            return 0.0

        spread = np.max(reference) - score_mean
        if not spread > 0:
            # Flat reference: avoid dividing by zero and report a finite value.
            return 1.0
        return float(excess / spread)