#!/usr/bin/env python
# sysvitals API server (Python 2): stores and serves per-minute system stats.
import gevent.monkey
# Patch the stdlib for cooperative I/O before importing anything that uses
# sockets/files; thread=False leaves real threads unpatched.
gevent.monkey.patch_all(thread=False)
from collections import defaultdict
import copy
import datetime
import errno
import httplib
import json
import os
from os import path
import traceback
import urlparse
import gevent.pywsgi
# Project-local modules.
import config
import db
import fileio
import reloader
  19. DATA_DIR = path.expanduser('~/sysvitals_data')
  20. handlers = None
  21. def main():
  22. global handlers
  23. handlers = {
  24. 'datum': post_datum,
  25. 'raw': get_raw,
  26. 'stats': get_stats,
  27. 'servers': get_servers,
  28. 'register_server': register_server,
  29. }
  30. server = gevent.pywsgi.WSGIServer(('0.0.0.0', config.api_port), application)
  31. if config.debug:
  32. reloader.init(server)
  33. server.serve_forever()
  34. class HTTPException(Exception):
  35. def __init__(self, code, body):
  36. self.code = code
  37. self.body = body
  38. BASE_HEADERS = [
  39. ('Access-Control-Allow-Origin', '*'),
  40. ('Access-Control-Allow-Headers', 'X-Requested-With, X-Request'),
  41. ]
  42. DEFAULT_HEADERS = BASE_HEADERS + [('Content-type', 'application/json')]
  43. ERROR_HEADERS = BASE_HEADERS + [('Content-type', 'text/plain')]
  44. def application(environ, start_response):
  45. try:
  46. if environ['REQUEST_METHOD'] == 'OPTIONS':
  47. start_response('200 OK', copy.copy(DEFAULT_HEADERS))
  48. return []
  49. split = environ['PATH_INFO'][1:].split('/')
  50. qs = environ['QUERY_STRING']
  51. if qs:
  52. query = urlparse.parse_qs(qs, True, True)
  53. for k, v in query.items():
  54. if len(v) > 1:
  55. raise HTTPException(400, 'duplicate query parameter: ' + k)
  56. query[k] = v[0]
  57. else:
  58. query = {}
  59. if split[0] == 'v1':
  60. handler = handlers.get(split[2])
  61. if handler:
  62. body = json.dumps(handler(split, query, environ))
  63. start_response('200 OK', copy.copy(DEFAULT_HEADERS))
  64. return [body]
  65. else:
  66. print 'no handler for', split
  67. else:
  68. print 'split was', split
  69. raise HTTPException(404, 'unhandled path: ' + environ['PATH_INFO'])
  70. except HTTPException as e:
  71. response = '%d %s' % (e.code, httplib.responses[e.code])
  72. start_response(response, copy.copy(ERROR_HEADERS))
  73. return [e.body]
  74. except:
  75. traceback.print_exc()
  76. start_response('500 Internal Server Error', copy.copy(ERROR_HEADERS))
  77. return ['ruh roh']
  78. def get_raw(split, query, environ):
  79. try:
  80. group_id = int(split[1])
  81. server_id = int(split[3])
  82. start = datetime.datetime.strptime(query['start'], '%Y-%m-%d').date()
  83. end = datetime.datetime.strptime(query['end'], '%Y-%m-%d').date()
  84. except (IndexError, KeyError, ValueError):
  85. raise HTTPException(400, '')
  86. server_dir = path.join(DATA_DIR, str(group_id), str(server_id))
  87. rval = {}
  88. c = start
  89. while c <= end:
  90. date_str = c.isoformat()
  91. try:
  92. with open(path.join(server_dir, date_str), 'r') as f:
  93. data = fileio.read_stats(f)
  94. except IOError as e:
  95. if e.errno == errno.ENOENT:
  96. data = None
  97. else:
  98. raise
  99. rval[date_str] = data
  100. c += datetime.timedelta(days=1)
  101. return rval
  102. fields = {
  103. 'cpu': ['user', 'iowait', 'system', 'nice', 'guest', 'guest_nice', 'irq', 'softirq', 'steal', 'idle'],
  104. 'mem': ['used', 'buffers', 'cached', 'free'],
  105. 'net': ['bit/s_in', 'bit/s_out', 'err_in', 'err_out', 'drop_in', 'drop_out'],
  106. }
