server.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. #!/usr/bin/env python
  2. import warnings
  3. warnings.filterwarnings('ignore', 'libevent')
  4. import gevent.monkey
  5. gevent.monkey.patch_all(thread=False)
  6. from collections import defaultdict
  7. import copy
  8. import datetime
  9. import errno
  10. import httplib
  11. import json
  12. import os
  13. from os import path
  14. import traceback
  15. import urlparse
  16. import gevent.pywsgi
  17. import db
  18. import fileio
  19. import reloader
  20. DATA_DIR = path.expanduser('~/sysvitals_data')
  21. handlers = None
  22. def main():
  23. global handlers
  24. handlers = {
  25. 'datum': post_datum,
  26. 'raw': get_raw,
  27. 'stats': get_stats,
  28. 'register_server': register_server,
  29. }
  30. server = gevent.pywsgi.WSGIServer(('0.0.0.0', 8892), application)
  31. reloader.init(server)
  32. server.serve_forever()
  33. class HTTPException(Exception):
  34. def __init__(self, code, body):
  35. self.code = code
  36. self.body = body
  37. BASE_HEADERS = [
  38. ('Access-Control-Allow-Origin', '*'),
  39. ('Access-Control-Allow-Headers', 'X-Requested-With, X-Request'),
  40. ]
  41. DEFAULT_HEADERS = BASE_HEADERS + [('Content-type', 'application/json')]
  42. ERROR_HEADERS = BASE_HEADERS + [('Content-type', 'text/plain')]
  43. def application(environ, start_response):
  44. try:
  45. if environ['REQUEST_METHOD'] == 'OPTIONS':
  46. start_response('200 OK', DEFAULT_HEADERS)
  47. return []
  48. split = environ['PATH_INFO'][1:].split('/')
  49. qs = environ['QUERY_STRING']
  50. if qs:
  51. query = urlparse.parse_qs(qs, True, True)
  52. for k, v in query.items():
  53. if len(v) > 1:
  54. raise HTTPException(400, 'duplicate query parameter: ' + k)
  55. query[k] = v[0]
  56. else:
  57. query = {}
  58. if split[0] == 'v1':
  59. handler = handlers.get(split[2])
  60. if handler:
  61. body = json.dumps(handler(split, query, environ))
  62. start_response('200 OK', DEFAULT_HEADERS)
  63. return [body]
  64. else:
  65. print 'no handler for', split
  66. else:
  67. print 'split was', split
  68. raise HTTPException(404, 'unhandled path: ' + environ['PATH_INFO'])
  69. except HTTPException as e:
  70. response = '%d %s' % (e.code, httplib.responses[e.code])
  71. start_response(response, ERROR_HEADERS)
  72. return [e.body]
  73. except:
  74. traceback.print_exc()
  75. start_response('500 Internal Server Error', ERROR_HEADERS)
  76. return ['ruh roh']
  77. def get_raw(split, query, environ):
  78. try:
  79. group = int(split[1])
  80. server_id = int(split[3])
  81. start = datetime.datetime.strptime(query['start'], '%Y-%m-%d').date()
  82. end = datetime.datetime.strptime(query['end'], '%Y-%m-%d').date()
  83. except (IndexError, KeyError, ValueError):
  84. raise HTTPException(400, '')
  85. server_dir = path.join(DATA_DIR, str(group), str(server_id))
  86. rval = {}
  87. c = start
  88. while c <= end:
  89. date_str = c.isoformat()
  90. try:
  91. with open(path.join(server_dir, date_str), 'r') as f:
  92. data = fileio.read_stats(f)
  93. except IOError as e:
  94. if e.errno == errno.ENOENT:
  95. data = None
  96. else:
  97. raise
  98. rval[date_str] = data
  99. c += datetime.timedelta(days=1)
  100. return rval
  101. def get_stats(split, query, environ):
  102. raw = get_raw(split, query, environ)
  103. last_val = fileio.gen_template(-1)
  104. stats = defaultdict(dict)
  105. for date, data in raw.items():
  106. for field, subfields in data.items():
  107. if field == 'cpu':
  108. field_data = {}
  109. # subtract times from last minute, calculate total time spent each minute
  110. sums = [0] * 1440
  111. for subfield, raw_array in subfields.items():
  112. if subfield != 'num_cpus':
  113. array = [None] * 1440
  114. for i, d in enumerate(raw_array):
  115. lv = last_val[field][subfield]
  116. if d > 0 and lv > 0:
  117. array[i] = d - lv
  118. sums[i] += array[i]
  119. last_val[field][subfield] = d
  120. field_data[subfield] = array
  121. # divide differences by total times
  122. for subfield, array in field_data.items():
  123. if subfield != 'num_cpus':
  124. for i, d in enumerate(array):
  125. if d > 0:
  126. array[i] = array[i] * subfields['num_cpus'][i] * 100 / sums[i]
  127. stats[field].setdefault(subfield, {})
  128. stats[field][subfield][date] = array
  129. elif field == 'mem':
  130. # translate total to free, subtract buffers and cached from used
  131. field_data = {}
  132. new_fields = ['used', 'buffers', 'cached', 'free']
  133. for subfield in new_fields:
  134. field_data[subfield] = [None] * 1440
  135. for i in xrange(1440):
  136. if subfields['total'][i] == -1:
  137. for nf in new_fields:
  138. field_data[nf][i] = None
  139. continue
  140. field_data['free'][i] = subfields['total'][i] - subfields['used'][i]
  141. field_data['used'][i] = subfields['used'][i] - subfields['buffers'][i] - subfields['cached'][i]
  142. field_data['buffers'][i] = subfields['buffers'][i]
  143. field_data['cached'][i] = subfields['cached'][i]
  144. for subfield, array in field_data.items():
  145. stats[field].setdefault(subfield, {})
  146. stats[field][subfield][date] = array
  147. elif field == 'net':
  148. field_data = {}
  149. for subfield in ['bit/s_in', 'bit/s_out', 'err_in', 'err_out', 'drop_in', 'drop_out']:
  150. field_data[subfield] = [None] * 1440
  151. for i in xrange(1440):
  152. lv_recv = last_val['net']['bytes_recv']
  153. lv_sent = last_val['net']['bytes_sent']
  154. if lv_recv > 0:
  155. field_data['bit/s_in'][i] = (subfields['bytes_recv'][i] - lv_recv) * 8.0 / 60
  156. if lv_sent > 0:
  157. field_data['bit/s_out'][i] = (subfields['bytes_sent'][i] - lv_sent) * 8.0 / 60
  158. last_val['net']['bytes_recv'] = subfields['bytes_recv'][i]
  159. last_val['net']['bytes_sent'] = subfields['bytes_sent'][i]
  160. for subfield in ['err_in', 'err_out', 'drop_in', 'drop_out']:
  161. field_data[subfield][i] = subfields[subfield.replace('_', '')][i]
  162. for subfield, array in field_data.items():
  163. stats[field].setdefault(subfield, {})
  164. stats[field][subfield][date] = array
  165. return stats
  166. def post_datum(split, query, environ):
  167. try:
  168. group = int(split[1])
  169. server_id = int(split[3])
  170. except (IndexError, ValueError):
  171. raise HTTPException(400, '')
  172. body = load_json_body(environ)
  173. if body.keys() != fileio.TEMPLATE.keys():
  174. diff = set(body.keys()).symmetric_difference(set(fileio.TEMPLATE.keys()))
  175. raise HTTPException(400, 'post body had missing or extra keys: ' + ','.join(diff))
  176. server_dir = path.join(DATA_DIR, str(group), str(server_id))
  177. try:
  178. os.makedirs(server_dir)
  179. except OSError as e:
  180. if e.errno != errno.EEXIST:
  181. raise
  182. # we floor to the minute, so this rounds to the nearest minute
  183. now = datetime.datetime.utcnow() + datetime.timedelta(seconds=29)
  184. data_path = path.join(server_dir, now.date().isoformat())
  185. try:
  186. with open(data_path, 'r') as f:
  187. stats = fileio.read_stats(f)
  188. except IOError as e:
  189. if e.errno != errno.ENOENT:
  190. raise
  191. stats = copy.deepcopy(fileio.TEMPLATE)
  192. index = now.hour * 60 + now.minute
  193. data = {}
  194. for field, subfields in stats.items():
  195. field_data = {}
  196. if field == 'disk':
  197. disk = stats['disk']
  198. for mountpoint, datum in body['disk'].items(): # iterate through body to get new mountpoints
  199. disk.setdefault(mountpoint, {'total': [-1] * 1440, 'used': [-1] * 1440})
  200. field_data[mountpoint] = {}
  201. for subfield, array in disk[mountpoint].items():
  202. array = list(array)
  203. array[index] = datum[subfield]
  204. field_data[mountpoint][subfield] = array
  205. else:
  206. for subfield, array in subfields.items():
  207. array = list(array)
  208. array[index] = body[field][subfield]
  209. field_data[subfield] = array
  210. data[field] = field_data
  211. with open(data_path, 'w') as f:
  212. fileio.write_datum(f, data)
  213. return {'status': 'ok'}
  214. def register_server(split, query, environ):
  215. try:
  216. group = int(split[1])
  217. except (IndexError, ValueError):
  218. raise HTTPException(400, '')
  219. body = load_json_body(environ)
  220. try:
  221. hostname = body['hostname']
  222. except KeyError:
  223. raise HTTPException(400, 'post body didn\'t contain "hostname" key')
  224. server_id = db.create_server(group, hostname)
  225. return {'server_id': server_id}
  226. def load_json_body(environ):
  227. try:
  228. body = json.load(environ['wsgi.input'])
  229. except ValueError:
  230. raise HTTPException(400, 'post body was not valid JSON')
  231. if not isinstance(body, dict):
  232. raise HTTPException(400, 'post body was not a JSON dictionary')
  233. return body
  234. main()