355 lines
12 KiB
Python
355 lines
12 KiB
Python
import pyparsing as pp
|
|
import threading
|
|
import datetime
|
|
|
|
'''Base ExpressionContext for the parser
|
|
MUST be implemented :
|
|
get_variable_value(self,variable_name) is called when evaluating the expression to retrieve the value of a variable
|
|
|
|
optional:
|
|
add_variable(self,variable_name) is called when a variable is encountered during the parsing of the expression
|
|
|
|
reset(self) is called before a new evaluation
|
|
|
|
'''
|
|
class BaseExpressionContext:
    """Do-nothing context that parsed expressions are bound to.

    Real contexts are expected to override ``get_variable_value``; the
    other two hooks are optional.
    """

    def add_variable(self, variable_name):
        """Accept a variable name met in an expression.

        A falsy return makes ``EvaluateVariable.set_context`` raise a
        ``ParsingException``; this base implementation accepts everything.
        """
        return True

    def get_variable_value(self, variable_name):
        """Return the current value of *variable_name* (always ``None`` here)."""
        return None

    def reset(self):
        """Hook called before a new evaluation; nothing to do by default."""
        return None
|
|
|
|
class ParsingException(Exception):
    """Raised when an expression string cannot be parsed.

    Attributes:
        message: Description of the problem, or ``None``.
        expression_string: The expression that was being parsed, or ``None``.
        char_pos: Position of the error; a negative value means "unknown".
    """

    def __init__(self, message=None, expression_string=None, char_pos=-1):
        # Forward the message to Exception so ``args`` is populated even when
        # the exception is built with keyword arguments only.
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.char_pos = char_pos
        self.message = message
        self.expression_string = expression_string

    def __str__(self):
        """Build ``<where>:<message>``, omitting the parts that are unknown."""
        text = ""
        if self.expression_string:
            text += f'Error parsing string "{self.expression_string}"'
            if self.char_pos >= 0:
                text += f' at character {self.char_pos}'
            text += ':'
        elif self.char_pos >= 0:
            text += f'Parsing error at character {self.char_pos}'
            if self.message:
                # Keep the position and the message apart, as in the branch above.
                text += ':'
        if self.message:
            # str() so a non-string message (e.g. another exception) cannot
            # make the formatting itself fail.
            text += str(self.message)
        return text
|
|
|
|
class EvaluationException(Exception):
    """Raised when a parsed expression cannot be evaluated.

    Attributes:
        message: Description of the problem, or ``None``.
        expression_string: The expression being evaluated, or ``None``.
        char_pos: Position of the failing operator; negative means "unknown".
        operands: The operand values involved in the failure.
    """

    def __init__(self, message=None, expression_string=None, char_pos=-1, operands=None):
        # Forward the message to Exception so ``args`` is populated even when
        # the exception is built with keyword arguments only.
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.char_pos = char_pos
        self.message = message
        self.expression_string = expression_string
        # A fresh list per instance: a shared ``[]`` default would leak
        # operands from one exception into the next.
        self.operands = [] if operands is None else operands

    def __str__(self):
        """Build ``<where>:<message>``, omitting the parts that are unknown."""
        text = ""
        if self.expression_string:
            text += f'Error evaluating "{self.expression_string}"'
            if self.char_pos >= 0:
                text += f' at character {self.char_pos}'
            text += ':'
        elif self.char_pos >= 0:
            text += f'Evaluation error at character {self.char_pos}'
            if self.message:
                # Keep the position and the message apart, as in the branch above.
                text += ':'
        if self.message:
            # str() so a non-string message cannot make the formatting fail.
            text += str(self.message)
        return text
