#!/usr/bin/python3
#
# Listener to map MQTT channels into InfluxDB
#
# This MQTT client runs in the foreground, listening to the broker,
# then inserting the values into the InfluxDB. Should a failure occur,
# systemd will simply restart the script.
#

import os.path
import asyncio
import signal
import configparser
import importlib.util

import gmqtt

# Import our utilities.
_pathname = os.path.join(os.path.dirname(__file__), '../lib/util.py')
_spec = importlib.util.spec_from_file_location('util', _pathname)
util = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(util)

CONFIG_FNAME = '/etc/homesvr/bridge.conf'
SECTION_NAME = 'listener'


def parse_aqua_data(idb, payload):
    """Parse one 'aqua' sensor payload and write it to InfluxDB.

    PAYLOAD is UTF-8 encoded bytes holding exactly five comma-separated
    fields: TEMP,HUMIDITY,DEWPOINT,CHIPID,KEY. The first three are stored
    as floats; CHIPID and KEY are ignored.

    IDB is the object returned by util.prepare_influxdb(); only its
    write_points() method is used here.

    Raises ValueError if the payload is not valid UTF-8, does not contain
    exactly five fields, or a numeric field cannot be converted to float.
    """
    # format is: TEMP,HUMIDITY,DEWPOINT,CHIPID,KEY
    t, h, d, _, _ = payload.decode('utf-8').split(',')

    body = [{
        ### NOTE: 'sample' is local config-specific
        'measurement': 'sample',
        #'tags': { },
        'fields': {
            'tempF': float(t),
            'humidity': float(h),
            'dewF': float(d),
        }
    }]
    idb.write_points(body)


### NOTE: this is specific to gstein's sensor data.
### TODO?? somehow make this stuff reasonably configurable
PARSERS = {
    # TOPIC: PARSER_FUNC
    'aqua/data': parse_aqua_data,
}


class Bridge:
    """Holds the MQTT callbacks that forward messages to InfluxDB."""

    def __init__(self, idb):
        # Handed to every parser function as its first argument.
        self.idb = idb

    def on_connect(self, client, flags, rc, properties):
        """Subscribe to the topics needed by the entries in PARSERS.

        For a topic such as 'aqua/data' the subscription is the wildcard
        'aqua/#', so sibling topics without a parser are delivered too;
        on_message() reports those as UNKNOWN. A topic containing no '/'
        has no parent level, so it is subscribed to verbatim.
        """
        subs = set()
        for topic in PARSERS:
            # rpartition() never raises, unlike unpacking rsplit() on a
            # topic that has no '/' in it.
            parent, sep, _ = topic.rpartition('/')
            subs.add(parent + '/#' if sep else topic)

        for s in subs:
            client.subscribe(s, qos=0)

    def on_message(self, client, topic, payload, qos, properties):
        """Dispatch PAYLOAD to the parser registered for TOPIC.

        Topics with no entry in PARSERS are printed and otherwise ignored.
        """
        f = PARSERS.get(topic)
        if f:
            f(self.idb, payload)
        else:
            print('UNKNOWN:', topic, payload)


def main():
    """Read the config, connect to InfluxDB and MQTT, and run until signalled.

    Settings come from the SECTION_NAME section of CONFIG_FNAME. The event
    loop runs until SIGINT or SIGTERM stops it, after which the MQTT client
    is handed to util.shutdown_mqtt() and the loop is closed.
    """
    cfg = configparser.ConfigParser()
    cfg.read(CONFIG_FNAME)
    sect = cfg[SECTION_NAME]

    idb = util.prepare_influxdb(sect['influx'], sect['i_user'], sect['i_pwd'],
                                sect['database'])

    # Create and install the loop explicitly, and do it before the MQTT
    # client exists: asyncio.get_event_loop() no longer creates a loop
    # implicitly on recent Python versions, and anything that looks up the
    # current loop while the client is being built must find this one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    client = gmqtt.Client(sect['client_name'])

    b = Bridge(idb)
    client.on_connect = b.on_connect
    client.on_message = b.on_message

    loop.add_signal_handler(signal.SIGINT, loop.stop)
    loop.add_signal_handler(signal.SIGTERM, loop.stop)

    ### we know mosquitto is just v3.1.1
    loop.create_task(client.connect(sect['mqtt'],
                                    version=gmqtt.mqtt.constants.MQTTv311))

    loop.run_forever()

    # Graceful close.
    # NOTE(review): presumably disconnects the client using LOOP -- confirm
    # against lib/util.py.
    util.shutdown_mqtt(loop, client)
    loop.close()


if __name__ == '__main__':
    main()