Avoid multiple connection attempts when refreshing metadata #1067

Merged
merged 1 commit · Apr 6, 2017
92 changes: 48 additions & 44 deletions kafka/client_async.py
Expand Up @@ -183,7 +183,6 @@ def __init__(self, **configs):
self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._last_no_node_available_ms = 0
self._selector = self.config['selector']()
self._conns = {}
self._connecting = set()
@@ -709,50 +708,55 @@ def _maybe_refresh_metadata(self):
int: milliseconds until next refresh
"""
ttl = self.cluster.ttl()
next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff()
next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0
timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms)

if timeout == 0:
node_id = self.least_loaded_node()
if node_id is None:
log.debug("Give up sending metadata request since no node is available")
# mark the timestamp for no node available to connect
self._last_no_node_available_ms = time.time() * 1000
return timeout

if self._can_send_request(node_id):
topics = list(self._topics)
if self.cluster.need_all_topic_metadata or not topics:
topics = [] if self.config['api_version'] < (0, 10) else None
api_version = 0 if self.config['api_version'] < (0, 10) else 1
request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future.add_callback(self.cluster.update_metadata)
future.add_errback(self.cluster.failed_update)

self._metadata_refresh_in_progress = True
def refresh_done(val_or_error):
self._metadata_refresh_in_progress = False
future.add_callback(refresh_done)
future.add_errback(refresh_done)

elif self._can_connect(node_id):
log.debug("Initializing connection to node %s for metadata request", node_id)
self._maybe_connect(node_id)
# If _maybe_connect failed immediately, this node will be put into blackout and we
# should allow immediately retrying in case there is another candidate node. If it
# is still connecting, the worst case is that we end up setting a longer timeout
# on the next round and then wait for the response.
else:
# connected, but can't send more OR connecting
# In either case, we just need to wait for a network event to let us know the selected
# connection might be usable again.
self._last_no_node_available_ms = time.time() * 1000
wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0
metadata_timeout = max(ttl, wait_for_in_progress_ms)

return timeout
if metadata_timeout > 0:
return metadata_timeout

# Beware that the behavior of this method and the computation of
# timeouts for poll() are highly dependent on the behavior of
# least_loaded_node()
node_id = self.least_loaded_node()
if node_id is None:
log.debug("Give up sending metadata request since no node is available");
return self.config['reconnect_backoff_ms']

if self._can_send_request(node_id):
topics = list(self._topics)
if self.cluster.need_all_topic_metadata or not topics:
topics = [] if self.config['api_version'] < (0, 10) else None
api_version = 0 if self.config['api_version'] < (0, 10) else 1
request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future.add_callback(self.cluster.update_metadata)
future.add_errback(self.cluster.failed_update)

self._metadata_refresh_in_progress = True
def refresh_done(val_or_error):
self._metadata_refresh_in_progress = False
future.add_callback(refresh_done)
future.add_errback(refresh_done)
return self.config['request_timeout_ms']

# If there's any connection establishment underway, wait until it completes. This prevents
# the client from unnecessarily connecting to additional nodes while a previous connection
# attempt has not been completed.
if self._connecting:
# Strictly the timeout we should return here is "connect timeout", but as we don't
# have such application level configuration, using request timeout instead.
return self.config['request_timeout_ms']

if self._can_connect(node_id):
log.debug("Initializing connection to node %s for metadata request", node_id)
self._maybe_connect(node_id)
return self.config['reconnect_backoff_ms']

# connected but can't send more, OR connecting
# In either case we just need to wait for a network event
# to let us know the selected connection might be usable again.
return float('inf')

def schedule(self, task, at):
"""Schedule a new task to be executed at the given time.
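A condensed sketch of the decision order in the rewritten _maybe_refresh_metadata above (illustrative only, not the library code; `client` stands in for the KafkaClient instance, and only attributes that appear in the diff are used):

def metadata_poll_timeout_ms(client):
    """Illustrative sketch of the new control flow (see diff above)."""
    # Metadata still fresh, or a refresh already in flight: wait that long.
    ttl = client.cluster.ttl()
    wait_for_in_progress_ms = (client.config['request_timeout_ms']
                               if client._metadata_refresh_in_progress else 0)
    metadata_timeout = max(ttl, wait_for_in_progress_ms)
    if metadata_timeout > 0:
        return metadata_timeout

    # No candidate node at all: back off before retrying.
    node_id = client.least_loaded_node()
    if node_id is None:
        return client.config['reconnect_backoff_ms']

    # Node ready to accept a request: the real method builds and sends the
    # MetadataRequest here, then waits up to the request timeout.
    if client._can_send_request(node_id):
        return client.config['request_timeout_ms']

    # Some connection attempt is already underway: do not open another
    # socket just for metadata; wait for the in-flight attempt instead.
    if client._connecting:
        return client.config['request_timeout_ms']

    # Nothing connecting and the node is connectable: start one attempt
    # (the real method calls client._maybe_connect(node_id) here).
    if client._can_connect(node_id):
        return client.config['reconnect_backoff_ms']

    # Connected but unable to send more (or still connecting): wait for a
    # network event to wake us up.
    return float('inf')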
39 changes: 25 additions & 14 deletions test/test_client_async.py
@@ -319,7 +319,7 @@ def client(mocker):
mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')

cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9))
cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))

tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
tasks.return_value = 9999999
@@ -332,49 +332,60 @@ def client(mocker):
def test_maybe_refresh_metadata_ttl(mocker, client):
client.cluster.ttl.return_value = 1234

client.poll(timeout_ms=9999999, sleep=True)
client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(1.234, sleep=True)


def test_maybe_refresh_metadata_backoff(mocker, client):
now = time.time()
t = mocker.patch('time.time')
t.return_value = now
client._last_no_node_available_ms = now * 1000

client.poll(timeout_ms=9999999, sleep=True)
client._poll.assert_called_with(2.222, sleep=True)
client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff


def test_maybe_refresh_metadata_in_progress(mocker, client):
client._metadata_refresh_in_progress = True

client.poll(timeout_ms=9999999, sleep=True)
client._poll.assert_called_with(9999.999, sleep=True)
client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms


def test_maybe_refresh_metadata_update(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_send_request', return_value=True)
send = mocker.patch.object(client, 'send')

client.poll(timeout_ms=9999999, sleep=True)
client._poll.assert_called_with(0, sleep=True)
client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
assert client._metadata_refresh_in_progress
request = MetadataRequest[0]([])
send.assert_called_with('foobar', request)
send.assert_called_once_with('foobar', request)


def test_maybe_refresh_metadata_failure(mocker, client):
def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_connect', return_value=True)
mocker.patch.object(client, '_maybe_connect', return_value=True)

now = time.time()
t = mocker.patch('time.time')
t.return_value = now

client.poll(timeout_ms=9999999, sleep=True)
client._poll.assert_called_with(0, sleep=True)
assert client._last_no_node_available_ms == now * 1000
# first poll attempts connection
client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
client._can_connect.assert_called_once_with('foobar')
client._maybe_connect.assert_called_once_with('foobar')

# poll while connecting should not attempt a new connection
client._connecting.add('foobar')
client._can_connect.reset_mock()
client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
assert not client._can_connect.called

assert not client._metadata_refresh_in_progress


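A hypothetical usage sketch tying the fixture values above back to the asserted poll timeouts (the broker address and the `client` variable are assumptions for illustration): with reconnect_backoff_ms=2222 the blocked-on-connect polls sleep 2.222 s, and with request_timeout_ms=9999999 the refresh-in-progress polls sleep 9999.999 s.

from kafka.client_async import KafkaClient

# Same knobs the test fixture tweaks; the values here are illustrative.
client = KafkaClient(bootstrap_servers='localhost:9092',  # assumed broker address
                     reconnect_backoff_ms=2222,    # -> 2.222 s metadata wait when connecting / backing off
                     request_timeout_ms=9999999,   # -> 9999.999 s metadata wait while a refresh is in flight
                     api_version=(0, 9))
client.poll(timeout_ms=500)  # actual sleep is bounded by min(timeout_ms, metadata wait)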