Source code for streamad.process.zscore_Thresholder

from typing import Type

import numpy as np
from streamad.base import BaseDetector
from streamad.util import StreamStatistic


[docs]class ZScoreThresholder:
[docs] def __init__( self, sigma: int = 3, is_global: bool = True, window_len: int = 100, ) -> None: """A thresholder which can filter out outliers using z-score, and normalize the anomaly scores into [0,1]. Args: sigma (int, optional): Zscore threshold, we regard the scores out of sigma as anomalies. Defaults to 3. is_global (bool, optional): Method to record, a global way or a rolling window way. Defaults to True. window_len (int, optional): The length of rolling window, ignore this when `is_global=True`. Defaults to 100. """ self.sigma = sigma self.init_data = [] self.init_flag = False self.score_stats = StreamStatistic( is_global=is_global, window_len=window_len )
def normalize(self, score: float) -> float: if not score: return None self.score_stats.update(score) score_mean = self.score_stats.get_mean() score_std = self.score_stats.get_std() sigma = np.divide( (score - score_mean), score_std, out=np.zeros_like(score), where=score_std != 0, ) if sigma >= self.sigma: score_max = self.score_stats.get_max() score = (score - score_mean) / (score_max - score_mean) elif sigma <= -self.sigma: score_min = self.score_stats.get_min() score = (score - score_mean) / (score_min - score_mean) else: return 0 return score