Skip to content

Commit b1cc966

Browse files
authored
KIP-144: Exponential backoff for broker reconnections (#1124)
1 parent cceaf4a commit b1cc966

File tree

4 files changed

+69
-22
lines changed

4 files changed

+69
-22
lines changed

kafka/client_async.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ class KafkaClient(object):
6767
reconnect_backoff_ms (int): The amount of time in milliseconds to
6868
wait before attempting to reconnect to a given host.
6969
Default: 50.
70+
reconnect_backoff_max_ms (int): The maximum amount of time in
71+
milliseconds to wait when reconnecting to a broker that has
72+
repeatedly failed to connect. If provided, the backoff per host
73+
will increase exponentially for each consecutive connection
74+
failure, up to this maximum. To avoid connection storms, a
75+
randomization factor of 0.2 will be applied to the backoff
76+
resulting in a random range between 20% below and 20% above
77+
the computed value. Default: 1000.
7078
request_timeout_ms (int): Client request timeout in milliseconds.
7179
Default: 40000.
7280
retry_backoff_ms (int): Milliseconds to backoff when retrying on
@@ -137,6 +145,7 @@ class KafkaClient(object):
137145
'request_timeout_ms': 40000,
138146
'connections_max_idle_ms': 9 * 60 * 1000,
139147
'reconnect_backoff_ms': 50,
148+
'reconnect_backoff_max_ms': 1000,
140149
'max_in_flight_requests_per_connection': 5,
141150
'receive_buffer_bytes': None,
142151
'send_buffer_bytes': None,
@@ -432,15 +441,7 @@ def connection_delay(self, node_id):
432441
"""
433442
if node_id not in self._conns:
434443
return 0
435-
436-
conn = self._conns[node_id]
437-
time_waited_ms = time.time() - (conn.last_attempt or 0)
438-
if conn.disconnected():
439-
return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
440-
elif conn.connecting():
441-
return 0
442-
else:
443-
return 999999999
444+
return self._conns[node_id].connection_delay()
444445

445446
def is_ready(self, node_id, metadata_priority=True):
446447
"""Check whether a node is ready to send more requests.
@@ -655,12 +656,10 @@ def in_flight_request_count(self, node_id=None):
655656
def least_loaded_node(self):
656657
"""Choose the node with fewest outstanding requests, with fallbacks.
657658
658-
This method will prefer a node with an existing connection, but will
659-
potentially choose a node for which we don't yet have a connection if
660-
all existing connections are in use. This method will never choose a
661-
node that was disconnected within the reconnect backoff period.
662-
If all else fails, the method will attempt to bootstrap again using the
663-
bootstrap_servers list.
659+
This method will prefer a node with an existing connection and no
660+
in-flight-requests. If no such node is found, a node will be chosen
661+
randomly from disconnected nodes that are not "blacked out" (i.e.,
662+
are not subject to a reconnect backoff).
664663
665664
Returns:
666665
node_id or None if no suitable node was found
@@ -695,10 +694,6 @@ def least_loaded_node(self):
695694
elif 'bootstrap' in self._conns:
696695
return 'bootstrap'
697696

698-
# Last option: try to bootstrap again
699-
# this should only happen if no prior bootstrap has been successful
700-
log.error('No nodes found in metadata -- retrying bootstrap')
701-
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
702697
return None
703698

704699
def set_topics(self, topics):

kafka/conn.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import errno
66
import logging
77
import io
8-
from random import shuffle
8+
from random import shuffle, uniform
99
import socket
1010
import time
1111
import traceback
@@ -78,6 +78,14 @@ class BrokerConnection(object):
7878
reconnect_backoff_ms (int): The amount of time in milliseconds to
7979
wait before attempting to reconnect to a given host.
8080
Default: 50.
81+
reconnect_backoff_max_ms (int): The maximum amount of time in
82+
milliseconds to wait when reconnecting to a broker that has
83+
repeatedly failed to connect. If provided, the backoff per host
84+
will increase exponentially for each consecutive connection
85+
failure, up to this maximum. To avoid connection storms, a
86+
randomization factor of 0.2 will be applied to the backoff
87+
resulting in a random range between 20% below and 20% above
88+
the computed value. Default: 1000.
8189
request_timeout_ms (int): Client request timeout in milliseconds.
8290
Default: 40000.
8391
max_in_flight_requests_per_connection (int): Requests are pipelined
@@ -140,6 +148,7 @@ class BrokerConnection(object):
140148
'node_id': 0,
141149
'request_timeout_ms': 40000,
142150
'reconnect_backoff_ms': 50,
151+
'reconnect_backoff_max_ms': 1000,
143152
'max_in_flight_requests_per_connection': 5,
144153
'receive_buffer_bytes': None,
145154
'send_buffer_bytes': None,
@@ -199,6 +208,7 @@ def __init__(self, host, port, afi, **configs):
199208
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
200209

201210
self.state = ConnectionStates.DISCONNECTED
211+
self._reset_reconnect_backoff()
202212
self._sock = None
203213
self._ssl_context = None
204214
if self.config['ssl_context'] is not None:
@@ -305,6 +315,7 @@ def connect(self):
305315
else:
306316
log.debug('%s: Connection complete.', self)
307317
self.state = ConnectionStates.CONNECTED
318+
self._reset_reconnect_backoff()
308319
self.config['state_change_callback'](self)
309320

310321
# Connection failed
@@ -340,6 +351,7 @@ def connect(self):
340351
log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
341352
log.debug('%s: Connection complete.', self)
342353
self.state = ConnectionStates.CONNECTED
354+
self._reset_reconnect_backoff()
343355
self.config['state_change_callback'](self)
344356

345357
return self.state
@@ -475,11 +487,19 @@ def blacked_out(self):
475487
re-establish a connection yet
476488
"""
477489
if self.state is ConnectionStates.DISCONNECTED:
478-
backoff = self.config['reconnect_backoff_ms'] / 1000.0
479-
if time.time() < self.last_attempt + backoff:
490+
if time.time() < self.last_attempt + self._reconnect_backoff:
480491
return True
481492
return False
482493

494+
def connection_delay(self):
    """Return how long to wait, in milliseconds, before using this connection.

    The result depends on the connection state:

    * disconnected: the part of the current reconnect backoff that has not
      elapsed yet since the last connection attempt (0 once it has fully
      elapsed, or if no attempt was ever made).
    * connecting: 0, so the caller can keep driving the in-progress connect.
    * otherwise: a very large sentinel value (999999999).

    Returns:
        int or float: delay in milliseconds.
    """
    if self.disconnected():
        # time.time(), last_attempt and _reconnect_backoff are all expressed
        # in seconds; do the arithmetic in seconds and convert the remaining
        # wait to milliseconds only at the end.
        elapsed_secs = time.time() - (self.last_attempt or 0)
        remaining_secs = max(self._reconnect_backoff - elapsed_secs, 0)
        return remaining_secs * 1000
    if self.connecting():
        return 0
    return 999999999
502+
483503
def connected(self):
484504
"""Return True iff socket is connected."""
485505
return self.state is ConnectionStates.CONNECTED
@@ -495,6 +515,19 @@ def disconnected(self):
495515
"""Return True iff socket is closed"""
496516
return self.state is ConnectionStates.DISCONNECTED
497517

518+
def _reset_reconnect_backoff(self):
    """Clear the failure streak and restore the base reconnect backoff.

    The backoff is stored in seconds, converted from the
    ``reconnect_backoff_ms`` configuration value.
    """
    self._failures = 0
    base_backoff_ms = self.config['reconnect_backoff_ms']
    self._reconnect_backoff = base_backoff_ms / 1000.0
521+
522+
def _update_reconnect_backoff(self):
    """Grow the reconnect backoff after another failure.

    Does nothing unless ``reconnect_backoff_max_ms`` is strictly greater
    than ``reconnect_backoff_ms``. Otherwise the failure counter is
    incremented and the backoff becomes the base value doubled once per
    previous failure, capped at the configured maximum, then scaled by a
    random factor drawn from [0.8, 1.2] and stored in seconds.
    """
    if self.config['reconnect_backoff_max_ms'] <= self.config['reconnect_backoff_ms']:
        return

    self._failures += 1
    doubled_ms = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
    capped_ms = min(doubled_ms, self.config['reconnect_backoff_max_ms'])
    # Jitter is applied after the cap, so the final value may land up to 20%
    # above the configured maximum.
    jittered_ms = capped_ms * uniform(0.8, 1.2)
    self._reconnect_backoff = jittered_ms / 1000.0
    log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
530+
498531
def close(self, error=None):
499532
"""Close socket and fail all in-flight-requests.
500533
@@ -512,6 +545,7 @@ def close(self, error=None):
512545
log.info('%s: Closing connection. %s', self, error or '')
513546
self.state = ConnectionStates.DISCONNECTING
514547
self.config['state_change_callback'](self)
548+
self._update_reconnect_backoff()
515549
if self._sock:
516550
self._sock.close()
517551
self._sock = None

kafka/consumer/group.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ class KafkaConsumer(six.Iterator):
9191
reconnect_backoff_ms (int): The amount of time in milliseconds to
9292
wait before attempting to reconnect to a given host.
9393
Default: 50.
94+
reconnect_backoff_max_ms (int): The maximum amount of time in
95+
milliseconds to wait when reconnecting to a broker that has
96+
repeatedly failed to connect. If provided, the backoff per host
97+
will increase exponentially for each consecutive connection
98+
failure, up to this maximum. To avoid connection storms, a
99+
randomization factor of 0.2 will be applied to the backoff
100+
resulting in a random range between 20% below and 20% above
101+
the computed value. Default: 1000.
94102
max_in_flight_requests_per_connection (int): Requests are pipelined
95103
to kafka brokers up to this number of maximum requests per
96104
broker connection. Default: 5.
@@ -230,6 +238,7 @@ class KafkaConsumer(six.Iterator):
230238
'request_timeout_ms': 40 * 1000,
231239
'retry_backoff_ms': 100,
232240
'reconnect_backoff_ms': 50,
241+
'reconnect_backoff_max_ms': 1000,
233242
'max_in_flight_requests_per_connection': 5,
234243
'auto_offset_reset': 'latest',
235244
'enable_auto_commit': True,

kafka/producer/kafka.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,14 @@ class KafkaProducer(object):
199199
reconnect_backoff_ms (int): The amount of time in milliseconds to
200200
wait before attempting to reconnect to a given host.
201201
Default: 50.
202+
reconnect_backoff_max_ms (int): The maximum amount of time in
203+
milliseconds to wait when reconnecting to a broker that has
204+
repeatedly failed to connect. If provided, the backoff per host
205+
will increase exponentially for each consecutive connection
206+
failure, up to this maximum. To avoid connection storms, a
207+
randomization factor of 0.2 will be applied to the backoff
208+
resulting in a random range between 20% below and 20% above
209+
the computed value. Default: 1000.
202210
max_in_flight_requests_per_connection (int): Requests are pipelined
203211
to kafka brokers up to this number of maximum requests per
204212
broker connection. Default: 5.
@@ -276,6 +284,7 @@ class KafkaProducer(object):
276284
'send_buffer_bytes': None,
277285
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
278286
'reconnect_backoff_ms': 50,
287+
'reconnect_backoff_max_ms': 1000,
279288
'max_in_flight_requests_per_connection': 5,
280289
'security_protocol': 'PLAINTEXT',
281290
'ssl_context': None,

0 commit comments

Comments
 (0)