Publish new data to MQTT server.
authorPat Thoyts <patthoyts@users.sourceforge.net>
Sun, 17 Sep 2017 08:26:19 +0000 (09:26 +0100)
committerPat Thoyts <Patrick.Thoyts@renishaw.com>
Sun, 17 Sep 2017 14:22:27 +0000 (15:22 +0100)
Added new thread that pick object date from a queue and posts it
to the MQTT broker. Data format now compatible with the ESP8266
sensors.

monitor.py

index 6ed71eff46877afda024e1787061bf9f15610e41..d454fb758fd1b0a8afcd554e714df99441f6104e 100755 (executable)
@@ -3,19 +3,61 @@
 # curl --data "key=$KEY&field1=$Temp0&field2=$Pressure0&field3=$Lux" $URL
 
 from __future__ import print_function, division, absolute_import
-import sys, pymongo
-from twisted.internet import reactor, stdio
-from twisted.internet.serialport import SerialPort, PARITY_ODD, PARITY_NONE
-from twisted.protocols import basic
-from pymongo import MongoClient
+import sys
+import argparse
 from os import path
+from queue import Queue, Full, Empty
+from threading import Thread, Event
 from configparser import ConfigParser
+from copy import deepcopy
+from json import dumps
+from datetime import datetime
+from twisted.internet import reactor
+from twisted.protocols import basic
+from twisted.internet.serialport import SerialPort, PARITY_ODD, PARITY_NONE
+import pymongo
+from pymongo import MongoClient
+import paho.mqtt.client as mqtt
 
-__all__ = ['MonitorSensorHub','SensorProtocol']
-__version__ = '1.0.0'
+__all__ = ['MonitorSensorHub', 'SensorProtocol']
+__version__ = '1.1.0'
 __author__ = 'Pat Thoyts <patthoyts@users.sourceforge.net>'
 __copyright__ = 'Copyright (c) 2016 Pat Thoyts'
 
+class PublisherThread(Thread):
+    """
+    Handle publishing sensor data to MQTT server
+    """
+    def __init__(self, queue, topic, hostname='wirezilla', port=1883):
+        Thread.__init__(self, daemon=False)
+        self.queue = queue
+        self.hostname = hostname
+        self.port = port
+        self.topic = topic
+        self.stopped = Event()
+        self.recent = datetime.now() #datetime.fromtimestamp(0)
+    def stop(self):
+        """Request the thread to terminate"""
+        self.stopped.set()
+    def run(self):
+        client = mqtt.Client()
+        client.connect_async(self.hostname, port=self.port)
+        client.loop_start()
+        while not self.stopped.is_set():
+            try:
+                data = deepcopy(self.queue.get(block=True, timeout=1.0))
+                now = datetime.now()
+                delta = now - self.recent
+                if delta.seconds > 60: # rate limit to 1 per minute
+                    self.recent = now
+                    del data['timestamp']
+                    if '_id' in data:
+                        del data['_id']
+                    topic = "{0}/{1}".format(self.topic, data['name'])
+                    client.publish(topic, payload=dumps(data), qos=0, retain=True)
+            except Empty:
+                pass
+
 class MonitorSensorHub():
     """Class to handle database updates for new data items read from
     the sensor-hub over serial port.
@@ -30,7 +72,7 @@ class MonitorSensorHub():
         self.uri = config['database']['uri'].strip("\'")
         client = MongoClient(self.uri)
         self.db = client.sensorhub.sensorlog
-        self.recent = self.recent()
+        self._recent = self.recent()
 
     def opendb(self):
         """Open the database"""
@@ -50,10 +92,10 @@ class MonitorSensorHub():
         time has passed since the last update, as set by the granularity
         property."""
         t = int(item['timestamp'])
-        if t > (self.recent + self.granularity):
-            self.recent = t
+        if t > (self._recent + self.granularity):
+            self._recent = t
             result = self.db.insert_one(item)
-            print("{0} {1}".format(result.inserted_id, self.recent), flush=True)
+            #print("{0} {1}".format(result.inserted_id, self._recent), flush=True)
 
 class SensorProtocol(basic.LineReceiver):
     """Read input from the sensor-hub and parse each line into a
@@ -62,8 +104,13 @@ class SensorProtocol(basic.LineReceiver):
     are reported singly for the hub.
     """
     delimiter = b'\r\n' # default delimiter is crlf
