#
# General utility functions/classes for my home services.
#

import asyncio
import concurrent  # No longer used below; retained in case other code relies on it.

try:
    import influxdb
except ImportError:
    # Only prepare_influxdb() needs the InfluxDB client.  Deferring the
    # failure keeps the asyncio/MQTT helpers importable without it.
    influxdb = None


def prepare_influxdb(host, username, password, dbname):
    """Return an InfluxDB client connected to HOST and switched to DBNAME.

    The database DBNAME is created first if the server does not list it.

    Raises ImportError if the ``influxdb`` package is not installed.
    """
    if influxdb is None:
        raise ImportError(
            "the 'influxdb' package is required by prepare_influxdb()")

    idb = influxdb.InfluxDBClient(host, username=username, password=password)

    # If the database is not present, then create it.
    existing = idb.get_list_database()
    if not any(info.get('name') == dbname for info in existing):
        idb.create_database(dbname)

    # Switch the client connection to our database.
    idb.switch_database(dbname)
    return idb


def shutdown_mqtt(loop, client):
    """Gracefully close the CLIENT, running within LOOP.

    CLIENT.disconnect() must return an awaitable; it is run to completion
    on LOOP.
    """
    loop.run_until_complete(client.disconnect())

    # The disconnection creates new tasks, so perform another run of
    # the loop to flush out those tasks.
    async def flush():
        pass

    loop.run_until_complete(flush())


def graceful_close(loop, timeout=5.0):
    """Cancel every task on LOOP, wait for them to finish, then close LOOP.

    At most TIMEOUT seconds are spent waiting for the cancelled tasks.
    LOOP is closed afterwards whether or not they all finished in time.
    LOOP must not be running when this is called.
    """
    try:
        all_tasks = asyncio.all_tasks
    except AttributeError:
        # Python < 3.7 only has the (since removed) Task.all_tasks().
        all_tasks = asyncio.Task.all_tasks

    tasks = all_tasks(loop)
    for task in tasks:
        task.cancel()

    if tasks:
        async def drain():
            # Build the gather/wait inside a coroutine running on LOOP so
            # that no explicit ``loop=`` argument (removed in Python 3.10)
            # is needed.  return_exceptions=True collects each task's
            # outcome instead of leaving an unretrieved exception behind.
            combined = asyncio.gather(*tasks, return_exceptions=True)

            # Rather than awaiting COMBINED, use wait() to enable a timeout.
            ### output something if a timeout occurs?
            await asyncio.wait([combined], timeout=timeout)

        loop.run_until_complete(drain())

    loop.close()


def create_periodic(loop, period, func, *args, **kw):
    """Schedule FUNC(*ARGS, **KW) to run on LOOP every PERIOD seconds.

    FUNC is called synchronously, immediately on the task's first step and
    then again after each PERIOD-second sleep.  Returns the created task.
    Cancelling that task ends the cycle quietly: the task finishes normally
    rather than in the cancelled state.  An exception raised by FUNC is not
    caught and terminates the task.
    """
    async def wrapper():
        while True:
            try:
                ### catch some (other) exceptions?
                func(*args, **kw)
                # The coroutine already runs on LOOP, so sleep() needs no
                # ``loop=`` argument (removed in Python 3.10).
                await asyncio.sleep(period)
            except asyncio.CancelledError:
                # Throw out the exception, and end the loop.  Since
                # Python 3.8 task cancellation raises
                # asyncio.CancelledError, which is no longer the
                # concurrent.futures class of the same name.
                return

    return loop.create_task(wrapper())