From ab1fa8bf1f4d8b617a890145362bcdd105da9fbe Mon Sep 17 00:00:00 2001 From: zariiii9003 <52598363+zariiii9003@users.noreply.github.com> Date: Wed, 25 Jan 2023 23:27:09 +0100 Subject: [PATCH 1/5] add BitTiming parameter to PcanBus --- can/interfaces/pcan/pcan.py | 76 ++++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index 01adbe0c2..e08cc0919 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -5,20 +5,21 @@ import time from datetime import datetime import platform -from typing import Optional, List, Tuple +from typing import Optional, List, Tuple, Union, Any from packaging import version from can import ( BusABC, BusState, + BitTiming, + BitTimingFd, + Message, CanError, CanOperationError, CanInitializationError, - Message, ) -from can.util import len2dlc, dlc2len - +from can.util import check_or_adjust_timing_clock, dlc2len, len2dlc from .basic import ( PCAN_BITRATES, PCAN_FD_PARAMETER_LIST, @@ -61,6 +62,7 @@ FEATURE_FD_CAPABLE, PCAN_DICT_STATUS, PCAN_BUSOFF_AUTORESET, + TPCANBaudrate, PCAN_ATTACHED_CHANNELS, TPCANChannelInformation, ) @@ -112,12 +114,12 @@ class PcanBus(BusABC): def __init__( self, - channel="PCAN_USBBUS1", - device_id=None, - state=BusState.ACTIVE, - bitrate=500000, - *args, - **kwargs, + channel: str = "PCAN_USBBUS1", + device_id: Optional[int] = None, + state: BusState = BusState.ACTIVE, + timing: Optional[Union[BitTiming, BitTimingFd]] = None, + bitrate: int = 500000, + **kwargs: Any, ): """A PCAN USB interface to CAN. @@ -142,6 +144,18 @@ def __init__( BusState of the channel. Default is ACTIVE + :param timing: + An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd` + to specify the bit timing parameters for the PCAN interface. If this parameter + is provided, it takes precedence over all other timing-related parameters. + If this parameter is not provided, the bit timing parameters can be specified + using the `bitrate` parameter for standard CAN or the `fd`, `f_clock`, + `f_clock_mhz`, `nom_brp`, `nom_tseg1`, `nom_tseg2`, `nom_sjw`, `data_brp`, + `data_tseg1`, `data_tseg2`, and `data_sjw` parameters for CAN FD. + Note that the `f_clock` value of the `timing` instance must be 8_000_000 + for standard CAN or any of the following values for CAN FD: 20_000_000, + 24_000_000, 30_000_000, 40_000_000, 60_000_000, 80_000_000. + :param int bitrate: Bitrate of channel in bit/s. Default is 500 kbit/s. @@ -231,8 +245,7 @@ def __init__( raise ValueError(err_msg) self.channel_info = str(channel) - self.fd = kwargs.get("fd", False) - pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K) + self.fd = isinstance(timing, BitTimingFd) if timing else kwargs.get("fd", False) hwtype = PCAN_TYPE_ISA ioport = 0x02A0 @@ -250,7 +263,41 @@ def __init__( else: raise ValueError("BusState must be Active or Passive") - if self.fd: + if isinstance(timing, BitTimingFd): + valid_fd_f_clocks = [ + 20_000_000, + 24_000_000, + 30_000_000, + 40_000_000, + 60_000_000, + 80_000_000, + ] + timing = check_or_adjust_timing_clock( + timing, sorted(valid_fd_f_clocks, reverse=True) + ) + self.fd_bitrate = ( + f"f_clock={timing.f_clock}," + f"nom_brp={timing.nom_brp}," + f"nom_tseg1={timing.nom_tseg1}," + f"nom_tseg2={timing.nom_tseg2}," + f"nom_sjw={timing.nom_sjw}," + f"data_brp={timing.data_brp}," + f"data_tseg1={timing.data_tseg1}," + f"data_tseg2={timing.data_tseg2}," + f"data_sjw={timing.data_sjw}" + ).encode("ascii") + result = self.m_objPCANBasic.InitializeFD( + self.m_PcanHandle, self.fd_bitrate + ) + + elif isinstance(timing, BitTiming): + timing = check_or_adjust_timing_clock(timing, [8_000_000]) + pcan_bitrate = TPCANBaudrate(timing.btr0 << 8 | timing.btr1) + result = self.m_objPCANBasic.Initialize( + self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt + ) + + elif self.fd: f_clock_val = kwargs.get("f_clock", None) if f_clock_val is None: f_clock = "{}={}".format("f_clock_mhz", kwargs.get("f_clock_mhz", None)) @@ -269,6 +316,7 @@ def __init__( self.m_PcanHandle, self.fd_bitrate ) else: + pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K) result = self.m_objPCANBasic.Initialize( self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt ) @@ -312,7 +360,7 @@ def __init__( if result != PCAN_ERROR_OK: raise PcanCanInitializationError(self._get_formatted_error(result)) - super().__init__(channel=channel, state=state, bitrate=bitrate, *args, **kwargs) + super().__init__(channel=channel, state=state, bitrate=bitrate, **kwargs) def _find_channel_by_dev_id(self, device_id): """ From 6056b76ce146a526d0030932cf5aab7f3b0f824a Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 28 Jan 2023 12:09:17 +0100 Subject: [PATCH 2/5] Move valid CAN/FD clocks to pcan/basic --- can/interfaces/pcan/basic.py | 10 ++++++++++ can/interfaces/pcan/pcan.py | 14 ++++---------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/can/interfaces/pcan/basic.py b/can/interfaces/pcan/basic.py index 5f161eecc..b2624802a 100644 --- a/can/interfaces/pcan/basic.py +++ b/can/interfaces/pcan/basic.py @@ -655,6 +655,16 @@ class TPCANChannelInformation(Structure): "PCAN_LANBUS16": PCAN_LANBUS16, } +VALID_PCAN_CAN_CLOCKS = [8_000_000] + +VALID_PCAN_FD_CLOCKS = [ + 20_000_000, + 24_000_000, + 30_000_000, + 40_000_000, + 60_000_000, + 80_000_000, +] # /////////////////////////////////////////////////////////// # PCAN-Basic API function declarations diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index e08cc0919..fd274742b 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -65,6 +65,8 @@ TPCANBaudrate, PCAN_ATTACHED_CHANNELS, TPCANChannelInformation, + VALID_PCAN_FD_CLOCKS, + VALID_PCAN_CAN_CLOCKS, ) @@ -264,16 +266,8 @@ def __init__( raise ValueError("BusState must be Active or Passive") if isinstance(timing, BitTimingFd): - valid_fd_f_clocks = [ - 20_000_000, - 24_000_000, - 30_000_000, - 40_000_000, - 60_000_000, - 80_000_000, - ] timing = check_or_adjust_timing_clock( - timing, sorted(valid_fd_f_clocks, reverse=True) + timing, sorted(VALID_PCAN_FD_CLOCKS, reverse=True) ) self.fd_bitrate = ( f"f_clock={timing.f_clock}," @@ -291,7 +285,7 @@ def __init__( ) elif isinstance(timing, BitTiming): - timing = check_or_adjust_timing_clock(timing, [8_000_000]) + timing = check_or_adjust_timing_clock(timing, VALID_PCAN_CAN_CLOCKS) pcan_bitrate = TPCANBaudrate(timing.btr0 << 8 | timing.btr1) result = self.m_objPCANBasic.Initialize( self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt From ffa2a89fdc10a4d9e5cfbb20afe31113ca663bbf Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 28 Jan 2023 15:09:43 +0100 Subject: [PATCH 3/5] Add additional PCAN constructor tests --- can/interfaces/pcan/pcan.py | 13 ++++------- test/test_pcan.py | 46 ++++++++++++++++++++++++++++++++----- 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index fd274742b..b585b10b0 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -260,7 +260,7 @@ def __init__( self.check_api_version() - if state is BusState.ACTIVE or state is BusState.PASSIVE: + if state in [BusState.ACTIVE, BusState.PASSIVE]: self.state = state else: raise ValueError("BusState must be Active or Passive") @@ -292,15 +292,10 @@ def __init__( ) elif self.fd: - f_clock_val = kwargs.get("f_clock", None) - if f_clock_val is None: - f_clock = "{}={}".format("f_clock_mhz", kwargs.get("f_clock_mhz", None)) - else: - f_clock = "{}={}".format("f_clock", kwargs.get("f_clock", None)) - - fd_parameters_values = [f_clock] + [ + clock_param = "f_clock_mhz" if "f_clock_mhz" in kwargs else "f_clock" + fd_parameters_values = [ f"{key}={kwargs.get(key, None)}" - for key in PCAN_FD_PARAMETER_LIST + for key in (clock_param,) + PCAN_FD_PARAMETER_LIST if kwargs.get(key, None) is not None ] diff --git a/test/test_pcan.py b/test/test_pcan.py index 01dac848c..e433618df 100644 --- a/test/test_pcan.py +++ b/test/test_pcan.py @@ -3,20 +3,18 @@ """ import ctypes -import platform import unittest from unittest import mock from unittest.mock import Mock, patch - import pytest from parameterized import parameterized import can from can.bus import BusState from can.exceptions import CanInitializationError -from can.interfaces.pcan.basic import * from can.interfaces.pcan import PcanBus, PcanError +from can.interfaces.pcan.basic import * class TestPCANBus(unittest.TestCase): @@ -53,8 +51,10 @@ def _mockGetValue(self, channel, parameter): def test_bus_creation(self) -> None: self.bus = can.Bus(interface="pcan") + self.assertIsInstance(self.bus, PcanBus) self.MockPCANBasic.assert_called_once() + self.mock_pcan.Initialize.assert_called_once() self.mock_pcan.InitializeFD.assert_not_called() @@ -62,13 +62,41 @@ def test_bus_creation_state_error(self) -> None: with self.assertRaises(ValueError): can.Bus(interface="pcan", state=BusState.ERROR) - def test_bus_creation_fd(self) -> None: - self.bus = can.Bus(interface="pcan", fd=True) + @parameterized.expand([("f_clock", 8_000_000), ("f_clock_mhz", 8)]) + def test_bus_creation_fd(self, clock_param: str, clock_val: int) -> None: + self.bus = can.Bus( + interface="pcan", + fd=True, + nom_brp=1, + nom_tseg1=129, + nom_tseg2=30, + nom_sjw=1, + data_brp=1, + data_tseg1=9, + data_tseg2=6, + data_sjw=1, + channel="PCAN_USBBUS1", + **{clock_param: clock_val}, + ) + self.assertIsInstance(self.bus, PcanBus) self.MockPCANBasic.assert_called_once() self.mock_pcan.Initialize.assert_not_called() self.mock_pcan.InitializeFD.assert_called_once() + # Retrieve second argument of first call + bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1] + + self.assertTrue(f"{clock_param}={clock_val}".encode("ascii") in bitrate_arg) + self.assertTrue(b"nom_brp=1" in bitrate_arg) + self.assertTrue(b"nom_tseg1=129" in bitrate_arg) + self.assertTrue(b"nom_tseg2=30" in bitrate_arg) + self.assertTrue(b"nom_sjw=1" in bitrate_arg) + self.assertTrue(b"data_brp=1" in bitrate_arg) + self.assertTrue(b"data_tseg1=9" in bitrate_arg) + self.assertTrue(b"data_tseg2=6" in bitrate_arg) + self.assertTrue(b"data_sjw=1" in bitrate_arg) + def test_api_version_low(self) -> None: self.PCAN_API_VERSION_SIM = "1.0" with self.assertLogs("can.pcan", level="WARNING") as cm: @@ -333,6 +361,11 @@ def test_state(self, name, bus_state: BusState, expected_parameter) -> None: (PCAN_USBBUS1, PCAN_LISTEN_ONLY, expected_parameter), ) + def test_state_constructor(self): + for state in [BusState.ACTIVE, BusState.PASSIVE]: + bus = can.Bus(interface="pcan", state=state) + assert bus.state == state + def test_detect_available_configs(self) -> None: if platform.system() == "Darwin": self.mock_pcan.GetValue = Mock( @@ -381,7 +414,8 @@ def get_value_side_effect(handle, param): self.mock_pcan.GetValue = Mock(side_effect=get_value_side_effect) if expected_result == "error": - self.assertRaises(ValueError, can.Bus, interface="pcan", device_id=dev_id) + with self.assertRaises(ValueError): + can.Bus(interface="pcan", device_id=dev_id) else: self.bus = can.Bus(interface="pcan", device_id=dev_id) self.assertEqual(expected_result, self.bus.channel_info) From d218c6f5868e7281e4d3544c7993c05f22eab5d7 Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 28 Jan 2023 15:31:58 +0100 Subject: [PATCH 4/5] Add tests for BitTiming with PCAN constructor --- test/test_pcan.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/test/test_pcan.py b/test/test_pcan.py index e433618df..aa0988a48 100644 --- a/test/test_pcan.py +++ b/test/test_pcan.py @@ -450,6 +450,39 @@ def test_peak_fd_bus_constructor_regression(self): can.Bus(**params) + def test_constructor_bit_timing(self): + timing = can.BitTiming.from_registers(f_clock=8_000_000, btr0=0x47, btr1=0x2F) + can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing) + + bitrate_arg = self.mock_pcan.Initialize.call_args[0][1] + self.assertEqual(bitrate_arg.value, 0x472F) + + def test_constructor_bit_timing_fd(self): + timing = can.BitTimingFd( + f_clock=40_000_000, + nom_brp=1, + nom_tseg1=129, + nom_tseg2=30, + nom_sjw=1, + data_brp=1, + data_tseg1=9, + data_tseg2=6, + data_sjw=1, + ) + can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing) + + bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1] + + self.assertTrue(b"f_clock=40000000" in bitrate_arg) + self.assertTrue(b"nom_brp=1" in bitrate_arg) + self.assertTrue(b"nom_tseg1=129" in bitrate_arg) + self.assertTrue(b"nom_tseg2=30" in bitrate_arg) + self.assertTrue(b"nom_sjw=1" in bitrate_arg) + self.assertTrue(b"data_brp=1" in bitrate_arg) + self.assertTrue(b"data_tseg1=9" in bitrate_arg) + self.assertTrue(b"data_tseg2=6" in bitrate_arg) + self.assertTrue(b"data_sjw=1" in bitrate_arg) + if __name__ == "__main__": unittest.main() From 09245e0dd857312b80a18f6487c88b5244d13bce Mon Sep 17 00:00:00 2001 From: Lukas Magel Date: Sat, 28 Jan 2023 18:10:29 +0100 Subject: [PATCH 5/5] Unify PCAN constructor code paths for FD with/without timing --- can/interfaces/pcan/pcan.py | 39 +++++++++++++------------------------ 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index b585b10b0..2ed0ff445 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -265,45 +265,34 @@ def __init__( else: raise ValueError("BusState must be Active or Passive") - if isinstance(timing, BitTimingFd): - timing = check_or_adjust_timing_clock( - timing, sorted(VALID_PCAN_FD_CLOCKS, reverse=True) - ) - self.fd_bitrate = ( - f"f_clock={timing.f_clock}," - f"nom_brp={timing.nom_brp}," - f"nom_tseg1={timing.nom_tseg1}," - f"nom_tseg2={timing.nom_tseg2}," - f"nom_sjw={timing.nom_sjw}," - f"data_brp={timing.data_brp}," - f"data_tseg1={timing.data_tseg1}," - f"data_tseg2={timing.data_tseg2}," - f"data_sjw={timing.data_sjw}" - ).encode("ascii") - result = self.m_objPCANBasic.InitializeFD( - self.m_PcanHandle, self.fd_bitrate - ) - - elif isinstance(timing, BitTiming): + if isinstance(timing, BitTiming): timing = check_or_adjust_timing_clock(timing, VALID_PCAN_CAN_CLOCKS) pcan_bitrate = TPCANBaudrate(timing.btr0 << 8 | timing.btr1) result = self.m_objPCANBasic.Initialize( self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt ) - elif self.fd: - clock_param = "f_clock_mhz" if "f_clock_mhz" in kwargs else "f_clock" + if isinstance(timing, BitTimingFd): + timing = check_or_adjust_timing_clock( + timing, sorted(VALID_PCAN_FD_CLOCKS, reverse=True) + ) + # We dump the timing parameters into the kwargs because they have equal names + # as the kwargs parameters and this saves us one additional code path + kwargs.update(timing) + + clock_param = "f_clock" if "f_clock" in kwargs else "f_clock_mhz" fd_parameters_values = [ - f"{key}={kwargs.get(key, None)}" + f"{key}={kwargs[key]}" for key in (clock_param,) + PCAN_FD_PARAMETER_LIST - if kwargs.get(key, None) is not None + if key in kwargs ] - self.fd_bitrate = " ,".join(fd_parameters_values).encode("ascii") + self.fd_bitrate = ", ".join(fd_parameters_values).encode("ascii") result = self.m_objPCANBasic.InitializeFD( self.m_PcanHandle, self.fd_bitrate ) + else: pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K) result = self.m_objPCANBasic.Initialize(