| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- import contextlib
- import gevent.queue
- from gevent.socket import wait_read, wait_write
- import psycopg2
- import psycopg2.extensions
- import psycopg2.extras
- def gevent_wait_callback(conn, timeout=None):
- # https://github.com/zacharyvoase/gevent-psycopg2/blob/master/lib/gevent_psycopg2.py
- while True:
- state = conn.poll()
- if state == psycopg2.extensions.POLL_OK:
- break
- elif state == psycopg2.extensions.POLL_READ:
- wait_read(conn.fileno(), timeout=timeout)
- elif state == psycopg2.extensions.POLL_WRITE:
- wait_write(conn.fileno(), timeout=timeout)
- else:
- raise psycopg2.OperationalError('unhandled state: %r' % state)
- psycopg2.extensions.set_wait_callback(gevent_wait_callback)
- pool = gevent.queue.Queue(maxsize=4)
- for _ in xrange(4):
- pool.put(psycopg2.connect('dbname=%s user=%s' % ('sysvitals', 'sysvitals')))
- @contextlib.contextmanager
- def cursor(): # https://code.google.com/p/gevent/source/browse/examples/psycopg2_pool.py?name=1.0b4#88
- conn = pool.get(timeout=1)
- try:
- yield conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
- except:
- if not conn.closed:
- try:
- conn.rollback()
- except:
- gevent.get_hub().handle_error(conn, *sys.exc_info())
- raise
- else:
- conn.commit()
- finally:
- if conn.closed:
- raise Exception('cursor context manager got back closed connection')
- pool.put_nowait(conn)
- def query_one(cur, sql, *args):
- cur.execute(sql, args)
- rval = cur.fetchone()
- if cur.fetchone() is not None:
- raise Exception('got more than one value for query', sql, args)
- return rval
- def create_server(group_id, hostname):
- with cursor() as cur:
- server_id = query_one(cur, 'INSERT INTO servers (group_id, hostname) VALUES(%s, %s) RETURNING id',
- group_id, hostname)[0]
- return server_id
|