Skip to content

Commit 7f1bbb9

Browse files
dpkpmanuco
andauthored
feat: Add SSPI (Kerberos for Windows) authentication mechanism (#2521)
Co-authored-by: Emmanuel <[email protected]>
1 parent 57f1782 commit 7f1bbb9

File tree

8 files changed

+151
-7
lines changed

8 files changed

+151
-7
lines changed

kafka/conn.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -766,7 +766,7 @@ def _sasl_authenticate(self, future):
766766
self._sasl_mechanism.receive(recv_token)
767767

768768
if self._sasl_mechanism.is_authenticated():
769-
log.info('%s: Authenticated via %s', self, self.config['sasl_mechanism'])
769+
log.info('%s: %s', self, self._sasl_mechanism.auth_details())
770770
return future.success(True)
771771
else:
772772
return future.failure(Errors.AuthenticationFailedError('Failed to authenticate via SASL %s' % self.config['sasl_mechanism']))

kafka/sasl/__init__.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
from __future__ import absolute_import
22

3+
import platform
4+
35
from kafka.sasl.gssapi import SaslMechanismGSSAPI
46
from kafka.sasl.msk import SaslMechanismAwsMskIam
57
from kafka.sasl.oauth import SaslMechanismOAuth
68
from kafka.sasl.plain import SaslMechanismPlain
79
from kafka.sasl.scram import SaslMechanismScram
10+
from kafka.sasl.sspi import SaslMechanismSSPI
811

912

1013
SASL_MECHANISMS = {}
@@ -20,9 +23,12 @@ def get_sasl_mechanism(name):
2023
return SASL_MECHANISMS[name]
2124

2225

23-
register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI)
26+
register_sasl_mechanism('AWS_MSK_IAM', SaslMechanismAwsMskIam)
27+
if platform.system() == 'Windows':
28+
register_sasl_mechanism('GSSAPI', SaslMechanismSSPI)
29+
else:
30+
register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI)
2431
register_sasl_mechanism('OAUTHBEARER', SaslMechanismOAuth)
2532
register_sasl_mechanism('PLAIN', SaslMechanismPlain)
2633
register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram)
2734
register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram)
28-
register_sasl_mechanism('AWS_MSK_IAM', SaslMechanismAwsMskIam)

kafka/sasl/abc.py

+5
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,8 @@ def is_done(self):
2525
@abc.abstractmethod
2626
def is_authenticated(self):
2727
pass
28+
29+
def auth_details(self):
30+
if not self.is_authenticated:
31+
raise RuntimeError('Not authenticated yet!')
32+
return 'Authenticated via SASL'

kafka/sasl/gssapi.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from __future__ import absolute_import
22

3-
import struct
4-
53
# needed for SASL_GSSAPI authentication:
64
try:
75
import gssapi
@@ -67,10 +65,12 @@ def receive(self, auth_bytes):
6765
# Kafka currently doesn't support integrity or confidentiality security layers, so we
6866
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
6967
# by the server
68+
client_flags = self.SASL_QOP_AUTH
69+
server_flags = msg[0]
7070
message_parts = [
71-
struct.pack('>b', self.SASL_QOP_AUTH & struct.unpack('>b', msg[0:1])),
71+
bytes(client_flags & server_flags),
7272
msg[:1],
73-
self.auth_id.encode(),
73+
self.auth_id.encode('utf-8'),
7474
]
7575
# add authorization identity to the response, and GSS-wrap
7676
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message
@@ -80,3 +80,8 @@ def is_done(self):
8080

8181
def is_authenticated(self):
8282
return self._is_authenticated
83+
84+
def auth_details(self):
85+
if not self.is_authenticated:
86+
raise RuntimeError('Not authenticated yet!')
87+
return 'Authenticated as %s to %s via SASL / GSSAPI' % (self._client_ctx.initiator_name, self._client_ctx.target_name)

kafka/sasl/oauth.py

+5
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,8 @@ def _token_extensions(self):
3737
extensions = getattr(self.token_provider, 'extensions', lambda: [])()
3838
msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()])
3939
return '\x01' + msg if msg else ''
40+
41+
def auth_details(self):
42+
if not self.is_authenticated:
43+
raise RuntimeError('Not authenticated yet!')
44+
return 'Authenticated via SASL / OAuth'

kafka/sasl/plain.py

+5
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,8 @@ def is_done(self):
3434

3535
def is_authenticated(self):
3636
return self._is_authenticated
37+
38+
def auth_details(self):
39+
if not self.is_authenticated:
40+
raise RuntimeError('Not authenticated yet!')
41+
return 'Authenticated as %s via SASL / Plain' % self.username

kafka/sasl/scram.py

+7
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ def __init__(self, **config):
3030
if config.get('security_protocol', '') == 'SASL_PLAINTEXT':
3131
log.warning('Exchanging credentials in the clear during Sasl Authentication')
3232

