Skip to content

Commit e939255

Browse files
committed
Stop treating ReadyForQuery as a universal result indicator
ReadyForQuery is special in auth and simple query flows, where it _does_ signal the final confirmation of the result, but in all other flows other, more specific messages do that. Now, asyncpg will use the rules of a particular subprotocol when determining the timing of the result waiter wakeup. These changes also make most cases of Sync emission unnecessary, although that cleanup will be addressed in subsequent commits. This consolidation also results in a nice reduction of duplicated code.
1 parent 2fc50e4 commit e939255

File tree

3 files changed

+89
-155
lines changed

3 files changed

+89
-155
lines changed

asyncpg/protocol/coreproto.pxd

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ cdef enum ProtocolState:
1919
PROTOCOL_CANCELLED = 3
2020

2121
PROTOCOL_AUTH = 10
22-
PROTOCOL_PREPARE = 11
22+
PROTOCOL_PARSE_DESCRIBE = 11
2323
PROTOCOL_BIND_EXECUTE = 12
2424
PROTOCOL_BIND_EXECUTE_MANY = 13
2525
PROTOCOL_CLOSE_STMT_PORTAL = 14
@@ -105,7 +105,7 @@ cdef class CoreProtocol:
105105
bint result_execute_completed
106106

107107
cdef _process__auth(self, char mtype)
108-
cdef _process__prepare(self, char mtype)
108+
cdef _process__parse_describe(self, char mtype)
109109
cdef _process__bind_execute(self, char mtype)
110110
cdef _process__bind_execute_many(self, char mtype)
111111
cdef _process__close_stmt_portal(self, char mtype)

asyncpg/protocol/coreproto.pyx

+71-149
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,61 @@ cdef class CoreProtocol:
4444
if mtype == b'S':
4545
# ParameterStatus
4646
self._parse_msg_parameter_status()
47-
continue
47+
4848
elif mtype == b'A':
4949
# NotificationResponse
5050
self._parse_msg_notification()
51-
continue
51+
5252
elif mtype == b'N':
5353
# 'N' - NoticeResponse
5454
self._on_notice(self._parse_msg_error_response(False))
55-
continue
5655

57-
if state == PROTOCOL_AUTH:
56+
elif mtype == b'E':
57+
# ErrorResponse
58+
self._parse_msg_error_response(True)
59+
self._push_result()
60+
61+
elif mtype == b'Z':
62+
# ReadyForQuery
63+
# Auth and SimpleQuery subprotocols use
64+
# ReadyForQuery as the final result indicator.
65+
# In client-side exceptional states, ReadyForQuery
66+
# serves as a synchronization point, indicating
67+
# when it is safe to push the error into the
68+
# result waiter.
69+
self._parse_msg_ready_for_query()
70+
71+
if (state == PROTOCOL_AUTH or
72+
state == PROTOCOL_CANCELLED or
73+
state == PROTOCOL_ERROR_CONSUME or
74+
state == PROTOCOL_SIMPLE_QUERY):
75+
self._push_result()
76+
77+
elif state == PROTOCOL_BIND_EXECUTE_MANY:
78+
if self.result_type == RESULT_FAILED:
79+
self._push_result()
80+
else:
81+
try:
82+
buf = <WriteBuffer>next(self._execute_iter)
83+
except StopIteration:
84+
self._push_result()
85+
except Exception as e:
86+
self.result_type = RESULT_FAILED
87+
self.result = e
88+
self._push_result()
89+
else:
90+
# Next iteration over the executemany()
91+
# arg sequence.
92+
self._send_bind_message(
93+
self._execute_portal_name,
94+
self._execute_stmt_name,
95+
buf, 0)
96+
97+
elif state == PROTOCOL_AUTH:
5898
self._process__auth(mtype)
5999

60-
elif state == PROTOCOL_PREPARE:
61-
self._process__prepare(mtype)
100+
elif state == PROTOCOL_PARSE_DESCRIBE:
101+
self._process__parse_describe(mtype)
62102

63103
elif state == PROTOCOL_BIND_EXECUTE:
64104
self._process__bind_execute(mtype)
@@ -93,42 +133,26 @@ cdef class CoreProtocol:
93133

94134
elif state == PROTOCOL_CANCELLED:
95135
# discard all messages until the sync message
96-
if mtype == b'E':
97-
self._parse_msg_error_response(True)
98-
elif mtype == b'Z':
99-
self._parse_msg_ready_for_query()
100-
self._push_result()
101-
else:
102-
self.buffer.consume_message()
136+
self.buffer.consume_message()
103137

