# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import json
from . import connresource
from . import cursor
from . import exceptions
[docs]
class PreparedStatement(connresource.ConnectionResource):
"""A representation of a prepared statement."""
__slots__ = ('_state', '_query', '_last_status')
def __init__(self, connection, query, state):
super().__init__(connection)
self._state = state
self._query = query
state.attach()
self._last_status = None
[docs]
@connresource.guarded
def get_name(self) -> str:
"""Return the name of this prepared statement.
.. versionadded:: 0.25.0
"""
return self._state.name
[docs]
@connresource.guarded
def get_query(self) -> str:
"""Return the text of the query for this prepared statement.
Example::
stmt = await connection.prepare('SELECT $1::int')
assert stmt.get_query() == "SELECT $1::int"
"""
return self._query
[docs]
@connresource.guarded
def get_statusmsg(self) -> str:
"""Return the status of the executed command.
Example::
stmt = await connection.prepare('CREATE TABLE mytab (a int)')
await stmt.fetch()
assert stmt.get_statusmsg() == "CREATE TABLE"
"""
if self._last_status is None:
return self._last_status
return self._last_status.decode()
[docs]
@connresource.guarded
def get_parameters(self):
"""Return a description of statement parameters types.
:return: A tuple of :class:`asyncpg.types.Type`.
Example::
stmt = await connection.prepare('SELECT ($1::int, $2::text)')
print(stmt.get_parameters())
# Will print:
# (Type(oid=23, name='int4', kind='scalar', schema='pg_catalog'),
# Type(oid=25, name='text', kind='scalar', schema='pg_catalog'))
"""
return self._state._get_parameters()
[docs]
@connresource.guarded
def get_attributes(self):
"""Return a description of relation attributes (columns).
:return: A tuple of :class:`asyncpg.types.Attribute`.
Example::
st = await self.con.prepare('''
SELECT typname, typnamespace FROM pg_type
''')
print(st.get_attributes())
# Will print:
# (Attribute(
# name='typname',
# type=Type(oid=19, name='name', kind='scalar',
# schema='pg_catalog')),
# Attribute(
# name='typnamespace',
# type=Type(oid=26, name='oid', kind='scalar',
# schema='pg_catalog')))
"""
return self._state._get_attributes()
[docs]
@connresource.guarded
def cursor(self, *args, prefetch=None,
timeout=None) -> cursor.CursorFactory:
"""Return a *cursor factory* for the prepared statement.
:param args: Query arguments.
:param int prefetch: The number of rows the *cursor iterator*
will prefetch (defaults to ``50``.)
:param float timeout: Optional timeout in seconds.
:return: A :class:`~cursor.CursorFactory` object.
"""
return cursor.CursorFactory(
self._connection,
self._query,
self._state,
args,
prefetch,
timeout,
self._state.record_class,
)
[docs]
@connresource.guarded
async def explain(self, *args, analyze=False):
"""Return the execution plan of the statement.
:param args: Query arguments.
:param analyze: If ``True``, the statement will be executed and
the run time statitics added to the return value.
:return: An object representing the execution plan. This value
is actually a deserialized JSON output of the SQL
``EXPLAIN`` command.
"""
query = 'EXPLAIN (FORMAT JSON, VERBOSE'
if analyze:
query += ', ANALYZE) '
else:
query += ') '
query += self._state.query
if analyze:
# From PostgreSQL docs:
# Important: Keep in mind that the statement is actually
# executed when the ANALYZE option is used. Although EXPLAIN
# will discard any output that a SELECT would return, other
# side effects of the statement will happen as usual. If you
# wish to use EXPLAIN ANALYZE on an INSERT, UPDATE, DELETE,
# MERGE, CREATE TABLE AS, or EXECUTE statement without letting
# the command affect your data, use this approach:
# BEGIN;
# EXPLAIN ANALYZE ...;
# ROLLBACK;
tr = self._connection.transaction()
await tr.start()
try:
data = await self._connection.fetchval(query, *args)
finally:
await tr.rollback()
else:
data = await self._connection.fetchval(query, *args)
return json.loads(data)
[docs]
@connresource.guarded
async def fetch(self, *args, timeout=None):
r"""Execute the statement and return a list of :class:`Record` objects.
:param str query: Query text
:param args: Query arguments
:param float timeout: Optional timeout value in seconds.
:return: A list of :class:`Record` instances.
"""
data = await self.__bind_execute(args, 0, timeout)
return data
[docs]
@connresource.guarded
async def fetchval(self, *args, column=0, timeout=None):
"""Execute the statement and return a value in the first row.
:param args: Query arguments.
:param int column: Numeric index within the record of the value to
return (defaults to 0).
:param float timeout: Optional timeout value in seconds.
If not specified, defaults to the value of
``command_timeout`` argument to the ``Connection``
instance constructor.
:return: The value of the specified column of the first record.
"""
data = await self.__bind_execute(args, 1, timeout)
if not data:
return None
return data[0][column]
[docs]
@connresource.guarded
async def fetchrow(self, *args, timeout=None):
"""Execute the statement and return the first row.
:param str query: Query text
:param args: Query arguments
:param float timeout: Optional timeout value in seconds.
:return: The first row as a :class:`Record` instance.
"""
data = await self.__bind_execute(args, 1, timeout)
if not data:
return None
return data[0]
[docs]
@connresource.guarded
async def fetchmany(self, args, *, timeout=None):
"""Execute the statement and return a list of :class:`Record` objects.
:param args: Query arguments.
:param float timeout: Optional timeout value in seconds.
:return: A list of :class:`Record` instances.
.. versionadded:: 0.30.0
"""
return await self.__do_execute(
lambda protocol: protocol.bind_execute_many(
self._state,
args,
portal_name='',
timeout=timeout,
return_rows=True,
)
)
[docs]
@connresource.guarded
async def executemany(self, args, *, timeout: float=None):
"""Execute the statement for each sequence of arguments in *args*.
:param args: An iterable containing sequences of arguments.
:param float timeout: Optional timeout value in seconds.
:return None: This method discards the results of the operations.
.. versionadded:: 0.22.0
"""
return await self.__do_execute(
lambda protocol: protocol.bind_execute_many(
self._state,
args,
portal_name='',
timeout=timeout,
return_rows=False,
))
async def __do_execute(self, executor):
protocol = self._connection._protocol
try:
return await executor(protocol)
except exceptions.OutdatedSchemaCacheError:
await self._connection.reload_schema_state()
# We can not find all manually created prepared statements, so just
# drop known cached ones in the `self._connection`.
# Other manually created prepared statements will fail and
# invalidate themselves (unfortunately, clearing caches again).
self._state.mark_closed()
raise
async def __bind_execute(self, args, limit, timeout):
data, status, _ = await self.__do_execute(
lambda protocol: protocol.bind_execute(
self._state, args, '', limit, True, timeout))
self._last_status = status
return data
def _check_open(self, meth_name):
if self._state.closed:
raise exceptions.InterfaceError(
'cannot call PreparedStmt.{}(): '
'the prepared statement is closed'.format(meth_name))
def _check_conn_validity(self, meth_name):
self._check_open(meth_name)
super()._check_conn_validity(meth_name)
def __del__(self):
self._state.detach()
self._connection._maybe_gc_stmt(self._state)