refactoring, version bump

This commit is contained in:
suaveolent 2024-03-14 15:18:11 +01:00
parent a581179240
commit a97c9a8fd2
10 changed files with 662 additions and 379 deletions

View File

@ -27,7 +27,7 @@
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.formatting.provider": "black",
"python.formatting.provider": "ruff",
"python.linting.mypyEnabled": true,
"python.linting.enabled": true
}

View File

@ -1,7 +1,6 @@
# hoymiles-wifi
This Python library facilitates communication with Hoymiles HMS microinverters, specifically targeting the HMS-XXXXW-2T series.
This Python library facilitates communication with Hoymiles DTUs and the HMS-XXXXW-2T HMS microinverters, utilizing protobuf messages.
**Disclaimer: This library is not affiliated with Hoymiles. It is an independent project developed to provide tools for interacting with Hoymiles HMS-XXXXW-2T series micro-inverters featuring integrated WiFi DTU. Any trademarks or product names mentioned are the property of their respective owners.**
@ -9,9 +8,8 @@ This Python library facilitates communication with Hoymiles HMS microinverters,
The library was successfully tested with:
- Hoymiles HMS-800W-2T
- Hoymiles DTU WLite
- Hoymiles HMS-800W-2T
- Hoymiles DTU WLite
## Installation
@ -29,7 +27,7 @@ You can integrate the library into your own project, or simply use it in the com
hoymiles-wifi [-h] --host HOST [--as-json] <command>
commands:
get-real-data-new,
get-real-data-new,
get-real-data-hms,
get-real-data,
get-config,
@ -52,18 +50,19 @@ The `--as-json` option is optional and allows formatting the output as JSON.
### Python code
```
from hoymiles_wifi.inverter import Inverter
from hoymiles_wifi.dtu import DTU
...
inverter = Inverter(<ip_address>)
response = await inverter.<command>
dtu = DTU(<ip_address>)
response = await dtu.<command>
if response:
print(f"Inverter Response: {response}")
print(f"DTU Response: {response}")
else:
print("Unable to get response!")
```
#### Available functions
- `async_get_real_data_new()`: Retrieve real-time data
- `async_get_real_data_hms()`: Retrieve real-time data
- `async_get_real_data()`: Retrieve real-time data
@ -82,15 +81,12 @@ else:
Please be aware of the following considerations:
- No DTU Implementation: This library
retrieves information directly from the internal DTU of Hoymiles Wifi
inverters.
- No DTU Implementation: This library retrieves information directly from the (internal) DTU of Hoymiles Wifi inverters.
## Caution
Use this library responsibly and be aware of potential risks. There are no guarantees provided, and any misuse or incorrect implementation may result in undesirable outcomes. Ensure that your inverter is not compromised during communication.
## Known Limitations
**Update Frequency:** The library may experience limitations in fetching updates, potentially around twice per minute. The inverter firmware may enforce a mandatory wait period of approximately 30 seconds between requests.
@ -100,5 +96,6 @@ Use this library responsibly and be aware of potential risks. There are no guara
## Attribution
A special thank you for the inspiration and codebase to:
- [DennisOSRM](https://github.com/DennisOSRM): [hms-mqtt-publisher](https://github.com/DennisOSRM/hms-mqtt-publisher)
- [henkwiedig](https://github.com/henkwiedig): [Hoymiles-DTU-Proto](https://github.com/henkwiedig/Hoymiles-DTU-Proto)
- [DennisOSRM](https://github.com/DennisOSRM): [hms-mqtt-publisher](https://github.com/DennisOSRM/hms-mqtt-publisher)
- [henkwiedig](https://github.com/henkwiedig): [Hoymiles-DTU-Proto](https://github.com/henkwiedig/Hoymiles-DTU-Proto)

View File

@ -1,5 +1,7 @@
"""Init file for hoymiles_wifi package."""
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)

View File

@ -1,31 +1,51 @@
"""Contains the main functionality of the hoymiles_wifi package."""
from __future__ import annotations
import argparse
import asyncio
from dataclasses import dataclass, asdict
import json
from google.protobuf.message import Message
from dataclasses import asdict, dataclass
from google.protobuf.json_format import MessageToJson
from hoymiles_wifi.inverter import Inverter
from google.protobuf.message import Message
from hoymiles_wifi.const import DTU_FIRMWARE_URL_00_01_11, MAX_POWER_LIMIT
from hoymiles_wifi.dtu import DTU
from hoymiles_wifi.hoymiles import (
generate_version_string,
generate_sw_version_string,
generate_dtu_version_string,
get_hw_model_name,
get_dtu_model_name,
generate_sw_version_string,
generate_version_string,
)
from hoymiles_wifi.protobuf import (
AppGetHistPower_pb2,
APPHeartbeatPB_pb2,
APPInfomationData_pb2,
CommandPB_pb2,
GetConfig_pb2,
InfomationData_pb2,
NetworkInfo_pb2,
RealData_pb2,
RealDataHMS_pb2,
RealDataNew_pb2,
)
from hoymiles_wifi.const import (
DTU_FIRMWARE_URL_00_01_11
)
RED = "\033[91m"
END = "\033[0m"
@dataclass
class VersionInfo:
"""Represents version information for the hoymiles_wifi package."""
dtu_hw_version: str
dtu_sw_version: str
inverter_hw_version: str
inverter_sw_version: str
def __str__(self):
def __str__(self: VersionInfo) -> str:
"""Return a string representation of the VersionInfo object."""
return (
f'dtu_hw_version: "{self.dtu_hw_version}"\n'
f'dtu_sw_version: "{self.dtu_sw_version}"\n'
@ -33,91 +53,132 @@ class VersionInfo:
f'inverter_sw_version: "{self.inverter_sw_version}"\n'
)
def to_dict(self):
def to_dict(self: VersionInfo) -> dict:
"""Convert the VersionInfo object to a dictionary."""
return asdict(self)
# Inverter commands
async def async_get_real_data_new(inverter):
return await inverter.async_get_real_data_new()
async def async_get_real_data_new(
dtu: DTU,
) -> RealDataNew_pb2.RealDataNewResDTO | None:
"""Get real data from the inverter asynchronously."""
async def async_get_real_data_hms(inverter):
return await inverter.async_get_real_data_hms()
return await dtu.async_get_real_data_new()
async def async_get_real_data(inverter):
return await inverter.async_get_real_data()
async def async_get_config(inverter):
return await inverter.async_get_config()
async def async_get_real_data_hms(
dtu: DTU,
) -> RealDataHMS_pb2.RealDataHMSResDTO | None:
"""Get real data from the inverter asynchronously."""
async def async_network_info(inverter):
return await inverter.async_network_info()
return await dtu.async_get_real_data_hms()
async def async_app_information_data(inverter):
return await inverter.async_app_information_data()
async def async_app_get_hist_power(inverter):
return await inverter.async_app_get_hist_power()
async def async_get_real_data(dtu: DTU) -> RealData_pb2.RealDataResDTO | None:
"""Get real data from the inverter asynchronously."""
async def async_set_power_limit(inverter):
return await dtu.async_get_real_data()
RED = '\033[91m'
END = '\033[0m'
print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END)
print(RED + "!!! Danger zone! This will change the power limit of the inverter. !!!" + END)
print(RED + "!!! Please be careful and make sure you know what you are doing. !!!" + END)
print(RED + "!!! Only proceed if you know what you are doing. !!!" + END)
print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END)
print("")
async def async_get_config(dtu: DTU) -> GetConfig_pb2.GetConfigResDTO | None:
"""Get the config from the inverter asynchronously."""
return await dtu.async_get_config()
async def async_network_info(
dtu: DTU,
) -> NetworkInfo_pb2.NetworkInfoResDTO | None:
"""Get network information from the inverter asynchronously."""
return await dtu.async_network_info()
async def async_app_information_data(
dtu: DTU,
) -> APPInfomationData_pb2.AppInfomationDataResDTO | None:
"""Get application information data from the inverter asynchronously."""
return await dtu.async_app_information_data()
async def async_app_get_hist_power(
dtu: DTU,
) -> AppGetHistPower_pb2.AppGetHistPowerResDTO:
"""Get historical power data from the inverter asynchronously."""
return await dtu.async_app_get_hist_power()
async def async_set_power_limit(
dtu: DTU,
) -> CommandPB_pb2.CommandResDTO | None:
"""Set the power limit of the inverter asynchronously."""
print( # noqa: T201
RED
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ "!!! Danger zone! This will change the power limit of the dtu. !!!\n"
+ "!!! Please be careful and make sure you know what you are doing. !!!\n"
+ "!!! Only proceed if you know what you are doing. !!!\n"
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ END,
)
cont = input("Do you want to continue? (y/n): ")
if(cont != 'y'):
return
if cont != "y":
return None
power_limit = int(input("Enter the new power limit (0-100): "))
if(power_limit < 0 or power_limit > 100):
print("Error. Invalid power limit!")
return
print(f"Setting power limit to {power_limit}%")
cont = input("Are you sure? (y/n): ")
if power_limit < 0 or power_limit > MAX_POWER_LIMIT:
print("Error. Invalid power limit!") # noqa: T201
return None
if(cont != 'y'):
return
print(f"Setting power limit to {power_limit}%") # noqa: T201
cont = input("Are you sure? (y/n): ")
return await inverter.async_set_power_limit(power_limit)
if cont != "y":
return None
return await dtu.async_set_power_limit(power_limit)
async def async_set_wifi(inverter):
async def async_set_wifi(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Set the wifi SSID and password of the inverter asynchronously."""
wifi_ssid = input("Enter the new wifi SSID: ").strip()
wifi_password = input("Enter the new wifi password: ").strip()
print(f'Setting wifi to "{wifi_ssid}"')
print(f'Setting wifi password to "{wifi_password}"')
print(f'Setting wifi to "{wifi_ssid}"') # noqa: T201
print(f'Setting wifi password to "{wifi_password}"') # noqa: T201
cont = input("Are you sure? (y/n): ")
if(cont != 'y'):
return
return await inverter.async_set_wifi(wifi_ssid, wifi_password)
if cont != "y":
return None
return await dtu.async_set_wifi(wifi_ssid, wifi_password)
async def async_firmware_update(inverter):
RED = '\033[91m'
END = '\033[0m'
print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END)
print(RED + "!!! Danger zone! This will update the firmeware of the DTU. !!!" + END)
print(RED + "!!! Please be careful and make sure you know what you are doing. !!!" + END)
print(RED + "!!! Only proceed if you know what you are doing. !!!" + END)
print(RED + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + END)
print("")
async def async_firmware_update(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Update the firmware of the DTU asynchronously."""
print( # noqa: T201
RED
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ "!!! Danger zone! This will update the firmeware of the DTU. !!!\n"
+ "!!! Please be careful and make sure you know what you are doing. !!!\n"
+ "!!! Only proceed if you know what you are doing. !!!\n"
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ END,
)
cont = input("Do you want to continue? (y/n): ")
if(cont != 'y'):
return
print("Please select a firmware version:")
print("1.) V00.01.11")
print("2.) Custom URL")
if cont != "y":
return None
print("Please select a firmware version:") # noqa: T201
print("1.) V00.01.11") # noqa: T201
print("2.) Custom URL") # noqa: T201
while True:
selection = input("Enter your selection (1 or 2): ")
@ -125,74 +186,106 @@ async def async_firmware_update(inverter):
if selection == "1":
url = DTU_FIRMWARE_URL_00_01_11
break
elif selection == "2":
if selection == "2":
url = input("Enter the custom URL: ").strip()
break
else:
print("Invalid selection. Please enter 1 or 2.")
print()
print(f'Firmware update URL: "{url}"')
print()
print("Invalid selection. Please enter 1 or 2.") # noqa: T201
print() # noqa: T201
print(f'Firmware update URL: "{url}"') # noqa: T201
print() # noqa: T201
cont = input("Do you want to continue? (y/n): ")
if(cont != 'y'):
return
return await inverter.async_update_dtu_firmware()
if cont != "y":
return None
async def async_restart(inverter):
return await dtu.async_update_dtu_firmware()
cont = input("Do you want to restart the device? (y/n): ")
if(cont != 'y'):
return
return await inverter.async_restart()
async def async_turn_off(inverter):
cont = input("Do you want to turn *OFF* the device? (y/n): ")
if(cont != 'y'):
return
return await inverter.async_turn_off()
async def async_restart_dtu(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Restart the device asynchronously."""
async def async_turn_on(inverter):
cont = input("Do you want to turn *ON* the device? (y/n): ")
if(cont != 'y'):
return
return await inverter.async_turn_on()
cont = input("Do you want to restart the DTU? (y/n): ")
if cont != "y":
return None
async def async_get_information_data(inverter):
return await inverter.async_get_information_data()
return await dtu.async_restart_dtu()
async def async_get_version_info(inverter):
response = await async_app_information_data(inverter)
async def async_turn_on_dtu(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Turn on the dtu asynchronously."""
cont = input("Do you want to turn *ON* the DTU? (y/n): ")
if cont != "y":
return None
return await dtu.async_turn_on_dtu()
async def async_turn_off_dtu(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Turn off the dtu asynchronously."""
cont = input("Do you want to turn *OFF* the DTU? (y/n): ")
if cont != "y":
return None
return await dtu.async_turn_off_dtu()
async def async_get_information_data(
dtu: DTU,
) -> InfomationData_pb2.InfomationDataResDTO:
"""Get information data from the dtu asynchronously."""
return await dtu.async_get_information_data()
async def async_get_version_info(dtu: DTU) -> VersionInfo | None:
"""Get version information from the dtu asynchronously."""
response = await async_app_information_data(dtu)
if not response:
return None
return VersionInfo(
dtu_hw_version="H" + generate_dtu_version_string(response.dtu_info.dtu_hw_version),
dtu_sw_version="V" + generate_dtu_version_string(response.dtu_info.dtu_sw_version),
inverter_hw_version="H" + generate_version_string(response.pv_info[0].pv_hw_version),
inverter_sw_version="V" + generate_sw_version_string(response.pv_info[0].pv_sw_version),
dtu_hw_version="H"
+ generate_dtu_version_string(response.dtu_info.dtu_hw_version),
dtu_sw_version="V"
+ generate_dtu_version_string(response.dtu_info.dtu_sw_version),
inverter_hw_version="H"
+ generate_version_string(response.pv_info[0].pv_hw_version),
inverter_sw_version="V"
+ generate_sw_version_string(response.pv_info[0].pv_sw_version),
)
async def async_heatbeat(inverter):
return await inverter.async_heartbeat()
def print_invalid_command(command):
print(f"Invalid command: {command}")
async def async_heatbeat(dtu: DTU) -> APPHeartbeatPB_pb2.APPHeartbeatResDTO | None:
"""Request a heartbeat from the dtu asynchronously."""
async def main():
return await dtu.async_heartbeat()
def print_invalid_command(command: str) -> None:
"""Print an invalid command message."""
print(f"Invalid command: {command}") # noqa: T201
async def main() -> None:
"""Execute the main function for the hoymiles_wifi package."""
parser = argparse.ArgumentParser(description="Hoymiles HMS Monitoring")
parser.add_argument(
"--host", type=str, required=True, help="IP address or hostname of the inverter"
)
parser.add_argument('--as-json', action='store_true', default=False,
help='Format the output as JSON')
"--host", type=str, required=True, help="IP address or hostname of the DTU"
)
parser.add_argument(
"--as-json",
action="store_true",
default=False,
help="Format the output as JSON",
)
parser.add_argument(
"command",
type=str,
@ -217,46 +310,52 @@ async def main():
help="Command to execute",
)
args = parser.parse_args()
inverter = Inverter(args.host)
dtu = DTU(args.host)
# Execute the specified command using a switch case
switch = {
'get-real-data-new': async_get_real_data_new,
'get-real-data-hms': async_get_real_data_hms,
'get-real-data': async_get_real_data,
'get-config': async_get_config,
'network-info': async_network_info,
'app-information-data': async_app_information_data,
'app-get-hist-power': async_app_get_hist_power,
'set-power-limit': async_set_power_limit,
'set-wifi': async_set_wifi,
'firmware-update': async_firmware_update,
'restart': async_restart,
'turn-on': async_turn_on,
'turn-off': async_turn_off,
'get-information-data': async_get_information_data,
'get-version-info': async_get_version_info,
'heartbeat': async_heatbeat,
"get-real-data-new": async_get_real_data_new,
"get-real-data-hms": async_get_real_data_hms,
"get-real-data": async_get_real_data,
"get-config": async_get_config,
"network-info": async_network_info,
"app-information-data": async_app_information_data,
"app-get-hist-power": async_app_get_hist_power,
"set-power-limit": async_set_power_limit,
"set-wifi": async_set_wifi,
"firmware-update": async_firmware_update,
"restart": async_restart_dtu,
"turn-on": async_turn_on_dtu,
"turn-off": async_turn_off_dtu,
"get-information-data": async_get_information_data,
"get-version-info": async_get_version_info,
"heartbeat": async_heatbeat,
}
command_func = switch.get(args.command, print_invalid_command)
response = await command_func(inverter)
response = await command_func(dtu)
if response:
if args.as_json:
if isinstance(response, Message):
print(MessageToJson(response))
if isinstance(response, Message):
print(MessageToJson(response)) # noqa: T201
else:
print(json.dumps(asdict(response), indent=4))
print(json.dumps(asdict(response), indent=4)) # noqa: T201
else:
print(f"{args.command.capitalize()} Response: \n{response}")
print(f"{args.command.capitalize()} Response: \n{response}") # noqa: T201
else:
print(f"No response or unable to retrieve response for {args.command.replace('_', ' ')}")
print( # noqa: T201
f"No response or unable to retrieve response for "
f"{args.command.replace('_', ' ')}",
)
def run_main():
def run_main() -> None:
"""Run the main function for the hoymiles_wifi package."""
asyncio.run(main())
if __name__ == "__main__":
run_main()
run_main()

View File

@ -1,35 +1,37 @@
INVERTER_PORT = 10081
"""Constants for the Hoymiles WiFi integration."""
DTU_PORT = 10081
# App -> DTU start with 0xa3, responses start 0xa2
CMD_HEADER = b'HM'
CMD_APP_INFO_DATA_RES_DTO = b'\xa3\x01'
CMD_HB_RES_DTO = b'\xa3\x02'
CMD_REAL_DATA_RES_DTO = b'\xa3\x03'
CMD_W_INFO_RES_DTO = b'\xa3\x04'
CMD_COMMAND_RES_DTO = b'\xa3\x05'
CMD_COMMAND_STATUS_RES_DTO = b'\xa3\x06'
CMD_DEV_CONFIG_FETCH_RES_DTO = b'\xa3\x07'
CMD_DEV_CONFIG_PUT_RES_DTO = b'\xa3\x08'
CMD_GET_CONFIG = b'\xa3\x09'
CMD_SET_CONFIG = b'\xa3\x10'
CMD_REAL_RES_DTO = b'\xa3\x11'
CMD_GPST_RES_DTO = b'\xa3\x12'
CMD_AUTO_SEARCH = b'\xa3\x13'
CMD_NETWORK_INFO_RES = b'\xa3\x14'
CMD_APP_GET_HIST_POWER_RES = b'\xa3\x15'
CMD_APP_GET_HIST_ED_RES = b'\xa3\x16'
CMD_HB_RES_DTO_ALT = b'\x83\x01'
CMD_REGISTER_RES_DTO = b'\x83\x02'
CMD_STORAGE_DATA_RES = b'\x83\x03'
CMD_COMMAND_RES_DTO_2 = b'\x83\x05'
CMD_COMMAND_STATUS_RES_DTO_2 = b'\x83\x06'
CMD_DEV_CONFIG_FETCH_RES_DTO_2 = b'\x83\x07'
CMD_DEV_CONFIG_PUT_RES_DTO_2 = b'\x83\x08'
CMD_GET_CONFIG_RES = b'\xdb\x08'
CMD_SET_CONFIG_RES = b'\xdb\x07'
CMD_HEADER = b"HM"
CMD_APP_INFO_DATA_RES_DTO = b"\xa3\x01"
CMD_HB_RES_DTO = b"\xa3\x02"
CMD_REAL_DATA_RES_DTO = b"\xa3\x03"
CMD_W_INFO_RES_DTO = b"\xa3\x04"
CMD_COMMAND_RES_DTO = b"\xa3\x05"
CMD_COMMAND_STATUS_RES_DTO = b"\xa3\x06"
CMD_DEV_CONFIG_FETCH_RES_DTO = b"\xa3\x07"
CMD_DEV_CONFIG_PUT_RES_DTO = b"\xa3\x08"
CMD_GET_CONFIG = b"\xa3\x09"
CMD_SET_CONFIG = b"\xa3\x10"
CMD_REAL_RES_DTO = b"\xa3\x11"
CMD_GPST_RES_DTO = b"\xa3\x12"
CMD_AUTO_SEARCH = b"\xa3\x13"
CMD_NETWORK_INFO_RES = b"\xa3\x14"
CMD_APP_GET_HIST_POWER_RES = b"\xa3\x15"
CMD_APP_GET_HIST_ED_RES = b"\xa3\x16"
CMD_HB_RES_DTO_ALT = b"\x83\x01"
CMD_REGISTER_RES_DTO = b"\x83\x02"
CMD_STORAGE_DATA_RES = b"\x83\x03"
CMD_COMMAND_RES_DTO_2 = b"\x83\x05"
CMD_COMMAND_STATUS_RES_DTO_2 = b"\x83\x06"
CMD_DEV_CONFIG_FETCH_RES_DTO_2 = b"\x83\x07"
CMD_DEV_CONFIG_PUT_RES_DTO_2 = b"\x83\x08"
CMD_GET_CONFIG_RES = b"\xdb\x08"
CMD_SET_CONFIG_RES = b"\xdb\x07"
CMD_CLOUD_INFO_DATA_RES_DTO = b'\x23\x01'
CMD_CLOUD_COMMAND_RES_DTO = b'\x23\x05'
CMD_CLOUD_INFO_DATA_RES_DTO = b"\x23\x01"
CMD_CLOUD_COMMAND_RES_DTO = b"\x23\x05"
CMD_ACTION_MICRO_DEFAULT = 0
CMD_ACTION_DTU_REBOOT = 1
@ -96,15 +98,19 @@ CMD_ACTION_INV_UPGRADE = 4112
CMD_ACTION_BMS_UPGRADE = 4112
DEV_DTU = 1
DEV_DTU = 1
DEV_REPEATER = 2
DEV_MICRO = 3
DEV_MODEL = 4
DEV_METER = 5
DEV_MODEL = 4
DEV_METER = 5
DEV_INV = 6
DEV_RSD = 7
DEV_OP = 8
DEV_GATEWAY = 9
DEV_BMS = 10
DTU_FIRMWARE_URL_00_01_11 = "http://fwupdate.hoymiles.com/cfs/bin/2311/06/,1488725943932555264.bin"
DTU_FIRMWARE_URL_00_01_11 = (
"http://fwupdate.hoymiles.com/cfs/bin/2311/06/,1488725943932555264.bin"
)
MAX_POWER_LIMIT = 100

View File

@ -1,128 +1,185 @@
"""DTU communication implementation for Hoymiles WiFi."""
from __future__ import annotations
import asyncio
import struct
from typing import Any
from crcmod import mkCrcFun
from datetime import datetime
import time
from datetime import datetime
from enum import Enum
from typing import Any
from crcmod import mkCrcFun
from hoymiles_wifi import logger
from hoymiles_wifi.utils import initialize_set_config
from hoymiles_wifi.const import (
CMD_ACTION_DTU_REBOOT,
CMD_ACTION_DTU_UPGRADE,
CMD_ACTION_LIMIT_POWER,
CMD_ACTION_MI_SHUTDOWN,
CMD_ACTION_MI_START,
CMD_APP_GET_HIST_POWER_RES,
CMD_APP_INFO_DATA_RES_DTO,
CMD_CLOUD_COMMAND_RES_DTO,
CMD_COMMAND_RES_DTO,
CMD_GET_CONFIG,
CMD_HB_RES_DTO,
CMD_HEADER,
CMD_NETWORK_INFO_RES,
CMD_REAL_DATA_RES_DTO,
CMD_REAL_RES_DTO,
CMD_SET_CONFIG,
DEV_DTU,
DTU_FIRMWARE_URL_00_01_11,
DTU_PORT,
)
from hoymiles_wifi.protobuf import (
AppGetHistPower_pb2,
APPHeartbeatPB_pb2,
APPInfomationData_pb2,
AppGetHistPower_pb2,
CommandPB_pb2,
InfomationData_pb2,
GetConfig_pb2,
RealData_pb2,
RealDataNew_pb2,
RealDataHMS_pb2,
InfomationData_pb2,
NetworkInfo_pb2,
RealData_pb2,
RealDataHMS_pb2,
RealDataNew_pb2,
SetConfig_pb2,
)
from hoymiles_wifi.const import (
INVERTER_PORT,
CMD_HEADER,
CMD_GET_CONFIG,
CMD_REAL_RES_DTO,
CMD_REAL_DATA_RES_DTO,
CMD_NETWORK_INFO_RES,
CMD_APP_INFO_DATA_RES_DTO,
CMD_APP_GET_HIST_POWER_RES,
CMD_ACTION_LIMIT_POWER,
CMD_COMMAND_RES_DTO,
CMD_ACTION_DTU_UPGRADE,
CMD_SET_CONFIG,
CMD_CLOUD_COMMAND_RES_DTO,
CMD_ACTION_DTU_REBOOT,
CMD_ACTION_MI_START,
CMD_ACTION_MI_SHUTDOWN,
DTU_FIRMWARE_URL_00_01_11,
CMD_HB_RES_DTO,
DEV_DTU,
)
from hoymiles_wifi.utils import initialize_set_config
class NetmodeSelect:
class NetmodeSelect(Enum):
"""Network mode selection."""
WIFI = 1
SIM = 2
LAN = 3
class NetworkState:
class NetworkState(Enum):
"""Network state."""
Unknown = 0
Online = 1
Offline = 2
class Inverter:
class DTU:
"""DTU class."""
def __init__(self, host: str):
"""Initialize DTU class."""
self.host = host
self.state = NetworkState.Unknown
self.sequence = 0
self.mutex = asyncio.Lock()
def get_state(self) -> NetworkState:
"""Get DTU state."""
return self.state
def set_state(self, new_state: NetworkState):
"""Set DTU state."""
if self.state != new_state:
self.state = new_state
logger.debug(f"Inverter is {new_state}")
logger.debug(f"DTU is {new_state}")
async def async_get_real_data_hms(self) -> RealDataHMS_pb2.HMSStateResponse | None:
"""Get real data HMS."""
request = RealDataHMS_pb2.HMSRealDataResDTO()
command = CMD_REAL_DATA_RES_DTO
return await self.async_send_request(command, request, RealDataHMS_pb2.HMSStateResponse)
return await self.async_send_request(
command, request, RealDataHMS_pb2.HMSStateResponse
)
async def async_get_real_data(self) -> RealData_pb2.RealDataResDTO | None:
"""Get real data."""
request = RealData_pb2.RealDataResDTO()
command = CMD_REAL_DATA_RES_DTO
return await self.async_send_request(command, request, RealData_pb2.RealDataReqDTO)
return await self.async_send_request(
command, request, RealData_pb2.RealDataReqDTO
)
async def async_get_real_data_new(self) -> RealDataNew_pb2.RealDataNewResDTO | None:
"""Get real data new."""
request = RealDataNew_pb2.RealDataNewResDTO()
request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = 28800
request.time = int(time.time())
command = CMD_REAL_RES_DTO
return await self.async_send_request(command, request, RealDataNew_pb2.RealDataNewReqDTO)
return await self.async_send_request(
command, request, RealDataNew_pb2.RealDataNewReqDTO
)
async def async_get_config(self) -> GetConfig_pb2.GetConfigResDTO | None:
"""Get config."""
request = GetConfig_pb2.GetConfigResDTO()
request.offset = 28800
request.time = int(time.time()) - 60
command = CMD_GET_CONFIG
return await self.async_send_request(command, request, GetConfig_pb2.GetConfigReqDTO)
return await self.async_send_request(
command,
request,
GetConfig_pb2.GetConfigReqDTO,
)
async def async_network_info(self) -> NetworkInfo_pb2.NetworkInfoResDTO | None:
"""Get network info."""
request = NetworkInfo_pb2.NetworkInfoResDTO()
request.offset = 28800
request.time = int(time.time())
command = CMD_NETWORK_INFO_RES
return await self.async_send_request(command, request, NetworkInfo_pb2.NetworkInfoReqDTO)
async def async_app_information_data(self) -> APPInfomationData_pb2.APPInfoDataResDTO:
return await self.async_send_request(
command, request, NetworkInfo_pb2.NetworkInfoReqDTO
)
async def async_app_information_data(
self,
) -> APPInfomationData_pb2.APPInfoDataResDTO:
"""Get app information data."""
request = APPInfomationData_pb2.APPInfoDataResDTO()
request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = 28800
request.time = int(time.time())
command = CMD_APP_INFO_DATA_RES_DTO
return await self.async_send_request(command, request, APPInfomationData_pb2.APPInfoDataReqDTO)
async def async_app_get_hist_power(self) -> AppGetHistPower_pb2.AppGetHistPowerResDTO | None:
return await self.async_send_request(
command, request, APPInfomationData_pb2.APPInfoDataReqDTO
)
async def async_app_get_hist_power(
self,
) -> AppGetHistPower_pb2.AppGetHistPowerResDTO | None:
"""Get historical power."""
request = AppGetHistPower_pb2.AppGetHistPowerResDTO()
request.offset = 28800
request.requested_time = int(time.time())
command = CMD_APP_GET_HIST_POWER_RES
return await self.async_send_request(command, request, AppGetHistPower_pb2.AppGetHistPowerReqDTO)
async def async_set_power_limit(self, power_limit: int) -> CommandPB_pb2.CommandResDTO | None:
return await self.async_send_request(
command,
request,
AppGetHistPower_pb2.AppGetHistPowerReqDTO,
)
if(power_limit < 0 or power_limit > 100):
async def async_set_power_limit(
self,
power_limit: int,
) -> CommandPB_pb2.CommandResDTO | None:
"""Set power limit."""
if power_limit < 0 or power_limit > 100:
logger.error("Error. Invalid power limit!")
return
@ -133,46 +190,58 @@ class Inverter:
request.action = CMD_ACTION_LIMIT_POWER
request.package_nub = 1
request.tid = int(time.time())
request.data = f'A:{power_limit},B:0,C:0\r'.encode('utf-8')
request.data = f"A:{power_limit},B:0,C:0\r".encode()
command = CMD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
async def async_set_wifi(self, ssid: str, password: str) -> SetConfig_pb2.SetConfigResDTO | None:
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_set_wifi(
self, ssid: str, password: str
) -> SetConfig_pb2.SetConfigResDTO | None:
"""Set wifi."""
get_config_req = await self.async_get_config()
if(get_config_req is None):
if get_config_req is None:
logger.error("Failed to get config")
return
return None
request = initialize_set_config(get_config_req)
request.time = int(time.time())
request.time = int(time.time())
request.offset = 28800
request.app_page = 1
request.netmode_select = NetmodeSelect.WIFI
request.wifi_ssid = ssid.encode('utf-8')
request.wifi_password = password.encode('utf-8')
request.wifi_ssid = ssid.encode("utf-8")
request.wifi_password = password.encode("utf-8")
command = CMD_SET_CONFIG
return await self.async_send_request(command, request, SetConfig_pb2.SetConfigReqDTO)
return await self.async_send_request(
command, request, SetConfig_pb2.SetConfigReqDTO
)
async def async_update_dtu_firmware(self, firmware_url: str = DTU_FIRMWARE_URL_00_01_11) -> CommandPB_pb2.CommandResDTO | None:
async def async_update_dtu_firmware(
self,
firmware_url: str = DTU_FIRMWARE_URL_00_01_11,
) -> CommandPB_pb2.CommandResDTO | None:
"""Update DTU firmware."""
request = CommandPB_pb2.CommandResDTO()
request.action = CMD_ACTION_DTU_UPGRADE
request.package_nub = 1
request.tid = int(time.time())
request.data = (firmware_url + '\r').encode('utf-8')
request.data = (firmware_url + "\r").encode("utf-8")
command = CMD_CLOUD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_restart(self) -> CommandPB_pb2.CommandResDTO | None:
async def async_restart_dtu(self) -> CommandPB_pb2.CommandResDTO | None:
"""Restart DTU."""
request = CommandPB_pb2.CommandResDTO()
request.action = CMD_ACTION_DTU_REBOOT
@ -180,10 +249,12 @@ class Inverter:
request.tid = int(time.time())
command = CMD_CLOUD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_turn_on(self) -> CommandPB_pb2.CommandResDTO | None:
async def async_turn_on_dtu(self) -> CommandPB_pb2.CommandResDTO | None:
"""Turn on DTU."""
request = CommandPB_pb2.CommandResDTO()
request.action = CMD_ACTION_MI_START
@ -192,9 +263,12 @@ class Inverter:
request.tid = int(time.time())
command = CMD_CLOUD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
async def async_turn_off(self) -> CommandPB_pb2.CommandResDTO | None:
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_turn_off_dtu(self) -> CommandPB_pb2.CommandResDTO | None:
"""Turn off DTU."""
request = CommandPB_pb2.CommandResDTO()
request.action = CMD_ACTION_MI_SHUTDOWN
@ -203,41 +277,67 @@ class Inverter:
request.tid = int(time.time())
command = CMD_CLOUD_COMMAND_RES_DTO
return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)
async def async_get_information_data(self) -> InfomationData_pb2.InfoDataResDTO | None:
return await self.async_send_request(
command, request, CommandPB_pb2.CommandReqDTO
)
async def async_get_information_data(
self,
) -> InfomationData_pb2.InfoDataResDTO | None:
"""Get information data."""
request = InfomationData_pb2.InfoDataResDTO()
request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = 28800
request.time = int(time.time())
command = CMD_APP_INFO_DATA_RES_DTO
return await self.async_send_request(command, request, InfomationData_pb2.InfoDataReqDTO)
return await self.async_send_request(
command, request, InfomationData_pb2.InfoDataReqDTO
)
async def async_heartbeat(self) -> APPHeartbeatPB_pb2.HBReqDTO | None:
"""Request heartbeat."""
request = APPHeartbeatPB_pb2.HBResDTO()
request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = 28800
request.time = int(time.time())
command = CMD_HB_RES_DTO
return await self.async_send_request(command, request, APPHeartbeatPB_pb2.HBReqDTO)
return await self.async_send_request(
command, request, APPHeartbeatPB_pb2.HBReqDTO
)
async def async_send_request(
self,
command: bytes,
request: Any,
response_type: Any,
dtu_port: int = DTU_PORT,
):
"""Send request to DTU."""
async def async_send_request(self, command: bytes, request: Any, response_type: Any, inverter_port: int = INVERTER_PORT):
self.sequence = (self.sequence + 1) & 0xFFFF
request_as_bytes = request.SerializeToString()
crc16 = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(request_as_bytes)
crc16 = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(
request_as_bytes
)
length = len(request_as_bytes) + 10
# compose request message
header = CMD_HEADER + command
message = header + struct.pack('>HHH', self.sequence, crc16, length) + request_as_bytes
message = (
header
+ struct.pack(">HHH", self.sequence, crc16, length)
+ request_as_bytes
)
address = (self.host, inverter_port)
address = (self.host, dtu_port)
async with self.mutex:
try:
@ -261,18 +361,22 @@ class Inverter:
try:
if len(buf) < 10:
raise ValueError("Buffer is too short for unpacking")
crc16_target, read_length = struct.unpack('>HH', buf[6:10])
if(len(buf) != read_length):
crc16_target, read_length = struct.unpack(">HH", buf[6:10])
if len(buf) != read_length:
raise ValueError("Buffer is incomplete")
response_as_bytes = buf[10:read_length]
crc16_response = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(response_as_bytes)
crc16_response = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(
response_as_bytes
)
if crc16_response != crc16_target:
logger.debug(f"CRC16 mismatch: {hex(crc16_response)} != {hex(crc16_target)}")
logger.debug(
f"CRC16 mismatch: {hex(crc16_response)} != {hex(crc16_target)}"
)
raise ValueError("CRC16 mismatch")
parsed = response_type.FromString(response_as_bytes)

View File

@ -1,22 +1,31 @@
""""Hoymiles quirks for inverters and DTU"""
"""Hoymiles quirks for inverters and DTU."""
from enum import Enum
import struct
from enum import Enum
from hoymiles_wifi import logger
class InverterType(Enum):
"""Inverter type."""
ONE = "1T"
TWO = "2T"
FOUR = "4T"
SIX = "6T"
class InverterSeries(Enum):
"""Inverter series."""
HM = "HM"
HMS = "HMS"
HMT = "HMT"
class InverterPower(Enum):
"""Inverter power."""
P_100 = "100"
P_250 = "250"
P_300_350_400 = "300/350/400"
@ -31,7 +40,27 @@ class InverterPower(Enum):
P_2000 = "2000"
power_mapping = {
0x1011: InverterPower.P_100,
0x1020: InverterPower.P_250,
0x1021: InverterPower.P_300_350_400,
0x1121: InverterPower.P_300_350_400,
0x1125: InverterPower.P_400,
0x1040: InverterPower.P_500,
0x1041: InverterPower.P_600_700_800,
0x1042: InverterPower.P_600_700_800,
0x1141: InverterPower.P_600_700_800,
0x1060: InverterPower.P_1000,
0x1061: InverterPower.P_1200_1500,
0x1161: InverterPower.P_1000_1200_1500,
0x1164: InverterPower.P_1600,
0x1412: InverterPower.P_800W,
}
class DTUType(Enum):
"""DTU type."""
DTU_G100 = "DTU-G100"
DTU_W100 = "DTU-W100"
DTU_LITE_S = "DTU-Lite-S"
@ -43,43 +72,114 @@ class DTUType(Enum):
DTU_W_LITE = "DTU-WLite"
def format_number(number) -> str:
return "{:02d}".format(number)
type_mapping = {
0x10F7: DTUType.DTU_PRO,
0x10FB: DTUType.DTU_PRO,
0x4101: DTUType.DTU_PRO,
0x10FC: DTUType.DTU_PRO,
0x4120: DTUType.DTU_PRO,
0x10F8: DTUType.DTU_PRO,
0x4100: DTUType.DTU_PRO,
0x10FD: DTUType.DTU_PRO,
0x4121: DTUType.DTU_PRO,
0x10D3: DTUType.DTU_W100_LITE_S,
0x4110: DTUType.DTU_W100_LITE_S,
0x10D8: DTUType.DTU_W100_LITE_S,
0x4130: DTUType.DTU_W100_LITE_S,
0x4132: DTUType.DTU_W100_LITE_S,
0x4133: DTUType.DTU_W100_LITE_S,
0x10D9: DTUType.DTU_W100_LITE_S,
0x4111: DTUType.DTU_W100_LITE_S,
0x10D2: DTUType.DTU_G100,
0x10D6: DTUType.DTU_LITE,
0x10D7: DTUType.DTU_LITE,
0x4131: DTUType.DTU_LITE,
0x1124: DTUType.DTU_HMS_W,
0x1125: DTUType.DTU_HMS_W,
0x1403: DTUType.DTU_HMS_W,
0x1144: DTUType.DTU_HMS_W,
0x1143: DTUType.DTU_HMS_W,
0x1145: DTUType.DTU_HMS_W,
0x1412: DTUType.DTU_HMS_W,
0x1164: DTUType.DTU_HMS_W,
0x1165: DTUType.DTU_HMS_W,
0x1166: DTUType.DTU_HMS_W,
0x1167: DTUType.DTU_HMS_W,
0x1222: DTUType.DTU_HMS_W,
0x1422: DTUType.DTU_HMS_W,
0x1423: DTUType.DTU_HMS_W,
0x1361: DTUType.DTU_HMS_W,
0x1362: DTUType.DTU_HMS_W,
0x1381: DTUType.DTU_HMS_W,
0x1382: DTUType.DTU_HMS_W,
0x4143: DTUType.DTU_HMS_W,
}
def format_number(number: int) -> str:
"""Format number to two digits."""
return f"{number:02d}"
def generate_version_string(version_number: int) -> str:
version_string = format_number(version_number // 2048) + "." + format_number((version_number // 64) % 32) + "." + format_number(version_number % 64)
"""Generate version string."""
version_string = (
format_number(version_number // 2048)
+ "."
+ format_number((version_number // 64) % 32)
+ "."
+ format_number(version_number % 64)
)
return version_string
def generate_sw_version_string(version_number: int) -> str:
"""Generate software version string."""
version_number2 = version_number // 10000
version_number3 = (version_number - (version_number2 * 10000)) // 100
version_number4 = (version_number - (version_number2 * 10000)) - (version_number3 * 100)
version_number4 = (version_number - (version_number2 * 10000)) - (
version_number3 * 100
)
version_string = format_number(version_number2) + "." + format_number(version_number3) + "." + format_number(version_number4)
version_string = (
format_number(version_number2)
+ "."
+ format_number(version_number3)
+ "."
+ format_number(version_number4)
)
return version_string
def generate_dtu_version_string(version_number: int, type: str = None) -> str:
def generate_dtu_version_string(version_number: int, type: str = "") -> str:
"""Generate DTU version string."""
version_string = ""
version_number2 = version_number % 256
version_number3 = (version_number // 256) % 16
if "SRF" == str:
if type == "SRF":
version_string += f"{format_number(version_number // 1048576)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
elif "HRF" == str:
elif type == "HRF":
version_string += f"{format_number(version_number // 65536)}.{format_number((version_number % 65536) // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
else:
version_string += f"{format_number(version_number // 4096)}.{format_number(version_number3)}.{format_number(version_number2)}"
return version_string
def generate_inverter_serial_number(serial_number: int) -> str:
"""Generate inverter serial number."""
return hex(serial_number)[2:]
def get_inverter_type(serial_bytes: bytes) -> InverterType:
"""Get inverter type."""
inverter_type = None
# Access individual bytes
if serial_bytes[0] == 0x11:
@ -95,17 +195,20 @@ def get_inverter_type(serial_bytes: bytes) -> InverterType:
if serial_bytes[1] in [0x12]:
inverter_type = InverterType.TWO
if inverter_type == None:
raise ValueError(f"Unknown inverter type: {hex(serial_bytes[0])} {hex(serial_bytes[1])}")
if inverter_type is None:
raise ValueError(
f"Unknown inverter type: {hex(serial_bytes[0])} {hex(serial_bytes[1])}"
)
return inverter_type
def get_inverter_series(serial_bytes: bytes) -> InverterSeries:
"""Get inverter series."""
series = None
if serial_bytes[0] == 0x11:
if (serial_bytes[1] & 0x0f) == 0x04:
if (serial_bytes[1] & 0x0F) == 0x04:
series = InverterSeries.HMS
else:
series = InverterSeries.HM
@ -118,52 +221,34 @@ def get_inverter_series(serial_bytes: bytes) -> InverterSeries:
series = InverterSeries.HMT
elif serial_bytes[0] == 0x14:
series = InverterSeries.HMS
if series is None:
raise ValueError(f"Unknown series: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!")
raise ValueError(
f"Unknown series: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!"
)
return series
def get_inverter_power(serial_bytes: bytes) -> InverterPower:
inverter_type_bytes = struct.unpack('>H', serial_bytes[:2])[0]
"""Get inverter power."""
power = None
inverter_type_bytes = struct.unpack(">H", serial_bytes[:2])[0]
if inverter_type_bytes in [0x1011]:
power = InverterPower.P_100
elif inverter_type_bytes in [0x1020]:
power = InverterPower.P_250
elif inverter_type_bytes in [0x1021, 0x1121]:
power = InverterPower.P_300_350_400
elif inverter_type_bytes in [0x1125]:
power = InverterPower.P_400
elif inverter_type_bytes in [0x1040]:
power = InverterPower.P_500
elif inverter_type_bytes in [0x1041, 0x1042, 0x1141]:
power = InverterPower.P_600_700_800
elif inverter_type_bytes in [0x1060]:
power = InverterPower.P_1000
elif inverter_type_bytes in [0x1061]:
power = InverterPower.P_1200_1500
elif inverter_type_bytes in [0x1161]:
power = InverterPower.P_1000_1200_1500
elif inverter_type_bytes in [0x1164]:
power = InverterPower.P_1600
elif inverter_type_bytes in [0x1412]:
power = InverterPower.P_800W
power = power_mapping.get(inverter_type_bytes)
if power is None:
raise ValueError(f"Unknown power: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!")
raise ValueError(
f"Unknown power: {hex(serial_bytes[0])} {hex(serial_bytes[1])}!"
)
return power
def get_hw_model_name(serial_number: str) -> str:
"""Get hardware model name."""
if(serial_number == "22069994886948"):
if serial_number == "22069994886948":
serial_number = generate_inverter_serial_number(int(serial_number))
serial_bytes = bytes.fromhex(serial_number)
@ -176,40 +261,32 @@ def get_hw_model_name(serial_number: str) -> str:
logger.error(e)
return "Unknown"
else:
inverter_model_name = inverter_series.value + "-" + inverter_power.value + "-" + inverter_type.value
inverter_model_name = (
inverter_series.value
+ "-"
+ inverter_power.value
+ "-"
+ inverter_type.value
)
return inverter_model_name
def get_dtu_model_type(serial_bytes: bytes) -> DTUType:
"""Get DTU model type."""
dtu_type = None
dtu_type_bytes = struct.unpack(">H", serial_bytes[:2])[0]
dtu_type_bytes = struct.unpack('>H', serial_bytes[:2])[0]
if (dtu_type_bytes in [0x10F7] or
dtu_type_bytes in [0x10FB, 0x4101, 0x10FC, 0x4120] or
dtu_type_bytes in [0x10F8, 0x4100, 0x10FD, 0x4121]):
dtu_type = DTUType.DTU_PRO
elif dtu_type_bytes in [0x10D3, 0x4110, 0x10D8, 0x4130, 0x4132, 0x4133, 0x10D9, 0x4111]:
dtu_type = DTUType.DTU_W100_LITE_S
elif dtu_type_bytes in [0x10D2]:
dtu_type = DTUType.DTU_G100
elif dtu_type_bytes in [0x10D6, 0x10D7, 0x4131]:
dtu_type = DTUType.DTU_LITE
elif (dtu_type_bytes in [0x1124, 0x1125, 0x1403] or
dtu_type_bytes in [0x1144, 0x1143, 0x1145, 0x1412] or
dtu_type_bytes in [0x1164, 0x1165, 0x1166, 0x1167, 0x1222, 0x1422, 0x1423] or
dtu_type_bytes in [0x1361, 0x1362] or
dtu_type_bytes in [0x1381, 0x1382] or
dtu_type_bytes in [0x4143]):
dtu_type = DTUType.DTU_HMS_W
dtu_type = type_mapping.get(dtu_type_bytes)
if dtu_type is None:
raise ValueError(f"Unknown DTU: {serial_bytes[:2]}!")
return dtu_type
def get_dtu_model_name(serial_number: str) -> str:
"""Get DTU model name."""
serial_bytes = bytes.fromhex(serial_number)
try:
@ -219,4 +296,3 @@ def get_dtu_model_name(serial_number: str) -> str:
return "Unknown"
else:
return dtu_type.value

View File

@ -2,5 +2,6 @@
for file in $(ls *.proto)
do
protoc --pyi_out=. $file
protoc --python_out=. $file
done

View File

@ -1,11 +1,14 @@
""""Utils for interacting with Hoymiles WiFi API."""
"""Utils for interacting with Hoymiles WiFi API."""
from hoymiles_wifi.protobuf import (
GetConfig_pb2,
SetConfig_pb2,
)
def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO):
"""Initialize set config response with get config request."""
set_config_res = SetConfig_pb2.SetConfigResDTO()
set_config_res.lock_password = get_config_req.lock_password
set_config_res.lock_time = get_config_req.lock_time
@ -27,8 +30,8 @@ def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO):
set_config_res.access_model = get_config_req.access_model
set_config_res.mac_0 = get_config_req.mac_0
set_config_res.mac_1 = get_config_req.mac_1
set_config_res.mac_2 = get_config_req.mac_2
set_config_res.mac_3 = get_config_req.mac_3
set_config_res.mac_2 = get_config_req.mac_2
set_config_res.mac_3 = get_config_req.mac_3
set_config_res.mac_4 = get_config_req.mac_4
set_config_res.mac_5 = get_config_req.mac_5
set_config_res.dhcp_switch = get_config_req.dhcp_switch
@ -56,7 +59,3 @@ def initialize_set_config(get_config_req: GetConfig_pb2.GetConfigReqDTO):
set_config_res.dtu_ap_pass = get_config_req.dtu_ap_pass
return set_config_res

View File

@ -1,19 +1,18 @@
"""Setup for the hoymiles-wifi package."""
from setuptools import setup
setup(
name='hoymiles-wifi',
packages=['hoymiles_wifi', 'hoymiles_wifi.protobuf'],
install_requires=[
'protobuf',
'crcmod'
],
version='0.1.7',
description='A python library for interfacing with Hoymiles HMS-XXXXW-2T series of micro-inverters.',
author='suaveolent',
name="hoymiles-wifi",
packages=["hoymiles_wifi", "hoymiles_wifi.protobuf"],
install_requires=["protobuf", "crcmod"],
version="0.1.8",
description="A python library for interfacing with the Hoymiles DTUs and the HMS-XXXXW-2T series of micro-inverters using protobuf messages.",
author="suaveolent",
include_package_data=True,
entry_points={
'console_scripts': [
'hoymiles-wifi = hoymiles_wifi.__main__:run_main',
],
}
"console_scripts": [
"hoymiles-wifi = hoymiles_wifi.__main__:run_main",
],
},
)