Updated the monitor to read the Arduino device and publish to MQTT
authorPat Thoyts <Patrick.Thoyts@renishaw.com>
Fri, 19 Jan 2018 14:01:29 +0000 (14:01 +0000)
committerPat Thoyts <Patrick.Thoyts@renishaw.com>
Fri, 19 Jan 2018 14:01:29 +0000 (14:01 +0000)
monitor.py
sensor-hub.conf

index d454fb758fd1b0a8afcd554e714df99441f6104e..98c5cd2fe719187f4f5b67ddf094a4382c290936 100755 (executable)
@@ -1,54 +1,56 @@
-#!/usr/bin/python3
-#
-# curl --data "key=$KEY&field1=$Temp0&field2=$Pressure0&field3=$Lux" $URL
+#!/usr/bin/env python3
+
+"""
+Read from the sensor-hub Arduino device and publish the sensor data to our MQTT
+broker. Sensor data comes out as line data with sections for each sensor package
+that gets posted over the NRF24 radio link. The result is a JSON object that is
+posted to the MQTT spd/sensors/sensor-hub topic.
+"""
 
 from __future__ import print_function, division, absolute_import
 import sys
 import argparse
-from os import path
 from queue import Queue, Full, Empty
 from threading import Thread, Event
-from configparser import ConfigParser
-from copy import deepcopy
 from json import dumps
 from datetime import datetime
 from twisted.internet import reactor
 from twisted.protocols import basic
 from twisted.internet.serialport import SerialPort, PARITY_ODD, PARITY_NONE
-import pymongo
-from pymongo import MongoClient
-import paho.mqtt.client as mqtt
+from paho.mqtt import client as mqtt
 
-__all__ = ['MonitorSensorHub', 'SensorProtocol']
+__all__ = ['SensorProtocol', 'PublisherThread']
 __version__ = '1.1.0'
 __author__ = 'Pat Thoyts <patthoyts@users.sourceforge.net>'
-__copyright__ = 'Copyright (c) 2016 Pat Thoyts'
+__copyright__ = 'Copyright (c) 2016-2017 Pat Thoyts'
 
 class PublisherThread(Thread):
     """
     Handle publishing sensor data to MQTT server
     """
-    def __init__(self, queue, topic, hostname='wirezilla', port=1883):
+    def __init__(self, queue, topic, interval=60, hostname='wirezilla', port=1883):
         Thread.__init__(self, daemon=False)
         self.queue = queue
         self.hostname = hostname
         self.port = port
         self.topic = topic
+        self.interval = interval # interval between published messages (seconds)
         self.stopped = Event()
-        self.recent = datetime.now() #datetime.fromtimestamp(0)
+        self.recent = datetime.now()
     def stop(self):
         """Request the thread to terminate"""
         self.stopped.set()
     def run(self):
+        """Run the thread while looking for a signal to terminate politely."""
         client = mqtt.Client()
         client.connect_async(self.hostname, port=self.port)
         client.loop_start()
         while not self.stopped.is_set():
             try:
-                data = deepcopy(self.queue.get(block=True, timeout=1.0))
+                data = self.queue.get(block=True, timeout=1.0)
                 now = datetime.now()
                 delta = now - self.recent
-                if delta.seconds > 60: # rate limit to 1 per minute
+                if delta.seconds > self.interval: # rate limit
                     self.recent = now
                     del data['timestamp']
                     if '_id' in data:
@@ -58,45 +60,6 @@ class PublisherThread(Thread):
             except Empty:
                 pass
 
-class MonitorSensorHub():
-    """Class to handle database updates for new data items read from
-    the sensor-hub over serial port.
-    Granularity sets the time in seconds between database updates. This
-    reduces the datarate as the sensor-hub issues an update every second.
-    """
-    def __init__(self, granularity = 60):
-        self.granularity = granularity
-        config = ConfigParser()
-        filename = path.join(path.dirname(path.realpath(__file__)), 'sensor-hub.config')
-        config.read(filename)
-        self.uri = config['database']['uri'].strip("\'")
-        client = MongoClient(self.uri)
-        self.db = client.sensorhub.sensorlog
-        self._recent = self.recent()
-
-    def opendb(self):
-        """Open the database"""
-
-    def recent(self):
-        """Get the timestamp of the most recent item in the database."""
-        try:
-            cursor = self.db.find().sort([("timestamp", pymongo.DESCENDING)]).limit(1)
-            last = dict(next(cursor))
-        except StopIteration:
-            last = {'timestamp': 0}
-        r = int(last['timestamp'])
-        return r
-
-    def update(self, item):
-        """Update the database with a new data item only if sufficient
-        time has passed since the last update, as set by the granularity
-        property."""
-        t = int(item['timestamp'])
-        if t > (self._recent + self.granularity):
-            self._recent = t
-            result = self.db.insert_one(item)
-            #print("{0} {1}".format(result.inserted_id, self._recent), flush=True)
-
 class SensorProtocol(basic.LineReceiver):
     """Read input from the sensor-hub and parse each line into a
     dictionary of values.
@@ -104,12 +67,7 @@ class SensorProtocol(basic.LineReceiver):
     are reported singly for the hub.
     """
     delimiter = b'\r\n' # default delimiter is crlf
