#!/usr/bin/env python
import warnings
warnings.filterwarnings('ignore', 'libevent')
# monkey-patching must happen before anything else imports the modules
# gevent replaces (socket, time, etc.)
import gevent.monkey
gevent.monkey.patch_all(thread=False)

from collections import defaultdict
import copy
import datetime
import errno
import httplib
import json
import os
from os import path
import traceback
import urlparse

import gevent.pywsgi

import fileio
import reloader

DATA_DIR = path.expanduser('~/sysvitals_data')

handlers = None
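
# Assumed on-disk layout (inferred from the code below, not documented by
# fileio itself): one file per server per day at
#   DATA_DIR/<group>/<server_id>/<YYYY-MM-DD>
# Each subfield holds 1440 slots, one per minute of the day, with -1 marking
# a missing sample. fileio.TEMPLATE is presumably a nested dict of
# field -> subfield -> [-1] * 1440, and fileio.gen_template(v) presumably
# builds the same field/subfield shape with the scalar v in place of the
# arrays; both are assumptions based on how they are used here.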

def main():
    global handlers
    handlers = {
        'datum': post_datum,
        'raw': get_raw,
        'stats': get_stats,
    }
    server = gevent.pywsgi.WSGIServer(('0.0.0.0', 8892), application)
    reloader.init(server)
    server.serve_forever()

class HTTPException(Exception):
    def __init__(self, code, body):
        self.code = code
        self.body = body

BASE_HEADERS = [
    ('Access-Control-Allow-Origin', '*'),
    ('Access-Control-Allow-Headers', 'X-Requested-With, X-Request'),
]
DEFAULT_HEADERS = BASE_HEADERS + [('Content-type', 'application/json')]
ERROR_HEADERS = BASE_HEADERS + [('Content-type', 'text/plain')]

def application(environ, start_response):
    try:
        if environ['REQUEST_METHOD'] == 'OPTIONS':
            # CORS preflight: reply with the Allow-* headers and no body
            start_response('200 OK', DEFAULT_HEADERS)
            return []
        split = environ['PATH_INFO'][1:].split('/')
        qs = environ['QUERY_STRING']
        if qs:
            query = urlparse.parse_qs(qs, True, True)
            for k, v in query.items():
                if len(v) > 1:
                    raise HTTPException(400, 'duplicate query parameter: ' + k)
                query[k] = v[0]
        else:
            query = {}
        # paths look like /v1/<group>/<endpoint>/<server_id>; the length
        # check keeps short paths like /v1 from raising IndexError
        if split[0] == 'v1' and len(split) > 2:
            handler = handlers.get(split[2])
            if handler:
                body = json.dumps(handler(split, query, environ))
                start_response('200 OK', DEFAULT_HEADERS)
                return [body]
            else:
                print 'no handler for', split
        else:
            print 'split was', split
        raise HTTPException(404, 'unhandled path: ' + environ['PATH_INFO'])
    except HTTPException as e:
        response = '%d %s' % (e.code, httplib.responses[e.code])
        start_response(response, ERROR_HEADERS)
        return [e.body]
    except Exception:
        traceback.print_exc()
        start_response('500 Internal Server Error', ERROR_HEADERS)
        return ['ruh roh']
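
# Example requests (URL shapes inferred from the routing above; host, group,
# server id, and dates are illustrative only):
#   curl -X POST --data-binary @datum.json http://localhost:8892/v1/1/datum/42
#   curl 'http://localhost:8892/v1/1/raw/42?start=2013-01-01&end=2013-01-02'
#   curl 'http://localhost:8892/v1/1/stats/42?start=2013-01-01&end=2013-01-02'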

def get_raw(split, query, environ):
    try:
        group = int(split[1])
        server_id = int(split[3])
        start = datetime.datetime.strptime(query['start'], '%Y-%m-%d').date()
        end = datetime.datetime.strptime(query['end'], '%Y-%m-%d').date()
    except (IndexError, KeyError, ValueError):
        raise HTTPException(400, '')
    server_dir = path.join(DATA_DIR, str(group), str(server_id))
    rval = {}
    c = start
    while c <= end:
        date_str = c.isoformat()
        try:
            with open(path.join(server_dir, date_str), 'r') as f:
                data = fileio.read_stats(f)
        except IOError as e:
            if e.errno == errno.ENOENT:
                data = None  # no samples recorded for this day
            else:
                raise
        rval[date_str] = data
        c += datetime.timedelta(days=1)
    return rval
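
# get_raw returns one entry per requested day, e.g. (shape inferred from the
# loop above, values illustrative):
#   {'2013-01-01': {'cpu': {...}, 'mem': {...}, 'net': {...}, 'disk': {...}},
#    '2013-01-02': None}    # None when that day's file doesn't exist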

def get_stats(split, query, environ):
    raw = get_raw(split, query, environ)
    last_val = fileio.gen_template(-1)
    stats = defaultdict(dict)
    # walk the days in chronological order so the deltas against last_val
    # carry over correctly from one day to the next
    for date in sorted(raw):
        data = raw[date]
        if data is None:
            continue
        for field, subfields in data.items():
            if field == 'cpu':
                field_data = {}
                # subtract times from the last minute to get per-minute time
                # spent in each state, plus the total time spent each minute
                sums = [0] * 1440
                for subfield, raw_array in subfields.items():
                    if subfield != 'num_cpus':
                        array = [None] * 1440
                        for i, d in enumerate(raw_array):
                            lv = last_val[field][subfield]
                            if d > 0 and lv > 0:
                                array[i] = d - lv
                                sums[i] += array[i]
                            last_val[field][subfield] = d
                        field_data[subfield] = array
                # divide each state's time by the total time
                for subfield, array in field_data.items():
                    for i, d in enumerate(array):
                        if d > 0:
                            array[i] = array[i] * subfields['num_cpus'][i] * 100 / sums[i]
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = array
            elif field == 'mem':
                # translate total to free, subtract buffers and cached from used
                field_data = {}
                for subfield in ['used', 'buffers', 'cached', 'free']:
                    field_data[subfield] = [None] * 1440
                MB = 1024 * 1024
                for i in xrange(1440):
                    if subfields['total'][i] < 0:
                        continue  # missing sample; leave the minute as None
                    field_data['free'][i] = (subfields['total'][i] - subfields['used'][i]) / MB
                    field_data['used'][i] = (subfields['used'][i] -
                            subfields['buffers'][i] - subfields['cached'][i]) / MB
                    field_data['buffers'][i] = subfields['buffers'][i] / MB
                    field_data['cached'][i] = subfields['cached'][i] / MB
                for subfield, array in field_data.items():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = array
            elif field == 'net':
                field_data = {}
                for subfield in ['bit/s_in', 'bit/s_out', 'err_in', 'err_out',
                        'drop_in', 'drop_out']:
                    field_data[subfield] = [None] * 1440
                for i in xrange(1440):
                    recv = subfields['bytes_recv'][i]
                    sent = subfields['bytes_sent'][i]
                    lv_recv = last_val['net']['bytes_recv']
                    lv_sent = last_val['net']['bytes_sent']
                    if recv > 0 and lv_recv > 0:
                        field_data['bit/s_in'][i] = (recv - lv_recv) * 8.0 / 60
                    if sent > 0 and lv_sent > 0:
                        field_data['bit/s_out'][i] = (sent - lv_sent) * 8.0 / 60
                    last_val['net']['bytes_recv'] = recv
                    last_val['net']['bytes_sent'] = sent
                    for subfield in ['err_in', 'err_out', 'drop_in', 'drop_out']:
                        # the raw counters are named errin/errout/dropin/dropout
                        field_data[subfield][i] = subfields[subfield.replace('_', '')][i]
                for subfield, array in field_data.items():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = array
    return stats
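
# The bit/s math above converts a one-minute byte-counter delta to a rate:
# e.g. if bytes_recv grew by 750000 over the minute, that is
# 750000 * 8 / 60 = 100000 bit/s. The CPU numbers are likewise per-minute
# deltas, normalized against the total time spent across all states and
# scaled by num_cpus * 100 (the exact units depend on what the collector
# reports, which this file doesn't show).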

def post_datum(split, query, environ):
    try:
        group = int(split[1])
        server_id = int(split[3])
    except (IndexError, ValueError):
        raise HTTPException(400, '')
    try:
        body = json.load(environ['wsgi.input'])
    except ValueError:
        raise HTTPException(400, 'post body was not valid JSON')
    if not isinstance(body, dict):
        raise HTTPException(400, 'post body was not a JSON dictionary')
    # compare as sets: dict key order is arbitrary, so list equality would
    # reject bodies whose keys merely arrive in a different order
    if set(body) != set(fileio.TEMPLATE):
        diff = set(body).symmetric_difference(set(fileio.TEMPLATE))
        raise HTTPException(400, 'post body had missing or extra keys: ' + ', '.join(diff))
    server_dir = path.join(DATA_DIR, str(group), str(server_id))
    try:
        os.makedirs(server_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
    # the index math below floors to the minute, so adding 29 seconds
    # rounds to the nearest minute
    now = datetime.datetime.utcnow() + datetime.timedelta(seconds=29)
    data_path = path.join(server_dir, now.date().isoformat())
    try:
        with open(data_path, 'r') as f:
            stats = fileio.read_stats(f)
    except IOError as e:
        if e.errno != errno.ENOENT:
            raise
        stats = copy.deepcopy(fileio.TEMPLATE)  # first datum of the day
    index = now.hour * 60 + now.minute  # minute-of-day slot, 0..1439
    data = {}
    for field, subfields in stats.items():
        field_data = {}
        if field == 'disk':
            disk = stats['disk']
            # iterate over the body rather than the stored stats so that
            # newly appearing mountpoints get picked up
            for mountpoint, datum in body['disk'].items():
                disk.setdefault(mountpoint, {'total': [-1] * 1440, 'used': [-1] * 1440})
                field_data[mountpoint] = {}
                for subfield, array in disk[mountpoint].items():
                    array = list(array)
                    array[index] = datum[subfield]
                    field_data[mountpoint][subfield] = array
        else:
            for subfield, array in subfields.items():
                array = list(array)
                array[index] = body[field][subfield]
                field_data[subfield] = array
        data[field] = field_data
    with open(data_path, 'w') as f:
        fileio.write_datum(f, data)
    return {'status': 'ok'}
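
# Example post body (shape inferred from the TEMPLATE handling above; the cpu
# state names are assumptions, the other subfield names are read directly in
# this file):
#   {'cpu': {'num_cpus': 4, 'user': 12345, 'system': 2345, ...},
#    'mem': {'total': 8589934592, 'used': 4294967296,
#            'buffers': 134217728, 'cached': 268435456},
#    'net': {'bytes_recv': 123456789, 'bytes_sent': 987654321,
#            'errin': 0, 'errout': 0, 'dropin': 0, 'dropout': 0},
#    'disk': {'/': {'total': 250000000000, 'used': 100000000000}}}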

main()