Skip to content

Commit

Permalink
Flaky tests pass 1 (#616)
Browse files Browse the repository at this point in the history
* Update shared subscription test to remove invalid assertions on a property that does not necessarily hold (all subscriptions receive messages)
* Make MQTT will tests more reliable by adding a delay between subscribe and disconnect, and looping the connection attempt in the 311 test which does not have the ability to send a DISCONNECT_WITH_WILL message.

Co-authored-by: Bret Ambrose <[email protected]>
  • Loading branch information
bretambrose and Bret Ambrose authored Jan 6, 2025
1 parent 2abf568 commit 87746bb
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
11 changes: 10 additions & 1 deletion test/test_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,16 @@ def on_message(**kwargs):
ping_timeout_ms=10000,
keep_alive_secs=30
)
disconnecter.connect().result(TIMEOUT)

# A race condition exists in IoT Core where the interrupter may get refused rather than the existing
# connection getting dropped. Loop until we successfully connect.
continue_connecting = True
while continue_connecting:
try:
disconnecter.connect().result(TIMEOUT)
continue_connecting = False
except BaseException:
pass

# Receive message
rcv = received.result(TIMEOUT)
Expand Down
9 changes: 3 additions & 6 deletions test/test_mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,8 +1009,6 @@ def test_operation_sub_unsub(self):
client.stop()
callbacks.future_stopped.result(TIMEOUT)

sub1_callbacks = False
sub2_callbacks = False
total_callbacks = 0
all_packets_received = Future()
mutex = Lock()
Expand All @@ -1020,7 +1018,6 @@ def subscriber1_callback(self, publish_received_data: mqtt5.PublishReceivedData)
self.mutex.acquire()
var = publish_received_data.publish_packet.payload
self.received_subscriptions[int(var)] = 1
self.sub1_callbacks = True
self.total_callbacks = self.total_callbacks + 1
if self.total_callbacks == 10:
self.all_packets_received.set_result(None)
Expand All @@ -1030,7 +1027,6 @@ def subscriber2_callback(self, publish_received_data: mqtt5.PublishReceivedData)
self.mutex.acquire()
var = publish_received_data.publish_packet.payload
self.received_subscriptions[int(var)] = 1
self.sub2_callbacks = True
self.total_callbacks = self.total_callbacks + 1
if self.total_callbacks == 10:
self.all_packets_received.set_result(None)
Expand Down Expand Up @@ -1154,8 +1150,6 @@ def test_operation_shared_subscription(self):
unsuback_packet = unsubscribe_future.result(TIMEOUT)
self.assertIsInstance(unsuback_packet, mqtt5.UnsubackPacket)

self.assertEqual(self.sub1_callbacks, True)
self.assertEqual(self.sub2_callbacks, True)
self.assertEqual(self.total_callbacks, 10)

for e in self.received_subscriptions:
Expand Down Expand Up @@ -1221,6 +1215,9 @@ def test_operation_will(self):
suback_packet = subscribe_future.result(TIMEOUT)
self.assertIsInstance(suback_packet, mqtt5.SubackPacket)

# wait a few seconds to minimize chance of eventual consistency race condition between subscribe and publish
time.sleep(2)

disconnect_packet = mqtt5.DisconnectPacket(reason_code=mqtt5.DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE)
client1.stop(disconnect_packet=disconnect_packet)
callbacks1.future_stopped.result(TIMEOUT)
Expand Down

0 comments on commit 87746bb

Please sign in to comment.