| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- #!/usr/bin/env python
- import gevent.monkey
- gevent.monkey.patch_all(thread=False)
- from collections import defaultdict
- import copy
- import datetime
- import errno
- import httplib
- import json
- import os
- from os import path
- import traceback
- import urlparse
- import gevent.pywsgi
- import config
- import db
- import fileio
- import reloader
# Per-server stats files live under DATA_DIR/<group_id>/<server_id>/<ISO date>.
DATA_DIR = path.expanduser('~/sysvitals_data')
# Endpoint-name -> handler function mapping; populated by main() before serving.
handlers = None
- def main():
- global handlers
- handlers = {
- 'datum': post_datum,
- 'raw': get_raw,
- 'stats': get_stats,
- 'servers': get_servers,
- 'register_server': register_server,
- }
- server = gevent.pywsgi.WSGIServer(('0.0.0.0', config.api_port), application)
- if config.debug:
- reloader.init(server)
- print 'listening on', config.api_port
- server.serve_forever()
class HTTPException(Exception):
    """Raised by handlers to abort the request with a specific HTTP status.

    code -- numeric HTTP status (e.g. 400)
    body -- plain-text response body sent to the client
    """

    def __init__(self, code, body):
        self.body = body
        self.code = code
# CORS headers attached to every response (including OPTIONS preflights) so
# browser dashboards hosted on other origins can call this API.
BASE_HEADERS = [
    ('Access-Control-Allow-Origin', '*'),
    ('Access-Control-Allow-Headers', 'Authorization, X-Requested-With, X-Request'),
]
# Successful responses are JSON; error responses are plain text.
DEFAULT_HEADERS = BASE_HEADERS + [('Content-type', 'application/json')]
ERROR_HEADERS = BASE_HEADERS + [('Content-type', 'text/plain')]
- def application(environ, start_response):
- try:
- if environ['REQUEST_METHOD'] == 'OPTIONS':
- start_response('200 OK', copy.copy(DEFAULT_HEADERS))
- return []
- split = environ['PATH_INFO'][1:].split('/')
- qs = environ['QUERY_STRING']
- if qs:
- query = urlparse.parse_qs(qs, True, True)
- for k, v in query.iteritems():
- if len(v) > 1:
- raise HTTPException(400, 'duplicate query parameter: ' + k)
- query[k] = v[0]
- else:
- query = {}
- if split[0] == 'v1':
- handler = handlers.get(split[2])
- if handler:
- body = json.dumps(handler(split, query, environ))
- start_response('200 OK', copy.copy(DEFAULT_HEADERS))
- return [body]
- else:
- print 'no handler for', split
- else:
- print 'split was', split
- raise HTTPException(404, 'unhandled path: ' + environ['PATH_INFO'])
- except HTTPException as e:
- response = '%d %s' % (e.code, httplib.responses[e.code])
- start_response(response, copy.copy(ERROR_HEADERS))
- return [e.body]
- except:
- traceback.print_exc()
- start_response('500 Internal Server Error', copy.copy(ERROR_HEADERS))
- return ['ruh roh']
def get_raw(split, query, environ):
    """Return the raw per-day stats for one server.

    Path: /v1/<group_id>/raw/<server_id>?start=YYYY-MM-DD&end=YYYY-MM-DD
    Result maps each ISO date in [start, end] to the parsed stats for that
    day, or None when no data file exists for it.
    """
    try:
        group_id = get_group(split, environ)
        server_id = int(split[3])
        start = datetime.datetime.strptime(query['start'], '%Y-%m-%d').date()
        end = datetime.datetime.strptime(query['end'], '%Y-%m-%d').date()
    except (IndexError, KeyError, ValueError):
        raise HTTPException(400, '')
    server_dir = path.join(DATA_DIR, str(group_id), str(server_id))
    one_day = datetime.timedelta(days=1)
    rval = {}
    day = start
    while day <= end:
        date_str = day.isoformat()
        data = None
        try:
            with open(path.join(server_dir, date_str), 'r') as f:
                data = fileio.read_stats(f)
        except IOError as e:
            # a missing day file just means "no data"; anything else is fatal
            if e.errno != errno.ENOENT:
                raise
        rval[date_str] = data
        day += one_day
    return rval
