Skip to content

Commit ca72432

Browse files
committed
Stop treating ReadyForQuery as a universal result indicator
ReadyForQuery is special in auth and simple query flows, where it _does_ signal the final confirmation of the result, but in all other flows other, more specific messages do that. Now, asyncpg will use the rules of a particular subprotocol when determining the timing of the result waiter wakeup. These changes also make most cases of Sync emission unnecessary, although that cleanup will be addressed in subsequent commits. This consolidation also results in a nice reduction of duplicated code.
1 parent 2fc50e4 commit ca72432

File tree

3 files changed

+82
-152
lines changed

3 files changed

+82
-152
lines changed

asyncpg/protocol/coreproto.pxd

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ cdef enum ProtocolState:
1919
PROTOCOL_CANCELLED = 3
2020

2121
PROTOCOL_AUTH = 10
22-
PROTOCOL_PREPARE = 11
22+
PROTOCOL_PARSE_DESCRIBE = 11
2323
PROTOCOL_BIND_EXECUTE = 12
2424
PROTOCOL_BIND_EXECUTE_MANY = 13
2525
PROTOCOL_CLOSE_STMT_PORTAL = 14
@@ -105,7 +105,7 @@ cdef class CoreProtocol:
105105
bint result_execute_completed
106106

107107
cdef _process__auth(self, char mtype)
108-
cdef _process__prepare(self, char mtype)
108+
cdef _process__parse_describe(self, char mtype)
109109
cdef _process__bind_execute(self, char mtype)
110110
cdef _process__bind_execute_many(self, char mtype)
111111
cdef _process__close_stmt_portal(self, char mtype)

asyncpg/protocol/coreproto.pyx

+74-149
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,64 @@ cdef class CoreProtocol:
4444
if mtype == b'S':
4545
# ParameterStatus
4646
self._parse_msg_parameter_status()
47-
continue
47+
4848
elif mtype == b'A':
4949
# NotificationResponse
5050
self._parse_msg_notification()
51-
continue
51+
5252
elif mtype == b'N':
5353
# 'N' - NoticeResponse
5454
self._on_notice(self._parse_msg_error_response(False))
55-
continue
5655

57-
if state == PROTOCOL_AUTH:
56+
elif mtype == b'E':
57+
# ErrorResponse
58+
self._parse_msg_error_response(True)
59+
self._push_result()
60+
61+
elif mtype == b'Z':
62+
# ReadyForQuery
63+
# Auth and SimpleQuery subprotocols use
64+
# ReadyForQuery as the final result indicator,
65+
# _but_ SimpleQuery sends *both* ErrorResponse and
66+
# ReadyForQuery, so there's a check for error state.
67+
# In client-side exceptional states, ReadyForQuery
68+
# serves as a synchronization point, indicating
69+
# when it is safe to push the error into the
70+
# result waiter.
71+
self._parse_msg_ready_for_query()
72+
73+
if (state == PROTOCOL_AUTH or
74+
state == PROTOCOL_CANCELLED or
75+
state == PROTOCOL_ERROR_CONSUME or
76+
(state == PROTOCOL_SIMPLE_QUERY and
77+
self.result_type == RESULT_OK)):
78+
self._push_result()
79+
80+
elif state == PROTOCOL_BIND_EXECUTE_MANY:
81+
if self.result_type == RESULT_FAILED:
82+
self._push_result()
83+
else:
84+
try:
85+
buf = <WriteBuffer>next(self._execute_iter)
86+
except StopIteration:
87+
self._push_result()
88+
except Exception as e:
89+
self.result_type = RESULT_FAILED
90+
self.result = e
91+
self._push_result()
92+
else:
93+
# Next iteration over the executemany()
94+
# arg sequence.
95+
self._send_bind_message(
96+
self._execute_portal_name,
97+
self._execute_stmt_name,
98+
buf, 0)
99+
100+
elif state == PROTOCOL_AUTH:
58101
self._process__auth(mtype)
59102

60-
elif state == PROTOCOL_PREPARE:
61-
self._process__prepare(mtype)
103+
elif state == PROTOCOL_PARSE_DESCRIBE:
104+
self._process__parse_describe(mtype)
62105

63106
elif state == PROTOCOL_BIND_EXECUTE:
64107
self._process__bind_execute(mtype)
@@ -93,42 +136,26 @@ cdef class CoreProtocol:
93136

94137
elif state == PROTOCOL_CANCELLED:
95138
# discard all messages until the sync message
96-
if mtype == b'E':
97-
self._parse_msg_error_response(True)
98-
elif mtype == b'Z':
99-
self._parse_msg_ready_for_query()
100-
self._push_result()
101-
else:
102-
self.buffer.consume_message()
139+
self.buffer.consume_message()
103140

