Skip to content

Commit 6e1b9e5

Browse files
authored
Cleanup sasl mechanism configuration checks; fix gssapi bugs; add sasl_kerberos_name config (#2520)
1 parent 8320cc8 commit 6e1b9e5

File tree

9 files changed

+40
-13
lines changed

9 files changed

+40
-13
lines changed

kafka/admin/client.py

+4
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ class KafkaAdminClient(object):
142142
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
143143
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
144144
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
145+
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
146+
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
147+
sasl_kerberos_domain name are ignored. Default: None.
145148
sasl_kerberos_service_name (str): Service name to include in GSSAPI
146149
sasl mechanism handshake. Default: 'kafka'
147150
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -181,6 +184,7 @@ class KafkaAdminClient(object):
181184
'sasl_mechanism': None,
182185
'sasl_plain_username': None,
183186
'sasl_plain_password': None,
187+
'sasl_kerberos_name': None,
184188
'sasl_kerberos_service_name': 'kafka',
185189
'sasl_kerberos_domain_name': None,
186190
'sasl_oauth_token_provider': None,

kafka/client_async.py

+4
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ class KafkaClient(object):
163163
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
164164
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
165165
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
166+
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
167+
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
168+
sasl_kerberos_domain name are ignored. Default: None.
166169
sasl_kerberos_service_name (str): Service name to include in GSSAPI
167170
sasl mechanism handshake. Default: 'kafka'
168171
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -206,6 +209,7 @@ class KafkaClient(object):
206209
'sasl_mechanism': None,
207210
'sasl_plain_username': None,
208211
'sasl_plain_password': None,
212+
'sasl_kerberos_name': None,
209213
'sasl_kerberos_service_name': 'kafka',
210214
'sasl_kerberos_domain_name': None,
211215
'sasl_oauth_token_provider': None

kafka/conn.py

+4
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ class BrokerConnection(object):
178178
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
179179
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
180180
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
181+
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
182+
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
183+
sasl_kerberos_domain name are ignored. Default: None.
181184
sasl_kerberos_service_name (str): Service name to include in GSSAPI
182185
sasl mechanism handshake. Default: 'kafka'
183186
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -216,6 +219,7 @@ class BrokerConnection(object):
216219
'sasl_mechanism': None,
217220
'sasl_plain_username': None,
218221
'sasl_plain_password': None,
222+
'sasl_kerberos_name': None,
219223
'sasl_kerberos_service_name': 'kafka',
220224
'sasl_kerberos_domain_name': None,
221225
'sasl_oauth_token_provider': None

kafka/consumer/group.py

+4
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,9 @@ class KafkaConsumer(six.Iterator):
250250
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
251251
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
252252
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
253+
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
254+
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
255+
sasl_kerberos_domain name are ignored. Default: None.
253256
sasl_kerberos_service_name (str): Service name to include in GSSAPI
254257
sasl mechanism handshake. Default: 'kafka'
255258
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -317,6 +320,7 @@ class KafkaConsumer(six.Iterator):
317320
'sasl_mechanism': None,
318321
'sasl_plain_username': None,
319322
'sasl_plain_password': None,
323+
'sasl_kerberos_name': None,
320324
'sasl_kerberos_service_name': 'kafka',
321325
'sasl_kerberos_domain_name': None,
322326
'sasl_oauth_token_provider': None,

kafka/producer/kafka.py

+4
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,9 @@ class KafkaProducer(object):
289289
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
290290
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
291291
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
292+
sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
293+
sasl mechanism handshake. If provided, sasl_kerberos_service_name and
294+
sasl_kerberos_domain name are ignored. Default: None.
292295
sasl_kerberos_service_name (str): Service name to include in GSSAPI
293296
sasl mechanism handshake. Default: 'kafka'
294297
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -347,6 +350,7 @@ class KafkaProducer(object):
347350
'sasl_mechanism': None,
348351
'sasl_plain_username': None,
349352
'sasl_plain_password': None,
353+
'sasl_kerberos_name': None,
350354
'sasl_kerberos_service_name': 'kafka',
351355
'sasl_kerberos_domain_name': None,
352356
'sasl_oauth_token_provider': None,

kafka/sasl/gssapi.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,19 @@ class SaslMechanismGSSAPI(SaslMechanism):
2222

2323
def __init__(self, **config):
2424
assert gssapi is not None, 'GSSAPI lib not available'
25-
assert config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
25+
if 'sasl_kerberos_name' not in config and 'sasl_kerberos_service_name' not in config:
26+
raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration')
2627
self._is_done = False
2728
self._is_authenticated = False
28-
self.kerberos_damin_name = config['sasl_kerberos_domain_name'] or config['host']
29-
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
30-
self.gssapi_name = gssapi.Name(auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
29+
if config.get('sasl_kerberos_name', None) is not None:
30+
self.auth_id = str(config['sasl_kerberos_name'])
31+
else:
32+
kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '')
33+
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name
34+
if isinstance(config.get('sasl_kerberos_name', None), gssapi.Name):
35+
self.gssapi_name = config['sasl_kerberos_name']
36+
else:
37+
self.gssapi_name = gssapi.Name(self.auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
3138
self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate')
3239
self._next_token = self._client_ctx.step(None)
3340

@@ -54,7 +61,7 @@ def receive(self, auth_bytes):
5461
raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion")
5562
else:
5663
# unwraps message containing supported protection levels and msg size
57-
msg = client_ctx.unwrap(received_token).message
64+
msg = self._client_ctx.unwrap(auth_bytes).message
5865
# Kafka currently doesn't support integrity or confidentiality security layers, so we
5966
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
6067
# by the server

kafka/sasl/oauth.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
class SaslMechanismOAuth(SaslMechanism):
77

88
def __init__(self, **config):
9+
assert 'sasl_oauth_token_provider' in config, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
910
self.token_provider = config['sasl_oauth_token_provider']
10-
assert self.token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
1111
assert callable(getattr(self.token_provider, 'token', None)), 'sasl_oauth_token_provider must implement method #token()'
1212
self._is_done = False
1313
self._is_authenticated = False

kafka/sasl/plain.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111
class SaslMechanismPlain(SaslMechanism):
1212

1313
def __init__(self, **config):
14-
if config['security_protocol'] == 'SASL_PLAINTEXT':
14+
if config.get('security_protocol', '') == 'SASL_PLAINTEXT':
1515
log.warning('Sending username and password in the clear')
16-
assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
17-
assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
16+
assert 'sasl_plain_username' in config, 'sasl_plain_username required for PLAIN sasl'
17+
assert 'sasl_plain_password' in config, 'sasl_plain_password required for PLAIN sasl'
1818

1919
self.username = config['sasl_plain_username']
2020
self.password = config['sasl_plain_password']

kafka/sasl/scram.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ def xor_bytes(left, right):
2323

2424

2525
class SaslMechanismScram(SaslMechanism):
26-
2726
def __init__(self, **config):
28-
assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for SCRAM sasl'
29-
assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for SCRAM sasl'
30-
if config['security_protocol'] == 'SASL_PLAINTEXT':
27+
assert 'sasl_plain_username' in config, 'sasl_plain_username required for SCRAM sasl'
28+
assert 'sasl_plain_password' in config, 'sasl_plain_password required for SCRAM sasl'
29+
assert config.get('sasl_mechanism', '') in ScramClient.MECHANISMS, 'Unrecognized SCRAM mechanism'
30+
if config.get('security_protocol', '') == 'SASL_PLAINTEXT':
3131
log.warning('Exchanging credentials in the clear during Sasl Authentication')
3232

3333
self._scram_client = ScramClient(

0 commit comments

Comments
 (0)