Source code for streamad.model.spot_Detector

from streamad.base import BaseDetector
import numpy as np
from math import log
from scipy.optimize import minimize
from collections import deque
import heapq

np.seterr(divide="ignore", invalid="ignore")


class SpotDetector(BaseDetector):
    def __init__(
        self,
        prob: float = 1e-4,
        back_mean_len: int = 20,
        num_threshold_up: int = 20,
        num_threshold_down: int = 20,
        deviance_ratio: float = 0.01,
        global_memory: bool = True,
        **kwargs
    ):
        """Univariate Spot model :cite:`DBLP:conf/kdd/SifferFTL17`.

        Args:
            prob (float, optional): Threshold for the probability of anomalies, a small float value. Defaults to 1e-4.
            back_mean_len (int, optional): Length of the backward window used to calculate the first-order difference. Defaults to 20.
            num_threshold_up (int, optional): Number of peaks over the upper threshold used to estimate the distribution. Defaults to 20.
            num_threshold_down (int, optional): Number of peaks under the lower threshold used to estimate the distribution. Defaults to 20.
            deviance_ratio (float, optional): Deviance ratio against the absolute value of the data, useful when values are very large and their deviances are small. Defaults to 0.01.
            global_memory (bool, optional): Whether to keep peaks from the whole history (rather than the current window only) when updating thresholds. Defaults to True.
            window_len (int, optional): Length of the window for reference, consumed by the base class via ``**kwargs``. Defaults to 200.
        """
        super().__init__(data_type="univariate", **kwargs)
        self.prob = prob
        self.deviance_ratio = deviance_ratio
        self.global_memory = global_memory
        # self.window (a deque of length window_len) is created by BaseDetector
        self.back_mean_len = back_mean_len
        self.back_mean_window = deque(maxlen=self.back_mean_len)

        assert (
            self.window_len > 0
        ), "window_len is too small; the default is 200"

        self.num_threshold = {
            "up": num_threshold_up,
            "down": num_threshold_down,
        }

        nonedict = {"up": None, "down": None}
        self.extreme_quantile = dict.copy(nonedict)
        self.init_threshold = dict.copy(nonedict)
        self.peaks = dict.copy(nonedict)
        self.history_peaks = {"up": [], "down": []}
        self.gamma = dict.copy(nonedict)
        self.sigma = dict.copy(nonedict)
        self.normal_X = None
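    # Grimshaw's trick (from the SPOT paper): instead of maximizing the
    # two-parameter GPD likelihood directly over the stored peaks Y, search
    # for roots t of the scalar function w(t) = u(1 + t*Y) * v(1 + t*Y) - 1.
    # Each root yields a candidate pair gamma = u(1 + t*Y) - 1 and
    # sigma = gamma / t; the candidate with the highest log-likelihood is
    # kept, with (gamma=0, sigma=mean(Y)) always available as a fallback.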
    def _grimshaw(self, side, epsilon=1e-8, n_points=10):
        def u(s):
            return 1 + np.log(s).mean()

        def v(s):
            return np.mean(1 / s)

        def w(Y, t):
            s = 1 + t * Y
            us = u(s)
            vs = v(s)
            return us * vs - 1

        def jac_w(Y, t):
            s = 1 + t * Y
            us = u(s)
            vs = v(s)
            jac_us = np.divide(
                1, t, out=np.array(1 / epsilon), where=t != 0
            ) * (1 - vs)
            jac_vs = np.divide(
                1, t, out=np.array(1 / epsilon), where=t != 0
            ) * (-vs + np.mean(1 / s**2))
            return us * jac_vs + vs * jac_us

        self.peaks[side][self.peaks[side] == 0] = epsilon
        Ym = self.peaks[side].min()
        YM = self.peaks[side].max()
        Ymean = self.peaks[side].mean()

        a = np.divide(-1, YM, out=np.array(-epsilon), where=YM != 0)
        if abs(a) < 2 * epsilon:
            epsilon = abs(a) / n_points

        b = 2 * np.divide(
            (Ymean - Ym),
            (Ymean * Ym),
            out=np.array((Ymean - Ym) / epsilon - epsilon),
            where=(Ymean * Ym) != 0,
        )
        c = 2 * np.divide(
            Ymean - Ym,
            Ym**2,
            out=np.array((Ymean - Ym) / epsilon + epsilon),
            where=Ym != 0,
        )
        d = a + epsilon
        e = -epsilon

        left_zeros = self._rootsFinder(
            lambda t: w(self.peaks[side], t),
            lambda t: jac_w(self.peaks[side], t),
            (d, e) if d < e else (e, d),
            n_points,
            "regular",
        )
        right_zeros = self._rootsFinder(
            lambda t: w(self.peaks[side], t),
            lambda t: jac_w(self.peaks[side], t),
            (b, c) if b < c else (c, b),
            n_points,
            "regular",
        )

        # all the possible roots
        zeros = np.concatenate((left_zeros, right_zeros))

        # 0 is always a solution, so we initialize with it
        gamma_best = 0
        sigma_best = Ymean
        ll_best = self._log_likelihood(
            self.peaks[side], gamma_best, sigma_best
        )

        # look for better candidates among the roots
        for z in zeros:
            gamma = u(1 + z * self.peaks[side]) - 1
            sigma = np.divide(
                gamma, z, out=np.array(gamma / epsilon), where=z != 0
            )
            ll = self._log_likelihood(self.peaks[side], gamma, sigma)
            if ll > ll_best:
                gamma_best = gamma
                sigma_best = sigma
                ll_best = ll

        return gamma_best, sigma_best, ll_best

    def _rootsFinder(self, fun, jac, bounds, npoints, method):
        """Find possible roots of a scalar function.

        Parameters
        ----------
        fun : function
            scalar function
        jac : function
            first-order derivative of the function
        bounds : tuple
            (min, max) interval for the root search
        npoints : int
            maximum number of roots to output
        method : str
            'regular': regular sample of the search interval,
            'random': uniform (distribution) sample of the search interval

        Returns
        -------
        numpy.array
            possible roots of the function
        """
        if method == "regular":
            step = (bounds[1] - bounds[0]) / (npoints + 1)
            try:
                X0 = np.arange(bounds[0] + step, bounds[1], step)
            except Exception:
                X0 = np.random.uniform(bounds[0], bounds[1], npoints)
        elif method == "random":
            X0 = np.random.uniform(bounds[0], bounds[1], npoints)

        def objFun(X, f, jac):
            # sum of squared residuals and its gradient, so the minimizer
            # drives every component of X towards a root of f
            g = 0
            j = np.zeros(X.shape)
            i = 0
            for x in X:
                fx = f(x)
                g = g + fx**2
                j[i] = 2 * fx * jac(x)
                i = i + 1
            return g, j

        opt = minimize(
            lambda X: objFun(X, fun, jac),
            X0,
            method="L-BFGS-B",
            jac=True,
            bounds=[bounds] * len(X0),
        )

        # round before deduplicating so that near-identical roots collapse
        X = np.round(opt.x, decimals=5)
        return np.unique(X)

    def _log_likelihood(self, Y, gamma, sigma):
        """Compute the log-likelihood for the Generalized Pareto Distribution (μ=0).

        Parameters
        ----------
        Y : numpy.array
            observations
        gamma : float
            GPD index parameter
        sigma : float
            GPD scale parameter (>0)

        Returns
        -------
        float
            log-likelihood of the sample Y to be drawn from a GPD(γ, σ, μ=0)
        """
        n = Y.size
        if gamma != 0:
            tau = gamma / sigma
            L = (
                -n * log(sigma)
                - (1 + (1 / gamma)) * (np.log(1 + tau * Y)).sum()
            )
        else:
            L = n * (1 + log(abs(Y.mean()) + 1e-8))
        return L

    def _quantile(self, side, gamma, sigma):
        # r = q * n / N_t: the probability mass assigned beyond the extreme
        # quantile, rescaled by the number of stored peaks
        r = self.window_len * self.prob / self.num_threshold[side]
        if side == "up":
            if gamma != 0:
                return self.init_threshold["up"] + (sigma / gamma) * (
                    pow(r, -gamma) - 1
                )
            else:
                return self.init_threshold["up"] - sigma * log(r)
        elif side == "down":
            if gamma != 0:
                return self.init_threshold["down"] - (sigma / gamma) * (
                    pow(r, -gamma) - 1
                )
            else:
                return self.init_threshold["down"] + sigma * log(r)
        else:
            raise ValueError(f"Unknown side {side!r}; expected 'up' or 'down'")

    def _init_drift(self, verbose=False):
        for side in ["up", "down"]:
            self._update_one_side(side)
        return self

    def _update_one_side(self, side: str):
        candidates = (
            list(self.window) + self.history_peaks[side]
            if self.global_memory
            else list(self.window)
        )
        if side == "up":
            # keep the largest values seen so far as peak candidates
            self.history_peaks[side] = heapq.nlargest(
                self.num_threshold[side], candidates
            )
            self.init_threshold[side] = self.history_peaks[side][-1]
            self.peaks[side] = np.array(self.history_peaks[side]) - np.array(
                self.init_threshold[side]
            )
        elif side == "down":
            # keep the smallest values seen so far as peak candidates
            self.history_peaks[side] = heapq.nsmallest(
                self.num_threshold[side], candidates
            )
            self.init_threshold[side] = self.history_peaks[side][-1]
            self.peaks[side] = np.array(self.init_threshold[side]) - np.array(
                self.history_peaks[side]
            )

        gamma, sigma, _ = self._grimshaw(side)
        self.extreme_quantile[side] = self._quantile(side, gamma, sigma)
        self.gamma[side] = gamma
        self.sigma[side] = sigma

    def _cal_back_mean(self, X):
        back_mean = (
            np.mean(self.back_mean_window)
            if self.back_mean_window.maxlen > 0
            else np.array(0.0)
        )
        return X - back_mean

    def fit(self, X: np.ndarray, timestamp: int = None):
        X = float(X[0])
        self.back_mean_window.append(X)

        if self.index >= self.back_mean_len:
            # remove the local (backward-window) mean before thresholding
            self.normal_X = self._cal_back_mean(X)
            self.window.append(self.normal_X)

        if self.index == self.window_len:
            self._init_drift()

        if self.index >= self.window_len:
            last_X = (
                self.window[-2]
                if self.back_mean_len == 0
                else (X - self.window[-1])
            )
            # skip threshold updates when the relative change is negligible
            if (
                abs(
                    np.divide(
                        X - last_X, last_X, np.array(X), where=last_X != 0
                    )
                )
                < self.deviance_ratio
            ):
                return self

            if self.normal_X > self.init_threshold["up"]:
                self._update_one_side("up")
            elif self.normal_X < self.init_threshold["down"]:
                self._update_one_side("down")

        return self

    def score(self, X: np.ndarray, timestamp: int = None):
        X = float(X[0])
        last_X = (
            self.window[-2]
            if self.back_mean_len == 0
            else (X - self.window[-1])
        )

        if (
            abs(np.divide(X - last_X, last_X, np.array(X), where=last_X != 0))
            < self.deviance_ratio
        ):
            score = 0.0
        elif (
            self.normal_X > self.extreme_quantile["up"]
            or self.normal_X < self.extreme_quantile["down"]
        ):
            score = 1.0
        elif self.normal_X > self.init_threshold["up"]:
            side = "up"
            score = np.divide(
                self.normal_X - self.init_threshold[side],
                (self.extreme_quantile[side] - self.init_threshold[side]),
                np.array(0.5),
                where=(
                    self.extreme_quantile[side] - self.init_threshold[side]
                    != 0
                ),
            )
        elif self.normal_X < self.init_threshold["down"]:
            side = "down"
            score = np.divide(
                self.init_threshold[side] - self.normal_X,
                (self.init_threshold[side] - self.extreme_quantile[side]),
                np.array(0.5),
                where=(
                    self.init_threshold[side] - self.extreme_quantile[side]
                    != 0
                ),
            )
        else:
            score = 0.0

        return float(score)
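For reference, `_quantile` implements the extreme-quantile update from the SPOT paper: z_q = t + (sigma/gamma) * ((q*n/N_t)^(-gamma) - 1), where t is the initial threshold, (gamma, sigma) are the fitted GPD parameters, n is the window length, q is `prob`, and N_t is the number of stored peaks. Below is a standalone sketch of the "up" branch; the function name and the numbers are illustrative, not part of the library.

from math import log

def pot_threshold_up(init_threshold, gamma, sigma, window_len, prob, num_peaks):
    # r = q * n / N_t in the paper's notation
    r = window_len * prob / num_peaks
    if gamma != 0:
        # GPD tail: z_q = t + (sigma / gamma) * (r**(-gamma) - 1)
        return init_threshold + (sigma / gamma) * (pow(r, -gamma) - 1)
    # exponential tail when gamma == 0
    return init_threshold - sigma * log(r)

# With the detector defaults (window_len=200, prob=1e-4, 20 peaks),
# a fit of gamma=0.1, sigma=0.5 above a threshold of 3.0 gives ~7.98:
print(pot_threshold_up(3.0, 0.1, 0.5, 200, 1e-4, 20))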
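A minimal usage sketch. It assumes the `fit_score` helper inherited from `BaseDetector`, which fits and scores one sample at a time as in the streamad quick-start and may return None while the detector is still warming up; the synthetic data and the 0.9 cut-off are illustrative only.

import numpy as np
from streamad.model import SpotDetector

rng = np.random.default_rng(0)
values = rng.normal(0.0, 1.0, 1000)
values[600] = 9.0  # injected spike

detector = SpotDetector()
for i, v in enumerate(values):
    # fit_score (from BaseDetector) updates the model and returns a score in [0, 1]
    score = detector.fit_score(np.array([v]))
    if score is not None and score >= 0.9:  # illustrative cut-off
        print(f"index {i}: value {v:.2f} flagged with score {score:.2f}")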