Switch to using mongodb for the database backend.
authorPat Thoyts <patthoyts@users.sourceforge.net>
Fri, 22 Apr 2016 00:50:47 +0000 (01:50 +0100)
committerPat Thoyts <patthoyts@users.sourceforge.net>
Fri, 22 Apr 2016 00:50:47 +0000 (01:50 +0100)
Implemented recent, since, hosts, sensors and JSON download using mongo.

importlog.py [deleted file]
sensor-hub.wsgi
sensordata.py
static/sensor-hub.html

diff --git a/importlog.py b/importlog.py
deleted file mode 100755 (executable)
index 7d837c6..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-#!/usr/bin/python3
-
-"""Import sensorlog data to JSON for import into mongodb
-
-mongoimport --db test --collection sensorlog --drop --file JSONDATAFILE
-"""
-
-from __future__ import print_function, absolute_import, division
-import sys, json, pymongo
-from sensordata import SensorData
-from pymongo import MongoClient
-
-def usage():
-    print("usage: importlog import filename\n       demo", file=sys.stderr)
-
-class GranularData():
-    def __init__(self, iterable, granularity):
-        self.iterable = iterable
-        self.granularity = granularity
-    def __iter__(self):
-        self.last = 0
-        return self;
-    def __next__(self):
-        while True:
-            item = next(self.iterable)
-            t = int(int(item['timestamp']) / self.granularity) * self.granularity
-            if t != self.last:
-                self.last = t
-                return item
-
-def import_logfile(filename, granularity = None):
-    """Import a sensor-hub logfile into mongodb"""
-    if granularity is None:
-        granularity = 600 # 10 minutes
-    granularity = int(granularity)
-    last = 0
-    mongo = MongoClient()
-    db = mongo.test.sensorlog
-    db.drop()
-    db.insert_many([item for item in GranularData(iter(SensorData(filename)), granularity)])
-    return 0
-
-def tojson(filename):
-    last = 0
-    for item in SensorData(filename):
-        t = int(int(item['timestamp']) / 600) * 600
-        if t != last:
-            print(json.dumps(item))
-            last = t
-    return 0
-
-def demo():
-    '''
-    {"timestamp": "1460971800",
-     "name": "spd-office",
-     "sensors": [{"id": "office1", "value": "22.25"},
-                 {"id": "office2", "value": "22.69"},
-                {"id": "office3", "value": "22.37"}]}
-    '''
-    uri = 'mongodb://localhost:27017/test'
-    client = MongoClient(uri)
-    db = client.test.sensorlog
-    #db.insert_one(json) / insert_many (iterable)
-    #cursor = db.find({"name": "spd-office"})
-    #cursor = db.find({"sensors.id": "office1", "sensors.value": "22.25"})
-    cursor = db.find({
-        "$and": [
-            { "timestamp": {"$gt": "1460968800"} },
-            { "timestamp": {"$lt": "1460970100"} }
-        ]
-    }).sort([("timestamp", pymongo.ASCENDING)])
-    for item in cursor:
-        print(item)
-    return 0
-
-def main(args = None):
-    if args is None:
-        args = sys.argv
-    if len(args) < 2:
-        usage()
-        return 1
-    if args[1] == "convert":
-        return tojson(args[2])
-    if args[1] == "import":
-        return import_logfile(args[2], args[3])
-    if args[1] == "demo":
-        return demo()
-    usage()
-    return 1
-
-if __name__ == '__main__':
-    sys.exit(main())
index 9d3dca544f0e8f066260349f7d2af9e377e342a4..67981e4d396f63fcf44597075bdfc47ef21982f1 100755 (executable)
@@ -6,14 +6,27 @@ from __future__ import print_function, division, absolute_import
 import sys, os
 sys.path.append(os.path.join(os.path.dirname(__file__)))
 
-import cherrypy, json
+import cherrypy, json, pymongo
 from cherrypy import config, tools
 from time import localtime, time
 from datetime import datetime
 from dateutil import parser as dateparser
 from statistics import median
 from urllib3.util import parse_url
-from sensordata import SensorData
+#from sensordata import SensorData
+from pymongo import MongoClient
+
+class RemoveKeyIterable():
+    """Iterator wrapper to filter out the _id key from mongo results"""
+    def __init__(self, iterable, key):
+        self.iterable = iterable
+        self.key = key
+    def __iter__(self):
+        return self
+    def __next__(self):
+        r = dict(next(self.iterable))
+        del r[self.key]
+        return r
 
 class SensorHubService():
 
@@ -31,10 +44,24 @@ class SensorHubService():
                 print(msg, file=f)
 
     @cherrypy.expose
+    @tools.json_out()
     def recent(self, *args, **kwargs):
