Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions can/interfaces/pcan/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,16 @@ class TPCANChannelInformation(Structure):
"PCAN_LANBUS16": PCAN_LANBUS16,
}

VALID_PCAN_CAN_CLOCKS = [8_000_000]

VALID_PCAN_FD_CLOCKS = [
20_000_000,
24_000_000,
30_000_000,
40_000_000,
60_000_000,
80_000_000,
]

# ///////////////////////////////////////////////////////////
# PCAN-Basic API function declarations
Expand Down
78 changes: 52 additions & 26 deletions can/interfaces/pcan/pcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@
import time
from datetime import datetime
import platform
from typing import Optional, List, Tuple
from typing import Optional, List, Tuple, Union, Any

from packaging import version

from can import (
BusABC,
BusState,
BitTiming,
BitTimingFd,
Message,
CanError,
CanOperationError,
CanInitializationError,
Message,
)
from can.util import len2dlc, dlc2len

from can.util import check_or_adjust_timing_clock, dlc2len, len2dlc
from .basic import (
PCAN_BITRATES,
PCAN_FD_PARAMETER_LIST,
Expand Down Expand Up @@ -61,8 +62,11 @@
FEATURE_FD_CAPABLE,
PCAN_DICT_STATUS,
PCAN_BUSOFF_AUTORESET,
TPCANBaudrate,
PCAN_ATTACHED_CHANNELS,
TPCANChannelInformation,
VALID_PCAN_FD_CLOCKS,
VALID_PCAN_CAN_CLOCKS,
)


