Compare commits

..

4 Commits

7 changed files with 285 additions and 7 deletions

1
.vscode/launch.json vendored
View File

@@ -12,6 +12,7 @@
"host": "localhost",
"port": 5678
},
"steppingResumesAllThreads": true,
"pathMappings": [
{
"localRoot": "${workspaceFolder}",

View File

@@ -3,6 +3,7 @@ appdaemon:
longitude: 0
elevation: 30
time_zone: Europe/Berlin
internal_function_timeout: 00:20:00
exclude_dirs:
- unit_tests
plugins:
@@ -11,6 +12,8 @@ appdaemon:
ha_url: http://10.0.0.21:8123
token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJiZThhYWI2ZTlkNmQ0NWU2YTk1ODU5OWJjOWM3MWJkYiIsImlhdCI6MTc3NjEwOTc0NCwiZXhwIjoyMDkxNDY5NzQ0fQ.BorkjFjWlWCZqnUa9-NMUxGsDiupDoRZ3cEgsmeSofM
cert_verify: True
q_timeout: 1200
ws_timeout: 00:10:00
http:
url: http://0.0.0.0:5050
admin:

View File

@@ -1,8 +1,9 @@
# I now directly run AppDaemon with debugpy in DockerStart.sh. Not sure which is best
# debugger:
# module: debugger
# class: Debugger
# port: 5678
debugger:
module: debugger
class: Debugger
priority: 0
port: 5678
wait_for_client: false # set to true when you need to debug initialize()
motion_tracker:
module: motiontracker
@@ -25,4 +26,58 @@ motion_tracker:
motion_sensors: binary_sensor.garage_motion
kitchen:
motion_sensors: binary_sensor.kitchen_motion
mezzanine:
motion_sensors: binary_sensor.mezzanine_motion
informations_collector:
module: informationscollector
class: InformationsCollector
priority: 1
mqtt_device_name: AD Informations Collector
#mute: True
error_collector:
reset_error_count_button: input_button.reset_error_count
output_sensor: sensor.ad_errors
# min_max_temp_sensors:
# temperature_sensors:
# - sensor.temperature_exterieur_nord
# - sensor.temperature_exterieur_sud
# output_sensors:
# max_temp: sensor.max_temp_outside
# max_temp_yesterday: sensor.max_temp_yesterday_outside
# min_temp: sensor.min_temp_outside
# min_temp_yesterday: sensor.min_temp_yesterday_outside
group_collectors:
windows:
group: binary_sensor.group_windows
output_sensor: sensor.ad_number_of_windows_open
music_players:
group: media_player.group_music_player
output_sensor: sensor.ad_number_of_music_player_active
active_condition: self == 'playing'
#windows:
# group: group.fenetres
# output_sensor: sensor.ad_number_of_windows_open
# shutters:
# group: group.volets
# output_sensor: sensor.ad_number_of_shutter_close
# occupency_sensors:
# group: group.occupency_sensors
# output_sensor: sensor.ad_number_of_rooms_occupied
# smart_lights:
# group:
# - group.smart_lights
# - group.dumb_lights
# output_sensor: sensor.ad_number_of_lights_on

View File

@@ -5,8 +5,14 @@ import debugpy
class Debugger(hass.Hass):
def initialize(self):
port = int(self.args.get("port", 5678))
wait_for_client = bool(self.args.get("wait_for_client", False))
try:
debugpy.listen(("0.0.0.0", port))
self.log(f"debugpy listening on port {port}")
except RuntimeError:
self.log("debugpy already listening, skipping")
if wait_for_client:
self.log(f"waiting for debug client on port {port}")
debugpy.wait_for_client()
self.log("debug client attached")

View File

@@ -0,0 +1,182 @@
import ad_toolbox.smartcondition as SmartCondition
from ad_toolbox.smartobject import SmartObject
import pickle
import os
class InformationsCollector(SmartObject):
def on_initialize_smart_object(self):
super().on_initialize_smart_object()
if self.dataset is None:
self.dataset = dict()
if 'error_collector' in self.args:
if 'output_sensor' in self.args['error_collector']:
sensor_name = self.args['error_collector']['output_sensor']
self.error_list = []
self.log_handle = self.listen_log(self.on_log_error, "ERROR", log="main_log")
self.error_count_sensor = self.create_entity(
sensor_name, state=0,
attributes={"last_error": "", "error_list": []},
icon="mdi:alert-circle-outline"
)
if 'reset_error_count_button' in self.args['error_collector']:
self.listen_state(self.on_reset_error_count, self.args['error_collector']['reset_error_count_button'])
else:
self.log_error("No output_sensor defined for error_collector")
if 'min_max_temp_sensors' in self.args:
for sensor in self.args['min_max_temp_sensors']['temperature_sensors']:
self.listen_state(self.on_temperature_change, sensor)
output_sensors = self.args['min_max_temp_sensors']['output_sensors']
self.max_temp_sensor = self.get_entity(output_sensors['max_temp'])
self.max_temp_yesterday_sensor = self.get_entity(output_sensors['max_temp_yesterday'])
self.min_temp_sensor = self.get_entity(output_sensors['min_temp'])
self.min_temp_yesterday_sensor = self.get_entity(output_sensors['min_temp_yesterday'])
current_min_temp, current_max_temp = self.compute_min_max_temp()
if not self.max_temp_sensor.exists() or self.max_temp_sensor.get_state() == 'unavailable':
max_temp = current_max_temp
try:
if max_temp is None or self.dataset['max_temp'] > max_temp:
max_temp = self.dataset['max_temp']
self.log_info(f"Restoring max_temp from dataset ({max_temp}°C)")
except KeyError: pass
self.set_max_temp_sensor(max_temp)
if not self.min_temp_sensor.exists() or self.min_temp_sensor.get_state() == 'unavailable':
min_temp = current_min_temp
try:
if min_temp is None or self.dataset['min_temp'] < min_temp:
min_temp = self.dataset['min_temp']
self.log_info(f"Restoring min_temp from dataset ({min_temp}°C)")
except KeyError: pass
self.set_min_temp_sensor(min_temp)
self.run_daily(self.run_at_midnight, "00:00:00")
if 'group_collectors' in self.args:
self._group_sensors = {}
self._group_evaluators = {}
for name, config in self.args['group_collectors'].items():
self._group_sensors[name] = self.create_entity(
config['output_sensor'], state=0,
attributes={'active_entities': ''},
icon='mdi:counter'
)
entities = self._get_group_entities(config)
if 'active_condition' in config:
self._group_evaluators[name] = {
entity: SmartCondition.Evaluator(
self,
config['active_condition'],
condition_name=f"{name}_{entity}",
constants={'self': entity},
log_callback_trigger_reason=False,
on_change_cb=self._on_group_condition_changed,
)
for entity in entities
}
else:
for entity in entities:
self.listen_state(self.on_group_entity_state_changed, entity)
self.update_group_collectors()
def set_max_temp_sensor(self,temperature):
self.dataset['max_temp'] = temperature
self.max_temp_sensor.set_state(state = temperature, attributes = { 'unit_of_measurement': '°C', 'device_class': 'temperature'})
def set_min_temp_sensor(self,temperature):
self.dataset['min_temp'] = temperature
self.min_temp_sensor.set_state(state = temperature, attributes = { 'unit_of_measurement': '°C', 'device_class': 'temperature'})
def terminate(self):
self.log_info("Writing dataset to " + self.get_dataset_name())
f = open(self.get_dataset_name(), 'wb')
pickle.dump(self.dataset, f)
f.close()
def compute_min_max_temp(self):
max_temp = None
min_temp = None
for sensor in self.args['min_max_temp_sensors']['temperature_sensors']:
temp = self.get_state(sensor)
if not max_temp or temp > max_temp: max_temp = temp
if not min_temp or temp < min_temp: min_temp = temp
return min_temp,max_temp
def on_log_error(self, name, ts, level, type, message, kwargs):
error_count = int(self.error_count_sensor.get_state()) + 1
self.error_list.append(message)
self.error_count_sensor.set_state(state=error_count, attributes={"last_error": message, "error_list": self.error_list})
def on_reset_error_count(self, entity, attribute, old, new, kwargs):
error_log_file = self.get_ad_api()._logging.config['error_log']['filename']
open(error_log_file, 'w').close()
self.error_list.clear()
self.error_count_sensor.set_state(state=0, attributes={"last_error": "", "error_list": self.error_list})
def _on_group_condition_changed(self, prev_result,result):
self.update_group_collectors()
def on_group_entity_state_changed(self, entity, attribute, old, new, kwargs):
if new != old:
self.log_info(f"updating group collectors ({entity} changed from {old} to {new})")
self.update_group_collectors()
def on_temperature_change(self, entity, attribute, old, new, kwargs):
if new != old:
min_temp = self.min_temp_sensor.get_state()
max_temp = self.max_temp_sensor.get_state()
if max_temp is None or max_temp == 'unavailable' or new > max_temp: self.set_max_temp_sensor(new)
if min_temp is None or min_temp == 'unavailable' or new < min_temp: self.set_min_temp_sensor(new)
def run_at_midnight(self, kwargs):
self.max_temp_yesterday_sensor.set_state(state=self.max_temp_sensor.get_state(), attributes={'unit_of_measurement': '°C', 'device_class': 'temperature'})
self.min_temp_yesterday_sensor.set_state(state=self.min_temp_sensor.get_state(), attributes={'unit_of_measurement': '°C', 'device_class': 'temperature'})
min_temp, max_temp = self.compute_min_max_temp()
self.set_max_temp_sensor(max_temp)
self.set_min_temp_sensor(min_temp)
def _get_group_entities(self, config):
group = config['group']
if isinstance(group, str):
return self.get_state(group, attribute='entity_id') or []
entities = []
for g in group:
entities.extend(self.get_state(g, attribute='entity_id') or [])
return entities
def _is_entity_active(self, name, entity):
if name in self._group_evaluators:
return self._group_evaluators[name][entity].evaluate(log=False) == SmartCondition.Result.Succeeded
return self.get_state(entity) == 'on'
# def _get_group_entity_name(self, entity):
# entity_state = self.get_state(entity, attribute='all') or {}
# attributes = entity_state.get('attributes', {})
# friendly_name = attributes.get('friendly_name')
# return friendly_name if isinstance(friendly_name, str) and friendly_name else entity
def update_group_collectors(self):
for name, config in self.args['group_collectors'].items():
sensor = self._group_sensors[name]
prev_state = sensor.get_state()
prev_count = int(prev_state) if prev_state is not None else 0
active_entities = []
for entity in self._get_group_entities(config):
if self.entity_exists(entity) and self._is_entity_active(name, entity):
active_entities.append(entity)
count = len(active_entities)
sensor.set_state(state=count, attributes={'active_entities': ', '.join(active_entities)})
if prev_count != count:
self.log_info(f"{name}: {count} active entities (previously {prev_count})")

View File

@@ -0,0 +1,27 @@
light_mezzanine_01:
module: smartlight
class: SmartLight
entity: light.mezzanine_01
smart_conditions:
trigger_conditions: sensor.mezzanine_last_motion < 3
blocking_conditions: sensor.mezzanine_motion_light_level > 10 and not self
light_brightness_pct:
15: input_boolean.sleeping
35: binary_sensor.day_interval_night
100: True
light_mezzanine_02:
module: smartlight
class: SmartLight
entity: light.mezzanine_02
smart_conditions:
trigger_conditions: sensor.mezzanine_last_motion < 3
blocking_conditions: sensor.mezzanine_motion_light_level > 10 and not self
light_brightness_pct:
5: input_boolean.sleeping
25: binary_sensor.day_interval_night
100: True

View File

@@ -155,8 +155,12 @@ class SmartSwitch(SmartObject):
# Timer callback: applies the deferred state set by auto_switch_*_after.
def on_auto_switch_after(self, kwargs):
self.auto_switch_cb_handle = None
self.log(f"Switching {self.entity_id} {kwargs['new_state']} after {kwargs['autoswitch_delay']}s")
self.set_state(self.entity_id,state = kwargs['new_state'])
new_state = kwargs['new_state']
self.log(f"Switching {self.entity_id} {new_state} after {kwargs['autoswitch_delay']}s")
if new_state == 'on':
self.switch_on()
else:
self.switch_off()
# ------------------------------------------------------------------
# Public switch API