|
|
|
|
class EvaluateBase(object):
    """Base class of every node in a parsed expression tree.

    Subclasses provide ``eval()``; this class adds the shared value
    conversion helpers.
    """

    # Timestamp layout recognised by ``convert_python_type``.
    _DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"

    def eval_bool(self):
        """Evaluate this node and coerce the result with ``convert_to_bool``."""
        return self.convert_to_bool(self.eval())

    @staticmethod
    def convert_to_bool(value):
        """Return ``False`` for the values treated as "off", ``True`` otherwise.

        ``None``, ``0``, ``False`` and the strings ``'off'``, ``'False'``,
        ``'false'`` and ``'unavailable'`` are false; anything else is true.
        """
        if value is None:
            return False
        # 'unavailable' should arguably be handled explicitly by the
        # condition itself, but treating it as false here is useful when such
        # a value must not evaluate as true.
        falsy_literals = ('off', 0, 'False', 'false', False, 'unavailable')
        return not any(value == literal for literal in falsy_literals)

    @staticmethod
    def convert_python_type(value):
        """Convert a string to the most specific Python value it spells.

        Tried in order: booleans (``'True'``/``'true'``/``'False'``/``'false'``),
        ``None`` (``'None'``/``'none'``), ``int``, ``float``, then a datetime
        in the ``%Y-%m-%dT%H:%M:%S.%f%z`` layout. A string matching none of
        these is returned unchanged, as is any non-string input.
        """
        if not isinstance(value, str):
            # None, or a value that has already been converted.
            return value

        if value in ('False', 'false'):
            return False
        if value in ('True', 'true'):
            return True
        if value in ('none', 'None'):
            return None

        converters = (
            int,
            float,
            lambda text: datetime.datetime.strptime(text, EvaluateBase._DATETIME_FORMAT),
        )
        for convert in converters:
            # Only a failed conversion is expected here; anything else
            # (KeyboardInterrupt, SystemExit, ...) must propagate.
            try:
                return convert(value)
            except (ValueError, OverflowError):
                continue

        # Nothing matched: this really is a plain string.
        return value
|
|
|
|
class EvaluateOperand(EvaluateBase):
    """Leaf node of the expression tree, built from a single parsed token."""

    def __init__(self, expression_string: str, location: int, tokens: pp.ParseResults):
        # Only the first matched token is kept; the other arguments are
        # unused at this level.
        first_token = tokens[0]
        self.value = first_token

    def __str__(self):
        evaluated = self.eval()
        return str(evaluated)

    def eval(self):
        """Placeholder returning ``None``; concrete operands override it."""
        return None

    def set_context(self, context):
        """Ignore the context; operands that need one override this."""
        return None
|
|
|
|
class EvaluateNumber(EvaluateOperand):
    """Operand holding a numeric literal."""

    def eval(self):
        """Return the token converted by ``EvaluateBase.convert_python_type``."""
        token = self.value
        return EvaluateBase.convert_python_type(token)
|
|
|
|
class EvaluateString(EvaluateOperand):
    """Operand holding a quoted string literal."""

    def eval(self):
        """Return the token as a ``str``, without any type conversion."""
        token = self.value
        return str(token)
|
|
|
|
class EvaluateVariable(EvaluateOperand):
    """Operand whose value is looked up in the expression context."""

    def __init__(self, expression_string: str, location: int, tokens: pp.ParseResults):
        super().__init__(expression_string, location, tokens)
        # Kept so an unknown variable can be reported with its position.
        self.location = location
        self.expression_string = expression_string
        # Filled in later by set_context().
        self.context : BaseExpressionContext = None

    def eval(self):
        """Fetch the variable from the context and convert it to a Python value."""
        raw_value = self.context.get_variable_value(self.value)
        return EvaluateBase.convert_python_type(raw_value)

    def __str__(self):
        # Rendered as name[current value].
        return f"{self.value}[{self.eval()}]"

    def set_context(self,context : BaseExpressionContext):
        """Bind the context and register this variable with it.

        Raises:
            ParsingException: if the context's ``add_variable`` returns a
                falsy value for this variable.
        """
        self.context = context
        accepted = self.context.add_variable(self.value)
        if accepted:
            return
        raise ParsingException(
            message = f"Unknown variable : {self.value}",
            char_pos = self.location,
            expression_string = self.expression_string,
        )
