Skip to content

feat: add servo motor driver (Modbus RTU + CiA402)#4

Open
Alpaca233 wants to merge 10 commits intomainfrom
add-hardware-drivers
Open

feat: add servo motor driver (Modbus RTU + CiA402)#4
Alpaca233 wants to merge 10 commits intomainfrom
add-hardware-drivers

Conversation

@Alpaca233
Copy link
Copy Markdown
Collaborator

Summary

  • Add modbus_rtu.py — consolidated Modbus RTU client with CRC-16, frame building, retry logic for transport errors (exception responses are not retried)
  • Add servo_motor.py — CiA402 servo motor driver with state machine, profile position/homing/jog control, abort support (quick_stop on all axes), and ServoMotorSimulation class
  • 58 unit tests covering CRC, frame building, state machine decoding/transitions, and simulation class

Test plan

  • python -m pytest tests/unit/control/test_modbus_rtu.py -v — 22 tests pass
  • python -m pytest tests/unit/control/test_servo_motor.py -v — 36 tests pass
  • Hardware integration test with NiMotion Z4 axis (requires connected hardware)

🤖 Generated with Claude Code

Alpaca233 and others added 4 commits March 9, 2026 14:25
…shaker)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Don't retry Modbus exception responses (application-level errors)
- Fix jog() far-position sign inversion
- Replace magic hex constants with ControlWordBits expressions
- Add abort checks to all polling loops (_wait_for_state/motion/homing)
- Add bounds checking to move_relative() in real ServoMotor class
- abort() now calls quick_stop() on all axes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new NiMotion Z4-oriented servo motor driver to the fluidics control layer, including a consolidated Modbus RTU client and a CiA402-based motion/state-machine implementation, plus unit tests and an implementation plan document.

Changes:

  • Added fluidics/control/modbus_rtu.py with CRC16, frame builders, and a thread-safe Modbus RTU client with retry logic.
  • Added fluidics/control/servo_motor.py implementing CiA402 state decoding/transitions, axis configuration, motion primitives, abort support, and a simulation class.
  • Added unit tests for Modbus RTU primitives/client defaults and servo motor state machine + simulation behaviors; added a hardware drivers plan doc.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
software/fluidics/control/modbus_rtu.py New Modbus RTU utilities + client used by the servo driver
software/fluidics/control/servo_motor.py New CiA402 servo driver + simulation and axis configuration
software/tests/unit/control/test_modbus_rtu.py Unit tests for CRC, frame building, and basic client behavior
software/tests/unit/control/test_servo_motor.py Unit tests for axis config conversions, state machine, and simulation
software/docs/plans/2026-03-09-hardware-drivers.md Implementation plan/reference for the driver work

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +38 to +42
def test_known_crc_value(self):
# Modbus CRC of [0x01, 0x03, 0x00, 0x00, 0x00, 0x01] = 0x840A
data = bytes([0x01, 0x03, 0x00, 0x00, 0x00, 0x01])
assert calculate_crc(data) == 0x840A

Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CRC value in this plan snippet (0x840A) conflicts with the implemented/tested representation in tests/unit/control/test_modbus_rtu.py (which expects calculate_crc(...) == 0x0A84 and appends low-byte-first for wire order). Consider updating the plan text to match the code’s CRC integer convention, or explicitly clarify which endianness each value refers to, to avoid future confusion.

Copilot uses AI. Check for mistakes.
Comment on lines +4 to +5
from enum import Enum, IntEnum
from typing import Optional
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional is imported but not used in this module. Please remove the unused import to keep linting/static analysis clean.

Copilot uses AI. Check for mistakes.
Comment thread software/fluidics/control/modbus_rtu.py Outdated
Comment on lines +188 to +195
response = self._serial.read(expected_response_len)
if len(response) < expected_response_len:
raise ModbusError(
f"Incomplete response: expected {expected_response_len} "
f"bytes, got {len(response)}",
slave_id=frame[0],
)

Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_send_receive() treats any response shorter than expected_response_len as a retryable "Incomplete response". Modbus exception responses are typically 5 bytes long (slave, fc|0x80, code, crc16), so they will be misclassified and retried, and the non-retry exception handling block will never run. Consider special-casing len(response) == 5 and (response[1] & 0x80) (with CRC verification) before raising the incomplete-response error, or reading/parsing enough bytes to detect exception frames first.

Copilot uses AI. Check for mistakes.
Comment on lines +183 to +207
try:
self._serial.reset_input_buffer()
self._serial.write(frame)
time.sleep(FRAME_INTERVAL)

response = self._serial.read(expected_response_len)
if len(response) < expected_response_len:
raise ModbusError(
f"Incomplete response: expected {expected_response_len} "
f"bytes, got {len(response)}",
slave_id=frame[0],
)

if not _verify_crc(response):
raise ModbusError("CRC verification failed", slave_id=frame[0])

except ModbusError as e:
last_error = e
logger.warning(
f"Modbus request failed (attempt {attempt + 1}/"
f"{self._retries + 1}): {e}"
)
if attempt < self._retries:
time.sleep(FRAME_INTERVAL * 2)
continue
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry loop in _send_receive() only catches ModbusError. Transport-layer failures from pyserial (e.g., serial.SerialException / OSError during reset_input_buffer(), write(), or read()) will bypass the retry logic entirely, even though the PR description says transport errors are retried. Consider catching those exceptions, wrapping them in ModbusError, and applying the same retry/backoff behavior.

Copilot uses AI. Check for mistakes.
Comment thread software/fluidics/control/modbus_rtu.py Outdated
Comment on lines +120 to +127
def connect(self, port: Optional[str] = None, baudrate: Optional[int] = None):
if port is not None:
self._port = port
if baudrate is not None:
self._baudrate = baudrate
self._serial = serial.Serial(
self._port, baudrate=self._baudrate, timeout=self._timeout
)
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connect() will pass self._port directly into serial.Serial(...) even when it is still None, which results in a pyserial error message that’s harder to interpret at the call site. Consider validating that a port was provided (either via __init__ or the connect() argument) and raising a clear ModbusError/ValueError if it’s missing.

Copilot uses AI. Check for mistakes.
Alpaca233 and others added 6 commits March 9, 2026 14:48
- Detect Modbus exception responses (5 bytes) before incomplete check
- Catch pyserial SerialException/OSError in retry loop
- Validate port is set before connecting
- Remove unused Optional import from servo_motor.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Log errors in abort() instead of silently swallowing exceptions
- Close existing connection before re-opening in ModbusRTUClient.connect()
- Add abort/fault checks and TimeoutError to stop() polling loop
- Log warning when jog() clamps velocity to max
- Wrap pyserial exceptions in ModbusError in connect()
- Use try/finally in disconnect() to ensure _serial = None
- Reject zero velocity in jog()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
OEM-AMCB209 peristaltic pump driver reusing ModbusRTUClient. Supports
multiple pumps on a shared RS485 bus with direction-based sign convention
for aspirate/dispense. Includes start/stop, run_for_duration, abort,
and simulation class.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add ServoMotorConfig and PeristalticPumpsConfig pydantic models to
config.py. Both are optional sections keyed by serial number. Rename
default axis from Z4 to Z.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants