diff --git a/pyhon/appliance.py b/pyhon/appliance.py index 73992c1..3754aca 100644 --- a/pyhon/appliance.py +++ b/pyhon/appliance.py @@ -3,7 +3,7 @@ import logging import re from datetime import datetime, timedelta from pathlib import Path -from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload, Callable +from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload from pyhon import diagnose, exceptions from pyhon.appliances.base import ApplianceBase @@ -43,7 +43,6 @@ class HonAppliance: self._additional_data: Dict[str, Any] = {} self._last_update: Optional[datetime] = None self._default_setting = HonParameter("", {}, "") - self._notify_function: Optional[Callable[[Any], None]] = None try: self._extra: Optional[ApplianceBase] = importlib.import_module( @@ -313,11 +312,3 @@ class HonAppliance: elif isinstance(target, HonParameterEnum): target.values = main.values target.value = main.value - - def subscribe(self, notify_function: Callable[[Any], None]) -> None: - self._notify_function = notify_function - - def notify(self) -> None: - self.sync_params_to_command("settings") - if self._notify_function: - self._notify_function(self.attributes) diff --git a/pyhon/connection/api.py b/pyhon/connection/api.py index f99fd5d..788a067 100644 --- a/pyhon/connection/api.py +++ b/pyhon/connection/api.py @@ -7,12 +7,10 @@ from types import TracebackType from typing import Dict, Optional, Any, List, no_type_check, Type from aiohttp import ClientSession -from awscrt import mqtt5 from typing_extensions import Self from pyhon import const, exceptions from pyhon.appliance import HonAppliance -from pyhon.connection import mqtt from pyhon.connection.auth import HonAuth from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler from pyhon.connection.handler.hon import HonConnectionHandler @@ -40,7 +38,6 @@ class HonAPI: self._hon_handler: Optional[HonConnectionHandler] = None self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None self._session: Optional[ClientSession] = session - self._mqtt_client: mqtt5.Client | None = None async def __aenter__(self) -> Self: return await self.create() @@ -269,10 +266,6 @@ class HonAPI: result: Dict[str, Any] = await response.json() return result - async def subscribe_mqtt(self, appliances: list[HonAppliance]) -> None: - if not self._mqtt_client: - self._mqtt_client = await mqtt.start(self, appliances) - async def close(self) -> None: if self._hon_handler is not None: await self._hon_handler.close() diff --git a/pyhon/connection/mqtt.py b/pyhon/connection/mqtt.py index ce6eaed..cb1f309 100644 --- a/pyhon/connection/mqtt.py +++ b/pyhon/connection/mqtt.py @@ -1,3 +1,4 @@ +import asyncio import json import logging import secrets @@ -10,91 +11,127 @@ from pyhon import const from pyhon.appliance import HonAppliance if TYPE_CHECKING: - from pyhon import HonAPI + from pyhon import Hon _LOGGER = logging.getLogger(__name__) -appliances: list[HonAppliance] = [] +class MQTTClient: + def __init__(self, hon: "Hon"): + self._client: mqtt5.Client | None = None + self._hon = hon + self._api = hon.api + self._appliances = hon.appliances + self._connection = False + self._watchdog_task: asyncio.Task[None] | None = None -def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData) -> None: - _LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data)) + @property + def client(self) -> mqtt5.Client: + if self._client is not None: + return self._client + raise AttributeError("Client is not set") + async def create(self) -> "MQTTClient": + await self._start() + self._subscribe_appliances() + return self -def on_lifecycle_connection_success( - lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData, -) -> None: - _LOGGER.info( - "Lifecycle Connection Success: %s", str(lifecycle_connect_success_data) - ) + def _on_lifecycle_stopped( + self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData + ) -> None: + _LOGGER.info("Lifecycle Stopped: %s", str(lifecycle_stopped_data)) - -def on_lifecycle_attempting_connect( - lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData, -) -> None: - _LOGGER.info( - "Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data) - ) - - -def on_lifecycle_connection_failure( - lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData, -) -> None: - _LOGGER.info( - "Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data) - ) - - -def on_lifecycle_disconnection( - lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData, -) -> None: - _LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data)) - - -def on_publish_received(data: mqtt5.PublishReceivedData) -> None: - if not (data and data.publish_packet and data.publish_packet.payload): - return - payload = json.loads(data.publish_packet.payload.decode()) - topic = data.publish_packet.topic - if topic and "appliancestatus" in topic: - appliance = next( - a for a in appliances if topic in a.info["topics"]["subscribe"] + def _on_lifecycle_connection_success( + self, + lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData, + ) -> None: + self._connection = True + _LOGGER.info( + "Lifecycle Connection Success: %s", str(lifecycle_connect_success_data) ) - for parameter in payload["parameters"]: - appliance.attributes["parameters"][parameter["parName"]].update(parameter) - appliance.notify() - _LOGGER.info("%s - %s", topic, payload) + def _on_lifecycle_attempting_connect( + self, + lifecycle_attempting_connect_data: mqtt5.LifecycleAttemptingConnectData, + ) -> None: + _LOGGER.info( + "Lifecycle Attempting Connect - %s", str(lifecycle_attempting_connect_data) + ) -async def create_mqtt_client(api: "HonAPI") -> mqtt5.Client: - client: mqtt5.Client = mqtt5_client_builder.websockets_with_custom_authorizer( - endpoint=const.AWS_ENDPOINT, - auth_authorizer_name=const.AWS_AUTHORIZER, - auth_authorizer_signature=await api.load_aws_token(), - auth_token_key_name="token", - auth_token_value=api.auth.id_token, - client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}", - on_lifecycle_stopped=on_lifecycle_stopped, - on_lifecycle_connection_success=on_lifecycle_connection_success, - on_lifecycle_attempting_connect=on_lifecycle_attempting_connect, - on_lifecycle_connection_failure=on_lifecycle_connection_failure, - on_lifecycle_disconnection=on_lifecycle_disconnection, - on_publish_received=on_publish_received, - ) - client.start() - return client + def _on_lifecycle_connection_failure( + self, + lifecycle_connection_failure_data: mqtt5.LifecycleConnectFailureData, + ) -> None: + _LOGGER.info( + "Lifecycle Connection Failure - %s", str(lifecycle_connection_failure_data) + ) + def _on_lifecycle_disconnection( + self, + lifecycle_disconnect_data: mqtt5.LifecycleDisconnectData, + ) -> None: + self._connection = False + _LOGGER.info("Lifecycle Disconnection - %s", str(lifecycle_disconnect_data)) -def subscribe(client: mqtt5.Client, appliance: HonAppliance) -> None: - for topic in appliance.info.get("topics", {}).get("subscribe", []): - client.subscribe(mqtt5.SubscribePacket([mqtt5.Subscription(topic)])).result(10) - _LOGGER.info("Subscribed to topic %s", topic) + def _on_publish_received(self, data: mqtt5.PublishReceivedData) -> None: + if not (data and data.publish_packet and data.publish_packet.payload): + return + payload = json.loads(data.publish_packet.payload.decode()) + topic = data.publish_packet.topic + appliance = next( + a for a in self._appliances if topic in a.info["topics"]["subscribe"] + ) + if topic and "appliancestatus" in topic: + for parameter in payload["parameters"]: + appliance.attributes["parameters"][parameter["parName"]].update( + parameter + ) + appliance.sync_params_to_command("settings") + self._hon.notify() + elif topic and "connected" in topic: + _LOGGER.info("Connected %s", appliance.nick_name) + elif topic and "disconnected" in topic: + _LOGGER.info("Disconnected %s", appliance.nick_name) + elif topic and "discovery" in topic: + _LOGGER.info("Discovered %s", appliance.nick_name) + _LOGGER.info("%s - %s", topic, payload) + async def _start(self) -> None: + self._client = mqtt5_client_builder.websockets_with_custom_authorizer( + endpoint=const.AWS_ENDPOINT, + auth_authorizer_name=const.AWS_AUTHORIZER, + auth_authorizer_signature=await self._api.load_aws_token(), + auth_token_key_name="token", + auth_token_value=self._api.auth.id_token, + client_id=f"{const.MOBILE_ID}_{secrets.token_hex(8)}", + on_lifecycle_stopped=self._on_lifecycle_stopped, + on_lifecycle_connection_success=self._on_lifecycle_connection_success, + on_lifecycle_attempting_connect=self._on_lifecycle_attempting_connect, + on_lifecycle_connection_failure=self._on_lifecycle_connection_failure, + on_lifecycle_disconnection=self._on_lifecycle_disconnection, + on_publish_received=self._on_publish_received, + ) + self.client.start() -async def start(api: "HonAPI", app: list[HonAppliance]) -> mqtt5.Client: - client = await create_mqtt_client(api) - global appliances # pylint: disable=global-statement - appliances = app - for appliance in appliances: - subscribe(client, appliance) - return client + def _subscribe_appliances(self) -> None: + for appliance in self._appliances: + self._subscribe(appliance) + + def _subscribe(self, appliance: HonAppliance) -> None: + for topic in appliance.info.get("topics", {}).get("subscribe", []): + self.client.subscribe( + mqtt5.SubscribePacket([mqtt5.Subscription(topic)]) + ).result(10) + _LOGGER.info("Subscribed to topic %s", topic) + + async def start_watchdog(self) -> None: + if not self._watchdog_task or self._watchdog_task.done(): + await asyncio.create_task(self._watchdog()) + + async def _watchdog(self) -> None: + while True: + await asyncio.sleep(5) + if not self._connection: + _LOGGER.info("Restart mqtt connection") + await self._start() + self._subscribe_appliances() diff --git a/pyhon/hon.py b/pyhon/hon.py index acdd92c..5d2da5d 100644 --- a/pyhon/hon.py +++ b/pyhon/hon.py @@ -2,7 +2,7 @@ import asyncio import logging from pathlib import Path from types import TracebackType -from typing import List, Optional, Dict, Any, Type +from typing import List, Optional, Dict, Any, Type, Callable from aiohttp import ClientSession from typing_extensions import Self @@ -10,6 +10,7 @@ from typing_extensions import Self from pyhon.appliance import HonAppliance from pyhon.connection.api import HonAPI from pyhon.connection.api import TestAPI +from pyhon.connection.mqtt import MQTTClient from pyhon.exceptions import NoAuthenticationException _LOGGER = logging.getLogger(__name__) @@ -33,6 +34,8 @@ class Hon: self._test_data_path: Path = test_data_path or Path().cwd() self._mobile_id: str = mobile_id self._refresh_token: str = refresh_token + self._mqtt_client: MQTTClient | None = None + self._notify_function: Optional[Callable[[Any], None]] = None async def __aenter__(self) -> Self: return await self.create() @@ -120,7 +123,15 @@ class Hon: api = TestAPI(test_data) for appliance in await api.load_appliances(): await self._create_appliance(appliance, api) - await self.api.subscribe_mqtt(self.appliances) + if not self._mqtt_client: + self._mqtt_client = await MQTTClient(self).create() + + def subscribe_updates(self, notify_function: Callable[[Any], None]) -> None: + self._notify_function = notify_function + + def notify(self) -> None: + if self._notify_function: + self._notify_function(None) async def close(self) -> None: await self.api.close() diff --git a/setup.py b/setup.py index 3d5d5e7..022bf16 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ with open("README.md", "r", encoding="utf-8") as f: setup( name="pyhOn", - version="0.17.1", + version="0.17.2", author="Andre Basche", description="Control hOn devices with python", long_description=long_description,