|
|
|
|
def opPair(tokenlist):
    """Yield the items of *tokenlist* as consecutive, non-overlapping pairs.

    A trailing item without a partner is dropped.
    """
    cursor = iter(tokenlist)
    # Zipping one iterator with itself consumes two items per tuple.
    yield from zip(cursor, cursor)
|
|
|
|
class EvaluateOperator(EvaluateBase):
    """Inner node of the expression tree: an operator and its operands."""

    def __init__(self, expression_string: str, location: int, tokens: pp.ParseResults):
        # Source text and position are kept for error reporting.
        self.location = location
        self.expression_string = expression_string
        # The first token is the group of operands and operator symbols.
        self.value = tokens[0]

    def set_context(self,context):
        """Pass *context* down to every child that is an expression node."""
        child_nodes = (item for item in self.value if isinstance(item, EvaluateBase))
        for child in child_nodes:
            child.set_context(context)
|
|
|
|
class EvaluatSignOp(EvaluateOperator):
    """Unary ``+`` / ``-`` applied to a single operand."""

    def eval(self):
        """Return the operand's value, negated when the sign is ``-``."""
        assert(len(self.value) == 2)
        sign = self.value[0]
        operand_value = self.value[1].eval()
        return -operand_value if sign == '-' else operand_value
|
|
|
|
class EvaluateNot(EvaluateOperator):
    """Logical negation of an operand."""

    # keyword -> [display label, implementation]
    operators = {
        "not": ["not", lambda operand: not EvaluateNot.convert_to_bool(operand)]
    }

    def eval(self):
        """Apply the negation; the result of the last (keyword, operand) pair wins."""
        for keyword, operand in opPair(self.value[0:]):
            negate = EvaluateNot.operators[keyword][1]
            outcome = negate(operand.eval_bool())
        return outcome

    def __str__(self):
        return " ".join(str(token) for token in self.value)
|
|
|
|
class EvaluateBinaryOp(EvaluateOperator):
    """Left-to-right chain of binary operators (``a op b op c ...``).

    Subclasses provide ``operators``: a mapping from operator symbol to
    ``[display label, two-argument callable]``.
    """

    def eval(self):
        """Fold the operand chain from left to right and return the result.

        Raises:
            EvaluationException: if an operator raises ``TypeError`` for the
                values it is given.
        """
        running = self.value[0].eval()
        for op, operand in opPair(self.value[1:]):
            fn = self.operators[op][1]
            if self.should_skip_second_operand(op, running):
                # Short-circuit: the right-hand side is never evaluated.
                continue
            rhs = operand.eval()
            try:
                running = fn(running, rhs)
            except TypeError as err:
                raise EvaluationException(
                    message=f"Could not evaluate '{running} {op} {rhs}'",
                    char_pos=self.location,
                    expression_string=self.expression_string,
                    operands=[running, rhs],
                ) from err
        return running

    def should_skip_second_operand(self,operator,first_value):
        """Tell whether the right-hand operand can be skipped; never by default."""
        return False

    def __str__(self):
        """Render ``(a op b ...)[result]`` for debugging.

        Operands that evaluation would short-circuit are shown as
        ``[Skipped]``; if the chain cannot be evaluated the result is shown
        as ``Evaluation Error`` instead of raising.
        """
        # eval() reports TypeError as EvaluationException, so nested nodes can
        # fail with either; a division by zero must not break a debug string.
        failures = (TypeError, ZeroDivisionError, EvaluationException)

        rendered = [str(self.value[0])]
        failed = False
        running = None
        try:
            running = self.value[0].eval()
        except failures:
            failed = True

        for op, operand in opPair(self.value[1:]):
            label, fn = self.operators[op]
            # Mirror eval(): the skip decision depends on the running value,
            # not on the first operand of the chain.
            if not failed and self.should_skip_second_operand(op, running):
                rendered.append(f"{label} [Skipped]")
                continue
            rendered.append(f"{label} {operand}")
            if not failed:
                try:
                    running = fn(running, operand.eval())
                except failures:
                    failed = True

        evaluation_result = "Evaluation Error" if failed else running
        return f"({' '.join(rendered)})[{evaluation_result}]"
