#import appdaemon.plugins.hass.hassapi as hass
import enum
import functools
from datetime import datetime
#from pickle import FALSE

from adexpressioncontext import ADExpressionContext
from expressionparser import ConditionsParser, ParsingException, EvaluationException
from kwargsparser import kwargsParser


def catch_smartcondition_exception(error_lambda):
    """Build a decorator that reports ``ParsingException`` instead of raising it.

    When the decorated callable raises ``ParsingException``, ``error_lambda`` is
    called with the first positional argument of the call and the exception
    message, and the decorated call returns ``None``.
    """
    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            try:
                return function(*args, **kwargs)
            except ParsingException as e:
                error_lambda(args[0], str(e))
                return None
        return wrapper
    return decorator


class Result(enum.Enum):
    """Possible outcomes of an evaluation."""

    Failed = 0
    Succeeded = 1
    Disabled = 2
    Unavailable = 3


class Evaluator:
    """Evaluate trigger / blocking / disable conditions and notify user callbacks.

    Evaluation order (see ``__evaluate_conditions``):

    1. any disable condition is true  -> ``Result.Disabled``
    2. any trigger condition is true  -> ``Result.Succeeded``, unless a blocking
       condition is also true         -> ``Result.Failed``
    3. otherwise                      -> ``Result.Failed``

    When user callbacks are supplied and the evaluator is activated, it listens
    to every entity referenced by the conditions (plus the extra entities
    requested) and re-evaluates on state changes.
    """

    __TEMPLATE_CONDITIONS_STRING = "template_conditions"
    __TRIGGER_CONDITIONS_STRING = "trigger_conditions"
    __BLOCKING_CONDITIONS_STRING = "blocking_conditions"
    __DISABLE_CONDITIONS_STRING = "disable_conditions"
    __CALLBACK_DELAY_STRING = "callback_delay"
    __LOG_NO_VALID_CONDITION_DETAILS = "log_no_valid_condition_details"
    __LOG_INIT = "log_init"
    __EXTRA_ENTITIES_TO_LISTEN = "extra_entities_to_listen"
    __CONSTANTS = "constants"
    __UNAVAIBILITY_RESULT = "unavaibility_result"

    def __init__(self, appdaemon_api, conditions_block, **kwargs):
        """Parse the options and the conditions, then optionally activate.

        Args:
            appdaemon_api: object providing ``listen_state``,
                ``cancel_listen_state``, ``run_in``, ``timer_running``,
                ``cancel_timer``, ``get_state`` and ``log`` (or
                ``log_info`` / ``log_warning`` / ``log_error``).
            conditions_block: either a dict of options and conditions, an inline
                condition (treated as a single trigger condition), or a template
                reference of the form ``"<name>"`` / ``"<a>,<b>"``.
            **kwargs: options read through ``kwargsParser`` (callbacks, logging
                options, ``templates_library``, ``constants``, ``activated``...).
        """
        self.__trigger_conditions = []
        self.__blocking_conditions = []
        self.__disable_conditions = []
        self.__entities_to_listen = {}
        self.__debug_log_prefixes = []
        self.__callback_delay = 0.0
        self.__last_evaluation_result = None
        self.__state_callbacks_handle = []
        self.__condition_state_change_delayed = []
        self.__condition_state_change_delayed_cb_handle = None
        self.__appdaemon_api = appdaemon_api
        self.__activated = False
        self.__callback_trigger_reason_log_string = None
        self.__evaluation_result_log_string = None

        # The parser reports its errors through self.__log, which reads these
        # two attributes: give them safe defaults before the first parse_args().
        self.__logger_interface = None
        self.__no_log_prefix = False

        parser = self.__new_args_parser(kwargs)
        self.__logger_interface = parser.parse_args('logger_interface', None)
        self.__no_log_prefix = parser.parse_args('no_log_prefix', False)

        condition_name = parser.parse_args('condition_name', None, ['log_prefix'])
        if condition_name:
            names = condition_name if isinstance(condition_name, list) else [condition_name]
            self.__debug_log_prefixes.extend(f"[{name}]" for name in names)

        extra_entities_to_listen = parser.parse_args(self.__EXTRA_ENTITIES_TO_LISTEN, None)
        if extra_entities_to_listen:
            self.__add_to_entities_to_listen(extra_entities_to_listen)

        self.__on_update_callback = parser.parse_args('on_update_cb', None, ['callback'])
        self.__on_change_callback = parser.parse_args('on_change_cb', None)
        # None means: let the EvaluationException propagate
        self.__unavaibility_result = parser.parse_args(self.__UNAVAIBILITY_RESULT, None)
        self.__on_succeed_callback = parser.parse_args('on_succeed_cb', None)
        self.__on_fail_callback = parser.parse_args('on_fail_cb', None)
        self.__pass_condition_name_to_cb = parser.parse_args('pass_condition_name_to_cb', None)
        self.__templates_library = parser.parse_args('templates_library', None)
        self.__log_init = parser.parse_args(self.__LOG_INIT, False)
        self.__trigger_callback_on_activation = parser.parse_args('trigger_callback_on_activation', True)
        self.__trigger_callback_on_entity_creation = parser.parse_args('trigger_callback_on_entity_creation', True)
        self.__log_no_valid_condition_details = parser.parse_args(self.__LOG_NO_VALID_CONDITION_DETAILS, False)
        self.__log_callback_trigger_reason = parser.parse_args('log_callback_trigger_reason', True)
        self.__sensors_aliases_default_value = parser.parse_args('sensors_aliases_default_value', dict())
        activated = parser.parse_args('activated', True)
        sensor_existence_validation = parser.parse_args('sensor_existence_validation', True)

        self.expression_context = ADExpressionContext(self.__appdaemon_api, sensor_existence_validation)
        constants = parser.parse_args(self.__CONSTANTS, dict())
        for constant_name, value in constants.items():
            self.expression_context.declare_constant(constant_name, value)
        for alias, value in self.__sensors_aliases_default_value.items():
            self.expression_context.declare_sensor_alias_default_value(alias, value)

        parser.validate_args()
        self.__parse_conditions_block(conditions_block)

        if activated:
            self.activate()

    def __new_args_parser(self, args):
        """Return a ``kwargsParser`` over ``args`` that reports through this evaluator's log."""
        return kwargsParser(
            args,
            lambda string: self.__log(string, level="ERROR"),
            lambda string: self.__log(string, level="WARNING"),
        )

    def __has_user_callback(self):
        """Return a truthy value when at least one user callback is set."""
        return (self.__on_update_callback or self.__on_change_callback
                or self.__on_succeed_callback or self.__on_fail_callback)

    def __has_user_callback_based_on_result(self):
        """Return a truthy value when a callback depending on the evaluation result is set."""
        return self.__on_change_callback or self.__on_succeed_callback or self.__on_fail_callback

    def activate(self):
        """Start listening to the entities; no-op when already activated.

        State listeners are only registered when a user callback exists. When a
        result-based callback exists and ``trigger_callback_on_activation`` is
        set, the callbacks are triggered once immediately.
        """
        if self.__activated:
            return
        self.__activated = True
        if self.__has_user_callback():
            self.__register_internal_callback()
        if self.__has_user_callback_based_on_result() and self.__trigger_callback_on_activation:
            self.__trigger_callbacks("Callback trigger by condition activation", True)

    def deactivate(self):
        """Stop listening to the entities and forget the last result; no-op when not activated."""
        if not self.__activated:
            return
        self.__activated = False
        self.__last_evaluation_result = None
        self.__unregister_internal_callback()
        # A delayed callback still pending would otherwise fire user callbacks
        # after the deactivation.
        self.__cancel_pending_delayed_callback()
        self.__condition_state_change_delayed.clear()

    def __cancel_pending_delayed_callback(self):
        """Cancel the delayed-callback timer when one is still running."""
        handle = self.__condition_state_change_delayed_cb_handle
        if handle is not None and self.__appdaemon_api.timer_running(handle):
            self.__appdaemon_api.cancel_timer(handle)
        self.__condition_state_change_delayed_cb_handle = None

    def __evaluate(self):
        """Evaluate the conditions and store the explanation string.

        Returns:
            A ``Result``; when an ``EvaluationException`` mentions an
            ``"unavailable"`` operand and an unavailability result is
            configured, that configured value is returned instead.

        Raises:
            EvaluationException: when no unavailability result applies.
        """
        self.__debug_log_prefixes.append("[Evaluating]")
        try:
            return self.__evaluate_conditions()
        except EvaluationException as e:
            if self.__unavaibility_result is not None and "unavailable" in e.operands:
                # Keep the explanation so log_evaluation_result() reports this
                # evaluation rather than a stale (or missing) message.
                self.__evaluation_result_log_string = self.__log_to_string(
                    f"The expression will be evaluated as '{self.__unavaibility_result}' because of the following exception {e}")
                return self.__unavaibility_result
            raise
        finally:
            # Single exit point for the "[Evaluating]" prefix, whatever happens.
            self.__debug_log_prefixes.pop()

    def __evaluate_conditions(self):
        """Apply the disable / trigger / blocking precedence and return a ``Result``."""
        disabled, disable_reason = self.__evaluate_conditions_list(self.__disable_conditions)
        if disabled:
            self.__evaluation_result_log_string = self.__log_to_string(f"Disabled by {disable_reason}")
            return Result.Disabled

        triggered, trigger_reason = self.__evaluate_conditions_list(self.__trigger_conditions)
        if triggered:
            logstring = f"Triggered by {trigger_reason}"
            blocked, blocking_reason = self.__evaluate_conditions_list(self.__blocking_conditions)
            if blocked:
                logstring = f"Blocked by {blocking_reason}({logstring})"
            self.__evaluation_result_log_string = self.__log_to_string(logstring)
            return Result.Failed if blocked else Result.Succeeded

        # No trigger condition matched: trigger_reason holds the condition list.
        if self.__log_no_valid_condition_details:
            message = f"No valid conditions\n{trigger_reason}"
        else:
            message = "No valid conditions"
        self.__evaluation_result_log_string = self.__log_to_string(message)
        return Result.Failed

    def evaluate(self, log=True):
        """Evaluate the conditions.

        Args:
            log: when true, log the explanation of the result.

        Returns:
            The evaluation result, or ``None`` when an ``EvaluationException``
            was raised (the exception is logged as an error).
        """
        try:
            result = self.__evaluate()
        except EvaluationException as e:
            self.__log(f"{e}", level="ERROR")
            return None
        if log:
            self.log_evaluation_result()
        return result

    def log_callback_trigger_reason(self):
        """Log the pending callback trigger reason, if any, then clear it."""
        if self.__callback_trigger_reason_log_string:
            self.__log_info(self.__callback_trigger_reason_log_string)
            self.__callback_trigger_reason_log_string = None

    def log_evaluation_result(self):
        """Log the pending evaluation explanation, if any, then clear it."""
        if self.__evaluation_result_log_string:
            self.__log_info(self.__evaluation_result_log_string)
            self.__evaluation_result_log_string = None

    def get_evaluation_result_log_string(self):
        """Return the pending evaluation explanation (``None`` once logged)."""
        return self.__evaluation_result_log_string

    def log_entities_states(self):
        """Log the current state (or attribute value) of every listened entity."""
        parts = []
        for entity, attributes in self.__entities_to_listen.items():
            for attribute in attributes:
                if attribute == "state" or attribute is None:
                    parts.append(f"{entity} = {self.__appdaemon_api.get_state(entity)}")
                else:
                    value = self.__appdaemon_api.get_state(entity, attribute=attribute)
                    parts.append(f"{entity}[{attribute}] = {value}")
        if parts:
            self.__log_info("Entities states: " + ",".join(parts))
        else:
            self.__log_info("Entities states:")

    def get_entities_to_listen(self):
        """Return the ``{entity_id: [attribute, ...]}`` mapping (``None`` attribute = state)."""
        return self.__entities_to_listen

    def get_entities_to_listen_as_list(self):
        """Return the listened entities as ``"domain.name"`` / ``"domain.name.attribute"`` strings."""
        return [
            entity_id if attribute is None else f"{entity_id}.{attribute}"
            for entity_id, attributes in self.__entities_to_listen.items()
            for attribute in attributes
        ]

    def __add_to_entities_to_listen(self, extra_entities_to_listen):
        """Merge entities into the listen table, without duplicates.

        Accepted formats:
          * ``'sensor.a'`` (converted to a one element list)
          * ``['sensor.a.attribute', 'sensor.a', 'sensor.a.another_attribute']``
          * ``{'sensor.a': ['attribute', 'another_attribute']}`` or
            ``{'sensor.a': 'attribute'}``

        Anything else is reported as an error.
        """
        if isinstance(extra_entities_to_listen, str):
            extra_entities_to_listen = [extra_entities_to_listen]

        if isinstance(extra_entities_to_listen, list):
            for entity_string in extra_entities_to_listen:
                pieces = entity_string.split('.')
                if len(pieces) == 2:
                    entity_id, attribute = entity_string, None
                elif len(pieces) == 3:
                    entity_id, attribute = f"{pieces[0]}.{pieces[1]}", pieces[2]
                else:
                    self.__log(f"Invalid entity string '{entity_string}' present in extra_entities_to_listen", level="ERROR")
                    continue
                known = self.__entities_to_listen.setdefault(entity_id, [])
                if attribute not in known:
                    known.append(attribute)
        elif isinstance(extra_entities_to_listen, dict):
            for entity, attributes in extra_entities_to_listen.items():
                known = self.__entities_to_listen.setdefault(entity, [])
                if attributes is None:
                    # an entity given without attribute means "listen to its state"
                    attributes = [None]
                elif isinstance(attributes, str):
                    # both "my.entity": "attribute" and
                    # "my.entity": ["attribute1", "attribute2"] are supported
                    attributes = [attributes]
                for attribute in attributes:
                    if attribute not in known:
                        known.append(attribute)
        else:
            self.__log(f"extra_entities_to_listen is in an invalid format: {extra_entities_to_listen}", level="ERROR")

    def __evaluate_conditions_list(self, condition_list):
        """Return ``(True, matching condition)`` or ``(False, bullet list of all conditions)``."""
        for condition in condition_list:
            if condition.evaluate():
                return True, str(condition)
        return False, "\n".join(f"- {condition}" for condition in condition_list)

    def __append_conditions(self, conditions_block, conditions_type, extra_conditions):
        """Append template conditions to ``conditions_block[conditions_type]``.

        The target list is rebuilt rather than extended in place so a list owned
        by the caller (or by the template library) is never modified.
        """
        existing = conditions_block.get(conditions_type)
        if existing is None:
            merged = []
        elif isinstance(existing, str):
            # convert the inline condition to a list as more are going to be added
            merged = [existing]
        else:
            merged = list(existing)

        # a template may also hold a single inline condition
        additions = extra_conditions if isinstance(extra_conditions, list) else [extra_conditions]
        for condition_string in additions:
            if self.__log_init:
                self.__log(f"Loading {conditions_type} : '{condition_string}'")
            merged.append(condition_string)
        conditions_block[conditions_type] = merged

    def __merge_templates(self, conditions_block, template_library):
        """Expand every ``<template>`` referenced by ``conditions_block`` into it."""
        template_string = conditions_block[self.__TEMPLATE_CONDITIONS_STRING]
        if not (isinstance(template_string, str)
                and template_string.startswith('<') and template_string.endswith('>')):
            self.__log(f"Error loading templates '{template_string}', invalid template format", level="ERROR")
            return

        condition_types = (
            self.__TRIGGER_CONDITIONS_STRING,
            self.__BLOCKING_CONDITIONS_STRING,
            self.__DISABLE_CONDITIONS_STRING,
        )
        for raw_template in template_string.split(","):
            template = raw_template.strip()
            if not (template.startswith('<') and template.endswith('>')):
                self.__log(f"Error loading templates '{template}', invalid template format", level="ERROR")
                break
            template = template[1:-1].strip()
            if template not in template_library:
                self.__log(f"Template '{template}' is not present in the template library", level="ERROR")
                continue

            if self.__log_init:
                self.__log(f"Loading template {template}")
            definition = template_library[template]
            if isinstance(definition, dict):
                param_parser = self.__new_args_parser(definition)
                for conditions_type in condition_types:
                    extra_conditions = param_parser.parse_args(conditions_type, None)
                    if extra_conditions:
                        self.__append_conditions(conditions_block, conditions_type, extra_conditions)
                if param_parser.parse_args(self.__TEMPLATE_CONDITIONS_STRING, None):
                    self.__log(f"Template '{template}' contain recursive template", level="ERROR")
                param_parser.validate_args()
            else:
                # a template made of inline conditions is a set of trigger conditions
                self.__append_conditions(conditions_block, self.__TRIGGER_CONDITIONS_STRING, definition)

    def __load_template(self, conditions_block, template_library):
        """Return ``conditions_block`` with its template references expanded.

        A string of the form ``"<name>"`` is first turned into a dict holding
        only the template reference. When the block references templates, a
        copy is returned: the caller's dict is left untouched. Any other input
        is returned as is.
        """
        if (isinstance(conditions_block, str)
                and conditions_block.startswith('<') and conditions_block.endswith('>')):
            conditions_block = {self.__TEMPLATE_CONDITIONS_STRING: conditions_block}

        if not (isinstance(conditions_block, dict)
                and self.__TEMPLATE_CONDITIONS_STRING in conditions_block):
            return conditions_block

        conditions_block = dict(conditions_block)
        if template_library is not None:
            self.__debug_log_prefixes.append("[Loading Templates]")
            try:
                self.__merge_templates(conditions_block, template_library)
            finally:
                # popped only when pushed, so the caller's prefixes stay intact
                self.__debug_log_prefixes.pop()
        else:
            self.__log("Error no template library defined", level="ERROR")

        # the template reference is not needed anymore in the dictionary
        del conditions_block[self.__TEMPLATE_CONDITIONS_STRING]
        return conditions_block

    def __parse_unavailability_result(self, unavaibility_result):
        """Store the unavailability result named in the conditions block, or log an error."""
        named_results = {
            "Succeeded": Result.Succeeded,
            "Failed": Result.Failed,
            "Disabled": Result.Disabled,
            "Unavailable": Result.Unavailable,
            "None": None,
        }
        if isinstance(unavaibility_result, str) and unavaibility_result in named_results:
            self.__unavaibility_result = named_results[unavaibility_result]
        else:
            self.__log_error(f'Invalid value for {self.__UNAVAIBILITY_RESULT} : {unavaibility_result}. Valid values are [Succeeded,Failed,Disabled,Unavailable,None]')

    def __parse_conditions_block(self, conditions_block):
        """Read the options and the conditions held by ``conditions_block``.

        A dict is parsed option by option; anything else is treated as inline
        trigger condition(s).
        """
        self.__debug_log_prefixes.append("[Parsing]")
        try:
            # log_init is read directly (it drives the logging of the parsing
            # itself), so it must be ignored by the arguments validation.
            args_to_ignore_in_validation = [self.__LOG_INIT]
            try:
                self.__log_init = conditions_block[self.__LOG_INIT]
            except (TypeError, KeyError):
                pass

            conditions_block = self.__load_template(conditions_block, self.__templates_library)
            found = []
            if isinstance(conditions_block, dict):
                param_parser = self.__new_args_parser(conditions_block)
                self.__callback_delay = param_parser.parse_args(self.__CALLBACK_DELAY_STRING, self.__callback_delay)
                if self.__log_init and self.__callback_delay > 0.0:
                    self.__log("Callback delay set to " + str(self.__callback_delay))
                self.__log_no_valid_condition_details = param_parser.parse_args(
                    self.__LOG_NO_VALID_CONDITION_DETAILS, self.__log_no_valid_condition_details)

                unavaibility_result = param_parser.parse_args(self.__UNAVAIBILITY_RESULT, None)
                if unavaibility_result is not None:
                    self.__parse_unavailability_result(unavaibility_result)

                extra_entities_to_listen = param_parser.parse_args(self.__EXTRA_ENTITIES_TO_LISTEN, None)
                if extra_entities_to_listen:
                    if self.__log_init:
                        self.__log(f"adding extra entities to listen : {extra_entities_to_listen}")
                    self.__add_to_entities_to_listen(extra_entities_to_listen)

                constants = param_parser.parse_args(self.__CONSTANTS, None)
                if constants:
                    if self.__log_init:
                        self.__log("declaring constants :")
                    for constant_name, value in constants.items():
                        if self.__log_init:
                            self.__log(f"{constant_name} = {value}")
                        self.expression_context.declare_constant(constant_name, value)

                sections = (
                    (self.__TRIGGER_CONDITIONS_STRING, self.__trigger_conditions, "trigger"),
                    (self.__BLOCKING_CONDITIONS_STRING, self.__blocking_conditions, "blocking"),
                    (self.__DISABLE_CONDITIONS_STRING, self.__disable_conditions, "disable"),
                )
                for key, target, label in sections:
                    conditions_args = param_parser.parse_args(key, None)
                    if conditions_args:
                        conditions_count = self.__parse_conditions_from_arg(conditions_args, target)
                        found.append(f"{conditions_count} {label} conditions")
                param_parser.validate_args(args_to_ignore_in_validation)
            else:
                # inline conditions are considered as trigger conditions
                conditions_count = self.__parse_conditions_from_arg(conditions_block, self.__trigger_conditions)
                found.append(f"{conditions_count} trigger conditions")

            if self.__log_init:
                self.__log("Found " + ", ".join(found) if found else "No conditions found")
        finally:
            self.__debug_log_prefixes.pop()

    def __parse_conditions_from_arg(self, conditions_arg, conditions_list):
        """Parse condition(s) into ``conditions_list`` and return its new length.

        ``conditions_arg`` is either a list of condition strings or a single
        inline condition (converted with ``str``). The entities referenced by
        the expression context are added to the listen table after each parse.
        """
        def parse_condition(condition_string):
            conditions_list.append(ConditionsParser(condition_string, self.expression_context))
            self.__add_to_entities_to_listen(self.expression_context.get_entities_to_listen())

        if isinstance(conditions_arg, list):
            for condition_string in conditions_arg:
                parse_condition(condition_string)
        else:
            parse_condition(str(conditions_arg))
        return len(conditions_list)

    def __register_internal_callback(self):
        """Register one state listener per (entity, attribute) of the listen table."""
        assert len(self.__state_callbacks_handle) == 0, f"self.__state_callbacks_handle is not empy, you are probably trying to call __register_internal_callback() twice"
        registered = []
        for entity, attributes in self.__entities_to_listen.items():
            attribute_names = []
            for attribute in attributes:
                if attribute == "state" or attribute is None:
                    # 'state' is rejected on purpose: an attribute could
                    # actually be called "state", None designates the state.
                    assert attribute != 'state', "please don't use 'state' as a keyword to design the actual state of the entity, but use None instead"
                    handle = self.__appdaemon_api.listen_state(self.__on_condition_state_change, entity)
                else:
                    handle = self.__appdaemon_api.listen_state(self.__on_condition_state_change, entity, attribute=attribute)
                self.__state_callbacks_handle.append(handle)
                attribute_names.append(attribute if attribute else 'state')

            # the attribute list is omitted when only the state is listened to
            if not attribute_names or attribute_names == ['state']:
                attributes_string = ""
            else:
                attributes_string = "(" + ",".join(attribute_names) + ")"
            registered.append(f"{entity}{attributes_string}")

        if self.__log_init:
            if registered:
                self.__log("Registering CB for " + ", ".join(registered))
            else:
                self.__log("No CB has been registered")

    def __unregister_internal_callback(self):
        """Cancel every registered state listener."""
        if len(self.__state_callbacks_handle) > 0:
            self.__log(f"Unregistering {len(self.__state_callbacks_handle)} callbacks")
            for handle in self.__state_callbacks_handle:
                self.__appdaemon_api.cancel_listen_state(handle)
            self.__state_callbacks_handle = []

    def force_last_evaluation_result(self, new_result):
        """Override the remembered result, unless it is ``Result.Disabled``.

        Args:
            new_result: ``Result.Succeeded`` or ``Result.Failed``.
        """
        assert new_result == Result.Succeeded or new_result == Result.Failed
        if self.__last_evaluation_result != Result.Disabled:
            # a disabled smart condition stays disabled
            self.__last_evaluation_result = new_result

    def __call_user_callback(self, callback, *args):
        """Call ``callback``, passing the condition name first when one is configured."""
        if self.__pass_condition_name_to_cb is not None:
            callback(self.__pass_condition_name_to_cb, *args)
        else:
            callback(*args)

    def __trigger_callbacks(self, trigger_reason, triggered_from_activation=False):
        """Call the user callbacks that apply to the current situation.

        ``on_update`` is called on every trigger except the activation one. The
        result-based callbacks are called only when the evaluation result
        differs from the previous one and is not ``Result.Disabled``.
        """
        should_log = self.__log_callback_trigger_reason and (not triggered_from_activation or self.__log_init)
        reason_recorded = False
        result_logged = False

        def log_trigger_reason_once():
            nonlocal reason_recorded
            if reason_recorded:
                return
            reason_recorded = True
            # always stored, so it stays available for a manual log
            self.__callback_trigger_reason_log_string = self.__log_to_string(trigger_reason)
            if should_log:
                self.log_callback_trigger_reason()

        def log_evaluation_result_once():
            nonlocal result_logged
            if result_logged:
                return
            result_logged = True
            if should_log:
                # in that particular case, the evaluation result is a trigger reason
                self.log_evaluation_result()

        def announce():
            log_trigger_reason_once()
            log_evaluation_result_once()

        if self.__on_update_callback and not triggered_from_activation:
            log_trigger_reason_once()
            self.__call_user_callback(self.__on_update_callback)

        if not self.__has_user_callback_based_on_result():
            return

        try:
            result = self.__evaluate()
        except EvaluationException as e:
            self.__log(f"{e}", level="ERROR")
            return

        if result == Result.Disabled:
            announce()
        elif result != self.__last_evaluation_result:
            if self.__on_change_callback:
                announce()
                self.__call_user_callback(self.__on_change_callback, self.__last_evaluation_result, result)
            if self.__on_succeed_callback and result == Result.Succeeded:
                announce()
                self.__call_user_callback(self.__on_succeed_callback)
            if self.__on_fail_callback and result == Result.Failed:
                announce()
                self.__call_user_callback(self.__on_fail_callback)

        # Disabled is remembered too, so that Disabled => something triggers the callbacks
        self.__last_evaluation_result = result

    def __on_condition_state_change_delayed(self, kwargs):
        """Timer callback: trigger the callbacks for the state changes gathered during the delay."""
        self.__condition_state_change_delayed_cb_handle = None
        pending = self.__condition_state_change_delayed
        if len(pending) == 0:
            self.__log(f"Callback has been trigger but self.__condition_state_change_delayed is empty, this shouldn't happen, the callback won't be called", level="WARNING")
            return

        changes = ", ".join(
            f"{entity}({old} => {new} at {timestamp.strftime('%Hh%M:%S')})"
            for entity, old, new, timestamp in pending
        )
        # the delay is measured from the first gathered state change
        delay = (datetime.now() - pending[0][3]).total_seconds()
        logstring = f"Callback trigger by {changes} after a delay of {round(delay,1)}s"
        pending.clear()
        self.__trigger_callbacks(logstring)

    def __on_condition_state_change(self, entity, attribute, old, new, kwargs):
        """State listener: trigger the callbacks, immediately or after ``callback_delay``.

        Changes where ``old`` equals ``new`` are ignored, as are changes from
        ``None`` unless ``trigger_callback_on_entity_creation`` is set.
        """
        if old == new or (old is None and not self.__trigger_callback_on_entity_creation):
            return

        if self.__callback_delay == 0.0:
            self.__trigger_callbacks(f"Callback trigger by {entity}({old} => {new})")
            return

        # every new change restarts the delay; all changes are reported together
        self.__cancel_pending_delayed_callback()
        self.__condition_state_change_delayed.append((entity, old, new, datetime.now()))
        self.__condition_state_change_delayed_cb_handle = self.__appdaemon_api.run_in(
            self.__on_condition_state_change_delayed, self.__callback_delay)

    def __log(self, message, **kwargs):
        """Log ``message`` with the debug prefixes; ``level`` may be ``'ERROR'`` or ``'WARNING'``."""
        log_string = self.__log_to_string(message)
        level = kwargs.get('level')
        if level == 'ERROR':
            self.__log_error(log_string)
        elif level == 'WARNING':
            self.__log_warning(log_string)
        else:
            self.__log_info(log_string)

    def __emit(self, method_name, level, message):
        """Send ``message`` to the logger interface, else to the AppDaemon api.

        The api is first tried through ``method_name`` (``log_info``...); when
        that raises ``AttributeError`` the generic ``log(message, level=...)``
        is used instead.
        """
        if self.__logger_interface:
            getattr(self.__logger_interface, method_name)(message)
            return
        try:
            getattr(self.__appdaemon_api, method_name)(message)
        except AttributeError:
            self.__appdaemon_api.log(message, level=level)

    def __log_info(self, message):
        """Log ``message`` as is at INFO level."""
        self.__emit('log_info', "INFO", message)

    def __log_warning(self, message):
        """Log ``message`` as is at WARNING level."""
        self.__emit('log_warning', "WARNING", message)

    def __log_error(self, message):
        """Log ``message`` as is at ERROR level."""
        self.__emit('log_error', "ERROR", message)

    def __log_to_string(self, message):
        """Return ``message`` preceded by the debug prefixes, unless ``no_log_prefix`` is set."""
        prefix = ""
        if not self.__no_log_prefix:
            prefix = "".join(self.__debug_log_prefixes)
        if prefix:
            prefix += " "
        return prefix + message