104141
elif state == PROTOCOL_ERROR_CONSUME:
105142
# Error in protocol (on asyncpg side);
106143
# discard all messages until sync message
107-
108-
if mtype == b'Z':
109-
# Sync point, self to push the result
110-
if self.result_type != RESULT_FAILED:
111-
self.result_type = RESULT_FAILED
112-
self.result = apg_exc.InternalClientError(
113-
'unknown error in protocol implementation')
114-
115-
self._push_result()
116-
117-
else:
118-
self.buffer.consume_message()
144+
self.buffer.consume_message()
119145

120146
else:
121147
raise apg_exc.InternalClientError(
122148
'protocol is in an unknown state {}'.format(state))
123149

124150
except Exception as ex:
151+
self.state = PROTOCOL_ERROR_CONSUME
125152
self.result_type = RESULT_FAILED
126153
self.result = ex
127154

128155
if mtype == b'Z':
156+
# This should only happen if _parse_msg_ready_for_query()
157+
# has failed.
129158
self._push_result()
130-
else:
131-
self.state = PROTOCOL_ERROR_CONSUME
132159

133160
finally:
134161
if self._skip_discard:
@@ -153,43 +180,27 @@ cdef class CoreProtocol:
153180
# BackendKeyData
154181
self._parse_msg_backend_key_data()
155182

156-
elif mtype == b'E':
157-
# ErrorResponse
158-
self.con_status = CONNECTION_BAD
159-
self._parse_msg_error_response(True)
160-
self._push_result()
161-
162-
elif mtype == b'Z':
163-
# ReadyForQuery
164-
self._parse_msg_ready_for_query()
165-
self.con_status = CONNECTION_OK
166-
self._push_result()
167-
168-
cdef _process__prepare(self, char mtype):
169-
if mtype == b't':
170-
# Parameters description
171-
self.result_param_desc = self.buffer.consume_message().as_bytes()
183+
# push_result() will be initiated by handling
184+
# ReadyForQuery or ErrorResponse in the main loop.
172185

173-
elif mtype == b'1':
186+
cdef _process__parse_describe(self, char mtype):
187+
if mtype == b'1':
174188
# ParseComplete
175189
self.buffer.consume_message()
176190

191+
elif mtype == b't':
192+
# ParameterDescription
193+
self.result_param_desc = self.buffer.consume_message().as_bytes()
194+
177195
elif mtype == b'T':
178-
# Row description
196+
# RowDescription
179197
self.result_row_desc = self.buffer.consume_message().as_bytes()
180-
181-
elif mtype == b'E':
182-
# ErrorResponse
183-
self._parse_msg_error_response(True)
184-
185-
elif mtype == b'Z':
186-
# ReadyForQuery
187-
self._parse_msg_ready_for_query()
188198
self._push_result()
189199

190200
elif mtype == b'n':
191201
# NoData
192202
self.buffer.consume_message()
203+
self._push_result()
193204

194205
cdef _process__bind_execute(self, char mtype):
195206
if mtype == b'D':
@@ -199,28 +210,22 @@ cdef class CoreProtocol:
199210
elif mtype == b's':
200211
# PortalSuspended
201212
self.buffer.consume_message()
213+
self._push_result()
202214

203215
elif mtype == b'C':
204216
# CommandComplete
205217
self.result_execute_completed = True
206218
self._parse_msg_command_complete()
207-
208-
elif mtype == b'E':
209-
# ErrorResponse
210-
self._parse_msg_error_response(True)
219+
self._push_result()
211220

212221
elif mtype == b'2':
213222
# BindComplete
214223
self.buffer.consume_message()
215224

216-
elif mtype == b'Z':
217-
# ReadyForQuery
218-
self._parse_msg_ready_for_query()
219-
self._push_result()
220-
221225
elif mtype == b'I':
222226
# EmptyQueryResponse
223227
self.buffer.consume_message()
228+
self._push_result()
224229

225230
cdef _process__bind_execute_many(self, char mtype):
226231
cdef WriteBuffer buf
@@ -237,64 +242,24 @@ cdef class CoreProtocol:
237242
# CommandComplete
238243
self._parse_msg_command_complete()
239244

240-
elif mtype == b'E':
241-
# ErrorResponse
242-
self._parse_msg_error_response(True)
243-
244245
elif mtype == b'2':
245246
# BindComplete
246247
self.buffer.consume_message()
247248

