#!/usr/bin/python3.12
#
# Central daemon for all the services on gstein's home server.
#
# This is a Quart-based app to present web pages, and to run
# numerous background tasks.
#

import sys
import os.path
import logging
import asyncio
import signal
import functools
import time

import ezt
import quart
import yaml
import asfquart.base
from easydict import EasyDict as edict
import aiohttp

import util

LOGGER = logging.getLogger(__name__)

CONFIG_FNAME = 'config.yaml'
CONFIG_PATH = os.path.join(util.THIS_DIR, CONFIG_FNAME)


# Note: no decorators. Route will be added once an APP is constructed.
async def startup():
    "Runs via before_serving(): create the shared aiohttp Session."
    # Loop is running. Create an aiohttp Session that all features can share.
    APP.session = aiohttp.ClientSession()


# Note: no decorators. Route will be added once an APP is constructed.
async def shutdown():
    "Runs via after_serving(): close the shared Session."
    ### wantonly kill everything? Or let each task manage their own
    ### individual destruction?
    print('SHUTDOWN CB:', time.time())
    print('BACKGROUND[APP]:', APP.background_tasks)
    print('BACKGROUND[APP_EXT]:', APP_EXT.background_tasks)

    await APP.session.close()
    APP.session = None


# Note: no decorators. Route will be added once an APP is constructed.
async def homepage():
    "Render the homepage template, with links drawn from the config."
    links = [ edict(**link) for link in APP.cfg.links ]
    data = edict(
        message='message goes here',
        user='gstein',
        title='Home Page',
        leftnav=util.build_active_leftnav('home'),
        links=links,
        )
    return data


# Note: no decorators. Route will be added once an APP is constructed.
async def favicon():
    "Serve the favicon from the static directory."
    ### set ETag and cache values?
    return await quart.send_from_directory(util.STATIC_DIR, 'favicon.ico')


# Note: no decorators. Route will be added once an APP is constructed.
async def static_content(subdir, fname):
    "Serve FNAME from the SUBDIR directory under the static tree."
    dname = os.path.join(util.STATIC_DIR, subdir)
    ### set ETag and cache values. Especially for the bootstrap files
    ### to deal with upgrades. meh? versioned in the name.
    return await quart.send_from_directory(dname, fname)


