Skip to content

Commit da4329e

Browse files
authored
Improve connection error handling when try_api_versions_check fails all attempts (#2548)
1 parent e247776 commit da4329e

File tree

2 files changed

+35
-1
lines changed

2 files changed

+35
-1
lines changed

kafka/conn.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,9 @@ def _try_api_versions_check(self):
531531
if self._api_versions_future is None:
532532
if self.config['api_version'] is not None:
533533
self._api_version = self.config['api_version']
534+
# api_version will be normalized by KafkaClient, so this should not happen
535+
if self._api_version not in BROKER_API_VERSIONS:
536+
raise Errors.UnrecognizedBrokerVersion('api_version %s not found in kafka.protocol.broker_api_versions' % (self._api_version,))
534537
self._api_versions = BROKER_API_VERSIONS[self._api_version]
535538
log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self._api_version)
536539
return True
@@ -553,7 +556,8 @@ def _try_api_versions_check(self):
553556
self.state = ConnectionStates.API_VERSIONS_RECV
554557
self.config['state_change_callback'](self.node_id, self._sock, self)
555558
else:
556-
raise 'Unable to determine broker version.'
559+
self.close(Errors.KafkaConnectionError('Unable to determine broker version.'))
560+
return False
557561

558562
for r, f in self.recv():
559563
f.success(r)

test/test_conn.py

+30
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,36 @@ def test_connect(_socket, conn, states):
6969
assert conn.state is state
7070

7171

72+
def test_api_versions_check(_socket):
73+
conn = BrokerConnection('localhost', 9092, socket.AF_INET)
74+
assert conn._api_versions_future is None
75+
conn.connect()
76+
assert conn._api_versions_future is not None
77+
assert conn.connecting() is True
78+
assert conn.state is ConnectionStates.API_VERSIONS_RECV
79+
80+
assert conn._try_api_versions_check() is False
81+
assert conn.connecting() is True
82+
assert conn.state is ConnectionStates.API_VERSIONS_RECV
83+
84+
conn._api_versions_future = None
85+
conn._check_version_idx = 0
86+
assert conn._try_api_versions_check() is False
87+
assert conn.connecting() is True
88+
89+
conn._check_version_idx = len(conn.VERSION_CHECKS)
90+
conn._api_versions_future = None
91+
assert conn._try_api_versions_check() is False
92+
assert conn.connecting() is False
93+
assert conn.disconnected() is True
94+
95+
96+
def test_api_versions_check_unrecognized(_socket):
97+
conn = BrokerConnection('localhost', 9092, socket.AF_INET, api_version=(0, 0))
98+
with pytest.raises(Errors.UnrecognizedBrokerVersion):
99+
conn.connect()
100+
101+
72102
def test_connect_timeout(_socket, conn):
73103
assert conn.state is ConnectionStates.DISCONNECTED
74104

0 commit comments

Comments
 (0)