Skip to content

added gssapi support (Kerberos) for SASL #1152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 20, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ class SSLWantReadError(Exception):
class SSLWantWriteError(Exception):
pass

# needed for SASL_GSSAPI authentication:
try:
import gssapi
from gssapi.raw.misc import GSSError
except ImportError:
#no gssapi available, will disable gssapi mechanism
gssapi = None
GSSError = None

class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
Expand Down Expand Up @@ -167,9 +176,13 @@ class BrokerConnection(object):
'metric_group_prefix': '',
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': None,
'sasl_plain_password': None
'sasl_plain_password': None,
'sasl_kerberos_service_name':'kafka'
}
SASL_MECHANISMS = ('PLAIN',)
if gssapi is None:
SASL_MECHANISMS = ('PLAIN',)
else:
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')

def __init__(self, host, port, afi, **configs):
self.hostname = host
Expand Down Expand Up @@ -206,6 +219,9 @@ def __init__(self, host, port, afi, **configs):
if self.config['sasl_mechanism'] == 'PLAIN':
assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl'

self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
Expand Down Expand Up @@ -437,6 +453,8 @@ def _handle_sasl_handshake_response(self, future, response):

if self.config['sasl_mechanism'] == 'PLAIN':
return self._try_authenticate_plain(future)
elif self.config['sasl_mechanism'] == 'GSSAPI':
return self._try_authenticate_gssapi(future)
else:
return future.failure(
Errors.UnsupportedSaslMechanismError(
Expand Down Expand Up @@ -481,6 +499,61 @@ def _try_authenticate_plain(self, future):

return future.success(True)

def _try_authenticate_gssapi(self, future):

data = b''
gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
#Exchange tokens until authentication either suceeded or failed:
received_token = None
try:
while not ctx_Context.complete:
#calculate the output token
try:
output_token = ctx_Context.step(received_token)
except GSSError as e:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't this raise a NameError unless GSSError has been imported directly?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternately I'd be ok with except Exception as e here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid the NameError i added the lines below in case of ImportError

class GSSError(Exception):
        pass

log.exception("%s: Error invalid token received from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))

if not output_token:
if ctx_Context.complete:
log.debug("%s: Security Context complete ", self)
log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name)
break
try:
self._sock.setblocking(True)
# Send output token
msg = output_token
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this be bytes on python3 ? if not, should encode('utf-8')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is a binary token that is exchanged. I'll check the type for python3, but i think encoding will harm the authentication token.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return type is bytes

size = Int32.encode(len(msg))
self._sock.sendall(size + msg)

# The server will send a token back. processing of this token either
# establishes a security context, or needs further token exchange
# the gssapi will be able to identify the needed next step
# The connection is closed on failure
response = self._sock.recv(2000)
self._sock.setblocking(False)

except (AssertionError, ConnectionError) as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
future.failure(error)
self.close(error=error)

#pass the received token back to gssapi, strip the first 4 bytes
received_token = response[4:]

except Exception as e:
log.exception("%s: GSSAPI handshake error", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
future.failure(error)
self.close(error=error)

return future.success(True)

def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't
Expand Down