feat: add servo motor driver (Modbus RTU + CiA402)#4
feat: add servo motor driver (Modbus RTU + CiA402)#4
Conversation
…shaker) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Don't retry Modbus exception responses (application-level errors) - Fix jog() far-position sign inversion - Replace magic hex constants with ControlWordBits expressions - Add abort checks to all polling loops (_wait_for_state/motion/homing) - Add bounds checking to move_relative() in real ServoMotor class - abort() now calls quick_stop() on all axes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
18e93b1 to
d81b91e
Compare
There was a problem hiding this comment.
Pull request overview
Adds a new NiMotion Z4-oriented servo motor driver to the fluidics control layer, including a consolidated Modbus RTU client and a CiA402-based motion/state-machine implementation, plus unit tests and an implementation plan document.
Changes:
- Added
fluidics/control/modbus_rtu.pywith CRC16, frame builders, and a thread-safe Modbus RTU client with retry logic. - Added
fluidics/control/servo_motor.pyimplementing CiA402 state decoding/transitions, axis configuration, motion primitives, abort support, and a simulation class. - Added unit tests for Modbus RTU primitives/client defaults and servo motor state machine + simulation behaviors; added a hardware drivers plan doc.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| software/fluidics/control/modbus_rtu.py | New Modbus RTU utilities + client used by the servo driver |
| software/fluidics/control/servo_motor.py | New CiA402 servo driver + simulation and axis configuration |
| software/tests/unit/control/test_modbus_rtu.py | Unit tests for CRC, frame building, and basic client behavior |
| software/tests/unit/control/test_servo_motor.py | Unit tests for axis config conversions, state machine, and simulation |
| software/docs/plans/2026-03-09-hardware-drivers.md | Implementation plan/reference for the driver work |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def test_known_crc_value(self): | ||
| # Modbus CRC of [0x01, 0x03, 0x00, 0x00, 0x00, 0x01] = 0x840A | ||
| data = bytes([0x01, 0x03, 0x00, 0x00, 0x00, 0x01]) | ||
| assert calculate_crc(data) == 0x840A | ||
|
|
There was a problem hiding this comment.
The CRC value in this plan snippet (0x840A) conflicts with the implemented/tested representation in tests/unit/control/test_modbus_rtu.py (which expects calculate_crc(...) == 0x0A84 and appends low-byte-first for wire order). Consider updating the plan text to match the code’s CRC integer convention, or explicitly clarify which endianness each value refers to, to avoid future confusion.
| from enum import Enum, IntEnum | ||
| from typing import Optional |
There was a problem hiding this comment.
Optional is imported but not used in this module. Please remove the unused import to keep linting/static analysis clean.
| response = self._serial.read(expected_response_len) | ||
| if len(response) < expected_response_len: | ||
| raise ModbusError( | ||
| f"Incomplete response: expected {expected_response_len} " | ||
| f"bytes, got {len(response)}", | ||
| slave_id=frame[0], | ||
| ) | ||
|
|
There was a problem hiding this comment.
_send_receive() treats any response shorter than expected_response_len as a retryable "Incomplete response". Modbus exception responses are typically 5 bytes long (slave, fc|0x80, code, crc16), so they will be misclassified and retried, and the non-retry exception handling block will never run. Consider special-casing len(response) == 5 and (response[1] & 0x80) (with CRC verification) before raising the incomplete-response error, or reading/parsing enough bytes to detect exception frames first.
| try: | ||
| self._serial.reset_input_buffer() | ||
| self._serial.write(frame) | ||
| time.sleep(FRAME_INTERVAL) | ||
|
|
||
| response = self._serial.read(expected_response_len) | ||
| if len(response) < expected_response_len: | ||
| raise ModbusError( | ||
| f"Incomplete response: expected {expected_response_len} " | ||
| f"bytes, got {len(response)}", | ||
| slave_id=frame[0], | ||
| ) | ||
|
|
||
| if not _verify_crc(response): | ||
| raise ModbusError("CRC verification failed", slave_id=frame[0]) | ||
|
|
||
| except ModbusError as e: | ||
| last_error = e | ||
| logger.warning( | ||
| f"Modbus request failed (attempt {attempt + 1}/" | ||
| f"{self._retries + 1}): {e}" | ||
| ) | ||
| if attempt < self._retries: | ||
| time.sleep(FRAME_INTERVAL * 2) | ||
| continue |
There was a problem hiding this comment.
The retry loop in _send_receive() only catches ModbusError. Transport-layer failures from pyserial (e.g., serial.SerialException / OSError during reset_input_buffer(), write(), or read()) will bypass the retry logic entirely, even though the PR description says transport errors are retried. Consider catching those exceptions, wrapping them in ModbusError, and applying the same retry/backoff behavior.
| def connect(self, port: Optional[str] = None, baudrate: Optional[int] = None): | ||
| if port is not None: | ||
| self._port = port | ||
| if baudrate is not None: | ||
| self._baudrate = baudrate | ||
| self._serial = serial.Serial( | ||
| self._port, baudrate=self._baudrate, timeout=self._timeout | ||
| ) |
There was a problem hiding this comment.
connect() will pass self._port directly into serial.Serial(...) even when it is still None, which results in a pyserial error message that’s harder to interpret at the call site. Consider validating that a port was provided (either via __init__ or the connect() argument) and raising a clear ModbusError/ValueError if it’s missing.
- Detect Modbus exception responses (5 bytes) before incomplete check - Catch pyserial SerialException/OSError in retry loop - Validate port is set before connecting - Remove unused Optional import from servo_motor.py Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Log errors in abort() instead of silently swallowing exceptions - Close existing connection before re-opening in ModbusRTUClient.connect() - Add abort/fault checks and TimeoutError to stop() polling loop - Log warning when jog() clamps velocity to max - Wrap pyserial exceptions in ModbusError in connect() - Use try/finally in disconnect() to ensure _serial = None - Reject zero velocity in jog() Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
OEM-AMCB209 peristaltic pump driver reusing ModbusRTUClient. Supports multiple pumps on a shared RS485 bus with direction-based sign convention for aspirate/dispense. Includes start/stop, run_for_duration, abort, and simulation class. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add ServoMotorConfig and PeristalticPumpsConfig pydantic models to config.py. Both are optional sections keyed by serial number. Rename default axis from Z4 to Z. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
modbus_rtu.py— consolidated Modbus RTU client with CRC-16, frame building, retry logic for transport errors (exception responses are not retried)servo_motor.py— CiA402 servo motor driver with state machine, profile position/homing/jog control, abort support (quick_stop on all axes), andServoMotorSimulationclassTest plan
python -m pytest tests/unit/control/test_modbus_rtu.py -v— 22 tests passpython -m pytest tests/unit/control/test_servo_motor.py -v— 36 tests pass🤖 Generated with Claude Code