+#!/usr/bin/env python3
+
+from __future__ import print_function, absolute_import, division
+import sys
+import argparse
+from configparser import ConfigParser
+from datetime import datetime
+from json import loads, dumps
+from os import path
+from queue import Queue, Full, Empty
+from threading import Thread, Event
+from paho.mqtt import client as mqtt
+from paho.mqtt import publish
+import pymongo
+from pymongo import MongoClient
+
+class Archiver(Thread):
+ """Class to handle database updates for new data items.
+ Granularity sets the time in seconds between database updates. This
+ reduces the datarate as the sensor-hub issues an update every second.
+ """
+ def __init__(self, queue, granularity=60):
+ Thread.__init__(self, group=None, daemon=False)
+ self.stopped = Event()
+ self.queue = queue
+ self.granularity = granularity
+ config = ConfigParser()
+ filename = path.join(path.dirname(path.realpath(__file__)), 'sensor-hub.config')
+ config.read(filename)
+ self.uri = config['database']['uri'].strip("\'")
+ client = MongoClient(self.uri)
+ self.db = client.sensorhub.sensors
+ self.init_recent()
+
+ #def recent(self):
+ # """Get the timestamp of the most recent item in the database."""
+ # try:
+ # cursor = self.db.find().sort([("timestamp", pymongo.DESCENDING)]).limit(1)
+ # last = dict(next(cursor))
+ # except StopIteration:
+ # last = {'timestamp': 0}
+ # r = int(last['timestamp'])
+ # return r
+
+ def updateA(self, item):
+ """Update the database with a new data item only if sufficient
+ time has passed since the last update, as set by the granularity
+ property."""
+ now = datetime.now()
+ if "name" in item:
+ name = item['name']
+ do_update = True
+ if name in self._recent:
+ do_update = (now - self._recent[name]).seconds > self.granularity
+ if do_update:
+ self._recent[name] = now
+ print("UPDATE " + item)
+ #result = self.db.insert_one(item)
+ #print("{0} {1}".format(result.inserted_id, self._recent), flush=True)
+
+ def init_recent(self):
+ self.recent = dict(pressure=[], humidity=[], temp=[], names=[])
+
+ def update(self, item):
+ now = datetime.now()
+ if "name" in item:
+ name = item['name']
+ if name in self.recent['names']:
+ self.recent['timestamp'] = int(now.timestamp())
+ del self.recent['names']
+ print(dumps(self.recent), flush=True)
+ self.db.insert_one(self.recent)
+ self.init_recent()
+ for sensor in item['sensors']:
+ if 'pressure' in sensor:
+ self.recent['pressure'].append((name, sensor['pressure']))
+ if 'humidity' in sensor:
+ self.recent['humidity'].append((name, sensor['humidity']))
+ if 'temp' in sensor:
+ self.recent['temp'].append((name + ':' + sensor['id'], sensor['temp']))
+ self.recent['names'].append(name)
+
+ def stop(self):
+ """Signal the thread to terminate politely."""
+ self.stopped.set()
+
+ def run(self):
+ while not self.stopped.is_set():
+ try:
+ item = self.queue.get(block=True, timeout=0.2)
+ self.update(item)
+ except Empty:
+ pass
+
+class Watcher():
+ def __init__(self, topic, queue, client_id=None, debug=False):
+ self.topic = topic
+ self.queue = queue
+ self.debug = debug
+ self.client = mqtt.Client(client_id, userdata=self)
+ self.client.on_connect = lambda c, u, f, r: self._on_connect(c, f, r)
+ self.client.on_disconnect = lambda c, u, r: self._on_disconnect(c, r)
+ self.client.on_message = lambda c, u, msg: self._on_message(c, msg)
+ self.client.on_publish = lambda c, u, mid: self._on_publish(c, mid)
+ self.client.on_subscribe = lambda c, u, mid, qos: self._on_subscribe(c, mid, qos)
+ self.client.on_unsubscribe = lambda c, u, mid: self._on_unsubscribe(c, mid)
+ self.client.on_log = lambda c, u, lvl, buf: self._on_log(c, lvl, buf)
+
+ def connect(self, host="localhost", port=1883):
+ """Connect to MQTT broker"""
+ self.client.connect_async(host, port)
+
+ def disconnect(self):
+ """Disconnect from MQTT broker"""
+ self.client.disconnect()
+
+ def _on_connect(self, client, flags, result_code):
+ if self.debug:
+ print("connected {0} [{1}]".format(flags, result_code))
+ client.subscribe(self.topic, 0)
+
+ def _on_disconnect(self, client, result_code):
+ if self.debug:
+ print("disconnected [{0}]".format(result_code))
+
+ def _on_message(self, client, msg):
+ try:
+ payload = msg.payload.decode("utf-8")
+ json = loads(payload)
+ self.queue.put(json, False)
+ except Full:
+ pass # drop message if queue is full.
+ except ValueError:
+ if self.debug:
+ print("ERR: {0} {1} {2}".format(msg.topic, msg.qos, payload))
+
+ def _on_publish(self, client, msgid):
+ if self.debug:
+ print("published message {0}".format(msgid))
+
+ def _on_subscribe(self, client, mid, granted_qos):
+ if self.debug:
+ print("subscribed {0} {1}".format(mid, granted_qos))
+
+ def _on_unsubscribe(self, client, mid):
+ if self.debug:
+ print("unsubscribed {0}".format(mid))
+
+ def _on_log(self, client, level, buffer):
+ if self.debug:
+ print("log {0}: {1}".format(level, buffer), file=sys.stderr)
+
+def main(argv=None):
+ """
+ Watch messages posted to our topic and record into a database at intervals.
+ """
+ parser = argparse.ArgumentParser(description="")
+ parser.add_argument('--hostname', dest='hostname', default='localhost',
+ help='specify the MQTT broker hostname')
+ parser.add_argument('--port', dest='port', type=int, default=1883,
+ help='specify the MQTT broker port')
+ parser.add_argument('--topic', dest='topic', default='spd/sensors/#',
+ help='specify the MQTT topic to follow.')
+ parser.add_argument('--debug', dest='debug', action='store_true', default=False)
+ options = parser.parse_args(argv)
+
+ queue = Queue()
+ archiver = Archiver(queue)
+ watcher = Watcher(options.topic, queue, debug=options.debug)
+ archiver.start()
+ watcher.connect(host=options.hostname, port=options.port)
+ try:
+ watcher.client.loop_forever()
+ except KeyboardInterrupt:
+ pass
+ archiver.stop()
+ watcher.disconnect()
+ watcher.client.loop()
+ archiver.join()
+
+ return 0
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv[1:]))