db.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import contextlib
  2. import gevent.queue
  3. from gevent.socket import wait_read, wait_write
  4. import psycopg2
  5. import psycopg2.extensions
  6. import psycopg2.extras
  7. import config
  8. def gevent_wait_callback(conn, timeout=None):
  9. # https://github.com/zacharyvoase/gevent-psycopg2/blob/master/lib/gevent_psycopg2.py
  10. while True:
  11. state = conn.poll()
  12. if state == psycopg2.extensions.POLL_OK:
  13. break
  14. elif state == psycopg2.extensions.POLL_READ:
  15. wait_read(conn.fileno(), timeout=timeout)
  16. elif state == psycopg2.extensions.POLL_WRITE:
  17. wait_write(conn.fileno(), timeout=timeout)
  18. else:
  19. raise psycopg2.OperationalError('unhandled state: %r' % state)
  20. psycopg2.extensions.set_wait_callback(gevent_wait_callback)
  21. pool = gevent.queue.Queue(maxsize=4)
  22. for _ in xrange(4):
  23. pool.put(psycopg2.connect('dbname=%s user=%s' % (config.database, config.db_user)))
  24. @contextlib.contextmanager
  25. def cursor(): # https://code.google.com/p/gevent/source/browse/examples/psycopg2_pool.py?name=1.0b4#88
  26. conn = pool.get(timeout=1)
  27. try:
  28. yield conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
  29. except:
  30. if not conn.closed:
  31. try:
  32. conn.rollback()
  33. except:
  34. gevent.get_hub().handle_error(conn, *sys.exc_info())
  35. raise
  36. else:
  37. conn.commit()
  38. finally:
  39. if conn.closed:
  40. raise Exception('cursor context manager got back closed connection')
  41. pool.put_nowait(conn)
  42. def query(cur, sql, *args):
  43. cur.execute(sql, args)
  44. return cur.fetchall()
  45. def query_iter(cur, sql, *args):
  46. cur.execute(sql, args)
  47. while True:
  48. r = cur.fetchone()
  49. if r is None:
  50. break
  51. yield r
  52. def query_one(cur, sql, *args):
  53. cur.execute(sql, args)
  54. rval = cur.fetchone()
  55. if cur.fetchone() is not None:
  56. raise Exception('got more than one value for query', sql, args)
  57. return rval
  58. def get_api_key(group_id):
  59. with cursor() as cur:
  60. api_key = query_one(cur, 'SELECT api_key FROM groups WHERE id = %s', group_id)[0]
  61. return api_key
  62. def create_server(group_id, hostname):
  63. with cursor() as cur:
  64. server_id = query_one(cur, 'INSERT INTO servers (group_id, hostname) VALUES(%s, %s) RETURNING id',
  65. group_id, hostname)[0]
  66. return server_id
  67. def get_servers(group_id):
  68. servers = []
  69. with cursor() as cur:
  70. for server in query(cur, 'SELECT id, hostname FROM servers WHERE group_id = %s', group_id):
  71. servers.append(server.copy())
  72. return servers