Skip to content
28 changes: 16 additions & 12 deletions canopen/pdo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,22 @@ def __init__(self, node: Union[LocalNode, RemoteNode]):
def __iter__(self):
return iter(self.map)

def __getitem__(self, key):
if isinstance(key, int) and (0x1A00 <= key <= 0x1BFF or # By TPDO ID (512)
0x1600 <= key <= 0x17FF or # By RPDO ID (512)
0 < key <= 512): # By PDO Index
return self.map[key]
else:
for pdo_map in self.map.values():
try:
return pdo_map[key]
except KeyError:
# ignore if one specific PDO does not have the key and try the next one
continue
def __getitem__(self, key: Union[int, str]):
if isinstance(key, int):
if key == 0:
raise KeyError("PDO index zero requested for 1-based sequence")
if (
0 < key <= 512 # By PDO Index
or 0x1600 <= key <= 0x17FF # By RPDO ID (512)
or 0x1A00 <= key <= 0x1BFF # By TPDO ID (512)
):
return self.map[key]
for pdo_map in self.map.values():
try:
return pdo_map[key]
except KeyError:
# ignore if one specific PDO does not have the key and try the next one
continue
raise KeyError(f"PDO: {key} was not found in any map")

def __len__(self):
Expand Down
44 changes: 35 additions & 9 deletions test/test_pdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,41 @@ def test_pdo_getitem(self):
self.assertEqual(node.tpdo[1]['BOOLEAN value 2'].raw, True)

# Test different types of access
self.assertEqual(node.pdo[0x1600]['INTEGER16 value'].raw, -3)
self.assertEqual(node.pdo['INTEGER16 value'].raw, -3)
self.assertEqual(node.pdo.tx[1]['INTEGER16 value'].raw, -3)
self.assertEqual(node.pdo[0x2001].raw, -3)
self.assertEqual(node.tpdo[0x2001].raw, -3)
self.assertEqual(node.pdo[0x2002].raw, 0xf)
self.assertEqual(node.pdo['0x2002'].raw, 0xf)
self.assertEqual(node.tpdo[0x2002].raw, 0xf)
self.assertEqual(node.pdo[0x1600][0x2002].raw, 0xf)
by_mapping_record = node.pdo[0x1600]
self.assertIsInstance(by_mapping_record, canopen.pdo.PdoMap)
self.assertEqual(by_mapping_record['INTEGER16 value'].raw, -3)
by_object_name = node.pdo['INTEGER16 value']
self.assertIsInstance(by_object_name, canopen.pdo.PdoVariable)
self.assertIs(by_object_name.od, node.object_dictionary['INTEGER16 value'])
self.assertEqual(by_object_name.raw, -3)
by_pdo_index = node.pdo.tx[1]
self.assertIs(by_pdo_index, by_mapping_record)
by_object_index = node.pdo[0x2001]
self.assertIsInstance(by_object_index, canopen.pdo.PdoVariable)
self.assertIs(by_object_index, by_object_name)
by_object_index_tpdo = node.tpdo[0x2001]
self.assertIs(by_object_index_tpdo, by_object_name)
by_object_index = node.pdo[0x2002]
self.assertEqual(by_object_index.raw, 0xf)
self.assertIs(node.pdo['0x2002'], by_object_index)
self.assertIs(node.tpdo[0x2002], by_object_index)
self.assertIs(node.pdo[0x1600][0x2002], by_object_index)

self.assertRaises(KeyError, lambda: node.pdo[0])
self.assertRaises(KeyError, lambda: node.tpdo[0])
self.assertRaises(KeyError, lambda: node.pdo['DOES NOT EXIST'])
self.assertRaises(KeyError, lambda: node.pdo[0x1BFF])
self.assertRaises(KeyError, lambda: node.tpdo[0x1BFF])

def test_pdo_maps_iterate(self):
node = self.node
self.assertEqual(len(node.pdo), sum(1 for _ in node.pdo))
self.assertEqual(len(node.tpdo), sum(1 for _ in node.tpdo))
self.assertEqual(len(node.rpdo), sum(1 for _ in node.rpdo))
self.assertEqual(len(node.rpdo) + len(node.tpdo), len(node.pdo))

pdo = node.tpdo[1]
self.assertEqual(len(pdo), sum(1 for _ in pdo))

def test_pdo_save(self):
self.node.tpdo.save()
Expand Down
Loading