104138
elif state == PROTOCOL_ERROR_CONSUME:
105139
# Error in protocol (on asyncpg side);
106140
# discard all messages until sync message
107-
108-
if mtype == b'Z':
109-
# Sync point, self to push the result
110-
if self.result_type != RESULT_FAILED:
111-
self.result_type = RESULT_FAILED
112-
self.result = apg_exc.InternalClientError(
113-
'unknown error in protocol implementation')
114-
115-
self._push_result()
116-
117-
else:
118-
self.buffer.consume_message()
141+
self.buffer.consume_message()
119142

120143
else:
121144
raise apg_exc.InternalClientError(
122145
'protocol is in an unknown state {}'.format(state))
123146

124147
except Exception as ex:
148+
self.state = PROTOCOL_ERROR_CONSUME
125149
self.result_type = RESULT_FAILED
126150
self.result = ex
127151

128152
if mtype == b'Z':
153+
# This should only happen if _parse_msg_ready_for_query()
154+
# has failed.
129155
self._push_result()
130-
else:
131-
self.state = PROTOCOL_ERROR_CONSUME
132156

133157
finally:
134158
if self._skip_discard:
@@ -153,43 +177,27 @@ cdef class CoreProtocol:
153177
# BackendKeyData
154178
self._parse_msg_backend_key_data()
155179

156-
elif mtype == b'E':
157-
# ErrorResponse
158-
self.con_status = CONNECTION_BAD
159-
self._parse_msg_error_response(True)
160-
self._push_result()
161-
162-
elif mtype == b'Z':
163-
# ReadyForQuery
164-
self._parse_msg_ready_for_query()
165-
self.con_status = CONNECTION_OK
166-
self._push_result()
167-
168-
cdef _process__prepare(self, char mtype):
169-
if mtype == b't':
170-
# Parameters description
171-
self.result_param_desc = self.buffer.consume_message().as_bytes()
180+
# push_result() will be initiated by handling
181+
# ReadyForQuery or ErrorResponse in the main loop.
172182

173-
elif mtype == b'1':
183+
cdef _process__parse_describe(self, char mtype):
184+
if mtype == b'1':
174185
# ParseComplete
175186
self.buffer.consume_message()
176187

188+
elif mtype == b't':
189+
# ParameterDescription
190+
self.result_param_desc = self.buffer.consume_message().as_bytes()
191+
177192
elif mtype == b'T':
178-
# Row description
193+
# RowDescription
179194
self.result_row_desc = self.buffer.consume_message().as_bytes()
180-
181-
elif mtype == b'E':
182-
# ErrorResponse
183-
self._parse_msg_error_response(True)
184-
185-
elif mtype == b'Z':
186-
# ReadyForQuery
187-
self._parse_msg_ready_for_query()
188195
self._push_result()
189196

190197
elif mtype == b'n':
191198
# NoData
192199
self.buffer.consume_message()
200+
self._push_result()
193201

194202
cdef _process__bind_execute(self, char mtype):
195203
if mtype == b'D':
@@ -199,28 +207,22 @@ cdef class CoreProtocol:
199207
elif mtype == b's':
200208
# PortalSuspended
201209
self.buffer.consume_message()
210+
self._push_result()
202211

203212
elif mtype == b'C':
204213
# CommandComplete
205214
self.result_execute_completed = True
206215
self._parse_msg_command_complete()
207-
208-
elif mtype == b'E':
209-
# ErrorResponse
210-
self._parse_msg_error_response(True)
216+
self._push_result()
211217

212218
elif mtype == b'2':
213219
# BindComplete
214220
self.buffer.consume_message()
215221

216-
elif mtype == b'Z':
217-
# ReadyForQuery
218-
self._parse_msg_ready_for_query()
219-
self._push_result()
220-
221222
elif mtype == b'I':
222223
# EmptyQueryResponse
223224
self.buffer.consume_message()
225+
self._push_result()
224226

225227
cdef _process__bind_execute_many(self, char mtype):
226228
cdef WriteBuffer buf
@@ -237,64 +239,24 @@ cdef class CoreProtocol:
237239
# CommandComplete
238240
self._parse_msg_command_complete()
239241

240-
elif mtype == b'E':
241-
# ErrorResponse
242-
self._parse_msg_error_response(True)
243-
244242
elif mtype == b'2':
245243
# BindComplete
246244
self.buffer.consume_message()
247245

