Skip to content

Commit

Permalink
Add support for POWER4 report "responses"
Browse files Browse the repository at this point in the history
  • Loading branch information
mill1000 committed Mar 3, 2024
1 parent 315a8fa commit ccda5f9
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 12 deletions.
93 changes: 82 additions & 11 deletions msmart/device/C3/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

_LOGGER = logging.getLogger(__name__)

# Useful acronyms
# DHW - Domestic hot water
# TBH - Tank booster heater
# IBH - Internal backup heater


class ControlType(IntEnum):
CONTROL_BASIC = 0x1
Expand Down Expand Up @@ -40,6 +45,13 @@ class QueryType(IntEnum):
QUERY_UNIT_PARAMETERS = 0x10


class ReportType(IntEnum): # Ref: MSG_TYPE_UP
REPORT_BASIC = 0x1
REPORT_POWER3 = 0x3
REPORT_POWER4 = 0x4
REPORT_UNIT_PARAMETERS = 0x5


class QueryCommand(Frame):
"""Base class for query commands."""

Expand Down Expand Up @@ -69,7 +81,7 @@ def __init__(self) -> None:


class QueryUnitParametersCommand(QueryCommand):
"""Command to query ECO state."""
"""Command to query unit parameters."""

def __init__(self) -> None:
super().__init__(QueryType.QUERY_UNIT_PARAMETERS)
Expand Down Expand Up @@ -161,18 +173,21 @@ def validate(cls, frame: memoryview) -> None:
Frame.validate(frame)

@classmethod
def construct(cls, frame: bytes) -> Union[QueryBasicResponse, QueryUnitParametersResponse, Response]:
def construct(cls, frame: bytes) -> Union[QueryBasicResponse, QueryUnitParametersResponse, ReportPower4Response, Response]:
# Build a memoryview of the frame for zero-copy slicing
with memoryview(frame) as frame_mv:
# Ensure frame is valid before parsing
Response.validate(frame_mv)

# Parse frame depending on id
frame_type = frame_mv[9]
type = frame_mv[10]
if type == QueryType.QUERY_BASIC:
if frame_type == FrameType.QUERY and type == QueryType.QUERY_BASIC:
return QueryBasicResponse(frame_mv)
elif type == QueryType.QUERY_UNIT_PARAMETERS:
elif frame_type == FrameType.QUERY and type == QueryType.QUERY_UNIT_PARAMETERS:
return QueryUnitParametersResponse(frame_mv)
elif frame_type == FrameType.REPORT and type == ReportType.REPORT_POWER4:
return ReportPower4Response(frame_mv)
else:
return Response(frame_mv)

Expand All @@ -183,7 +198,7 @@ class QueryBasicResponse(Response):
def __init__(self, frame: memoryview) -> None:
super().__init__(frame)

_LOGGER.debug("Query basic payload: %s", self.payload.hex())
_LOGGER.debug("Query basic response payload: %s", self.payload.hex())

with memoryview(self.payload) as payload:
self._parse(payload)
Expand Down Expand Up @@ -264,13 +279,69 @@ def _parse(self, payload: memoryview) -> None:
self.zone2_curve_type = payload[26]


class ReportPower4Response(Response):
"""Unsolicited report of POWER4."""

def __init__(self, frame: memoryview) -> None:
super().__init__(frame)

_LOGGER.debug("Power4 report payload: %s", self.payload.hex())

with memoryview(self.payload) as payload:
self._parse(payload)

def _parse(self, payload: memoryview) -> None:

# Local function to convert byte to signed int
def signed_int(data) -> int:
return struct.unpack("b", data)[0]

# TODO do these indicate current activity, or power state
self.heat_active = bool(payload[1] & 0x01) # Ref: isheatrun0
self.cool_active = bool(payload[1] & 0x02) # Ref: iscoolrun0
self.dhw_active = bool(payload[1] & 0x04) # Ref: isdhwrun0
self.tbh_active = bool(payload[1] & 0x08) # Ref: istbhrun0

# TODO isibhrun0, issmartgrid0m, ishighprices0, isbottomprices0

