Skip to content

feat: Add SSPI (Kerberos for Windows) authentication mechanism #2219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 142 additions & 6 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,18 @@ class SSLWantWriteError(Exception):
import gssapi
from gssapi.raw.misc import GSSError
except ImportError:
#no gssapi available, will disable gssapi mechanism
gssapi = None
GSSError = None


try:
import sspi
import pywintypes
import sspicon
import win32security
except ImportError:
sspi = None

AFI_NAMES = {
socket.AF_UNSPEC: "unspecified",
socket.AF_INET: "IPv4",
Expand Down Expand Up @@ -270,7 +277,8 @@ def __init__(self, host, port, afi, **configs):
'sasl_plain_password required for PLAIN or SCRAM sasl'
)
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
if gssapi is None and sspi is None:
raise AssertionError('No GSSAPI lib available')
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
if self.config['sasl_mechanism'] == 'OAUTHBEARER':
token_provider = self.config['sasl_oauth_token_provider']
Expand Down Expand Up @@ -709,13 +717,20 @@ def _try_authenticate_scram(self, future):
return future.success(True)

def _try_authenticate_gssapi(self, future):
kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host
auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
if gssapi is not None:
return self._try_authenticate_gssapi_gss_implementation(future)

if sspi is not None:
return self._try_authenticate_gssapi_sspi_implementation(future)

def _try_authenticate_gssapi_gss_implementation(self, future):
kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host
auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_host_name
gssapi_name = gssapi.Name(
auth_id,
name_type=gssapi.NameType.hostbased_service
).canonicalize(gssapi.MechType.kerberos)
log.debug('%s: GSSAPI name: %s', self, gssapi_name)
log.debug('%s: GSSAPI Service Principal Name: %s', self, gssapi_name)

err = None
close = False
Expand Down Expand Up @@ -778,9 +793,130 @@ def _try_authenticate_gssapi(self, future):
self.close(error=err)
return future.failure(err)

log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
log.info(
'%s: Authenticated as %s to %s via GSSAPI',
self,
client_ctx.initiator_name,
client_ctx.target_name
)
return future.success(True)

def _try_authenticate_gssapi_sspi_implementation(self, future):
global log_sspi
log_sspi = logging.getLogger("kafka.conn.sspi")
kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host
service_principal_name = self.config['sasl_kerberos_service_name'] + '/' + kerberos_host_name
scheme = "Kerberos" # Do not try with Negotiate for SASL authentication. Tokens are different.
# https://docs.microsoft.com/en-us/windows/win32/secauthn/context-requirements
flags = (
sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication
sspicon.ISC_REQ_INTEGRITY | # check for integrity
sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages
sspicon.ISC_REQ_CONFIDENTIALITY # request confidentiality
)

err = None
close = False
with self._lock:
if not self._can_send_recv():
err = Errors.NodeNotReadyError(str(self))
close = False
else:
# Establish security context and negotiate protection level
# For reference see RFC 4752, section 3
try:
log_sspi.debug("Create client security context")
# instantiate sspi context
client_ctx = sspi.ClientAuth(
scheme,
targetspn=service_principal_name,
scflags=flags,
)
# Print some SSPI implementation
log_sspi.info("Using %s SSPI Security Package (%s)", client_ctx.pkg_info["Name"], client_ctx.pkg_info["Comment"])

# Exchange tokens until authentication either succeeds or fails
log_sspi.debug("Beginning rounds...")
received_token = None # no token to pass when initiating the first round
while not client_ctx.authenticated:
# calculate an output token from kafka token (or None on first iteration)
# https://docs.microsoft.com/en-us/windows/win32/api/sspi/nf-sspi-initializesecuritycontexta
# https://docs.microsoft.com/en-us/windows/win32/secauthn/initializesecuritycontext--kerberos
# authorize method will wrap for us our token in sspi structures
log_sspi.debug("Exchange a token")
error, auth = client_ctx.authorize(received_token)
if len(auth) > 0 and len(auth[0].Buffer):
log_sspi.debug("Got token from context")
# this buffer must be sent to the server whatever the result is
output_token = auth[0].Buffer
else:
log_sspi.debug("Got no token, exchange finished")
# seems to be the end of the loop
output_token = None

# pass output token to kafka, or send empty response if the security
# context is complete (output token is None in that case)
if output_token is None:
log_sspi.debug("Sending end of exchange to server")
self._send_bytes_blocking(Int32.encode(0))
else:
log_sspi.debug("Sending token from local context to server")
msg = output_token
size = Int32.encode(len(msg))
self._send_bytes_blocking(size + msg)

# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
header = self._recv_bytes_blocking(4)
(token_size,) = struct.unpack('>i', header)
received_token = self._recv_bytes_blocking(token_size)
log_sspi.debug("Received token from server (size %s)", token_size)

# Process the security layer negotiation token, sent by the server
# once the security context is established.

# The following part is required by SASL, but not by classic Kerberos.
# See RFC 4752

# unwraps message containing supported protection levels and msg size
msg, was_encrypted = client_ctx.unwrap(received_token)

# Kafka currently doesn't support integrity or confidentiality security layers, so we
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
# by the server
msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:]

# add authorization identity to the response, GSS-wrap and send it
msg = msg + service_principal_name.encode("utf-8")
msg = client_ctx.wrap(msg)
size = Int32.encode(len(msg))
self._send_bytes_blocking(size + msg)

except (ConnectionError, TimeoutError) as e:
log.exception("%s: Error receiving reply from server", self)
err = Errors.KafkaConnectionError("%s: %s" % (self, e))
close = True
except Exception as e:
err = e
close = True

if err is not None:
if close:
self.close(error=err)
return future.failure(err)

# noinspection PyUnresolvedReferences
log.info(
'%s: Authenticated as %s to %s via SSPI/GSSAPI \\o/',
self,
client_ctx.initiator_name,
client_ctx.service_name
)
return future.success(True)


def _try_authenticate_oauth(self, future):
data = b''

Expand Down
9 changes: 7 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ def run(cls):
"lz4": ["lz4"],
"snappy": ["python-snappy"],
"zstd": ["python-zstandard"],
"gssapi": ["gssapi"],
"sspi": ["pywin32 >= 301"],
"krb5_auto": [
'pywin32>=301;platform_system=="Windows"',
'gssapi;platform_system=="Linux"',
],

},
cmdclass={"test": Tox},
packages=find_packages(exclude=['test']),
Expand All @@ -56,8 +63,6 @@ def run(cls):
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
Expand Down