#!/usr/bin/python # # asfquart app with minimal interface, and background tasks to manage # the UPnP IGD device and listen to incoming requests from Alexa. # # This app has some specific concerns around managing its port, since # it gets mapped to a public IP/port. # import logging import quart import aiohttp import asfquart _LOGGER = logging.getLogger(__name__) # Various header values. SNS_TYPE_HEADER = 'x-amz-sns-message-type' HDR_SNS_TYPE = 'x-amz-sns-message-type' HDR_SNS_TOPIC = 'x-amz-sns-topic-arn' SNS_TYPE_SUBSCRIBE = 'SubscriptionConfirmation' SNS_TYPE_NOTIFY = 'Notification' SNS_TYPE_UNSUBSCRIBE = 'UnsubscribeConfirmation' ### not needed? # Check UPnP every N minutes. UPNP_CHECK_DELAY = 5 * 60 class AlexaListener: def __init__(self, cfg): self.cfg = cfg # Wait until a loop is running. self.session = None # @APP.before_serving async def prepare(self): # This requires a loop, so we use before_serving() to create # the session once the loop is up and running. self.session = aiohttp.ClientSession() # @APP.route /alexa async def endpoint(self): ### TODO: additional defensive logic # Essentially, two types of request. It's in the headers. hdrs = quart.request.headers reqtype = hdrs.get(HDR_SNS_TYPE) if not reqtype: _LOGGER.error('Missing HDR_SNS_TYPE') return 'Bad headers', 400 # Don't provide why. # NOTE: don't parse the body as JSON unless the request looks # somewhat reasonable. if reqtype == SNS_TYPE_SUBSCRIBE: # SNS does not set the "proper" content-type, so force parsing # the body as JSON j = await quart.request.get_json(force=True) #print('BODY:', j) return await _confirm(j['SubscribeURL'], hdrs.get(HDR_SNS_TOPIC)) if reqtype == SNS_TYPE_NOTIFY: j = await quart.request.get_json(force=True) #print('BODY:', j) return await _notify(j['Message']) # Don't disclose what we're looking for. Just fail. _LOGGER.error(f'error. unknown reqtype "{reqtype}"') return 'Bad headers', 400 async def _confirm(self, url, topic): "Confirm to AWS SNS that we are a valid endpoint." _LOGGER.debug(f'Confirming: {url}') _ = await self.session.head(url) _LOGGER.info(f'Subscribed to: {topic}') return '', 204 async def _notify(self, msg): "Process a message from the AWS Lambda / Alexa handler." if msg.startswith('h'): # HomeAssistant light identifier follows. return await self._handle_hass(msg) # Assume v1. return await self._handle_v1(msg) async def _handle_v1(self, msg): "Process a v1 messsage of 'device:state'" # Message format is: "04x:1d" representing device ID and on/off. if len(msg) != 6 or msg[4] != ':' or not ('0'<=msg[5]<='1'): _LOGGER.error(f'Message has improper format: {msg}') # Note: don't tell caller how the message failed. return 'Bad request.', 400 try: device = int(msg[:4], 16) except ValueError: _LOGGER.error(f'Device code is improper: {msg[:4]}') # Note: don't tell caller how the message failed. return 'Bad request.', 400 ### wrong ID in the AWS/Alexa Lambda if device == 0x281e: device = 0x280e # Back Hall turn_on = (msg[5] == '1') for light in self.cfg.lights.values(): if light.get('i2c') == device: break else: _LOGGER.error(f'Unknown device code: {device:04x}') # Note: don't tell caller how the message failed. return 'Bad request.', 400 # Use Home Assistant to perform the toggle. v1 message, but v2 # to operate the light. operation = 'turn_on' if turn_on else 'turn_off' # third op: toggle return self._call_hass(light, operation) async def _handle_hass(self, msg): "Process a HomeAssisant messsage of 'h:entity/operation'" _LOGGER.debug(f'HAss MESSAGE: {msg}') return '', 204 async def _call_hass(self, light, operation): _LOGGER.info(f'Performing {operation.upper()} on "{light.label}"') url = f'{self.cfg.network.hass_api}/services/light/{operation}' headers = { 'Authorization': f'Bearer {self.cfg.network.hass}', } data = { 'entity_id': f'light.{light.hass}', # the entity ID } response = await self.session.post(url, headers=headers, json=data) print('RESPONSE:', await response.text()) return f'Performed "{operation}" on "{light.label}"' # @APP.route /test/ async def test_endpoint(self, msg): # Test endpoint where the message is in the URL, not the JSON body. return await self._notify(msg) async def maintain_upnp(): "Runner to ensure the IGD UPnP is configured, to point to this app." while True: try: await _check_upnp() except: _LOGGER.exception('Error during UPnP check') # Wait for a while. App shutdown will cancel this sleep. await asyncio.sleep(UPNP_CHECK_DELAY) async def _check_upnp(): _LOGGER.debug('TBD: check upnp config on IGD') def main(): logging.basicConfig(level=logging.DEBUG, style='{', format='[{asctime}|{levelname}|{module}] {message}', datefmt='%m/%d %H:%M', ) app = asfquart.construct('alexa') # Keep the IGD configured to point to this app. app.add_runner(maintain_upnp, name='UPnP Maintainer') # Build the singleton to manage incoming Alexa requests. global ALEXA ### maybe not needed? ALEXA = AlexaListener(app.cfg) # Now that we have an APP and ALEXA, add the route. app.post('/alexa')(ALEXA.endpoint) ### add a testing endpoint app.get('/test/')(ALEXA.test_endpoint) # The Alexa listener needs some preparation, once a loop exists. app.before_serving(ALEXA.prepare) ### fetch port from config app.runx(port=app.cfg.network.alexa_port) if __name__ == '__main__': main()