Skip to content

Commit 3cf418a

Browse files
authored
Improve error handling in client._maybe_connect (#2504)
1 parent 89c97e1 commit 3cf418a

File tree

2 files changed

+26
-18
lines changed

2 files changed

+26
-18
lines changed

kafka/client_async.py

+23-10
Original file line numberDiff line numberDiff line change
@@ -399,13 +399,23 @@ def _should_recycle_connection(self, conn):
399399
return False
400400

401401
def _maybe_connect(self, node_id):
402-
"""Idempotent non-blocking connection attempt to the given node id."""
402+
"""Idempotent non-blocking connection attempt to the given node id.
403+
404+
Returns True if connection object exists and is connected / connecting
405+
"""
403406
with self._lock:
404407
conn = self._conns.get(node_id)
405408

409+
# Check if existing connection should be recreated because host/port changed
410+
if conn is not None and self._should_recycle_connection(conn):
411+
self._conns.pop(node_id).close()
412+
conn = None
413+
406414
if conn is None:
407415
broker = self.cluster.broker_metadata(node_id)
408-
assert broker, 'Broker id %s not in current metadata' % (node_id,)
416+
if broker is None:
417+
log.debug('Broker id %s not in current metadata', node_id)
418+
return False
409419

410420
log.debug("Initiating connection to node %s at %s:%s",
411421
node_id, broker.host, broker.port)
@@ -417,16 +427,11 @@ def _maybe_connect(self, node_id):
417427
**self.config)
418428
self._conns[node_id] = conn
419429

420-
# Check if existing connection should be recreated because host/port changed
421-
elif self._should_recycle_connection(conn):
422-
self._conns.pop(node_id)
423-
return False
424-
425430
elif conn.connected():
426431
return True
427432

428433
conn.connect()
429-
return conn.connected()
434+
return not conn.disconnected()
430435

431436
def ready(self, node_id, metadata_priority=True):
432437
"""Check whether a node is connected and ok to send more requests.
@@ -621,7 +626,10 @@ def poll(self, timeout_ms=None, future=None):
621626

622627
# Attempt to complete pending connections
623628
for node_id in list(self._connecting):
624-
self._maybe_connect(node_id)
629+
# False return means no more connection progress is possible
630+
# Connected nodes will update _connecting via state_change callback
631+
if not self._maybe_connect(node_id):
632+
self._connecting.remove(node_id)
625633

626634
# If we got a future that is already done, don't block in _poll
627635
if future is not None and future.is_done:
@@ -965,7 +973,12 @@ def check_version(self, node_id=None, timeout=None, strict=False):
965973
if try_node is None:
966974
self._lock.release()
967975
raise Errors.NoBrokersAvailable()
968-
self._maybe_connect(try_node)
976+
if not self._maybe_connect(try_node):
977+
if try_node == node_id:
978+
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
979+
else:
980+
continue
981+
969982
conn = self._conns[try_node]
970983

971984
# We will intentionally cause socket failures

test/test_client_async.py

+3-8
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,14 @@ def test_can_connect(cli, conn):
7171

7272

7373
def test_maybe_connect(cli, conn):
74-
try:
75-
# Node not in metadata, raises AssertionError
76-
cli._maybe_connect(2)
77-
except AssertionError:
78-
pass
79-
else:
80-
assert False, 'Exception not raised'
74+
# Node not in metadata, return False
75+
assert not cli._maybe_connect(2)
8176

8277
# New node_id creates a conn object
8378
assert 0 not in cli._conns
8479
conn.state = ConnectionStates.DISCONNECTED
8580
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
86-
assert cli._maybe_connect(0) is False
81+
assert cli._maybe_connect(0) is True
8782
assert cli._conns[0] is conn
8883

8984

0 commit comments

Comments
 (0)