Skip to content

Stop treating ReadyForQuery as a universal result indicator #362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions asyncpg/protocol/coreproto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ cdef enum ProtocolState:
PROTOCOL_CANCELLED = 3

PROTOCOL_AUTH = 10
PROTOCOL_PREPARE = 11
PROTOCOL_PARSE_DESCRIBE = 11
PROTOCOL_BIND_EXECUTE = 12
PROTOCOL_BIND_EXECUTE_MANY = 13
PROTOCOL_CLOSE_STMT_PORTAL = 14
Expand Down Expand Up @@ -105,7 +105,7 @@ cdef class CoreProtocol:
bint result_execute_completed

cdef _process__auth(self, char mtype)
cdef _process__prepare(self, char mtype)
cdef _process__parse_describe(self, char mtype)
cdef _process__bind_execute(self, char mtype)
cdef _process__bind_execute_many(self, char mtype)
cdef _process__close_stmt_portal(self, char mtype)
Expand Down
215 changes: 66 additions & 149 deletions asyncpg/protocol/coreproto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,56 @@ cdef class CoreProtocol:
if mtype == b'S':
# ParameterStatus
self._parse_msg_parameter_status()
continue

elif mtype == b'A':
# NotificationResponse
self._parse_msg_notification()
continue

elif mtype == b'N':
# 'N' - NoticeResponse
self._on_notice(self._parse_msg_error_response(False))
continue

if state == PROTOCOL_AUTH:
elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)
# In all cases, except Auth, ErrorResponse will
# be followed by a ReadyForQuery, which is when
# _push_result() will be called.
if state == PROTOCOL_AUTH:
self._push_result()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()

if state != PROTOCOL_BIND_EXECUTE_MANY:
self._push_result()

else:
if self.result_type == RESULT_FAILED:
self._push_result()
else:
try:
buf = <WriteBuffer>next(self._execute_iter)
except StopIteration:
self._push_result()
except Exception as e:
self.result_type = RESULT_FAILED
self.result = e
self._push_result()
else:
# Next iteration over the executemany()
# arg sequence.
self._send_bind_message(
self._execute_portal_name,
self._execute_stmt_name,
buf, 0)

elif state == PROTOCOL_AUTH:
self._process__auth(mtype)

elif state == PROTOCOL_PREPARE:
self._process__prepare(mtype)
elif state == PROTOCOL_PARSE_DESCRIBE:
self._process__parse_describe(mtype)

elif state == PROTOCOL_BIND_EXECUTE:
self._process__bind_execute(mtype)
Expand Down Expand Up @@ -93,42 +128,26 @@ cdef class CoreProtocol:

elif state == PROTOCOL_CANCELLED:
# discard all messages until the sync message
if mtype == b'E':
self._parse_msg_error_response(True)
elif mtype == b'Z':
self._parse_msg_ready_for_query()
self._push_result()
else:
self.buffer.consume_message()
self.buffer.consume_message()

elif state == PROTOCOL_ERROR_CONSUME:
# Error in protocol (on asyncpg side);
# discard all messages until sync message

if mtype == b'Z':
# Sync point, self to push the result
if self.result_type != RESULT_FAILED:
self.result_type = RESULT_FAILED
self.result = apg_exc.InternalClientError(
'unknown error in protocol implementation')

self._push_result()

else:
self.buffer.consume_message()
self.buffer.consume_message()

else:
raise apg_exc.InternalClientError(
'protocol is in an unknown state {}'.format(state))

except Exception as ex:
self.state = PROTOCOL_ERROR_CONSUME
self.result_type = RESULT_FAILED
self.result = ex

if mtype == b'Z':
# This should only happen if _parse_msg_ready_for_query()
# has failed.
self._push_result()
else:
self.state = PROTOCOL_ERROR_CONSUME

finally:
if self._skip_discard:
Expand All @@ -153,43 +172,27 @@ cdef class CoreProtocol:
# BackendKeyData
self._parse_msg_backend_key_data()

elif mtype == b'E':
# ErrorResponse
self.con_status = CONNECTION_BAD
self._parse_msg_error_response(True)
self._push_result()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self.con_status = CONNECTION_OK
self._push_result()

cdef _process__prepare(self, char mtype):
if mtype == b't':
# Parameters description
self.result_param_desc = self.buffer.consume_message().as_bytes()
# push_result() will be initiated by handling
# ReadyForQuery or ErrorResponse in the main loop.

