Skip to content

Commit 244c297

Browse files
teror4uksCaselIT
authored andcommitted
Add name_func optional attribute for asyncpg adapter
I faced an issue related to pg bouncer and prepared statement cache flow in asyncpg dialect. Regarding this discussion #6467 I prepared PR to support an optional parameter `name` in prepared statement which is allowed, since 0.25.0 version in `asyncpg` MagicStack/asyncpg#846 **UPD:** the issue with proposal: #9608 ### Description Added optional parameter `name_func` to `AsyncAdapt_asyncpg_connection` class which will call on the `self._connection.prepare()` function and populate a unique name. so in general instead this ```python from uuid import uuid4 from asyncpg import Connection class CConnection(Connection): def _get_unique_id(self, prefix: str) -> str: return f'__asyncpg_{prefix}_{uuid4()}__' engine = create_async_engine(..., connect_args={ 'connection_class': CConnection, }, ) ``` would be enough ```python from uuid import uuid4 engine = create_async_engine(..., connect_args={ 'name_func': lambda: f'__asyncpg_{uuid4()}__', }, ) ``` ### Checklist <!-- go over following points. check them with an `x` if they do apply, (they turn into clickable checkboxes once the PR is submitted, so no need to do everything at once) --> This pull request is: - [ ] A documentation / typographical error fix - Good to go, no issue or tests are needed - [ ] A short code fix - please include the issue number, and create an issue if none exists, which must include a complete example of the issue. one line code fixes without an issue and demonstration will not be accepted. - Please include: `Fixes: #<issue number>` in the commit message - please include tests. one line code fixes without tests will not be accepted. - [x] A new feature implementation - please include the issue number, and create an issue if none exists, which must include a complete example of how the feature would look. - Please include: `Fixes: #<issue number>` in the commit message - please include tests. **Have a nice day!** Fixes: #9608 Closes: #9607 Pull-request: #9607 Pull-request-sha: b4bc8d3 Change-Id: Icd753366cba166b8a60d1c8566377ec8335cd828
1 parent fc2bcea commit 244c297

File tree

3 files changed

+97
-3
lines changed

3 files changed

+97
-3
lines changed
+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
.. change::
2+
:tags: usecase, postgresql
3+
:tickets: 9608
4+
5+
Added ``prepared_statement_name_func`` connection argument option in the
6+
asyncpg dialect. This option allow passing a callable used to customize
7+
the name of the prepared statement that will be created by the driver
8+
when executing the queries.
9+
Pull request curtesy of Pavel Sirotkin.
10+
11+
.. seealso::
12+
13+
:ref:`asyncpg_prepared_statement_name`

lib/sqlalchemy/dialects/postgresql/asyncpg.py

+67-3
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,44 @@
9898
stale, nor can it retry the statement as the PostgreSQL transaction is
9999
invalidated when these errors occur.
100100
101+
.. _asyncpg_prepared_statement_name:
102+
103+
Prepared Statement Name
104+
-----------------------
105+
106+
By default, asyncpg enumerates prepared statements in numeric order, which
107+
can lead to errors if a name has already been taken for another prepared
108+
statement. This issue can arise if your application uses database proxies
109+
such as PgBouncer to handle connections. One possible workaround is to
110+
use dynamic prepared statement names, which asyncpg now supports through
111+
an optional name value for the statement name. This allows you to
112+
generate your own unique names that won't conflict with existing ones.
113+
To achieve this, you can provide a function that will be called every time
114+
a prepared statement is prepared::
115+
116+
from uuid import uuid4
117+
118+
engine = create_async_engine(
119+
"postgresql+asyncpg://user:pass@hostname/dbname",
120+
poolclass=NullPool,
121+
connect_args={
122+
'prepared_statement_name_func': lambda: f'__asyncpg_{uuid4()}__',
123+
},
124+
)
125+
126+
.. seealso::
127+
128+
https://github.com/MagicStack/asyncpg/issues/837
129+
130+
https://github.com/sqlalchemy/sqlalchemy/issues/6467
131+
132+
.. warning:: To prevent a buildup of useless prepared statements in
133+
your application, it's important to use the NullPool poolclass and
134+
PgBouncer with a configured `DISCARD https://www.postgresql.org/docs/current/sql-discard.html`_
135+
setup. The DISCARD command is used to release resources held by the db connection,
136+
including prepared statements. Without proper setup, prepared statements can
137+
accumulate quickly and cause performance issues.
138+
101139
Disabling the PostgreSQL JIT to improve ENUM datatype handling
102140
---------------------------------------------------------------
103141
@@ -642,13 +680,20 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
642680
"_transaction",
643681
"_started",
644682
"_prepared_statement_cache",
683+
"_prepared_statement_name_func",
645684
"_invalidate_schema_cache_asof",
646685
"_execute_mutex",
647686
)
648687