Expand Down Expand Up @@ -112,12 +116,12 @@
class PcanBus(BusABC):
def __init__(
self,
channel="PCAN_USBBUS1",
device_id=None,
state=BusState.ACTIVE,
bitrate=500000,
*args,
**kwargs,
channel: str = "PCAN_USBBUS1",
device_id: Optional[int] = None,
state: BusState = BusState.ACTIVE,
timing: Optional[Union[BitTiming, BitTimingFd]] = None,
bitrate: int = 500000,
**kwargs: Any,
):
"""A PCAN USB interface to CAN.

Expand All @@ -142,6 +146,18 @@ def __init__(
BusState of the channel.
Default is ACTIVE

:param timing:
An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd`
to specify the bit timing parameters for the PCAN interface. If this parameter
is provided, it takes precedence over all other timing-related parameters.
If this parameter is not provided, the bit timing parameters can be specified
using the `bitrate` parameter for standard CAN or the `fd`, `f_clock`,
`f_clock_mhz`, `nom_brp`, `nom_tseg1`, `nom_tseg2`, `nom_sjw`, `data_brp`,
`data_tseg1`, `data_tseg2`, and `data_sjw` parameters for CAN FD.
Note that the `f_clock` value of the `timing` instance must be 8_000_000
for standard CAN or any of the following values for CAN FD: 20_000_000,
24_000_000, 30_000_000, 40_000_000, 60_000_000, 80_000_000.

:param int bitrate:
Bitrate of channel in bit/s.
Default is 500 kbit/s.
Expand Down Expand Up @@ -231,8 +247,7 @@ def __init__(
raise ValueError(err_msg)

self.channel_info = str(channel)
self.fd = kwargs.get("fd", False)
pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K)
self.fd = isinstance(timing, BitTimingFd) if timing else kwargs.get("fd", False)

hwtype = PCAN_TYPE_ISA
ioport = 0x02A0
Expand All @@ -245,30 +260,41 @@ def __init__(

self.check_api_version()

if state is BusState.ACTIVE or state is BusState.PASSIVE:
if state in [BusState.ACTIVE, BusState.PASSIVE]:
self.state = state
else:
raise ValueError("BusState must be Active or Passive")

if self.fd:
f_clock_val = kwargs.get("f_clock", None)
if f_clock_val is None:
f_clock = "{}={}".format("f_clock_mhz", kwargs.get("f_clock_mhz", None))
else:
f_clock = "{}={}".format("f_clock", kwargs.get("f_clock", None))

fd_parameters_values = [f_clock] + [
f"{key}={kwargs.get(key, None)}"
for key in PCAN_FD_PARAMETER_LIST
if kwargs.get(key, None) is not None
if isinstance(timing, BitTiming):
timing = check_or_adjust_timing_clock(timing, VALID_PCAN_CAN_CLOCKS)
pcan_bitrate = TPCANBaudrate(timing.btr0 << 8 | timing.btr1)
result = self.m_objPCANBasic.Initialize(
self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt
)
elif self.fd:
if isinstance(timing, BitTimingFd):
timing = check_or_adjust_timing_clock(
timing, sorted(VALID_PCAN_FD_CLOCKS, reverse=True)
)
# We dump the timing parameters into the kwargs because they have equal names
# as the kwargs parameters and this saves us one additional code path
kwargs.update(timing)

clock_param = "f_clock" if "f_clock" in kwargs else "f_clock_mhz"
fd_parameters_values = [
f"{key}={kwargs[key]}"
for key in (clock_param,) + PCAN_FD_PARAMETER_LIST
if key in kwargs
]

self.fd_bitrate = " ,".join(fd_parameters_values).encode("ascii")
self.fd_bitrate = ", ".join(fd_parameters_values).encode("ascii")

result = self.m_objPCANBasic.InitializeFD(
self.m_PcanHandle, self.fd_bitrate
)

else:
pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K)
result = self.m_objPCANBasic.Initialize(
self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt
)
Expand Down Expand Up @@ -312,7 +338,7 @@ def __init__(
if result != PCAN_ERROR_OK:
raise PcanCanInitializationError(self._get_formatted_error(result))

super().__init__(channel=channel, state=state, bitrate=bitrate, *args, **kwargs)
super().__init__(channel=channel, state=state, bitrate=bitrate, **kwargs)

def _find_channel_by_dev_id(self, device_id):
"""
Expand Down
79 changes: 73 additions & 6 deletions test/test_pcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
"""

import ctypes
import platform
import unittest
from unittest import mock
from unittest.mock import Mock, patch


import pytest
from parameterized import parameterized

import can
from can.bus import BusState
from can.exceptions import CanInitializationError
from can.interfaces.pcan.basic import *
from can.interfaces.pcan import PcanBus, PcanError
from can.interfaces.pcan.basic import *


class TestPCANBus(unittest.TestCase):
Expand Down Expand Up @@ -53,22 +51,52 @@ def _mockGetValue(self, channel, parameter):

def test_bus_creation(self) -> None:
self.bus = can.Bus(interface="pcan")

self.assertIsInstance(self.bus, PcanBus)
self.MockPCANBasic.assert_called_once()

self.mock_pcan.Initialize.assert_called_once()
self.mock_pcan.InitializeFD.assert_not_called()

def test_bus_creation_state_error(self) -> None:
with self.assertRaises(ValueError):
can.Bus(interface="pcan", state=BusState.ERROR)

def test_bus_creation_fd(self) -> None:
self.bus = can.Bus(interface="pcan", fd=True)
@parameterized.expand([("f_clock", 8_000_000), ("f_clock_mhz", 8)])
def test_bus_creation_fd(self, clock_param: str, clock_val: int) -> None:
self.bus = can.Bus(
interface="pcan",
fd=True,
nom_brp=1,
nom_tseg1=129,
nom_tseg2=30,
nom_sjw=1,
data_brp=1,
data_tseg1=9,
data_tseg2=6,
data_sjw=1,
channel="PCAN_USBBUS1",
**{clock_param: clock_val},
)

self.assertIsInstance(self.bus, PcanBus)
self.MockPCANBasic.assert_called_once()
self.mock_pcan.Initialize.assert_not_called()
self.mock_pcan.InitializeFD.assert_called_once()

# Retrieve second argument of first call
bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1]

self.assertTrue(f"{clock_param}={clock_val}".encode("ascii") in bitrate_arg)
self.assertTrue(b"nom_brp=1" in bitrate_arg)
self.assertTrue(b"nom_tseg1=129" in bitrate_arg)
self.assertTrue(b"nom_tseg2=30" in bitrate_arg)
self.assertTrue(b"nom_sjw=1" in bitrate_arg)
self.assertTrue(b"data_brp=1" in bitrate_arg)
self.assertTrue(b"data_tseg1=9" in bitrate_arg)
self.assertTrue(b"data_tseg2=6" in bitrate_arg)
self.assertTrue(b"data_sjw=1" in bitrate_arg)

def test_api_version_low(self) -> None:
self.PCAN_API_VERSION_SIM = "1.0"
with self.assertLogs("can.pcan", level="WARNING") as cm:
Expand Down Expand Up @@ -333,6 +361,11 @@ def test_state(self, name, bus_state: BusState, expected_parameter) -> None:
(PCAN_USBBUS1, PCAN_LISTEN_ONLY, expected_parameter),
)

def test_state_constructor(self):
for state in [BusState.ACTIVE, BusState.PASSIVE]:
bus = can.Bus(interface="pcan", state=state)
assert bus.state == state

def test_detect_available_configs(self) -> None:
if platform.system() == "Darwin":
self.mock_pcan.GetValue = Mock(
Expand Down Expand Up @@ -381,7 +414,8 @@ def get_value_side_effect(handle, param):
self.mock_pcan.GetValue = Mock(side_effect=get_value_side_effect)

if expected_result == "error":
self.assertRaises(ValueError, can.Bus, interface="pcan", device_id=dev_id)
with self.assertRaises(ValueError):
can.Bus(interface="pcan", device_id=dev_id)
else:
self.bus = can.Bus(interface="pcan", device_id=dev_id)
self.assertEqual(expected_result, self.bus.channel_info)
Expand Down Expand Up @@ -416,6 +450,39 @@ def test_peak_fd_bus_constructor_regression(self):

can.Bus(**params)

def test_constructor_bit_timing(self):
timing = can.BitTiming.from_registers(f_clock=8_000_000, btr0=0x47, btr1=0x2F)
can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing)

bitrate_arg = self.mock_pcan.Initialize.call_args[0][1]
self.assertEqual(bitrate_arg.value, 0x472F)

def test_constructor_bit_timing_fd(self):
timing = can.BitTimingFd(
f_clock=40_000_000,
nom_brp=1,
nom_tseg1=129,
nom_tseg2=30,
nom_sjw=1,
data_brp=1,
data_tseg1=9,
data_tseg2=6,
data_sjw=1,
)
can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing)

bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1]

self.assertTrue(b"f_clock=40000000" in bitrate_arg)
self.assertTrue(b"nom_brp=1" in bitrate_arg)
self.assertTrue(b"nom_tseg1=129" in bitrate_arg)
self.assertTrue(b"nom_tseg2=30" in bitrate_arg)
self.assertTrue(b"nom_sjw=1" in bitrate_arg)
self.assertTrue(b"data_brp=1" in bitrate_arg)
self.assertTrue(b"data_tseg1=9" in bitrate_arg)
self.assertTrue(b"data_tseg2=6" in bitrate_arg)
self.assertTrue(b"data_sjw=1" in bitrate_arg)


if __name__ == "__main__":
unittest.main()