Skip to content

Commit f2f2bfe

Browse files
authored
Wrap SSL sockets after connecting (#1754)
1 parent 64f70b5 commit f2f2bfe

File tree

1 file changed

+11
-19
lines changed

1 file changed

+11
-19
lines changed

kafka/conn.py

+11-19
Original file line numberDiff line numberDiff line change
@@ -356,14 +356,9 @@ def connect(self):
356356

357357
self._sock.setblocking(False)
358358
self.state = ConnectionStates.CONNECTING
359-
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
360-
self._wrap_ssl()
361-
# _wrap_ssl can alter the connection state -- disconnects on failure
362-
# so we need to double check that we are still connecting before
363-
if self.connecting():
364-
self.config['state_change_callback'](self)
365-
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
366-
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
359+
self.config['state_change_callback'](self)
360+
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
361+
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
367362

368363
if self.state is ConnectionStates.CONNECTING:
369364
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -373,29 +368,29 @@ def connect(self):
373368
ret = self._sock.connect_ex(self._sock_addr)
374369
except socket.error as err:
375370
ret = err.errno
376-
except ValueError as err:
377-
# Python 3.7 and higher raises ValueError if a socket
378-
# is already connected
379-
if sys.version_info >= (3, 7):
380-
ret = None
381-
else:
382-
raise
383371

384372
# Connection succeeded
385373
if not ret or ret == errno.EISCONN:
386374
log.debug('%s: established TCP connection', self)
375+
387376
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
388377
log.debug('%s: initiating SSL handshake', self)
389378
self.state = ConnectionStates.HANDSHAKE
379+
self.config['state_change_callback'](self)
380+
# _wrap_ssl can alter the connection state -- disconnects on failure
381+
self._wrap_ssl()
382+
390383
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
391384
log.debug('%s: initiating SASL authentication', self)
392385
self.state = ConnectionStates.AUTHENTICATING
386+
self.config['state_change_callback'](self)
387+
393388
else:
394389
# security_protocol PLAINTEXT
395390
log.info('%s: Connection complete.', self)
396391
self.state = ConnectionStates.CONNECTED
397392
self._reset_reconnect_backoff()
398-
self.config['state_change_callback'](self)
393+
self.config['state_change_callback'](self)
399394

400395
# Connection failed
401396
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -486,9 +481,6 @@ def _try_handshake(self):
486481
# old ssl in python2.6 will swallow all SSLErrors here...
487482
except (SSLWantReadError, SSLWantWriteError):
488483
pass
489-
# python 3.7 throws OSError
490-
except OSError:
491-
pass
492484
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
493485
log.warning('SSL connection closed by server during handshake.')
494486
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))

0 commit comments

Comments
 (0)