diff --git a/msmart/device/C3/command.py b/msmart/device/C3/command.py index abdfe0f4..be0c08f0 100644 --- a/msmart/device/C3/command.py +++ b/msmart/device/C3/command.py @@ -13,6 +13,11 @@ _LOGGER = logging.getLogger(__name__) +# Useful acronyms +# DHW - Domestic hot water +# TBH - Tank booster heater +# IBH - Internal backup heater + class ControlType(IntEnum): CONTROL_BASIC = 0x1 @@ -40,6 +45,13 @@ class QueryType(IntEnum): QUERY_UNIT_PARAMETERS = 0x10 +class ReportType(IntEnum): # Ref: MSG_TYPE_UP + REPORT_BASIC = 0x1 + REPORT_POWER3 = 0x3 + REPORT_POWER4 = 0x4 + REPORT_UNIT_PARAMETERS = 0x5 + + class QueryCommand(Frame): """Base class for query commands.""" @@ -69,7 +81,7 @@ def __init__(self) -> None: class QueryUnitParametersCommand(QueryCommand): - """Command to query ECO state.""" + """Command to query unit parameters.""" def __init__(self) -> None: super().__init__(QueryType.QUERY_UNIT_PARAMETERS) @@ -161,18 +173,21 @@ def validate(cls, frame: memoryview) -> None: Frame.validate(frame) @classmethod - def construct(cls, frame: bytes) -> Union[QueryBasicResponse, QueryUnitParametersResponse, Response]: + def construct(cls, frame: bytes) -> Union[QueryBasicResponse, QueryUnitParametersResponse, ReportPower4Response, Response]: # Build a memoryview of the frame for zero-copy slicing with memoryview(frame) as frame_mv: # Ensure frame is valid before parsing Response.validate(frame_mv) # Parse frame depending on id + frame_type = frame_mv[9] type = frame_mv[10] - if type == QueryType.QUERY_BASIC: + if frame_type == FrameType.QUERY and type == QueryType.QUERY_BASIC: return QueryBasicResponse(frame_mv) - elif type == QueryType.QUERY_UNIT_PARAMETERS: + elif frame_type == FrameType.QUERY and type == QueryType.QUERY_UNIT_PARAMETERS: return QueryUnitParametersResponse(frame_mv) + elif frame_type == FrameType.REPORT and type == ReportType.REPORT_POWER4: + return ReportPower4Response(frame_mv) else: return Response(frame_mv) @@ -183,7 +198,7 @@ class QueryBasicResponse(Response): def __init__(self, frame: memoryview) -> None: super().__init__(frame) - _LOGGER.debug("Query basic payload: %s", self.payload.hex()) + _LOGGER.debug("Query basic response payload: %s", self.payload.hex()) with memoryview(self.payload) as payload: self._parse(payload) @@ -264,13 +279,69 @@ def _parse(self, payload: memoryview) -> None: self.zone2_curve_type = payload[26] +class ReportPower4Response(Response): + """Unsolicited report of POWER4.""" + + def __init__(self, frame: memoryview) -> None: + super().__init__(frame) + + _LOGGER.debug("Power4 report payload: %s", self.payload.hex()) + + with memoryview(self.payload) as payload: + self._parse(payload) + + def _parse(self, payload: memoryview) -> None: + + # Local function to convert byte to signed int + def signed_int(data) -> int: + return struct.unpack("b", data)[0] + + # TODO do these indicate current activity, or power state + self.heat_active = bool(payload[1] & 0x01) # Ref: isheatrun0 + self.cool_active = bool(payload[1] & 0x02) # Ref: iscoolrun0 + self.dhw_active = bool(payload[1] & 0x04) # Ref: isdhwrun0 + self.tbh_active = bool(payload[1] & 0x08) # Ref: istbhrun0 + + # TODO isibhrun0, issmartgrid0m, ishighprices0, isbottomprices0 + + # Ref: totalelectricity0 + self.electric_power = struct.unpack(">I", payload[2:6])[0] + # Ref: totalthermal0 + self.thermal_power = struct.unpack(">I", payload[6:10])[0] + + # TODO water and air temperatures may need to be checked for validity + self.outdoor_air_temperature = signed_int(payload[10:11]) # Ref: t4 + + self.zone1_target_temperature = payload[11] + self.zone2_target_temperature = payload[12] + + self.water_tank_temperature = payload[13] # Ref: t5s + # TODO payload[14] Ref: tas + + # TODO What does online mean? + self.online = bool(payload[17] & 0x01) # Ref: isonline0 + + # Bytes 19 - 153 contain run status and energy 1 - 15 + + # Bytes 154, 155 contain run status of ibh2 0-15 + + self.voltage = payload[156] # Ref: voltage0 + # voltage0-15 bytes 157-171 + + # TODO + # self.ibh1_power = payload[172] # Ref: power_ibh1 + # self.ibh2_power = payload[173] # Ref: power_ibh1 + # self.tbh_power = payload[174] # Ref: power_ibh1 + + class QueryUnitParametersResponse(Response): """Response to unit parameters query.""" def __init__(self, frame: memoryview) -> None: super().__init__(frame) - _LOGGER.debug("Query unit parameters payload: %s", self.payload.hex()) + _LOGGER.debug("Query unit parameters response payload: %s", + self.payload.hex()) with memoryview(self.payload) as payload: self._parse(payload) @@ -280,11 +351,11 @@ def _parse(self, payload: memoryview) -> None: # There are many fields of this response that are unused and thus not parsed # Local function to convert byte to signed int - def signed_int(data): + def signed_int(data) -> int: return struct.unpack("b", data)[0] - self.outdoor_temperature = signed_int(payload[8]) # Ref: tempT4 - self.water_temperature_2 = signed_int(payload[11]) # Ref: tempTwout + self.outdoor_temperature = signed_int(payload[8:9]) # Ref: tempT4 + self.water_temperature_2 = signed_int(payload[11:12]) # Ref: tempTwout # Referenced in JS w/o friendly name - self.tempT5 = signed_int(payload[38]) - self.room_temperature = signed_int(payload[39]) # Ref: tempTa + self.tempT5 = signed_int(payload[38:39]) + self.room_temperature = signed_int(payload[39:40]) # Ref: tempTa diff --git a/msmart/device/C3/test_command.py b/msmart/device/C3/test_command.py index 2964de7d..e54bc0b9 100644 --- a/msmart/device/C3/test_command.py +++ b/msmart/device/C3/test_command.py @@ -2,7 +2,7 @@ import unittest from typing import Union, cast -from .command import QueryBasicResponse, Response +from .command import QueryBasicResponse, ReportPower4Response, Response class _TestResponseBase(unittest.TestCase): @@ -47,5 +47,35 @@ def test_message(self) -> None: self.assertEqual(type(resp), QueryBasicResponse) +class TestPower4Response(_TestResponseBase): + """Test POWER4 report messages.""" + + # Attributes expected in state response objects + EXPECTED_ATTRS = [] + + def _test_response(self, msg) -> ReportPower4Response: + resp = self._test_build_response(msg) + # self._test_check_attributes(resp, self.EXPECTED_ATTRS) + return cast(ReportPower4Response, resp) + + def test_message(self) -> None: + # https://github.com/mill1000/midea-msmart/issues/107#issuecomment-1962036384 + # Unsolicited report with POWER4 payload + TEST_MESSAGE = bytes.fromhex( + "aab9c3000000000000040400000012fc000023aa0b201e2930ffff01000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e10000000000000000000000000000000000140b") + resp = self._test_response(TEST_MESSAGE) + + # Assert response is a state response + self.assertEqual(type(resp), ReportPower4Response) + + self.assertEqual(resp.electric_power, 4860) + self.assertEqual(resp.thermal_power, 9130) + + self.assertEqual(resp.outdoor_air_temperature, 11) + self.assertEqual(resp.water_tank_temperature, 41) + + self.assertEqual(resp.voltage, 225) + + if __name__ == "__main__": unittest.main()