Skip to content

Commit 7a81613

Browse files
committed
Stop treating ReadyForQuery as a universal result indicator
ReadyForQuery is special in auth and simple query flows, where it _does_ signal the final confirmation of the result, but in all other flows other, more specific messages do that. Now, asyncpg will use the rules of a particular subprotocol when determining the timing of the result waiter wakeup. These changes also make most cases of Sync emission unnecessary, although that cleanup will be addressed in subsequent commits. This consolidation also results in a nice reduction of duplicated code.
1 parent 2fc50e4 commit 7a81613

File tree

3 files changed

+78
-155
lines changed

3 files changed

+78
-155
lines changed

asyncpg/protocol/coreproto.pxd

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ cdef enum ProtocolState:
1919
PROTOCOL_CANCELLED = 3
2020

2121
PROTOCOL_AUTH = 10
22-
PROTOCOL_PREPARE = 11
22+
PROTOCOL_PARSE_DESCRIBE = 11
2323
PROTOCOL_BIND_EXECUTE = 12
2424
PROTOCOL_BIND_EXECUTE_MANY = 13
2525
PROTOCOL_CLOSE_STMT_PORTAL = 14
@@ -105,7 +105,7 @@ cdef class CoreProtocol:
105105
bint result_execute_completed
106106

107107
cdef _process__auth(self, char mtype)
108-
cdef _process__prepare(self, char mtype)
108+
cdef _process__parse_describe(self, char mtype)
109109
cdef _process__bind_execute(self, char mtype)
110110
cdef _process__bind_execute_many(self, char mtype)
111111
cdef _process__close_stmt_portal(self, char mtype)

asyncpg/protocol/coreproto.pyx

+66-149
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,56 @@ cdef class CoreProtocol:
4444
if mtype == b'S':
4545
# ParameterStatus
4646
self._parse_msg_parameter_status()
47-
continue
47+
4848
elif mtype == b'A':
4949
# NotificationResponse
5050
self._parse_msg_notification()
51-
continue
51+
5252
elif mtype == b'N':
5353
# 'N' - NoticeResponse
5454
self._on_notice(self._parse_msg_error_response(False))
55-
continue
5655

57-
if state == PROTOCOL_AUTH:
56+
elif mtype == b'E':
57+
# ErrorResponse
58+
self._parse_msg_error_response(True)
59+
# In all cases, except Auth, ErrorResponse will
60+
# be followed by a ReadyForQuery, which is when
61+
# _push_result() will be called.
62+
if state == PROTOCOL_AUTH:
63+
self._push_result()
64+
65+
elif mtype == b'Z':
66+
# ReadyForQuery
67+
self._parse_msg_ready_for_query()
68+
69+
if state != PROTOCOL_BIND_EXECUTE_MANY:
70+
self._push_result()
71+
72+
else:
73+
if self.result_type == RESULT_FAILED:
74+
self._push_result()
75+
else:
76+
try:
77+
buf = <WriteBuffer>next(self._execute_iter)
78+
except StopIteration:
79+
self._push_result()
80+
except Exception as e:
81+
self.result_type = RESULT_FAILED
82+
self.result = e
83+
self._push_result()
84+
else:
85+
# Next iteration over the executemany()
86+
# arg sequence.
87+
self._send_bind_message(
88+
self._execute_portal_name,
89+
self._execute_stmt_name,
90+
buf, 0)
91+
92+
elif state == PROTOCOL_AUTH:
5893
self._process__auth(mtype)
5994

60-
elif state == PROTOCOL_PREPARE:
61-
self._process__prepare(mtype)
95+
elif state == PROTOCOL_PARSE_DESCRIBE:
96+
self._process__parse_describe(mtype)
6297

6398
elif state == PROTOCOL_BIND_EXECUTE:
6499
self._process__bind_execute(mtype)
@@ -93,42 +128,26 @@ cdef class CoreProtocol:
93128

94129
elif state == PROTOCOL_CANCELLED:
95130
# discard all messages until the sync message
96-
if mtype == b'E':
97-
self._parse_msg_error_response(True)
98-
elif mtype == b'Z':
99-
self._parse_msg_ready_for_query()
100-
self._push_result()
101-
else:
102-
self.buffer.consume_message()
131+
self.buffer.consume_message()
103132

