Implemented recent, since, hosts, sensors and JSON download using mongo.
+++ /dev/null
-#!/usr/bin/python3
-
-"""Import sensorlog data to JSON for import into mongodb
-
-mongoimport --db test --collection sensorlog --drop --file JSONDATAFILE
-"""
-
-from __future__ import print_function, absolute_import, division
-import sys, json, pymongo
-from sensordata import SensorData
-from pymongo import MongoClient
-
-def usage():
- print("usage: importlog import filename\n demo", file=sys.stderr)
-
-class GranularData():
- def __init__(self, iterable, granularity):
- self.iterable = iterable
- self.granularity = granularity
- def __iter__(self):
- self.last = 0
- return self;
- def __next__(self):
- while True:
- item = next(self.iterable)
- t = int(int(item['timestamp']) / self.granularity) * self.granularity
- if t != self.last:
- self.last = t
- return item
-
-def import_logfile(filename, granularity = None):
- """Import a sensor-hub logfile into mongodb"""
- if granularity is None:
- granularity = 600 # 10 minutes
- granularity = int(granularity)
- last = 0
- mongo = MongoClient()
- db = mongo.test.sensorlog
- db.drop()
- db.insert_many([item for item in GranularData(iter(SensorData(filename)), granularity)])
- return 0
-
-def tojson(filename):
- last = 0
- for item in SensorData(filename):
- t = int(int(item['timestamp']) / 600) * 600
- if t != last:
- print(json.dumps(item))
- last = t
- return 0
-
-def demo():
- '''
- {"timestamp": "1460971800",
- "name": "spd-office",
- "sensors": [{"id": "office1", "value": "22.25"},
- {"id": "office2", "value": "22.69"},
- {"id": "office3", "value": "22.37"}]}
- '''
- uri = 'mongodb://localhost:27017/test'
- client = MongoClient(uri)
- db = client.test.sensorlog
- #db.insert_one(json) / insert_many (iterable)
- #cursor = db.find({"name": "spd-office"})
- #cursor = db.find({"sensors.id": "office1", "sensors.value": "22.25"})
- cursor = db.find({
- "$and": [
- { "timestamp": {"$gt": "1460968800"} },
- { "timestamp": {"$lt": "1460970100"} }
- ]
- }).sort([("timestamp", pymongo.ASCENDING)])
- for item in cursor:
- print(item)
- return 0
-
-def main(args = None):
- if args is None:
- args = sys.argv
- if len(args) < 2:
- usage()
- return 1
- if args[1] == "convert":
- return tojson(args[2])
- if args[1] == "import":
- return import_logfile(args[2], args[3])
- if args[1] == "demo":
- return demo()
- usage()
- return 1
-
-if __name__ == '__main__':
- sys.exit(main())
import sys, os
sys.path.append(os.path.join(os.path.dirname(__file__)))
-import cherrypy, json
+import cherrypy, json, pymongo
from cherrypy import config, tools
from time import localtime, time
from datetime import datetime
from dateutil import parser as dateparser
from statistics import median
from urllib3.util import parse_url
-from sensordata import SensorData
+#from sensordata import SensorData
+from pymongo import MongoClient
+
+class RemoveKeyIterable():
+ """Iterator wrapper to filter out the _id key from mongo results"""
+ def __init__(self, iterable, key):
+ self.iterable = iterable
+ self.key = key
+ def __iter__(self):
+ return self
+ def __next__(self):
+ r = dict(next(self.iterable))
+ del r[self.key]
+ return r
class SensorHubService():
print(msg, file=f)
@cherrypy.expose
+ @tools.json_out()
def recent(self, *args, **kwargs):
- return json.dumps(dict(response='error', message='not implemented'))
+ cherrypy.response.headers['content-type'] = 'application/json'
+ try:
+ uri = cherrypy.request.app.config['database']['uri']
+ client = MongoClient(uri)
+ db = client.test.sensorlog
+ cursor = db.find().sort([("timestamp", pymongo.DESCENDING)]).limit(1)
+ item = dict(next(cursor))
+ del item['_id']
+ res = dict(response=item, version=self.version)
+ except Exception as e:
+ self.log("error in \"since\": {0}".format(str(e)))
+ res = dict(response='error', message=str(e), version=self.version)
+ return res
@cherrypy.expose
+ @tools.json_out()
def since(self, *args, **kwargs):
"""Get data since any timepoint.
eg: /lab-monitor/since?when=2016-03-01T00:00:00
when = when.timestamp()
else:
when = int(time() - (86400 * 7)) # 7 days ago
- res = json.dumps(self.get_since(int(when))).encode(encoding='utf-8')
+ data = self.get_since(when)
+ res = dict(response="ok", result=[x for x in data], version=self.version)
except Exception as e:
cherrypy.response.headers['content-type'] = 'application/json'
self.log("error in \"since\": {0}".format(str(e)))
res = dict(response='error', message=str(e))
+ return res
+
+ def get_since(self, when, until = None):
+ uri = cherrypy.request.app.config['database']['uri']
+ client = MongoClient(uri)
+ db = client.test.sensorlog
+ selector = [{"timestamp": {"$gt": str(when)}}]
+ if not until is None:
+ selector.append({"timestamp": {"$lt": str(until)}})
+ cursor = db.find({"$and": selector}).sort([("timestamp", pymongo.ASCENDING)])
+ return RemoveKeyIterable(cursor, '_id')
+
+ @cherrypy.expose
+ @tools.json_out()
+ def hosts(self):
+ uri = cherrypy.request.app.config['database']['uri']
+ client = MongoClient(uri)
+ db = client.test.sensorlog
+ # groupby("name")
+ result = [x['_id'] for x in db.aggregate([{"$group": {"_id": "$name"}} ])]
+ return dict(response=result, version=self.version)
+
+ @cherrypy.expose
+ @tools.json_out()
+ def sensors(self, hostname):
+ uri = cherrypy.request.app.config['database']['uri']
+ client = MongoClient(uri)
+ db = client.test.sensorlog
+ groups = [x['_id'] for x in db.aggregate([
+ {"$match": {"name": hostname}},
+ {"$group": {"_id": "$sensors.id"}}
+ ])]
+ result = sorted(groups, key=len, reverse=True)[0]
+ return {'response': result, 'version': self.version}
+
+ @cherrypy.expose
+ def download(self, *args, **kwargs):
+ try:
+ param_from = cherrypy.request.params.get('from')
+ param_until = cherrypy.request.params.get('to')
+ if param_from is None or param_until is None:
+ raise Exception("must provide both from and to parameters")
+ else:
+ data = self.get_since(int(param_from), int(param_until))
+ if cherrypy.request.params.get('type') == 'text':
+ cherrypy.response.headers['content-type'] = 'text/plain'
+ raise Exception("not implemented")
+ else:
+ cherrypy.response.headers['content-type'] = 'application/json'
+ res = dict(response="ok", result=[x for x in data], version=self.version)
+ return json.dumps(res).encode(encoding="utf-8")
+ except Exception as e:
+ cherrypy.response.headers['content-type'] = 'application/json'
+ self.log("error in \"since\": {0}\n {1}".format(str(e), repr(param)))
+ res = dict(response='error', message=str(e))
res = json.dumps(res).encode(encoding='utf-8')
return res
-
- def get_since(self, when):
- url = cherrypy.request.app.config['database']['url']
- path = parse_url(url).path
- return dict(result=[x for x in SensorData(path) if int(x['timestamp']) >= when])
def application(environ, start_response):
staticdir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'static'))
Reads data from the ASCII log file and presents a collection
"""
-__all__ = ['SensorData']
+from __future__ import print_function, absolute_import, division
+import sys, re, json, pymongo, unittest
+from pymongo import MongoClient
+
+__all__ = ['SensorData','SensorDataIterator']
__version__ = '1.0.0'
__author__ = 'Pat Thoyts <patthoyts@users.sourceforge.net>'
__copyright__ = 'Copyright (c) 2016 Pat Thoyts'
-import re
-
class SensorDataIterator():
def __init__(self, filename):
self.filename = filename
def __iter__(self):
return iter(SensorDataIterator(self.filename))
-import unittest
-
class TestSensorData(unittest.TestCase):
datafile = '_test_data.log'
data = '''# comment
count = count + 1
self.assertEqual(3, count)
+class GranularData():
+ """Iterator wrapper to reduce the number of collected points to 1 every N seconds."""
+ def __init__(self, iterable, granularity):
+ self.iterable = iterable
+ self.granularity = int(granularity)
+ def __iter__(self):
+ self.last = 0
+ return self;
+ def __next__(self):
+ while True:
+ item = next(self.iterable)
+ if 'timestamp' in item:
+ t = int(int(item['timestamp']) / self.granularity) * self.granularity
+ if t != self.last:
+ self.last = t
+ return item
+
+def import_logfile(filename, granularity = None):
+ """Import a sensor-hub logfile into mongodb while reducing the number of samples"""
+ if granularity is None:
+ granularity = 60 # 1 per minute
+ granularity = int(granularity)
+ mongo = MongoClient()
+ db = mongo.test.sensorlog
+ db.drop()
+ r = db.insert_many([item for item in GranularData(iter(SensorData(filename)), granularity)])
+ print("imported {0} records".format(len(r.inserted_ids)))
+ return 0
+
+def tojson(filename, granularity = None):
+ for item in GranularData(iter(SensorData(filename)), granularity):
+ print(item)
+ return 0
+
+def testing(*args, **kwargs):
+ '''
+ {"timestamp": "1460971800",
+ "name": "spd-office",
+ "sensors": [{"id": "office1", "value": "22.25"},
+ {"id": "office2", "value": "22.69"},
+ {"id": "office3", "value": "22.37"}]}
+ '''
+ uri = 'mongodb://localhost:27017/test'
+ client = MongoClient(uri)
+ db = client.test.sensorlog
+ #db.insert_one(json) / insert_many (iterable)
+ #cursor = db.find({"name": "spd-office"})
+ #cursor = db.find({"sensors.id": "office1", "sensors.value": "22.25"})
+ cursor_since = db.find({
+ "$and": [
+ { "timestamp": {"$gt": "1460968800"} },
+ { "timestamp": {"$lt": "1460970100"} }
+ ]
+ }).sort([("timestamp", pymongo.ASCENDING)])
+ cursor_sensors = db.aggregate([
+ {"$match": {"name": "spd-office"}},
+ {"$group": {"_id": "$sensors.id"}}
+ ])
+ for item in cursor_sensors:
+ print(item)
+ return 0
+
if __name__ == '__main__':
import sys
- if len(sys.argv) > 2:
- if sys.argv[1] == 'test':
- # tester().test(argv[2]) etc
- print("not implemented yet", file = sys.stderr)
- 1
+ if len(sys.argv) > 1:
+ if sys.argv[1] == 'import':
+ import_logfile(*sys.argv[2:])
+ elif sys.argv[1] == 'json':
+ tojson(*sys.argv[2:])
+ elif sys.argv[1] == 'test':
+ testing(*sys.argv[2:])
else:
print("usage: SensorData test")
1
});
}
function main() {
- //$.ajax({
- // url: 'recent',
- // dataType: 'json',
- // complete: function(jqqxhr, msg) { $('#result').removeClass("loading"); },
- // beforeSend: function(jqxhr, opt) { $('#result').addClass("loading"); },
- // success: on_received_recent,
- // error: function(jqqxhr, err, evt) { alert("recent: "+err); }
- //});
+ $.ajax({
+ url: 'recent',
+ dataType: 'json',
+ complete: function(jqqxhr, msg) { $('#result').removeClass("loading"); },
+ beforeSend: function(jqxhr, opt) { $('#result').addClass("loading"); },
+ success: on_received_recent,
+ error: function(jqqxhr, err, evt) { alert("recent: "+err); }
+ });
var now = ((new Date() - 0) / 1000);
- var when = now - (60 * 60 * 24 * 360); // 28 days
+ var when = now - (60 * 60 * 24 * 28); // 28 days
when = (new Date(when * 1000)).toISOString();
$.ajax({
url: 'since',