248-
elif mtype == b'Z':
249-
# ReadyForQuery
250-
self._parse_msg_ready_for_query()
251-
if self.result_type == RESULT_FAILED:
252-
self._push_result()
253-
else:
254-
try:
255-
buf = <WriteBuffer>next(self._execute_iter)
256-
except StopIteration:
257-
self._push_result()
258-
except Exception as e:
259-
self.result_type = RESULT_FAILED
260-
self.result = e
261-
self._push_result()
262-
else:
263-
# Next iteration over the executemany() arg sequence
264-
self._send_bind_message(
265-
self._execute_portal_name, self._execute_stmt_name,
266-
buf, 0)
267-
268246
elif mtype == b'I':
269247
# EmptyQueryResponse
270248
self.buffer.consume_message()
271249

272250
cdef _process__bind(self, char mtype):
273-
if mtype == b'E':
274-
# ErrorResponse
275-
self._parse_msg_error_response(True)
276-
277-
elif mtype == b'2':
251+
if mtype == b'2':
278252
# BindComplete
279253
self.buffer.consume_message()
280-
281-
elif mtype == b'Z':
282-
# ReadyForQuery
283-
self._parse_msg_ready_for_query()
284254
self._push_result()
285255

286256
cdef _process__close_stmt_portal(self, char mtype):
287-
if mtype == b'E':
288-
# ErrorResponse
289-
self._parse_msg_error_response(True)
290-
291-
elif mtype == b'3':
257+
if mtype == b'3':
292258
# CloseComplete
293259
self.buffer.consume_message()
294-
295-
elif mtype == b'Z':
296-
# ReadyForQuery
297-
self._parse_msg_ready_for_query()
298260
self._push_result()
299261

300262
cdef _process__simple_query(self, char mtype):
@@ -304,42 +266,21 @@ cdef class CoreProtocol:
304266
# 'T' - RowDescription
305267
self.buffer.consume_message()
306268

307-
elif mtype == b'E':
308-
# ErrorResponse
309-
self._parse_msg_error_response(True)
310-
311-
elif mtype == b'Z':
312-
# ReadyForQuery
313-
self._parse_msg_ready_for_query()
314-
self._push_result()
315-
316269
elif mtype == b'C':
317270
# CommandComplete
318271
self._parse_msg_command_complete()
319-
320272
else:
321273
# We don't really care about COPY IN etc
322274
self.buffer.consume_message()
323275

324276
cdef _process__copy_out(self, char mtype):
325-
if mtype == b'E':
326-
self._parse_msg_error_response(True)
327-
328-
elif mtype == b'H':
277+
if mtype == b'H':
329278
# CopyOutResponse
330279
self._set_state(PROTOCOL_COPY_OUT_DATA)
331280
self.buffer.consume_message()
332281

333-
elif mtype == b'Z':
334-
# ReadyForQuery
335-
self._parse_msg_ready_for_query()
336-
self._push_result()
337-
338282
cdef _process__copy_out_data(self, char mtype):
339-
if mtype == b'E':
340-
self._parse_msg_error_response(True)
341-
342-
elif mtype == b'd':
283+
if mtype == b'd':
343284
# CopyData
344285
self._parse_copy_data_msgs()
345286

@@ -351,37 +292,18 @@ cdef class CoreProtocol:
351292
elif mtype == b'C':
352293
# CommandComplete
353294
self._parse_msg_command_complete()
354-
355-
elif mtype == b'Z':
356-
# ReadyForQuery
357-
self._parse_msg_ready_for_query()
358295
self._push_result()
359296

360297
cdef _process__copy_in(self, char mtype):
361-
if mtype == b'E':
362-
self._parse_msg_error_response(True)
363-
364-
elif mtype == b'G':
298+
if mtype == b'G':
365299
# CopyInResponse
366300
self._set_state(PROTOCOL_COPY_IN_DATA)
367301
self.buffer.consume_message()
368302

369-
elif mtype == b'Z':
370-
# ReadyForQuery
371-
self._parse_msg_ready_for_query()
372-
self._push_result()
373-
374303
cdef _process__copy_in_data(self, char mtype):
375-
if mtype == b'E':
376-
self._parse_msg_error_response(True)
377-
378-
elif mtype == b'C':
304+
if mtype == b'C':
379305
# CommandComplete
380306
self._parse_msg_command_complete()
381-
382-
elif mtype == b'Z':
383-
# ReadyForQuery
384-
self._parse_msg_ready_for_query()
385307
self._push_result()
386308

387309
cdef _parse_msg_command_complete(self):
@@ -739,7 +661,7 @@ cdef class CoreProtocol:
739661
WriteBuffer buf
740662

741663
self._ensure_connected()
742-
self._set_state(PROTOCOL_PREPARE)
664+
self._set_state(PROTOCOL_PARSE_DESCRIBE)
743665

744666
buf = WriteBuffer.new_message(b'P')
745667
buf.write_str(stmt_name, self.encoding)

0 commit comments

Comments
 (0)