Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions can/interfaces/vector/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,20 @@

# Import Modules
# ==============
from can import BusABC, Message, CanInterfaceNotImplementedError, CanInitializationError
from can import (
BusABC,
Message,
CanInterfaceNotImplementedError,
CanInitializationError,
BitTiming,
BitTimingFd,
)
from can.util import (
len2dlc,
dlc2len,
deprecated_args_alias,
time_perfcounter_correlation,
check_or_adjust_timing_clock,
)
from can.typechecking import AutoDetectedConfig, CanFilters

Expand Down Expand Up @@ -86,6 +94,7 @@ def __init__(
can_filters: Optional[CanFilters] = None,
poll_interval: float = 0.01,
receive_own_messages: bool = False,
timing: Optional[Union[BitTiming, BitTimingFd]] = None,
bitrate: Optional[int] = None,
rx_queue_size: int = 2**14,
app_name: Optional[str] = "CANalyzer",
Expand All @@ -108,6 +117,15 @@ def __init__(
See :class:`can.BusABC`.
:param receive_own_messages:
See :class:`can.BusABC`.
:param timing:
An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd`
to specify the bit timing parameters for the VectorBus interface. The
`f_clock` value of the timing instance must be set to 16.000.000 (16MHz)
for standard CAN or 80.000.000 (80MHz) for CAN FD. If this parameter is provided,
it takes precedence over all other timing-related parameters.
Otherwise, the bit timing can be specified using the following parameters:
`bitrate` for standard CAN or `fd`, `data_bitrate`, `sjw_abr`, `tseg1_abr`,
`tseg2_abr`, `sjw_dbr`, `tseg1_dbr`, and `tseg2_dbr` for CAN FD.
:param poll_interval:
Poll interval in seconds.
:param bitrate:
Expand Down Expand Up @@ -184,7 +202,7 @@ def __init__(
channel_configs = get_channel_configs()

self.mask = 0
self.fd = fd
self.fd = isinstance(timing, BitTimingFd) if timing else fd
self.channel_masks: Dict[int, int] = {}
self.index_to_channel: Dict[int, int] = {}

Expand All @@ -204,12 +222,12 @@ def __init__(

permission_mask = xlclass.XLaccess()
# Set mask to request channel init permission if needed
if bitrate or fd:
if bitrate or fd or timing:
permission_mask.value = self.mask

interface_version = (
xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4
if fd
if self.fd
else xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION
)

Expand All @@ -233,7 +251,30 @@ def __init__(

# set CAN settings
for channel in self.channels:
if fd:
if isinstance(timing, BitTiming):
timing = check_or_adjust_timing_clock(timing, [16_000_000])
self._set_bitrate_can(
channel=channel,
bitrate=timing.bitrate,
sjw=timing.sjw,
tseg1=timing.tseg1,
tseg2=timing.tseg2,
sam=timing.nof_samples,
)
elif isinstance(timing, BitTimingFd):
timing = check_or_adjust_timing_clock(timing, [80_000_000])
self._set_bitrate_canfd(
channel=channel,
bitrate=timing.nom_bitrate,
data_bitrate=timing.data_bitrate,
sjw_abr=timing.nom_sjw,
tseg1_abr=timing.nom_tseg1,
tseg2_abr=timing.nom_tseg2,
sjw_dbr=timing.data_sjw,
tseg1_dbr=timing.data_tseg1,
tseg2_dbr=timing.data_tseg2,
)
elif fd:
self._set_bitrate_canfd(
channel=channel,
bitrate=bitrate,
Expand Down
148 changes: 148 additions & 0 deletions test/test_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,154 @@ def test_bus_creation_fd_bitrate_timings() -> None:
bus.shutdown()


def test_bus_creation_timing_mocked(mock_xldriver) -> None:
timing = can.BitTiming.from_bitrate_and_segments(
f_clock=16_000_000,
bitrate=125_000,
tseg1=13,
tseg2=2,
sjw=1,
)
bus = can.Bus(channel=0, interface="vector", timing=timing, _testing=True)
assert isinstance(bus, canlib.VectorBus)
can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called()
can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called()

can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called()
xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0]
assert xlOpenPort_args[5] == xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION.value
assert xlOpenPort_args[6] == xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value

can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.assert_not_called()
can.interfaces.vector.canlib.xldriver.xlCanSetChannelParams.assert_called()
chip_params = (
can.interfaces.vector.canlib.xldriver.xlCanSetChannelParams.call_args[0]
)[2]
assert chip_params.bitRate == 125_000
assert chip_params.sjw == 1
assert chip_params.tseg1 == 13
assert chip_params.tseg2 == 2
assert chip_params.sam == 1


@pytest.mark.skipif(not XLDRIVER_FOUND, reason="Vector XL API is unavailable")
def test_bus_creation_timing() -> None:
timing = can.BitTiming.from_bitrate_and_segments(
f_clock=16_000_000,
bitrate=125_000,
tseg1=13,
tseg2=2,
sjw=1,
)
bus = can.Bus(
channel=0,
serial=_find_virtual_can_serial(),
interface="vector",
timing=timing,
)
assert isinstance(bus, canlib.VectorBus)

xl_channel_config = _find_xl_channel_config(
serial=_find_virtual_can_serial(), channel=0
)
assert xl_channel_config.busParams.data.can.bitRate == 125_000
assert xl_channel_config.busParams.data.can.sjw == 1
assert xl_channel_config.busParams.data.can.tseg1 == 13
assert xl_channel_config.busParams.data.can.tseg2 == 2

bus.shutdown()


def test_bus_creation_timingfd_mocked(mock_xldriver) -> None:
timing = can.BitTimingFd.from_bitrate_and_segments(
f_clock=80_000_000,
nom_bitrate=500_000,
nom_tseg1=68,
nom_tseg2=11,
nom_sjw=10,
data_bitrate=2_000_000,
data_tseg1=10,
data_tseg2=9,
data_sjw=8,
)
bus = can.Bus(
channel=0,
interface="vector",
timing=timing,
_testing=True,
)
assert isinstance(bus, canlib.VectorBus)
can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called()
can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called()

can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called()
xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0]
assert (
xlOpenPort_args[5] == xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4.value
)

assert xlOpenPort_args[6] == xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value

can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.assert_called()
can.interfaces.vector.canlib.xldriver.xlCanSetChannelBitrate.assert_not_called()

xlCanFdSetConfiguration_args = (
can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.call_args[0]
)
canFdConf = xlCanFdSetConfiguration_args[2]
assert canFdConf.arbitrationBitRate == 500_000
assert canFdConf.dataBitRate == 2_000_000
assert canFdConf.sjwAbr == 10
assert canFdConf.tseg1Abr == 68
assert canFdConf.tseg2Abr == 11
assert canFdConf.sjwDbr == 8
assert canFdConf.tseg1Dbr == 10
assert canFdConf.tseg2Dbr == 9


@pytest.mark.skipif(not XLDRIVER_FOUND, reason="Vector XL API is unavailable")
def test_bus_creation_timingfd() -> None:
timing = can.BitTimingFd.from_bitrate_and_segments(
f_clock=80_000_000,
nom_bitrate=500_000,
nom_tseg1=68,
nom_tseg2=11,
nom_sjw=10,
data_bitrate=2_000_000,
data_tseg1=10,
data_tseg2=9,
data_sjw=8,
)
bus = can.Bus(
channel=0,
serial=_find_virtual_can_serial(),
interface="vector",
timing=timing,
)

xl_channel_config = _find_xl_channel_config(
serial=_find_virtual_can_serial(), channel=0
)
assert (
xl_channel_config.interfaceVersion
== xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4
)
assert (
xl_channel_config.busParams.data.canFD.canOpMode
& xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD
)
assert xl_channel_config.busParams.data.canFD.arbitrationBitRate == 500_000
assert xl_channel_config.busParams.data.canFD.sjwAbr == 10
assert xl_channel_config.busParams.data.canFD.tseg1Abr == 68
assert xl_channel_config.busParams.data.canFD.tseg2Abr == 11
assert xl_channel_config.busParams.data.canFD.sjwDbr == 8
assert xl_channel_config.busParams.data.canFD.tseg1Dbr == 10
assert xl_channel_config.busParams.data.canFD.tseg2Dbr == 9
assert xl_channel_config.busParams.data.canFD.dataBitRate == 2_000_000

bus.shutdown()


def test_send_mocked(mock_xldriver) -> None:
bus = can.Bus(channel=0, interface="vector", _testing=True)
msg = can.Message(
Expand Down