# Output subfield names per metric group, as produced by get_stats; used to
# emit all-None series for days that have no data file.
fields = {
    'cpu': ['user', 'iowait', 'system', 'nice', 'guest', 'guest_nice', 'irq', 'softirq', 'steal', 'idle'],
    'mem': ['used', 'buffers', 'cached', 'free'],
    'net': ['bit/s_in', 'bit/s_out', 'err_in', 'err_out', 'drop_in', 'drop_out'],
}
def get_stats(split, query, environ):
    """Return derived, downsampled stats for one server.

    Same path/query as get_raw, plus optional ?interval=N (minutes). Builds
    stats[field][subfield][date] = list of per-interval values derived from
    the 1440 per-minute raw samples of each day (1440 = minutes per day).
    Raw samples use -1 as a "missing" sentinel; derived values use None.
    """
    try:
        interval = int(query.get('interval', 1))
    except ValueError:
        raise HTTPException(400, 'interval must be integer')
    raw = get_raw(split, query, environ)
    dates = raw.keys()
    dates.sort()
    # last_val carries the previous minute's counter values so rate-style
    # metrics (cpu times, net byte counters) can be differenced; seeding with
    # -1 means the very first sample produces no rate. It also carries across
    # day boundaries within this request.
    last_val = fileio.gen_template(-1)
    stats = defaultdict(dict)
    for date in dates:
        data = raw[date]
        if data is None:
            # no data file for this day: emit all-None series of the right
            # length (py2 integer division; a non-divisor interval truncates)
            for field, subfields in fields.iteritems():
                for subfield in subfields:
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = [None] * (1440 / interval)
            continue
        for field, subfields in data.items():
            if field == 'cpu':
                field_data = {}
                # subtract times from last minute, calculate total time spent each minute
                sums = [0] * 1440
                for subfield, raw_array in subfields.iteritems():
                    if subfield != 'num_cpus':
                        array = [None] * 1440
                        for i, d in enumerate(raw_array):
                            lv = last_val[field][subfield]
                            if d > 0 and lv > 0:
                                array[i] = d - lv
                                sums[i] += array[i]
                            last_val[field][subfield] = d
                        field_data[subfield] = array
                # divide differences by total times
                for subfield, array in field_data.items():
                    if subfield != 'num_cpus':
                        for i, d in enumerate(array):
                            if d > 0:
                                # scale to percent of one CPU-minute across num_cpus
                                array[i] = array[i] * subfields['num_cpus'][i] * 100 / sums[i]
                        stats[field].setdefault(subfield, {})
                        stats[field][subfield][date] = average(array, interval)
            elif field == 'mem':
                # translate total to free, subtract buffers and cached from used
                field_data = {}
                new_fields = ['used', 'buffers', 'cached', 'free']
                for subfield in new_fields:
                    field_data[subfield] = [None] * 1440
                for i in xrange(1440):
                    if subfields['total'][i] == -1:
                        # missing sample: leave the whole minute as None
                        for nf in new_fields:
                            field_data[nf][i] = None
                        continue
                    field_data['free'][i] = subfields['total'][i] - subfields['used'][i]
                    field_data['used'][i] = subfields['used'][i] - subfields['buffers'][i] - subfields['cached'][i]
                    field_data['buffers'][i] = subfields['buffers'][i]
                    field_data['cached'][i] = subfields['cached'][i]
                for subfield, array in field_data.iteritems():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = average(array, interval)
            elif field == 'net':
                field_data = {}
                for subfield in ['bit/s_in', 'bit/s_out', 'err_in', 'err_out', 'drop_in', 'drop_out']:
                    field_data[subfield] = [None] * 1440
                for i in xrange(1440):
                    # difference the byte counters into bits/second (60s buckets)
                    lv_recv = last_val['net']['bytes_recv']
                    lv_sent = last_val['net']['bytes_sent']
                    # NOTE(review): only the *previous* sample is checked > 0;
                    # a missing current sample (-1) would yield a bogus
                    # negative rate here -- confirm raw data can't do that.
                    if lv_recv > 0:
                        field_data['bit/s_in'][i] = (subfields['bytes_recv'][i] - lv_recv) * 8.0 / 60
                    if lv_sent > 0:
                        field_data['bit/s_out'][i] = (subfields['bytes_sent'][i] - lv_sent) * 8.0 / 60
                    last_val['net']['bytes_recv'] = subfields['bytes_recv'][i]
                    last_val['net']['bytes_sent'] = subfields['bytes_sent'][i]
                    for subfield in ['err_in', 'err_out', 'drop_in', 'drop_out']:
                        # raw keys have no underscore ('errin', 'dropout', ...)
                        field_data[subfield][i] = subfields[subfield.replace('_', '')][i]
                for subfield, array in field_data.iteritems():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = average(array, interval)
            elif field == 'disk':
                field_data = {}
                # one used/free series per mountpoint, keyed '<mount>_used' / '<mount>_free'
                for mountpoint, disk_sfs in subfields.items():
                    used_key = mountpoint + '_used'
                    free_key = mountpoint + '_free'
                    field_data[used_key] = [None] * 1440
                    field_data[free_key] = [None] * 1440
                    for i in xrange(1440):
                        if disk_sfs['total'][i] == -1:
                            continue
                        used = disk_sfs['used'][i]
                        field_data[used_key][i] = used
                        field_data[free_key][i] = disk_sfs['total'][i] - used
                for subfield, array in field_data.iteritems():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = average(array, interval)
    return stats
