
Commit c15a7ff

Dont block pending FetchRequests when Metadata update requested (#2576)
1 parent: a520232

File tree (2 files changed, +25 -8 lines):

  kafka/consumer/fetcher.py
  test/test_fetcher.py

kafka/consumer/fetcher.py  (+23 -8)
@@ -114,6 +114,7 @@ def __init__(self, client, subscriptions, metrics, **configs):
         self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
         self._isolation_level = READ_UNCOMMITTED
         self._session_handlers = {}
+        self._nodes_with_pending_fetch_requests = set()
 
     def send_fetches(self):
         """Send FetchRequests for all assigned partitions that do not already have
@@ -124,12 +125,12 @@ def send_fetches(self):
         """
         futures = []
         for node_id, (request, fetch_offsets) in six.iteritems(self._create_fetch_requests()):
-            if self._client.ready(node_id):
-                log.debug("Sending FetchRequest to node %s", node_id)
-                future = self._client.send(node_id, request, wakeup=False)
-                future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.time())
-                future.add_errback(self._handle_fetch_error, node_id)
-                futures.append(future)
+            log.debug("Sending FetchRequest to node %s", node_id)
+            self._nodes_with_pending_fetch_requests.add(node_id)
+            future = self._client.send(node_id, request, wakeup=False)
+            future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.time())
+            future.add_errback(self._handle_fetch_error, node_id)
+            futures.append(future)
         self._fetch_futures.extend(futures)
         self._clean_done_fetch_futures()
         return futures
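
Note the ordering in this hunk: the node is added to _nodes_with_pending_fetch_requests before _client.send is called, and the callback/errback pair registered right after is what eventually clears it again (see the final hunk of this file). Below is a minimal, self-contained sketch of that add-before-send / clear-on-completion pattern. PendingNodeTracker and FakeFuture are illustrative stand-ins, not kafka-python classes, and the sketch uses set.discard where the real code uses set.remove.

    # Illustrative sketch only; mirrors the pattern used by send_fetches above.

    class FakeFuture(object):
        """Tiny stand-in for a request future: extra args are passed before the result."""
        def __init__(self):
            self._callbacks = []
            self._errbacks = []

        def add_callback(self, fn, *args):
            self._callbacks.append((fn, args))

        def add_errback(self, fn, *args):
            self._errbacks.append((fn, args))

        def success(self, response):
            for fn, args in self._callbacks:
                fn(*(args + (response,)))

        def failure(self, exception):
            for fn, args in self._errbacks:
                fn(*(args + (exception,)))


    class PendingNodeTracker(object):
        def __init__(self):
            self._pending = set()

        def send_fetch(self, node_id, send):
            # Mark the node as busy *before* handing the request to the client so the
            # next fetch pass skips it instead of queueing a second FetchRequest.
            self._pending.add(node_id)
            future = send(node_id)
            # Clear the mark on success *and* on failure; otherwise one failed fetch
            # would leave the node blocked forever.
            future.add_callback(self._clear, node_id)
            future.add_errback(self._clear, node_id)
            return future

        def _clear(self, node_id, _result_or_error):
            self._pending.discard(node_id)

        def has_pending_fetch(self, node_id):
            return node_id in self._pending


    # Usage: node 0 is skipped while its fetch is outstanding, available again afterwards.
    tracker = PendingNodeTracker()
    future = tracker.send_fetch(0, lambda node_id: FakeFuture())
    assert tracker.has_pending_fetch(0)
    future.failure(Exception("connection lost"))
    assert not tracker.has_pending_fetch(0)
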
@@ -593,8 +594,20 @@ def _create_fetch_requests(self):
                           " Requesting metadata update", partition)
                 self._client.cluster.request_update()
 
-            elif self._client.in_flight_request_count(node_id) > 0:
-                log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
+            elif not self._client.connected(node_id) and self._client.connection_delay(node_id) > 0:
+                # If we try to send during the reconnect backoff window, then the request is just
+                # going to be failed anyway before being sent, so skip the send for now
+                log.log(0, "Skipping fetch for partition %s because node %s is awaiting reconnect backoff",
+                        partition, node_id)
+
+            elif self._client.throttle_delay(node_id) > 0:
+                # If we try to send while throttled, then the request is just
+                # going to be failed anyway before being sent, so skip the send for now
+                log.log(0, "Skipping fetch for partition %s because node %s is throttled",
+                        partition, node_id)
+
+            elif node_id in self._nodes_with_pending_fetch_requests:
+                log.log(0, "Skipping fetch for partition %s because there is a pending fetch request to node %s",
                         partition, node_id)
                 continue
 
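The dropped guard, in_flight_request_count(node_id) > 0, skipped a node whenever any request was in flight there, so a pending MetadataRequest would also block fetches to that broker; that is the behaviour the commit title refers to. The replacement only skips a node that is in reconnect backoff, is throttled, or already has an outstanding fetch. A standalone paraphrase of the new decision is below, with plain parameters standing in for the self._client queries; the function itself is illustrative and not part of kafka-python.

    # Standalone paraphrase of the new skip conditions in _create_fetch_requests.

    def should_skip_fetch(connected, connection_delay, throttle_delay,
                          node_id, nodes_with_pending_fetch_requests):
        """Return the reason for skipping this node, or None if a fetch may be sent."""
        if not connected and connection_delay > 0:
            # Inside the reconnect backoff window the send would fail locally anyway.
            return "awaiting reconnect backoff"
        if throttle_delay > 0:
            # Likewise while the broker still has the client quota-throttled.
            return "throttled"
        if node_id in nodes_with_pending_fetch_requests:
            # Keep at most one outstanding FetchRequest per node.
            return "pending fetch request"
        # Other in-flight requests (e.g. a MetadataRequest) no longer block fetches.
        return None


    # A connected node with only a metadata request in flight is no longer skipped:
    assert should_skip_fetch(True, 0, 0, node_id=0,
                             nodes_with_pending_fetch_requests=set()) is None
    # ...but a node that already has a fetch outstanding still is:
    assert should_skip_fetch(True, 0, 0, node_id=0,
                             nodes_with_pending_fetch_requests={0}) == "pending fetch request"
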
@@ -707,12 +720,14 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
             self._completed_fetches.append(completed_fetch)
 
         self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
+        self._nodes_with_pending_fetch_requests.remove(node_id)
 
     def _handle_fetch_error(self, node_id, exception):
         level = logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR
         log.log(level, 'Fetch to node %s failed: %s', node_id, exception)
         if node_id in self._session_handlers:
             self._session_handlers[node_id].handle_error(exception)
+        self._nodes_with_pending_fetch_requests.remove(node_id)
 
     def _parse_fetched_data(self, completed_fetch):
         tp = completed_fetch.topic_partition

test/test_fetcher.py  (+2)
@@ -423,6 +423,7 @@ def test_fetched_records(fetcher, topic, mocker):
     ),
 ])
 def test__handle_fetch_response(fetcher, fetch_offsets, fetch_response, num_partitions):
+    fetcher._nodes_with_pending_fetch_requests.add(0)
     fetcher._handle_fetch_response(0, fetch_offsets, time.time(), fetch_response)
     assert len(fetcher._completed_fetches) == num_partitions
 
@@ -438,6 +439,7 @@ def test__handle_fetch_response(fetcher, fetch_offsets, fetch_response, num_partitions):
     )
 ])
 def test__handle_fetch_error(fetcher, caplog, exception, log_level):
+    fetcher._nodes_with_pending_fetch_requests.add(3)
     fetcher._handle_fetch_error(3, exception)
     assert len(caplog.records) == 1
     assert caplog.records[0].levelname == logging.getLevelName(log_level)
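
Both tests invoke the handlers directly rather than going through send_fetches, so the node has to be seeded into _nodes_with_pending_fetch_requests first: each handler ends with set.remove(node_id), which raises KeyError for a node that was never marked pending. A tiny sketch of that constraint (plain Python, not taken from the test suite):

    # Why the one-line setup is needed: the handlers call set.remove(node_id),
    # which raises KeyError if the node was never marked pending by send_fetches.
    pending = set()
    pending.add(3)      # what the test (and send_fetches) does up front
    pending.remove(3)   # what _handle_fetch_response / _handle_fetch_error do at the end

    try:
        pending.remove(3)   # calling a handler twice, or without seeding, would fail here
    except KeyError:
        print("node 3 was not marked as pending")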
