Skip to content

Commit 43a8b74

Browse files
committed
Client connection / maybe_refresh_metadata changes (#2507)
1 parent 4cd3520 commit 43a8b74

File tree

2 files changed

+49
-36
lines changed

2 files changed

+49
-36
lines changed

kafka/client_async.py

+37-23
Original file line number | Diff line number | Diff line change
@@ -363,7 +363,7 @@ def _should_recycle_connection(self, conn):
363363

364364
return False
365365

366-
def _maybe_connect(self, node_id):
366+
def _init_connect(self, node_id):
367367
"""Idempotent non-blocking connection attempt to the given node id.
368368
369369
Returns True if connection object exists and is connected / connecting
@@ -392,10 +392,8 @@ def _maybe_connect(self, node_id):
392392
**self.config)
393393
self._conns[node_id] = conn
394394

395-
elif conn.connected():
396-
return True
397-
398-
conn.connect()
395+
if conn.disconnected():
396+
conn.connect()
399397
return not conn.disconnected()
400398

401399
def ready(self, node_id, metadata_priority=True):
@@ -580,15 +578,18 @@ def poll(self, timeout_ms=None, future=None):
580578
if self._closed:
581579
break
582580

583-
# Send a metadata request if needed (or initiate new connection)
584-
metadata_timeout_ms = self._maybe_refresh_metadata()
585-
586581
# Attempt to complete pending connections
587582
for node_id in list(self._connecting):
588583
# False return means no more connection progress is possible
589584
# Connected nodes will update _connecting via state_change callback
590-
if not self._maybe_connect(node_id):
591-
self._connecting.remove(node_id)
585+
if not self._init_connect(node_id):
586+
# It's possible that the connection attempt triggered a state change
587+
# but if not, make sure to remove from the _connecting set
588+
if node_id in self._connecting:
589+
self._connecting.remove(node_id)
590+
591+
# Send a metadata request if needed (or initiate new connection)
592+
metadata_timeout_ms = self._maybe_refresh_metadata()
592593

593594
# If we got a future that is already done, don't block in _poll
594595
if future is not None and future.is_done:
@@ -638,6 +639,8 @@ def _poll(self, timeout):
638639
self._register_send_sockets()
639640

640641
start_select = time.time()
642+
if timeout == float('inf'):
643+
timeout = None
641644
ready = self._selector.select(timeout)
642645
end_select = time.time()
643646
if self._sensors:
@@ -850,6 +853,26 @@ def _maybe_refresh_metadata(self, wakeup=False):
850853
log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms)
851854
return next_connect_ms
852855

856+
if not self._can_send_request(node_id):
857+
# If there's any connection establishment underway, wait until it completes. This prevents
858+
# the client from unnecessarily connecting to additional nodes while a previous connection
859+
# attempt has not been completed.
860+
if self._connecting:
861+
return float('inf')
862+
863+
elif self._can_connect(node_id):
864+
log.debug("Initializing connection to node %s for metadata request", node_id)
865+
self._connecting.add(node_id)
866+
if not self._init_connect(node_id):
867+
if node_id in self._connecting:
868+
self._connecting.remove(node_id)
869+
# Connection attempt failed immediately, need to retry with a different node
870+
return self.config['reconnect_backoff_ms']
871+
else:
872+
# Existing connection with max in flight requests. Wait for request to complete.
873+
return self.config['request_timeout_ms']
874+
875+
# Recheck node_id in case we were able to connect immediately above
853876
if self._can_send_request(node_id):
854877
topics = list(self._topics)
855878
if not topics and self.cluster.is_bootstrap(node_id):
@@ -871,20 +894,11 @@ def refresh_done(val_or_error):
871894
future.add_errback(refresh_done)
872895
return self.config['request_timeout_ms']
873896

874-
# If there's any connection establishment underway, wait until it completes. This prevents
875-
# the client from unnecessarily connecting to additional nodes while a previous connection
876-
# attempt has not been completed.
897+
# Should only get here if still connecting
877898
if self._connecting:
878899
return float('inf')
879-
880-
if self.maybe_connect(node_id, wakeup=wakeup):
881-
log.debug("Initializing connection to node %s for metadata request", node_id)
882-
return float('inf')
883-
884-
# connected but can't send more, OR connecting
885-
# In either case we just need to wait for a network event
886-
# to let us know the selected connection might be usable again.
887-
return float('inf')
900+
else:
901+
return self.config['reconnect_backoff_ms']
888902

889903
def get_api_versions(self):
890904
"""Return the ApiVersions map, if available.
@@ -927,7 +941,7 @@ def check_version(self, node_id=None, timeout=None, strict=False):
927941
if try_node is None:
928942
self._lock.release()
929943
raise Errors.NoBrokersAvailable()
930-
if not self._maybe_connect(try_node):
944+
if not self._init_connect(try_node):
931945
if try_node == node_id:
932946
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
933947
else:

test/test_client_async.py

+12-13
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@ def test_can_connect(cli, conn):
5858
assert cli._can_connect(0)
5959

6060
# Node is connected, can't reconnect
61-
assert cli._maybe_connect(0) is True
61+
assert cli._init_connect(0) is True
6262
assert not cli._can_connect(0)
6363

6464
# Node is disconnected, can connect
@@ -70,15 +70,15 @@ def test_can_connect(cli, conn):
7070
assert not cli._can_connect(0)
7171

7272

73-
def test_maybe_connect(cli, conn):
73+
def test_init_connect(cli, conn):
7474
# Node not in metadata, return False
75-
assert not cli._maybe_connect(2)
75+
assert not cli._init_connect(2)
7676

7777
# New node_id creates a conn object
7878
assert 0 not in cli._conns
7979
conn.state = ConnectionStates.DISCONNECTED
8080
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
81-
assert cli._maybe_connect(0) is True
81+
assert cli._init_connect(0) is True
8282
assert cli._conns[0] is conn
8383

8484

@@ -122,8 +122,8 @@ def test_ready(mocker, cli, conn):
122122

123123

124124
def test_is_ready(mocker, cli, conn):
125-
cli._maybe_connect(0)
126-
cli._maybe_connect(1)
125+
cli._init_connect(0)
126+
cli._init_connect(1)
127127

128128
# metadata refresh blocks ready nodes
129129
assert cli.is_ready(0)
@@ -166,14 +166,14 @@ def test_close(mocker, cli, conn):
166166
assert conn.close.call_count == call_count
167167

168168
# Single node close
169-
cli._maybe_connect(0)
169+
cli._init_connect(0)
170170
assert conn.close.call_count == call_count
171171
cli.close(0)
172172
call_count += 1
173173
assert conn.close.call_count == call_count
174174

175175
# All node close
176-
cli._maybe_connect(1)
176+
cli._init_connect(1)
177177
cli.close()
178178
# +2 close: node 1, node bootstrap (node 0 already closed)
179179
call_count += 2
@@ -185,7 +185,7 @@ def test_is_disconnected(cli, conn):
185185
conn.state = ConnectionStates.DISCONNECTED
186186
assert not cli.is_disconnected(0)
187187

188-
cli._maybe_connect(0)
188+
cli._init_connect(0)
189189
assert cli.is_disconnected(0)
190190

191191
conn.state = ConnectionStates.CONNECTING
@@ -210,7 +210,7 @@ def test_send(cli, conn):
210210
assert isinstance(f.exception, Errors.NodeNotReadyError)
211211

212212
conn.state = ConnectionStates.CONNECTED
213-
cli._maybe_connect(0)
213+
cli._init_connect(0)
214214
# ProduceRequest w/ 0 required_acks -> no response
215215
request = ProduceRequest[0](0, 0, [])
216216
assert request.expect_response() is False
@@ -339,8 +339,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
339339
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
340340
mocker.patch.object(client, '_can_send_request', return_value=False)
341341
mocker.patch.object(client, '_can_connect', return_value=True)
342-
mocker.patch.object(client, '_maybe_connect', return_value=True)
343-
mocker.patch.object(client, 'maybe_connect', return_value=True)
342+
mocker.patch.object(client, '_init_connect', return_value=True)
344343

345344
now = time.time()
346345
t = mocker.patch('time.time')
@@ -349,7 +348,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
349348
# first poll attempts connection
350349
client.poll(timeout_ms=12345678)
351350
client._poll.assert_called_with(12345.678)
352-
client.maybe_connect.assert_called_once_with('foobar', wakeup=False)
351+
client._init_connect.assert_called_once_with('foobar')
353352

354353
# poll while connecting should not attempt a new connection
355354
client._connecting.add('foobar')

0 commit comments

Comments
 (0)