commit 45376aebcb0c73a8d295643dbc8722ac6cf90916 Author: Pierre Date: Fri May 31 14:50:50 2024 +0200 migration from ha-cantegrill repo diff --git a/adexpressioncontext.py b/adexpressioncontext.py new file mode 100644 index 0000000..653757c --- /dev/null +++ b/adexpressioncontext.py @@ -0,0 +1,170 @@ +import pyparsing as pp +from expressionparser import BaseExpressionContext + +class ADExpressionContext(BaseExpressionContext): + def __init__(self,appdaemon_api, sensor_existence_validation = True): + self.appdaemon_api = appdaemon_api + self.variables = {} + self.constants = list() + self.sensor_aliases = list() + self.parser = None + self.entities_to_listen = dict() + self.variables_parsed_data = dict() + self.sensors_default_value = dict() + self.sensor_states = dict() + self.sensor_existence_validation = sensor_existence_validation + + self.declare_default_constants() + + def reset(self): + self.sensor_states = dict() + + def declare_default_constants(self): + self.declare_constant("True",True) + self.declare_constant("False",False) + self.declare_constant("None",None) + + class EvaluateBase: + def get_value(self,context): return None + def get_sensor_name_with_attribute(self,context): return None + + class EvaluateConstant(EvaluateBase): + def __init__(self, tokens): + assert len(tokens) == 1 + self.constant_name = tokens[0] + + def get_value(self,context): + return context.get_constant_value(self.constant_name) + + class EvaluateSensor(EvaluateBase): + def __init__(self, tokens): + assert len(tokens) == 1 + self.tokens = tokens[0] + + def get_value(self,context): + sensor_name_with_attribute = self.get_sensor_name_with_attribute(context) + + if sensor_name_with_attribute[1] != None: + return context.appdaemon_api.get_state(sensor_name_with_attribute[0], attribute = sensor_name_with_attribute[1]) + else: + return context.appdaemon_api.get_state(sensor_name_with_attribute[0]) + + def get_sensor_name_with_attribute(self,context): + if isinstance(self.tokens,ADExpressionContext.EvaluateConstant): + sensor_name = self.tokens.get_value(context) + else: + assert len(self.tokens) == 3 + sensor_name = f"{self.tokens[0]}{self.tokens[1]}{self.tokens[2]}" + return (sensor_name,None) #None is for the actual state of the sensor, not an attribute + + class EvaluateAttribute(EvaluateSensor): + def get_sensor_name_with_attribute(self,context): + sensor_name_with_attribute = self.tokens[0].get_sensor_name_with_attribute(context) + return (sensor_name_with_attribute[0],self.tokens[2]) + + def create_variable_parser(self): + separator = pp.Literal(".") + word = pp.Word(pp.alphanums + "_") + constant = pp.oneOf([constant for constant in self.constants]).set_parse_action(self.EvaluateConstant) + sensor_alias = pp.oneOf([constant for constant in self.sensor_aliases]).set_parse_action(self.EvaluateConstant) + sensor = sensor_alias | pp.Group(word + separator + word) + sensor.set_parse_action(self.EvaluateSensor) + sensor_with_attribute = pp.Group(sensor + separator + word).set_parse_action(self.EvaluateAttribute) + + expression = sensor_with_attribute | constant | sensor + + return expression + + def add_variable(self,variable_name): + parsed_data = self.evaluate_variable_expression(variable_name) + + #Couldn't parse the variable, it's either an unknown constant or a bad formed string + if parsed_data == None: + return False + + sensor_name_with_attribute = parsed_data.get_sensor_name_with_attribute(self) + + self.variables_parsed_data[variable_name] = parsed_data + if sensor_name_with_attribute != None: + sensor_name = sensor_name_with_attribute[0] + + if self.sensor_existence_validation and sensor_name not in self.sensors_default_value and not self.appdaemon_api.entity_exists(sensor_name): + return False + + if not sensor_name in self.entities_to_listen: + self.entities_to_listen[sensor_name] = [] + + if not sensor_name_with_attribute[1] in self.entities_to_listen[sensor_name]: + self.entities_to_listen[sensor_name].append(sensor_name_with_attribute[1]) + + return True + + def get_variable_value(self,variable_name): + parsed_data = self.variables_parsed_data[variable_name] + sensor_name_with_attribute = parsed_data.get_sensor_name_with_attribute(self) + if sensor_name_with_attribute == None: + return parsed_data.get_value(self) + else: + if sensor_name_with_attribute[1] != None: + key_name = f"{sensor_name_with_attribute[0]}.{sensor_name_with_attribute[1]}" + else: + key_name = sensor_name_with_attribute[0] + + if key_name in self.sensor_states: + return self.sensor_states[key_name] + else: + if sensor_name_with_attribute[1] != None: + value = self.appdaemon_api.get_state(sensor_name_with_attribute[0], attribute = sensor_name_with_attribute[1]) + # accessing an attribute of unavailable entity should return unavailable + # whereas accessing an attribute that doesn't exist return None + if value == None and self.appdaemon_api.get_state(sensor_name_with_attribute[0]) == 'unavailable': + return 'unavailable' + else: + if self.appdaemon_api.entity_exists(sensor_name_with_attribute[0]): + value = self.appdaemon_api.get_state(sensor_name_with_attribute[0]) + else: #we should always have a default value if we get here + value = self.sensors_default_value[sensor_name_with_attribute[0]] + + self.sensor_states[key_name] = value + return value + + def get_entities_to_listen(self): + return self.entities_to_listen + + def get_constant_value(self,name): return self.variables[name] + + def declare_constant(self,name : str,value): + def is_a_sensor(string): + if isinstance(string,str): + splitted_string = string.split('.') + if len(splitted_string) == 2: + for part in splitted_string: + if not part.replace('_', '').isalnum() or not part.isascii(): + return False + return True + return False + + + assert name not in self.variables, f"Constant {name} already exist with value {self.variables[name]}" + assert name.replace('_', '').isalnum() and name.isascii(), f"Invavalid constant name: {name}" + self.variables[name] = value + if is_a_sensor(value): + self.sensor_aliases.append(name) + else: + self.constants.append(name) + + self.parser = None #adding new constant need to reset the parser + + def declare_sensor_alias_default_value(self, alias, default_value): + assert alias in self.variables, f"Unknown alias {alias} please declare it first" + sensor_name = self.variables[alias] + assert sensor_name not in self.sensors_default_value, f"{sensor_name} has already a default value {self.sensors_default_value[sensor_name]}" + + self.sensors_default_value[sensor_name] = default_value + + def evaluate_variable_expression(self,expression_string): + if not self.parser: + self.parser = self.create_variable_parser() + + try: return self.parser.parseString(expression_string,parse_all=True)[0] + except pp.ParseException: return None \ No newline at end of file diff --git a/eventhandler.py b/eventhandler.py new file mode 100644 index 0000000..c5ae76b --- /dev/null +++ b/eventhandler.py @@ -0,0 +1,99 @@ +import appdaemon.plugins.hass.hassapi as hass +import pickle +import os + +class EventDispatcher: + def __init__(self,ad_api,event_name,callback,event_data,reset_data,event_context): + self.event_name = event_name + self.callback = callback + self.event_data = event_data + self.reset_data = reset_data + self.waiting_for_reset = False + self.event_context = event_context + self.ad_api = ad_api + if event_data == None: + self.ad_api.listen_event(self.on_event,event_name) + else: + event_kwargs = {} + for key,value in event_data.items(): #when we have dict in event_data, we usually only want a partial match, we deal with that in process_event + if not isinstance(value,dict): + event_kwargs[key] = value + self.ad_api.listen_event(self.on_event,event_name,**event_kwargs) + if reset_data: + reset_data_kwargs = {} + for key,value in reset_data.items(): #when we have dict in event_data, we usually only want a partial match, we deal with that in process_event + if not isinstance(value,dict): + reset_data_kwargs[key] = value + + if reset_data_kwargs != event_kwargs: + self.ad_api.listen_event(self.on_event,event_name,**reset_data_kwargs) + + def on_event(self, event_name, data, kwargs): + self.process_event(data) + + def process_event(self,data): + def are_data_matching(ref_data, data): + if ref_data != None: + for key in ref_data: + if not key in data: + return False + if isinstance(ref_data[key],dict): + if not are_data_matching(ref_data[key],data[key]): + return False + elif ref_data[key] != data[key]: + return False + return True + + #lets see if those args match + match_event_data = are_data_matching(self.event_data,data) + match_reset_data = are_data_matching(self.reset_data,data) + + #special threatment for event + if match_event_data and not self.waiting_for_reset: + if self.reset_data != None: + self.waiting_for_reset = True + + self.callback(self.event_name, self.event_data ,self.event_context) + return True + + if self.reset_data != None and match_reset_data: + self.waiting_for_reset = False + + return False + +class EventHandler: + def __init__(self, ad_api,events_block,callback,event_context = None): #event_context is passed back to the CB has a context + def register_event_with_params(event_block,callback,event_context): + self.add_dispatcher(event_block['event_name'],callback,event_block['event_data'] if 'event_data' in event_block else None, event_block['reset_data'] if 'reset_data' in event_block else None,event_context) + + self.__ad_api = ad_api + self.event_dispatchers = [] + + #try: + for event_block in events_block.values(): + register_event_with_params(event_block,callback,event_context) + #except (AttributeError): + # self.log(f"Format not supported : {events_block}") + + def log(self,message,**kwargs): + self.__ad_api.log(message,**kwargs) + + def add_dispatcher(self,event_name,callback,event_data = None,reset_data = None ,event_context = None): + self.log(f'Registering dispatcher {callback.__name__} for event "{event_name}" ({event_data})') + dispatcher = EventDispatcher(self.__ad_api,event_name,callback,event_data,reset_data,event_context) + self.event_dispatchers.append(dispatcher) + + # if not event_name in self.event_dispatchers: + # self.event_dispatchers[event_name] = list() + # self.log(f"listening event {event_name}") + # self.__ad_api.listen_event(self._on_event_internal,event_name) + + # self.event_dispatchers[event_name].append(dispatcher) + + # def _on_event_internal(self, event_name, data, kwargs): + # if event_name in self.event_dispatchers: + # for dispatcher in self.event_dispatchers[event_name]: + # if dispatcher.process_event(data) == True: + # break + # else: + # self.log(f"{event_name} has no dispatcher registered",level="WARNING") \ No newline at end of file diff --git a/expressionparser.py b/expressionparser.py new file mode 100644 index 0000000..d27e823 --- /dev/null +++ b/expressionparser.py @@ -0,0 +1,354 @@ +import pyparsing as pp +import threading +import datetime + +'''Base ExpressionContext for the parser +MUST be implemented : +get_variable_value(self,variable_name) is called when evaluating the expression to retrieve the value of a variable + +optional: +add_variable(self,variable_name) is called when a variable is encounter during the parsing of the expression + +reset(self) is called before a new evaluation + +''' +class BaseExpressionContext: + def add_variable(self,variable_name): return True + + def get_variable_value(self,variable_name): return None + + def reset(self): pass + +class ParsingException(Exception): + def __init__(self, message = None, expression_string = None, char_pos = -1): + self.char_pos = char_pos + self.message = message + self.expression_string = expression_string + + def __str__(self): + string = "" + if self.expression_string: + string += f'Error parsing string "{self.expression_string}"' + if self.char_pos >= 0: + string += f' at character {self.char_pos}' + string += ':' + elif self.char_pos >= 0: + string += f'Parsing error at character {self.char_pos}' + if self.message: + string += self.message + + return string + +class EvaluationException(Exception): + def __init__(self, message = None, expression_string = None, char_pos = -1, operands = []): + self.char_pos = char_pos + self.message = message + self.expression_string = expression_string + self.operands = operands + + def __str__(self): + string = "" + if self.expression_string: + string += f'Error evaluating "{self.expression_string}"' + if self.char_pos >= 0: + string += f' at character {self.char_pos}' + string += ':' + elif self.char_pos >= 0: + string += f'Parsing error at character {self.char_pos}' + if self.message: + string += self.message + + return string + +class EvaluateBase(object): + def eval_bool(self) : return self.convert_to_bool(self.eval()) + + @staticmethod + def convert_to_bool(value): + if value == 'off': return False + if value == 0: return False + if value == 'False': return False + if value == 'false': return False + if value == False: return False + if value == None: return False + # this should be handle explicitely by the condition + # edit: this is actually usefull if don't want disable to be evaluated as true + if value == 'unavailable': return False + + return True + + @staticmethod + def convert_python_type(value): + if value == None: return None + + if type(value) == str: + #converting to bool + if value == 'False': return False + if value == 'false': return False + #if value == False: return False + + if value == 'True': return True + if value == 'true': return True + #if value == True: return True + + #Converting "None" to None + if value == "none" or value == "None": return None + + #converting to int + try: return int(value) + except: pass + + #converting to float + try: return float(value) + except: pass + + #converting to datetime + try: return datetime.datetime.strptime(value,"%Y-%m-%dT%H:%M:%S.%f%z") + except: pass + + # this should really be a string + return value + else: + #it's probably already been converted + return value + +class EvaluateOperand(EvaluateBase): + def __init__(self, expression_string: str, location: int, tokens: pp.ParseResults): + self.value = tokens[0] + + def __str__(self): return str(self.eval()) + + def eval(self): pass + def set_context(self,context): pass + +class EvaluateNumber(EvaluateOperand): + def eval(self): return EvaluateBase.convert_python_type(self.value) + # try: return float(self.value) + # except: return float('nan') + +class EvaluateString(EvaluateOperand): + def eval(self): return str(self.value) + +class EvaluateVariable(EvaluateOperand): + def __init__(self, expression_string: str, location: int, tokens: pp.ParseResults): + super().__init__(expression_string, location, tokens) + self.context : BaseExpressionContext = None + self.expression_string = expression_string + self.location = location + + def eval(self): + return_value = self.context.get_variable_value(self.value) + return EvaluateBase.convert_python_type(return_value) + # if return_value == None: + # return None + + # if type(return_value) == bool: return return_value + + # try: return_value = float(return_value) + # except: pass + + # return return_value + + def __str__(self): return f"{self.value}[{self.eval()}]" + + def set_context(self,context : BaseExpressionContext): + self.context = context + if not self.context.add_variable(self.value): + raise ParsingException(message = f"Unknown variable : {self.value}",char_pos= self.location, expression_string=self.expression_string) + +def opPair(tokenlist): + #Return pairs of tokens to the caller as a generator. + it = iter(tokenlist) + while 1: + try: yield(next(it), next(it)) + except StopIteration: break + +class EvaluateOperator(EvaluateBase): + def __init__(self, expression_string: str, location: int, tokens: pp.ParseResults): + self.value = tokens[0] + self.expression_string = expression_string + self.location = location + + def set_context(self,context): + for element in self.value: + if isinstance(element,EvaluateBase): element.set_context(context) + +class EvaluatSignOp(EvaluateOperator): + def eval(self): + assert(len(self.value) == 2) + if self.value[0] == '-': + return -self.value[1].eval() + else: + return self.value[1].eval() + +class EvaluateNot(EvaluateOperator): + + operators = { + "not": ["not", lambda a: not EvaluateNot.convert_to_bool(a)] + } + + def eval(self): + for op, val in opPair(self.value[0:]): + fn = EvaluateNot.operators[op][1] + val2 = val.eval_bool() + val = fn(val2) + return val + + def __str__(self): + return " ".join([str(v) for v in self.value]) + +class EvaluateBinaryOp(EvaluateOperator): + + def eval(self): + val1 = self.value[0].eval() + for op, val in opPair(self.value[1:]): + fn = self.operators[op][1] + if not self.should_skip_second_operand(op,val1): + val2 = val.eval() + try: val1 = fn(val1, val2) + except TypeError: raise EvaluationException(message = f"Couldn't not evaluate '{val1} {op} {val2}'",char_pos= self.location, expression_string=self.expression_string,operands = [val1,val2]) + return val1 + + def should_skip_second_operand(self,operator,first_value): return False + + def __str__(self): + string = "(" + val1 = str(self.value[0]) + for op, val in opPair(self.value[1:]): + fn = self.operators[op][0] + if not self.should_skip_second_operand(op,self.value[0].eval()): + val2 = val + else: val2 = "[Skipped]" + string += f"{val1} {fn} {val2}" + + evaluation_result = self.eval() + try: evaluation_result = self.eval() + except TypeError: evaluation_result = "Evaluation Error" + + return string + f")[{evaluation_result}]" + +class EvaluateMulOp(EvaluateBinaryOp): + + operators = { + "*": ["*", lambda a, b: a * b], + "/": ["/", lambda a, b: a / b], + } + +class EvaluatePlusOp(EvaluateBinaryOp): + + operators = { + "+": ["+", lambda a, b: a + b], + "-": ["-", lambda a, b: a - b], + } + +class EvaluateComparison(EvaluateBinaryOp): + + operators = { + "<": ["<", lambda a, b: EvaluateBase.convert_python_type(a) < EvaluateBase.convert_python_type(b)], + ">": [">", lambda a, b: EvaluateBase.convert_python_type(a) > EvaluateBase.convert_python_type(b)], + ">=": [">=", lambda a, b: EvaluateBase.convert_python_type(a) >= EvaluateBase.convert_python_type(b)], + "<=": ["<=", lambda a, b: EvaluateBase.convert_python_type(a) <= EvaluateBase.convert_python_type(b)], + "==": ["==", lambda a, b: EvaluateBase.convert_python_type(a) == EvaluateBase.convert_python_type(b)], + "!=": ["!=", lambda a, b: EvaluateBase.convert_python_type(a) != EvaluateBase.convert_python_type(b)], + } + +class EvaluateOrAnd(EvaluateBinaryOp): + + operators = { + "and": ["and", lambda a, b: EvaluateBase.convert_to_bool(a) and EvaluateBase.convert_to_bool(b)], + "or": ["or", lambda a, b: EvaluateBase.convert_to_bool(a) or EvaluateBase.convert_to_bool(b)], + } + + def should_skip_second_operand(self,operator,first_value): + if first_value == False and operator == "and": + return True + if first_value == True and operator == "or": + return True + return False + +class EvaluateTernary(EvaluateOperator): + def eval(self): + ret = self.value[0].eval_bool() + i = 1 + while i < len(self.value): + ret = self.value[i+1].eval() if ret else self.value[i+3].eval() + i += 4 + return ret + + def __str__(self): + condition = self.value[0].eval_bool() + return f"({self.value[0]} {self.value[1]} {self.value[2] if condition else '[Skipped]'} {self.value[3]} {self.value[4] if not condition else '[Skipped]'})[{self.eval()}]" + +def create_parser(): + pp.ParserElement.enablePackrat() + #sys.setrecursionlimit(3000) + + #floatNumber = pp.Regex(r'[-]?\d+(\.\d*)?([eE][-+]?\d+)?').setParseAction(EvaluateNumber) + floatNumber = pp.pyparsing_common.real.setParseAction(EvaluateNumber) + intNumber = pp.pyparsing_common.signed_integer.setParseAction(EvaluateNumber) + variable = pp.Word(pp.alphanums + "_.").setParseAction(EvaluateVariable) + string = pp.QuotedString(quoteChar="'").setParseAction(EvaluateString) + + + operand = floatNumber | intNumber | variable | string + + signop = pp.oneOf(("+", "-")) + multop = pp.oneOf(("*", "/")) + plusop = pp.oneOf(("+", "-")) + comparisonOp = pp.oneOf(("<", "<=", ">", ">=" ,"==", "!=")) + notOp = pp.Keyword("not") + andOp = pp.Keyword("and") + orOp = pp.Keyword("or") + #boolOp = pp.oneOf(("and" , "or")) + + #https://stackoverflow.com/questions/63225718/pyparsing-precedence-breaks-with-unary-operator + expr = pp.infixNotation( + operand, + [ + (signop, 1, pp.opAssoc.RIGHT,EvaluatSignOp), + (multop, 2, pp.opAssoc.LEFT,EvaluateMulOp), + (plusop, 2, pp.opAssoc.LEFT,EvaluatePlusOp), + (comparisonOp,2,pp.opAssoc.LEFT,EvaluateComparison), + (notOp,1,pp.opAssoc.RIGHT,EvaluateNot), + (andOp,2,pp.opAssoc.LEFT,EvaluateOrAnd), + (orOp,2,pp.opAssoc.LEFT,EvaluateOrAnd), + (("?", ":"), 3, pp.opAssoc.RIGHT,EvaluateTernary) + ],) + + return expr + +class ExpressionParser(object): + + #the parser need to be created only once + parser = create_parser() + parser_lock = threading.RLock() #the parser can be access from many app at the same time + + def __init__(self, expression_string : str, expression_context : BaseExpressionContext): + self.context = expression_context + self.expression_string = expression_string #let's keep it for debuging + with ExpressionParser.parser_lock: + try: self.parsed_data = ExpressionParser.parser.parseString(expression_string,parse_all=True)[0] + except pp.ParseException as e: raise ParsingException(message = str(e),char_pos=e.column,expression_string=expression_string) + # except KeyError as e: + # import traceback + # message = str(e) + # for line in traceback.format_stack()[:-1]: + # message += line.strip() + # raise ParsingException(message = message,char_pos=-1,expression_string=expression_string) + + self.parsed_data.set_context(self.context) + + def __str__(self): + return str(self.parsed_data) + + def evaluate(self): + self.context.reset() + result = self.parsed_data.eval() + return result + +class ConditionsParser(ExpressionParser): + + def evaluate(self): + self.context.reset() + result = self.parsed_data.eval_bool() + return result diff --git a/kwargsparser.py b/kwargsparser.py new file mode 100644 index 0000000..461d094 --- /dev/null +++ b/kwargsparser.py @@ -0,0 +1,60 @@ +# Usage: +# +# you can optionnaly pass two callback log(string) to log error and warning. +# If you don't, error will assert and warning will be ignored +# parser = kwargsParser(kwargs) +# +# some_arg = parser.parse_args('some_arg',None) +# some_other_arg = parser.parse_args('some_other_arg',False) +# +# parser.validate_args() #optional, will assert if kwargs contain unknow args and will display a list of valid args +# + +class kwargsParser(): + def __init__(self,kwargs,log_error_cb = None, log_warning_cb = None): + self.kwargs = dict(kwargs) + self.valid_args = list() + self.log_error_cb = log_error_cb + self.log_warning_cb = log_warning_cb + + def parse_args(self,arg_name,default_value,legacy_args_name = []): + self.valid_args.append(arg_name) + arg_found = None + if arg_name in self.kwargs: + arg_found = arg_name + else: #let's check legacy args + for legacy_arg in legacy_args_name: + if legacy_arg in self.kwargs: + arg_found = legacy_arg + self.log_warning(f"{arg_found} is a legacy arg please use {arg_name} instead") + break + + if arg_found: + for legacy_arg in legacy_args_name: + if arg_found != legacy_arg and legacy_arg in self.kwargs: + self.log_warning(f"{legacy_arg} is a legacy arg and will ignored as you used {arg_name} before") + + value = self.kwargs[arg_found] + del self.kwargs[arg_found] + return value + else: + return default_value + + def log_error(self, error_string): + if self.log_error_cb: + self.log_error_cb(error_string) + else: + assert False, error_string + + def log_warning(self, warning_string): + if self.log_warning_cb: + self.log_warning_cb(warning_string) + + def validate_args(self,args_to_ignore_in_validation = []): + for arg in args_to_ignore_in_validation: + if arg in self.kwargs: + del self.kwargs[arg] + + args_left = len(self.kwargs) + if args_left != 0: + self.log_error(f"Unknown kwargs : {self.kwargs}. Valid args are {self.valid_args}") \ No newline at end of file diff --git a/logger_interface.py b/logger_interface.py new file mode 100644 index 0000000..5799302 --- /dev/null +++ b/logger_interface.py @@ -0,0 +1,44 @@ +import traceback + +class LoggerInterface: + + def __init__(self): + self.initialized = False + + def __init__(self,ad_api, default_log = "main_log",mute = False): + self.initialize_logger_interface(ad_api,default_log,mute) + + def initialize_logger_interface(self,ad_api, default_log = "main_log",mute = False): + self.ad_api = ad_api + self.default_log = default_log + self.mute = False + self.initialized = True + + def mute_logger(self,mute : bool): + if mute: + self.ad_api.log("Muting logger",level = "INFO", log = self.default_log) + self.mute = bool(mute) + + def log_info(self,message): + assert self.initialized + if not self.mute: + self.ad_api.log(message,level = "INFO", log = self.default_log) + + def log_warning(self,message): + assert self.initialized + self.ad_api.log(message,level = "WARNING", log = self.default_log) + + def log_error(self,message,stop_app = False,dump_stack = False): + assert self.initialized + + if dump_stack: + stack = "\n" + for line in traceback.format_stack()[:-1]: + stack += line.strip() + '\n' + else: stack = "" + + self.ad_api.log(message,level = "ERROR", log = self.default_log) + self.ad_api.log(message + stack,level = "ERROR", log = "error_log") + + if stop_app: + self.stop_app(self.name) \ No newline at end of file diff --git a/smartcondition.py b/smartcondition.py new file mode 100644 index 0000000..9d5c1ea --- /dev/null +++ b/smartcondition.py @@ -0,0 +1,614 @@ +#import appdaemon.plugins.hass.hassapi as hass +import enum +from datetime import datetime +#from pickle import FALSE + +from adexpressioncontext import ADExpressionContext +from expressionparser import ConditionsParser,ParsingException,EvaluationException +from kwargsparser import kwargsParser + +def catch_smartcondition_exception(error_lambda): + def decorator(function): + def wrapper(*args, **kwargs): + try: return function(*args, **kwargs) + except ParsingException as e: error_lambda(args[0],str(e)) + return wrapper + return decorator + +class Result(enum.Enum): + Failed = 0 + Succeeded = 1 + Disabled = 2 + Unavailable = 3 + +class Evaluator(): + __TEMPLATE_CONDITIONS_STRING = "template_conditions" + __TRIGGER_CONDITIONS_STRING = "trigger_conditions" + __BLOCKING_CONDITIONS_STRING = "blocking_conditions" + __DISABLE_CONDITIONS_STRING = "disable_conditions" + __CALLBACK_DELAY_STRING = "callback_delay" + __LOG_NO_VALID_CONDITION_DETAILS = "log_no_valid_condition_details" + __LOG_INIT = "log_init" + __EXTRA_ENTITIES_TO_LISTEN = "extra_entities_to_listen" + __CONSTANTS = "constants" + __UNAVAIBILITY_RESULT = "unavaibility_result" + + def __init__(self, appdaemon_api, conditions_block,**kwargs): + self.__trigger_conditions = [] + self.__blocking_conditions = [] + self.__disable_conditions = [] + self.__entities_to_listen = {} + self.__debug_log_prefixes = [] + self.__callback_delay = 0.0 + self.__last_evaluation_result = None + self.__state_callbacks_handle = list() + self.__condition_state_change_delayed = list() + self.__condition_state_change_delayed_cb_handle = None + + self.__appdaemon_api = appdaemon_api + self.__activated = False + self.__callback_trigger_reason_log_string = None + self.__evaluation_result_log_string = None + + parser = kwargsParser(kwargs,lambda string: self.__log(string,level = "ERROR"),lambda string: self.__log(string,level = "WARNING")) + self.__logger_interface = parser.parse_args('logger_interface',None) + + self.__no_log_prefix = parser.parse_args('no_log_prefix',False) #as no_log_prefix is used to feedback parser error, it need to be defined first + condition_name = parser.parse_args('condition_name',None,['log_prefix']) + if condition_name: + if isinstance(condition_name,list): + for name in condition_name: + self.__debug_log_prefixes.append(f"[{name}]") + else: + self.__debug_log_prefixes.append(f"[{condition_name}]") + + extra_entities_to_listen = parser.parse_args(self.__EXTRA_ENTITIES_TO_LISTEN,None) + if extra_entities_to_listen: + if isinstance(extra_entities_to_listen, str): extra_entities_to_listen = [ extra_entities_to_listen ] + self.__add_to_entities_to_listen(extra_entities_to_listen) + + self.__on_update_callback = parser.parse_args('on_update_cb',None,['callback']) + self.__on_change_callback = parser.parse_args('on_change_cb',None) + self.__unavaibility_result = parser.parse_args(self.__UNAVAIBILITY_RESULT,None) #None means throw an exception + self.__on_succeed_callback = parser.parse_args('on_succeed_cb',None) + self.__on_fail_callback = parser.parse_args('on_fail_cb',None) + self.__pass_condition_name_to_cb = parser.parse_args('pass_condition_name_to_cb',None) + self.__templates_library = parser.parse_args('templates_library',None) + #try: default_args = appdaemon_api.args + #except AttributeError: default_args = None + #args = parser.parse_args('args',default_args) + self.__log_init = parser.parse_args(self.__LOG_INIT,False) + self.__trigger_callback_on_activation = parser.parse_args('trigger_callback_on_activation',True) + self.__trigger_callback_on_entity_creation = parser.parse_args('trigger_callback_on_entity_creation',True) + self.__log_no_valid_condition_details = parser.parse_args(self.__LOG_NO_VALID_CONDITION_DETAILS,False) + self.__log_callback_trigger_reason = parser.parse_args('log_callback_trigger_reason',True) + self.__sensors_aliases_default_value = parser.parse_args('sensors_aliases_default_value',dict()) + activated = parser.parse_args('activated',True) + sensor_existence_validation = parser.parse_args('sensor_existence_validation',True) + + self.expression_context = ADExpressionContext(self.__appdaemon_api,sensor_existence_validation) + constants = parser.parse_args('constants',dict()) + for constant_name,value in constants.items(): + self.expression_context.declare_constant(constant_name,value) + for alias,value in self.__sensors_aliases_default_value.items(): + self.expression_context.declare_sensor_alias_default_value(alias,value) + + parser.validate_args() + + self.__parse_conditions_block(conditions_block) + + if activated: self.activate() + + def __has_user_callback(self): + return self.__on_update_callback or self.__on_change_callback or self.__on_succeed_callback or self.__on_fail_callback + + def __has_user_callback_based_on_result(self): + return self.__on_change_callback or self.__on_succeed_callback or self.__on_fail_callback + + def activate(self): + if not self.__activated: + self.__activated = True + if self.__has_user_callback(): + self.__register_internal_callback() + if self.__has_user_callback_based_on_result() and self.__trigger_callback_on_activation: + self.__trigger_callbacks("Callback trigger by condition activation",True) + + def deactivate(self): + if self.__activated: + self.__activated = False + self.__last_evaluation_result = None + self.__unregister_internal_callback() + + def __evaluate(self): + try: + self.__debug_log_prefixes.append("[Evaluating]") + result, disable_reason = self.__evaluate_conditions_list(self.__disable_conditions) + if result: + self.__evaluation_result_log_string = self.__log_to_string(f"Disabled by {disable_reason}") + self.__debug_log_prefixes.pop() + return Result.Disabled + else: + result, trigger_reason = self.__evaluate_conditions_list(self.__trigger_conditions) + if result: + logstring = f"Triggered by {trigger_reason}" + result,blocking_reason = self.__evaluate_conditions_list(self.__blocking_conditions) + if result: + logstring = f"Blocked by {blocking_reason}({logstring})" + self.__evaluation_result_log_string = self.__log_to_string(logstring) + self.__debug_log_prefixes.pop() + return Result.Failed + else: + self.__evaluation_result_log_string = self.__log_to_string(logstring) + self.__debug_log_prefixes.pop() + return Result.Succeeded + if self.__log_no_valid_condition_details: + self.__evaluation_result_log_string = self.__log_to_string(f"No valid conditions\n{trigger_reason}") + else: + self.__evaluation_result_log_string = self.__log_to_string(f"No valid conditions") + self.__debug_log_prefixes.pop() + return Result.Failed + except EvaluationException as e: + if self.__unavaibility_result != None and "unavailable" in e.operands: + self.__log_to_string(f"The expression will be evaluated as '{self.__unavaibility_result}' because of the following exception {e}") + self.__debug_log_prefixes.pop() + return self.__unavaibility_result + else: + self.__debug_log_prefixes.pop() + raise e + except Exception as e: + self.__debug_log_prefixes.pop() + raise e + + + def evaluate(self,log = True): + try: result = self.__evaluate() + except EvaluationException as e: + self.__log(f"{e}",level = "ERROR") + return None + if log: + self.log_evaluation_result() + return result + + def log_callback_trigger_reason(self): + if self.__callback_trigger_reason_log_string: + self.__log_info(self.__callback_trigger_reason_log_string) + self.__callback_trigger_reason_log_string = None + + def log_evaluation_result(self): + if self.__evaluation_result_log_string: + self.__log_info(self.__evaluation_result_log_string) + self.__evaluation_result_log_string = None + + def get_evaluation_result_log_string(self): + return self.__evaluation_result_log_string + + def log_entities_states(self): + log_string = "Entities states: " + for entity,attributes in self.__entities_to_listen.items(): + for attribute in attributes: + if attribute == "state" or attribute == None: + log_string += f"{entity} = {self.__appdaemon_api.get_state(entity)}," + #not sure why it was here, most likely a left over + #handle = self.__appdaemon_api.listen_state(self.__on_condition_state_change,entity) + else: + log_string += f"{entity}[{attribute}] = {self.__appdaemon_api.get_state(entity,attribute = attribute)}," + + self.__log_info(log_string[:-1]) + + def get_entities_to_listen(self): + return self.__entities_to_listen + + def get_entities_to_listen_as_list(self): + entities_list = list() + for entity_id,attributes in self.__entities_to_listen.items(): + for attribute in attributes: + if attribute == None: + entities_list.append(entity_id) + else: + entities_list.append(f"{entity_id}.{attribute}") + return entities_list + + def __add_to_entities_to_listen(self,extra_entities_to_listen): + #__add_to_entities_to_listen support both : + # ['sensor.a.attribute', 'sensor.a','sensor.a.another_attribute'] and + # { 'sensor.a' : ['attribute','another_attribute'] } + # single entity to listen in the form of a string 'sensor.a' are converted to a one element list + if isinstance(extra_entities_to_listen,str): + extra_entities_to_listen = [ extra_entities_to_listen ] + + if isinstance(extra_entities_to_listen, list): + for entity_string in extra_entities_to_listen: + splitted_entity = entity_string.split('.') + + if len(splitted_entity) == 2: + entity_id = entity_string + attribute = None + elif len(splitted_entity) == 3: + entity_id = f"{splitted_entity[0]}.{splitted_entity[1]}" + attribute = splitted_entity[2] + else: + self.__log(f"Invalid entity string '{entity_string}' present in extra_entities_to_listen",level = "ERROR") + continue + + if not entity_id in self.__entities_to_listen: + self.__entities_to_listen[entity_id] = [] + if not attribute in self.__entities_to_listen[entity_id]: + self.__entities_to_listen[entity_id].append(attribute) + + elif isinstance(extra_entities_to_listen, dict): + for entity in extra_entities_to_listen: + if not entity in self.__entities_to_listen: + self.__entities_to_listen[entity] = [] + + #I support both "my.entity" : "attribute" and "my.entity": ["attribute1","attribute2"] + if isinstance(extra_entities_to_listen[entity], str): + attribute = extra_entities_to_listen[entity] + if not attribute in self.__entities_to_listen[entity]: self.__entities_to_listen[entity].append(attribute) + else: + for attribute in extra_entities_to_listen[entity]: + if not attribute in self.__entities_to_listen[entity]: self.__entities_to_listen[entity].append(attribute) + else: + self.__log(f"extra_entities_to_listen is in an invalid format: {extra_entities_to_listen}",level = "ERROR") + + def __evaluate_conditions_list(self,condition_list): + for condition in condition_list: + if condition.evaluate(): + return True, str(condition) + + full_condition = "" + for condition in condition_list: + full_condition += f"- {condition}\n" + + return False, full_condition[:-1] + + def __load_template(self, conditions_block, template_library): + + def add_conditions_to_conditions_block(conditions_block,conditions_type,extra_conditions): + def log_template_loading(condition_string): + if self.__log_init: + self.__log(f"Loading {conditions_type} : '{condition_string}'") + + if not conditions_type in conditions_block: + conditions_block[conditions_type] = list() + elif isinstance(conditions_block[conditions_type], str): + #convert inline condition to list as we are goint to add more of them + conditions_block[conditions_type] = [conditions_block[conditions_type]] + + if isinstance(extra_conditions, list): + for condition_string in extra_conditions: + log_template_loading(condition_string) + conditions_block[conditions_type].append(condition_string) + else: #support inline conditions + log_template_loading(extra_conditions) + conditions_block[conditions_type].append(extra_conditions) + + if isinstance(conditions_block, str): + if conditions_block[0] == '<' and conditions_block[-1] == '>': + conditions_block = { self.__TEMPLATE_CONDITIONS_STRING : conditions_block} + + if isinstance(conditions_block, dict) and self.__TEMPLATE_CONDITIONS_STRING in conditions_block: + if template_library != None: + self.__debug_log_prefixes.append("[Loading Templates]") + template_string = conditions_block[self.__TEMPLATE_CONDITIONS_STRING] + if template_string[0] == '<' and template_string[-1] == '>': + splitted_template = template_string.split(",") + for template in splitted_template: + template = template.strip() + if template[0] == '<' and template[-1] == '>': + template = template[1:-1].strip() + if template in template_library: + if self.__log_init: + self.__log(f"Loading template {template}") + if isinstance(template_library[template],dict): + param_parser = kwargsParser(template_library[template],lambda string: self.__log(string,level = "ERROR"),lambda string: self.__log(string,level = "WARNING")) + trigger_conditions_args = param_parser.parse_args(self.__TRIGGER_CONDITIONS_STRING,None) + if trigger_conditions_args: + add_conditions_to_conditions_block(conditions_block,self.__TRIGGER_CONDITIONS_STRING,trigger_conditions_args) + + blocking_conditions_args = param_parser.parse_args(self.__BLOCKING_CONDITIONS_STRING, None) + if blocking_conditions_args: + add_conditions_to_conditions_block(conditions_block,self.__BLOCKING_CONDITIONS_STRING,blocking_conditions_args) + + disable_conditions_args = param_parser.parse_args(self.__DISABLE_CONDITIONS_STRING, None) + if disable_conditions_args: + add_conditions_to_conditions_block(conditions_block,self.__DISABLE_CONDITIONS_STRING,disable_conditions_args) + + if param_parser.parse_args(self.__TEMPLATE_CONDITIONS_STRING, None): + self.__log(f"Template '{template}' contain recursive template",level = "ERROR") + + param_parser.validate_args() + else: #manage inline conditions + add_conditions_to_conditions_block(conditions_block,self.__TRIGGER_CONDITIONS_STRING,template_library[template]) + else: + self.__log(f"Template '{template}' is not present in the template library",level = "ERROR") + else: + self.__log(f"Error loading templates '{template}', invalid template format",level = "ERROR") + break + else: + self.__log(f"Error loading templates '{template_string}', invalid template format",level = "ERROR") + else: + self.__log(f"Error no template library defined",level = "ERROR") + + #we don't need the template reference anymore in the dictionnary + del conditions_block['template_conditions'] + self.__debug_log_prefixes.pop() + + return conditions_block + + def __parse_conditions_block(self,conditions_block): + logstring = "" + self.__debug_log_prefixes.append("[Parsing]") + + args_to_ignore_in_validation = [self.__LOG_INIT] + try: self.__log_init = conditions_block[self.__LOG_INIT] + except (TypeError,KeyError): pass + + conditions_block = self.__load_template(conditions_block, self.__templates_library) + + if isinstance(conditions_block, dict): + param_parser = kwargsParser(conditions_block,lambda string: self.__log(string,level = "ERROR"),lambda string: self.__log(string,level = "WARNING")) + + self.__callback_delay = param_parser.parse_args(self.__CALLBACK_DELAY_STRING,self.__callback_delay) + if self.__log_init and self.__callback_delay > 0.0: + self.__log("Callback delay set to " + str(self.__callback_delay)) + + self.__log_no_valid_condition_details = param_parser.parse_args(self.__LOG_NO_VALID_CONDITION_DETAILS, self.__log_no_valid_condition_details) + + + unavaibility_result = param_parser.parse_args(self.__UNAVAIBILITY_RESULT, None) + if unavaibility_result != None: + if unavaibility_result == "Succeeded": self.__unavaibility_result = Result.Succeeded + elif unavaibility_result == "Failed": self.__unavaibility_result = Result.Failed + elif unavaibility_result == "Disabled": self.__unavaibility_result = Result.Disabled + elif unavaibility_result == "None": self.__unavaibility_result = None + else: self.__log_error(f'Invalid value for {self.__UNAVAIBILITY_RESULT} : {unavaibility_result}. Valid values are [Succeeded,Failed,Disabled,Unavailable,None]') + + extra_entities_to_listen = param_parser.parse_args(self.__EXTRA_ENTITIES_TO_LISTEN, None) + if extra_entities_to_listen: + if self.__log_init: + self.__log(f"adding extra entities to listen : {extra_entities_to_listen}") + self.__add_to_entities_to_listen(extra_entities_to_listen) + + constants = param_parser.parse_args(self.__CONSTANTS, None) + if constants: + if self.__log_init: + self.__log(f"declaring constants :") + for constant_name,value in constants.items(): + if self.__log_init: + self.__log(f"{constant_name} = {value}") + self.expression_context.declare_constant(constant_name,value) + + + trigger_conditions_args = param_parser.parse_args(self.__TRIGGER_CONDITIONS_STRING,None) + if trigger_conditions_args: + conditions_count = self.__parse_conditions_from_arg(trigger_conditions_args,self.__trigger_conditions) + if logstring == "": logstring = "Found " + else: logstring += ", " + logstring += str(conditions_count) + " trigger conditions" + + blocking_conditions_args = param_parser.parse_args(self.__BLOCKING_CONDITIONS_STRING, None) + if blocking_conditions_args: + conditions_count = self.__parse_conditions_from_arg(blocking_conditions_args,self.__blocking_conditions) + if logstring == "": logstring = "Found " + else: logstring += ", " + logstring += str(conditions_count) + " blocking conditions" + + disable_conditions_args = param_parser.parse_args(self.__DISABLE_CONDITIONS_STRING, None) + if disable_conditions_args: + conditions_count = self.__parse_conditions_from_arg(disable_conditions_args,self.__disable_conditions) + if logstring == "": logstring = "Found " + else: logstring += ", " + logstring += str(conditions_count) + " disable conditions" + + param_parser.validate_args(args_to_ignore_in_validation) + else: + #inline conditon are considered as trigger conditions + conditions_count = self.__parse_conditions_from_arg(conditions_block,self.__trigger_conditions) + logstring = f"Found {conditions_count} trigger conditions" + + if logstring == "": logstring = "No conditions found" + + + if self.__log_init: + self.__log(logstring) + self.__debug_log_prefixes.pop() + + def __parse_conditions_from_arg(self,conditions_arg, conditions_list): + def parse_string(str): + condition_parser = ConditionsParser(str,self.expression_context) + #self.__log(f"{condition_parser}") + conditions_list.append(condition_parser) + self.__add_to_entities_to_listen(self.expression_context.get_entities_to_listen()) + + if isinstance(conditions_arg, list): + for condition_string in conditions_arg: + parse_string(condition_string) + else: #support inline conditions + parse_string(str(conditions_arg)) + + return len(conditions_list) + + def __register_internal_callback(self): + assert len(self.__state_callbacks_handle) == 0, f"self.__state_callbacks_handle is not empy, you are probably trying to call __register_internal_callback() twice" + logstring = "" + for entity,attributes in self.__entities_to_listen.items(): + attributes_string = "" + for attribute in attributes: + if attribute == "state" or attribute == None: #this is not healthy to use 'state' here as at some point an attribute could actually be called "state" + assert attribute != 'state', "please don't use 'state' as a keyword to design the actual state of the entity, but use None instead" + handle = self.__appdaemon_api.listen_state(self.__on_condition_state_change,entity) + else: + handle = self.__appdaemon_api.listen_state(self.__on_condition_state_change,entity, attribute = attribute) + + self.__state_callbacks_handle.append(handle) + + if attributes_string == "" : attributes_string = f"({attribute if attribute else 'state'}" + else: attributes_string += f",{attribute if attribute else 'state'}" + + attributes_string += f")" + + if attributes_string == "(state)": attributes_string = "" + + if logstring == "": + logstring = f"Registering CB for {entity}{attributes_string}" + else: logstring += f", {entity}{attributes_string}" + + if self.__log_init: + if logstring == "": + logstring = f"No CB has been registered" + self.__log(logstring) + + def __unregister_internal_callback(self): + if len(self.__state_callbacks_handle) > 0: + self.__log(f"Unregistering {len(self.__state_callbacks_handle)} callbacks") + for handle in self.__state_callbacks_handle: + self.__appdaemon_api.cancel_listen_state(handle) + self.__state_callbacks_handle = list() + + def force_last_evaluation_result(self,new_result): + assert new_result == Result.Succeeded or new_result == Result.Failed + if self.__last_evaluation_result != Result.Disabled: + #disabled smart_condition stay disabled + self.__last_evaluation_result = new_result + + def __trigger_callbacks(self,trigger_reason,triggered_from_activation = False): + has_log_trigger_reason = False + has_evaluation_result = False + + def log_trigger_reason_once(trigger_reason): + nonlocal has_log_trigger_reason + if not has_log_trigger_reason: + has_log_trigger_reason = True + self.__callback_trigger_reason_log_string = self.__log_to_string(trigger_reason) + if self.__log_callback_trigger_reason and (not triggered_from_activation or self.__log_init): + self.log_callback_trigger_reason() + + def log_evaluation_result_once(): + nonlocal has_evaluation_result + if not has_evaluation_result: + has_evaluation_result = True + if self.__log_callback_trigger_reason and (not triggered_from_activation or self.__log_init): #in that particular case, the evaluation result is a trigger reason + self.log_evaluation_result() + + if self.__on_update_callback and not triggered_from_activation: + log_trigger_reason_once(trigger_reason) + if self.__pass_condition_name_to_cb != None: + self.__on_update_callback(self.__pass_condition_name_to_cb) + else: + self.__on_update_callback() + if self.__has_user_callback_based_on_result(): + try: result = self.__evaluate() + except EvaluationException as e: + self.__log(f"{e}",level = "ERROR") + return + + #if result != Result.Disabled and result != self.__last_evaluation_result: + if result == Result.Disabled: + log_trigger_reason_once(trigger_reason) + log_evaluation_result_once() + elif result != self.__last_evaluation_result: + if self.__on_change_callback: + log_trigger_reason_once(trigger_reason) + log_evaluation_result_once() + if self.__pass_condition_name_to_cb != None: + self.__on_change_callback(self.__pass_condition_name_to_cb,self.__last_evaluation_result,result) + else: + self.__on_change_callback(self.__last_evaluation_result,result) + if self.__on_succeed_callback and result == Result.Succeeded: + log_trigger_reason_once(trigger_reason) + log_evaluation_result_once() + if self.__pass_condition_name_to_cb != None: + self.__on_succeed_callback(self.__pass_condition_name_to_cb) + else: + self.__on_succeed_callback() + if self.__on_fail_callback and result == Result.Failed: + log_trigger_reason_once(trigger_reason) + log_evaluation_result_once() + if self.__pass_condition_name_to_cb != None: + self.__on_fail_callback(self.__pass_condition_name_to_cb) + else: + self.__on_fail_callback() + #I want Disabled => something to trigger CB + self.__last_evaluation_result = result + + def __on_condition_state_change_delayed(self, kwargs): + self.__condition_state_change_delayed_cb_handle = None + if len(self.__condition_state_change_delayed) == 0: + self.__log(f"Callback has been trigger but self.__condition_state_change_delayed is empty, this shouldn't happen, the callback won't be called", level = "WARNING") + + else: + logstring = "Callback trigger by" + + for element in self.__condition_state_change_delayed: + logstring = logstring + f" {element[0]}({element[1]} => {element[2]} at {element[3].strftime('%Hh%M:%S')})," + + delay = (datetime.now() - self.__condition_state_change_delayed[0][3]).total_seconds() + logstring = f"{logstring[:-1]} after a delay of {round(delay,1)}s" + + # if self.__manual_logs: + # self.__callback_trigger_reason_log_string = self.__log_to_string(logstring) + # else: + # self.__log(logstring) + + self.__condition_state_change_delayed.clear() + self.__trigger_callbacks(logstring) + + def __on_condition_state_change(self, entity, attribute, old, new, kwargs): + if old != new and (old != None or self.__trigger_callback_on_entity_creation): + if self.__callback_delay == 0.0: + # log_string = f"Callback trigger by {entity}({old} => {new})" + # if self.__manual_logs: + # self.__callback_trigger_reason_log_string = self.__log_to_string(log_string) + # else: self.__log(log_string) + + # if self.__on_update_callback: self.__on_update_callback() + # if self.__has_user_callback_based_on_result(): + # self.__update_result() + self.__trigger_callbacks(f"Callback trigger by {entity}({old} => {new})") + else: + if self.__condition_state_change_delayed_cb_handle != None and self.__appdaemon_api.timer_running(self.__condition_state_change_delayed_cb_handle): + self.__appdaemon_api.cancel_timer(self.__condition_state_change_delayed_cb_handle) + self.__condition_state_change_delayed_cb_handle = None + + self.__condition_state_change_delayed.append((entity,old,new,datetime.now())) + #self.__log("State changed :" + str(entity) + "(" + str(old) + " => " + str(new) + ") but callback delayed for " + str(self.__callback_delay) + "s") + self.__condition_state_change_delayed_cb_handle = self.__appdaemon_api.run_in(self.__on_condition_state_change_delayed, self.__callback_delay) + + def __log(self,message,**kwargs): + log_string = self.__log_to_string(message) + + if 'level' in kwargs and kwargs['level'] == 'ERROR': + self.__log_error(log_string) + elif 'level' in kwargs and kwargs['level'] == 'WARNING': + self.__log_warning(log_string) + else: + self.__log_info(log_string) + + def __log_info(self,message): + if self.__logger_interface: + self.__logger_interface.log_info(message) + else: + try: self.__appdaemon_api.log_info(message) + except AttributeError: self.__appdaemon_api.log(message,level = "INFO") + + def __log_warning(self,message): + if self.__logger_interface: + self.__logger_interface.log_warning(message) + else: + try: self.__appdaemon_api.log_warning(message) + except AttributeError: self.__appdaemon_api.log(message,level = "WARNING") + + def __log_error(self,message): + if self.__logger_interface: + self.__logger_interface.log_error(message) + else: + try: self.__appdaemon_api.log_error(message) + except AttributeError: self.__appdaemon_api.log(message,level = "ERROR") + + def __log_to_string(self,message): + dest_string = "" + if self.__no_log_prefix == False: + for prefix in self.__debug_log_prefixes: + dest_string = dest_string + prefix + if dest_string: dest_string += " " + return dest_string + message diff --git a/smartvalue.py b/smartvalue.py new file mode 100644 index 0000000..2e25682 --- /dev/null +++ b/smartvalue.py @@ -0,0 +1,305 @@ +#import appdaemon.plugins.hass.hassapi as hass +from datetime import datetime +from adexpressioncontext import ADExpressionContext +from expressionparser import ExpressionParser,ParsingException,EvaluationException +from kwargsparser import kwargsParser + +def catch_smartvalue_exception(error_lambda): + def decorator(function): + def wrapper(*args, **kwargs): + try: return function(*args, **kwargs) + except ParsingException as e: error_lambda(args[0],str(e)) + return wrapper + return decorator + +class Evaluator(): + + def __init__(self, appdaemon_api, expression,**kwargs): + self.__expression_parser = [] + self.__entities_to_listen = {} + self.__debug_log_prefixes = [] + self.__last_evaluation_result = None + self.__state_callbacks_handle = list() + + self.__appdaemon_api = appdaemon_api + self.__activated = False + self.__callback_trigger_reason_log_string = None + self.__evaluation_result_log_string = None + + parser = kwargsParser(kwargs,lambda string: self.__log(string,level = "ERROR"),lambda string: self.__log(string,level = "WARNING")) + self.__logger_interface = parser.parse_args('logger_interface',None) + + self.__no_log_prefix = parser.parse_args('no_log_prefix',False) #as no_log_prefix is used to feedback parser error, it need to be defined first + expression_name = parser.parse_args('expression_name',None) + if expression_name: + self.__debug_log_prefixes.append(f"[{expression_name}]") + + self.__on_update_callback = parser.parse_args('on_update_cb',None,['callback']) + self.__on_change_callback = parser.parse_args('on_change_cb',None) + self.__pass_expression_name_to_cb = parser.parse_args('pass_expression_name_to_cb',None) + self.__log_init = parser.parse_args('log_init',True) + self.__unavaibility_value = parser.parse_args('unavaibility_value','unavailable') #None means throw an exception + self.__trigger_callback_on_activation = parser.parse_args('trigger_callback_on_activation',True) + self.__trigger_callback_on_entity_creation = parser.parse_args('trigger_callback_on_entity_creation',True) + self.__log_callback_trigger_reason = parser.parse_args('log_callback_trigger_reason',True) + self.__constants = parser.parse_args('constants',dict()) + self.__sensors_aliases_default_value = parser.parse_args('sensors_aliases_default_value',dict()) + activated = parser.parse_args('activated',True) + sensor_existence_validation = parser.parse_args('sensor_existence_validation',True) + + self.expression_context = ADExpressionContext(self.__appdaemon_api,sensor_existence_validation) + for constant_name,value in self.__constants.items(): + self.expression_context.declare_constant(constant_name,value) + for alias,value in self.__sensors_aliases_default_value.items(): + self.expression_context.declare_sensor_alias_default_value(alias,value) + + parser.validate_args() + + self.__debug_log_prefixes.append("[Parsing]") + self.__expression_parser = ExpressionParser(str(expression),self.expression_context) + self.__add_to_entities_to_listen(self.expression_context.get_entities_to_listen()) + self.__debug_log_prefixes.pop() + + if activated: self.activate() + + def __has_user_callback(self): + return self.__on_update_callback or self.__on_change_callback + + def __has_user_callback_based_on_result(self): + return self.__on_change_callback + + def activate(self): + if not self.__activated: + self.__activated = True + if self.__has_user_callback(): + self.__register_internal_callback() + if self.__has_user_callback_based_on_result() and self.__trigger_callback_on_activation: + self.__trigger_callbacks("Callback trigger by condition activation",True) + + def deactivate(self): + if self.__activated: + self.__activated = False + self.__last_evaluation_result = None + self.__unregister_internal_callback() + + def __evaluate(self): + self.__debug_log_prefixes.append("[Evaluating]") + try: result = self.__expression_parser.evaluate() + except EvaluationException as e: + if self.__unavaibility_value != None and "unavailable" in e.operands: + self.__log_to_string(f"The expression will be evaluated as '{self.__unavaibility_value}' because of the following exception {e}") + self.__debug_log_prefixes.pop() + return self.__unavaibility_value + else: + self.__debug_log_prefixes.pop() + raise e + self.__debug_log_prefixes.pop() + return result + + def evaluate(self,log = True): + try: result = self.__evaluate() + except EvaluationException as e: + self.__log(f"{e}",level = "ERROR") + return None + if log: + self.log_evaluation_result() + return result + + def log_callback_trigger_reason(self): + if self.__callback_trigger_reason_log_string: + self.__log_info(self.__callback_trigger_reason_log_string) + self.__callback_trigger_reason_log_string = None + + def log_evaluation_result(self): + if self.__evaluation_result_log_string: + self.__log_info(self.__evaluation_result_log_string) + self.__evaluation_result_log_string = None + + def get_evaluation_result_log_string(self): + return self.__evaluation_result_log_string + + def log_entities_states(self): + log_string = "Entities states: " + for entity,attributes in self.__entities_to_listen.items(): + for attribute in attributes: + if attribute == "state" or attribute == None: + log_string += f"{entity} = {self.__appdaemon_api.get_state(entity)}," + #not sure why it was here, most likely a left over + #handle = self.__appdaemon_api.listen_state(self.__on_condition_state_change,entity) + else: + log_string += f"{entity}[{attribute}] = {self.__appdaemon_api.get_state(entity,attribute = attribute)}," + + self.__log_info(log_string[:-1]) + + def get_entities_to_listen(self): + return self.__entities_to_listen + + def get_entities_to_listen_as_list(self): + entities_list = list() + for entity_id,attributes in self.__entities_to_listen.items(): + for attribute in attributes: + if attribute == None: + entities_list.append(entity_id) + else: + entities_list.append(f"{entity_id}.{attribute}") + return entities_list + + def __add_to_entities_to_listen(self,extra_entities_to_listen): + #__add_to_entities_to_listen support both : + # ['sensor.a.attribute', 'sensor.a','sensor.a.another_attribute'] and + # { 'sensor.a' : ['attribute','another_attribute'] } + if isinstance(extra_entities_to_listen, list): + for entity_string in extra_entities_to_listen: + splitted_entity = entity_string.split('.') + + if len(splitted_entity) == 2: + entity_id = entity_string + attribute = None + elif len(splitted_entity) == 3: + entity_id = f"{splitted_entity[0]}.{splitted_entity[1]}" + attribute = splitted_entity[2] + else: + self.__log(f"Invalid entity string '{entity_string}' present in extra_entities_to_listen",level = "ERROR") + continue + + if not entity_id in self.__entities_to_listen: + self.__entities_to_listen[entity_id] = [] + if not attribute in self.__entities_to_listen[entity_id]: + self.__entities_to_listen[entity_id].append(attribute) + + elif isinstance(extra_entities_to_listen, dict): + for entity in extra_entities_to_listen: + if not entity in self.__entities_to_listen: + self.__entities_to_listen[entity] = [] + + #I support both "my.entity" : "attribute" and "my.entity": ["attribute1","attribute2"] + if isinstance(extra_entities_to_listen[entity], str): + attribute = extra_entities_to_listen[entity] + if not attribute in self.__entities_to_listen[entity]: self.__entities_to_listen[entity].append(attribute) + else: + for attribute in extra_entities_to_listen[entity]: + if not attribute in self.__entities_to_listen[entity]: self.__entities_to_listen[entity].append(attribute) + else: + self.__log(f"extra_entities_to_listen is in an invalid format: {extra_entities_to_listen}",level = "ERROR") + + def __register_internal_callback(self): + assert len(self.__state_callbacks_handle) == 0, f"self.__state_callbacks_handle is not empy, you are probably trying to call __register_internal_callback() twice" + logstring = "" + for entity,attributes in self.__entities_to_listen.items(): + attributes_string = "" + for attribute in attributes: + if attribute == "state" or attribute == None: #this is not healthy to use 'state' here as at some point an attribute could actually be called "state" + assert attribute != 'state', "please don't use 'state' as a keyword to design the actual state of the entity, but use None instead" + handle = self.__appdaemon_api.listen_state(self.__on_condition_state_change,entity) + else: + handle = self.__appdaemon_api.listen_state(self.__on_condition_state_change,entity, attribute = attribute) + + self.__state_callbacks_handle.append(handle) + + if attributes_string == "" : attributes_string = f"({attribute if attribute else 'state'}" + else: attributes_string += f",{attribute if attribute else 'state'}" + + attributes_string += f")" + + if attributes_string == "(state)": attributes_string = "" + + if logstring == "": + logstring = f"Registering CB for {entity}{attributes_string}" + else: logstring += f", {entity}{attributes_string}" + + if self.__log_init: + if logstring == "": + logstring = f"No CB has been registered" + self.__log(logstring) + + def __unregister_internal_callback(self): + if len(self.__state_callbacks_handle) > 0: + self.__log(f"Unregistering {len(self.__state_callbacks_handle)} callbacks") + for handle in self.__state_callbacks_handle: + self.__appdaemon_api.cancel_listen_state(handle) + self.__state_callbacks_handle = list() + + def __trigger_callbacks(self,trigger_reason,triggered_from_activation = False): + has_log_trigger_reason = False + has_evaluation_result = False + + def log_trigger_reason_once(trigger_reason): + nonlocal has_log_trigger_reason + if not has_log_trigger_reason: + has_log_trigger_reason = True + self.__callback_trigger_reason_log_string = self.__log_to_string(trigger_reason) + if self.__log_callback_trigger_reason: + self.log_callback_trigger_reason() + + def log_evaluation_result_once(): + nonlocal has_evaluation_result + if not has_evaluation_result: + has_evaluation_result = True + if self.__log_callback_trigger_reason: #in that particular case, the evaluation result is a trigger reason + self.log_evaluation_result() + + if self.__on_update_callback and not triggered_from_activation: + log_trigger_reason_once(trigger_reason) + if self.__pass_expression_name_to_cb != None: + self.__on_update_callback(self.__pass_expression_name_to_cb) + else: + self.__on_update_callback() + if self.__has_user_callback_based_on_result(): + try: result = self.__evaluate() + except EvaluationException as e: + self.__log(f"{e}",level = "ERROR") + return + + if result != self.__last_evaluation_result: + if self.__on_change_callback: + log_trigger_reason_once(trigger_reason) + log_evaluation_result_once() + if self.__pass_expression_name_to_cb != None: + self.__on_change_callback(self.__pass_expression_name_to_cb,self.__last_evaluation_result,result) + else: + self.__on_change_callback(self.__last_evaluation_result,result) + + self.__last_evaluation_result = result + + def __on_condition_state_change(self, entity, attribute, old, new, kwargs): + if old != new and (old != None or self.__trigger_callback_on_entity_creation): + self.__trigger_callbacks(f"Callback trigger by {entity}({old} => {new})") + + def __log(self,message,**kwargs): + log_string = self.__log_to_string(message) + + if 'level' in kwargs and kwargs['level'] == 'ERROR': + self.__log_error(log_string) + elif 'level' in kwargs and kwargs['level'] == 'WARNING': + self.__log_warning(log_string) + else: + self.__log_info(log_string) + + def __log_info(self,message): + if self.__logger_interface: + self.__logger_interface.log_info(message) + else: + try: self.__appdaemon_api.log_info(message) + except AttributeError: self.__appdaemon_api.log(message,level = "INFO") + + def __log_warning(self,message): + if self.__logger_interface: + self.__logger_interface.log_warning(message) + else: + try: self.__appdaemon_api.log_warning(message) + except AttributeError: self.__appdaemon_api.log(message,level = "WARNING") + + def __log_error(self,message): + if self.__logger_interface: + self.__logger_interface.log_error(message) + else: + try: self.__appdaemon_api.log_error(message) + except AttributeError: self.__appdaemon_api.log(message,level = "ERROR") + + def __log_to_string(self,message): + dest_string = "" + if self.__no_log_prefix == False: + for prefix in self.__debug_log_prefixes: + dest_string = dest_string + prefix + if dest_string: dest_string += " " + return dest_string + message diff --git a/ssm.py b/ssm.py new file mode 100644 index 0000000..990ad99 --- /dev/null +++ b/ssm.py @@ -0,0 +1,37 @@ +from kwargsparser import kwargsParser as kwargsParser + +class SimpleStateMachine(): + def __init__(self,default_state,**kwargs): + parser = kwargsParser(kwargs) + + self.log_cb = parser.parse_args("log_cb",None) + + parser.validate_args() + self.current_state = None + self.change_state(default_state) + + def log(self,log_string): + if self.log_cb: self.log_cb(log_string) + + def change_state(self,new_state): + if new_state != self.current_state: + if self.current_state != None: + if self.current_state.on_exit != None: + self.current_state.on_exit() + self.log(f"Changing from state {self.current_state.state_name} to {new_state.state_name}") + else: + self.log(f"Initialising with state {new_state.state_name}") + self.current_state = new_state + if self.current_state.on_enter != None: + self.current_state.on_enter() + +class State(): + def __init__(self,state_name,**kwargs): + self.state_name = state_name + + parser = kwargsParser(kwargs) + + self.on_enter = parser.parse_args("on_enter_cb",None) + self.on_exit = parser.parse_args("on_exit_cb",None) + + parser.validate_args() \ No newline at end of file diff --git a/unit_tests/test_attributes.yaml b/unit_tests/test_attributes.yaml new file mode 100644 index 0000000..c994f61 --- /dev/null +++ b/unit_tests/test_attributes.yaml @@ -0,0 +1,56 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + sensor.with_state_and_attribute: 'SomeRandomState' + sensor.with_state_and_attribute.off: 'off' + sensor.with_state_and_attribute.on: 'on' + sensor.with_state_and_attribute.f127: 127 + sensor.with_only_attribute.on: 'on' + sensor.None: #None + +constants: + self: sensor.with_state_and_attribute + pi: 3.14 + +tests: + test_1: + expected_result: Succeeded + conditions: sensor.with_state_and_attribute.on + + test_2: + expected_result: Succeeded + #TODO: find a way to make it work without parenthesis + #conditions: (sensor.with_state_and_attribute.on and binary_sensor.on) == not binary_sensor.false + conditions: (sensor.with_state_and_attribute.on and binary_sensor.on) == (not binary_sensor.false) + + test_3: + expected_result: Succeeded + conditions: sensor.with_state_and_attribute.f127 < sensor.int170 + + test_4: + expected_result: ParsingException + conditions: sensor.with_only_attribute.on != binary_sensor.on + + test_5: + expected_result: Succeeded + #TODO: find a way to make it work without parenthesis + #conditions: (self.on and binary_sensor.on) == not binary_sensor.false + conditions: (self.on and binary_sensor.on) == (not binary_sensor.false) + + test_6: + expected_result: Succeeded + conditions: self.f127 < sensor.int170 + + test_7: + expected_result: ParsingException + conditions: pi.on != binary_sensor.on \ No newline at end of file diff --git a/unit_tests/test_bool.yaml b/unit_tests/test_bool.yaml new file mode 100644 index 0000000..5550f4f --- /dev/null +++ b/unit_tests/test_bool.yaml @@ -0,0 +1,58 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + binary_sensor.nottrue: False + +constants: + andtrap: False + +tests: + test_1: + expected_result: Succeeded + conditions: binary_sensor.true or binary_sensor.false + + test_2: + expected_result: Failed + conditions: binary_sensor.true and binary_sensor.false + + test_3: + expected_result: Succeeded + conditions: binary_sensor.on or binary_sensor.off + + test_4: + expected_result: Failed + conditions: binary_sensor.on and binary_sensor.off == False + + test_5: + expected_result: Succeeded + conditions: binary_sensor.true or binary_sensor.false and binary_sensor.false + + test_6: + expected_result: Failed + conditions: (binary_sensor.true or binary_sensor.false) and binary_sensor.false + + test_7: + expected_result: Succeeded + conditions: not binary_sensor.off + + test_8: + expected_result: Failed + conditions: not binary_sensor.on + + test_9: + expected_result : Succeeded + conditions: binary_sensor.true == (not binary_sensor.false) + + test_10: + conditions: binary_sensor.nottrue == andtrap + expected_result: Succeeded \ No newline at end of file diff --git a/unit_tests/test_comparaison.yaml b/unit_tests/test_comparaison.yaml new file mode 100644 index 0000000..402279d --- /dev/null +++ b/unit_tests/test_comparaison.yaml @@ -0,0 +1,37 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + +tests: + test_1: + expected_result: Succeeded + conditions: sensor.int200 > 115 and (sensor.int200 < 190 or (binary_sensor.on and sensor.int200 < 220)) + test_2: + expected_result: Failed + conditions: (sensor.int170 * 2) < sensor.int200 + test_3: + expected_result: Failed + conditions: not not binary_sensor.off + test_4: + expected_result: Succeeded + #TODO: find a way to make it work without parenthesis + #conditions: (binary_sensor.on and binary_sensor.on) == not binary_sensor.false + conditions: (binary_sensor.on and binary_sensor.on) == (not binary_sensor.false) + test_5: + expected_result: Succeeded + conditions: (binary_sensor.on and binary_sensor.on) == True + + + + + diff --git a/unit_tests/test_constants.yaml b/unit_tests/test_constants.yaml new file mode 100644 index 0000000..0382fbb --- /dev/null +++ b/unit_tests/test_constants.yaml @@ -0,0 +1,53 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + sensor.None: #None + +constants: + self: sensor.int170 + selftrue: binary_sensor.true + pi: 3.14 + +tests: + test_1: + expected_result: Succeeded + conditions: binary_sensor.true and True + + test_2: + expected_result: Failed + conditions: binary_sensor.true and False + + test_3: + expected_result: Succeeded + conditions: sensor.strNuit != None + + test_4: + expected_result: Succeeded + conditions: sensor.None == None + + test_5: + expected_result: Succeeded + conditions: self > 160 + + test_6: + expected_result: Failed + conditions: self < 160 + + test_7: + expected_result: Succeeded + conditions: pi * 2 == 6.28 + + test_8: + expected_result: Succeeded + conditions: selftrue and not sensor.strHome == 'Nuit' + \ No newline at end of file diff --git a/unit_tests/test_entity_to_listen.yaml b/unit_tests/test_entity_to_listen.yaml new file mode 100644 index 0000000..b80b507 --- /dev/null +++ b/unit_tests/test_entity_to_listen.yaml @@ -0,0 +1,54 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + sensor.None: #None + sensor.with_state_and_attribute: 'SomeRandomState' + sensor.with_state_and_attribute.off: 'off' + sensor.with_state_and_attribute.on: 'on' + sensor.with_state_and_attribute.f127: 127 + +constants: + self: sensor.int170 + pi: 3.14 + +tests: + test_1: + expected_result: Succeeded + expected_entities_to_listen: + - binary_sensor.true + conditions: binary_sensor.true and True + test_2: + expected_result: Succeeded + expected_entities_to_listen: + - binary_sensor.true + - binary_sensor.false + conditions: (binary_sensor.true and binary_sensor.false) or binary_sensor.true + test_3: + expected_result: Succeeded + expected_entities_to_listen: + - sensor.int170 + - sensor.float10 + conditions: self > sensor.float10 + test_4: + expected_result: Succeeded + expected_entities_to_listen: + - sensor.float5 + conditions: pi < sensor.float5 + test_5: + expected_result: Failed + expected_entities_to_listen: + - sensor.float5 + - sensor.with_state_and_attribute + - sensor.with_state_and_attribute.on + - sensor.with_state_and_attribute.f127 + conditions: (sensor.with_state_and_attribute.f127 > sensor.float5 or sensor.with_state_and_attribute.on) and sensor.with_state_and_attribute == 'randomstring' diff --git a/unit_tests/test_parsing_errors.yaml b/unit_tests/test_parsing_errors.yaml new file mode 100644 index 0000000..a3bc98c --- /dev/null +++ b/unit_tests/test_parsing_errors.yaml @@ -0,0 +1,26 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + +tests: + test_1: + expected_result: ParsingException + conditions: (binary_sensor.true or binary_sensor.false + + test_2: + expected_result: ParsingException + conditions: (binary_sensor.true and) binary_sensor.false + + test_3: + expected_result: ParsingException + conditions: binary_sensor.on or binary_sensore.off \ No newline at end of file diff --git a/unit_tests/test_smart_conditions.yaml b/unit_tests/test_smart_conditions.yaml new file mode 100644 index 0000000..3760df8 --- /dev/null +++ b/unit_tests/test_smart_conditions.yaml @@ -0,0 +1,60 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + sensor.strNuit.int80: 70 + +constants: + self: binary_sensor.true + +tests: + test_1: + expected_result: Succeeded + conditions: + trigger_conditions: + - sensor.int170 < 10 and sensor.int17 >= 28 + - self and not sensor.strNuit == 'Nuit' + - sensor.int170 >= 34 + + test_2: + expected_result: Disabled + conditions: + trigger_conditions: + - binary_sensor.false and sensor.int170 > 15 + - not binary_sensor.off and sensor.int170 > 65 + disable_conditions: + - sensor.strNuit.int80 < 80 and sensor.strNuit.int80 > 40 + - sensor.strNuit == 'Nuit' + test_3: + expected_result: Failed + conditions: + trigger_conditions: + - binary_sensor.off and binary_sensor.true + - binary_sensor.true + blocking_conditions: self and not binary_sensor.off + disable_conditions: binary_sensor.off + test_4: + expected_result: Disabled + conditions: + trigger_conditions: + - binary_sensor.true and (not binary_sensor.on or binary_sensor.off) + - binary_sensor.false and binary_sensor.on + - binary_sensor.on and not binary_sensor.true + blocking_conditions: not binary_sensor.off + disable_conditions: + - binary_sensor.off + - not binary_sensor.false and sensor.strNuit == 'Nuit' + + + + + diff --git a/unit_tests/test_strings.yaml b/unit_tests/test_strings.yaml new file mode 100644 index 0000000..1dcfb9c --- /dev/null +++ b/unit_tests/test_strings.yaml @@ -0,0 +1,38 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + +tests: + test_1: + expected_result: Succeeded + conditions: binary_sensor.true and (sensor.strHome == 'home' and sensor.float10 > 1) + + test_2: + expected_result: Failed + conditions: binary_sensor.true and (sensor.strHome != 'home' and sensor.float10 > 1) + + test_3: + expected_result: Failed + conditions: not binary_sensor.off and not binary_sensor.on and sensor.strNuit == 'Nuit' + + test_4: + expected_result: Failed #this test will make the parser crash if the and operator evaluate the second operand wether the first one is true or false + conditions: sensor.strHome != 'home' and sensor.strHome < 15 + + test_5: + expected_result: Succeeded + conditions: sensor.float10 != 'Home' and sensor.float10 < 15 + + + + diff --git a/unit_tests/test_template.yaml b/unit_tests/test_template.yaml new file mode 100644 index 0000000..a74cf33 --- /dev/null +++ b/unit_tests/test_template.yaml @@ -0,0 +1,46 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + binary_sensor.nottrue: False + +constants: + andtrap: False + +templates_library: + T1: binary_sensor.true or binary_sensor.false + T2: binary_sensor.on and binary_sensor.off == False + T3: + trigger_conditions: binary_sensor.off == False + blocking_conditions: + - binary_sensor.on + +tests: + test_1: + expected_result: Succeeded + conditions: + + test_2: + expected_result: Succeeded + conditions: + template_conditions: + trigger_conditions: binary_sensor.true + + test_3: + expected_result: Succeeded + conditions: , + + test_4: + expected_result: Failed + conditions: + template_conditions: , + trigger_conditions: binary_sensor.true and binary_sensor.false diff --git a/unit_tests/test_ternary_operator.yaml b/unit_tests/test_ternary_operator.yaml new file mode 100644 index 0000000..2aaa17c --- /dev/null +++ b/unit_tests/test_ternary_operator.yaml @@ -0,0 +1,56 @@ +fake_entities: + binary_sensor.true: True + binary_sensor.false: False + binary_sensor.off: 'off' + binary_sensor.on: 'on' + sensor.int17: 17 + sensor.int170: 170 + sensor.int200: 200 + sensor.int3: 3 + sensor.float10: 10.0 + sensor.float5: 5.0 + sensor.strNuit: 'Nuit' + sensor.strHome: 'home' + sensor.none: + +tests: + test_1: + expected_result: Succeeded + conditions: "(binary_sensor.on ? 10 : 0) > sensor.int3" + + test_2: + expected_result: Succeeded + conditions: "(binary_sensor.on ? sensor.float10 : 0) > sensor.int3" + + test_3: + expected_result: Failed + conditions: "binary_sensor.off ? (binary_sensor.on or binary_sensor.off) : binary_sensor.off" + + test_4: + expected_result: Failed + conditions: "binary_sensor.on ? binary_sensor.on and binary_sensor.off : binary_sensor.on" + + test_5: + expected_result: Succeeded + conditions: "((binary_sensor.on or binary_sensor.false) ? 5 : 1) > sensor.int3" + + test_6: + expected_result: Failed + conditions: "((binary_sensor.on and binary_sensor.false) ? 5 : 1) > sensor.int3" + + test_7: + expected_result: Succeeded + conditions: "(binary_sensor.on ? sensor.float10 : 0) > sensor.int3" + + test_none_ternary: + expected_result: Failed + conditions: "sensor.none != None ? sensor.none > 75 : False" + + # test_15: + # expected_result: Succeeded + # conditions: "(binary_sensor.off ? (binary_sensor.on or binary_sensor.off) : (binary_sensor.off ? sensor.int3 : sensor.int17)) > sensor.int3" + + + + + diff --git a/unittests.py b/unittests.py new file mode 100644 index 0000000..18d1c24 --- /dev/null +++ b/unittests.py @@ -0,0 +1,159 @@ +import smartcondition +import os +import yaml +from expressionparser import ParsingException + +UNIT_TEST_PATH = "unit_tests" + +class AppDaemonMokup: + def __init__(self,fake_entitites): + self.entities = fake_entitites + + def log(self,message,**kwargs): + error_level = f"[{kwargs['level']}]" if 'level' in kwargs else "" + print(f'{error_level}{message}') + + def get_state(self,entity_id,attribute = None): + key = f"{entity_id}.{attribute}" if attribute != None else entity_id + + return self.entities[key] if key in self.entities else None + + def entity_exists(self,entity_id): + return entity_id in self.entities + + #added a couple of method to be directly compatible with Condition Parser + def add_variable(self,variable_name): return self.entity_exists(variable_name) + + def get_variable_value(self,variable_name): return self.get_state(variable_name) + + def reset(self): pass + +def execute_unit_test(test_file): + failed_test = dict() + + with open(test_file, 'r') as file: + test_config = yaml.safe_load(file) + if not 'fake_entities' in test_config: + print(f"Can't run test {test_file} fake_entities is not defined") + return False + else: + print(f"\nRunning test from {test_file}") + + constants = test_config['constants'] if 'constants' in test_config else dict() + templates_library = test_config['templates_library'] if 'templates_library' in test_config else None + appdaemon = AppDaemonMokup(test_config['fake_entities']) + + test_expression = None + #test_expression = "binary_sensor.on == not binary_sensor.false" + + if test_expression: + condition_parser = smartcondition.ConditionsParser(test_expression,appdaemon) + print(f"{condition_parser.parsed_data}") + result = condition_parser.evaluate() + + for test in test_config['tests']: + try: conditions = smartcondition.Evaluator(appdaemon,test_config['tests'][test]['conditions'], constants = constants, log_init = False, log_no_valid_condition_details = True, templates_library = templates_library) + except ParsingException as e: + success = test_config['tests'][test]['expected_result'] == "ParsingException" + + print(f"{test}: {'Success' if success else 'Failed'}") + if not success: + print(f"{os.path.basename(test_file)} {test}: 'Parsing failed' with Exception {e}") + failed_test[test] = test_config['tests'][test] + print() + continue + + else: + if 'expected_entities_to_listen' in test_config['tests'][test]: + expected_entities_to_listen = test_config['tests'][test]['expected_entities_to_listen'] + entities_to_listen = conditions.get_entities_to_listen_as_list() + if sorted(expected_entities_to_listen) != sorted(entities_to_listen): + failed_test[test] = test_config['tests'][test] + print(f"{test}: Failed") + print(f"entities_to_listen = {entities_to_listen}, expected : {expected_entities_to_listen}") + continue + + + result = conditions.evaluate(False) + success = result.name == test_config['tests'][test]['expected_result'] + print(f"{test}: {'Success' if success else 'Failed'}") + if not success: + failed_test[test] = test_config['tests'][test] + conditions.log_evaluation_result() + print() + continue + + + print() + if len(failed_test) > 0: + print(f"{len(failed_test)} tests have failed in {os.path.basename(test_file)} :") + for test in failed_test: + print(f"{test} : {test_config['tests'][test]['conditions']}") + print(f"Expected result was {test_config['tests'][test]['expected_result']}") + else: + print("All the tests have succeeded") + + + return len(failed_test) == 0 + +def tiny_parser(): + import pyparsing + + variable_names = pyparsing.Combine(pyparsing.Literal('$') + pyparsing.Word(pyparsing.alphanums + '_')) + + #integer = pyparsing.Word(pyparsing.nums) + integer = pyparsing.pyparsing_common.signed_integer + + double = pyparsing.Combine(pyparsing.Word(pyparsing.nums) + '.' + pyparsing.Word(pyparsing.nums)) + + parser = pyparsing.infix_notation( + variable_names | double | integer, + [ + ('not', 1, pyparsing.opAssoc.RIGHT), + ('**', 2, pyparsing.opAssoc.RIGHT), + ('-', 1, pyparsing.opAssoc.RIGHT), + (pyparsing.oneOf('* / // %'), 2, pyparsing.opAssoc.LEFT), + (pyparsing.oneOf('+ -'), 2, pyparsing.opAssoc.LEFT), + (pyparsing.oneOf('> >= < <= == !='), 2, pyparsing.opAssoc.LEFT), + ('and', 2, pyparsing.opAssoc.LEFT), + ('or', 2, pyparsing.opAssoc.LEFT) + ] + ) + + examples = [ + "$true and not 10 == 9", + "$true == $false", + "$true == not $false", + "$true == (not $false)", + "5 * 10 ** -2", + "5 * 10 * -2", + "5 * 10 ** (-2)", + "5 * -10 ** 2", + "5 * (-10) ** 2", + "5 and not 8", + "5 and -8", + "1 ** -2", + "-1 ** 2", + ] + + longest = max(map(len, examples)) + + for ex in examples: + result = parser.parseString(ex) + print(f'{ex:{longest}} <=> {result}') + + +#tiny_parser() +failed_test = list() + +path = os.path.join(os.path.dirname(os.path.abspath(__file__)),UNIT_TEST_PATH) +with os.scandir(path) as it: + for entry in it: + if entry.is_file() and entry.name.lower().endswith('.yaml'): + if not execute_unit_test(os.path.join(path , entry.name)): + failed_test.append(entry.name) + +if len(failed_test): + print(f"{len(failed_test)} test file(s) have failed :") + for test in failed_test: + print(f"{test}") \ No newline at end of file diff --git a/virtualsensors.py b/virtualsensors.py new file mode 100644 index 0000000..d6e5c53 --- /dev/null +++ b/virtualsensors.py @@ -0,0 +1,672 @@ +import appdaemon.plugins.hass.hassapi as hass +import smartcondition as SmartCondition +import smartvalue as SmartValue +from kwargsparser import kwargsParser +from expressionparser import ParsingException +import time +from logger_interface import LoggerInterface + +class VirtualSensorBase: + def __init__(self,ad_api,logger_interface, virtual_sensor_name = None, sensor_name = None,super_entity_id = None, yaml_block = None,templates_library = None, constants = None,self_initialize = False): + assert yaml_block + self.ad_api = ad_api + self.logger_interface = logger_interface + + # sensor_name should be something like 'domain.name' whereas virtual_sensor_name should be only 'name' + # only one should be provided and the other one will be deduced. + # the regular use case is giving virtual_sensor_name. + # Giving a specific sensor_name is usefull when the entity exist before with a different domain + if virtual_sensor_name: + assert sensor_name == None + self.virtual_sensor_name = virtual_sensor_name + self.sensor_name = self.generate_sensor_name(virtual_sensor_name) + else: + assert virtual_sensor_name == None + self.sensor_name = sensor_name + self.virtual_sensor_name = sensor_name.split('.')[1] + self.sensor_domain = self.sensor_name.split(".")[0] + + self.yaml_block = yaml_block + self.templates_library = templates_library + + try: self.attributes = self.yaml_block['attributes'] + except (TypeError,KeyError): self.attributes = dict() + + if constants: self.constants = dict(constants) # we don't want to modify the parent dict + else: self.constants = {'self' : self.sensor_name } + if super_entity_id != None: + self.constants['super'] = super_entity_id + + if self_initialize: + self._initialize() + + def generate_sensor_name(self,virtual_sensor_name): + return f"sensor.{virtual_sensor_name}" + # TODO : restore that code once we rename the sensor + # splitted_name = virtual_sensor_name.split(".") + # assert len(splitted_name) == 2,f"Invalid virtual sensor name : {virtual_sensor_name}" + # return f"sensor.{splitted_name[1]}" + + def log(self,message): self.logger_interface.log_info(f"[{self.sensor_name}]{message}") + + def log_info(self,message): self.logger_interface.log_info(f"[{self.sensor_name}]{message}") + + def log_warning(self,message): self.logger_interface.log_warning(f"[{self.sensor_name}]{message}") + + def log_error(self,message): self.logger_interface.log_error(f"[{self.sensor_name}]{message}") + + def generate_dependencies_list(self): return [] + + def generate_attributes_dependencies_list(self): + dependencies = list() + if 'sensor_attributes' in self.yaml_block: + for key,entity in self.yaml_block['sensor_attributes'].items(): + dependencies.append(entity) + return dependencies + + def _initialize(self): + if 'sensor_attributes' in self.yaml_block: + for attribute_name,sensor_name in self.yaml_block['sensor_attributes'].items(): + if not self.ad_api.entity_exists(sensor_name): + self.log_error(f"Error in sensor_attribute. {attribute_name}:{sensor_name}. {sensor_name} doesn't exist") + else: + self.attributes[attribute_name] = self.ad_api.get_state(sensor_name) + self.ad_api.listen_state(self.on_attribute_sensor_state_change,sensor_name) + + try: self.initialize() + except ParsingException as e: + self.log_error(e) + self.ad_api.set_state(self.sensor_name, attributes = self.attributes) + + def on_attribute_sensor_state_change(self, entity, attribute, old, new, kwargs): + attributes_update = False + for attribute_name,sensor_name in self.yaml_block['sensor_attributes'].items(): + if sensor_name == entity: + self.attributes[attribute_name] = new + attributes_update = True + + if attributes_update: + self.ad_api.set_state(self.sensor_name, attributes = self.attributes) + + def initialize(self): assert False + + def extract_dependencies_from_entities_to_listen(self,entities_to_listen,dependencies): + for entity_with_attribute in entities_to_listen: + splitted_entity = entity_with_attribute.split('.') + assert len(splitted_entity) == 3 or len(splitted_entity) == 2 + if len(splitted_entity) == 3: + entity = f"{splitted_entity[0]}.{splitted_entity[1]}" + else: + entity = entity_with_attribute + + if entity != self.sensor_name and not entity in dependencies: + dependencies.append(entity) + +class Averager(VirtualSensorBase): + def initialize(self): + self.round = None + self.confirm_steady_cb_handle = None + self.confirm_tendency_cb_handle = None + self.icons_config = None + self.sensors_list = list() + self.average_sensor_attributes = dict() + self.all_sensors_are_mandatory = True + if isinstance(self.yaml_block,dict): + if 'all_sensors_are_mandatory' in self.yaml_block: self.all_sensors_are_mandatory = self.yaml_block['all_sensors_are_mandatory'] + if 'round' in self.yaml_block: self.round = self.yaml_block['round'] + if 'sensors' in self.yaml_block: + self.sensors_list = self.yaml_block['sensors'] if isinstance(self.yaml_block['sensors'],list) else [self.yaml_block['sensors']] + if 'icons' in self.yaml_block: + self.icons_config = self.yaml_block['icons'] + if 'attributes' in self.yaml_block: + self.average_sensor_attributes = self.yaml_block['attributes'] + else: + self.average_sensor_attributes = { 'tendency' : 'steady' } + else: + self.sensors_list = [self.yaml_block] + + for sensor in self.sensors_list: + self.ad_api.listen_state(self.on_sensor_state_change,sensor) + + self.average_sensor = self.ad_api.get_entity(self.sensor_name) + + if not self.average_sensor.exists(): + average = self.compute_average() + self.average_sensor.set_state(state = average if average else 'unavailable',attributes = self.average_sensor_attributes) + + self.update_average() + + def generate_dependencies_list(self): + if isinstance(self.yaml_block,dict): + if 'sensors' in self.yaml_block: + return self.yaml_block['sensors'] if isinstance(self.yaml_block['sensors'],list) else [self.yaml_block['sensors']] + else: + return [] + else: + return [ self.yaml_block ] + + def on_sensor_state_change(self, *args): + self.update_average() + + def compute_average(self): + result_count = 0 + average = 0 + for sensor in self.sensors_list: + try: + value = float(self.ad_api.get_state(sensor)) + result_count += 1 + average += value + except (TypeError,ValueError): + if self.all_sensors_are_mandatory: + return None + + if result_count == 0: return None + average /= result_count + if self.round != None: + average = round(average,self.round) + return average + + + def update_average(self): + try: prev_average = float(self.average_sensor.get_state()) + except (TypeError,ValueError): prev_average = None + new_average = self.compute_average() + if new_average and new_average != prev_average: + #if we don't change the value for one hour, let's go back to the default icon + if self.confirm_steady_cb_handle != None: + self.ad_api.cancel_timer(self.confirm_steady_cb_handle) + self.confirm_steady_cb_handle = None + self.confirm_steady_cb_handle = self.ad_api.run_in(self.on_confirm_steady, 60 * 60) + + if prev_average: + if self.confirm_tendency_cb_handle: + self.ad_api.cancel_timer(self.confirm_tendency_cb_handle) + self.confirm_tendency_cb_handle = None + + prev_tendency = self.average_sensor.get_state(attribute = 'tendency') + if new_average > prev_average: + if prev_tendency == 'down': + self.average_sensor_attributes['tendency'] = 'steady' + self.confirm_tendency_cb_handle = self.ad_api.run_in(self.on_confirm_tendency, 30 * 60,tendency = 'up') + else: + self.average_sensor_attributes['tendency'] = 'up' + else: + if prev_tendency == 'up': + self.average_sensor_attributes['tendency'] = 'steady' + self.confirm_tendency_cb_handle = self.ad_api.run_in(self.on_confirm_tendency, 30 * 60,tendency = 'down') + else: + self.average_sensor_attributes['tendency'] = 'down' + + self.update_icon_attribute() + + self.average_sensor.set_state(state = new_average,attributes = self.average_sensor_attributes) + + def update_icon_attribute(self): + if self.icons_config: + if self.average_sensor_attributes['tendency'] == 'up' and 'up' in self.icons_config: + self.average_sensor_attributes['icon'] = self.icons_config['up'] + if self.average_sensor_attributes['tendency'] == 'down' and 'down' in self.icons_config: + self.average_sensor_attributes['icon'] = self.icons_config['down'] + if self.average_sensor_attributes['tendency'] == 'steady' and 'default' in self.icons_config: + self.average_sensor_attributes['icon'] = self.icons_config['default'] + + def on_confirm_tendency(self, kwargs): + tendency = kwargs['tendency'] + self.confirm_tendency_cb_handle = None + + self.average_sensor_attributes['tendency'] = tendency + self.update_icon_attribute() + self.average_sensor.set_state(attributes = self.average_sensor_attributes) + + def on_confirm_steady(self, kwargs): + self.confirm_steady_cb_handle = None + self.average_sensor_attributes['tendency'] = 'steady' + self.update_icon_attribute() + self.average_sensor.set_state(attributes = self.average_sensor_attributes) + +class ValueSelector(VirtualSensorBase): + class SelectorConfig: + def __init__(self,name,conditions,value,is_smart_value): + self.conditions = conditions + self.value = str(value) #value like "17" are deserialized as int + self.name = str(name) + self.is_smart_value = is_smart_value + + class Selector: + def __init__(self,config, ad_api = None,logger_interface = None, constants = None, templates_library = None): + self.config = config + self.conditions = SmartCondition.Evaluator(ad_api,self.config.conditions, logger_interface = logger_interface, unavaibility_result = SmartCondition.Result.Failed,condition_name = self.config.name, log_callback_trigger_reason = False, constants = constants, templates_library = templates_library) + if config.is_smart_value: + self.value = SmartValue.Evaluator(ad_api,self.config.value, logger_interface = logger_interface,expression_name = self.config.name, log_callback_trigger_reason = False, constants = constants) + else: + self.value = None + + def compute_value(self): + if self.value: return self.value.evaluate(False) + else: return self.config.value + + def get_entities_to_listen(self): + if self.value: + entities_to_listen = list(self.conditions.get_entities_to_listen_as_list()) + entities_to_listen.extend(self.value.get_entities_to_listen_as_list()) + return entities_to_listen + else: + return self.conditions.get_entities_to_listen_as_list() + + + def initialize(self): + self.value_selectors = list() + selectors_config = self.process_config() + + self.output_sensor = self.ad_api.get_entity(self.sensor_name) + + if not self.output_sensor.exists(): + default_config = selectors_config[-1] #the last config is the default config + if default_config.is_smart_value: + default_value = SmartValue.Evaluator(self.ad_api,default_config.value,expression_name = default_config.name, logger_interface = self.logger_interface, log_callback_trigger_reason = False, constants = self.constants).evaluate() + else: + default_value = default_config.value + self.set_value(default_value) + + entities_to_listen = set() + + for config in selectors_config: + new_selector = self.Selector(config,ad_api = self.ad_api, logger_interface = self.logger_interface, constants = self.constants, templates_library = self.templates_library) + self.value_selectors.append(new_selector) + entities_to_listen.update(new_selector.get_entities_to_listen()) + + for entity_to_listen in entities_to_listen: + slitted_entity = entity_to_listen.split(".") + if len(slitted_entity) == 3: + self.ad_api.listen_state(self.on_entities_to_listen_changed,f"{slitted_entity[0]}.{slitted_entity[1]}",attribute = slitted_entity[2]) + else: + self.ad_api.listen_state(self.on_entities_to_listen_changed,entity_to_listen) + + self.update_value() + + def on_entities_to_listen_changed(self, entity, attribute, old, new, kwargs): + self.update_value() + + def process_config(self) -> list[SelectorConfig]: + configs = list() + #self.log(f"yaml_block:{self.yaml_block}") + if 'values' in self.yaml_block: values_block = self.yaml_block['values'] + else: values_block = self.yaml_block + + for key,item in values_block.items(): + try: conditions_block = item['conditions'] + except (TypeError,KeyError): conditions_block = item + + try: + value = item['value'] + is_smart_value = True + except (TypeError,KeyError): + value = key + is_smart_value = False #SmartValue are not supported for 'inline' value like "17: some_condition" + + configs.append(self.SelectorConfig(key,conditions_block,value,is_smart_value)) + + return configs + + def generate_dependencies_list(self): + dependencies = list() + + selectors_config = self.process_config() + + for config in selectors_config: + #self.log(f"config: name = {config.name},conditions = {config.conditions},value = {config.value}") + conditions = SmartCondition.Evaluator(self.ad_api,config.conditions, logger_interface = self.logger_interface,condition_name = config.name, sensor_existence_validation = False, constants = self.constants, log_init = False) + self.extract_dependencies_from_entities_to_listen(conditions.get_entities_to_listen_as_list(),dependencies) + + if config.is_smart_value: + value = SmartValue.Evaluator(self.ad_api,config.value,expression_name = config.name, sensor_existence_validation = False, constants = self.constants, log_init = False) + self.extract_dependencies_from_entities_to_listen(value.get_entities_to_listen_as_list(),dependencies) + + return dependencies + + def on_value_condition_change(self, *kwargs): + self.update_value() + + def set_value(self, value): + if self.sensor_domain == 'number': + self.ad_api.call_service("number/set_value", entity_id = self.sensor_name,value = value) + else: + self.output_sensor.set_state(state = value) + + def update_value(self): + previous_value = str(self.output_sensor.get_state()) + + for selector in self.value_selectors: + if selector.conditions.evaluate(False) == SmartCondition.Result.Succeeded: + new_value = selector.compute_value() + if previous_value != str(new_value): + #TODO: find a way to know wether if it's the condition or the value that trigger the update + selector.conditions.log_callback_trigger_reason() + selector.conditions.log_evaluation_result() + if selector.value: + selector.value.log_callback_trigger_reason() + selector.value.log_evaluation_result() + self.set_value(new_value) + break + +class ContinuousCondition(VirtualSensorBase): + def initialize(self): + self.cb_handle = None + self.time = self.yaml_block['time'] + try: self.ignore_unavailable_condition = self.yaml_block['ignore_unavailable_condition'] + except KeyError: self.ignore_unavailable_condition = False + try: self.invert = self.yaml_block['invert'] + except KeyError: self.invert = False + self.log(f"Initializing continuous condition for {self.sensor_name}") + self.output_sensor = self.ad_api.get_entity(self.sensor_name) + + if not self.output_sensor.exists(): + self.log(f"{self.sensor_name} does not exist") + try: + self.output_sensor.set_state(state = self.yaml_block['default_value']) + self.log(f"adding {self.sensor_name} with default state {self.yaml_block['default_value']}") + except KeyError: + self.output_sensor.set_state(state = 'off') + self.log(f"adding {self.sensor_name} with default state off") + else: + self.log(f"{self.sensor_name} already exist with state = {self.output_sensor.get_state()}") + + if self.invert: + succeed_cb = self.on_condition_fail + fail_cb = self.on_condition_succeed + else: + succeed_cb = self.on_condition_succeed + fail_cb = self.on_condition_fail + + self.condition = SmartCondition.Evaluator(self.ad_api,self.yaml_block['conditions'], unavaibility_result = SmartCondition.Result.Unavailable if self.ignore_unavailable_condition else None, logger_interface = self.logger_interface,condition_name = self.sensor_name,on_succeed_cb = succeed_cb, on_fail_cb = fail_cb, constants = self.constants, templates_library = self.templates_library) + # new_value = 'on' if self.condition.evaluate() == SmartCondition.Result.Succeeded else 'off' + # self.output_sensor.set_state(state = new_value) + + def generate_sensor_name(self,virtual_sensor_name): + return f"binary_sensor.{virtual_sensor_name}" + #TODO : restore that code once we rename the sensor + splitted_name = virtual_sensor_name.split(".") + assert len(splitted_name) == 2,f"Invalid virtual sensor name : {virtual_sensor_name}" + return f"binary_sensor.{splitted_name[1]}" + + def generate_dependencies_list(self): + dependencies = list() + condition = SmartCondition.Evaluator(self.ad_api,self.yaml_block['conditions'], logger_interface = self.logger_interface, constants = self.constants, templates_library = self.templates_library, sensor_existence_validation = False, log_init = False) + self.extract_dependencies_from_entities_to_listen(condition.get_entities_to_listen_as_list(),dependencies) + + return dependencies + + def register_callback(self): + if self.cb_handle == None: + self.cb_handle = self.ad_api.run_in(self.on_time_elapse, self.time) + + def cancel_callback(self): + if self.cb_handle != None: + self.ad_api.cancel_timer(self.cb_handle) + self.cb_handle = None + + def on_time_elapse(self, kwargs): + self.cb_handle = None + value = 'off' if self.invert else 'on' + if self.output_sensor.get_state() != value: + self.output_sensor.set_state(state = value) + + def on_condition_succeed(self): + self.register_callback() + + def on_condition_fail(self): + self.cancel_callback() + value = 'on' if self.invert else 'off' + if self.output_sensor.get_state() != value: + self.output_sensor.set_state(state = value) + +class RetainCondition(VirtualSensorBase): + def initialize(self): + self.cb_handle = None + self.retain_time = self.yaml_block['retain_time'] + try: self.reset_retain_time_on_new_success = self.yaml_block['reset_retain_time_on_new_success'] + except KeyError: self.reset_retain_time_on_new_success = True + self.log(f"Initializing retain condition for {self.sensor_name}") + self.output_sensor = self.ad_api.get_entity(self.sensor_name) + + if not self.output_sensor.exists(): + self.log(f"{self.sensor_name} does not exist") + try: + self.output_sensor.set_state(state = self.yaml_block['default_value']) + self.log(f"adding {self.sensor_name} with default state {self.yaml_block['default_value']}") + except KeyError: + self.output_sensor.set_state(state = 'off') + self.log(f"adding {self.sensor_name} with default state off") + else: + self.log(f"{self.sensor_name} already exist with state = {self.output_sensor.get_state()}") + + self.condition = SmartCondition.Evaluator(self.ad_api,self.yaml_block['conditions'], logger_interface = self.logger_interface,condition_name = self.sensor_name,on_succeed_cb = self.on_condition_succeed, on_fail_cb = self.on_condition_fail, constants = self.constants, templates_library = self.templates_library) + # new_value = 'on' if self.condition.evaluate() == SmartCondition.Result.Succeeded else 'off' + # self.output_sensor.set_state(state = new_value) + + def generate_sensor_name(self,virtual_sensor_name): + return f"binary_sensor.{virtual_sensor_name}" + + def generate_dependencies_list(self): + dependencies = list() + condition = SmartCondition.Evaluator(self.ad_api,self.yaml_block['conditions'], logger_interface = self.logger_interface, constants = self.constants, templates_library = self.templates_library, sensor_existence_validation = False, log_init = False) + self.extract_dependencies_from_entities_to_listen(condition.get_entities_to_listen_as_list(),dependencies) + + return dependencies + + def register_callback(self): + if self.cb_handle == None: + self.cb_handle = self.ad_api.run_in(self.on_time_elapse, self.retain_time) + + def cancel_callback(self): + if self.cb_handle != None: + self.ad_api.cancel_timer(self.cb_handle) + self.cb_handle = None + + def on_time_elapse(self, kwargs): + self.cb_handle = None + if self.condition.evaluate() == SmartCondition.Result.Failed: + if self.output_sensor.get_state() != 'off': + self.output_sensor.set_state(state = 'off') + + def on_condition_succeed(self): + if self.output_sensor.get_state() != 'on': + self.output_sensor.set_state(state = 'on') + if self.reset_retain_time_on_new_success: + self.cancel_callback() + self.register_callback() + + def on_condition_fail(self): + # we don't want to set the sensor to off during the timer + if self.cb_handle == None: + if self.output_sensor.get_state() != 'off': + self.output_sensor.set_state(state = 'off') + +class BinarySensor(VirtualSensorBase): + def initialize(self): + self.output_sensor = self.ad_api.get_entity(self.sensor_name) + + if not self.output_sensor.exists(): + self.output_sensor.set_state(state = 'off',attributes = self.attributes) + + #self.ad_api.log(f"Creating Binary Sensor: binary_sensor = {self.virtual_sensor_name}, yaml_block = {self.yaml_block}") + self.condition = SmartCondition.Evaluator(self.ad_api,self.get_expression_yaml(), logger_interface = self.logger_interface, unavaibility_result = SmartCondition.Result.Unavailable,condition_name = self.virtual_sensor_name,on_change_cb = self.on_sensor_change, constants = self.constants, templates_library = self.templates_library) + new_value = 'on' if self.condition.evaluate() == SmartCondition.Result.Succeeded else 'off' + self.output_sensor.set_state(state = new_value) + + def on_sensor_change(self,prev_result,result): + if result == SmartCondition.Result.Unavailable: self.output_sensor.set_state(state = 'unavailable') + elif result == SmartCondition.Result.Succeeded: self.output_sensor.set_state(state = 'on') + elif result == SmartCondition.Result.Failed: self.output_sensor.set_state(state = 'off') + # else: Disabled means we keep the previous value + + def generate_dependencies_list(self): + dependencies = list() + condition = SmartCondition.Evaluator(self.ad_api,self.get_expression_yaml(), logger_interface = self.logger_interface, condition_name = [self.virtual_sensor_name,"generate_dependencies_list"],constants = self.constants, templates_library = self.templates_library, sensor_existence_validation = False, log_init = False) + self.extract_dependencies_from_entities_to_listen(condition.get_entities_to_listen_as_list(),dependencies) + + return dependencies + + def get_expression_yaml(self): + try: return self.yaml_block['conditions'] + except (TypeError,KeyError): return self.yaml_block + + def generate_sensor_name(self,virtual_sensor_name): + return f"binary_sensor.{virtual_sensor_name}" + #TODO : restore that code once we rename the sensor + splitted_name = virtual_sensor_name.split(".") + assert len(splitted_name) == 2,f"Invalid virtual sensor name : {virtual_sensor_name}" + return f"binary_sensor.{splitted_name[1]}" + +class ValueSensor(VirtualSensorBase): + def initialize(self): + # should be done in the base class + # try: self.sensor_attributes = self.yaml_block['attributes'] + # except TypeError: self.sensor_attributes = {} + + self.output_sensor = self.ad_api.get_entity(self.sensor_name) + if not self.output_sensor.exists(): + self.output_sensor.set_state(state = 0, attributes = self.attributes) + + self.smart_value = SmartValue.Evaluator(self.ad_api,self.get_expression_yaml(), logger_interface = self.logger_interface, expression_name = self.virtual_sensor_name,on_change_cb = self.on_sensor_change, constants = self.constants) + new_value = self.smart_value.evaluate() + self.output_sensor.set_state(state = new_value) + + def on_sensor_change(self,prev_result,result): + self.output_sensor.set_state(state = result, attributes = self.attributes) + + def get_expression_yaml(self): + try: return self.yaml_block['value'] + except (TypeError,KeyError): return self.yaml_block + + def generate_dependencies_list(self): + dependencies = list() + smart_value = SmartValue.Evaluator(self.ad_api,self.get_expression_yaml(), logger_interface = self.logger_interface, expression_name = self.virtual_sensor_name, constants = self.constants, sensor_existence_validation = False, log_init = False) + self.extract_dependencies_from_entities_to_listen(smart_value.get_entities_to_listen_as_list(),dependencies) + + return dependencies + + def generate_sensor_name(self,virtual_sensor_name): + return f"sensor.{virtual_sensor_name}" + +class VirtualSensors(): + + def __init__(self, ad_api = None,logger_interface = None, super_entity_id = None, yaml_block = None ,args_to_ignore_in_validation = [], constants = None, templates_library = None): + assert ad_api + assert yaml_block + assert logger_interface + self.ad_api = ad_api + self.virtual_sensors = dict() + self.logger_interface = logger_interface + + parser = kwargsParser(yaml_block,lambda string: logger_interface.log_error(string),lambda string: logger_interface.log_warning(string)) + + default_values = parser.parse_args('default_values',{}) + for sensor_name,default_value in default_values.items(): + if not self.ad_api.entity_exists(sensor_name): + self.logger_interface.log_info(f"Creating sensor {sensor_name} = {default_value}") + self.ad_api.set_state(sensor_name, state = str(default_value)) + + averagers = parser.parse_args('averagers',{}) + for averager in averagers: + self.virtual_sensors[f"sensor.{averager}"] = Averager(self.ad_api,self.logger_interface,averager,super_entity_id = super_entity_id,yaml_block = yaml_block['averagers'][averager],constants = constants, templates_library = templates_library) + + continuous_conditions = parser.parse_args('continuous_conditions',{}) + for continuous_condition in continuous_conditions: + self.virtual_sensors[f"binary_sensor.{continuous_condition}"] = ContinuousCondition(self.ad_api,self.logger_interface,continuous_condition,super_entity_id = super_entity_id,yaml_block = yaml_block['continuous_conditions'][continuous_condition],templates_library = templates_library) + + binary_sensors = parser.parse_args('binary_sensors',{}) + for binary_sensor in binary_sensors: + #self.logger_interface.log_info(f"Creating Binary Sensor: binary_sensor = {binary_sensor}, super_entity_id = {super_entity_id}, yaml_block = {yaml_block['binary_sensors']}") + self.virtual_sensors[f"binary_sensor.{binary_sensor}"] = BinarySensor(self.ad_api,self.logger_interface,binary_sensor,super_entity_id = super_entity_id,yaml_block = yaml_block['binary_sensors'][binary_sensor],constants = constants,templates_library = templates_library) + + value_selectors = parser.parse_args('value_selectors',{}) + for value_selector in value_selectors: + self.virtual_sensors[f"sensor.{value_selector}"] = ValueSelector(self.ad_api,self.logger_interface,value_selector,super_entity_id = super_entity_id,yaml_block = yaml_block['value_selectors'][value_selector],constants = constants,templates_library = templates_library) + + sensors = parser.parse_args('sensors',{}) + for sensor in sensors: + splitted_sensor = sensor.split('.') + if len(splitted_sensor) != 2: + self.logger_interface.log_error(f"Invalid sensor name {splitted_sensor}") + + if splitted_sensor[0] == 'binary_sensor': + self.virtual_sensors[f"binary_sensor.{splitted_sensor[1]}"] = BinarySensor(self.ad_api,self.logger_interface,splitted_sensor[1],super_entity_id = super_entity_id,yaml_block = yaml_block['sensors'][sensor],constants = constants,templates_library = templates_library) + elif splitted_sensor[0] == 'sensor': + self.virtual_sensors[f"sensor.{splitted_sensor[1]}"] = ValueSensor(self.ad_api,self.logger_interface,splitted_sensor[1],super_entity_id = super_entity_id,yaml_block = yaml_block['sensors'][sensor],constants = constants,templates_library = templates_library) + elif splitted_sensor[0] == 'continuous_condition': + self.virtual_sensors[f"binary_sensor.{splitted_sensor[1]}"] = ContinuousCondition(self.ad_api,self.logger_interface,splitted_sensor[1],super_entity_id = super_entity_id,yaml_block = yaml_block['sensors'][sensor],constants = constants,templates_library = templates_library) + elif splitted_sensor[0] == 'averager': + self.virtual_sensors[f"sensor.{splitted_sensor[1]}"] = Averager(self.ad_api,self.logger_interface,splitted_sensor[1],super_entity_id = super_entity_id,yaml_block = yaml_block['sensors'][sensor],constants = constants,templates_library = templates_library) + elif splitted_sensor[0] == 'value_selector': + self.virtual_sensors[f"sensor.{splitted_sensor[1]}"] = ValueSelector(self.ad_api,self.logger_interface,splitted_sensor[1],super_entity_id = super_entity_id,yaml_block = yaml_block['sensors'][sensor],constants = constants,templates_library = templates_library) + elif splitted_sensor[0] == 'retain_condition': + self.virtual_sensors[f"binary_sensor.{splitted_sensor[1]}"] = RetainCondition(self.ad_api,self.logger_interface,splitted_sensor[1],super_entity_id = super_entity_id,yaml_block = yaml_block['sensors'][sensor],constants = constants,templates_library = templates_library) + else: + self.logger_interface.log_error(f"Invalid sensor prefix {splitted_sensor[0]}") + + parser.validate_args(args_to_ignore_in_validation) + + dependencies_graph = dict() + for sensor_name, virtual_sensor in self.virtual_sensors.items(): + try: + dependencies_graph[sensor_name] = virtual_sensor.generate_dependencies_list() + # add the dependencies from the attributes + dependencies_graph[sensor_name].extend(virtual_sensor.generate_attributes_dependencies_list()) + #self.logger_interface.log_info(f"Dependencies = {dependencies_graph}") + except ParsingException as e: + self.logger_interface.log_error(f"[Dependencies][{sensor_name}] {e}") + + self.remove_obsolete_dependencies(dependencies_graph) + + while len(dependencies_graph) > 0: + virtual_sensor_initialized = list() + #self.logger_interface.log_info(f"Dependencies = {dependencies_graph}") + + #initialize sensor with no dependencies + for sensor_name, dependencies_list in dependencies_graph.items(): + if len(dependencies_list) == 0: + virtual_sensor_initialized.append(sensor_name) + self.virtual_sensors[sensor_name]._initialize() + + #removing initialized sensors from dependencies_graph + for sensor_name in virtual_sensor_initialized: + del dependencies_graph[sensor_name] + + #if didn't initialized any sensors, we are in a circular dependencies + ignore_sensor = None + if len(virtual_sensor_initialized) == 0: + foundInDefaultValues = False + for sensor_name in dependencies_graph: + if sensor_name in default_values: + self.logger_interface.log_info(f"Found circular dependencies in the following sensors : {dependencies_graph}. The default values of {sensor_name} will be use to break it.") + ignore_sensor = sensor_name + foundInDefaultValues = True + break + + if not foundInDefaultValues: + self.logger_interface.log_error(f"Found circular dependencies in the following sensors : {dependencies_graph}. You can give a default value with 'default_values' to one of them to break it.") + break + + #remove all dependencies towards sensors that are already been initialized + self.remove_obsolete_dependencies(dependencies_graph,ignore_sensor) + + def remove_obsolete_dependencies(self, dependencies_graph, ignore_sensor = None): + for sensor_name, dependencies_list in dependencies_graph.items(): + current_dependencies = list() + for dependency in dependencies_list: + if dependency in dependencies_graph and dependency != ignore_sensor: + current_dependencies.append(dependency) + dependencies_graph[sensor_name] = current_dependencies + + + def on_sensor_change(self,sensor_name,prev_result,result): + if result == SmartCondition.Result.Succeeded: + self.ad_api.set_state(f'binary_sensor.{sensor_name}',state = 'on') + else : + self.ad_api.set_state(f'binary_sensor.{sensor_name}',state = 'off') + +class VirtualSensorsApp(hass.Hass): + def initialize(self): + self.logger_interface = LoggerInterface(self.get_ad_api(),default_log = "virtualsensors_log") + self.virtual_sensors = VirtualSensors(ad_api = self.get_ad_api(),logger_interface = self.logger_interface,yaml_block = self.args,args_to_ignore_in_validation = ['module','class','global_dependencies','priority']) \ No newline at end of file