Skip to content

Commit c91c81d

Browse files
author
Elvis Pranskevichus
authored
Merge pull request #45 from MagicStack/executemany
Implement the Connection.executemany() method
2 parents 2707ca3 + 1f4f7df commit c91c81d

File tree

5 files changed

+201
-11
lines changed

5 files changed

+201
-11
lines changed

asyncpg/connection.py

+21
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,27 @@ async def execute(self, query: str, *args, timeout: float=None) -> str:
148148
True, timeout)
149149
return status.decode()
150150

151+
async def executemany(self, command: str, args, timeout: float=None):
152+
"""Execute an SQL *command* for each sequence of arguments in *args*.
153+
154+
Example:
155+
156+
.. code-block:: pycon
157+
158+
>>> await con.executemany('''
159+
... INSERT INTO mytab (a) VALUES ($1, $2, $3);
160+
... ''', [(1, 2, 3), (4, 5, 6)])
161+
162+
:param command: Command to execute.
163+
:args: An iterable containing sequences of arguments.
164+
:param float timeout: Optional timeout value in seconds.
165+
:return None: This method discards the results of the operations.
166+
167+
.. versionadded:: 0.7.0
168+
"""
169+
stmt = await self._get_statement(command, timeout)
170+
return await self._protocol.bind_execute_many(stmt, args, '', timeout)
171+
151172
async def _get_statement(self, query, timeout):
152173
cache = self._stmt_cache_max_size > 0
153174

asyncpg/protocol/coreproto.pxd

+16-4
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ cdef enum ProtocolState:
2121
PROTOCOL_AUTH = 10
2222
PROTOCOL_PREPARE = 11
2323
PROTOCOL_BIND_EXECUTE = 12
24-
PROTOCOL_CLOSE_STMT_PORTAL = 13
25-
PROTOCOL_SIMPLE_QUERY = 14
26-
PROTOCOL_EXECUTE = 15
27-
PROTOCOL_BIND = 16
24+
PROTOCOL_BIND_EXECUTE_MANY = 13
25+
PROTOCOL_CLOSE_STMT_PORTAL = 14
26+
PROTOCOL_SIMPLE_QUERY = 15
27+
PROTOCOL_EXECUTE = 16
28+
PROTOCOL_BIND = 17
2829

2930

3031
cdef enum AuthenticationMessage:
@@ -67,6 +68,12 @@ cdef class CoreProtocol:
6768
cdef:
6869
ReadBuffer buffer
6970
bint _skip_discard
71+
bint _discard_data
72+
73+
# executemany support data
74+
object _execute_iter
75+
str _execute_portal_name
76+
str _execute_stmt_name
7077

7178
ConnectionStatus con_status
7279
ProtocolState state
@@ -95,6 +102,7 @@ cdef class CoreProtocol:
95102
cdef _process__auth(self, char mtype)
96103
cdef _process__prepare(self, char mtype)
97104
cdef _process__bind_execute(self, char mtype)
105+
cdef _process__bind_execute_many(self, char mtype)
98106
cdef _process__close_stmt_portal(self, char mtype)
99107
cdef _process__simple_query(self, char mtype)
100108
cdef _process__bind(self, char mtype)
@@ -129,8 +137,12 @@ cdef class CoreProtocol:
129137

130138
cdef _connect(self)
131139
cdef _prepare(self, str stmt_name, str query)
140+
cdef _send_bind_message(self, str portal_name, str stmt_name,
141+
WriteBuffer bind_data, int32_t limit)
132142
cdef _bind_execute(self, str portal_name, str stmt_name,
133143
WriteBuffer bind_data, int32_t limit)
144+
cdef _bind_execute_many(self, str portal_name, str stmt_name,
145+
object bind_data)
134146
cdef _bind(self, str portal_name, str stmt_name,
135147
WriteBuffer bind_data)
136148
cdef _execute(self, str portal_name, int32_t limit)

asyncpg/protocol/coreproto.pyx

+95-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ cdef class CoreProtocol:
2424

2525
self._skip_discard = False
2626

