#
# Module to accept commands from Alexa via SNS
#
# NOTES
#   https://repost.aws/knowledge-center/sns-ip-address-range
#   https://docs.aws.amazon.com/sns/latest/dg/sns-verify-signature-of-message.html
#   https://docs.aws.amazon.com/vpc/latest/userguide/aws-ip-ranges.html
#   https://repost.aws/knowledge-center/sns-verify-message-authenticity
#

import ipaddress
import logging
import os
import json
import sys
import time
import asyncio

main = sys.modules['__main__']

import quart
import async_upnp_client as auc
import async_upnp_client.profiles.igd
import async_upnp_client.aiohttp
import async_upnp_client.client_factory
import async_upnp_client.exceptions
import aiohttp

LOGGER = logging.getLogger(__name__)

THIS_DIR = os.path.dirname(os.path.realpath(__file__))

SNS_TYPE_HEADER = 'x-amz-sns-message-type'
HDR_SNS_TYPE = 'x-amz-sns-message-type'
HDR_SNS_TOPIC = 'x-amz-sns-topic-arn'

SNS_TYPE_SUBSCRIBE = 'SubscriptionConfirmation'
SNS_TYPE_NOTIFY = 'Notification'
SNS_TYPE_UNSUBSCRIBE = 'UnsubscribeConfirmation'

IP_RANGES = 'https://ip-ranges.amazonaws.com/ip-ranges.json'
CACHE_IP_RANGES = 'ip-ranges.json'

# from https://forums.aws.amazon.com/ann.jspa?annID=2347
SNS_SOURCES = {
    # US-EAST-1
    '207.171.167.101',
    '207.171.167.25',
    '207.171.167.26',
    '207.171.172.6',
    '54.239.98.0/24',
    '54.240.217.16/29',
    '54.240.217.8/29',
    '54.240.217.64/28',
    '54.240.217.80/29',
    '72.21.196.64/29',
    '72.21.198.64/29',
    '72.21.198.72',
    '72.21.217.0/24',
}

# Characters accepted in the 4-character device address of a message.
_HEX_DIGITS = frozenset('0123456789abcdefABCDEF')


@main.APP_EXT.post('/alexa')
async def alexa():
    """Handle a POST to /alexa, dispatching on the SNS message-type header.

    Returns:
        A (body, status) tuple: 204 after a subscription confirmation,
        whatever _notify() returns for a notification, and 400 for any
        other (or missing) message type.
    """
    hdrs = quart.request.headers
    reqtype = hdrs.get(HDR_SNS_TYPE)
    LOGGER.debug(f'REQTYPE: {reqtype}')
    #print('HEADERS:', hdrs)

    ### NOTE(review): nothing in this handler authenticates the request
    ### (no signature or source-address check), and SubscribeURL is taken
    ### from the request body and fetched as-is. See the NOTES links at
    ### the top of this file.

    if reqtype == SNS_TYPE_SUBSCRIBE:
        # SNS does not set the "proper" content-type, so force parsing
        # the body as JSON
        j = await quart.request.get_json(force=True)
        #print('BODY:', j)
        await _confirm(j['SubscribeURL'])
        LOGGER.info(f'Subscribed to: {hdrs.get(HDR_SNS_TOPIC)}')
        return '', 204

    if reqtype == SNS_TYPE_NOTIFY:
        j = await quart.request.get_json(force=True)
        #print('BODY:', j)
        return await _notify(j['Message'])

    return f'error. unknown reqtype "{reqtype}"', 400


async def _confirm(url):
    """Confirm a subscription by issuing an HTTP HEAD request to *url*."""
    LOGGER.debug(f'Confirming: {url}')
    async with aiohttp.ClientSession() as session:
        # Use the request as a context manager so the response is released.
        async with session.head(url) as resp:
            LOGGER.debug(f'Confirm status: {resp.status}')


async def _notify(msg):
    """Parse a device command and forward it to the device hub.

    Args:
        msg: Command of the form ``xxxx:d`` where ``xxxx`` is a 4-digit
            hex device address and ``d`` is ``1`` (on) or ``0`` (off).

    Returns:
        A (body, status) tuple: 204 once the hub request has been made,
        400 when *msg* is malformed.
    """
    # format: xxxx:d  (x is a hex digit address; d is on/off)
    if (not isinstance(msg, str) or len(msg) != 6
            or msg[4] != ':' or msg[5] not in '01'):
        return f'Message has improper format: {msg}', 400

    code = msg[:4]
    # int(code, 16) alone is too lenient: it also accepts signs,
    # underscores, surrounding whitespace and non-ASCII digits.
    if not all(c in _HEX_DIGITS for c in code):
        return f'Device code is improper: {code}', 400
    device = int(code, 16)

    turn_on = msg[5] == '1'
    LOGGER.info(f'Turning {"on" if turn_on else "off"} {code}')
    if turn_on:
        device |= 0x20  # the 'E' flag in pic/relay-board/main.asm

    ### get URL base from config
    url = f'http://192.168.0.3/i/{device:04x}'
    LOGGER.debug(f'Device hub URL: {url}')

    async with aiohttp.ClientSession() as session:
        ### get target host from config
        ### add timeout and retries
        async with session.get(url) as resp:
            LOGGER.debug(f'Device hub status: {resp.status}')

    return '', 204