|
|
|
|
class EvaluateMulOp(EvaluateBinaryOp):
    """Multiplication and division."""

    # symbol -> [display label, implementation]
    operators = {
        "*": ["*", lambda lhs, rhs: lhs * rhs],
        "/": ["/", lambda lhs, rhs: lhs / rhs],
    }
|
|
|
|
class EvaluatePlusOp(EvaluateBinaryOp):
    """Addition and subtraction."""

    # symbol -> [display label, implementation]
    operators = {
        "+": ["+", lambda lhs, rhs: lhs + rhs],
        "-": ["-", lambda lhs, rhs: lhs - rhs],
    }
|
|
|
|
class EvaluateComparison(EvaluateBinaryOp):
    """Comparison operators.

    Both sides go through ``EvaluateBase.convert_python_type`` before being
    compared.
    """

    # symbol -> [display label, implementation]
    operators = {
        "<": [
            "<",
            lambda lhs, rhs: EvaluateBase.convert_python_type(lhs) < EvaluateBase.convert_python_type(rhs),
        ],
        ">": [
            ">",
            lambda lhs, rhs: EvaluateBase.convert_python_type(lhs) > EvaluateBase.convert_python_type(rhs),
        ],
        ">=": [
            ">=",
            lambda lhs, rhs: EvaluateBase.convert_python_type(lhs) >= EvaluateBase.convert_python_type(rhs),
        ],
        "<=": [
            "<=",
            lambda lhs, rhs: EvaluateBase.convert_python_type(lhs) <= EvaluateBase.convert_python_type(rhs),
        ],
        "==": [
            "==",
            lambda lhs, rhs: EvaluateBase.convert_python_type(lhs) == EvaluateBase.convert_python_type(rhs),
        ],
        "!=": [
            "!=",
            lambda lhs, rhs: EvaluateBase.convert_python_type(lhs) != EvaluateBase.convert_python_type(rhs),
        ],
    }
|
|
|
|
class EvaluateOrAnd(EvaluateBinaryOp):
    """Boolean ``and`` / ``or`` with short-circuiting of the right operand."""

    # keyword -> [display label, implementation]
    operators = {
        "and": [
            "and",
            lambda lhs, rhs: EvaluateBase.convert_to_bool(lhs) and EvaluateBase.convert_to_bool(rhs),
        ],
        "or": [
            "or",
            lambda lhs, rhs: EvaluateBase.convert_to_bool(lhs) or EvaluateBase.convert_to_bool(rhs),
        ],
    }

    def should_skip_second_operand(self,operator,first_value):
        """Skip the right side of ``False and ...`` and of ``True or ...``."""
        decided = (
            (first_value == False and operator == "and")
            or (first_value == True and operator == "or")
        )
        return True if decided else False
|
|
|
|
class EvaluateTernary(EvaluateOperator):
    """Conditional expression ``condition ? if_true : if_false``."""

    def eval(self):
        """Evaluate the condition, then only the branch it selects."""
        outcome = self.value[0].eval_bool()
        # Tokens come in groups of four after the condition:
        # '?', true-branch, ':', false-branch.
        for index in range(1, len(self.value), 4):
            branch = self.value[index + 1] if outcome else self.value[index + 3]
            outcome = branch.eval()
        return outcome

    def __str__(self):
        # The branch that is not taken is rendered as [Skipped].
        condition = self.value[0].eval_bool()
        when_true = self.value[2] if condition else '[Skipped]'
        when_false = self.value[4] if not condition else '[Skipped]'
        return f"({self.value[0]} {self.value[1]} {when_true} {self.value[3]} {when_false})[{self.eval()}]"
