-#!/usr/bin/python3
-#
-# curl --data "key=$KEY&field1=$Temp0&field2=$Pressure0&field3=$Lux" $URL
+#!/usr/bin/env python3
+
+"""
+Read from the sensor-hub Arduino device and publish the sensor data to our MQTT
+broker. Sensor data comes out as line data with sections for each sensor package
+that gets posted over the NRF24 radio link. The result is a JSON object that is
+posted to the MQTT spd/sensors/sensor-hub topic.
+"""
from __future__ import print_function, division, absolute_import
import sys
import argparse
-from os import path
from queue import Queue, Full, Empty
from threading import Thread, Event
-from configparser import ConfigParser
-from copy import deepcopy
from json import dumps
from datetime import datetime
from twisted.internet import reactor
from twisted.protocols import basic
from twisted.internet.serialport import SerialPort, PARITY_ODD, PARITY_NONE
-import pymongo
-from pymongo import MongoClient
-import paho.mqtt.client as mqtt
+from paho.mqtt import client as mqtt
-__all__ = ['MonitorSensorHub', 'SensorProtocol']
+__all__ = ['SensorProtocol', 'PublisherThread']
__version__ = '1.1.0'
__author__ = 'Pat Thoyts <patthoyts@users.sourceforge.net>'
-__copyright__ = 'Copyright (c) 2016 Pat Thoyts'
+__copyright__ = 'Copyright (c) 2016-2017 Pat Thoyts'
class PublisherThread(Thread):
"""
Handle publishing sensor data to MQTT server
"""
- def __init__(self, queue, topic, hostname='wirezilla', port=1883):
+ def __init__(self, queue, topic, interval=60, hostname='wirezilla', port=1883):
Thread.__init__(self, daemon=False)
self.queue = queue
self.hostname = hostname
self.port = port
self.topic = topic
+ self.interval = interval # interval between published messages (seconds)
self.stopped = Event()
- self.recent = datetime.now() #datetime.fromtimestamp(0)
+ self.recent = datetime.now()
def stop(self):
"""Request the thread to terminate"""
self.stopped.set()
def run(self):
+ """Run the thread while looking for a signal to terminate politely."""
client = mqtt.Client()
client.connect_async(self.hostname, port=self.port)
client.loop_start()
while not self.stopped.is_set():
try:
- data = deepcopy(self.queue.get(block=True, timeout=1.0))
+ data = self.queue.get(block=True, timeout=1.0)
now = datetime.now()
delta = now - self.recent
- if delta.seconds > 60: # rate limit to 1 per minute
+ if delta.seconds > self.interval: # rate limit
self.recent = now
del data['timestamp']
if '_id' in data:
except Empty:
pass
-class MonitorSensorHub():
- """Class to handle database updates for new data items read from
- the sensor-hub over serial port.
- Granularity sets the time in seconds between database updates. This
- reduces the datarate as the sensor-hub issues an update every second.
- """
- def __init__(self, granularity = 60):
- self.granularity = granularity
- config = ConfigParser()
- filename = path.join(path.dirname(path.realpath(__file__)), 'sensor-hub.config')
- config.read(filename)
- self.uri = config['database']['uri'].strip("\'")
- client = MongoClient(self.uri)
- self.db = client.sensorhub.sensorlog
- self._recent = self.recent()
-
- def opendb(self):
- """Open the database"""
-
- def recent(self):
- """Get the timestamp of the most recent item in the database."""
- try:
- cursor = self.db.find().sort([("timestamp", pymongo.DESCENDING)]).limit(1)
- last = dict(next(cursor))
- except StopIteration:
- last = {'timestamp': 0}
- r = int(last['timestamp'])
- return r
-
- def update(self, item):
- """Update the database with a new data item only if sufficient
- time has passed since the last update, as set by the granularity
- property."""
- t = int(item['timestamp'])
- if t > (self._recent + self.granularity):
- self._recent = t
- result = self.db.insert_one(item)
- #print("{0} {1}".format(result.inserted_id, self._recent), flush=True)
-
class SensorProtocol(basic.LineReceiver):
"""Read input from the sensor-hub and parse each line into a
dictionary of values.
are reported singly for the hub.
"""
delimiter = b'\r\n' # default delimiter is crlf
- def __init__(self, monitor, queue):
- """
- Initialize with a MonitorSensorHub instance forupdating the MongoDB database
- and a queue for passing the JSON information to be published to the MQTT server.
- """
- self.monitor = monitor
+ def __init__(self, queue):
self.queue = queue
self.log = open('/tmp/sensor-hub.log', 'a+')
super().__init__()
def lineReceived(self, data):
try:
line = data.decode('ascii').strip()
- if len(line) > 0:
+ if line:
print(line, file=self.log, flush=True)
if line[0] != '#':
item = self.parse(line)
self.queue.put(item, False)
- self.monitor.update(item)
except Full:
# ignore the exception if the queue is full.
pass
def main(argv=None):
"""
- Monitor the serial port for lines from the sensor-hub and update
- the mongodb database and notify thingspeak at reduced time-rates.
+ Monitor the serial port for lines from the sensor-hub and publish to MQTT.
"""
parser = argparse.ArgumentParser(description="Read data from the sensor-hub Arduino device and publish")
parser.add_argument('port', help="serial port device to read sensor-hub data")
help='MQTT broker port number')
parser.add_argument('--mqtt_topic', dest='mqtt_topic', default='spd/sensors',
help='topic prefix for MQTT publishing')
+ parser.add_argument('--interval', dest='interval', type=int, default=60,
+ help='interval in seconds between published messages.')
options = parser.parse_args(argv)
queue = Queue()
- monitor = MonitorSensorHub()
- serial = SerialPort(SensorProtocol(monitor, queue), options.port, reactor, baudrate=options.baud)
+ serial = SerialPort(SensorProtocol(queue), options.port, reactor, baudrate=options.baud)
serial._serial.parity = PARITY_ODD # work around pyserial issue #30
serial._serial.parity = PARITY_NONE
- publisher = PublisherThread(queue, options.mqtt_topic, hostname=options.mqtt_hostname, port=options.mqtt_port)
+ publisher = PublisherThread(queue, options.mqtt_topic, interval=options.interval,
+ hostname=options.mqtt_hostname, port=options.mqtt_port)
try:
publisher.start()
reactor.run()