diff --git a/pyhon/connection/mqtt.py b/pyhon/connection/mqtt.py index c654707..d6a04da 100644 --- a/pyhon/connection/mqtt.py +++ b/pyhon/connection/mqtt.py @@ -1,6 +1,8 @@ +import functools import json import logging import secrets +import ssl from typing import TYPE_CHECKING from urllib.parse import urlencode @@ -9,9 +11,9 @@ from paho.mqtt.client import Client, MQTTv5 from pyhon import const if TYPE_CHECKING: - from paho.mqtt.client import MQTTMessage, _UserData + from paho.mqtt.client import MQTTMessage, _UserData, ReasonCodes, Properties - from pyhon import Hon + from pyhon import Hon, HonAPI from pyhon.appliance import HonAppliance _LOGGER = logging.getLogger(__name__) @@ -22,8 +24,14 @@ class MQTTClient: self._client: Client | None = None self._hon = hon self._mobile_id = mobile_id or const.MOBILE_ID - self._api = hon.api - self._appliances = hon.appliances + + @property + def _appliances(self) -> list["HonAppliance"]: + return self._hon.appliances + + @property + def _api(self) -> "HonAPI": + return self._hon.api @property def client(self) -> Client: @@ -33,46 +41,8 @@ class MQTTClient: async def create(self) -> "MQTTClient": await self._start() - self._subscribe_appliances() return self - def _on_message( - self, - client: Client, # pylint: disable=unused-argument - userdata: "_UserData", # pylint: disable=unused-argument - message: "MQTTMessage", - ) -> None: - if not message.payload or not message.topic: - return - - payload = json.loads(message.payload) - topic = message.topic - appliance = next( - a for a in self._appliances if topic in a.info["topics"]["subscribe"] - ) - - topic_parts = topic.split("/") - if "appliancestatus" in topic_parts: - for parameter in payload["parameters"]: - appliance.attributes["parameters"][parameter["parName"]].update( - parameter - ) - appliance.sync_params_to_command("settings") - elif "disconnected" in topic_parts: - _LOGGER.info( - "Disconnected %s: %s", - appliance.nick_name, - payload.get("disconnectReason"), - ) - appliance.connection = False - elif "connected" in topic_parts: - appliance.connection = True - _LOGGER.info("Connected %s", appliance.nick_name) - elif "discovery" in topic_parts: - _LOGGER.info("Discovered %s", appliance.nick_name) - - self._hon.notify() - async def _start(self) -> None: self._client = Client( client_id=f"{self._mobile_id}_{secrets.token_hex(8)}", @@ -80,8 +50,12 @@ class MQTTClient: reconnect_on_failure=True, ) - self._client.on_message = self._on_message - self._client.enable_logger(_LOGGER) + ssl_context = ssl.create_default_context() + ssl_context.set_alpn_protocols([const.ALPN_PROTOCOL]) + + self.client.tls_set_context(ssl_context) + self.client.enable_logger(_LOGGER) + self.client.on_connect = self._subscribe_appliances query_params = urlencode( { @@ -91,17 +65,73 @@ class MQTTClient: } ) - self._client.username_pw_set(f"?{query_params}") + self.client.username_pw_set("?" + query_params) - self._client.connect_async(const.AWS_ENDPOINT, 443) - self._client.loop_start() + self.client.connect_async(const.AWS_ENDPOINT, 443) + self.client.loop_start() - def _subscribe_appliances(self) -> None: + def _subscribe_appliances( + self, + client: Client, + userdata: "_UserData", + flags: dict[str, int], + rc: "ReasonCodes", + properties: "Properties|None", + ) -> None: + del client, userdata, flags, rc, properties for appliance in self._appliances: self._subscribe(appliance) + def _appliance_status_callback( + self, + appliance: "HonAppliance", + client: Client, + userdata: "_UserData", + message: "MQTTMessage", + ) -> None: + del client, userdata + payload = json.loads(message.payload) + for parameter in payload["parameters"]: + appliance.attributes["parameters"][parameter["parName"]].update(parameter) + appliance.sync_params_to_command("settings") + + self._hon.notify() + + def _appliance_disconnected_callback( + self, + appliance: "HonAppliance", + client: Client, + userdata: "_UserData", + message: "MQTTMessage", + ) -> None: + del client, userdata, message + appliance.connection = False + + self._hon.notify() + + def _appliance_connected_callback( + self, + appliance: "HonAppliance", + client: Client, + userdata: "_UserData", + message: "MQTTMessage", + ) -> None: + del client, userdata, message + appliance.connection = True + + self._hon.notify() + def _subscribe(self, appliance: "HonAppliance") -> None: + topic_part_to_callback_mapping = { + "appliancestatus": self._appliance_status_callback, + "disconnected": self._appliance_disconnected_callback, + "connected": self._appliance_connected_callback, + } for topic in appliance.info.get("topics", {}).get("subscribe", []): - if self._client: - self._client.subscribe(topic) - _LOGGER.info("Subscribed to topic %s", topic) + for topic_part, callback in topic_part_to_callback_mapping.items(): + if topic_part in topic: + self.client.message_callback_add( + topic, functools.partial(callback, appliance) + ) + self.client.subscribe(topic) + _LOGGER.info("Subscribed to topic %s", topic) diff --git a/pyhon/const.py b/pyhon/const.py index a63da63..8cc8c7f 100644 --- a/pyhon/const.py +++ b/pyhon/const.py @@ -1,3 +1,4 @@ +ALPN_PROTOCOL = "mqtt" AUTH_API = "https://account2.hon-smarthome.com" API_URL = "https://api-iot.he.services" API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"