27+
# executemany support data
28+
self._execute_iter = None
29+
self._execute_portal_name = None
30+
self._execute_stmt_name = None
31+
2732
self._reset_result()
2833

2934
cdef _write(self, buf):
@@ -60,6 +65,9 @@ cdef class CoreProtocol:
6065
elif state == PROTOCOL_BIND_EXECUTE:
6166
self._process__bind_execute(mtype)
6267

68+
elif state == PROTOCOL_BIND_EXECUTE_MANY:
69+
self._process__bind_execute_many(mtype)
70+
6371
elif state == PROTOCOL_EXECUTE:
6472
self._process__bind_execute(mtype)
6573

@@ -194,6 +202,49 @@ cdef class CoreProtocol:
194202
# EmptyQueryResponse
195203
self.buffer.consume_message()
196204

205+
cdef _process__bind_execute_many(self, char mtype):
206+
cdef WriteBuffer buf
207+
208+
if mtype == b'D':
209+
# DataRow
210+
self._parse_data_msgs()
211+
212+
elif mtype == b's':
213+
# PortalSuspended
214+
self.buffer.consume_message()
215+
216+
elif mtype == b'C':
217+
# CommandComplete
218+
self._parse_msg_command_complete()
219+
220+
elif mtype == b'E':
221+
# ErrorResponse
222+
self._parse_msg_error_response(True)
223+
224+
elif mtype == b'2':
225+
# BindComplete
226+
self.buffer.consume_message()
227+
228+
elif mtype == b'Z':
229+
# ReadyForQuery
230+
self._parse_msg_ready_for_query()
231+
if self.result_type == RESULT_FAILED:
232+
self._push_result()
233+
else:
234+
try:
235+
buf = <WriteBuffer>next(self._execute_iter)
236+
except StopIteration:
237+
self._push_result()
238+
else:
239+
# Next iteration over the executemany() arg sequence
240+
self._send_bind_message(
241+
self._execute_portal_name, self._execute_stmt_name,
242+
buf, 0)
243+
244+
elif mtype == b'I':
245+
# EmptyQueryResponse
246+
self.buffer.consume_message()
247+
197248
cdef _process__bind(self, char mtype):
198249
if mtype == b'E':
199250
# ErrorResponse
@@ -275,6 +326,14 @@ cdef class CoreProtocol:
275326
raise RuntimeError(
276327
'_parse_data_msgs: first message is not "D"')
277328

