Skip to content

Commit 690048d

Browse files
authored
Batch executemany (#295)
Now `Bind` and `Execute` pairs are batched into 4 x 32KB buffers to take advantage of `writelines()`. A single `Sync` is sent at last, so that all args live in the same transaction. pgbench results of inserting 1000 rows per query with executemany() on Python 3.6 of 2.2GHz 2015 MacBook Air (best out of 5 runs): asyncpg 0.18.2: 710 queries in 30.31 seconds Latency: min 341.88ms; max 636.29ms; mean 425.022ms; std: 39.782ms (9.36%) Latency distribution: 25% under 401.67ms; 50% under 414.26ms; 75% under 435.37ms; 90% under 478.39ms; 99% under 576.638ms; 99.99% under 636.299ms Queries/sec: 23.42 Rows/sec: 23424.32 This patch: 4125 queries in 30.02 seconds Latency: min 23.14ms; max 734.91ms; mean 72.723ms; std: 49.226ms (67.69%) Latency distribution: 25% under 59.958ms; 50% under 65.414ms; 75% under 71.538ms; 90% under 80.95ms; 99% under 175.375ms; 99.99% under 734.912ms Queries/sec: 137.39 Rows/sec: 137389.64 This is a backwards incompatible change. Here `executemany()` becomes atomic, whereas previously any error in the middle of argument iteration would retain the results of the preceding set of arguments unless an explicit transaction block was used. Closes: #289
1 parent 8b313bd commit 690048d

File tree

8 files changed

+390
-100
lines changed

8 files changed

+390
-100
lines changed

.travis.yml

+4
Original file line numberDiff line numberDiff line change
@@ -197,18 +197,22 @@ jobs:
197197

198198
- name: "OSX py 3.5"
199199
os: osx
200+
osx_image: xcode10.2
200201
env: BUILD=tests,wheels PYTHON_VERSION=3.5.9 PGVERSION=12
201202

202203
- name: "OSX py 3.6"
203204
os: osx
205+
osx_image: xcode10.2
204206
env: BUILD=tests,wheels PYTHON_VERSION=3.6.10 PGVERSION=12
205207

206208
- name: "OSX py 3.7"
207209
os: osx
210+
osx_image: xcode10.2
208211
env: BUILD=tests,wheels PYTHON_VERSION=3.7.7 PGVERSION=12
209212

210213
- name: "OSX py 3.8"
211214
os: osx
215+
osx_image: xcode10.2
212216
env: BUILD=tests,wheels PYTHON_VERSION=3.8.3 PGVERSION=12
213217

214218
cache:

asyncpg/connection.py

+10
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,13 @@ async def executemany(self, command: str, args, *, timeout: float=None):
331331
332332
.. versionchanged:: 0.11.0
333333
`timeout` became a keyword-only parameter.
334+
335+
.. versionchanged:: 0.22.0
336+
The execution was changed to be in an implicit transaction if there
337+
was no explicit transaction, so that it will no longer end up with
338+
partial success. If you still need the previous behavior to
339+
progressively execute many args, please use a loop with prepared
340+
statement instead.
334341
"""
335342
self._check_open()
336343
return await self._executemany(command, args, timeout)
@@ -1010,6 +1017,9 @@ async def _copy_in(self, copy_stmt, source, timeout):
10101017
f = source
10111018
elif isinstance(source, collections.abc.AsyncIterable):
10121019
# assuming calling output returns an awaitable.
1020+
# copy_in() is designed to handle very large amounts of data, and
1021+
# the source async iterable is allowed to return an arbitrary
1022+
# amount of data on every iteration.
10131023
reader = source
10141024
else:
10151025
# assuming source is an instance supporting the buffer protocol.

asyncpg/prepared_stmt.py

+21-3
Original file line numberDiff line numberDiff line change
@@ -202,11 +202,24 @@ async def fetchrow(self, *args, timeout=None):
202202
return None
203203
return data[0]
204204

205-
async def __bind_execute(self, args, limit, timeout):
205+
@connresource.guarded
206+
async def executemany(self, args, *, timeout: float=None):
207+
"""Execute the statement for each sequence of arguments in *args*.
208+
209+
:param args: An iterable containing sequences of arguments.
210+
:param float timeout: Optional timeout value in seconds.
211+
:return None: This method discards the results of the operations.
212+
213+
.. versionadded:: 0.22.0
214+
"""
215+
return await self.__do_execute(
216+
lambda protocol: protocol.bind_execute_many(
217+
self._state, args, '', timeout))
218+
219+
async def __do_execute(self, executor):
206220
protocol = self._connection._protocol
207221
try:
208-
data, status, _ = await protocol.bind_execute(
209-
self._state, args, '', limit, True, timeout)
222+
return await executor(protocol)
210223
except exceptions.OutdatedSchemaCacheError:
211224
await self._connection.reload_schema_state()
212225
# We can not find all manually created prepared statements, so just
@@ -215,6 +228,11 @@ async def __bind_execute(self, args, limit, timeout):
215228
# invalidate themselves (unfortunately, clearing caches again).
216229
self._state.mark_closed()
217230
raise
231+
232+
async def __bind_execute(self, args, limit, timeout):
233+
data, status, _ = await self.__do_execute(
234+
lambda protocol: protocol.bind_execute(
235+
self._state, args, '', limit, True, timeout))
218236
self._last_status = status
219237
return data
220238

asyncpg/protocol/consts.pxi

+2
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,5 @@
88
DEF _MAXINT32 = 2**31 - 1
99
DEF _COPY_BUFFER_SIZE = 524288
1010
DEF _COPY_SIGNATURE = b"PGCOPY\n\377\r\n\0"
11+
DEF _EXECUTE_MANY_BUF_NUM = 4
12+
DEF _EXECUTE_MANY_BUF_SIZE = 32768

asyncpg/protocol/coreproto.pxd

+10-2
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ cdef class CoreProtocol:
114114
# True - completed, False - suspended
115115
bint result_execute_completed
116116

117+
cpdef is_in_transaction(self)
117118
cdef _process__auth(self, char mtype)
118119
cdef _process__prepare(self, char mtype)
119120
cdef _process__bind_execute(self, char mtype)
@@ -146,6 +147,7 @@ cdef class CoreProtocol:
146147
cdef _auth_password_message_sasl_continue(self, bytes server_response)
147148

148149
cdef _write(self, buf)
150+
cdef _writelines(self, list buffers)
149151

150152
cdef _read_server_messages(self)
151153

@@ -155,9 +157,13 @@ cdef class CoreProtocol:
155157

156158
cdef _ensure_connected(self)
157159

160+
cdef WriteBuffer _build_parse_message(self, str stmt_name, str query)
158161
cdef WriteBuffer _build_bind_message(self, str portal_name,
159162
str stmt_name,
160163
WriteBuffer bind_data)
164+
cdef WriteBuffer _build_empty_bind_data(self)
165+
cdef WriteBuffer _build_execute_message(self, str portal_name,
166+
int32_t limit)
161167

162168

163169
cdef _connect(self)
@@ -166,8 +172,10 @@ cdef class CoreProtocol:
166172
WriteBuffer bind_data, int32_t limit)
167173
cdef _bind_execute(self, str portal_name, str stmt_name,
168174
WriteBuffer bind_data, int32_t limit)
169-
cdef _bind_execute_many(self, str portal_name, str stmt_name,
170-
object bind_data)
175+
cdef bint _bind_execute_many(self, str portal_name, str stmt_name,
176+
object bind_data)
177+
cdef bint _bind_execute_many_more(self, bint first=*)
178+
cdef _bind_execute_many_fail(self, object error, bint first=*)
171179
cdef _bind(self, str portal_name, str stmt_name,
172180
WriteBuffer bind_data)
173181
cdef _execute(self, str portal_name, int32_t limit)

0 commit comments

Comments
 (0)