API Reference¶
Connection¶
- async connect(dsn=None, *, host=None, port=None, user=None, password=None, passfile=None, database=None, loop=None, timeout=60, statement_cache_size=100, max_cached_statement_lifetime=300, max_cacheable_statement_size=15360, command_timeout=None, ssl=None, direct_tls=False, connection_class=<class 'asyncpg.connection.Connection'>, record_class=<class 'asyncpg.Record'>, server_settings=None)[source]¶
A coroutine to establish a connection to a PostgreSQL server.
The connection parameters may be specified either as a connection URI in dsn, or as specific keyword arguments, or both. If both dsn and keyword arguments are specified, the latter override the corresponding values parsed from the connection URI. The default values for the majority of arguments can be specified using environment variables.
Returns a new Connection object.
- Parameters
dsn – Connection arguments specified as a single string in the libpq connection URI format: postgres://user:password@host:port/database?option=value. The following options are recognized by asyncpg: host, port, user, database (or dbname), password, passfile, sslmode, sslcert, sslkey, sslrootcert, and sslcrl. Unlike libpq, asyncpg will treat unrecognized options as server settings to be used for the connection.
Note
The URI must be valid, which means that all components must be properly quoted with urllib.parse.quote(), and any literal IPv6 addresses must be enclosed in square brackets. For example: postgres://dbuser@[fe80::1ff:fe23:4567:890a%25eth0]/dbname
host – Database host address as one of the following:
- an IP address or a domain name;
- an absolute path to the directory containing the database server Unix-domain socket (not supported on Windows);
- a sequence of any of the above, in which case the addresses will be tried in order, and the first successful connection will be returned.
If not specified, asyncpg will try the following, in order:
- host address(es) parsed from the dsn argument,
- the value of the PGHOST environment variable,
- on Unix, common directories used for PostgreSQL Unix-domain sockets: "/run/postgresql", "/var/run/postgresql", "/var/pgsql_socket", "/private/tmp", and "/tmp",
- "localhost".
port – Port number to connect to at the server host (or Unix-domain socket file extension). If multiple host addresses were specified, this parameter may specify a sequence of port numbers of the same length as the host sequence, or it may specify a single port number to be used for all host addresses.
If not specified, the value parsed from the dsn argument is used, or the value of the PGPORT environment variable, or 5432 if neither is specified.
user – The name of the database role used for authentication.
If not specified, the value parsed from the dsn argument is used, or the value of the PGUSER environment variable, or the operating system name of the user running the application.
database – The name of the database to connect to.
If not specified, the value parsed from the dsn argument is used, or the value of the PGDATABASE environment variable, or the computed value of the user argument.
password – Password to be used for authentication, if the server requires one. If not specified, the value parsed from the dsn argument is used, or the value of the PGPASSWORD environment variable. Note that the use of the environment variable is discouraged as other users and applications may be able to read it without needing specific privileges. It is recommended to use passfile instead.
Password may be either a string, or a callable that returns a string. If a callable is provided, it will be called each time a new connection is established, as in the sketch below.
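For example (a minimal sketch; reading the secret from an environment variable stands in for any password source):

>>> import asyncpg
>>> import asyncio
>>> import os
>>> async def main():
...     def get_password():
...         # Called on every new connection attempt, so the
...         # secret can be rotated without restarting the app.
...         return os.environ['PGPASSWORD_CURRENT']
...     con = await asyncpg.connect(user='postgres',
...                                 password=get_password)
...     await con.close()
>>> asyncio.run(main())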
passfile – The name of the file used to store passwords (defaults to ~/.pgpass, or %APPDATA%\postgresql\pgpass.conf on Windows).
loop – An asyncio event loop instance. If None, the default event loop will be used.
timeout (float) – Connection timeout in seconds.
statement_cache_size (int) – The size of the prepared statement LRU cache. Pass 0 to disable the cache.
max_cached_statement_lifetime (int) – The maximum time in seconds a prepared statement will stay in the cache. Pass 0 to allow statements to be cached indefinitely.
max_cacheable_statement_size (int) – The maximum size of a statement that can be cached (15KiB by default). Pass 0 to allow all statements to be cached regardless of their size.
command_timeout (float) – The default timeout for operations on this connection (the default is None: no timeout).
ssl – Pass True or an ssl.SSLContext instance to require an SSL connection. If True, a default SSL context returned by ssl.create_default_context() will be used. The value can also be one of the following strings:
- 'disable' - SSL is disabled (equivalent to False)
- 'prefer' - try SSL first, fall back to a non-SSL connection if the SSL connection fails
- 'allow' - try without SSL first, then retry with SSL if the first attempt fails
- 'require' - only try an SSL connection; certificate verification errors are ignored
- 'verify-ca' - only try an SSL connection, and verify that the server certificate is issued by a trusted certificate authority (CA)
- 'verify-full' - only try an SSL connection, verify that the server certificate is issued by a trusted CA and that the requested server host name matches that in the certificate
The default is 'prefer': try an SSL connection and fall back to a non-SSL connection if that fails.
Note
ssl is ignored for Unix domain socket communication.
Example of programmatic SSL context configuration that is equivalent to sslmode=verify-full&sslcert=..&sslkey=..&sslrootcert=..:

>>> import asyncpg
>>> import asyncio
>>> import ssl
>>> async def main():
...     # Load CA bundle for server certificate verification,
...     # equivalent to sslrootcert= in DSN.
...     sslctx = ssl.create_default_context(
...         ssl.Purpose.SERVER_AUTH,
...         cafile="path/to/ca_bundle.pem")
...     # If True, equivalent to sslmode=verify-full, if False:
...     # sslmode=verify-ca.
...     sslctx.check_hostname = True
...     # Load client certificate and private key for client
...     # authentication, equivalent to sslcert= and sslkey= in
...     # DSN.
...     sslctx.load_cert_chain(
...         "path/to/client.cert",
...         keyfile="path/to/client.key",
...     )
...     con = await asyncpg.connect(user='postgres', ssl=sslctx)
...     await con.close()
>>> asyncio.run(main())
Example of programmatic SSL context configuration that is equivalent to sslmode=require (no server certificate or host verification):

>>> import asyncpg
>>> import asyncio
>>> import ssl
>>> async def main():
...     sslctx = ssl.create_default_context(
...         ssl.Purpose.SERVER_AUTH)
...     sslctx.check_hostname = False
...     sslctx.verify_mode = ssl.CERT_NONE
...     con = await asyncpg.connect(user='postgres', ssl=sslctx)
...     await con.close()
>>> asyncio.run(main())
direct_tls (bool) – Pass True to skip PostgreSQL STARTTLS mode and perform a direct SSL connection. Must be used alongside the ssl parameter.
server_settings (dict) – An optional dict of server runtime parameters. Refer to PostgreSQL documentation for a list of supported options.
connection_class (type) – Class of the returned connection object. Must be a subclass of Connection.
record_class (type) – If specified, the class to use for records returned by queries on this connection object. Must be a subclass of Record.
- Returns
A Connection instance.
Example:
>>> import asyncpg
>>> import asyncio
>>> async def run():
...     con = await asyncpg.connect(user='postgres')
...     types = await con.fetch('SELECT * FROM pg_type')
...     print(types)
...
>>> asyncio.get_event_loop().run_until_complete(run())
[<Record typname='bool' typnamespace=11 ...
New in version 0.10.0: Added max_cached_statement_use_count parameter.
Changed in version 0.11.0: Removed ability to pass arbitrary keyword arguments to set server settings. Added a dedicated parameter server_settings for that.
New in version 0.11.0: Added connection_class parameter.
New in version 0.16.0: Added passfile parameter (and support for password files in general).
New in version 0.18.0: Added ability to specify multiple hosts in the dsn and host arguments.
Changed in version 0.21.0: The password argument now accepts a callable or an async function.
Changed in version 0.22.0: Added the record_class parameter.
Changed in version 0.22.0: The ssl argument now defaults to 'prefer'.
Changed in version 0.24.0: The sslcert, sslkey, sslrootcert, and sslcrl options are supported in the dsn argument.
Changed in version 0.25.0: The sslpassword, ssl_min_protocol_version, and ssl_max_protocol_version options are supported in the dsn argument.
Changed in version 0.25.0: Default system root CA certificates won't be loaded when specifying a particular sslmode, following the same behavior in libpq.
Changed in version 0.25.0: The sslcert, sslkey, sslrootcert, and sslcrl options in the dsn argument now have consistent default values of files under ~/.postgresql/ as libpq.
Changed in version 0.26.0: Added the direct_tls parameter.
- class Connection(protocol, transport, loop, addr, config: asyncpg.connect_utils.ConnectionConfiguration, params: asyncpg.connect_utils.ConnectionParameters)[source]¶
A representation of a database session.
Connections are created by calling connect().
- coroutine add_listener(channel, callback)[source]¶
Add a listener for Postgres notifications.
- Parameters
channel (str) – Channel to listen on.
callback (callable) – A callable or a coroutine function receiving the following arguments: connection: a Connection the callback is registered with; pid: PID of the Postgres server that sent the notification; channel: name of the channel the notification was sent to; payload: the payload.
Changed in version 0.24.0: The callback argument may be a coroutine function.
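Example (a minimal sketch; the channel name 'events' is illustrative):

>>> import asyncpg
>>> import asyncio
>>> async def run():
...     con = await asyncpg.connect(user='postgres')
...     def on_notify(connection, pid, channel, payload):
...         print(channel, payload)
...     await con.add_listener('events', on_notify)
...     # NOTIFY from any session, including this one, fires the callback.
...     await con.execute("NOTIFY events, 'hello'")
...     await asyncio.sleep(0.1)
...     await con.remove_listener('events', on_notify)
...     await con.close()
...
>>> asyncio.get_event_loop().run_until_complete(run())
events hello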
- add_log_listener(callback)[source]¶
Add a listener for Postgres log messages.
It will be called when an asynchronous NoticeResponse is received from the connection. Possible message types are: WARNING, NOTICE, DEBUG, INFO, or LOG.
- Parameters
callback (callable) – A callable or a coroutine function receiving the following arguments: connection: a Connection the callback is registered with; message: the exceptions.PostgresLogMessage message.
New in version 0.12.0.
Changed in version 0.24.0: The callback argument may be a coroutine function.
- add_termination_listener(callback)[source]¶
Add a listener that will be called when the connection is closed.
- Parameters
callback (callable) – A callable or a coroutine function receiving one argument: connection: a Connection the callback is registered with.
New in version 0.21.0.
Changed in version 0.24.0: The callback argument may be a coroutine function.
- coroutine close(*, timeout=None)[source]¶
Close the connection gracefully.
- Parameters
timeout (float) – Optional timeout value in seconds.
Changed in version 0.14.0: Added the timeout parameter.
- coroutine copy_from_query(query, *args, output, timeout=None, format=None, oids=None, delimiter=None, null=None, header=None, quote=None, escape=None, force_quote=None, encoding=None)[source]¶
Copy the results of a query to a file or file-like object.
- Parameters
query (str) – The query to copy the results of.
args – Query arguments.
output – A path-like object, or a file-like object, or a coroutine function that takes a bytes instance as a sole argument.
timeout (float) – Optional timeout value in seconds.
The remaining keyword arguments are COPY statement options, see COPY statement documentation for details.
- Returns
The status string of the COPY command.
Example:
>>> import asyncpg
>>> import asyncio
>>> async def run():
...     con = await asyncpg.connect(user='postgres')
...     result = await con.copy_from_query(
...         'SELECT foo, bar FROM mytable WHERE foo > $1', 10,
...         output='file.csv', format='csv')
...     print(result)
...
>>> asyncio.get_event_loop().run_until_complete(run())
'COPY 10'
New in version 0.11.0.
- coroutine copy_from_table(table_name, *, output, columns=None, schema_name=None, timeout=None, format=None, oids=None, delimiter=None, null=None, header=None, quote=None, escape=None, force_quote=None, encoding=None)[source]¶
Copy table contents to a file or file-like object.
- Parameters
table_name (str) – The name of the table to copy data from.
output – A path-like object, or a file-like object, or a coroutine function that takes a bytes instance as a sole argument.
columns (list) – An optional list of column names to copy.
schema_name (str) – An optional schema name to qualify the table.
timeout (float) – Optional timeout value in seconds.
The remaining keyword arguments are COPY statement options, see COPY statement documentation for details.
- Returns
The status string of the COPY command.
Example:
>>> import asyncpg
>>> import asyncio
>>> async def run():
...     con = await asyncpg.connect(user='postgres')
...     result = await con.copy_from_table(
...         'mytable', columns=('foo', 'bar'),
...         output='file.csv', format='csv')
...     print(result)
...
>>> asyncio.get_event_loop().run_until_complete(run())
'COPY 100'
New in version 0.11.0.
- coroutine copy_records_to_table(table_name, *, records, columns=None, schema_name=None, timeout=None)[source]¶
Copy a list of records to the specified table using binary COPY.
- Parameters
table_name (str) – The name of the table to copy data to.
records – An iterable returning row tuples to copy into the table. Asynchronous iterables are also supported.
columns (list) – An optional list of column names to copy.
schema_name (str) – An optional schema name to qualify the table.
timeout (float) – Optional timeout value in seconds.
- Returns
The status string of the COPY command.
Example:
>>> import asyncpg
>>> import asyncio
>>> async def run():
...     con = await asyncpg.connect(user='postgres')
...     result = await con.copy_records_to_table(
...         'mytable', records=[
...             (1, 'foo', 'bar'),
...             (2, 'ham', 'spam')])
...     print(result)
...
>>> asyncio.get_event_loop().run_until_complete(run())
'COPY 2'
Asynchronous record iterables are also supported:
>>> import asyncpg
>>> import asyncio
>>> async def run():
...     con = await asyncpg.connect(user='postgres')
...     async def record_gen(size):
...         for i in range(size):
...             yield (i,)
...     result = await con.copy_records_to_table(
...         'mytable', records=record_gen(100))
...     print(result)
...
>>> asyncio.get_event_loop().run_until_complete(run())
'COPY 100'
New in version 0.11.0.
Changed in version 0.24.0: The records argument may be an asynchronous iterable.
- coroutine copy_to_table(table_name, *, source, columns=None, schema_name=None, timeout=None, format=None, oids=None, freeze=None, delimiter=None, null=None, header=None, quote=None, escape=None, force_quote=None, force_not_null=None, force_null=None, encoding=None)[source]¶
Copy data to the specified table.
- Parameters
table_name (str) – The name of the table to copy data to.
source – A path-like object, or a file-like object, or an asynchronous iterable that returns bytes, or an object supporting the buffer protocol.
columns (list) – An optional list of column names to copy.
schema_name (str) – An optional schema name to qualify the table.
timeout (float) – Optional timeout value in seconds.
The remaining keyword arguments are COPY statement options, see COPY statement documentation for details.
- Returns
The status string of the COPY command.
Example:
>>> import asyncpg
>>> import asyncio
>>> async def run():
...     con = await asyncpg.connect(user='postgres')
...     result = await con.copy_to_table(
...         'mytable', source='datafile.tbl')
...     print(result)
...
>>> asyncio.get_event_loop().run_until_complete(run())
'COPY 140000'
New in version 0.11.0.
- cursor(query, *args, prefetch=None, timeout=None, record_class=None)[source]¶
Return a cursor factory for the specified query.
- Parameters
args – Query arguments.
prefetch (int) – The number of rows the cursor iterator will prefetch (defaults to 50).
timeout (float) – Optional timeout in seconds.
record_class (type) – If specified, the class to use for records returned by this cursor. Must be a subclass of Record. If not specified, a per-connection record_class is used.
- Returns
A CursorFactory object.
Changed in version 0.22.0: Added the record_class parameter.
- coroutine execute(query: str, *args, timeout: Optional[float] = None) str [source]¶
Execute an SQL command (or commands).
This method can execute many SQL commands at once, when no arguments are provided.
Example:
>>> await con.execute('''
...     CREATE TABLE mytab (a int);
...     INSERT INTO mytab (a) VALUES (100), (200), (300);
... ''')
INSERT 0 3
>>> await con.execute('''
...     INSERT INTO mytab (a) VALUES ($1), ($2)
... ''', 10, 20)
INSERT 0 2
- Parameters
args – Query arguments.
timeout (float) – Optional timeout value in seconds.
- Return str
Status of the last SQL command.
Changed in version 0.5.4: Made it possible to pass query arguments.
- coroutine executemany(command: str, args, *, timeout: Optional[float] = None)[source]¶
Execute an SQL command for each sequence of arguments in args.
Example:
>>> await con.executemany('''
...     INSERT INTO mytab (a) VALUES ($1), ($2), ($3);
... ''', [(1, 2, 3), (4, 5, 6)])
- Parameters
command – Command to execute.
args – An iterable containing sequences of arguments.
timeout (float) – Optional timeout value in seconds.
- Return None
This method discards the results of the operations.
New in version 0.7.0.
Changed in version 0.11.0: timeout became a keyword-only parameter.
Changed in version 0.22.0: executemany() is now an atomic operation, which means that either all executions succeed, or none at all. This is in contrast to prior versions, where the effect of already-processed iterations would remain in place when an error has occurred, unless executemany() was called in a transaction.
- coroutine fetch(query, *args, timeout=None, record_class=None) list [source]¶
Run a query and return the results as a list of Record.
- Parameters
- Return list
A list of Record instances. If specified, the actual type of list elements would be record_class.
Changed in version 0.22.0: Added the record_class parameter.
- coroutine fetchrow(query, *args, timeout=None, record_class=None)[source]¶
Run a query and return the first row.
- Parameters
- Returns
The first row as a Record instance, or None if no records were returned by the query. If specified, record_class is used as the type for the result value.
Changed in version 0.22.0: Added the record_class parameter.
- coroutine fetchval(query, *args, column=0, timeout=None)[source]¶
Run a query and return a value in the first row.
- Parameters
- Returns
The value of the specified column of the first record, or None if no records were returned by the query.
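For instance, the column argument selects which value of the first row is returned (a minimal sketch):

>>> await con.fetchval('SELECT 1 AS a, 2 AS b', column=1)
2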
- get_server_version()[source]¶
Return the version of the connected PostgreSQL server.
The returned value is a named tuple similar to that in sys.version_info:

>>> con.get_server_version()
ServerVersion(major=9, minor=6, micro=1, releaselevel='final', serial=0)
New in version 0.8.0.
- is_closed()[source]¶
Return True if the connection is closed, False otherwise.
- Return bool
True if the connection is closed, False otherwise.
- is_in_transaction()[source]¶
Return True if Connection is currently inside a transaction.
- Return bool
True if inside transaction, False otherwise.
New in version 0.16.0.
- coroutine prepare(query, *, name=None, timeout=None, record_class=None)[source]¶
Create a prepared statement for the specified query.
- Parameters
query (str) – Text of the query to create a prepared statement for.
name (str) – Optional name of the returned prepared statement. If not specified, the name is auto-generated.
timeout (float) – Optional timeout value in seconds.
record_class (type) – If specified, the class to use for records returned by the prepared statement. Must be a subclass of Record. If not specified, a per-connection record_class is used.
- Returns
A PreparedStatement instance.
Changed in version 0.22.0: Added the record_class parameter.
Changed in version 0.25.0: Added the name parameter.
- coroutine reload_schema_state()[source]¶
Indicate that the database schema information must be reloaded.
For performance reasons, asyncpg caches certain aspects of the database schema, such as the layout of composite types. Consequently, when the database schema changes, and asyncpg is not able to gracefully recover from an error caused by outdated schema assumptions, an OutdatedSchemaCacheError is raised. To prevent the exception, this method may be used to inform asyncpg that the database schema has changed.
Example:

>>> import asyncpg
>>> import asyncio
>>> async def change_type(con):
...     result = await con.fetch('SELECT id, info FROM tbl')
...     # Change composite's attribute type "int"=>"text"
...     await con.execute('ALTER TYPE custom DROP ATTRIBUTE y')
...     await con.execute('ALTER TYPE custom ADD ATTRIBUTE y text')
...     await con.reload_schema_state()
...     for id_, info in result:
...         new = (info['x'], str(info['y']))
...         await con.execute(
...             'UPDATE tbl SET info=$2 WHERE id=$1', id_, new)
...
>>> async def run():
...     # Initial schema:
...     # CREATE TYPE custom AS (x int, y int);
...     # CREATE TABLE tbl(id int, info custom);
...     con = await asyncpg.connect(user='postgres')
...     async with con.transaction():
...         # Prevent concurrent changes in the table
...         await con.execute('LOCK TABLE tbl')
...         await change_type(con)
...
>>> asyncio.get_event_loop().run_until_complete(run())
New in version 0.14.0.
- coroutine remove_listener(channel, callback)[source]¶
Remove a listening callback on the specified channel.
- remove_log_listener(callback)[source]¶
Remove a listening callback for log messages.
New in version 0.12.0.
- remove_termination_listener(callback)[source]¶
Remove a listening callback for connection termination.
- Parameters
callback (callable) – The callable or coroutine function that was passed to Connection.add_termination_listener().
New in version 0.21.0.
- coroutine reset_type_codec(typename, *, schema='public')[source]¶
Reset typename codec to the default implementation.
- Parameters
typename – Name of the data type the codec is for.
schema – Schema name of the data type the codec is for (defaults to 'public').
New in version 0.12.0.
- coroutine set_builtin_type_codec(typename, *, schema='public', codec_name, format=None)[source]¶
Set a builtin codec for the specified scalar data type.
This method has two uses. The first is to register a builtin codec for an extension type without a stable OID, such as ‘hstore’. The second use is to declare that an extension type or a user-defined type is wire-compatible with a certain builtin data type and should be exchanged as such.
- Parameters
typename – Name of the data type the codec is for.
schema – Schema name of the data type the codec is for (defaults to 'public').
codec_name – The name of the builtin codec to use for the type. This should be either the name of a known core type (such as "int"), or the name of a supported extension type. Currently, the only supported extension type is "pg_contrib.hstore".
format – If format is None (the default), all formats supported by the target codec are declared to be supported for typename. If format is 'text' or 'binary', then only the specified format is declared to be supported for typename.
Changed in version 0.18.0: The codec_name argument can be the name of any known core data type. Added the format keyword argument.
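For example, to exchange hstore values as Python dicts (a minimal sketch; assumes the hstore extension is installed in the database):

>>> import asyncpg
>>> import asyncio
>>> async def run():
...     con = await asyncpg.connect(user='postgres')
...     # Requires: CREATE EXTENSION IF NOT EXISTS hstore;
...     await con.set_builtin_type_codec(
...         'hstore', codec_name='pg_contrib.hstore')
...     print(await con.fetchval("SELECT 'a=>1'::hstore"))
...     await con.close()
...
>>> asyncio.get_event_loop().run_until_complete(run())
{'a': '1'}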
- coroutine set_type_codec(typename, *, schema='public', encoder, decoder, format='text')[source]¶
Set an encoder/decoder pair for the specified data type.
- Parameters
typename – Name of the data type the codec is for.
schema – Schema name of the data type the codec is for (defaults to 'public').
format – The type of the argument received by the decoder callback, and the type of the encoder callback return value.
If format is 'text' (the default), the exchange datum is a str instance containing a valid text representation of the data type.
If format is 'binary', the exchange datum is a bytes instance containing a valid binary representation of the data type.
If format is 'tuple', the exchange datum is a type-specific tuple of values. The table below lists supported data types and their tuple layouts for this mode.

Type                       Tuple layout
interval                   (months, days, microseconds)
date                       (date ordinal relative to Jan 1 2000,);
                           -2^31 for negative infinity timestamp,
                           2^31-1 for positive infinity timestamp.
timestamp                  (microseconds relative to Jan 1 2000,);
                           -2^63 for negative infinity timestamp,
                           2^63-1 for positive infinity timestamp.
timestamp with time zone   (microseconds relative to Jan 1 2000 UTC,);
                           -2^63 for negative infinity timestamp,
                           2^63-1 for positive infinity timestamp.
time                       (microseconds,)
time with time zone        (microseconds, time zone offset in seconds)

encoder – Callable accepting a Python object as a single argument and returning a value encoded according to format.
decoder – Callable accepting a single argument encoded according to format and returning a decoded Python object.
Example:
>>> import asyncpg
>>> import asyncio
>>> import datetime
>>> from dateutil.relativedelta import relativedelta
>>> async def run():
...     con = await asyncpg.connect(user='postgres')
...     def encoder(delta):
...         ndelta = delta.normalized()
...         return (ndelta.years * 12 + ndelta.months,
...                 ndelta.days,
...                 ((ndelta.hours * 3600 +
...                   ndelta.minutes * 60 +
...                   ndelta.seconds) * 1000000 +
...                  ndelta.microseconds))
...     def decoder(tup):
...         return relativedelta(months=tup[0], days=tup[1],
...                              microseconds=tup[2])
...     await con.set_type_codec(
...         'interval', schema='pg_catalog', encoder=encoder,
...         decoder=decoder, format='tuple')
...     result = await con.fetchval(
...         "SELECT '2 years 3 mons 1 day'::interval")
...     print(result)
...     print(datetime.datetime(2002, 1, 1) + result)
...
>>> asyncio.get_event_loop().run_until_complete(run())
relativedelta(years=+2, months=+3, days=+1)
2004-04-02 00:00:00
New in version 0.12.0: Added the format keyword argument and support for 'tuple' format.
Changed in version 0.12.0: The binary keyword argument is deprecated in favor of format.
Changed in version 0.13.0: The binary keyword argument was removed in favor of format.
Note
It is recommended to use the 'binary' or 'tuple' format whenever possible and if the underlying type supports it. Asyncpg currently does not support text I/O for composite and range types, and some other functionality, such as Connection.copy_to_table(), does not support types with text codecs.
- transaction(*, isolation=None, readonly=False, deferrable=False)[source]¶
Create a Transaction object.
Refer to PostgreSQL documentation on the meaning of transaction parameters.
- Parameters
isolation – Transaction isolation mode, can be one of: 'serializable', 'repeatable_read', 'read_committed'. If not specified, the behavior is up to the server and session, which is usually read_committed.
readonly – Specifies whether or not this transaction is read-only.
deferrable – Specifies whether or not this transaction is deferrable.
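Example (a sketch showing the parameters together):

async with con.transaction(isolation='serializable',
                           readonly=True,
                           deferrable=True):
    rows = await con.fetch('SELECT * FROM mytable')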
Prepared Statements¶
Prepared statements are a PostgreSQL feature that can be used to optimize the
performance of queries that are executed more than once. When a query
is prepared by a call to Connection.prepare(), the server parses,
analyzes and compiles the query, allowing that work to be reused each
time the same query is run again.
>>> import asyncpg, asyncio
>>> loop = asyncio.get_event_loop()
>>> async def run():
... conn = await asyncpg.connect()
... stmt = await conn.prepare('''SELECT 2 ^ $1''')
... print(await stmt.fetchval(10))
... print(await stmt.fetchval(20))
...
>>> loop.run_until_complete(run())
1024.0
1048576.0
Note
asyncpg automatically maintains a small LRU cache for queries executed
during calls to the fetch(), fetchrow(), or fetchval() methods.
Warning
If you are using pgbouncer with pool_mode set to transaction or
statement, prepared statements will not work correctly. See
Why am I getting prepared statement errors? for more information.
- class PreparedStatement[source]¶
A representation of a prepared statement.
- cursor(*args, prefetch=None, timeout=None) asyncpg.cursor.CursorFactory [source]¶
Return a cursor factory for the prepared statement.
- executemany(args, *, timeout: float = None)[source]¶
Execute the statement for each sequence of arguments in args.
- Parameters
args – An iterable containing sequences of arguments.
timeout (float) – Optional timeout value in seconds.
- Return None
This method discards the results of the operations.
New in version 0.22.0.
- explain(*args, analyze=False)[source]¶
Return the execution plan of the statement.
- Parameters
args – Query arguments.
analyze – If True, the statement will be executed and the run time statistics added to the return value.
- Returns
An object representing the execution plan. This value is actually a deserialized JSON output of the SQL EXPLAIN command.
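Example (a minimal sketch; the structure mirrors PostgreSQL's EXPLAIN (FORMAT JSON) output):

stmt = await con.prepare('SELECT * FROM mytable WHERE a > $1')
# Without analyze=True the statement is only planned, not executed.
plan = await stmt.explain(10)
print(plan[0]['Plan']['Node Type'])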
- fetchval(*args, column=0, timeout=None)[source]¶
Execute the statement and return a value in the first row.
- Parameters
- Returns
The value of the specified column of the first record.
- get_attributes()[source]¶
Return a description of relation attributes (columns).
- Returns
A tuple of asyncpg.types.Attribute.
Example:
st = await self.con.prepare('''
    SELECT typname, typnamespace FROM pg_type
''')
print(st.get_attributes())

# Will print:
#   (Attribute(
#       name='typname',
#       type=Type(oid=19, name='name', kind='scalar',
#                 schema='pg_catalog')),
#    Attribute(
#       name='typnamespace',
#       type=Type(oid=26, name='oid', kind='scalar',
#                 schema='pg_catalog')))
- get_parameters()[source]¶
Return a description of statement parameter types.
- Returns
A tuple of asyncpg.types.Type.
Example:
stmt = await connection.prepare('SELECT ($1::int, $2::text)')
print(stmt.get_parameters())

# Will print:
#   (Type(oid=23, name='int4', kind='scalar', schema='pg_catalog'),
#    Type(oid=25, name='text', kind='scalar', schema='pg_catalog'))
Transactions¶
The most common way to use transactions is through an async with
statement:
async with connection.transaction():
await connection.execute("INSERT INTO mytable VALUES(1, 2, 3)")
asyncpg supports nested transactions (a nested transaction context will create a savepoint):
async with connection.transaction():
await connection.execute('CREATE TABLE mytab (a int)')
try:
# Create a nested transaction:
async with connection.transaction():
await connection.execute('INSERT INTO mytab (a) VALUES (1), (2)')
# This nested transaction will be automatically rolled back:
raise Exception
except:
# Ignore exception
pass
# Because the nested transaction was rolled back, there
# will be nothing in `mytab`.
assert await connection.fetch('SELECT a FROM mytab') == []
Alternatively, transactions can be used without an async with
block:
tr = connection.transaction()
await tr.start()
try:
...
except:
await tr.rollback()
raise
else:
await tr.commit()
See also the Connection.transaction() function.
- class Transaction[source]¶
Represents a transaction or savepoint block.
Transactions are created by calling the Connection.transaction() function.
- async with c:
start and commit/rollback the transaction or savepoint block automatically when entering and exiting the code inside the context manager block.
Cursors¶
Cursors are useful when there is a need to iterate over the results of
a large query without fetching all rows at once. The cursor interface
provided by asyncpg supports asynchronous iteration via the async for
statement, and also a way to read row chunks and skip forward over the
result set.
To iterate over a cursor using a connection object use
Connection.cursor()
.
To make the iteration efficient, the cursor will prefetch records to
reduce the number of queries sent to the server:
async def iterate(con: Connection):
async with con.transaction():
# Postgres requires non-scrollable cursors to be created
# and used in a transaction.
async for record in con.cursor('SELECT generate_series(0, 100)'):
print(record)
Or, alternatively, you can iterate over the cursor manually (in this case the cursor won't prefetch any rows):
async def iterate(con: Connection):
async with con.transaction():
# Postgres requires non-scrollable cursors to be created
# and used in a transaction.
# Create a Cursor object
cur = await con.cursor('SELECT generate_series(0, 100)')
# Move the cursor 10 rows forward
await cur.forward(10)
# Fetch one row and print it
print(await cur.fetchrow())
# Fetch a list of 5 rows and print it
print(await cur.fetch(5))
It’s also possible to create cursors from prepared statements:
async def iterate(con: Connection):
# Create a prepared statement that will accept one argument
stmt = await con.prepare('SELECT generate_series(0, $1)')
async with con.transaction():
# Postgres requires non-scrollable cursors to be created
# and used in a transaction.
# Execute the prepared statement passing `10` as the
# argument -- that will generate a series of records
# from 0..10. Iterate over all of them and print every
# record.
async for record in stmt.cursor(10):
print(record)
Note
Cursors created by a call to Connection.cursor() or
PreparedStatement.cursor() are non-scrollable: they can only be read
forwards. To create a scrollable cursor, use the
DECLARE ... SCROLL CURSOR SQL statement directly.
Warning
Cursors created by a call to Connection.cursor() or
PreparedStatement.cursor() cannot be used outside of a transaction.
Any such attempt will result in InterfaceError.
To create a cursor usable outside of a transaction, use the
DECLARE ... CURSOR WITH HOLD SQL statement directly.
- class CursorFactory[source]¶
A cursor interface for the results of a query.
A cursor interface can be used to initiate efficient traversal of the results of a large query.
- async for row in c
Execute the statement and iterate over the results asynchronously.
- await c
Execute the statement and return an instance of Cursor which can be used to navigate over and fetch subsets of the query results.
- class Cursor[source]¶
An open portal into the results of a query.
- fetch(n, *, timeout=None)[source]¶
Return the next n rows as a list of Record objects.
- Parameters
timeout (float) – Optional timeout value in seconds.
- Returns
A list of Record instances.
Connection Pools¶
- create_pool(dsn=None, *, min_size=10, max_size=10, max_queries=50000, max_inactive_connection_lifetime=300.0, setup=None, init=None, loop=None, connection_class=<class 'asyncpg.connection.Connection'>, record_class=<class 'asyncpg.Record'>, **connect_kwargs)[source]¶
Create a connection pool.
Can be used either with an async with block:

async with asyncpg.create_pool(user='postgres',
                               command_timeout=60) as pool:
    await pool.fetch('SELECT 1')

Or to perform multiple operations on a single connection:

async with asyncpg.create_pool(user='postgres',
                               command_timeout=60) as pool:
    async with pool.acquire() as con:
        await con.execute('''
            CREATE TABLE names (
                id serial PRIMARY KEY,
                name VARCHAR (255) NOT NULL)
        ''')
        await con.fetch('SELECT 1')

Or directly with await (not recommended):

pool = await asyncpg.create_pool(user='postgres', command_timeout=60)
con = await pool.acquire()
try:
    await con.fetch('SELECT 1')
finally:
    await pool.release(con)
Warning
Prepared statements and cursors returned by Connection.prepare() and Connection.cursor() become invalid once the connection is released. Likewise, all notification and log listeners are removed, and asyncpg will issue a warning if there are any listener callbacks registered on a connection that is being released to the pool.
- Parameters
dsn (str) – Connection arguments specified as a single string in the following format: postgres://user:pass@host:port/database?option=value.
**connect_kwargs – Keyword arguments for the connect() function.
connection_class (Connection) – The class to use for connections. Must be a subclass of Connection.
record_class (type) – If specified, the class to use for records returned by queries on the connections in this pool. Must be a subclass of Record.
min_size (int) – Number of connections the pool will be initialized with.
max_size (int) – Max number of connections in the pool.
max_queries (int) – Number of queries after which a connection is closed and replaced with a new connection.
max_inactive_connection_lifetime (float) – Number of seconds after which inactive connections in the pool will be closed. Pass 0 to disable this mechanism.
setup (coroutine) – A coroutine to prepare a connection right before it is returned from Pool.acquire(). An example use case would be to automatically set up notification listeners for all connections of a pool.
init (coroutine) – A coroutine to initialize a connection when it is created. An example use case would be to set up type codecs with Connection.set_builtin_type_codec() or Connection.set_type_codec(), as in the sketch after this entry.
loop – An asyncio event loop instance. If None, the default event loop will be used.
- Returns
An instance of Pool.
Changed in version 0.10.0: An InterfaceError will be raised on any attempted operation on a released connection.
Changed in version 0.13.0: An InterfaceError will be raised on any attempted operation on a prepared statement or a cursor created on a connection that has been released to the pool.
Changed in version 0.13.0: An InterfaceWarning will be produced if there are any active listeners (added via Connection.add_listener() or Connection.add_log_listener()) present on the connection at the moment of its release to the pool.
Changed in version 0.22.0: Added the record_class parameter.
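For example, the init callback can be used to configure codecs once per new connection (a sketch; the JSON codec registration shown is a common pattern, adjust to your schema):

import json
import asyncpg

async def init_connection(con):
    # Exchange json values as Python objects on this connection.
    await con.set_type_codec(
        'json', schema='pg_catalog',
        encoder=json.dumps, decoder=json.loads,
        format='text')

pool = await asyncpg.create_pool(
    user='postgres', init=init_connection)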
- class Pool[source]¶
A connection pool.
A connection pool can be used to manage a set of connections to the database. Connections are first acquired from the pool, then used, and then released back to the pool. Once a connection is released, it is reset to close all open cursors and other resources except prepared statements.
Pools are created by calling create_pool().
- acquire(*, timeout=None)[source]¶
Acquire a database connection from the pool.
- Parameters
timeout (float) – A timeout for acquiring a Connection.
- Returns
An instance of Connection.

Can be used in an await expression or with an async with block.

async with pool.acquire() as con:
    await con.execute(...)

Or:

con = await pool.acquire()
try:
    await con.execute(...)
finally:
    await pool.release(con)
- coroutine close()[source]¶
Attempt to gracefully close all connections in the pool.
Wait until all pool connections are released, close them and shut down the pool. If any error (including cancellation) occurs in close(), the pool will terminate by calling Pool.terminate().
It is advisable to use asyncio.wait_for() to set a timeout.
Changed in version 0.16.0: close() now waits until all pool connections are released before closing them and the pool. Errors raised in close() will cause immediate pool termination.
- coroutine copy_from_query(query, *args, output, timeout=None, format=None, oids=None, delimiter=None, null=None, header=None, quote=None, escape=None, force_quote=None, encoding=None)[source]¶
Copy the results of a query to a file or file-like object.
Pool performs this operation using one of its connections. Other than that, it behaves identically to Connection.copy_from_query().
New in version 0.24.0.
- coroutine copy_from_table(table_name, *, output, columns=None, schema_name=None, timeout=None, format=None, oids=None, delimiter=None, null=None, header=None, quote=None, escape=None, force_quote=None, encoding=None)[source]¶
Copy table contents to a file or file-like object.
Pool performs this operation using one of its connections. Other than that, it behaves identically to Connection.copy_from_table().
New in version 0.24.0.
- coroutine copy_records_to_table(table_name, *, records, columns=None, schema_name=None, timeout=None)[source]¶
Copy a list of records to the specified table using binary COPY.
Pool performs this operation using one of its connections. Other than that, it behaves identically to Connection.copy_records_to_table().
New in version 0.24.0.
- coroutine copy_to_table(table_name, *, source, columns=None, schema_name=None, timeout=None, format=None, oids=None, freeze=None, delimiter=None, null=None, header=None, quote=None, escape=None, force_quote=None, force_not_null=None, force_null=None, encoding=None)[source]¶
Copy data to the specified table.
Pool performs this operation using one of its connections. Other than that, it behaves identically to Connection.copy_to_table().
New in version 0.24.0.
- coroutine execute(query: str, *args, timeout: Optional[float] = None) str [source]¶
Execute an SQL command (or commands).
Pool performs this operation using one of its connections. Other than that, it behaves identically to Connection.execute().
New in version 0.10.0.
- coroutine executemany(command: str, args, *, timeout: Optional[float] = None)[source]¶
Execute an SQL command for each sequence of arguments in args.
Pool performs this operation using one of its connections. Other than that, it behaves identically to Connection.executemany().
New in version 0.10.0.
- coroutine expire_connections()[source]¶
Expire all currently open connections.
Cause all currently open connections to get replaced on the next acquire() call.
New in version 0.16.0.
- coroutine fetch(query, *args, timeout=None, record_class=None) list [source]¶
Run a query and return the results as a list of Record.
Pool performs this operation using one of its connections. Other than that, it behaves identically to Connection.fetch().
New in version 0.10.0.
- coroutine fetchrow(query, *args, timeout=None, record_class=None)[source]¶
Run a query and return the first row.
Pool performs this operation using one of its connections. Other than that, it behaves identically to Connection.fetchrow().
New in version 0.10.0.
- coroutine fetchval(query, *args, column=0, timeout=None)[source]¶
Run a query and return a value in the first row.
Pool performs this operation using one of its connections. Other than that, it behaves identically to Connection.fetchval().
New in version 0.10.0.
- get_idle_size()[source]¶
Return the current number of idle connections in this pool.
New in version 0.25.0.
- get_max_size()[source]¶
Return the maximum allowed number of connections in this pool.
New in version 0.25.0.
- get_min_size()[source]¶
Return the minimum number of connections in this pool.
New in version 0.25.0.
- coroutine release(connection, *, timeout=None)[source]¶
Release a database connection back to the pool.
- Parameters
connection (Connection) – A Connection object to release.
timeout (float) – A timeout for releasing the connection. If not specified, defaults to the timeout provided in the corresponding call to the Pool.acquire() method.
Changed in version 0.14.0: Added the timeout parameter.
- set_connect_args(dsn=None, **connect_kwargs)[source]¶
Set the new connection arguments for this pool.
The new connection arguments will be used for all subsequent new connection attempts. Existing connections will remain until they expire. Use Pool.expire_connections() to expedite the connection expiry.
- Parameters
New in version 0.16.0.
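A sketch of rotating credentials at runtime (new_password stands in for a freshly obtained secret):

# Subsequent connection attempts will use the new arguments.
pool.set_connect_args(password=new_password)
# Optionally replace existing connections on the next acquire():
await pool.expire_connections()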
Record Objects¶
Each row (or composite type value) returned by calls to fetch* methods
is represented by an instance of the Record object.
Record objects are a tuple-/dict-like hybrid, and allow addressing of
items either by a numeric index or by a field name:
>>> import asyncpg
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> conn = loop.run_until_complete(asyncpg.connect())
>>> r = loop.run_until_complete(conn.fetchrow('''
... SELECT oid, rolname, rolsuper FROM pg_roles WHERE rolname = user'''))
>>> r
<Record oid=16388 rolname='elvis' rolsuper=True>
>>> r['oid']
16388
>>> r[0]
16388
>>> dict(r)
{'oid': 16388, 'rolname': 'elvis', 'rolsuper': True}
>>> tuple(r)
(16388, 'elvis', True)
Note
Record objects currently cannot be created from Python code.
- class Record¶
A read-only representation of a PostgreSQL row.
- len(r)
Return the number of fields in record r.
- r[field]
Return the field of r with field name or index field.
- name in r
Return True if record r has a field named name.
- iter(r)
Return an iterator over the values of the record r.
- get(name[, default])
Return the value for name if the record has a field named name, else return default. If default is not given, return None.
New in version 0.18.
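Using the record r from the example above (a minimal sketch):

>>> r.get('rolname')
'elvis'
>>> r.get('missing', 'default')
'default'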
- values()¶
Return an iterator over the record values.
- keys()¶
Return an iterator over the record field names.
- items()¶
Return an iterator over (field, value) pairs.
- class ConnectionSettings¶
A read-only collection of Connection settings.
- settings.setting_name
Return the value of the "setting_name" setting. Raises an AttributeError if the setting is not defined.
Example:

>>> connection.get_settings().client_encoding
'UTF8'
Data Types¶
- class Attribute(name, type)¶
Database relation attribute.
- name¶
Attribute name.
- type¶
Attribute data type asyncpg.types.Type.
- class BitString(bitstring: Optional[bytes] = None)[source]¶
Immutable representation of PostgreSQL bit and varbit types.
- classmethod from_int(x: int, length: int, bitorder: Literal[big, little] = 'big', *, signed: bool = False) asyncpg.pgproto.types._BitString [source]¶
Represent the Python int x as a BitString. Acts similarly to int.to_bytes.
- Parameters
x (int) – An integer to represent. Negative integers are represented in two’s complement form, unless the argument signed is False, in which case negative integers raise an OverflowError.
length (int) – The length of the resulting BitString. An OverflowError is raised if the integer is not representable in this many bits.
bitorder – Determines the bit order used in the BitString representation. By default, this function uses Postgres conventions for casting ints to bits. If bitorder is ‘big’, the most significant bit is at the start of the string (this is the same as the default). If bitorder is ‘little’, the most significant bit is at the end of the string.
signed (bool) – Determines whether two’s complement is used in the BitString representation. If signed is False and a negative integer is given, an OverflowError is raised.
- Return BitString
A BitString representing the input integer, in the form specified by the other input args.
New in version 0.18.0.
- to_int(bitorder: Literal[big, little] = 'big', *, signed: bool = False) int [source]¶
Interpret the BitString as a Python int. Acts similarly to int.from_bytes.
- Parameters
bitorder – Determines the bit order used to interpret the BitString. By default, this function uses Postgres conventions for casting bits to ints. If bitorder is ‘big’, the most significant bit is at the start of the string (this is the same as the default). If bitorder is ‘little’, the most significant bit is at the end of the string.
signed (bool) – Determines whether two’s complement is used to interpret the BitString. If signed is False, the returned value is always non-negative.
- Return int
An integer representing the BitString. Information about the BitString’s exact length is lost.
New in version 0.18.0.
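A round-trip sketch:

>>> from asyncpg import BitString
>>> bs = BitString.from_int(5, length=4)
>>> bs.to_int()
5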
- class Box(high: Sequence[float], low: Sequence[float])[source]¶
Immutable representation of PostgreSQL box type.
- class Circle(center: asyncpg.pgproto.types.Point, radius: float)[source]¶
Immutable representation of PostgreSQL circle type.
- class LineSegment(p1: Sequence[float], p2: Sequence[float])[source]¶
Immutable representation of PostgreSQL lseg type.
- class Path(*points: Sequence[float], is_closed: bool = False)[source]¶
Immutable representation of PostgreSQL path type.
- class Point(x: Union[SupportsFloat, SupportsIndex, str, bytes, bytearray], y: Union[SupportsFloat, SupportsIndex, str, bytes, bytearray])[source]¶
Immutable representation of PostgreSQL point type.
- class Polygon(*points: Sequence[float])[source]¶
Immutable representation of PostgreSQL polygon type.
- class Range(lower=None, upper=None, *, lower_inc=True, upper_inc=False, empty=False)[source]¶
Immutable representation of PostgreSQL range type.
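A construction sketch (attribute names follow the constructor parameters):

>>> from asyncpg import Range
>>> r = Range(lower=1, upper=10)
>>> (r.lower, r.upper, r.lower_inc, r.upper_inc)
(1, 10, True, False)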