Source code for streamad.util.math_toolkit
import math
import numpy as np
from collections import deque, defaultdict
[docs]class StreamStatistic:
"""Data statistics for the streaming data, with supporting max, min, sum, mean, sum of squares, var, std and standard scaler."""
[docs] def __init__(self, is_global: bool = True, window_len: int = 10):
"""Statistics for the streaming data, with supporting max, min, sum, mean, sum of squares, var, std and standard scaler.
Args:
is_global (bool, optional): For whole stream or a windowed stream. Defaults to True.
window_len (int, optional): Rolloing window length. Only works when is_global is False. Defaults to 10.
"""
self._is_uni = False
self._is_global = is_global
self._window = deque(maxlen=window_len)
self._num_items = 0
self._max = defaultdict(lambda: -math.inf)
self._min = defaultdict(lambda: math.inf)
self._sum = defaultdict(float)
self._mean = defaultdict(float)
self._sum_squares = defaultdict(float)
self._var = defaultdict(float)
self._std = defaultdict(float)
[docs] def update(self, X: np.ndarray):
"""Update a pd.Series to stream
Args:
X (np.ndarray): An item from StreamGenerator
"""
self._num_items += 1
if isinstance(X, int) or isinstance(X, float):
X = np.array([X])
self._is_uni = True
elif isinstance(X, np.ndarray):
X = np.array([X]).flatten()
if len(X) == 1:
self._is_uni = True
else:
self._is_uni = False
else:
raise NotImplementedError("Only support int, float and np.ndarray")
if self._is_global:
tmp = defaultdict(float)
for index, item in enumerate(X):
self._max[index] = (
self._max[index] if self._max[index] > item else item
)
self._min[index] = (
self._min[index] if self._min[index] < item else item
)
self._sum[index] += X[index]
old_mean = self._mean[index]
tmp[index] = item - self._mean[index]
self._mean[index] = self._sum[index] / self._num_items
self._sum_squares[index] += (X[index] - old_mean) * (
X[index] - self._mean[index]
)
self._var[index] = self._sum_squares[index] / self._num_items
self._std[index] = math.sqrt(self._var[index])
else:
self._window.append(X)
[docs] def get_max(self):
"""
Get max statistic.
"""
if self._is_global:
result = [_ for _ in self._max.values()]
else:
result = np.max(self._window, axis=0)
return result[0] if self._is_uni else np.array(result)
[docs] def get_min(self):
"""
Get min statistic.
"""
if self._is_global:
result = [_ for _ in self._min.values()]
else:
result = np.min(self._window, axis=0)
return result[0] if self._is_uni else np.array(result)
[docs] def get_mean(self):
"""
Get mean statistic.
"""
if self._is_global:
result = [_ for _ in self._mean.values()]
else:
result = np.mean(self._window, axis=0)
return result[0] if self._is_uni else np.array(result)
[docs] def get_std(self):
"""
Get max statistic.
"""
if self._is_global:
result = [_ for _ in self._std.values()]
else:
result = np.std(self._window, axis=0)
return result[0] if self._is_uni else np.array(result)
[docs] def get_sum(self):
"""
Get sum statistic.
"""
if self._is_global:
result = [_ for _ in self._sum.values()]
else:
result = np.sum(self._window, axis=0)
return result[0] if self._is_uni else np.array(result)
[docs] def get_var(self):
"""
Get var statistic.
"""
if self._is_global:
result = [_ for _ in self._var.values()]
else:
result = np.var(self._window, axis=0)
return result[0] if self._is_uni else np.array(result)
class SDFT:
def __init__(self, window_len) -> None:
self.window_len = window_len
self.window = deque(maxlen=window_len)
self.coefficients = deque(maxlen=window_len)
def update(self, X: np.ndarray):
# def _get_coefficients(coeff, diff, i):
# self.coefficients[i] = (coeff + diff) * np.exp(
# 2j * np.pi * i / self.window_len
# )
# return
if len(self.window) < self.window_len - 1:
self.window.append(X)
elif len(self.window) == self.window_len - 1:
self.window.append(X)
self.coefficients.extend(np.fft.fft(self.window))
else:
diff = X - self.window[0]
for i, c in enumerate(self.coefficients):
self.coefficients[i] = (c + diff) * np.exp(
2j * np.pi * i / self.window_len
)
# This vectorize seems to be slower than the loop above
# vfunc = np.vectorize(_get_coefficients)
# vfunc(
# self.coefficients, diff, [i for i in range(self.window_len)]
# )
self.window.append(X)
return self