33+
self.username = config['sasl_plain_username']
34+
self.mechanism = config['sasl_mechanism']
3335
self._scram_client = ScramClient(
3436
config['sasl_plain_username'],
3537
config['sasl_plain_password'],
@@ -62,6 +64,11 @@ def is_authenticated(self):
6264
# receive raises if authentication fails...?
6365
return self._state == 2
6466

67+
def auth_details(self):
68+
if not self.is_authenticated:
69+
raise RuntimeError('Not authenticated yet!')
70+
return 'Authenticated as %s via SASL / %s' % (self.username, self.mechanism)
71+
6572

6673
class ScramClient:
6774
MECHANISMS = {

kafka/sasl/sspi.py

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
from __future__ import absolute_import
2+
3+
import logging
4+
5+
# Windows-only
6+
try:
7+
import sspi
8+
import pywintypes
9+
import sspicon
10+
import win32security
11+
except ImportError:
12+
sspi = None
13+
14+
from kafka.sasl.abc import SaslMechanism
15+
16+
17+
log = logging.getLogger(__name__)
18+
19+
20+
class SaslMechanismSSPI(SaslMechanism):
21+
# Establish security context and negotiate protection level
22+
# For reference see RFC 4752, section 3
23+
24+
SASL_QOP_AUTH = 1
25+
SASL_QOP_AUTH_INT = 2
26+
SASL_QOP_AUTH_CONF = 4
27+
28+
def __init__(self, **config):
29+
assert sspi is not None, 'No GSSAPI lib available (gssapi or sspi)'
30+
if 'sasl_kerberos_name' not in config and 'sasl_kerberos_service_name' not in config:
31+
raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration')
32+
self._is_done = False
33+
self._is_authenticated = False
34+
if config.get('sasl_kerberos_name', None) is not None:
35+
self.auth_id = str(config['sasl_kerberos_name'])
36+
else:
37+
kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '')
38+
self.auth_id = config['sasl_kerberos_service_name'] + '/' + kerberos_domain_name
39+
scheme = "Kerberos" # Do not try with Negotiate for SASL authentication. Tokens are different.
40+
# https://docs.microsoft.com/en-us/windows/win32/secauthn/context-requirements
41+
flags = (
42+
sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication
43+
sspicon.ISC_REQ_INTEGRITY | # check for integrity
44+
sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages
45+
sspicon.ISC_REQ_CONFIDENTIALITY # request confidentiality
46+
)
47+
self._client_ctx = sspi.ClientAuth(scheme, targetspn=self.auth_id, scflags=flags)
48+
self._next_token = self._client_ctx.step(None)
49+
50+
def auth_bytes(self):
51+
# GSSAPI Auth does not have a final broker->client message
52+
# so mark is_done after the final auth_bytes are provided
53+
# in practice we'll still receive a response when using SaslAuthenticate
54+
# but not when using the prior unframed approach.
55+
if self._client_ctx.authenticated:
56+
self._is_done = True
57+
self._is_authenticated = True
58+
return self._next_token or b''
59+
60+
def receive(self, auth_bytes):
61+
log.debug("Received token from server (size %s)", len(auth_bytes))
62+
if not self._client_ctx.authenticated:
63+
# calculate an output token from kafka token (or None on first iteration)
64+
# https://docs.microsoft.com/en-us/windows/win32/api/sspi/nf-sspi-initializesecuritycontexta
65+
# https://docs.microsoft.com/en-us/windows/win32/secauthn/initializesecuritycontext--kerberos
66+
# authorize method will wrap for us our token in sspi structures
67+
error, auth = self._client_ctx.authorize(auth_bytes)
68+
if len(auth) > 0 and len(auth[0].Buffer):
69+
log.debug("Got token from context")
70+
# this buffer must be sent to the server whatever the result is
71+
self._next_token = auth[0].Buffer
72+
else:
73+
log.debug("Got no token, exchange finished")
74+
# seems to be the end of the loop
75+
self._next_token = b''
76+
elif self._is_done:
77+
# The final step of gssapi is send, so we do not expect any additional bytes
78+
# however, allow an empty message to support SaslAuthenticate response
79+
if auth_bytes != b'':
80+
raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion")
81+
else:
82+
# Process the security layer negotiation token, sent by the server
83+
# once the security context is established.
84+
85+
# The following part is required by SASL, but not by classic Kerberos.
86+
# See RFC 4752
87+
88+
# unwraps message containing supported protection levels and msg size
89+
msg, _was_encrypted = self._client_ctx.unwrap(auth_bytes)
90+
91+
# Kafka currently doesn't support integrity or confidentiality security layers, so we
92+
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
93+
# by the server
94+
client_flags = self.SASL_QOP_AUTH
95+
server_flags = msg[0]
96+
message_parts = [
97+
bytes(client_flags & server_flags),
98+
msg[:1],
99+
self.auth_id.encode('utf-8'),
100+
]
101+
# add authorization identity to the response, and GSS-wrap
102+
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False)
103+
104+
def is_done(self):
105+
return self._is_done
106+
107+
def is_authenticated(self):
108+
return self._is_authenticated
109+
110+
def auth_details(self):
111+
return 'Authenticated as %s to %s via SASL / SSPI/GSSAPI \\o/' % (self._client_ctx.initiator_name, self._client_ctx.service_name)

0 commit comments

Comments
 (0)