server.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. #!/usr/bin/env python
  2. import warnings
  3. warnings.filterwarnings('ignore', 'libevent')
  4. import gevent.monkey
  5. gevent.monkey.patch_all(thread=False)
  6. from collections import defaultdict
  7. import copy
  8. import datetime
  9. import errno
  10. import httplib
  11. import json
  12. import os
  13. from os import path
  14. import traceback
  15. import urlparse
  16. import gevent.pywsgi
  17. import fileio
  18. import reloader
# Root directory for all recorded stats; layout is <group>/<server_id>/<date>.
DATA_DIR = path.expanduser('~/sysvitals_data')
# URL-name -> handler-function table; populated by main() before serving.
handlers = None
  21. def main():
  22. global handlers
  23. handlers = {
  24. 'datum': post_datum,
  25. 'raw': get_raw,
  26. 'stats': get_stats,
  27. }
  28. server = gevent.pywsgi.WSGIServer(('0.0.0.0', 8892), application)
  29. reloader.init(server)
  30. server.serve_forever()
  31. class HTTPException(Exception):
  32. def __init__(self, code, body):
  33. self.code = code
  34. self.body = body
# Headers sent on every response; CORS is deliberately wide open.
BASE_HEADERS = [
    ('Access-Control-Allow-Origin', '*'),
    ('Access-Control-Allow-Headers', 'X-Requested-With, X-Request'),
]
# Successful responses are JSON; error responses are plain text.
DEFAULT_HEADERS = BASE_HEADERS + [('Content-type', 'application/json')]
ERROR_HEADERS = BASE_HEADERS + [('Content-type', 'text/plain')]
  41. def application(environ, start_response):
  42. try:
  43. split = environ['PATH_INFO'][1:].split('/')
  44. qs = environ['QUERY_STRING']
  45. if qs:
  46. query = urlparse.parse_qs(qs, True, True)
  47. for k, v in query.items():
  48. if len(v) > 1:
  49. raise HTTPException(400, 'duplicate query parameter: ' + k)
  50. query[k] = v[0]
  51. else:
  52. query = {}
  53. if split[0] == 'v1':
  54. handler = handlers.get(split[2])
  55. if handler:
  56. body = json.dumps(handler(split, query, environ))
  57. start_response('200 OK', DEFAULT_HEADERS)
  58. return [body]
  59. else:
  60. print 'no handler for', split
  61. else:
  62. print 'split was', split
  63. raise HTTPException(404, 'unhandled path: ' + environ['PATH_INFO'])
  64. except HTTPException as e:
  65. response = '%d %s' % (e.code, httplib.responses[e.code])
  66. start_response(response, ERROR_HEADERS)
  67. return [e.body]
  68. except:
  69. traceback.print_exc()
  70. start_response('500 Internal Server Error', ERROR_HEADERS)
  71. return ['ruh roh']
  72. def get_raw(split, query, environ):
  73. try:
  74. group = int(split[1])
  75. server_id = int(split[3])
  76. start = datetime.datetime.strptime(query['start'], '%Y-%m-%d').date()
  77. end = datetime.datetime.strptime(query['end'], '%Y-%m-%d').date()
  78. except (IndexError, KeyError, ValueError):
  79. raise HTTPException(400, '')
  80. server_dir = path.join(DATA_DIR, str(group), str(server_id))
  81. rval = {}
  82. c = start
  83. while c <= end:
  84. date_str = c.isoformat()
  85. try:
  86. with open(path.join(server_dir, date_str), 'r') as f:
  87. data = fileio.read_stats(f)
  88. except IOError as e:
  89. if e.errno == errno.ENOENT:
  90. data = None
  91. else:
  92. raise
  93. rval[date_str] = data
  94. c += datetime.timedelta(days=1)
  95. return rval
  96. def get_stats(split, query, environ):
  97. raw = get_raw(split, query, environ)
  98. last_val = fileio.gen_template(-1)
  99. stats = defaultdict(dict)
  100. for date, data in raw.items():
  101. for field, subfields in data.items():
  102. if field == 'cpu':
  103. field_data = {}
  104. # subtract times from last minute, calculate total time spent each minute
  105. sums = [0] * 1440
  106. for subfield, raw_array in subfields.items():
  107. if subfield != 'num_cpus':
  108. array = [None] * 1440
  109. for i, d in enumerate(raw_array):
  110. lv = last_val[field][subfield]
  111. if d > 0 and lv > 0:
  112. array[i] = d - lv
  113. sums[i] += array[i]
  114. last_val[field][subfield] = d
  115. field_data[subfield] = array
  116. # divide differences by total times
  117. for subfield, array in field_data.items():
  118. if subfield != 'num_cpus':
  119. for i, d in enumerate(array):
  120. if d > 0:
  121. array[i] = array[i] * subfields['num_cpus'][i] * 100 / sums[i]
  122. stats[field].setdefault(subfield, {})
  123. stats[field][subfield][date] = array
  124. elif field == 'mem':
  125. # translate total to free, subtract buffers and cached from used
  126. field_data = {}
  127. for subfield in ['used', 'buffers', 'cached', 'free']:
  128. field_data[subfield] = [None] * 1440
  129. MB = 1024 * 1024
  130. for i in xrange(1440):
  131. field_data['free'][i] = (subfields['total'][i] - subfields['used'][i]) / MB
  132. field_data['used'][i] = (subfields['used'][i] -
  133. subfields['buffers'][i] - subfields['cached'][i]) / MB
  134. field_data['buffers'][i] = subfields['buffers'][i] / MB
  135. field_data['cached'][i] = subfields['cached'][i] / MB
  136. for subfield, array in field_data.items():
  137. stats[field].setdefault(subfield, {})
  138. stats[field][subfield][date] = array
  139. elif field == 'net':
  140. field_data = {}
  141. for subfield in ['kbit/s_in', 'kbit/s_out', 'err_in', 'err_out', 'drop_in', 'drop_out']:
  142. field_data[subfield] = [None] * 1440
  143. for i in xrange(1440):
  144. lv_recv = last_val['net']['bytes_recv']
  145. lv_sent = last_val['net']['bytes_sent']
  146. if lv_recv > 0:
  147. field_data['kbit/s_in'][i] = (subfields['bytes_recv'][i] - lv_recv) * 8.0 / 1024
  148. if lv_sent > 0:
  149. field_data['kbit/s_out'][i] = (subfields['bytes_sent'][i] - lv_sent) * 8.0 / 1024
  150. last_val['net']['bytes_recv'] = subfields['bytes_recv'][i]
  151. last_val['net']['bytes_sent'] = subfields['bytes_sent'][i]
  152. for subfield in ['err_in', 'err_out', 'drop_in', 'drop_out']:
  153. field_data[subfield][i] = subfields[subfield.replace('_', '')][i]
  154. for subfield, array in field_data.items():
  155. stats[field].setdefault(subfield, {})
  156. stats[field][subfield][date] = array
  157. return stats
  158. def post_datum(split, query, environ):
  159. try:
  160. group = int(split[1])
  161. server_id = int(split[3])
  162. except (IndexError, ValueError):
  163. raise HTTPException(400, '')
  164. try:
  165. body = json.load(environ['wsgi.input'])
  166. except ValueError:
  167. raise HTTPException(400, 'post body was not valid JSON')
  168. if not isinstance(body, dict):
  169. raise HTTPException(400, 'post body was not a JSON dictionary')
  170. if body.keys() != fileio.TEMPLATE.keys():
  171. diff = set(body.keys()).symmetric_difference(set(fileio.TEMPLATE.keys()))
  172. raise HTTPException(400, 'post body had missing or extra keys: ' + ','.join(diff))
  173. server_dir = path.join(DATA_DIR, str(group), str(server_id))
  174. try:
  175. os.makedirs(server_dir)
  176. except OSError as e:
  177. if e.errno != errno.EEXIST:
  178. raise
  179. # we floor to the minute, so this rounds to the nearest minute
  180. now = datetime.datetime.utcnow() + datetime.timedelta(seconds=29)
  181. data_path = path.join(server_dir, now.date().isoformat())
  182. try:
  183. with open(data_path, 'r') as f:
  184. stats = fileio.read_stats(f)
  185. except IOError as e:
  186. if e.errno != errno.ENOENT:
  187. raise
  188. stats = copy.deepcopy(fileio.TEMPLATE)
  189. index = now.hour * 60 + now.minute
  190. data = {}
  191. for field, subfields in stats.items():
  192. field_data = {}
  193. if field == 'disk':
  194. disk = stats['disk']
  195. for mountpoint, datum in body['disk'].items(): # iterate through body to get new mountpoints
  196. disk.setdefault(mountpoint, {'total': [-1] * 1440, 'used': [-1] * 1440})
  197. field_data[mountpoint] = {}
  198. for subfield, array in disk[mountpoint].items():
  199. array = list(array)
  200. array[index] = datum[subfield]
  201. field_data[mountpoint][subfield] = array
  202. else:
  203. for subfield, array in subfields.items():
  204. array = list(array)
  205. array[index] = body[field][subfield]
  206. field_data[subfield] = array
  207. data[field] = field_data
  208. with open(data_path, 'w') as f:
  209. fileio.write_datum(f, data)
  210. return {'status': 'ok'}
  211. main()