def main():
    "Construct the two applications, load all features, and serve forever."

    ### DEBUG for now
    logging.basicConfig(level=logging.DEBUG)

    # Order these properly, as last-constructed becomes asfquart.APP
    global APP, APP_EXT
    APP_EXT = asfquart.construct('central:external', oauth=False)
    APP = asfquart.construct('central', oauth=False)
    # modules['__main__'].APP is now available to sub-modules

    # When the event loop starts, we'll create a shared Session.
    # See startup()
    APP.session = None

    # asfquart loads "config.yaml" into APP.cfg and APP_EXT.cfg
    cfg = APP.cfg
    print('APP_DIR:', APP.app_dir)

    ### no subclass right now. change?

    # Hang some systems off the APP object.
    import workqueue
    APP.wq = workqueue.WorkQueue()  # for long-term processing tasks
    APP.wq.launch_processors(APP)

    ### move this to a self-managing object
    APP.audit = [ ]
    def performed(msg):
        # Record an audit entry: (timestamp, message)
        APP.audit.append((time.time(), msg))
        ### TESTING
        #APP.audit.append((time.time()-86400, msg))
        #APP.audit.append((time.time()-190000, msg))
    APP.performed = performed

    # Add some various routes.
    ### note: if this group gets larger, then move to separate module that
    ### is imported after APP has been constructed.
    APP.route('/')(APP.use_template('templates/homepage.ezt')(homepage))
    APP.route('/favicon.ico')(favicon)
    # static_content() takes two URL variables, so the rule needs both
    # placeholders to match the view's signature.
    APP.route('/<subdir>/<fname>')(static_content)

    # Add some additional hooks
    APP.before_serving(startup)
    APP.after_serving(shutdown)
    #APP_EXT.before_serving(??)
    #APP_EXT.after_serving(??)

    # Now that asfquart.APP exists, load each of our submodules, to
    # provide all our functionality.

    def trim_setup_messages(record):
        # Don't report on new/added stuff. We're doing setup right now.
        # NOTE(review): these line numbers refer to lines within the "disk"
        # module and will drift as that module is edited — confirm.
        if record.lineno in (37, 313):
            return False  # do not log
        print('RECORD:', repr(record))
        return True
    logging.getLogger('disk').addFilter(trim_setup_messages)

    # This module builds a view of the DVD directories on the server, then
    # watches for changes to update internal state. It will notify other
    # feature modules (eg. media) when changes occur.
    import disk
    disk.PLEX = disk.PlexLibraries(cfg.plex.root, cfg.plex.libraries.split())
    APP.dvdroots = [ ]
    for idx, root in enumerate(cfg.dvdroots):
        ### for compat with 3.8 don't use := inside the func args
        dvdroot = disk.DVDRoot(root)
        APP.dvdroots.append(dvdroot)
        APP.add_runner(dvdroot.watch_dirs,
                       name=f'DVDROOT:{idx}/{os.path.basename(root)}')

    # Setup is complete. Restore normal logging.
    logging.getLogger('disk').removeFilter(trim_setup_messages)

    # Make the Alexa listener conditional, so that production and
    # dev/test do not fight for the UPnP mapping.
    # NOTE: be very careful that this flag is a boolean, expressing True.
    # Fail towards NOT running the feature (and its use of UPnP).
    if (flag := cfg.alexa) and type(flag) is bool:
        LOGGER.info('Starting Alexa listener.')

        # Note: this will place route(s) on APP_EXT.
        import alexa

        # And, we add a runner to monitor the UPnP port to keep it open.
        APP_EXT.add_runner(alexa.keep_port_open)
    else:
        LOGGER.info('SKIPPING Alexa listener.')

    import media      # media management (DVDs, Plex, etc)
    import admin      # administrative interface
    # import udev     # actions when a disc is inserted
    # import ripserver   ### part of media?
    # import plexdirs    ### part of media?
    # import bridge   # move mosquitto content into InfluxDB
    import lighting   # all the lighting devices in the house
    APP.lights = lighting.LightingSystem(cfg)

    host = cfg.hostname

    # NOTE: much of the code below is direct from quart/app.py:Quart.run()
    # This local "copy" is to deal with simultaneous operation of two
    # applications, with different routes/etc.

    loop = asyncio.new_event_loop()
    loop.set_debug(True)

    # Set the loop manually, so that any Futures are created in this loop.
    asyncio.set_event_loop(loop)

    def lifespan_handler(app, scope):
        # Wrap the app's lifespan handler to trace receive/send traffic.
        #assert app == APP_EXT  # expected
        print('LIFESPAN_HANDLER:', app, scope)
        original_handler = app.asgi_lifespan_class(app, scope)
        print('ORIGINAL:', original_handler)
        async def wrapper_handler(receive, send):
            print('INSIDE WRAPPER:', receive, send)
            async def wrap_receive():
                event = await receive()
                print(f'WRAP{app}: EVENT:', event)
                util.dbg_print_all_tasks()
                return event
            async def wrap_send(msg):
                print(f'WRAP{app}: MESSAGE:', msg)
                return await send(msg)
            await original_handler(wrap_receive, wrap_send)
        return wrapper_handler

    def middleware(original_func):
        # ASGI middleware: route 'lifespan' scopes through the tracing
        # lifespan_handler; pass everything else straight through.
        app = original_func.__self__
        print('ORIGINAL ASGI_APP:', original_func, app)
        async def wrapper(scope, receive, send):
            if scope['type'] == 'lifespan':
                print('MW-WRAP:', scope, receive, send)
                asgi_handler = lifespan_handler(app, scope)
                print('ASGI:', asgi_handler)
                await asgi_handler(receive, send)
                return
            return await original_func(scope, receive, send)
        return wrapper
    print('ASGI_APP was:', APP.asgi_app)
    APP.asgi_app = middleware(APP.asgi_app)
    print('ASGI_APP now:', APP.asgi_app)

    # Now that everything has loaded, let's clean/check the Plex libs.
    async def bg_scrubbing():
        # NOTE(review): scrub_libs() is called without await — confirm it is
        # a synchronous method, otherwise this silently does nothing.
        disk.PLEX.scrub_libs(APP.dvdroots)
    loop.create_task(bg_scrubbing(), name='SCRUB')

    ### for testing .runx()
    if False:
        # Test asfquart's .runx(). No EXTRA_FILES needed.
        APP.runx(host=host, port=cfg.port, loop=loop)
        return

    # Get a constructor for an "awaitable" that triggers reload/shutdown.
    # Note: no EXTRA_FILES are needed.
    trigger = APP.factory_trigger(loop)

    # Trigger work to shutdown/close APP_EXT.
    ext_shutdown_event = asyncio.Event()

    async def modified_trigger():
        try:
            await trigger()
        except Exception as e:
            LOGGER.debug(f'APP TRIGGERED: {repr(e)}')

            # Signal APP_EXT to gracefully exit, and wait for its completion.
            ext_shutdown_event.set()

            # Now wait for APP_EXT to complete its exit.
            # NOTE(review): this runs inside task1 itself, so task1 cannot
            # complete here; FIRST_COMPLETED effectively waits on task2 —
            # confirm that is the intent.
            await asyncio.wait((task1, task2),
                               return_when=asyncio.FIRST_COMPLETED)

            # Propagate the error to the LOOP, and have APP exit.
            raise

    t1 = APP.run_task(host=host, port=cfg.port,
                      debug=True,
                      shutdown_trigger=modified_trigger,
                      )
    t2 = APP_EXT.run_task(host=host, port=cfg.port_ext,
                          debug=True,
                          shutdown_trigger=ext_shutdown_event.wait,
                          )
    task1 = loop.create_task(t1, name='APP')
    task2 = loop.create_task(t2, name='APP_EXT')

    # Two applications need to be run simultaneously. gather() returns
    # a future that waits until ALL tasks have completed.
    #
    # task1 (APP) will only complete by throwing an exception to stop,
    # or to restart the server (to pick up code/config changes).
    #
    # task2 (APP_EXT) will only complete when it is told to, using the
    # EXT_SHUTDOWN_EVENT signal.
    #
    # When the loop processing in APP catches the exception, then
    # task1 will exit gracefully, or it will exec() a new process.
    # Because the exec() does not exit, the gather() cannot ensure
    # task2 has completed. Thus, modified_trigger() explicitly waits
    # for task2 to finish.
    wait_for_apps = asyncio.gather(task1, task2)

    print(f' * Serving Quart apps "{APP.name}" and "{APP_EXT.name}"')
    print(f' * Debug mode: {APP.debug}')
    print(f' * Using reloader: ALTERNATE')
    print(f' * Running on http://{host}:{cfg.port}')
    print(f' * ... and on http://{host}:{cfg.port_ext}')
    print(f' * ... CTRL + C to quit')
    sys.stdout.flush()

    # Run this on the primary APP. Either works, as it does not use SELF.
    APP.run_forever(loop, wait_for_apps)


def CUSTOM_PROTOCOL_MAYBE():
    # Scratchpad: notes towards a custom server protocol. Not called.
    def protocol_factory():
        ### streams.py:start_server
        protocol = None
        return protocol

    ### app:run_task()
    ### asyncio.__init__:serve()
    ### asyncio.run:worker_serve()
    ### asyncio.streams:start_server()
    ### asyncio.streams:StreamReaderProtocol.connection_made()
    ### asyncio.tcp_server:TCPServer.__init__()
    #t2 = loop.create_server(protocol_factory, hostname, port, ...)


if __name__ == '__main__':
    main()