Compare commits

...

20 Commits

Author SHA1 Message Date
c9d65b29fd Added support for the senseo 2026-05-10 22:41:23 +02:00
16af480ee2 Remove mqtt_device_name entry from alarm_clock configuration 2026-05-10 22:01:25 +02:00
f3cee4271f Remove mqtt_device_name entries from motion_tracker and informations_collector 2026-05-10 22:00:23 +02:00
51aea99f57 Change debugger strategy to try fix debugger being unresponsive issue 2026-05-09 13:36:29 +02:00
0e14281516 Refactor auto switch handling in SmartSwitch to use dedicated methods for state changes 2026-05-09 13:35:32 +02:00
9f8d0b68fa Reintroduce a first draft of the information collectors 2026-05-09 13:35:15 +02:00
df47eb8f3b Added motion detector and light to the mezzanine 2026-05-09 13:34:24 +02:00
a344700f5e update subproject commit reference in ad_toolbox 2026-05-06 22:13:12 +02:00
87ba4b93eb Setup the Alarm clock 2026-05-06 22:12:54 +02:00
d6313b8a29 improved brightness handling 2026-05-06 09:04:38 +02:00
a2798d10dc fixed a typo in yaml 2026-05-06 09:02:54 +02:00
d4829a2cab Tweaked sensor light 2026-04-28 22:24:27 +02:00
ea70e60690 fix garage light 2026-04-25 11:23:10 +02:00
0398f5be71 added led strip under the bed 2026-04-23 22:57:55 +02:00
76b550d3df Added motion detection for garage and kitchen 2026-04-23 22:57:33 +02:00
f8c50c6e63 add script to archive log files into timestamped subdirectory 2026-04-20 22:43:52 +02:00
6e8eb58c97 Full bedroom setup with two bedlight, two remote and the sonos led 2026-04-20 22:12:41 +02:00
534059d831 add bedroom nightstand light configuration with brightness control events 2026-04-19 22:47:53 +02:00
08dabb4c7f add brightness step transition configuration for smoother adjustments 2026-04-19 22:47:33 +02:00
14fa4ef078 remove debugger configuration from apps.yaml and link dockerStart.sh to the source 2026-04-19 18:50:02 +02:00
19 changed files with 1007 additions and 34 deletions

1
.vscode/launch.json vendored
View File

