#!/usr/bin/python
#
# asfquart app with minimal interface, and background tasks to manage
# the UPnP IGD device, listen to incoming requests from Alexa, and
# maintain light-state and perform transitions (on, off, toggle).
#
# This app has some specific concerns around managing its port, since
# it gets mapped to a public IP/port.
#
# desc: http://192.168.0.1:49153/IGDdevicedesc_brlan0.xml
# st: urn:schemas-upnp-org:device:InternetGatewayDevice:1
# http://192.168.0.1:49153/upnp/control/WANIPConnection0
#
# NOTE: Home Assistant calls this a "light". We also use the term
# "relay" interchangeably since they are tied.
#

import asyncio
import sys
import logging
import operator

import quart
import aiohttp
import asfquart

import careful  # symlinked to origin

_LOGGER = logging.getLogger(__name__)

# Various header values.
SNS_TYPE_HEADER = 'x-amz-sns-message-type'  # same value as HDR_SNS_TYPE
HDR_SNS_TYPE = 'x-amz-sns-message-type'
HDR_SNS_TOPIC = 'x-amz-sns-topic-arn'

SNS_TYPE_SUBSCRIBE = 'SubscriptionConfirmation'
SNS_TYPE_NOTIFY = 'Notification'
SNS_TYPE_UNSUBSCRIBE = 'UnsubscribeConfirmation'  ### not needed?

# Check UPnP every N minutes.
UPNP_CHECK_DELAY = 5 * 60


class AlexaListener:
    "Delivers Alexa commands to Home Assistant."

    def __init__(self, cfg):
        self.cfg = cfg

        # Wait until a loop is running; see prepare().
        self.session = None

    # @APP.before_serving
    async def prepare(self):
        "Create the shared HTTP client session."
        # This requires a loop, so we use before_serving() to create
        # the session once the loop is up and running.
        self.session = aiohttp.ClientSession()

    # @APP.after_serving
    async def close(self):
        "Release the HTTP client session created by prepare()."
        if self.session is not None:
            await self.session.close()
            self.session = None

    # @APP.post /alexa
    async def endpoint(self):
        """Handle an inbound AWS SNS request.

        The request type is taken from the HDR_SNS_TYPE header. Returns
        a (body, status) tuple; failures deliberately carry no detail.
        """
        ### TODO: additional defensive logic

        # Essentially, two types of request. It's in the headers.
        hdrs = quart.request.headers
        reqtype = hdrs.get(HDR_SNS_TYPE)
        if not reqtype:
            _LOGGER.error('Missing HDR_SNS_TYPE')
            return 'Bad headers', 400  # Don't provide why.

        # NOTE: don't parse the body as JSON unless the request looks
        # somewhat reasonable.
        if reqtype not in (SNS_TYPE_SUBSCRIBE, SNS_TYPE_NOTIFY):
            # Don't disclose what we're looking for. Just fail.
            _LOGGER.error(f'error. unknown reqtype "{reqtype}"')
            return '', 400

        # SNS does not set the "proper" content-type, so force parsing
        # the body as JSON.
        j = await quart.request.get_json(force=True)
        #print('BODY:', j)
        if not isinstance(j, dict):
            _LOGGER.error('Body is not a JSON object')
            return '', 400

        if reqtype == SNS_TYPE_SUBSCRIBE:
            url = j.get('SubscribeURL')
            if not isinstance(url, str) or not url:
                _LOGGER.error('Missing SubscribeURL')
                return '', 400
            return await self._confirm(url, hdrs.get(HDR_SNS_TOPIC))

        # SNS_TYPE_NOTIFY
        msg = j.get('Message')
        if not isinstance(msg, str):
            _LOGGER.error('Missing Message')
            return '', 400
        return await self._notify(msg)

    async def _confirm(self, url, topic):
        "Confirm to AWS SNS that we are a valid endpoint."
        # NOTE(review): URL comes straight from the request body; the
        # caller is not authenticated here.
        _LOGGER.debug(f'Confirming: {url}')
        # Only the request matters; the context manager releases the
        # response/connection.
        async with self.session.head(url):
            pass
        _LOGGER.info(f'Subscribed to: {topic}')
        return '', 204

    async def _notify(self, msg):
        "Process a message from the AWS Lambda / Alexa handler."
        if msg.startswith('h'):
            # HomeAssistant light identifier follows.
            return await self._handle_hass(msg)

        # Assume v1.
        return await self._handle_v1(msg)

    async def _handle_v1(self, msg):
        "Process a v1 messsage of 'device:state'"

        # Map the inbound DEVICE to a HASS entity.
        # Map the inbound on/off to an OPERATION value for HASS.

        # Message format is: "04x:1d" representing device ID and on/off.
        if len(msg) != 6 or msg[4] != ':' or not ('0' <= msg[5] <= '1'):
            _LOGGER.error(f'Message has improper format: {msg}')
            # Note: don't tell caller how the message failed.
            return 'Bad request.', 400

        try:
            device = int(msg[:4], 16)
        except ValueError:
            _LOGGER.error(f'Device code is improper: {msg[:4]}')
            # Note: don't tell caller how the message failed.
            return 'Bad request.', 400

        ### wrong ID in the AWS/Alexa Lambda
        if device == 0x281e:
            device = 0x280e  # Back Hall

        turn_on = (msg[5] == '1')

        # Find the configured light whose "i2c" value is this device code.
        for light in self.cfg.lights.values():
            if light.get('i2c') == device:
                break
        else:
            _LOGGER.error(f'Unknown device code: {device:04x}')
            # Note: don't tell caller how the message failed.
            return 'Bad request.', 400

        # Use Home Assistant to perform the toggle. v1 message, but v2
        # to operate the light.
        operation = 'turn_on' if turn_on else 'turn_off'  # third op: toggle
        # Must be awaited: the view has to return the result, not a
        # coroutine object.
        return await self._call_hass(light, operation)

    async def _handle_hass(self, msg):
        "Process a HomeAssisant messsage of 'h:entity/operation'"
        # Currently only logs the message; no operation is performed.
        _LOGGER.debug(f'HAss MESSAGE: {msg}')
        return '', 204

    async def _call_hass(self, light, operation):
        "POST a light-service OPERATION for LIGHT to the Home Assistant API."
        _LOGGER.info(f'Performing {operation.upper()} on "{light.label}"')

        url = f'{self.cfg.network.hass_api}/services/light/{operation}'
        headers = {
            'Authorization': f'Bearer {self.cfg.network.hass}',
        }
        data = {
            'entity_id': f'light.{light.hass}',  # the entity ID
        }
        response = await self.session.post(url, headers=headers, json=data)
        print('RESPONSE:', await response.text())
        return f'Performed "{operation}" on "{light.label}"'

    # @APP.route /test/<msg>
    async def test_endpoint(self, msg):
        "Test endpoint where the message is in the URL, not the JSON body."
        return await self._notify(msg)


