db.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import contextlib
  2. import gevent.queue
  3. from gevent.socket import wait_read, wait_write
  4. import psycopg2
  5. import psycopg2.extensions
  6. import psycopg2.extras
  7. import config
  8. def gevent_wait_callback(conn, timeout=None):
  9. # https://github.com/zacharyvoase/gevent-psycopg2/blob/master/lib/gevent_psycopg2.py
  10. while True:
  11. state = conn.poll()
  12. if state == psycopg2.extensions.POLL_OK:
  13. break
  14. elif state == psycopg2.extensions.POLL_READ:
  15. wait_read(conn.fileno(), timeout=timeout)
  16. elif state == psycopg2.extensions.POLL_WRITE:
  17. wait_write(conn.fileno(), timeout=timeout)
  18. else:
  19. raise psycopg2.OperationalError('unhandled state: %r' % state)
  20. psycopg2.extensions.set_wait_callback(gevent_wait_callback)
  21. pool = gevent.queue.Queue(maxsize=4)
  22. for _ in xrange(4):
  23. pool.put(psycopg2.connect('dbname=%s user=%s' % (config.database, config.db_user)))
  24. @contextlib.contextmanager
  25. def cursor(): # https://code.google.com/p/gevent/source/browse/examples/psycopg2_pool.py?name=1.0b4#88
  26. conn = pool.get(timeout=1)
  27. try:
  28. yield conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
  29. except:
  30. if not conn.closed:
  31. try:
  32. conn.rollback()
  33. except:
  34. gevent.get_hub().handle_error(conn, *sys.exc_info())
  35. raise
  36. else:
  37. conn.commit()
  38. finally:
  39. if conn.closed:
  40. raise Exception('cursor context manager got back closed connection')
  41. pool.put_nowait(conn)
  42. def query_one(cur, sql, *args):
  43. cur.execute(sql, args)
  44. rval = cur.fetchone()
  45. if cur.fetchone() is not None:
  46. raise Exception('got more than one value for query', sql, args)
  47. return rval
  48. def create_server(group_id, hostname):
  49. with cursor() as cur:
  50. server_id = query_one(cur, 'INSERT INTO servers (group_id, hostname) VALUES(%s, %s) RETURNING id',
  51. group_id, hostname)[0]
  52. return server_id