Skip to content

Commit ac5a935

Browse files
dpkpjeffwidman
authored andcommitted
Timeout all unconnected conns (incl SSL) after request_timeout_ms
1 parent 1a31be5 commit ac5a935

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

kafka/conn.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,6 @@ def connect(self):
351351
if self.state is ConnectionStates.CONNECTING:
352352
# in non-blocking mode, use repeated calls to socket.connect_ex
353353
# to check connection status
354-
request_timeout = self.config['request_timeout_ms'] / 1000.0
355354
ret = None
356355
try:
357356
ret = self._sock.connect_ex(self._sock_addr)
@@ -389,11 +388,6 @@ def connect(self):
389388
errstr = errno.errorcode.get(ret, 'UNKNOWN')
390389
self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))
391390

392-
# Connection timed out
393-
elif time.time() > request_timeout + self.last_attempt:
394-
log.error('Connection attempt to %s timed out', self)
395-
self.close(Errors.KafkaConnectionError('timeout'))
396-
397391
# Needs retry
398392
else:
399393
pass
@@ -419,6 +413,14 @@ def connect(self):
419413
self._reset_reconnect_backoff()
420414
self.config['state_change_callback'](self)
421415

416+
if self.state not in (ConnectionStates.CONNECTED,
417+
ConnectionStates.DISCONNECTED):
418+
# Connection timed out
419+
request_timeout = self.config['request_timeout_ms'] / 1000.0
420+
if time.time() > request_timeout + self.last_attempt:
421+
log.error('Connection attempt to %s timed out', self)
422+
self.close(Errors.KafkaConnectionError('timeout'))
423+
422424
return self.state
423425

424426
def _wrap_ssl(self):

0 commit comments

Comments
 (0)