def get_stats(split, query, environ):
    """Aggregate raw per-minute counters into derived, chart-ready series.

    Same path/query contract as get_raw, plus optional 'interval'
    (minutes per averaged bucket, default 1).  Returns
    {field: {subfield: {date: [values]}}}.
    """
    try:
        interval = int(query.get('interval', 1))
    except ValueError:
        raise HTTPException(400, 'interval must be integer')
    raw = get_raw(split, query, environ)
    dates = raw.keys()
    dates.sort()
    # Counter state carried across day boundaries; -1 marks "no sample yet".
    last_val = fileio.gen_template(-1)
    stats = defaultdict(dict)
    for date in dates:
        data = raw[date]
        if data is None:
            # No file for this day: emit all-None series of the right length.
            for field, subfields in fields.iteritems():
                for subfield in subfields:
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = [None] * (1440 / interval)
            continue
        for field, subfields in data.items():
            if field == 'cpu':
                field_data = {}
                # subtract times from last minute, calculate total time spent each minute
                sums = [0] * 1440
                for subfield, raw_array in subfields.items():
                    if subfield != 'num_cpus':
                        array = [None] * 1440
                        for i, d in enumerate(raw_array):
                            lv = last_val[field][subfield]
                            # Only diff when both samples exist (counters > 0).
                            if d > 0 and lv > 0:
                                array[i] = d - lv
                                sums[i] += array[i]
                            last_val[field][subfield] = d
                        field_data[subfield] = array
                # divide differences by total times
                for subfield, array in field_data.items():
                    if subfield != 'num_cpus':
                        for i, d in enumerate(array):
                            if d > 0:
                                # Fraction of total jiffies, scaled by CPU count
                                # to a percentage for this minute.
                                array[i] = array[i] * subfields['num_cpus'][i] * 100 / sums[i]
                        stats[field].setdefault(subfield, {})
                        stats[field][subfield][date] = average(array, interval)
            elif field == 'mem':
                # translate total to free, subtract buffers and cached from used
                field_data = {}
                new_fields = ['used', 'buffers', 'cached', 'free']
                for subfield in new_fields:
                    field_data[subfield] = [None] * 1440
                for i in xrange(1440):
                    # total == -1 means no sample was recorded this minute.
                    if subfields['total'][i] == -1:
                        for nf in new_fields:
                            field_data[nf][i] = None
                        continue
                    field_data['free'][i] = subfields['total'][i] - subfields['used'][i]
                    field_data['used'][i] = subfields['used'][i] - subfields['buffers'][i] - subfields['cached'][i]
                    field_data['buffers'][i] = subfields['buffers'][i]
                    field_data['cached'][i] = subfields['cached'][i]
                for subfield, array in field_data.items():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = average(array, interval)
            elif field == 'net':
                field_data = {}
                for subfield in ['bit/s_in', 'bit/s_out', 'err_in', 'err_out', 'drop_in', 'drop_out']:
                    field_data[subfield] = [None] * 1440
                for i in xrange(1440):
                    lv_recv = last_val['net']['bytes_recv']
                    lv_sent = last_val['net']['bytes_sent']
                    # Byte counters are cumulative: diff against the previous
                    # minute and convert bytes/min -> bits/sec.
                    if lv_recv > 0:
                        field_data['bit/s_in'][i] = (subfields['bytes_recv'][i] - lv_recv) * 8.0 / 60
                    if lv_sent > 0:
                        field_data['bit/s_out'][i] = (subfields['bytes_sent'][i] - lv_sent) * 8.0 / 60
                    last_val['net']['bytes_recv'] = subfields['bytes_recv'][i]
                    last_val['net']['bytes_sent'] = subfields['bytes_sent'][i]
                    for subfield in ['err_in', 'err_out', 'drop_in', 'drop_out']:
                        # Stored keys drop the underscore ('err_in' -> 'errin');
                        # presumably matching fileio's template — verify there.
                        field_data[subfield][i] = subfields[subfield.replace('_', '')][i]
                for subfield, array in field_data.items():
                    stats[field].setdefault(subfield, {})
                    stats[field][subfield][date] = average(array, interval)
    return stats
  185. def post_datum(split, query, environ):
  186. group_id = get_group(split, environ)
  187. try:
  188. server_id = int(split[3])
  189. except (IndexError, ValueError):
  190. raise HTTPException(400, '')
  191. body = load_json_body(environ)
  192. if body.keys() != fileio.TEMPLATE.keys():
  193. diff = set(body.keys()).symmetric_difference(set(fileio.TEMPLATE.keys()))
  194. raise HTTPException(400, 'post body had missing or extra keys: ' + ','.join(diff))
  195. server_dir = path.join(DATA_DIR, str(group_id), str(server_id))
  196. try:
  197. os.makedirs(server_dir)
  198. except OSError as e:
  199. if e.errno != errno.EEXIST:
  200. raise
  201. # we floor to the minute, so this rounds to the nearest minute
  202. now = datetime.datetime.utcnow() + datetime.timedelta(seconds=29)
  203. data_path = path.join(server_dir, now.date().isoformat())
  204. try:
  205. with open(data_path, 'r') as f:
  206. stats = fileio.read_stats(f)
  207. except IOError as e:
  208. if e.errno != errno.ENOENT:
  209. raise
  210. stats = copy.deepcopy(fileio.TEMPLATE)
  211. index = now.hour * 60 + now.minute
  212. data = {}
  213. for field, subfields in stats.items():
  214. field_data = {}
  215. if field == 'disk':
  216. disk = stats['disk']
  217. for mountpoint, datum in body['disk'].items(): # iterate through body to get new mountpoints
  218. disk.setdefault(mountpoint, {'total': [-1] * 1440, 'used': [-1] * 1440})
  219. field_data[mountpoint] = {}
  220. for subfield, array in disk[mountpoint].items():
  221. array = list(array)
  222. array[index] = datum[subfield]
  223. field_data[mountpoint][subfield] = array
  224. else:
  225. for subfield, array in subfields.items():
  226. array = list(array)
  227. array[index] = body[field][subfield]
  228. field_data[subfield] = array
  229. data[field] = field_data
  230. with open(data_path, 'w') as f:
  231. fileio.write_datum(f, data)
  232. return {'status': 'ok'}
  233. def get_servers(split, query, environ):
  234. group_id = get_group(split, environ)
  235. return db.get_servers(group_id)
  236. def register_server(split, query, environ):
  237. group_id = get_group(split, environ)
  238. body = load_json_body(environ)
  239. try:
  240. hostname = body['hostname']
  241. except KeyError:
  242. raise HTTPException(400, 'post body didn\'t contain "hostname" key')
  243. server_id = db.create_server(group_id, hostname)
  244. return {'server_id': server_id}
  245. def get_group(split, environ):
  246. try:
  247. group_id = int(split[1])
  248. except (IndexError, ValueError):
  249. raise HTTPException(400, '/v1/[group_id] - group_id was not valid int')
  250. if 'HTTP_AUTHORIZATION' not in environ:
  251. raise HTTPException(401, 'no api key passed in Authorization header')
  252. if db.get_api_key(group_id) != environ['HTTP_AUTHORIZATION']:
  253. raise HTTPException(403, 'api key did not match')
  254. return group_id
  255. def load_json_body(environ):
  256. try:
  257. body = json.load(environ['wsgi.input'])
  258. except ValueError:
  259. raise HTTPException(400, 'post body was not valid JSON')
  260. if not isinstance(body, dict):
  261. raise HTTPException(400, 'post body was not a JSON dictionary')
  262. return body
  263. def average(array, interval):
  264. if interval == 1:
  265. return array
  266. averaged = []
  267. accum = num_values = 0
  268. for i, v in enumerate(array):
  269. if v is not None:
  270. accum += v
  271. num_values += 1
  272. if (i + 1) % interval == 0:
  273. if num_values > 0:
  274. averaged.append(accum / num_values)
  275. else:
  276. averaged.append(None)
  277. accum = num_values = 0
  278. return averaged
  279. main()