-        return json.dumps(dict(response='error', message='not implemented'))
+        cherrypy.response.headers['content-type'] = 'application/json'
+        try:
+            uri = cherrypy.request.app.config['database']['uri']
+            client = MongoClient(uri)
+            db = client.test.sensorlog
+            cursor = db.find().sort([("timestamp", pymongo.DESCENDING)]).limit(1)
+            item = dict(next(cursor))
+            del item['_id']
+            res = dict(response=item, version=self.version)
+        except Exception as e:
+            self.log("error in \"since\": {0}".format(str(e)))
+            res = dict(response='error', message=str(e), version=self.version)
+        return res
 
     @cherrypy.expose
+    @tools.json_out()
     def since(self, *args, **kwargs):
         """Get data since any timepoint.
         eg: /lab-monitor/since?when=2016-03-01T00:00:00
@@ -48,18 +75,69 @@ class SensorHubService():
                 when = when.timestamp()
             else:
                 when = int(time() - (86400 * 7)) # 7 days ago
-            res = json.dumps(self.get_since(int(when))).encode(encoding='utf-8')
+            data = self.get_since(when)
+            res = dict(response="ok", result=[x for x in data], version=self.version)
         except Exception as e:
             cherrypy.response.headers['content-type'] = 'application/json'
             self.log("error in \"since\": {0}".format(str(e)))
             res = dict(response='error', message=str(e))
+        return res
+
+    def get_since(self, when, until = None):
+        uri = cherrypy.request.app.config['database']['uri']
+        client = MongoClient(uri)
+        db = client.test.sensorlog
+        selector = [{"timestamp": {"$gt": str(when)}}]
+        if not until is None:
+            selector.append({"timestamp": {"$lt": str(until)}})
+        cursor = db.find({"$and": selector}).sort([("timestamp", pymongo.ASCENDING)])
+        return RemoveKeyIterable(cursor, '_id')
+
+    @cherrypy.expose
+    @tools.json_out()
+    def hosts(self):
+        uri = cherrypy.request.app.config['database']['uri']
+        client = MongoClient(uri)
+        db = client.test.sensorlog
+        # groupby("name")
+        result = [x['_id'] for x in db.aggregate([{"$group": {"_id": "$name"}} ])]
+        return dict(response=result, version=self.version)
+
+    @cherrypy.expose
+    @tools.json_out()
+    def sensors(self, hostname):
+        uri = cherrypy.request.app.config['database']['uri']
+        client = MongoClient(uri)
+        db = client.test.sensorlog
+        groups = [x['_id'] for x in db.aggregate([
+             {"$match": {"name": hostname}},
+             {"$group": {"_id": "$sensors.id"}}
+             ])]
+        result = sorted(groups, key=len, reverse=True)[0]
+        return {'response': result, 'version': self.version}
+
+    @cherrypy.expose
+    def download(self, *args, **kwargs):
+        try:
+            param_from = cherrypy.request.params.get('from')
+            param_until = cherrypy.request.params.get('to')
+            if param_from is None or param_until is None:
+                raise Exception("must provide both from and to parameters")
+            else:
+                data = self.get_since(int(param_from), int(param_until))
+                if cherrypy.request.params.get('type') == 'text':
+                    cherrypy.response.headers['content-type'] = 'text/plain'
+                    raise Exception("not implemented")
+                else:
+                    cherrypy.response.headers['content-type'] = 'application/json'
+                    res = dict(response="ok", result=[x for x in data], version=self.version)
+                    return json.dumps(res).encode(encoding="utf-8")
+        except Exception as e:
+            cherrypy.response.headers['content-type'] = 'application/json'
+            self.log("error in \"since\": {0}\n {1}".format(str(e), repr(param)))
+            res = dict(response='error', message=str(e))
             res = json.dumps(res).encode(encoding='utf-8')
         return res
-        
-    def get_since(self, when):
-        url = cherrypy.request.app.config['database']['url']
-        path = parse_url(url).path
-        return dict(result=[x for x in SensorData(path) if int(x['timestamp']) >= when])
 
 def application(environ, start_response):
     staticdir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'static'))
index 41db2ee5c9ebd0e74cf7b7f973bb260d38abd75e..7b81cc906c4c9b2220453b3fce5e9f96a0647721 100644 (file)
@@ -5,13 +5,15 @@
 Reads data from the ASCII log file and presents a collection
 """
 
-__all__ = ['SensorData']
+from __future__ import print_function, absolute_import, division
+import sys, re, json, pymongo, unittest
+from pymongo import MongoClient
+
+__all__ = ['SensorData','SensorDataIterator']
 __version__ = '1.0.0'
 __author__ = 'Pat Thoyts <patthoyts@users.sourceforge.net>'
 __copyright__ = 'Copyright (c) 2016 Pat Thoyts'
 