elif mtype == b'1':
cdef _process__parse_describe(self, char mtype):
if mtype == b'1':
# ParseComplete
self.buffer.consume_message()

elif mtype == b't':
# ParameterDescription
self.result_param_desc = self.buffer.consume_message().as_bytes()

elif mtype == b'T':
# Row description
# RowDescription
self.result_row_desc = self.buffer.consume_message().as_bytes()

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

elif mtype == b'n':
# NoData
self.buffer.consume_message()
self._push_result()

cdef _process__bind_execute(self, char mtype):
if mtype == b'D':
Expand All @@ -199,28 +202,22 @@ cdef class CoreProtocol:
elif mtype == b's':
# PortalSuspended
self.buffer.consume_message()
self._push_result()

elif mtype == b'C':
# CommandComplete
self.result_execute_completed = True
self._parse_msg_command_complete()

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)
self._push_result()

elif mtype == b'2':
# BindComplete
self.buffer.consume_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

elif mtype == b'I':
# EmptyQueryResponse
self.buffer.consume_message()
self._push_result()

cdef _process__bind_execute_many(self, char mtype):
cdef WriteBuffer buf
Expand All @@ -237,64 +234,24 @@ cdef class CoreProtocol:
# CommandComplete
self._parse_msg_command_complete()

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'2':
# BindComplete
self.buffer.consume_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
if self.result_type == RESULT_FAILED:
self._push_result()
else:
try:
buf = <WriteBuffer>next(self._execute_iter)
except StopIteration:
self._push_result()
except Exception as e:
self.result_type = RESULT_FAILED
self.result = e
self._push_result()
else:
# Next iteration over the executemany() arg sequence
self._send_bind_message(
self._execute_portal_name, self._execute_stmt_name,
buf, 0)

elif mtype == b'I':
# EmptyQueryResponse
self.buffer.consume_message()

cdef _process__bind(self, char mtype):
if mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'2':
if mtype == b'2':
# BindComplete
self.buffer.consume_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__close_stmt_portal(self, char mtype):
if mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'3':
if mtype == b'3':
# CloseComplete
self.buffer.consume_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__simple_query(self, char mtype):
Expand All @@ -304,42 +261,21 @@ cdef class CoreProtocol:
# 'T' - RowDescription
self.buffer.consume_message()

elif mtype == b'E':
# ErrorResponse
self._parse_msg_error_response(True)

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

elif mtype == b'C':
# CommandComplete
self._parse_msg_command_complete()

else:
# We don't really care about COPY IN etc
self.buffer.consume_message()

cdef _process__copy_out(self, char mtype):
if mtype == b'E':
self._parse_msg_error_response(True)

elif mtype == b'H':
if mtype == b'H':
# CopyOutResponse
self._set_state(PROTOCOL_COPY_OUT_DATA)
self.buffer.consume_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__copy_out_data(self, char mtype):
if mtype == b'E':
self._parse_msg_error_response(True)

elif mtype == b'd':
if mtype == b'd':
# CopyData
self._parse_copy_data_msgs()

Expand All @@ -351,37 +287,18 @@ cdef class CoreProtocol:
elif mtype == b'C':
# CommandComplete
self._parse_msg_command_complete()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__copy_in(self, char mtype):
if mtype == b'E':
self._parse_msg_error_response(True)

elif mtype == b'G':
if mtype == b'G':
# CopyInResponse
self._set_state(PROTOCOL_COPY_IN_DATA)
self.buffer.consume_message()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _process__copy_in_data(self, char mtype):
if mtype == b'E':
self._parse_msg_error_response(True)

elif mtype == b'C':
if mtype == b'C':
# CommandComplete
self._parse_msg_command_complete()

elif mtype == b'Z':
# ReadyForQuery
self._parse_msg_ready_for_query()
self._push_result()

cdef _parse_msg_command_complete(self):
Expand Down Expand Up @@ -739,7 +656,7 @@ cdef class CoreProtocol:
WriteBuffer buf

self._ensure_connected()
self._set_state(PROTOCOL_PREPARE)
self._set_state(PROTOCOL_PARSE_DESCRIBE)

buf = WriteBuffer.new_message(b'P')
buf.write_str(stmt_name, self.encoding)
Expand Down
Loading