Skip to content

feat: Add SSPI (Kerberos for Windows) authentication mechanism #2521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ def _sasl_authenticate(self, future):
self._sasl_mechanism.receive(recv_token)

if self._sasl_mechanism.is_authenticated():
log.info('%s: Authenticated via %s', self, self.config['sasl_mechanism'])
log.info('%s: %s', self, self._sasl_mechanism.auth_details())
return future.success(True)
else:
return future.failure(Errors.AuthenticationFailedError('Failed to authenticate via SASL %s' % self.config['sasl_mechanism']))
Expand Down
10 changes: 8 additions & 2 deletions kafka/sasl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from __future__ import absolute_import

import platform

from kafka.sasl.gssapi import SaslMechanismGSSAPI
from kafka.sasl.msk import SaslMechanismAwsMskIam
from kafka.sasl.oauth import SaslMechanismOAuth
from kafka.sasl.plain import SaslMechanismPlain
from kafka.sasl.scram import SaslMechanismScram
from kafka.sasl.sspi import SaslMechanismSSPI


SASL_MECHANISMS = {}
Expand All @@ -20,9 +23,12 @@ def get_sasl_mechanism(name):
return SASL_MECHANISMS[name]


register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI)
register_sasl_mechanism('AWS_MSK_IAM', SaslMechanismAwsMskIam)
if platform.system() == 'Windows':
register_sasl_mechanism('GSSAPI', SaslMechanismSSPI)
else:
register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI)
register_sasl_mechanism('OAUTHBEARER', SaslMechanismOAuth)
register_sasl_mechanism('PLAIN', SaslMechanismPlain)
register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram)
register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram)
register_sasl_mechanism('AWS_MSK_IAM', SaslMechanismAwsMskIam)
5 changes: 5 additions & 0 deletions kafka/sasl/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ def is_done(self):
@abc.abstractmethod
def is_authenticated(self):
pass

def auth_details(self):
if not self.is_authenticated:
raise RuntimeError('Not authenticated yet!')
return 'Authenticated via SASL'
13 changes: 9 additions & 4 deletions kafka/sasl/gssapi.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import absolute_import

import struct

# needed for SASL_GSSAPI authentication:
try:
import gssapi
Expand Down Expand Up @@ -67,10 +65,12 @@ def receive(self, auth_bytes):
# Kafka currently doesn't support integrity or confidentiality security layers, so we
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
# by the server
client_flags = self.SASL_QOP_AUTH
server_flags = msg[0]
message_parts = [
struct.pack('>b', self.SASL_QOP_AUTH & struct.unpack('>b', msg[0:1])),
bytes(client_flags & server_flags),
msg[:1],
self.auth_id.encode(),
self.auth_id.encode('utf-8'),
]
# add authorization identity to the response, and GSS-wrap
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message
Expand All @@ -80,3 +80,8 @@ def is_done(self):

def is_authenticated(self):
return self._is_authenticated

def auth_details(self):
if not self.is_authenticated:
raise RuntimeError('Not authenticated yet!')
return 'Authenticated as %s to %s via SASL / GSSAPI' % (self._client_ctx.initiator_name, self._client_ctx.target_name)
5 changes: 5 additions & 0 deletions kafka/sasl/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,8 @@ def _token_extensions(self):
extensions = getattr(self.token_provider, 'extensions', lambda: [])()
msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()])
return '\x01' + msg if msg else ''

def auth_details(self):
if not self.is_authenticated:
raise RuntimeError('Not authenticated yet!')
return 'Authenticated via SASL / OAuth'
5 changes: 5 additions & 0 deletions kafka/sasl/plain.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ def is_done(self):

def is_authenticated(self):
return self._is_authenticated

def auth_details(self):
if not self.is_authenticated:
raise RuntimeError('Not authenticated yet!')
return 'Authenticated as %s via SASL / Plain' % self.username
7 changes: 7 additions & 0 deletions kafka/sasl/scram.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def __init__(self, **config):
if config.get('security_protocol', '') == 'SASL_PLAINTEXT':
log.warning('Exchanging credentials in the clear during Sasl Authentication')

