Source code for streamad.process.zscore_calibrator
import numpy as np
from streamad.util import StreamStatistic
[docs]class ZScoreCalibrator:
[docs] def __init__(
self,
sigma: int = 3,
extreme_sigma: int = 5,
is_global: bool = True,
window_len: int = 100,
) -> None:
"""A calibrator which can filter out outliers using z-score, and normalize the anomaly scores into [0,1].
Args:
sigma (int, optional): Zscore threshold, we regard the scores out of sigma as potential anomalies. Defaults to 2.
extreme_sigma (int, optional): Zscore threshold for extreme values, we regard the scores out of extreme_sigma as extreme anomalies. Defaults to 3.
is_global (bool, optional): Method to record, a global way or a rolling window way. Defaults to True.
window_len (int, optional): The length of rolling window, ignore this when `is_global=True`. Defaults to 100.
"""
self.sigma = sigma
self.extreme_sigma = extreme_sigma
self.init_data = []
self.init_flag = False
self.score_stats = StreamStatistic(
is_global=is_global, window_len=window_len
)
def normalize(self, score: float) -> float:
if score is None:
return None
self.score_stats.update(score)
if (
self.score_stats._window.maxlen != len(self.score_stats._window)
and self.score_stats._window.maxlen >= self.score_stats._num_items
):
return None
score_mean = self.score_stats.get_mean()
score_std = self.score_stats.get_std()
sigma = np.divide(
(score - score_mean),
score_std,
out=np.array((score - score_mean) / 1e-5),
where=score_std != 0,
)
sigma = abs(sigma)
if sigma > self.extreme_sigma:
return 1.0
elif sigma > self.sigma:
score_max = self.score_stats.get_max()
score_min = self.score_stats.get_min()
score = np.divide(
(score - score_min),
(score_max - score_min),
out=min(np.array((score - score_min) / 1e-5), np.array(1.0)),
where=score_max != score_min,
)
score = abs(score)
else:
return 0.0
return score