@@ -55,7 +55,7 @@ class KafkaClient(object):
55
55
56
56
Keyword Arguments:
57
57
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
58
- strings) that the consumer should contact to bootstrap initial
58
+ strings) that the client should contact to bootstrap initial
59
59
cluster metadata. This does not have to be the full node list.
60
60
It just needs to have at least one broker that will respond to a
61
61
Metadata API Request. Default port is 9092. If no servers are
@@ -222,76 +222,34 @@ def __init__(self, **configs):
222
222
self .config ['metric_group_prefix' ],
223
223
weakref .proxy (self ._conns ))
224
224
225
- self ._bootstrap (collect_hosts (self .config ['bootstrap_servers' ]))
225
+ self ._num_bootstrap_hosts = len (collect_hosts (self .config ['bootstrap_servers' ]))
226
226
227
227
# Check Broker Version if not set explicitly
228
228
if self .config ['api_version' ] is None :
229
229
check_timeout = self .config ['api_version_auto_timeout_ms' ] / 1000
230
230
self .config ['api_version' ] = self .check_version (timeout = check_timeout )
231
231
232
- def _bootstrap (self , hosts ):
233
- log .info ('Bootstrapping cluster metadata from %s' , hosts )
234
- # Exponential backoff if bootstrap fails
235
- backoff_ms = self .config ['reconnect_backoff_ms' ] * 2 ** self ._bootstrap_fails
232
+ def _can_bootstrap (self ):
233
+ effective_failures = self ._bootstrap_fails // self ._num_bootstrap_hosts
234
+ backoff_factor = 2 ** effective_failures
235
+ backoff_ms = min (self .config ['reconnect_backoff_ms' ] * backoff_factor ,
236
+ self .config ['reconnect_backoff_max_ms' ])
237
+
238
+ backoff_ms *= random .uniform (0.8 , 1.2 )
239
+
236
240
next_at = self ._last_bootstrap + backoff_ms / 1000.0
237
- self ._refresh_on_disconnects = False
238
241
now = time .time ()
239
242
if next_at > now :
240
- log .debug ("Sleeping %0.4f before bootstrapping again" , next_at - now )
241
- time .sleep (next_at - now )
242
- self ._last_bootstrap = time .time ()
243
-
244
- if self .config ['api_version' ] is None or self .config ['api_version' ] < (0 , 10 ):
245
- if self .config ['bootstrap_topics_filter' ]:
246
- metadata_request = MetadataRequest [0 ](list (self .config ['bootstrap_topics_filter' ]))
247
- else :
248
- metadata_request = MetadataRequest [0 ]([])
249
- else :
250
- if self .config ['bootstrap_topics_filter' ]:
251
- metadata_request = MetadataRequest [1 ](list (self .config ['bootstrap_topics_filter' ]))
252
- else :
253
- metadata_request = MetadataRequest [1 ](None )
254
-
255
- for host , port , afi in hosts :
256
- log .debug ("Attempting to bootstrap via node at %s:%s" , host , port )
257
- cb = functools .partial (WeakMethod (self ._conn_state_change ), 'bootstrap' )
258
- bootstrap = BrokerConnection (host , port , afi ,
259
- state_change_callback = cb ,
260
- node_id = 'bootstrap' ,
261
- ** self .config )
262
- if not bootstrap .connect_blocking ():
263
- bootstrap .close ()
264
- continue
265
- future = bootstrap .send (metadata_request )
266
- while not future .is_done :
267
- self ._selector .select (1 )
268
- for r , f in bootstrap .recv ():
269
- f .success (r )
270
- if future .failed ():
271
- bootstrap .close ()
272
- continue
273
- self .cluster .update_metadata (future .value )
274
- log .info ('Bootstrap succeeded: found %d brokers and %d topics.' ,
275
- len (self .cluster .brokers ()), len (self .cluster .topics ()))
276
-
277
- # A cluster with no topics can return no broker metadata
278
- # in that case, we should keep the bootstrap connection
279
- if not len (self .cluster .brokers ()):
280
- self ._conns ['bootstrap' ] = bootstrap
281
- else :
282
- bootstrap .close ()
283
- self ._bootstrap_fails = 0
284
- break
285
- # No bootstrap found...
286
- else :
287
- log .error ('Unable to bootstrap from %s' , hosts )
288
- # Max exponential backoff is 2^12, x4000 (50ms -> 200s)
289
- self ._bootstrap_fails = min (self ._bootstrap_fails + 1 , 12 )
290
- self ._refresh_on_disconnects = True
243
+ return False
244
+ return True
291
245
292
246
def _can_connect (self , node_id ):
293
247
if node_id not in self ._conns :
294
- if self .cluster .broker_metadata (node_id ):
248
+ # cluster.broker_metadata() is stateful when called w/ 'bootstrap'
249
+ # (it cycles through all of the bootstrap servers)
250
+ # so we short-circuit here and assume that we should always have
251
+ # some bootstrap_servers config to power bootstrap broker_metadata
252
+ if node_id == 'bootstrap' or self .cluster .broker_metadata (node_id ):
295
253
return True
296
254
return False
297
255
conn = self ._conns [node_id ]
@@ -308,6 +266,9 @@ def _conn_state_change(self, node_id, conn):
308
266
except KeyError :
309
267
self ._selector .modify (conn ._sock , selectors .EVENT_WRITE )
310
268
269
+ if node_id == 'bootstrap' :
270
+ self ._last_bootstrap = time .time ()
271
+
311
272
elif conn .connected ():
312
273
log .debug ("Node %s connected" , node_id )
313
274
if node_id in self ._connecting :
@@ -323,12 +284,12 @@ def _conn_state_change(self, node_id, conn):
323
284
324
285
self ._idle_expiry_manager .update (node_id )
325
286
326
- if 'bootstrap' in self ._conns and node_id != 'bootstrap' :
287
+ if node_id == 'bootstrap' :
288
+ self ._bootstrap_fails = 0
289
+
290
+ elif 'bootstrap' in self ._conns :
327
291
bootstrap = self ._conns .pop ('bootstrap' )
328
- # XXX: make conn.close() require error to cause refresh
329
- self ._refresh_on_disconnects = False
330
292
bootstrap .close ()
331
- self ._refresh_on_disconnects = True
332
293
333
294
# Connection failures imply that our metadata is stale, so let's refresh
334
295
elif conn .state is ConnectionStates .DISCONNECTING :
@@ -347,7 +308,10 @@ def _conn_state_change(self, node_id, conn):
347
308
idle_disconnect = True
348
309
self ._idle_expiry_manager .remove (node_id )
349
310
350
- if self ._refresh_on_disconnects and not self ._closed and not idle_disconnect :
311
+ if node_id == 'bootstrap' :
312
+ self ._bootstrap_fails += 1
313
+
314
+ elif self ._refresh_on_disconnects and not self ._closed and not idle_disconnect :
351
315
log .warning ("Node %s connection failed -- refreshing metadata" , node_id )
352
316
self .cluster .request_update ()
353
317
@@ -362,13 +326,40 @@ def maybe_connect(self, node_id):
362
326
return True
363
327
return False
364
328
329
+ def _should_recycle_connection (self , conn ):
330
+ # Never recycle unless disconnected
331
+ if not conn .disconnected ():
332
+ return False
333
+
334
+ # Always recycled disconnected bootstraps
335
+ elif conn .node_id == 'bootstrap' :
336
+ return True
337
+
338
+ # Otherwise, only recycle when broker metadata has changed
339
+ broker = self .cluster .broker_metadata (conn .node_id )
340
+ if broker is None :
341
+ return False
342
+
343
+ host , _ , afi = get_ip_port_afi (broker .host )
344
+ if conn .host != host or conn .port != broker .port :
345
+ log .info ("Broker metadata change detected for node %s"
346
+ " from %s:%s to %s:%s" , conn .node_id , conn .host , conn .port ,
347
+ broker .host , broker .port )
348
+ return True
349
+
350
+ return False
351
+
365
352
def _maybe_connect (self , node_id ):
366
353
"""Idempotent non-blocking connection attempt to the given node id."""
367
354
with self ._lock :
368
- broker = self .cluster .broker_metadata (node_id )
369
355
conn = self ._conns .get (node_id )
370
356
371
357
if conn is None :
358
+ # Note that when bootstrapping, each call to broker_metadata may
359
+ # return a different host/port. So we need to be careful to only
360
+ # call when necessary to avoid skipping some possible bootstrap
361
+ # source.
362
+ broker = self .cluster .broker_metadata (node_id )
372
363
assert broker , 'Broker id %s not in current metadata' % (node_id ,)
373
364
374
365
log .debug ("Initiating connection to node %s at %s:%s" ,
@@ -382,17 +373,9 @@ def _maybe_connect(self, node_id):
382
373
self ._conns [node_id ] = conn
383
374
384
375
# Check if existing connection should be recreated because host/port changed
385
- elif conn .disconnected () and broker is not None :
386
- host , _ , __ = get_ip_port_afi (broker .host )
387
- if conn .host != host or conn .port != broker .port :
388
- log .info ("Broker metadata change detected for node %s"
389
- " from %s:%s to %s:%s" , node_id , conn .host , conn .port ,
390
- broker .host , broker .port )
391
-
392
- # Drop old connection object.
393
- # It will be recreated on next _maybe_connect
394
- self ._conns .pop (node_id )
395
- return False
376
+ elif self ._should_recycle_connection (conn ):
377
+ self ._conns .pop (node_id )
378
+ return False
396
379
397
380
elif conn .connected ():
398
381
return True
@@ -713,7 +696,8 @@ def least_loaded_node(self):
713
696
This method will prefer a node with an existing connection and no
714
697
in-flight-requests. If no such node is found, a node will be chosen
715
698
randomly from disconnected nodes that are not "blacked out" (i.e.,
716
- are not subject to a reconnect backoff).
699
+ are not subject to a reconnect backoff). If no node metadata has been
700
+ obtained, will return 'bootstrap' (subject to exponential backoff).
717
701
718
702
Returns:
719
703
node_id or None if no suitable node was found
@@ -740,12 +724,8 @@ def least_loaded_node(self):
740
724
if found is not None :
741
725
return found
742
726
743
- # some broker versions return an empty list of broker metadata
744
- # if there are no topics created yet. the bootstrap process
745
- # should detect this and keep a 'bootstrap' node alive until
746
- # a non-bootstrap node is connected and non-empty broker
747
- # metadata is available
748
- elif 'bootstrap' in self ._conns :
727
+ elif not nodes and self ._can_bootstrap ():
728
+ self ._last_bootstrap = time .time ()
749
729
return 'bootstrap'
750
730
751
731
return None
@@ -805,6 +785,9 @@ def _maybe_refresh_metadata(self):
805
785
806
786
if self ._can_send_request (node_id ):
807
787
topics = list (self ._topics )
788
+ if not topics and node_id == 'bootstrap' :
789
+ topics = list (self .config ['bootstrap_topics_filter' ])
790
+
808
791
if self .cluster .need_all_topic_metadata or not topics :
809
792
topics = [] if self .config ['api_version' ] < (0 , 10 ) else None
810
793
api_version = 0 if self .config ['api_version' ] < (0 , 10 ) else 1
0 commit comments