#!/usr/bin/python3
#
# An HTTP server to provide a "talk to Alexa" service.
#
# Inspired by several items:
#   https://github.com/sammachin/alexaweb
#   https://developers.google.com/web/fundamentals/media/recording-audio
#

import sys
import os
import os.path
import configparser
import http.client
import http.server
import mimetypes
import ssl

CONFIG_FNAME = '/etc/homesvr/talky.conf'
SECTION_NAME = 'listener'

THIS_DIR = os.path.dirname(__file__)

STATIC_CONTENT = {
    # PATH: FILENAME
    '/': 'index.html',
    '/capture.js': 'capture.js',
}

# Content types for the file extensions we serve.  Listed explicitly so
# the answer does not depend on the host's mime.types database.
_CONTENT_TYPES = {
    '.html': 'text/html',
    '.js': 'text/javascript',
}

_BHRH = http.server.BaseHTTPRequestHandler


class TalkyServer(http.server.HTTPServer):
    """HTTP server bound to the configured port, dispatching to TalkyHandler."""

    def __init__(self, cfg):
        """Bind to all interfaces on the configured port.

        cfg: a configparser.ConfigParser providing an integer 'port'
             option in the SECTION_NAME section.
        """
        port = cfg.getint(SECTION_NAME, 'port')
        super().__init__(('', port), TalkyHandler)
        # Not wired up yet:
        #self.path = cfg.get('listener', 'path')  # URL path that we serve
        #self.hub = cfg.get('listener', 'hub')  # backend hub to control devices
        _log_message('Listening on port: %d', port)


# similar to BaseHTTPRequestHandler.log_message()
def _log_message(format, *args):
    "Log a message without an instance."
    sys.stderr.write("- - - [%s] %s\n"
                     % (_log_date_time_string(), format % args))


def _log_date_time_string():
    "Format a date/time string without an instance."
    # The method only reads the class attribute 'monthname' from its
    # "self", so the class itself can stand in for an instance.
    return _BHRH.log_date_time_string(_BHRH)


def _content_type(fname):
    "Return the Content-Type header value to use for FNAME."
    ext = os.path.splitext(fname)[1].lower()
    if ext in _CONTENT_TYPES:
        return _CONTENT_TYPES[ext]
    guessed, _ = mimetypes.guess_type(fname)
    return guessed or 'application/octet-stream'


class TalkyHandler(http.server.BaseHTTPRequestHandler):
    """Serve the static front-end files (GET) and the /talk endpoint (POST)."""

    def do_POST(self):
        "Dispatch POST requests: only '/talk' is recognized."
        if self.path == '/talk':
            self.handle_talk()
        else:
            self.send_error(404)

    def do_GET(self):
        "Dispatch GET requests: only paths in STATIC_CONTENT are served."
        if self.path in STATIC_CONTENT:
            self.handle_static()
        else:
            # likely: '/favicon.ico'
            self.send_error(404)

    def handle_static(self):
        """Send the file mapped to self.path in STATIC_CONTENT.

        The caller must have checked that self.path is a key of
        STATIC_CONTENT.  Replies 500 if the file cannot be read.
        """
        fname = STATIC_CONTENT[self.path]

        # Read the content on each request, to follow changes.
        try:
            with open(os.path.join(THIS_DIR, fname), 'rb') as fp:
                content = fp.read()
        except OSError as exc:
            # Keep the details in the server log, not in the response.
            self.log_error('Cannot read %s: %s', fname, exc)
            self.send_error(500)
            return

        self.send_response(200)
        self.send_header('Content-Type', _content_type(fname))
        self.send_header('Content-Length', str(len(content)))
        self.send_header('Cache-Control', 'no-store')  # helps with JS
        self.end_headers()
        self.wfile.write(content)

    def handle_talk(self):
        "Placeholder for the '/talk' endpoint, which is not implemented yet."
        # Answer explicitly, so the client receives a real HTTP response
        # rather than a connection that closes without one.
        self.send_error(501)


def main():
    """Read the configuration and serve HTTPS requests until interrupted.

    Reads CONFIG_FNAME; the SECTION_NAME section must provide 'port',
    'keyfile' and 'certfile'.
    """
    cfg = configparser.ConfigParser()
    cfg.read(CONFIG_FNAME)

    # ssl.wrap_socket() was removed in Python 3.12; build a server-side
    # context instead.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(
        certfile=cfg.get(SECTION_NAME, 'certfile'),
        keyfile=cfg.get(SECTION_NAME, 'keyfile'))

    with TalkyServer(cfg) as server:
        # Interpose SSL processing.
        server.socket = context.wrap_socket(server.socket, server_side=True)

        # Assume that a HUP or something will end this loop.
        server.serve_forever()


if __name__ == '__main__':
    main()