Sensor archiver: monitor mqtt topic and record events. master
authorPat Thoyts <Patrick.Thoyts@renishaw.com>
Fri, 19 Jan 2018 14:07:04 +0000 (14:07 +0000)
committerPat Thoyts <Patrick.Thoyts@renishaw.com>
Fri, 19 Jan 2018 14:07:04 +0000 (14:07 +0000)
This monitors a topic (spd/sensors/# by default) and records events into
a MongoDB database for use in later data mining.
Added an init script to enable automatic starting on boot.

sensor-archiver.conf [new file with mode: 0644]
sensor-archiver.py [new file with mode: 0644]

diff --git a/sensor-archiver.conf b/sensor-archiver.conf
new file mode 100644 (file)
index 0000000..b5ee9ce
--- /dev/null
@@ -0,0 +1,11 @@
+# sensor-archiver - office temperature aggregation and archiving to mongo db
+description    "sensor-hub archiver"
+
+start on runlevel [2345]
+stop on runlevel [!2345]
+
+respawn
+
+script
+  exec su pat -c "/usr/bin/python3 /opt/django/wsgi-scripts/sensor-hub/sensor-archiver.py >> /var/log/sensor-archiver.log 2>&1"
+end script
diff --git a/sensor-archiver.py b/sensor-archiver.py
new file mode 100644 (file)
index 0000000..5b67f6f
--- /dev/null
@@ -0,0 +1,184 @@
+#!/usr/bin/env python3
+
+from __future__ import print_function, absolute_import, division
+import sys
+import argparse
+from configparser import ConfigParser
+from datetime import datetime
+from json import loads, dumps
+from os import path
+from queue import Queue, Full, Empty
+from threading import Thread, Event
+from paho.mqtt import client as mqtt
+from paho.mqtt import publish
+import pymongo
+from pymongo import MongoClient
+
+class Archiver(Thread):
+    """Class to handle database updates for new data items.
+    Granularity sets the time in seconds between database updates. This
+    reduces the datarate as the sensor-hub issues an update every second.
+    """
+    def __init__(self, queue, granularity=60):
+        Thread.__init__(self, group=None, daemon=False)
+        self.stopped = Event()
+        self.queue = queue
+        self.granularity = granularity
+        config = ConfigParser()
+        filename = path.join(path.dirname(path.realpath(__file__)), 'sensor-hub.config')
+        config.read(filename)
+        self.uri = config['database']['uri'].strip("\'")
+        client = MongoClient(self.uri)
+        self.db = client.sensorhub.sensors
+        self.init_recent()
+
+    #def recent(self):
+    #    """Get the timestamp of the most recent item in the database."""
+    #    try:
+    #        cursor = self.db.find().sort([("timestamp", pymongo.DESCENDING)]).limit(1)
+    #        last = dict(next(cursor))
+    #    except StopIteration:
+    #        last = {'timestamp': 0}
+    #    r = int(last['timestamp'])
+    #    return r
+
+    def updateA(self, item):
+        """Update the database with a new data item only if sufficient
+        time has passed since the last update, as set by the granularity
+        property."""
+        now = datetime.now()
+        if "name" in item:
+            name = item['name']
+            do_update = True
+            if name in self._recent:
+                do_update = (now - self._recent[name]).seconds > self.granularity
+            if do_update:
+                self._recent[name] = now
+                print("UPDATE " + item)
+                #result = self.db.insert_one(item)
+                #print("{0} {1}".format(result.inserted_id, self._recent), flush=True)
+
+    def init_recent(self):
+        self.recent = dict(pressure=[], humidity=[], temp=[], names=[])
+
+    def update(self, item):
+        now = datetime.now()
+        if "name" in item:
+            name = item['name']
+            if name in self.recent['names']:
+                self.recent['timestamp'] = int(now.timestamp())
+                del self.recent['names']
+                print(dumps(self.recent), flush=True)
+                self.db.insert_one(self.recent)
+                self.init_recent()
+            for sensor in item['sensors']:
+                if 'pressure' in sensor:
+                    self.recent['pressure'].append((name, sensor['pressure']))
+                if 'humidity' in sensor:
+                    self.recent['humidity'].append((name, sensor['humidity']))
+                if 'temp' in sensor:
+                    self.recent['temp'].append((name + ':' + sensor['id'], sensor['temp']))
+            self.recent['names'].append(name)
+
+    def stop(self):
+        """Signal the thread to terminate politely."""
+        self.stopped.set()
+
+    def run(self):
+        while not self.stopped.is_set():
+            try:
+                item = self.queue.get(block=True, timeout=0.2)
+                self.update(item)
+            except Empty:
+                pass
+
+class Watcher():
+    def __init__(self, topic, queue, client_id=None, debug=False):
+        self.topic = topic
+        self.queue = queue
+        self.debug = debug
+        self.client = mqtt.Client(client_id, userdata=self)
+        self.client.on_connect = lambda c, u, f, r: self._on_connect(c, f, r)
+        self.client.on_disconnect = lambda c, u, r: self._on_disconnect(c, r)
+        self.client.on_message = lambda c, u, msg: self._on_message(c, msg)
+        self.client.on_publish = lambda c, u, mid: self._on_publish(c, mid)
+        self.client.on_subscribe = lambda c, u, mid, qos: self._on_subscribe(c, mid, qos)
+        self.client.on_unsubscribe = lambda c, u, mid: self._on_unsubscribe(c, mid)
+        self.client.on_log = lambda c, u, lvl, buf: self._on_log(c, lvl, buf)
+
+    def connect(self, host="localhost", port=1883):
+        """Connect to MQTT broker"""
+        self.client.connect_async(host, port)
+
+    def disconnect(self):
+        """Disconnect from MQTT broker"""
+        self.client.disconnect()
+
+    def _on_connect(self, client, flags, result_code):
+        if self.debug:
+            print("connected {0} [{1}]".format(flags, result_code))
+        client.subscribe(self.topic, 0)
+
+    def _on_disconnect(self, client, result_code):
+        if self.debug:
+            print("disconnected [{0}]".format(result_code))
+
+    def _on_message(self, client, msg):
+        try:
+            payload = msg.payload.decode("utf-8")
+            json = loads(payload)
+            self.queue.put(json, False)
+        except Full:
+            pass # drop message if queue is full.
+        except ValueError:
+            if self.debug:
+                print("ERR: {0} {1} {2}".format(msg.topic, msg.qos, payload))
+
+    def _on_publish(self, client, msgid):
+        if self.debug:
+            print("published message {0}".format(msgid))
+
+    def _on_subscribe(self, client, mid, granted_qos):
+        if self.debug:
+            print("subscribed {0} {1}".format(mid, granted_qos))
+
+    def _on_unsubscribe(self, client, mid):
+        if self.debug:
+            print("unsubscribed {0}".format(mid))
+
+    def _on_log(self, client, level, buffer):
+        if self.debug:
+            print("log {0}: {1}".format(level, buffer), file=sys.stderr)
+
+def main(argv=None):
+    """
+    Watch messages posted to our topic and record into a database at intervals.
+    """
+    parser = argparse.ArgumentParser(description="")
+    parser.add_argument('--hostname', dest='hostname', default='localhost',
+                        help='specify the MQTT broker hostname')
+    parser.add_argument('--port', dest='port', type=int, default=1883,
+                        help='specify the MQTT broker port')
+    parser.add_argument('--topic', dest='topic', default='spd/sensors/#',
+                        help='specify the MQTT topic to follow.')
+    parser.add_argument('--debug', dest='debug', action='store_true', default=False)
+    options = parser.parse_args(argv)
+
+    queue = Queue()
+    archiver = Archiver(queue)
+    watcher = Watcher(options.topic, queue, debug=options.debug)
+    archiver.start()
+    watcher.connect(host=options.hostname, port=options.port)
+    try:
+        watcher.client.loop_forever()
+    except KeyboardInterrupt:
+        pass
+    archiver.stop()
+    watcher.disconnect()
+    watcher.client.loop()
+    archiver.join()
+
+    return 0
+
+if __name__ == '__main__':
+    sys.exit(main(sys.argv[1:]))