DTU and Inverter model parsing

This commit is contained in:
suaveolent 2024-03-13 16:59:57 +00:00
parent d5d63339e8
commit a581179240
7 changed files with 278 additions and 160 deletions

View File

@ -0,0 +1,40 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
"name": "Python 3",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/python:1-3.12-bullseye",
"features": {
"ghcr.io/devcontainers-contrib/features/coverage-py:2": {}
},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "pip install -e .",
// Configure tool-specific properties.
"customizations": {
"vscode": {
"extensions": [
"ms-python.python"
],
"settings": {
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.formatting.provider": "black",
"python.linting.mypyEnabled": true,
"python.linting.enabled": true
}
}
}
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}

12
.github/dependabot.yml vendored Normal file
View File

@ -0,0 +1,12 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for more information:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
# https://containers.dev/guide/dependabot
version: 2
updates:
- package-ecosystem: "devcontainers"
directory: "/"
schedule:
interval: weekly

View File

@ -6,11 +6,12 @@ from google.protobuf.message import Message
from google.protobuf.json_format import MessageToJson
from hoymiles_wifi.inverter import Inverter
from hoymiles_wifi.utils import (
from hoymiles_wifi.hoymiles import (
generate_version_string,
generate_sw_version_string,
generate_dtu_version_string,
get_hw_model_name,
get_dtu_model_name,
)
from hoymiles_wifi.const import (
@ -186,8 +187,6 @@ def print_invalid_command(command):
async def main():
get_hw_model_name("116473511341")
parser = argparse.ArgumentParser(description="Hoymiles HMS Monitoring")
parser.add_argument(
"--host", type=str, required=True, help="IP address or hostname of the inverter"

222
hoymiles_wifi/hoymiles.py Normal file
View File

@ -0,0 +1,222 @@
""""Hoymiles quirks for inverters and DTU"""
from enum import Enum
import struct
from hoymiles_wifi import logger
class InverterType(Enum):
ONE = "1T"
TWO = "2T"
FOUR = "4T"
SIX = "6T"
class InverterSeries(Enum):
HM = "HM"
HMS = "HMS"
HMT = "HMT"
class InverterPower(Enum):
P_100 = "100"
P_250 = "250"
P_300_350_400 = "300/350/400"
P_400 = "400"
P_500 = "500"
P_600_700_800 = "600/700/800"
P_800W = "800W"
P_1000 = "1000"
P_1000_1200_1500 = "1000/1200/1500"
P_1200_1500 = "1200/1500"
P_1600 = "1600"
P_2000 = "2000"
class DTUType(Enum):
DTU_G100 = "DTU-G100"
DTU_W100 = "DTU-W100"
DTU_LITE_S = "DTU-Lite-S"
DTU_LITE = "DTU-Lite"
DTU_PRO = "DTU-PRO"
DTU_PRO_S = "DTU-PRO-S"
DTU_HMS_W = "DTU-HMS-W"
DTU_W100_LITE_S = "DTU-W100/DTU-Lite-S"
DTU_W_LITE = "DTU-WLite"
def format_number(number) -> str:
return "{:02d}".format(number)
def generate_version_string(version_number: int) -> str:
version_string = format_number(version_number // 2048) + "." + format_number((version_number // 64) % 32) + "." + format_number(version_number % 64)
return version_string
def generate_sw_version_string(version_number: int) -> str:
version_number2 = version_number // 10000
version_number3 = (version_number - (version_number2 * 10000)) // 100
version_number4 = (version_number - (version_number2 * 10000)) - (version_number3 * 100)
version_string = format_number(version_number2) + "." + format_number(version_number3) + "." + format_number(version_number4)
return version_string
def generate_dtu_version_string(version_number: int, type: str = None) -> str:
version_string = ""
version_number2 = version_number % 256
version_number3 = (version_number // 256) % 16
if "SRF" == str:
version_string += f"{format_number(version_number // 1048576)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
elif "HRF" == str:
version_string += f"{format_number(version_number // 65536)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
else:
version_string += f"{format_number(version_number // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
return version_string
def generate_inverter_serial_number(serial_number: int) -> str:
return hex(serial_number)[2:]
def get_inverter_type(serial_bytes: bytes) -> InverterType:
inverter_type = None
# Access individual bytes
if serial_bytes[0] == 0x11:
if serial_bytes[1] in [0x25, 0x24, 0x22, 0x21]:
inverter_type = InverterType.ONE
elif serial_bytes[1] in [0x44, 0x42, 0x41]:
inverter_type = InverterType.TWO
elif serial_bytes[1] in [0x64, 0x62, 0x61]:
inverter_type = InverterType.FOUR
elif serial_bytes[0] == 0x13:
inverter_type = InverterType.SIX
elif serial_bytes[0] == 0x14:
if serial_bytes[1] in [0x12]:
inverter_type = InverterType.TWO
if inverter_type == None:
raise ValueError(f"Unknown inverter type: {hex(serial_bytes[0])} {hex(serial_bytes[1])}")
return inverter_type
def get_inverter_series(serial_bytes: bytes) -> InverterSeries:
series = None
if serial_bytes[0] == 0x11:
if (serial_bytes[1] & 0x0f) == 0x04:
series = InverterSeries.HMS
else:
series = InverterSeries.HM
elif serial_bytes[0] == 0x10:
if serial_bytes[1] & 0x03 == 0x02:
series = InverterSeries.HM
else:
series = InverterSeries.HMS
elif serial_bytes[0] == 0x13:
series = InverterSeries.HMT
elif serial_bytes[0] == 0x14:
series = InverterSeries.HMS
if series is None:
raise ValueError(f"Unknown series: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!")
return series
def get_inverter_power(serial_bytes: bytes) -> InverterPower:
inverter_type_bytes = struct.unpack('>H', serial_bytes[:2])[0]
power = None
if inverter_type_bytes in [0x1011]:
power = InverterPower.P_100
elif inverter_type_bytes in [0x1020]:
power = InverterPower.P_250
elif inverter_type_bytes in [0x1021, 0x1121]:
power = InverterPower.P_300_350_400
elif inverter_type_bytes in [0x1125]:
power = InverterPower.P_400
elif inverter_type_bytes in [0x1040]:
power = InverterPower.P_500
elif inverter_type_bytes in [0x1041, 0x1042, 0x1141]:
power = InverterPower.P_600_700_800
elif inverter_type_bytes in [0x1060]:
power = InverterPower.P_1000
elif inverter_type_bytes in [0x1061]:
power = InverterPower.P_1200_1500
elif inverter_type_bytes in [0x1161]:
power = InverterPower.P_1000_1200_1500
elif inverter_type_bytes in [0x1164]:
power = InverterPower.P_1600
elif inverter_type_bytes in [0x1412]:
power = InverterPower.P_800W
if power is None:
raise ValueError(f"Unknown power: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!")
return power
def get_hw_model_name(serial_number: str) -> str:
if(serial_number == "22069994886948"):
serial_number = generate_inverter_serial_number(int(serial_number))
serial_bytes = bytes.fromhex(serial_number)
try:
inverter_type = get_inverter_type(serial_bytes)
inverter_series = get_inverter_series(serial_bytes)
inverter_power = get_inverter_power(serial_bytes)
except Exception as e:
logger.error(e)
return "Unknown"
else:
inverter_model_name = inverter_series.value + "-" + inverter_power.value + "-" + inverter_type.value
return inverter_model_name
def get_dtu_model_type(serial_bytes: bytes) -> DTUType:
dtu_type = None
dtu_type_bytes = struct.unpack('>H', serial_bytes[:2])[0]
if (dtu_type_bytes in [0x10F7] or
dtu_type_bytes in [0x10FB, 0x4101, 0x10FC, 0x4120] or
dtu_type_bytes in [0x10F8, 0x4100, 0x10FD, 0x4121]):
dtu_type = DTUType.DTU_PRO
elif dtu_type_bytes in [0x10D3, 0x4110, 0x10D8, 0x4130, 0x4132, 0x4133, 0x10D9, 0x4111]:
dtu_type = DTUType.DTU_W100_LITE_S
elif dtu_type_bytes in [0x10D2]:
dtu_type = DTUType.DTU_G100
elif dtu_type_bytes in [0x10D6, 0x10D7, 0x4131]:
dtu_type = DTUType.DTU_LITE
elif (dtu_type_bytes in [0x1124, 0x1125, 0x1403] or
dtu_type_bytes in [0x1144, 0x1143, 0x1145, 0x1412] or
dtu_type_bytes in [0x1164, 0x1165, 0x1166, 0x1167, 0x1222, 0x1422, 0x1423] or
dtu_type_bytes in [0x1361, 0x1362] or
dtu_type_bytes in [0x1381, 0x1382] or
dtu_type_bytes in [0x4143]):
dtu_type = DTUType.DTU_HMS_W
if dtu_type is None:
raise ValueError(f"Unknown DTU: {serial_bytes[:2]}!")
return dtu_type
def get_dtu_model_name(serial_number: str) -> str:
serial_bytes = bytes.fromhex(serial_number)
try:
dtu_type = get_dtu_model_type(serial_bytes)
except Exception as e:
logger.error(e)
return "Unknown"
else:
return dtu_type.value

View File

@ -44,7 +44,6 @@ from hoymiles_wifi.const import (
CMD_ACTION_MI_SHUTDOWN,
DTU_FIRMWARE_URL_00_01_11,
CMD_HB_RES_DTO,
CMD_CLOUD_INFO_DATA_RES_DTO,
DEV_DTU,
)

View File

@ -5,83 +5,6 @@ from hoymiles_wifi.protobuf import (
SetConfig_pb2,
)
devInfo = [
([0x10, 0x10, 0x10, "ALL"], 300, "HM-300-1T"),
([0x10, 0x10, 0x20, "ALL"], 350, "HM-350-1T"),
([0x10, 0x10, 0x30, "ALL"], 400, "HM-400-1T"),
([0x10, 0x10, 0x40, "ALL"], 400, "HM-400-1T"),
([0x10, 0x11, 0x10, "ALL"], 600, "HM-600-2T"),
([0x10, 0x11, 0x20, "ALL"], 700, "HM-700-2T"),
([0x10, 0x11, 0x30, "ALL"], 800, "HM-800-2T"),
([0x10, 0x11, 0x40, "ALL"], 800, "HM-800-2T"),
([0x10, 0x12, 0x10, "ALL"], 1200, "HM-1200-4T"),
([0x10, 0x02, 0x30, "ALL"], 1500, "MI-1500-4T Gen3"),
([0x10, 0x12, 0x30, "ALL"], 1500, "HM-1500-4T"),
([0x10, 0x10, 0x10, 0x15], int(300 * 0.7), "HM-300-1T"), # HM-300 factory limited to 70%
([0x10, 0x20, 0x11, "ALL"], 300, "HMS-300-1T"), # 00
([0x10, 0x20, 0x21, "ALL"], 350, "HMS-350-1T"), # 00
([0x10, 0x20, 0x41, "ALL"], 400, "HMS-400-1T"), # 00
([0x10, 0x10, 0x51, "ALL"], 450, "HMS-450-1T"), # 01
([0x10, 0x20, 0x51, "ALL"], 450, "HMS-450-1T"), # 03
([0x10, 0x10, 0x71, "ALL"], 500, "HMS-500-1T"), # 02
([0x10, 0x20, 0x71, "ALL"], 500, "HMS-500-1T v2"), # 02
([0x10, 0x21, 0x11, "ALL"], 600, "HMS-600-2T"), # 01
([0x10, 0x21, 0x41, "ALL"], 800, "HMS-800-2T"), # 00
([0x10, 0x11, 0x51, "ALL"], 900, "HMS-900-2T"), # 01
([0x10, 0x21, 0x51, "ALL"], 900, "HMS-900-2T"), # 03
([0x10, 0x21, 0x71, "ALL"], 1000, "HMS-1000-2T"), # 05
([0x10, 0x11, 0x71, "ALL"], 1000, "HMS-1000-2T"), # 01
([0x10, 0x22, 0x41, "ALL"], 1600, "HMS-1600-4T"), # 4
([0x10, 0x12, 0x51, "ALL"], 1800, "HMS-1800-4T"), # 01
([0x10, 0x22, 0x51, "ALL"], 1800, "HMS-1800-4T"), # 16
([0x10, 0x12, 0x71, "ALL"], 2000, "HMS-2000-4T"), # 01
([0x10, 0x22, 0x71, "ALL"], 2000, "HMS-2000-4T"), # 10
([0x10, 0x32, 0x41, "ALL"], 1600, "HMT-1600-4T"), # 00
([0x10, 0x32, 0x51, "ALL"], 1800, "HMT-1800-4T"), # 00
([0x10, 0x32, 0x71, "ALL"], 2000, "HMT-2000-4T"), # 0
([0x10, 0x33, 0x11, "ALL"], 1800, "HMT-1800-6T"), # 01
([0x10, 0x33, 0x31, "ALL"], 2250, "HMT-2250-6T") # 01
]
def format_number(number) -> str:
return "{:02d}".format(number)
def generate_version_string(version_number: int) -> str:
version_string = format_number(version_number // 2048) + "." + format_number((version_number // 64) % 32) + "." + format_number(version_number % 64)
return version_string
def generate_sw_version_string(version_number: int) -> str:
version_number2 = version_number // 10000
version_number3 = (version_number - (version_number2 * 10000)) // 100
version_number4 = (version_number - (version_number2 * 10000)) - (version_number3 * 100)
version_string = format_number(version_number2) + "." + format_number(version_number3) + "." + format_number(version_number4)
return version_string
def generate_dtu_version_string(version_number: int, type: str = None) -> str:
version_string = ""
version_number2 = version_number % 256
version_number3 = (version_number // 256) % 16
if "SRF" == str:
version_string += f"{format_number(version_number // 1048576)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
elif "HRF" == str:
version_string += f"{format_number(version_number // 65536)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
else:
version_string += f"{format_number(version_number // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
return version_string
def generate_inverter_serial_number(serial_number: int) -> str:
return hex(serial_number)[2:]
def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO):
set_config_res = SetConfig_pb2.SetConfigResDTO()
set_config_res.lock_password = get_config_req.lock_password
@ -135,82 +58,5 @@ def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO):
return set_config_res
ALL = 0xff
dev_info = [
([0x10, 0x10, 0x10, ALL], 300, "HM-300-1T"),
([0x10, 0x10, 0x20, ALL], 350, "HM-350-1T"),
([0x10, 0x10, 0x30, ALL], 400, "HM-400-1T"),
([0x10, 0x10, 0x40, ALL], 400, "HM-400-1T"),
([0x10, 0x11, 0x10, ALL], 600, "HM-600-2T"),
([0x10, 0x11, 0x20, ALL], 700, "HM-700-2T"),
([0x10, 0x11, 0x30, ALL], 800, "HM-800-2T"),
([0x10, 0x11, 0x40, ALL], 800, "HM-800-2T"),
([0x10, 0x12, 0x10, ALL], 1200, "HM-1200-4T"),
([0x10, 0x02, 0x30, ALL], 1500, "MI-1500-4T Gen3"),
([0x10, 0x12, 0x30, ALL], 1500, "HM-1500-4T"),
([0x10, 0x10, 0x10, 0x15], int(300 * 0.7), "HM-300-1T"), # HM-300 factory limited to 70%
([0x10, 0x20, 0x11, ALL], 300, "HMS-300-1T"), # 00
([0x10, 0x20, 0x21, ALL], 350, "HMS-350-1T"), # 00
([0x10, 0x20, 0x41, ALL], 400, "HMS-400-1T"), # 00
([0x10, 0x10, 0x51, ALL], 450, "HMS-450-1T"), # 01
([0x10, 0x20, 0x51, ALL], 450, "HMS-450-1T"), # 03
([0x10, 0x10, 0x71, ALL], 500, "HMS-500-1T"), # 02
([0x10, 0x20, 0x71, ALL], 500, "HMS-500-1T v2"), # 02
([0x10, 0x21, 0x11, ALL], 600, "HMS-600-2T"), # 01
([0x10, 0x21, 0x41, ALL], 800, "HMS-800-2T"), # 00
([0x10, 0x11, 0x51, ALL], 900, "HMS-900-2T"), # 01
([0x10, 0x21, 0x51, ALL], 900, "HMS-900-2T"), # 03
([0x10, 0x21, 0x71, ALL], 1000, "HMS-1000-2T"), # 05
([0x10, 0x11, 0x71, ALL], 1000, "HMS-1000-2T"), # 01
([0x10, 0x22, 0x41, ALL], 1600, "HMS-1600-4T"), # 4
([0x10, 0x12, 0x51, ALL], 1800, "HMS-1800-4T"), # 01
([0x10, 0x22, 0x51, ALL], 1800, "HMS-1800-4T"), # 16
([0x10, 0x12, 0x71, ALL], 2000, "HMS-2000-4T"), # 01
([0x10, 0x22, 0x71, ALL], 2000, "HMS-2000-4T"), # 10
([0x10, 0x32, 0x41, ALL], 1600, "HMT-1600-4T"), # 00
([0x10, 0x32, 0x51, ALL], 1800, "HMT-1800-4T"), # 00
([0x10, 0x32, 0x71, ALL], 2000, "HMT-2000-4T"), # 0
([0x10, 0x33, 0x11, ALL], 1800, "HMT-1800-6T"), # 01
([0x10, 0x33, 0x31, ALL], 2250, "HMT-2250-6T") # 01
]
def convert_serial_to_hex(serial_number: str):
# Convert the serial number from string to int and then to hex
return [int(serial_number[i:i + 2], 16) for i in range(0, len(serial_number), 2)]
def get_dev_idx(serial_number: str):
ret = 0xff
pos = 0
# Convert serial number to hex
serial_hex = convert_serial_to_hex(serial_number)
print(serial_hex)
# Check for all 4 bytes first
for pos in range(len(dev_info)):
if dev_info[pos][0] == serial_hex:
ret = pos
break
# Then only for 3 bytes but only if not already found
if ret == 0xff:
for pos in range(len(dev_info)):
if dev_info[pos][0][:3] == serial_hex[:3]:
ret = pos
break
return ret
def get_hw_model_name(serial_number: str):
# Convert serial number to hex
idx = get_dev_idx(serial_number)
if idx == 0xff:
return ""
return dev_info[idx][2]

View File

@ -5,10 +5,10 @@ setup(
packages=['hoymiles_wifi', 'hoymiles_wifi.protobuf'],
install_requires=[
'protobuf',
'crcmod',
'crcmod'
],
version='0.1.7',
description='A python library for interfacing with Hoymiles HMS-XXXXW-T2 series of micro-inverters.',
description='A python library for interfacing with Hoymiles HMS-XXXXW-2T series of micro-inverters.',
author='suaveolent',
include_package_data=True,
entry_points={