Skip to content

Commit d39fdd7

Browse files
committed
Avoid multiple connection attempts when refreshing metadata
1 parent 998147d commit d39fdd7

File tree

2 files changed

+73
-58
lines changed

2 files changed

+73
-58
lines changed

kafka/client_async.py

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,6 @@ def __init__(self, **configs):
183183
self.cluster = ClusterMetadata(**self.config)
184184
self._topics = set() # empty set will fetch all topic metadata
185185
self._metadata_refresh_in_progress = False
186-
self._last_no_node_available_ms = 0
187186
self._selector = self.config['selector']()
188187
self._conns = {}
189188
self._connecting = set()
@@ -709,50 +708,55 @@ def _maybe_refresh_metadata(self):
709708
int: milliseconds until next refresh
710709
"""
711710
ttl = self.cluster.ttl()
712-
next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff()
713-
next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
714-
wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0
715-
timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms)
716-
717-
if timeout == 0:
718-
node_id = self.least_loaded_node()
719-
if node_id is None:
720-
log.debug("Give up sending metadata request since no node is available")
721-
# mark the timestamp for no node available to connect
722-
self._last_no_node_available_ms = time.time() * 1000
723-
return timeout
724-
725-
if self._can_send_request(node_id):
726-
topics = list(self._topics)
727-
if self.cluster.need_all_topic_metadata or not topics:
728-
topics = [] if self.config['api_version'] < (0, 10) else None
729-
api_version = 0 if self.config['api_version'] < (0, 10) else 1
730-
request = MetadataRequest[api_version](topics)
731-
log.debug("Sending metadata request %s to node %s", request, node_id)
732-
future = self.send(node_id, request)
733-
future.add_callback(self.cluster.update_metadata)
734-
future.add_errback(self.cluster.failed_update)
735-
736-
self._metadata_refresh_in_progress = True
737-
def refresh_done(val_or_error):
738-
self._metadata_refresh_in_progress = False
739-
future.add_callback(refresh_done)
740-
future.add_errback(refresh_done)
741-
742-
elif self._can_connect(node_id):
743-
log.debug("Initializing connection to node %s for metadata request", node_id)
744-
self._maybe_connect(node_id)
745-
# If _maybe_connect failed immediately, this node will be put into blackout and we
746-
# should allow immediately retrying in case there is another candidate node. If it
747-
# is still connecting, the worst case is that we end up setting a longer timeout
748-
# on the next round and then wait for the response.
749-
else:
750-
# connected, but can't send more OR connecting
751-
# In either case, we just need to wait for a network event to let us know the selected
752-
# connection might be usable again.
753-
self._last_no_node_available_ms = time.time() * 1000
711+
wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0
712+
metadata_timeout = max(ttl, wait_for_in_progress_ms)
754713

755-
return timeout
714+
if metadata_timeout > 0:
715+
return metadata_timeout
716+
717+
# Beware that the behavior of this method and the computation of
718+
# timeouts for poll() are highly dependent on the behavior of
719+
# least_loaded_node()
720+
node_id = self.least_loaded_node()
721+
if node_id is None:
722+
log.debug("Give up sending metadata request since no node is available");
723+
return self.config['reconnect_backoff_ms']
724+
725+
if self._can_send_request(node_id):
726+
topics = list(self._topics)
727+
if self.cluster.need_all_topic_metadata or not topics:
728+
topics = [] if self.config['api_version'] < (0, 10) else None
729+
api_version = 0 if self.config['api_version'] < (0, 10) else 1
730+
request = MetadataRequest[api_version](topics)
731+
log.debug("Sending metadata request %s to node %s", request, node_id)
732+
future = self.send(node_id, request)
733+
future.add_callback(self.cluster.update_metadata)
734+
future.add_errback(self.cluster.failed_update)
735+
736+
self._metadata_refresh_in_progress = True
737+
def refresh_done(val_or_error):
738+
self._metadata_refresh_in_progress = False
739+
future.add_callback(refresh_done)
740+
future.add_errback(refresh_done)
741+
return self.config['request_timeout_ms']
742+
743+
# If there's any connection establishment underway, wait until it completes. This prevents
744+
# the client from unnecessarily connecting to additional nodes while a previous connection
745+
# attempt has not been completed.
746+
if self._connecting:
747+
# Strictly the timeout we should return here is "connect timeout", but as we don't
748+
# have such application level configuration, using request timeout instead.
749+
return self.config['request_timeout_ms']
750+
751+
if self._can_connect(node_id):
752+
log.debug("Initializing connection to node %s for metadata request", node_id)
753+
self._maybe_connect(node_id)
754+
return self.config['reconnect_backoff_ms']
755+
756+
# connected but can't send more, OR connecting
757+
# In either case we just need to wait for a network event
758+
# to let us know the selected connection might be usable again.
759+
return float('inf')
756760

757761
def schedule(self, task, at):
758762
"""Schedule a new task to be executed at the given time.