-    def __init__(self, monitor, queue):
-        """
-        Initialize with a MonitorSensorHub instance forupdating the MongoDB database
-        and a queue for passing the JSON information to be published to the MQTT server.
-        """
-        self.monitor = monitor
+    def __init__(self, queue):
         self.queue = queue
         self.log = open('/tmp/sensor-hub.log', 'a+')
         super().__init__()
@@ -117,12 +75,11 @@ class SensorProtocol(basic.LineReceiver):
     def lineReceived(self, data):
         try:
             line = data.decode('ascii').strip()
-            if len(line) > 0:
+            if line:
                 print(line, file=self.log, flush=True)
                 if line[0] != '#':
                     item = self.parse(line)
                     self.queue.put(item, False)
-                    self.monitor.update(item)
         except Full:
             # ignore the exception if the queue is full.
             pass
@@ -158,8 +115,7 @@ class SensorProtocol(basic.LineReceiver):
 
 def main(argv=None):
     """
-    Monitor the serial port for lines from the sensor-hub and update
-    the mongodb database and notify thingspeak at reduced time-rates.
+    Monitor the serial port for lines from the sensor-hub and publish to MQTT.
     """
     parser = argparse.ArgumentParser(description="Read data from the sensor-hub Arduino device and publish")
     parser.add_argument('port', help="serial port device to read sensor-hub data")
@@ -171,14 +127,16 @@ def main(argv=None):
                         help='MQTT broker port number')
     parser.add_argument('--mqtt_topic', dest='mqtt_topic', default='spd/sensors',
                         help='topic prefix for MQTT publishing')
+    parser.add_argument('--interval', dest='interval', type=int, default=60,
+                        help='interval in seconds between published messages.')
     options = parser.parse_args(argv)
 
     queue = Queue()
-    monitor = MonitorSensorHub()
-    serial = SerialPort(SensorProtocol(monitor, queue), options.port, reactor, baudrate=options.baud)
+    serial = SerialPort(SensorProtocol(queue), options.port, reactor, baudrate=options.baud)
     serial._serial.parity = PARITY_ODD # work around pyserial issue #30
     serial._serial.parity = PARITY_NONE
-    publisher = PublisherThread(queue, options.mqtt_topic, hostname=options.mqtt_hostname, port=options.mqtt_port)
+    publisher = PublisherThread(queue, options.mqtt_topic, interval=options.interval,
+                                hostname=options.mqtt_hostname, port=options.mqtt_port)
     try:
         publisher.start()
         reactor.run()
index 145dad49e1d71b82a734c1241ddd8ba4edb98544..4658b9610d75f96cef9fa54cfa9eeb1ecb9b4298 100644 (file)
@@ -8,6 +8,5 @@ respawn
 
 script
   [ ! -f /etc/default/sensor-hub ] || . /etc/default/sensor-hub
-  #exec su pat -c "/usr/bin/env http_proxy=http://localhost:3128 https_proxy=http://localhost:3128 /usr/bin/tclsh /home/pat/sensor-hub/monitor.tcl /dev/ttyUSB0 57600 > /tmp/sensor-hub.log"
-  exec su pat -c "/usr/bin/env http_proxy=http://localhost:3128 https_proxy=http://localhost:3128 /usr/bin/python3 /opt/django/wsgi-scripts/sensor-hub/monitor.py /dev/ttyUSB0 57600 >> /var/log/sensor-hub.log 2>&1"
+  exec su pat -c "/usr/bin/python3 /opt/django/wsgi-scripts/sensor-hub/monitor.py /dev/ttyUSB0 57600 >> /var/log/sensor-hub.log 2>&1"
 end script