Files
ad_toolbox/adexpressioncontext.py

170 lines
7.5 KiB
Python

import pyparsing as pp
from expressionparser import BaseExpressionContext
class ADExpressionContext(BaseExpressionContext):
    """Expression context whose variables are constants or AppDaemon sensors.

    A variable string is parsed as one of:

    * a declared constant name (see ``declare_constant``),
    * a sensor, written ``word.word`` (e.g. ``sensor.temperature``) or as a
      declared sensor alias,
    * a sensor followed by ``.attribute``.

    Sensor values returned by ``get_variable_value`` are cached in
    ``sensor_states`` until ``reset`` is called.
    """

    def __init__(self, appdaemon_api, sensor_existence_validation=True):
        """Create the context and declare the ``True``/``False``/``None`` constants.

        Args:
            appdaemon_api: object providing ``get_state`` and ``entity_exists``.
            sensor_existence_validation: when true, ``add_variable`` rejects a
                sensor that has no declared default value and for which
                ``entity_exists`` is false.
        """
        self.appdaemon_api = appdaemon_api
        # name -> value, for both plain constants and sensor aliases
        self.variables = {}
        # names whose value does not look like a sensor id
        self.constants = []
        # names whose value looks like "word.word"
        self.sensor_aliases = []
        # built lazily by evaluate_variable_expression
        self.parser = None
        # sensor name -> list of attributes (None stands for the state itself)
        self.entities_to_listen = {}
        # variable string -> parsed Evaluate* object
        self.variables_parsed_data = {}
        # sensor name -> value used when the entity does not exist
        self.sensors_default_value = {}
        # cache: "sensor" or "sensor.attribute" -> last value read
        self.sensor_states = {}
        self.sensor_existence_validation = sensor_existence_validation
        self.declare_default_constants()

    def reset(self):
        """Drop every cached sensor value."""
        self.sensor_states = {}

    def declare_default_constants(self):
        """Declare the ``True``, ``False`` and ``None`` constants."""
        self.declare_constant("True", True)
        self.declare_constant("False", False)
        self.declare_constant("None", None)

    class EvaluateBase:
        """Base of the objects produced by the parse actions."""

        def get_value(self, context):
            """Return the value of the parsed element (``None`` by default)."""
            return None

        def get_sensor_name_with_attribute(self, context):
            """Return ``(sensor_name, attribute)``, or ``None`` if not a sensor."""
            return None

    class EvaluateConstant(EvaluateBase):
        """Parsed reference to a declared constant or sensor alias name."""

        def __init__(self, tokens):
            assert len(tokens) == 1
            self.constant_name = tokens[0]

        def get_value(self, context):
            """Return the value registered in ``context`` for this name."""
            return context.get_constant_value(self.constant_name)

    class EvaluateSensor(EvaluateBase):
        """Parsed reference to a sensor state (alias or ``word.word``)."""

        def __init__(self, tokens):
            assert len(tokens) == 1
            # Either an EvaluateConstant (alias) or the grouped
            # [word, ".", word] tokens.
            self.tokens = tokens[0]

        def get_value(self, context):
            """Read the state (or attribute) directly through the API, uncached."""
            sensor_name, attribute = self.get_sensor_name_with_attribute(context)
            if attribute is not None:
                return context.appdaemon_api.get_state(sensor_name, attribute=attribute)
            return context.appdaemon_api.get_state(sensor_name)

        def get_sensor_name_with_attribute(self, context):
            """Return ``(sensor_name, None)``, resolving an alias if needed."""
            if isinstance(self.tokens, ADExpressionContext.EvaluateConstant):
                sensor_name = self.tokens.get_value(context)
            else:
                assert len(self.tokens) == 3
                sensor_name = f"{self.tokens[0]}{self.tokens[1]}{self.tokens[2]}"
            # None is for the actual state of the sensor, not an attribute
            return (sensor_name, None)

    class EvaluateAttribute(EvaluateSensor):
        """Parsed reference to a sensor attribute (``sensor.attribute``)."""

        def get_sensor_name_with_attribute(self, context):
            """Return ``(sensor_name, attribute_name)``."""
            # tokens is [EvaluateSensor, ".", attribute_name]
            sensor_name = self.tokens[0].get_sensor_name_with_attribute(context)[0]
            return (sensor_name, self.tokens[2])

    def create_variable_parser(self):
        """Build the pyparsing grammar from the currently declared names.

        Alternatives are tried in this order: sensor with attribute, constant,
        plain sensor.
        """
        separator = pp.Literal(".")
        word = pp.Word(pp.alphanums + "_")
        constant = pp.one_of(self.constants).set_parse_action(self.EvaluateConstant)
        sensor_alias = pp.one_of(self.sensor_aliases).set_parse_action(self.EvaluateConstant)
        sensor = sensor_alias | pp.Group(word + separator + word)
        sensor.set_parse_action(self.EvaluateSensor)
        sensor_with_attribute = pp.Group(sensor + separator + word)
        sensor_with_attribute.set_parse_action(self.EvaluateAttribute)
        return sensor_with_attribute | constant | sensor

    def add_variable(self, variable_name):
        """Parse ``variable_name`` and register it with this context.

        For a sensor, the sensor name and the attribute (``None`` for the
        state) are recorded in ``entities_to_listen``.

        Returns:
            ``False`` when the string cannot be parsed, or when existence
            validation is enabled and the sensor neither has a default value
            nor exists according to ``entity_exists``; ``True`` otherwise.
        """
        parsed_data = self.evaluate_variable_expression(variable_name)
        # Couldn't parse the variable: it's either an unknown constant or a
        # badly formed string.
        if parsed_data is None:
            return False

        sensor_name_with_attribute = parsed_data.get_sensor_name_with_attribute(self)
        self.variables_parsed_data[variable_name] = parsed_data
        if sensor_name_with_attribute is None:
            return True

        sensor_name = sensor_name_with_attribute[0]
        attribute = sensor_name_with_attribute[1]
        if (
            self.sensor_existence_validation
            and sensor_name not in self.sensors_default_value
            and not self.appdaemon_api.entity_exists(sensor_name)
        ):
            return False

        attributes = self.entities_to_listen.setdefault(sensor_name, [])
        if attribute not in attributes:
            attributes.append(attribute)
        return True

    def get_variable_value(self, variable_name):
        """Return the current value of a variable registered by ``add_variable``.

        Constants are evaluated directly. Sensor states and attributes are
        served from ``sensor_states`` when present, otherwise read through the
        API and cached.

        Raises:
            KeyError: if ``variable_name`` was never registered.
        """
        parsed_data = self.variables_parsed_data[variable_name]
        sensor_name_with_attribute = parsed_data.get_sensor_name_with_attribute(self)
        if sensor_name_with_attribute is None:
            return parsed_data.get_value(self)

        sensor_name = sensor_name_with_attribute[0]
        attribute = sensor_name_with_attribute[1]
        key_name = sensor_name if attribute is None else f"{sensor_name}.{attribute}"
        # Membership test (not .get) because None is a legitimate cached value.
        if key_name in self.sensor_states:
            return self.sensor_states[key_name]

        api = self.appdaemon_api
        if attribute is not None:
            value = api.get_state(sensor_name, attribute=attribute)
            # Accessing an attribute of an unavailable entity should return
            # 'unavailable', whereas accessing an attribute that doesn't exist
            # returns None. This early return is not cached.
            if value is None and api.get_state(sensor_name) == 'unavailable':
                return 'unavailable'
        elif api.entity_exists(sensor_name):
            value = api.get_state(sensor_name)
        else:
            # Normally a default value was declared for a missing entity; when
            # none was (e.g. existence validation disabled) fall back to None
            # instead of raising KeyError.
            value = self.sensors_default_value.get(sensor_name)
        self.sensor_states[key_name] = value
        return value

    def get_entities_to_listen(self):
        """Return the mapping sensor name -> list of attributes to listen to."""
        return self.entities_to_listen

    def get_constant_value(self, name):
        """Return the value declared for ``name`` (KeyError if undeclared)."""
        return self.variables[name]

    def declare_constant(self, name: str, value):
        """Declare a named constant, or a sensor alias if ``value`` looks like one.

        ``value`` is treated as a sensor id when it is a string made of exactly
        two ``.``-separated parts, each ASCII alphanumeric/underscore. The
        cached parser is discarded so the new name is taken into account.

        Raises:
            AssertionError: if ``name`` is already declared or is not an ASCII
                alphanumeric/underscore identifier.
        """
        def is_a_sensor(candidate):
            if not isinstance(candidate, str):
                return False
            parts = candidate.split('.')
            if len(parts) != 2:
                return False
            return all(
                part.replace('_', '').isalnum() and part.isascii()
                for part in parts
            )

        assert name not in self.variables, f"Constant {name} already exist with value {self.variables[name]}"
        assert name.replace('_', '').isalnum() and name.isascii(), f"Invalid constant name: {name}"
        self.variables[name] = value
        if is_a_sensor(value):
            self.sensor_aliases.append(name)
        else:
            self.constants.append(name)
        self.parser = None  # adding a new constant requires rebuilding the parser

    def declare_sensor_alias_default_value(self, alias, default_value):
        """Set the value used for the sensor behind ``alias`` when it does not exist.

        Raises:
            AssertionError: if ``alias`` is undeclared or the sensor already
                has a default value.
        """
        assert alias in self.variables, f"Unknown alias {alias} please declare it first"
        sensor_name = self.variables[alias]
        assert sensor_name not in self.sensors_default_value, f"{sensor_name} has already a default value {self.sensors_default_value[sensor_name]}"
        self.sensors_default_value[sensor_name] = default_value

    def evaluate_variable_expression(self, expression_string):
        """Parse ``expression_string`` and return the resulting Evaluate* object.

        The parser is (re)built on first use after any ``declare_constant``.
        Returns ``None`` when the whole string does not match the grammar.
        """
        if self.parser is None:
            self.parser = self.create_variable_parser()
        try:
            return self.parser.parse_string(expression_string, parse_all=True)[0]
        except pp.ParseException:
            return None