Skip to content

Commit 79d88e5

Browse files
committed
Default client.check_version timeout to api_version_auto_timeout_ms (#2496)
1 parent aa78be3 commit 79d88e5

File tree

2 files changed

+11
-8
lines changed

2 files changed

+11
-8
lines changed

kafka/admin/client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ def _refresh_controller_id(self, timeout_ms=30000):
294294
time.sleep(1)
295295
continue
296296
# verify the controller is new enough to support our requests
297-
controller_version = self._client.check_version(node_id=controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
297+
controller_version = self._client.check_version(node_id=controller_id)
298298
if controller_version < (0, 10, 0):
299299
raise IncompatibleBrokerVersion(
300300
"The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."

kafka/client_async.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,7 @@ def __init__(self, **configs):
237237

238238
# Check Broker Version if not set explicitly
239239
if self.config['api_version'] is None:
240-
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
241-
self.config['api_version'] = self.check_version(timeout=check_timeout)
240+
self.config['api_version'] = self.check_version()
242241

243242
def _init_wakeup_socketpair(self):
244243
self._wake_r, self._wake_w = socket.socketpair()
@@ -890,13 +889,16 @@ def get_api_versions(self):
890889
"""
891890
return self._api_versions
892891

893-
def check_version(self, node_id=None, timeout=2, strict=False):
892+
def check_version(self, node_id=None, timeout=None, strict=False):
894893
"""Attempt to guess the version of a Kafka broker.
895894
896-
Note: It is possible that this method blocks longer than the
897-
specified timeout. This can happen if the entire cluster
898-
is down and the client enters a bootstrap backoff sleep.
899-
This is only possible if node_id is None.
895+
Keyword Arguments:
896+
node_id (str, optional): Broker node id from cluster metadata. If None, attempts
897+
to connect to any available broker until version is identified.
898+
Default: None
899+
timeout (num, optional): Maximum time in seconds to try to check broker version.
900+
If unable to identify version before timeout, raise error (see below).
901+
Default: api_version_auto_timeout_ms / 1000
900902
901903
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
902904
@@ -906,6 +908,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
906908
UnrecognizedBrokerVersion: please file bug if seen!
907909
AssertionError (if strict=True): please file bug if seen!
908910
"""
911+
timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
909912
self._lock.acquire()
910913
end = time.time() + timeout
911914
while time.time() < end:

0 commit comments

Comments
 (0)