@@ -453,7 +453,7 @@ def _send_join_group_request(self):
453
453
(protocol , metadata if isinstance (metadata , bytes ) else metadata .encode ())
454
454
for protocol , metadata in self .group_protocols ()
455
455
]
456
- version = self ._client .api_version (JoinGroupRequest , max_version = 2 )
456
+ version = self ._client .api_version (JoinGroupRequest , max_version = 3 )
457
457
if version == 0 :
458
458
request = JoinGroupRequest [version ](
459
459
self .group_id ,
@@ -493,6 +493,11 @@ def _failed_request(self, node_id, request, future, error):
493
493
future .failure (error )
494
494
495
495
def _handle_join_group_response (self , future , send_time , response ):
496
+ if response .API_VERSION >= 2 :
497
+ self .sensors .throttle_time .record (response .throttle_time_ms )
498
+ if response .throttle_time_ms > 0 :
499
+ log .warning ("JoinGroupRequest throttled by broker (%d ms)" , response .throttle_time_ms )
500
+
496
501
error_type = Errors .for_code (response .error_code )
497
502
if error_type is Errors .NoError :
498
503
log .debug ("Received successful JoinGroup response for group %s: %s" ,
@@ -554,7 +559,7 @@ def _handle_join_group_response(self, future, send_time, response):
554
559
555
560
def _on_join_follower (self ):
556
561
# send follower's sync group with an empty assignment
557
- version = self ._client .api_version (SyncGroupRequest , max_version = 1 )
562
+ version = self ._client .api_version (SyncGroupRequest , max_version = 2 )
558
563
request = SyncGroupRequest [version ](
559
564
self .group_id ,
560
565
self ._generation .generation_id ,
@@ -582,7 +587,7 @@ def _on_join_leader(self, response):
582
587
except Exception as e :
583
588
return Future ().failure (e )
584
589
585
- version = self ._client .api_version (SyncGroupRequest , max_version = 1 )
590
+ version = self ._client .api_version (SyncGroupRequest , max_version = 2 )
586
591
request = SyncGroupRequest [version ](
587
592
self .group_id ,
588
593
self ._generation .generation_id ,
@@ -614,6 +619,11 @@ def _send_sync_group_request(self, request):
614
619
return future
615
620
616
621
def _handle_sync_group_response (self , future , send_time , response ):
622
+ if response .API_VERSION >= 1 :
623
+ self .sensors .throttle_time .record (response .throttle_time_ms )
624
+ if response .throttle_time_ms > 0 :
625
+ log .warning ("SyncGroupRequest throttled by broker (%d ms)" , response .throttle_time_ms )
626
+
617
627
error_type = Errors .for_code (response .error_code )
618
628
if error_type is Errors .NoError :
619
629
self .sensors .sync_latency .record ((time .time () - send_time ) * 1000 )
@@ -770,7 +780,7 @@ def maybe_leave_group(self):
770
780
# this is a minimal effort attempt to leave the group. we do not
771
781
# attempt any resending if the request fails or times out.
772
782
log .info ('Leaving consumer group (%s).' , self .group_id )
773
- version = self ._client .api_version (LeaveGroupRequest , max_version = 1 )
783
+ version = self ._client .api_version (LeaveGroupRequest , max_version = 2 )
774
784
request = LeaveGroupRequest [version ](self .group_id , self ._generation .member_id )
775
785
future = self ._client .send (self .coordinator_id , request )
776
786
future .add_callback (self ._handle_leave_group_response )
@@ -780,6 +790,11 @@ def maybe_leave_group(self):
780
790
self .reset_generation ()
781
791
782
792
def _handle_leave_group_response (self , response ):
793
+ if response .API_VERSION >= 1 :
794
+ self .sensors .throttle_time .record (response .throttle_time_ms )
795
+ if response .throttle_time_ms > 0 :
796
+ log .warning ("LeaveGroupRequest throttled by broker (%d ms)" , response .throttle_time_ms )
797
+
783
798
error_type = Errors .for_code (response .error_code )
784
799
if error_type is Errors .NoError :
785
800
log .debug ("LeaveGroup request for group %s returned successfully" ,
@@ -798,7 +813,7 @@ def _send_heartbeat_request(self):
798
813
e = Errors .NodeNotReadyError (self .coordinator_id )
799
814
return Future ().failure (e )
800
815
801
- version = self ._client .api_version (HeartbeatRequest , max_version = 1 )
816
+ version = self ._client .api_version (HeartbeatRequest , max_version = 2 )
802
817
request = HeartbeatRequest [version ](self .group_id ,
803
818
self ._generation .generation_id ,
804
819
self ._generation .member_id )
@@ -811,6 +826,11 @@ def _send_heartbeat_request(self):
811
826
return future
812
827
813
828
def _handle_heartbeat_response (self , future , send_time , response ):
829
+ if response .API_VERSION >= 1 :
830
+ self .sensors .throttle_time .record (response .throttle_time_ms )
831
+ if response .throttle_time_ms > 0 :
832
+ log .warning ("HeartbeatRequest throttled by broker (%d ms)" , response .throttle_time_ms )
833
+
814
834
self .sensors .heartbeat_latency .record ((time .time () - send_time ) * 1000 )
815
835
error_type = Errors .for_code (response .error_code )
816
836
if error_type is Errors .NoError :
@@ -899,6 +919,14 @@ def __init__(self, heartbeat, metrics, prefix, tags=None):
899
919
tags ), AnonMeasurable (
900
920
lambda _ , now : (now / 1000 ) - self .heartbeat .last_send ))
901
921
922
+ self .throttle_time = metrics .sensor ('throttle-time' )
923
+ self .throttle_time .add (metrics .metric_name (
924
+ 'throttle-time-avg' , self .metric_group_name ,
925
+ 'The average throttle time in ms' ), Avg ())
926
+ self .throttle_time .add (metrics .metric_name (
927
+ 'throttle-time-max' , self .metric_group_name ,
928
+ 'The maximum throttle time in ms' ), Max ())
929
+
902
930
903
931
class HeartbeatThread (threading .Thread ):
904
932
def __init__ (self , coordinator ):
0 commit comments