From: Pat Thoyts Date: Fri, 19 Jan 2018 14:07:04 +0000 (+0000) Subject: Sensor archiver: monitor mqtt topic and record events. X-Git-Url: https://privyetmir.co.uk/gitweb.cgi?a=commitdiff_plain;ds=inline;p=spd%2Fsensor-hub.git Sensor archiver: monitor mqtt topic and record events. This monitors a topic (spd/sensors/# by default) and records events into a MongoDB database for use in later data mining. Added an init script to enable automatic starting on boot. --- diff --git a/sensor-archiver.conf b/sensor-archiver.conf new file mode 100644 index 0000000..b5ee9ce --- /dev/null +++ b/sensor-archiver.conf @@ -0,0 +1,11 @@ +# sensor-archiver - office temperature aggregation and archiving to mongo db +description "sensor-hub archiver" + +start on runlevel [2345] +stop on runlevel [!2345] + +respawn + +script + exec su pat -c "/usr/bin/python3 /opt/django/wsgi-scripts/sensor-hub/sensor-archiver.py >> /var/log/sensor-archiver.log 2>&1" +end script diff --git a/sensor-archiver.py b/sensor-archiver.py new file mode 100644 index 0000000..5b67f6f --- /dev/null +++ b/sensor-archiver.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 + +from __future__ import print_function, absolute_import, division +import sys +import argparse +from configparser import ConfigParser +from datetime import datetime +from json import loads, dumps +from os import path +from queue import Queue, Full, Empty +from threading import Thread, Event +from paho.mqtt import client as mqtt +from paho.mqtt import publish +import pymongo +from pymongo import MongoClient + +class Archiver(Thread): + """Class to handle database updates for new data items. + Granularity sets the time in seconds between database updates. This + reduces the datarate as the sensor-hub issues an update every second. + """ + def __init__(self, queue, granularity=60): + Thread.__init__(self, group=None, daemon=False) + self.stopped = Event() + self.queue = queue + self.granularity = granularity + config = ConfigParser() + filename = path.join(path.dirname(path.realpath(__file__)), 'sensor-hub.config') + config.read(filename) + self.uri = config['database']['uri'].strip("\'") + client = MongoClient(self.uri) + self.db = client.sensorhub.sensors + self.init_recent() + + #def recent(self): + # """Get the timestamp of the most recent item in the database.""" + # try: + # cursor = self.db.find().sort([("timestamp", pymongo.DESCENDING)]).limit(1) + # last = dict(next(cursor)) + # except StopIteration: + # last = {'timestamp': 0} + # r = int(last['timestamp']) + # return r + + def updateA(self, item): + """Update the database with a new data item only if sufficient + time has passed since the last update, as set by the granularity + property.""" + now = datetime.now() + if "name" in item: + name = item['name'] + do_update = True + if name in self._recent: + do_update = (now - self._recent[name]).seconds > self.granularity + if do_update: + self._recent[name] = now + print("UPDATE " + item) + #result = self.db.insert_one(item) + #print("{0} {1}".format(result.inserted_id, self._recent), flush=True) + + def init_recent(self): + self.recent = dict(pressure=[], humidity=[], temp=[], names=[]) + + def update(self, item): + now = datetime.now() + if "name" in item: + name = item['name'] + if name in self.recent['names']: + self.recent['timestamp'] = int(now.timestamp()) + del self.recent['names'] + print(dumps(self.recent), flush=True) + self.db.insert_one(self.recent) + self.init_recent() + for sensor in item['sensors']: + if 'pressure' in sensor: + self.recent['pressure'].append((name, sensor['pressure'])) + if 'humidity' in sensor: + self.recent['humidity'].append((name, sensor['humidity'])) + if 'temp' in sensor: + self.recent['temp'].append((name + ':' + sensor['id'], sensor['temp'])) + self.recent['names'].append(name) + + def stop(self): + """Signal the thread to terminate politely.""" + self.stopped.set() + + def run(self): + while not self.stopped.is_set(): + try: + item = self.queue.get(block=True, timeout=0.2) + self.update(item) + except Empty: + pass + +class Watcher(): + def __init__(self, topic, queue, client_id=None, debug=False): + self.topic = topic + self.queue = queue + self.debug = debug + self.client = mqtt.Client(client_id, userdata=self) + self.client.on_connect = lambda c, u, f, r: self._on_connect(c, f, r) + self.client.on_disconnect = lambda c, u, r: self._on_disconnect(c, r) + self.client.on_message = lambda c, u, msg: self._on_message(c, msg) + self.client.on_publish = lambda c, u, mid: self._on_publish(c, mid) + self.client.on_subscribe = lambda c, u, mid, qos: self._on_subscribe(c, mid, qos) + self.client.on_unsubscribe = lambda c, u, mid: self._on_unsubscribe(c, mid) + self.client.on_log = lambda c, u, lvl, buf: self._on_log(c, lvl, buf) + + def connect(self, host="localhost", port=1883): + """Connect to MQTT broker""" + self.client.connect_async(host, port) + + def disconnect(self): + """Disconnect from MQTT broker""" + self.client.disconnect() + + def _on_connect(self, client, flags, result_code): + if self.debug: + print("connected {0} [{1}]".format(flags, result_code)) + client.subscribe(self.topic, 0) + + def _on_disconnect(self, client, result_code): + if self.debug: + print("disconnected [{0}]".format(result_code)) + + def _on_message(self, client, msg): + try: + payload = msg.payload.decode("utf-8") + json = loads(payload) + self.queue.put(json, False) + except Full: + pass # drop message if queue is full. + except ValueError: + if self.debug: + print("ERR: {0} {1} {2}".format(msg.topic, msg.qos, payload)) + + def _on_publish(self, client, msgid): + if self.debug: + print("published message {0}".format(msgid)) + + def _on_subscribe(self, client, mid, granted_qos): + if self.debug: + print("subscribed {0} {1}".format(mid, granted_qos)) + + def _on_unsubscribe(self, client, mid): + if self.debug: + print("unsubscribed {0}".format(mid)) + + def _on_log(self, client, level, buffer): + if self.debug: + print("log {0}: {1}".format(level, buffer), file=sys.stderr) + +def main(argv=None): + """ + Watch messages posted to our topic and record into a database at intervals. + """ + parser = argparse.ArgumentParser(description="") + parser.add_argument('--hostname', dest='hostname', default='localhost', + help='specify the MQTT broker hostname') + parser.add_argument('--port', dest='port', type=int, default=1883, + help='specify the MQTT broker port') + parser.add_argument('--topic', dest='topic', default='spd/sensors/#', + help='specify the MQTT topic to follow.') + parser.add_argument('--debug', dest='debug', action='store_true', default=False) + options = parser.parse_args(argv) + + queue = Queue() + archiver = Archiver(queue) + watcher = Watcher(options.topic, queue, debug=options.debug) + archiver.start() + watcher.connect(host=options.hostname, port=options.port) + try: + watcher.client.loop_forever() + except KeyboardInterrupt: + pass + archiver.stop() + watcher.disconnect() + watcher.client.loop() + archiver.join() + + return 0 + +if __name__ == '__main__': + sys.exit(main(sys.argv[1:]))