649688
await_ = staticmethod(await_only)
650689

651-
def __init__(self, dbapi, connection, prepared_statement_cache_size=100):
690+
def __init__(
691+
self,
692+
dbapi,
693+
connection,
694+
prepared_statement_cache_size=100,
695+
prepared_statement_name_func=None,
696+
):
652697
self.dbapi = dbapi
653698
self._connection = connection
654699
self.isolation_level = self._isolation_setting = "read_committed"
@@ -666,6 +711,11 @@ def __init__(self, dbapi, connection, prepared_statement_cache_size=100):
666711
else:
667712
self._prepared_statement_cache = None
668713

714+
if prepared_statement_name_func:
715+
self._prepared_statement_name_func = prepared_statement_name_func
716+
else:
717+
self._prepared_statement_name_func = self._default_name_func
718+
669719
async def _check_type_cache_invalidation(self, invalidate_timestamp):
670720
if invalidate_timestamp > self._invalidate_schema_cache_asof:
671721
await self._connection.reload_schema_state()
@@ -676,7 +726,9 @@ async def _prepare(self, operation, invalidate_timestamp):
676726

677727
cache = self._prepared_statement_cache
678728
if cache is None:
679-
prepared_stmt = await self._connection.prepare(operation)
729+
prepared_stmt = await self._connection.prepare(
730+
operation, name=self._prepared_statement_name_func()
731+
)
680732
attributes = prepared_stmt.get_attributes()
681733
return prepared_stmt, attributes
682734

@@ -692,7 +744,9 @@ async def _prepare(self, operation, invalidate_timestamp):
692744
if cached_timestamp > invalidate_timestamp:
693745
return prepared_stmt, attributes
694746

695-
prepared_stmt = await self._connection.prepare(operation)
747+
prepared_stmt = await self._connection.prepare(
748+
operation, name=self._prepared_statement_name_func()
749+
)
696750
attributes = prepared_stmt.get_attributes()
697751
cache[operation] = (prepared_stmt, attributes, time.time())
698752

@@ -792,6 +846,10 @@ def close(self):
792846
def terminate(self):
793847
self._connection.terminate()
794848

849+
@staticmethod
850+
def _default_name_func():
851+
return None
852+
795853

796854
class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
797855
__slots__ = ()
@@ -809,17 +867,23 @@ def connect(self, *arg, **kw):
809867
prepared_statement_cache_size = kw.pop(
810868
"prepared_statement_cache_size", 100
811869
)
870+
prepared_statement_name_func = kw.pop(
871+
"prepared_statement_name_func", None
872+
)
873+
812874
if util.asbool(async_fallback):
813875
return AsyncAdaptFallback_asyncpg_connection(
814876
self,
815877
await_fallback(self.asyncpg.connect(*arg, **kw)),
816878
prepared_statement_cache_size=prepared_statement_cache_size,
879+
prepared_statement_name_func=prepared_statement_name_func,
817880
)
818881
else:
819882
return AsyncAdapt_asyncpg_connection(
820883
self,
821884
await_only(self.asyncpg.connect(*arg, **kw)),
822885
prepared_statement_cache_size=prepared_statement_cache_size,
886+
prepared_statement_name_func=prepared_statement_name_func,
823887
)
824888

825889
class Error(Exception):

test/dialect/postgresql/test_async_pg_py3k.py

+17
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import random
2+
import uuid
23

34
from sqlalchemy import Column
45
from sqlalchemy import exc
@@ -272,3 +273,19 @@ async def test_codec_registration(
272273
await conn.close()
273274

274275
eq_(codec_meth.mock_calls, [mock.call(adapted_conn)])
276+
277+
@async_test
278+
async def test_name_connection_func(self, metadata, async_testing_engine):
279+
cache = []
280+
281+
def name_f():
282+
name = str(uuid.uuid4())
283+
cache.append(name)
284+
return name
285+
286+
engine = async_testing_engine(
287+
options={"connect_args": {"prepared_statement_name_func": name_f}},
288+
)
289+
async with engine.begin() as conn:
290+
await conn.execute(select(1))
291+
assert len(cache) > 0

0 commit comments

Comments
 (0)