Commit 7f04909

Implement client-side connection throttling / KIP-219
1 parent 6137aed commit 7f04909

7 files changed: +105 -77 lines
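
For context: KIP-219 moves quota enforcement to the client side. From broker 2.0 onward a throttled broker returns the response immediately with a non-zero throttle_time_ms, and it is the client's job to stop sending to that broker until the window elapses. A minimal standalone sketch of that bookkeeping (illustrative names, not part of this commit):

    import time

    class ThrottleGate(object):
        """Illustrative client-side throttle tracking in the spirit of KIP-219."""
        def __init__(self):
            self._throttle_until = None  # absolute deadline in seconds, or None

        def record_response(self, throttle_time_ms):
            # A non-zero throttle_time_ms means the broker asked us to back off.
            if throttle_time_ms:
                self._throttle_until = max(self._throttle_until or 0,
                                           time.time() + throttle_time_ms / 1000.0)
            else:
                self._throttle_until = None

        def can_send(self):
            return self._throttle_until is None or time.time() >= self._throttle_until

        def delay_ms(self):
            if self._throttle_until is None:
                return 0
            return max(0, self._throttle_until - time.time()) * 1000

    gate = ThrottleGate()
    gate.record_response(throttle_time_ms=500)
    print(gate.can_send(), int(gate.delay_ms()))  # False, ~500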

kafka/client_async.py

Lines changed: 29 additions & 11 deletions

@@ -517,6 +517,16 @@ def connection_delay(self, node_id):
             return 0
         return conn.connection_delay()
 
+    def throttle_delay(self, node_id):
+        """
+        Return the number of milliseconds to wait until a broker is no longer throttled.
+        When disconnected / connecting, returns 0.
+        """
+        conn = self._conns.get(node_id)
+        if conn is None:
+            return 0
+        return conn.throttle_delay()
+
     def is_ready(self, node_id, metadata_priority=True):
         """Check whether a node is ready to send more requests.
 
@@ -793,16 +803,17 @@ def _fire_pending_completed_requests(self):
                 break
             future.success(response)
             responses.append(response)
+
         return responses
 
     def least_loaded_node(self):
         """Choose the node with fewest outstanding requests, with fallbacks.
 
-        This method will prefer a node with an existing connection and no
-        in-flight-requests. If no such node is found, a node will be chosen
-        randomly from disconnected nodes that are not "blacked out" (i.e.,
+        This method will prefer a node with an existing connection (not throttled)
+        with no in-flight-requests. If no such node is found, a node will be chosen
+        randomly from all nodes that are not throttled or "blacked out" (i.e.,
         are not subject to a reconnect backoff). If no node metadata has been
-        obtained, will return a bootstrap node (subject to exponential backoff).
+        obtained, will return a bootstrap node.
 
         Returns:
             node_id or None if no suitable node was found
@@ -814,11 +825,11 @@ def least_loaded_node(self):
         found = None
         for node_id in nodes:
             conn = self._conns.get(node_id)
-            connected = conn is not None and conn.connected()
-            blacked_out = conn is not None and conn.blacked_out()
+            connected = conn is not None and conn.connected() and conn.can_send_more()
+            blacked_out = conn is not None and (conn.blacked_out() or conn.throttled())
             curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
             if connected and curr_inflight == 0:
-                # if we find an established connection
+                # if we find an established connection (not throttled)
                 # with no in-flight requests, we can stop right away
                 return node_id
             elif not blacked_out and curr_inflight < inflight:
@@ -828,16 +839,23 @@ def least_loaded_node(self):
 
         return found
 
+    def _refresh_delay_ms(self, node_id):
+        conn = self._conns.get(node_id)
+        if conn is not None and conn.connected():
+            return self.throttle_delay(node_id)
+        else:
+            return self.connection_delay(node_id)
+
     def least_loaded_node_refresh_ms(self):
-        """Return connection delay in milliseconds for next available node.
+        """Return connection or throttle delay in milliseconds for next available node.
 
         This method is used primarily for retry/backoff during metadata refresh
         during / after a cluster outage, in which there are no available nodes.
 
         Returns:
             float: delay_ms
         """
-        return min([self.connection_delay(broker.nodeId) for broker in self.cluster.brokers()])
+        return min([self._refresh_delay_ms(broker.nodeId) for broker in self.cluster.brokers()])
 
     def set_topics(self, topics):
         """Set specific topics to track for metadata.
@@ -915,8 +933,8 @@ def _maybe_refresh_metadata(self, wakeup=False):
                 # Connection attempt failed immediately, need to retry with a different node
                 return self.config['reconnect_backoff_ms']
             else:
-                # Existing connection with max in flight requests. Wait for request to complete.
-                return self.config['request_timeout_ms']
+                # Existing connection throttled or max in flight requests.
+                return self.throttle_delay(node_id) or self.config['request_timeout_ms']
 
         # Recheck node_id in case we were able to connect immediately above
         if self._can_send_request(node_id):
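
As an illustration of how these client-level helpers combine, here is a hedged sketch of a caller that waits out both reconnect backoff and broker throttling before sending; it assumes a kafka.KafkaClient-like object exposing the connection_delay() / throttle_delay() / poll() methods shown above:

    def wait_until_sendable(client, node_id, max_block_ms=100):
        """Sketch only: block until node_id is neither backing off nor throttled."""
        while True:
            delay_ms = max(client.connection_delay(node_id), client.throttle_delay(node_id))
            if delay_ms <= 0:
                return
            # Let the client make I/O progress while we wait out the delay.
            client.poll(timeout_ms=min(delay_ms, max_block_ms))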

kafka/cluster.py

Lines changed: 0 additions & 3 deletions

@@ -235,9 +235,6 @@ def update_metadata(self, metadata):
 
         Returns: None
         """
-        if metadata.API_VERSION >= 3 and metadata.throttle_time_ms > 0:
-            log.warning("MetadataRequest throttled by broker (%d ms)", metadata.throttle_time_ms)
-
         # In the common case where we ask for a single topic and get back an
         # error, we should fail the future
         if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno:

kafka/conn.py

Lines changed: 65 additions & 1 deletion

@@ -236,6 +236,7 @@ def __init__(self, host, port, afi, **configs):
         self._sock_afi = afi
         self._sock_addr = None
         self._api_versions = None
+        self._throttle_time = None
 
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -851,6 +852,22 @@ def blacked_out(self):
             return self.connection_delay() > 0
         return False
 
+    def throttled(self):
+        """
+        Return True if we are connected but currently throttled.
+        """
+        if self.state is ConnectionStates.CONNECTED:
+            return self._throttle_time is not None and self._throttle_time > time.time()
+        return False
+
+    def throttle_delay(self):
+        """
+        Return the number of milliseconds to wait until connection is no longer throttled.
+        """
+        if self._throttle_time is not None:
+            return max(0, self._throttle_time - time.time()) * 1000
+        return 0
+
     def connection_delay(self):
         """
         Return the number of milliseconds to wait, based on the connection
@@ -976,6 +993,8 @@ def send(self, request, blocking=True, request_timeout_ms=None):
         elif not self.connected():
             return future.failure(Errors.KafkaConnectionError(str(self)))
         elif not self.can_send_more():
+            if self._throttle_time:
+                return future.failure(Errors.ThrottlingQuotaExceededError(str(self)))
             return future.failure(Errors.TooManyInFlightRequests(str(self)))
         return self._send(request, blocking=blocking, request_timeout_ms=request_timeout_ms)
 
@@ -1063,8 +1082,28 @@ def send_pending_requests_v2(self):
             self.close(error=error)
             return False
 
+    def _maybe_throttle(self, response):
+        throttle_time_ms = getattr(response, 'throttle_time_ms', 0)
+        if self._sensors:
+            self._sensors.throttle_time.record(throttle_time_ms)
+        if not throttle_time_ms:
+            if self._throttle_time is not None:
+                self._throttle_time = None
+            return
+        # Client side throttling enabled in v2.0 brokers
+        # prior to that throttling (if present) was managed broker-side
+        if not self.config['api_version'] or self.config['api_version'] >= (2, 0):
+            throttle_time = time.time() + throttle_time_ms / 1000
+            self._throttle_time = max(throttle_time, self._throttle_time or 0)
+        log.warning("%s throttled by broker (%d ms)", response.__class__.__name__, throttle_time_ms)
+
     def can_send_more(self):
-        """Return True unless there are max_in_flight_requests_per_connection."""
+        """Check for throttling / quota violations and max in-flight-requests"""
+        if self._throttle_time is not None:
+            if self._throttle_time > time.time():
+                return False
+            # Reset throttle_time if needed
+            self._throttle_time = None
         max_ifrs = self.config['max_in_flight_requests_per_connection']
         return len(self.in_flight_requests) < max_ifrs
 
@@ -1097,6 +1136,7 @@ def recv(self):
                 self._sensors.request_time.record(latency_ms)
 
             log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
+            self._maybe_throttle(response)
            responses[i] = (response, future)
 
         return responses
@@ -1399,6 +1439,16 @@ def __init__(self, metrics, metric_group_prefix, node_id):
            'The maximum request latency in ms.'),
            Max())
 
+        throttle_time = metrics.sensor('throttle-time')
+        throttle_time.add(metrics.metric_name(
+            'throttle-time-avg', metric_group_name,
+            'The average throttle time in ms.'),
+            Avg())
+        throttle_time.add(metrics.metric_name(
+            'throttle-time-max', metric_group_name,
+            'The maximum throttle time in ms.'),
+            Max())
+
         # if one sensor of the metrics has been registered for the connection,
         # then all other sensors should have been registered; and vice versa
         node_str = 'node-{0}'.format(node_id)
@@ -1450,9 +1500,23 @@ def __init__(self, metrics, metric_group_prefix, node_id):
                'The maximum request latency in ms.'),
                Max())
 
+            throttle_time = metrics.sensor(
+                node_str + '.throttle',
+                parents=[metrics.get_sensor('throttle-time')])
+            throttle_time.add(metrics.metric_name(
+                'throttle-time-avg', metric_group_name,
+                'The average throttle time in ms.'),
+                Avg())
+            throttle_time.add(metrics.metric_name(
+                'throttle-time-max', metric_group_name,
+                'The maximum throttle time in ms.'),
+                Max())
+
+
         self.bytes_sent = metrics.sensor(node_str + '.bytes-sent')
         self.bytes_received = metrics.sensor(node_str + '.bytes-received')
         self.request_time = metrics.sensor(node_str + '.latency')
+        self.throttle_time = metrics.sensor(node_str + '.throttle')
 
 
 def _address_family(address):
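
To show how the new connection-level behavior surfaces to callers, here is a hedged sketch of retrying a send() that was rejected because the connection is throttled. It assumes a connected kafka.conn.BrokerConnection named conn and an already-built request object; ThrottlingQuotaExceededError is the error class referenced in the diff above.

    import time
    import kafka.errors as Errors

    def send_or_backoff(conn, request):
        """Sketch only: retry once after waiting out a client-side throttle window."""
        future = conn.send(request)
        if future.failed() and isinstance(future.exception, Errors.ThrottlingQuotaExceededError):
            # can_send_more() returned False because _throttle_time is still in the future;
            # wait out the remaining window reported by throttle_delay() and try again.
            time.sleep(conn.throttle_delay() / 1000.0)
            future = conn.send(request)
        return future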

kafka/consumer/fetcher.py

Lines changed: 11 additions & 10 deletions

@@ -614,8 +614,6 @@ def _handle_list_offsets_response(self, future, response):
         Raises:
             AssertionError: if response does not match partition
         """
-        if response.API_VERSION >= 2 and response.throttle_time_ms > 0:
-            log.warning("ListOffsetsRequest throttled by broker (%d ms)", response.throttle_time_ms)
         timestamp_offset_map = {}
         for topic, part_data in response.topics:
             for partition_info in part_data:
@@ -816,8 +814,6 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
                 )
                 self._completed_fetches.append(completed_fetch)
 
-        if response.API_VERSION >= 1:
-            self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
         self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
 
     def _handle_fetch_error(self, node_id, exception):
@@ -1032,6 +1028,11 @@ def handle_response(self, response):
                           self.node_id, len(response_tps))
                 self.next_metadata = FetchMetadata.INITIAL
                 return True
+            elif response.session_id == FetchMetadata.THROTTLED_SESSION_ID:
+                log.debug("Node %s sent an empty full fetch response due to a quota violation (%s partitions)",
+                          self.node_id, len(response_tps))
+                # Keep current metadata
+                return True
             else:
                 # The server created a new incremental fetch session.
                 log.debug("Node %s sent a full fetch response that created a new incremental fetch session %s"
@@ -1054,6 +1055,11 @@ def handle_response(self, response):
                           len(response_tps), len(self.session_partitions) - len(response_tps))
                 self.next_metadata = FetchMetadata.INITIAL
                 return True
+            elif response.session_id == FetchMetadata.THROTTLED_SESSION_ID:
+                log.debug("Node %s sent an empty incremental fetch response due to a quota violation (%s partitions)",
+                          self.node_id, len(response_tps))
+                # Keep current metadata
+                return True
             else:
                 # The incremental fetch session was continued by the server.
                 log.debug("Node %s sent an incremental fetch response for session %s"
@@ -1077,6 +1083,7 @@ class FetchMetadata(object):
 
     MAX_EPOCH = 2147483647
     INVALID_SESSION_ID = 0  # used by clients with no session.
+    THROTTLED_SESSION_ID = -1  # returned with empty response on quota violation
     INITIAL_EPOCH = 0  # client wants to create or recreate a session.
     FINAL_EPOCH = -1  # client wants to close any existing session, and not create a new one.
 
@@ -1217,12 +1224,6 @@ def __init__(self, metrics, prefix):
         self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name,
             'The maximum lag in terms of number of records for any partition in self window'), Max())
 
-        self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time')
-        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name,
-            'The average throttle time in ms'), Avg())
-        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name,
-            'The maximum throttle time in ms'), Max())
-
     def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
         # record bytes fetched
         name = '.'.join(['topic', topic, 'bytes-fetched'])
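
The new THROTTLED_SESSION_ID sentinel means an empty fetch response caused by a quota violation keeps the current session metadata instead of resetting it, so the next fetch can resume where it left off. A minimal standalone sketch of that branching (the helper below is illustrative, not the handler in fetcher.py):

    # Sentinel values mirrored from FetchMetadata above.
    INVALID_SESSION_ID = 0     # no fetch session
    THROTTLED_SESSION_ID = -1  # empty response due to a quota violation

    def classify_full_fetch_response(session_id):
        """Illustrative: what to do with session state after a full fetch response."""
        if session_id == INVALID_SESSION_ID:
            return 'reset-session'      # server did not create a session; keep sending full fetches
        elif session_id == THROTTLED_SESSION_ID:
            return 'keep-session'       # broker sent nothing; keep current metadata and retry
        else:
            return 'start-incremental'  # server created incremental session `session_id`

    assert classify_full_fetch_response(-1) == 'keep-session'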

kafka/coordinator/base.py

Lines changed: 0 additions & 31 deletions

@@ -488,11 +488,6 @@ def _failed_request(self, node_id, request, future, error):
         future.failure(error)
 
     def _handle_join_group_response(self, future, send_time, response):
-        if response.API_VERSION >= 2:
-            self.sensors.throttle_time.record(response.throttle_time_ms)
-            if response.throttle_time_ms > 0:
-                log.warning("JoinGroupRequest throttled by broker (%d ms)", response.throttle_time_ms)
-
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.debug("Received successful JoinGroup response for group %s: %s",
@@ -614,11 +609,6 @@ def _send_sync_group_request(self, request):
         return future
 
     def _handle_sync_group_response(self, future, send_time, response):
-        if response.API_VERSION >= 1:
-            self.sensors.throttle_time.record(response.throttle_time_ms)
-            if response.throttle_time_ms > 0:
-                log.warning("SyncGroupRequest throttled by broker (%d ms)", response.throttle_time_ms)
-
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             self.sensors.sync_latency.record((time.time() - send_time) * 1000)
@@ -678,9 +668,6 @@ def _send_group_coordinator_request(self):
         return future
 
     def _handle_group_coordinator_response(self, future, response):
-        if response.API_VERSION >= 1 and response.throttle_time_ms > 0:
-            log.warning("FindCoordinatorRequest throttled by broker (%d ms)", response.throttle_time_ms)
-
         log.debug("Received group coordinator response %s", response)
 
         error_type = Errors.for_code(response.error_code)
@@ -785,11 +772,6 @@ def maybe_leave_group(self):
             self.reset_generation()
 
     def _handle_leave_group_response(self, response):
-        if response.API_VERSION >= 1:
-            self.sensors.throttle_time.record(response.throttle_time_ms)
-            if response.throttle_time_ms > 0:
-                log.warning("LeaveGroupRequest throttled by broker (%d ms)", response.throttle_time_ms)
-
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.debug("LeaveGroup request for group %s returned successfully",
@@ -821,11 +803,6 @@ def _send_heartbeat_request(self):
         return future
 
     def _handle_heartbeat_response(self, future, send_time, response):
-        if response.API_VERSION >= 1:
-            self.sensors.throttle_time.record(response.throttle_time_ms)
-            if response.throttle_time_ms > 0:
-                log.warning("HeartbeatRequest throttled by broker (%d ms)", response.throttle_time_ms)
-
         self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
@@ -914,14 +891,6 @@ def __init__(self, heartbeat, metrics, prefix, tags=None):
             tags), AnonMeasurable(
                 lambda _, now: (now / 1000) - self.heartbeat.last_send))
 
-        self.throttle_time = metrics.sensor('throttle-time')
-        self.throttle_time.add(metrics.metric_name(
-            'throttle-time-avg', self.metric_group_name,
-            'The average throttle time in ms'), Avg())
-        self.throttle_time.add(metrics.metric_name(
-            'throttle-time-max', self.metric_group_name,
-            'The maximum throttle time in ms'), Max())
-
 
 class HeartbeatThread(threading.Thread):
     def __init__(self, coordinator):
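
With the per-request coordinator sensors removed, throttle metrics now live on the connection-level 'throttle-time-avg' / 'throttle-time-max' sensors added in kafka/conn.py above. A hedged sketch of inspecting them through a consumer's metrics() snapshot (the bootstrap address is a placeholder, and the exact metric group names are an assumption here):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    for group, group_metrics in consumer.metrics().items():
        for name, value in group_metrics.items():
            if 'throttle-time' in name:
                print(group, name, value)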

kafka/coordinator/consumer.py

Lines changed: 0 additions & 6 deletions

@@ -665,9 +665,6 @@ def _send_offset_commit_request(self, offsets):
         return future
 
     def _handle_offset_commit_response(self, offsets, future, send_time, response):
-        if response.API_VERSION >= 3 and response.throttle_time_ms > 0:
-            log.warning("OffsetCommitRequest throttled by broker (%d ms)", response.throttle_time_ms)
-
         # TODO look at adding request_latency_ms to response (like java kafka)
         self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
         unauthorized_topics = set()
@@ -785,9 +782,6 @@ def _send_offset_fetch_request(self, partitions):
         return future
 
     def _handle_offset_fetch_response(self, future, response):
-        if response.API_VERSION >= 3 and response.throttle_time_ms > 0:
-            log.warning("OffsetFetchRequest throttled by broker (%d ms)", response.throttle_time_ms)
-
         if response.API_VERSION >= 2 and response.error_code != Errors.NoError.errno:
             error_type = Errors.for_code(response.error_code)
             log.debug("Offset fetch failed: %s", error_type.__name__)
