Skip to content

Always wait for completion during SASL/GSSAPI authentication #1248

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 10, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 15 additions & 26 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import logging
from random import shuffle, uniform
import socket
import time
import struct
import sys
import time

from kafka.vendor import six

Expand Down Expand Up @@ -514,52 +515,40 @@ def _try_authenticate_gssapi(self, future):
ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
# Exchange tokens until authentication either succeeds or fails:
log.debug("%s: initiator name: %s", self, ctx_Context.initiator_name)

# Exchange tokens until authentication either succeeds or fails
received_token = None
try:
while not ctx_Context.complete:
# calculate the output token
try:
output_token = ctx_Context.step(received_token)
except GSSError as e:
log.exception("%s: Error invalid token received from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
# calculate an output token from kafka token (or None if first iteration)
output_token = ctx_Context.step(received_token)

if not output_token:
if ctx_Context.complete:
log.debug("%s: Security Context complete ", self)
log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name)
break
# pass output token to kafka
try:
self._sock.setblocking(True)
# Send output token
msg = output_token
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)

# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
response = self._sock.recv(2000)
header = self._sock.recv(4)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Socket can return less than 4 bytes

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, yes it can.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be easier to address this issue in #1249

token_size = struct.unpack('>i', header)
received_token = self._sock.recv(token_size)
self._sock.setblocking(False)

except (AssertionError, ConnectionError) as e:
except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
future.failure(error)
self.close(error=error)

# pass the received token back to gssapi, strip the first 4 bytes
received_token = response[4:]
return future.failure(error)

except Exception as e:
log.exception("%s: GSSAPI handshake error", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
future.failure(error)
self.close(error=error)
return future.failure(e)

log.info('%s: Authenticated as %s', self, gssname)
log.info('%s: Authenticated as %s via GSSAPI', self, gssname)
return future.success(True)

def blacked_out(self):
Expand Down