haniwers.v1.preprocess.aggregator#
Aggregation layer for time-series statistics and resampling.
This module converts processed events into time-windowed statistics by:
- Resampling to fixed time intervals (600 s by default)
- Computing the mean and standard deviation of numeric columns
- Calculating event rates per time window
Design (Principle IX - SRP):
- Single responsibility: aggregate processed data into statistics
- Pure functions, no side effects
- Each function adds one logical aggregation step
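As an illustration of the "pure functions, no side effects" principle, the sketch below shows one way an aggregation step can return a new DataFrame while leaving its input untouched. The function name `add_event_rate` and the sample data are hypothetical and not part of this module; only the `events` column name and the 600 s default come from the documentation.

```python
import pandas as pd


def add_event_rate(df: pd.DataFrame, interval_seconds: int = 600) -> pd.DataFrame:
    """Hypothetical aggregation step: returns a new frame and never edits `df`."""
    # DataFrame.assign builds a copy with the extra column instead of mutating df.
    return df.assign(event_rate=df["events"] / interval_seconds)


original = pd.DataFrame({"events": [60, 120]})
snapshot = original.copy(deep=True)

result = add_event_rate(original)

# The input is unchanged, so the step is safe to call repeatedly and easy to test.
pd.testing.assert_frame_equal(original, snapshot)
print(result)
```

A check of this kind (compare the input against a snapshot taken before the call) is one simple way to unit test for the absence of side effects.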
References:
- FR-005: Resample time-series data at variable intervals
- FR-006: Compute aggregated metrics (mean, std, rates)
- ADR-012: Pure functions, easy to unit test
Module Contents#
Functions#
| resample_by_interval | Group events into time windows and aggregate. |
| compute_statistics | Calculate mean and std for numeric columns per time window. |
| compute_event_rates | Calculate event rates (events per second) per time window. |
API#
- haniwers.v1.preprocess.aggregator.resample_by_interval(df: pandas.DataFrame, interval_seconds: int = 600) -> pandas.DataFrame#
Group events into time windows and aggregate.
Resamples a processed event DataFrame into fixed-size time windows, grouping by both time interval and hit_type for statistics aggregation.
Args:
    df: DataFrame with a 'datetime' column (timezone-aware datetime).
    interval_seconds: Window size in seconds (default 600 s = 10 minutes).
Returns:
    DataFrame with one row per interval per hit_type, containing counts.
Example:
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     "datetime": pd.date_range("2024-06-11 12:00:00", periods=3600, freq="1s"),
    ...     "hit_type": [0, 1, 6] * 1200
    ... })
    >>> resampled = resample_by_interval(df, interval_seconds=600)
    >>> resampled.shape
    (4, 3)  # 4 windows x 3 hit types
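The grouping described above (by time window and by hit_type) can be sketched with plain pandas. This is a minimal illustration under stated assumptions, not the module's implementation: the function name `sketch_resample` is hypothetical, and the output column names `time` and `events` are assumed from the input shown in the `compute_event_rates` example.

```python
import pandas as pd


def sketch_resample(df: pd.DataFrame, interval_seconds: int = 600) -> pd.DataFrame:
    """Hypothetical stand-in: count events per time window and hit_type."""
    # pd.Grouper bins the 'datetime' column into fixed-width windows.
    grouper = pd.Grouper(key="datetime", freq=f"{interval_seconds}s")
    counts = (
        df.groupby([grouper, "hit_type"])
        .size()                      # number of events in each (window, hit_type)
        .rename("events")            # assumed column name
        .reset_index()
        .rename(columns={"datetime": "time"})  # assumed column name
    )
    return counts


events = pd.DataFrame({
    "datetime": pd.date_range("2024-06-11 12:00:00", periods=1800, freq="1s"),
    "hit_type": [0, 1, 6] * 600,
})
windows = sketch_resample(events, interval_seconds=600)
print(windows)
```

In this sketch, 1800 seconds of data at 600 s per window gives 3 windows, each with one row per hit_type, so 9 rows of 200 events each.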
- haniwers.v1.preprocess.aggregator.compute_statistics(df: pandas.DataFrame, processed_df: pandas.DataFrame, interval_seconds: int = 600) -> pandas.DataFrame#
Calculate mean and std for numeric columns per time window.
Computes aggregated statistics (mean, std) for environmental columns within each time window, grouped by hit_type.
Args:
    df: Aggregated DataFrame from resample_by_interval().
    processed_df: Original processed DataFrame (for numeric columns).
    interval_seconds: Window size for binning (for reference).
Returns:
    DataFrame with adc_mean, adc_std, tmp_mean, tmp_std, atm_mean, atm_std.
Example:
    >>> import pandas as pd
    >>> processed_df = pd.DataFrame({
    ...     "datetime": pd.date_range("2024-06-11 12:00:00", periods=100, freq="1s"),
    ...     "hit_type": [0, 1, 6] * 33 + [0],
    ...     "adc": range(100),
    ...     "tmp": [25.0 + i * 0.1 for i in range(100)],
    ...     "atm": [1013.25] * 100
    ... })
    >>> resampled = resample_by_interval(processed_df, 60)
    >>> stats = compute_statistics(resampled, processed_df, 60)
    >>> "adc_mean" in stats.columns
    True
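For readers who want to see how per-window means and standard deviations could be produced, here is a minimal pandas sketch. It is an assumption-laden illustration rather than the module's code: `sketch_statistics` is a hypothetical name, it works directly on the processed events instead of taking the resampled frame as a first argument, and it relies on pandas' default sample standard deviation (`ddof=1`), which may or may not match what `compute_statistics` does.

```python
import pandas as pd


def sketch_statistics(processed_df: pd.DataFrame, interval_seconds: int = 600) -> pd.DataFrame:
    """Hypothetical stand-in: mean and std of adc/tmp/atm per window and hit_type."""
    grouper = pd.Grouper(key="datetime", freq=f"{interval_seconds}s")
    stats = (
        processed_df.groupby([grouper, "hit_type"])[["adc", "tmp", "atm"]]
        .agg(["mean", "std"])
    )
    # Flatten the (column, statistic) MultiIndex into names such as "adc_mean".
    stats.columns = [f"{col}_{stat}" for col, stat in stats.columns]
    return stats.reset_index().rename(columns={"datetime": "time"})


processed = pd.DataFrame({
    "datetime": pd.date_range("2024-06-11 12:00:00", periods=120, freq="1s"),
    "hit_type": [0, 1] * 60,
    "adc": range(120),
    "tmp": [25.0] * 120,
    "atm": [1013.25] * 120,
})
stats = sketch_statistics(processed, interval_seconds=60)
print(stats)
```

The flattening step is what yields the column naming pattern listed under Returns (`adc_mean`, `adc_std`, and so on).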
- haniwers.v1.preprocess.aggregator.compute_event_rates(df: pandas.DataFrame, interval_seconds: int = 600) -> pandas.DataFrame#
Calculate event rates (events per second) per time window.
Computes overall and per-layer event rates by dividing event counts by the time interval. Rates represent frequency of events in Hz (events/sec).
Args:
    df: Aggregated DataFrame with an 'events' column.
    interval_seconds: Window size for rate calculation.
Returns:
    DataFrame with event_rate, event_rate_top, event_rate_mid, event_rate_btm.
Formulas:
    event_rate = events / interval_seconds
    event_rate_top = sum(hit_top) / interval_seconds (for hit_type 4,5,6,7)
    event_rate_mid = sum(hit_mid) / interval_seconds (for hit_type 2,3,6,7)
    event_rate_btm = sum(hit_btm) / interval_seconds (for hit_type 1,3,5,7)
Example:
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     "time": pd.date_range("2024-06-11", periods=2, freq="10min"),
    ...     "hit_type": [0, 6],
    ...     "events": [60, 120]
    ... })
    >>> rates = compute_event_rates(df, 600)
    >>> rates["event_rate"].tolist()
    [0.1, 0.2]
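The hit_type lists in the formulas are consistent with a 3-bit mask in which top = 4, mid = 2, and btm = 1. The sketch below works the formulas through on that reading. The bit assignment is inferred from the lists rather than stated by the module, `sketch_event_rates` is a hypothetical name, and the per-layer rates here are computed per row rather than summed across rows of a window, which may differ from the real function.

```python
import pandas as pd

# Bit assumed for each layer, inferred from the hit_type lists in the formulas.
LAYER_BITS = {"top": 4, "mid": 2, "btm": 1}


def sketch_event_rates(df: pd.DataFrame, interval_seconds: int = 600) -> pd.DataFrame:
    """Hypothetical stand-in: overall and per-layer rates in events per second."""
    out = df.copy()  # leave the caller's frame untouched
    out["event_rate"] = out["events"] / interval_seconds
    for layer, bit in LAYER_BITS.items():
        # A row contributes to a layer only if that layer's bit is set in hit_type.
        has_layer = (out["hit_type"] & bit) != 0
        out[f"event_rate_{layer}"] = out["events"].where(has_layer, 0) / interval_seconds
    return out


df = pd.DataFrame({
    "time": pd.date_range("2024-06-11", periods=2, freq="10min"),
    "hit_type": [0, 6],
    "events": [60, 120],
})
rates = sketch_event_rates(df, 600)
print(rates)
```

With this input, hit_type 6 (binary 110) counts toward the top and mid layers but not the bottom one, while hit_type 0 counts toward none of them.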