From 87746bb1be21a3b71895f618d4c890db0fb487b3 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 6 Jan 2025 13:35:38 -0800 Subject: [PATCH] Flaky tests pass 1 (#616) * Update shared subscription test to remove invalid assertions on a property that does not necessarily hold (all subscriptions receive messages) * Make MQTT will tests more reliable by adding a delay between subscribe and disconnect, and looping the connection attempt in the 311 test which does not have the ability to send a DISCONNECT_WITH_WILL message. Co-authored-by: Bret Ambrose --- test/test_mqtt.py | 11 ++++++++++- test/test_mqtt5.py | 9 +++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/test/test_mqtt.py b/test/test_mqtt.py index c76bad7fa..547e1e65c 100644 --- a/test/test_mqtt.py +++ b/test/test_mqtt.py @@ -208,7 +208,16 @@ def on_message(**kwargs): ping_timeout_ms=10000, keep_alive_secs=30 ) - disconnecter.connect().result(TIMEOUT) + + # A race condition exists in IoT Core where the interrupter may get refused rather than the existing + # connection getting dropped. Loop until we successfully connect. + continue_connecting = True + while continue_connecting: + try: + disconnecter.connect().result(TIMEOUT) + continue_connecting = False + except BaseException: + pass # Receive message rcv = received.result(TIMEOUT) diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index 143fd04cf..2e1399307 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1009,8 +1009,6 @@ def test_operation_sub_unsub(self): client.stop() callbacks.future_stopped.result(TIMEOUT) - sub1_callbacks = False - sub2_callbacks = False total_callbacks = 0 all_packets_received = Future() mutex = Lock() @@ -1020,7 +1018,6 @@ def subscriber1_callback(self, publish_received_data: mqtt5.PublishReceivedData) self.mutex.acquire() var = publish_received_data.publish_packet.payload self.received_subscriptions[int(var)] = 1 - self.sub1_callbacks = True self.total_callbacks = self.total_callbacks + 1 if self.total_callbacks == 10: self.all_packets_received.set_result(None) @@ -1030,7 +1027,6 @@ def subscriber2_callback(self, publish_received_data: mqtt5.PublishReceivedData) self.mutex.acquire() var = publish_received_data.publish_packet.payload self.received_subscriptions[int(var)] = 1 - self.sub2_callbacks = True self.total_callbacks = self.total_callbacks + 1 if self.total_callbacks == 10: self.all_packets_received.set_result(None) @@ -1154,8 +1150,6 @@ def test_operation_shared_subscription(self): unsuback_packet = unsubscribe_future.result(TIMEOUT) self.assertIsInstance(unsuback_packet, mqtt5.UnsubackPacket) - self.assertEqual(self.sub1_callbacks, True) - self.assertEqual(self.sub2_callbacks, True) self.assertEqual(self.total_callbacks, 10) for e in self.received_subscriptions: @@ -1221,6 +1215,9 @@ def test_operation_will(self): suback_packet = subscribe_future.result(TIMEOUT) self.assertIsInstance(suback_packet, mqtt5.SubackPacket) + # wait a few seconds to minimize chance of eventual consistency race condition between subscribe and publish + time.sleep(2) + disconnect_packet = mqtt5.DisconnectPacket(reason_code=mqtt5.DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE) client1.stop(disconnect_packet=disconnect_packet) callbacks1.future_stopped.result(TIMEOUT)