Skip to content

KIP-144: Exponential backoff for broker reconnections #1124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ class KafkaClient(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
reconnect_backoff_max_ms (int): The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has
repeatedly failed to connect. If provided, the backoff per host
will increase exponentially for each consecutive connection
failure, up to this maximum. To avoid connection storms, a
randomization factor of 0.2 will be applied to the backoff
resulting in a random range between 20% below and 20% above
the computed value. Default: 1000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
retry_backoff_ms (int): Milliseconds to backoff when retrying on
Expand Down Expand Up @@ -137,6 +145,7 @@ class KafkaClient(object):
'request_timeout_ms': 40000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down Expand Up @@ -432,15 +441,7 @@ def connection_delay(self, node_id):
"""
if node_id not in self._conns:
return 0

conn = self._conns[node_id]
time_waited_ms = time.time() - (conn.last_attempt or 0)
if conn.disconnected():
return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
elif conn.connecting():
return 0
else:
return 999999999
return self._conns[node_id].connection_delay()

def is_ready(self, node_id, metadata_priority=True):
"""Check whether a node is ready to send more requests.
Expand Down Expand Up @@ -655,12 +656,10 @@ def in_flight_request_count(self, node_id=None):
def least_loaded_node(self):
"""Choose the node with fewest outstanding requests, with fallbacks.

This method will prefer a node with an existing connection, but will
potentially choose a node for which we don't yet have a connection if
all existing connections are in use. This method will never choose a
node that was disconnected within the reconnect backoff period.
If all else fails, the method will attempt to bootstrap again using the
bootstrap_servers list.
This method will prefer a node with an existing connection and no
in-flight-requests. If no such node is found, a node will be chosen
randomly from disconnected nodes that are not "blacked out" (i.e.,
are not subject to a reconnect backoff).

Returns:
node_id or None if no suitable node was found
Expand Down Expand Up @@ -695,10 +694,6 @@ def least_loaded_node(self):
elif 'bootstrap' in self._conns:
return 'bootstrap'

# Last option: try to bootstrap again
# this should only happen if no prior bootstrap has been successful
log.error('No nodes found in metadata -- retrying bootstrap')
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
return None

def set_topics(self, topics):
Expand Down
40 changes: 37 additions & 3 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import errno
import logging
import io
from random import shuffle
from random import shuffle, uniform
import socket
import time
import traceback
Expand Down Expand Up @@ -78,6 +78,14 @@ class BrokerConnection(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
reconnect_backoff_max_ms (int): The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has
repeatedly failed to connect. If provided, the backoff per host
will increase exponentially for each consecutive connection
failure, up to this maximum. To avoid connection storms, a
randomization factor of 0.2 will be applied to the backoff
resulting in a random range between 20% below and 20% above
the computed value. Default: 1000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
max_in_flight_requests_per_connection (int): Requests are pipelined
Expand Down Expand Up @@ -140,6 +148,7 @@ class BrokerConnection(object):
'node_id': 0,
'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down Expand Up @@ -199,6 +208,7 @@ def __init__(self, host, port, afi, **configs):
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'

self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
self._sock = None
self._ssl_context = None
if self.config['ssl_context'] is not None:
Expand Down Expand Up @@ -305,6 +315,7 @@ def connect(self):
else:
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self)

# Connection failed
Expand Down Expand Up @@ -340,6 +351,7 @@ def connect(self):
log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self)

return self.state
Expand Down Expand Up @@ -475,11 +487,19 @@ def blacked_out(self):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
backoff = self.config['reconnect_backoff_ms'] / 1000.0
if time.time() < self.last_attempt + backoff:
if time.time() < self.last_attempt + self._reconnect_backoff:
return True
return False

def connection_delay(self):
time_waited_ms = time.time() - (self.last_attempt or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited_ms, 0)
elif self.connecting():
return 0
else:
return 999999999

def connected(self):
"""Return True iff socket is connected."""
return self.state is ConnectionStates.CONNECTED
Expand All @@ -495,6 +515,19 @@ def disconnected(self):
"""Return True iff socket is closed"""
return self.state is ConnectionStates.DISCONNECTED

def _reset_reconnect_backoff(self):
self._failures = 0
self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0

def _update_reconnect_backoff(self):
if self.config['reconnect_backoff_max_ms'] > self.config['reconnect_backoff_ms']:
self._failures += 1
self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms'])
self._reconnect_backoff *= uniform(0.8, 1.2)
self._reconnect_backoff /= 1000.0
log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)

def close(self, error=None):
"""Close socket and fail all in-flight-requests.

Expand All @@ -512,6 +545,7 @@ def close(self, error=None):
log.info('%s: Closing connection. %s', self, error or '')
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
self._update_reconnect_backoff()
if self._sock:
self._sock.close()
self._sock = None
Expand Down
9 changes: 9 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ class KafkaConsumer(six.Iterator):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
reconnect_backoff_max_ms (int): The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has
repeatedly failed to connect. If provided, the backoff per host
will increase exponentially for each consecutive connection
failure, up to this maximum. To avoid connection storms, a
randomization factor of 0.2 will be applied to the backoff
resulting in a random range between 20% below and 20% above
the computed value. Default: 1000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
Expand Down Expand Up @@ -230,6 +238,7 @@ class KafkaConsumer(six.Iterator):
'request_timeout_ms': 40 * 1000,
'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'auto_offset_reset': 'latest',
'enable_auto_commit': True,
Expand Down
9 changes: 9 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ class KafkaProducer(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
reconnect_backoff_max_ms (int): The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has
repeatedly failed to connect. If provided, the backoff per host
will increase exponentially for each consecutive connection
failure, up to this maximum. To avoid connection storms, a
randomization factor of 0.2 will be applied to the backoff
resulting in a random range between 20% below and 20% above
the computed value. Default: 1000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
Expand Down Expand Up @@ -276,6 +284,7 @@ class KafkaProducer(object):
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'reconnect_backoff_ms': 50,
'reconnect_backoff_max': 1000,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
Expand Down