haniwers.v1.daq.sampler#
Module Contents#
Classes#
Collect detector measurements and save them to CSV files. |
Functions#
Run a complete acquisition session with device, config, and output directory. |
API#
- class haniwers.v1.daq.sampler.Sampler(device: Union[haniwers.v1.daq.device.Device, haniwers.v1.daq.mocker.Mocker, haniwers.v1.daq.mocker.RandomMocker], config: Union[haniwers.v1.config.model.DaqConfig, haniwers.v1.config.model.ScanConfig, haniwers.v1.config.model.SamplerConfig], output_dir: Optional[pathlib.Path] = None, show_progress: bool = True)#
Collect detector measurements and save them to CSV files.
What is Sampler? The recommended way to capture data from a detector (real or mock) and store it on disk. It handles all the details: reading measurements, adding timestamps, organizing files, and showing progress.
Key features: ✓ Reads events from any source: real Device, Mocker (CSV playback), or RandomMocker (synthetic data) ✓ Multiple acquisition modes: Fixed count or fixed time duration ✓ Streaming or buffered: Write events immediately or collect then write ✓ Progress tracking: Optional progress bar shows data collection status ✓ Timestamped files: Automatically generates human-readable filenames
Life cycle: 1. Create: Sampler(device, config, output_dir) 2. Acquire: Run acquisition (by count or by time) 3. Output: CSV files saved with timestamped names
Example (real detector with fixed count):
from pathlib import Path from haniwers.v1.config.model import DaqConfig from haniwers.v1.daq.device import Device from haniwers.v1.daq.sampler import Sampler # Setup device = Device(config.device) device.connect() # Create sampler sampler = Sampler( device=device, config=config.daq, output_dir=Path("./data") ) # Collect 1000 events into one file, show progress sampler.acquire_by_count( file_path=Path("./data/measurement.csv"), event_count=1000 ) device.disconnect()
Example (mock detector with multiple files):
from pathlib import Path from tempfile import TemporaryDirectory from haniwers.v1.daq.mocker import RandomMocker, MockerConfig from haniwers.v1.daq.sampler import Sampler from haniwers.v1.config.model import DaqConfig # Create mock device (generates synthetic data) mock_device = RandomMocker( config=MockerConfig(csv_path=None, speed=10.0), seed=42 # Reproducible random data ) # Configuration for DAQ daq_config = DaqConfig( label="test", workspace=".", filename_prefix="synthetic", filename_suffix=".csv", events_per_file=100, number_of_files=5, stream_mode=True # Write immediately, good for data safety ) # Create and run sampler with TemporaryDirectory() as tmpdir: sampler = Sampler( device=mock_device, config=daq_config, output_dir=Path(tmpdir), show_progress=True # Show progress bar ) sampler.run(mode="daq", files=5)
Advanced features: - sampler.acquire_by_count(): Collect fixed number of events - sampler.acquire_by_time(): Collect for fixed duration (e.g., 10 seconds) - show_progress: Optional progress bar (tqdm) during acquisition - stream_mode: Immediate write (safer) vs buffered (faster)
Initialization
Create a Sampler object with device and configuration.
What this does: Stores your detector settings (real, mock, or random) but does NOT start data collection yet. Call run() or acquire_by_count() or acquire_by_time() to actually collect measurements and save to CSV files.
Args: device (Device | Mocker | RandomMocker): The data source - Device: Real OSECHI detector connected to USB/serial port - Mocker: Playback pre-recorded CSV file (for replay/replay testing) - RandomMocker: Generate synthetic measurements (for development)
config (DaqConfig | ScanConfig | SamplerConfig): Settings for session - SamplerConfig: Modern config with mode and workspace built-in (NEW) - DaqConfig/ScanConfig: Legacy config (requires output_dir parameter) - events_per_file: How many measurements per CSV file - number_of_files: How many output files to create (for run() mode) - stream_mode: Write immediately (True) or buffer first (False) - filename_prefix: Name pattern for output files - (SamplerConfig only) mode: "count_based" or "time_based" - (SamplerConfig only) workspace: Directory for output files output_dir (Path, optional): Directory where CSV files will be saved - For SamplerConfig: Uses config.workspace (output_dir not needed) - For DaqConfig/ScanConfig: Required, must exist Example: Path("./data") or Path("/home/user/measurements") show_progress (bool, optional): Show progress bar during collection Default: True (shows tqdm progress bar) Set to False for scripts/batch jobs that don't need visual feedbackRaises: FileNotFoundError: If output directory doesn’t exist or can’t be created ValueError: If config missing required fields (events_per_file, number_of_files)
Example (with SamplerConfig - RECOMMENDED):
from pathlib import Path from haniwers.v1.daq.device import Device from haniwers.v1.daq.sampler import Sampler device = Device(config.device) device.connect() # Create sampler with SamplerConfig (workspace managed automatically) sampler = Sampler( device=device, config=config.sampler, # Has workspace built-in show_progress=True ) # Run acquisition based on config.mode (count_based or time_based) sampler.run(files=5) device.disconnect()
Example (with DaqConfig - LEGACY):
from pathlib import Path from haniwers.v1.daq.device import Device from haniwers.v1.daq.sampler import Sampler device = Device(config.device) device.connect() sampler = Sampler( device=device, config=config.daq, output_dir=Path("./data"), show_progress=True ) sampler.acquire_by_count(Path("./data/run1.csv"), 1000) device.disconnect()
- timestamped_filename(fid: int) pathlib.Path#
Generate a unique filename with timestamp for saving measurements.
What this does: Creates a filename automatically using: 1. Prefix from config (e.g., “data”) 2. Current timestamp (e.g., “2024-05-20T12h34m56s”) 3. File number for sequence (e.g., “000001”) 4. Suffix from config (e.g., “.csv”)
Args: fid (int): File sequence number (0, 1, 2, …) Used to distinguish multiple files from same session Example: fid=0 → “data_2024-05-20T12h34m56s_000000.csv”
Returns: Path: Full path to output file (including directory)
How it works: 1. Gets current time at method call time (not file creation time) 2. Formats as “YYYY-MM-DDTHH[h]MM[m]SS[s]” (human-readable, filesystem-safe) 3. Combines: {prefix}{timestamp}{fid:07d}{suffix} 4. Returns full path: output_dir / filename
Why filesystem-safe timestamp format: Uses “T12h34m56s” instead of “T12:34:56” to avoid colon character (Windows filesystems don’t allow colons in filenames)
Example:
sampler = Sampler(device, config, Path("./data")) # Generate three filenames file0 = sampler.timestamped_filename(0) # Returns: Path("./data/data_2024-05-20T14h32m11s_000000.csv") file1 = sampler.timestamped_filename(1) # Returns: Path("./data/data_2024-05-20T14h32m15s_000001.csv") file2 = sampler.timestamped_filename(2) # Returns: Path("./data/data_2024-05-20T14h32m19s_000002.csv")
Note: Timestamp updates for each file (each call gets current time) Sequence numbers always zero-padded to 7 digits (000000, 000001, etc.)
- read_event() RawEvent | None#
Read one measurement from the device and add timestamp.
What this does: 1. Waits for detector to send one line of measurement data 2. Records the exact time data was received (to nearest microsecond) 3. Parses the 7 sensor values from the detector format 4. Returns a RawEvent object with all data, or None if line is invalid
Invalid or empty lines are returned as None (not raising errors). This allows data collection to continue even if the detector sends corrupted data.
Returns: RawEvent | None: - RawEvent: Valid measurement with timestamp, ready for analysis - None: Empty or invalid line from detector (skipped gracefully)
How it works: 1. device.readline() - Blocks until detector sends data 2. pendulum.now() - Get timezone-aware timestamp (current time) 3. RawEvent.from_serial() - Parse detector format to RawEvent (or None) 4. Return RawEvent object or None
When to use: - Core of data acquisition loop - Usually called by stream_events() or collect_events() - Rarely called directly (use acquire_by_count() instead)
Note about None returns: stream_events() automatically skips None values, so only valid RawEvent objects are yielded to callers. This means data collection continues gracefully even when detector sends empty lines.
Raises: serial.SerialException: If device disconnected
Beginner tip: Use acquire_by_count() or acquire_by_time() instead of calling this repeatedly. Those methods handle iteration and file saving:
# DON'T do this: for i in range(1000): event = sampler.read_event() if event is not None: process(event) # DO this instead: sampler.acquire_by_count(Path("data.csv"), 1000)
Example (low-level, for understanding):
sampler = Sampler(device, config, Path("./data")) # Read one measurement (may be None if invalid line) event = sampler.read_event() if event is not None: print(f"Timestamp: {event.time}") print(f"Top sensor: {event.top}")
- stream_events(iterator: collections.abc.Iterator) collections.abc.Iterator[haniwers.v1.daq.model.RawEvent]#
Generator that yields measurements one at a time as they arrive.
What this does: Takes an iterator (count-based or time-based) and yields measurements one-by-one from the detector. Each time you ask for the next measurement, it reads from the device and returns a RawEvent object.
Invalid or empty lines from the detector are automatically skipped without raising errors. This allows data collection to continue even if the detector sends corrupted data. Warning logs are generated when invalid lines are encountered.
Args: iterator (Iterator): Controls when to stop reading - count_based_iterator(1000): Yield 1000 times - time_based_iterator(10.0): Yield for 10 seconds - range(5): Yield 5 times
Yields: RawEvent: One measurement at a time, as they arrive from detector (invalid lines are silently skipped)
How it works (generator pattern): 1. Loop through each iteration from the provided iterator 2. For each iteration, read one event from device (read_event()) 3. Skip invalid/empty lines (None values) and log warning 4. Yield valid RawEvent to caller 5. Pause until caller asks for next measurement 6. Repeat until iterator exhausted
Note on invalid data: When the detector sends empty or malformed lines (e.g., not exactly 7 values), read_event() returns None. The generator skips these and logs a warning, then tries again. This means fewer valid events may be collected than the iterator requested, but data collection continues gracefully instead of crashing.
When to use: - Memory-efficient for large data collection (one event at a time) - Allows processing events as they arrive - Usually called by save_events() or collect_events() - Rarely called directly
Beginner tip: This is a “lazy” generator - measurements are read only when requested. Contrast with collect_events() which reads ALL measurements first, then returns them as a list:
# Generator: One at a time (memory-efficient) for event in sampler.stream_events(iterator): print(event) # Process as events arrive # List: All at once (loads everything into memory) all_events = sampler.collect_events(iterator) print(f"Total: {len(all_events)}")
Example (low-level, for understanding):
sampler = Sampler(device, config, Path("./data")) # Create an iterator for 100 measurements iterator = sampler.count_based_iterator(100) # Stream events one at a time (invalid lines skipped automatically) count = 0 skipped = 0 for event in sampler.stream_events(iterator): count += 1 if count <= 3: print(f"Event {count}: {event.ch1}, {event.ch2}, {event.ch3}") print(f"Received {count} total events")
Performance note: Streaming is much more memory-efficient than collect_events() for large datasets because only one event is in memory at a time.
- collect_events(iterator: collections.abc.Iterator) list[haniwers.v1.daq.model.RawEvent]#
Read all measurements and return them as a list.
What this does: Reads measurements according to the provided iterator and collects them all into a Python list before returning. Opposite of streaming - waits for all data to arrive first, then gives you everything at once.
Args: iterator (Iterator): Controls how many measurements to collect - count_based_iterator(1000): Collect 1000 measurements - time_based_iterator(10.0): Collect for 10 seconds - range(5): Collect 5 measurements
Returns: list[RawEvent]: All measurements as a list Each element is a RawEvent object with timestamp and sensor values
How it works: 1. Use stream_events() to read from device one-by-one 2. Collect all RawEvent objects into a Python list 3. Return the complete list when done
When to use: - When you need all measurements before processing - Small to medium datasets (entire list fits in memory) - Buffered mode (not streaming) in save_events() - Further analysis after collection is complete
When NOT to use: - Large datasets (can run out of memory) - Real-time processing (wait for all data defeats the purpose) - Use stream_events() for streaming/real-time instead
Memory warning: For 1 million measurements (1M events × ~100 bytes each ≈ 100 MB): - collect_events() stores all 100 MB in RAM at once - stream_events() stores only 1 event (~100 bytes) at a time Use streaming for large datasets!
Beginner tip: Compare two approaches:
# Approach 1: Streaming (memory-efficient, one at a time) iterator = sampler.count_based_iterator(1000) for event in sampler.stream_events(iterator): print(f"Event: {event.ch1}") # Approach 2: Collecting (simple, but loads everything into RAM) iterator = sampler.count_based_iterator(1000) all_events = sampler.collect_events(iterator) print(f"Collected {len(all_events)} events") for event in all_events: print(f"Event: {event.ch1}")
Example:
sampler = Sampler(device, config, Path("./data")) # Collect exactly 100 measurements iterator = sampler.count_based_iterator(100) events = sampler.collect_events(iterator) print(f"Received {len(events)} measurements") print(f"First event timestamp: {events[0].timestamp}") print(f"Last event timestamp: {events[-1].timestamp}")
- save_events(file_path: pathlib.Path, source: Union[collections.abc.Iterator, list[haniwers.v1.daq.model.RawEvent]]) None#
Write measurements to a CSV file.
What this does: Takes measurements from either a stream (iterator) or a pre-collected list and writes them to a CSV file. Handles both streaming (write as you go) and buffered (collect first, then write) modes automatically.
Args: file_path (Path): Where to save the CSV file Example: Path(“./data/run1.csv”) or Path(“/home/user/measurement.csv”)
source (Iterator | list[RawEvent]): Where to get measurements from - Iterator: stream_events() or collect_events() output Reads from device/mock on-demand as file is written - list[RawEvent]: Pre-collected measurements Already have all data in memory, just write to fileHow it works: If source is a list: 1. Assume all measurements collected already 2. Write all rows to CSV at once
If source is an Iterator: - Check stream_mode from config - If stream_mode=True (default): Write events as they arrive (memory-efficient, good for large datasets) - If stream_mode=False: Collect all first, then write all at once (simpler, but needs more RAM)Output CSV format: Header row: timestamp,ch1,ch2,ch3,ch4,ch5,ch6,ch7 Data rows: 2024-10-19T14h32m45s,100,200,150,175,210,190,220 One row per measurement
When to use: - Standard data saving for DAQ operations - Called by acquire_by_count() and acquire_by_time() - Rarely called directly (use higher-level methods)
Beginner tip: Choose stream_mode in your config based on dataset size:
# For large datasets (millions of measurements): config.stream_mode = True # Write as you go, memory-efficient # For small datasets (thousands of measurements): config.stream_mode = False # Collect first, then write once
Example (low-level, for understanding):
sampler = Sampler(device, config, Path("./data")) # Option 1: Save from iterator (streaming) iterator = sampler.count_based_iterator(1000) sampler.save_events(Path("./data/stream.csv"), iterator) # Option 2: Collect first, then save iterator = sampler.count_based_iterator(1000) events = sampler.collect_events(iterator) sampler.save_events(Path("./data/collected.csv"), events) # Option 3: Use higher-level acquire_by_count (recommended) sampler.acquire_by_count(Path("./data/measurement.csv"), 1000)
Performance: - Stream mode: Best for large datasets, writes incrementally - Buffered mode: Good for post-processing, reads all first - Both produce identical CSV files
- acquire_by_count(file_path: pathlib.Path, event_count: int)#
Collect exactly N measurements and save to a CSV file (with progress bar).
What this does: Reads exactly event_count measurements from the detector and saves them to a CSV file. Optionally shows a progress bar counting down to completion.
Args: file_path (Path): Where to save the CSV file Example: Path(“./data/measurement.csv”)
event_count (int): Exact number of measurements to collect Example: 1000 collects 1000 measurements then stopsHow it works: 1. Create a counter iterator for event_count iterations 2. Wrap iterator with tqdm progress bar (if show_progress=True) 3. Read event_count measurements via save_events() 4. Write all measurements to CSV file 5. Display results
When to use: - Most common use case for data collection - When you know exactly how many measurements you need - DAQ sessions with fixed event counts - Reproducible experiments with known measurement counts
Output: CSV file with event_count + 1 rows (header + data):
timestamp,ch1,ch2,ch3,ch4,ch5,ch6,ch7 2024-10-19T14h32m45s,100,200,150,175,210,190,220 2024-10-19T14h32m46s,105,198,152,176,212,189,222 ... (1000 total data rows)Progress bar: If show_progress=True (default):
Events: 45%|████▌ | 450/1000 [00:05<00:06, 89.00 it/s]Beginner tip: This is the recommended way to collect a fixed amount of data:
sampler = Sampler(device, config, Path("./data")) # Collect 1000 measurements (simple and clear) sampler.acquire_by_count( file_path=Path("./data/run.csv"), event_count=1000 ) # File is now saved at ./data/run.csv print("✓ Collection complete")
Compare with time-based collection:
# Fixed count: Stop after 1000 measurements sampler.acquire_by_count(Path("./data/fixed.csv"), 1000) # Fixed time: Stop after 10 seconds sampler.acquire_by_time(Path("./data/timed.csv"), duration=10.0, sleep_interval=0.1)
Example with all options:
from pathlib import Path from haniwers.v1.daq.device import Device from haniwers.v1.daq.sampler import Sampler device = Device(config.device) device.connect() sampler = Sampler( device=device, config=config.daq, output_dir=Path("./measurements"), show_progress=True # Show progress bar ) # Collect 5000 measurements sampler.acquire_by_count( file_path=Path("./measurements/experiment1.csv"), event_count=5000 ) device.disconnect()
- acquire_by_time(file_path: pathlib.Path, duration: float, sleep_interval: float)#
Collect measurements for a fixed duration and save to a CSV file.
What this does: Reads measurements for exactly duration seconds from the detector and saves them to a CSV file. Useful when you want a time-limited data collection rather than a fixed event count.
Args: file_path (Path): Where to save the CSV file Example: Path(“./data/10second_run.csv”)
duration (float): How long to collect measurements (in seconds) Example: 10.0 collects data for 10 seconds sleep_interval (float): Delay between read attempts (in seconds) - 0.1: Check detector 10 times per second (default for scanning) - 0.01: Very frequent checks (high CPU, for fast events) - 1.0: Slow polling (low CPU, for slow events)How it works: 1. Record start time 2. Loop until duration seconds have passed: a. Attempt to read one measurement b. Sleep for sleep_interval seconds c. Check if duration elapsed 3. Write all measurements to CSV file
When to use: - Physics experiments with time-based measurements - Threshold scanning (typical use: 10 seconds per scan) - Background noise measurements - When you don’t know measurement rate in advance
Note about sleep_interval: - Measurements still come from detector at its natural rate - sleep_interval just controls checking frequency - Total events = duration × (detector_rate / sleep_interval)
Output: CSV file with variable number of rows (depends on detector rate):
timestamp,ch1,ch2,ch3,ch4,ch5,ch6,ch7 2024-10-19T14h32m45s,100,200,150,175,210,190,220 2024-10-19T14h32m45s,105,198,152,176,212,189,222 ... (varies, typically 100-1000 rows for 10 seconds)Progress bar: If show_progress=True (default):
Duration: 23%|██▎ | 2.3/10.0 [00:02<00:08, 1.00s/s]Beginner tip: This is used extensively in threshold scanning:
sampler = Sampler(device, config, Path("./data")) # Collect for 10 seconds (standard for scanning) sampler.acquire_by_time( file_path=Path("./data/threshold_scan.csv"), duration=10.0, # 10 seconds sleep_interval=0.1 # Check 10 times per second ) # File now contains all measurements collected during 10 seconds
Compare with count-based collection:
# Fixed count: Stop after 1000 measurements sampler.acquire_by_count(Path("./data/fixed.csv"), 1000) # Fixed time: Stop after 10 seconds (event count varies) sampler.acquire_by_time(Path("./data/timed.csv"), 10.0, 0.1)
Example (threshold scanning pattern):
from pathlib import Path from haniwers.v1.daq.device import Device from haniwers.v1.daq.sampler import Sampler device = Device(config.device) device.connect() sampler = Sampler( device=device, config=config.daq, output_dir=Path("./scan_results") ) # Scanning 3 thresholds, 10 seconds each for threshold_value in [250, 300, 350]: device.write(f"THRESHOLD {threshold_value}") sampler.acquire_by_time( file_path=Path(f"./scan_results/threshold_{threshold_value}.csv"), duration=10.0, sleep_interval=0.1 ) print(f"✓ Completed scan at threshold {threshold_value}") device.disconnect()
- time_based_iterator(duration: float, sleep_interval: float) collections.abc.Generator[None, None, None]#
Generator that yields for a fixed duration (time-based acquisition loop).
What this does: Yields control back to the caller (which reads one measurement) until the specified duration has elapsed. Used to implement time-limited data collection (“collect for 10 seconds”).
Args: duration (float): Total time to yield for (in seconds) Example: 10.0 yields for 10 seconds total
sleep_interval (float): Time to sleep between yields (in seconds) Example: 0.1 means "check every 0.1 seconds" Smaller intervals = more frequent checks but higher CPU usage Larger intervals = less frequent checks but lower CPU usageYields: None: Each yield means “read one measurement, then continue”
How it works: 1. Record start time at first yield 2. Loop: a. Yield None (signal to caller to read one event) b. Sleep for sleep_interval seconds c. Check if total elapsed_time >= duration 3. Stop yielding when duration exceeded
When to use: - Core of time-based data acquisition - Called by acquire_by_time() - Rarely called directly
When NOT to use: - For fixed event counts: use count_based_iterator() instead - For manual iteration: Too low-level, use acquire_by_time()
Beginner tip: This is automatically used by acquire_by_time():
# High-level (recommended): sampler.acquire_by_time(Path("data.csv"), duration=10.0, sleep_interval=0.1) # Low-level (not recommended): iterator = sampler.time_based_iterator(10.0, 0.1) for _ in iterator: event = sampler.read_event() # This is just one event
Example (understanding sleep_interval):
# Frequent checks (high CPU): iterator = sampler.time_based_iterator(duration=5.0, sleep_interval=0.01) # Yields ~500 times in 5 seconds (every 0.01 seconds) # Standard checks: iterator = sampler.time_based_iterator(duration=5.0, sleep_interval=0.1) # Yields ~50 times in 5 seconds (every 0.1 seconds) # Slow checks (low CPU): iterator = sampler.time_based_iterator(duration=5.0, sleep_interval=0.5) # Yields ~10 times in 5 seconds (every 0.5 seconds)
Performance notes: - Smaller sleep_interval = more responsive but higher CPU usage - Larger sleep_interval = lower CPU but less responsive - Choose based on your detector’s measurement rate and CPU constraints - Default (0.1s) is standard for physics detector scanning
Exact timing: - Uses time.time() for wall-clock accuracy - Timing includes measurement read time, so total may exceed duration slightly - Suitable for 1-10 second measurements, not for sub-millisecond precision
- count_based_iterator(counts: int)#
Create an iterator that yields exactly N times (count-based acquisition loop).
What this does: Returns a simple counter from 0 to counts-1. Used to implement fixed-count data collection (“collect 1000 measurements”).
Args: counts (int): Exact number of times to yield Example: 1000 yields 1000 times (0 through 999)
Yields: int: Counter value (0, 1, 2, …, counts-1)
How it works: 1. Create a range(0, counts) iterator 2. Each time called, return next value in the range 3. Stop after yielding counts times
When to use: - Core of count-based data acquisition - Called by acquire_by_count() - Rarely called directly
When NOT to use: - For time-based collection: use time_based_iterator() instead - For manual iteration: use acquire_by_count() instead
Beginner tip: This is automatically used by acquire_by_count():
# High-level (recommended): sampler.acquire_by_count(Path("data.csv"), event_count=1000) # Low-level (not recommended): iterator = sampler.count_based_iterator(1000) for i in iterator: event = sampler.read_event() # This is just one event if i < 3: print(f"Event {i}")
Example (understanding iteration):
# Collect exactly 100 measurements iterator = sampler.count_based_iterator(100) # Process with different iteration patterns for index in iterator: event = sampler.read_event() # Index tells you which measurement this is (0-99) if index == 0: print("First measurement") elif index == 99: print("Last measurement")
Performance: - Extremely efficient: Just a simple counter - No sleep/timing overhead (unlike time_based_iterator) - Perfect for fixed-size experiments - Predictable number of measurements
Equivalent to:
# This: iterator = sampler.count_based_iterator(10) for i in iterator: pass # Is equivalent to: for i in range(10): pass
- run(mode: Optional[str] = None, files: Optional[int] = None) Optional[list[haniwers.v1.daq.model.RawEvent]]#
Run a complete acquisition session collecting N files of measurements.
What this does: Collects data into multiple output files based on the mode. Each file gets a unique timestamped filename, with progress bar showing overall progress across all files.
When stream_mode=False, returns all collected events as a list instead of writing to files. Useful for threshold scanning where you need to aggregate results in memory.
Args: mode (str, optional): Acquisition mode determines how measurements are collected - “daq”: Collect fixed event count per file (use acquire_by_count) - “scan”: Collect for fixed duration per file (use acquire_by_time) - “time_based”: Same as “scan” (for SamplerConfig compatibility) - “count_based”: Same as “daq” (for SamplerConfig compatibility) - “mock”: Same as “daq” (for mock detector testing) If not provided, uses self.mode from config (for SamplerConfig)
files (int, optional): Number of output CSV files to create Example: 5 creates 5 separate CSV files If not provided, uses self.files from configReturns: - If stream_mode=True: None (writes to files as usual) - If stream_mode=False: list[RawEvent] with all collected events
How it works (daq mode - most common): 1. Loop files times: a. Generate timestamped filename (includes directory, file number) b. Collect exactly config.events_per_file measurements c. If stream_mode=True: write to CSV file d. If stream_mode=False: collect events in memory e. Update progress bar 2. All measurements go to directory specified in init
How it works (scan mode - threshold scanning): 1. Loop files times: a. Generate timestamped filename b. Collect measurements for duration seconds c. Sleep 0.5 seconds between checks d. If stream_mode=True: write to CSV file e. If stream_mode=False: collect events in memory f. Update progress bar
When to use: - High-level interface for batch data collection - When you need multiple files from one session - DAQ mode for regular data collection - Scan mode for threshold scanning experiments - stream_mode=False for threshold scanning where results need aggregation - Rarely call directly: usually managed by CLI command
Output (stream_mode=True): Creates N timestamped CSV files in output_dir:
data_2024-10-19T14h32m45s_000000.csv (1000 measurements) data_2024-10-19T14h32m55s_000001.csv (1000 measurements) data_2024-10-19T14h33m05s_000002.csv (1000 measurements) data_2024-10-19T14h33m15s_000003.csv (1000 measurements) data_2024-10-19T14h33m25s_000004.csv (1000 measurements)Output (stream_mode=False): Returns collected events as list[RawEvent] for aggregation.
Progress bar:
Files: 40%|████ | 2/5 [00:10<00:15, 5.00s/file]Beginner tip: For most use cases, use the higher-level CLI interface instead:
# CLI (recommended): haniwers-v1 daq --config config.toml --files 5 # Low-level Python (rarely used): sampler = Sampler(device, config, Path("./data")) sampler.run(mode="daq", files=5)
Mode comparison:
# DAQ mode (regular data collection): sampler.run(mode="daq", files=5) # Creates 5 files, each with events_per_file measurements # Scan mode (threshold scanning): sampler.run(mode="scan", files=5) # Creates 5 files, each collected for 10 seconds # Scan with in-memory aggregation: sampler.stream_mode = False events = sampler.run(mode="time_based", files=1) # Returns list[RawEvent] instead of writing to file
Advanced example:
from pathlib import Path from haniwers.v1.daq.device import Device from haniwers.v1.daq.sampler import Sampler device = Device(config.device) device.connect() sampler = Sampler( device=device, config=config.daq, output_dir=Path("./data"), show_progress=True ) # Collect 5 files, each with 1000 measurements print("Starting data acquisition...") sampler.run(mode="daq", files=5) print("✓ Data collection complete") device.disconnect()
Threshold scanning example (with aggregation):
# Use stream_mode=False to get events for aggregation sampler.stream_mode = False events = sampler.run(mode="time_based", files=1) # Collect for duration seconds # Now you can aggregate the results result = aggregate_scan_result(events, channel=1, vth=300, duration=10)
Note about scan mode: Hardcoded values for scanning (10 seconds, 0.5s sleep interval) are meant for physics detector threshold scanning. For custom durations, use acquire_by_time() directly or modify this method.
- static sanitize(event: list) list#
Convert raw CSV row to typed values (empty→None, text→float/int).
What this does: Takes a list of strings (from CSV) and converts to proper Python types: - Empty strings become None - Numbers become int (if whole) or float (if decimal) - Non-numbers stay as strings
Args: event (list): List of string values from CSV row
Returns: list: Same values but with proper Python types
How it works: For each value in the list: 1. If empty string (“”) → convert to None 2. Try to convert to float 3. If successful and is whole number (e.g., 100.0) → convert to int 4. If successful and has decimal → keep as float 5. If conversion fails → keep as string
When to use: - Processing CSV data to clean up string values - Converting detector measurements from strings to numbers - Part of data preprocessing pipeline - Rarely called directly
Beginner tip: This is usually called automatically by data processing code:
# Automatic (preferred): event = RawEvent.from_list(["2024-10-19T14h32m45s", "100", "200", ...]) # Manual (for understanding): raw_values = ["2024-10-19T14h32m45s", "100", "", "200.5", "invalid"] cleaned = Sampler.sanitize(raw_values) # Result: ["2024-10-19T14h32m45s", 100, None, 200.5, "invalid"]
Example conversions:
# Strings to integers Sampler.sanitize(["100", "200", "300"]) # Returns: [100, 200, 300] # Empty strings to None Sampler.sanitize(["100", "", "300"]) # Returns: [100, None, 300] # Decimals stay as float Sampler.sanitize(["100.5", "200.0"]) # Returns: [100.5, 200] # Note: 200.0 becomes int(200) # Invalid strings stay as strings Sampler.sanitize(["100", "abc", "300"]) # Returns: [100, "abc", 300]
Data type mapping: “100” → 100 (int) “100.0” → 100 (int, because no fractional part) “100.5” → 100.5 (float) “” → None “abc” → “abc” (string, unchanged) “1e5” → 100000.0 (scientific notation)
Performance: - Reasonably efficient for typical CSV rows (7-10 values) - Suitable for processing large CSV files - Worth the type safety gained
- static tqdm_wrapper(iterable, desc: Optional[str] = None, show: bool = True)#
Optionally wrap an iterable with a progress bar (tqdm).
What this does: If show=True: Returns an iterable that displays a progress bar. If show=False: Returns the iterable unchanged (no progress bar). Useful for conditional progress display in batch processing.
Args: iterable: Any iterable (list, range, generator, etc.) Example: range(1000), [1, 2, 3, 4, 5], file_list, etc.
desc (str, optional): Label for progress bar Example: "Events", "Files", "Duration" Only used if show=True show (bool, optional): Whether to show progress bar Default: True (show progress) Set to False for scripts/batch jobsReturns: tqdm object (if show=True) or original iterable (if show=False)
How it works: - If show=True: Wraps with tqdm() for progress bar display - If show=False: Returns iterable unchanged - Calling code iterates the same way either way
When to use: - Conditional progress display (interactive vs batch) - Usually called internally by acquire_by_count(), acquire_by_time() - Rarely called directly
Progress bar example (show=True):
Events: 45%|████▌ | 450/1000 [00:05<00:06, 89.00 it/s]Beginner tip: The calling code doesn’t need to know about tqdm:
# This works the same either way: iterator = sampler.tqdm_wrapper(range(1000), desc="Events", show=True) for i in iterator: print(i) iterator = sampler.tqdm_wrapper(range(1000), desc="Events", show=False) for i in iterator: # Same code, no progress bar print(i)
Use cases:
# Interactive script: Show progress bar = Sampler.tqdm_wrapper( range(1000), desc="Processing", show=True # User sees progress bar ) # Batch/automated script: Don't show progress bar = Sampler.tqdm_wrapper( range(1000), desc="Processing", show=False # No output to terminal ) # Dynamic based on condition: verbose = True # Could come from command-line flag bar = Sampler.tqdm_wrapper( range(1000), desc="Processing", show=verbose )
Performance: - show=False: No overhead (returns iterable unchanged) - show=True: Minimal overhead (~1% slowdown for fast operations) - Progress bar updates once per iteration
Why use this pattern: - Cleaner than if/else statements in calling code - Consistent interface regardless of display preference - Easy to add progress bars to functions without changing loop logic - Industry-standard pattern for CLI tools
- static mock_sample(*args, **kwargs)#
Generate a mock RawEvent for testing or simulation.
What this does: Returns hardcoded mock data [“mock_event”] for testing purposes. Useful for unit tests that don’t have a real detector or mocker.
Returns: list: Always returns [“mock_event”] (hardcoded test data)
When to use: - Unit testing code that uses Sampler - Verifying Sampler logic without detector - Debugging file I/O without hardware - Demonstration purposes
When NOT to use: - For realistic mock data: Use RandomMocker instead - For testing with real detector: Use Device class - For production code: Don’t use mock_sample at all
Note: This method accepts *args and **kwargs but ignores them (for flexibility)
Example:
# Get mock data (for testing) mock_event = Sampler.mock_sample() # Returns: ["mock_event"] # Can be called with any arguments (ignored): result = Sampler.mock_sample("arg1", "arg2", kwarg1="value") # Still returns: ["mock_event"]
Why this exists: Placeholder for potential future enhancements to mock data generation. Current implementation is intentionally simple for clarity.
- haniwers.v1.daq.sampler.run_session(mode: str, device: haniwers.v1.daq.device.Device, config: Union[haniwers.v1.config.model.DaqConfig, haniwers.v1.config.model.ScanConfig], output_dir: pathlib.Path) None#
Run a complete acquisition session with device, config, and output directory.
What this does: High-level function that creates a Sampler and runs a complete acquisition session. Handles all the setup and execution for a standard DAQ or scan run.
Args: mode (str): Acquisition mode - “daq”: Data acquisition (fixed event count) - “scan”: Threshold scanning (fixed duration)
device (Device | Mocker | RandomMocker): Data source - Device: Real detector connected to serial port - Mocker: Replay pre-recorded CSV file - RandomMocker: Generate synthetic measurements config (DaqConfig | ScanConfig): Configuration for the session - Specifies number_of_files (how many output CSV files) - Contains events_per_file or time-based settings - Stream mode and filename preferences output_dir (Path): Directory where CSV files will be saved Must exist and be writableHow it works: 1. Create Sampler object with provided device and config 2. Call sampler.run() with mode and number_of_files from config 3. Function returns when all files have been collected
When to use: - Standard way to start DAQ/scan sessions from Python code - Higher-level than Sampler.run() directly - Recommended for scripts and batch processing - Called by CLI commands internally
When NOT to use: - For complex acquisition patterns (use Sampler directly) - For multiple devices (create multiple Samplers) - For custom file handling (use Sampler.acquire_by_count/time)
Raises: FileNotFoundError: If output_dir doesn’t exist ValueError: If config missing required fields
Example (standard DAQ):
from pathlib import Path from haniwers.v1.daq.device import Device from haniwers.v1.daq.sampler import run_session from haniwers.v1.config.loader import ConfigLoader # Load configuration loader = ConfigLoader(Path("config.toml")) cfg = loader.config # Connect to detector device = Device(cfg.device) device.connect() # Run acquisition session run_session( mode="daq", device=device, config=cfg.daq, output_dir=Path("./data") ) device.disconnect() print("✓ Acquisition complete")
Example (threshold scanning):
from pathlib import Path from haniwers.v1.daq.device import Device from haniwers.v1.daq.sampler import run_session from haniwers.v1.config.loader import ConfigLoader loader = ConfigLoader(Path("config.toml")) cfg = loader.config device = Device(cfg.device) device.connect() # Run threshold scan run_session( mode="scan", device=device, config=cfg.scan, output_dir=Path("./scan_results") ) device.disconnect()
Equivalent to:
# This: run_session("daq", device, config, Path("./data")) # Is equivalent to: sampler = Sampler(device, config, Path("./data")) sampler.run(mode="daq", files=config.number_of_files)