Skip to content

Commit a2935ae

Browse files
committed
Add max_inactive_connection_lifetime parameter to Pool.
1 parent 12cce92 commit a2935ae

File tree

3 files changed

+145
-6
lines changed

3 files changed

+145
-6
lines changed

asyncpg/_testbase.py

+2
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ def create_pool(dsn=None, *,
157157
min_size=10,
158158
max_size=10,
159159
max_queries=50000,
160+
max_inactive_connection_lifetime=60.0,
160161
setup=None,
161162
init=None,
162163
loop=None,
@@ -166,6 +167,7 @@ def create_pool(dsn=None, *,
166167
dsn,
167168
min_size=min_size, max_size=max_size,
168169
max_queries=max_queries, loop=loop, setup=setup, init=init,
170+
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
169171
**connect_kwargs)
170172

171173

asyncpg/pool.py

+49-6
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,24 @@ class PoolConnectionHolder:
8686

8787
__slots__ = ('_con', '_pool', '_loop',
8888
'_connect_args', '_connect_kwargs',
89-
'_max_queries', '_setup', '_init')
89+
'_max_queries', '_setup', '_init',
90+
'_max_inactive_time', '_in_use',
91+
'_inactive_callback')
9092

9193
def __init__(self, pool, *, connect_args, connect_kwargs,
92-
max_queries, setup, init):
94+
max_queries, setup, init, max_inactive_time):
9395

9496
self._pool = pool
9597
self._con = None
9698

9799
self._connect_args = connect_args
98100
self._connect_kwargs = connect_kwargs
99101
self._max_queries = max_queries
102+
self._max_inactive_time = max_inactive_time
100103
self._setup = setup
101104
self._init = init
105+
self._inactive_callback = None
106+
self._in_use = False
102107

103108
async def connect(self):
104109
assert self._con is None
@@ -134,6 +139,8 @@ async def acquire(self) -> PoolConnectionProxy:
134139
if self._con is None:
135140
await self.connect()
136141

142+
self._maybe_cancel_inactive_callback()
143+
137144
proxy = PoolConnectionProxy(self, self._con)
138145

139146
if self._setup is not None:
@@ -154,9 +161,13 @@ async def acquire(self) -> PoolConnectionProxy:
154161
self._con = None
155162
raise ex
156163

164+
self._in_use = True
157165
return proxy
158166

159167
async def release(self):
168+
assert self._in_use
169+
self._in_use = False
170+
160171
if self._con.is_closed():
161172
self._con = None
162173

@@ -181,7 +192,13 @@ async def release(self):
181192
self._con = None
182193
raise ex
183194

195+
assert self._inactive_callback is None
196+
if self._max_inactive_time and self._con is not None:
197+
self._inactive_callback = self._pool._loop.call_later(
198+
self._max_inactive_time, self._deactivate_connection)
199+
184200
async def close(self):
201+
self._maybe_cancel_inactive_callback()
185202
if self._con is None:
186203
return
187204
if self._con.is_closed():
@@ -194,6 +211,7 @@ async def close(self):
194211
self._con = None
195212

196213
def terminate(self):
214+
self._maybe_cancel_inactive_callback()
197215
if self._con is None:
198216
return
199217
if self._con.is_closed():
@@ -205,6 +223,18 @@ def terminate(self):
205223
finally:
206224
self._con = None
207225

226+
def _maybe_cancel_inactive_callback(self):
227+
if self._inactive_callback is not None:
228+
self._inactive_callback.cancel()
229+
self._inactive_callback = None
230+
231+
def _deactivate_connection(self):
232+
assert not self._in_use
233+
if self._con is None or self._con.is_closed():
234+
return
235+
self._con.terminate()
236+
self._con = None
237+
208238

209239
class Pool:
210240
"""A connection pool.
@@ -225,6 +255,7 @@ def __init__(self, *connect_args,
225255
min_size,
226256
max_size,
227257
max_queries,
258+
max_inactive_connection_lifetime,
228259
setup,
229260
init,
230261
loop,
@@ -247,6 +278,11 @@ def __init__(self, *connect_args,
247278
if max_queries <= 0:
248279
raise ValueError('max_queries is expected to be greater than zero')
249280

281+
if max_inactive_connection_lifetime < 0:
282+
raise ValueError(
283+
'max_inactive_connection_lifetime is expected to be greater '
284+
'or equal to zero')
285+
250286
self._minsize = min_size
251287
self._maxsize = max_size
252288

@@ -265,6 +301,7 @@ def __init__(self, *connect_args,
265301
connect_args=connect_args,
266302
connect_kwargs=connect_kwargs,
267303
max_queries=max_queries,
304+
max_inactive_time=max_inactive_connection_lifetime,
268305
setup=setup,
269306
init=init)
270307

@@ -511,6 +548,7 @@ def create_pool(dsn=None, *,
511548
min_size=10,
512549
max_size=10,
513550
max_queries=50000,
551+
max_inactive_connection_lifetime=300.0,
514552
setup=None,
515553
init=None,
516554
loop=None,
@@ -548,6 +586,9 @@ def create_pool(dsn=None, *,
548586
:param int max_size: Max number of connections in the pool.
549587
:param int max_queries: Number of queries after a connection is closed
550588
and replaced with a new connection.
589+
:param float max_inactive_connection_lifetime:
590+
Number of seconds after which inactive connections in the
591+
pool will be closed. Pass ``0`` to disable this mechanism.
551592
:param coroutine setup: A coroutine to prepare a connection right before
552593
it is returned from :meth:`~pool.Pool.acquire`.
553594
An example use case would be to automatically
@@ -567,7 +608,9 @@ def create_pool(dsn=None, *,
567608
An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
568609
attempted operation on a released connection.
569610
"""
570-
return Pool(dsn,
571-
min_size=min_size, max_size=max_size,
572-
max_queries=max_queries, loop=loop, setup=setup, init=init,
573-
**connect_kwargs)
611+
return Pool(
612+
dsn,
613+
min_size=min_size, max_size=max_size,
614+
max_queries=max_queries, loop=loop, setup=setup, init=init,
615+
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
616+
**connect_kwargs)

