データ取得(サンプリング)ガイド#
このドキュメントでは、haniwers-v1 のデータ取得・サンプリング機能の仕組み、設計パターン、および開発者が理解すべき重要な概念について説明します。
このドキュメントの対象者: Sampler、Device、RawEvent クラスを理解したい開発者
概要:データ取得の流れ#
データ取得(DAQ: Data Acquisition)は、OSECHI 検出器からのイベントデータをリアルタイムで収集し、CSV ファイルに保存するプロセスです。
OSECHI Detector (Serial Port)
↓ (data lines)
Device.readline()
↓ (raw string)
RawEvent.from_serial()
↓ (validated RawEvent or None)
Sampler.stream_events()
↓ (generator: yields RawEvent)
Sampler.acquire_by_count() or acquire_by_time()
↓ (collection strategy)
CSV Output File
コアコンポーネント#
1. Deviceクラス: シリアル通信の抽象化#
ファイル: src/haniwers/v1/daq/device.py
責務#
OSECHI 検出器とのシリアル通信管理
接続・切断の制御
低レベルの行読み込み
重要なメソッド#
class Device:
def connect(self) -> None:
"""検出器への接続を確立する"""
def disconnect(self) -> None:
"""接続を切断する"""
def readline(self) -> str:
"""1 行のデータを読み込む(検出器から)"""
def write(self, data: str) -> None:
"""検出器にコマンドを送信する"""
使用例#
from haniwers.v1.daq.device import Device
from haniwers.v1.config.model import DeviceConfig
# デバイス設定
cfg = DeviceConfig(
port="/dev/ttyUSB0",
baudrate=115200,
timeout=1.0,
label="detector-001"
)
# デバイスの接続
device = Device(cfg)
device.connect()
# データ読み込み
while True:
line = device.readline()
if not line:
break
print(f"Received: {line}")
# 接続を切断
device.disconnect()
2. RawEvent: 検出器データのモデル化#
ファイル: src/haniwers/v1/daq/model.py
責務#
検出器の生データを Python オブジェクトに変換
データの型検証と値の妥当性確認
タイムスタンプ付与
重要な方法#
@dataclass
class RawEvent:
"""1 つの宇宙線イベント"""
timestamp: pendulum.DateTime
ch1: int # チャネル 1 のカウント
ch2: int # チャネル 2 のカウント
ch3: int # チャネル 3 のカウント
coinc: int # 同時計測
temp_out: float # 外部温度
temp_in: float # 内部温度
@classmethod
def from_serial(cls, line: str, time: pendulum.DateTime) -> "RawEvent | None":
"""
検出器からの 1 行を RawEvent に変換
Args:
line: 検出器からの生データ文字列(スペース区切り、7 フィールド)
time: タイムスタンプ
Returns:
RawEvent オブジェクト、または無効な行の場合は None
Example:
>>> line = "100 150 120 1 0 25.5 24.8"
>>> event = RawEvent.from_serial(line, pendulum.now())
>>> print(event.ch1)
100
"""
重要な設計: 検証(バリデーション)の緩和#
実際の OSECHI 検出器は、通信ノイズや一時的な障害により、ときどき空の行または不正な形式のデータを送信します。
この堅牢性を確保するため、RawEvent.from_serial() は例外を発生させるのではなく、無効なデータに対して None を返します:
空の行 →
Noneを返す(例外なし)フィールド数が
7でない →Noneを返す数値に変換できないフィールド →
Noneを返す
このアプローチにより、DAQ セッションは一時的な通信ノイズで停止することなく、有効なデータのみを CSV に保存します。
使用例#
import pendulum
from haniwers.v1.daq.model import RawEvent
# 有効なデータ
line = "100 150 120 1 0 25.5 24.8"
event = RawEvent.from_serial(line, pendulum.now())
print(event) # RawEvent(timestamp=..., ch1=100, ch2=150, ...)
# 無効なデータ(フィールド数が足りない)
empty_line = ""
event = RawEvent.from_serial(empty_line, pendulum.now())
print(event) # None
# 無効なデータ(数値に変換できない)
bad_line = "abc 150 120 1 0 25.5 24.8"
event = RawEvent.from_serial(bad_line, pendulum.now())
print(event) # None
3. SamplerConfig: サンプリング設定モデル#
ファイル: src/haniwers/v1/config/model.py
責務#
データ取得の全設定をまとめたモデル
2つのモード(
count_basedとtime_based)をサポートPydantic による自動バリデーション
主要フィールド#
class SamplerConfig(BaseModel):
"""統合されたサンプリング設定"""
# ファイル設定
label: str # 識別ラベル
workspace: str # 出力ディレクトリ
filename_prefix: str # ファイル名接頭辞
filename_suffix: str # ファイル名拡張子
# 取得設定
mode: str = "count_based" # "count_based" または "time_based"
events_per_file: int # 1 ファイル当たりのイベント数
number_of_files: int # ファイル数
duration: Optional[float] = None # time_based のときの取得秒数
# 高度な設定
stream_mode: bool = False # リアルタイム書き込みモード
suppress_threshold: int = 1000 # 抑制閾値
max_retry: int = 3 # 最大リトライ回数
モードの意味#
モード |
用途 |
パラメータ |
例 |
|---|---|---|---|
|
固定イベント数の取得 |
|
1000 イベント × 5 ファイル |
|
固定時間の取得 |
|
30 秒間取得 |
4. Sampler: データ取得エンジン#
ファイル: src/haniwers/v1/daq/sampler.py
責務#
Device からの行を読み込む
RawEvent に変換
取得モード(count_based / time_based)に応じた集約
CSV ファイルへの出力
データ品質の監視(警告ログ)
重要なメソッド#
class Sampler:
def __init__(
self,
device: Device,
config: SamplerConfig,
output_dir: Optional[str | Path] = None,
show_progress: bool = True
):
"""
Args:
device: 接続済みの Device インスタンス
config: SamplerConfig 設定
output_dir: 出力ディレクトリ(指定時は config.workspace を上書き)
show_progress: プログレスバー表示の有無
"""
def run(self, files: int = 1) -> None:
"""
データ取得を実行
Args:
files: 取得するファイル数
"""
def stream_events(self, iterator) -> Iterator[RawEvent]:
"""
Device からのイベントをストリーミング
Returns:
RawEvent ジェネレータ
無効なデータはフィルタリングされ、警告ログが出力される
"""
def acquire_by_count(self) -> None:
"""イベント数ベースの取得(count_based モード)"""
def acquire_by_time(self) -> None:
"""時間ベースの取得(time_based モード)"""
重要な設計: ジェネレータパターンとストリーミング#
stream_events() はジェネレータで、検出器からのイベントをリアルタイムで処理します。
def stream_events(self, iterator: Iterator) -> Iterator[RawEvent]:
"""Generator that yields measurements one at a time as they arrive."""
invalid_count = 0
for _ in iterator:
event = self.read_event()
if event is not None:
yield event
else:
invalid_count += 1
self.logger.warning(
f"Skipped invalid/empty detector line (total skipped: {invalid_count})"
)
この設計により:
メモリ効率が良い(全データを保持しない)
リアルタイム処理が可能(1 イベントずつ処理)
無効なデータが自動的にフィルタリングされる
ユーザーはデータ品質問題をリアルタイムで観察できる
使用例#
from haniwers.v1.daq.device import Device
from haniwers.v1.daq.sampler import Sampler
from haniwers.v1.config.model import DeviceConfig, SamplerConfig
# デバイス設定
device_cfg = DeviceConfig(
port="/dev/ttyUSB0",
baudrate=115200,
timeout=1.0
)
# サンプラー設定(count_based)
sampler_cfg = SamplerConfig(
label="run001",
workspace="./data",
filename_prefix="cosmic_rays",
filename_suffix=".csv",
mode="count_based",
events_per_file=1000,
number_of_files=5,
)
# 接続してサンプリング
device = Device(device_cfg)
device.connect()
sampler = Sampler(device, sampler_cfg)
sampler.run(files=5)
device.disconnect()
データ品質と堅牢性#
検証の段階的な緩和(Graceful Degradation)#
Haniwers は 3 段階の検証を実施します:
厳密検証(Pydantic): 設定ファイル読み込み時に型と値を検証
これはユーザーエラーを早期に検出
緩いデータ検証(
RawEvent.from_serial()): 検出器データ解析時に形式を確認無効なデータを
Noneで返す(例外なし)DAQ セッションの継続を優先
警告ログ(
stream_events()): データ品質の監視無効なラインが発見されるたびに WARNING ログ
ユーザーは問題をリアルタイムで検知可能
実行時のエラー処理#
パターン 1: 通常のエラー(接続失敗など)#
try:
device = Device(cfg)
device.connect() # ポートが見つからない → 例外
except Exception as e:
typer.echo(f"[ERROR] Connection failed: {e}", err=True)
raise typer.Exit(code=1)
パターン 2: データ品質問題(検出器ノイズ)#
# これは例外を発生させず、ログ警告を出すだけ
event = RawEvent.from_serial("", pendulum.now()) # None を返す
# stream_events() が自動的に警告ログを出力
CLI との統合#
daq コマンドの全体フロー#
ファイル: src/haniwers/v1/cli/daq.py
設定ファイル読み込み(TOML または デフォルト)
CLI オプションで上書き
設定の検証
Device.connect()
タイムスタンプ付きディレクトリ作成
Sampler.run()
Device.disconnect()
CLI 例#
# 設定ファイルから取得
haniwers-v1 daq --config daq.toml
# CLI オプションで上書き
haniwers-v1 daq --config daq.toml --workspace ./output --events-per-file 5000
# CLI オプションのみ
haniwers-v1 daq --port /dev/ttyUSB0 --workspace ./data --events-per-file 1000
モード推論#
明示的な --mode フラグまたは --duration パラメータでモードが決定されます。
# count_based(デフォルト)
haniwers-v1 daq --config config.toml
# time_based(明示的)
haniwers-v1 daq --config config.toml --mode time_based --duration 60
# time_based(暗黙的)- --duration があるとモードは自動的に time_based に
haniwers-v1 daq --config config.toml --duration 30
一般的な開発タスク#
タスク 1: 新しいイベント属性を追加する#
例: センサー湿度を追跡したい#
RawEvent を更新 (
src/haniwers/v1/daq/model.py)
@dataclass
class RawEvent:
timestamp: pendulum.DateTime
ch1: int
ch2: int
ch3: int
coinc: int
temp_out: float
temp_in: float
humidity: float # 新属性
パーサーを更新 (
from_serialメソッド)
@classmethod
def from_serial(cls, line: str, time: pendulum.DateTime) -> "RawEvent | None":
values = line.strip().split()
if len(values) != 8: # 8 フィールドに変更
return None
try:
return cls(
timestamp=time,
ch1=int(values[0]),
ch2=int(values[1]),
ch3=int(values[2]),
coinc=int(values[3]),
temp_out=float(values[4]),
temp_in=float(values[5]),
humidity=float(values[6]),
)
except (ValueError, TypeError):
return None
CSV 出力を更新 (
Samplerクラス)
def _write_csv_header(self, csvfile):
fieldnames = [
"timestamp", "ch1", "ch2", "ch3", "coinc",
"temp_out", "temp_in", "humidity"
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
テストを追加
def test_raw_event_with_humidity():
"""Test parsing event with humidity field"""
line = "100 150 120 1 25.5 24.8 65.2"
event = RawEvent.from_serial(line, pendulum.now())
assert event is not None
assert event.humidity == 65.2
タスク 2: 新しい取得モードを追加する#
例: イベントレート制御モード(秒あたり N イベント)#
SamplerConfig に新モードを追加
class SamplerConfig(BaseModel):
mode: str = "count_based" # "rate_based" を追加可能
event_rate: Optional[float] = None # イベント/秒
Sampler に新メソッドを追加
class Sampler:
def acquire_by_rate(self) -> None:
"""イベントレート制御での取得"""
pass
def run(self, files: int = 1) -> None:
if self.config.mode == "rate_based":
self.acquire_by_rate()
# ... 既存のコードと同じ
ルーティングロジックを更新
# CLI で自動ルーティング
if config.mode == "rate_based":
sampler = Sampler(device, config)
sampler.run()
トラブルシューティング#
問題 1: DAQ が開始直後に停止する#
原因: 検出器通信エラー
デバッグ:
# ポート確認
haniwers-v1 port list
haniwers-v1 port test /dev/ttyUSB0
# ログレベルを DEBUG に
LOG_LEVEL=DEBUG haniwers-v1 daq --config config.toml
問題 2: CSV ファイルが空である#
原因: 全イベントが無効な形式
チェック:
警告ログを確認:
Skipped invalid/empty detector lineの出力検出器のデータ形式が期待値と一致しているか確認
検出器が実際にデータを送信しているか確認:
haniwers-v1 port test /dev/ttyUSB0
問題 3: 一部のイベントが欠落している#
原因: シリアル通信タイムアウト
解決方法:
# SamplerConfig で timeout を増やす
sampler_cfg = SamplerConfig(
# ...
max_retry=5 # リトライ回数を増やす
)
まとめ#
Device: シリアル通信の低レベル管理
RawEvent: 検出器データのパースと検証
SamplerConfig: 統合された設定モデル
Sampler: データ取得戦略の実装
Robustness: 無効なデータをフィルタリングしながら継続
これらの層を理解することで、DAQ システムの拡張やデバッグが効率的に行えます。