test/test_client_async.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ def client(mocker):
319319
mocker.patch.object(KafkaClient, '_bootstrap')
320320
_poll = mocker.patch.object(KafkaClient, '_poll')
321321

322-
cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9))
322+
cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
323323

324324
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
325325
tasks.return_value = 9999999
@@ -332,49 +332,60 @@ def client(mocker):
332332
def test_maybe_refresh_metadata_ttl(mocker, client):
333333
client.cluster.ttl.return_value = 1234
334334

335-
client.poll(timeout_ms=9999999, sleep=True)
335+
client.poll(timeout_ms=12345678, sleep=True)
336336
client._poll.assert_called_with(1.234, sleep=True)
337337

338338

339339
def test_maybe_refresh_metadata_backoff(mocker, client):
340340
now = time.time()
341341
t = mocker.patch('time.time')
342342
t.return_value = now
343-
client._last_no_node_available_ms = now * 1000
344343

345-
client.poll(timeout_ms=9999999, sleep=True)
346-
client._poll.assert_called_with(2.222, sleep=True)
344+
client.poll(timeout_ms=12345678, sleep=True)
345+
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
347346

348347

349348
def test_maybe_refresh_metadata_in_progress(mocker, client):
350349
client._metadata_refresh_in_progress = True
351350

352-
client.poll(timeout_ms=9999999, sleep=True)
353-
client._poll.assert_called_with(9999.999, sleep=True)
351+
client.poll(timeout_ms=12345678, sleep=True)
352+
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
354353

355354

356355
def test_maybe_refresh_metadata_update(mocker, client):
357356
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
358357
mocker.patch.object(client, '_can_send_request', return_value=True)
359358
send = mocker.patch.object(client, 'send')
360359

361-
client.poll(timeout_ms=9999999, sleep=True)
362-
client._poll.assert_called_with(0, sleep=True)
360+
client.poll(timeout_ms=12345678, sleep=True)
361+
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
363362
assert client._metadata_refresh_in_progress
364363
request = MetadataRequest[0]([])
365-
send.assert_called_with('foobar', request)
364+
send.assert_called_once_with('foobar', request)
366365

367366

368-
def test_maybe_refresh_metadata_failure(mocker, client):
367+
def test_maybe_refresh_metadata_cant_send(mocker, client):
369368
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
369+
mocker.patch.object(client, '_can_connect', return_value=True)
370+
mocker.patch.object(client, '_maybe_connect', return_value=True)
370371

371372
now = time.time()
372373
t = mocker.patch('time.time')
373374
t.return_value = now
374375

375-
client.poll(timeout_ms=9999999, sleep=True)
376-
client._poll.assert_called_with(0, sleep=True)
377-
assert client._last_no_node_available_ms == now * 1000
376+
# first poll attempts connection
377+
client.poll(timeout_ms=12345678, sleep=True)
378+
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
379+
client._can_connect.assert_called_once_with('foobar')
380+
client._maybe_connect.assert_called_once_with('foobar')
381+
382+
# poll while connecting should not attempt a new connection
383+
client._connecting.add('foobar')
384+
client._can_connect.reset_mock()
385+
client.poll(timeout_ms=12345678, sleep=True)
386+
client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
387+
assert not client._can_connect.called
388+
378389
assert not client._metadata_refresh_in_progress
379390

380391

0 commit comments

Comments
 (0)