Files
ad_toolbox/eventhandler.py

187 lines
8.7 KiB
Python

import appdaemon.plugins.hass.hassapi as hass
import urllib.request
import json
# =============================================================================
# EventDispatcher / EventHandler — HA event subscription helpers
# =============================================================================
#
# EventDispatcher
# ---------------
# Subscribes to a single HA event and invokes a callback when the event fires
# and its payload matches the configured filter.
#
# event_name : HA event name to listen for.
# event_data : dict of key/value pairs that must all match the event
# payload (deep partial match — nested dicts are matched
# recursively). None means "match any payload".
# Special key: ``device_name`` — for zha_event payloads you
# may supply the friendly ZHA device name instead of the raw
# IEEE address. It is resolved once at construction time by
# rendering a Jinja2 template through the HA REST API
# (``/api/template``) and stored as ``device_ieee`` in the
# effective filter dict. If the name cannot be resolved, an
# error is logged and the ``device_name`` key is dropped.
# reset_data : optional dict. When set, the dispatcher becomes a
# one-shot latch: the callback fires once on event_data
# match, then waits for a reset_data match before it can
# fire again. Useful for press/release pairs.
# event_context: arbitrary value forwarded as the third argument to the
# callback — use it to identify which dispatcher fired.
#
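# Callback signature: callback(event_name, event_data, event_context)
# Note: the event_data passed to the callback is the dispatcher's effective
# filter dict (after device_name resolution), not the payload of the event
# that fired.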
#
# EventHandler
# ------------
# Convenience wrapper that creates one EventDispatcher per entry in an
# events_block dict (the YAML "events_to_listen" structure).
#
# YAML events_block format:
# events_to_listen:
# <key>:
# event_name: <ha_event_name> # required
# event_data: # optional — payload filter
# <field>: <value>
# reset_data: # optional — latch reset filter
# <field>: <value>
#
# Usage:
# handler = EventHandler(ad_api, self.args['events_to_listen'], my_callback)
# # Keep a reference to handler — it owns the subscriptions.
#
# Or construct the events_block programmatically:
# handler = EventHandler(
# ad_api,
# {'evt': {'event_name': 'MY_EVENT', 'event_data': {'key': 'value'}}},
# my_callback,
# event_context='optional_context'
# )
# =============================================================================
class EventDispatcher:
    """Subscribe to one HA event and invoke a callback on matching payloads.

    Parameters
    ----------
    ad_api:
        Object providing ``listen_event``, ``log_info`` and ``log_error``.
    event_name:
        Name of the event to listen for.
    callback:
        Called as ``callback(event_name, event_data, event_context)`` where
        ``event_data`` is this dispatcher's effective filter dict (not the
        payload that was received).
    event_data:
        Filter dict (deep partial match) or ``None`` to match any payload.
        A ``device_name`` key is replaced by ``device_ieee`` at construction.
    reset_data:
        Optional filter dict. When set, the dispatcher latches after firing
        and only re-arms once a payload matching ``reset_data`` is seen.
    event_context:
        Opaque value handed back to the callback as its third argument.
    """

    def __init__(self, ad_api, event_name, callback, event_data, reset_data, event_context):
        self.ad_api = ad_api
        event_data = self._resolve_zha_device_name(event_data)
        reset_data = self._resolve_zha_device_name(reset_data)
        self.event_name = event_name
        self.callback = callback
        self.event_data = event_data
        self.reset_data = reset_data
        self.waiting_for_reset = False
        self.event_context = event_context

        # Only flat (non-dict) values are handed to listen_event as filters;
        # nested dicts need a partial match, which process_event performs.
        event_kwargs = self._flat_filter(event_data)
        self.ad_api.listen_event(self.on_event, event_name, **event_kwargs)

        if not event_kwargs:
            # The subscription above is unfiltered and therefore already
            # receives every reset event; a second subscription would only
            # deliver the same event twice.
            return

        if reset_data:
            reset_kwargs = self._flat_filter(reset_data)
            # Subscribe separately only when the reset events would not be
            # delivered by the primary subscription's filter.
            if reset_kwargs != event_kwargs:
                self.ad_api.listen_event(self.on_event, event_name, **reset_kwargs)

    @staticmethod
    def _flat_filter(data):
        """Return the non-dict entries of *data* (``{}`` for None/empty)."""
        if not data:
            return {}
        return {key: value for key, value in data.items() if not isinstance(value, dict)}

    @staticmethod
    def _jinja_string_literal(value):
        """Return *value* as a single-quoted Jinja2 string literal.

        Backslashes and single quotes are escaped so that a device name such
        as ``Bob's lamp`` cannot terminate the literal and break the template.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return "'" + escaped + "'"

    def on_event(self, event_name, data, kwargs):
        """Event callback registered with ``listen_event``; forwards the payload."""
        self.process_event(data)

    def _lookup_zha_ieee(self, device_name):
        """Resolve a ZHA device friendly name to its IEEE address.

        Renders a Jinja2 template via the HA REST API (``/api/template``).
        The HA URL and token are read from the config of the plugin object
        named ``'default'``. Returns the IEEE string, or ``None`` when the
        lookup yields nothing or fails (the failure is logged).
        """
        try:
            plugin = self.ad_api.AD.plugins.get_plugin_object('default')
            ha_url = str(plugin.config.ha_url).rstrip('/')
            token = plugin.config.token.get_secret_value()
            name_literal = self._jinja_string_literal(device_name)
            template = (
                "{% set ns = namespace(ieee='') %}"
                "{% for eid in integration_entities('zha') %}"
                " {% set did = device_id(eid) %}"
                " {% if did and not ns.ieee %}"
                " {% if device_attr(did, 'name_by_user') == " + name_literal +
                " or device_attr(did, 'name') == " + name_literal + " %}"
                " {% for conn in device_attr(did, 'connections') %}"
                " {% if conn[0] == 'zigbee' %}{% set ns.ieee = conn[1] %}{% endif %}"
                " {% endfor %}"
                " {% endif %}"
                " {% endif %}"
                "{% endfor %}"
                "{{ ns.ieee }}"
            )
            payload = json.dumps({'template': template}).encode('utf-8')
            req = urllib.request.Request(
                f"{ha_url}/api/template",
                data=payload,
                headers={'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'},
                method='POST',
            )
            with urllib.request.urlopen(req, timeout=10) as resp:
                result = resp.read().decode('utf-8').strip()
            return result if result else None
        except Exception as e:
            # Deliberate best-effort boundary: a failed lookup must not stop
            # the dispatcher from being created, so log and report "not found".
            self.ad_api.log_error(f"[EventDispatcher] Failed to resolve ZHA device name '{device_name}': {e}")
            return None

    def _resolve_zha_device_name(self, data):
        """Replace a ``device_name`` key in *data* with ``device_ieee``.

        Returns *data* unchanged when it is ``None`` or has no ``device_name``
        key. Otherwise returns a new dict without ``device_name``; the
        ``device_ieee`` key is added only when the lookup succeeded.
        """
        if data is None or 'device_name' not in data:
            return data
        name = data['device_name']
        ieee = self._lookup_zha_ieee(name)
        resolved = {k: v for k, v in data.items() if k != 'device_name'}
        if ieee is not None:
            self.ad_api.log_info(f"[EventDispatcher] Resolved ZHA device name '{name}' to IEEE '{ieee}'")
            resolved['device_ieee'] = ieee
        else:
            self.ad_api.log_error(f"[EventDispatcher] ZHA device '{name}' not found — 'device_name' filter ignored")
        return resolved

    @staticmethod
    def _matches(ref_data, data):
        """Return True when every entry of *ref_data* is found in *data*.

        Nested dicts in *ref_data* are matched recursively (partial match).
        ``None`` or an empty *ref_data* matches anything. A non-dict payload
        can never satisfy a non-empty filter, so it yields False instead of
        raising.
        """
        if not ref_data:
            return True
        if not isinstance(data, dict):
            return False
        for key, expected in ref_data.items():
            if key not in data:
                return False
            actual = data[key]
            if isinstance(expected, dict):
                if not EventDispatcher._matches(expected, actual):
                    return False
            elif expected != actual:
                return False
        return True

    def process_event(self, data):
        """Evaluate a received payload against the event and reset filters.

        Fires the callback and returns True when *data* matches ``event_data``
        and the dispatcher is not latched. Otherwise returns False, clearing
        the latch first if *data* matches ``reset_data``.
        """
        has_reset = self.reset_data is not None
        match_event_data = self._matches(self.event_data, data)
        match_reset_data = self._matches(self.reset_data, data)

        # Special treatment for the triggering event.
        if match_event_data and not self.waiting_for_reset:
            if has_reset:
                # Latch before invoking the callback so the state is already
                # consistent while the callback runs.
                self.waiting_for_reset = True
            self.callback(self.event_name, self.event_data, self.event_context)
            return True

        if has_reset and match_reset_data:
            self.waiting_for_reset = False
        return False
class EventHandler:
    """Create one EventDispatcher per entry of an ``events_to_listen`` block.

    Parameters
    ----------
    ad_api:
        Object handed to every EventDispatcher; also used for ``log_info``.
    events_block:
        Dict mapping arbitrary keys to event blocks. Each block must contain
        ``event_name`` and may contain ``event_data`` and ``reset_data``.
        ``None`` (e.g. an empty YAML section) is treated as "no events".
    callback:
        Callable invoked by the dispatchers.
    event_context:
        Value passed back to the callback as a context.

    The created dispatchers are kept in ``event_dispatchers``.
    """

    def __init__(self, ad_api, events_block, callback, event_context=None):
        self.__ad_api = ad_api
        self.event_dispatchers = []
        for event_block in (events_block or {}).values():
            self.add_dispatcher(
                event_block['event_name'],  # required: KeyError if missing
                callback,
                event_block.get('event_data'),
                event_block.get('reset_data'),
                event_context,
            )

    def add_dispatcher(self, event_name, callback, event_data=None, reset_data=None, event_context=None):
        """Create an EventDispatcher for *event_name* and keep a reference to it."""
        # functools.partial objects and callable instances have no __name__;
        # fall back to their repr so registration does not fail on logging.
        callback_name = getattr(callback, '__name__', repr(callback))
        self.__ad_api.log_info(f'Registering dispatcher {callback_name} for event "{event_name}" ({event_data})')
        dispatcher = EventDispatcher(self.__ad_api, event_name, callback, event_data, reset_data, event_context)
        self.event_dispatchers.append(dispatcher)