|
|
|
|
def create_parser():
    """Build the pyparsing grammar for expressions.

    Operands are numbers, variables (letters, digits, ``_`` and ``.``) and
    single-quoted strings. Operators, from tightest to loosest binding:
    unary sign, ``* /``, ``+ -``, comparisons, ``not``, ``and``, ``or`` and
    the ternary ``? :``.

    Returns:
        The pyparsing expression; parsing yields a tree of ``Evaluate*`` nodes.
    """
    pp.ParserElement.enablePackrat()

    # pyparsing_common.real / signed_integer are objects shared by the whole
    # process: attach the parse action to a copy so that other users of
    # pyparsing keep the stock behaviour of those expressions.
    floatNumber = pp.pyparsing_common.real.copy().setParseAction(EvaluateNumber)
    intNumber = pp.pyparsing_common.signed_integer.copy().setParseAction(EvaluateNumber)
    variable = pp.Word(pp.alphanums + "_.").setParseAction(EvaluateVariable)
    string = pp.QuotedString(quoteChar="'").setParseAction(EvaluateString)

    # Order matters: numbers are tried before the more permissive variable.
    operand = floatNumber | intNumber | variable | string

    signop = pp.oneOf(("+", "-"))
    multop = pp.oneOf(("*", "/"))
    plusop = pp.oneOf(("+", "-"))
    comparisonOp = pp.oneOf(("<", "<=", ">", ">=" ,"==", "!="))
    notOp = pp.Keyword("not")
    andOp = pp.Keyword("and")
    orOp = pp.Keyword("or")

    #https://stackoverflow.com/questions/63225718/pyparsing-precedence-breaks-with-unary-operator
    expr = pp.infixNotation(
        operand,
        [
            (signop, 1, pp.opAssoc.RIGHT, EvaluatSignOp),
            (multop, 2, pp.opAssoc.LEFT, EvaluateMulOp),
            (plusop, 2, pp.opAssoc.LEFT, EvaluatePlusOp),
            (comparisonOp, 2, pp.opAssoc.LEFT, EvaluateComparison),
            (notOp, 1, pp.opAssoc.RIGHT, EvaluateNot),
            (andOp, 2, pp.opAssoc.LEFT, EvaluateOrAnd),
            (orOp, 2, pp.opAssoc.LEFT, EvaluateOrAnd),
            (("?", ":"), 3, pp.opAssoc.RIGHT, EvaluateTernary),
        ],
    )

    return expr
|
|
|
|
class ExpressionParser(object):
    """Parse an expression once, then evaluate it as often as needed."""

    # The grammar is built a single time and shared by every instance.
    parser = create_parser()
    # Several apps may parse at the same time, so parsing is serialised.
    parser_lock = threading.RLock()

    def __init__(self, expression_string : str, expression_context : BaseExpressionContext):
        """Parse *expression_string* and bind the tree to *expression_context*.

        Raises:
            ParsingException: if pyparsing rejects the expression (or, via
                ``set_context``, if the context refuses a variable).
        """
        self.expression_string = expression_string  # kept for debugging
        self.context = expression_context

        with ExpressionParser.parser_lock:
            try:
                parse_results = ExpressionParser.parser.parseString(expression_string,parse_all=True)
                self.parsed_data = parse_results[0]
            except pp.ParseException as e:
                raise ParsingException(message = str(e),char_pos=e.column,expression_string=expression_string)

        # NOTE(review): binding is done after the lock is released since it
        # only touches this instance's tree and context - confirm this
        # matches the intended placement.
        self.parsed_data.set_context(self.context)

    def __str__(self):
        parsed_tree = self.parsed_data
        return str(parsed_tree)

    def evaluate(self):
        """Reset the context, then return the value of the expression."""
        self.context.reset()
        return self.parsed_data.eval()
|
|
|
|
class ConditionsParser(ExpressionParser):
    """Expression parser whose result is always coerced to a boolean."""

    def evaluate(self):
        """Reset the context, then return the expression's boolean value."""
        self.context.reset()
        return self.parsed_data.eval_bool()
|