From a97c9a8fd2c9f054efcc2c9ee9b73a47a479d949 Mon Sep 17 00:00:00 2001 From: suaveolent Date: Thu, 14 Mar 2024 15:18:11 +0100 Subject: [PATCH] refactoring, version bump --- .devcontainer/devcontainer.json | 2 +- README.md | 29 +- hoymiles_wifi/__init__.py | 4 +- hoymiles_wifi/__main__.py | 377 +++++++++++++++--------- hoymiles_wifi/const.py | 72 ++--- hoymiles_wifi/{inverter.py => dtu.py} | 302 ++++++++++++------- hoymiles_wifi/hoymiles.py | 216 +++++++++----- hoymiles_wifi/protobuf/compile_proto.sh | 1 + hoymiles_wifi/utils.py | 13 +- setup.py | 25 +- 10 files changed, 662 insertions(+), 379 deletions(-) rename hoymiles_wifi/{inverter.py => dtu.py} (56%) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 7686403..5c3dbcf 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -27,7 +27,7 @@ ], "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, - "python.formatting.provider": "black", + "python.formatting.provider": "ruff", "python.linting.mypyEnabled": true, "python.linting.enabled": true } diff --git a/README.md b/README.md index 1aa6586..17b51bb 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # hoymiles-wifi - -This Python library facilitates communication with Hoymiles HMS microinverters, specifically targeting the HMS-XXXXW-2T series. +This Python library facilitates communication with Hoymiles DTUs and the HMS-XXXXW-2T HMS microinverters, utilizing protobuf messages. **Disclaimer: This library is not affiliated with Hoymiles. It is an independent project developed to provide tools for interacting with Hoymiles HMS-XXXXW-2T series micro-inverters featuring integrated WiFi DTU. Any trademarks or product names mentioned are the property of their respective owners.** @@ -9,9 +8,8 @@ This Python library facilitates communication with Hoymiles HMS microinverters, The library was successfully tested with: - - Hoymiles HMS-800W-2T - - Hoymiles DTU WLite - +- Hoymiles HMS-800W-2T +- Hoymiles DTU WLite ## Installation @@ -29,7 +27,7 @@ You can integrate the library into your own project, or simply use it in the com hoymiles-wifi [-h] --host HOST [--as-json] commands: - get-real-data-new, + get-real-data-new, get-real-data-hms, get-real-data, get-config, @@ -52,18 +50,19 @@ The `--as-json` option is optional and allows formatting the output as JSON. ### Python code ``` -from hoymiles_wifi.inverter import Inverter +from hoymiles_wifi.dtu import DTU ... -inverter = Inverter() -response = await inverter. +dtu = DTU() +response = await dtu. if response: - print(f"Inverter Response: {response}") + print(f"DTU Response: {response}") else: print("Unable to get response!") ``` #### Available functions + - `async_get_real_data_new()`: Retrieve real-time data - `async_get_real_data_hms()`: Retrieve real-time data - `async_get_real_data()`: Retrieve real-time data @@ -82,15 +81,12 @@ else: Please be aware of the following considerations: - - No DTU Implementation: This library - retrieves information directly from the internal DTU of Hoymiles Wifi - inverters. +- No DTU Implementation: This library retrieves information directly from the (internal) DTU of Hoymiles Wifi inverters. ## Caution Use this library responsibly and be aware of potential risks. There are no guarantees provided, and any misuse or incorrect implementation may result in undesirable outcomes. Ensure that your inverter is not compromised during communication. - ## Known Limitations **Update Frequency:** The library may experience limitations in fetching updates, potentially around twice per minute. The inverter firmware may enforce a mandatory wait period of approximately 30 seconds between requests. @@ -100,5 +96,6 @@ Use this library responsibly and be aware of potential risks. There are no guara ## Attribution A special thank you for the inspiration and codebase to: - - [DennisOSRM](https://github.com/DennisOSRM): [hms-mqtt-publisher](https://github.com/DennisOSRM/hms-mqtt-publisher) - - [henkwiedig](https://github.com/henkwiedig): [Hoymiles-DTU-Proto](https://github.com/henkwiedig/Hoymiles-DTU-Proto) + +- [DennisOSRM](https://github.com/DennisOSRM): [hms-mqtt-publisher](https://github.com/DennisOSRM/hms-mqtt-publisher) +- [henkwiedig](https://github.com/henkwiedig): [Hoymiles-DTU-Proto](https://github.com/henkwiedig/Hoymiles-DTU-Proto) diff --git a/hoymiles_wifi/__init__.py b/hoymiles_wifi/__init__.py index b8d20bd..ddb0c8c 100644 --- a/hoymiles_wifi/__init__.py +++ b/hoymiles_wifi/__init__.py @@ -1,5 +1,7 @@ +"""Init file for hoymiles_wifi package.""" + import logging logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) \ No newline at end of file +logger = logging.getLogger(__name__) diff --git a/hoymiles_wifi/__main__.py b/hoymiles_wifi/__main__.py index 7e0f29a..d71ca14 100644 --- a/hoymiles_wifi/__main__.py +++ b/hoymiles_wifi/__main__.py @@ -1,31 +1,51 @@ +"""Contains the main functionality of the hoymiles_wifi package.""" + +from __future__ import annotations + import argparse import asyncio -from dataclasses import dataclass, asdict import json -from google.protobuf.message import Message +from dataclasses import asdict, dataclass + from google.protobuf.json_format import MessageToJson -from hoymiles_wifi.inverter import Inverter +from google.protobuf.message import Message +from hoymiles_wifi.const import DTU_FIRMWARE_URL_00_01_11, MAX_POWER_LIMIT +from hoymiles_wifi.dtu import DTU from hoymiles_wifi.hoymiles import ( - generate_version_string, - generate_sw_version_string, generate_dtu_version_string, - get_hw_model_name, - get_dtu_model_name, + generate_sw_version_string, + generate_version_string, +) +from hoymiles_wifi.protobuf import ( + AppGetHistPower_pb2, + APPHeartbeatPB_pb2, + APPInfomationData_pb2, + CommandPB_pb2, + GetConfig_pb2, + InfomationData_pb2, + NetworkInfo_pb2, + RealData_pb2, + RealDataHMS_pb2, + RealDataNew_pb2, ) -from hoymiles_wifi.const import ( - DTU_FIRMWARE_URL_00_01_11 -) +RED = "\033[91m" +END = "\033[0m" + @dataclass class VersionInfo: + """Represents version information for the hoymiles_wifi package.""" + dtu_hw_version: str dtu_sw_version: str inverter_hw_version: str inverter_sw_version: str - def __str__(self): + def __str__(self: VersionInfo) -> str: + """Return a string representation of the VersionInfo object.""" + return ( f'dtu_hw_version: "{self.dtu_hw_version}"\n' f'dtu_sw_version: "{self.dtu_sw_version}"\n' @@ -33,91 +53,132 @@ class VersionInfo: f'inverter_sw_version: "{self.inverter_sw_version}"\n' ) - def to_dict(self): + def to_dict(self: VersionInfo) -> dict: + """Convert the VersionInfo object to a dictionary.""" + return asdict(self) + # Inverter commands -async def async_get_real_data_new(inverter): - return await inverter.async_get_real_data_new() +async def async_get_real_data_new( + dtu: DTU, +) -> RealDataNew_pb2.RealDataNewResDTO | None: + """Get real data from the inverter asynchronously.""" -async def async_get_real_data_hms(inverter): - return await inverter.async_get_real_data_hms() + return await dtu.async_get_real_data_new() -async def async_get_real_data(inverter): - return await inverter.async_get_real_data() -async def async_get_config(inverter): - return await inverter.async_get_config() +async def async_get_real_data_hms( + dtu: DTU, +) -> RealDataHMS_pb2.RealDataHMSResDTO | None: + """Get real data from the inverter asynchronously.""" -async def async_network_info(inverter): - return await inverter.async_network_info() + return await dtu.async_get_real_data_hms() -async def async_app_information_data(inverter): - return await inverter.async_app_information_data() -async def async_app_get_hist_power(inverter): - return await inverter.async_app_get_hist_power() +async def async_get_real_data(dtu: DTU) -> RealData_pb2.RealDataResDTO | None: + """Get real data from the inverter asynchronously.""" -async def async_set_power_limit(inverter): + return await dtu.async_get_real_data() - RED = '\033[91m' - END = '\033[0m' - print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END) - print(RED + "!!! Danger zone! This will change the power limit of the inverter. !!!" + END) - print(RED + "!!! Please be careful and make sure you know what you are doing. !!!" + END) - print(RED + "!!! Only proceed if you know what you are doing. !!!" + END) - print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END) - print("") +async def async_get_config(dtu: DTU) -> GetConfig_pb2.GetConfigResDTO | None: + """Get the config from the inverter asynchronously.""" + return await dtu.async_get_config() + + +async def async_network_info( + dtu: DTU, +) -> NetworkInfo_pb2.NetworkInfoResDTO | None: + """Get network information from the inverter asynchronously.""" + + return await dtu.async_network_info() + + +async def async_app_information_data( + dtu: DTU, +) -> APPInfomationData_pb2.AppInfomationDataResDTO | None: + """Get application information data from the inverter asynchronously.""" + + return await dtu.async_app_information_data() + + +async def async_app_get_hist_power( + dtu: DTU, +) -> AppGetHistPower_pb2.AppGetHistPowerResDTO: + """Get historical power data from the inverter asynchronously.""" + + return await dtu.async_app_get_hist_power() + + +async def async_set_power_limit( + dtu: DTU, +) -> CommandPB_pb2.CommandResDTO | None: + """Set the power limit of the inverter asynchronously.""" + + print( # noqa: T201 + RED + + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + + "!!! Danger zone! This will change the power limit of the dtu. !!!\n" + + "!!! Please be careful and make sure you know what you are doing. !!!\n" + + "!!! Only proceed if you know what you are doing. !!!\n" + + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + + END, + ) cont = input("Do you want to continue? (y/n): ") - if(cont != 'y'): - return + if cont != "y": + return None power_limit = int(input("Enter the new power limit (0-100): ")) - if(power_limit < 0 or power_limit > 100): - print("Error. Invalid power limit!") - return - - print(f"Setting power limit to {power_limit}%") - cont = input("Are you sure? (y/n): ") + if power_limit < 0 or power_limit > MAX_POWER_LIMIT: + print("Error. Invalid power limit!") # noqa: T201 + return None - if(cont != 'y'): - return + print(f"Setting power limit to {power_limit}%") # noqa: T201 + cont = input("Are you sure? (y/n): ") - return await inverter.async_set_power_limit(power_limit) + if cont != "y": + return None + + return await dtu.async_set_power_limit(power_limit) -async def async_set_wifi(inverter): +async def async_set_wifi(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None: + """Set the wifi SSID and password of the inverter asynchronously.""" + wifi_ssid = input("Enter the new wifi SSID: ").strip() wifi_password = input("Enter the new wifi password: ").strip() - print(f'Setting wifi to "{wifi_ssid}"') - print(f'Setting wifi password to "{wifi_password}"') + print(f'Setting wifi to "{wifi_ssid}"') # noqa: T201 + print(f'Setting wifi password to "{wifi_password}"') # noqa: T201 cont = input("Are you sure? (y/n): ") - if(cont != 'y'): - return - return await inverter.async_set_wifi(wifi_ssid, wifi_password) + if cont != "y": + return None + return await dtu.async_set_wifi(wifi_ssid, wifi_password) -async def async_firmware_update(inverter): - RED = '\033[91m' - END = '\033[0m' - print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END) - print(RED + "!!! Danger zone! This will update the firmeware of the DTU. !!!" + END) - print(RED + "!!! Please be careful and make sure you know what you are doing. !!!" + END) - print(RED + "!!! Only proceed if you know what you are doing. !!!" + END) - print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END) - print("") +async def async_firmware_update(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None: + """Update the firmware of the DTU asynchronously.""" + + print( # noqa: T201 + RED + + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + + "!!! Danger zone! This will update the firmeware of the DTU. !!!\n" + + "!!! Please be careful and make sure you know what you are doing. !!!\n" + + "!!! Only proceed if you know what you are doing. !!!\n" + + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + + END, + ) cont = input("Do you want to continue? (y/n): ") - if(cont != 'y'): - return - - print("Please select a firmware version:") - print("1.) V00.01.11") - print("2.) Custom URL") + if cont != "y": + return None + + print("Please select a firmware version:") # noqa: T201 + print("1.) V00.01.11") # noqa: T201 + print("2.) Custom URL") # noqa: T201 while True: selection = input("Enter your selection (1 or 2): ") @@ -125,74 +186,106 @@ async def async_firmware_update(inverter): if selection == "1": url = DTU_FIRMWARE_URL_00_01_11 break - elif selection == "2": + if selection == "2": url = input("Enter the custom URL: ").strip() break - else: - print("Invalid selection. Please enter 1 or 2.") - - print() - print(f'Firmware update URL: "{url}"') - print() + + print("Invalid selection. Please enter 1 or 2.") # noqa: T201 + + print() # noqa: T201 + print(f'Firmware update URL: "{url}"') # noqa: T201 + print() # noqa: T201 cont = input("Do you want to continue? (y/n): ") - if(cont != 'y'): - return - - return await inverter.async_update_dtu_firmware() + if cont != "y": + return None -async def async_restart(inverter): + return await dtu.async_update_dtu_firmware() - cont = input("Do you want to restart the device? (y/n): ") - if(cont != 'y'): - return - - return await inverter.async_restart() -async def async_turn_off(inverter): - cont = input("Do you want to turn *OFF* the device? (y/n): ") - if(cont != 'y'): - return - - return await inverter.async_turn_off() +async def async_restart_dtu(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None: + """Restart the device asynchronously.""" -async def async_turn_on(inverter): - cont = input("Do you want to turn *ON* the device? (y/n): ") - if(cont != 'y'): - return - - return await inverter.async_turn_on() + cont = input("Do you want to restart the DTU? (y/n): ") + if cont != "y": + return None -async def async_get_information_data(inverter): - return await inverter.async_get_information_data() + return await dtu.async_restart_dtu() -async def async_get_version_info(inverter): - response = await async_app_information_data(inverter) + +async def async_turn_on_dtu(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None: + """Turn on the dtu asynchronously.""" + + cont = input("Do you want to turn *ON* the DTU? (y/n): ") + if cont != "y": + return None + + return await dtu.async_turn_on_dtu() + + +async def async_turn_off_dtu(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None: + """Turn off the dtu asynchronously.""" + + cont = input("Do you want to turn *OFF* the DTU? (y/n): ") + if cont != "y": + return None + + return await dtu.async_turn_off_dtu() + + +async def async_get_information_data( + dtu: DTU, +) -> InfomationData_pb2.InfomationDataResDTO: + """Get information data from the dtu asynchronously.""" + + return await dtu.async_get_information_data() + + +async def async_get_version_info(dtu: DTU) -> VersionInfo | None: + """Get version information from the dtu asynchronously.""" + + response = await async_app_information_data(dtu) if not response: return None return VersionInfo( - dtu_hw_version="H" + generate_dtu_version_string(response.dtu_info.dtu_hw_version), - dtu_sw_version="V" + generate_dtu_version_string(response.dtu_info.dtu_sw_version), - inverter_hw_version="H" + generate_version_string(response.pv_info[0].pv_hw_version), - inverter_sw_version="V" + generate_sw_version_string(response.pv_info[0].pv_sw_version), + dtu_hw_version="H" + + generate_dtu_version_string(response.dtu_info.dtu_hw_version), + dtu_sw_version="V" + + generate_dtu_version_string(response.dtu_info.dtu_sw_version), + inverter_hw_version="H" + + generate_version_string(response.pv_info[0].pv_hw_version), + inverter_sw_version="V" + + generate_sw_version_string(response.pv_info[0].pv_sw_version), ) -async def async_heatbeat(inverter): - return await inverter.async_heartbeat() -def print_invalid_command(command): - print(f"Invalid command: {command}") +async def async_heatbeat(dtu: DTU) -> APPHeartbeatPB_pb2.APPHeartbeatResDTO | None: + """Request a heartbeat from the dtu asynchronously.""" -async def main(): + return await dtu.async_heartbeat() + + +def print_invalid_command(command: str) -> None: + """Print an invalid command message.""" + + print(f"Invalid command: {command}") # noqa: T201 + + +async def main() -> None: + """Execute the main function for the hoymiles_wifi package.""" parser = argparse.ArgumentParser(description="Hoymiles HMS Monitoring") parser.add_argument( - "--host", type=str, required=True, help="IP address or hostname of the inverter" - ) - parser.add_argument('--as-json', action='store_true', default=False, - help='Format the output as JSON') + "--host", type=str, required=True, help="IP address or hostname of the DTU" + ) + parser.add_argument( + "--as-json", + action="store_true", + default=False, + help="Format the output as JSON", + ) parser.add_argument( "command", type=str, @@ -217,46 +310,52 @@ async def main(): help="Command to execute", ) args = parser.parse_args() - - inverter = Inverter(args.host) + + dtu = DTU(args.host) # Execute the specified command using a switch case switch = { - 'get-real-data-new': async_get_real_data_new, - 'get-real-data-hms': async_get_real_data_hms, - 'get-real-data': async_get_real_data, - 'get-config': async_get_config, - 'network-info': async_network_info, - 'app-information-data': async_app_information_data, - 'app-get-hist-power': async_app_get_hist_power, - 'set-power-limit': async_set_power_limit, - 'set-wifi': async_set_wifi, - 'firmware-update': async_firmware_update, - 'restart': async_restart, - 'turn-on': async_turn_on, - 'turn-off': async_turn_off, - 'get-information-data': async_get_information_data, - 'get-version-info': async_get_version_info, - 'heartbeat': async_heatbeat, + "get-real-data-new": async_get_real_data_new, + "get-real-data-hms": async_get_real_data_hms, + "get-real-data": async_get_real_data, + "get-config": async_get_config, + "network-info": async_network_info, + "app-information-data": async_app_information_data, + "app-get-hist-power": async_app_get_hist_power, + "set-power-limit": async_set_power_limit, + "set-wifi": async_set_wifi, + "firmware-update": async_firmware_update, + "restart": async_restart_dtu, + "turn-on": async_turn_on_dtu, + "turn-off": async_turn_off_dtu, + "get-information-data": async_get_information_data, + "get-version-info": async_get_version_info, + "heartbeat": async_heatbeat, } command_func = switch.get(args.command, print_invalid_command) - response = await command_func(inverter) + response = await command_func(dtu) if response: if args.as_json: - if isinstance(response, Message): - print(MessageToJson(response)) + if isinstance(response, Message): + print(MessageToJson(response)) # noqa: T201 else: - print(json.dumps(asdict(response), indent=4)) + print(json.dumps(asdict(response), indent=4)) # noqa: T201 else: - print(f"{args.command.capitalize()} Response: \n{response}") + print(f"{args.command.capitalize()} Response: \n{response}") # noqa: T201 else: - print(f"No response or unable to retrieve response for {args.command.replace('_', ' ')}") + print( # noqa: T201 + f"No response or unable to retrieve response for " + f"{args.command.replace('_', ' ')}", + ) -def run_main(): +def run_main() -> None: + """Run the main function for the hoymiles_wifi package.""" + asyncio.run(main()) + if __name__ == "__main__": - run_main() \ No newline at end of file + run_main() diff --git a/hoymiles_wifi/const.py b/hoymiles_wifi/const.py index 063188f..fd479d7 100644 --- a/hoymiles_wifi/const.py +++ b/hoymiles_wifi/const.py @@ -1,35 +1,37 @@ -INVERTER_PORT = 10081 +"""Constants for the Hoymiles WiFi integration.""" + +DTU_PORT = 10081 # App -> DTU start with 0xa3, responses start 0xa2 -CMD_HEADER = b'HM' -CMD_APP_INFO_DATA_RES_DTO = b'\xa3\x01' -CMD_HB_RES_DTO = b'\xa3\x02' -CMD_REAL_DATA_RES_DTO = b'\xa3\x03' -CMD_W_INFO_RES_DTO = b'\xa3\x04' -CMD_COMMAND_RES_DTO = b'\xa3\x05' -CMD_COMMAND_STATUS_RES_DTO = b'\xa3\x06' -CMD_DEV_CONFIG_FETCH_RES_DTO = b'\xa3\x07' -CMD_DEV_CONFIG_PUT_RES_DTO = b'\xa3\x08' -CMD_GET_CONFIG = b'\xa3\x09' -CMD_SET_CONFIG = b'\xa3\x10' -CMD_REAL_RES_DTO = b'\xa3\x11' -CMD_GPST_RES_DTO = b'\xa3\x12' -CMD_AUTO_SEARCH = b'\xa3\x13' -CMD_NETWORK_INFO_RES = b'\xa3\x14' -CMD_APP_GET_HIST_POWER_RES = b'\xa3\x15' -CMD_APP_GET_HIST_ED_RES = b'\xa3\x16' -CMD_HB_RES_DTO_ALT = b'\x83\x01' -CMD_REGISTER_RES_DTO = b'\x83\x02' -CMD_STORAGE_DATA_RES = b'\x83\x03' -CMD_COMMAND_RES_DTO_2 = b'\x83\x05' -CMD_COMMAND_STATUS_RES_DTO_2 = b'\x83\x06' -CMD_DEV_CONFIG_FETCH_RES_DTO_2 = b'\x83\x07' -CMD_DEV_CONFIG_PUT_RES_DTO_2 = b'\x83\x08' -CMD_GET_CONFIG_RES = b'\xdb\x08' -CMD_SET_CONFIG_RES = b'\xdb\x07' +CMD_HEADER = b"HM" +CMD_APP_INFO_DATA_RES_DTO = b"\xa3\x01" +CMD_HB_RES_DTO = b"\xa3\x02" +CMD_REAL_DATA_RES_DTO = b"\xa3\x03" +CMD_W_INFO_RES_DTO = b"\xa3\x04" +CMD_COMMAND_RES_DTO = b"\xa3\x05" +CMD_COMMAND_STATUS_RES_DTO = b"\xa3\x06" +CMD_DEV_CONFIG_FETCH_RES_DTO = b"\xa3\x07" +CMD_DEV_CONFIG_PUT_RES_DTO = b"\xa3\x08" +CMD_GET_CONFIG = b"\xa3\x09" +CMD_SET_CONFIG = b"\xa3\x10" +CMD_REAL_RES_DTO = b"\xa3\x11" +CMD_GPST_RES_DTO = b"\xa3\x12" +CMD_AUTO_SEARCH = b"\xa3\x13" +CMD_NETWORK_INFO_RES = b"\xa3\x14" +CMD_APP_GET_HIST_POWER_RES = b"\xa3\x15" +CMD_APP_GET_HIST_ED_RES = b"\xa3\x16" +CMD_HB_RES_DTO_ALT = b"\x83\x01" +CMD_REGISTER_RES_DTO = b"\x83\x02" +CMD_STORAGE_DATA_RES = b"\x83\x03" +CMD_COMMAND_RES_DTO_2 = b"\x83\x05" +CMD_COMMAND_STATUS_RES_DTO_2 = b"\x83\x06" +CMD_DEV_CONFIG_FETCH_RES_DTO_2 = b"\x83\x07" +CMD_DEV_CONFIG_PUT_RES_DTO_2 = b"\x83\x08" +CMD_GET_CONFIG_RES = b"\xdb\x08" +CMD_SET_CONFIG_RES = b"\xdb\x07" -CMD_CLOUD_INFO_DATA_RES_DTO = b'\x23\x01' -CMD_CLOUD_COMMAND_RES_DTO = b'\x23\x05' +CMD_CLOUD_INFO_DATA_RES_DTO = b"\x23\x01" +CMD_CLOUD_COMMAND_RES_DTO = b"\x23\x05" CMD_ACTION_MICRO_DEFAULT = 0 CMD_ACTION_DTU_REBOOT = 1 @@ -96,15 +98,19 @@ CMD_ACTION_INV_UPGRADE = 4112 CMD_ACTION_BMS_UPGRADE = 4112 -DEV_DTU = 1 +DEV_DTU = 1 DEV_REPEATER = 2 DEV_MICRO = 3 -DEV_MODEL = 4 -DEV_METER = 5 +DEV_MODEL = 4 +DEV_METER = 5 DEV_INV = 6 DEV_RSD = 7 DEV_OP = 8 DEV_GATEWAY = 9 DEV_BMS = 10 -DTU_FIRMWARE_URL_00_01_11 = "http://fwupdate.hoymiles.com/cfs/bin/2311/06/,1488725943932555264.bin" +DTU_FIRMWARE_URL_00_01_11 = ( + "http://fwupdate.hoymiles.com/cfs/bin/2311/06/,1488725943932555264.bin" +) + +MAX_POWER_LIMIT = 100 diff --git a/hoymiles_wifi/inverter.py b/hoymiles_wifi/dtu.py similarity index 56% rename from hoymiles_wifi/inverter.py rename to hoymiles_wifi/dtu.py index 7ccb75d..252474a 100644 --- a/hoymiles_wifi/inverter.py +++ b/hoymiles_wifi/dtu.py @@ -1,128 +1,185 @@ +"""DTU communication implementation for Hoymiles WiFi.""" + from __future__ import annotations + import asyncio import struct -from typing import Any -from crcmod import mkCrcFun -from datetime import datetime import time +from datetime import datetime +from enum import Enum +from typing import Any + +from crcmod import mkCrcFun from hoymiles_wifi import logger - -from hoymiles_wifi.utils import initialize_set_config - +from hoymiles_wifi.const import ( + CMD_ACTION_DTU_REBOOT, + CMD_ACTION_DTU_UPGRADE, + CMD_ACTION_LIMIT_POWER, + CMD_ACTION_MI_SHUTDOWN, + CMD_ACTION_MI_START, + CMD_APP_GET_HIST_POWER_RES, + CMD_APP_INFO_DATA_RES_DTO, + CMD_CLOUD_COMMAND_RES_DTO, + CMD_COMMAND_RES_DTO, + CMD_GET_CONFIG, + CMD_HB_RES_DTO, + CMD_HEADER, + CMD_NETWORK_INFO_RES, + CMD_REAL_DATA_RES_DTO, + CMD_REAL_RES_DTO, + CMD_SET_CONFIG, + DEV_DTU, + DTU_FIRMWARE_URL_00_01_11, + DTU_PORT, +) from hoymiles_wifi.protobuf import ( + AppGetHistPower_pb2, APPHeartbeatPB_pb2, APPInfomationData_pb2, - AppGetHistPower_pb2, CommandPB_pb2, - InfomationData_pb2, GetConfig_pb2, - RealData_pb2, - RealDataNew_pb2, - RealDataHMS_pb2, + InfomationData_pb2, NetworkInfo_pb2, + RealData_pb2, + RealDataHMS_pb2, + RealDataNew_pb2, SetConfig_pb2, - ) - - -from hoymiles_wifi.const import ( - INVERTER_PORT, - CMD_HEADER, - CMD_GET_CONFIG, - CMD_REAL_RES_DTO, - CMD_REAL_DATA_RES_DTO, - CMD_NETWORK_INFO_RES, - CMD_APP_INFO_DATA_RES_DTO, - CMD_APP_GET_HIST_POWER_RES, - CMD_ACTION_LIMIT_POWER, - CMD_COMMAND_RES_DTO, - CMD_ACTION_DTU_UPGRADE, - CMD_SET_CONFIG, - CMD_CLOUD_COMMAND_RES_DTO, - CMD_ACTION_DTU_REBOOT, - CMD_ACTION_MI_START, - CMD_ACTION_MI_SHUTDOWN, - DTU_FIRMWARE_URL_00_01_11, - CMD_HB_RES_DTO, - DEV_DTU, ) +from hoymiles_wifi.utils import initialize_set_config -class NetmodeSelect: +class NetmodeSelect(Enum): + """Network mode selection.""" + WIFI = 1 SIM = 2 LAN = 3 -class NetworkState: + +class NetworkState(Enum): + """Network state.""" + Unknown = 0 Online = 1 Offline = 2 -class Inverter: + +class DTU: + """DTU class.""" + def __init__(self, host: str): + """Initialize DTU class.""" + self.host = host self.state = NetworkState.Unknown self.sequence = 0 self.mutex = asyncio.Lock() def get_state(self) -> NetworkState: + """Get DTU state.""" + return self.state def set_state(self, new_state: NetworkState): + """Set DTU state.""" + if self.state != new_state: self.state = new_state - logger.debug(f"Inverter is {new_state}") - + logger.debug(f"DTU is {new_state}") + async def async_get_real_data_hms(self) -> RealDataHMS_pb2.HMSStateResponse | None: + """Get real data HMS.""" + request = RealDataHMS_pb2.HMSRealDataResDTO() command = CMD_REAL_DATA_RES_DTO - return await self.async_send_request(command, request, RealDataHMS_pb2.HMSStateResponse) - + return await self.async_send_request( + command, request, RealDataHMS_pb2.HMSStateResponse + ) + async def async_get_real_data(self) -> RealData_pb2.RealDataResDTO | None: + """Get real data.""" + request = RealData_pb2.RealDataResDTO() command = CMD_REAL_DATA_RES_DTO - return await self.async_send_request(command, request, RealData_pb2.RealDataReqDTO) + return await self.async_send_request( + command, request, RealData_pb2.RealDataReqDTO + ) async def async_get_real_data_new(self) -> RealDataNew_pb2.RealDataNewResDTO | None: + """Get real data new.""" + request = RealDataNew_pb2.RealDataNewResDTO() - request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") + request.time_ymd_hms = ( + datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") + ) request.offset = 28800 request.time = int(time.time()) command = CMD_REAL_RES_DTO - return await self.async_send_request(command, request, RealDataNew_pb2.RealDataNewReqDTO) - + return await self.async_send_request( + command, request, RealDataNew_pb2.RealDataNewReqDTO + ) + async def async_get_config(self) -> GetConfig_pb2.GetConfigResDTO | None: + """Get config.""" + request = GetConfig_pb2.GetConfigResDTO() request.offset = 28800 request.time = int(time.time()) - 60 command = CMD_GET_CONFIG - return await self.async_send_request(command, request, GetConfig_pb2.GetConfigReqDTO) + return await self.async_send_request( + command, + request, + GetConfig_pb2.GetConfigReqDTO, + ) async def async_network_info(self) -> NetworkInfo_pb2.NetworkInfoResDTO | None: + """Get network info.""" + request = NetworkInfo_pb2.NetworkInfoResDTO() request.offset = 28800 request.time = int(time.time()) command = CMD_NETWORK_INFO_RES - return await self.async_send_request(command, request, NetworkInfo_pb2.NetworkInfoReqDTO) - - async def async_app_information_data(self) -> APPInfomationData_pb2.APPInfoDataResDTO: + return await self.async_send_request( + command, request, NetworkInfo_pb2.NetworkInfoReqDTO + ) + + async def async_app_information_data( + self, + ) -> APPInfomationData_pb2.APPInfoDataResDTO: + """Get app information data.""" request = APPInfomationData_pb2.APPInfoDataResDTO() - request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") + request.time_ymd_hms = ( + datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") + ) request.offset = 28800 request.time = int(time.time()) command = CMD_APP_INFO_DATA_RES_DTO - return await self.async_send_request(command, request, APPInfomationData_pb2.APPInfoDataReqDTO) - - async def async_app_get_hist_power(self) -> AppGetHistPower_pb2.AppGetHistPowerResDTO | None: + return await self.async_send_request( + command, request, APPInfomationData_pb2.APPInfoDataReqDTO + ) + + async def async_app_get_hist_power( + self, + ) -> AppGetHistPower_pb2.AppGetHistPowerResDTO | None: + """Get historical power.""" + request = AppGetHistPower_pb2.AppGetHistPowerResDTO() request.offset = 28800 request.requested_time = int(time.time()) command = CMD_APP_GET_HIST_POWER_RES - return await self.async_send_request(command, request, AppGetHistPower_pb2.AppGetHistPowerReqDTO) - - async def async_set_power_limit(self, power_limit: int) -> CommandPB_pb2.CommandResDTO | None: + return await self.async_send_request( + command, + request, + AppGetHistPower_pb2.AppGetHistPowerReqDTO, + ) - if(power_limit < 0 or power_limit > 100): + async def async_set_power_limit( + self, + power_limit: int, + ) -> CommandPB_pb2.CommandResDTO | None: + """Set power limit.""" + if power_limit < 0 or power_limit > 100: logger.error("Error. Invalid power limit!") return @@ -133,46 +190,58 @@ class Inverter: request.action = CMD_ACTION_LIMIT_POWER request.package_nub = 1 request.tid = int(time.time()) - request.data = f'A:{power_limit},B:0,C:0\r'.encode('utf-8') + request.data = f"A:{power_limit},B:0,C:0\r".encode() command = CMD_COMMAND_RES_DTO - return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO) - - async def async_set_wifi(self, ssid: str, password: str) -> SetConfig_pb2.SetConfigResDTO | None: + return await self.async_send_request( + command, request, CommandPB_pb2.CommandReqDTO + ) + + async def async_set_wifi( + self, ssid: str, password: str + ) -> SetConfig_pb2.SetConfigResDTO | None: + """Set wifi.""" get_config_req = await self.async_get_config() - if(get_config_req is None): + if get_config_req is None: logger.error("Failed to get config") - return - + return None + request = initialize_set_config(get_config_req) - - request.time = int(time.time()) + + request.time = int(time.time()) request.offset = 28800 request.app_page = 1 request.netmode_select = NetmodeSelect.WIFI - request.wifi_ssid = ssid.encode('utf-8') - request.wifi_password = password.encode('utf-8') + request.wifi_ssid = ssid.encode("utf-8") + request.wifi_password = password.encode("utf-8") command = CMD_SET_CONFIG - return await self.async_send_request(command, request, SetConfig_pb2.SetConfigReqDTO) + return await self.async_send_request( + command, request, SetConfig_pb2.SetConfigReqDTO + ) - - async def async_update_dtu_firmware(self, firmware_url: str = DTU_FIRMWARE_URL_00_01_11) -> CommandPB_pb2.CommandResDTO | None: + async def async_update_dtu_firmware( + self, + firmware_url: str = DTU_FIRMWARE_URL_00_01_11, + ) -> CommandPB_pb2.CommandResDTO | None: + """Update DTU firmware.""" request = CommandPB_pb2.CommandResDTO() request.action = CMD_ACTION_DTU_UPGRADE request.package_nub = 1 request.tid = int(time.time()) - request.data = (firmware_url + '\r').encode('utf-8') + request.data = (firmware_url + "\r").encode("utf-8") command = CMD_CLOUD_COMMAND_RES_DTO - return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO) - + return await self.async_send_request( + command, request, CommandPB_pb2.CommandReqDTO + ) - async def async_restart(self) -> CommandPB_pb2.CommandResDTO | None: + async def async_restart_dtu(self) -> CommandPB_pb2.CommandResDTO | None: + """Restart DTU.""" request = CommandPB_pb2.CommandResDTO() request.action = CMD_ACTION_DTU_REBOOT @@ -180,10 +249,12 @@ class Inverter: request.tid = int(time.time()) command = CMD_CLOUD_COMMAND_RES_DTO - return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO) + return await self.async_send_request( + command, request, CommandPB_pb2.CommandReqDTO + ) - - async def async_turn_on(self) -> CommandPB_pb2.CommandResDTO | None: + async def async_turn_on_dtu(self) -> CommandPB_pb2.CommandResDTO | None: + """Turn on DTU.""" request = CommandPB_pb2.CommandResDTO() request.action = CMD_ACTION_MI_START @@ -192,9 +263,12 @@ class Inverter: request.tid = int(time.time()) command = CMD_CLOUD_COMMAND_RES_DTO - return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO) - - async def async_turn_off(self) -> CommandPB_pb2.CommandResDTO | None: + return await self.async_send_request( + command, request, CommandPB_pb2.CommandReqDTO + ) + + async def async_turn_off_dtu(self) -> CommandPB_pb2.CommandResDTO | None: + """Turn off DTU.""" request = CommandPB_pb2.CommandResDTO() request.action = CMD_ACTION_MI_SHUTDOWN @@ -203,41 +277,67 @@ class Inverter: request.tid = int(time.time()) command = CMD_CLOUD_COMMAND_RES_DTO - return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO) - - async def async_get_information_data(self) -> InfomationData_pb2.InfoDataResDTO | None: + return await self.async_send_request( + command, request, CommandPB_pb2.CommandReqDTO + ) + + async def async_get_information_data( + self, + ) -> InfomationData_pb2.InfoDataResDTO | None: + """Get information data.""" request = InfomationData_pb2.InfoDataResDTO() - request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") + request.time_ymd_hms = ( + datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") + ) request.offset = 28800 request.time = int(time.time()) command = CMD_APP_INFO_DATA_RES_DTO - return await self.async_send_request(command, request, InfomationData_pb2.InfoDataReqDTO) - + return await self.async_send_request( + command, request, InfomationData_pb2.InfoDataReqDTO + ) async def async_heartbeat(self) -> APPHeartbeatPB_pb2.HBReqDTO | None: + """Request heartbeat.""" request = APPHeartbeatPB_pb2.HBResDTO() - request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") + request.time_ymd_hms = ( + datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") + ) request.offset = 28800 request.time = int(time.time()) command = CMD_HB_RES_DTO - return await self.async_send_request(command, request, APPHeartbeatPB_pb2.HBReqDTO) - + return await self.async_send_request( + command, request, APPHeartbeatPB_pb2.HBReqDTO + ) + + async def async_send_request( + self, + command: bytes, + request: Any, + response_type: Any, + dtu_port: int = DTU_PORT, + ): + """Send request to DTU.""" - async def async_send_request(self, command: bytes, request: Any, response_type: Any, inverter_port: int = INVERTER_PORT): self.sequence = (self.sequence + 1) & 0xFFFF request_as_bytes = request.SerializeToString() - crc16 = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(request_as_bytes) + crc16 = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)( + request_as_bytes + ) length = len(request_as_bytes) + 10 # compose request message header = CMD_HEADER + command - message = header + struct.pack('>HHH', self.sequence, crc16, length) + request_as_bytes + message = ( + header + + struct.pack(">HHH", self.sequence, crc16, length) + + request_as_bytes + ) - address = (self.host, inverter_port) + address = (self.host, dtu_port) async with self.mutex: try: @@ -261,18 +361,22 @@ class Inverter: try: if len(buf) < 10: raise ValueError("Buffer is too short for unpacking") - - crc16_target, read_length = struct.unpack('>HH', buf[6:10]) - if(len(buf) != read_length): + crc16_target, read_length = struct.unpack(">HH", buf[6:10]) + + if len(buf) != read_length: raise ValueError("Buffer is incomplete") response_as_bytes = buf[10:read_length] - crc16_response = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(response_as_bytes) + crc16_response = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)( + response_as_bytes + ) if crc16_response != crc16_target: - logger.debug(f"CRC16 mismatch: {hex(crc16_response)} != {hex(crc16_target)}") + logger.debug( + f"CRC16 mismatch: {hex(crc16_response)} != {hex(crc16_target)}" + ) raise ValueError("CRC16 mismatch") parsed = response_type.FromString(response_as_bytes) diff --git a/hoymiles_wifi/hoymiles.py b/hoymiles_wifi/hoymiles.py index 229b3df..70be4f1 100644 --- a/hoymiles_wifi/hoymiles.py +++ b/hoymiles_wifi/hoymiles.py @@ -1,22 +1,31 @@ -""""Hoymiles quirks for inverters and DTU""" +"""Hoymiles quirks for inverters and DTU.""" -from enum import Enum import struct +from enum import Enum from hoymiles_wifi import logger + class InverterType(Enum): + """Inverter type.""" + ONE = "1T" TWO = "2T" FOUR = "4T" SIX = "6T" + class InverterSeries(Enum): + """Inverter series.""" + HM = "HM" HMS = "HMS" HMT = "HMT" + class InverterPower(Enum): + """Inverter power.""" + P_100 = "100" P_250 = "250" P_300_350_400 = "300/350/400" @@ -31,7 +40,27 @@ class InverterPower(Enum): P_2000 = "2000" +power_mapping = { + 0x1011: InverterPower.P_100, + 0x1020: InverterPower.P_250, + 0x1021: InverterPower.P_300_350_400, + 0x1121: InverterPower.P_300_350_400, + 0x1125: InverterPower.P_400, + 0x1040: InverterPower.P_500, + 0x1041: InverterPower.P_600_700_800, + 0x1042: InverterPower.P_600_700_800, + 0x1141: InverterPower.P_600_700_800, + 0x1060: InverterPower.P_1000, + 0x1061: InverterPower.P_1200_1500, + 0x1161: InverterPower.P_1000_1200_1500, + 0x1164: InverterPower.P_1600, + 0x1412: InverterPower.P_800W, +} + + class DTUType(Enum): + """DTU type.""" + DTU_G100 = "DTU-G100" DTU_W100 = "DTU-W100" DTU_LITE_S = "DTU-Lite-S" @@ -43,43 +72,114 @@ class DTUType(Enum): DTU_W_LITE = "DTU-WLite" -def format_number(number) -> str: - return "{:02d}".format(number) +type_mapping = { + 0x10F7: DTUType.DTU_PRO, + 0x10FB: DTUType.DTU_PRO, + 0x4101: DTUType.DTU_PRO, + 0x10FC: DTUType.DTU_PRO, + 0x4120: DTUType.DTU_PRO, + 0x10F8: DTUType.DTU_PRO, + 0x4100: DTUType.DTU_PRO, + 0x10FD: DTUType.DTU_PRO, + 0x4121: DTUType.DTU_PRO, + 0x10D3: DTUType.DTU_W100_LITE_S, + 0x4110: DTUType.DTU_W100_LITE_S, + 0x10D8: DTUType.DTU_W100_LITE_S, + 0x4130: DTUType.DTU_W100_LITE_S, + 0x4132: DTUType.DTU_W100_LITE_S, + 0x4133: DTUType.DTU_W100_LITE_S, + 0x10D9: DTUType.DTU_W100_LITE_S, + 0x4111: DTUType.DTU_W100_LITE_S, + 0x10D2: DTUType.DTU_G100, + 0x10D6: DTUType.DTU_LITE, + 0x10D7: DTUType.DTU_LITE, + 0x4131: DTUType.DTU_LITE, + 0x1124: DTUType.DTU_HMS_W, + 0x1125: DTUType.DTU_HMS_W, + 0x1403: DTUType.DTU_HMS_W, + 0x1144: DTUType.DTU_HMS_W, + 0x1143: DTUType.DTU_HMS_W, + 0x1145: DTUType.DTU_HMS_W, + 0x1412: DTUType.DTU_HMS_W, + 0x1164: DTUType.DTU_HMS_W, + 0x1165: DTUType.DTU_HMS_W, + 0x1166: DTUType.DTU_HMS_W, + 0x1167: DTUType.DTU_HMS_W, + 0x1222: DTUType.DTU_HMS_W, + 0x1422: DTUType.DTU_HMS_W, + 0x1423: DTUType.DTU_HMS_W, + 0x1361: DTUType.DTU_HMS_W, + 0x1362: DTUType.DTU_HMS_W, + 0x1381: DTUType.DTU_HMS_W, + 0x1382: DTUType.DTU_HMS_W, + 0x4143: DTUType.DTU_HMS_W, +} + + +def format_number(number: int) -> str: + """Format number to two digits.""" + + return f"{number:02d}" + def generate_version_string(version_number: int) -> str: - version_string = format_number(version_number // 2048) + "." + format_number((version_number // 64) % 32) + "." + format_number(version_number % 64) + """Generate version string.""" + + version_string = ( + format_number(version_number // 2048) + + "." + + format_number((version_number // 64) % 32) + + "." + + format_number(version_number % 64) + ) return version_string + def generate_sw_version_string(version_number: int) -> str: + """Generate software version string.""" version_number2 = version_number // 10000 version_number3 = (version_number - (version_number2 * 10000)) // 100 - version_number4 = (version_number - (version_number2 * 10000)) - (version_number3 * 100) + version_number4 = (version_number - (version_number2 * 10000)) - ( + version_number3 * 100 + ) - version_string = format_number(version_number2) + "." + format_number(version_number3) + "." + format_number(version_number4) + version_string = ( + format_number(version_number2) + + "." + + format_number(version_number3) + + "." + + format_number(version_number4) + ) return version_string -def generate_dtu_version_string(version_number: int, type: str = None) -> str: +def generate_dtu_version_string(version_number: int, type: str = "") -> str: + """Generate DTU version string.""" version_string = "" version_number2 = version_number % 256 version_number3 = (version_number // 256) % 16 - if "SRF" == str: + if type == "SRF": version_string += f"{format_number(version_number // 1048576)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}" - elif "HRF" == str: + elif type == "HRF": version_string += f"{format_number(version_number // 65536)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}" else: version_string += f"{format_number(version_number // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}" return version_string + def generate_inverter_serial_number(serial_number: int) -> str: + """Generate inverter serial number.""" + return hex(serial_number)[2:] + def get_inverter_type(serial_bytes: bytes) -> InverterType: - + """Get inverter type.""" + inverter_type = None # Access individual bytes if serial_bytes[0] == 0x11: @@ -95,17 +195,20 @@ def get_inverter_type(serial_bytes: bytes) -> InverterType: if serial_bytes[1] in [0x12]: inverter_type = InverterType.TWO - if inverter_type == None: - raise ValueError(f"Unknown inverter type: {hex(serial_bytes[0])} {hex(serial_bytes[1])}") + if inverter_type is None: + raise ValueError( + f"Unknown inverter type: {hex(serial_bytes[0])} {hex(serial_bytes[1])}" + ) return inverter_type def get_inverter_series(serial_bytes: bytes) -> InverterSeries: + """Get inverter series.""" series = None if serial_bytes[0] == 0x11: - if (serial_bytes[1] & 0x0f) == 0x04: + if (serial_bytes[1] & 0x0F) == 0x04: series = InverterSeries.HMS else: series = InverterSeries.HM @@ -118,52 +221,34 @@ def get_inverter_series(serial_bytes: bytes) -> InverterSeries: series = InverterSeries.HMT elif serial_bytes[0] == 0x14: series = InverterSeries.HMS - + if series is None: - raise ValueError(f"Unknown series: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!") - + raise ValueError( + f"Unknown series: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!" + ) + return series + def get_inverter_power(serial_bytes: bytes) -> InverterPower: - - inverter_type_bytes = struct.unpack('>H', serial_bytes[:2])[0] + """Get inverter power.""" - power = None + inverter_type_bytes = struct.unpack(">H", serial_bytes[:2])[0] - if inverter_type_bytes in [0x1011]: - power = InverterPower.P_100 - elif inverter_type_bytes in [0x1020]: - power = InverterPower.P_250 - elif inverter_type_bytes in [0x1021, 0x1121]: - power = InverterPower.P_300_350_400 - elif inverter_type_bytes in [0x1125]: - power = InverterPower.P_400 - elif inverter_type_bytes in [0x1040]: - power = InverterPower.P_500 - elif inverter_type_bytes in [0x1041, 0x1042, 0x1141]: - power = InverterPower.P_600_700_800 - elif inverter_type_bytes in [0x1060]: - power = InverterPower.P_1000 - elif inverter_type_bytes in [0x1061]: - power = InverterPower.P_1200_1500 - elif inverter_type_bytes in [0x1161]: - power = InverterPower.P_1000_1200_1500 - elif inverter_type_bytes in [0x1164]: - power = InverterPower.P_1600 - elif inverter_type_bytes in [0x1412]: - power = InverterPower.P_800W + power = power_mapping.get(inverter_type_bytes) if power is None: - raise ValueError(f"Unknown power: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!") - + raise ValueError( + f"Unknown power: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!" + ) + return power - - def get_hw_model_name(serial_number: str) -> str: + """Get hardware model name.""" - if(serial_number == "22069994886948"): + if serial_number == "22069994886948": serial_number = generate_inverter_serial_number(int(serial_number)) serial_bytes = bytes.fromhex(serial_number) @@ -176,40 +261,32 @@ def get_hw_model_name(serial_number: str) -> str: logger.error(e) return "Unknown" else: - inverter_model_name = inverter_series.value + "-" + inverter_power.value + "-" + inverter_type.value + inverter_model_name = ( + inverter_series.value + + "-" + + inverter_power.value + + "-" + + inverter_type.value + ) return inverter_model_name + def get_dtu_model_type(serial_bytes: bytes) -> DTUType: + """Get DTU model type.""" - dtu_type = None + dtu_type_bytes = struct.unpack(">H", serial_bytes[:2])[0] - dtu_type_bytes = struct.unpack('>H', serial_bytes[:2])[0] - - if (dtu_type_bytes in [0x10F7] or - dtu_type_bytes in [0x10FB, 0x4101, 0x10FC, 0x4120] or - dtu_type_bytes in [0x10F8, 0x4100, 0x10FD, 0x4121]): - dtu_type = DTUType.DTU_PRO - elif dtu_type_bytes in [0x10D3, 0x4110, 0x10D8, 0x4130, 0x4132, 0x4133, 0x10D9, 0x4111]: - dtu_type = DTUType.DTU_W100_LITE_S - elif dtu_type_bytes in [0x10D2]: - dtu_type = DTUType.DTU_G100 - elif dtu_type_bytes in [0x10D6, 0x10D7, 0x4131]: - dtu_type = DTUType.DTU_LITE - elif (dtu_type_bytes in [0x1124, 0x1125, 0x1403] or - dtu_type_bytes in [0x1144, 0x1143, 0x1145, 0x1412] or - dtu_type_bytes in [0x1164, 0x1165, 0x1166, 0x1167, 0x1222, 0x1422, 0x1423] or - dtu_type_bytes in [0x1361, 0x1362] or - dtu_type_bytes in [0x1381, 0x1382] or - dtu_type_bytes in [0x4143]): - dtu_type = DTUType.DTU_HMS_W + dtu_type = type_mapping.get(dtu_type_bytes) if dtu_type is None: raise ValueError(f"Unknown DTU: {serial_bytes[:2]}!") return dtu_type + def get_dtu_model_name(serial_number: str) -> str: - + """Get DTU model name.""" + serial_bytes = bytes.fromhex(serial_number) try: @@ -219,4 +296,3 @@ def get_dtu_model_name(serial_number: str) -> str: return "Unknown" else: return dtu_type.value - diff --git a/hoymiles_wifi/protobuf/compile_proto.sh b/hoymiles_wifi/protobuf/compile_proto.sh index 8ca6463..73932c0 100755 --- a/hoymiles_wifi/protobuf/compile_proto.sh +++ b/hoymiles_wifi/protobuf/compile_proto.sh @@ -2,5 +2,6 @@ for file in $(ls *.proto) do + protoc --pyi_out=. $file protoc --python_out=. $file done diff --git a/hoymiles_wifi/utils.py b/hoymiles_wifi/utils.py index c98bfe0..52c3dd3 100644 --- a/hoymiles_wifi/utils.py +++ b/hoymiles_wifi/utils.py @@ -1,11 +1,14 @@ -""""Utils for interacting with Hoymiles WiFi API.""" +"""Utils for interacting with Hoymiles WiFi API.""" from hoymiles_wifi.protobuf import ( GetConfig_pb2, SetConfig_pb2, ) + def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO): + """Initialize set config response with get config request.""" + set_config_res = SetConfig_pb2.SetConfigResDTO() set_config_res.lock_password = get_config_req.lock_password set_config_res.lock_time = get_config_req.lock_time @@ -27,8 +30,8 @@ def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO): set_config_res.access_model = get_config_req.access_model set_config_res.mac_0 = get_config_req.mac_0 set_config_res.mac_1 = get_config_req.mac_1 - set_config_res.mac_2 = get_config_req.mac_2 - set_config_res.mac_3 = get_config_req.mac_3 + set_config_res.mac_2 = get_config_req.mac_2 + set_config_res.mac_3 = get_config_req.mac_3 set_config_res.mac_4 = get_config_req.mac_4 set_config_res.mac_5 = get_config_req.mac_5 set_config_res.dhcp_switch = get_config_req.dhcp_switch @@ -56,7 +59,3 @@ def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO): set_config_res.dtu_ap_pass = get_config_req.dtu_ap_pass return set_config_res - - - - diff --git a/setup.py b/setup.py index 1b331df..1c37a12 100644 --- a/setup.py +++ b/setup.py @@ -1,19 +1,18 @@ +"""Setup for the hoymiles-wifi package.""" + from setuptools import setup setup( - name='hoymiles-wifi', - packages=['hoymiles_wifi', 'hoymiles_wifi.protobuf'], - install_requires=[ - 'protobuf', - 'crcmod' - ], - version='0.1.7', - description='A python library for interfacing with Hoymiles HMS-XXXXW-2T series of micro-inverters.', - author='suaveolent', + name="hoymiles-wifi", + packages=["hoymiles_wifi", "hoymiles_wifi.protobuf"], + install_requires=["protobuf", "crcmod"], + version="0.1.8", + description="A python library for interfacing with the Hoymiles DTUs and the HMS-XXXXW-2T series of micro-inverters using protobuf messages.", + author="suaveolent", include_package_data=True, entry_points={ - 'console_scripts': [ - 'hoymiles-wifi = hoymiles_wifi.__main__:run_main', - ], -} + "console_scripts": [ + "hoymiles-wifi = hoymiles_wifi.__main__:run_main", + ], + }, )