-
Notifications
You must be signed in to change notification settings - Fork 1.4k
added gssapi support (Kerberos) for SASL #1152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b4f788f
2023035
799c9fc
e339af4
7783ef4
91bbf29
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,15 @@ class SSLWantReadError(Exception): | |
class SSLWantWriteError(Exception): | ||
pass | ||
|
||
# needed for SASL_GSSAPI authentication: | ||
try: | ||
import gssapi | ||
from gssapi.raw.misc import GSSError | ||
except ImportError: | ||
#no gssapi available, will disable gssapi mechanism | ||
gssapi = None | ||
GSSError = None | ||
|
||
class ConnectionStates(object): | ||
DISCONNECTING = '<disconnecting>' | ||
DISCONNECTED = '<disconnected>' | ||
|
@@ -167,9 +176,13 @@ class BrokerConnection(object): | |
'metric_group_prefix': '', | ||
'sasl_mechanism': 'PLAIN', | ||
'sasl_plain_username': None, | ||
'sasl_plain_password': None | ||
'sasl_plain_password': None, | ||
'sasl_kerberos_service_name':'kafka' | ||
} | ||
SASL_MECHANISMS = ('PLAIN',) | ||
if gssapi is None: | ||
SASL_MECHANISMS = ('PLAIN',) | ||
else: | ||
SASL_MECHANISMS = ('PLAIN', 'GSSAPI') | ||
|
||
def __init__(self, host, port, afi, **configs): | ||
self.hostname = host | ||
|
@@ -206,6 +219,9 @@ def __init__(self, host, port, afi, **configs): | |
if self.config['sasl_mechanism'] == 'PLAIN': | ||
assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl' | ||
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' | ||
if self.config['sasl_mechanism'] == 'GSSAPI': | ||
assert gssapi is not None, 'GSSAPI lib not available' | ||
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl' | ||
|
||
self.state = ConnectionStates.DISCONNECTED | ||
self._reset_reconnect_backoff() | ||
|
@@ -437,6 +453,8 @@ def _handle_sasl_handshake_response(self, future, response): | |
|
||
if self.config['sasl_mechanism'] == 'PLAIN': | ||
return self._try_authenticate_plain(future) | ||
elif self.config['sasl_mechanism'] == 'GSSAPI': | ||
return self._try_authenticate_gssapi(future) | ||
else: | ||
return future.failure( | ||
Errors.UnsupportedSaslMechanismError( | ||
|
@@ -481,6 +499,61 @@ def _try_authenticate_plain(self, future): | |
|
||
return future.success(True) | ||
|
||
def _try_authenticate_gssapi(self, future): | ||
|
||
data = b'' | ||
gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname | ||
ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service) | ||
ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos) | ||
log.debug('%s: canonical Servicename: %s', self, ctx_CanonName) | ||
ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate') | ||
#Exchange tokens until authentication either suceeded or failed: | ||
received_token = None | ||
try: | ||
while not ctx_Context.complete: | ||
#calculate the output token | ||
try: | ||
output_token = ctx_Context.step(received_token) | ||
except GSSError as e: | ||
log.exception("%s: Error invalid token received from server", self) | ||
error = Errors.ConnectionError("%s: %s" % (self, e)) | ||
|
||
if not output_token: | ||
if ctx_Context.complete: | ||
log.debug("%s: Security Context complete ", self) | ||
log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name) | ||
break | ||
try: | ||
self._sock.setblocking(True) | ||
# Send output token | ||
msg = output_token | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will this be bytes on python3 ? if not, should encode('utf-8') There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is a binary token that is exchanged. I'll check the type for python3, but i think encoding will harm the authentication token. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. return type is bytes |
||
size = Int32.encode(len(msg)) | ||
self._sock.sendall(size + msg) | ||
|
||
# The server will send a token back. processing of this token either | ||
# establishes a security context, or needs further token exchange | ||
# the gssapi will be able to identify the needed next step | ||
# The connection is closed on failure | ||
response = self._sock.recv(2000) | ||
self._sock.setblocking(False) | ||
|
||
except (AssertionError, ConnectionError) as e: | ||
log.exception("%s: Error receiving reply from server", self) | ||
error = Errors.ConnectionError("%s: %s" % (self, e)) | ||
future.failure(error) | ||
self.close(error=error) | ||
|
||
#pass the received token back to gssapi, strip the first 4 bytes | ||
received_token = response[4:] | ||
|
||
except Exception as e: | ||
log.exception("%s: GSSAPI handshake error", self) | ||
error = Errors.ConnectionError("%s: %s" % (self, e)) | ||
future.failure(error) | ||
self.close(error=error) | ||
|
||
return future.success(True) | ||
|
||
def blacked_out(self): | ||
""" | ||
Return true if we are disconnected from the given node and can't | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
won't this raise a NameError unless GSSError has been imported directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternately I'd be ok with
except Exception as e
hereThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid the NameError i added the lines below in case of ImportError