First smart lights :)

This commit is contained in:
2026-04-16 19:23:59 +02:00
parent 645190c1be
commit 8910749eb8
15 changed files with 1374 additions and 20 deletions

View File

@@ -1,3 +1,41 @@
hello_world:
module: hello
class: HelloWorld
motion_tracker:
module: motiontracker
class: MotionTracker
mqtt_device_name: AD Motion Tracker
areas:
corridor:
motion_sensors: binary_sensor.corridor_motion
hallway:
motion_sensors: binary_sensor.hallway_motion
restroom:
motion_sensors: binary_sensor.restroom_motion
light_corridor:
module: smartlight
class: SmartLight
entity: light.corridor
smart_conditions:
trigger_conditions: sensor.corridor_last_motion < 5
blocking_conditions: sensor.corridor_motion_light_level > 10 and not self
light_hallway:
module: smartlight
class: SmartLight
entity: light.hallway
smart_conditions:
trigger_conditions: sensor.hallway_last_motion < 5
blocking_conditions: sensor.hallway_motion_light_level > 10 and not self
light_restroom:
module: smartlight
class: SmartLight
entity: light.restroom
smart_conditions:
trigger_conditions: sensor.restroom_last_motion < 5
blocking_conditions: sensor.restroom_motion_light_level > 10 and not self

View File

@@ -1,3 +0,0 @@
hello_world:
module: hello
class: HelloWorld

Binary file not shown.

View File

@@ -1,13 +0,0 @@
import hassapi as hass
#
# Hello World App
#
# Args:
#
class HelloWorld(hass.Hass):
def initialize(self):
self.log("Hello from AppDaemon")
self.log("You are now ready to run Apps!")

137
apps/motiontracker.py Normal file
View File