class RelayManager:
    "Handle state changes for the relays."

    ### for now, this is an HTTP request to the Arduino
    ### FUTURE: deliver to a Pico to perform the I2C calls
    ### FUTURE: deliver to a Pico to manipulate GPIO pins

    def __init__(self, cfg):
        self.host = cfg.network.L110  # Arduino or Pico
        ### for now, the arduino LAN->I2c proxy ("middleman")
        self.host = '192.168.0.3'
        _LOGGER.debug(f'Using {self.host}:80 to alter relays')

        self.relays = {}  # HASS-entity: LIGHT-dict

        # Pull out the relay-based lights.
        for light in cfg.lights.values():
            if light.get('board') == 'L110' and 'v3gpio' not in light:
                assert light.i2c, f'Oops. {light.label}'
                light.state = None  # Unknown ... Inject state into the edict.
                self.relays[light.hass] = light
        _LOGGER.debug(f'Relays: {len(self.relays)} found;'
                      f' sample: {",".join(list(self.relays.keys())[:3])}')

    # @APP.get('/relay/<name>/<action>')
    async def relay_change(self, name, action):
        "Apply ACTION (on, off, toggle) to the relay called NAME."
        # ACTION: on, off, toggle

        if (relay := self.relays.get(name)) is None:
            _LOGGER.error(f'Unknown relay: {name}')
            return '', 404
        if action not in {'on', 'off', 'toggle'}:
            _LOGGER.error(f'Unknown action: {action}')
            return '', 400  # bad ACTION value

        if action == 'on':
            setting = True
        elif action == 'off':
            setting = False
        else:
            # Unknown (None) state toggles to ON.
            setting = not relay.state
        _LOGGER.info(f'Performing {action.upper()} on "{relay.label}".'
                     f' New state: {"ON" if setting else "OFF"}')

        ### FUTURE: use _call_pico to do this
        result = await self._call_arduino(relay, setting)

        # Success. Update the state.
        relay.state = setting

        print('RESULT:', result)
        return result

    async def _call_arduino(self, relay, setting):
        "Send the relay's opcode (0x0020 bit set for ON) via careful.invoke()."
        opcode = relay.i2c
        if setting:
            opcode |= 0x0020
        #print(f'WOULD INVOKE: {opcode:04x}')
        careful.invoke(f'{opcode:04x}'.encode('latin-1'))
        return '', 204

    async def _call_pico(self, relay, setting):
        "Ask the host at self.host to switch RELAY on or off over HTTP."
        # Same as our API endpoints, but no "toggle" since we own the State.
        action = 'on' if setting else 'off'
        url = f'http://{self.host}/relay/{relay.hass}/{action}'
        _LOGGER.debug(f'Using URL: {url}')
        # This class holds no client session, so use a short-lived one.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                # We don't care about the body. The response has read the
                # headers, and is now pending consumption of the body. Do it.
                await response.read()
        return '', 204

    # @APP.get('/list')
    async def list_relays(self):
        "Return id/name/on for every known relay."
        relays = [dict(id=r.hass, name=r.label, on=r.state)
                  for r in self.relays.values()]
        return relays  # auto-rendered as JSON

    # @APP.get('/state/<name>')
    async def get_state(self, name):
        "Return {'on': bool} for the relay called NAME, or 404 if unknown."
        if (relay := self.relays.get(name)) is None:
            _LOGGER.error(f'Unknown relay: {name}')
            return '', 404

        # Caller wants to know "on".
        # We have a tri-state: True, False, None. Assume False for
        # Unknown(None).
        # Proper result (JSON) values: true, false
        return {"on": bool(relay.state)}

    # @APP.get('/show')
    # @APP.template('relays.ezt')
    async def show_relays(self):
        "Return the relays, sorted by label, as data for the template."
        # The list is new, but its items are the shared relay records:
        # the jsstate value is injected into those records for use
        # within the template.
        relays = sorted(self.relays.values(),
                        key=operator.attrgetter('label'))
        for relay in relays:
            # JavaScript true/false boolean (initial) state
            relay.jsstate = 'true' if relay.state else 'false'
        return {'lights': relays}  # fed into EZT


