diff --git a/can/interfaces/pcan/basic.py b/can/interfaces/pcan/basic.py index 5f161eecc..b2624802a 100644 --- a/can/interfaces/pcan/basic.py +++ b/can/interfaces/pcan/basic.py @@ -655,6 +655,16 @@ class TPCANChannelInformation(Structure): "PCAN_LANBUS16": PCAN_LANBUS16, } +VALID_PCAN_CAN_CLOCKS = [8_000_000] + +VALID_PCAN_FD_CLOCKS = [ + 20_000_000, + 24_000_000, + 30_000_000, + 40_000_000, + 60_000_000, + 80_000_000, +] # /////////////////////////////////////////////////////////// # PCAN-Basic API function declarations diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index 01adbe0c2..2ed0ff445 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -5,20 +5,21 @@ import time from datetime import datetime import platform -from typing import Optional, List, Tuple +from typing import Optional, List, Tuple, Union, Any from packaging import version from can import ( BusABC, BusState, + BitTiming, + BitTimingFd, + Message, CanError, CanOperationError, CanInitializationError, - Message, ) -from can.util import len2dlc, dlc2len - +from can.util import check_or_adjust_timing_clock, dlc2len, len2dlc from .basic import ( PCAN_BITRATES, PCAN_FD_PARAMETER_LIST, @@ -61,8 +62,11 @@ FEATURE_FD_CAPABLE, PCAN_DICT_STATUS, PCAN_BUSOFF_AUTORESET, + TPCANBaudrate, PCAN_ATTACHED_CHANNELS, TPCANChannelInformation, + VALID_PCAN_FD_CLOCKS, + VALID_PCAN_CAN_CLOCKS, ) @@ -112,12 +116,12 @@ class PcanBus(BusABC): def __init__( self, - channel="PCAN_USBBUS1", - device_id=None, - state=BusState.ACTIVE, - bitrate=500000, - *args, - **kwargs, + channel: str = "PCAN_USBBUS1", + device_id: Optional[int] = None, + state: BusState = BusState.ACTIVE, + timing: Optional[Union[BitTiming, BitTimingFd]] = None, + bitrate: int = 500000, + **kwargs: Any, ): """A PCAN USB interface to CAN. @@ -142,6 +146,18 @@ def __init__( BusState of the channel. Default is ACTIVE + :param timing: + An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd` + to specify the bit timing parameters for the PCAN interface. If this parameter + is provided, it takes precedence over all other timing-related parameters. + If this parameter is not provided, the bit timing parameters can be specified + using the `bitrate` parameter for standard CAN or the `fd`, `f_clock`, + `f_clock_mhz`, `nom_brp`, `nom_tseg1`, `nom_tseg2`, `nom_sjw`, `data_brp`, + `data_tseg1`, `data_tseg2`, and `data_sjw` parameters for CAN FD. + Note that the `f_clock` value of the `timing` instance must be 8_000_000 + for standard CAN or any of the following values for CAN FD: 20_000_000, + 24_000_000, 30_000_000, 40_000_000, 60_000_000, 80_000_000. + :param int bitrate: Bitrate of channel in bit/s. Default is 500 kbit/s. @@ -231,8 +247,7 @@ def __init__( raise ValueError(err_msg) self.channel_info = str(channel) - self.fd = kwargs.get("fd", False) - pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K) + self.fd = isinstance(timing, BitTimingFd) if timing else kwargs.get("fd", False) hwtype = PCAN_TYPE_ISA ioport = 0x02A0 @@ -245,30 +260,41 @@ def __init__( self.check_api_version() - if state is BusState.ACTIVE or state is BusState.PASSIVE: + if state in [BusState.ACTIVE, BusState.PASSIVE]: self.state = state else: raise ValueError("BusState must be Active or Passive") - if self.fd: - f_clock_val = kwargs.get("f_clock", None) - if f_clock_val is None: - f_clock = "{}={}".format("f_clock_mhz", kwargs.get("f_clock_mhz", None)) - else: - f_clock = "{}={}".format("f_clock", kwargs.get("f_clock", None)) - - fd_parameters_values = [f_clock] + [ - f"{key}={kwargs.get(key, None)}" - for key in PCAN_FD_PARAMETER_LIST - if kwargs.get(key, None) is not None + if isinstance(timing, BitTiming): + timing = check_or_adjust_timing_clock(timing, VALID_PCAN_CAN_CLOCKS) + pcan_bitrate = TPCANBaudrate(timing.btr0 << 8 | timing.btr1) + result = self.m_objPCANBasic.Initialize( + self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt + ) + elif self.fd: + if isinstance(timing, BitTimingFd): + timing = check_or_adjust_timing_clock( + timing, sorted(VALID_PCAN_FD_CLOCKS, reverse=True) + ) + # We dump the timing parameters into the kwargs because they have equal names + # as the kwargs parameters and this saves us one additional code path + kwargs.update(timing) + + clock_param = "f_clock" if "f_clock" in kwargs else "f_clock_mhz" + fd_parameters_values = [ + f"{key}={kwargs[key]}" + for key in (clock_param,) + PCAN_FD_PARAMETER_LIST + if key in kwargs ] - self.fd_bitrate = " ,".join(fd_parameters_values).encode("ascii") + self.fd_bitrate = ", ".join(fd_parameters_values).encode("ascii") result = self.m_objPCANBasic.InitializeFD( self.m_PcanHandle, self.fd_bitrate ) + else: + pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K) result = self.m_objPCANBasic.Initialize( self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt ) @@ -312,7 +338,7 @@ def __init__( if result != PCAN_ERROR_OK: raise PcanCanInitializationError(self._get_formatted_error(result)) - super().__init__(channel=channel, state=state, bitrate=bitrate, *args, **kwargs) + super().__init__(channel=channel, state=state, bitrate=bitrate, **kwargs) def _find_channel_by_dev_id(self, device_id): """ diff --git a/test/test_pcan.py b/test/test_pcan.py index 01dac848c..aa0988a48 100644 --- a/test/test_pcan.py +++ b/test/test_pcan.py @@ -3,20 +3,18 @@ """ import ctypes -import platform import unittest from unittest import mock from unittest.mock import Mock, patch - import pytest from parameterized import parameterized import can from can.bus import BusState from can.exceptions import CanInitializationError -from can.interfaces.pcan.basic import * from can.interfaces.pcan import PcanBus, PcanError +from can.interfaces.pcan.basic import * class TestPCANBus(unittest.TestCase): @@ -53,8 +51,10 @@ def _mockGetValue(self, channel, parameter): def test_bus_creation(self) -> None: self.bus = can.Bus(interface="pcan") + self.assertIsInstance(self.bus, PcanBus) self.MockPCANBasic.assert_called_once() + self.mock_pcan.Initialize.assert_called_once() self.mock_pcan.InitializeFD.assert_not_called() @@ -62,13 +62,41 @@ def test_bus_creation_state_error(self) -> None: with self.assertRaises(ValueError): can.Bus(interface="pcan", state=BusState.ERROR) - def test_bus_creation_fd(self) -> None: - self.bus = can.Bus(interface="pcan", fd=True) + @parameterized.expand([("f_clock", 8_000_000), ("f_clock_mhz", 8)]) + def test_bus_creation_fd(self, clock_param: str, clock_val: int) -> None: + self.bus = can.Bus( + interface="pcan", + fd=True, + nom_brp=1, + nom_tseg1=129, + nom_tseg2=30, + nom_sjw=1, + data_brp=1, + data_tseg1=9, + data_tseg2=6, + data_sjw=1, + channel="PCAN_USBBUS1", + **{clock_param: clock_val}, + ) + self.assertIsInstance(self.bus, PcanBus) self.MockPCANBasic.assert_called_once() self.mock_pcan.Initialize.assert_not_called() self.mock_pcan.InitializeFD.assert_called_once() + # Retrieve second argument of first call + bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1] + + self.assertTrue(f"{clock_param}={clock_val}".encode("ascii") in bitrate_arg) + self.assertTrue(b"nom_brp=1" in bitrate_arg) + self.assertTrue(b"nom_tseg1=129" in bitrate_arg) + self.assertTrue(b"nom_tseg2=30" in bitrate_arg) + self.assertTrue(b"nom_sjw=1" in bitrate_arg) + self.assertTrue(b"data_brp=1" in bitrate_arg) + self.assertTrue(b"data_tseg1=9" in bitrate_arg) + self.assertTrue(b"data_tseg2=6" in bitrate_arg) + self.assertTrue(b"data_sjw=1" in bitrate_arg) + def test_api_version_low(self) -> None: self.PCAN_API_VERSION_SIM = "1.0" with self.assertLogs("can.pcan", level="WARNING") as cm: @@ -333,6 +361,11 @@ def test_state(self, name, bus_state: BusState, expected_parameter) -> None: (PCAN_USBBUS1, PCAN_LISTEN_ONLY, expected_parameter), ) + def test_state_constructor(self): + for state in [BusState.ACTIVE, BusState.PASSIVE]: + bus = can.Bus(interface="pcan", state=state) + assert bus.state == state + def test_detect_available_configs(self) -> None: if platform.system() == "Darwin": self.mock_pcan.GetValue = Mock( @@ -381,7 +414,8 @@ def get_value_side_effect(handle, param): self.mock_pcan.GetValue = Mock(side_effect=get_value_side_effect) if expected_result == "error": - self.assertRaises(ValueError, can.Bus, interface="pcan", device_id=dev_id) + with self.assertRaises(ValueError): + can.Bus(interface="pcan", device_id=dev_id) else: self.bus = can.Bus(interface="pcan", device_id=dev_id) self.assertEqual(expected_result, self.bus.channel_info) @@ -416,6 +450,39 @@ def test_peak_fd_bus_constructor_regression(self): can.Bus(**params) + def test_constructor_bit_timing(self): + timing = can.BitTiming.from_registers(f_clock=8_000_000, btr0=0x47, btr1=0x2F) + can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing) + + bitrate_arg = self.mock_pcan.Initialize.call_args[0][1] + self.assertEqual(bitrate_arg.value, 0x472F) + + def test_constructor_bit_timing_fd(self): + timing = can.BitTimingFd( + f_clock=40_000_000, + nom_brp=1, + nom_tseg1=129, + nom_tseg2=30, + nom_sjw=1, + data_brp=1, + data_tseg1=9, + data_tseg2=6, + data_sjw=1, + ) + can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing) + + bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1] + + self.assertTrue(b"f_clock=40000000" in bitrate_arg) + self.assertTrue(b"nom_brp=1" in bitrate_arg) + self.assertTrue(b"nom_tseg1=129" in bitrate_arg) + self.assertTrue(b"nom_tseg2=30" in bitrate_arg) + self.assertTrue(b"nom_sjw=1" in bitrate_arg) + self.assertTrue(b"data_brp=1" in bitrate_arg) + self.assertTrue(b"data_tseg1=9" in bitrate_arg) + self.assertTrue(b"data_tseg2=6" in bitrate_arg) + self.assertTrue(b"data_sjw=1" in bitrate_arg) + if __name__ == "__main__": unittest.main()