From ae71853be7fb42870bb5763066b0c3c50d015669 Mon Sep 17 00:00:00 2001 From: Thomas Willson Date: Sun, 5 Jan 2025 14:01:16 -0800 Subject: [PATCH] Fix SDO writes of empty strings (#551) Previously, when attempting a write of "" to a VISIBLE_STRING, the expedited SDO request would not be sent and the transaction would timeout. To resolve this, if the transaction size is known to be 0, a segmented transfer is used. --- canopen/sdo/client.py | 2 +- test/test_sdo.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/canopen/sdo/client.py b/canopen/sdo/client.py index 421fc3d4..c08d92be 100644 --- a/canopen/sdo/client.py +++ b/canopen/sdo/client.py @@ -353,7 +353,7 @@ def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False self._exp_header = None self._done = False - if size is None or size > 4 or force_segment: + if size is None or size < 1 or size > 4 or force_segment: # Initiate segmented download request = bytearray(8) command = REQUEST_DOWNLOAD diff --git a/test/test_sdo.py b/test/test_sdo.py index 6fe8544c..e4036efe 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -68,6 +68,8 @@ def _send_message(self, can_id, data, remote=False): while self.data and self.data[0][0] == RX: self.network.notify(0x582, self.data.pop(0)[1], 0.0) + self.message_sent = True + def setUp(self): network = canopen.Network() network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0 @@ -76,6 +78,8 @@ def setUp(self): node.sdo.RESPONSE_TIMEOUT = 0.01 self.network = network + self.message_sent = False + def test_expedited_upload(self): self.data = [ (TX, b'\x40\x18\x10\x01\x00\x00\x00\x00'), @@ -91,6 +95,7 @@ def test_expedited_upload(self): ] trans_type = self.network[2].sdo[0x1400]['Transmission type RPDO 1'].raw self.assertEqual(trans_type, 254) + self.assertTrue(self.message_sent) def test_size_not_specified(self): self.data = [ @@ -100,6 +105,7 @@ def test_size_not_specified(self): # Make sure the size of the data is 1 byte data = self.network[2].sdo.upload(0x1400, 2) self.assertEqual(data, b'\xfe') + self.assertTrue(self.message_sent) def test_expedited_download(self): self.data = [ @@ -107,6 +113,7 @@ def test_expedited_download(self): (RX, b'\x60\x17\x10\x00\x00\x00\x00\x00') ] self.network[2].sdo[0x1017].raw = 4000 + self.assertTrue(self.message_sent) def test_segmented_upload(self): self.data = [ @@ -153,6 +160,16 @@ def test_block_download(self): 'wb', size=len(data), block_transfer=True) as fp: fp.write(data) + def test_segmented_download_zero_length(self): + self.data = [ + (TX, b'\x21\x00\x20\x00\x00\x00\x00\x00'), + (RX, b'\x60\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x0F\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x20\x00\x00\x00\x00\x00\x00\x00'), + ] + self.network[2].sdo[0x2000].raw = "" + self.assertTrue(self.message_sent) + def test_block_upload(self): self.data = [ (TX, b'\xa4\x08\x10\x00\x7f\x00\x00\x00'),