def post_datum(split, query, environ):
    """Record one minute's sample for a server.

    Path: /v1/<group_id>/datum/<server_id>; the JSON body's top-level keys
    must exactly match fileio.TEMPLATE's. The sample is merged into the
    per-day stats file at the slot for the current UTC minute (rounded to
    the nearest minute) and the file is rewritten.
    """
    group_id = get_group(split, environ)
    try:
        server_id = int(split[3])
    except (IndexError, ValueError):
        raise HTTPException(400, '')
    body = load_json_body(environ)
    # Compare key *sets*: the old list comparison (py2 .keys() returns lists)
    # was order-sensitive, while only the set of keys actually matters -- as
    # the symmetric_difference in the error message already implied.
    if set(body.keys()) != set(fileio.TEMPLATE.keys()):
        diff = set(body.keys()).symmetric_difference(set(fileio.TEMPLATE.keys()))
        raise HTTPException(400, 'post body had missing or extra keys: ' + ','.join(diff))
    server_dir = path.join(DATA_DIR, str(group_id), str(server_id))
    try:
        os.makedirs(server_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
    # we floor to the minute, so this rounds to the nearest minute
    now = datetime.datetime.utcnow() + datetime.timedelta(seconds=29)
    data_path = path.join(server_dir, now.date().isoformat())
    try:
        with open(data_path, 'r') as f:
            stats = fileio.read_stats(f)
    except IOError as e:
        if e.errno != errno.ENOENT:
            raise
        # first datum of the day: start from a fresh template
        stats = copy.deepcopy(fileio.TEMPLATE)
    index = now.hour * 60 + now.minute
    data = {}
    for field, subfields in stats.iteritems():
        field_data = {}
        if field == 'disk':
            disk = stats['disk']
            for mountpoint, datum in body['disk'].iteritems():  # iterate through body to get new mountpoints
                disk.setdefault(mountpoint, {'total': [-1] * 1440, 'used': [-1] * 1440})
                field_data[mountpoint] = {}
                for subfield, array in disk[mountpoint].iteritems():
                    array = list(array)
                    array[index] = datum[subfield]
                    field_data[mountpoint][subfield] = array
            # NOTE(review): mountpoints present in the stored file but absent
            # from this body are dropped from the rewritten day file --
            # confirm that is intended for disks that disappear mid-day.
        else:
            for subfield, array in subfields.iteritems():
                array = list(array)
                array[index] = body[field][subfield]
                field_data[subfield] = array
        data[field] = field_data
    with open(data_path, 'w') as f:
        fileio.write_datum(f, data)
    return {'status': 'ok'}
def get_servers(split, query, environ):
    """List the servers registered for the authenticated group."""
    return db.get_servers(get_group(split, environ))
def register_server(split, query, environ):
    """Create a server record for the group; JSON body must carry "hostname".

    Returns the new server's id as {'server_id': ...}.
    """
    group_id = get_group(split, environ)
    body = load_json_body(environ)
    if 'hostname' not in body:
        raise HTTPException(400, 'post body didn\'t contain "hostname" key')
    return {'server_id': db.create_server(group_id, body['hostname'])}
def get_group(split, environ):
    """Parse the group id from the path and authenticate the request.

    Raises HTTPException 400 on a malformed group id, 401 when no
    Authorization header was sent, 403 when the api key does not match.
    Returns the authenticated group id.
    """
    # local import so the delicate monkey-patched import block stays untouched
    import hmac
    try:
        group_id = int(split[1])
    except (IndexError, ValueError):
        raise HTTPException(400, '/v1/[group_id] - group_id was not valid int')
    if 'HTTP_AUTHORIZATION' not in environ:
        raise HTTPException(401, 'no api key passed in Authorization header')
    api_key = db.get_api_key(group_id)
    # Constant-time comparison: the old `!=` leaked key-prefix length via
    # timing. None (unknown group) still yields 403, as before.
    if api_key is None or not hmac.compare_digest(api_key, environ['HTTP_AUTHORIZATION']):
        raise HTTPException(403, 'api key did not match')
    return group_id
def load_json_body(environ):
    """Decode the request body as a JSON object.

    Raises HTTPException 400 when the body is not valid JSON or is valid
    JSON but not a dictionary; otherwise returns the decoded dict.
    """
    try:
        body = json.load(environ['wsgi.input'])
    except ValueError:
        raise HTTPException(400, 'post body was not valid JSON')
    if isinstance(body, dict):
        return body
    raise HTTPException(400, 'post body was not a JSON dictionary')
def average(array, interval):
    """Downsample `array` by averaging consecutive groups of `interval` values.

    None entries are skipped; a group containing no values yields None.
    With interval == 1 the input list is returned unchanged. A trailing
    partial group (when interval does not divide len(array)) is dropped.
    """
    if interval == 1:
        return array
    averaged = []
    total = 0
    count = 0
    for pos, value in enumerate(array, 1):
        if value is not None:
            total += value
            count += 1
        if pos % interval == 0:
            averaged.append(total / count if count else None)
            total = 0
            count = 0
    return averaged
- main()
|