From: Pat Thoyts Date: Sun, 17 Sep 2017 08:26:19 +0000 (+0100) Subject: Publish new data to MQTT server. X-Git-Url: https://privyetmir.co.uk/gitweb.cgi?a=commitdiff_plain;h=4f68de6b1d918d63f0a40558466113bce56bcdf0;p=spd%2Fsensor-hub.git Publish new data to MQTT server. Added new thread that pick object date from a queue and posts it to the MQTT broker. Data format now compatible with the ESP8266 sensors. --- diff --git a/monitor.py b/monitor.py index 6ed71ef..d454fb7 100755 --- a/monitor.py +++ b/monitor.py @@ -3,19 +3,61 @@ # curl --data "key=$KEY&field1=$Temp0&field2=$Pressure0&field3=$Lux" $URL from __future__ import print_function, division, absolute_import -import sys, pymongo -from twisted.internet import reactor, stdio -from twisted.internet.serialport import SerialPort, PARITY_ODD, PARITY_NONE -from twisted.protocols import basic -from pymongo import MongoClient +import sys +import argparse from os import path +from queue import Queue, Full, Empty +from threading import Thread, Event from configparser import ConfigParser +from copy import deepcopy +from json import dumps +from datetime import datetime +from twisted.internet import reactor +from twisted.protocols import basic +from twisted.internet.serialport import SerialPort, PARITY_ODD, PARITY_NONE +import pymongo +from pymongo import MongoClient +import paho.mqtt.client as mqtt -__all__ = ['MonitorSensorHub','SensorProtocol'] -__version__ = '1.0.0' +__all__ = ['MonitorSensorHub', 'SensorProtocol'] +__version__ = '1.1.0' __author__ = 'Pat Thoyts ' __copyright__ = 'Copyright (c) 2016 Pat Thoyts' +class PublisherThread(Thread): + """ + Handle publishing sensor data to MQTT server + """ + def __init__(self, queue, topic, hostname='wirezilla', port=1883): + Thread.__init__(self, daemon=False) + self.queue = queue + self.hostname = hostname + self.port = port + self.topic = topic + self.stopped = Event() + self.recent = datetime.now() #datetime.fromtimestamp(0) + def stop(self): + """Request the thread to terminate""" + self.stopped.set() + def run(self): + client = mqtt.Client() + client.connect_async(self.hostname, port=self.port) + client.loop_start() + while not self.stopped.is_set(): + try: + data = deepcopy(self.queue.get(block=True, timeout=1.0)) + now = datetime.now() + delta = now - self.recent + if delta.seconds > 60: # rate limit to 1 per minute + self.recent = now + del data['timestamp'] + if '_id' in data: + del data['_id'] + topic = "{0}/{1}".format(self.topic, data['name']) + client.publish(topic, payload=dumps(data), qos=0, retain=True) + except Empty: + pass + class MonitorSensorHub(): """Class to handle database updates for new data items read from the sensor-hub over serial port. @@ -30,7 +72,7 @@ class MonitorSensorHub(): self.uri = config['database']['uri'].strip("\'") client = MongoClient(self.uri) self.db = client.sensorhub.sensorlog - self.recent = self.recent() + self._recent = self.recent() def opendb(self): """Open the database""" @@ -50,10 +92,10 @@ class MonitorSensorHub(): time has passed since the last update, as set by the granularity property.""" t = int(item['timestamp']) - if t > (self.recent + self.granularity): - self.recent = t + if t > (self._recent + self.granularity): + self._recent = t result = self.db.insert_one(item) - print("{0} {1}".format(result.inserted_id, self.recent), flush=True) + #print("{0} {1}".format(result.inserted_id, self._recent), flush=True) class SensorProtocol(basic.LineReceiver): """Read input from the sensor-hub and parse each line into a @@ -62,8 +104,13 @@ class SensorProtocol(basic.LineReceiver): are reported singly for the hub. """ delimiter = b'\r\n' # default delimiter is crlf - def __init__(self, monitor): + def __init__(self, monitor, queue): + """ + Initialize with a MonitorSensorHub instance forupdating the MongoDB database + and a queue for passing the JSON information to be published to the MQTT server. + """ self.monitor = monitor + self.queue = queue self.log = open('/tmp/sensor-hub.log', 'a+') super().__init__() @@ -72,57 +119,74 @@ class SensorProtocol(basic.LineReceiver): line = data.decode('ascii').strip() if len(line) > 0: print(line, file=self.log, flush=True) - if not line[0] == '#': + if line[0] != '#': item = self.parse(line) + self.queue.put(item, False) self.monitor.update(item) + except Full: + # ignore the exception if the queue is full. + pass except Exception as ex: print("error: " + repr(ex), file=sys.stderr) def parse(self, data): - item = dict(name='spd-office') + """ + Parse the line based data from the Arduino sensor-hub device and convert it + into a JSON packet compatible with the ESP8266 based devices. + """ + item = dict(name='sensor-hub') try: - if not data[0] == '#': + if data[0] != '#': sensors = [] - humidity = 0.0 - pressure = 0.0 parts = [x.rstrip(" ]\r\n") for x in data.split('[')][1:] for part in parts: values = part.split() sensor = {'id': 'office' + str(values[0])} - #if len(values) > 1: # timestamp - # #sensor['timestamp'] = values[1] if len(values) > 2: # temperature - sensor['value'] = float(values[2]) + sensor['temp'] = float(values[2]) if len(values) > 3 and float(values[3]) != 0.0: # humidity - item['humidity'] = float(values[3]) + sensor['humidity'] = float(values[3]) if len(values) > 4 and float(values[4]) != 0.0: # pressure - item['pressure'] = float(values[4]) + sensor['pressure'] = float(values[4]) sensors.append(sensor) item['sensors'] = sensors item['timestamp'] = int(data.split()[0]) + item['version'] = 4 except IndexError: pass return item -def main(argv = None): - """Monitor the serial port for lines from the sensor-hub and update +def main(argv=None): + """ + Monitor the serial port for lines from the sensor-hub and update the mongodb database and notify thingspeak at reduced time-rates. """ - port,baud = r'/dev/ttyUSB0',57600 - if argv is None: - argv = sys.argv - if len(argv) > 1: - port = argv[1] - if len(argv) > 2: - baud = argv[2] + parser = argparse.ArgumentParser(description="Read data from the sensor-hub Arduino device and publish") + parser.add_argument('port', help="serial port device to read sensor-hub data") + parser.add_argument('baud', type=int, default=57600, + help="serial port baud rate (device uses 57600)") + parser.add_argument('--mqtt_hostname', dest='mqtt_hostname', default='wirezilla', + help='MQTT broker host name') + parser.add_argument('--mqtt_port', dest='mqtt_port', type=int, default=1883, + help='MQTT broker port number') + parser.add_argument('--mqtt_topic', dest='mqtt_topic', default='spd/sensors', + help='topic prefix for MQTT publishing') + options = parser.parse_args(argv) + + queue = Queue() monitor = MonitorSensorHub() - serial = SerialPort(SensorProtocol(monitor), port, reactor, baudrate=baud) + serial = SerialPort(SensorProtocol(monitor, queue), options.port, reactor, baudrate=options.baud) serial._serial.parity = PARITY_ODD # work around pyserial issue #30 serial._serial.parity = PARITY_NONE + publisher = PublisherThread(queue, options.mqtt_topic, hostname=options.mqtt_hostname, port=options.mqtt_port) try: + publisher.start() reactor.run() except KeyboardInterrupt: reactor.stop() + + publisher.stop() + publisher.join() return 0 if __name__ == '__main__':