#!/usr/bin/env python
import warnings
warnings.filterwarnings('ignore', 'libevent')
import gevent.monkey
gevent.monkey.patch_all(thread=False)

from collections import defaultdict
import copy
import datetime
import errno
import httplib
import json
import os
from os import path
import traceback
import urlparse

import gevent.pywsgi

import config
import db
import fileio
import reloader

DATA_DIR = path.expanduser('~/sysvitals_data')

handlers = None


def main():
    global handlers
    handlers = {
        'datum': post_datum,
        'raw': get_raw,
        'stats': get_stats,
        'servers': get_servers,
        'register_server': register_server,
    }

    server = gevent.pywsgi.WSGIServer(('0.0.0.0', config.api_port), application)
    reloader.init(server)
    server.serve_forever()


class HTTPException(Exception):
    def __init__(self, code, body):
        self.code = code
        self.body = body


BASE_HEADERS = [
    ('Access-Control-Allow-Origin', '*'),
    ('Access-Control-Allow-Headers', 'X-Requested-With, X-Request'),
]
DEFAULT_HEADERS = BASE_HEADERS + [('Content-type', 'application/json')]
ERROR_HEADERS = BASE_HEADERS + [('Content-type', 'text/plain')]


def application(environ, start_response):
    try:
        if environ['REQUEST_METHOD'] == 'OPTIONS':
            start_response('200 OK', DEFAULT_HEADERS)
            return []

        # paths look like /v1/<group>/<action>[/<server_id>]
        split = environ['PATH_INFO'][1:].split('/')
        qs = environ['QUERY_STRING']
        if qs:
            query = urlparse.parse_qs(qs, True, True)
            for k, v in query.items():
                if len(v) > 1:
                    raise HTTPException(400, 'duplicate query parameter: ' + k)
                query[k] = v[0]
        else:
            query = {}

        if split[0] == 'v1':
            # guard the index so short paths 404 instead of raising IndexError
            handler = handlers.get(split[2]) if len(split) > 2 else None
            if handler:
                body = json.dumps(handler(split, query, environ))
                start_response('200 OK', DEFAULT_HEADERS)
                return [body]
            else:
                print 'no handler for', split
        else:
            print 'split was', split
        raise HTTPException(404, 'unhandled path: ' + environ['PATH_INFO'])
    except HTTPException as e:
        response = '%d %s' % (e.code, httplib.responses[e.code])
        start_response(response, ERROR_HEADERS)
        return [e.body]
    except:
        traceback.print_exc()
        start_response('500 Internal Server Error', ERROR_HEADERS)
        return ['ruh roh']


def get_raw(split, query, environ):
    try:
        group = int(split[1])
        server_id = int(split[3])
        start = datetime.datetime.strptime(query['start'], '%Y-%m-%d').date()
        end = datetime.datetime.strptime(query['end'], '%Y-%m-%d').date()
    except (IndexError, KeyError, ValueError):
        raise HTTPException(400, '')

    server_dir = path.join(DATA_DIR, str(group), str(server_id))
    rval = {}
    c = start
    while c <= end:
        date_str = c.isoformat()
        try:
            with open(path.join(server_dir, date_str), 'r') as f:
                data = fileio.read_stats(f)
        except IOError as e:
            if e.errno == errno.ENOENT:
                data = None
            else:
                raise
        rval[date_str] = data
        c += datetime.timedelta(days=1)
    return rval
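# On-disk layout, as the handlers above and below imply (the exact
# serialization lives in fileio): one file per server per UTC day at
#   ~/sysvitals_data/<group>/<server_id>/<YYYY-MM-DD>
# holding, for each metric, an array of 1440 slots -- one per minute of the
# day -- with -1 filling minutes that never received a datum.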
def get_stats(split, query, environ):
    raw = get_raw(split, query, environ)
    last_val = fileio.gen_template(-1)
    stats = defaultdict(dict)
    # sort by date so last_val carries across day boundaries in order
    for date, data in sorted(raw.items()):
        if data is None:
            # no file existed for this day
            continue
        for field, subfields in data.items():
            if field == 'cpu':
                field_data = {}
                # subtract times from last minute, calculate total time spent each minute
                sums = [0] * 1440
                for subfield, raw_array in subfields.items():
                    if subfield != 'num_cpus':
                        array = [None] * 1440
                        for i, d in enumerate(raw_array):
                            lv = last_val[field][subfield]
                            if d > 0 and lv > 0:
                                array[i] = d - lv
                                sums[i] += array[i]
                            last_val[field][subfield] = d
                        field_data[subfield] = array
                # divide differences by total times
                for subfield, array in field_data.items():
                    if subfield != 'num_cpus':
                        for i, d in enumerate(array):
                            if d > 0:
                                array[i] = array[i] * subfields['num_cpus'][i] * 100 / sums[i]
                        stats[field].setdefault(subfield, {})
                        stats[field][subfield][date] = array
            elif field == 'mem':
                # translate total to free, subtract buffers and cached from used
                field_data = {}
                new_fields = ['used', 'buffers', 'cached', 'free']
                for subfield in new_fields:
                    field_data[subfield] = [None] * 1440
                for i in xrange(1440):
                    if subfields['total'][i] == -1:
                        # no sample this minute; leave the Nones in place
                        continue
                    field_data['free'][i] = subfields['total'][i] - subfields['used'][i]
                    field_data['used'][i] = (subfields['used'][i] -
                                             subfields['buffers'][i] - subfields['cached'][i])
                    field_data['buffers'][i] = subfields['buffers'][i]
                    field_data['cached'][i] = subfields['cached'][i]
                for subfield, array in field_data.items():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = array
            elif field == 'net':
                field_data = {}
                for subfield in ['bit/s_in', 'bit/s_out', 'err_in', 'err_out',
                                 'drop_in', 'drop_out']:
                    field_data[subfield] = [None] * 1440
                for i in xrange(1440):
                    lv_recv = last_val['net']['bytes_recv']
                    lv_sent = last_val['net']['bytes_sent']
                    # mirror the cpu branch: skip minutes with no sample (-1)
                    if lv_recv > 0 and subfields['bytes_recv'][i] > 0:
                        field_data['bit/s_in'][i] = (subfields['bytes_recv'][i] - lv_recv) * 8.0 / 60
                    if lv_sent > 0 and subfields['bytes_sent'][i] > 0:
                        field_data['bit/s_out'][i] = (subfields['bytes_sent'][i] - lv_sent) * 8.0 / 60
                    last_val['net']['bytes_recv'] = subfields['bytes_recv'][i]
                    last_val['net']['bytes_sent'] = subfields['bytes_sent'][i]
                    for subfield in ['err_in', 'err_out', 'drop_in', 'drop_out']:
                        # the raw arrays use un-underscored names: errin, dropout, ...
                        field_data[subfield][i] = subfields[subfield.replace('_', '')][i]
                for subfield, array in field_data.items():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = array
    return stats
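# A worked example of the cpu math above, with made-up numbers: say num_cpus
# is 2 and one minute's deltas are user=30, system=10, idle=80 jiffies, so
# sums[i] = 120. user is then reported as 30 * 2 * 100 / 120 = 50, and the
# subfields for any minute always total num_cpus * 100 (here 200). Dividing by
# the measured total rather than an assumed tick rate keeps the percentages
# correct even across dropped samples, since every subfield spans the same gap.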
def post_datum(split, query, environ):
    try:
        group = int(split[1])
        server_id = int(split[3])
    except (IndexError, ValueError):
        raise HTTPException(400, '')

    body = load_json_body(environ)
    # compare as sets: dict key order is arbitrary
    if set(body.keys()) != set(fileio.TEMPLATE.keys()):
        diff = set(body.keys()).symmetric_difference(set(fileio.TEMPLATE.keys()))
        raise HTTPException(400, 'post body had missing or extra keys: ' + ','.join(diff))

    server_dir = path.join(DATA_DIR, str(group), str(server_id))
    try:
        os.makedirs(server_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    # we floor to the minute, so adding 29 seconds rounds to the nearest minute
    now = datetime.datetime.utcnow() + datetime.timedelta(seconds=29)
    data_path = path.join(server_dir, now.date().isoformat())
    try:
        with open(data_path, 'r') as f:
            stats = fileio.read_stats(f)
    except IOError as e:
        if e.errno != errno.ENOENT:
            raise
        stats = copy.deepcopy(fileio.TEMPLATE)

    index = now.hour * 60 + now.minute
    data = {}
    for field, subfields in stats.items():
        field_data = {}
        if field == 'disk':
            disk = stats['disk']
            # iterate through body to get new mountpoints
            for mountpoint, datum in body['disk'].items():
                disk.setdefault(mountpoint, {'total': [-1] * 1440, 'used': [-1] * 1440})
                field_data[mountpoint] = {}
                for subfield, array in disk[mountpoint].items():
                    array = list(array)
                    array[index] = datum[subfield]
                    field_data[mountpoint][subfield] = array
        else:
            for subfield, array in subfields.items():
                array = list(array)
                array[index] = body[field][subfield]
                field_data[subfield] = array
        data[field] = field_data

    with open(data_path, 'w') as f:
        fileio.write_datum(f, data)
    return {'status': 'ok'}


def get_servers(split, query, environ):
    try:
        group = int(split[1])
    except (IndexError, ValueError):
        raise HTTPException(400, '')
    return db.get_servers(group)


def register_server(split, query, environ):
    try:
        group = int(split[1])
    except (IndexError, ValueError):
        raise HTTPException(400, '')
    body = load_json_body(environ)
    try:
        hostname = body['hostname']
    except KeyError:
        raise HTTPException(400, 'post body didn\'t contain "hostname" key')
    server_id = db.create_server(group, hostname)
    return {'server_id': server_id}


def load_json_body(environ):
    try:
        body = json.load(environ['wsgi.input'])
    except ValueError:
        raise HTTPException(400, 'post body was not valid JSON')
    if not isinstance(body, dict):
        raise HTTPException(400, 'post body was not a JSON dictionary')
    return body


main()
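# Example requests -- a sketch only: the port comes from config.api_port
# (8080 assumed here), and the group/server ids and dates are illustrative:
#   curl -X POST -d '{"hostname": "web1"}' http://localhost:8080/v1/1/register_server
#   curl http://localhost:8080/v1/1/servers
#   curl 'http://localhost:8080/v1/1/raw/1?start=2014-01-01&end=2014-01-02'
#   curl 'http://localhost:8080/v1/1/stats/1?start=2014-01-01&end=2014-01-02'
# Collectors POST minute samples to /v1/<group>/datum/<server_id> with a JSON
# body whose top-level keys must match fileio.TEMPLATE exactly.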