Skip to content

Commit 48acf1d

Browse files
committed
Improve error handling in client._maybe_connect
1 parent 1886cac commit 48acf1d

File tree

1 file changed

+23
-10
lines changed

1 file changed

+23
-10
lines changed

kafka/client_async.py

+23-10
Original file line numberDiff line numberDiff line change
@@ -399,13 +399,23 @@ def _should_recycle_connection(self, conn):
399399
return False
400400

401401
def _maybe_connect(self, node_id):
402-
"""Idempotent non-blocking connection attempt to the given node id."""
402+
"""Idempotent non-blocking connection attempt to the given node id.
403+
404+
Returns True if connection object exists and is connected / connecting
405+
"""
403406
with self._lock:
404407
conn = self._conns.get(node_id)
405408

409+
# Check if existing connection should be recreated because host/port changed
410+
if self._should_recycle_connection(conn):
411+
self._conns.pop(node_id).close()
412+
conn = None
413+
406414
if conn is None:
407415
broker = self.cluster.broker_metadata(node_id)
408-
assert broker, 'Broker id %s not in current metadata' % (node_id,)
416+
if broker is None:
417+
log.debug('Broker id %s not in current metadata', node_id)
418+
return False
409419

410420
log.debug("Initiating connection to node %s at %s:%s",
411421
node_id, broker.host, broker.port)
@@ -417,16 +427,11 @@ def _maybe_connect(self, node_id):
417427
**self.config)
418428
self._conns[node_id] = conn
419429

420-
# Check if existing connection should be recreated because host/port changed
421-
elif self._should_recycle_connection(conn):
422-
self._conns.pop(node_id)
423-
return False
424-
425430
elif conn.connected():
426431
return True
427432

428433
conn.connect()
429-
return conn.connected()
434+
return !conn.disconnected()
430435

431436
def ready(self, node_id, metadata_priority=True):
432437
"""Check whether a node is connected and ok to send more requests.
@@ -621,7 +626,10 @@ def poll(self, timeout_ms=None, future=None):
621626

622627
# Attempt to complete pending connections
623628
for node_id in list(self._connecting):
624-
self._maybe_connect(node_id)
629+
# False return means no more connection progress is possible
630+
# Connected nodes will update _connecting via state_change callback
631+
if not self._maybe_connect(node_id):
632+
self._connecting.remove(node_id)
625633

626634
# If we got a future that is already done, don't block in _poll
627635
if future is not None and future.is_done:
@@ -965,7 +973,12 @@ def check_version(self, node_id=None, timeout=None, strict=False):
965973
if try_node is None:
966974
self._lock.release()
967975
raise Errors.NoBrokersAvailable()
968-
self._maybe_connect(try_node)
976+
if not self._maybe_connect(try_node):
977+
if try_node == node_id:
978+
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
979+
else:
980+
continue
981+
969982
conn = self._conns[try_node]
970983

971984
# We will intentionally cause socket failures

0 commit comments

Comments
 (0)