hoymiles-wifi/hoymiles_wifi/inverter.py
suaveolent cd476628e4 bugfix
2024-02-12 13:12:56 +01:00

262 lines
9.0 KiB
Python

from __future__ import annotations
import asyncio
import struct
from typing import Any
from crcmod import mkCrcFun
from datetime import datetime
import time
from hoymiles_wifi import logger
from hoymiles_wifi.utils import initialize_set_config
from hoymiles_wifi.protobuf import (
APPInfomationData_pb2,
AppGetHistPower_pb2,
CommandPB_pb2,
InfomationData_pb2,
GetConfig_pb2,
RealData_pb2,
RealDataNew_pb2,
RealDataHMS_pb2,
NetworkInfo_pb2,
SetConfig_pb2,
)
from hoymiles_wifi.const import (
INVERTER_PORT,
CMD_HEADER,
CMD_GET_CONFIG,
CMD_REAL_RES_DTO,
CMD_REAL_DATA_RES_DTO,
CMD_NETWORK_INFO_RES,
CMD_APP_INFO_DATA_RES_DTO,
CMD_APP_GET_HIST_POWER_RES,
CMD_ACTION_POWER_LIMIT,
CMD_COMMAND_RES_DTO,
CMD_ACTION_FIRMWARE_UPGRADE,
CMD_SET_CONFIG,
CMD_CLOUD_COMMAND_RES_DTO,
CMD_ACTION_RESTART,
CMD_ACTION_TURN_ON,
CMD_ACTION_TURN_OFF,
DTU_FIRMWARE_URL_00_01_11,
)
class NetmodeSelect:
    """Integer codes identifying the network mode (WiFi, SIM or LAN).

    ``WIFI`` is the value assigned to ``netmode_select`` when the WiFi
    credentials are configured.
    """

    WIFI, SIM, LAN = 1, 2, 3
class NetworkState:
    """Integer codes describing whether the inverter could be reached."""

    # Consecutive codes starting at zero: Unknown=0, Online=1, Offline=2.
    Unknown, Online, Offline = range(3)
