Reintroduce a first draft of the information collectors

This commit is contained in:
2026-05-09 13:35:15 +02:00
parent df47eb8f3b
commit 9f8d0b68fa
2 changed files with 234 additions and 0 deletions

View File

@@ -0,0 +1,182 @@
import ad_toolbox.smartcondition as SmartCondition
from ad_toolbox.smartobject import SmartObject
import pickle
import os
class InformationsCollector(SmartObject):
def on_initialize_smart_object(self):
super().on_initialize_smart_object()
if self.dataset is None:
self.dataset = dict()
if 'error_collector' in self.args:
if 'output_sensor' in self.args['error_collector']:
sensor_name = self.args['error_collector']['output_sensor']
self.error_list = []
self.log_handle = self.listen_log(self.on_log_error, "ERROR", log="main_log")
self.error_count_sensor = self.create_entity(
sensor_name, state=0,
attributes={"last_error": "", "error_list": []},
icon="mdi:alert-circle-outline"
)
if 'reset_error_count_button' in self.args['error_collector']:
self.listen_state(self.on_reset_error_count, self.args['error_collector']['reset_error_count_button'])
else:
self.log_error("No output_sensor defined for error_collector")
if 'min_max_temp_sensors' in self.args:
for sensor in self.args['min_max_temp_sensors']['temperature_sensors']:
self.listen_state(self.on_temperature_change, sensor)
output_sensors = self.args['min_max_temp_sensors']['output_sensors']
self.max_temp_sensor = self.get_entity(output_sensors['max_temp'])
self.max_temp_yesterday_sensor = self.get_entity(output_sensors['max_temp_yesterday'])
self.min_temp_sensor = self.get_entity(output_sensors['min_temp'])
self.min_temp_yesterday_sensor = self.get_entity(output_sensors['min_temp_yesterday'])
current_min_temp, current_max_temp = self.compute_min_max_temp()
if not self.max_temp_sensor.exists() or self.max_temp_sensor.get_state() == 'unavailable':
max_temp = current_max_temp
try:
if max_temp is None or self.dataset['max_temp'] > max_temp:
max_temp = self.dataset['max_temp']
self.log_info(f"Restoring max_temp from dataset ({max_temp}°C)")
except KeyError: pass
self.set_max_temp_sensor(max_temp)
if not self.min_temp_sensor.exists() or self.min_temp_sensor.get_state() == 'unavailable':
min_temp = current_min_temp
try:
if min_temp is None or self.dataset['min_temp'] < min_temp:
min_temp = self.dataset['min_temp']
self.log_info(f"Restoring min_temp from dataset ({min_temp}°C)")
except KeyError: pass
self.set_min_temp_sensor(min_temp)
self.run_daily(self.run_at_midnight, "00:00:00")
if 'group_collectors' in self.args:
self._group_sensors = {}
self._group_evaluators = {}
for name, config in self.args['group_collectors'].items():
self._group_sensors[name] = self.create_entity(
config['output_sensor'], state=0,
attributes={'active_entities': ''},
icon='mdi:counter'
)
entities = self._get_group_entities(config)
if 'active_condition' in config:
self._group_evaluators[name] = {
entity: SmartCondition.Evaluator(
self,
config['active_condition'],
condition_name=f"{name}_{entity}",
constants={'self': entity},
log_callback_trigger_reason=False,
on_change_cb=self._on_group_condition_changed,
)
for entity in entities
}
else:
for entity in entities:
self.listen_state(self.on_group_entity_state_changed, entity)
self.update_group_collectors()
def set_max_temp_sensor(self,temperature):
self.dataset['max_temp'] = temperature
self.max_temp_sensor.set_state(state = temperature, attributes = { 'unit_of_measurement': '°C', 'device_class': 'temperature'})
def set_min_temp_sensor(self,temperature):
self.dataset['min_temp'] = temperature
self.min_temp_sensor.set_state(state = temperature, attributes = { 'unit_of_measurement': '°C', 'device_class': 'temperature'})
def terminate(self):
self.log_info("Writing dataset to " + self.get_dataset_name())
f = open(self.get_dataset_name(), 'wb')
pickle.dump(self.dataset, f)
f.close()
def compute_min_max_temp(self):
max_temp = None
min_temp = None
for sensor in self.args['min_max_temp_sensors']['temperature_sensors']:
temp = self.get_state(sensor)
if not max_temp or temp > max_temp: max_temp = temp
if not min_temp or temp < min_temp: min_temp = temp
return min_temp,max_temp
def on_log_error(self, name, ts, level, type, message, kwargs):
error_count = int(self.error_count_sensor.get_state()) + 1
self.error_list.append(message)
self.error_count_sensor.set_state(state=error_count, attributes={"last_error": message, "error_list": self.error_list})
def on_reset_error_count(self, entity, attribute, old, new, kwargs):
error_log_file = self.get_ad_api()._logging.config['error_log']['filename']
open(error_log_file, 'w').close()
self.error_list.clear()
self.error_count_sensor.set_state(state=0, attributes={"last_error": "", "error_list": self.error_list})
def _on_group_condition_changed(self, prev_result,result):
self.update_group_collectors()
def on_group_entity_state_changed(self, entity, attribute, old, new, kwargs):
if new != old:
self.log_info(f"updating group collectors ({entity} changed from {old} to {new})")
self.update_group_collectors()
def on_temperature_change(self, entity, attribute, old, new, kwargs):
if new != old:
min_temp = self.min_temp_sensor.get_state()
max_temp = self.max_temp_sensor.get_state()
if max_temp is None or max_temp == 'unavailable' or new > max_temp: self.set_max_temp_sensor(new)
if min_temp is None or min_temp == 'unavailable' or new < min_temp: self.set_min_temp_sensor(new)
def run_at_midnight(self, kwargs):
self.max_temp_yesterday_sensor.set_state(state=self.max_temp_sensor.get_state(), attributes={'unit_of_measurement': '°C', 'device_class': 'temperature'})
self.min_temp_yesterday_sensor.set_state(state=self.min_temp_sensor.get_state(), attributes={'unit_of_measurement': '°C', 'device_class': 'temperature'})
min_temp, max_temp = self.compute_min_max_temp()
self.set_max_temp_sensor(max_temp)
self.set_min_temp_sensor(min_temp)
def _get_group_entities(self, config):
group = config['group']
if isinstance(group, str):
return self.get_state(group, attribute='entity_id') or []
entities = []
for g in group:
entities.extend(self.get_state(g, attribute='entity_id') or [])
return entities
def _is_entity_active(self, name, entity):
if name in self._group_evaluators:
return self._group_evaluators[name][entity].evaluate(log=False) == SmartCondition.Result.Succeeded
return self.get_state(entity) == 'on'
# def _get_group_entity_name(self, entity):
# entity_state = self.get_state(entity, attribute='all') or {}
# attributes = entity_state.get('attributes', {})
# friendly_name = attributes.get('friendly_name')
# return friendly_name if isinstance(friendly_name, str) and friendly_name else entity
def update_group_collectors(self):
for name, config in self.args['group_collectors'].items():
sensor = self._group_sensors[name]
prev_state = sensor.get_state()
prev_count = int(prev_state) if prev_state is not None else 0
active_entities = []
for entity in self._get_group_entities(config):
if self.entity_exists(entity) and self._is_entity_active(name, entity):
active_entities.append(entity)
count = len(active_entities)
sensor.set_state(state=count, attributes={'active_entities': ', '.join(active_entities)})
if prev_count != count:
self.log_info(f"{name}: {count} active entities (previously {prev_count})")