# curl --data "key=$KEY&field1=$Temp0&field2=$Pressure0&field3=$Lux" $URL
from __future__ import print_function, division, absolute_import
-import sys, pymongo
-from twisted.internet import reactor, stdio
-from twisted.internet.serialport import SerialPort, PARITY_ODD, PARITY_NONE
-from twisted.protocols import basic
-from pymongo import MongoClient
+import sys
+import argparse
from os import path
+from queue import Queue, Full, Empty
+from threading import Thread, Event
from configparser import ConfigParser
+from copy import deepcopy
+from json import dumps
+from datetime import datetime
+from twisted.internet import reactor
+from twisted.protocols import basic
+from twisted.internet.serialport import SerialPort, PARITY_ODD, PARITY_NONE
+import pymongo
+from pymongo import MongoClient
+import paho.mqtt.client as mqtt
-__all__ = ['MonitorSensorHub','SensorProtocol']
-__version__ = '1.0.0'
+__all__ = ['MonitorSensorHub', 'SensorProtocol']
+__version__ = '1.1.0'
__author__ = 'Pat Thoyts <patthoyts@users.sourceforge.net>'
__copyright__ = 'Copyright (c) 2016 Pat Thoyts'
+class PublisherThread(Thread):
+ """
+ Handle publishing sensor data to MQTT server
+ """
+ def __init__(self, queue, topic, hostname='wirezilla', port=1883):
+ Thread.__init__(self, daemon=False)
+ self.queue = queue
+ self.hostname = hostname
+ self.port = port
+ self.topic = topic
+ self.stopped = Event()
+ self.recent = datetime.now() #datetime.fromtimestamp(0)
+ def stop(self):
+ """Request the thread to terminate"""
+ self.stopped.set()
+ def run(self):
+ client = mqtt.Client()
+ client.connect_async(self.hostname, port=self.port)
+ client.loop_start()
+ while not self.stopped.is_set():
+ try:
+ data = deepcopy(self.queue.get(block=True, timeout=1.0))
+ now = datetime.now()
+ delta = now - self.recent
+ if delta.seconds > 60: # rate limit to 1 per minute
+ self.recent = now
+ del data['timestamp']
+ if '_id' in data:
+ del data['_id']
+ topic = "{0}/{1}".format(self.topic, data['name'])
+ client.publish(topic, payload=dumps(data), qos=0, retain=True)
+ except Empty:
+ pass
+
class MonitorSensorHub():
"""Class to handle database updates for new data items read from
the sensor-hub over serial port.
self.uri = config['database']['uri'].strip("\'")
client = MongoClient(self.uri)
self.db = client.sensorhub.sensorlog
- self.recent = self.recent()
+ self._recent = self.recent()
def opendb(self):
"""Open the database"""
time has passed since the last update, as set by the granularity
property."""
t = int(item['timestamp'])
- if t > (self.recent + self.granularity):
- self.recent = t
+ if t > (self._recent + self.granularity):
+ self._recent = t
result = self.db.insert_one(item)
- print("{0} {1}".format(result.inserted_id, self.recent), flush=True)
+ #print("{0} {1}".format(result.inserted_id, self._recent), flush=True)
class SensorProtocol(basic.LineReceiver):
"""Read input from the sensor-hub and parse each line into a
are reported singly for the hub.
"""
delimiter = b'\r\n' # default delimiter is crlf
- def __init__(self, monitor):
+ def __init__(self, monitor, queue):
+ """
+ Initialize with a MonitorSensorHub instance forupdating the MongoDB database
+ and a queue for passing the JSON information to be published to the MQTT server.
+ """
self.monitor = monitor
+ self.queue = queue
self.log = open('/tmp/sensor-hub.log', 'a+')
super().__init__()
line = data.decode('ascii').strip()
if len(line) > 0:
print(line, file=self.log, flush=True)
- if not line[0] == '#':
+ if line[0] != '#':
item = self.parse(line)
+ self.queue.put(item, False)
self.monitor.update(item)
+ except Full:
+ # ignore the exception if the queue is full.
+ pass
except Exception as ex:
print("error: " + repr(ex), file=sys.stderr)
def parse(self, data):
- item = dict(name='spd-office')
+ """
+ Parse the line based data from the Arduino sensor-hub device and convert it
+ into a JSON packet compatible with the ESP8266 based devices.
+ """
+ item = dict(name='sensor-hub')
try:
- if not data[0] == '#':
+ if data[0] != '#':
sensors = []
- humidity = 0.0
- pressure = 0.0
parts = [x.rstrip(" ]\r\n") for x in data.split('[')][1:]
for part in parts:
values = part.split()
sensor = {'id': 'office' + str(values[0])}
- #if len(values) > 1: # timestamp
- # #sensor['timestamp'] = values[1]
if len(values) > 2: # temperature
- sensor['value'] = float(values[2])
+ sensor['temp'] = float(values[2])
if len(values) > 3 and float(values[3]) != 0.0: # humidity
- item['humidity'] = float(values[3])
+ sensor['humidity'] = float(values[3])
if len(values) > 4 and float(values[4]) != 0.0: # pressure
- item['pressure'] = float(values[4])
+ sensor['pressure'] = float(values[4])
sensors.append(sensor)
item['sensors'] = sensors
item['timestamp'] = int(data.split()[0])
+ item['version'] = 4
except IndexError:
pass
return item
-def main(argv = None):
- """Monitor the serial port for lines from the sensor-hub and update
+def main(argv=None):
+ """
+ Monitor the serial port for lines from the sensor-hub and update
the mongodb database and notify thingspeak at reduced time-rates.
"""
- port,baud = r'/dev/ttyUSB0',57600
- if argv is None:
- argv = sys.argv
- if len(argv) > 1:
- port = argv[1]
- if len(argv) > 2:
- baud = argv[2]
+ parser = argparse.ArgumentParser(description="Read data from the sensor-hub Arduino device and publish")
+ parser.add_argument('port', help="serial port device to read sensor-hub data")
+ parser.add_argument('baud', type=int, default=57600,
+ help="serial port baud rate (device uses 57600)")
+ parser.add_argument('--mqtt_hostname', dest='mqtt_hostname', default='wirezilla',
+ help='MQTT broker host name')
+ parser.add_argument('--mqtt_port', dest='mqtt_port', type=int, default=1883,
+ help='MQTT broker port number')
+ parser.add_argument('--mqtt_topic', dest='mqtt_topic', default='spd/sensors',
+ help='topic prefix for MQTT publishing')
+ options = parser.parse_args(argv)
+
+ queue = Queue()
monitor = MonitorSensorHub()
- serial = SerialPort(SensorProtocol(monitor), port, reactor, baudrate=baud)
+ serial = SerialPort(SensorProtocol(monitor, queue), options.port, reactor, baudrate=options.baud)
serial._serial.parity = PARITY_ODD # work around pyserial issue #30
serial._serial.parity = PARITY_NONE
+ publisher = PublisherThread(queue, options.mqtt_topic, hostname=options.mqtt_hostname, port=options.mqtt_port)
try:
+ publisher.start()
reactor.run()
except KeyboardInterrupt:
reactor.stop()
+
+ publisher.stop()
+ publisher.join()
return 0
if __name__ == '__main__':