@@ -0,0 +1,137 @@
import appdaemon.plugins.hass.hassapi as hass
from ad_toolbox.smartobject import SmartObject
from ad_toolbox.eventhandler import EventHandler
from ad_toolbox.expressionparser import ParsingException
import time
class MotionTracker(SmartObject):
MAX_TIME = 120
def initialize(self):
super().initialize()
if self.dataset == None:
self.dataset = { 'areas_movement_time' : dict(), 'last_area_with_movement' : 'Unknown', 'areas_door_close_time' : dict() }
else:
# clean obsolete keys
self.dataset['areas_movement_time'] = {area : self.dataset['areas_movement_time'][area] for area in self.args['areas'] if area in self.dataset['areas_movement_time']}
self.dataset['areas_door_close_time'] = {area : self.dataset['areas_door_close_time'][area] for area in self.args['areas'] if area in self.dataset['areas_door_close_time']}
self.input_sensors = dict()
self.output_last_motion_sensors = dict()
self.output_last_motion_time_sensors = dict()
self.output_door_close_time_sensors = dict()
self.update_cb_handle = None
if "areas" in self.args:
current_time = time.time()
for area in self.args['areas']:
update_on_both_front = False
self.output_last_motion_sensors[area] = self.create_entity(f"sensor.{area}_last_motion")
self.output_last_motion_time_sensors[area] = self.create_entity(f"sensor.{area}_last_motion_time")
if isinstance(self.args['areas'][area], dict):
sensor_entities = self.args['areas'][area]['motion_sensors']
try: update_on_both_front = self.args['areas'][area]['update_on_both_front']
except KeyError: pass
if 'door_sensor' in self.args['areas'][area]:
self.output_door_close_time_sensors[area] = self.create_entity(f"{area}_sensor.door_close_time")
if not area in self.dataset['areas_door_close_time']:
self.dataset['areas_door_close_time'][area] = 0
if not self.output_door_close_time_sensors[area].exists():
self.output_door_close_time_sensors[area].set_state(state = self.dataset['areas_door_close_time'][area],attributes = {'unit_of_measurement' : "s"})
self.listen_state(self.on_door_close,self.args['areas'][area]['door_sensor'],new = 'off', old = 'on',area = area)
else:
sensor_entities = self.args['areas'][area]
if isinstance(sensor_entities, list):
for entity in sensor_entities:
self.register_motion_sensor(area,entity,update_on_both_front)
else:
self.register_motion_sensor(area,sensor_entities,update_on_both_front)
if not area in self.dataset['areas_movement_time']:
self.dataset['areas_movement_time'][area] = 0
if not self.output_last_motion_time_sensors[area].exists():
self.output_last_motion_time_sensors[area].set_state(state = self.dataset['areas_movement_time'][area],attributes = {'unit_of_measurement' : "s"})
self.update_area_sensor(area,current_time)
if "clear_areas_events" in self.args:
self.event_handlers = list()
for entry in self.args["clear_areas_events"]:
yaml_block = self.args["clear_areas_events"][entry]
try: self.event_handlers.append(EventHandler(self,yaml_block['events_to_listen'],self.on_clear_areas_event,entry))
except ParsingException as e:
self.log_error(str(e))
continue
self.update_areas_data()
def on_clear_areas_event(self, event_name, event_data,entry):
for area in self.args["clear_areas_events"][entry]['areas_to_clear']:
self.log(f"{area} movement data reseted by {event_name} event")
self.dataset['areas_movement_time'][area] = 0
self.output_last_motion_time_sensors[area].set_state(state = 0)
self.update_areas_data()
def register_motion_sensor(self,area,sensor_entity,update_on_both_front):
self.log(f"Registering sensor {sensor_entity} for area {area}")
if sensor_entity not in self.input_sensors:
self.input_sensors[sensor_entity] = area
self.listen_state(self.on_motion_detected,sensor_entity,old = "off", new = "on", area = area)
if update_on_both_front:
self.listen_state(self.on_motion_detected,sensor_entity,old = "on", new = "off", area = area)
else:
self.log_error(f"{sensor_entity} is already registered for area {self.input_sensors[sensor_entity]}")
def update_area_sensor(self,area,current_time):
time_elapsed = min((current_time - self.dataset['areas_movement_time'][area]) / 60,self.MAX_TIME)
assert time_elapsed != None
self.output_last_motion_sensors[area].set_state(state = int(time_elapsed),attributes = {'unit_of_measurement' : "min"})
def is_excluded_from_last_area_with_movement(self,area):
if "areas_excluded_from_last_area_with_movement" in self.args:
return area in self.args["areas_excluded_from_last_area_with_movement"]
return False
def is_area_initialized(self,area):
return area in self.output_last_motion_sensors #might need a dedicated boolean at some point
def update_areas_data(self,*args):
current_time = time.time()
#we don't want update_areas_data_to_fork if it's called directly from on_state_change
if self.update_cb_handle and self.timer_running(self.update_cb_handle):
self.cancel_timer(self.update_cb_handle)
self.update_cb_handle = None
#I want to start updating the oldest
#if not, the order might not be respected for a fraction of seconds.
#For example if A = 5 and B = 6 and add 2 to the both of them starting by A
#A will be bigger than B (A = 7 and B = 6) for a very short time, and might trigger some automation
for area in sorted(self.dataset['areas_movement_time'], key = lambda area: self.dataset['areas_movement_time'][area]):
if (self.is_area_initialized(area)): self.update_area_sensor(area,current_time)
self.set_state("sensor.last_motion",state = self.dataset['last_area_with_movement'])
self.update_cb_handle = self.run_in(self.update_areas_data,30)
def on_motion_detected(self, entity, attribute, old, new, kwargs):
current_time = time.time()
self.dataset['areas_movement_time'][kwargs['area']] = current_time
self.output_last_motion_time_sensors[kwargs['area']].set_state(state = current_time)
if not self.is_excluded_from_last_area_with_movement(kwargs['area']):
self.dataset['last_area_with_movement'] = kwargs['area']
self.update_areas_data()
def on_door_close(self, entity, attribute, old, new, kwargs):
self.output_door_close_time_sensors[kwargs['area']].set_state(state = time.time())

92
apps/smartlight.py Normal file
View File