class Inverter:
    """Client that exchanges framed protobuf messages with an inverter over TCP.

    Every ``async_*`` request method builds a protobuf request, hands it to
    :meth:`async_send_request` and returns the parsed response message, or
    ``None`` when the device could not be reached or the reply could not be
    parsed.

    Attributes:
        host: Host name or IP address the requests are sent to.
        state: One of the ``NetworkState`` codes, updated after every request.
        sequence: 16-bit counter that is incremented for every request sent.

    NOTE(review): the ``offset`` of 28800 used by several requests equals
    8 hours in seconds; presumably a fixed UTC+8 time-zone offset expected by
    the device -- confirm before changing it.
    """

    def __init__(self, host: str):
        """Store the target *host*; the state starts as ``NetworkState.Unknown``."""
        self.host = host
        self.state = NetworkState.Unknown
        self.sequence = 0

    def get_state(self) -> NetworkState:
        """Return the most recently recorded network state."""
        return self.state

    def set_state(self, new_state: NetworkState):
        """Record *new_state*, logging only when it differs from the current one."""
        if self.state != new_state:
            self.state = new_state
            logger.debug(f"Inverter is {new_state}")

    async def async_get_real_data_hms(self) -> RealDataHMS_pb2.HMSStateResponse | None:
        """Send an ``HMSRealDataResDTO`` request and return the ``HMSStateResponse``."""
        request = RealDataHMS_pb2.HMSRealDataResDTO()
        command = CMD_REAL_DATA_RES_DTO
        return await self.async_send_request(command, request, RealDataHMS_pb2.HMSStateResponse)

    async def async_get_real_data(self) -> RealData_pb2.RealDataReqDTO | None:
        """Send a ``RealDataResDTO`` request and return the ``RealDataReqDTO`` reply."""
        request = RealData_pb2.RealDataResDTO()
        command = CMD_REAL_DATA_RES_DTO
        return await self.async_send_request(command, request, RealData_pb2.RealDataReqDTO)

    async def async_get_real_data_new(self) -> RealDataNew_pb2.RealDataNewReqDTO | None:
        """Send a time-stamped ``RealDataNewResDTO`` request and return the reply."""
        request = RealDataNew_pb2.RealDataNewResDTO()
        request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
        request.offset = 28800
        request.time = int(time.time())
        command = CMD_REAL_RES_DTO
        return await self.async_send_request(command, request, RealDataNew_pb2.RealDataNewReqDTO)

    async def async_get_config(self) -> GetConfig_pb2.GetConfigReqDTO | None:
        """Request the device configuration and return the ``GetConfigReqDTO`` reply.

        The request time is set to the current time minus 60 seconds.
        """
        request = GetConfig_pb2.GetConfigResDTO()
        request.offset = 28800
        request.time = int(time.time()) - 60
        command = CMD_GET_CONFIG
        return await self.async_send_request(command, request, GetConfig_pb2.GetConfigReqDTO)

    async def async_network_info(self) -> NetworkInfo_pb2.NetworkInfoReqDTO | None:
        """Send a ``NetworkInfoResDTO`` request and return the ``NetworkInfoReqDTO`` reply."""
        request = NetworkInfo_pb2.NetworkInfoResDTO()
        request.offset = 28800
        request.time = int(time.time())
        command = CMD_NETWORK_INFO_RES
        return await self.async_send_request(command, request, NetworkInfo_pb2.NetworkInfoReqDTO)

    async def async_app_information_data(self) -> APPInfomationData_pb2.APPInfoDataReqDTO | None:
        """Send a time-stamped ``APPInfoDataResDTO`` request and return the reply."""
        request = APPInfomationData_pb2.APPInfoDataResDTO()
        request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
        request.offset = 28800
        request.time = int(time.time())
        command = CMD_APP_INFO_DATA_RES_DTO
        return await self.async_send_request(command, request, APPInfomationData_pb2.APPInfoDataReqDTO)

    async def async_app_get_hist_power(self) -> AppGetHistPower_pb2.AppGetHistPowerReqDTO | None:
        """Send an ``AppGetHistPowerResDTO`` request and return the reply."""
        request = AppGetHistPower_pb2.AppGetHistPowerResDTO()
        request.offset = 28800
        request.requested_time = int(time.time())
        command = CMD_APP_GET_HIST_POWER_RES
        return await self.async_send_request(command, request, AppGetHistPower_pb2.AppGetHistPowerReqDTO)

    async def async_set_power_limit(self, power_limit: int) -> CommandPB_pb2.CommandReqDTO | None:
        """Send a power-limit command.

        Args:
            power_limit: Limit in the inclusive range 0..100. The value is
                multiplied by 10 before it is placed in the ``A:`` field of
                the command data.

        Returns:
            The parsed ``CommandReqDTO`` reply, or ``None`` when *power_limit*
            is out of range or the request failed.
        """
        if not 0 <= power_limit <= 100:
            logger.error("Error. Invalid power limit!")
            return None

        power_limit = power_limit * 10

        request = CommandPB_pb2.CommandResDTO()
        request.time = int(time.time())
        request.action = CMD_ACTION_POWER_LIMIT
        request.package_nub = 1
        request.tid = int(time.time())
        request.data = f'A:{power_limit},B:0,C:0\r'.encode('utf-8')
        command = CMD_COMMAND_RES_DTO
        return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)

    async def async_set_wifi(self, ssid: str, password: str) -> SetConfig_pb2.SetConfigReqDTO | None:
        """Write new WiFi credentials to the device.

        The current configuration is fetched first and used to initialise the
        set-config request, so only the WiFi related fields are overridden
        here.

        Returns:
            The parsed ``SetConfigReqDTO`` reply, or ``None`` when the current
            configuration could not be read or the request failed.
        """
        get_config_req = await self.async_get_config()
        if get_config_req is None:
            logger.error("Failed to get config")
            return None

        request = initialize_set_config(get_config_req)
        request.time = int(time.time())
        request.offset = 28800
        request.app_page = 1
        request.netmode_select = NetmodeSelect.WIFI
        request.wifi_ssid = ssid.encode('utf-8')
        request.wifi_password = password.encode('utf-8')
        command = CMD_SET_CONFIG
        return await self.async_send_request(command, request, SetConfig_pb2.SetConfigReqDTO)

    async def async_update_dtu_firmware(self, firmware_url: str = DTU_FIRMWARE_URL_00_01_11) -> CommandPB_pb2.CommandReqDTO | None:
        """Send a firmware-upgrade command carrying *firmware_url*.

        The URL is sent UTF-8 encoded and terminated with a carriage return.
        """
        request = CommandPB_pb2.CommandResDTO()
        request.action = CMD_ACTION_FIRMWARE_UPGRADE
        request.package_nub = 1
        request.tid = int(time.time())
        request.data = (firmware_url + '\r').encode('utf-8')
        command = CMD_CLOUD_COMMAND_RES_DTO
        return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)

    async def async_restart(self) -> CommandPB_pb2.CommandReqDTO | None:
        """Send the restart action command and return the reply."""
        request = CommandPB_pb2.CommandResDTO()
        request.action = CMD_ACTION_RESTART
        request.package_nub = 1
        request.tid = int(time.time())
        command = CMD_CLOUD_COMMAND_RES_DTO
        return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)

    async def async_turn_on(self) -> CommandPB_pb2.CommandReqDTO | None:
        """Send the turn-on action command and return the reply."""
        request = CommandPB_pb2.CommandResDTO()
        request.action = CMD_ACTION_TURN_ON
        request.package_nub = 1
        request.dev_kind = 1
        request.tid = int(time.time())
        command = CMD_CLOUD_COMMAND_RES_DTO
        return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)

    async def async_turn_off(self) -> CommandPB_pb2.CommandReqDTO | None:
        """Send the turn-off action command and return the reply."""
        request = CommandPB_pb2.CommandResDTO()
        request.action = CMD_ACTION_TURN_OFF
        request.package_nub = 1
        request.dev_kind = 1
        request.tid = int(time.time())
        command = CMD_CLOUD_COMMAND_RES_DTO
        return await self.async_send_request(command, request, CommandPB_pb2.CommandReqDTO)

    async def async_get_information_data(self) -> InfomationData_pb2.InfoDataReqDTO | None:
        """Send a time-stamped ``InfoDataResDTO`` request and return the reply."""
        request = InfomationData_pb2.InfoDataResDTO()
        request.time_ymd_hms = datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
        request.offset = 28800
        request.time = int(time.time())
        command = CMD_APP_INFO_DATA_RES_DTO
        return await self.async_send_request(command, request, InfomationData_pb2.InfoDataReqDTO)

    @staticmethod
    async def _async_close_writer(writer: asyncio.StreamWriter) -> None:
        """Close *writer* and wait for the transport to shut down, best effort."""
        writer.close()
        try:
            await writer.wait_closed()
        except OSError as e:
            # A failure while tearing down the socket must not mask the
            # outcome of the request itself.
            logger.debug(f"Error while closing connection: {e}")

    async def async_send_request(self, command: bytes, request: Any, response_type: Any, inverter_port: int = INVERTER_PORT) -> Any:
        """Send one framed request and return the parsed response.

        Frame layout produced here: ``CMD_HEADER + command`` followed by three
        big-endian unsigned 16-bit fields (sequence number, CRC16 of the
        payload, total length = payload length + 10) and the serialized
        protobuf payload.

        Args:
            command: Command bytes appended to ``CMD_HEADER``.
            request: Protobuf message; serialized with ``SerializeToString()``.
            response_type: Protobuf class whose ``FromString`` parses the reply.
            inverter_port: TCP port to connect to.

        Returns:
            The parsed response message, or ``None`` on a connection error,
            a timeout, or a reply that cannot be parsed. ``self.state`` is set
            to ``Offline``, ``Unknown`` or ``Online`` accordingly.
        """
        self.sequence = (self.sequence + 1) & 0xFFFF

        request_as_bytes = request.SerializeToString()
        crc16 = mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)(request_as_bytes)
        length = len(request_as_bytes) + 10

        # compose request message
        header = CMD_HEADER + command
        message = header + struct.pack('>HHH', self.sequence, crc16, length) + request_as_bytes

        writer = None
        try:
            reader, writer = await asyncio.open_connection(self.host, inverter_port)
            writer.write(message)
            await writer.drain()
            buf = await asyncio.wait_for(reader.read(1024), timeout=5)
        except (OSError, asyncio.TimeoutError) as e:
            logger.debug(f"{e}")
            self.set_state(NetworkState.Offline)
            return None
        finally:
            # Close on every path: a failure after the connection was opened
            # (drain/read error or timeout) must not leak the socket.
            if writer is not None:
                await self._async_close_writer(writer)

        try:
            # The payload starts at byte 10, so anything shorter cannot hold
            # the full header.
            if len(buf) < 10:
                raise ValueError("Buffer is too short for unpacking")

            # Mirrors the request layout built above: bytes 6-8 hold the
            # CRC16 and bytes 8-10 hold the total frame length (header
            # included), so the payload ends at ``read_length``.
            read_length = struct.unpack('>H', buf[8:10])[0]
            if not 10 <= read_length <= len(buf):
                raise ValueError("Buffer is incomplete")

            parsed = response_type.FromString(buf[10:read_length])
            if not parsed:
                raise ValueError("Parsing resulted in an empty or falsy value")
        except Exception as e:
            # Deliberately broad: any decoding problem is reported as an
            # unparseable response instead of crashing the caller.
            logger.debug(f"Failed to parse response: {e}")
            self.set_state(NetworkState.Unknown)
            return None

        self.set_state(NetworkState.Online)
        return parsed