haniwers.v1.daq.device#
Serial device communication for cosmic ray detector.
What is this module? This module handles communication with the OSECHI detector over a serial port (USB cable). It provides two ways to talk to the detector: 1. Low-level functions: Direct access to serial port operations 2. High-level Device class: Recommended for most users (easier and safer)
Why use this? The detector sends data through a USB cable. This module makes it easy to: - Connect to the detector - Send commands to the detector - Read data back from the detector - Handle errors gracefully
When to use Device class vs functions? Use Device class (recommended): - Most common use case - Handles connection lifecycle automatically - Built-in logging for debugging - Cleaner code
Use low-level functions:
- When you need fine-grained control
- Building higher-level abstractions
- Advanced troubleshooting
Example with Device class (recommended):
from haniwers.v1.config.model import DeviceConfig
from haniwers.v1.daq.device import Device
# Create configuration
config = DeviceConfig(
label="detector",
port="/dev/ttyUSB0",
baudrate=115200,
timeout=1.0
)
# Use Device class
device = Device(config)
device.connect()
device.write("READ")
line = device.readline()
device.disconnect()
Example with low-level functions:
from haniwers.v1.daq.device import connect, write, readline, disconnect
device = connect(config)
write(device, "READ")
line = readline(device)
disconnect(device)
Module Contents#
Classes#
Easy-to-use interface for talking to the cosmic ray detector. |
Functions#
Open a serial connection to the detector. |
|
Read one line of data from the detector. |
|
Send a command or data to the detector. |
|
Clear any leftover data in the connection buffers. |
|
Close the connection to the detector. |
|
Check if the detector is ready to connect. |
API#
- haniwers.v1.daq.device.connect(config: haniwers.v1.config.model.DeviceConfig) serial.Serial#
Open a serial connection to the detector.
What this does: Opens a USB serial connection to the detector using the settings from your configuration (port, speed, timeout).
Args: config (DeviceConfig): Connection settings (port, baudrate, timeout)
Returns: serial.Serial: An open connection object. Use this to send commands and read data from the detector.
Raises: serial.SerialException: If the port doesn’t exist or is already in use
Beginner tip: For most use cases, use the Device class instead of this function. The Device class handles connection lifecycle and is safer:
```python device = Device(config) device.connect() # Use this pattern instead ```
Example:
from haniwers.v1.config.model import DeviceConfig from haniwers.v1.daq.device import connect config = DeviceConfig( port="/dev/ttyUSB0", baudrate=115200, timeout=1.0 ) device = connect(config)
- haniwers.v1.daq.device.readline(device: serial.Serial) str#
Read one line of data from the detector.
What this does: Waits for data from the detector, reads one complete line, and converts it from binary data into readable text (a Python string).
Args: device (serial.Serial): An open connection from connect() or Device.connect()
Returns: str: The data as text, with extra whitespace removed
Raises: UnicodeDecodeError: If detector sends non-UTF-8 data (rare, indicates hardware issue) serial.SerialException: If the connection is broken
How it works: 1. Waits for data from detector 2. Reads until it sees a newline character (\n) 3. Converts bytes to text (UTF-8 decoding) 4. Removes leading/trailing whitespace 5. Returns the text string
Beginner tip: Use the Device class which handles this for you:
```python device = Device(config) device.connect() line = device.readline() # Much safer! ```
Example:
from haniwers.v1.daq.device import readline line = readline(device) print(f"Received: {line}") # Output: "1 2 3 4 5 6 7"
- haniwers.v1.daq.device.write(device: serial.Serial, data: str | bytes) int#
Send a command or data to the detector.
What this does: Takes your message and sends it to the detector through the serial connection. Converts text to binary as needed.
Args: device (serial.Serial): An open connection from connect() data (str | bytes): The message to send - str: Text command (e.g., “READ”, “THRESHOLD 300”) Automatically converted to bytes using UTF-8 - bytes: Raw binary data (e.g., b"\x01\x14\x60") Sent as-is without conversion
Returns: int: Number of bytes successfully sent to the detector
How it works: 1. If data is text: Convert to bytes using UTF-8 encoding 2. If data is bytes: Send as-is 3. Send all bytes to the detector 4. Return how many bytes were sent
Beginner tip: Use the Device class for safety:
```python device = Device(config) device.connect() bytes_sent = device.write("READ") ```Example:
from haniwers.v1.daq.device import write # Send text command bytes_sent = write(device, "READ") print(f"Sent {bytes_sent} bytes") # Send raw bytes bytes_sent = write(device, b"\x01\x14\x60")
- haniwers.v1.daq.device.flush(device: serial.Serial) None#
Clear any leftover data in the connection buffers.
What this does: Removes any stale data that’s sitting in the connection pipes (buffers). Useful when switching between different commands or operations.
Args: device (serial.Serial): An open connection from connect()
Why use this: If detector sends old data but you don’t read it, it stays in the buffer. Next time you read(), you might get old data instead of new data. Call flush() to clean things out before starting a new operation.
Example:
device = Device(config) device.connect() device.flush() # Clear any old data device.write("READ") line = device.readline() # Gets fresh data
- haniwers.v1.daq.device.disconnect(device: serial.Serial) None#
Close the connection to the detector.
What this does: Properly shuts down the serial connection, releasing the USB port so other programs can use it.
Args: device (serial.Serial): An open connection from connect()
Why use this: Always close connections when done. If you don’t: - Port stays locked (other programs can’t use detector) - Memory resources aren’t released - Connection may stay open in background
Example:
device = Device(config) device.connect() device.write("READ") line = device.readline() device.disconnect() # Always close when done!
- haniwers.v1.daq.device.is_available(config: haniwers.v1.config.model.DeviceConfig) bool#
Check if the detector is ready to connect.
What this does: Tests if the USB port exists and isn’t already in use by another program. Does NOT communicate with the detector, just checks port availability.
Args: config (DeviceConfig): Connection settings (port, baudrate, timeout)
Returns: bool: True if port is available and ready to connect, False otherwise
When to use: - Before connecting: Make sure port is free - Debugging: Figure out why connect() fails - Finding detectors: Check which ports have connected detectors
Example:
from haniwers.v1.config.model import DeviceConfig from haniwers.v1.daq.device import is_available config = DeviceConfig( label="detector", port="/dev/ttyUSB0", baudrate=115200, timeout=1.0 ) if is_available(config): print("✓ Detector is ready to use") device = Device(config) device.connect() else: print("✗ Port not available. Check:") print(" - Is detector plugged in?") print(" - Is another program using this port?") print(" - Try different port: haniwers-v1 port list")
- class haniwers.v1.daq.device.Device(config: haniwers.v1.config.model.DeviceConfig)#
Easy-to-use interface for talking to the cosmic ray detector.
What is Device? The recommended way to communicate with the OSECHI detector. It handles all the connection details so you focus on your work.
Key features: ✓ Simple: Just call connect(), write(), readline(), disconnect() ✓ Safe: Handles errors and edge cases automatically ✓ Logged: All operations are recorded for debugging ✓ Tested: Proven pattern used throughout haniwers
Life cycle: 1. Create: Device(config) 2. Connect: device.connect() 3. Use: device.write(), device.readline() 4. Disconnect: device.disconnect()
Example (recommended pattern):
```python from haniwers.v1.config.model import DeviceConfig from haniwers.v1.daq.device import Device # Setup config = DeviceConfig( label="detector", port="/dev/ttyUSB0", baudrate=115200, timeout=1.0 ) # Connect to detector device = Device(config) device.connect() # Talk to detector device.write("READ") line = device.readline() print(f"Received: {line}") # Cleanup device.disconnect() ```Advanced features: - device.flush(): Clear stale data from buffers - device.is_available(): Check if port is ready before connecting - device.with_timeout(sec): Temporarily change read timeout
Initialization
Create a Device object with connection settings.
What this does: Stores your detector configuration settings but does NOT connect yet. Call device.connect() afterward to actually open the connection.
Args: config (DeviceConfig): Connection settings (port, baudrate, timeout)
Example:
from haniwers.v1.config.model import DeviceConfig from haniwers.v1.daq.device import Device config = DeviceConfig( port="/dev/ttyUSB0", baudrate=115200, timeout=1.0 ) # Create device object (connection not open yet) device = Device(config) # Now connect device.connect()
- connect() None#
Open the USB connection to the detector.
What this does: Opens the serial port and establishes communication with the detector. After this succeeds, you can use write() and readline() methods.
Raises: RuntimeError: If the port doesn’t exist, is in use, or detector not responding
Error tips: “Port not found”: Wrong port number, detector not plugged in “Permission denied”: Need to add user to dialout group (Linux) “Device busy”: Another program is using this port
Example:
device = Device(config) try: device.connect() print("✓ Connected to detector") except RuntimeError as e: print(f"✗ Connection failed: {e}")
- readline() str#
Read one line of data from the detector.
What this does: Waits for the detector to send data, reads one complete line, and returns it as readable text.
Returns: str: The data received from detector, cleaned up (whitespace removed)
Raises: serial.SerialException: If connection is broken UnicodeDecodeError: If detector sends corrupted data (very rare)
How to use: Call write() first to send a command, then readline() to get the response
Example:
device = Device(config) device.connect() # Send command device.write("READ") # Read response line = device.readline() print(f"Received: {line}") # Output: "1 2 3 4 5 6 7" device.disconnect()
- write(data: str | bytes) int#
Send a command or data to the detector.
What this does: Takes your message and sends it to the detector over the USB connection. Automatically converts text to binary format if needed.
Args: data (str | bytes): The message to send to detector - str: Text command (e.g., “READ”, “THRESHOLD 300”) Automatically converted to bytes - bytes: Raw binary data (e.g., b"\x01\x14\x60") Sent as-is
Returns: int: Number of bytes successfully sent to detector
Raises: serial.SerialTimeoutException: If detector not responding (timeout)
Example:
device = Device(config) device.connect() # Send text command bytes_written = device.write("READ") print(f"Sent {bytes_written} bytes") # Send raw bytes bytes_written = device.write(b"\x01\x14\x60") device.disconnect()
- flush() None#
Clear any leftover data in the connection buffers.
What this does: Removes stale data that may be sitting in the USB connection pipes. Useful when switching between different commands or operations.
When to use: Before starting a new sequence of reads/writes to ensure clean state After errors to clear corrupted data Between different command sequences
Example:
device = Device(config) device.connect() # Clear any stale data device.flush() # Now read fresh data device.write("READ") line = device.readline()
- disconnect() None#
Close the USB connection to the detector.
What this does: Properly shuts down the serial connection and releases the USB port so other programs can use it if needed.
Important: Always call disconnect() when done, especially in scripts that run multiple times. If you don’t, the port stays locked and connection resources aren’t released.
Example:
device = Device(config) device.connect() try: device.write("READ") line = device.readline() finally: # Always close, even if there's an error device.disconnect()
Better pattern (using context manager):
# Future enhancement: Device may support 'with' statement # For now, always use try/finally as shown above
- with_timeout(sec: float)#
Temporarily change the read timeout for one operation.
What this does: Changes how long readline() will wait for data from the detector, but only for the code inside the ‘with’ block. After the block exits, the timeout goes back to the original value.
Args: sec (float): New timeout in seconds (e.g., 1.0, 2.5, 0.5)
When to use: Some detector operations are slow and need longer timeout Some operations are fast and you want to detect missing data quickly Debugging timing issues
Example:
device = Device(config) device.connect() # Normal timeout (from config, usually 1.0 seconds) line = device.readline() # For a slow operation, increase timeout with device.with_timeout(5.0): slow_line = device.readline() # Back to normal timeout fast_line = device.readline() device.disconnect()
- is_available() bool#
Check if the detector is ready to connect (port exists and is free).
What this does: Tests if the USB port exists and isn’t already in use by another program. Does NOT actually connect to the detector, just checks that the port is available.
Returns: bool: True if port is available, False if port doesn’t exist or is busy
When to use: Before calling connect() to catch problems early Debugging connection issues Checking if detector is plugged in (gives indirect confirmation)
Example:
device = Device(config) if device.is_available(): print("✓ Port is available, connecting...") device.connect() else: print("✗ Port not available") print(" Check:") print(" - Is detector plugged in?") print(" - Is another program using this port?") print(" - Use: haniwers-v1 port list")
- get_mac_address() str#
Retrieve ESP32 MAC address from connected device.
What this does: Reads the MAC address from the connected ESP32 device using esptool. Returns the address in filesystem-safe format (lowercase hex, no colons) or falls back to “unknown” if retrieval fails.
Returns: str: MAC address as 12 lowercase hex chars (e.g., ‘aabbccddee00’) or ‘unknown’ if retrieval fails
How it works: 1. Calls esptool.main() with read_mac command on configured port 2. Parses output for MAC address pattern 3. Formats using format_mac_address() helper 4. Catches exceptions (timeout, not found, parse error) 5. Logs warning and returns “unknown” on any error
When to use: - After device.connect() to get unique device identifier - During DAQ startup to embed in filenames - For device tracking and traceability
Raises: Nothing (always returns valid string, never raises exceptions)
Examples:
device = Device(config) device.connect() # Get MAC address for filename mac = device.get_mac_address() print(f"Device MAC: {mac}") # Output: "Device MAC: aabbccddee00" or "Device MAC: unknown" device.disconnect()
Error cases handled: - esptool not installed: Returns “unknown”, logs warning - Device not responding: Returns “unknown”, logs warning - Timeout: Returns “unknown”, logs warning - Invalid MAC format: Returns “unknown”, logs warning