104133
elif state == PROTOCOL_ERROR_CONSUME:
105134
# Error in protocol (on asyncpg side);
106135
# discard all messages until sync message
107-
108-
if mtype == b'Z':
109-
# Sync point, self to push the result
110-
if self.result_type != RESULT_FAILED:
111-
self.result_type = RESULT_FAILED
112-
self.result = apg_exc.InternalClientError(
113-
'unknown error in protocol implementation')
114-
115-
self._push_result()
116-
117-
else:
118-
self.buffer.consume_message()
136+
self.buffer.consume_message()
119137

120138
else:
121139
raise apg_exc.InternalClientError(
122140
'protocol is in an unknown state {}'.format(state))
123141

124142
except Exception as ex:
143+
self.state = PROTOCOL_ERROR_CONSUME
125144
self.result_type = RESULT_FAILED
126145
self.result = ex
127146

128147
if mtype == b'Z':
148+
# This should only happen if _parse_msg_ready_for_query()
149+
# has failed.
129150
self._push_result()
130-
else:
131-
self.state = PROTOCOL_ERROR_CONSUME
132151

133152
finally:
134153
if self._skip_discard:
@@ -153,43 +172,27 @@ cdef class CoreProtocol:
153172
# BackendKeyData
154173
self._parse_msg_backend_key_data()
155174

156-
elif mtype == b'E':
157-
# ErrorResponse
158-
self.con_status = CONNECTION_BAD
159-
self._parse_msg_error_response(True)
160-
self._push_result()
161-
162-
elif mtype == b'Z':
163-
# ReadyForQuery
164-
self._parse_msg_ready_for_query()
165-
self.con_status = CONNECTION_OK
166-
self._push_result()
167-
168-
cdef _process__prepare(self, char mtype):
169-
if mtype == b't':
170-
# Parameters description
171-
self.result_param_desc = self.buffer.consume_message().as_bytes()
175+
# push_result() will be initiated by handling
176+
# ReadyForQuery or ErrorResponse in the main loop.
172177

173-
elif mtype == b'1':
178+
cdef _process__parse_describe(self, char mtype):
179+
if mtype == b'1':
174180
# ParseComplete
175181
self.buffer.consume_message()
176182

183+
elif mtype == b't':
184+
# ParameterDescription
185+
self.result_param_desc = self.buffer.consume_message().as_bytes()
186+
177187
elif mtype == b'T':
178-
# Row description
188+
# RowDescription
179189
self.result_row_desc = self.buffer.consume_message().as_bytes()
180-
181-
elif mtype == b'E':
182-
# ErrorResponse
183-
self._parse_msg_error_response(True)
184-
185-
elif mtype == b'Z':
186-
# ReadyForQuery
187-
self._parse_msg_ready_for_query()
188190
self._push_result()
189191

190192
elif mtype == b'n':
191193
# NoData
192194
self.buffer.consume_message()
195+
self._push_result()
193196

194197
cdef _process__bind_execute(self, char mtype):
195198
if mtype == b'D':
@@ -199,28 +202,22 @@ cdef class CoreProtocol:
199202
elif mtype == b's':
200203
# PortalSuspended
201204
self.buffer.consume_message()
205+
self._push_result()
202206

203207
elif mtype == b'C':
204208
# CommandComplete
205209
self.result_execute_completed = True
206210
self._parse_msg_command_complete()
207-
208-
elif mtype == b'E':
209-
# ErrorResponse
210-
self._parse_msg_error_response(True)
211+
self._push_result()
211212

212213
elif mtype == b'2':
213214
# BindComplete
214215
self.buffer.consume_message()
215216

216-
elif mtype == b'Z':
217-
# ReadyForQuery
218-
self._parse_msg_ready_for_query()
219-
self._push_result()
220-
221217
elif mtype == b'I':
222218
# EmptyQueryResponse
223219
self.buffer.consume_message()
220+
self._push_result()
224221

225222
cdef _process__bind_execute_many(self, char mtype):
226223
cdef WriteBuffer buf
@@ -237,64 +234,24 @@ cdef class CoreProtocol:
237234
# CommandComplete
238235
self._parse_msg_command_complete()
239236

240-
elif mtype == b'E':
241-
# ErrorResponse
242-
self._parse_msg_error_response(True)
243-
244237
elif mtype == b'2':
245238
# BindComplete
246239
self.buffer.consume_message()
247240

248-
elif mtype == b'Z':
249-
# ReadyForQuery
250-
self._parse_msg_ready_for_query()
251-
if self.result_type == RESULT_FAILED:
252-
self._push_result()
253-
else:
254-
try:
255-
buf = <WriteBuffer>next(self._execute_iter)
256-
except StopIteration:
257-
self._push_result()
258-
except Exception as e:
259-
self.result_type = RESULT_FAILED
260-
self.result = e
261-
self._push_result()
262-
else:
263-
# Next iteration over the executemany() arg sequence
264-
self._send_bind_message(
265-
self._execute_portal_name, self._execute_stmt_name,
266-
buf, 0)
267-
268241
elif mtype == b'I':
269242
# EmptyQueryResponse
270243
self.buffer.consume_message()
271244

