Closed
Description
Traceback of an error when we tried upgrading to 2.1.0
Scenario, we try to connect to a kafka broker that's just starting up (so failures and retries are expected)
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/consumer/group.py:379: in __init__
self._client = self.config['kafka_client'](metrics=self._metrics, **self.config)
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/client_async.py:262: in __init__
self.config['api_version'] = self.check_version()
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/client_async.py:1052: in check_version
self.poll(timeout_ms=timeout_ms)
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/client_async.py:683: in poll
self._poll(timeout / 1000)
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/client_async.py:740: in _poll
conn.connect()
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/conn.py:436: in connect
if self._try_api_versions_check():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <kafka.conn.BrokerConnection object at 0x7f40eb315790>
def _try_api_versions_check(self):
if self._api_versions_future is None:
if self.config['api_version'] is not None:
self._api_version = self.config['api_version']
self._api_versions = BROKER_API_VERSIONS[self._api_version]
log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self._api_version)
return True
elif self._check_version_idx is None:
request = ApiVersionsRequest[self._api_versions_idx]()
future = Future()
response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
response.add_callback(self._handle_api_versions_response, future)
response.add_errback(self._handle_api_versions_failure, future)
self._api_versions_future = future
self.state = ConnectionStates.API_VERSIONS_RECV
self.config['state_change_callback'](self.node_id, self._sock, self)
elif self._check_version_idx < len(self.VERSION_CHECKS):
version, request = self.VERSION_CHECKS[self._check_version_idx]
future = Future()
response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
response.add_callback(self._handle_check_version_response, future, version)
response.add_errback(self._handle_check_version_failure, future)
self._api_versions_future = future
self.state = ConnectionStates.API_VERSIONS_RECV
self.config['state_change_callback'](self.node_id, self._sock, self)
else:
> raise 'Unable to determine broker version.'
E TypeError: exceptions must derive from BaseException
This screws up our retry mechanism (as we expect a ConnectionError or KafkaError, but not a type error), our current fix is to lock our usage to 2.0.6
Metadata
Metadata
Assignees
Labels
No labels