From: Pat Thoyts Date: Fri, 22 Apr 2016 00:50:47 +0000 (+0100) Subject: Switch to using mongodb for the database backend. X-Git-Url: http://privyetmir.co.uk/gitweb?a=commitdiff_plain;h=1959c1dd441700278154486d67b05131ed62cf65;p=spd%2Fsensor-hub.git Switch to using mongodb for the database backend. Implemented recent, since, hosts, sensors and JSON download using mongo. --- diff --git a/importlog.py b/importlog.py deleted file mode 100755 index 7d837c6..0000000 --- a/importlog.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/python3 - -"""Import sensorlog data to JSON for import into mongodb - -mongoimport --db test --collection sensorlog --drop --file JSONDATAFILE -""" - -from __future__ import print_function, absolute_import, division -import sys, json, pymongo -from sensordata import SensorData -from pymongo import MongoClient - -def usage(): - print("usage: importlog import filename\n demo", file=sys.stderr) - -class GranularData(): - def __init__(self, iterable, granularity): - self.iterable = iterable - self.granularity = granularity - def __iter__(self): - self.last = 0 - return self; - def __next__(self): - while True: - item = next(self.iterable) - t = int(int(item['timestamp']) / self.granularity) * self.granularity - if t != self.last: - self.last = t - return item - -def import_logfile(filename, granularity = None): - """Import a sensor-hub logfile into mongodb""" - if granularity is None: - granularity = 600 # 10 minutes - granularity = int(granularity) - last = 0 - mongo = MongoClient() - db = mongo.test.sensorlog - db.drop() - db.insert_many([item for item in GranularData(iter(SensorData(filename)), granularity)]) - return 0 - -def tojson(filename): - last = 0 - for item in SensorData(filename): - t = int(int(item['timestamp']) / 600) * 600 - if t != last: - print(json.dumps(item)) - last = t - return 0 - -def demo(): - ''' - {"timestamp": "1460971800", - "name": "spd-office", - "sensors": [{"id": "office1", "value": "22.25"}, - {"id": "office2", "value": "22.69"}, - {"id": "office3", "value": "22.37"}]} - ''' - uri = 'mongodb://localhost:27017/test' - client = MongoClient(uri) - db = client.test.sensorlog - #db.insert_one(json) / insert_many (iterable) - #cursor = db.find({"name": "spd-office"}) - #cursor = db.find({"sensors.id": "office1", "sensors.value": "22.25"}) - cursor = db.find({ - "$and": [ - { "timestamp": {"$gt": "1460968800"} }, - { "timestamp": {"$lt": "1460970100"} } - ] - }).sort([("timestamp", pymongo.ASCENDING)]) - for item in cursor: - print(item) - return 0 - -def main(args = None): - if args is None: - args = sys.argv - if len(args) < 2: - usage() - return 1 - if args[1] == "convert": - return tojson(args[2]) - if args[1] == "import": - return import_logfile(args[2], args[3]) - if args[1] == "demo": - return demo() - usage() - return 1 - -if __name__ == '__main__': - sys.exit(main()) diff --git a/sensor-hub.wsgi b/sensor-hub.wsgi index 9d3dca5..67981e4 100755 --- a/sensor-hub.wsgi +++ b/sensor-hub.wsgi @@ -6,14 +6,27 @@ from __future__ import print_function, division, absolute_import import sys, os sys.path.append(os.path.join(os.path.dirname(__file__))) -import cherrypy, json +import cherrypy, json, pymongo from cherrypy import config, tools from time import localtime, time from datetime import datetime from dateutil import parser as dateparser from statistics import median from urllib3.util import parse_url -from sensordata import SensorData +#from sensordata import SensorData +from pymongo import MongoClient + +class RemoveKeyIterable(): + """Iterator wrapper to filter out the _id key from mongo results""" + def __init__(self, iterable, key): + self.iterable = iterable + self.key = key + def __iter__(self): + return self + def __next__(self): + r = dict(next(self.iterable)) + del r[self.key] + return r class SensorHubService(): @@ -31,10 +44,24 @@ class SensorHubService(): print(msg, file=f) @cherrypy.expose + @tools.json_out() def recent(self, *args, **kwargs): - return json.dumps(dict(response='error', message='not implemented')) + cherrypy.response.headers['content-type'] = 'application/json' + try: + uri = cherrypy.request.app.config['database']['uri'] + client = MongoClient(uri) + db = client.test.sensorlog + cursor = db.find().sort([("timestamp", pymongo.DESCENDING)]).limit(1) + item = dict(next(cursor)) + del item['_id'] + res = dict(response=item, version=self.version) + except Exception as e: + self.log("error in \"since\": {0}".format(str(e))) + res = dict(response='error', message=str(e), version=self.version) + return res @cherrypy.expose + @tools.json_out() def since(self, *args, **kwargs): """Get data since any timepoint. eg: /lab-monitor/since?when=2016-03-01T00:00:00 @@ -48,18 +75,69 @@ class SensorHubService(): when = when.timestamp() else: when = int(time() - (86400 * 7)) # 7 days ago - res = json.dumps(self.get_since(int(when))).encode(encoding='utf-8') + data = self.get_since(when) + res = dict(response="ok", result=[x for x in data], version=self.version) except Exception as e: cherrypy.response.headers['content-type'] = 'application/json' self.log("error in \"since\": {0}".format(str(e))) res = dict(response='error', message=str(e)) + return res + + def get_since(self, when, until = None): + uri = cherrypy.request.app.config['database']['uri'] + client = MongoClient(uri) + db = client.test.sensorlog + selector = [{"timestamp": {"$gt": str(when)}}] + if not until is None: + selector.append({"timestamp": {"$lt": str(until)}}) + cursor = db.find({"$and": selector}).sort([("timestamp", pymongo.ASCENDING)]) + return RemoveKeyIterable(cursor, '_id') + + @cherrypy.expose + @tools.json_out() + def hosts(self): + uri = cherrypy.request.app.config['database']['uri'] + client = MongoClient(uri) + db = client.test.sensorlog + # groupby("name") + result = [x['_id'] for x in db.aggregate([{"$group": {"_id": "$name"}} ])] + return dict(response=result, version=self.version) + + @cherrypy.expose + @tools.json_out() + def sensors(self, hostname): + uri = cherrypy.request.app.config['database']['uri'] + client = MongoClient(uri) + db = client.test.sensorlog + groups = [x['_id'] for x in db.aggregate([ + {"$match": {"name": hostname}}, + {"$group": {"_id": "$sensors.id"}} + ])] + result = sorted(groups, key=len, reverse=True)[0] + return {'response': result, 'version': self.version} + + @cherrypy.expose + def download(self, *args, **kwargs): + try: + param_from = cherrypy.request.params.get('from') + param_until = cherrypy.request.params.get('to') + if param_from is None or param_until is None: + raise Exception("must provide both from and to parameters") + else: + data = self.get_since(int(param_from), int(param_until)) + if cherrypy.request.params.get('type') == 'text': + cherrypy.response.headers['content-type'] = 'text/plain' + raise Exception("not implemented") + else: + cherrypy.response.headers['content-type'] = 'application/json' + res = dict(response="ok", result=[x for x in data], version=self.version) + return json.dumps(res).encode(encoding="utf-8") + except Exception as e: + cherrypy.response.headers['content-type'] = 'application/json' + self.log("error in \"since\": {0}\n {1}".format(str(e), repr(param))) + res = dict(response='error', message=str(e)) res = json.dumps(res).encode(encoding='utf-8') return res - - def get_since(self, when): - url = cherrypy.request.app.config['database']['url'] - path = parse_url(url).path - return dict(result=[x for x in SensorData(path) if int(x['timestamp']) >= when]) def application(environ, start_response): staticdir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'static')) diff --git a/sensordata.py b/sensordata.py index 41db2ee..7b81cc9 100644 --- a/sensordata.py +++ b/sensordata.py @@ -5,13 +5,15 @@ Reads data from the ASCII log file and presents a collection """ -__all__ = ['SensorData'] +from __future__ import print_function, absolute_import, division +import sys, re, json, pymongo, unittest +from pymongo import MongoClient + +__all__ = ['SensorData','SensorDataIterator'] __version__ = '1.0.0' __author__ = 'Pat Thoyts ' __copyright__ = 'Copyright (c) 2016 Pat Thoyts' -import re - class SensorDataIterator(): def __init__(self, filename): self.filename = filename @@ -42,8 +44,6 @@ class SensorData(): def __iter__(self): return iter(SensorDataIterator(self.filename)) -import unittest - class TestSensorData(unittest.TestCase): datafile = '_test_data.log' data = '''# comment @@ -73,13 +73,77 @@ class TestSensorData(unittest.TestCase): count = count + 1 self.assertEqual(3, count) +class GranularData(): + """Iterator wrapper to reduce the number of collected points to 1 every N seconds.""" + def __init__(self, iterable, granularity): + self.iterable = iterable + self.granularity = int(granularity) + def __iter__(self): + self.last = 0 + return self; + def __next__(self): + while True: + item = next(self.iterable) + if 'timestamp' in item: + t = int(int(item['timestamp']) / self.granularity) * self.granularity + if t != self.last: + self.last = t + return item + +def import_logfile(filename, granularity = None): + """Import a sensor-hub logfile into mongodb while reducing the number of samples""" + if granularity is None: + granularity = 60 # 1 per minute + granularity = int(granularity) + mongo = MongoClient() + db = mongo.test.sensorlog + db.drop() + r = db.insert_many([item for item in GranularData(iter(SensorData(filename)), granularity)]) + print("imported {0} records".format(len(r.inserted_ids))) + return 0 + +def tojson(filename, granularity = None): + for item in GranularData(iter(SensorData(filename)), granularity): + print(item) + return 0 + +def testing(*args, **kwargs): + ''' + {"timestamp": "1460971800", + "name": "spd-office", + "sensors": [{"id": "office1", "value": "22.25"}, + {"id": "office2", "value": "22.69"}, + {"id": "office3", "value": "22.37"}]} + ''' + uri = 'mongodb://localhost:27017/test' + client = MongoClient(uri) + db = client.test.sensorlog + #db.insert_one(json) / insert_many (iterable) + #cursor = db.find({"name": "spd-office"}) + #cursor = db.find({"sensors.id": "office1", "sensors.value": "22.25"}) + cursor_since = db.find({ + "$and": [ + { "timestamp": {"$gt": "1460968800"} }, + { "timestamp": {"$lt": "1460970100"} } + ] + }).sort([("timestamp", pymongo.ASCENDING)]) + cursor_sensors = db.aggregate([ + {"$match": {"name": "spd-office"}}, + {"$group": {"_id": "$sensors.id"}} + ]) + for item in cursor_sensors: + print(item) + return 0 + if __name__ == '__main__': import sys - if len(sys.argv) > 2: - if sys.argv[1] == 'test': - # tester().test(argv[2]) etc - print("not implemented yet", file = sys.stderr) - 1 + if len(sys.argv) > 1: + if sys.argv[1] == 'import': + import_logfile(*sys.argv[2:]) + elif sys.argv[1] == 'json': + tojson(*sys.argv[2:]) + elif sys.argv[1] == 'test': + testing(*sys.argv[2:]) else: print("usage: SensorData test") 1 diff --git a/static/sensor-hub.html b/static/sensor-hub.html index 580d108..21bb52f 100644 --- a/static/sensor-hub.html +++ b/static/sensor-hub.html @@ -158,16 +158,16 @@ function on_draw_graph(data) { }); } function main() { - //$.ajax({ - // url: 'recent', - // dataType: 'json', - // complete: function(jqqxhr, msg) { $('#result').removeClass("loading"); }, - // beforeSend: function(jqxhr, opt) { $('#result').addClass("loading"); }, - // success: on_received_recent, - // error: function(jqqxhr, err, evt) { alert("recent: "+err); } - //}); + $.ajax({ + url: 'recent', + dataType: 'json', + complete: function(jqqxhr, msg) { $('#result').removeClass("loading"); }, + beforeSend: function(jqxhr, opt) { $('#result').addClass("loading"); }, + success: on_received_recent, + error: function(jqqxhr, err, evt) { alert("recent: "+err); } + }); var now = ((new Date() - 0) / 1000); - var when = now - (60 * 60 * 24 * 360); // 28 days + var when = now - (60 * 60 * 24 * 28); // 28 days when = (new Date(when * 1000)).toISOString(); $.ajax({ url: 'since',