Skip to content

TypeError: exceptions must derive from BaseException when unable to determine broker version #2547

Closed
@bentheiii

Description

@bentheiii

Traceback of an error when we tried upgrading to 2.1.0

Scenario, we try to connect to a kafka broker that's just starting up (so failures and retries are expected)

/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/consumer/group.py:379: in __init__
    self._client = self.config['kafka_client'](metrics=self._metrics, **self.config)
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/client_async.py:262: in __init__
    self.config['api_version'] = self.check_version()
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/client_async.py:1052: in check_version
    self.poll(timeout_ms=timeout_ms)
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/client_async.py:683: in poll
    self._poll(timeout / 1000)
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/client_async.py:740: in _poll
    conn.connect()
/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/kafka/conn.py:436: in connect
    if self._try_api_versions_check():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <kafka.conn.BrokerConnection object at 0x7f40eb315790>

    def _try_api_versions_check(self):
        if self._api_versions_future is None:
            if self.config['api_version'] is not None:
                self._api_version = self.config['api_version']
                self._api_versions = BROKER_API_VERSIONS[self._api_version]
                log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self._api_version)
                return True
            elif self._check_version_idx is None:
                request = ApiVersionsRequest[self._api_versions_idx]()
                future = Future()
                response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
                response.add_callback(self._handle_api_versions_response, future)
                response.add_errback(self._handle_api_versions_failure, future)
                self._api_versions_future = future
                self.state = ConnectionStates.API_VERSIONS_RECV
                self.config['state_change_callback'](self.node_id, self._sock, self)
            elif self._check_version_idx < len(self.VERSION_CHECKS):
                version, request = self.VERSION_CHECKS[self._check_version_idx]
                future = Future()
                response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
                response.add_callback(self._handle_check_version_response, future, version)
                response.add_errback(self._handle_check_version_failure, future)
                self._api_versions_future = future
                self.state = ConnectionStates.API_VERSIONS_RECV
                self.config['state_change_callback'](self.node_id, self._sock, self)
            else:
>               raise 'Unable to determine broker version.'
E               TypeError: exceptions must derive from BaseException

This screws up our retry mechanism (as we expect a ConnectionError or KafkaError, but not a type error), our current fix is to lock our usage to 2.0.6

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions