Skip to content

Retry bootstrapping after backoff when necessary #1736

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 66 additions & 83 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class KafkaClient(object):

Keyword Arguments:
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
strings) that the consumer should contact to bootstrap initial
strings) that the client should contact to bootstrap initial
cluster metadata. This does not have to be the full node list.
It just needs to have at least one broker that will respond to a
Metadata API Request. Default port is 9092. If no servers are
Expand Down Expand Up @@ -223,76 +223,34 @@ def __init__(self, **configs):
self.config['metric_group_prefix'],
weakref.proxy(self._conns))

self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._num_bootstrap_hosts = len(collect_hosts(self.config['bootstrap_servers']))

# Check Broker Version if not set explicitly
if self.config['api_version'] is None:
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
self.config['api_version'] = self.check_version(timeout=check_timeout)

def _bootstrap(self, hosts):
log.info('Bootstrapping cluster metadata from %s', hosts)
# Exponential backoff if bootstrap fails
backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
def _can_bootstrap(self):
effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts
backoff_factor = 2 ** effective_failures
backoff_ms = min(self.config['reconnect_backoff_ms'] * backoff_factor,
self.config['reconnect_backoff_max_ms'])

backoff_ms *= random.uniform(0.8, 1.2)

next_at = self._last_bootstrap + backoff_ms / 1000.0
self._refresh_on_disconnects = False
now = time.time()
if next_at > now:
log.debug("Sleeping %0.4f before bootstrapping again", next_at - now)
time.sleep(next_at - now)
self._last_bootstrap = time.time()

if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
if self.config['bootstrap_topics_filter']:
metadata_request = MetadataRequest[0](list(self.config['bootstrap_topics_filter']))
else:
metadata_request = MetadataRequest[0]([])
else:
if self.config['bootstrap_topics_filter']:
metadata_request = MetadataRequest[1](list(self.config['bootstrap_topics_filter']))
else:
metadata_request = MetadataRequest[1](None)

for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
cb = functools.partial(WeakMethod(self._conn_state_change), 'bootstrap')
bootstrap = BrokerConnection(host, port, afi,
state_change_callback=cb,
node_id='bootstrap',
**self.config)
if not bootstrap.connect_blocking():
bootstrap.close()
continue
future = bootstrap.send(metadata_request)
while not future.is_done:
self._selector.select(1)
for r, f in bootstrap.recv():
f.success(r)
if future.failed():
bootstrap.close()
continue
self.cluster.update_metadata(future.value)
log.info('Bootstrap succeeded: found %d brokers and %d topics.',
len(self.cluster.brokers()), len(self.cluster.topics()))

# A cluster with no topics can return no broker metadata
# in that case, we should keep the bootstrap connection
if not len(self.cluster.brokers()):
self._conns['bootstrap'] = bootstrap
else:
bootstrap.close()
self._bootstrap_fails = 0
break
# No bootstrap found...
else:
log.error('Unable to bootstrap from %s', hosts)
# Max exponential backoff is 2^12, x4000 (50ms -> 200s)
self._bootstrap_fails = min(self._bootstrap_fails + 1, 12)
self._refresh_on_disconnects = True
return False
return True

def _can_connect(self, node_id):
if node_id not in self._conns:
if self.cluster.broker_metadata(node_id):
# cluster.broker_metadata() is stateful when called w/ 'bootstrap'
# (it cycles through all of the bootstrap servers)
# so we short-circuit here and assume that we should always have
# some bootstrap_servers config to power bootstrap broker_metadata
if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id):
return True
return False
conn = self._conns[node_id]
Expand All @@ -309,6 +267,9 @@ def _conn_state_change(self, node_id, conn):
except KeyError:
self._selector.modify(conn._sock, selectors.EVENT_WRITE)

if node_id == 'bootstrap':
self._last_bootstrap = time.time()

elif conn.connected():
log.debug("Node %s connected", node_id)
if node_id in self._connecting:
Expand All @@ -324,12 +285,12 @@ def _conn_state_change(self, node_id, conn):

self._idle_expiry_manager.update(node_id)

if 'bootstrap' in self._conns and node_id != 'bootstrap':
if node_id == 'bootstrap':
self._bootstrap_fails = 0

elif 'bootstrap' in self._conns:
bootstrap = self._conns.pop('bootstrap')
# XXX: make conn.close() require error to cause refresh
self._refresh_on_disconnects = False
bootstrap.close()
self._refresh_on_disconnects = True

# Connection failures imply that our metadata is stale, so let's refresh
elif conn.state is ConnectionStates.DISCONNECTING:
Expand All @@ -348,7 +309,10 @@ def _conn_state_change(self, node_id, conn):
idle_disconnect = True
self._idle_expiry_manager.remove(node_id)

if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
if node_id == 'bootstrap':
self._bootstrap_fails += 1

elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()

Expand All @@ -363,13 +327,40 @@ def maybe_connect(self, node_id):
return True
return False

def _should_recycle_connection(self, conn):
# Never recycle unless disconnected
if not conn.disconnected():
return False

# Always recycled disconnected bootstraps
elif conn.node_id == 'bootstrap':
return True

# Otherwise, only recycle when broker metadata has changed
broker = self.cluster.broker_metadata(conn.node_id)
if broker is None:
return False

host, _, afi = get_ip_port_afi(broker.host)
if conn.host != host or conn.port != broker.port:
log.info("Broker metadata change detected for node %s"
" from %s:%s to %s:%s", conn.node_id, conn.host, conn.port,
broker.host, broker.port)
return True

return False

def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
with self._lock:
broker = self.cluster.broker_metadata(node_id)
conn = self._conns.get(node_id)

if conn is None:
# Note that when bootstrapping, each call to broker_metadata may
# return a different host/port. So we need to be careful to only
# call when necessary to avoid skipping some possible bootstrap
# source.
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)

log.debug("Initiating connection to node %s at %s:%s",
Expand All @@ -383,17 +374,9 @@ def _maybe_connect(self, node_id):
self._conns[node_id] = conn

# Check if existing connection should be recreated because host/port changed
elif conn.disconnected() and broker is not None:
host, _, __ = get_ip_port_afi(broker.host)
if conn.host != host or conn.port != broker.port:
log.info("Broker metadata change detected for node %s"
" from %s:%s to %s:%s", node_id, conn.host, conn.port,
broker.host, broker.port)

# Drop old connection object.
# It will be recreated on next _maybe_connect
self._conns.pop(node_id)
return False
elif self._should_recycle_connection(conn):
self._conns.pop(node_id)
return False

elif conn.connected():
return True
Expand Down Expand Up @@ -713,7 +696,8 @@ def least_loaded_node(self):
This method will prefer a node with an existing connection and no
in-flight-requests. If no such node is found, a node will be chosen
randomly from disconnected nodes that are not "blacked out" (i.e.,
are not subject to a reconnect backoff).
are not subject to a reconnect backoff). If no node metadata has been
obtained, will return 'bootstrap' (subject to exponential backoff).

Returns:
node_id or None if no suitable node was found
Expand All @@ -740,12 +724,8 @@ def least_loaded_node(self):
if found is not None:
return found

# some broker versions return an empty list of broker metadata
# if there are no topics created yet. the bootstrap process
# should detect this and keep a 'bootstrap' node alive until
# a non-bootstrap node is connected and non-empty broker
# metadata is available
elif 'bootstrap' in self._conns:
elif not nodes and self._can_bootstrap():
self._last_bootstrap = time.time()
return 'bootstrap'

return None
Expand Down Expand Up @@ -805,6 +785,9 @@ def _maybe_refresh_metadata(self):

if self._can_send_request(node_id):
topics = list(self._topics)
if not topics and node_id == 'bootstrap':
topics = list(self.config['bootstrap_topics_filter'])

if self.cluster.need_all_topic_metadata or not topics:
topics = [] if self.config['api_version'] < (0, 10) else None
api_version = 0 if self.config['api_version'] < (0, 10) else 1
Expand Down
24 changes: 23 additions & 1 deletion kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from kafka.vendor import six

from kafka import errors as Errors
from kafka.conn import collect_hosts, dns_lookup
from kafka.future import Future
from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition

Expand All @@ -29,10 +30,17 @@ class ClusterMetadata(object):
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions. Default: 300000
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
strings) that the client should contact to bootstrap initial
cluster metadata. This does not have to be the full node list.
It just needs to have at least one broker that will respond to a
Metadata API Request. Default port is 9092. If no servers are
specified, will default to localhost:9092.
"""
DEFAULT_CONFIG = {
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
'bootstrap_servers': 'localhost',
}

def __init__(self, **configs):
Expand All @@ -42,7 +50,7 @@ def __init__(self, **configs):
self._groups = {} # group_name -> node_id
self._last_refresh_ms = 0
self._last_successful_refresh_ms = 0
self._need_update = False
self._need_update = True
self._future = None
self._listeners = set()
self._lock = threading.Lock()
Expand All @@ -56,6 +64,17 @@ def __init__(self, **configs):
if key in configs:
self.config[key] = configs[key]

self._bootstrap_brokers = self._generate_bootstrap_brokers()

def _generate_bootstrap_brokers(self):
# collect_hosts does not perform DNS, so we should be fine to re-use
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])

while True:
for host, port, afi in bootstrap_hosts:
for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)

def brokers(self):
"""Get all BrokerMetadata

Expand All @@ -73,6 +92,9 @@ def broker_metadata(self, broker_id):
Returns:
BrokerMetadata or None if not found
"""
if broker_id == 'bootstrap':
return next(self._bootstrap_brokers)

return self._brokers.get(broker_id)

def partitions_for_topic(self, topic):
Expand Down
Loading