From 923cfd415266502ea0af267314be9c9e41ea4af2 Mon Sep 17 00:00:00 2001 From: Pierre Gironde Date: Thu, 16 Apr 2026 19:21:47 +0200 Subject: [PATCH] feat: add SmartObject and TemplateLibrary classes --- smartobject.py | 214 +++++++++++++++++++++++++++++++++++++++++++++ templatelibrary.py | 14 +++ 2 files changed, 228 insertions(+) create mode 100644 smartobject.py create mode 100644 templatelibrary.py diff --git a/smartobject.py b/smartobject.py new file mode 100644 index 0000000..5853b78 --- /dev/null +++ b/smartobject.py @@ -0,0 +1,214 @@ +import appdaemon.plugins.hass.hassapi as hass +import pickle +import os +import json +import re +from virtualsensors import VirtualSensors +from expressionparser import ParsingException +from logger_interface import LoggerInterface + +class SmartObject(hass.Hass,LoggerInterface): + + def _sanitize_for_topic(self, value): + return re.sub(r"[^a-zA-Z0-9_]+", "_", str(value)).strip("_").lower() + + def _mqtt_lazy_init(self): + if hasattr(self, '_mqtt'): + return self._mqtt is not None + self._mqtt = None + try: + available = any( + i.get('domain') == 'mqtt' and i.get('service') == 'publish' + for i in self.list_services('global') + ) + except Exception: + available = False + if not available: + return False + node_id = self._sanitize_for_topic(self.name) + device_name = self.args.get('mqtt_device_name', self.name) + self._mqtt = { + 'node_id': node_id, + 'device': { + 'identifiers': [node_id], + 'name': device_name, + 'manufacturer': 'AppDaemon', + 'model': self.__class__.__name__, + }, + 'entities': set(), + 'handles': {}, + } + return True + + def _mqtt_publish(self, topic, payload, retain=False): + if not isinstance(payload, str): + payload = json.dumps(payload) + self.call_service('mqtt/publish', topic=topic, payload=payload, retain=retain) + + def _mqtt_sync_state(self, entity_id, attribute, old, new, kwargs): + if not getattr(self, '_mqtt', None) or entity_id not in self._mqtt['entities']: + return + state_data = self.get_state(entity_id, attribute='all') or {} + payload = {'state': state_data.get('state')} + payload.update(state_data.get('attributes', {})) + node_id = self._mqtt['node_id'] + obj = self._sanitize_for_topic(entity_id.split('.')[-1]) + self._mqtt_publish(f"appdaemon/{node_id}/{obj}/state", payload, retain=True) + + def create_entity(self, entity_id, state=None, attributes=None, name=None, icon=None, unit_of_measurement=None, device_class=None, state_class=None): + entity = self.get_entity(entity_id, check_existence=False) + if self._mqtt_lazy_init() and entity_id not in self._mqtt['entities']: + node_id = self._mqtt['node_id'] + obj = self._sanitize_for_topic(entity_id.split('.')[-1]) + state_topic = f"appdaemon/{node_id}/{obj}/state" + availability_topic = f"appdaemon/{node_id}/{obj}/availability" + config = { + 'name': name or entity_id, + 'unique_id': f"{node_id}_{obj}", + 'state_topic': state_topic, + 'value_template': '{{ value_json.state }}', + 'availability_topic': availability_topic, + 'payload_available': 'online', + 'payload_not_available': 'offline', + 'device': self._mqtt['device'], + } + if icon: config['icon'] = icon + if unit_of_measurement: config['unit_of_measurement'] = unit_of_measurement + if device_class: config['device_class'] = device_class + if state_class: config['state_class'] = state_class + self._mqtt_publish(f"homeassistant/sensor/{node_id}/{obj}/config", config, retain=True) + self._mqtt_publish(availability_topic, 'online', retain=True) + self._mqtt['entities'].add(entity_id) + self._mqtt['handles'][entity_id] = self.listen_state(self._mqtt_sync_state, entity_id) + if state is not None and attributes is not None: + self.set_state(entity_id, state=state, attributes=attributes) + elif state is not None: + self.set_state(entity_id, state=state) + elif attributes is not None: + self.set_state(entity_id, attributes=attributes) + elif not entity.exists(): + self.set_state(entity_id, state='unknown') + return entity + + def _mqtt_terminate(self): + if not getattr(self, '_mqtt', None): + return + for entity_id in self._mqtt['entities']: + node_id = self._mqtt['node_id'] + obj = self._sanitize_for_topic(entity_id.split('.')[-1]) + try: self._mqtt_publish(f"appdaemon/{node_id}/{obj}/availability", 'offline', retain=True) + except Exception: pass + for handle in self._mqtt['handles'].values(): + try: self.cancel_listen_state(handle, silent=True) + except Exception: pass + + def get_dataset_folder(self): return os.path.join(str(self.AD.app_dir),"data") + + def get_dataset_name(self): + return os.path.join(self.get_dataset_folder(),f"{self.name}.dataset") + + def add_constants(self,constants_dict): + self.log_info(f"Declaring constants {constants_dict}") + self.constants.update(constants_dict) + + def get_default_entity_state(self): self.log_error(f"{self.entity_id} doesn't exist. Please use default_entity_state if you wan't to create one or override get_default_entity_state()",True) + + def initialize(self): + try: + #self.depends_on_module(["smartobject","virtualsensors","logger_interface"]) + + self.initialize_logger_interface(self.get_ad_api()) + self.log_info(f"Name = {self.name}") + try: self.mute_logger(self.args['mute']) + except KeyError: pass + self.event_dispatchers = dict() + self.constants = dict() + self.dataset = None + + #retrieve object param + if "entity" in self.args: + self.entity_id = self.args["entity"] + self.entity = self.get_entity(self.entity_id) + if self.entity.exists(): + self.log_info(f"Linked to {self.entity_id}") + elif 'default_entity_state' in self.args: + self.entity.set_state(state = self.args["default_entity_state"]) + self.log_info(f"Creating {self.entity_id}, default_state = {self.args['default_entity_state']}") + else: + self.entity.set_state(state = self.get_default_entity_state()) + + self.constants['self'] = self.entity_id + else: + self.entity_id = None + self.entity = None + + if 'constants' in self.args: + self.add_constants(self.args['constants']) + + self.templates_library = None + if 'templates_library' in self.args: + library_name = self.args['templates_library'] + library_app = self.get_app(library_name) + if library_app: + self.listen_event(self.on_template_library_loaded,'template_library_loaded', library_name = library_name) + self.templates_library = library_app.get_template_library() + else: + self.log_error(f"Can't find the library app {library_name}") + + if 'virtual_sensors' in self.args: + self.virtual_sensors = VirtualSensors(ad_api = self.get_ad_api(),logger_interface = self,super_entity_id = self.entity_id,yaml_block = self.args['virtual_sensors'],templates_library = self.templates_library,constants = self.constants) + + if 'attributes_override' in self.args: + self.attribute_sensors = dict() + new_attributes = dict() + for attribute in self.args['attributes_override']: + attribute_sensor = self.args['attributes_override'][attribute] + if self.entity_exists(attribute_sensor): + self.log_info(f"Registering sensor {attribute_sensor} for attribute {attribute}") + self.attribute_sensors[attribute_sensor] = attribute + self.listen_state(self.on_attribute_sensor_changed,attribute_sensor) + attribute_value = self.get_state(attribute_sensor) + else: + attribute_value = self.args['attributes_override'][attribute] + new_attributes[attribute] = attribute_value + + self.log_info(f"Overriding {self.entity_id} attributes with {new_attributes}") + self.entity.set_state(state = self.entity.get_state(), attributes = new_attributes) + + #load dataset from disk + try: + try: os.makedirs(self.get_dataset_folder()) + except FileExistsError: pass + + f = open(self.get_dataset_name(), 'rb') + self.dataset = pickle.load(f) + f.close() + self.log_info(self.get_dataset_name() + " loaded") + except (FileNotFoundError,EOFError): pass #self.log_info("File " + self.get_dataset_name() + " not found (and it's ok)") + self.on_initialize_smart_object() + except ParsingException as e: self.log_error(str(e),stop_app = True) + + + def on_initialize_smart_object(self): pass + + def terminate(self): + self.event_dispatchers = None + self.virtual_sensors = None + self._mqtt_terminate() + try: has_dataset = self.dataset != None + except AttributeError: has_dataset = False + if has_dataset: + self.log_info("Writing dataset to " + self.get_dataset_name()) + f = open(self.get_dataset_name(), 'wb') + pickle.dump(self.dataset, f) + f.close() + + def on_template_library_loaded(self, event_name, data, kwargs): + self.log_info(f"Restarting app to reload new template") + self.restart_app(self.name) + + def on_attribute_sensor_changed(self, entity, attribute, old, new, kwargs): + if new != old: + new_attributes = { self.attribute_sensors[entity] : new} + self.log_info(f"Overriding {self.entity_id} attributes with {new_attributes}") + self.entity.set_state(attributes = new_attributes) diff --git a/templatelibrary.py b/templatelibrary.py new file mode 100644 index 0000000..425c961 --- /dev/null +++ b/templatelibrary.py @@ -0,0 +1,14 @@ +import appdaemon.plugins.hass.hassapi as hass + +class TemplateLibrary(hass.Hass): + def initialize(self): + self.fire_event('template_library_loaded',library_name = self.name) + + def get_template_library(self): + if "templates_library" in self.args: + return self.args["templates_library"] + else: + message = f"Can't find 'templates_library'" + self.log(message,level = "ERROR", log = "main_log") + self.log(message,level = "ERROR", log = "error_log") + return None \ No newline at end of file