248-
elif mtype == b'Z':
249-
# ReadyForQuery
250-
self._parse_msg_ready_for_query()
251-
if self.result_type == RESULT_FAILED:
252-
self._push_result()
253-
else:
254-
try:
255-
buf = <WriteBuffer>next(self._execute_iter)
256-
except StopIteration:
257-
self._push_result()
258-
except Exception as e:
259-
self.result_type = RESULT_FAILED
260-
self.result = e
261-
self._push_result()
262-
else:
263-
# Next iteration over the executemany() arg sequence
264-
self._send_bind_message(
265-
self._execute_portal_name, self._execute_stmt_name,
266-
buf, 0)
267-
268249
elif mtype == b'I':
269250
# EmptyQueryResponse
270251
self.buffer.consume_message()
271252

272253
cdef _process__bind(self, char mtype):
273-
if mtype == b'E':
274-
# ErrorResponse
275-
self._parse_msg_error_response(True)
276-
277-
elif mtype == b'2':
254+
if mtype == b'2':
278255
# BindComplete
279256
self.buffer.consume_message()
280-
281-
elif mtype == b'Z':
282-
# ReadyForQuery
283-
self._parse_msg_ready_for_query()
284257
self._push_result()
285258

286259
cdef _process__close_stmt_portal(self, char mtype):
287-
if mtype == b'E':
288-
# ErrorResponse
289-
self._parse_msg_error_response(True)
290-
291-
elif mtype == b'3':
260+
if mtype == b'3':
292261
# CloseComplete
293262
self.buffer.consume_message()
294-
295-
elif mtype == b'Z':
296-
# ReadyForQuery
297-
self._parse_msg_ready_for_query()
298263
self._push_result()
299264

300265
cdef _process__simple_query(self, char mtype):
@@ -304,42 +269,21 @@ cdef class CoreProtocol:
304269
# 'T' - RowDescription
305270
self.buffer.consume_message()
306271

307-
elif mtype == b'E':
308-
# ErrorResponse
309-
self._parse_msg_error_response(True)
310-
311-
elif mtype == b'Z':
312-
# ReadyForQuery
313-
self._parse_msg_ready_for_query()
314-
self._push_result()
315-
316272
elif mtype == b'C':
317273
# CommandComplete
318274
self._parse_msg_command_complete()
319-
320275
else:
321276
# We don't really care about COPY IN etc
322277
self.buffer.consume_message()
323278

324279
cdef _process__copy_out(self, char mtype):
325-
if mtype == b'E':
326-
self._parse_msg_error_response(True)
327-
328-
elif mtype == b'H':
280+
if mtype == b'H':
329281
# CopyOutResponse
330282
self._set_state(PROTOCOL_COPY_OUT_DATA)
331283
self.buffer.consume_message()
332284

333-
elif mtype == b'Z':
334-
# ReadyForQuery
335-
self._parse_msg_ready_for_query()
336-
self._push_result()
337-
338285
cdef _process__copy_out_data(self, char mtype):
339-
if mtype == b'E':
340-
self._parse_msg_error_response(True)
341-
342-
elif mtype == b'd':
286+
if mtype == b'd':
343287
# CopyData
344288
self._parse_copy_data_msgs()
345289

@@ -351,37 +295,18 @@ cdef class CoreProtocol:
351295
elif mtype == b'C':
352296
# CommandComplete
353297
self._parse_msg_command_complete()
354-
355-
elif mtype == b'Z':
356-
# ReadyForQuery
357-
self._parse_msg_ready_for_query()
358298
self._push_result()
359299

360300
cdef _process__copy_in(self, char mtype):
361-
if mtype == b'E':
362-
self._parse_msg_error_response(True)
363-
364-
elif mtype == b'G':
301+
if mtype == b'G':
365302
# CopyInResponse
366303
self._set_state(PROTOCOL_COPY_IN_DATA)
367304
self.buffer.consume_message()
368305

369-
elif mtype == b'Z':
370-
# ReadyForQuery
371-
self._parse_msg_ready_for_query()
372-
self._push_result()
373-
374306
cdef _process__copy_in_data(self, char mtype):
375-
if mtype == b'E':
376-
self._parse_msg_error_response(True)
377-
378-
elif mtype == b'C':
307+
if mtype == b'C':
379308
# CommandComplete
380309
self._parse_msg_command_complete()
381-
382-
elif mtype == b'Z':
383-
# ReadyForQuery
384-
self._parse_msg_ready_for_query()
385310
self._push_result()
386311

387312
cdef _parse_msg_command_complete(self):
@@ -739,7 +664,7 @@ cdef class CoreProtocol:
739664
WriteBuffer buf
740665

741666
self._ensure_connected()
742-
self._set_state(PROTOCOL_PREPARE)
667+
self._set_state(PROTOCOL_PARSE_DESCRIBE)
743668

744669
buf = WriteBuffer.new_message(b'P')
745670
buf.write_str(stmt_name, self.encoding)

0 commit comments

Comments
 (0)