migration from ha-cantegrill repo

This commit is contained in:
2024-05-31 14:50:50 +02:00
commit 45376aebcb
20 changed files with 2998 additions and 0 deletions

354
expressionparser.py Normal file
View File

@@ -0,0 +1,354 @@
import pyparsing as pp
import threading
import datetime
'''Base ExpressionContext for the parser
MUST be implemented :
get_variable_value(self,variable_name) is called when evaluating the expression to retrieve the value of a variable
optional:
add_variable(self,variable_name) is called when a variable is encounter during the parsing of the expression
reset(self) is called before a new evaluation
'''
class BaseExpressionContext:
def add_variable(self,variable_name): return True
def get_variable_value(self,variable_name): return None
def reset(self): pass
class ParsingException(Exception):
def __init__(self, message = None, expression_string = None, char_pos = -1):
self.char_pos = char_pos
self.message = message
self.expression_string = expression_string
def __str__(self):
string = ""
if self.expression_string:
string += f'Error parsing string "{self.expression_string}"'
if self.char_pos >= 0:
string += f' at character {self.char_pos}'
string += ':'
elif self.char_pos >= 0:
string += f'Parsing error at character {self.char_pos}'
if self.message:
string += self.message
return string
class EvaluationException(Exception):
def __init__(self, message = None, expression_string = None, char_pos = -1, operands = []):
self.char_pos = char_pos
self.message = message
self.expression_string = expression_string
self.operands = operands
def __str__(self):
string = ""
if self.expression_string:
string += f'Error evaluating "{self.expression_string}"'
if self.char_pos >= 0:
string += f' at character {self.char_pos}'
string += ':'
elif self.char_pos >= 0:
string += f'Parsing error at character {self.char_pos}'
if self.message:
string += self.message
return string
class EvaluateBase(object):
def eval_bool(self) : return self.convert_to_bool(self.eval())
@staticmethod
def convert_to_bool(value):
if value == 'off': return False
if value == 0: return False
if value == 'False': return False
if value == 'false': return False
if value == False: return False
if value == None: return False
# this should be handle explicitely by the condition
# edit: this is actually usefull if don't want disable to be evaluated as true
if value == 'unavailable': return False
return True
@staticmethod
def convert_python_type(value):
if value == None: return None
if type(value) == str:
#converting to bool
if value == 'False': return False
if value == 'false': return False
#if value == False: return False
if value == 'True': return True
if value == 'true': return True
#if value == True: return True
#Converting "None" to None
if value == "none" or value == "None": return None
#converting to int
try: return int(value)
except: pass
#converting to float
try: return float(value)
except: pass
#converting to datetime
try: return datetime.datetime.strptime(value,"%Y-%m-%dT%H:%M:%S.%f%z")
except: pass
# this should really be a string
return value
else:
#it's probably already been converted
return value
class EvaluateOperand(EvaluateBase):
def __init__(self, expression_string: str, location: int, tokens: pp.ParseResults):
self.value = tokens[0]
def __str__(self): return str(self.eval())
def eval(self): pass
def set_context(self,context): pass
class EvaluateNumber(EvaluateOperand):
def eval(self): return EvaluateBase.convert_python_type(self.value)
# try: return float(self.value)
# except: return float('nan')
class EvaluateString(EvaluateOperand):
def eval(self): return str(self.value)
class EvaluateVariable(EvaluateOperand):
def __init__(self, expression_string: str, location: int, tokens: pp.ParseResults):
super().__init__(expression_string, location, tokens)
self.context : BaseExpressionContext = None
self.expression_string = expression_string
self.location = location
def eval(self):
return_value = self.context.get_variable_value(self.value)
return EvaluateBase.convert_python_type(return_value)
# if return_value == None:
# return None
# if type(return_value) == bool: return return_value
# try: return_value = float(return_value)
# except: pass
# return return_value
def __str__(self): return f"{self.value}[{self.eval()}]"
def set_context(self,context : BaseExpressionContext):
self.context = context
if not self.context.add_variable(self.value):
raise ParsingException(message = f"Unknown variable : {self.value}",char_pos= self.location, expression_string=self.expression_string)
def opPair(tokenlist):
#Return pairs of tokens to the caller as a generator.
it = iter(tokenlist)
while 1:
try: yield(next(it), next(it))
except StopIteration: break
class EvaluateOperator(EvaluateBase):
def __init__(self, expression_string: str, location: int, tokens: pp.ParseResults):
self.value = tokens[0]
self.expression_string = expression_string
self.location = location
def set_context(self,context):
for element in self.value:
if isinstance(element,EvaluateBase): element.set_context(context)
class EvaluatSignOp(EvaluateOperator):
def eval(self):
assert(len(self.value) == 2)
if self.value[0] == '-':
return -self.value[1].eval()
else:
return self.value[1].eval()
class EvaluateNot(EvaluateOperator):
operators = {
"not": ["not", lambda a: not EvaluateNot.convert_to_bool(a)]
}
def eval(self):
for op, val in opPair(self.value[0:]):
fn = EvaluateNot.operators[op][1]
val2 = val.eval_bool()
val = fn(val2)
return val
def __str__(self):
return " ".join([str(v) for v in self.value])
class EvaluateBinaryOp(EvaluateOperator):
def eval(self):
val1 = self.value[0].eval()
for op, val in opPair(self.value[1:]):
fn = self.operators[op][1]
if not self.should_skip_second_operand(op,val1):
val2 = val.eval()
try: val1 = fn(val1, val2)
except TypeError: raise EvaluationException(message = f"Couldn't not evaluate '{val1} {op} {val2}'",char_pos= self.location, expression_string=self.expression_string,operands = [val1,val2])
return val1
def should_skip_second_operand(self,operator,first_value): return False
def __str__(self):
string = "("
val1 = str(self.value[0])
for op, val in opPair(self.value[1:]):
fn = self.operators[op][0]
if not self.should_skip_second_operand(op,self.value[0].eval()):
val2 = val
else: val2 = "[Skipped]"
string += f"{val1} {fn} {val2}"
evaluation_result = self.eval()
try: evaluation_result = self.eval()
except TypeError: evaluation_result = "Evaluation Error"
return string + f")[{evaluation_result}]"
class EvaluateMulOp(EvaluateBinaryOp):
operators = {
"*": ["*", lambda a, b: a * b],
"/": ["/", lambda a, b: a / b],
}
class EvaluatePlusOp(EvaluateBinaryOp):
operators = {
"+": ["+", lambda a, b: a + b],
"-": ["-", lambda a, b: a - b],
}
class EvaluateComparison(EvaluateBinaryOp):
operators = {
"<": ["<", lambda a, b: EvaluateBase.convert_python_type(a) < EvaluateBase.convert_python_type(b)],
">": [">", lambda a, b: EvaluateBase.convert_python_type(a) > EvaluateBase.convert_python_type(b)],
">=": [">=", lambda a, b: EvaluateBase.convert_python_type(a) >= EvaluateBase.convert_python_type(b)],
"<=": ["<=", lambda a, b: EvaluateBase.convert_python_type(a) <= EvaluateBase.convert_python_type(b)],
"==": ["==", lambda a, b: EvaluateBase.convert_python_type(a) == EvaluateBase.convert_python_type(b)],
"!=": ["!=", lambda a, b: EvaluateBase.convert_python_type(a) != EvaluateBase.convert_python_type(b)],
}
class EvaluateOrAnd(EvaluateBinaryOp):
operators = {
"and": ["and", lambda a, b: EvaluateBase.convert_to_bool(a) and EvaluateBase.convert_to_bool(b)],
"or": ["or", lambda a, b: EvaluateBase.convert_to_bool(a) or EvaluateBase.convert_to_bool(b)],
}
def should_skip_second_operand(self,operator,first_value):
if first_value == False and operator == "and":
return True
if first_value == True and operator == "or":
return True
return False
class EvaluateTernary(EvaluateOperator):
def eval(self):
ret = self.value[0].eval_bool()
i = 1
while i < len(self.value):
ret = self.value[i+1].eval() if ret else self.value[i+3].eval()
i += 4
return ret
def __str__(self):
condition = self.value[0].eval_bool()
return f"({self.value[0]} {self.value[1]} {self.value[2] if condition else '[Skipped]'} {self.value[3]} {self.value[4] if not condition else '[Skipped]'})[{self.eval()}]"
def create_parser():
pp.ParserElement.enablePackrat()
#sys.setrecursionlimit(3000)
#floatNumber = pp.Regex(r'[-]?\d+(\.\d*)?([eE][-+]?\d+)?').setParseAction(EvaluateNumber)
floatNumber = pp.pyparsing_common.real.setParseAction(EvaluateNumber)
intNumber = pp.pyparsing_common.signed_integer.setParseAction(EvaluateNumber)
variable = pp.Word(pp.alphanums + "_.").setParseAction(EvaluateVariable)
string = pp.QuotedString(quoteChar="'").setParseAction(EvaluateString)
operand = floatNumber | intNumber | variable | string
signop = pp.oneOf(("+", "-"))
multop = pp.oneOf(("*", "/"))
plusop = pp.oneOf(("+", "-"))
comparisonOp = pp.oneOf(("<", "<=", ">", ">=" ,"==", "!="))
notOp = pp.Keyword("not")
andOp = pp.Keyword("and")
orOp = pp.Keyword("or")
#boolOp = pp.oneOf(("and" , "or"))
#https://stackoverflow.com/questions/63225718/pyparsing-precedence-breaks-with-unary-operator
expr = pp.infixNotation(
operand,
[
(signop, 1, pp.opAssoc.RIGHT,EvaluatSignOp),
(multop, 2, pp.opAssoc.LEFT,EvaluateMulOp),
(plusop, 2, pp.opAssoc.LEFT,EvaluatePlusOp),
(comparisonOp,2,pp.opAssoc.LEFT,EvaluateComparison),
(notOp,1,pp.opAssoc.RIGHT,EvaluateNot),
(andOp,2,pp.opAssoc.LEFT,EvaluateOrAnd),
(orOp,2,pp.opAssoc.LEFT,EvaluateOrAnd),
(("?", ":"), 3, pp.opAssoc.RIGHT,EvaluateTernary)
],)
return expr
class ExpressionParser(object):
#the parser need to be created only once
parser = create_parser()
parser_lock = threading.RLock() #the parser can be access from many app at the same time
def __init__(self, expression_string : str, expression_context : BaseExpressionContext):
self.context = expression_context
self.expression_string = expression_string #let's keep it for debuging
with ExpressionParser.parser_lock:
try: self.parsed_data = ExpressionParser.parser.parseString(expression_string,parse_all=True)[0]
except pp.ParseException as e: raise ParsingException(message = str(e),char_pos=e.column,expression_string=expression_string)
# except KeyError as e:
# import traceback
# message = str(e)
# for line in traceback.format_stack()[:-1]:
# message += line.strip()
# raise ParsingException(message = message,char_pos=-1,expression_string=expression_string)
self.parsed_data.set_context(self.context)
def __str__(self):
return str(self.parsed_data)
def evaluate(self):
self.context.reset()
result = self.parsed_data.eval()
return result
class ConditionsParser(ExpressionParser):
def evaluate(self):
self.context.reset()
result = self.parsed_data.eval_bool()
return result