Skip to content

Fix SASL authentication bugs #1257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 16, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 34 additions & 23 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,11 @@ def _try_authenticate(self):
sasl_response.add_callback(self._handle_sasl_handshake_response, future)
sasl_response.add_errback(lambda f, e: f.failure(e), future)
self._sasl_auth_future = future
self._recv()
if self._sasl_auth_future.failed():
self.recv()
# A connection error could trigger close() which will reset the future
if self._sasl_auth_future is None:
return False
elif self._sasl_auth_future.failed():
ex = self._sasl_auth_future.exception
if not isinstance(ex, Errors.ConnectionError):
raise ex # pylint: disable-msg=raising-bad-type
Expand All @@ -457,7 +460,12 @@ def _handle_sasl_handshake_response(self, future, response):
self.close(error=error)
return future.failure(error_type(self))

if self.config['sasl_mechanism'] == 'PLAIN':
if self.config['sasl_mechanism'] not in response.enabled_mechanisms:
return future.failure(
Errors.UnsupportedSaslMechanismError(
'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
% (self.config['sasl_mechanism'], response.enabled_mechanisms)))
elif self.config['sasl_mechanism'] == 'PLAIN':
return self._try_authenticate_plain(future)
elif self.config['sasl_mechanism'] == 'GSSAPI':
return self._try_authenticate_gssapi(future)
Expand All @@ -467,6 +475,19 @@ def _handle_sasl_handshake_response(self, future, response):
'kafka-python does not support SASL mechanism %s' %
self.config['sasl_mechanism']))

def _send_bytes_blocking(self, data):
self._sock.setblocking(True)
total_sent = 0
try:
while total_sent < len(data):
sent_bytes = self._sock.send(data[total_sent:])
total_sent += sent_bytes
if total_sent != len(data):
raise ConnectionError('Buffer overrun during socket send')
return total_sent
finally:
self._sock.setblocking(False)

def _recv_bytes_blocking(self, n):
self._sock.setblocking(True)
try:
Expand All @@ -485,15 +506,13 @@ def _try_authenticate_plain(self, future):
log.warning('%s: Sending username and password in the clear', self)

data = b''
# Send PLAIN credentials per RFC-4616
msg = bytes('\0'.join([self.config['sasl_plain_username'],
self.config['sasl_plain_username'],
self.config['sasl_plain_password']]).encode('utf-8'))
size = Int32.encode(len(msg))
try:
self._sock.setblocking(True)
# Send PLAIN credentials per RFC-4616
msg = bytes('\0'.join([self.config['sasl_plain_username'],
self.config['sasl_plain_username'],
self.config['sasl_plain_password']]).encode('utf-8'))
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
self._sock.setblocking(False)
self._send_bytes_blocking(size + msg)

# The server will send a zero sized message (that is Int32(0)) on success.
# The connection is closed on failure
Expand Down Expand Up @@ -530,11 +549,9 @@ def _try_authenticate_gssapi(self, future):

# pass output token to kafka
try:
self._sock.setblocking(True)
msg = output_token
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
self._sock.setblocking(False)
self._send_bytes_blocking(size + msg)

# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
Expand Down Expand Up @@ -662,16 +679,10 @@ def _send(self, request):
# In the future we might manage an internal write buffer
# and send bytes asynchronously. For now, just block
# sending each request payload
self._sock.setblocking(True)
total_sent = 0
while total_sent < len(data):
sent_bytes = self._sock.send(data[total_sent:])
total_sent += sent_bytes
assert total_sent == len(data)
total_bytes = self._send_bytes_blocking(data)
if self._sensors:
self._sensors.bytes_sent.record(total_sent)
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
self._sensors.bytes_sent.record(total_bytes)
except ConnectionError as e:
log.exception("Error sending %s to %s", request, self)
error = Errors.ConnectionError("%s: %s" % (self, e))
self.close(error=error)
Expand Down