@@ -0,0 +1,92 @@
import appdaemon.plugins.hass.hassapi as hass
from smartswitch import SmartSwitch
import ad_toolbox.smartcondition as SmartCondition
class SmartLight(SmartSwitch):
#@SmartCondition.catch_smartcondition_exception(lambda self, message: self.log_error(message,stop_app = True))
def on_initialize_smart_object(self):
self.light_brightness_pct_list = list()
self.light_brightness_pct = None
super().on_initialize_smart_object()
if "increase_brightness_events" in self.args:
self.register_event_from_yaml(self.args["increase_brightness_events"],self.on_increase_brightness_event)
if "decrease_brightness_events" in self.args:
self.register_event_from_yaml(self.args["decrease_brightness_events"],self.on_decrease_brightness_event)
if "brightness_pct_step" in self.args:
self.brightness_pct_step = self.args["brightness_pct_step"]
else: self.brightness_pct_step = 5
if "on_events_with_transition" in self.args:
for key in self.args["on_events_with_transition"]:
#self.log(f"{key}")
self.register_event_from_yaml(self.args["on_events_with_transition"][key]["events"],self.on_turn_on_with_transition,key)
if "light_brightness_pct" in self.args:
self.always_change_brightness = False
for key in self.args["light_brightness_pct"]:
if key == 'always_change_brightness':
self.always_change_brightness = bool(self.args["light_brightness_pct"][key])
else:
self.light_brightness_pct_list.append((SmartCondition.Evaluator(self,self.args["light_brightness_pct"][key],condition_name = key, on_update_cb = self.on_update_light_brightness_pct,constants = self.constants, templates_library = self.templates_library, log_callback_trigger_reason = False),key))
self. on_update_light_brightness_pct()
self.listen_state(self.on_state_change,self.entity_id)
def on_state_change(self, entity, attribute, old, new, *kwargs):
if "icon_override" in self.args:
override_data = self.args['icon_override']
def update_icon(entity_id,new_state): self.set_state(entity_id,attributes = { 'icon' : override_data['on_icon'] if new_state == 'on' else override_data['off_icon'] })
if isinstance(override_data['dest_entities'],list):
for target_entity in override_data['dest_entities']:
update_icon(target_entity,new)
else:
update_icon(override_data['dest_entities'],new)
def on_increase_brightness_event(self, event_name, data, kwargs):
if self.get_state(self.entity_id) != 'off':
self.call_service("light/turn_on", entity_id = self.entity_id, brightness_step_pct = self.brightness_pct_step)
def on_decrease_brightness_event(self, event_name, data, kwargs):
if self.get_state(self.entity_id) != 'off':
self.call_service("light/turn_on", entity_id = self.entity_id, brightness_step_pct = -self.brightness_pct_step)
def on_turn_on_with_transition(self, event_name, data, kwargs,event_category):
transition_time = self.args["on_events_with_transition"][event_category]["transition_time"]
brightness_pct = self.args["on_events_with_transition"][event_category]["brightness_pct"]
self.log(f"Turn on at {brightness_pct}% with a transition of {transition_time}s")
self.call_service("light/turn_on", entity_id = self.entity_id,brightness_pct = 1)
self.call_service("light/turn_on", entity_id = self.entity_id, transition = transition_time,brightness_pct = brightness_pct)
def switch_on(self):
if self.light_brightness_pct != None:
self.log(f"Turn on {self.entity_id} at {self.light_brightness_pct}%")
self.call_service("light/turn_on", entity_id = self.entity_id,brightness_pct = self.light_brightness_pct)
else:
super().switch_on()
def on_update_light_brightness_pct(self):
for brightness_pct_evaluator in self.light_brightness_pct_list:
if brightness_pct_evaluator[0].evaluate(False) == SmartCondition.Result.Succeeded:
if self.light_brightness_pct != brightness_pct_evaluator[1]:
brightness_pct_evaluator[0].log_callback_trigger_reason()
brightness_pct_evaluator[0].log_evaluation_result()
self.light_brightness_pct = brightness_pct_evaluator[1]
self.log_info(f"New brightness : {self.light_brightness_pct}%")
break
if (self.always_change_brightness or self.get_state(self.entity_id) == "on") and self.light_brightness_pct != None:
self.call_service("light/turn_on", entity_id = self.entity_id,brightness_pct = self.light_brightness_pct)
if int(self.light_brightness_pct) == 0: #some integration seems to not turn off the light when you set up a brightness of 0
self.log_info("Turning off")
self.call_service("light/turn_off", entity_id = self.entity_id)

144
apps/smartswitch.py Normal file
View File

