Skip to content

Commit 09c05e5

Browse files
committed
Support ListOffsets v3 in consumer
1 parent 5806959 commit 09c05e5

File tree

2 files changed

+36
-3
lines changed

2 files changed

+36
-3
lines changed

kafka/consumer/fetcher.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ def on_fail(err):
570570
return list_offsets_future
571571

572572
def _send_offset_request(self, node_id, timestamps):
573-
version = self._client.api_version(ListOffsetsRequest, max_version=1)
573+
version = self._client.api_version(ListOffsetsRequest, max_version=3)
574574
by_topic = collections.defaultdict(list)
575575
for tp, timestamp in six.iteritems(timestamps):
576576
if version >= 1:
@@ -579,7 +579,16 @@ def _send_offset_request(self, node_id, timestamps):
579579
data = (tp.partition, timestamp, 1)
580580
by_topic[tp.topic].append(data)
581581

582-
request = ListOffsetsRequest[version](-1, list(six.iteritems(by_topic)))
582+
if version <= 1:
583+
request = ListOffsetsRequest[version](
584+
-1,
585+
list(six.iteritems(by_topic)))
586+
else:
587+
request = ListOffsetsRequest[version](
588+
-1,
589+
self._isolation_level,
590+
list(six.iteritems(by_topic)))
591+
583592

584593
# Client returns a future that only fails on network issues
585594
# so create a separate future and attach a callback to update it
@@ -601,6 +610,8 @@ def _handle_offset_response(self, future, response):
601610
Raises:
602611
AssertionError: if response does not match partition
603612
"""
613+
if response.API_VERSION >= 2 and response.throttle_time_ms > 0:
614+
log.warning("ListOffsetsRequest throttled by broker (%d ms)", response.throttle_time_ms)
604615
timestamp_offset_map = {}
605616
for topic, part_data in response.topics:
606617
for partition_info in part_data:

test/test_fetcher.py

+23-1
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ def send_side_effect(node_id, timestamps):
261261
assert isinstance(fut.exception, UnknownTopicOrPartitionError)
262262

263263

264-
def test__handle_offset_response(fetcher, mocker):
264+
def test__handle_offset_response_v1(fetcher, mocker):
265265
# Broker returns UnsupportedForMessageFormatError, will omit partition
266266
fut = Future()
267267
res = ListOffsetsResponse[1]([
@@ -304,6 +304,28 @@ def test__handle_offset_response(fetcher, mocker):
304304
assert isinstance(fut.exception, NotLeaderForPartitionError)
305305

306306

307+
def test__handle_offset_response_v2_v3(fetcher, mocker):
308+
# including a throttle_time shouldnt cause issues
309+
fut = Future()
310+
res = ListOffsetsResponse[2](
311+
123, # throttle_time_ms
312+
[("topic", [(0, 0, 1000, 9999)])
313+
])
314+
fetcher._handle_offset_response(fut, res)
315+
assert fut.succeeded()
316+
assert fut.value == {TopicPartition("topic", 0): (9999, 1000)}
317+
318+
# v3 response is the same format
319+
fut = Future()
320+
res = ListOffsetsResponse[3](
321+
123, # throttle_time_ms
322+
[("topic", [(0, 0, 1000, 9999)])
323+
])
324+
fetcher._handle_offset_response(fut, res)
325+
assert fut.succeeded()
326+
assert fut.value == {TopicPartition("topic", 0): (9999, 1000)}
327+
328+
307329
def test_fetched_records(fetcher, topic, mocker):
308330
fetcher.config['check_crcs'] = False
309331
tp = TopicPartition(topic, 0)

0 commit comments

Comments
 (0)