データ取得(サンプリング)ガイド#

このドキュメントでは、haniwers-v1 のデータ取得・サンプリング機能の仕組み、設計パターン、および開発者が理解すべき重要な概念について説明します。

このドキュメントの対象者: Sampler、Device、RawEvent クラスを理解したい開発者

概要:データ取得の流れ#

データ取得(DAQ: Data Acquisition)は、OSECHI 検出器からのイベントデータをリアルタイムで収集し、CSV ファイルに保存するプロセスです。

OSECHI Detector (Serial Port)
    ↓ (data lines)
Device.readline()
    ↓ (raw string)
RawEvent.from_serial()
    ↓ (validated RawEvent or None)
Sampler.stream_events()
    ↓ (generator: yields RawEvent)
Sampler.acquire_by_count() or acquire_by_time()
    ↓ (collection strategy)
CSV Output File

コアコンポーネント#

1. Deviceクラス: シリアル通信の抽象化#

ファイル: src/haniwers/v1/daq/device.py

責務#

  • OSECHI 検出器とのシリアル通信管理

  • 接続・切断の制御

  • 低レベルの行読み込み

重要なメソッド#

class Device:
    def connect(self) -> None:
        """検出器への接続を確立する"""

    def disconnect(self) -> None:
        """接続を切断する"""

    def readline(self) -> str:
        """1 行のデータを読み込む(検出器から)"""

    def write(self, data: str) -> None:
        """検出器にコマンドを送信する"""

使用例#

from haniwers.v1.daq.device import Device
from haniwers.v1.config.model import DeviceConfig

# デバイス設定
cfg = DeviceConfig(
    port="/dev/ttyUSB0",
    baudrate=115200,
    timeout=1.0,
    label="detector-001"
)

# デバイスの接続
device = Device(cfg)
device.connect()

# データ読み込み
while True:
    line = device.readline()
    if not line:
        break
    print(f"Received: {line}")

# 接続を切断
device.disconnect()

2. RawEvent: 検出器データのモデル化#

ファイル: src/haniwers/v1/daq/model.py

責務#

  • 検出器の生データを Python オブジェクトに変換

  • データの型検証と値の妥当性確認

  • タイムスタンプ付与

重要な方法#

@dataclass
class RawEvent:
    """1 つの宇宙線イベント"""
    timestamp: pendulum.DateTime
    ch1: int  # チャネル 1 のカウント
    ch2: int  # チャネル 2 のカウント
    ch3: int  # チャネル 3 のカウント
    coinc: int  # 同時計測
    temp_out: float  # 外部温度
    temp_in: float  # 内部温度

    @classmethod
    def from_serial(cls, line: str, time: pendulum.DateTime) -> "RawEvent | None":
        """
        検出器からの 1 行を RawEvent に変換

        Args:
            line: 検出器からの生データ文字列(スペース区切り、7 フィールド)
            time: タイムスタンプ

        Returns:
            RawEvent オブジェクト、または無効な行の場合は None

        Example:
            >>> line = "100 150 120 1 0 25.5 24.8"
            >>> event = RawEvent.from_serial(line, pendulum.now())
            >>> print(event.ch1)
            100
        """

重要な設計: 検証(バリデーション)の緩和#

実際の OSECHI 検出器は、通信ノイズや一時的な障害により、ときどき空の行または不正な形式のデータを送信します。 この堅牢性を確保するため、RawEvent.from_serial() は例外を発生させるのではなく、無効なデータに対して None を返します:

  • 空の行 → None を返す(例外なし)

  • フィールド数が 7 でない → None を返す

  • 数値に変換できないフィールド → None を返す

このアプローチにより、DAQ セッションは一時的な通信ノイズで停止することなく、有効なデータのみを CSV に保存します。

使用例#

import pendulum
from haniwers.v1.daq.model import RawEvent

# 有効なデータ
line = "100 150 120 1 0 25.5 24.8"
event = RawEvent.from_serial(line, pendulum.now())
print(event)  # RawEvent(timestamp=..., ch1=100, ch2=150, ...)

# 無効なデータ(フィールド数が足りない)
empty_line = ""
event = RawEvent.from_serial(empty_line, pendulum.now())
print(event)  # None

# 無効なデータ(数値に変換できない)
bad_line = "abc 150 120 1 0 25.5 24.8"
event = RawEvent.from_serial(bad_line, pendulum.now())
print(event)  # None

3. SamplerConfig: サンプリング設定モデル#

ファイル: src/haniwers/v1/config/model.py

責務#

  • データ取得の全設定をまとめたモデル

  • 2つのモード(count_basedtime_based)をサポート

  • Pydantic による自動バリデーション

主要フィールド#

