diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 19a59b37..275eea2d 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -68,8 +68,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): if exc_type is not None and exc_value is not None: - logging.error( - f'An exception of type {exc_type} with value {exc_value} has occurred') + logging.error(f'An exception of type {exc_type} with value {exc_value} has occurred') if traceback is not None: logging.error(f'Traceback: {traceback}') self.close() @@ -394,11 +393,11 @@ def getShortName(self): return user.get('shortName', None) return None - def _waitConnected(self): + def _waitConnected(self, timeout=15.0): """Block until the initial node db download is complete, or timeout and raise an exception""" if not self.noProto: - if not self.isConnected.wait(15.0): # timeout after x seconds + if not self.isConnected.wait(timeout): # timeout after x seconds raise Exception("Timed out waiting for connection completion") # If we failed while connecting, raise the connection to the client @@ -416,8 +415,7 @@ def _generatePacketId(self): def _disconnected(self): """Called by subclasses to tell clients this interface has disconnected""" self.isConnected.clear() - publishingThread.queueWork(lambda: pub.sendMessage( - "meshtastic.connection.lost", interface=self)) + publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.lost", interface=self)) def _startHeartbeat(self): """We need to send a heartbeat message to the device every X seconds""" @@ -443,8 +441,7 @@ def _connected(self): if not self.isConnected.is_set(): self.isConnected.set() self._startHeartbeat() - publishingThread.queueWork(lambda: pub.sendMessage( - "meshtastic.connection.established", interface=self)) + publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.established", interface=self)) def _startConfig(self): """Start device packets flowing""" diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index b96f9e43..8d77d443 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -10,6 +10,8 @@ from ..node import Node from .. import mesh_pb2 from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR +from ..radioconfig_pb2 import RadioConfig +from ..util import Timeout @pytest.mark.unit @@ -164,6 +166,22 @@ def test_sendPosition(reset_globals, caplog): assert re.search(r'p.time:', caplog.text, re.MULTILINE) +@pytest.mark.unit +def test_close_with_heartbeatTimer(reset_globals, caplog): + """Test close() with heartbeatTimer""" + iface = MeshInterface(noProto=True) + anode = Node('foo', 'bar') + radioConfig = RadioConfig() + radioConfig.preferences.phone_timeout_secs = 10 + anode.radioConfig = radioConfig + iface.localNode = anode + assert iface.heartbeatTimer is None + with caplog.at_level(logging.DEBUG): + iface._startHeartbeat() + assert iface.heartbeatTimer is not None + iface.close() + + @pytest.mark.unit def test_handleFromRadio_empty_payload(reset_globals, caplog): """Test _handleFromRadio""" @@ -543,3 +561,70 @@ def test_getOrCreateByNum(capsys, reset_globals, iface_with_nodes): iface.myInfo.my_node_num = 2475227164 tmp = iface._getOrCreateByNum(2475227164) assert tmp['num'] == 2475227164 + + +@pytest.mark.unit +def test_enter(): + """Test __enter__()""" + iface = MeshInterface(noProto=True) + assert iface == iface.__enter__() + + +@pytest.mark.unit +def test_exit_with_exception(caplog): + """Test __exit__()""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.ERROR): + iface.__exit__('foo', 'bar', 'baz') + assert re.search(r'An exception of type foo with value bar has occurred', caplog.text, re.MULTILINE) + assert re.search(r'Traceback: baz', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_showNodes_exclude_self(capsys, caplog, reset_globals, iface_with_nodes): + """Test that we hit that continue statement""" + with caplog.at_level(logging.DEBUG): + iface = iface_with_nodes + iface.localNode.nodeNum = 2475227164 + iface.showNodes() + iface.showNodes(includeSelf=False) + capsys.readouterr() + + +@pytest.mark.unit +def test_waitForConfig(caplog, capsys): + """Test waitForConfig()""" + iface = MeshInterface(noProto=True) + # override how long to wait + iface._timeout = Timeout(0.01) + with pytest.raises(Exception) as pytest_wrapped_e: + iface.waitForConfig() + assert pytest_wrapped_e.type == Exception + out, err = capsys.readouterr() + assert re.search(r'Exception: Timed out waiting for interface config', err, re.MULTILINE) + assert out == '' + + +@pytest.mark.unit +def test_waitConnected_raises_an_exception(caplog, capsys): + """Test waitConnected()""" + iface = MeshInterface(noProto=True) + with pytest.raises(Exception) as pytest_wrapped_e: + iface.failure = "warn about something" + iface._waitConnected(0.01) + assert pytest_wrapped_e.type == Exception + out, err = capsys.readouterr() + assert re.search(r'warn about something', err, re.MULTILINE) + assert out == '' + + +@pytest.mark.unit +def test_waitConnected_isConnected_timeout(caplog, capsys): + """Test waitConnected()""" + with pytest.raises(Exception) as pytest_wrapped_e: + iface = MeshInterface() + iface._waitConnected(0.01) + assert pytest_wrapped_e.type == Exception + out, err = capsys.readouterr() + assert re.search(r'warn about something', err, re.MULTILINE) + assert out == '' diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index e2ab56ac..f4a597de 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -11,6 +11,7 @@ from ..admin_pb2 import AdminMessage from ..channel_pb2 import Channel from ..radioconfig_pb2 import RadioConfig +from ..util import Timeout @pytest.mark.unit @@ -90,6 +91,16 @@ def test_setOwner_no_short_name_and_long_name_has_words(caplog): assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE) +@pytest.mark.unit +def test_setOwner_long_name_no_short(caplog): + """Test setOwner""" + anode = Node('foo', 'bar', noProto=True) + with caplog.at_level(logging.DEBUG): + anode.setOwner(long_name ='Aabo', is_licensed=True) + assert re.search(r'p.set_owner.long_name:Aabo:', caplog.text, re.MULTILINE) + assert re.search(r'p.set_owner.short_name:Aab:', caplog.text, re.MULTILINE) + + @pytest.mark.unit def test_exitSimulator(caplog): """Test exitSimulator""" @@ -869,3 +880,14 @@ def test_onResponseRequestSetting_with_error(capsys): out, err = capsys.readouterr() assert re.search(r'Error on response', out) assert err == '' + + +@pytest.mark.unit +def test_waitForConfig(): + """Test waitForConfig()""" + anode = Node('foo', 'bar') + radioConfig = RadioConfig() + anode.radioConfig = radioConfig + anode._timeout = Timeout(0.01) + result = anode.waitForConfig() + assert not result