Source code for streamad.process.tdigest_calibrator

from collections import deque
from typing import Optional

from tdigest import TDigest


class TDigestCalibrator:
    """Turn raw anomaly scores into binary 0.0/1.0 flags using t-digest percentiles.

    Scores are accumulated either globally (one ever-growing digest) or over a
    rolling window of the most recent ``window_len`` scores. A score is flagged
    as anomalous (``1.0``) when it lies outside the
    ``[percentile_down, percentile_up]`` band of the recorded scores.
    """

    def __init__(
        self,
        percentile_up: float = 95,
        percentile_down: float = 5,
        is_global: bool = True,
        window_len: int = 100,
    ) -> None:
        """A calibrator which can filter out outliers using t-digest, and map the anomaly scores to 0.0 or 1.0 :cite:`DBLP:journals/simpa/Dunning21`.

        Args:
            percentile_up (float, optional): We regard the scores above `percentile_up` as anomalies. Defaults to 95.
            percentile_down (float, optional): We regard the scores below `percentile_down` as anomalies. Defaults to 5.
            is_global (bool, optional): Method to record, a global way or a rolling window way. Defaults to True.
            window_len (int, optional): The length of rolling window. It is also the number of scores that must be seen before any verdict is returned, in both modes. Defaults to 100.

        Raises:
            AssertionError: If either percentile is outside ``[0, 100]``.
        """
        # Validate before touching any state. Raised explicitly (instead of via
        # an ``assert`` statement) so the check still runs under ``python -O``;
        # the exception type is kept as AssertionError for backward
        # compatibility with existing callers.
        if not (0 <= percentile_up <= 100 and 0 <= percentile_down <= 100):
            raise AssertionError("percentile must be between 0 and 100")

        self.percentile_up = percentile_up
        self.percentile_down = percentile_down
        # Not used inside this class; kept because they are public attributes.
        self.init_data = []
        self.init_flag = False

        self.is_global = is_global
        self.score_stats = TDigest()
        self.score_deque = deque(maxlen=window_len)

    def normalize(self, score: float) -> Optional[float]:
        """Record ``score`` and report whether it is an outlier.

        Args:
            score (float): Raw anomaly score, or ``None`` when the upstream
                detector produced no score.

        Returns:
            Optional[float]: ``None`` if ``score`` is ``None`` or fewer than
            ``window_len`` scores have been recorded so far; otherwise ``1.0``
            when the score is above the ``percentile_up`` value or below the
            ``percentile_down`` value of the recorded scores, else ``0.0``.
        """
        # Only a missing score is skipped. A score of exactly 0 is a valid
        # observation and must be recorded (a plain truthiness test would
        # silently drop it and bias the percentiles).
        if score is None:
            return None

        self.score_deque.append(score)
        if self.is_global:
            self.score_stats.update(score)

        # Warm-up: no verdict until the window has been filled once.
        if self.score_deque.maxlen != len(self.score_deque):
            return None

        if not self.is_global:
            # A t-digest cannot forget old samples, so the rolling statistics
            # are rebuilt from the current window. This is done only once a
            # verdict is actually going to be produced.
            window_stats = TDigest()
            window_stats.batch_update(self.score_deque)
            self.score_stats = window_stats

        upper_bound = self.score_stats.percentile(self.percentile_up)
        lower_bound = self.score_stats.percentile(self.percentile_down)

        is_outlier = score > upper_bound or score < lower_bound
        return 1.0 if is_outlier else 0.0