#
# Work queue: copy, rip, ??, profit
#

import sys
import asyncio
import logging
import time

### will this be needed?
# The application object is published by the main module. Look it up
# defensively so that this module can still be imported (by tests or tools)
# when the main module has no APP; _record() retries the lookup when called.
APP = getattr(sys.modules['__main__'], 'APP', None)
LOGGER = logging.getLogger(__name__)


def _record(message):
    "Pass MESSAGE to APP.performed(), or log a warning if there is no APP."
    app = APP
    if app is None:
        # APP may have been assigned on the main module after this module
        # was imported, so look again before giving up.
        app = getattr(sys.modules['__main__'], 'APP', None)
    if app is None:
        LOGGER.warning('No APP available; not recorded: %s', message)
        return
    app.performed(message)


class WorkQueue:
    "A pair of serialized queues: one for cpu-bound and one for io-bound work."

    def __init__(self):
        self.cpu = _SerializedQueue()  # cpu-bound tasks
        self.io = _SerializedQueue()  # io-bound tasks

    def status_for(self, work_key):
        """Return queue status for any work items with WORK_KEY.

        The result is a list of strings: one 'running: ...' entry for each
        queue whose current item has WORK_KEY, followed by one
        'waiting to run: ...' entry for each waiting item with WORK_KEY
        (cpu queue first, then io queue). The list is empty when nothing
        matches.
        """
        messages = []
        lanes = (self.cpu, self.io)

        # Are there currently-running work items for WORK_KEY?
        for lane in lanes:
            work = lane.current
            if work and work.key == work_key:
                messages.append(f'running: {work}')

        # Any work items queued for WORK_KEY? The queues cannot be added
        # together, so walk each one in turn.
        messages.extend(f'waiting to run: {w}'
                        for lane in lanes
                        for w in lane
                        if w.key == work_key)
        return messages

    async def add_cpu_task(self, task):
        "Queue TASK behind any other cpu-bound work."
        await self.cpu.put(task)

    async def add_io_task(self, task):
        "Queue TASK behind any other io-bound work."
        await self.io.put(task)

    def launch_processors(self, app):
        "Register both queue processors with APP as runners."
        app.add_runner(self.cpu.process, name='CPU task processor')
        app.add_runner(self.io.process, name='I/O task processor')


class _SerializedQueue:
    "Each work item will be executed in sequence."

    def __init__(self):
        self.queue = asyncio.Queue()
        self.current = None  # the work item being run right now, if any
        # Items that have been put() but not yet started, oldest first.
        # asyncio.Queue offers no public way to look at its contents.
        self._waiting = []

    def __iter__(self):
        "Iterate over a snapshot of the work items waiting to run."
        return iter(tuple(self._waiting))

    async def put(self, work):
        "Add WORK to the end of the queue."
        self._waiting.append(work)
        await self.queue.put(work)

    async def process(self):
        """Run each queue event serially.

        A work item that raises is logged and then skipped, so that one
        failure does not prevent the remaining items from running.
        """
        # This process is a "runner" and will be cancelled at shutdown
        # time, so we don't need an event to stop this loop.
        while True:
            work = await self.queue.get()
            # Anything placed directly on self.queue was never tracked.
            if work in self._waiting:
                self._waiting.remove(work)
            self.current = work
            try:
                await work.run()
            except Exception:
                # Cancellation is a BaseException, so it is not caught here
                # and still stops the loop.
                LOGGER.exception('Work item failed; continuing with the'
                                 ' next item: %s', work)
            finally:
                self.current = None
                self.queue.task_done()


class WorkItem:
    "Run AW for underlying KEY, and calling this item NAME."

    def __init__(self, key, aw, name):
        self.key = key
        self.aw = aw
        self.name = name

    def __str__(self):
        return f'"{self.name}" for key={self.key}'

    async def run(self):
        """Await AW, logging and recording the outcome and its duration.

        Any Exception raised by AW is logged, recorded, and re-raised.
        """
        duration = Duration()
        LOGGER.info('Running work item: %s', self)
        try:
            await self.aw
        except Exception:
            LOGGER.exception('Exception occurred during: %s', self)
            _record(f'Exception occurred during: {self};'
                    f' duration: {duration}')
            ### do something better. log. audit report. etc.
            raise

        # Format once, so the log line and the record show the same value.
        elapsed = str(duration)
        LOGGER.info('.. Completed work item: %s in %s', self, elapsed)
        _record(f'Completed: {self}; duration: {elapsed}')


class Duration:
    """Hold a start time, and stringify to a duration in human-useful format."""

    def __init__(self):
        self.t0 = time.time()

    def __float__(self):
        "Return the seconds elapsed since this object was created."
        return time.time() - self.t0

    def __int__(self):
        "Return the elapsed seconds, truncated to an integer."
        return int(float(self))

    def __str__(self):
        "Return the elapsed time: minutes from two minutes up, else seconds."
        d = float(self)
        if d >= 120:
            return f'{d/60:.1f} minutes'
        return f'{d:.1f} seconds'