-import re
-
 class SensorDataIterator():
     def __init__(self, filename):
         self.filename = filename
@@ -42,8 +44,6 @@ class SensorData():
     def __iter__(self):
         return iter(SensorDataIterator(self.filename))
 
-import unittest
-
 class TestSensorData(unittest.TestCase):
     datafile = '_test_data.log'
     data = '''# comment
@@ -73,13 +73,77 @@ class TestSensorData(unittest.TestCase):
             count = count + 1
         self.assertEqual(3, count)
 
+class GranularData():
+    """Iterator wrapper to reduce the number of collected points to 1 every N seconds."""
+    def __init__(self, iterable, granularity):
+        self.iterable = iterable
+        self.granularity = int(granularity)
+    def __iter__(self):
+        self.last = 0
+        return self;
+    def __next__(self):
+        while True:
+            item = next(self.iterable)
+            if 'timestamp' in item:
+                t = int(int(item['timestamp']) / self.granularity) * self.granularity
+                if t != self.last:
+                    self.last = t
+                    return item
+
+def import_logfile(filename, granularity = None):
+    """Import a sensor-hub logfile into mongodb while reducing the number of samples"""
+    if granularity is None:
+        granularity = 60 # 1 per minute
+    granularity = int(granularity)
+    mongo = MongoClient()
+    db = mongo.test.sensorlog
+    db.drop()
+    r = db.insert_many([item for item in GranularData(iter(SensorData(filename)), granularity)])
+    print("imported {0} records".format(len(r.inserted_ids)))
+    return 0
+
+def tojson(filename, granularity = None):
+    for item in GranularData(iter(SensorData(filename)), granularity):
+        print(item)
+    return 0
+
+def testing(*args, **kwargs):
+    '''
+    {"timestamp": "1460971800",
+     "name": "spd-office",
+     "sensors": [{"id": "office1", "value": "22.25"},
+                 {"id": "office2", "value": "22.69"},
+                {"id": "office3", "value": "22.37"}]}
+    '''
+    uri = 'mongodb://localhost:27017/test'
+    client = MongoClient(uri)
+    db = client.test.sensorlog
+    #db.insert_one(json) / insert_many (iterable)
+    #cursor = db.find({"name": "spd-office"})
+    #cursor = db.find({"sensors.id": "office1", "sensors.value": "22.25"})
+    cursor_since = db.find({
+        "$and": [
+            { "timestamp": {"$gt": "1460968800"} },
+            { "timestamp": {"$lt": "1460970100"} }
+        ]
+    }).sort([("timestamp", pymongo.ASCENDING)])
+    cursor_sensors = db.aggregate([
+        {"$match": {"name": "spd-office"}},
+        {"$group": {"_id": "$sensors.id"}}
+    ])
+    for item in cursor_sensors:
+        print(item)
+    return 0
+
 if __name__ == '__main__':
     import sys
-    if len(sys.argv) > 2:
-        if sys.argv[1] == 'test':
-            # tester().test(argv[2]) etc
-            print("not implemented yet", file = sys.stderr)
-            1
+    if len(sys.argv) > 1:
+        if sys.argv[1] == 'import':
+            import_logfile(*sys.argv[2:])
+        elif sys.argv[1] == 'json':
+            tojson(*sys.argv[2:])
+        elif sys.argv[1] == 'test':
+            testing(*sys.argv[2:])
         else:
             print("usage: SensorData test")
             1
index 580d1087c1052b782e49c09639742b7915258590..21bb52f7d3b1432c9854a856aedade1a18bdfd4c 100644 (file)
@@ -158,16 +158,16 @@ function on_draw_graph(data) {
     });
 }
 function main() {
-    //$.ajax({
-    //    url: 'recent',
-    //    dataType: 'json',
-    //    complete: function(jqqxhr, msg) { $('#result').removeClass("loading"); },
-    //    beforeSend: function(jqxhr, opt) { $('#result').addClass("loading"); },
-    //    success: on_received_recent,
-    //    error: function(jqqxhr, err, evt) { alert("recent: "+err); }
-    //});
+    $.ajax({
+        url: 'recent',
+        dataType: 'json',
+        complete: function(jqqxhr, msg) { $('#result').removeClass("loading"); },
+        beforeSend: function(jqxhr, opt) { $('#result').addClass("loading"); },
+        success: on_received_recent,
+        error: function(jqqxhr, err, evt) { alert("recent: "+err); }
+    });
     var now = ((new Date() - 0) / 1000);
-    var when = now - (60 * 60 * 24 * 360); // 28 days
+    var when = now - (60 * 60 * 24 * 28); // 28 days
     when = (new Date(when * 1000)).toISOString();
     $.ajax({
         url: 'since',