
409 lines
12 KiB

"""Contains the main functionality of the hoymiles_wifi package."""
from __future__ import annotations
import argparse
import asyncio
import json
import sys
from dataclasses import asdict, dataclass
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message
from hoymiles_wifi.const import DTU_FIRMWARE_URL_00_01_11, MAX_POWER_LIMIT
from hoymiles_wifi.dtu import DTU
from hoymiles_wifi.hoymiles import (
from hoymiles_wifi.protobuf import (
RED = "\033[91m"
END = "\033[0m"
class VersionInfo:
"""Represents version information for the hoymiles_wifi package."""
dtu_hw_version: str
dtu_sw_version: str
inverter_hw_version: str
inverter_sw_version: str
def __str__(self: VersionInfo) -> str:
"""Return a string representation of the VersionInfo object."""
return (
f'dtu_hw_version: "{self.dtu_hw_version}"\n'
f'dtu_sw_version: "{self.dtu_sw_version}"\n'
f'inverter_hw_version: "{self.inverter_hw_version}"\n'
f'inverter_sw_version: "{self.inverter_sw_version}"\n'
def to_dict(self: VersionInfo) -> dict:
"""Convert the VersionInfo object to a dictionary."""
return asdict(self)
# Inverter commands
async def async_get_real_data_new(
dtu: DTU,
) -> RealDataNew_pb2.RealDataNewResDTO | None:
"""Get real data from the inverter asynchronously."""
return await dtu.async_get_real_data_new()
async def async_get_real_data(dtu: DTU) -> RealData_pb2.RealDataResDTO | None:
"""Get real data from the inverter asynchronously."""
return await dtu.async_get_real_data()
async def async_get_config(dtu: DTU) -> GetConfig_pb2.GetConfigResDTO | None:
"""Get the config from the inverter asynchronously."""
return await dtu.async_get_config()
async def async_network_info(
dtu: DTU,
) -> NetworkInfo_pb2.NetworkInfoResDTO | None:
"""Get network information from the inverter asynchronously."""
return await dtu.async_network_info()
async def async_app_information_data(
dtu: DTU,
) -> APPInfomationData_pb2.AppInfomationDataResDTO | None:
"""Get application information data from the inverter asynchronously."""
return await dtu.async_app_information_data()
async def async_app_get_hist_power(
dtu: DTU,
) -> AppGetHistPower_pb2.AppGetHistPowerResDTO:
"""Get historical power data from the inverter asynchronously."""
return await dtu.async_app_get_hist_power()
async def async_set_power_limit(
dtu: DTU,
) -> CommandPB_pb2.CommandResDTO | None:
"""Set the power limit of the inverter asynchronously."""
print( # noqa: T201
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ "!!! Danger zone! This will change the power limit of the dtu. !!!\n"
+ "!!! Please be careful and make sure you know what you are doing. !!!\n"
+ "!!! Only proceed if you know what you are doing. !!!\n"
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ END,
cont = input("Do you want to continue? (y/n): ")
if cont != "y":
return None
power_limit = int(input("Enter the new power limit (0-100): "))
if power_limit < 0 or power_limit > MAX_POWER_LIMIT:
print("Error. Invalid power limit!") # noqa: T201
return None
print(f"Setting power limit to {power_limit}%") # noqa: T201
cont = input("Are you sure? (y/n): ")
if cont != "y":
return None
return await dtu.async_set_power_limit(power_limit)
async def async_set_wifi(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Set the wifi SSID and password of the inverter asynchronously."""
wifi_ssid = input("Enter the new wifi SSID: ").strip()
wifi_password = input("Enter the new wifi password: ").strip()
print(f'Setting wifi to "{wifi_ssid}"') # noqa: T201
print(f'Setting wifi password to "{wifi_password}"') # noqa: T201
cont = input("Are you sure? (y/n): ")
if cont != "y":
return None
return await dtu.async_set_wifi(wifi_ssid, wifi_password)
async def async_firmware_update(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Update the firmware of the DTU asynchronously."""
print( # noqa: T201
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ "!!! Danger zone! This will update the firmeware of the DTU. !!!\n"
+ "!!! Please be careful and make sure you know what you are doing. !!!\n"
+ "!!! Only proceed if you know what you are doing. !!!\n"
+ "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
+ END,
cont = input("Do you want to continue? (y/n): ")
if cont != "y":
return None
print("Please select a firmware version:") # noqa: T201
print("1.) V00.01.11") # noqa: T201
print("2.) Custom URL") # noqa: T201
while True:
selection = input("Enter your selection (1 or 2): ")
if selection == "1":
url = DTU_FIRMWARE_URL_00_01_11
if selection == "2":
url = input("Enter the custom URL: ").strip()
print("Invalid selection. Please enter 1 or 2.") # noqa: T201
print() # noqa: T201
print(f'Firmware update URL: "{url}"') # noqa: T201
print() # noqa: T201
cont = input("Do you want to continue? (y/n): ")
if cont != "y":
return None
return await dtu.async_update_dtu_firmware()
async def async_restart_dtu(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Restart the DTU asynchronously."""
cont = input("Do you want to restart the DTU? (y/n): ")
if cont != "y":
return None
return await dtu.async_restart_dtu()
async def async_turn_on_inverter(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Turn on the inverter asynchronously."""
inverter_serial = input("Enter the inverter serial number to turn *ON*: ")
cont = input(f"Do you want to turn *ON* the Inverter {inverter_serial}? (y/n): ")
if cont != "y":
return None
return await dtu.async_turn_on_inverter(inverter_serial)
async def async_turn_off_inverter(dtu: DTU) -> CommandPB_pb2.CommandResDTO | None:
"""Turn off the inverter asynchronously."""
inverter_serial = input("Enter the inverter serial number to turn *OFF*: ")
cont = input(f"Do you want to turn *OFF* the Inverter {inverter_serial}? (y/n): ")
if cont != "y":
return None
return await dtu.async_turn_off_inverter(inverter_serial)
async def async_get_information_data(
dtu: DTU,
) -> InfomationData_pb2.InfomationDataResDTO:
"""Get information data from the dtu asynchronously."""
return await dtu.async_get_information_data()
async def async_get_version_info(dtu: DTU) -> VersionInfo | None:
"""Get version information from the dtu asynchronously."""
response = await async_app_information_data(dtu)
if not response:
return None
return VersionInfo(
+ generate_dtu_version_string(response.dtu_info.dtu_hw_version),
+ generate_dtu_version_string(response.dtu_info.dtu_sw_version),
+ generate_version_string(response.pv_info[0].pv_hw_version),
+ generate_sw_version_string(response.pv_info[0].pv_sw_version),
async def async_heatbeat(dtu: DTU) -> APPHeartbeatPB_pb2.APPHeartbeatResDTO | None:
"""Request a heartbeat from the dtu asynchronously."""
return await dtu.async_heartbeat()
async def async_identify_dtu(dtu: DTU) -> str:
"""Identify the DTU asynchronously."""
real_data = await async_get_real_data_new(dtu)
dtu_model_name = get_dtu_model_name(real_data.device_serial_number)
return {real_data.device_serial_number: dtu_model_name}
async def async_identify_inverters(dtu: DTU) -> list[str]:
"""Identify the DTU asynchronously."""
inverter_models = {}
real_data = await async_get_real_data_new(dtu)
if real_data:
for sgs_data in real_data.sgs_data:
serial_number = generate_inverter_serial_number(sgs_data.serial_number)
inverter_model = get_inverter_model_name(serial_number)
inverter_models[serial_number] = inverter_model
for tgs_data in real_data.tgs_data:
serial_number = generate_inverter_serial_number(tgs_data.serial_number)
inverter_model = get_inverter_model_name(serial_number)
inverter_models[serial_number] = inverter_model
return inverter_models
async def async_get_alarm_list(dtu: DTU) -> None:
"""Get alarm list from the dtu asynchronously."""
return await dtu.async_get_alarm_list()
def print_invalid_command(command: str) -> None:
"""Print an invalid command message."""
print(f"Invalid command: {command}") # noqa: T201
async def main() -> None:
"""Execute the main function for the hoymiles_wifi package."""
parser = argparse.ArgumentParser(description="Hoymiles DTU Monitoring")
"--host", type=str, required=True, help="IP address or hostname of the DTU"
help="IP address of the interface to bind to",
help="Format the output as JSON",
help="Command to execute",
args = parser.parse_args()
dtu = DTU(args.host, args.local_addr)
# Execute the specified command using a switch case
switch = {
"get-real-data-new": async_get_real_data_new,
"get-real-data": async_get_real_data,
"get-config": async_get_config,
"network-info": async_network_info,
"app-information-data": async_app_information_data,
"app-get-hist-power": async_app_get_hist_power,
"set-power-limit": async_set_power_limit,
"set-wifi": async_set_wifi,
"firmware-update": async_firmware_update,
"restart-dtu": async_restart_dtu,
"turn-on-inverter": async_turn_on_inverter,
"turn-off-inverter": async_turn_off_inverter,
"get-information-data": async_get_information_data,
"get-version-info": async_get_version_info,
"heartbeat": async_heatbeat,
"identify-dtu": async_identify_dtu,
"identify-inverters": async_identify_inverters,
"get-alarm-list": async_get_alarm_list,
command_func = switch.get(args.command, print_invalid_command)
response = await command_func(dtu)
if response:
if args.as_json:
if isinstance(response, Message):
print(MessageToJson(response)) # noqa: T201
elif isinstance(response, dict):
print(json.dumps(response, indent=4)) # noqa: T201
print(json.dumps(asdict(response), indent=4)) # noqa: T201
print(f"{args.command.capitalize()} Response: \n{response}") # noqa: T201
print( # noqa: T201
f"No response or unable to retrieve response for "
f"{args.command.replace('_', ' ')}",
def run_main() -> None:
"""Run the main function for the hoymiles_wifi package."""
if __name__ == "__main__":