Merge pull request #5 from suaveolent/hw-model

Refactoring & Support to identify DTU and Inverter model
This commit is contained in:
suaveolent 2024-03-14 15:57:42 +01:00 committed by GitHub
commit 87b709439b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 907 additions and 352 deletions

View File

@ -0,0 +1,40 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
"name": "Python 3",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/python:1-3.12-bullseye",
"features": {
"ghcr.io/devcontainers-contrib/features/coverage-py:2": {}
},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "pip install -e .",
// Configure tool-specific properties.
"customizations": {
"vscode": {
"extensions": [
"ms-python.python"
],
"settings": {
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.formatting.provider": "ruff",
"python.linting.mypyEnabled": true,
"python.linting.enabled": true
}
}
}
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}

12
.github/dependabot.yml vendored Normal file
View File

@ -0,0 +1,12 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for more information:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
# https://containers.dev/guide/dependabot
version: 2
updates:
- package-ecosystem: "devcontainers"
directory: "/"
schedule:
interval: weekly

View File

@ -1,7 +1,6 @@
# hoymiles-wifi
This Python library facilitates communication with Hoymiles HMS microinverters, specifically targeting the HMS-XXXXW-2T series.
This Python library facilitates communication with Hoymiles DTUs and the HMS-XXXXW-2T HMS microinverters, utilizing protobuf messages.
**Disclaimer: This library is not affiliated with Hoymiles. It is an independent project developed to provide tools for interacting with Hoymiles HMS-XXXXW-2T series micro-inverters featuring integrated WiFi DTU. Any trademarks or product names mentioned are the property of their respective owners.**
@ -9,9 +8,8 @@ This Python library facilitates communication with Hoymiles HMS microinverters,
The library was successfully tested with:
- Hoymiles HMS-800W-2T
- Hoymiles DTU WLite
- Hoymiles HMS-800W-2T
- Hoymiles DTU WLite
## Installation
@ -39,12 +37,14 @@ commands:
set-power-limit,
set-wifi,
firmware-update,
restart,
turn-on,
turn-off,
restart-dtu,
turn-on-inverter,
turn-off-inverter,
get-information-data,
get-version-info,
heartbeat,
identify-dtu,
identify-inverters
The `--as-json` option is optional and allows formatting the output as JSON.
```
@ -52,18 +52,19 @@ The `--as-json` option is optional and allows formatting the output as JSON.
### Python code
```
from hoymiles_wifi.inverter import Inverter
from hoymiles_wifi.dtu import DTU
...
inverter = Inverter(<ip_address>)
response = await inverter.<command>
dtu = DTU(<ip_address>)
response = await dtu.<command>
if response:
print(f"Inverter Response: {response}")
print(f"DTU Response: {response}")
else:
print("Unable to get response!")
```
#### Available functions
- `async_get_real_data_new()`: Retrieve real-time data
- `async_get_real_data_hms()`: Retrieve real-time data
- `async_get_real_data()`: Retrieve real-time data
@ -74,23 +75,22 @@ else:
- `async_set_power_limit(power_limit)`: Set the power limit of the inverter (0-100%)
- `async_set_wifi(wifi_ssid, wifi_password)`: Configure the wifi network
- `async_firmware_update()`: Update to latest firmware
- `async_restart`: Restart the inverter
- `async_turn_on`: Turn the inverter on
- `async_turn_off`: Turn the inverter off
- `async_restart_dtu`: Restart the DTU
- `async_turn_on_inverter`: Turn the inverter on
- `async_turn_off_inverter`: Turn the inverter off
- `async_get_information_data`: Retrieve information data
- `async_heartbeat`: Request a heartbeat message from the DTU
## Note
Please be aware of the following considerations:
- No DTU Implementation: This library
retrieves information directly from the internal DTU of Hoymiles Wifi
inverters.
- No DTU Implementation: This library retrieves information directly from the (internal) DTU of Hoymiles Wifi inverters.
## Caution
Use this library responsibly and be aware of potential risks. There are no guarantees provided, and any misuse or incorrect implementation may result in undesirable outcomes. Ensure that your inverter is not compromised during communication.
## Known Limitations
**Update Frequency:** The library may experience limitations in fetching updates, potentially around twice per minute. The inverter firmware may enforce a mandatory wait period of approximately 30 seconds between requests.
@ -100,5 +100,6 @@ Use this library responsibly and be aware of potential risks. There are no guara
## Attribution
A special thank you for the inspiration and codebase to:
- [DennisOSRM](https://github.com/DennisOSRM): [hms-mqtt-publisher](https://github.com/DennisOSRM/hms-mqtt-publisher)
- [henkwiedig](https://github.com/henkwiedig): [Hoymiles-DTU-Proto](https://github.com/henkwiedig/Hoymiles-DTU-Proto)
- [DennisOSRM](https://github.com/DennisOSRM): [hms-mqtt-publisher](https://github.com/DennisOSRM/hms-mqtt-publisher)
- [henkwiedig](https://github.com/henkwiedig): [Hoymiles-DTU-Proto](https://github.com/henkwiedig/Hoymiles-DTU-Proto)

View File

@ -1,3 +1,5 @@
"""Init file for hoymiles_wifi package."""
import logging
logging.basicConfig(level=logging.INFO)

View File

@ -1,30 +1,54 @@
"""Contains the main functionality of the hoymiles_wifi package."""
from __future__ import annotations
import argparse
import asyncio
from dataclasses import dataclass, asdict
import json
from google.protobuf.message import Message
from google.protobuf.json_format import MessageToJson
from hoymiles_wifi.inverter import Inverter
from dataclasses import asdict, dataclass
from hoymiles_wifi.utils import (
generate_version_string,
generate_sw_version_string,
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message
from hoymiles_wifi.const import DTU_FIRMWARE_URL_00_01_11, MAX_POWER_LIMIT
from hoymiles_wifi.dtu import DTU
from hoymiles_wifi.hoymiles import (
generate_dtu_version_string,
generate_inverter_serial_number,
generate_sw_version_string,
generate_version_string,
get_dtu_model_name,
get_inverter_model_name,
)
from hoymiles_wifi.protobuf import (
AppGetHistPower_pb2,
APPHeartbeatPB_pb2,
APPInfomationData_pb2,
CommandPB_pb2,
GetConfig_pb2,
InfomationData_pb2,
NetworkInfo_pb2,
RealData_pb2,
RealDataHMS_pb2,
RealDataNew_pb2,
)
from hoymiles_wifi.const import (
DTU_FIRMWARE_URL_00_01_11
)
RED = "\033[91m"
END = "\033[0m"
@dataclass
class VersionInfo:
"""Represents version information for the hoymiles_wifi package."""
dtu_hw_version: str
dtu_sw_version: str
inverter_hw_version: str
inverter_sw_version: str
def __str__(self):
def __str__(self: VersionInfo) -> str:
"""Return a string representation of the VersionInfo object."""
return (
f'dtu_hw_version: "{self.dtu_hw_version}"\n'
f'dtu_sw_version: "{self.dtu_sw_version}"\n'
@ -32,91 +56,132 @@ class VersionInfo:
f'inverter_sw_version: "{self.inverter_sw_version}"\n'
)
def to_dict(self):
def to_dict(self: VersionInfo) -> dict:
"""Convert the VersionInfo object to a dictionary."""
return asdict(self)
# Inverter commands
async def async_get_real_data_new(inverter):
return await inverter.async_get_real_data_new()
async def async_get_real_data_new(
dtu: DTU,
) -> RealDataNew_pb2.RealDataNewResDTO | None:
"""Get real data from the inverter asynchronously."""
async def async_get_real_data_hms(inverter):
return await inverter.async_get_real_data_hms()
return await dtu.async_get_real_data_new()
async def async_get_real_data(inverter):
return await inverter.async_get_real_data()
async def async_get_config(inverter):
return await inverter.async_get_config()
async def async_get_real_data_hms(
dtu: DTU,
) -> RealDataHMS_pb2.RealDataHMSResDTO | None:
"""Get real data from the inverter asynchronously."""
async def async_network_info(inverter):
return await inverter.async_network_info()
return await dtu.async_get_real_data_hms()
async def async_app_information_data(inverter):
return await inverter.async_app_information_data()
async def async_app_get_hist_power(inverter):
return await inverter.async_app_get_hist_power()
async def async_get_real_data(dtu: DTU) -> RealData_pb2.RealDataResDTO | None:
"""Get real data from the inverter asynchronously."""
async def async_set_power_limit(inverter):
return await dtu.async_get_real_data()
RED = '\033[91m'
END = '\033[0m'
print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END)
print(RED + "!!! Danger zone! This will change the power limit of the inverter. !!!" + END)
print(RED + "!!! Please be careful and make sure you know what you are doing. !!!" + END)
print(RED + "!!! Only proceed if you know what you are doing. !!!" + END)
print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END)
print("")
async def async_get_config(dtu: DTU) -> GetConfig_pb2.GetConfigResDTO | None:
"""Get the config from the inverter asynchronously."""
return await dtu.async_get_config()
async def async_network_info(
dtu: DTU,
) -> NetworkInfo_pb2.NetworkInfoResDTO | None:
"""Get network information from the inverter asynchronously."""
return await dtu.async_network_info()
async def async_app_information_data(
dtu: DTU,
) -> APPInfomationData_pb2.AppInfomationDataResDTO | None:
"""Get application information data from the inverter asynchronously."""
return await dtu.async_app_information_data()
async def async_app_get_hist_power(
dtu: DTU,
) -> AppGetHistPower_pb2.AppGetHistPowerResDTO:
"""Get historical power data from the inverter asynchronously."""
return await dtu.async_app_get_hist_power()
async def async_set_power_limit(
dtu: DTU,
) -> CommandPB_pb2.CommandResDTO | None:
"""Set the power limit of the inverter asynchronously."""
print( # noqa: T201
RED
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ "!!! Danger zone! This will change the power limit of the dtu. !!!\n"
+ "!!! Please be careful and make sure you know what you are doing. !!!\n"
+ "!!! Only proceed if you know what you are doing. !!!\n"
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ END,
)
cont = input("Do you want to continue? (y/n): ")
if(cont != 'y'):
return
if cont != "y":
return None
power_limit = int(input("Enter the new power limit (0-100): "))
if(power_limit < 0 or power_limit > 100):
print("Error. Invalid power limit!")
return
if power_limit < 0 or power_limit > MAX_POWER_LIMIT:
print("Error. Invalid power limit!") # noqa: T201
return None
print(f"Setting power limit to {power_limit}%")
print(f"Setting power limit to {power_limit}%") # noqa: T201
cont = input("Are you sure? (y/n): ")
if(cont != 'y'):
return
if cont != "y":
return None
return await inverter.async_set_power_limit(power_limit)
return await dtu.async_set_power_limit(power_limit)
async def async_set_wifi(inverter):
async def async_set_wifi(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Set the wifi SSID and password of the inverter asynchronously."""
wifi_ssid = input("Enter the new wifi SSID: ").strip()
wifi_password = input("Enter the new wifi password: ").strip()
print(f'Setting wifi to "{wifi_ssid}"')
print(f'Setting wifi password to "{wifi_password}"')
print(f'Setting wifi to "{wifi_ssid}"') # noqa: T201
print(f'Setting wifi password to "{wifi_password}"') # noqa: T201
cont = input("Are you sure? (y/n): ")
if(cont != 'y'):
return
return await inverter.async_set_wifi(wifi_ssid, wifi_password)
if cont != "y":
return None
return await dtu.async_set_wifi(wifi_ssid, wifi_password)
async def async_firmware_update(inverter):
RED = '\033[91m'
END = '\033[0m'
print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END)
print(RED + "!!! Danger zone! This will update the firmeware of the DTU. !!!" + END)
print(RED + "!!! Please be careful and make sure you know what you are doing. !!!" + END)
print(RED + "!!! Only proceed if you know what you are doing. !!!" + END)
print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END)
print("")
async def async_firmware_update(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Update the firmware of the DTU asynchronously."""
print( # noqa: T201
RED
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ "!!! Danger zone! This will update the firmeware of the DTU. !!!\n"
+ "!!! Please be careful and make sure you know what you are doing. !!!\n"
+ "!!! Only proceed if you know what you are doing. !!!\n"
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ END,
)
cont = input("Do you want to continue? (y/n): ")
if(cont != 'y'):
return
if cont != "y":
return None
print("Please select a firmware version:")
print("1.) V00.01.11")
print("2.) Custom URL")
print("Please select a firmware version:") # noqa: T201
print("1.) V00.01.11") # noqa: T201
print("2.) Custom URL") # noqa: T201
while True:
selection = input("Enter your selection (1 or 2): ")
@ -124,74 +189,127 @@ async def async_firmware_update(inverter):
if selection == "1":
url = DTU_FIRMWARE_URL_00_01_11
break
elif selection == "2":
if selection == "2":
url = input("Enter the custom URL: ").strip()
break
else:
print("Invalid selection. Please enter 1 or 2.")
print()
print(f'Firmware update URL: "{url}"')
print()
print("Invalid selection. Please enter 1 or 2.") # noqa: T201
print() # noqa: T201
print(f'Firmware update URL: "{url}"') # noqa: T201
print() # noqa: T201
cont = input("Do you want to continue? (y/n): ")
if(cont != 'y'):
return
if cont != "y":
return None
return await inverter.async_update_dtu_firmware()
return await dtu.async_update_dtu_firmware()
async def async_restart(inverter):
cont = input("Do you want to restart the device? (y/n): ")
if(cont != 'y'):
return
async def async_restart_dtu(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Restart the DTU asynchronously."""
return await inverter.async_restart()
cont = input("Do you want to restart the DTU? (y/n): ")
if cont != "y":
return None
async def async_turn_off(inverter):
cont = input("Do you want to turn *OFF* the device? (y/n): ")
if(cont != 'y'):
return
return await dtu.async_restart_dtu()
return await inverter.async_turn_off()
async def async_turn_on(inverter):
cont = input("Do you want to turn *ON* the device? (y/n): ")
if(cont != 'y'):
return
async def async_turn_on_inverter(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Turn on the irnverte asynchronously."""
return await inverter.async_turn_on()
cont = input("Do you want to turn *ON* the DTU? (y/n): ")
if cont != "y":
return None
async def async_get_information_data(inverter):
return await inverter.async_get_information_data()
return await dtu.async_turn_on_dtu()
async def async_get_version_info(inverter):
response = await async_app_information_data(inverter)
async def async_turn_off_inverter(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Turn off the inverter asynchronously."""
cont = input("Do you want to turn *OFF* the DTU? (y/n): ")
if cont != "y":
return None
return await dtu.async_turn_off_dtu()
async def async_get_information_data(
dtu: DTU,
) -> InfomationData_pb2.InfomationDataResDTO:
"""Get information data from the dtu asynchronously."""
return await dtu.async_get_information_data()
async def async_get_version_info(dtu: DTU) -> VersionInfo | None:
"""Get version information from the dtu asynchronously."""
response = await async_app_information_data(dtu)
if not response:
return None
return VersionInfo(
dtu_hw_version="H" + generate_dtu_version_string(response.dtu_info.dtu_hw_version),
dtu_sw_version="V" + generate_dtu_version_string(response.dtu_info.dtu_sw_version),
inverter_hw_version="H" + generate_version_string(response.pv_info[0].pv_hw_version),
inverter_sw_version="V" + generate_sw_version_string(response.pv_info[0].pv_sw_version),
dtu_hw_version="H"
+ generate_dtu_version_string(response.dtu_info.dtu_hw_version),
dtu_sw_version="V"
+ generate_dtu_version_string(response.dtu_info.dtu_sw_version),
inverter_hw_version="H"
+ generate_version_string(response.pv_info[0].pv_hw_version),
inverter_sw_version="V"
+ generate_sw_version_string(response.pv_info[0].pv_sw_version),
)
async def async_heatbeat(inverter):
return await inverter.async_heartbeat()
def print_invalid_command(command):
print(f"Invalid command: {command}")
async def async_heatbeat(dtu: DTU) -> APPHeartbeatPB_pb2.APPHeartbeatResDTO | None:
"""Request a heartbeat from the dtu asynchronously."""
async def main():
return await dtu.async_heartbeat()
async def async_identify_dtu(dtu: DTU) -> str:
"""Identify the DTU asynchronously."""
real_data = await async_get_real_data_new(dtu)
return get_dtu_model_name(real_data.device_serial_number)
async def async_identify_inverters(dtu: DTU) -> list[str]:
"""Identify the DTU asynchronously."""
inverter_models = []
real_data = await async_get_real_data_new(dtu)
if real_data:
for sgs_data in real_data.sgs_data:
serial_number = generate_inverter_serial_number(sgs_data.serial_number)
inverter_model = get_inverter_model_name(serial_number)
inverter_models.append(inverter_model)
return inverter_models
def print_invalid_command(command: str) -> None:
"""Print an invalid command message."""
print(f"Invalid command: {command}") # noqa: T201
async def main() -> None:
"""Execute the main function for the hoymiles_wifi package."""
parser = argparse.ArgumentParser(description="Hoymiles HMS Monitoring")
parser.add_argument(
"--host", type=str, required=True, help="IP address or hostname of the inverter"
"--host", type=str, required=True, help="IP address or hostname of the DTU"
)
parser.add_argument(
"--as-json",
action="store_true",
default=False,
help="Format the output as JSON",
)
parser.add_argument('--as-json', action='store_true', default=False,
help='Format the output as JSON')
parser.add_argument(
"command",
type=str,
@ -206,56 +324,66 @@ async def main():
"set-power-limit",
"set-wifi",
"firmware-update",
"restart",
"turn-on",
"turn-off",
"restart-dtu",
"turn-on-inverter",
"turn-off-inverter",
"get-information-data",
"get-version-info",
"heartbeat",
"identify-dtu",
"identify-inverters",
],
help="Command to execute",
)
args = parser.parse_args()
inverter = Inverter(args.host)
dtu = DTU(args.host)
# Execute the specified command using a switch case
switch = {
'get-real-data-new': async_get_real_data_new,
'get-real-data-hms': async_get_real_data_hms,
'get-real-data': async_get_real_data,
'get-config': async_get_config,
'network-info': async_network_info,
'app-information-data': async_app_information_data,
'app-get-hist-power': async_app_get_hist_power,
'set-power-limit': async_set_power_limit,
'set-wifi': async_set_wifi,
'firmware-update': async_firmware_update,
'restart': async_restart,
'turn-on': async_turn_on,
'turn-off': async_turn_off,
'get-information-data': async_get_information_data,
'get-version-info': async_get_version_info,
'heartbeat': async_heatbeat,
"get-real-data-new": async_get_real_data_new,
"get-real-data-hms": async_get_real_data_hms,
"get-real-data": async_get_real_data,
"get-config": async_get_config,
"network-info": async_network_info,
"app-information-data": async_app_information_data,
"app-get-hist-power": async_app_get_hist_power,
"set-power-limit": async_set_power_limit,
"set-wifi": async_set_wifi,
"firmware-update": async_firmware_update,
"restart-dtu": async_restart_dtu,
"turn-on": async_turn_on_inverter,
"turn-off": async_turn_off_inverter,
"get-information-data": async_get_information_data,
"get-version-info": async_get_version_info,
"heartbeat": async_heatbeat,
"identify-dtu": async_identify_dtu,
"identify-inverters": async_identify_inverters,
}
command_func = switch.get(args.command, print_invalid_command)
response = await command_func(inverter)
response = await command_func(dtu)
if response:
if args.as_json:
if isinstance(response, Message):
print(MessageToJson(response))
print(MessageToJson(response)) # noqa: T201
else:
print(json.dumps(asdict(response), indent=4))
print(json.dumps(asdict(response), indent=4)) # noqa: T201
else:
print(f"{args.command.capitalize()} Response: \n{response}")
print(f"{args.command.capitalize()} Response: \n{response}") # noqa: T201
else:
print(f"No response or unable to retrieve response for {args.command.replace('_', ' ')}")
print( # noqa: T201
f"No response or unable to retrieve response for "
f"{args.command.replace('_', ' ')}",
)
def run_main():
def run_main() -> None:
"""Run the main function for the hoymiles_wifi package."""
asyncio.run(main())
if __name__ == "__main__":
run_main()

View File

@ -1,35 +1,37 @@
INVERTER_PORT = 10081
"""Constants for the Hoymiles WiFi integration."""
DTU_PORT = 10081
# App -> DTU start with 0xa3, responses start 0xa2
CMD_HEADER = b'HM'
CMD_APP_INFO_DATA_RES_DTO = b'\xa3\x01'
CMD_HB_RES_DTO = b'\xa3\x02'
CMD_REAL_DATA_RES_DTO = b'\xa3\x03'
CMD_W_INFO_RES_DTO = b'\xa3\x04'
CMD_COMMAND_RES_DTO = b'\xa3\x05'
CMD_COMMAND_STATUS_RES_DTO = b'\xa3\x06'
CMD_DEV_CONFIG_FETCH_RES_DTO = b'\xa3\x07'
CMD_DEV_CONFIG_PUT_RES_DTO = b'\xa3\x08'
CMD_GET_CONFIG = b'\xa3\x09'
CMD_SET_CONFIG = b'\xa3\x10'
CMD_REAL_RES_DTO = b'\xa3\x11'
CMD_GPST_RES_DTO = b'\xa3\x12'
CMD_AUTO_SEARCH = b'\xa3\x13'
CMD_NETWORK_INFO_RES = b'\xa3\x14'
CMD_APP_GET_HIST_POWER_RES = b'\xa3\x15'
CMD_APP_GET_HIST_ED_RES = b'\xa3\x16'
CMD_HB_RES_DTO_ALT = b'\x83\x01'
CMD_REGISTER_RES_DTO = b'\x83\x02'
CMD_STORAGE_DATA_RES = b'\x83\x03'
CMD_COMMAND_RES_DTO_2 = b'\x83\x05'
CMD_COMMAND_STATUS_RES_DTO_2 = b'\x83\x06'
CMD_DEV_CONFIG_FETCH_RES_DTO_2 = b'\x83\x07'
CMD_DEV_CONFIG_PUT_RES_DTO_2 = b'\x83\x08'
CMD_GET_CONFIG_RES = b'\xdb\x08'
CMD_SET_CONFIG_RES = b'\xdb\x07'
CMD_HEADER = b"HM"
CMD_APP_INFO_DATA_RES_DTO = b"\xa3\x01"
CMD_HB_RES_DTO = b"\xa3\x02"
CMD_REAL_DATA_RES_DTO = b"\xa3\x03"
CMD_W_INFO_RES_DTO = b"\xa3\x04"
CMD_COMMAND_RES_DTO = b"\xa3\x05"
CMD_COMMAND_STATUS_RES_DTO = b"\xa3\x06"
CMD_DEV_CONFIG_FETCH_RES_DTO = b"\xa3\x07"
CMD_DEV_CONFIG_PUT_RES_DTO = b"\xa3\x08"
CMD_GET_CONFIG = b"\xa3\x09"
CMD_SET_CONFIG = b"\xa3\x10"
CMD_REAL_RES_DTO = b"\xa3\x11"
CMD_GPST_RES_DTO = b"\xa3\x12"
CMD_AUTO_SEARCH = b"\xa3\x13"
CMD_NETWORK_INFO_RES = b"\xa3\x14"
CMD_APP_GET_HIST_POWER_RES = b"\xa3\x15"
CMD_APP_GET_HIST_ED_RES = b"\xa3\x16"
CMD_HB_RES_DTO_ALT = b"\x83\x01"
CMD_REGISTER_RES_DTO = b"\x83\x02"
CMD_STORAGE_DATA_RES = b"\x83\x03"
CMD_COMMAND_RES_DTO_2 = b"\x83\x05"
CMD_COMMAND_STATUS_RES_DTO_2 = b"\x83\x06"
CMD_DEV_CONFIG_FETCH_RES_DTO_2 = b"\x83\x07"
CMD_DEV_CONFIG_PUT_RES_DTO_2 = b"\x83\x08"
CMD_GET_CONFIG_RES = b"\xdb\x08"
CMD_SET_CONFIG_RES = b"\xdb\x07"
CMD_CLOUD_INFO_DATA_RES_DTO = b'\x23\x01'
CMD_CLOUD_COMMAND_RES_DTO = b'\x23\x05'
CMD_CLOUD_INFO_DATA_RES_DTO = b"\x23\x01"
CMD_CLOUD_COMMAND_RES_DTO = b"\x23\x05"
CMD_ACTION_MICRO_DEFAULT = 0
CMD_ACTION_DTU_REBOOT = 1
@ -67,7 +69,7 @@ CMD_ACTION_GW_AUTO_NETWORKING = 4101
CMD_ACTION_GW_UPGRADE = 4102
CMD_ACTION_MICRO_MEMORY_SNAPSHOT = 53
CMD_ACTION_MICRO_DATA_WAVE = 54
CMD_ACTION_SET_485_PORT, = 36
CMD_ACTION_SET_485_PORT = 36
CMD_ACTION_THREE_BALANCE_SET = 37
CMD_ACTION_MI_GRID_PROTECT_SELF = 38
CMD_ACTION_SUN_SPEC_CONFIG = 39
@ -107,4 +109,8 @@ DEV_OP = 8
DEV_GATEWAY = 9
DEV_BMS = 10
DTU_FIRMWARE_URL_00_01_11 = "http://fwupdate.hoymiles.com/cfs/bin/2311/06/,1488725943932555264.bin"
DTU_FIRMWARE_URL_00_01_11 = (
"http://fwupdate.hoymiles.com/cfs/bin/2311/06/,1488725943932555264.bin"
)
MAX_POWER_LIMIT = 100

View File

@ -1,129 +1,185 @@
"""DTU communication implementation for Hoymiles WiFi."""
from __future__ import annotations
import asyncio
import struct
from typing import Any
from crcmod import mkCrcFun
from datetime import datetime
import time
from datetime import datetime
from enum import Enum
from typing import Any
from crcmod import mkCrcFun
from hoymiles_wifi import logger
from hoymiles_wifi.utils import initialize_set_config
from hoymiles_wifi.const import (
CMD_ACTION_DTU_REBOOT,
CMD_ACTION_DTU_UPGRADE,
CMD_ACTION_LIMIT_POWER,
CMD_ACTION_MI_SHUTDOWN,
CMD_ACTION_MI_START,
CMD_APP_GET_HIST_POWER_RES,
CMD_APP_INFO_DATA_RES_DTO,
CMD_CLOUD_COMMAND_RES_DTO,
CMD_COMMAND_RES_DTO,
CMD_GET_CONFIG,
CMD_HB_RES_DTO,
CMD_HEADER,
CMD_NETWORK_INFO_RES,
CMD_REAL_DATA_RES_DTO,
CMD_REAL_RES_DTO,
CMD_SET_CONFIG,
DEV_DTU,
DTU_FIRMWARE_URL_00_01_11,
DTU_PORT,
)
from hoymiles_wifi.protobuf import (
AppGetHistPower_pb2,
APPHeartbeatPB_pb2,
APPInfomationData_pb2,
AppGetHistPower_pb2,
CommandPB_pb2,
InfomationData_pb2,
GetConfig_pb2,
RealData_pb2,
RealDataNew_pb2,
RealDataHMS_pb2,
InfomationData_pb2,
NetworkInfo_pb2,
RealData_pb2,
RealDataHMS_pb2,
RealDataNew_pb2,
SetConfig_pb2,
)
from hoymiles_wifi.const import (
INVERTER_PORT,
CMD_HEADER,
CMD_GET_CONFIG,
CMD_REAL_RES_DTO,
CMD_REAL_DATA_RES_DTO,
CMD_NETWORK_INFO_RES,
CMD_APP_INFO_DATA_RES_DTO,
CMD_APP_GET_HIST_POWER_RES,
CMD_ACTION_LIMIT_POWER,
CMD_COMMAND_RES_DTO,
CMD_ACTION_DTU_UPGRADE,
CMD_SET_CONFIG,
CMD_CLOUD_COMMAND_RES_DTO,
CMD_ACTION_DTU_REBOOT,
CMD_ACTION_MI_START,
CMD_ACTION_MI_SHUTDOWN,
DTU_FIRMWARE_URL_00_01_11,
CMD_HB_RES_DTO,
CMD_CLOUD_INFO_DATA_RES_DTO,
DEV_DTU,
)
from hoymiles_wifi.utils import initialize_set_config
class NetmodeSelect:
class NetmodeSelect(Enum):
"""Network mode selection."""
WIFI = 1
SIM = 2
LAN = 3
class NetworkState:
class NetworkState(Enum):
"""Network state."""
Unknown = 0
Online = 1
Offline = 2
class Inverter:
class DTU:
"""DTU class."""
def __init__(self, host: str):
"""Initialize DTU class."""
self.host = host
self.state = NetworkState.Unknown
self.sequence = 0
self.mutex = asyncio.Lock()
def get_state(self) -> NetworkState:
"""Get DTU state."""
return self.state
def set_state(self, new_state: NetworkState):
"""Set DTU state."""
if self.state != new_state:
self.state = new_state
logger.debug(f"Inverter is {new_state}")
logger.debug(f"DTU is {new_state}")
async def async_get_real_data_hms(self) -> RealDataHMS_pb2.HMSStateResponse | None:
"""Get real data HMS."""
request = RealDataHMS_pb2.HMSRealDataResDTO()
command = CMD_REAL_DATA_RES_DTO
return await self.async_send_request(command, request, RealDataHMS_pb2.HMSStateResponse)
return await self.async_send_request(
command, request, RealDataHMS_pb2.HMSStateResponse
)
async def async_get_real_data(self) -> RealData_pb2.RealDataResDTO | None:
"""Get real data."""
request = RealData_pb2.RealDataResDTO()
command = CMD_REAL_DATA_RES_DTO
return await self.async_send_request(command, request, RealData_pb2.RealDataReqDTO)
return await self.async_send_request(
command, request, RealData_pb2.RealDataReqDTO
)
async def async_get_real_data_new(self) -> RealDataNew_pb2.RealDataNewResDTO | None:
"""Get real data new."""
request = RealDataNew_pb2.RealDataNewResDTO()
request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = 28800
request.time = int(time.time())
command = CMD_REAL_RES_DTO
return await self.async_send_request(command, request, RealDataNew_pb2.RealDataNewReqDTO)
return await self.async_send_request(
command, request, RealDataNew_pb2.RealDataNewReqDTO
)
async def async_get_config(self) -> GetConfig_pb2.GetConfigResDTO | None:
"""Get config."""
request = GetConfig_pb2.GetConfigResDTO()
request.offset = 28800
request.time = int(time.time()) - 60
command = CMD_GET_CONFIG
return await self.async_send_request(command, request, GetConfig_pb2.GetConfigReqDTO)
return await self.async_send_request(
command,
request,
GetConfig_pb2.GetConfigReqDTO,
)
async def async_network_info(self) -> NetworkInfo_pb2.NetworkInfoResDTO | None:
"""Get network info."""
request = NetworkInfo_pb2.NetworkInfoResDTO()
request.offset = 28800
request.time = int(time.time())
command = CMD_NETWORK_INFO_RES
return await self.async_send_request(command, request, NetworkInfo_pb2.NetworkInfoReqDTO)
return await self.async_send_request(
command, request, NetworkInfo_pb2.NetworkInfoReqDTO
)
async def async_app_information_data(self) -> APPInfomationData_pb2.APPInfoDataResDTO:
async def async_app_information_data(
self,
) -> APPInfomationData_pb2.APPInfoDataResDTO:
"""Get app information data."""
request = APPInfomationData_pb2.APPInfoDataResDTO()
request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = 28800
request.time = int(time.time())
command = CMD_APP_INFO_DATA_RES_DTO
return await self.async_send_request(command, request, APPInfomationData_pb2.APPInfoDataReqDTO)
return await self.async_send_request(
command, request, APPInfomationData_pb2.APPInfoDataReqDTO
)
async def async_app_get_hist_power(
self,
) -> AppGetHistPower_pb2.AppGetHistPowerResDTO | None:
"""Get historical power."""
async def async_app_get_hist_power(self) -> AppGetHistPower_pb2.AppGetHistPowerResDTO | None:
request = AppGetHistPower_pb2.AppGetHistPowerResDTO()
request.offset = 28800
request.requested_time = int(time.time())
command = CMD_APP_GET_HIST_POWER_RES
return await self.async_send_request(command, request, AppGetHistPower_pb2.AppGetHistPowerReqDTO)
return await self.async_send_request(
command,
request,
AppGetHistPower_pb2.AppGetHistPowerReqDTO,
)
async def async_set_power_limit(self, power_limit: int) -> CommandPB_pb2.CommandResDTO | None:
if(power_limit < 0 or power_limit > 100):
async def async_set_power_limit(
self,
power_limit: int,
) -> CommandPB_pb2.CommandResDTO | None:
"""Set power limit."""
if power_limit < 0 or power_limit > 100:
logger.error("Error. Invalid power limit!")
return
@ -134,19 +190,24 @@ class Inverter:
request.action = CMD_ACTION_LIMIT_POWER
request.package_nub = 1
request.tid = int(time.time())
request.data = f'A:{power_limit},B:0,C:0\r'.encode('utf-8')
request.data = f"A:{power_limit},B:0,C:0\r".encode()
command = CMD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_set_wifi(self, ssid: str, password: str) -> SetConfig_pb2.SetConfigResDTO | None:
async def async_set_wifi(
self, ssid: str, password: str
) -> SetConfig_pb2.SetConfigResDTO | None:
"""Set wifi."""
get_config_req = await self.async_get_config()
if(get_config_req is None):
if get_config_req is None:
logger.error("Failed to get config")
return
return None
request = initialize_set_config(get_config_req)
@ -154,26 +215,33 @@ class Inverter:
request.offset = 28800
request.app_page = 1
request.netmode_select = NetmodeSelect.WIFI
request.wifi_ssid = ssid.encode('utf-8')
request.wifi_password = password.encode('utf-8')
request.wifi_ssid = ssid.encode("utf-8")
request.wifi_password = password.encode("utf-8")
command = CMD_SET_CONFIG
return await self.async_send_request(command, request, SetConfig_pb2.SetConfigReqDTO)
return await self.async_send_request(
command, request, SetConfig_pb2.SetConfigReqDTO
)
async def async_update_dtu_firmware(self, firmware_url: str = DTU_FIRMWARE_URL_00_01_11) -> CommandPB_pb2.CommandResDTO | None:
async def async_update_dtu_firmware(
self,
firmware_url: str = DTU_FIRMWARE_URL_00_01_11,
) -> CommandPB_pb2.CommandResDTO | None:
"""Update DTU firmware."""
request = CommandPB_pb2.CommandResDTO()
request.action = CMD_ACTION_DTU_UPGRADE
request.package_nub = 1
request.tid = int(time.time())
request.data = (firmware_url + '\r').encode('utf-8')
request.data = (firmware_url + "\r").encode("utf-8")
command = CMD_CLOUD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_restart(self) -> CommandPB_pb2.CommandResDTO | None:
async def async_restart_dtu(self) -> CommandPB_pb2.CommandResDTO | None:
"""Restart DTU."""
request = CommandPB_pb2.CommandResDTO()
request.action = CMD_ACTION_DTU_REBOOT
@ -181,10 +249,12 @@ class Inverter:
request.tid = int(time.time())
command = CMD_CLOUD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_turn_on(self) -> CommandPB_pb2.CommandResDTO | None:
async def async_turn_on_inverter(self) -> CommandPB_pb2.CommandResDTO | None:
"""Turn on DTU."""
request = CommandPB_pb2.CommandResDTO()
request.action = CMD_ACTION_MI_START
@ -193,9 +263,12 @@ class Inverter:
request.tid = int(time.time())
command = CMD_CLOUD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_turn_off(self) -> CommandPB_pb2.CommandResDTO | None:
async def async_turn_off_inverter(self) -> CommandPB_pb2.CommandResDTO | None:
"""Turn off DTU."""
request = CommandPB_pb2.CommandResDTO()
request.action = CMD_ACTION_MI_SHUTDOWN
@ -204,41 +277,67 @@ class Inverter:
request.tid = int(time.time())
command = CMD_CLOUD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_get_information_data(self) -> InfomationData_pb2.InfoDataResDTO | None:
async def async_get_information_data(
self,
) -> InfomationData_pb2.InfoDataResDTO | None:
"""Get information data."""
request = InfomationData_pb2.InfoDataResDTO()
request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = 28800
request.time = int(time.time())
command = CMD_APP_INFO_DATA_RES_DTO
return await self.async_send_request(command, request, InfomationData_pb2.InfoDataReqDTO)
return await self.async_send_request(
command, request, InfomationData_pb2.InfoDataReqDTO
)
async def async_heartbeat(self) -> APPHeartbeatPB_pb2.HBReqDTO | None:
"""Request heartbeat."""
request = APPHeartbeatPB_pb2.HBResDTO()
request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = 28800
request.time = int(time.time())
command = CMD_HB_RES_DTO
return await self.async_send_request(command, request, APPHeartbeatPB_pb2.HBReqDTO)
return await self.async_send_request(
command, request, APPHeartbeatPB_pb2.HBReqDTO
)
async def async_send_request(
self,
command: bytes,
request: Any,
response_type: Any,
dtu_port: int = DTU_PORT,
):
"""Send request to DTU."""
async def async_send_request(self, command: bytes, request: Any, response_type: Any, inverter_port: int = INVERTER_PORT):
self.sequence = (self.sequence + 1) & 0xFFFF
request_as_bytes = request.SerializeToString()
crc16 = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(request_as_bytes)
crc16 = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(
request_as_bytes
)
length = len(request_as_bytes) + 10
# compose request message
header = CMD_HEADER + command
message = header + struct.pack('>HHH', self.sequence, crc16, length) + request_as_bytes
message = (
header
+ struct.pack(">HHH", self.sequence, crc16, length)
+ request_as_bytes
)
address = (self.host, inverter_port)
address = (self.host, dtu_port)
async with self.mutex:
try:
@ -263,17 +362,21 @@ class Inverter:
if len(buf) < 10:
raise ValueError("Buffer is too short for unpacking")
crc16_target, read_length = struct.unpack('>HH', buf[6:10])
crc16_target, read_length = struct.unpack(">HH", buf[6:10])
if(len(buf) != read_length):
if len(buf) != read_length:
raise ValueError("Buffer is incomplete")
response_as_bytes = buf[10:read_length]
crc16_response = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(response_as_bytes)
crc16_response = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(
response_as_bytes
)
if crc16_response != crc16_target:
logger.debug(f"CRC16 mismatch: {hex(crc16_response)} != {hex(crc16_target)}")
logger.debug(
f"CRC16 mismatch: {hex(crc16_response)} != {hex(crc16_target)}"
)
raise ValueError("CRC16 mismatch")
parsed = response_type.FromString(response_as_bytes)

298
hoymiles_wifi/hoymiles.py Normal file
View File

@ -0,0 +1,298 @@
"""Hoymiles quirks for inverters and DTU."""
import struct
from enum import Enum
from hoymiles_wifi import logger
class InverterType(Enum):
"""Inverter type."""
ONE = "1T"
TWO = "2T"
FOUR = "4T"
SIX = "6T"
class InverterSeries(Enum):
"""Inverter series."""
HM = "HM"
HMS = "HMS"
HMT = "HMT"
class InverterPower(Enum):
"""Inverter power."""
P_100 = "100"
P_250 = "250"
P_300_350_400 = "300/350/400"
P_400 = "400"
P_500 = "500"
P_600_700_800 = "600/700/800"
P_800W = "800W"
P_1000 = "1000"
P_1000_1200_1500 = "1000/1200/1500"
P_1200_1500 = "1200/1500"
P_1600 = "1600"
P_2000 = "2000"
power_mapping = {
0x1011: InverterPower.P_100,
0x1020: InverterPower.P_250,
0x1021: InverterPower.P_300_350_400,
0x1121: InverterPower.P_300_350_400,
0x1125: InverterPower.P_400,
0x1040: InverterPower.P_500,
0x1041: InverterPower.P_600_700_800,
0x1042: InverterPower.P_600_700_800,
0x1141: InverterPower.P_600_700_800,
0x1060: InverterPower.P_1000,
0x1061: InverterPower.P_1200_1500,
0x1161: InverterPower.P_1000_1200_1500,
0x1164: InverterPower.P_1600,
0x1412: InverterPower.P_800W,
}
class DTUType(Enum):
"""DTU type."""
DTU_G100 = "DTU-G100"
DTU_W100 = "DTU-W100"
DTU_LITE_S = "DTU-Lite-S"
DTU_LITE = "DTU-Lite"
DTU_PRO = "DTU-PRO"
DTU_PRO_S = "DTU-PRO-S"
DTU_HMS_W = "DTU-HMS-W"
DTU_W100_LITE_S = "DTU-W100/DTU-Lite-S"
DTU_W_LITE = "DTU-WLite"
type_mapping = {
0x10F7: DTUType.DTU_PRO,
0x10FB: DTUType.DTU_PRO,
0x4101: DTUType.DTU_PRO,
0x10FC: DTUType.DTU_PRO,
0x4120: DTUType.DTU_PRO,
0x10F8: DTUType.DTU_PRO,
0x4100: DTUType.DTU_PRO,
0x10FD: DTUType.DTU_PRO,
0x4121: DTUType.DTU_PRO,
0x10D3: DTUType.DTU_W100_LITE_S,
0x4110: DTUType.DTU_W100_LITE_S,
0x10D8: DTUType.DTU_W100_LITE_S,
0x4130: DTUType.DTU_W100_LITE_S,
0x4132: DTUType.DTU_W100_LITE_S,
0x4133: DTUType.DTU_W100_LITE_S,
0x10D9: DTUType.DTU_W100_LITE_S,
0x4111: DTUType.DTU_W100_LITE_S,
0x10D2: DTUType.DTU_G100,
0x10D6: DTUType.DTU_LITE,
0x10D7: DTUType.DTU_LITE,
0x4131: DTUType.DTU_LITE,
0x1124: DTUType.DTU_HMS_W,
0x1125: DTUType.DTU_HMS_W,
0x1403: DTUType.DTU_HMS_W,
0x1144: DTUType.DTU_HMS_W,
0x1143: DTUType.DTU_HMS_W,
0x1145: DTUType.DTU_HMS_W,
0x1412: DTUType.DTU_HMS_W,
0x1164: DTUType.DTU_HMS_W,
0x1165: DTUType.DTU_HMS_W,
0x1166: DTUType.DTU_HMS_W,
0x1167: DTUType.DTU_HMS_W,
0x1222: DTUType.DTU_HMS_W,
0x1422: DTUType.DTU_HMS_W,
0x1423: DTUType.DTU_HMS_W,
0x1361: DTUType.DTU_HMS_W,
0x1362: DTUType.DTU_HMS_W,
0x1381: DTUType.DTU_HMS_W,
0x1382: DTUType.DTU_HMS_W,
0x4143: DTUType.DTU_HMS_W,
}
def format_number(number: int) -> str:
"""Format number to two digits."""
return f"{number:02d}"
def generate_version_string(version_number: int) -> str:
"""Generate version string."""
version_string = (
format_number(version_number // 2048)
+ "."
+ format_number((version_number // 64) % 32)
+ "."
+ format_number(version_number % 64)
)
return version_string
def generate_sw_version_string(version_number: int) -> str:
"""Generate software version string."""
version_number2 = version_number // 10000
version_number3 = (version_number - (version_number2 * 10000)) // 100
version_number4 = (version_number - (version_number2 * 10000)) - (
version_number3 * 100
)
version_string = (
format_number(version_number2)
+ "."
+ format_number(version_number3)
+ "."
+ format_number(version_number4)
)
return version_string
def generate_dtu_version_string(version_number: int, type: str = "") -> str:
"""Generate DTU version string."""
version_string = ""
version_number2 = version_number % 256
version_number3 = (version_number // 256) % 16
if type == "SRF":
version_string += f"{format_number(version_number // 1048576)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
elif type == "HRF":
version_string += f"{format_number(version_number // 65536)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
else:
version_string += f"{format_number(version_number // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
return version_string
def generate_inverter_serial_number(serial_number: int) -> str:
"""Generate inverter serial number."""
return hex(serial_number)[2:]
def get_inverter_type(serial_bytes: bytes) -> InverterType:
"""Get inverter type."""
inverter_type = None
# Access individual bytes
if serial_bytes[0] == 0x11:
if serial_bytes[1] in [0x25, 0x24, 0x22, 0x21]:
inverter_type = InverterType.ONE
elif serial_bytes[1] in [0x44, 0x42, 0x41]:
inverter_type = InverterType.TWO
elif serial_bytes[1] in [0x64, 0x62, 0x61]:
inverter_type = InverterType.FOUR
elif serial_bytes[0] == 0x13:
inverter_type = InverterType.SIX
elif serial_bytes[0] == 0x14:
if serial_bytes[1] in [0x12]:
inverter_type = InverterType.TWO
if inverter_type is None:
raise ValueError(
f"Unknown inverter type: {hex(serial_bytes[0])} {hex(serial_bytes[1])}"
)
return inverter_type
def get_inverter_series(serial_bytes: bytes) -> InverterSeries:
"""Get inverter series."""
series = None
if serial_bytes[0] == 0x11:
if (serial_bytes[1] & 0x0F) == 0x04:
series = InverterSeries.HMS
else:
series = InverterSeries.HM
elif serial_bytes[0] == 0x10:
if serial_bytes[1] & 0x03 == 0x02:
series = InverterSeries.HM
else:
series = InverterSeries.HMS
elif serial_bytes[0] == 0x13:
series = InverterSeries.HMT
elif serial_bytes[0] == 0x14:
series = InverterSeries.HMS
if series is None:
raise ValueError(
f"Unknown series: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!"
)
return series
def get_inverter_power(serial_bytes: bytes) -> InverterPower:
"""Get inverter power."""
inverter_type_bytes = struct.unpack(">H", serial_bytes[:2])[0]
power = power_mapping.get(inverter_type_bytes)
if power is None:
raise ValueError(
f"Unknown power: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!"
)
return power
def get_inverter_model_name(serial_number: str) -> str:
"""Get hardware model name."""
if serial_number == "22069994886948":
serial_number = generate_inverter_serial_number(int(serial_number))
serial_bytes = bytes.fromhex(serial_number)
try:
inverter_type = get_inverter_type(serial_bytes)
inverter_series = get_inverter_series(serial_bytes)
inverter_power = get_inverter_power(serial_bytes)
except Exception as e:
logger.error(e)
return "Unknown"
else:
inverter_model_name = (
inverter_series.value
+ "-"
+ inverter_power.value
+ "-"
+ inverter_type.value
)
return inverter_model_name
def get_dtu_model_type(serial_bytes: bytes) -> DTUType:
"""Get DTU model type."""
dtu_type_bytes = struct.unpack(">H", serial_bytes[:2])[0]
dtu_type = type_mapping.get(dtu_type_bytes)
if dtu_type is None:
raise ValueError(f"Unknown DTU: {serial_bytes[:2]}!")
return dtu_type
def get_dtu_model_name(serial_number: str) -> str:
"""Get DTU model name."""
serial_bytes = bytes.fromhex(serial_number)
try:
dtu_type = get_dtu_model_type(serial_bytes)
except Exception as e:
logger.error(e)
return "Unknown"
else:
return dtu_type.value

View File

@ -2,5 +2,6 @@
for file in $(ls *.proto)
do
protoc --pyi_out=. $file
protoc --python_out=. $file
done

View File

@ -1,47 +1,14 @@
""""Utils for interacting with Hoymiles WiFi API."""
"""Utils for interacting with Hoymiles WiFi API."""
from hoymiles_wifi.protobuf import (
GetConfig_pb2,
SetConfig_pb2,
)
def format_number(number) -> str:
return "{:02d}".format(number)
def generate_version_string(version_number: int) -> str:
version_string = format_number(version_number // 2048) + "." + format_number((version_number // 64) % 32) + "." + format_number(version_number % 64)
return version_string
def generate_sw_version_string(version_number: int) -> str:
version_number2 = version_number // 10000
version_number3 = (version_number - (version_number2 * 10000)) // 100
version_number4 = (version_number - (version_number2 * 10000)) - (version_number3 * 100)
version_string = format_number(version_number2) + "." + format_number(version_number3) + "." + format_number(version_number4)
return version_string
def generate_dtu_version_string(version_number: int, type: str = None) -> str:
version_string = ""
version_number2 = version_number % 256
version_number3 = (version_number // 256) % 16
if "SRF" == str:
version_string += f"{format_number(version_number // 1048576)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
elif "HRF" == str:
version_string += f"{format_number(version_number // 65536)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
else:
version_string += f"{format_number(version_number // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
return version_string
def generate_inverter_serial_number(serial_number: int) -> str:
return hex(serial_number)[2:]
def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO):
"""Initialize set config response with get config request."""
set_config_res = SetConfig_pb2.SetConfigResDTO()
set_config_res.lock_password = get_config_req.lock_password
set_config_res.lock_time = get_config_req.lock_time
@ -92,5 +59,3 @@ def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO):
set_config_res.dtu_ap_pass = get_config_req.dtu_ap_pass
return set_config_res

View File

@ -1,19 +1,18 @@
"""Setup for the hoymiles-wifi package."""
from setuptools import setup
setup(
name='hoymiles-wifi',
packages=['hoymiles_wifi', 'hoymiles_wifi.protobuf'],
install_requires=[
'protobuf',
'crcmod',
],
version='0.1.7',
description='A python library for interfacing with Hoymiles HMS-XXXXW-T2 series of micro-inverters.',
author='suaveolent',
name="hoymiles-wifi",
packages=["hoymiles_wifi", "hoymiles_wifi.protobuf"],
install_requires=["protobuf", "crcmod"],
version="0.1.8",
description="A python library for interfacing with the Hoymiles DTUs and the HMS-XXXXW-2T series of micro-inverters using protobuf messages.",
author="suaveolent",
include_package_data=True,
entry_points={
'console_scripts': [
'hoymiles-wifi = hoymiles_wifi.__main__:run_main',
"console_scripts": [
"hoymiles-wifi = hoymiles_wifi.__main__:run_main",
],
}
},
)