#!/usr/bin/python2 # # Listener for messages from Alexa Smart Home, via SNS # import sys import os.path import ConfigParser import BaseHTTPServer import urlparse import httplib import json import time import threading import traceback import miniupnpc CONFIG_FNAME = '.alexa.conf' SNS_TYPE_HEADER = 'x-amz-sns-message-type' SNS_TYPE_SUBSCRIBE = 'SubscriptionConfirmation' SNS_TYPE_NOTIFY = 'Notification' SNS_TYPE_UNSUBSCRIBE = 'UnsubscribeConfirmation' # from https://forums.aws.amazon.com/ann.jspa?annID=2347 SNS_SOURCES = [ # US-EAST-1 '207.171.167.101', '207.171.167.25', '207.171.167.26', '207.171.172.6', '54.239.98.0/24', '54.240.217.16/29', '54.240.217.8/29', '54.240.217.64/28', '54.240.217.80/29', '72.21.196.64/29', '72.21.198.64/29', '72.21.198.72', '72.21.217.0/24', ] # Don't let us hang. Make sure our network operations will time out. ### could move this to the config, but ... meh. TIMEOUT_SECS = 2 def run_listener(cfg, upnp): server = SNSListener(cfg, upnp) # Assume that a HUP or something will end this loop. server.serve_forever() class SNSListener(BaseHTTPServer.HTTPServer): def __init__(self, cfg, upnp): port = cfg.getint('listener', 'int_port') BaseHTTPServer.HTTPServer.__init__(self, ('', port), SNSHandler) self.path = cfg.get('listener', 'path') # URL path that we serve self.hub = cfg.get('listener', 'hub') # backend hub to control devices self.masks = _build_masks() self.worker = threading.Thread(target=self.worker_monitor_port, args=(cfg, upnp), name='monitor-upnp') self.worker.start() _log_message('Listening on port: %d', port) _SUPER_VR = BaseHTTPServer.SocketServer.BaseServer.verify_request def verify_request(self, request, client_addr): if not self._SUPER_VR(request, client_addr): return False ip = _ip_to_u32(client_addr[0]) for mask, net in self.masks: if ip & mask == net: # matched! break else: # none of the networks matched, so the loop finished. _log_message('Unknown source for SNS: %s', client_addr[0]) self.shutdown_request(request) return False return True def worker_monitor_port(self, cfg, upnp): 'Background worker to monitor the IGD port, to ensure it remains open.' ext_port = cfg.getint('listener', 'ext_port') interval = cfg.getint('listener', 'check_interval') while True: try: time.sleep(interval) # Returns info about the mapping, or None. info = upnp.getspecificportmapping(ext_port, 'TCP') if not info: _log_message('Reopening UPnP port.') ###open_port(cfg, upnp) ### Got an error on this before. Maybe it thought we already had ### a mapping, and didn't like a dup? Let's try discovery again. upnp.discover() upnp.selectigd() open_port(cfg, upnp) except: _log_message(traceback.format_exc(1)) # keep looping # similar to BaseHTTPRequestHandler.log_message() def _log_message(format, *args): sys.stderr.write("- - - [%s] %s\n" % (_log_date_time_string(), format % args)) def _build_masks(): masks = [ ] for cidr in SNS_SOURCES: if '/' in cidr: netstr, bits = cidr.split('/') mask = (0xffffffff << (32 - int(bits))) & 0xffffffff net = _ip_to_u32(netstr) & mask else: mask = 0xffffffff net = _ip_to_u32(cidr) masks.append((mask, net)) return masks def _ip_to_u32(ip): # convert dotted-quad to 8-digit hex repr, then to an integer return int(''.join('%02x' % int(d) for d in ip.split('.')), 16) # similar to BaseHTTPRequestHandler.log_date_time_string() def _log_date_time_string(): year, month, day, hh, mm, ss, _, _, _ = time.localtime() monthstr = BaseHTTPServer.BaseHTTPRequestHandler.monthname[month] return "%02d/%3s/%04d %02d:%02d:%02d" % (day, monthstr, year, hh, mm, ss) class SNSHandler(BaseHTTPServer.BaseHTTPRequestHandler): def do_POST(self): # We server a single path, at the moment. if self.path != self.server.path: self.send_error(404) return # We must read the whole body, in order to support keep-alive. CL = self.headers.get('Content-Length') if not CL: # This server can't deal with Chunked. Demand the C-L header. self.send_error(411) return ### maybe validate/trap the int() and provide proper HTTP response body = self.rfile.read(int(CL)) reqtype = self.headers.get(SNS_TYPE_HEADER) if reqtype == SNS_TYPE_UNSUBSCRIBE: # We don't care about this. Exit early (before parse/validate). self.log_message('Ignoring: %s' % (SNS_TYPE_UNSUBSCRIBE,)) self.send_response(204) return request = json.loads(body) ### validate the request if reqtype == SNS_TYPE_SUBSCRIBE: self.subscribe(request) elif reqtype == SNS_TYPE_NOTIFY: self.notify(request) elif reqtype is None: self.send_error(400, 'Missing SNS type header') else: self.send_error(400, 'Unknown SNS type: %s' % (reqtype,)) # The POST has been handled. kthxbai. return def subscribe(self, request): url = request['SubscribeURL'] confirm(url) self.log_message('Subscribed to: %s', request['TopicArn']) self.send_response(204) def notify(self, request): msg = request['Message'] if len(msg) != 6 or msg[4] != ':' or not ('0'<=msg[5]<='1'): self.send_error(400, 'Message has improper format: %s' % (msg,)) return try: device = int(msg[:4], 16) except ValueError: self.send_error(400, 'Device code is improper: %s' % (msg[:4])) return self.log_message('Notify: %s', msg) if msg[5] == '1': # TurnOn device |= 0x20 # the 'E' flag in pic/relay-board/main.asm send_operation(self.server.hub, device) self.send_response(204) def confirm(url): u = urlparse.urlparse(url) assert u.scheme == 'https' # AWS should be using this ### one day: add a cert chain context to this connection ### maybe? do we really care about hitting not-AWS? conn = httplib.HTTPSConnection(u.hostname, u.port, timeout=TIMEOUT_SECS) # Just touch the confirmation URL. conn.request('HEAD', '%s?%s' % (u.path, u.query)) _ = conn.getresponse() conn.close() def send_operation(hub, op): conn = httplib.HTTPConnection(hub, timeout=TIMEOUT_SECS) ### this path construction is specific to gstein's device hub conn.request('GET', '/i/%04x' % (op,)) response = conn.getresponse() _ = response.read() conn.close() def open_port(cfg, upnp): upnp.addportmapping(cfg.getint('listener', 'ext_port'), 'TCP', cfg.get('listener', 'int_ip'), cfg.getint('listener', 'int_port'), 'Accept SNS Messages', '') def main(): cfg = ConfigParser.SafeConfigParser() cfg.read(os.path.expanduser(os.path.join('~', CONFIG_FNAME))) u = miniupnpc.UPnP() u.discover() u.selectigd() open_port(cfg, u) try: run_listener(cfg, u) finally: # Always delete the port mapping on shutdown u.deleteportmapping(cfg.getint('listener', 'ext_port'), 'TCP') if __name__ == '__main__': main()