Skip to content

Commit

Permalink
Tests: harden TestNetwork and TestNmt even more (#508)
Browse files Browse the repository at this point in the history
- take into account that payloads with DATA1 may be in flight when we
  update the task
- test task.stop()
- use more lenient timeout
- use threading.Condition for improved readability
- wait for periodicity to be established before validation; some
  platforms are a little bit flaky

Co-authored-by: André Colomb <src@andre.colomb.de>
  • Loading branch information
erlend-aasland and acolomb authored Aug 6, 2024
1 parent 7d7e2f1 commit ecf216a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
52 changes: 38 additions & 14 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import unittest
from threading import Event
import threading

import canopen
import can
Expand Down Expand Up @@ -231,34 +231,58 @@ def test_network_send_periodic(self):
DATA2 = bytes([4, 5, 6])
COB_ID = 0x123
PERIOD = 0.1
TIMEOUT = PERIOD * 10
self.network.connect(interface="virtual", receive_own_messages=True)
self.addCleanup(self.network.disconnect)

acc = []
event = Event()
condition = threading.Condition()

def hook(_, data, ts):
acc.append((data, ts))
event.set()
with condition:
item = data, ts
acc.append(item)
condition.notify_all()

self.network.subscribe(COB_ID, hook)
self.addCleanup(self.network.unsubscribe, COB_ID)

task = self.network.send_periodic(COB_ID, DATA1, PERIOD)
self.addCleanup(task.stop)

event.wait(PERIOD*2)

# Update task data.
def periodicity():
# Check if periodicity is established; flakiness has been observed
# on macOS.
if len(acc) >= 2:
delta = acc[-1][1] - acc[-2][1]
return round(delta, ndigits=1) == PERIOD
return False

# Wait for frames to arrive; then check the result.
with condition:
condition.wait_for(periodicity, TIMEOUT)
self.assertTrue(all(v[0] == DATA1 for v in acc))

# Update task data, which may implicitly restart the timer.
# Wait for frames to arrive; then check the result.
task.update(DATA2)
event.clear()
event.wait(PERIOD*2)
task.stop()

with condition:
acc.clear()
condition.wait_for(periodicity, TIMEOUT)
# Find the first message with new data, and verify that all subsequent
# messages also carry the new payload.
data = [v[0] for v in acc]
self.assertEqual(data, [DATA1, DATA2])
ts = [v[1] for v in acc]
self.assertAlmostEqual(ts[1]-ts[0], PERIOD, places=1)
idx = data.index(DATA2)
self.assertTrue(all(v[0] == DATA2 for v in acc[idx:]))

# Stop the task.
task.stop()
# A message may have been in flight when we stopped the timer,
# so allow a single failure.
bus = self.network.bus
msg = bus.recv(TIMEOUT)
if msg is not None:
self.assertIsNone(bus.recv(TIMEOUT))


class TestScanner(unittest.TestCase):
Expand Down
6 changes: 5 additions & 1 deletion test/test_nmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ def test_nmt_master_node_guarding(self):
self.assertEqual(msg.dlc, 0)

self.node.nmt.stop_node_guarding()
self.assertIsNone(self.bus.recv(self.TIMEOUT))
# A message may have been in flight when we stopped the timer,
# so allow a single failure.
msg = self.bus.recv(self.TIMEOUT)
if msg is not None:
self.assertIsNone(self.bus.recv(self.TIMEOUT))


class TestNmtSlave(unittest.TestCase):
Expand Down

0 comments on commit ecf216a

Please sign in to comment.