# Ref: totalelectricity0
self.electric_power = struct.unpack(">I", payload[2:6])[0]
# Ref: totalthermal0
self.thermal_power = struct.unpack(">I", payload[6:10])[0]

# TODO water and air temperatures may need to be checked for validity
self.outdoor_air_temperature = signed_int(payload[10:11]) # Ref: t4

self.zone1_target_temperature = payload[11]
self.zone2_target_temperature = payload[12]

self.water_tank_temperature = payload[13] # Ref: t5s
# TODO payload[14] Ref: tas

# TODO What does online mean?
self.online = bool(payload[17] & 0x01) # Ref: isonline0

# Bytes 19 - 153 contain run status and energy 1 - 15

# Bytes 154, 155 contain run status of ibh2 0-15

self.voltage = payload[156] # Ref: voltage0
# voltage0-15 bytes 157-171

# TODO
# self.ibh1_power = payload[172] # Ref: power_ibh1
# self.ibh2_power = payload[173] # Ref: power_ibh1
# self.tbh_power = payload[174] # Ref: power_ibh1


class QueryUnitParametersResponse(Response):
"""Response to unit parameters query."""

def __init__(self, frame: memoryview) -> None:
super().__init__(frame)

_LOGGER.debug("Query unit parameters payload: %s", self.payload.hex())
_LOGGER.debug("Query unit parameters response payload: %s",
self.payload.hex())

with memoryview(self.payload) as payload:
self._parse(payload)
Expand All @@ -280,11 +351,11 @@ def _parse(self, payload: memoryview) -> None:
# There are many fields of this response that are unused and thus not parsed

# Local function to convert byte to signed int
def signed_int(data):
def signed_int(data) -> int:
return struct.unpack("b", data)[0]

self.outdoor_temperature = signed_int(payload[8]) # Ref: tempT4
self.water_temperature_2 = signed_int(payload[11]) # Ref: tempTwout
self.outdoor_temperature = signed_int(payload[8:9]) # Ref: tempT4
self.water_temperature_2 = signed_int(payload[11:12]) # Ref: tempTwout
# Referenced in JS w/o friendly name
self.tempT5 = signed_int(payload[38])
self.room_temperature = signed_int(payload[39]) # Ref: tempTa
self.tempT5 = signed_int(payload[38:39])
self.room_temperature = signed_int(payload[39:40]) # Ref: tempTa
32 changes: 31 additions & 1 deletion msmart/device/C3/test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest
from typing import Union, cast

from .command import QueryBasicResponse, Response
from .command import QueryBasicResponse, ReportPower4Response, Response


class _TestResponseBase(unittest.TestCase):
Expand Down Expand Up @@ -47,5 +47,35 @@ def test_message(self) -> None:
self.assertEqual(type(resp), QueryBasicResponse)


class TestPower4Response(_TestResponseBase):
"""Test POWER4 report messages."""

# Attributes expected in state response objects
EXPECTED_ATTRS = []

def _test_response(self, msg) -> ReportPower4Response:
resp = self._test_build_response(msg)
# self._test_check_attributes(resp, self.EXPECTED_ATTRS)
return cast(ReportPower4Response, resp)

def test_message(self) -> None:
# https://github.com/mill1000/midea-msmart/issues/107#issuecomment-1962036384
# Unsolicited report with POWER4 payload
TEST_MESSAGE = bytes.fromhex(
"aab9c3000000000000040400000012fc000023aa0b201e2930ffff01000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e10000000000000000000000000000000000140b")
resp = self._test_response(TEST_MESSAGE)

# Assert response is a state response
self.assertEqual(type(resp), ReportPower4Response)

self.assertEqual(resp.electric_power, 4860)
self.assertEqual(resp.thermal_power, 9130)

self.assertEqual(resp.outdoor_air_temperature, 11)
self.assertEqual(resp.water_tank_temperature, 41)

self.assertEqual(resp.voltage, 225)


if __name__ == "__main__":
unittest.main()

0 comments on commit ccda5f9

Please sign in to comment.