-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Fix race conditions with conn.in_flight_requests #1757
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
isamaru
wants to merge
4
commits into
dpkp:master
from
exponea:kafka-python_#1744_Fix_race_conditions_with_conn.in_flight_requests
Closed
Changes from 3 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
34ca6b3
kafka-python #1744 Fix race conditions with conn.in_flight_requests
f229be3
kafka-python #1744 Fix one more race condition with IFR
fd3ec03
kafka-python #1744 Fix additional race condition with IFR
7a0d086
kafka-python #1744 Fix deadlock with _ifr_lock in recv
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -271,6 +271,8 @@ def __init__(self, host, port, afi, **configs): | |
# including tracking request futures and timestamps, we | ||
# can use a simple dictionary of correlation_id => request data | ||
self.in_flight_requests = dict() | ||
# Ensures that all in_flight_requests futures are properly closed on disconnect. | ||
self._ifr_lock = threading.Lock() | ||
|
||
self._protocol = KafkaProtocol( | ||
client_id=self.config['client_id'], | ||
|
@@ -741,15 +743,19 @@ def close(self, error=None): | |
self.config['state_change_callback'](self) | ||
self._update_reconnect_backoff() | ||
self._close_socket() | ||
self.state = ConnectionStates.DISCONNECTED | ||
self._sasl_auth_future = None | ||
self._protocol = KafkaProtocol( | ||
client_id=self.config['client_id'], | ||
api_version=self.config['api_version']) | ||
|
||
with self._ifr_lock: | ||
self.state = ConnectionStates.DISCONNECTED | ||
self._sasl_auth_future = None | ||
self._protocol = KafkaProtocol( | ||
client_id=self.config['client_id'], | ||
api_version=self.config['api_version']) | ||
fail_ifrs = dict(self.in_flight_requests) | ||
self.in_flight_requests.clear() | ||
|
||
if error is None: | ||
error = Errors.Cancelled(str(self)) | ||
while self.in_flight_requests: | ||
(_correlation_id, (future, _timestamp)) = self.in_flight_requests.popitem() | ||
for future, _timestamp in fail_ifrs.values(): | ||
future.failure(error) | ||
self.config['state_change_callback'](self) | ||
|
||
|
@@ -770,13 +776,18 @@ def _send(self, request, blocking=True): | |
with self._lock: | ||
correlation_id = self._protocol.send_request(request) | ||
|
||
log.debug('%s Request %d: %s', self, correlation_id, request) | ||
if request.expect_response(): | ||
sent_time = time.time() | ||
assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!' | ||
self.in_flight_requests[correlation_id] = (future, sent_time) | ||
else: | ||
future.success(None) | ||
log.debug('%s Request %d: %s', self, correlation_id, request) | ||
if request.expect_response(): | ||
with self._ifr_lock: | ||
if self.disconnected(): | ||
log.debug("%s: Connection already closed.", self) | ||
future.failure(Errors.Cancelled(str(self))) | ||
else: | ||
sent_time = time.time() | ||
assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!' | ||
self.in_flight_requests[correlation_id] = (future, sent_time) | ||
else: | ||
future.success(None) | ||
|
||
# Attempt to replicate behavior from prior to introduction of | ||
# send_pending_requests() / async sends | ||
|
@@ -808,24 +819,26 @@ def send_pending_requests(self): | |
def can_send_more(self): | ||
"""Return True unless there are max_in_flight_requests_per_connection.""" | ||
max_ifrs = self.config['max_in_flight_requests_per_connection'] | ||
return len(self.in_flight_requests) < max_ifrs | ||
with self._ifr_lock: | ||
return len(self.in_flight_requests) < max_ifrs | ||
|
||
def recv(self): | ||
"""Non-blocking network receive. | ||
|
||
Return list of (response, future) tuples | ||
""" | ||
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING: | ||
log.warning('%s cannot recv: socket not connected', self) | ||
# If requests are pending, we should close the socket and | ||
# fail all the pending request futures | ||
if self.in_flight_requests: | ||
self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests')) | ||
return () | ||
with self._ifr_lock: | ||
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING: | ||
log.warning('%s cannot recv: socket not connected', self) | ||
# If requests are pending, we should close the socket and | ||
# fail all the pending request futures | ||
if self.in_flight_requests: | ||
self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests')) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, not another one :( |
||
return () | ||
|
||
elif not self.in_flight_requests: | ||
log.warning('%s: No in-flight-requests to recv', self) | ||
return () | ||
elif not self.in_flight_requests: | ||
log.warning('%s: No in-flight-requests to recv', self) | ||
return () | ||
|
||
responses = self._recv() | ||
if not responses and self.requests_timed_out(): | ||
|
@@ -839,9 +852,14 @@ def recv(self): | |
# augment responses w/ correlation_id, future, and timestamp | |
for i, (correlation_id, response) in enumerate(responses): | ||
try: | ||
(future, timestamp) = self.in_flight_requests.pop(correlation_id) | ||
with self._ifr_lock: | ||
(future, timestamp) = self.in_flight_requests.pop(correlation_id) | ||
except KeyError: | ||
self.close(Errors.KafkaConnectionError('Received unrecognized correlation id')) | ||
if self.disconnected(): | ||
log.warning('%s: Received response %s after the connection had been closed.', | ||
self, response.__class__.__name__) | ||
else: | ||
self.close(Errors.KafkaConnectionError('Received unrecognized correlation id')) | ||
return () | ||
latency_ms = (time.time() - timestamp) * 1000 | ||
if self._sensors: | ||
|
@@ -895,15 +913,20 @@ def _recv(self): | |
else: | ||
return responses | ||
|
||
def has_in_flight_requests(self): | ||
with self._ifr_lock: | ||
return bool(self.in_flight_requests) | ||
|
||
def requests_timed_out(self): | ||
if self.in_flight_requests: | ||
get_timestamp = lambda v: v[1] | ||
oldest_at = min(map(get_timestamp, | ||
self.in_flight_requests.values())) | ||
timeout = self.config['request_timeout_ms'] / 1000.0 | ||
if time.time() >= oldest_at + timeout: | ||
return True | ||
return False | ||
with self._ifr_lock: | ||
if self.in_flight_requests: | ||
get_timestamp = lambda v: v[1] | ||
oldest_at = min(map(get_timestamp, | ||
self.in_flight_requests.values())) | ||
timeout = self.config['request_timeout_ms'] / 1000.0 | ||
if time.time() >= oldest_at + timeout: | ||
return True | ||
return False | ||
|
||
def _handle_api_version_response(self, response): | ||
error_type = Errors.for_code(response.error_code) | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The race condition that causes the "Protocol out of sync" error (the root cause) occurs at this condition.