@@ -0,0 +1,144 @@
import appdaemon.plugins.hass.hassapi as hass
import ad_toolbox.smartcondition as SmartCondition
from ad_toolbox.smartobject import SmartObject
class SmartSwitch(SmartObject):
#@SmartCondition.catch_smartcondition_exception(lambda self, message: self.log_error(message,stop_app = True))
def on_initialize_smart_object(self):
#super().initialize()
self.off_conditions_evaluator = None
self.smart_conditions_evaluator = None
#self.depends_on_module("smartswitch")
if "smart_conditions" in self.args:
self.smart_conditions_evaluator = SmartCondition.Evaluator(self,self.args['smart_conditions'],condition_name = "smart_conditions",on_change_cb = self.on_smart_conditions_change,constants = self.constants, templates_library = self.templates_library)
if "off_conditions" in self.args:
if self.smart_conditions_evaluator == None:
self.off_conditions_evaluator = SmartCondition.Evaluator(self,self.args['off_conditions'],condition_name = "off_conditions",on_succeed_cb = self.on_off_conditions,constants = self.constants, templates_library = self.templates_library)
else:
self.log(f"Warning you can't have both an off_conditons and a smart_conditions, the off_conditions will be ignored")
if "debug" in self.args:
self.log(f'Registering on_debug_display_event for {self.args["debug"]}')
self.listen_event(self.on_debug_display_event,self.args["debug"])
if "off_events" in self.args:
self.register_event_from_yaml(self.args["off_events"],self.on_turn_off_event)
if "on_events" in self.args:
self.register_event_from_yaml(self.args["on_events"],self.on_turn_on_event)
if "toggle_events" in self.args:
self.register_event_from_yaml(self.args["toggle_events"],self.on_toggle_event)
#todo: replace with register_event_from_yaml
if "toggle_action" in self.args:
self.toggle_action = self.args["toggle_action"]
self.listen_event(self.on_toggle_event_action,"ios.action_fired")
self.auto_switch_cb_handle = None
if 'auto_switch_on_after' in self.args:
if self.is_off():
self.log(f"Smartswitch has been restarted while it was on. Since we have auto_switch_on_after activated we turn it on as we can't know how long is left for the timer",level = 'WARNING')
self.switch_on()
self.listen_state(self.on_state_change,self.entity_id, old = 'on',new = 'off')
if 'auto_switch_off_after' in self.args:
if self.is_on():
self.log(f"Smartswitch has been restarted while it was on. Since we have auto_switch_off_after activated we turn it off as we can't know how long is left for the timer",level = 'WARNING')
self.switch_off()
self.listen_state(self.on_state_change,self.entity_id, old = 'off',new = 'on')
def terminate(self):
self.smart_conditions_evaluator = None
self.off_conditions_evaluator = None
super().terminate()
def on_state_change(self, entity, attribute, old, new, kwargs):
self.log("state changed from " + str(old) + " to " + str(new))
if old != new:
if self.auto_switch_cb_handle != None:
self.cancel_timer(self.auto_switch_cb_handle)
self.auto_switch_cb_handle = None
if new == 'on' and 'auto_switch_off_after' in self.args:
delay = self.args['auto_switch_off_after']
self.log(f"{self.entity_id} will auto switch on in {delay}s")
self.auto_switch_cb_handle = self.run_in(self.on_auto_switch_after, delay, new_state = 'off',autoswitch_delay = delay)
if new == 'off' and 'auto_switch_on_after' in self.args:
delay = self.args['auto_switch_on_after']
self.log(f"{self.entity_id} will auto switch off in {delay}s")
self.auto_switch_cb_handle = self.run_in(self.on_auto_switch_after, delay, new_state = 'on',autoswitch_delay = delay)
def on_auto_switch_after(self, kwargs):
self.auto_switch_cb_handle = None
self.log(f"Switching {self.entity_id} {kwargs['new_state']} after {kwargs['autoswitch_delay']}s")
self.set_state(self.entity_id,state = kwargs['new_state'])
#debug display to display all events
def on_debug_display_event(self,event_name,data,kwargs):
self.log(f"events {event_name} has been catched. data = {data}")
def switch_on(self):
self.log(f"Turn on {self.entity_id}")
self.turn_on(self.entity_id)
# not needed yet
# if self.smart_conditions_evaluator:
# self.smart_conditions_evaluator.force_last_evaluation_result(SmartCondition.Result.Succeeded)
# if self.off_conditions_evaluator:
# self.off_conditions_evaluator.force_last_evaluation_result(SmartCondition.Result.Succeeded)
def switch_off(self):
self.log(f"Turn off {self.entity_id}")
self.turn_off(self.entity_id)
# not needed yet
# if self.smart_conditions_evaluator:
# self.smart_conditions_evaluator.force_last_evaluation_result(SmartCondition.Result.Failed)
# if self.off_conditions_evaluator:
# self.off_conditions_evaluator.force_last_evaluation_result(SmartCondition.Result.Failed)
def on_toggle_event(self, event_name, data, kwargs):
if self.is_on():
self.log(f"Toggled off by event {event_name}")
self.switch_off()
else:
self.log(f"Toggled on by event {event_name}")
self.switch_on()
def is_on(self): return self.get_state(self.entity_id) != 'off'
def is_off(self): return not self.is_on()
def on_turn_off_event(self, event_name, data, kwargs):
self.log(f"Turned off by event {event_name}")
self.switch_off()
def on_turn_on_event(self, event_name, data, kwargs):
self.log(f"Turned on by event {event_name}")
self.switch_on()
def on_toggle_event_action(self, event_name, data, kwargs):
if data['actionName'] == self.toggle_action:
if self.is_on():
self.switch_off()
else:
self.switch_on()
def on_smart_conditions_change(self,prev_result,result):
#trying to track some weird behavior
# if self.smart_conditions_evaluator:
# debug_result = self.smart_conditions_evaluator.evaluate()
# if debug_result != result:
# self.log(f"on_smart_conditions_change was called with prev_result = {prev_result}, result = {result} and evaluate returned {debug_result}")
# else:
# self.log("on_smart_conditions_change was called without a smart_conditions_evaluator")
if result == SmartCondition.Result.Succeeded:
if self.is_off():
self.switch_on()
else :
if self.is_on():
self.switch_off()
def on_off_conditions(self):
if self.is_on():
self.switch_off()