# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
import functools
import inspect
import logging
import time
from types import TracebackType
from typing import Any, Optional, Type
import warnings
from . import compat
from . import connection
from . import exceptions
from . import protocol
logger = logging.getLogger(__name__)
class PoolConnectionProxyMeta(type):
def __new__(
mcls,
name: str,
bases: tuple[Type[Any], ...],
dct: dict[str, Any],
*,
wrap: bool = False,
) -> PoolConnectionProxyMeta:
if wrap:
for attrname in dir(connection.Connection):
if attrname.startswith('_') or attrname in dct:
continue
meth = getattr(connection.Connection, attrname)
if not inspect.isfunction(meth):
continue
iscoroutine = inspect.iscoroutinefunction(meth)
wrapper = mcls._wrap_connection_method(attrname, iscoroutine)
wrapper = functools.update_wrapper(wrapper, meth)
dct[attrname] = wrapper
if '__doc__' not in dct:
dct['__doc__'] = connection.Connection.__doc__
return super().__new__(mcls, name, bases, dct)
@staticmethod
def _wrap_connection_method(
meth_name: str, iscoroutine: bool
) -> Callable[..., Any]:
def call_con_method(self: Any, *args: Any, **kwargs: Any) -> Any:
# This method will be owned by PoolConnectionProxy class.
if self._con is None:
raise exceptions.InterfaceError(
'cannot call Connection.{}(): '
'connection has been released back to the pool'.format(
meth_name))
meth = getattr(self._con.__class__, meth_name)
return meth(self._con, *args, **kwargs)
if iscoroutine:
compat.markcoroutinefunction(call_con_method)
return call_con_method
class PoolConnectionProxy(connection._ConnectionProxy,
metaclass=PoolConnectionProxyMeta,
wrap=True):
__slots__ = ('_con', '_holder')
def __init__(
self, holder: PoolConnectionHolder, con: connection.Connection
) -> None:
self._con = con
self._holder = holder
con._set_proxy(self)
def __getattr__(self, attr: str) -> Any:
# Proxy all unresolved attributes to the wrapped Connection object.
return getattr(self._con, attr)
def _detach(self) -> Optional[connection.Connection]:
if self._con is None:
return
con, self._con = self._con, None
con._set_proxy(None)
return con
def __repr__(self) -> str:
if self._con is None:
return '<{classname} [released] {id:#x}>'.format(
classname=self.__class__.__name__, id=id(self))
else:
return '<{classname} {con!r} {id:#x}>'.format(
classname=self.__class__.__name__, con=self._con, id=id(self))
class PoolConnectionHolder:
__slots__ = ('_con', '_pool', '_loop', '_proxy',
'_max_queries', '_setup',
'_max_inactive_time', '_in_use',
'_inactive_callback', '_timeout',
'_generation')
def __init__(
self,
pool: "Pool",
*,
max_queries: float,
setup: Optional[Callable[[PoolConnectionProxy], Awaitable[None]]],
max_inactive_time: float,
) -> None:
self._pool = pool
self._con: Optional[connection.Connection] = None
self._proxy: Optional[PoolConnectionProxy] = None
self._max_queries = max_queries
self._max_inactive_time = max_inactive_time
self._setup = setup
self._inactive_callback: Optional[Callable] = None
self._in_use: Optional[asyncio.Future] = None
self._timeout: Optional[float] = None
self._generation: Optional[int] = None
def is_connected(self) -> bool:
return self._con is not None and not self._con.is_closed()
def is_idle(self) -> bool:
return not self._in_use
async def connect(self) -> None:
if self._con is not None:
raise exceptions.InternalClientError(
'PoolConnectionHolder.connect() called while another '
'connection already exists')
self._con = await self._pool._get_new_connection()
self._generation = self._pool._generation
self._maybe_cancel_inactive_callback()
self._setup_inactive_callback()
async def acquire(self) -> PoolConnectionProxy:
if self._con is None or self._con.is_closed():
self._con = None
await self.connect()
elif self._generation != self._pool._generation:
# Connections have been expired, re-connect the holder.
self._pool._loop.create_task(
self._con.close(timeout=self._timeout))
self._con = None
await self.connect()
self._maybe_cancel_inactive_callback()
self._proxy = proxy = PoolConnectionProxy(self, self._con)
if self._setup is not None:
try:
await self._setup(proxy)
except (Exception, asyncio.CancelledError) as ex:
# If a user-defined `setup` function fails, we don't
# know if the connection is safe for re-use, hence
# we close it. A new connection will be created
# when `acquire` is called again.
try:
# Use `close()` to close the connection gracefully.
# An exception in `setup` isn't necessarily caused
# by an IO or a protocol error. close() will
# do the necessary cleanup via _release_on_close().
await self._con.close()
finally:
raise ex
self._in_use = self._pool._loop.create_future()
return proxy
async def release(self, timeout: Optional[float]) -> None:
if self._in_use is None:
raise exceptions.InternalClientError(
'PoolConnectionHolder.release() called on '
'a free connection holder')
if self._con.is_closed():
# When closing, pool connections perform the necessary
# cleanup, so we don't have to do anything else here.
return
self._timeout = None
if self._con._protocol.queries_count >= self._max_queries:
# The connection has reached its maximum utilization limit,
# so close it. Connection.close() will call _release().
await self._con.close(timeout=timeout)
return
if self._generation != self._pool._generation:
# The connection has expired because it belongs to
# an older generation (Pool.expire_connections() has
# been called.)
await self._con.close(timeout=timeout)
return
try:
budget = timeout
if self._con._protocol._is_cancelling():
# If the connection is in cancellation state,
# wait for the cancellation
started = time.monotonic()
await compat.wait_for(
self._con._protocol._wait_for_cancellation(),
budget)
if budget is not None:
budget -= time.monotonic() - started
if self._pool._reset is not None:
async with compat.timeout(budget):
await self._con._reset()
await self._pool._reset(self._con)
else:
await self._con.reset(timeout=budget)
except (Exception, asyncio.CancelledError) as ex:
# If the `reset` call failed, terminate the connection.
# A new one will be created when `acquire` is called
# again.
try:
# An exception in `reset` is most likely caused by
# an IO error, so terminate the connection.
self._con.terminate()
finally:
raise ex
# Free this connection holder and invalidate the
# connection proxy.
self._release()
# Rearm the connection inactivity timer.
self._setup_inactive_callback()
async def wait_until_released(self) -> None:
if self._in_use is None:
return
else:
await self._in_use
async def close(self) -> None:
if self._con is not None:
# Connection.close() will call _release_on_close() to
# finish holder cleanup.
await self._con.close()
def terminate(self) -> None:
if self._con is not None:
# Connection.terminate() will call _release_on_close() to
# finish holder cleanup.
self._con.terminate()
def _setup_inactive_callback(self) -> None:
if self._inactive_callback is not None:
raise exceptions.InternalClientError(
'pool connection inactivity timer already exists')
if self._max_inactive_time:
self._inactive_callback = self._pool._loop.call_later(
self._max_inactive_time, self._deactivate_inactive_connection)
def _maybe_cancel_inactive_callback(self) -> None:
if self._inactive_callback is not None:
self._inactive_callback.cancel()
self._inactive_callback = None
def _deactivate_inactive_connection(self) -> None:
if self._in_use is not None:
raise exceptions.InternalClientError(
'attempting to deactivate an acquired connection')
if self._con is not None:
# The connection is idle and not in use, so it's fine to
# use terminate() instead of close().
self._con.terminate()
# Must call clear_connection, because _deactivate_connection
# is called when the connection is *not* checked out, and
# so terminate() above will not call the below.
self._release_on_close()
def _release_on_close(self) -> None:
self._maybe_cancel_inactive_callback()
self._release()
self._con = None
def _release(self) -> None:
"""Release this connection holder."""
if self._in_use is None:
# The holder is not checked out.
return
if not self._in_use.done():
self._in_use.set_result(None)
self._in_use = None
# Deinitialize the connection proxy. All subsequent
# operations on it will fail.
if self._proxy is not None:
self._proxy._detach()
self._proxy = None
# Put ourselves back to the pool queue.
self._pool._queue.put_nowait(self)
[docs]
class Pool:
"""A connection pool.
Connection pool can be used to manage a set of connections to the database.
Connections are first acquired from the pool, then used, and then released
back to the pool. Once a connection is released, it's reset to close all
open cursors and other resources *except* prepared statements.
Pools are created by calling :func:`~asyncpg.pool.create_pool`.
"""
__slots__ = (
'_queue', '_loop', '_minsize', '_maxsize',
'_init', '_connect', '_reset', '_connect_args', '_connect_kwargs',
'_holders', '_initialized', '_initializing', '_closing',
'_closed', '_connection_class', '_record_class', '_generation',
'_setup', '_max_queries', '_max_inactive_connection_lifetime'
)
def __init__(self, *connect_args,
min_size,
max_size,
max_queries,
max_inactive_connection_lifetime,
connect=None,
setup=None,
init=None,
reset=None,
loop,
connection_class,
record_class,
**connect_kwargs):
if len(connect_args) > 1:
warnings.warn(
"Passing multiple positional arguments to asyncpg.Pool "
"constructor is deprecated and will be removed in "
"asyncpg 0.17.0. The non-deprecated form is "
"asyncpg.Pool(<dsn>, **kwargs)",
DeprecationWarning, stacklevel=2)
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
if max_size <= 0:
raise ValueError('max_size is expected to be greater than zero')
if min_size < 0:
raise ValueError(
'min_size is expected to be greater or equal to zero')
if min_size > max_size:
raise ValueError('min_size is greater than max_size')
if max_queries <= 0:
raise ValueError('max_queries is expected to be greater than zero')
if max_inactive_connection_lifetime < 0:
raise ValueError(
'max_inactive_connection_lifetime is expected to be greater '
'or equal to zero')
if not issubclass(connection_class, connection.Connection):
raise TypeError(
'connection_class is expected to be a subclass of '
'asyncpg.Connection, got {!r}'.format(connection_class))
if not issubclass(record_class, protocol.Record):
raise TypeError(
'record_class is expected to be a subclass of '
'asyncpg.Record, got {!r}'.format(record_class))
self._minsize = min_size
self._maxsize = max_size
self._holders = []
self._initialized = False
self._initializing = False
self._queue = None
self._connection_class = connection_class
self._record_class = record_class
self._closing = False
self._closed = False
self._generation = 0
self._connect = connect if connect is not None else connection.connect
self._connect_args = connect_args
self._connect_kwargs = connect_kwargs
self._setup = setup
self._init = init
self._reset = reset
self._max_queries = max_queries
self._max_inactive_connection_lifetime = \
max_inactive_connection_lifetime
async def _async__init__(self):
if self._initialized:
return self
if self._initializing:
raise exceptions.InterfaceError(
'pool is being initialized in another task')
if self._closed:
raise exceptions.InterfaceError('pool is closed')
self._initializing = True
try:
await self._initialize()
return self
finally:
self._initializing = False
self._initialized = True
async def _initialize(self):
self._queue = asyncio.LifoQueue(maxsize=self._maxsize)
for _ in range(self._maxsize):
ch = PoolConnectionHolder(
self,
max_queries=self._max_queries,
max_inactive_time=self._max_inactive_connection_lifetime,
setup=self._setup)
self._holders.append(ch)
self._queue.put_nowait(ch)
if self._minsize:
# Since we use a LIFO queue, the first items in the queue will be
# the last ones in `self._holders`. We want to pre-connect the
# first few connections in the queue, therefore we want to walk
# `self._holders` in reverse.
# Connect the first connection holder in the queue so that
# any connection issues are visible early.
first_ch = self._holders[-1] # type: PoolConnectionHolder
await first_ch.connect()
if self._minsize > 1:
connect_tasks = []
for i, ch in enumerate(reversed(self._holders[:-1])):
# `minsize - 1` because we already have first_ch
if i >= self._minsize - 1:
break
connect_tasks.append(ch.connect())
await asyncio.gather(*connect_tasks)
[docs]
def is_closing(self):
"""Return ``True`` if the pool is closing or is closed.
.. versionadded:: 0.28.0
"""
return self._closed or self._closing
[docs]
def get_size(self):
"""Return the current number of connections in this pool.
.. versionadded:: 0.25.0
"""
return sum(h.is_connected() for h in self._holders)
[docs]
def get_min_size(self):
"""Return the minimum number of connections in this pool.
.. versionadded:: 0.25.0
"""
return self._minsize
[docs]
def get_max_size(self):
"""Return the maximum allowed number of connections in this pool.
.. versionadded:: 0.25.0
"""
return self._maxsize
[docs]
def get_idle_size(self):
"""Return the current number of idle connections in this pool.
.. versionadded:: 0.25.0
"""
return sum(h.is_connected() and h.is_idle() for h in self._holders)
[docs]
def set_connect_args(self, dsn=None, **connect_kwargs):
r"""Set the new connection arguments for this pool.
The new connection arguments will be used for all subsequent
new connection attempts. Existing connections will remain until
they expire. Use :meth:`Pool.expire_connections()
<asyncpg.pool.Pool.expire_connections>` to expedite the connection
expiry.
:param str dsn:
Connection arguments specified using as a single string in
the following format:
``postgres://user:pass@host:port/database?option=value``.
:param \*\*connect_kwargs:
Keyword arguments for the :func:`~asyncpg.connection.connect`
function.
.. versionadded:: 0.16.0
"""
self._connect_args = [dsn]
self._connect_kwargs = connect_kwargs
async def _get_new_connection(self):
con = await self._connect(
*self._connect_args,
loop=self._loop,
connection_class=self._connection_class,
record_class=self._record_class,
**self._connect_kwargs,
)
if not isinstance(con, self._connection_class):
good = self._connection_class
good_n = f'{good.__module__}.{good.__name__}'
bad = type(con)
if bad.__module__ == "builtins":
bad_n = bad.__name__
else:
bad_n = f'{bad.__module__}.{bad.__name__}'
raise exceptions.InterfaceError(
"expected pool connect callback to return an instance of "
f"'{good_n}', got " f"'{bad_n}'"
)
if self._init is not None:
try:
await self._init(con)
except (Exception, asyncio.CancelledError) as ex:
# If a user-defined `init` function fails, we don't
# know if the connection is safe for re-use, hence
# we close it. A new connection will be created
# when `acquire` is called again.
try:
# Use `close()` to close the connection gracefully.
# An exception in `init` isn't necessarily caused
# by an IO or a protocol error. close() will
# do the necessary cleanup via _release_on_close().
await con.close()
finally:
raise ex
return con
[docs]
async def execute(
self,
query: str,
*args,
timeout: Optional[float]=None,
) -> str:
"""Execute an SQL command (or commands).
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.execute() <asyncpg.connection.Connection.execute>`.
.. versionadded:: 0.10.0
"""
async with self.acquire() as con:
return await con.execute(query, *args, timeout=timeout)
[docs]
async def executemany(
self,
command: str,
args,
*,
timeout: Optional[float]=None,
):
"""Execute an SQL *command* for each sequence of arguments in *args*.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.executemany()
<asyncpg.connection.Connection.executemany>`.
.. versionadded:: 0.10.0
"""
async with self.acquire() as con:
return await con.executemany(command, args, timeout=timeout)
[docs]
async def fetch(
self,
query,
*args,
timeout=None,
record_class=None
) -> list:
"""Run a query and return the results as a list of :class:`Record`.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetch() <asyncpg.connection.Connection.fetch>`.
.. versionadded:: 0.10.0
"""
async with self.acquire() as con:
return await con.fetch(
query,
*args,
timeout=timeout,
record_class=record_class
)
[docs]
async def fetchval(self, query, *args, column=0, timeout=None):
"""Run a query and return a value in the first row.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetchval()
<asyncpg.connection.Connection.fetchval>`.
.. versionadded:: 0.10.0
"""
async with self.acquire() as con:
return await con.fetchval(
query, *args, column=column, timeout=timeout)
[docs]
async def fetchrow(self, query, *args, timeout=None, record_class=None):
"""Run a query and return the first row.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetchrow() <asyncpg.connection.Connection.fetchrow>`.
.. versionadded:: 0.10.0
"""
async with self.acquire() as con:
return await con.fetchrow(
query,
*args,
timeout=timeout,
record_class=record_class
)
[docs]
async def fetchmany(self, query, args, *, timeout=None, record_class=None):
"""Run a query for each sequence of arguments in *args*
and return the results as a list of :class:`Record`.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetchmany()
<asyncpg.connection.Connection.fetchmany>`.
.. versionadded:: 0.30.0
"""
async with self.acquire() as con:
return await con.fetchmany(
query, args, timeout=timeout, record_class=record_class
)
[docs]
async def copy_from_table(
self,
table_name,
*,
output,
columns=None,
schema_name=None,
timeout=None,
format=None,
oids=None,
delimiter=None,
null=None,
header=None,
quote=None,
escape=None,
force_quote=None,
encoding=None
):
"""Copy table contents to a file or file-like object.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_from_table()
<asyncpg.connection.Connection.copy_from_table>`.
.. versionadded:: 0.24.0
"""
async with self.acquire() as con:
return await con.copy_from_table(
table_name,
output=output,
columns=columns,
schema_name=schema_name,
timeout=timeout,
format=format,
oids=oids,
delimiter=delimiter,
null=null,
header=header,
quote=quote,
escape=escape,
force_quote=force_quote,
encoding=encoding
)
[docs]
async def copy_from_query(
self,
query,
*args,
output,
timeout=None,
format=None,
oids=None,
delimiter=None,
null=None,
header=None,
quote=None,
escape=None,
force_quote=None,
encoding=None
):
"""Copy the results of a query to a file or file-like object.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_from_query()
<asyncpg.connection.Connection.copy_from_query>`.
.. versionadded:: 0.24.0
"""
async with self.acquire() as con:
return await con.copy_from_query(
query,
*args,
output=output,
timeout=timeout,
format=format,
oids=oids,
delimiter=delimiter,
null=null,
header=header,
quote=quote,
escape=escape,
force_quote=force_quote,
encoding=encoding
)
[docs]
async def copy_to_table(
self,
table_name,
*,
source,
columns=None,
schema_name=None,
timeout=None,
format=None,
oids=None,
freeze=None,
delimiter=None,
null=None,
header=None,
quote=None,
escape=None,
force_quote=None,
force_not_null=None,
force_null=None,
encoding=None,
where=None
):
"""Copy data to the specified table.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_to_table()
<asyncpg.connection.Connection.copy_to_table>`.
.. versionadded:: 0.24.0
"""
async with self.acquire() as con:
return await con.copy_to_table(
table_name,
source=source,
columns=columns,
schema_name=schema_name,
timeout=timeout,
format=format,
oids=oids,
freeze=freeze,
delimiter=delimiter,
null=null,
header=header,
quote=quote,
escape=escape,
force_quote=force_quote,
force_not_null=force_not_null,
force_null=force_null,
encoding=encoding,
where=where
)
[docs]
async def copy_records_to_table(
self,
table_name,
*,
records,
columns=None,
schema_name=None,
timeout=None,
where=None
):
"""Copy a list of records to the specified table using binary COPY.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_records_to_table()
<asyncpg.connection.Connection.copy_records_to_table>`.
.. versionadded:: 0.24.0
"""
async with self.acquire() as con:
return await con.copy_records_to_table(
table_name,
records=records,
columns=columns,
schema_name=schema_name,
timeout=timeout,
where=where
)
[docs]
def acquire(self, *, timeout=None):
"""Acquire a database connection from the pool.
:param float timeout: A timeout for acquiring a Connection.
:return: An instance of :class:`~asyncpg.connection.Connection`.
Can be used in an ``await`` expression or with an ``async with`` block.
.. code-block:: python
async with pool.acquire() as con:
await con.execute(...)
Or:
.. code-block:: python
con = await pool.acquire()
try:
await con.execute(...)
finally:
await pool.release(con)
"""
return PoolAcquireContext(self, timeout)
async def _acquire(self, timeout):
async def _acquire_impl():
ch = await self._queue.get() # type: PoolConnectionHolder
try:
proxy = await ch.acquire() # type: PoolConnectionProxy
except (Exception, asyncio.CancelledError):
self._queue.put_nowait(ch)
raise
else:
# Record the timeout, as we will apply it by default
# in release().
ch._timeout = timeout
return proxy
if self._closing:
raise exceptions.InterfaceError('pool is closing')
self._check_init()
if timeout is None:
return await _acquire_impl()
else:
return await compat.wait_for(
_acquire_impl(), timeout=timeout)
[docs]
async def release(self, connection, *, timeout=None):
"""Release a database connection back to the pool.
:param Connection connection:
A :class:`~asyncpg.connection.Connection` object to release.
:param float timeout:
A timeout for releasing the connection. If not specified, defaults
to the timeout provided in the corresponding call to the
:meth:`Pool.acquire() <asyncpg.pool.Pool.acquire>` method.
.. versionchanged:: 0.14.0
Added the *timeout* parameter.
"""
if (type(connection) is not PoolConnectionProxy or
connection._holder._pool is not self):
raise exceptions.InterfaceError(
'Pool.release() received invalid connection: '
'{connection!r} is not a member of this pool'.format(
connection=connection))
if connection._con is None:
# Already released, do nothing.
return
self._check_init()
# Let the connection do its internal housekeeping when its released.
connection._con._on_release()
ch = connection._holder
if timeout is None:
timeout = ch._timeout
# Use asyncio.shield() to guarantee that task cancellation
# does not prevent the connection from being returned to the
# pool properly.
return await asyncio.shield(ch.release(timeout))
[docs]
async def close(self):
"""Attempt to gracefully close all connections in the pool.
Wait until all pool connections are released, close them and
shut down the pool. If any error (including cancellation) occurs
in ``close()`` the pool will terminate by calling
:meth:`Pool.terminate() <pool.Pool.terminate>`.
It is advisable to use :func:`python:asyncio.wait_for` to set
a timeout.
.. versionchanged:: 0.16.0
``close()`` now waits until all pool connections are released
before closing them and the pool. Errors raised in ``close()``
will cause immediate pool termination.
"""
if self._closed:
return
self._check_init()
self._closing = True
warning_callback = None
try:
warning_callback = self._loop.call_later(
60, self._warn_on_long_close)
release_coros = [
ch.wait_until_released() for ch in self._holders]
await asyncio.gather(*release_coros)
close_coros = [
ch.close() for ch in self._holders]
await asyncio.gather(*close_coros)
except (Exception, asyncio.CancelledError):
self.terminate()
raise
finally:
if warning_callback is not None:
warning_callback.cancel()
self._closed = True
self._closing = False
def _warn_on_long_close(self):
logger.warning('Pool.close() is taking over 60 seconds to complete. '
'Check if you have any unreleased connections left. '
'Use asyncio.wait_for() to set a timeout for '
'Pool.close().')
[docs]
def terminate(self):
"""Terminate all connections in the pool."""
if self._closed:
return
self._check_init()
for ch in self._holders:
ch.terminate()
self._closed = True
[docs]
async def expire_connections(self):
"""Expire all currently open connections.
Cause all currently open connections to get replaced on the
next :meth:`~asyncpg.pool.Pool.acquire()` call.
.. versionadded:: 0.16.0
"""
self._generation += 1
def _check_init(self):
if not self._initialized:
if self._initializing:
raise exceptions.InterfaceError(
'pool is being initialized, but not yet ready: '
'likely there is a race between creating a pool and '
'using it')
raise exceptions.InterfaceError('pool is not initialized')
if self._closed:
raise exceptions.InterfaceError('pool is closed')
def _drop_statement_cache(self):
# Drop statement cache for all connections in the pool.
for ch in self._holders:
if ch._con is not None:
ch._con._drop_local_statement_cache()
def _drop_type_cache(self):
# Drop type codec cache for all connections in the pool.
for ch in self._holders:
if ch._con is not None:
ch._con._drop_local_type_cache()
def __await__(self):
return self._async__init__().__await__()
async def __aenter__(self):
await self._async__init__()
return self
async def __aexit__(self, *exc):
await self.close()
class PoolAcquireContext:
__slots__ = ('timeout', 'connection', 'done', 'pool')
def __init__(self, pool: Pool, timeout: Optional[float]) -> None:
self.pool = pool
self.timeout = timeout
self.connection = None
self.done = False
async def __aenter__(self):
if self.connection is not None or self.done:
raise exceptions.InterfaceError('a connection is already acquired')
self.connection = await self.pool._acquire(self.timeout)
return self.connection
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exc_tb: Optional[TracebackType] = None,
) -> None:
self.done = True
con = self.connection
self.connection = None
await self.pool.release(con)
def __await__(self):
self.done = True
return self.pool._acquire(self.timeout).__await__()
[docs]
def create_pool(dsn=None, *,
min_size=10,
max_size=10,
max_queries=50000,
max_inactive_connection_lifetime=300.0,
connect=None,
setup=None,
init=None,
reset=None,
loop=None,
connection_class=connection.Connection,
record_class=protocol.Record,
**connect_kwargs):
r"""Create a connection pool.
Can be used either with an ``async with`` block:
.. code-block:: python
async with asyncpg.create_pool(user='postgres',
command_timeout=60) as pool:
await pool.fetch('SELECT 1')
Or to perform multiple operations on a single connection:
.. code-block:: python
async with asyncpg.create_pool(user='postgres',
command_timeout=60) as pool:
async with pool.acquire() as con:
await con.execute('''
CREATE TABLE names (
id serial PRIMARY KEY,
name VARCHAR (255) NOT NULL)
''')
await con.fetch('SELECT 1')
Or directly with ``await`` (not recommended):
.. code-block:: python
pool = await asyncpg.create_pool(user='postgres', command_timeout=60)
con = await pool.acquire()
try:
await con.fetch('SELECT 1')
finally:
await pool.release(con)
.. warning::
Prepared statements and cursors returned by
:meth:`Connection.prepare() <asyncpg.connection.Connection.prepare>`
and :meth:`Connection.cursor() <asyncpg.connection.Connection.cursor>`
become invalid once the connection is released. Likewise, all
notification and log listeners are removed, and ``asyncpg`` will
issue a warning if there are any listener callbacks registered on a
connection that is being released to the pool.
:param str dsn:
Connection arguments specified using as a single string in
the following format:
``postgres://user:pass@host:port/database?option=value``.
:param \*\*connect_kwargs:
Keyword arguments for the :func:`~asyncpg.connection.connect`
function.
:param Connection connection_class:
The class to use for connections. Must be a subclass of
:class:`~asyncpg.connection.Connection`.
:param type record_class:
If specified, the class to use for records returned by queries on
the connections in this pool. Must be a subclass of
:class:`~asyncpg.Record`.
:param int min_size:
Number of connection the pool will be initialized with.
:param int max_size:
Max number of connections in the pool.
:param int max_queries:
Number of queries after a connection is closed and replaced
with a new connection.
:param float max_inactive_connection_lifetime:
Number of seconds after which inactive connections in the
pool will be closed. Pass ``0`` to disable this mechanism.
:param coroutine connect:
A coroutine that is called instead of
:func:`~asyncpg.connection.connect` whenever the pool needs to make a
new connection. Must return an instance of type specified by
*connection_class* or :class:`~asyncpg.connection.Connection` if
*connection_class* was not specified.
:param coroutine setup:
A coroutine to prepare a connection right before it is returned
from :meth:`Pool.acquire()`. An example use
case would be to automatically set up notifications listeners for
all connections of a pool.
:param coroutine init:
A coroutine to initialize a connection when it is created.
An example use case would be to setup type codecs with
:meth:`Connection.set_builtin_type_codec() <\
asyncpg.connection.Connection.set_builtin_type_codec>`
or :meth:`Connection.set_type_codec() <\
asyncpg.connection.Connection.set_type_codec>`.
:param coroutine reset:
A coroutine to reset a connection before it is returned to the pool by
:meth:`Pool.release()`. The function is supposed
to reset any changes made to the database session so that the next
acquirer gets the connection in a well-defined state.
The default implementation calls :meth:`Connection.reset() <\
asyncpg.connection.Connection.reset>`, which runs the following::
SELECT pg_advisory_unlock_all();
CLOSE ALL;
UNLISTEN *;
RESET ALL;
The exact reset query is determined by detected server capabilities,
and a custom *reset* implementation can obtain the default query
by calling :meth:`Connection.get_reset_query() <\
asyncpg.connection.Connection.get_reset_query>`.
:param loop:
An asyncio event loop instance. If ``None``, the default
event loop will be used.
:return: An instance of :class:`~asyncpg.pool.Pool`.
.. versionchanged:: 0.10.0
An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
attempted operation on a released connection.
.. versionchanged:: 0.13.0
An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
attempted operation on a prepared statement or a cursor created
on a connection that has been released to the pool.
.. versionchanged:: 0.13.0
An :exc:`~asyncpg.exceptions.InterfaceWarning` will be produced
if there are any active listeners (added via
:meth:`Connection.add_listener()
<asyncpg.connection.Connection.add_listener>`
or :meth:`Connection.add_log_listener()
<asyncpg.connection.Connection.add_log_listener>`) present on the
connection at the moment of its release to the pool.
.. versionchanged:: 0.22.0
Added the *record_class* parameter.
.. versionchanged:: 0.30.0
Added the *connect* and *reset* parameters.
"""
return Pool(
dsn,
connection_class=connection_class,
record_class=record_class,
min_size=min_size,
max_size=max_size,
max_queries=max_queries,
loop=loop,
connect=connect,
setup=setup,
init=init,
reset=reset,
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
**connect_kwargs,
)