329+
if self._discard_data:
330+
while True:
331+
buf.consume_message()
332+
if not buf.has_message() or buf.get_message_type() != b'D':
333+
self._skip_discard = True
334+
return
335+
336+
if ASYNCPG_DEBUG:
278337
if type(self.result) is not list:
279338
raise RuntimeError(
280339
'_parse_data_msgs: result is not a list, but {!r}'.
@@ -424,6 +483,7 @@ cdef class CoreProtocol:
424483
self.result_row_desc = None
425484
self.result_status_msg = None
426485
self.result_execute_completed = False
486+
self._discard_data = False
427487

428488
cdef _set_state(self, ProtocolState new_state):
429489
if new_state == PROTOCOL_IDLE:
@@ -537,16 +597,11 @@ cdef class CoreProtocol:
537597

538598
self.transport.write(memoryview(packet))
539599

540-
cdef _bind_execute(self, str portal_name, str stmt_name,
541-
WriteBuffer bind_data, int32_t limit):
600+
cdef _send_bind_message(self, str portal_name, str stmt_name,
601+
WriteBuffer bind_data, int32_t limit):
542602

543603
cdef WriteBuffer buf
544604

545-
self._ensure_connected()
546-
self._set_state(PROTOCOL_BIND_EXECUTE)
547-
548-
self.result = []
549-
550605
buf = self._build_bind_message(portal_name, stmt_name, bind_data)
551606
self._write(buf)
552607

@@ -558,6 +613,39 @@ cdef class CoreProtocol:
558613

559614
self._write_sync_message()
560615

616+
cdef _bind_execute(self, str portal_name, str stmt_name,
617+
WriteBuffer bind_data, int32_t limit):
618+
619+
cdef WriteBuffer buf
620+
621+
self._ensure_connected()
622+
self._set_state(PROTOCOL_BIND_EXECUTE)
623+
624+
self.result = []
625+
626+
self._send_bind_message(portal_name, stmt_name, bind_data, limit)
627+
628+
cdef _bind_execute_many(self, str portal_name, str stmt_name,
629+
object bind_data):
630+
631+
cdef WriteBuffer buf
632+
633+
self._ensure_connected()
634+
self._set_state(PROTOCOL_BIND_EXECUTE_MANY)
635+
636+
self.result = None
637+
self._discard_data = True
638+
self._execute_iter = bind_data
639+
self._execute_portal_name = portal_name
640+
self._execute_stmt_name = stmt_name
641+
642+
try:
643+
buf = <WriteBuffer>next(bind_data)
644+
except StopIteration:
645+
self._push_result()
646+
else:
647+
self._send_bind_message(portal_name, stmt_name, buf, 0)
648+
561649
cdef _execute(self, str portal_name, int32_t limit):
562650
cdef WriteBuffer buf
563651

asyncpg/protocol/protocol.pyx

+34
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,37 @@ cdef class BaseProtocol(CoreProtocol):
162162

163163
return await self._new_waiter(timeout)
164164

165+
async def bind_execute_many(self, PreparedStatementState state, args,
166+
str portal_name, timeout):
167+
168+
if self.cancel_waiter is not None:
169+
await self.cancel_waiter
170+
if self.cancel_sent_waiter is not None:
171+
await self.cancel_sent_waiter
172+
self.cancel_sent_waiter = None
173+
174+
self._ensure_clear_state()
175+
176+
# Make sure the argument sequence is encoded lazily with
177+
# this generator expression to keep the memory pressure under
178+
# control.
179+
data_gen = (state._encode_bind_msg(b) for b in args)
180+
arg_bufs = iter(data_gen)
181+
182+
waiter = self._new_waiter(timeout)
183+
184+
self._bind_execute_many(
185+
portal_name,
186+
state.name,
187+
arg_bufs)
188+
189+
self.last_query = state.query
190+
self.statement = state
191+
self.return_extra = False
192+
self.queries_count += 1
193+
194+
return await waiter
195+
165196
async def bind(self, PreparedStatementState state, args,
166197
str portal_name, timeout):
167198

@@ -405,6 +436,9 @@ cdef class BaseProtocol(CoreProtocol):
405436
elif self.state == PROTOCOL_BIND_EXECUTE:
406437
self._on_result__bind_and_exec(waiter)
407438

439+
elif self.state == PROTOCOL_BIND_EXECUTE_MANY:
440+
self._on_result__bind_and_exec(waiter)
441+
408442
elif self.state == PROTOCOL_EXECUTE:
409443
self._on_result__bind_and_exec(waiter)
410444

tests/test_execute.py

+35
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,38 @@ async def test_execute_script_interrupted_terminate(self):
9797
await fut
9898

9999
self.con.terminate()
100+
101+
async def test_execute_many_1(self):
102+
await self.con.execute('CREATE TEMP TABLE exmany (a text, b int)')
103+
104+
try:
105+
result = await self.con.executemany('''
106+
INSERT INTO exmany VALUES($1, $2)
107+
''', [
108+
('a', 1), ('b', 2), ('c', 3), ('d', 4)
109+
])
110+
111+
self.assertIsNone(result)
112+
113+
result = await self.con.fetch('''
114+
SELECT * FROM exmany
115+
''')
116+
117+
self.assertEqual(result, [
118+
('a', 1), ('b', 2), ('c', 3), ('d', 4)
119+
])
120+
121+
# Empty set
122+
result = await self.con.executemany('''
123+
INSERT INTO exmany VALUES($1, $2)
124+
''', ())
125+
126+
result = await self.con.fetch('''
127+
SELECT * FROM exmany
128+
''')
129+
130+
self.assertEqual(result, [
131+
('a', 1), ('b', 2), ('c', 3), ('d', 4)
132+
])
133+
finally:
134+
await self.con.execute('DROP TABLE exmany')

0 commit comments

Comments
 (0)