From 20f168c2f3f552c734c2d1f640076ae7e7adeed3 Mon Sep 17 00:00:00 2001 From: Pat Thoyts Date: Fri, 19 Jan 2018 14:01:29 +0000 Subject: [PATCH] Updated the monitor to read the Arduino device and publish to MQTT --- monitor.py | 92 ++++++++++++++----------------------------------- sensor-hub.conf | 3 +- 2 files changed, 26 insertions(+), 69 deletions(-) diff --git a/monitor.py b/monitor.py index d454fb7..98c5cd2 100755 --- a/monitor.py +++ b/monitor.py @@ -1,54 +1,56 @@ -#!/usr/bin/python3 -# -# curl --data "key=$KEY&field1=$Temp0&field2=$Pressure0&field3=$Lux" $URL +#!/usr/bin/env python3 + +""" +Read from the sensor-hub Arduino device and publish the sensor data to our MQTT +broker. Sensor data comes out as line data with sections for each sensor package +that gets posted over the NRF24 radio link. The result is a JSON object that is +posted to the MQTT spd/sensors/sensor-hub topic. +""" from __future__ import print_function, division, absolute_import import sys import argparse -from os import path from queue import Queue, Full, Empty from threading import Thread, Event -from configparser import ConfigParser -from copy import deepcopy from json import dumps from datetime import datetime from twisted.internet import reactor from twisted.protocols import basic from twisted.internet.serialport import SerialPort, PARITY_ODD, PARITY_NONE -import pymongo -from pymongo import MongoClient -import paho.mqtt.client as mqtt +from paho.mqtt import client as mqtt -__all__ = ['MonitorSensorHub', 'SensorProtocol'] +__all__ = ['SensorProtocol', 'PublisherThread'] __version__ = '1.1.0' __author__ = 'Pat Thoyts ' -__copyright__ = 'Copyright (c) 2016 Pat Thoyts' +__copyright__ = 'Copyright (c) 2016-2017 Pat Thoyts' class PublisherThread(Thread): """ Handle publishing sensor data to MQTT server """ - def __init__(self, queue, topic, hostname='wirezilla', port=1883): + def __init__(self, queue, topic, interval=60, hostname='wirezilla', port=1883): Thread.__init__(self, daemon=False) self.queue = queue self.hostname = hostname self.port = port self.topic = topic + self.interval = interval # interval between published messages (seconds) self.stopped = Event() - self.recent = datetime.now() #datetime.fromtimestamp(0) + self.recent = datetime.now() def stop(self): """Request the thread to terminate""" self.stopped.set() def run(self): + """Run the thread while looking for a signal to terminate politely.""" client = mqtt.Client() client.connect_async(self.hostname, port=self.port) client.loop_start() while not self.stopped.is_set(): try: - data = deepcopy(self.queue.get(block=True, timeout=1.0)) + data = self.queue.get(block=True, timeout=1.0) now = datetime.now() delta = now - self.recent - if delta.seconds > 60: # rate limit to 1 per minute + if delta.seconds > self.interval: # rate limit self.recent = now del data['timestamp'] if '_id' in data: @@ -58,45 +60,6 @@ class PublisherThread(Thread): except Empty: pass -class MonitorSensorHub(): - """Class to handle database updates for new data items read from - the sensor-hub over serial port. - Granularity sets the time in seconds between database updates. This - reduces the datarate as the sensor-hub issues an update every second. - """ - def __init__(self, granularity = 60): - self.granularity = granularity - config = ConfigParser() - filename = path.join(path.dirname(path.realpath(__file__)), 'sensor-hub.config') - config.read(filename) - self.uri = config['database']['uri'].strip("\'") - client = MongoClient(self.uri) - self.db = client.sensorhub.sensorlog - self._recent = self.recent() - - def opendb(self): - """Open the database""" - - def recent(self): - """Get the timestamp of the most recent item in the database.""" - try: - cursor = self.db.find().sort([("timestamp", pymongo.DESCENDING)]).limit(1) - last = dict(next(cursor)) - except StopIteration: - last = {'timestamp': 0} - r = int(last['timestamp']) - return r - - def update(self, item): - """Update the database with a new data item only if sufficient - time has passed since the last update, as set by the granularity - property.""" - t = int(item['timestamp']) - if t > (self._recent + self.granularity): - self._recent = t - result = self.db.insert_one(item) - #print("{0} {1}".format(result.inserted_id, self._recent), flush=True) - class SensorProtocol(basic.LineReceiver): """Read input from the sensor-hub and parse each line into a dictionary of values. @@ -104,12 +67,7 @@ class SensorProtocol(basic.LineReceiver): are reported singly for the hub. """ delimiter = b'\r\n' # default delimiter is crlf - def __init__(self, monitor, queue): - """ - Initialize with a MonitorSensorHub instance forupdating the MongoDB database - and a queue for passing the JSON information to be published to the MQTT server. - """ - self.monitor = monitor + def __init__(self, queue): self.queue = queue self.log = open('/tmp/sensor-hub.log', 'a+') super().__init__() @@ -117,12 +75,11 @@ class SensorProtocol(basic.LineReceiver): def lineReceived(self, data): try: line = data.decode('ascii').strip() - if len(line) > 0: + if line: print(line, file=self.log, flush=True) if line[0] != '#': item = self.parse(line) self.queue.put(item, False) - self.monitor.update(item) except Full: # ignore the exception if the queue is full. pass @@ -158,8 +115,7 @@ class SensorProtocol(basic.LineReceiver): def main(argv=None): """ - Monitor the serial port for lines from the sensor-hub and update - the mongodb database and notify thingspeak at reduced time-rates. + Monitor the serial port for lines from the sensor-hub and publish to MQTT. """ parser = argparse.ArgumentParser(description="Read data from the sensor-hub Arduino device and publish") parser.add_argument('port', help="serial port device to read sensor-hub data") @@ -171,14 +127,16 @@ def main(argv=None): help='MQTT broker port number') parser.add_argument('--mqtt_topic', dest='mqtt_topic', default='spd/sensors', help='topic prefix for MQTT publishing') + parser.add_argument('--interval', dest='interval', type=int, default=60, + help='interval in seconds between published messages.') options = parser.parse_args(argv) queue = Queue() - monitor = MonitorSensorHub() - serial = SerialPort(SensorProtocol(monitor, queue), options.port, reactor, baudrate=options.baud) + serial = SerialPort(SensorProtocol(queue), options.port, reactor, baudrate=options.baud) serial._serial.parity = PARITY_ODD # work around pyserial issue #30 serial._serial.parity = PARITY_NONE - publisher = PublisherThread(queue, options.mqtt_topic, hostname=options.mqtt_hostname, port=options.mqtt_port) + publisher = PublisherThread(queue, options.mqtt_topic, interval=options.interval, + hostname=options.mqtt_hostname, port=options.mqtt_port) try: publisher.start() reactor.run() diff --git a/sensor-hub.conf b/sensor-hub.conf index 145dad4..4658b96 100644 --- a/sensor-hub.conf +++ b/sensor-hub.conf @@ -8,6 +8,5 @@ respawn script [ ! -f /etc/default/sensor-hub ] || . /etc/default/sensor-hub - #exec su pat -c "/usr/bin/env http_proxy=http://localhost:3128 https_proxy=http://localhost:3128 /usr/bin/tclsh /home/pat/sensor-hub/monitor.tcl /dev/ttyUSB0 57600 > /tmp/sensor-hub.log" - exec su pat -c "/usr/bin/env http_proxy=http://localhost:3128 https_proxy=http://localhost:3128 /usr/bin/python3 /opt/django/wsgi-scripts/sensor-hub/monitor.py /dev/ttyUSB0 57600 >> /var/log/sensor-hub.log 2>&1" + exec su pat -c "/usr/bin/python3 /opt/django/wsgi-scripts/sensor-hub/monitor.py /dev/ttyUSB0 57600 >> /var/log/sensor-hub.log 2>&1" end script -- 2.23.0