Source code for streamad.model.SR_Detector

import numpy as np
from streamad.base import BaseDetector
from collections import deque
from copy import deepcopy

EPS = 1e-8


[docs]class SRDetector(BaseDetector):
[docs] def __init__( self, extend_len: int = 5, ahead_len: int = 10, mag_num: int = 5, **kwargs ): """Spectral Residual Detector :cite:`DBLP:conf/kdd/RenXWYHKXYTZ19`. Args: window_len (int, optional): Length of sliding window. Defaults to 50. extend_len (int, optional): Length to be extended, for FFT transforme. Defaults to 5. ahead_len (int, optional): Length to look ahead for references. Defaults to 10. mag_num (int, optional): Number of FFT magnitude. Defaults to 5. """ super().__init__(data_type="univariate", **kwargs) self.extend_len = extend_len assert ahead_len > 1, "ahead_len must be greater than 1" self.ahead_len = ahead_len self.mag_num = mag_num
def fit(self, X: np.ndarray, timestamp: int = None): self.window.append(X[0]) return self def score(self, X: np.ndarray, timestamp: int = None) -> float: window = deepcopy(self.window) window.pop() window.append(X[0]) extended_window = self._extend_window(window) mags = self._sr_transform(extended_window) anomaly_scores = self._spectral_score(mags) return anomaly_scores[-1 - self.extend_len] def _spectral_score(self, mags): avg_mag = self._average_filter(mags, n=self.mag_num * 10) safeDivisors = np.clip(avg_mag, EPS, avg_mag.max()) raw_scores = np.divide( np.abs(mags - avg_mag), safeDivisors, out=np.zeros_like(mags), where=safeDivisors != 0, ) scores = np.clip(raw_scores / 10.0, 0, 1.0) return scores def _sr_transform(self, window): trans = np.fft.fft(window) mag = np.sqrt(trans.real**2 + trans.imag**2) eps_index = np.where(mag <= EPS)[0] mag[eps_index] = EPS mag_log = np.log(mag) mag_log[eps_index] = 0 spectral = np.exp( mag_log - self._average_filter(mag_log, n=self.mag_num) ) trans.real = trans.real * spectral / mag trans.imag = trans.imag * spectral / mag trans.real[eps_index] = 0 trans.imag[eps_index] = 0 wave_r = np.fft.ifft(trans) mag = np.sqrt(wave_r.real**2, wave_r.imag**2) return mag def _average_filter(self, values, n=3): if n >= len(values): n = len(values) res = np.cumsum(values, dtype=float) res[n:] = res[n:] - res[:-n] res[n:] = res[n:] / n for i in range(1, n): res[i] /= i + 1 return res def _extend_window(self, window): predicted_window = [ self._predict_next(list(window)[-self.ahead_len : -1]) ] * self.extend_len extended_window = np.concatenate((window, predicted_window), axis=0) return extended_window def _predict_next(self, ahead_window): assert ( len(ahead_window) > 1 ), "ahead window must have at least 2 elements" ele_last = ahead_window[-1] n = len(ahead_window) slopes = [ (ele_last - ele) / (n - 1 - i) for i, ele in enumerate(ahead_window[:-1]) ] return ahead_window[1] + sum(slopes)