async def fetch_ranges():
    """Load the cached ip-ranges.json and build the AMAZON networks.

    Returns:
        A set of ipaddress.IPv4Network objects, one per distinct
        ``ip_prefix`` whose ``service`` is ``AMAZON``.
    """
    ### add timeout
    #j = requests.get(IP_RANGES).json()
    cache_path = os.path.join(THIS_DIR, CACHE_IP_RANGES)
    with open(cache_path, encoding='utf-8') as fp:
        j = json.load(fp)

    networks = {
        ipaddress.IPv4Network(p['ip_prefix'])
        for p in j['prefixes']
        if p['service'] == 'AMAZON'
    }
    LOGGER.debug(f'NETWORKS: {len(networks)}')
    return networks


async def open_port():
    """Keep a UPnP TCP port mapping for the configured external port.

    Searches for an IGD device, then loops forever: every 5 minutes it
    looks up the mapping for ``main.APP.cfg['port_ext']`` and adds it
    when the lookup fails with UpnpClientResponseError.

    Raises:
        RuntimeError: if the search finds no IGD device.
    """
    t0 = time.monotonic()
    devs = await auc.profiles.igd.IgdDevice.async_search()
    LOGGER.debug(f'DURATION: {time.monotonic() - t0:.1f}s  DEVS: {len(devs)}')
    if not devs:
        raise RuntimeError('No UPnP IGD device found')

    url = next(iter(devs))['location']
    LOGGER.debug(f'IGD URL: {url}')

    requester = auc.aiohttp.AiohttpRequester()
    factory = auc.client_factory.UpnpFactory(requester)
    device = await factory.async_create_device(url)
    LOGGER.debug(f'DEVICE: {device}')

    ### get host from config
    host = '192.168.0.2'
    source = (host, 0)
    server = auc.aiohttp.AiohttpNotifyServer(requester, source=source)
    await server.async_start_server()
    LOGGER.info(f'Listening on: {server.callback_url}')

    gateway = auc.profiles.igd.IgdDevice(device, server.event_handler)
    LOGGER.info(f'GATEWAY: {gateway}')

    #info = await gateway.async_get_status_info()
    #LOGGER.info(f'STATUS: {info}')

    # Our internal port, which gets mapped externally
    # Note: port on WAN is same as port on LAN
    port_ext = main.APP.cfg['port_ext']

    while True:
        try:
            entry = await gateway.async_get_specific_port_mapping_entry(
                None, port_ext, 'TCP')
            LOGGER.info(f'EXISTING-ENTRY: {entry}')
        except auc.exceptions.UpnpClientResponseError:
            # The lookup failed, so (re)create the mapping.
            LOGGER.info(f'ADD-MAPPING: {port_ext}')
            await gateway.async_add_port_mapping(
                None, port_ext, 'TCP',
                port_ext, ipaddress.IPv4Address(host),
                True,
                'Alexa listener',
                None,
            )

        await asyncio.sleep(5 * 60)  # check the UPnP mapping every 5 minutes


@main.APP_EXT.before_serving
async def startup():
    """Quiet the async_upnp_client loggers and start the background task."""

    def fix_logger(name):
        logging.getLogger(name).setLevel(logging.INFO)

    fix_logger('async_upnp_client.traffic.ssdp')
    fix_logger('async_upnp_client.search')
    fix_logger('async_upnp_client.advertisement')
    fix_logger('async_upnp_client.ssdp_listener')
    fix_logger('async_upnp_client.traffic.upnp')

    ### probably need to cancel this when shutting down
    main.APP_EXT.add_background_task(run_forever)


async def run_forever():
    """Background task: load the IP ranges once, then keep the port open."""
    # Runs once and done.
    ### should we monitor for changes?
    await fetch_ranges()

    # This will block forever, ensuring the port remains open.
    try:
        await open_port()
    except Exception as e:
        LOGGER.debug(f'ALEXA-EXIT: {e}')
        raise