API Reference¶
Connection¶
-
connect
(dsn=None, *, host=None, port=None, user=None, password=None, passfile=None, database=None, loop=None, timeout=60, statement_cache_size=100, max_cached_statement_lifetime=300, max_cacheable_statement_size=15360, command_timeout=None, ssl=None, connection_class=<class 'asyncpg.connection.Connection'>, record_class=<class 'asyncpg.Record'>, server_settings=None)[source]¶ A coroutine to establish a connection to a PostgreSQL server.
The connection parameters may be specified either as a connection URI in dsn, or as specific keyword arguments, or both. If both dsn and keyword arguments are specified, the latter override the corresponding values parsed from the connection URI. The default values for the majority of arguments can be specified using environment variables.
Returns a new
Connection
object.Parameters: - dsn –
Connection arguments specified using as a single string in the libpq connection URI format:
postgres://user:password@host:port/database?option=value
. The following options are recognized by asyncpg: host, port, user, database (or dbname), password, passfile, sslmode. Unlike libpq, asyncpg will treat unrecognized options as server settings to be used for the connection.Note
The URI must be valid, which means that all components must be properly quoted with
urllib.parse.quote()
. - host –
Database host address as one of the following:
- an IP address or a domain name;
- an absolute path to the directory containing the database server Unix-domain socket (not supported on Windows);
- a sequence of any of the above, in which case the addresses will be tried in order, and the first successful connection will be returned.
If not specified, asyncpg will try the following, in order:
- host address(es) parsed from the dsn argument,
- the value of the
PGHOST
environment variable, - on Unix, common directories used for PostgreSQL Unix-domain
sockets:
"/run/postgresql"
,"/var/run/postgresl"
,"/var/pgsql_socket"
,"/private/tmp"
, and"/tmp"
, "localhost"
.
- port –
Port number to connect to at the server host (or Unix-domain socket file extension). If multiple host addresses were specified, this parameter may specify a sequence of port numbers of the same length as the host sequence, or it may specify a single port number to be used for all host addresses.
If not specified, the value parsed from the dsn argument is used, or the value of the
PGPORT
environment variable, or5432
if neither is specified. - user –
The name of the database role used for authentication.
If not specified, the value parsed from the dsn argument is used, or the value of the
PGUSER
environment variable, or the operating system name of the user running the application. - database –
The name of the database to connect to.
If not specified, the value parsed from the dsn argument is used, or the value of the
PGDATABASE
environment variable, or the operating system name of the user running the application. - password –
Password to be used for authentication, if the server requires one. If not specified, the value parsed from the dsn argument is used, or the value of the
PGPASSWORD
environment variable. Note that the use of the environment variable is discouraged as other users and applications may be able to read it without needing specific privileges. It is recommended to use passfile instead.Password may be either a string, or a callable that returns a string. If a callable is provided, it will be called each time a new connection is established.
- passfile – The name of the file used to store passwords
(defaults to
~/.pgpass
, or%APPDATA%\postgresql\pgpass.conf
on Windows). - loop – An asyncio event loop instance. If
None
, the default event loop will be used. - timeout (float) – Connection timeout in seconds.
- statement_cache_size (int) – The size of prepared statement LRU cache. Pass
0
to disable the cache. - max_cached_statement_lifetime (int) – The maximum time in seconds a prepared statement will stay
in the cache. Pass
0
to allow statements be cached indefinitely. - max_cacheable_statement_size (int) – The maximum size of a statement that can be cached (15KiB by
default). Pass
0
to allow all statements to be cached regardless of their size. - command_timeout (float) – The default timeout for operations on this connection
(the default is
None
: no timeout). - ssl – Pass
True
or an ssl.SSLContext instance to require an SSL connection. IfTrue
, a default SSL context returned by ssl.create_default_context() will be used. - server_settings (dict) – An optional dict of server runtime parameters. Refer to PostgreSQL documentation for a list of supported options.
- connection_class (type) – Class of the returned connection object. Must be a subclass of
Connection
. - record_class (type) – If specified, the class to use for records returned by queries on
this connection object. Must be a subclass of
Record
.
Returns: A
Connection
instance.Example:
>>> import asyncpg >>> import asyncio >>> async def run(): ... con = await asyncpg.connect(user='postgres') ... types = await con.fetch('SELECT * FROM pg_type') ... print(types) ... >>> asyncio.get_event_loop().run_until_complete(run()) [<Record typname='bool' typnamespace=11 ...
New in version 0.10.0: Added
max_cached_statement_use_count
parameter.Changed in version 0.11.0: Removed ability to pass arbitrary keyword arguments to set server settings. Added a dedicated parameter
server_settings
for that.New in version 0.11.0: Added
connection_class
parameter.New in version 0.16.0: Added
passfile
parameter (and support for password files in general).New in version 0.18.0: Added ability to specify multiple hosts in the dsn and host arguments.
Changed in version 0.21.0: The password argument now accepts a callable or an async function.
Changed in version 0.22.0: Added the record_class parameter.
- dsn –
-
class
Connection
(protocol, transport, loop, addr, config: asyncpg.connect_utils.ConnectionConfiguration, params: asyncpg.connect_utils.ConnectionParameters)[source]¶ A representation of a database session.
Connections are created by calling
connect()
.-
coroutine
add_listener
(channel, callback)[source]¶ Add a listener for Postgres notifications.
Parameters: - channel (str) – Channel to listen on.
- callback (callable) – A callable receiving the following arguments: connection: a Connection the callback is registered with; pid: PID of the Postgres server that sent the notification; channel: name of the channel the notification was sent to; payload: the payload.
-
add_log_listener
(callback)[source]¶ Add a listener for Postgres log messages.
It will be called when asyncronous NoticeResponse is received from the connection. Possible message types are: WARNING, NOTICE, DEBUG, INFO, or LOG.
Parameters: callback (callable) – A callable receiving the following arguments: connection: a Connection the callback is registered with; message: the exceptions.PostgresLogMessage message. New in version 0.12.0.
-
add_termination_listener
(callback)[source]¶ Add a listener that will be called when the connection is closed.
Parameters: callback (callable) – A callable receiving one argument: connection: a Connection the callback is registered with. New in version 0.21.0.
-
coroutine
close
(*, timeout=None)[source]¶ Close the connection gracefully.
Parameters: timeout (float) – Optional timeout value in seconds. Changed in version 0.14.0: Added the timeout parameter.
-
coroutine
copy_from_query
(query, *args, output, timeout=None, format=None, oids=None, delimiter=None, null=None, header=None, quote=None, escape=None, force_quote=None, encoding=None)[source]¶ Copy the results of a query to a file or file-like object.
Parameters: - query (str) – The query to copy the results of.
- args – Query arguments.
- output – A path-like object,
or a file-like object, or
a coroutine function
that takes a
bytes
instance as a sole argument. - timeout (float) – Optional timeout value in seconds.
The remaining keyword arguments are
COPY
statement options, see COPY statement documentation for details.Returns: The status string of the COPY command. Example:
>>> import asyncpg >>> import asyncio >>> async def run(): ... con = await asyncpg.connect(user='postgres') ... result = await con.copy_from_query( ... 'SELECT foo, bar FROM mytable WHERE foo > $1', 10, ... output='file.csv', format='csv') ... print(result) ... >>> asyncio.get_event_loop().run_until_complete(run()) 'COPY 10'
New in version 0.11.0.
-
coroutine
copy_from_table
(table_name, *, output, columns=None, schema_name=None, timeout=None, format=None, oids=None, delimiter=None, null=None, header=None, quote=None, escape=None, force_quote=None, encoding=None)[source]¶ Copy table contents to a file or file-like object.
Parameters: - table_name (str) – The name of the table to copy data from.
- output – A path-like object,
or a file-like object, or
a coroutine function
that takes a
bytes
instance as a sole argument. - columns (list) – An optional list of column names to copy.
- schema_name (str) – An optional schema name to qualify the table.
- timeout (float) – Optional timeout value in seconds.
The remaining keyword arguments are
COPY
statement options, see COPY statement documentation for details.Returns: The status string of the COPY command. Example:
>>> import asyncpg >>> import asyncio >>> async def run(): ... con = await asyncpg.connect(user='postgres') ... result = await con.copy_from_table( ... 'mytable', columns=('foo', 'bar'), ... output='file.csv', format='csv') ... print(result) ... >>> asyncio.get_event_loop().run_until_complete(run()) 'COPY 100'
New in version 0.11.0.
-
coroutine
copy_records_to_table
(table_name, *, records, columns=None, schema_name=None, timeout=None)[source]¶ Copy a list of records to the specified table using binary COPY.
Parameters: - table_name (str) – The name of the table to copy data to.
- records – An iterable returning row tuples to copy into the table.
- columns (list) – An optional list of column names to copy.
- schema_name (str) – An optional schema name to qualify the table.
- timeout (float) – Optional timeout value in seconds.
Returns: The status string of the COPY command.
Example:
>>> import asyncpg >>> import asyncio >>> async def run(): ... con = await asyncpg.connect(user='postgres') ... result = await con.copy_records_to_table( ... 'mytable', records=[ ... (1, 'foo', 'bar'), ... (2, 'ham', 'spam')]) ... print(result) ... >>> asyncio.get_event_loop().run_until_complete(run()) 'COPY 2'
New in version 0.11.0.
-
coroutine
copy_to_table
(table_name, *, source, columns=None, schema_name=None, timeout=None, format=None, oids=None, freeze=None, delimiter=None, null=None, header=None, quote=None, escape=None, force_quote=None, force_not_null=None, force_null=None, encoding=None)[source]¶ Copy data to the specified table.
Parameters: - table_name (str) – The name of the table to copy data to.
- source – A path-like object,
or a file-like object, or
an asynchronous iterable
that returns
bytes
, or an object supporting the buffer protocol. - columns (list) – An optional list of column names to copy.
- schema_name (str) – An optional schema name to qualify the table.
- timeout (float) – Optional timeout value in seconds.
The remaining keyword arguments are
COPY
statement options, see COPY statement documentation for details.Returns: The status string of the COPY command. Example:
>>> import asyncpg >>> import asyncio >>> async def run(): ... con = await asyncpg.connect(user='postgres') ... result = await con.copy_to_table( ... 'mytable', source='datafile.tbl') ... print(result) ... >>> asyncio.get_event_loop().run_until_complete(run()) 'COPY 140000'
New in version 0.11.0.
-
cursor
(query, *args, prefetch=None, timeout=None, record_class=None)[source]¶ Return a cursor factory for the specified query.
Parameters: - args – Query arguments.
- prefetch (int) – The number of rows the cursor iterator
will prefetch (defaults to
50
.) - timeout (float) – Optional timeout in seconds.
- record_class (type) – If specified, the class to use for records returned by this cursor.
Must be a subclass of
Record
. If not specified, a per-connection record_class is used.
Returns: A
CursorFactory
object.Changed in version 0.22.0: Added the record_class parameter.
-
coroutine
execute
(query: str, *args, timeout: float = None) → str[source]¶ Execute an SQL command (or commands).
This method can execute many SQL commands at once, when no arguments are provided.
Example:
>>> await con.execute(''' ... CREATE TABLE mytab (a int); ... INSERT INTO mytab (a) VALUES (100), (200), (300); ... ''') INSERT 0 3 >>> await con.execute(''' ... INSERT INTO mytab (a) VALUES ($1), ($2) ... ''', 10, 20) INSERT 0 2
Parameters: - args – Query arguments.
- timeout (float) – Optional timeout value in seconds.
Return str: Status of the last SQL command.
Changed in version 0.5.4: Made it possible to pass query arguments.
-
coroutine
executemany
(command: str, args, *, timeout: float = None)[source]¶ Execute an SQL command for each sequence of arguments in args.
Example:
>>> await con.executemany(''' ... INSERT INTO mytab (a) VALUES ($1, $2, $3); ... ''', [(1, 2, 3), (4, 5, 6)])
Parameters: - command – Command to execute.
- args – An iterable containing sequences of arguments.
- timeout (float) – Optional timeout value in seconds.
Return None: This method discards the results of the operations.
Note
When inserting a large number of rows, use
Connection.copy_records_to_table()
instead, it is much more efficient for this purpose.New in version 0.7.0.
Changed in version 0.11.0: timeout became a keyword-only parameter.
Changed in version 0.22.0: The execution was changed to be in an implicit transaction if there was no explicit transaction, so that it will no longer end up with partial success. If you still need the previous behavior to progressively execute many args, please use a loop with prepared statement instead.
-
coroutine
fetch
(query, *args, timeout=None, record_class=None) → list[source]¶ Run a query and return the results as a list of
Record
.Parameters: Return list: A list of
Record
instances. If specified, the actual type of list elements would be record_class.Changed in version 0.22.0: Added the record_class parameter.
-
coroutine
fetchrow
(query, *args, timeout=None, record_class=None)[source]¶ Run a query and return the first row.
Parameters: Returns: The first row as a
Record
instance, or None if no records were returned by the query. If specified, record_class is used as the type for the result value.Changed in version 0.22.0: Added the record_class parameter.
-
coroutine
fetchval
(query, *args, column=0, timeout=None)[source]¶ Run a query and return a value in the first row.
Parameters: Returns: The value of the specified column of the first record, or None if no records were returned by the query.
-
get_server_version
()[source]¶ Return the version of the connected PostgreSQL server.
The returned value is a named tuple similar to that in
sys.version_info
:>>> con.get_server_version() ServerVersion(major=9, minor=6, micro=1, releaselevel='final', serial=0)
New in version 0.8.0.
-
get_settings
()[source]¶ Return connection settings.
Returns: ConnectionSettings
.
-
is_closed
()[source]¶ Return
True
if the connection is closed,False
otherwise.Return bool: True
if the connection is closed,False
otherwise.
-
is_in_transaction
()[source]¶ Return True if Connection is currently inside a transaction.
Return bool: True if inside transaction, False otherwise. New in version 0.16.0.
-
coroutine
prepare
(query, *, timeout=None, record_class=None)[source]¶ Create a prepared statement for the specified query.
Parameters: - query (str) – Text of the query to create a prepared statement for.
- timeout (float) – Optional timeout value in seconds.
- record_class (type) – If specified, the class to use for records returned by the
prepared statement. Must be a subclass of
Record
. If not specified, a per-connection record_class is used.
Returns: A
PreparedStatement
instance.Changed in version 0.22.0: Added the record_class parameter.
-
coroutine
reload_schema_state
()[source]¶ Indicate that the database schema information must be reloaded.
For performance reasons, asyncpg caches certain aspects of the database schema, such as the layout of composite types. Consequently, when the database schema changes, and asyncpg is not able to gracefully recover from an error caused by outdated schema assumptions, an
OutdatedSchemaCacheError
is raised. To prevent the exception, this method may be used to inform asyncpg that the database schema has changed.Example:
>>> import asyncpg >>> import asyncio >>> async def change_type(con): ... result = await con.fetch('SELECT id, info FROM tbl') ... # Change composite's attribute type "int"=>"text" ... await con.execute('ALTER TYPE custom DROP ATTRIBUTE y') ... await con.execute('ALTER TYPE custom ADD ATTRIBUTE y text') ... await con.reload_schema_state() ... for id_, info in result: ... new = (info['x'], str(info['y'])) ... await con.execute( ... 'UPDATE tbl SET info=$2 WHERE id=$1', id_, new) ... >>> async def run(): ... # Initial schema: ... # CREATE TYPE custom AS (x int, y int); ... # CREATE TABLE tbl(id int, info custom); ... con = await asyncpg.connect(user='postgres') ... async with con.transaction(): ... # Prevent concurrent changes in the table ... await con.execute('LOCK TABLE tbl') ... await change_type(con) ... >>> asyncio.get_event_loop().run_until_complete(run())
New in version 0.14.0.
-
coroutine
remove_listener
(channel, callback)[source]¶ Remove a listening callback on the specified channel.
-
remove_log_listener
(callback)[source]¶ Remove a listening callback for log messages.
New in version 0.12.0.
-
remove_termination_listener
(callback)[source]¶ Remove a listening callback for connection termination.
Parameters: callback (callable) – The callable that was passed to Connection.add_termination_listener()
.New in version 0.21.0.
-
coroutine
reset_type_codec
(typename, *, schema='public')[source]¶ Reset typename codec to the default implementation.
Parameters: - typename – Name of the data type the codec is for.
- schema – Schema name of the data type the codec is for
(defaults to
'public'
)
New in version 0.12.0.
-
coroutine
set_builtin_type_codec
(typename, *, schema='public', codec_name, format=None)[source]¶ Set a builtin codec for the specified scalar data type.
This method has two uses. The first is to register a builtin codec for an extension type without a stable OID, such as ‘hstore’. The second use is to declare that an extension type or a user-defined type is wire-compatible with a certain builtin data type and should be exchanged as such.
Parameters: - typename – Name of the data type the codec is for.
- schema – Schema name of the data type the codec is for
(defaults to
'public'
). - codec_name – The name of the builtin codec to use for the type.
This should be either the name of a known core type
(such as
"int"
), or the name of a supported extension type. Currently, the only supported extension type is"pg_contrib.hstore"
. - format – If format is
None
(the default), all formats supported by the target codec are declared to be supported for typename. If format is'text'
or'binary'
, then only the specified format is declared to be supported for typename.
Changed in version 0.18.0: The codec_name argument can be the name of any known core data type. Added the format keyword argument.
-
coroutine
set_type_codec
(typename, *, schema='public', encoder, decoder, format='text')[source]¶ Set an encoder/decoder pair for the specified data type.
Parameters: - typename – Name of the data type the codec is for.
- schema – Schema name of the data type the codec is for
(defaults to
'public'
) - format –
The type of the argument received by the decoder callback, and the type of the encoder callback return value.
If format is
'text'
(the default), the exchange datum is astr
instance containing valid text representation of the data type.If format is
'binary'
, the exchange datum is abytes
instance containing valid _binary_ representation of the data type.If format is
'tuple'
, the exchange datum is a type-specifictuple
of values. The table below lists supported data types and their format for this mode.Type Tuple layout interval
( months
,days
,microseconds
)date
( date ordinal relative to Jan 1 2000
,)-2^31
for negative infinity timestamp2^31-1
for positive infinity timestamp.timestamp
( microseconds relative to Jan 1 2000
,)-2^63
for negative infinity timestamp2^63-1
for positive infinity timestamp.timestamp with time zone
( microseconds relative to Jan 1 2000 UTC
,)-2^63
for negative infinity timestamp2^63-1
for positive infinity timestamp.time
( microseconds
,)time with time zone
( microseconds
,time zone offset in seconds
) - encoder – Callable accepting a Python object as a single argument and returning a value encoded according to format.
- decoder – Callable accepting a single argument encoded according to format and returning a decoded Python object.
Example:
>>> import asyncpg >>> import asyncio >>> import datetime >>> from dateutil.relativedelta import relativedelta >>> async def run(): ... con = await asyncpg.connect(user='postgres') ... def encoder(delta): ... ndelta = delta.normalized() ... return (ndelta.years * 12 + ndelta.months, ... ndelta.days, ... ((ndelta.hours * 3600 + ... ndelta.minutes * 60 + ... ndelta.seconds) * 1000000 + ... ndelta.microseconds)) ... def decoder(tup): ... return relativedelta(months=tup[0], days=tup[1], ... microseconds=tup[2]) ... await con.set_type_codec( ... 'interval', schema='pg_catalog', encoder=encoder, ... decoder=decoder, format='tuple') ... result = await con.fetchval( ... "SELECT '2 years 3 mons 1 day'::interval") ... print(result) ... print(datetime.datetime(2002, 1, 1) + result) ... >>> asyncio.get_event_loop().run_until_complete(run()) relativedelta(years=+2, months=+3, days=+1) 2004-04-02 00:00:00
New in version 0.12.0: Added the
format
keyword argument and support for ‘tuple’ format.Changed in version 0.12.0: The
binary
keyword argument is deprecated in favor offormat
.Changed in version 0.13.0: The
binary
keyword argument was removed in favor offormat
.
-
transaction
(*, isolation=None, readonly=False, deferrable=False)[source]¶ Create a
Transaction
object.Refer to PostgreSQL documentation on the meaning of transaction parameters.
Parameters: - isolation – Transaction isolation mode, can be one of:
‘serializable’, ‘repeatable_read’,
‘read_committed’. If not specified, the behavior
is up to the server and session, which is usually
read_committed
. - readonly – Specifies whether or not this transaction is read-only.
- deferrable – Specifies whether or not this transaction is deferrable.
- isolation – Transaction isolation mode, can be one of:
‘serializable’, ‘repeatable_read’,
‘read_committed’. If not specified, the behavior
is up to the server and session, which is usually
-
coroutine
Prepared Statements¶
Prepared statements are a PostgreSQL feature that can be used to optimize the
performance of queries that are executed more than once. When a query
is prepared by a call to Connection.prepare()
, the server parses,
analyzes and compiles the query allowing to reuse that work once there is
a need to run the same query again.
>>> import asyncpg, asyncio
>>> loop = asyncio.get_event_loop()
>>> async def run():
... conn = await asyncpg.connect()
... stmt = await conn.prepare('''SELECT 2 ^ $1''')
... print(await stmt.fetchval(10))
... print(await stmt.fetchval(20))
...
>>> loop.run_until_complete(run())
1024.0
1048576.0
Note
asyncpg automatically maintains a small LRU cache for queries executed
during calls to the fetch()
, fetchrow()
,
or fetchval()
methods.
Warning
If you are using pgbouncer with pool_mode
set to transaction
or
statement
, prepared statements will not work correctly. See
Why am I getting prepared statement errors? for more information.
-
class
PreparedStatement
[source]¶ A representation of a prepared statement.
-
cursor
(*args, prefetch=None, timeout=None) → asyncpg.cursor.CursorFactory[source]¶ Return a cursor factory for the prepared statement.
Parameters: Returns: A
CursorFactory
object.
-
executemany
(args, *, timeout: float = None)[source]¶ Execute the statement for each sequence of arguments in args.
Parameters: - args – An iterable containing sequences of arguments.
- timeout (float) – Optional timeout value in seconds.
Return None: This method discards the results of the operations.
New in version 0.22.0.
-
explain
(*args, analyze=False)[source]¶ Return the execution plan of the statement.
Parameters: - args – Query arguments.
- analyze – If
True
, the statement will be executed and the run time statitics added to the return value.
Returns: An object representing the execution plan. This value is actually a deserialized JSON output of the SQL
EXPLAIN
command.
-
fetch
(*args, timeout=None)[source]¶ Execute the statement and return a list of
Record
objects.Parameters: Returns: A list of
Record
instances.
-
fetchrow
(*args, timeout=None)[source]¶ Execute the statement and return the first row.
Parameters: Returns: The first row as a
Record
instance.
-
fetchval
(*args, column=0, timeout=None)[source]¶ Execute the statement and return a value in the first row.
Parameters: Returns: The value of the specified column of the first record.
-
get_attributes
()[source]¶ Return a description of relation attributes (columns).
Returns: A tuple of asyncpg.types.Attribute
.Example:
st = await self.con.prepare(''' SELECT typname, typnamespace FROM pg_type ''') print(st.get_attributes()) # Will print: # (Attribute( # name='typname', # type=Type(oid=19, name='name', kind='scalar', # schema='pg_catalog')), # Attribute( # name='typnamespace', # type=Type(oid=26, name='oid', kind='scalar', # schema='pg_catalog')))
-
get_parameters
()[source]¶ Return a description of statement parameters types.
Returns: A tuple of asyncpg.types.Type
.Example:
stmt = await connection.prepare('SELECT ($1::int, $2::text)') print(stmt.get_parameters()) # Will print: # (Type(oid=23, name='int4', kind='scalar', schema='pg_catalog'), # Type(oid=25, name='text', kind='scalar', schema='pg_catalog'))
-
Transactions¶
The most common way to use transactions is through an async with
statement:
async with connection.transaction():
await connection.execute("INSERT INTO mytable VALUES(1, 2, 3)")
asyncpg supports nested transactions (a nested transaction context will create a savepoint.):
async with connection.transaction():
await connection.execute('CREATE TABLE mytab (a int)')
try:
# Create a nested transaction:
async with connection.transaction():
await connection.execute('INSERT INTO mytab (a) VALUES (1), (2)')
# This nested transaction will be automatically rolled back:
raise Exception
except:
# Ignore exception
pass
# Because the nested transaction was rolled back, there
# will be nothing in `mytab`.
assert await connection.fetch('SELECT a FROM mytab') == []
Alternatively, transactions can be used without an async with
block:
tr = connection.transaction()
await tr.start()
try:
...
except:
await tr.rollback()
raise
else:
await tr.commit()
See also the
Connection.transaction()
function.
-
class
Transaction
[source]¶ Represents a transaction or savepoint block.
Transactions are created by calling the
Connection.transaction()
function.-
async with c:
start and commit/rollback the transaction or savepoint block automatically when entering and exiting the code inside the context manager block.
-
Cursors¶
Cursors are useful when there is a need to iterate over the results of
a large query without fetching all rows at once. The cursor interface
provided by asyncpg supports asynchronous iteration via the async for
statement, and also a way to read row chunks and skip forward over the
result set.
To iterate over a cursor using a connection object use
Connection.cursor()
.
To make the iteration efficient, the cursor will prefetch records to
reduce the number of queries sent to the server:
async def iterate(con: Connection):
async with con.transaction():
# Postgres requires non-scrollable cursors to be created
# and used in a transaction.
async for record in con.cursor('SELECT generate_series(0, 100)'):
print(record)
Or, alternatively, you can iterate over the cursor manually (cursor won’t be prefetching any rows):
async def iterate(con: Connection):
async with con.transaction():
# Postgres requires non-scrollable cursors to be created
# and used in a transaction.
# Create a Cursor object
cur = await con.cursor('SELECT generate_series(0, 100)')
# Move the cursor 10 rows forward
await cur.forward(10)
# Fetch one row and print it
print(await cur.fetchrow())
# Fetch a list of 5 rows and print it
print(await cur.fetch(5))
It’s also possible to create cursors from prepared statements:
async def iterate(con: Connection):
# Create a prepared statement that will accept one argument
stmt = await con.prepare('SELECT generate_series(0, $1)')
async with con.transaction():
# Postgres requires non-scrollable cursors to be created
# and used in a transaction.
# Execute the prepared statement passing `10` as the
# argument -- that will generate a series or records
# from 0..10. Iterate over all of them and print every
# record.
async for record in stmt.cursor(10):
print(record)
Note
Cursors created by a call to
Connection.cursor()
or
PreparedStatement.cursor()
are non-scrollable: they can only be read forwards. To create a scrollable
cursor, use the DECLARE ... SCROLL CURSOR
SQL statement directly.
Warning
Cursors created by a call to
Connection.cursor()
or
PreparedStatement.cursor()
cannot be used outside of a transaction. Any such attempt will result in
InterfaceError
.
To create a cursor usable outside of a transaction, use the
DECLARE ... CURSOR WITH HOLD
SQL statement directly.
-
class
CursorFactory
[source]¶ A cursor interface for the results of a query.
A cursor interface can be used to initiate efficient traversal of the results of a large query.
-
async for row in c
Execute the statement and iterate over the results asynchronously.
-
await c
Execute the statement and return an instance of
Cursor
which can be used to navigate over and fetch subsets of the query results.
-
-
class
Cursor
[source]¶ An open portal into the results of a query.
-
fetch
(n, *, timeout=None)[source]¶ Return the next n rows as a list of
Record
objects.Parameters: timeout (float) – Optional timeout value in seconds. Returns: A list of Record
instances.
-
Connection Pools¶
-
create_pool
(dsn=None, *, min_size=10, max_size=10, max_queries=50000, max_inactive_connection_lifetime=300.0, setup=None, init=None, loop=None, connection_class=<class 'asyncpg.connection.Connection'>, record_class=<class 'asyncpg.Record'>, **connect_kwargs)[source]¶ Create a connection pool.
Can be used either with an
async with
block:async with asyncpg.create_pool(user='postgres', command_timeout=60) as pool: await pool.fetch('SELECT 1')
Or to perform multiple operations on a single connection:
async with asyncpg.create_pool(user='postgres', command_timeout=60) as pool: async with pool.acquire() as con: await con.execute(''' CREATE TABLE names ( id serial PRIMARY KEY, name VARCHAR (255) NOT NULL) ''') await con.fetch('SELECT 1')
Or directly with
await
(not recommended):pool = await asyncpg.create_pool(user='postgres', command_timeout=60) con = await pool.acquire() try: await con.fetch('SELECT 1') finally: await pool.release(con)
Warning
Prepared statements and cursors returned by
Connection.prepare()
andConnection.cursor()
become invalid once the connection is released. Likewise, all notification and log listeners are removed, andasyncpg
will issue a warning if there are any listener callbacks registered on a connection that is being released to the pool.Parameters: - dsn (str) – Connection arguments specified using as a single string in
the following format:
postgres://user:pass@host:port/database?option=value
. - **connect_kwargs – Keyword arguments for the
connect()
function. - connection_class (Connection) – The class to use for connections. Must be a subclass of
Connection
. - record_class (type) – If specified, the class to use for records returned by queries on
the connections in this pool. Must be a subclass of
Record
. - min_size (int) – Number of connection the pool will be initialized with.
- max_size (int) – Max number of connections in the pool.
- max_queries (int) – Number of queries after a connection is closed and replaced with a new connection.
- max_inactive_connection_lifetime (float) – Number of seconds after which inactive connections in the
pool will be closed. Pass
0
to disable this mechanism. - setup (coroutine) – A coroutine to prepare a connection right before it is returned
from
Pool.acquire()
. An example use case would be to automatically set up notifications listeners for all connections of a pool. - init (coroutine) – A coroutine to initialize a connection when it is created.
An example use case would be to setup type codecs with
Connection.set_builtin_type_codec()
orConnection.set_type_codec()
. - loop – An asyncio event loop instance. If
None
, the default event loop will be used.
Returns: An instance of
Pool
.Changed in version 0.10.0: An
InterfaceError
will be raised on any attempted operation on a released connection.Changed in version 0.13.0: An
InterfaceError
will be raised on any attempted operation on a prepared statement or a cursor created on a connection that has been released to the pool.Changed in version 0.13.0: An
InterfaceWarning
will be produced if there are any active listeners (added viaConnection.add_listener()
orConnection.add_log_listener()
) present on the connection at the moment of its release to the pool.Changed in version 0.22.0: Added the record_class parameter.
- dsn (str) – Connection arguments specified using as a single string in
the following format:
-
class
Pool
[source]¶ A connection pool.
Connection pool can be used to manage a set of connections to the database. Connections are first acquired from the pool, then used, and then released back to the pool. Once a connection is released, it’s reset to close all open cursors and other resources except prepared statements.
Pools are created by calling
create_pool()
.-
acquire
(*, timeout=None)[source]¶ Acquire a database connection from the pool.
Parameters: timeout (float) – A timeout for acquiring a Connection. Returns: An instance of Connection
.Can be used in an
await
expression or with anasync with
block.async with pool.acquire() as con: await con.execute(...)
Or:
con = await pool.acquire() try: await con.execute(...) finally: await pool.release(con)
-
coroutine
close
()[source]¶ Attempt to gracefully close all connections in the pool.
Wait until all pool connections are released, close them and shut down the pool. If any error (including cancellation) occurs in
close()
the pool will terminate by callingPool.terminate()
.It is advisable to use
asyncio.wait_for()
to set a timeout.Changed in version 0.16.0:
close()
now waits until all pool connections are released before closing them and the pool. Errors raised inclose()
will cause immediate pool termination.
-
coroutine
execute
(query: str, *args, timeout: float = None) → str[source]¶ Execute an SQL command (or commands).
Pool performs this operation using one of its connections. Other than that, it behaves identically to
Connection.execute()
.New in version 0.10.0.
-
coroutine
executemany
(command: str, args, *, timeout: float = None)[source]¶ Execute an SQL command for each sequence of arguments in args.
Pool performs this operation using one of its connections. Other than that, it behaves identically to
Connection.executemany()
.New in version 0.10.0.
-
coroutine
expire_connections
()[source]¶ Expire all currently open connections.
Cause all currently open connections to get replaced on the next
acquire()
call.New in version 0.16.0.
-
coroutine
fetch
(query, *args, timeout=None) → list[source]¶ Run a query and return the results as a list of
Record
.Pool performs this operation using one of its connections. Other than that, it behaves identically to
Connection.fetch()
.New in version 0.10.0.
-
coroutine
fetchrow
(query, *args, timeout=None)[source]¶ Run a query and return the first row.
Pool performs this operation using one of its connections. Other than that, it behaves identically to
Connection.fetchrow()
.New in version 0.10.0.
-
coroutine
fetchval
(query, *args, column=0, timeout=None)[source]¶ Run a query and return a value in the first row.
Pool performs this operation using one of its connections. Other than that, it behaves identically to
Connection.fetchval()
.New in version 0.10.0.
-
coroutine
release
(connection, *, timeout=None)[source]¶ Release a database connection back to the pool.
Parameters: - connection (Connection) – A
Connection
object to release. - timeout (float) – A timeout for releasing the connection. If not specified, defaults
to the timeout provided in the corresponding call to the
Pool.acquire()
method.
Changed in version 0.14.0: Added the timeout parameter.
- connection (Connection) – A
-
set_connect_args
(dsn=None, **connect_kwargs)[source]¶ Set the new connection arguments for this pool.
The new connection arguments will be used for all subsequent new connection attempts. Existing connections will remain until they expire. Use
Pool.expire_connections()
to expedite the connection expiry.Parameters: New in version 0.16.0.
-
Record Objects¶
Each row (or composite type value) returned by calls to fetch*
methods
is represented by an instance of the Record
object.
Record
objects are a tuple-/dict-like hybrid, and allow addressing of
items either by a numeric index or by a field name:
>>> import asyncpg
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> conn = loop.run_until_complete(asyncpg.connect())
>>> r = loop.run_until_complete(conn.fetchrow('''
... SELECT oid, rolname, rolsuper FROM pg_roles WHERE rolname = user'''))
>>> r
<Record oid=16388 rolname='elvis' rolsuper=True>
>>> r['oid']
16388
>>> r[0]
16388
>>> dict(r)
{'oid': 16388, 'rolname': 'elvis', 'rolsuper': True}
>>> tuple(r)
(16388, 'elvis', True)
Note
Record
objects currently cannot be created from Python code.
-
class
Record
¶ A read-only representation of PostgreSQL row.
-
len(r)
Return the number of fields in record r.
-
r[field]
Return the field of r with field name or index field.
-
name in r
Return
True
if record r has a field named name.
-
iter(r)
Return an iterator over the values of the record r.
-
get(name[, default])
Return the value for name if the record has a field named name, else return default. If default is not given, return
None
.New in version 0.18.
-
values
()¶ Return an iterator over the record values.
-
keys
()¶ Return an iterator over the record field names.
-
items
()¶ Return an iterator over
(field, value)
pairs.
-
-
class
ConnectionSettings
¶ A read-only collection of Connection settings.
-
settings.setting_name
Return the value of the “setting_name” setting. Raises an
AttributeError
if the setting is not defined.Example:
>>> connection.get_settings().client_encoding 'UTF8'
-
Data Types¶
-
class
Type
¶ Database data type.
-
kind
¶ Type kind. Can be “scalar”, “array”, “composite” or “range”.
-
name
¶ Type name. For example “int2”.
-
oid
¶ OID of the type.
-
schema
¶ Name of the database schema that defines the type.
-
-
class
Attribute
¶ Database relation attribute.
-
name
¶ Attribute name.
-
type
¶ Attribute data type
asyncpg.types.Type
.
-
-
class
Range
(lower=None, upper=None, *, lower_inc=True, upper_inc=False, empty=False)[source]¶ Immutable representation of PostgreSQL range type.
-
class
BitString
(bitstring: Optional[bytes] = None)[source]¶ Immutable representation of PostgreSQL bit and varbit types.
-
classmethod
from_int
(x: int, length: int, bitorder: Literal[big, little] = 'big', *, signed: bool = False) → _BitString[source]¶ Represent the Python int x as a BitString. Acts similarly to int.to_bytes.
Parameters: - x (int) – An integer to represent. Negative integers are represented in two’s complement form, unless the argument signed is False, in which case negative integers raise an OverflowError.
- length (int) – The length of the resulting BitString. An OverflowError is raised if the integer is not representable in this many bits.
- bitorder – Determines the bit order used in the BitString representation. By default, this function uses Postgres conventions for casting ints to bits. If bitorder is ‘big’, the most significant bit is at the start of the string (this is the same as the default). If bitorder is ‘little’, the most significant bit is at the end of the string.
- signed (bool) – Determines whether two’s complement is used in the BitString representation. If signed is False and a negative integer is given, an OverflowError is raised.
Return BitString: A BitString representing the input integer, in the form specified by the other input args.
New in version 0.18.0.
-
to_int
(bitorder: Literal[big, little] = 'big', *, signed: bool = False) → int[source]¶ Interpret the BitString as a Python int. Acts similarly to int.from_bytes.
Parameters: - bitorder – Determines the bit order used to interpret the BitString. By default, this function uses Postgres conventions for casting bits to ints. If bitorder is ‘big’, the most significant bit is at the start of the string (this is the same as the default). If bitorder is ‘little’, the most significant bit is at the end of the string.
- signed (bool) – Determines whether two’s complement is used to interpret the BitString. If signed is False, the returned value is always non-negative.
Return int: An integer representing the BitString. Information about the BitString’s exact length is lost.
New in version 0.18.0.
-
classmethod