@@ -12,6 +12,7 @@
"host": "localhost",
"port": 5678
},
"steppingResumesAllThreads": true,
"pathMappings": [
{
"localRoot": "${workspaceFolder}",

View File

@@ -3,6 +3,7 @@ appdaemon:
longitude: 0
elevation: 30
time_zone: Europe/Berlin
internal_function_timeout: 00:20:00
exclude_dirs:
- unit_tests
plugins:
@@ -11,6 +12,8 @@ appdaemon:
ha_url: http://10.0.0.21:8123
token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJiZThhYWI2ZTlkNmQ0NWU2YTk1ODU5OWJjOWM3MWJkYiIsImlhdCI6MTc3NjEwOTc0NCwiZXhwIjoyMDkxNDY5NzQ0fQ.BorkjFjWlWCZqnUa9-NMUxGsDiupDoRZ3cEgsmeSofM
cert_verify: True
q_timeout: 1200
ws_timeout: 00:10:00
http:
url: http://0.0.0.0:5050
admin:

333
apps/alarmclock.py Normal file
View File

@@ -0,0 +1,333 @@
import appdaemon.plugins.hass.hassapi as hass
from ad_toolbox.smartobject import SmartObject
import ad_toolbox.smartcondition as SmartCondition
import datetime
from threading import Lock
from requests import get
import requests
import icalendar
from enum import Enum
class EventType(Enum):
Vacation = 1
Flex = 2
Busy = 3
Tentative = 4
class CalendarEvent:
def __init__(self,start_time,stop_time,event_type):
self.start_time = start_time
self.stop_time = stop_time
self.type = event_type
def __str__(self):
return f"[{self.type}]Start at {self.start_time},Finish at {self.stop_time}"
def __eq__(self, obj):
return isinstance(obj, CalendarEvent) and obj.type == self.type and obj.start_time == self.start_time and obj.stop_time == self.stop_time
def __ne__(self, obj):
return not self == obj
class AlarmClock(SmartObject):
def on_initialize_smart_object(self):
self.switch_alarmclock_main = self.args['switch_alarmclock_main']
self.switch_alarmclock_secondary = self.args['switch_alarmclock_secondary']
self.inputdate_alarmclock_main = self.args['inputdate_alarmclock_main']
self.inputdate_alarmclock_secondary = self.args['inputdate_alarmclock_secondary']
self.input_text_wakeup_time = self.args['input_text_wakeup_time']
#self.sensor_wakeup_datetime = self.args['sensor_wakeup_datetime']
self.sleeping_condition = SmartCondition.Evaluator(self, self.args['sleeping_condition'], condition_name="sleeping_condition", constants=self.constants, templates_library=self.templates_library)
self.lock = Lock()
self.wakeup_time_str = ""
self.handle_program_next_wakueup_cb = None
self.handle_prewakueup_cb = None
self.handle_wakueup_cb = None
self.handle_switch_reveil_principal = self.listen_state(self.on_alarm_clock_set, self.switch_alarmclock_main)
self.handle_switch_reveil_secondaire = self.listen_state(self.on_alarm_clock_set, self.switch_alarmclock_secondary)
self.listen_state(self.on_alarm_clock_set, self.inputdate_alarmclock_main)
self.listen_state(self.on_alarm_clock_set, self.inputdate_alarmclock_secondary)
self.run_hourly(self.on_refresh_cal,datetime.time(0, 0, 0))
self.sensor_wakeup_datetime_entity = self.create_entity("sensor.wakeup_datetime", icon="mdi:alarm")
#sensor_wakeup_datetime_entity = self.get_entity(self.sensor_wakeup_datetime)
#if not sensor_wakeup_datetime_entity.exists():
# sensor_wakeup_datetime_entity.add()
self.calendar = list()
self.refresh_cal()
self.update_wakeup_time()
def refresh_cal(self):
new_cal = list()
#if self.load_cal_from_google_calendar(new_cal):
if self.load_cal_from_ics(new_cal):
if not self.are_cal_identical(self.calendar,new_cal):
self.calendar = new_cal
self.log("Calendar has been updated!")
self.print_cal(self.calendar)
return True
return False
def are_cal_identical(self,calendar1,calendar2):
if len(calendar1) != len(calendar2): return False
for i, event in enumerate(calendar1):
if calendar2[i] != event:
return False
return True
def print_cal(self,calendar):
for event in calendar:
self.log(f"{event}")
def load_cal_from_ics(self,calendar):
calendar.clear()
try:
self.log(f"Retrieving calengar from outlook.office365.com")
r = get("https://outlook.office365.com/owa/calendar/0c64359c70eb41cd9713fe92c470d3df@ubisoft.com/5edb3fb5150f4203b66d9cf8db3184be11626033572842691749/calendar.ics", timeout=10)#, headers=headers, verify=False)
#self.log(f"Processing Calendar")
today = datetime.datetime.combine(datetime.date.today(), datetime.time(0,0, 0))
start_load_date = today - datetime.timedelta(days=1)
end_load_date = today + datetime.timedelta(days=2)
#self.log("[" + str(start_load_date) + "]-[" + str(end_load_date) + "]")
calendar_ubi = icalendar.Calendar.from_ical(r.text)
for component in calendar_ubi.walk():
if component.name == "VEVENT":
event_summary = component.get('summary')
if event_summary == "Away": event_type = EventType.Vacation
elif event_summary == "Busy": event_type = EventType.Busy
elif event_summary == "Tentative": event_type = EventType.Tentative
elif event_summary == "Working elsewhere" or event_summary == "Free": event_type = EventType.Flex
else: event_type = None
start_datetime = component.get('dtstart').dt
end_datetime = component.get('dtend').dt
#regular event have a datetime and full day events only a date
#let's make both event a time zone unaware datetime
if isinstance(start_datetime,datetime.datetime):
start_datetime = start_datetime.replace(tzinfo=None)
else:
start_datetime = datetime.datetime.combine(start_datetime,datetime.time(0,0,0))
if isinstance(end_datetime,datetime.datetime):
end_datetime = end_datetime.replace(tzinfo=None)
else:
end_datetime = datetime.datetime.combine(end_datetime,datetime.time(0,0,0))
outside_of_loading_period = False
if start_datetime < start_load_date and end_datetime < start_load_date:
outside_of_loading_period = True
if start_datetime > end_load_date and end_datetime > end_load_date:
outside_of_loading_period = True
if not outside_of_loading_period:
if event_type:
calendar.append(CalendarEvent(start_datetime,end_datetime,event_type))
else: self.log(f"[Unknow event : {event_summary}]Start at {start_datetime},Finish at {end_datetime}",level = "WARNING")
except ValueError:
self.log_error("Couldn't load calendar")
return False
except (requests.exceptions.ConnectionError,requests.exceptions.ReadTimeout):
self.log("Couldn't load calendar, request Timeout",level = "WARNING")
return False
#self.log(f"Calendar loaded")
return True
def on_alarm_clock_set(self, entity, attribute, old, new, kwargs):
self.update_wakeup_time()
def on_refresh_cal(self, kwargs):
if self.refresh_cal():
self.update_wakeup_time()
def on_program_next_wakeup(self, kwargs):
self.handle_program_next_wakueup_cb = None
no_switch_changed = True
#le reveil principal se reactive chaque matin
if self.get_state(self.switch_alarmclock_main) == 'off':
self.log("Turning on main alarm clock")
no_switch_changed = False
self.turn_on(self.switch_alarmclock_main)
#le reveil secondaire ne sonne qu'une fois, il est donc desactivé chaque matin
if self.get_state(self.switch_alarmclock_secondary) == 'on':
self.log("Turning off secondary alarm clock")
no_switch_changed = False
self.turn_off(self.switch_alarmclock_secondary)
if no_switch_changed:
self.update_wakeup_time()
def on_pre_wakeup(self, kwargs):
self.handle_prewakueup_cb = None
for key, value in kwargs.items():
if key == 'wakeup_in' :
wakeup_in = value
if wakeup_in > 0:
self.handle_prewakueup_cb = self.run_in(self.on_pre_wakeup, 5 * 60,wakeup_in = wakeup_in - 5)
wakeup_message = "On se reveil dans " + str(wakeup_in) + " minutes"
self.log(wakeup_message)
if self.sleeping_condition.evaluate() == SmartCondition.Result.Succeeded: self.fire_event("WAKEUP_IN", wakeup_in = wakeup_in)
#self.call_service("notify/ios_iphone_de_pierre", title = "Reveil", message = wakeup_message)
def on_wakeup(self, kwargs):
self.handle_wakueup_cb = None
wakeup_message = "WAKE UP!!!! It's " + self.wakeup_time_str
self.log(wakeup_message)
if self.sleeping_condition.evaluate() == SmartCondition.Result.Succeeded: self.fire_event("WAKEUP")
#self.call_service("notify/ios_iphone_de_pierre", title = "Reveil", message = wakeup_message)
def is_wakeup_time_on_a_working_day(self, wakeup_datetime):
if self.is_during_vacation(wakeup_datetime):
return False
if wakeup_datetime.weekday() >= 5: #saturday is 5, sunday is 6
return False
return True
def is_during_vacation(self, date_time):
for event in self.calendar:
if event.type == EventType.Vacation and event.start_time <= date_time and date_time <= event.stop_time:
return True
return False
def is_wakeup_time_on_a_flex_day(self, date_time):
return False
#we'll see about that later
for event in self.calendar:
if event.type == EventType.Flex and event.start_time <= date_time and date_time <= event.stop_time:
return True
return False
def get_next_busy(self,date_time):
for event in self.calendar:
if event.type == EventType.Busy and event.start_time >= date_time:
return event
return None
def offset_wakeup_time_for_meeting(self,wakeup_datetime):
date_time = wakeup_datetime - datetime.timedelta(hours=wakeup_datetime.hour,minutes=wakeup_datetime.minute)
busy_event = self.get_next_busy(date_time)
if busy_event and busy_event.start_time.date() == date_time.date() and busy_event.start_time.hour > 6 and busy_event.start_time <= wakeup_datetime:
wake_up_datetime = busy_event.start_time - datetime.timedelta(minutes=30)
new_wake_up_time = wake_up_datetime.strftime("%H:%M")
self.log(f"A meeting have been found at {busy_event.start_time}, we'll wake up at {new_wake_up_time}")
return new_wake_up_time
new_wake_up_time = wakeup_datetime.strftime("%H:%M")
#self.log(f"No meeting have been found before {wake_up_time}, we'll wake up at {wake_up_time}")
return new_wake_up_time
def calculate_wakeup_time(self):
new_wakeup_time = "--:--"
if self.get_state(self.switch_alarmclock_secondary) == 'on':
new_wakeup_time = self.get_state(self.inputdate_alarmclock_secondary)[0:5]
elif self.get_state(self.switch_alarmclock_main) == 'on':
max_flex_wakeup_time = "10:00" #todo: make that data driven
max_flex_wakeup_datetime = self.generate_wakeup_datetime_from_string(max_flex_wakeup_time)
new_wakeup_time = self.get_state(self.inputdate_alarmclock_main)[0:5]
new_wakeup_datetime = self.generate_wakeup_datetime_from_string(new_wakeup_time)
if not self.is_wakeup_time_on_a_working_day(new_wakeup_datetime):
self.log(f"Primary Wake up time ({new_wakeup_datetime}) is not on a working day")
new_wakeup_time = "--:--"
elif self.is_wakeup_time_on_a_flex_day(max_flex_wakeup_datetime):
self.log(f"Flex Wake up time ({max_flex_wakeup_datetime}) is on a flex office day")
new_wakeup_time = self.offset_wakeup_time_for_meeting(max_flex_wakeup_datetime)
else:
new_wakeup_time = self.offset_wakeup_time_for_meeting(new_wakeup_datetime)
return new_wakeup_time
def generate_wakeup_datetime_from_string(self,wakeup_string):
wakeup_time = datetime.time(int(wakeup_string[0:2]), int(wakeup_string[3:5]), 0)
wakeup_datetime = datetime.datetime.combine(datetime.date.today(),wakeup_time)
if (datetime.datetime.combine(datetime.date.today(), wakeup_time) - datetime.datetime.now()).total_seconds() <= 0:
wakeup_datetime = wakeup_datetime + datetime.timedelta(days=1)
return wakeup_datetime
def register_callbacks(self, wakeup_time_str):
if self.wakeup_time_str != wakeup_time_str or not self.handle_program_next_wakueup_cb:
with self.lock:
#first, cancel all previous callbacks
if self.handle_program_next_wakueup_cb:
self.log("Canceling previous program next wakeup cb")
self.cancel_timer(self.handle_program_next_wakueup_cb)
self.handle_program_next_wakueup_cb = None
if self.handle_prewakueup_cb:
self.log("Canceling previous pre wakeup cb")
self.cancel_timer(self.handle_prewakueup_cb)
self.handle_prewakueup_cb = None
if self.handle_wakueup_cb:
self.log("Canceling previous wakeup cb")
self.cancel_timer(self.handle_wakueup_cb)
self.handle_wakueup_cb = None
if wakeup_time_str == '--:--':
#if their is no alarm clock set up, we just need to program a callback to update the wakeup time tomorow
# 12:00 should do it
program_next_wakeup_time = datetime.datetime.combine(datetime.date.today(), datetime.time(12,0, 0))
#if we toggle off an alalrm clock before 4, we assume it's for the 'next' day (meaning tbe same day between 0 and 4)
#if we toggle off an alalrm clock after 4, it's suppose to stay so untill next day
if datetime.datetime.now().hour >= 4:
program_next_wakeup_time = program_next_wakeup_time + datetime.timedelta(days=1)
self.log("program next wakeup cb registered for " + str(program_next_wakeup_time))
self.handle_program_next_wakueup_cb = self.run_at(self.on_program_next_wakeup, program_next_wakeup_time)
self.sensor_wakeup_datetime_entity.set_state(state = 'unavailable')
else:
wakeup_datetime = self.generate_wakeup_datetime_from_string(wakeup_time_str)
self.sensor_wakeup_datetime_entity.set_state(state = wakeup_datetime.strftime("%H:%M"))
#the program next wakeup CB need to be slightly after the wakeup CB to avoid the risk of wakeup CB cleaning it's handle AFTER program next wakup set up the next one
program_next_wakeup_datetime = wakeup_datetime + datetime.timedelta(seconds=1)
prewakeup_offset = min(60,(wakeup_datetime - datetime.datetime.now()).total_seconds() / 60)
prewakeup_offset = int(prewakeup_offset / 10) * 10
prewakeup_datetime = wakeup_datetime - datetime.timedelta(minutes=prewakeup_offset)
self.handle_prewakueup_cb = self.run_at(self.on_pre_wakeup, prewakeup_datetime,wakeup_in = prewakeup_offset)
self.log("prewakup cb registered for " + str(prewakeup_datetime))
self.handle_wakueup_cb = self.run_at(self.on_wakeup, wakeup_datetime)
self.log("wakeup cb registered for " + str(wakeup_datetime))
self.handle_program_next_wakueup_cb = self.run_at(self.on_program_next_wakeup, program_next_wakeup_datetime)
self.log("program next wakeup cb registered for " + str(program_next_wakeup_datetime))
self.wakeup_time_str = wakeup_time_str
self.call_service("input_text/set_value", entity_id = self.input_text_wakeup_time, value = self.wakeup_time_str)
def update_wakeup_time(self):
self.register_callbacks(self.calculate_wakeup_time())

View File

@@ -1,7 +1,9 @@
debugger:
module: debugger
class: Debugger
priority: 0
port: 5678
wait_for_client: false # set to true when you need to debug initialize()
motion_tracker:
module: motiontracker
@@ -9,8 +11,6 @@ motion_tracker:
priority: 5 # this need to be initialized before app using motion tracker
mqtt_device_name: AD Motion Tracker
max_time: 30
areas:
@@ -20,25 +20,60 @@ motion_tracker:
motion_sensors: binary_sensor.hallway_motion
restroom:
motion_sensors: binary_sensor.restroom_motion
garage:
motion_sensors: binary_sensor.garage_motion
kitchen:
motion_sensors: binary_sensor.kitchen_motion
mezzanine:
motion_sensors: binary_sensor.mezzanine_motion
sleep_switch:
module: smartswitch
class: SmartSwitch
informations_collector:
module: informationscollector
class: InformationsCollector
priority: 1
entity: input_boolean.sleeping
#mute: True
toggle_events:
button_long_press:
event_name: zha_event
event_data:
device_name: 'bedroom_hue_remote'
args:
press_type: 'hold'
button: 'off'
reset_data:
device_name: 'bedroom_hue_remote'
args:
button: 'off'
press_type: 'long_release'
error_collector:
reset_error_count_button: input_button.reset_error_count
output_sensor: sensor.ad_errors
# min_max_temp_sensors:
# temperature_sensors:
# - sensor.temperature_exterieur_nord
# - sensor.temperature_exterieur_sud
# output_sensors:
# max_temp: sensor.max_temp_outside
# max_temp_yesterday: sensor.max_temp_yesterday_outside
# min_temp: sensor.min_temp_outside
# min_temp_yesterday: sensor.min_temp_yesterday_outside
group_collectors:
windows:
group: binary_sensor.group_windows
output_sensor: sensor.ad_number_of_windows_open
music_players:
group: media_player.group_music_player
output_sensor: sensor.ad_number_of_music_player_active
active_condition: self == 'playing'
#windows:
# group: group.fenetres
# output_sensor: sensor.ad_number_of_windows_open
# shutters:
# group: group.volets
# output_sensor: sensor.ad_number_of_shutter_close
# occupency_sensors:
# group: group.occupency_sensors
# output_sensor: sensor.ad_number_of_rooms_occupied
# smart_lights:
# group:
# - group.smart_lights
# - group.dumb_lights
# output_sensor: sensor.ad_number_of_lights_on

View File

@@ -5,8 +5,14 @@ import debugpy
class Debugger(hass.Hass):
def initialize(self):
port = int(self.args.get("port", 5678))
wait_for_client = bool(self.args.get("wait_for_client", False))
try:
debugpy.listen(("0.0.0.0", port))
self.log(f"debugpy listening on port {port}")
except RuntimeError:
self.log("debugpy already listening, skipping")
if wait_for_client:
self.log(f"waiting for debug client on port {port}")
debugpy.wait_for_client()
self.log("debug client attached")

View File

@@ -0,0 +1,182 @@
import ad_toolbox.smartcondition as SmartCondition
from ad_toolbox.smartobject import SmartObject
import pickle
import os
class InformationsCollector(SmartObject):
def on_initialize_smart_object(self):
super().on_initialize_smart_object()
if self.dataset is None:
self.dataset = dict()
if 'error_collector' in self.args:
if 'output_sensor' in self.args['error_collector']:
sensor_name = self.args['error_collector']['output_sensor']
self.error_list = []
self.log_handle = self.listen_log(self.on_log_error, "ERROR", log="main_log")
self.error_count_sensor = self.create_entity(
sensor_name, state=0,
attributes={"last_error": "", "error_list": []},
icon="mdi:alert-circle-outline"
)
if 'reset_error_count_button' in self.args['error_collector']:
self.listen_state(self.on_reset_error_count, self.args['error_collector']['reset_error_count_button'])
else:
self.log_error("No output_sensor defined for error_collector")
if 'min_max_temp_sensors' in self.args:
for sensor in self.args['min_max_temp_sensors']['temperature_sensors']:
self.listen_state(self.on_temperature_change, sensor)
output_sensors = self.args['min_max_temp_sensors']['output_sensors']
self.max_temp_sensor = self.get_entity(output_sensors['max_temp'])
self.max_temp_yesterday_sensor = self.get_entity(output_sensors['max_temp_yesterday'])
self.min_temp_sensor = self.get_entity(output_sensors['min_temp'])
self.min_temp_yesterday_sensor = self.get_entity(output_sensors['min_temp_yesterday'])
current_min_temp, current_max_temp = self.compute_min_max_temp()
if not self.max_temp_sensor.exists() or self.max_temp_sensor.get_state() == 'unavailable':
max_temp = current_max_temp
try:
if max_temp is None or self.dataset['max_temp'] > max_temp:
max_temp = self.dataset['max_temp']
self.log_info(f"Restoring max_temp from dataset ({max_temp}°C)")
except KeyError: pass
self.set_max_temp_sensor(max_temp)
if not self.min_temp_sensor.exists() or self.min_temp_sensor.get_state() == 'unavailable':
min_temp = current_min_temp
try:
if min_temp is None or self.dataset['min_temp'] < min_temp:
min_temp = self.dataset['min_temp']
self.log_info(f"Restoring min_temp from dataset ({min_temp}°C)")
except KeyError: pass
self.set_min_temp_sensor(min_temp)
self.run_daily(self.run_at_midnight, "00:00:00")
if 'group_collectors' in self.args:
self._group_sensors = {}
self._group_evaluators = {}
for name, config in self.args['group_collectors'].items():
self._group_sensors[name] = self.create_entity(
config['output_sensor'], state=0,
attributes={'active_entities': ''},
icon='mdi:counter'
)
entities = self._get_group_entities(config)
if 'active_condition' in config:
self._group_evaluators[name] = {
entity: SmartCondition.Evaluator(
self,
config['active_condition'],
condition_name=f"{name}_{entity}",
constants={'self': entity},
log_callback_trigger_reason=False,
on_change_cb=self._on_group_condition_changed,
)
for entity in entities
}
else:
for entity in entities:
self.listen_state(self.on_group_entity_state_changed, entity)
self.update_group_collectors()
def set_max_temp_sensor(self,temperature):
self.dataset['max_temp'] = temperature
self.max_temp_sensor.set_state(state = temperature, attributes = { 'unit_of_measurement': '°C', 'device_class': 'temperature'})
def set_min_temp_sensor(self,temperature):
self.dataset['min_temp'] = temperature
self.min_temp_sensor.set_state(state = temperature, attributes = { 'unit_of_measurement': '°C', 'device_class': 'temperature'})
def terminate(self):
self.log_info("Writing dataset to " + self.get_dataset_name())
f = open(self.get_dataset_name(), 'wb')
pickle.dump(self.dataset, f)
f.close()
def compute_min_max_temp(self):
max_temp = None
min_temp = None
for sensor in self.args['min_max_temp_sensors']['temperature_sensors']:
temp = self.get_state(sensor)
if not max_temp or temp > max_temp: max_temp = temp
if not min_temp or temp < min_temp: min_temp = temp
return min_temp,max_temp
def on_log_error(self, name, ts, level, type, message, kwargs):
error_count = int(self.error_count_sensor.get_state()) + 1
self.error_list.append(message)
self.error_count_sensor.set_state(state=error_count, attributes={"last_error": message, "error_list": self.error_list})
def on_reset_error_count(self, entity, attribute, old, new, kwargs):
error_log_file = self.get_ad_api()._logging.config['error_log']['filename']
open(error_log_file, 'w').close()
self.error_list.clear()
self.error_count_sensor.set_state(state=0, attributes={"last_error": "", "error_list": self.error_list})
def _on_group_condition_changed(self, prev_result,result):
self.update_group_collectors()
def on_group_entity_state_changed(self, entity, attribute, old, new, kwargs):
if new != old:
self.log_info(f"updating group collectors ({entity} changed from {old} to {new})")
self.update_group_collectors()
def on_temperature_change(self, entity, attribute, old, new, kwargs):
if new != old:
min_temp = self.min_temp_sensor.get_state()
max_temp = self.max_temp_sensor.get_state()
if max_temp is None or max_temp == 'unavailable' or new > max_temp: self.set_max_temp_sensor(new)
if min_temp is None or min_temp == 'unavailable' or new < min_temp: self.set_min_temp_sensor(new)
def run_at_midnight(self, kwargs):
self.max_temp_yesterday_sensor.set_state(state=self.max_temp_sensor.get_state(), attributes={'unit_of_measurement': '°C', 'device_class': 'temperature'})
self.min_temp_yesterday_sensor.set_state(state=self.min_temp_sensor.get_state(), attributes={'unit_of_measurement': '°C', 'device_class': 'temperature'})
min_temp, max_temp = self.compute_min_max_temp()
self.set_max_temp_sensor(max_temp)
self.set_min_temp_sensor(min_temp)
def _get_group_entities(self, config):
group = config['group']
if isinstance(group, str):
return self.get_state(group, attribute='entity_id') or []
entities = []
for g in group:
entities.extend(self.get_state(g, attribute='entity_id') or [])
return entities
def _is_entity_active(self, name, entity):
if name in self._group_evaluators:
return self._group_evaluators[name][entity].evaluate(log=False) == SmartCondition.Result.Succeeded
return self.get_state(entity) == 'on'
# def _get_group_entity_name(self, entity):
# entity_state = self.get_state(entity, attribute='all') or {}
# attributes = entity_state.get('attributes', {})
# friendly_name = attributes.get('friendly_name')
# return friendly_name if isinstance(friendly_name, str) and friendly_name else entity
def update_group_collectors(self):
for name, config in self.args['group_collectors'].items():
sensor = self._group_sensors[name]
prev_state = sensor.get_state()
prev_count = int(prev_state) if prev_state is not None else 0
active_entities = []
for entity in self._get_group_entities(config):
if self.entity_exists(entity) and self._is_entity_active(name, entity):
active_entities.append(entity)
count = len(active_entities)
sensor.set_state(state=count, attributes={'active_entities': ', '.join(active_entities)})
if prev_count != count:
self.log_info(f"{name}: {count} active entities (previously {prev_count})")

180
apps/rooms/bedroom.yaml Normal file
View File

@@ -0,0 +1,180 @@
sleep_switch:
module: smartswitch
class: SmartSwitch
entity: input_boolean.sleeping
toggle_events:
button_long_press_pierre:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_pierre'
args:
press_type: 'hold'
button: 'off'
reset_data:
device_name: 'bedroom_remote_pierre'
args:
button: 'off'
press_type: 'long_release'
button_long_press_maeva:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_maeva'
args:
press_type: 'hold'
button: 'off'
reset_data:
device_name: 'bedroom_remote_maeva'
args:
button: 'off'
press_type: 'long_release'
off_events:
- WAKEUP
bedroom_light_switch:
module: smartswitch
class: SmartSwitch
entity: switch.bedroom_light
off_events:
- turn_off_all_lights
toggle_events:
button_press_pierre:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_pierre'
args:
press_type: 'press'
button: 'on'
button_press_maeva:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_maeva'
args:
press_type: 'press'
button: 'on'
sonos_led:
module: smartswitch
class: SmartSwitch
entity: switch.bedroom_led
smart_conditions: not input_boolean.sleeping
light_bedroom_bedlight_pierre:
module: smartlight
class: SmartLight
entity: light.bedroom_bedlight_pierre
toggle_events:
button_press:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_pierre'
args:
press_type: 'press'
button: 'off'
off_events:
- turn_off_all_lights
increase_brightness_events:
up_hold:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_pierre'
args:
press_type: 'hold'
button: 'up'
decrease_brightness_events:
down_hold:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_pierre'
args:
press_type: 'hold'
button: 'down'
on_events_with_transition:
wake_up_events:
transition_time: 120
brightness_pct: 50
events:
WAKEUP_IN:
event_name: WAKEUP_IN
event_data:
wakeup_in: 5
light_bedroom_bedlight_maeva:
module: smartlight
class: SmartLight
entity: light.bedroom_bedlight_maeva
toggle_events:
button_press:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_maeva'
args:
press_type: 'press'
button: 'off'
off_events:
- turn_off_all_lights
increase_brightness_events:
up_hold:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_maeva'
args:
press_type: 'hold'
button: 'up'
decrease_brightness_events:
down_hold:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_maeva'
args:
press_type: 'hold'
button: 'down'
bed_ledstrip:
module: smartlight
class: SmartLight
entity: light.bedroom_ledstripe
smart_conditions:
trigger_conditions: (input_boolean.sleeping or binary_sensor.day_interval_night) and (binary_sensor.bedroom_motion_bed_pierre or binary_sensor.bedroom_motion_bed_maeva)
blocking_conditions: switch.bedroom_light or light.bedroom_bedlight_pierre or light.bedroom_bedlight_maeva
on_events_with_transition:
wake_up_events:
transition_time: 120
brightness_pct: 50
events:
WAKEUP_IN:
event_name: WAKEUP_IN
event_data:
wakeup_in: 8
light_brightness_pct: 1
alarm_clock:
module: alarmclock
class: AlarmClock
switch_alarmclock_main: input_boolean.main_alarmclock
switch_alarmclock_secondary: input_boolean.secondary_alarmclock
inputdate_alarmclock_main: input_datetime.main_alarmclock
inputdate_alarmclock_secondary: input_datetime.secondary_alarmclock
input_text_wakeup_time: input_text.wakeup_time
sleeping_condition: input_boolean.sleeping

View File

@@ -25,6 +25,7 @@ light_hallway:
1: input_boolean.sleeping
25: binary_sensor.day_interval_night
100: True
light_restroom:
module: smartlight
@@ -38,4 +39,7 @@ light_restroom:
light_brightness_pct:
1: input_boolean.sleeping
25: binary_sensor.day_interval_night
100: True
100: True

8
apps/rooms/garage.yaml Normal file
View File

@@ -0,0 +1,8 @@
light_garage:
module: smartlight
class: SmartLight
entity: switch.garage_01
smart_conditions:
trigger_conditions: sensor.garage_last_motion < 3
blocking_conditions: sensor.garage_motion_light_level > 10

93
apps/rooms/kitchen.yaml Normal file
View File

@@ -0,0 +1,93 @@
light_kitchen_sink:
module: smartlight
class: SmartLight
entity: light.kitchen_sink
smart_conditions:
- light.kitchen_sink_switch
- sensor.kitchen_last_motion < 3 and (sensor.kitchen_motion_light_level < 10 or self)
light_brightness_pct:
100: light.kitchen_sink_switch
1: binary_sensor.day_interval_night or input_boolean.sleeping
15: True
light_kitchen_sink_switch:
module: smartswitch
class: SmartSwitch
entity: light.kitchen_sink_switch
off_events:
- turn_off_all_lights
senseo_quadrante:
module: senseo
class: Senseo
virtual_sensors:
default_values:
# this is done to break the circular dependencies between sensor.senseo_cup_icon and sensor.senseo_cup
sensor.senseo_cup_icon: "" # "mdi:coffee-off-outline"
sensors:
value_selector.senseo_cup:
attributes:
friendly_name: Tasse
sensor_attributes:
icon: sensor.senseo_cup_icon
values:
unavailable: sensor.senseo_quadrante_operating_state == 'unavailable'
Prête: binary_sensor.senseo_quadrante_cup_available and binary_sensor.senseo_quadrante_cup_full
Service en cours: binary_sensor.senseo_quadrante_cup_available and sensor.senseo_quadrante_operating_state == 'SENSEO_BREWING'
Vide: binary_sensor.senseo_quadrante_cup_available
Absente: True
value_selector.senseo_cup_icon:
"mdi:coffee": sensor.senseo_cup == 'Prête'
"mdi:coffee-outline": sensor.senseo_cup == 'Vide' or sensor.senseo_cup == 'Service en cours'
"mdi:coffee-off-outline": True
value_selector.senseo_programmation:
attributes:
friendly_name: Programme
icon: mdi:coffee-to-go
values:
unavailable: sensor.senseo_quadrante_operating_state == 'unavailable'
1 Tasse (douche): binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 1 and binary_sensor.senseo_quadrante_has_program.power_pressed
2 Tasses (douche): binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 2 and binary_sensor.senseo_quadrante_has_program.power_pressed
1 Tasse: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 1
2 Tasses: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 2
Aucun: True
clear_program_button: button.senseo_quadrante_clear_program
programs:
one_cup_shower:
events:
shower_finished:
event_name: shower_finished
button: button.senseo_quadrante_brew_coffee_normal
condition: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 1 and binary_sensor.senseo_quadrante_has_program.power_pressed
two_cup_shower:
events:
shower_finished:
event_name: shower_finished
button: button.senseo_quadrante_brew_coffee_double
condition: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 2 and binary_sensor.senseo_quadrante_has_program.power_pressed
one_cup_morning:
delay: 10
events:
good_morning:
event_name: good_morning
button: button.senseo_quadrante_brew_coffee_normal
condition: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 1 and not binary_sensor.senseo_quadrante_has_program.power_pressed
two_cup_morning:
delay: 10
events:
good_morning:
event_name: good_morning
button: button.senseo_quadrante_brew_coffee_double
condition: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 2 and not binary_sensor.senseo_quadrante_has_program.power_pressed

View File

@@ -0,0 +1,27 @@
light_mezzanine_01:
module: smartlight
class: SmartLight
entity: light.mezzanine_01
smart_conditions:
trigger_conditions: sensor.mezzanine_last_motion < 3
blocking_conditions: sensor.mezzanine_motion_light_level > 10 and not self
light_brightness_pct:
15: input_boolean.sleeping
35: binary_sensor.day_interval_night
100: True
light_mezzanine_02:
module: smartlight
class: SmartLight
entity: light.mezzanine_02
smart_conditions:
trigger_conditions: sensor.mezzanine_last_motion < 3
blocking_conditions: sensor.mezzanine_motion_light_level > 10 and not self
light_brightness_pct:
5: input_boolean.sleeping
25: binary_sensor.day_interval_night
100: True

46
apps/senseo.py Normal file
View File

@@ -0,0 +1,46 @@
from ad_toolbox.smartobject import SmartObject
import ad_toolbox.smartcondition as SmartCondition
from ad_toolbox.eventhandler import EventHandler
class Senseo(SmartObject):
def on_initialize_smart_object(self):
super().on_initialize_smart_object()
self.delay_cb_handle = None
self.smartconditions = dict()
self.event_handlers = list()
try: self.program_configs = self.args['programs']
except KeyError: self.program_configs = None
for config_name in self.program_configs:
config = self.program_configs[config_name]
self.smartconditions[config_name] = SmartCondition.Evaluator(self,config['condition'],condition_name = config_name)
if not self.entity_exists(config['button']): self.log_error(f"Entity not found {config['button']}")
self.event_handlers.append(EventHandler(self,config["events"],self.on_event,config_name))
def on_event(self, event_name, event_data,config_name):
self.log_info(f"Event {event_name} received. Config name = {config_name}")
config = self.program_configs[config_name]
try: delay = config['delay']
except KeyError: delay = 0
if self.smartconditions[config_name].evaluate() == SmartCondition.Result.Succeeded:
if (delay > 0):
if self.delay_cb_handle: self.cancel_timer(self.delay_cb_handle)
self.log_info(f"The coffee will be prepared in {delay}s")
self.delay_cb_handle = self.run_in(self.on_delay_elapse,delay,config_name = config_name)
else:
self.prepare_coffee(config_name)
def on_delay_elapse(self, kwargs):
self.delay_cb_handle = None
self.prepare_coffee(kwargs['config_name'])
def prepare_coffee(self,config_name):
config = self.program_configs[config_name]
self.log_info(f"Preparing Coffee ({config_name})")
self.call_service("button/press", entity_id = config['button'])
self.call_service("button/press", entity_id = self.args['clear_program_button'])

View File

@@ -17,6 +17,10 @@ from ad_toolbox.eventhandler import EventHandler
# brightness_pct_step: <int> # optional, default 5
# Step size (% of full brightness) used by increase/decrease events.
#
# brightness_step_transition: <seconds> # optional, default 0.5
# Transition duration applied to each increase/decrease brightness
# step. Smooths out the brightness change instead of jumping.
#
# increase_brightness_events: # optional
# <EventHandler events_block>
# HA events that increase brightness by brightness_pct_step.
@@ -69,6 +73,7 @@ from ad_toolbox.eventhandler import EventHandler
# entity: light.living_room
#
# brightness_pct_step: 10
# brightness_step_transition: 0.5
#
# increase_brightness_events:
# btn_up:
@@ -126,6 +131,8 @@ class SmartLight(SmartSwitch):
self.brightness_pct_step = self.args["brightness_pct_step"]
else: self.brightness_pct_step = 5
self.brightness_step_transition = self.args.get("brightness_step_transition", 0.8)
# on_events_with_transition: one EventHandler per labelled transition
if "on_events_with_transition" in self.args:
for key in self.args["on_events_with_transition"]:
@@ -134,13 +141,15 @@ class SmartLight(SmartSwitch):
# light_brightness_pct: build ordered evaluator list, run initial pass
if "light_brightness_pct" in self.args:
self.always_change_brightness = False
for key in self.args["light_brightness_pct"]:
if key == 'always_change_brightness':
self.always_change_brightness = bool(self.args["light_brightness_pct"][key])
else:
self.light_brightness_pct_list.append((SmartCondition.Evaluator(self,self.args["light_brightness_pct"][key],condition_name = key, on_update_cb = self.on_update_light_brightness_pct,constants = self.constants, templates_library = self.templates_library, log_callback_trigger_reason = False),key))
self. on_update_light_brightness_pct()
if isinstance(self.args["light_brightness_pct"], int):
self.light_brightness_pct = self.args["light_brightness_pct"]
else:
for key in self.args["light_brightness_pct"]:
if key == 'always_change_brightness':
self.always_change_brightness = bool(self.args["light_brightness_pct"][key])
else:
self.light_brightness_pct_list.append((SmartCondition.Evaluator(self,self.args["light_brightness_pct"][key],condition_name = key, on_update_cb = self.on_update_light_brightness_pct,constants = self.constants, templates_library = self.templates_library, log_callback_trigger_reason = False),key))
self.on_update_light_brightness_pct()
self.listen_state(self.on_state_change,self.entity_id)
@@ -166,17 +175,17 @@ class SmartLight(SmartSwitch):
# Increase brightness by brightness_pct_step while the light is on.
def on_increase_brightness_event(self, event_name, data, kwargs):
if self.get_state(self.entity_id) != 'off':
self.call_service("light/turn_on", entity_id = self.entity_id, brightness_step_pct = self.brightness_pct_step)
self.call_service("light/turn_on", entity_id = self.entity_id, brightness_step_pct = self.brightness_pct_step, transition = self.brightness_step_transition)
# Decrease brightness by brightness_pct_step while the light is on.
def on_decrease_brightness_event(self, event_name, data, kwargs):
if self.get_state(self.entity_id) != 'off':
self.call_service("light/turn_on", entity_id = self.entity_id, brightness_step_pct = -self.brightness_pct_step)
self.call_service("light/turn_on", entity_id = self.entity_id, brightness_step_pct = -self.brightness_pct_step, transition = self.brightness_step_transition)
# EventHandler callback for on_events_with_transition. Jumps to 1 %
# first so the ramp always starts from a known low level, then
# transitions to the configured brightness over transition_time seconds.
def on_turn_on_with_transition(self, event_name, data, kwargs, event_category):
def on_turn_on_with_transition(self, event_name, data, event_category):
transition_time = self.args["on_events_with_transition"][event_category]["transition_time"]
brightness_pct = self.args["on_events_with_transition"][event_category]["brightness_pct"]
self.log(f"Turn on at {brightness_pct}% with a transition of {transition_time}s")

View File

@@ -155,8 +155,12 @@ class SmartSwitch(SmartObject):
# Timer callback: applies the deferred state set by auto_switch_*_after.
def on_auto_switch_after(self, kwargs):
self.auto_switch_cb_handle = None
self.log(f"Switching {self.entity_id} {kwargs['new_state']} after {kwargs['autoswitch_delay']}s")
self.set_state(self.entity_id,state = kwargs['new_state'])
new_state = kwargs['new_state']
self.log(f"Switching {self.entity_id} {new_state} after {kwargs['autoswitch_delay']}s")
if new_state == 'on':
self.switch_on()
else:
self.switch_off()
# ------------------------------------------------------------------
# Public switch API

16
apps/virtualevents.yaml Normal file
View File

@@ -0,0 +1,16 @@
virtual_events:
module: virtualevents
class: VirtualEvents
priority: 100 # default priority app is 50, since the virtual_events doesn't create any sensor but is based on sensor created by many app, it's important it's created last
virtual_events:
good_morning:
event_name: good_morning
event_condition: not input_boolean.sleeping
turn_off_all_lights:
event_name: turn_off_all_lights
event_condition: input_boolean.sleeping

24
archiveLogs.sh Normal file
View File

@@ -0,0 +1,24 @@
#!/bin/sh
# Archive log files from the logs directory into a timestamped subdirectory, then delete them.
LOGS_DIR=/conf/logs
ARCHIVE_DIR=$LOGS_DIR/archive
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
DEST=$ARCHIVE_DIR/$TIMESTAMP
# Find log files directly in the logs directory (non-recursive, skip archive subdir)
LOG_FILES=$(find "$LOGS_DIR" -maxdepth 1 -type f -name "*.log")
if [ -z "$LOG_FILES" ]; then
echo "[archiveLogs] No log files to archive."
exit 0
fi
mkdir -p "$DEST"
for f in $LOG_FILES; do
mv "$f" "$DEST/"
done
echo "[archiveLogs] Archived logs to $DEST"

1
dockerStart.sh Symbolic link
View File

@@ -0,0 +1 @@
/usr/src/app/dockerStart.sh

View File

@@ -1 +1,2 @@
debugpy
debugpy
icalendar