class SamplerConfig(BaseModel):
    """統合されたサンプリング設定"""

    # ファイル設定
    label: str  # 識別ラベル
    workspace: str  # 出力ディレクトリ
    filename_prefix: str  # ファイル名接頭辞
    filename_suffix: str  # ファイル名拡張子

    # 取得設定
    mode: str = "count_based"  # "count_based" または "time_based"
    events_per_file: int  # 1 ファイル当たりのイベント数
    number_of_files: int  # ファイル数
    duration: Optional[float] = None  # time_based のときの取得秒数

    # 高度な設定
    stream_mode: bool = False  # リアルタイム書き込みモード
    suppress_threshold: int = 1000  # 抑制閾値
    max_retry: int = 3  # 最大リトライ回数

モードの意味#

モード

用途

パラメータ

count_based

固定イベント数の取得

events_per_file, number_of_files

1000 イベント × 5 ファイル

time_based

固定時間の取得

duration

30 秒間取得

4. Sampler: データ取得エンジン#

ファイル: src/haniwers/v1/daq/sampler.py

責務#

  • Device からの行を読み込む

  • RawEvent に変換

  • 取得モード(count_based / time_based)に応じた集約

  • CSV ファイルへの出力

  • データ品質の監視(警告ログ)

重要なメソッド#

class Sampler:
    def __init__(
        self,
        device: Device,
        config: SamplerConfig,
        output_dir: Optional[str | Path] = None,
        show_progress: bool = True
    ):
        """
        Args:
            device: 接続済みの Device インスタンス
            config: SamplerConfig 設定
            output_dir: 出力ディレクトリ(指定時は config.workspace を上書き)
            show_progress: プログレスバー表示の有無
        """

    def run(self, files: int = 1) -> None:
        """
        データ取得を実行

        Args:
            files: 取得するファイル数
        """

    def stream_events(self, iterator) -> Iterator[RawEvent]:
        """
        Device からのイベントをストリーミング

        Returns:
            RawEvent ジェネレータ
            無効なデータはフィルタリングされ、警告ログが出力される
        """

    def acquire_by_count(self) -> None:
        """イベント数ベースの取得(count_based モード)"""

    def acquire_by_time(self) -> None:
        """時間ベースの取得(time_based モード)"""

重要な設計: ジェネレータパターンとストリーミング#

stream_events() はジェネレータで、検出器からのイベントをリアルタイムで処理します。

def stream_events(self, iterator: Iterator) -> Iterator[RawEvent]:
    """Generator that yields measurements one at a time as they arrive."""
    invalid_count = 0
    for _ in iterator:
        event = self.read_event()
        if event is not None:
            yield event
        else:
            invalid_count += 1
            self.logger.warning(
                f"Skipped invalid/empty detector line (total skipped: {invalid_count})"
            )

この設計により:

  • メモリ効率が良い(全データを保持しない)

  • リアルタイム処理が可能(1 イベントずつ処理)

  • 無効なデータが自動的にフィルタリングされる

  • ユーザーはデータ品質問題をリアルタイムで観察できる

使用例#

from haniwers.v1.daq.device import Device
from haniwers.v1.daq.sampler import Sampler
from haniwers.v1.config.model import DeviceConfig, SamplerConfig

# デバイス設定
device_cfg = DeviceConfig(
    port="/dev/ttyUSB0",
    baudrate=115200,
    timeout=1.0
)

# サンプラー設定(count_based)
sampler_cfg = SamplerConfig(
    label="run001",
    workspace="./data",
    filename_prefix="cosmic_rays",
    filename_suffix=".csv",
    mode="count_based",
    events_per_file=1000,
    number_of_files=5,
)

# 接続してサンプリング
device = Device(device_cfg)
device.connect()

sampler = Sampler(device, sampler_cfg)
sampler.run(files=5)

device.disconnect()

データ品質と堅牢性#

検証の段階的な緩和(Graceful Degradation)#