272245
cdef _process__bind(self, char mtype):
273-
if mtype == b'E':
274-
# ErrorResponse
275-
self._parse_msg_error_response(True)
276-
277-
elif mtype == b'2':
246+
if mtype == b'2':
278247
# BindComplete
279248
self.buffer.consume_message()
280-
281-
elif mtype == b'Z':
282-
# ReadyForQuery
283-
self._parse_msg_ready_for_query()
284249
self._push_result()
285250

286251
cdef _process__close_stmt_portal(self, char mtype):
287-
if mtype == b'E':
288-
# ErrorResponse
289-
self._parse_msg_error_response(True)
290-
291-
elif mtype == b'3':
252+
if mtype == b'3':
292253
# CloseComplete
293254
self.buffer.consume_message()
294-
295-
elif mtype == b'Z':
296-
# ReadyForQuery
297-
self._parse_msg_ready_for_query()
298255
self._push_result()
299256

300257
cdef _process__simple_query(self, char mtype):
@@ -304,42 +261,21 @@ cdef class CoreProtocol:
304261
# 'T' - RowDescription
305262
self.buffer.consume_message()
306263

307-
elif mtype == b'E':
308-
# ErrorResponse
309-
self._parse_msg_error_response(True)
310-
311-
elif mtype == b'Z':
312-
# ReadyForQuery
313-
self._parse_msg_ready_for_query()
314-
self._push_result()
315-
316264
elif mtype == b'C':
317265
# CommandComplete
318266
self._parse_msg_command_complete()
319-
320267
else:
321268
# We don't really care about COPY IN etc
322269
self.buffer.consume_message()
323270

324271
cdef _process__copy_out(self, char mtype):
325-
if mtype == b'E':
326-
self._parse_msg_error_response(True)
327-
328-
elif mtype == b'H':
272+
if mtype == b'H':
329273
# CopyOutResponse
330274
self._set_state(PROTOCOL_COPY_OUT_DATA)
331275
self.buffer.consume_message()
332276

333-
elif mtype == b'Z':
334-
# ReadyForQuery
335-
self._parse_msg_ready_for_query()
336-
self._push_result()
337-
338277
cdef _process__copy_out_data(self, char mtype):
339-
if mtype == b'E':
340-
self._parse_msg_error_response(True)
341-
342-
elif mtype == b'd':
278+
if mtype == b'd':
343279
# CopyData
344280
self._parse_copy_data_msgs()
345281

@@ -351,37 +287,18 @@ cdef class CoreProtocol:
351287
elif mtype == b'C':
352288
# CommandComplete
353289
self._parse_msg_command_complete()
354-
355-
elif mtype == b'Z':
356-
# ReadyForQuery
357-
self._parse_msg_ready_for_query()
358290
self._push_result()
359291

360292
cdef _process__copy_in(self, char mtype):
361-
if mtype == b'E':
362-
self._parse_msg_error_response(True)
363-
364-
elif mtype == b'G':
293+
if mtype == b'G':
365294
# CopyInResponse
366295
self._set_state(PROTOCOL_COPY_IN_DATA)
367296
self.buffer.consume_message()
368297

369-
elif mtype == b'Z':
370-
# ReadyForQuery
371-
self._parse_msg_ready_for_query()
372-
self._push_result()
373-
374298
cdef _process__copy_in_data(self, char mtype):
375-
if mtype == b'E':
376-
self._parse_msg_error_response(True)
377-
378-
elif mtype == b'C':
299+
if mtype == b'C':
379300
# CommandComplete
380301
self._parse_msg_command_complete()
381-
382-
elif mtype == b'Z':
383-
# ReadyForQuery
384-
self._parse_msg_ready_for_query()
385302
self._push_result()
386303

387304
cdef _parse_msg_command_complete(self):
@@ -739,7 +656,7 @@ cdef class CoreProtocol:
739656
WriteBuffer buf
740657

741658
self._ensure_connected()
742-
self._set_state(PROTOCOL_PREPARE)
659+
self._set_state(PROTOCOL_PARSE_DESCRIBE)
743660

744661
buf = WriteBuffer.new_message(b'P')
745662
buf.write_str(stmt_name, self.encoding)

0 commit comments

Comments
 (0)