Compare commits

..

12 Commits

15 changed files with 815 additions and 23 deletions

1
.vscode/launch.json vendored
View File

@@ -12,6 +12,7 @@
"host": "localhost", "host": "localhost",
"port": 5678 "port": 5678
}, },
"steppingResumesAllThreads": true,
"pathMappings": [ "pathMappings": [
{ {
"localRoot": "${workspaceFolder}", "localRoot": "${workspaceFolder}",

View File

@@ -3,6 +3,7 @@ appdaemon:
longitude: 0 longitude: 0
elevation: 30 elevation: 30
time_zone: Europe/Berlin time_zone: Europe/Berlin
internal_function_timeout: 00:20:00
exclude_dirs: exclude_dirs:
- unit_tests - unit_tests
plugins: plugins:
@@ -11,6 +12,8 @@ appdaemon:
ha_url: http://10.0.0.21:8123 ha_url: http://10.0.0.21:8123
token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJiZThhYWI2ZTlkNmQ0NWU2YTk1ODU5OWJjOWM3MWJkYiIsImlhdCI6MTc3NjEwOTc0NCwiZXhwIjoyMDkxNDY5NzQ0fQ.BorkjFjWlWCZqnUa9-NMUxGsDiupDoRZ3cEgsmeSofM token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJiZThhYWI2ZTlkNmQ0NWU2YTk1ODU5OWJjOWM3MWJkYiIsImlhdCI6MTc3NjEwOTc0NCwiZXhwIjoyMDkxNDY5NzQ0fQ.BorkjFjWlWCZqnUa9-NMUxGsDiupDoRZ3cEgsmeSofM
cert_verify: True cert_verify: True
q_timeout: 1200
ws_timeout: 00:10:00
http: http:
url: http://0.0.0.0:5050 url: http://0.0.0.0:5050
admin: admin:

333
apps/alarmclock.py Normal file
View File

@@ -0,0 +1,333 @@
import appdaemon.plugins.hass.hassapi as hass
from ad_toolbox.smartobject import SmartObject
import ad_toolbox.smartcondition as SmartCondition
import datetime
from threading import Lock
from requests import get
import requests
import icalendar
from enum import Enum
class EventType(Enum):
Vacation = 1
Flex = 2
Busy = 3
Tentative = 4
class CalendarEvent:
def __init__(self,start_time,stop_time,event_type):
self.start_time = start_time
self.stop_time = stop_time
self.type = event_type
def __str__(self):
return f"[{self.type}]Start at {self.start_time},Finish at {self.stop_time}"
def __eq__(self, obj):
return isinstance(obj, CalendarEvent) and obj.type == self.type and obj.start_time == self.start_time and obj.stop_time == self.stop_time
def __ne__(self, obj):
return not self == obj
class AlarmClock(SmartObject):
def on_initialize_smart_object(self):
self.switch_alarmclock_main = self.args['switch_alarmclock_main']
self.switch_alarmclock_secondary = self.args['switch_alarmclock_secondary']
self.inputdate_alarmclock_main = self.args['inputdate_alarmclock_main']
self.inputdate_alarmclock_secondary = self.args['inputdate_alarmclock_secondary']
self.input_text_wakeup_time = self.args['input_text_wakeup_time']
#self.sensor_wakeup_datetime = self.args['sensor_wakeup_datetime']
self.sleeping_condition = SmartCondition.Evaluator(self, self.args['sleeping_condition'], condition_name="sleeping_condition", constants=self.constants, templates_library=self.templates_library)
self.lock = Lock()
self.wakeup_time_str = ""
self.handle_program_next_wakueup_cb = None
self.handle_prewakueup_cb = None
self.handle_wakueup_cb = None
self.handle_switch_reveil_principal = self.listen_state(self.on_alarm_clock_set, self.switch_alarmclock_main)
self.handle_switch_reveil_secondaire = self.listen_state(self.on_alarm_clock_set, self.switch_alarmclock_secondary)
self.listen_state(self.on_alarm_clock_set, self.inputdate_alarmclock_main)
self.listen_state(self.on_alarm_clock_set, self.inputdate_alarmclock_secondary)
self.run_hourly(self.on_refresh_cal,datetime.time(0, 0, 0))
self.sensor_wakeup_datetime_entity = self.create_entity("sensor.wakeup_datetime", icon="mdi:alarm")
#sensor_wakeup_datetime_entity = self.get_entity(self.sensor_wakeup_datetime)
#if not sensor_wakeup_datetime_entity.exists():
# sensor_wakeup_datetime_entity.add()
self.calendar = list()
self.refresh_cal()
self.update_wakeup_time()
def refresh_cal(self):
new_cal = list()
#if self.load_cal_from_google_calendar(new_cal):
if self.load_cal_from_ics(new_cal):
if not self.are_cal_identical(self.calendar,new_cal):
self.calendar = new_cal
self.log("Calendar has been updated!")
self.print_cal(self.calendar)
return True
return False
def are_cal_identical(self,calendar1,calendar2):
if len(calendar1) != len(calendar2): return False
for i, event in enumerate(calendar1):
if calendar2[i] != event:
return False
return True
def print_cal(self,calendar):
for event in calendar:
self.log(f"{event}")
def load_cal_from_ics(self,calendar):
calendar.clear()
try:
self.log(f"Retrieving calengar from outlook.office365.com")
r = get("https://outlook.office365.com/owa/calendar/0c64359c70eb41cd9713fe92c470d3df@ubisoft.com/5edb3fb5150f4203b66d9cf8db3184be11626033572842691749/calendar.ics", timeout=10)#, headers=headers, verify=False)
#self.log(f"Processing Calendar")
today = datetime.datetime.combine(datetime.date.today(), datetime.time(0,0, 0))
start_load_date = today - datetime.timedelta(days=1)
end_load_date = today + datetime.timedelta(days=2)
#self.log("[" + str(start_load_date) + "]-[" + str(end_load_date) + "]")
calendar_ubi = icalendar.Calendar.from_ical(r.text)
for component in calendar_ubi.walk():
if component.name == "VEVENT":
event_summary = component.get('summary')
if event_summary == "Away": event_type = EventType.Vacation
elif event_summary == "Busy": event_type = EventType.Busy
elif event_summary == "Tentative": event_type = EventType.Tentative
elif event_summary == "Working elsewhere" or event_summary == "Free": event_type = EventType.Flex
else: event_type = None
start_datetime = component.get('dtstart').dt
end_datetime = component.get('dtend').dt
#regular event have a datetime and full day events only a date
#let's make both event a time zone unaware datetime
if isinstance(start_datetime,datetime.datetime):
start_datetime = start_datetime.replace(tzinfo=None)
else:
start_datetime = datetime.datetime.combine(start_datetime,datetime.time(0,0,0))
if isinstance(end_datetime,datetime.datetime):
end_datetime = end_datetime.replace(tzinfo=None)
else:
end_datetime = datetime.datetime.combine(end_datetime,datetime.time(0,0,0))
outside_of_loading_period = False
if start_datetime < start_load_date and end_datetime < start_load_date:
outside_of_loading_period = True
if start_datetime > end_load_date and end_datetime > end_load_date:
outside_of_loading_period = True
if not outside_of_loading_period:
if event_type:
calendar.append(CalendarEvent(start_datetime,end_datetime,event_type))
else: self.log(f"[Unknow event : {event_summary}]Start at {start_datetime},Finish at {end_datetime}",level = "WARNING")
except ValueError:
self.log_error("Couldn't load calendar")
return False
except (requests.exceptions.ConnectionError,requests.exceptions.ReadTimeout):
self.log("Couldn't load calendar, request Timeout",level = "WARNING")
return False
#self.log(f"Calendar loaded")
return True
def on_alarm_clock_set(self, entity, attribute, old, new, kwargs):
self.update_wakeup_time()
def on_refresh_cal(self, kwargs):
if self.refresh_cal():
self.update_wakeup_time()
def on_program_next_wakeup(self, kwargs):
self.handle_program_next_wakueup_cb = None
no_switch_changed = True
#le reveil principal se reactive chaque matin
if self.get_state(self.switch_alarmclock_main) == 'off':
self.log("Turning on main alarm clock")
no_switch_changed = False
self.turn_on(self.switch_alarmclock_main)
#le reveil secondaire ne sonne qu'une fois, il est donc desactivé chaque matin
if self.get_state(self.switch_alarmclock_secondary) == 'on':
self.log("Turning off secondary alarm clock")
no_switch_changed = False
self.turn_off(self.switch_alarmclock_secondary)
if no_switch_changed:
self.update_wakeup_time()
def on_pre_wakeup(self, kwargs):
self.handle_prewakueup_cb = None
for key, value in kwargs.items():
if key == 'wakeup_in' :
wakeup_in = value
if wakeup_in > 0:
self.handle_prewakueup_cb = self.run_in(self.on_pre_wakeup, 5 * 60,wakeup_in = wakeup_in - 5)
wakeup_message = "On se reveil dans " + str(wakeup_in) + " minutes"
self.log(wakeup_message)
if self.sleeping_condition.evaluate() == SmartCondition.Result.Succeeded: self.fire_event("WAKEUP_IN", wakeup_in = wakeup_in)
#self.call_service("notify/ios_iphone_de_pierre", title = "Reveil", message = wakeup_message)
def on_wakeup(self, kwargs):
self.handle_wakueup_cb = None
wakeup_message = "WAKE UP!!!! It's " + self.wakeup_time_str
self.log(wakeup_message)
if self.sleeping_condition.evaluate() == SmartCondition.Result.Succeeded: self.fire_event("WAKEUP")
#self.call_service("notify/ios_iphone_de_pierre", title = "Reveil", message = wakeup_message)
def is_wakeup_time_on_a_working_day(self, wakeup_datetime):
if self.is_during_vacation(wakeup_datetime):
return False
if wakeup_datetime.weekday() >= 5: #saturday is 5, sunday is 6
return False
return True
def is_during_vacation(self, date_time):
for event in self.calendar:
if event.type == EventType.Vacation and event.start_time <= date_time and date_time <= event.stop_time:
return True
return False
def is_wakeup_time_on_a_flex_day(self, date_time):
return False
#we'll see about that later
for event in self.calendar:
if event.type == EventType.Flex and event.start_time <= date_time and date_time <= event.stop_time:
return True
return False
def get_next_busy(self,date_time):
for event in self.calendar:
if event.type == EventType.Busy and event.start_time >= date_time:
return event
return None
def offset_wakeup_time_for_meeting(self,wakeup_datetime):
date_time = wakeup_datetime - datetime.timedelta(hours=wakeup_datetime.hour,minutes=wakeup_datetime.minute)
busy_event = self.get_next_busy(date_time)
if busy_event and busy_event.start_time.date() == date_time.date() and busy_event.start_time.hour > 6 and busy_event.start_time <= wakeup_datetime:
wake_up_datetime = busy_event.start_time - datetime.timedelta(minutes=30)
new_wake_up_time = wake_up_datetime.strftime("%H:%M")
self.log(f"A meeting have been found at {busy_event.start_time}, we'll wake up at {new_wake_up_time}")
return new_wake_up_time
new_wake_up_time = wakeup_datetime.strftime("%H:%M")
#self.log(f"No meeting have been found before {wake_up_time}, we'll wake up at {wake_up_time}")
return new_wake_up_time
def calculate_wakeup_time(self):
new_wakeup_time = "--:--"
if self.get_state(self.switch_alarmclock_secondary) == 'on':
new_wakeup_time = self.get_state(self.inputdate_alarmclock_secondary)[0:5]
elif self.get_state(self.switch_alarmclock_main) == 'on':
max_flex_wakeup_time = "10:00" #todo: make that data driven
max_flex_wakeup_datetime = self.generate_wakeup_datetime_from_string(max_flex_wakeup_time)
new_wakeup_time = self.get_state(self.inputdate_alarmclock_main)[0:5]
new_wakeup_datetime = self.generate_wakeup_datetime_from_string(new_wakeup_time)
if not self.is_wakeup_time_on_a_working_day(new_wakeup_datetime):
self.log(f"Primary Wake up time ({new_wakeup_datetime}) is not on a working day")
new_wakeup_time = "--:--"
elif self.is_wakeup_time_on_a_flex_day(max_flex_wakeup_datetime):
self.log(f"Flex Wake up time ({max_flex_wakeup_datetime}) is on a flex office day")
new_wakeup_time = self.offset_wakeup_time_for_meeting(max_flex_wakeup_datetime)
else:
new_wakeup_time = self.offset_wakeup_time_for_meeting(new_wakeup_datetime)
return new_wakeup_time
def generate_wakeup_datetime_from_string(self,wakeup_string):
wakeup_time = datetime.time(int(wakeup_string[0:2]), int(wakeup_string[3:5]), 0)
wakeup_datetime = datetime.datetime.combine(datetime.date.today(),wakeup_time)
if (datetime.datetime.combine(datetime.date.today(), wakeup_time) - datetime.datetime.now()).total_seconds() <= 0:
wakeup_datetime = wakeup_datetime + datetime.timedelta(days=1)
return wakeup_datetime
def register_callbacks(self, wakeup_time_str):
if self.wakeup_time_str != wakeup_time_str or not self.handle_program_next_wakueup_cb:
with self.lock:
#first, cancel all previous callbacks
if self.handle_program_next_wakueup_cb:
self.log("Canceling previous program next wakeup cb")
self.cancel_timer(self.handle_program_next_wakueup_cb)
self.handle_program_next_wakueup_cb = None
if self.handle_prewakueup_cb:
self.log("Canceling previous pre wakeup cb")
self.cancel_timer(self.handle_prewakueup_cb)
self.handle_prewakueup_cb = None
if self.handle_wakueup_cb:
self.log("Canceling previous wakeup cb")
self.cancel_timer(self.handle_wakueup_cb)
self.handle_wakueup_cb = None
if wakeup_time_str == '--:--':
#if their is no alarm clock set up, we just need to program a callback to update the wakeup time tomorow
# 12:00 should do it
program_next_wakeup_time = datetime.datetime.combine(datetime.date.today(), datetime.time(12,0, 0))
#if we toggle off an alalrm clock before 4, we assume it's for the 'next' day (meaning tbe same day between 0 and 4)
#if we toggle off an alalrm clock after 4, it's suppose to stay so untill next day
if datetime.datetime.now().hour >= 4:
program_next_wakeup_time = program_next_wakeup_time + datetime.timedelta(days=1)
self.log("program next wakeup cb registered for " + str(program_next_wakeup_time))
self.handle_program_next_wakueup_cb = self.run_at(self.on_program_next_wakeup, program_next_wakeup_time)
self.sensor_wakeup_datetime_entity.set_state(state = 'unavailable')
else:
wakeup_datetime = self.generate_wakeup_datetime_from_string(wakeup_time_str)
self.sensor_wakeup_datetime_entity.set_state(state = wakeup_datetime.strftime("%H:%M"))
#the program next wakeup CB need to be slightly after the wakeup CB to avoid the risk of wakeup CB cleaning it's handle AFTER program next wakup set up the next one
program_next_wakeup_datetime = wakeup_datetime + datetime.timedelta(seconds=1)
prewakeup_offset = min(60,(wakeup_datetime - datetime.datetime.now()).total_seconds() / 60)
prewakeup_offset = int(prewakeup_offset / 10) * 10
prewakeup_datetime = wakeup_datetime - datetime.timedelta(minutes=prewakeup_offset)
self.handle_prewakueup_cb = self.run_at(self.on_pre_wakeup, prewakeup_datetime,wakeup_in = prewakeup_offset)
self.log("prewakup cb registered for " + str(prewakeup_datetime))
self.handle_wakueup_cb = self.run_at(self.on_wakeup, wakeup_datetime)
self.log("wakeup cb registered for " + str(wakeup_datetime))
self.handle_program_next_wakueup_cb = self.run_at(self.on_program_next_wakeup, program_next_wakeup_datetime)
self.log("program next wakeup cb registered for " + str(program_next_wakeup_datetime))
self.wakeup_time_str = wakeup_time_str
self.call_service("input_text/set_value", entity_id = self.input_text_wakeup_time, value = self.wakeup_time_str)
def update_wakeup_time(self):
self.register_callbacks(self.calculate_wakeup_time())

View File

@@ -1,8 +1,9 @@
# I now directly run AppDaemon with debugpy in DockerStart.sh. Not sure which is best debugger:
# debugger: module: debugger
# module: debugger class: Debugger
# class: Debugger priority: 0
# port: 5678 port: 5678
wait_for_client: false # set to true when you need to debug initialize()
motion_tracker: motion_tracker:
module: motiontracker module: motiontracker
@@ -10,8 +11,6 @@ motion_tracker:
priority: 5 # this need to be initialized before app using motion tracker priority: 5 # this need to be initialized before app using motion tracker
mqtt_device_name: AD Motion Tracker
max_time: 30 max_time: 30
areas: areas:
@@ -25,4 +24,56 @@ motion_tracker:
motion_sensors: binary_sensor.garage_motion motion_sensors: binary_sensor.garage_motion
kitchen: kitchen:
motion_sensors: binary_sensor.kitchen_motion motion_sensors: binary_sensor.kitchen_motion
mezzanine:
motion_sensors: binary_sensor.mezzanine_motion
informations_collector:
module: informationscollector
class: InformationsCollector
priority: 1
#mute: True
error_collector:
reset_error_count_button: input_button.reset_error_count
output_sensor: sensor.ad_errors
# min_max_temp_sensors:
# temperature_sensors:
# - sensor.temperature_exterieur_nord
# - sensor.temperature_exterieur_sud
# output_sensors:
# max_temp: sensor.max_temp_outside
# max_temp_yesterday: sensor.max_temp_yesterday_outside
# min_temp: sensor.min_temp_outside
# min_temp_yesterday: sensor.min_temp_yesterday_outside
group_collectors:
windows:
group: binary_sensor.group_windows
output_sensor: sensor.ad_number_of_windows_open
music_players:
group: media_player.group_music_player
output_sensor: sensor.ad_number_of_music_player_active
active_condition: self == 'playing'
#windows:
# group: group.fenetres
# output_sensor: sensor.ad_number_of_windows_open
# shutters:
# group: group.volets
# output_sensor: sensor.ad_number_of_shutter_close
# occupency_sensors:
# group: group.occupency_sensors
# output_sensor: sensor.ad_number_of_rooms_occupied
# smart_lights:
# group:
# - group.smart_lights
# - group.dumb_lights
# output_sensor: sensor.ad_number_of_lights_on

View File

@@ -5,8 +5,14 @@ import debugpy
class Debugger(hass.Hass): class Debugger(hass.Hass):
def initialize(self): def initialize(self):
port = int(self.args.get("port", 5678)) port = int(self.args.get("port", 5678))
wait_for_client = bool(self.args.get("wait_for_client", False))
try: try:
debugpy.listen(("0.0.0.0", port)) debugpy.listen(("0.0.0.0", port))
self.log(f"debugpy listening on port {port}") self.log(f"debugpy listening on port {port}")
except RuntimeError: except RuntimeError:
self.log("debugpy already listening, skipping") self.log("debugpy already listening, skipping")
if wait_for_client:
self.log(f"waiting for debug client on port {port}")
debugpy.wait_for_client()
self.log("debug client attached")

View File

@@ -0,0 +1,182 @@
import ad_toolbox.smartcondition as SmartCondition
from ad_toolbox.smartobject import SmartObject
import pickle
import os
class InformationsCollector(SmartObject):
def on_initialize_smart_object(self):
super().on_initialize_smart_object()
if self.dataset is None:
self.dataset = dict()
if 'error_collector' in self.args:
if 'output_sensor' in self.args['error_collector']:
sensor_name = self.args['error_collector']['output_sensor']
self.error_list = []
self.log_handle = self.listen_log(self.on_log_error, "ERROR", log="main_log")
self.error_count_sensor = self.create_entity(
sensor_name, state=0,
attributes={"last_error": "", "error_list": []},
icon="mdi:alert-circle-outline"
)
if 'reset_error_count_button' in self.args['error_collector']:
self.listen_state(self.on_reset_error_count, self.args['error_collector']['reset_error_count_button'])
else:
self.log_error("No output_sensor defined for error_collector")
if 'min_max_temp_sensors' in self.args:
for sensor in self.args['min_max_temp_sensors']['temperature_sensors']:
self.listen_state(self.on_temperature_change, sensor)
output_sensors = self.args['min_max_temp_sensors']['output_sensors']
self.max_temp_sensor = self.get_entity(output_sensors['max_temp'])
self.max_temp_yesterday_sensor = self.get_entity(output_sensors['max_temp_yesterday'])
self.min_temp_sensor = self.get_entity(output_sensors['min_temp'])
self.min_temp_yesterday_sensor = self.get_entity(output_sensors['min_temp_yesterday'])
current_min_temp, current_max_temp = self.compute_min_max_temp()
if not self.max_temp_sensor.exists() or self.max_temp_sensor.get_state() == 'unavailable':
max_temp = current_max_temp
try:
if max_temp is None or self.dataset['max_temp'] > max_temp:
max_temp = self.dataset['max_temp']
self.log_info(f"Restoring max_temp from dataset ({max_temp}°C)")
except KeyError: pass
self.set_max_temp_sensor(max_temp)
if not self.min_temp_sensor.exists() or self.min_temp_sensor.get_state() == 'unavailable':
min_temp = current_min_temp
try:
if min_temp is None or self.dataset['min_temp'] < min_temp:
min_temp = self.dataset['min_temp']
self.log_info(f"Restoring min_temp from dataset ({min_temp}°C)")
except KeyError: pass
self.set_min_temp_sensor(min_temp)
self.run_daily(self.run_at_midnight, "00:00:00")
if 'group_collectors' in self.args:
self._group_sensors = {}
self._group_evaluators = {}
for name, config in self.args['group_collectors'].items():
self._group_sensors[name] = self.create_entity(
config['output_sensor'], state=0,
attributes={'active_entities': ''},
icon='mdi:counter'
)
entities = self._get_group_entities(config)
if 'active_condition' in config:
self._group_evaluators[name] = {
entity: SmartCondition.Evaluator(
self,
config['active_condition'],
condition_name=f"{name}_{entity}",
constants={'self': entity},
log_callback_trigger_reason=False,
on_change_cb=self._on_group_condition_changed,
)
for entity in entities
}
else:
for entity in entities:
self.listen_state(self.on_group_entity_state_changed, entity)
self.update_group_collectors()
def set_max_temp_sensor(self,temperature):
self.dataset['max_temp'] = temperature
self.max_temp_sensor.set_state(state = temperature, attributes = { 'unit_of_measurement': '°C', 'device_class': 'temperature'})
def set_min_temp_sensor(self,temperature):
self.dataset['min_temp'] = temperature
self.min_temp_sensor.set_state(state = temperature, attributes = { 'unit_of_measurement': '°C', 'device_class': 'temperature'})
def terminate(self):
self.log_info("Writing dataset to " + self.get_dataset_name())
f = open(self.get_dataset_name(), 'wb')
pickle.dump(self.dataset, f)
f.close()
def compute_min_max_temp(self):
max_temp = None
min_temp = None
for sensor in self.args['min_max_temp_sensors']['temperature_sensors']:
temp = self.get_state(sensor)
if not max_temp or temp > max_temp: max_temp = temp
if not min_temp or temp < min_temp: min_temp = temp
return min_temp,max_temp
def on_log_error(self, name, ts, level, type, message, kwargs):
error_count = int(self.error_count_sensor.get_state()) + 1
self.error_list.append(message)
self.error_count_sensor.set_state(state=error_count, attributes={"last_error": message, "error_list": self.error_list})
def on_reset_error_count(self, entity, attribute, old, new, kwargs):
error_log_file = self.get_ad_api()._logging.config['error_log']['filename']
open(error_log_file, 'w').close()
self.error_list.clear()
self.error_count_sensor.set_state(state=0, attributes={"last_error": "", "error_list": self.error_list})
def _on_group_condition_changed(self, prev_result,result):
self.update_group_collectors()
def on_group_entity_state_changed(self, entity, attribute, old, new, kwargs):
if new != old:
self.log_info(f"updating group collectors ({entity} changed from {old} to {new})")
self.update_group_collectors()
def on_temperature_change(self, entity, attribute, old, new, kwargs):
if new != old:
min_temp = self.min_temp_sensor.get_state()
max_temp = self.max_temp_sensor.get_state()
if max_temp is None or max_temp == 'unavailable' or new > max_temp: self.set_max_temp_sensor(new)
if min_temp is None or min_temp == 'unavailable' or new < min_temp: self.set_min_temp_sensor(new)
def run_at_midnight(self, kwargs):
self.max_temp_yesterday_sensor.set_state(state=self.max_temp_sensor.get_state(), attributes={'unit_of_measurement': '°C', 'device_class': 'temperature'})
self.min_temp_yesterday_sensor.set_state(state=self.min_temp_sensor.get_state(), attributes={'unit_of_measurement': '°C', 'device_class': 'temperature'})
min_temp, max_temp = self.compute_min_max_temp()
self.set_max_temp_sensor(max_temp)
self.set_min_temp_sensor(min_temp)
def _get_group_entities(self, config):
group = config['group']
if isinstance(group, str):
return self.get_state(group, attribute='entity_id') or []
entities = []
for g in group:
entities.extend(self.get_state(g, attribute='entity_id') or [])
return entities
def _is_entity_active(self, name, entity):
if name in self._group_evaluators:
return self._group_evaluators[name][entity].evaluate(log=False) == SmartCondition.Result.Succeeded
return self.get_state(entity) == 'on'
# def _get_group_entity_name(self, entity):
# entity_state = self.get_state(entity, attribute='all') or {}
# attributes = entity_state.get('attributes', {})
# friendly_name = attributes.get('friendly_name')
# return friendly_name if isinstance(friendly_name, str) and friendly_name else entity
def update_group_collectors(self):
for name, config in self.args['group_collectors'].items():
sensor = self._group_sensors[name]
prev_state = sensor.get_state()
prev_count = int(prev_state) if prev_state is not None else 0
active_entities = []
for entity in self._get_group_entities(config):
if self.entity_exists(entity) and self._is_entity_active(name, entity):
active_entities.append(entity)
count = len(active_entities)
sensor.set_state(state=count, attributes={'active_entities': ', '.join(active_entities)})
if prev_count != count:
self.log_info(f"{name}: {count} active entities (previously {prev_count})")

View File

@@ -30,6 +30,34 @@ sleep_switch:
button: 'off' button: 'off'
press_type: 'long_release' press_type: 'long_release'
off_events:
- WAKEUP
bedroom_light_switch:
module: smartswitch
class: SmartSwitch
entity: switch.bedroom_light
off_events:
- turn_off_all_lights
toggle_events:
button_press_pierre:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_pierre'
args:
press_type: 'press'
button: 'on'
button_press_maeva:
event_name: zha_event
event_data:
device_name: 'bedroom_remote_maeva'
args:
press_type: 'press'
button: 'on'
sonos_led: sonos_led:
module: smartswitch module: smartswitch
class: SmartSwitch class: SmartSwitch
@@ -73,6 +101,16 @@ light_bedroom_bedlight_pierre:
press_type: 'hold' press_type: 'hold'
button: 'down' button: 'down'
on_events_with_transition:
wake_up_events:
transition_time: 120
brightness_pct: 50
events:
WAKEUP_IN:
event_name: WAKEUP_IN
event_data:
wakeup_in: 5
light_bedroom_bedlight_maeva: light_bedroom_bedlight_maeva:
module: smartlight module: smartlight
class: SmartLight class: SmartLight
@@ -113,4 +151,30 @@ bed_ledstrip:
class: SmartLight class: SmartLight
entity: light.bedroom_ledstripe entity: light.bedroom_ledstripe
smart_conditions: (input_boolean.sleeping or binary_sensor.day_interval_night) and (binary_sensor.bedroom_motion_bed_pierre or binary_sensor.bedroom_motion_bed_maeva) smart_conditions:
trigger_conditions: (input_boolean.sleeping or binary_sensor.day_interval_night) and (binary_sensor.bedroom_motion_bed_pierre or binary_sensor.bedroom_motion_bed_maeva)
blocking_conditions: switch.bedroom_light or light.bedroom_bedlight_pierre or light.bedroom_bedlight_maeva
on_events_with_transition:
wake_up_events:
transition_time: 120
brightness_pct: 50
events:
WAKEUP_IN:
event_name: WAKEUP_IN
event_data:
wakeup_in: 8
light_brightness_pct: 1
alarm_clock:
module: alarmclock
class: AlarmClock
switch_alarmclock_main: input_boolean.main_alarmclock
switch_alarmclock_secondary: input_boolean.secondary_alarmclock
inputdate_alarmclock_main: input_datetime.main_alarmclock
inputdate_alarmclock_secondary: input_datetime.secondary_alarmclock
input_text_wakeup_time: input_text.wakeup_time
sleeping_condition: input_boolean.sleeping

View File

@@ -3,5 +3,6 @@ light_garage:
class: SmartLight class: SmartLight
entity: switch.garage_01 entity: switch.garage_01
smart_conditions: sensor.garage_last_motion < 3 smart_conditions:
blocking_conditions: sensor.garage_motion_light_level > 10 and not self trigger_conditions: sensor.garage_last_motion < 3
blocking_conditions: sensor.garage_motion_light_level > 10

View File

@@ -20,3 +20,74 @@ light_kitchen_sink_switch:
off_events: off_events:
- turn_off_all_lights - turn_off_all_lights
senseo_quadrante:
module: senseo
class: Senseo
virtual_sensors:
default_values:
# this is done to break the circular dependencies between sensor.senseo_cup_icon and sensor.senseo_cup
sensor.senseo_cup_icon: "" # "mdi:coffee-off-outline"
sensors:
value_selector.senseo_cup:
attributes:
friendly_name: Tasse
sensor_attributes:
icon: sensor.senseo_cup_icon
values:
unavailable: sensor.senseo_quadrante_operating_state == 'unavailable'
Prête: binary_sensor.senseo_quadrante_cup_available and binary_sensor.senseo_quadrante_cup_full
Service en cours: binary_sensor.senseo_quadrante_cup_available and sensor.senseo_quadrante_operating_state == 'SENSEO_BREWING'
Vide: binary_sensor.senseo_quadrante_cup_available
Absente: True
value_selector.senseo_cup_icon:
"mdi:coffee": sensor.senseo_cup == 'Prête'
"mdi:coffee-outline": sensor.senseo_cup == 'Vide' or sensor.senseo_cup == 'Service en cours'
"mdi:coffee-off-outline": True
value_selector.senseo_programmation:
attributes:
friendly_name: Programme
icon: mdi:coffee-to-go
values:
unavailable: sensor.senseo_quadrante_operating_state == 'unavailable'
1 Tasse (douche): binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 1 and binary_sensor.senseo_quadrante_has_program.power_pressed
2 Tasses (douche): binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 2 and binary_sensor.senseo_quadrante_has_program.power_pressed
1 Tasse: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 1
2 Tasses: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 2
Aucun: True
clear_program_button: button.senseo_quadrante_clear_program
programs:
one_cup_shower:
events:
shower_finished:
event_name: shower_finished
button: button.senseo_quadrante_brew_coffee_normal
condition: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 1 and binary_sensor.senseo_quadrante_has_program.power_pressed
two_cup_shower:
events:
shower_finished:
event_name: shower_finished
button: button.senseo_quadrante_brew_coffee_double
condition: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 2 and binary_sensor.senseo_quadrante_has_program.power_pressed
one_cup_morning:
delay: 10
events:
good_morning:
event_name: good_morning
button: button.senseo_quadrante_brew_coffee_normal
condition: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 1 and not binary_sensor.senseo_quadrante_has_program.power_pressed
two_cup_morning:
delay: 10
events:
good_morning:
event_name: good_morning
button: button.senseo_quadrante_brew_coffee_double
condition: binary_sensor.senseo_quadrante_has_program and binary_sensor.senseo_quadrante_has_program.cup_size == 2 and not binary_sensor.senseo_quadrante_has_program.power_pressed

View File

@@ -0,0 +1,27 @@
light_mezzanine_01:
module: smartlight
class: SmartLight
entity: light.mezzanine_01
smart_conditions:
trigger_conditions: sensor.mezzanine_last_motion < 3
blocking_conditions: sensor.mezzanine_motion_light_level > 10 and not self
light_brightness_pct:
15: input_boolean.sleeping
35: binary_sensor.day_interval_night
100: True
light_mezzanine_02:
module: smartlight
class: SmartLight
entity: light.mezzanine_02
smart_conditions:
trigger_conditions: sensor.mezzanine_last_motion < 3
blocking_conditions: sensor.mezzanine_motion_light_level > 10 and not self
light_brightness_pct:
5: input_boolean.sleeping
25: binary_sensor.day_interval_night
100: True

46
apps/senseo.py Normal file
View File

@@ -0,0 +1,46 @@
from ad_toolbox.smartobject import SmartObject
import ad_toolbox.smartcondition as SmartCondition
from ad_toolbox.eventhandler import EventHandler
class Senseo(SmartObject):
def on_initialize_smart_object(self):
super().on_initialize_smart_object()
self.delay_cb_handle = None
self.smartconditions = dict()
self.event_handlers = list()
try: self.program_configs = self.args['programs']
except KeyError: self.program_configs = None
for config_name in self.program_configs:
config = self.program_configs[config_name]
self.smartconditions[config_name] = SmartCondition.Evaluator(self,config['condition'],condition_name = config_name)
if not self.entity_exists(config['button']): self.log_error(f"Entity not found {config['button']}")
self.event_handlers.append(EventHandler(self,config["events"],self.on_event,config_name))
def on_event(self, event_name, event_data,config_name):
self.log_info(f"Event {event_name} received. Config name = {config_name}")
config = self.program_configs[config_name]
try: delay = config['delay']
except KeyError: delay = 0
if self.smartconditions[config_name].evaluate() == SmartCondition.Result.Succeeded:
if (delay > 0):
if self.delay_cb_handle: self.cancel_timer(self.delay_cb_handle)
self.log_info(f"The coffee will be prepared in {delay}s")
self.delay_cb_handle = self.run_in(self.on_delay_elapse,delay,config_name = config_name)
else:
self.prepare_coffee(config_name)
def on_delay_elapse(self, kwargs):
self.delay_cb_handle = None
self.prepare_coffee(kwargs['config_name'])
def prepare_coffee(self,config_name):
config = self.program_configs[config_name]
self.log_info(f"Preparing Coffee ({config_name})")
self.call_service("button/press", entity_id = config['button'])
self.call_service("button/press", entity_id = self.args['clear_program_button'])

View File

@@ -141,7 +141,9 @@ class SmartLight(SmartSwitch):
# light_brightness_pct: build ordered evaluator list, run initial pass # light_brightness_pct: build ordered evaluator list, run initial pass
if "light_brightness_pct" in self.args: if "light_brightness_pct" in self.args:
self.always_change_brightness = False self.always_change_brightness = False
if isinstance(self.args["light_brightness_pct"], int):
self.light_brightness_pct = self.args["light_brightness_pct"]
else:
for key in self.args["light_brightness_pct"]: for key in self.args["light_brightness_pct"]:
if key == 'always_change_brightness': if key == 'always_change_brightness':
self.always_change_brightness = bool(self.args["light_brightness_pct"][key]) self.always_change_brightness = bool(self.args["light_brightness_pct"][key])
@@ -183,7 +185,7 @@ class SmartLight(SmartSwitch):
# EventHandler callback for on_events_with_transition. Jumps to 1 % # EventHandler callback for on_events_with_transition. Jumps to 1 %
# first so the ramp always starts from a known low level, then # first so the ramp always starts from a known low level, then
# transitions to the configured brightness over transition_time seconds. # transitions to the configured brightness over transition_time seconds.
def on_turn_on_with_transition(self, event_name, data, kwargs, event_category): def on_turn_on_with_transition(self, event_name, data, event_category):
transition_time = self.args["on_events_with_transition"][event_category]["transition_time"] transition_time = self.args["on_events_with_transition"][event_category]["transition_time"]
brightness_pct = self.args["on_events_with_transition"][event_category]["brightness_pct"] brightness_pct = self.args["on_events_with_transition"][event_category]["brightness_pct"]
self.log(f"Turn on at {brightness_pct}% with a transition of {transition_time}s") self.log(f"Turn on at {brightness_pct}% with a transition of {transition_time}s")

View File

@@ -155,8 +155,12 @@ class SmartSwitch(SmartObject):
# Timer callback: applies the deferred state set by auto_switch_*_after. # Timer callback: applies the deferred state set by auto_switch_*_after.
def on_auto_switch_after(self, kwargs): def on_auto_switch_after(self, kwargs):
self.auto_switch_cb_handle = None self.auto_switch_cb_handle = None
self.log(f"Switching {self.entity_id} {kwargs['new_state']} after {kwargs['autoswitch_delay']}s") new_state = kwargs['new_state']
self.set_state(self.entity_id,state = kwargs['new_state']) self.log(f"Switching {self.entity_id} {new_state} after {kwargs['autoswitch_delay']}s")
if new_state == 'on':
self.switch_on()
else:
self.switch_off()
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Public switch API # Public switch API

View File

@@ -1 +1,2 @@
debugpy debugpy
icalendar