Skip to content

Commit b4bc8d3

Browse files
committed
Add name_func optional attribute for asyncpg adapter
Fixes: #9608
1 parent 95f531d commit b4bc8d3

File tree

2 files changed

+84
-3
lines changed

2 files changed

+84
-3
lines changed

lib/sqlalchemy/dialects/postgresql/asyncpg.py

+67-3
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,44 @@
9898
stale, nor can it retry the statement as the PostgreSQL transaction is
9999
invalidated when these errors occur.
100100
101+
.. _asyncpg_prepared_statement_name:
102+
103+
Prepared Statement Name
104+
-----------------------
105+
106+
By default, asyncpg enumerates prepared statements in numeric order, which
107+
can lead to errors if a name has already been taken for another prepared
108+
statement. This issue can arise if your application uses database proxies
109+
such as PgBouncer to handle connections. One possible workaround is to
110+
use dynamic prepared statement names, which asyncpg now supports through
111+
an optional name value for the statement name. This allows you to
112+
generate your own unique names that won't conflict with existing ones.
113+
To achieve this, you can provide a function that will be called every time
114+
a prepared statement is prepared::
115+
116+
from uuid import uuid4
117+
118+
engine = create_async_engine(
119+
"postgresql+asyncpg://user:pass@hostname/dbname",
120+
poolclass=NullPool,
121+
connect_args={
122+
'pepared_statement_name_func': lambda: f'__asyncpg_{uuid4()}__',
123+
},
124+
)
125+
126+
.. seealso::
127+
128+
https://github.com/MagicStack/asyncpg/issues/837
129+
130+
https://github.com/sqlalchemy/sqlalchemy/issues/6467
131+
132+
.. warning:: To prevent a buildup of useless prepared statements in
133+
your application, it's important to use the NullPool poolclass and
134+
PgBouncer with a configured `DISCARD https://www.postgresql.org/docs/current/sql-discard.html`_
135+
setup. The DISCARD command is used to release resources held by the db connection,
136+
including prepared statements. Without proper setup, prepared statements can
137+
accumulate quickly and cause performance issues.
138+
101139
Disabling the PostgreSQL JIT to improve ENUM datatype handling
102140
---------------------------------------------------------------
103141
@@ -642,13 +680,20 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
642680
"_transaction",
643681
"_started",
644682
"_prepared_statement_cache",
683+
"_prepared_statement_name_func",
645684
"_invalidate_schema_cache_asof",
646685
"_execute_mutex",
647686
)
648687

649688
await_ = staticmethod(await_only)
650689

651-
def __init__(self, dbapi, connection, prepared_statement_cache_size=100):
690+
def __init__(
691+
self,
692+
dbapi,
693+
connection,
694+
prepared_statement_cache_size=100,
695+
prepared_statement_name_func=None,
696+
):
652697
self.dbapi = dbapi
653698
self._connection = connection
654699
self.isolation_level = self._isolation_setting = "read_committed"
@@ -666,6 +711,11 @@ def __init__(self, dbapi, connection, prepared_statement_cache_size=100):
666711
else:
667712
self._prepared_statement_cache = None
668713

714+
if prepared_statement_name_func:
715+
self._prepared_statement_name_func = prepared_statement_name_func
716+
else:
717+
self._prepared_statement_name_func = self._default_name_func
718+
669719
async def _check_type_cache_invalidation(self, invalidate_timestamp):
670720
if invalidate_timestamp > self._invalidate_schema_cache_asof:
671721
await self._connection.reload_schema_state()
@@ -676,7 +726,9 @@ async def _prepare(self, operation, invalidate_timestamp):
676726

677727
cache = self._prepared_statement_cache
678728
if cache is None:
679-
prepared_stmt = await self._connection.prepare(operation)
729+
prepared_stmt = await self._connection.prepare(
730+
operation, name=self._prepared_statement_name_func()
731+
)
680732
attributes = prepared_stmt.get_attributes()
681733
return prepared_stmt, attributes
682734

@@ -692,7 +744,9 @@ async def _prepare(self, operation, invalidate_timestamp):
692744
if cached_timestamp > invalidate_timestamp:
693745
return prepared_stmt, attributes
694746

695-
prepared_stmt = await self._connection.prepare(operation)
747+
prepared_stmt = await self._connection.prepare(
748+
operation, name=self._prepared_statement_name_func()
749+
)
696750
attributes = prepared_stmt.get_attributes()
697751
cache[operation] = (prepared_stmt, attributes, time.time())
698752

@@ -792,6 +846,10 @@ def close(self):
792846
def terminate(self):
793847
self._connection.terminate()
794848

849+
@staticmethod
850+
def _default_name_func():
851+
return None
852+
795853

796854
class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
797855
__slots__ = ()
@@ -809,17 +867,23 @@ def connect(self, *arg, **kw):
809867
prepared_statement_cache_size = kw.pop(
810868
"prepared_statement_cache_size", 100
811869
)
870+
prepared_statement_name_func = kw.pop(
871+
"prepared_statement_name_func", None
872+
)
873+
812874
if util.asbool(async_fallback):
813875
return AsyncAdaptFallback_asyncpg_connection(
814876
self,
815877
await_fallback(self.asyncpg.connect(*arg, **kw)),
816878
prepared_statement_cache_size=prepared_statement_cache_size,
879+
prepared_statement_name_func=prepared_statement_name_func,
817880
)
818881
else:
819882
return AsyncAdapt_asyncpg_connection(
820883
self,
821884
await_only(self.asyncpg.connect(*arg, **kw)),
822885
prepared_statement_cache_size=prepared_statement_cache_size,
886+
prepared_statement_name_func=prepared_statement_name_func,
823887
)
824888

825889
class Error(Exception):

test/dialect/postgresql/test_async_pg_py3k.py

+17
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import random
2+
import uuid
23

34
from sqlalchemy import Column
45
from sqlalchemy import exc
@@ -272,3 +273,19 @@ async def test_codec_registration(
272273
await conn.close()
273274

274275
eq_(codec_meth.mock_calls, [mock.call(adapted_conn)])
276+
277+
@async_test
278+
async def test_name_connection_func(self, metadata, async_testing_engine):
279+
cache = []
280+
281+
def name_f():
282+
name = str(uuid.uuid4())
283+
cache.append(name)
284+
return name
285+
286+
engine = async_testing_engine(
287+
options={"connect_args": {"prepared_statement_name_func": name_f}},
288+
)
289+
async with engine.begin() as conn:
290+
await conn.execute(select(1))
291+
assert len(cache) > 0

0 commit comments

Comments
 (0)