Skip to content

Commit

Permalink
Merge pull request #204 from mkinney/add_more_unit_tests
Browse files Browse the repository at this point in the history
get last two lines covered in node
  • Loading branch information
mkinney authored Jan 1, 2022
2 parents 9380f04 + 83c18f4 commit e7664cb
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 8 deletions.
13 changes: 5 additions & 8 deletions meshtastic/mesh_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ def __enter__(self):

def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None and exc_value is not None:
logging.error(
f'An exception of type {exc_type} with value {exc_value} has occurred')
logging.error(f'An exception of type {exc_type} with value {exc_value} has occurred')
if traceback is not None:
logging.error(f'Traceback: {traceback}')
self.close()
Expand Down Expand Up @@ -394,11 +393,11 @@ def getShortName(self):
return user.get('shortName', None)
return None

def _waitConnected(self):
def _waitConnected(self, timeout=15.0):
"""Block until the initial node db download is complete, or timeout
and raise an exception"""
if not self.noProto:
if not self.isConnected.wait(15.0): # timeout after x seconds
if not self.isConnected.wait(timeout): # timeout after x seconds
raise Exception("Timed out waiting for connection completion")

# If we failed while connecting, raise the connection to the client
Expand All @@ -416,8 +415,7 @@ def _generatePacketId(self):
def _disconnected(self):
"""Called by subclasses to tell clients this interface has disconnected"""
self.isConnected.clear()
publishingThread.queueWork(lambda: pub.sendMessage(
"meshtastic.connection.lost", interface=self))
publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.lost", interface=self))

def _startHeartbeat(self):
"""We need to send a heartbeat message to the device every X seconds"""
Expand All @@ -443,8 +441,7 @@ def _connected(self):
if not self.isConnected.is_set():
self.isConnected.set()
self._startHeartbeat()
publishingThread.queueWork(lambda: pub.sendMessage(
"meshtastic.connection.established", interface=self))
publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.established", interface=self))

def _startConfig(self):
"""Start device packets flowing"""
Expand Down
85 changes: 85 additions & 0 deletions meshtastic/tests/test_mesh_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from ..node import Node
from .. import mesh_pb2
from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR
from ..radioconfig_pb2 import RadioConfig
from ..util import Timeout


@pytest.mark.unit
Expand Down Expand Up @@ -164,6 +166,22 @@ def test_sendPosition(reset_globals, caplog):
assert re.search(r'p.time:', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_close_with_heartbeatTimer(reset_globals, caplog):
"""Test close() with heartbeatTimer"""
iface = MeshInterface(noProto=True)
anode = Node('foo', 'bar')
radioConfig = RadioConfig()
radioConfig.preferences.phone_timeout_secs = 10
anode.radioConfig = radioConfig
iface.localNode = anode
assert iface.heartbeatTimer is None
with caplog.at_level(logging.DEBUG):
iface._startHeartbeat()
assert iface.heartbeatTimer is not None
iface.close()


@pytest.mark.unit
def test_handleFromRadio_empty_payload(reset_globals, caplog):
"""Test _handleFromRadio"""
Expand Down Expand Up @@ -543,3 +561,70 @@ def test_getOrCreateByNum(capsys, reset_globals, iface_with_nodes):
iface.myInfo.my_node_num = 2475227164
tmp = iface._getOrCreateByNum(2475227164)
assert tmp['num'] == 2475227164


@pytest.mark.unit
def test_enter():
"""Test __enter__()"""
iface = MeshInterface(noProto=True)
assert iface == iface.__enter__()


@pytest.mark.unit
def test_exit_with_exception(caplog):
"""Test __exit__()"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.ERROR):
iface.__exit__('foo', 'bar', 'baz')
assert re.search(r'An exception of type foo with value bar has occurred', caplog.text, re.MULTILINE)
assert re.search(r'Traceback: baz', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_showNodes_exclude_self(capsys, caplog, reset_globals, iface_with_nodes):
"""Test that we hit that continue statement"""
with caplog.at_level(logging.DEBUG):
iface = iface_with_nodes
iface.localNode.nodeNum = 2475227164
iface.showNodes()
iface.showNodes(includeSelf=False)
capsys.readouterr()


@pytest.mark.unit
def test_waitForConfig(caplog, capsys):
"""Test waitForConfig()"""
iface = MeshInterface(noProto=True)
# override how long to wait
iface._timeout = Timeout(0.01)
with pytest.raises(Exception) as pytest_wrapped_e:
iface.waitForConfig()
assert pytest_wrapped_e.type == Exception
out, err = capsys.readouterr()
assert re.search(r'Exception: Timed out waiting for interface config', err, re.MULTILINE)
assert out == ''


@pytest.mark.unit
def test_waitConnected_raises_an_exception(caplog, capsys):
"""Test waitConnected()"""
iface = MeshInterface(noProto=True)
with pytest.raises(Exception) as pytest_wrapped_e:
iface.failure = "warn about something"
iface._waitConnected(0.01)
assert pytest_wrapped_e.type == Exception
out, err = capsys.readouterr()
assert re.search(r'warn about something', err, re.MULTILINE)
assert out == ''


@pytest.mark.unit
def test_waitConnected_isConnected_timeout(caplog, capsys):
"""Test waitConnected()"""
with pytest.raises(Exception) as pytest_wrapped_e:
iface = MeshInterface()
iface._waitConnected(0.01)
assert pytest_wrapped_e.type == Exception
out, err = capsys.readouterr()
assert re.search(r'warn about something', err, re.MULTILINE)
assert out == ''
22 changes: 22 additions & 0 deletions meshtastic/tests/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ..admin_pb2 import AdminMessage
from ..channel_pb2 import Channel
from ..radioconfig_pb2 import RadioConfig
from ..util import Timeout


@pytest.mark.unit
Expand Down Expand Up @@ -90,6 +91,16 @@ def test_setOwner_no_short_name_and_long_name_has_words(caplog):
assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setOwner_long_name_no_short(caplog):
"""Test setOwner"""
anode = Node('foo', 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode.setOwner(long_name ='Aabo', is_licensed=True)
assert re.search(r'p.set_owner.long_name:Aabo:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.short_name:Aab:', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_exitSimulator(caplog):
"""Test exitSimulator"""
Expand Down Expand Up @@ -869,3 +880,14 @@ def test_onResponseRequestSetting_with_error(capsys):
out, err = capsys.readouterr()
assert re.search(r'Error on response', out)
assert err == ''


@pytest.mark.unit
def test_waitForConfig():
"""Test waitForConfig()"""
anode = Node('foo', 'bar')
radioConfig = RadioConfig()
anode.radioConfig = radioConfig
anode._timeout = Timeout(0.01)
result = anode.waitForConfig()
assert not result

0 comments on commit e7664cb

Please sign in to comment.