tests/test_pool.py

+94
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import os
1212
import platform
1313
import random
14+
import time
1415
import unittest
1516

1617
from asyncpg import _testbase as tb
@@ -457,6 +458,99 @@ async def worker(pool):
457458
finally:
458459
await pool.execute('DROP TABLE exmany')
459460

461+
async def test_pool_max_inactive_time_01(self):
462+
async with self.create_pool(
463+
database='postgres', min_size=1, max_size=1,
464+
max_inactive_connection_lifetime=0.1) as pool:
465+
466+
# Test that it's OK if a query takes longer time to execute
467+
# than `max_inactive_connection_lifetime`.
468+
469+
con = pool._holders[0]._con
470+
471+
for _ in range(3):
472+
await pool.execute('SELECT pg_sleep(0.5)')
473+
self.assertIs(pool._holders[0]._con, con)
474+
475+
self.assertEqual(
476+
await pool.execute('SELECT 1::int'),
477+
'SELECT 1')
478+
self.assertIs(pool._holders[0]._con, con)
479+
480+
async def test_pool_max_inactive_time_02(self):
481+
async with self.create_pool(
482+
database='postgres', min_size=1, max_size=1,
483+
max_inactive_connection_lifetime=0.5) as pool:
484+
485+
# Test that we have a new connection after pool not
486+
# being used longer than `max_inactive_connection_lifetime`.
487+
488+
con = pool._holders[0]._con
489+
490+
self.assertEqual(
491+
await pool.execute('SELECT 1::int'),
492+
'SELECT 1')
493+
self.assertIs(pool._holders[0]._con, con)
494+
495+
await asyncio.sleep(1, loop=self.loop)
496+
self.assertIs(pool._holders[0]._con, None)
497+
498+
self.assertEqual(
499+
await pool.execute('SELECT 1::int'),
500+
'SELECT 1')
501+
self.assertIsNot(pool._holders[0]._con, con)
502+
503+
async def test_pool_max_inactive_time_03(self):
504+
async with self.create_pool(
505+
database='postgres', min_size=1, max_size=1,
506+
max_inactive_connection_lifetime=1) as pool:
507+
508+
# Test that we start counting inactive time *after*
509+
# the connection is being released back to the pool.
510+
511+
con = pool._holders[0]._con
512+
513+
await pool.execute('SELECT pg_sleep(0.5)')
514+
await asyncio.sleep(0.6, loop=self.loop)
515+
516+
self.assertIs(pool._holders[0]._con, con)
517+
518+
self.assertEqual(
519+
await pool.execute('SELECT 1::int'),
520+
'SELECT 1')
521+
self.assertIs(pool._holders[0]._con, con)
522+
523+
async def test_pool_max_inactive_time_04(self):
524+
# Chaos test for max_inactive_connection_lifetime.
525+
DURATION = 2.0
526+
START = time.monotonic()
527+
N = 0
528+
529+
async def worker(pool):
530+
nonlocal N
531+
await asyncio.sleep(random.random() / 10 + 0.1, loop=self.loop)
532+
async with pool.acquire() as con:
533+
if random.random() > 0.5:
534+
await con.execute('SELECT pg_sleep({:.2f})'.format(
535+
random.random() / 10))
536+
self.assertEqual(
537+
await con.fetchval('SELECT 42::int'),
538+
42)
539+
540+
if time.monotonic() - START < DURATION:
541+
await worker(pool)
542+
543+
N += 1
544+
545+
async with self.create_pool(
546+
database='postgres', min_size=10, max_size=30,
547+
max_inactive_connection_lifetime=0.1) as pool:
548+
549+
workers = [worker(pool) for _ in range(50)]
550+
await asyncio.gather(*workers, loop=self.loop)
551+
552+
self.assertGreater(N, 50)
553+
460554

461555
@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
462556
class TestHostStandby(tb.ConnectedTestCase):

0 commit comments

Comments
 (0)