#!/usr/bin/env python
import gevent.monkey
gevent.monkey.patch_all(thread=False)

from collections import defaultdict
import copy
import datetime
import errno
import httplib
import json
import os
from os import path
import traceback
import urlparse

import gevent.pywsgi

import config
import db
import fileio
import reloader

DATA_DIR = path.expanduser('~/sysvitals_data')

handlers = None


def main():
    global handlers
    handlers = {
        'datum': post_datum,
        'raw': get_raw,
        'stats': get_stats,
        'servers': get_servers,
        'register_server': register_server,
    }
    server = gevent.pywsgi.WSGIServer(('0.0.0.0', config.api_port), application)
    if config.debug:
        reloader.init(server)
    print 'listening on', config.api_port
    server.serve_forever()


class HTTPException(Exception):
    def __init__(self, code, body):
        self.code = code
        self.body = body


BASE_HEADERS = [
    ('Access-Control-Allow-Origin', '*'),
    ('Access-Control-Allow-Headers', 'X-Requested-With, X-Request'),
]
DEFAULT_HEADERS = BASE_HEADERS + [('Content-type', 'application/json')]
ERROR_HEADERS = BASE_HEADERS + [('Content-type', 'text/plain')]


def application(environ, start_response):
    try:
        if environ['REQUEST_METHOD'] == 'OPTIONS':
            start_response('200 OK', copy.copy(DEFAULT_HEADERS))
            return []
        split = environ['PATH_INFO'][1:].split('/')
        qs = environ['QUERY_STRING']
        if qs:
            query = urlparse.parse_qs(qs, keep_blank_values=True, strict_parsing=True)
            for k, v in query.iteritems():
                if len(v) > 1:
                    raise HTTPException(400, 'duplicate query parameter: ' + k)
                query[k] = v[0]
        else:
            query = {}
        # paths look like /v1/[group_id]/[action]/...; check the length so a
        # short path 404s instead of raising IndexError (which would 500)
        if split[0] == 'v1' and len(split) >= 3:
            handler = handlers.get(split[2])
            if handler:
                body = json.dumps(handler(split, query, environ))
                # copy the header list each time because pywsgi mutates it
                start_response('200 OK', copy.copy(DEFAULT_HEADERS))
                return [body]
            else:
                print 'no handler for', split
        else:
            print 'split was', split
        raise HTTPException(404, 'unhandled path: ' + environ['PATH_INFO'])
    except HTTPException as e:
        response = '%d %s' % (e.code, httplib.responses[e.code])
        start_response(response, copy.copy(ERROR_HEADERS))
        return [e.body]
    except Exception:
        traceback.print_exc()
        start_response('500 Internal Server Error', copy.copy(ERROR_HEADERS))
        return ['ruh roh']


def get_raw(split, query, environ):
    try:
        group_id = get_group(split, environ)
        server_id = int(split[3])
        start = datetime.datetime.strptime(query['start'], '%Y-%m-%d').date()
        end = datetime.datetime.strptime(query['end'], '%Y-%m-%d').date()
    except (IndexError, KeyError, ValueError):
        raise HTTPException(400, 'expected /v1/[group_id]/raw/[server_id]'
                                 '?start=YYYY-MM-DD&end=YYYY-MM-DD')
    server_dir = path.join(DATA_DIR, str(group_id), str(server_id))
    rval = {}
    c = start
    while c <= end:
        date_str = c.isoformat()
        try:
            with open(path.join(server_dir, date_str), 'r') as f:
                data = fileio.read_stats(f)
        except IOError as e:
            if e.errno == errno.ENOENT:
                # a missing file means nothing was recorded for that day
                data = None
            else:
                raise
        rval[date_str] = data
        c += datetime.timedelta(days=1)
    return rval


fields = {
    'cpu': ['user', 'iowait', 'system', 'nice', 'guest', 'guest_nice', 'irq',
            'softirq', 'steal', 'idle'],
    'mem': ['used', 'buffers', 'cached', 'free'],
    'net': ['bit/s_in', 'bit/s_out', 'err_in', 'err_out', 'drop_in', 'drop_out'],
}


def get_stats(split, query, environ):
    try:
        interval = int(query.get('interval', 1))
    except ValueError:
        raise HTTPException(400, 'interval must be an integer')
    raw = get_raw(split, query, environ)
    dates = sorted(raw.keys())
    last_val = fileio.gen_template(-1)
    stats = defaultdict(dict)
    for date in dates:
        data = raw[date]
        if data is None:
            # day with no data: emit all-None arrays of the right length
            for field, subfields in fields.iteritems():
                for subfield in subfields:
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = [None] * (1440 / interval)
            continue
        for field, subfields in data.items():
            if field == 'cpu':
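                # How the math below works (a sketch; assumes the collector
                # records cumulative /proc/stat-style counters once a minute):
                # each subfield's per-minute delta is this sample minus the
                # last one, sums[] accumulates the total time elapsed in each
                # minute across all subfields, and a delta's share of that
                # total, scaled by num_cpus * 100, is its utilization -- so
                # values can exceed 100 on multi-core hosts.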
                field_data = {}
                # subtract each counter from last minute's value, accumulating
                # the total time spent in each minute
                sums = [0] * 1440
                for subfield, raw_array in subfields.iteritems():
                    if subfield != 'num_cpus':
                        array = [None] * 1440
                        for i, d in enumerate(raw_array):
                            lv = last_val[field][subfield]
                            if d > 0 and lv > 0:
                                array[i] = d - lv
                                sums[i] += array[i]
                            last_val[field][subfield] = d
                        field_data[subfield] = array
                # divide the differences by the total times to get percentages
                # (num_cpus never lands in field_data, so no need to skip it)
                for subfield, array in field_data.items():
                    for i, d in enumerate(array):
                        if d > 0:
                            array[i] = array[i] * subfields['num_cpus'][i] * 100 / sums[i]
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = average(array, interval)
            elif field == 'mem':
                # translate total to free, and subtract buffers and cached
                # from used
                field_data = {}
                new_fields = ['used', 'buffers', 'cached', 'free']
                for subfield in new_fields:
                    field_data[subfield] = [None] * 1440
                for i in xrange(1440):
                    if subfields['total'][i] == -1:
                        # -1 is the "no sample" sentinel; leave the slots None
                        continue
                    field_data['free'][i] = subfields['total'][i] - subfields['used'][i]
                    field_data['used'][i] = (subfields['used'][i] -
                                             subfields['buffers'][i] - subfields['cached'][i])
                    field_data['buffers'][i] = subfields['buffers'][i]
                    field_data['cached'][i] = subfields['cached'][i]
                for subfield, array in field_data.iteritems():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = average(array, interval)
            elif field == 'net':
                field_data = {}
                for subfield in ['bit/s_in', 'bit/s_out', 'err_in', 'err_out',
                                 'drop_in', 'drop_out']:
                    field_data[subfield] = [None] * 1440
                for i in xrange(1440):
                    cur_recv = subfields['bytes_recv'][i]
                    cur_sent = subfields['bytes_sent'][i]
                    lv_recv = last_val['net']['bytes_recv']
                    lv_sent = last_val['net']['bytes_sent']
                    # byte counters are cumulative; skip minutes where either
                    # sample is missing (matching the cpu branch's guard)
                    if lv_recv > 0 and cur_recv > 0:
                        field_data['bit/s_in'][i] = (cur_recv - lv_recv) * 8.0 / 60
                    if lv_sent > 0 and cur_sent > 0:
                        field_data['bit/s_out'][i] = (cur_sent - lv_sent) * 8.0 / 60
                    last_val['net']['bytes_recv'] = cur_recv
                    last_val['net']['bytes_sent'] = cur_sent
                    for subfield in ['err_in', 'err_out', 'drop_in', 'drop_out']:
                        # the raw files store these as 'errin', 'errout', etc.
                        field_data[subfield][i] = subfields[subfield.replace('_', '')][i]
                for subfield, array in field_data.iteritems():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = average(array, interval)
            elif field == 'disk':
                field_data = {}
                for mountpoint, disk_sfs in subfields.items():
                    used_key = mountpoint + '_used'
                    free_key = mountpoint + '_free'
                    field_data[used_key] = [None] * 1440
                    field_data[free_key] = [None] * 1440
                    for i in xrange(1440):
                        if disk_sfs['total'][i] == -1:
                            continue
                        used = disk_sfs['used'][i]
                        field_data[used_key][i] = used
                        field_data[free_key][i] = disk_sfs['total'][i] - used
                for subfield, array in field_data.iteritems():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = average(array, interval)
    return stats


def post_datum(split, query, environ):
    group_id = get_group(split, environ)
    try:
        server_id = int(split[3])
    except (IndexError, ValueError):
        raise HTTPException(400, 'expected /v1/[group_id]/datum/[server_id]')
    body = load_json_body(environ)
    # compare as sets: dict key order is arbitrary, only membership matters
    if set(body.keys()) != set(fileio.TEMPLATE.keys()):
        diff = set(body.keys()).symmetric_difference(set(fileio.TEMPLATE.keys()))
        raise HTTPException(400, 'post body had missing or extra keys: ' + ','.join(diff))
    server_dir = path.join(DATA_DIR, str(group_id), str(server_id))
    try:
        os.makedirs(server_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
    # we floor to the minute below, so adding 29s rounds to the nearest minute
    now = datetime.datetime.utcnow() + datetime.timedelta(seconds=29)
    data_path = path.join(server_dir, now.date().isoformat())
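    # Storage layout (as fileio.read_stats/write_datum appear to use it): one
    # file per server per UTC day, each subfield a 1440-slot array with one
    # slot per minute and -1 meaning "no sample". The code below does a
    # read-modify-write of just the current minute's slot; note this is not
    # atomic across concurrent posts for the same server.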
    try:
        with open(data_path, 'r') as f:
            stats = fileio.read_stats(f)
    except IOError as e:
        if e.errno != errno.ENOENT:
            raise
        # first datum of the day: start from the all--1 template
        stats = copy.deepcopy(fileio.TEMPLATE)
    index = now.hour * 60 + now.minute
    data = {}
    for field, subfields in stats.iteritems():
        field_data = {}
        if field == 'disk':
            disk = stats['disk']
            # iterate through the body so new mountpoints get picked up
            for mountpoint, datum in body['disk'].iteritems():
                if mountpoint not in disk:
                    disk[mountpoint] = {'total': [-1] * 1440, 'used': [-1] * 1440}
                field_data[mountpoint] = {}
                for subfield, array in disk[mountpoint].iteritems():
                    array = list(array)
                    array[index] = datum[subfield]
                    field_data[mountpoint][subfield] = array
        else:
            for subfield, array in subfields.iteritems():
                array = list(array)
                array[index] = body[field][subfield]
                field_data[subfield] = array
        data[field] = field_data
    with open(data_path, 'w') as f:
        fileio.write_datum(f, data)
    return {'status': 'ok'}


def get_servers(split, query, environ):
    group_id = get_group(split, environ)
    return db.get_servers(group_id)


def register_server(split, query, environ):
    group_id = get_group(split, environ)
    body = load_json_body(environ)
    try:
        hostname = body['hostname']
    except KeyError:
        raise HTTPException(400, 'post body didn\'t contain "hostname" key')
    server_id = db.create_server(group_id, hostname)
    return {'server_id': server_id}


def get_group(split, environ):
    try:
        group_id = int(split[1])
    except (IndexError, ValueError):
        raise HTTPException(400, '/v1/[group_id] - group_id was not a valid int')
    if 'HTTP_AUTHORIZATION' not in environ:
        raise HTTPException(401, 'no api key passed in Authorization header')
    if db.get_api_key(group_id) != environ['HTTP_AUTHORIZATION']:
        raise HTTPException(403, 'api key did not match')
    return group_id


def load_json_body(environ):
    try:
        body = json.load(environ['wsgi.input'])
    except ValueError:
        raise HTTPException(400, 'post body was not valid JSON')
    if not isinstance(body, dict):
        raise HTTPException(400, 'post body was not a JSON dictionary')
    return body


def average(array, interval):
    """Collapse a per-minute array into interval-minute buckets, averaging the
    non-None values in each bucket (None when a whole bucket is missing)."""
    if interval == 1:
        return array
    averaged = []
    accum = num_values = 0
    for i, v in enumerate(array):
        if v is not None:
            accum += v
            num_values += 1
        if (i + 1) % interval == 0:
            if num_values > 0:
                averaged.append(accum / num_values)
            else:
                averaged.append(None)
            accum = num_values = 0
    return averaged


main()
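# Example requests (a sketch -- the port, group id 1, server id 2, and the
# 'KEY' value are hypothetical; real values come from config and db):
#
#   curl -X POST -H 'Authorization: KEY' -d '{"hostname": "web1"}' \
#       http://localhost:8080/v1/1/register_server
#
#   curl -H 'Authorization: KEY' \
#       'http://localhost:8080/v1/1/stats/2?start=2014-06-01&end=2014-06-02&interval=5'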