# server.py
#!/usr/bin/env python
import warnings
warnings.filterwarnings('ignore', 'libevent')
import gevent.monkey
gevent.monkey.patch_all(thread=False)
from collections import defaultdict
import copy
import datetime
import errno
import hmac
import httplib
import json
import os
from os import path
import traceback
import urlparse
import gevent.pywsgi
import config
import db
import fileio
import reloader
# Root directory holding per-group/per-server stats files
# (<DATA_DIR>/<group_id>/<server_id>/<YYYY-MM-DD>).
DATA_DIR = path.expanduser('~/sysvitals_data')
# URL action name -> handler function; populated by main() before serving.
handlers = None
  23. def main():
  24. global handlers
  25. handlers = {
  26. 'datum': post_datum,
  27. 'raw': get_raw,
  28. 'stats': get_stats,
  29. 'servers': get_servers,
  30. 'register_server': register_server,
  31. }
  32. server = gevent.pywsgi.WSGIServer(('0.0.0.0', config.api_port), application)
  33. if config.debug:
  34. reloader.init(server)
  35. server.serve_forever()
  36. class HTTPException(Exception):
  37. def __init__(self, code, body):
  38. self.code = code
  39. self.body = body
# CORS headers attached to every response so a dashboard served from
# another origin can call this API from the browser.
BASE_HEADERS = [
    ('Access-Control-Allow-Origin', '*'),
    ('Access-Control-Allow-Headers', 'X-Requested-With, X-Request'),
]
# Successful responses carry JSON; error bodies are plain text.
DEFAULT_HEADERS = BASE_HEADERS + [('Content-type', 'application/json')]
ERROR_HEADERS = BASE_HEADERS + [('Content-type', 'text/plain')]
  46. def application(environ, start_response):
  47. try:
  48. if environ['REQUEST_METHOD'] == 'OPTIONS':
  49. start_response('200 OK', DEFAULT_HEADERS)
  50. return []
  51. split = environ['PATH_INFO'][1:].split('/')
  52. qs = environ['QUERY_STRING']
  53. if qs:
  54. query = urlparse.parse_qs(qs, True, True)
  55. for k, v in query.items():
  56. if len(v) > 1:
  57. raise HTTPException(400, 'duplicate query parameter: ' + k)
  58. query[k] = v[0]
  59. else:
  60. query = {}
  61. if split[0] == 'v1':
  62. handler = handlers.get(split[2])
  63. if handler:
  64. body = json.dumps(handler(split, query, environ))
  65. start_response('200 OK', DEFAULT_HEADERS)
  66. return [body]
  67. else:
  68. print 'no handler for', split
  69. else:
  70. print 'split was', split
  71. raise HTTPException(404, 'unhandled path: ' + environ['PATH_INFO'])
  72. except HTTPException as e:
  73. response = '%d %s' % (e.code, httplib.responses[e.code])
  74. start_response(response, ERROR_HEADERS)
  75. return [e.body]
  76. except:
  77. traceback.print_exc()
  78. start_response('500 Internal Server Error', ERROR_HEADERS)
  79. return ['ruh roh']
  80. def get_raw(split, query, environ):
  81. try:
  82. group_id = int(split[1])
  83. server_id = int(split[3])
  84. start = datetime.datetime.strptime(query['start'], '%Y-%m-%d').date()
  85. end = datetime.datetime.strptime(query['end'], '%Y-%m-%d').date()
  86. except (IndexError, KeyError, ValueError):
  87. raise HTTPException(400, '')
  88. server_dir = path.join(DATA_DIR, str(group_id), str(server_id))
  89. rval = {}
  90. c = start
  91. while c <= end:
  92. date_str = c.isoformat()
  93. try:
  94. with open(path.join(server_dir, date_str), 'r') as f:
  95. data = fileio.read_stats(f)
  96. except IOError as e:
  97. if e.errno == errno.ENOENT:
  98. data = None
  99. else:
  100. raise
  101. rval[date_str] = data
  102. c += datetime.timedelta(days=1)
  103. return rval
  104. def get_stats(split, query, environ):
  105. raw = get_raw(split, query, environ)
  106. try:
  107. interval = int(query.get('interval', 1))
  108. except ValueError:
  109. raise HTTPException(400, 'interval must be integer')
  110. last_val = fileio.gen_template(-1)
  111. stats = defaultdict(dict)
  112. for date, data in raw.items():
  113. for field, subfields in data.items():
  114. if field == 'cpu':
  115. field_data = {}
  116. # subtract times from last minute, calculate total time spent each minute
  117. sums = [0] * 1440
  118. for subfield, raw_array in subfields.items():
  119. if subfield != 'num_cpus':
  120. array = [None] * 1440
  121. for i, d in enumerate(raw_array):
  122. lv = last_val[field][subfield]
  123. if d > 0 and lv > 0:
  124. array[i] = d - lv
  125. sums[i] += array[i]
  126. last_val[field][subfield] = d
  127. field_data[subfield] = array
  128. # divide differences by total times
  129. for subfield, array in field_data.items():
  130. if subfield != 'num_cpus':
  131. for i, d in enumerate(array):
  132. if d > 0:
  133. array[i] = array[i] * subfields['num_cpus'][i] * 100 / sums[i]
  134. stats[field].setdefault(subfield, {})
  135. stats[field][subfield][date] = average(array, interval)
  136. elif field == 'mem':
  137. # translate total to free, subtract buffers and cached from used
  138. field_data = {}
  139. new_fields = ['used', 'buffers', 'cached', 'free']
  140. for subfield in new_fields:
  141. field_data[subfield] = [None] * 1440
  142. for i in xrange(1440):
  143. if subfields['total'][i] == -1:
  144. for nf in new_fields:
  145. field_data[nf][i] = None
  146. continue
  147. field_data['free'][i] = subfields['total'][i] - subfields['used'][i]
  148. field_data['used'][i] = subfields['used'][i] - subfields['buffers'][i] - subfields['cached'][i]
  149. field_data['buffers'][i] = subfields['buffers'][i]
  150. field_data['cached'][i] = subfields['cached'][i]
  151. for subfield, array in field_data.items():
  152. stats[field].setdefault(subfield, {})
  153. stats[field][subfield][date] = average(array, interval)
  154. elif field == 'net':
  155. field_data = {}
  156. for subfield in ['bit/s_in', 'bit/s_out', 'err_in', 'err_out', 'drop_in', 'drop_out']:
  157. field_data[subfield] = [None] * 1440
  158. for i in xrange(1440):
  159. lv_recv = last_val['net']['bytes_recv']
  160. lv_sent = last_val['net']['bytes_sent']
  161. if lv_recv > 0:
  162. field_data['bit/s_in'][i] = (subfields['bytes_recv'][i] - lv_recv) * 8.0 / 60
  163. if lv_sent > 0:
  164. field_data['bit/s_out'][i] = (subfields['bytes_sent'][i] - lv_sent) * 8.0 / 60
  165. last_val['net']['bytes_recv'] = subfields['bytes_recv'][i]
  166. last_val['net']['bytes_sent'] = subfields['bytes_sent'][i]
  167. for subfield in ['err_in', 'err_out', 'drop_in', 'drop_out']:
  168. field_data[subfield][i] = subfields[subfield.replace('_', '')][i]
  169. for subfield, array in field_data.items():
  170. stats[field].setdefault(subfield, {})
  171. stats[field][subfield][date] = average(array, interval)
  172. return stats
  173. def post_datum(split, query, environ):
  174. group_id = get_group(split, environ)
  175. try:
  176. server_id = int(split[3])
  177. except (IndexError, ValueError):
  178. raise HTTPException(400, '')
  179. body = load_json_body(environ)
  180. if body.keys() != fileio.TEMPLATE.keys():
  181. diff = set(body.keys()).symmetric_difference(set(fileio.TEMPLATE.keys()))
  182. raise HTTPException(400, 'post body had missing or extra keys: ' + ','.join(diff))
  183. server_dir = path.join(DATA_DIR, str(group_id), str(server_id))
  184. try:
  185. os.makedirs(server_dir)
  186. except OSError as e:
  187. if e.errno != errno.EEXIST:
  188. raise
  189. # we floor to the minute, so this rounds to the nearest minute
  190. now = datetime.datetime.utcnow() + datetime.timedelta(seconds=29)
  191. data_path = path.join(server_dir, now.date().isoformat())
  192. try:
  193. with open(data_path, 'r') as f:
  194. stats = fileio.read_stats(f)
  195. except IOError as e:
  196. if e.errno != errno.ENOENT:
  197. raise
  198. stats = copy.deepcopy(fileio.TEMPLATE)
  199. index = now.hour * 60 + now.minute
  200. data = {}
  201. for field, subfields in stats.items():
  202. field_data = {}
  203. if field == 'disk':
  204. disk = stats['disk']
  205. for mountpoint, datum in body['disk'].items(): # iterate through body to get new mountpoints
  206. disk.setdefault(mountpoint, {'total': [-1] * 1440, 'used': [-1] * 1440})
  207. field_data[mountpoint] = {}
  208. for subfield, array in disk[mountpoint].items():
  209. array = list(array)
  210. array[index] = datum[subfield]
  211. field_data[mountpoint][subfield] = array
  212. else:
  213. for subfield, array in subfields.items():
  214. array = list(array)
  215. array[index] = body[field][subfield]
  216. field_data[subfield] = array
  217. data[field] = field_data
  218. with open(data_path, 'w') as f:
  219. fileio.write_datum(f, data)
  220. return {'status': 'ok'}
  221. def get_servers(split, query, environ):
  222. group_id = get_group(split, environ)
  223. return db.get_servers(group_id)
  224. def register_server(split, query, environ):
  225. group_id = get_group(split, environ)
  226. body = load_json_body(environ)
  227. try:
  228. hostname = body['hostname']
  229. except KeyError:
  230. raise HTTPException(400, 'post body didn\'t contain "hostname" key')
  231. server_id = db.create_server(group_id, hostname)
  232. return {'server_id': server_id}
  233. def get_group(split, environ):
  234. try:
  235. group_id = int(split[1])
  236. except (IndexError, ValueError):
  237. raise HTTPException(400, '/v1/[group_id] - group_id was not valid int')
  238. if 'HTTP_AUTHORIZATION' not in environ:
  239. raise HTTPException(401, 'no api key passed in Authorization header')
  240. if db.get_api_key(group_id) != environ['HTTP_AUTHORIZATION']:
  241. raise HTTPException(403, 'api key did not match')
  242. return group_id
  243. def load_json_body(environ):
  244. try:
  245. body = json.load(environ['wsgi.input'])
  246. except ValueError:
  247. raise HTTPException(400, 'post body was not valid JSON')
  248. if not isinstance(body, dict):
  249. raise HTTPException(400, 'post body was not a JSON dictionary')
  250. return body
  251. def average(array, interval):
  252. if interval == 1:
  253. return array
  254. averaged = []
  255. accum = num_values = 0
  256. for i, v in enumerate(array):
  257. if v is not None:
  258. accum += v
  259. num_values += 1
  260. if (i + 1) % interval == 0:
  261. if num_values > 0:
  262. averaged.append(accum / num_values)
  263. else:
  264. averaged.append(None)
  265. accum = num_values = 0
  266. return averaged
  267. main()