async def _ping_url(url):
    "Issue a GET to URL, ignoring the response body."
    _LOGGER.info(f'LIGHT URL: {url}')
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # We don't care about the body. The response has read the
            # headers, and is now pending consumption of the body. Leaving
            # the context manager releases it, so we're simply done.
            pass
    return '', 204


async def maintain_upnp():
    "Runner to ensure the IGD UPnP is configured, to point to this app."
    try:
        while True:
            try:
                await _check_upnp()
            except Exception:
                # Not a bare except: cancellation must be able to leave
                # the loop, rather than being logged and retried.
                _LOGGER.exception('Error during UPnP check')

            # Wait for a while. App shutdown will cancel this sleep.
            await asyncio.sleep(UPNP_CHECK_DELAY)
    except asyncio.CancelledError:
        # Normal shutdown path; finish quietly, as before.
        _LOGGER.debug('UPnP runner cancelled')
    except Exception:
        _LOGGER.exception('error in runner')


async def _check_upnp():
    "Placeholder for the UPnP check; currently only logs."
    _LOGGER.debug('TBD: check upnp config on IGD')


def main():
    "Configure logging, build the app and its endpoints, then run it."
    logging.basicConfig(level=logging.DEBUG,
                        style='{',
                        format='[{asctime}|{levelname}|{module}] {message}',
                        datefmt='%m/%d %H:%M',
                        )

    app = asfquart.construct('alexa')

    # Keep the IGD configured to point to this app.
    ### not working for some reason. ???
    ### disable for now. a cron is backstopping this.
    #app.add_runner(maintain_upnp, name='UPnP Maintainer')

    # Build the singleton to manage incoming Alexa requests.
    alexa = AlexaListener(app.cfg)

    # Now that we have an APP and ALEXA, add the route.
    app.post('/alexa')(alexa.endpoint)

    ### add a testing endpoint for v1 message from the Lambda
    # The handler requires MSG, so the route must carry it.
    app.get('/test/<msg>')(alexa.test_endpoint)

    # The Alexa listener needs some preparation, once a loop exists,
    # and should release its session when serving stops.
    app.before_serving(alexa.prepare)
    app.after_serving(alexa.close)

    # Create singleton to alter the relays.
    relays = RelayManager(app.cfg)

    # Set up endpoints for relay management. The URL variables match
    # the handler parameters.
    app.get('/relay/<name>/<action>')(relays.relay_change)
    app.get('/list')(relays.list_relays)
    app.get('/state/<name>')(relays.get_state)
    app.get('/show')(app.use_template('relays.ezt')(relays.show_relays))

    # The port we should use is in the config.
    app.runx(port=app.cfg.network.alexa_port)


if __name__ == '__main__':
    main()