13
13
from kafka .future import Future
14
14
from kafka .metrics .stats import Avg , Count , Max , Rate
15
15
from kafka .protocol .fetch import FetchRequest
16
- from kafka .protocol .offset import (
17
- OffsetRequest , OffsetResetStrategy , UNKNOWN_OFFSET
16
+ from kafka .protocol .list_offsets import (
17
+ ListOffsetsRequest , OffsetResetStrategy , UNKNOWN_OFFSET
18
18
)
19
19
from kafka .record import MemoryRecords
20
20
from kafka .serializer import Deserializer
@@ -272,7 +272,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
272
272
if not timestamps :
273
273
return {}
274
274
275
- future = self ._send_offset_requests (timestamps )
275
+ future = self ._send_list_offsets_requests (timestamps )
276
276
self ._client .poll (future = future , timeout_ms = remaining_ms )
277
277
278
278
if future .succeeded ():
@@ -519,7 +519,7 @@ def _deserialize(self, f, topic, bytes_):
519
519
return f .deserialize (topic , bytes_ )
520
520
return f (bytes_ )
521
521
522
- def _send_offset_requests (self , timestamps ):
522
+ def _send_list_offsets_requests (self , timestamps ):
523
523
"""Fetch offsets for each partition in timestamps dict. This may send
524
524
request to multiple nodes, based on who is Leader for partition.
525
525
@@ -564,13 +564,13 @@ def on_fail(err):
564
564
list_offsets_future .failure (err )
565
565
566
566
for node_id , timestamps in six .iteritems (timestamps_by_node ):
567
- _f = self ._send_offset_request (node_id , timestamps )
567
+ _f = self ._send_list_offsets_request (node_id , timestamps )
568
568
_f .add_callback (on_success )
569
569
_f .add_errback (on_fail )
570
570
return list_offsets_future
571
571
572
- def _send_offset_request (self , node_id , timestamps ):
573
- version = self ._client .api_version (OffsetRequest , max_version = 1 )
572
+ def _send_list_offsets_request (self , node_id , timestamps ):
573
+ version = self ._client .api_version (ListOffsetsRequest , max_version = 3 )
574
574
by_topic = collections .defaultdict (list )
575
575
for tp , timestamp in six .iteritems (timestamps ):
576
576
if version >= 1 :
@@ -579,28 +579,39 @@ def _send_offset_request(self, node_id, timestamps):
579
579
data = (tp .partition , timestamp , 1 )
580
580
by_topic [tp .topic ].append (data )
581
581
582
- request = OffsetRequest [version ](- 1 , list (six .iteritems (by_topic )))
582
+ if version <= 1 :
583
+ request = ListOffsetsRequest [version ](
584
+ - 1 ,
585
+ list (six .iteritems (by_topic )))
586
+ else :
587
+ request = ListOffsetsRequest [version ](
588
+ - 1 ,
589
+ self ._isolation_level ,
590
+ list (six .iteritems (by_topic )))
591
+
583
592
584
593
# Client returns a future that only fails on network issues
585
594
# so create a separate future and attach a callback to update it
586
595
# based on response error codes
587
596
future = Future ()
588
597
589
598
_f = self ._client .send (node_id , request )
590
- _f .add_callback (self ._handle_offset_response , future )
599
+ _f .add_callback (self ._handle_list_offsets_response , future )
591
600
_f .add_errback (lambda e : future .failure (e ))
592
601
return future
593
602
594
- def _handle_offset_response (self , future , response ):
595
- """Callback for the response of the list offset call above.
603
+ def _handle_list_offsets_response (self , future , response ):
604
+ """Callback for the response of the ListOffsets api call
596
605
597
606
Arguments:
598
607
future (Future): the future to update based on response
599
- response (OffsetResponse ): response from the server
608
+ response (ListOffsetsResponse ): response from the server
600
609
601
610
Raises:
602
611
AssertionError: if response does not match partition
603
612
"""
613
+ if response .API_VERSION >= 2 and response .throttle_time_ms > 0 :
614
+ log .warning ("ListOffsetsRequest throttled by broker (%d ms)" , response .throttle_time_ms )
604
615
timestamp_offset_map = {}
605
616
for topic , part_data in response .topics :
606
617
for partition_info in part_data :
@@ -610,18 +621,18 @@ def _handle_offset_response(self, future, response):
610
621
if error_type is Errors .NoError :
611
622
if response .API_VERSION == 0 :
612
623
offsets = partition_info [2 ]
613
- assert len (offsets ) <= 1 , 'Expected OffsetResponse with one offset'
624
+ assert len (offsets ) <= 1 , 'Expected ListOffsetsResponse with one offset'
614
625
if not offsets :
615
626
offset = UNKNOWN_OFFSET
616
627
else :
617
628
offset = offsets [0 ]
618
- log .debug ("Handling v0 ListOffsetResponse response for %s. "
629
+ log .debug ("Handling v0 ListOffsetsResponse response for %s. "
619
630
"Fetched offset %s" , partition , offset )
620
631
if offset != UNKNOWN_OFFSET :
621
632
timestamp_offset_map [partition ] = (offset , None )
622
633
else :
623
634
timestamp , offset = partition_info [2 :]
624
- log .debug ("Handling ListOffsetResponse response for %s. "
635
+ log .debug ("Handling ListOffsetsResponse response for %s. "
625
636
"Fetched offset %s, timestamp %s" ,
626
637
partition , offset , timestamp )
627
638
if offset != UNKNOWN_OFFSET :
@@ -638,7 +649,7 @@ def _handle_offset_response(self, future, response):
638
649
future .failure (error_type (partition ))
639
650
return
640
651
elif error_type is Errors .UnknownTopicOrPartitionError :
641
- log .warning ("Received unknown topic or partition error in ListOffset "
652
+ log .warning ("Received unknown topic or partition error in ListOffsets "
642
653
"request for partition %s. The topic/partition " +
643
654
"may not exist or the user may not have Describe access "
644
655
"to it." , partition )
0 commit comments