Haniwers は 3 段階の検証を実施します:

  1. 厳密検証(Pydantic): 設定ファイル読み込み時に型と値を検証

    • これはユーザーエラーを早期に検出

  2. 緩いデータ検証(RawEvent.from_serial(): 検出器データ解析時に形式を確認

    • 無効なデータを None で返す(例外なし)

    • DAQ セッションの継続を優先

  3. 警告ログ(stream_events(): データ品質の監視

    • 無効なラインが発見されるたびに WARNING ログ

    • ユーザーは問題をリアルタイムで検知可能

実行時のエラー処理#

パターン 1: 通常のエラー(接続失敗など)#

try:
    device = Device(cfg)
    device.connect()  # ポートが見つからない → 例外
except Exception as e:
    typer.echo(f"[ERROR] Connection failed: {e}", err=True)
    raise typer.Exit(code=1)

パターン 2: データ品質問題(検出器ノイズ)#

# これは例外を発生させず、ログ警告を出すだけ
event = RawEvent.from_serial("", pendulum.now())  # None を返す
# stream_events() が自動的に警告ログを出力

CLI との統合#

daq コマンドの全体フロー#

ファイル: src/haniwers/v1/cli/daq.py

  1. 設定ファイル読み込み(TOML または デフォルト)

  2. CLI オプションで上書き

  3. 設定の検証

  4. Device.connect()

  5. タイムスタンプ付きディレクトリ作成

  6. Sampler.run()

  7. Device.disconnect()

CLI 例#

# 設定ファイルから取得
haniwers-v1 daq --config daq.toml

# CLI オプションで上書き
haniwers-v1 daq --config daq.toml --workspace ./output --events-per-file 5000

# CLI オプションのみ
haniwers-v1 daq --port /dev/ttyUSB0 --workspace ./data --events-per-file 1000

モード推論#

明示的な --mode フラグまたは --duration パラメータでモードが決定されます。

# count_based(デフォルト)
haniwers-v1 daq --config config.toml

# time_based(明示的)
haniwers-v1 daq --config config.toml --mode time_based --duration 60

# time_based(暗黙的)- --duration があるとモードは自動的に time_based に
haniwers-v1 daq --config config.toml --duration 30

一般的な開発タスク#

タスク 1: 新しいイベント属性を追加する#

例: センサー湿度を追跡したい#

  1. RawEvent を更新 (src/haniwers/v1/daq/model.py)

@dataclass
class RawEvent:
    timestamp: pendulum.DateTime
    ch1: int
    ch2: int
    ch3: int
    coinc: int
    temp_out: float
    temp_in: float
    humidity: float  # 新属性
  1. パーサーを更新 (from_serial メソッド)

@classmethod
def from_serial(cls, line: str, time: pendulum.DateTime) -> "RawEvent | None":
    values = line.strip().split()
    if len(values) != 8:  # 8 フィールドに変更
        return None
    try:
        return cls(
            timestamp=time,
            ch1=int(values[0]),
            ch2=int(values[1]),
            ch3=int(values[2]),
            coinc=int(values[3]),
            temp_out=float(values[4]),
            temp_in=float(values[5]),
            humidity=float(values[6]),
        )
    except (ValueError, TypeError):
        return None
  1. CSV 出力を更新 (Sampler クラス)

def _write_csv_header(self, csvfile):
    fieldnames = [
        "timestamp", "ch1", "ch2", "ch3", "coinc",
        "temp_out", "temp_in", "humidity"
    ]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
  1. テストを追加

def test_raw_event_with_humidity():
    """Test parsing event with humidity field"""
    line = "100 150 120 1 25.5 24.8 65.2"
    event = RawEvent.from_serial(line, pendulum.now())
    assert event is not None
    assert event.humidity == 65.2

タスク 2: 新しい取得モードを追加する#

: イベントレート制御モード(秒あたり N イベント)#

  1. SamplerConfig に新モードを追加

class SamplerConfig(BaseModel):
    mode: str = "count_based"  # "rate_based" を追加可能
    event_rate: Optional[float] = None  # イベント/秒
  1. Sampler に新メソッドを追加

class Sampler:
    def acquire_by_rate(self) -> None:
        """イベントレート制御での取得"""
        pass

    def run(self, files: int = 1) -> None:
        if self.config.mode == "rate_based":
            self.acquire_by_rate()
        # ... 既存のコードと同じ
  1. ルーティングロジックを更新

# CLI で自動ルーティング
if config.mode == "rate_based":
    sampler = Sampler(device, config)
    sampler.run()

トラブルシューティング#

問題 1: DAQ が開始直後に停止する#

原因: 検出器通信エラー

デバッグ:

# ポート確認
haniwers-v1 port list
haniwers-v1 port test /dev/ttyUSB0

# ログレベルを DEBUG に
LOG_LEVEL=DEBUG haniwers-v1 daq --config config.toml

問題 2: CSV ファイルが空である#

原因: 全イベントが無効な形式

チェック:

  1. 警告ログを確認:Skipped invalid/empty detector line の出力

  2. 検出器のデータ形式が期待値と一致しているか確認

  3. 検出器が実際にデータを送信しているか確認:haniwers-v1 port test /dev/ttyUSB0

問題 3: 一部のイベントが欠落している#

原因: シリアル通信タイムアウト

解決方法:

# SamplerConfig で timeout を増やす
sampler_cfg = SamplerConfig(
    # ...
    max_retry=5  # リトライ回数を増やす
)

まとめ#

  • Device: シリアル通信の低レベル管理

  • RawEvent: 検出器データのパースと検証

  • SamplerConfig: 統合された設定モデル

  • Sampler: データ取得戦略の実装

  • Robustness: 無効なデータをフィルタリングしながら継続

これらの層を理解することで、DAQ システムの拡張やデバッグが効率的に行えます。