Source code for streamad.model.spot_Detector

from streamad.base import BaseDetector
import numpy as np
from math import log
from scipy.optimize import minimize

np.seterr(divide="ignore", invalid="ignore")


[docs]class SpotDetector(BaseDetector):
[docs] def __init__(self, window_len: int = 200, prob: float = 1e-4): """Univariate Spot model :cite:`DBLP:conf/kdd/SifferFTL17`. Args: window_len (int, optional): Length of the window for reference. Defaults to 200. prob (float, optional): Threshold for probability, a small float value. Defaults to 1e-4. """ super().__init__() self.data_type = "univariate" self.prob = prob self.init_data = [] self.window_len = window_len self._window_len = max(int(window_len / 100), 20) self.init_length = window_len - self._window_len assert ( self.init_length > 0 ), "window_len is too small, default value is 200" self.num_threshold = {"up": 0, "down": 0} nonedict = {"up": None, "down": None} self.extreme_quantile = dict.copy(nonedict) self.init_threshold = dict.copy(nonedict) self.peaks = dict.copy(nonedict) self.gamma = dict.copy(nonedict) self.sigma = dict.copy(nonedict)
# self.thup = [] # self.thdown = [] def _grimshaw(self, side, epsilon=1e-8, n_points=10): def u(s): return 1 + np.log(s).mean() def v(s): return np.mean(1 / s) def w(Y, t): s = 1 + t * Y us = u(s) vs = v(s) return us * vs - 1 def jac_w(Y, t): s = 1 + t * Y us = u(s) vs = v(s) jac_us = np.divide(1, t, out=np.zeros_like(a), where=t != 0) * ( 1 - vs ) jac_vs = np.divide(1, t, out=np.zeros_like(a), where=t != 0) * ( -vs + np.mean(1 / s**2) ) return us * jac_vs + vs * jac_us Ym = self.peaks[side].min() YM = self.peaks[side].max() Ymean = self.peaks[side].mean() a = np.divide(-1, YM, out=np.array(-epsilon), where=YM != 0) if abs(a) < 2 * epsilon: epsilon = abs(a) / n_points # a = a + epsilon b = 2 * np.divide( (Ymean - Ym), (Ymean * Ym), out=np.array(np.zeros_like(Ymean * Ym) - epsilon), where=(Ymean * Ym) != 0, ) c = 2 * np.divide( Ymean - Ym, Ym**2, out=np.array(np.zeros_like(Ym) + epsilon), where=Ym != 0, ) d = a + epsilon e = -epsilon left_zeros = self._rootsFinder( lambda t: w(self.peaks[side], t), lambda t: jac_w(self.peaks[side], t), (d, e), n_points, "regular", ) right_zeros = self._rootsFinder( lambda t: w(self.peaks[side], t), lambda t: jac_w(self.peaks[side], t), (b, c), n_points, "regular", ) # all the possible roots zeros = np.concatenate((left_zeros, right_zeros)) # 0 is always a solution so we initialize with it gamma_best = 0 sigma_best = Ymean ll_best = self._log_likelihood(self.peaks[side], gamma_best, sigma_best) # we look for better candidates for z in zeros: gamma = u(1 + z * self.peaks[side]) - 1 sigma = np.divide(gamma, z, out=np.zeros_like(gamma), where=z != 0) ll = self._log_likelihood(self.peaks[side], gamma, sigma) if ll > ll_best: gamma_best = gamma sigma_best = sigma ll_best = ll return gamma_best, sigma_best, ll_best def _rootsFinder(self, fun, jac, bounds, npoints, method): """ Find possible roots of a scalar function Parameters ---------- fun : function scalar function jac : function first order derivative of the function bounds : tuple (min,max) interval for the roots search npoints : int maximum number of roots to output method : str 'regular' : regular sample of the search interval, 'random' : uniform (distribution) sample of the search interval Returns ---------- numpy.array possible roots of the function """ if method == "regular": step = (bounds[1] - bounds[0]) / (npoints + 1) try: X0 = np.arange(bounds[0] + step, bounds[1], step) except: X0 = np.random.uniform(bounds[0], bounds[1], npoints) elif method == "random": X0 = np.random.uniform(bounds[0], bounds[1], npoints) def objFun(X, f, jac): g = 0 j = np.zeros(X.shape) i = 0 for x in X: fx = f(x) g = g + fx**2 j[i] = 2 * fx * jac(x) i = i + 1 return g, j opt = minimize( lambda X: objFun(X, fun, jac), X0, method="L-BFGS-B", jac=True, bounds=[bounds] * len(X0), ) X = opt.x np.round(X, decimals=5) return np.unique(X) def _log_likelihood(self, Y, gamma, sigma): """ Compute the log-likelihood for the Generalized Pareto Distribution (μ=0) Parameters ---------- Y : numpy.array observations gamma : float GPD index parameter sigma : float GPD scale parameter (>0) Returns ---------- float log-likelihood of the sample Y to be drawn from a GPD(γ,σ,μ=0) """ n = Y.size if gamma != 0: tau = gamma / sigma L = ( -n * log(sigma) - (1 + (1 / gamma)) * (np.log(1 + tau * Y)).sum() ) else: L = n * (1 + log(abs(Y.mean()) + 1e-8)) return L def _quantile(self, side, gamma, sigma): if side == "up": r = ( (self.init_length - self._window_len) * self.prob / self.num_threshold[side] ) if gamma != 0: return self.init_threshold["up"] + (sigma / gamma) * ( pow(r, -gamma) - 1 ) else: return self.init_threshold["up"] - sigma * log(r) elif side == "down": r = ( (self.init_length - self._window_len) * self.prob / self.num_threshold[side] ) if gamma != 0: return self.init_threshold["down"] - (sigma / gamma) * ( pow(r, -gamma) - 1 ) else: return self.init_threshold["down"] + sigma * log(r) else: raise ValueError("The side is not right") def _back_mean(self): M = [] w = sum(self.init_data[: self._window_len]) M.append(w / self._window_len) for i in range(self._window_len, self.index + 1): w = w - self.init_data[i - self._window_len] + self.init_data[i] M.append(w / self._window_len) return np.array(M) def _init_drift(self, X: np.ndarray, verbose=False): n_init = self.init_length M = self._back_mean() T = self.init_data[self._window_len :] - M[:-1] S = np.sort(T.tolist()) self.init_threshold["up"] = S[int(0.98 * n_init)] self.init_threshold["down"] = S[int(0.02 * n_init)] self.peaks["up"] = ( T[T >= self.init_threshold["up"]] - self.init_threshold["up"] ) self.peaks["down"] = ( self.init_threshold["down"] - T[T <= self.init_threshold["down"]] ) self.num_threshold["up"] = self.peaks["up"].size self.num_threshold["down"] = self.peaks["down"].size for side in ["up", "down"]: gamma, sigma, _ = self._grimshaw(side) self.extreme_quantile[side] = self._quantile(side, gamma, sigma) self.gamma[side] = gamma self.sigma[side] = sigma return self def _update_one_side(self, side: str, X: float): self.peaks[side] = np.append( self.peaks[side], abs(X - self.init_threshold[side]) ) self.num_threshold[side] += 1 gamma, sigma, _ = self._grimshaw(side) self.extreme_quantile[side] = self._quantile( side, gamma=gamma, sigma=sigma ) def fit(self, X: np.ndarray): self.init_data.append(float(X)) if self.index == self.window_len - 1: self._init_drift(X) if self.index > self.window_len - 1: hist_mean = np.mean(self.init_data[-self._window_len :]) normal_X = float(X) - hist_mean if normal_X > self.init_threshold["up"]: self._update_one_side("up", normal_X) elif normal_X < self.init_threshold["down"]: self._update_one_side("down", normal_X) self.init_data = self.init_data[-self._window_len :] return self def score(self, X: np.ndarray) -> float: hist_mean = np.mean(self.init_data[-self._window_len :]) normal_X = float(X) - hist_mean if ( normal_X > self.extreme_quantile["up"] or normal_X < self.extreme_quantile["down"] ): score = 1.0 elif normal_X > self.init_threshold["up"]: side = "up" score = abs( float(self.init_threshold[side] - normal_X) / (self.extreme_quantile[side] - self.init_threshold[side]) ) elif normal_X < self.init_threshold["down"]: side = "down" score = abs( float(self.init_threshold[side] - normal_X) / (self.extreme_quantile[side] - self.init_threshold[side]) ) else: score = 0.0 # self.thup.append(self.extreme_quantile["up"] + hist_mean) # self.thdown.append(self.extreme_quantile["down"] + hist_mean) return float(score)