Skip to content

Check for disconnects during ssl handshake and sasl authentication #1249

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 10, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 42 additions & 31 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,15 @@ def connect(self):
self._sock.setsockopt(*option)

self._sock.setblocking(False)
self.last_attempt = time.time()
self.state = ConnectionStates.CONNECTING
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
log.info('%s: connecting to %s:%d', self, self.host, self.port)
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
self.config['state_change_callback'](self)
# _wrap_ssl can alter the connection state -- disconnects on failure
# so we need to double check that we are still connecting before
if self.connecting():
self.config['state_change_callback'](self)
log.info('%s: connecting to %s:%d', self, self.host, self.port)

if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
Expand Down Expand Up @@ -367,10 +370,12 @@ def connect(self):
if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
if self._try_authenticate():
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
# _try_authenticate has side-effects: possibly disconnected on socket errors
if self.state is ConnectionStates.AUTHENTICATING:
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self)

return self.state

Expand All @@ -397,10 +402,7 @@ def _wrap_ssl(self):
password=self.config['ssl_password'])
if self.config['ssl_crlfile']:
if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
error = 'No CRL support with this version of Python.'
log.error('%s: %s Disconnecting.', self, error)
self.close(Errors.ConnectionError(error))
return
raise RuntimeError('This version of Python does not support ssl_crlfile!')
log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile'])
self._ssl_context.load_verify_locations(self.config['ssl_crlfile'])
# pylint: disable=no-member
Expand Down Expand Up @@ -443,7 +445,9 @@ def _try_authenticate(self):
self._sasl_auth_future = future
self._recv()
if self._sasl_auth_future.failed():
raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
ex = self._sasl_auth_future.exception
if not isinstance(ex, Errors.ConnectionError):
raise ex # pylint: disable-msg=raising-bad-type
return self._sasl_auth_future.succeeded()

def _handle_sasl_handshake_response(self, future, response):
Expand All @@ -463,6 +467,19 @@ def _handle_sasl_handshake_response(self, future, response):
'kafka-python does not support SASL mechanism %s' %
self.config['sasl_mechanism']))

def _recv_bytes_blocking(self, n):
self._sock.setblocking(True)
try:
data = b''
while len(data) < n:
fragment = self._sock.recv(n - len(data))
if not fragment:
raise ConnectionError('Connection reset during recv')
data += fragment
return data
finally:
self._sock.setblocking(False)

def _try_authenticate_plain(self, future):
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.warning('%s: Sending username and password in the clear', self)
Expand All @@ -476,30 +493,23 @@ def _try_authenticate_plain(self, future):
self.config['sasl_plain_password']]).encode('utf-8'))
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
self._sock.setblocking(False)

# The server will send a zero sized message (that is Int32(0)) on success.
# The connection is closed on failure
while len(data) < 4:
fragment = self._sock.recv(4 - len(data))
if not fragment:
log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
error = Errors.AuthenticationFailedError(
'Authentication failed for user {0}'.format(
self.config['sasl_plain_username']))
future.failure(error)
raise error
data += fragment
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
self._recv_bytes_blocking(4)

except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
future.failure(error)
self.close(error=error)
return future.failure(error)

if data != b'\x00\x00\x00\x00':
return future.failure(Errors.AuthenticationFailedError())
error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
return future.failure(error)

log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
return future.success(True)

def _try_authenticate_gssapi(self, future):
Expand All @@ -524,14 +534,15 @@ def _try_authenticate_gssapi(self, future):
msg = output_token
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
self._sock.setblocking(False)

# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
header = self._sock.recv(4)
header = self._recv_bytes_blocking(4)
token_size = struct.unpack('>i', header)
received_token = self._sock.recv(token_size)
self._sock.setblocking(False)
received_token = self._recv_bytes_blocking(token_size)

except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
Expand Down