self.username = config['sasl_plain_username']
self.mechanism = config['sasl_mechanism']
self._scram_client = ScramClient(
config['sasl_plain_username'],
config['sasl_plain_password'],
Expand Down Expand Up @@ -62,6 +64,11 @@ def is_authenticated(self):
# receive raises if authentication fails...?
return self._state == 2

def auth_details(self):
if not self.is_authenticated:
raise RuntimeError('Not authenticated yet!')
return 'Authenticated as %s via SASL / %s' % (self.username, self.mechanism)


class ScramClient:
MECHANISMS = {
Expand Down
111 changes: 111 additions & 0 deletions kafka/sasl/sspi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from __future__ import absolute_import

import logging

# Windows-only
try:
import sspi
import pywintypes
import sspicon
import win32security
except ImportError:
sspi = None

from kafka.sasl.abc import SaslMechanism


log = logging.getLogger(__name__)


class SaslMechanismSSPI(SaslMechanism):
# Establish security context and negotiate protection level
# For reference see RFC 4752, section 3

SASL_QOP_AUTH = 1
SASL_QOP_AUTH_INT = 2
SASL_QOP_AUTH_CONF = 4

def __init__(self, **config):
assert sspi is not None, 'No GSSAPI lib available (gssapi or sspi)'
if 'sasl_kerberos_name' not in config and 'sasl_kerberos_service_name' not in config:
raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration')
self._is_done = False
self._is_authenticated = False
if config.get('sasl_kerberos_name', None) is not None:
self.auth_id = str(config['sasl_kerberos_name'])
else:
kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '')
self.auth_id = config['sasl_kerberos_service_name'] + '/' + kerberos_domain_name
scheme = "Kerberos" # Do not try with Negotiate for SASL authentication. Tokens are different.
# https://docs.microsoft.com/en-us/windows/win32/secauthn/context-requirements
flags = (
sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication
sspicon.ISC_REQ_INTEGRITY | # check for integrity
sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages
sspicon.ISC_REQ_CONFIDENTIALITY # request confidentiality
)
self._client_ctx = sspi.ClientAuth(scheme, targetspn=self.auth_id, scflags=flags)
self._next_token = self._client_ctx.step(None)

def auth_bytes(self):
# GSSAPI Auth does not have a final broker->client message
# so mark is_done after the final auth_bytes are provided
# in practice we'll still receive a response when using SaslAuthenticate
# but not when using the prior unframed approach.
if self._client_ctx.authenticated:
self._is_done = True
self._is_authenticated = True
return self._next_token or b''

def receive(self, auth_bytes):
log.debug("Received token from server (size %s)", len(auth_bytes))
if not self._client_ctx.authenticated:
# calculate an output token from kafka token (or None on first iteration)
# https://docs.microsoft.com/en-us/windows/win32/api/sspi/nf-sspi-initializesecuritycontexta
# https://docs.microsoft.com/en-us/windows/win32/secauthn/initializesecuritycontext--kerberos
# authorize method will wrap for us our token in sspi structures
error, auth = self._client_ctx.authorize(auth_bytes)
if len(auth) > 0 and len(auth[0].Buffer):
log.debug("Got token from context")
# this buffer must be sent to the server whatever the result is
self._next_token = auth[0].Buffer
else:
log.debug("Got no token, exchange finished")
# seems to be the end of the loop
self._next_token = b''
elif self._is_done:
# The final step of gssapi is send, so we do not expect any additional bytes
# however, allow an empty message to support SaslAuthenticate response
if auth_bytes != b'':
raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion")
else:
# Process the security layer negotiation token, sent by the server
# once the security context is established.

# The following part is required by SASL, but not by classic Kerberos.
# See RFC 4752

# unwraps message containing supported protection levels and msg size
msg, _was_encrypted = self._client_ctx.unwrap(auth_bytes)

# Kafka currently doesn't support integrity or confidentiality security layers, so we
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
# by the server
client_flags = self.SASL_QOP_AUTH
server_flags = msg[0]
message_parts = [
bytes(client_flags & server_flags),
msg[:1],
self.auth_id.encode('utf-8'),
]
# add authorization identity to the response, and GSS-wrap
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False)

def is_done(self):
return self._is_done

def is_authenticated(self):
return self._is_authenticated

def auth_details(self):
return 'Authenticated as %s to %s via SASL / SSPI/GSSAPI \\o/' % (self._client_ctx.initiator_name, self._client_ctx.service_name)