-    def __init__(self, monitor):
+    def __init__(self, monitor, queue):
+        """
+        Initialize with a MonitorSensorHub instance forupdating the MongoDB database
+        and a queue for passing the JSON information to be published to the MQTT server.
+        """
         self.monitor = monitor
+        self.queue = queue
         self.log = open('/tmp/sensor-hub.log', 'a+')
         super().__init__()
 
@@ -72,57 +119,74 @@ class SensorProtocol(basic.LineReceiver):
             line = data.decode('ascii').strip()
             if len(line) > 0:
                 print(line, file=self.log, flush=True)
-                if not line[0] == '#':
+                if line[0] != '#':
                     item = self.parse(line)
+                    self.queue.put(item, False)
                     self.monitor.update(item)
+        except Full:
+            # ignore the exception if the queue is full.
+            pass
         except Exception as ex:
             print("error: " + repr(ex), file=sys.stderr)
 
     def parse(self, data):
-        item = dict(name='spd-office')
+        """
+        Parse the line based data from the Arduino sensor-hub device and convert it
+        into a JSON packet compatible with the ESP8266 based devices.
+        """
+        item = dict(name='sensor-hub')
         try:
-            if not data[0] == '#':
+            if data[0] != '#':
                 sensors = []
-                humidity = 0.0
-                pressure = 0.0
                 parts = [x.rstrip(" ]\r\n") for x in data.split('[')][1:]
                 for part in parts:
                     values = part.split()
                     sensor = {'id': 'office' + str(values[0])}
-                    #if len(values) > 1: # timestamp
-                    #    #sensor['timestamp'] = values[1]
                     if len(values) > 2:                             # temperature
-                        sensor['value'] = float(values[2])
+                        sensor['temp'] = float(values[2])
                     if len(values) > 3 and float(values[3]) != 0.0: # humidity
-                        item['humidity'] = float(values[3])
+                        sensor['humidity'] = float(values[3])
                     if len(values) > 4 and float(values[4]) != 0.0: # pressure
-                        item['pressure'] = float(values[4])
+                        sensor['pressure'] = float(values[4])
                     sensors.append(sensor)
                 item['sensors'] = sensors
                 item['timestamp'] = int(data.split()[0])
+                item['version'] = 4
         except IndexError:
             pass
         return item
 
-def main(argv = None):
-    """Monitor the serial port for lines from the sensor-hub and update
+def main(argv=None):
+    """
+    Monitor the serial port for lines from the sensor-hub and update
     the mongodb database and notify thingspeak at reduced time-rates.
     """
-    port,baud = r'/dev/ttyUSB0',57600
-    if argv is None:
-        argv = sys.argv
-    if len(argv) > 1:
-        port = argv[1]
-    if len(argv) > 2:
-        baud = argv[2]
+    parser = argparse.ArgumentParser(description="Read data from the sensor-hub Arduino device and publish")
+    parser.add_argument('port', help="serial port device to read sensor-hub data")
+    parser.add_argument('baud', type=int, default=57600,
+                        help="serial port baud rate (device uses 57600)")
+    parser.add_argument('--mqtt_hostname', dest='mqtt_hostname', default='wirezilla',
+                        help='MQTT broker host name')
+    parser.add_argument('--mqtt_port', dest='mqtt_port', type=int, default=1883,
+                        help='MQTT broker port number')
+    parser.add_argument('--mqtt_topic', dest='mqtt_topic', default='spd/sensors',
+                        help='topic prefix for MQTT publishing')
+    options = parser.parse_args(argv)
+
+    queue = Queue()
     monitor = MonitorSensorHub()
-    serial = SerialPort(SensorProtocol(monitor), port, reactor, baudrate=baud)
+    serial = SerialPort(SensorProtocol(monitor, queue), options.port, reactor, baudrate=options.baud)
     serial._serial.parity = PARITY_ODD # work around pyserial issue #30
     serial._serial.parity = PARITY_NONE
+    publisher = PublisherThread(queue, options.mqtt_topic, hostname=options.mqtt_hostname, port=options.mqtt_port)
     try:
+        publisher.start()
         reactor.run()
     except KeyboardInterrupt:
         reactor.stop()
+
+    publisher.stop()
+    publisher.join()
     return 0
 
 if __name__ == '__main__':