Skip to content

Commit e3362ac

Browse files
committed
Style updates to scram sasl support
1 parent ee1c4a4 commit e3362ac

File tree

3 files changed

+93
-82
lines changed

3 files changed

+93
-82
lines changed

kafka/conn.py

+5-78
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
from __future__ import absolute_import, division
22

3-
import base64
43
import copy
54
import errno
6-
import hashlib
7-
import hmac
85
import io
96
import logging
107
from random import shuffle, uniform
118

12-
from uuid import uuid4
13-
149
# selectors in stdlib as of py3.4
1510
try:
1611
import selectors # pylint: disable=import-error
@@ -34,6 +29,7 @@
3429
from kafka.protocol.metadata import MetadataRequest
3530
from kafka.protocol.parser import KafkaProtocol
3631
from kafka.protocol.types import Int32, Int8
32+
from kafka.scram import ScramClient
3733
from kafka.version import __version__
3834

3935

@@ -42,12 +38,6 @@
4238
TimeoutError = socket.error
4339
BlockingIOError = Exception
4440

45-
def xor_bytes(left, right):
46-
return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right))
47-
else:
48-
def xor_bytes(left, right):
49-
return bytes(lb ^ rb for lb, rb in zip(left, right))
50-
5141
log = logging.getLogger(__name__)
5242

5343
DEFAULT_KAFKA_PORT = 9092
@@ -107,69 +97,6 @@ class ConnectionStates(object):
10797
AUTHENTICATING = '<authenticating>'
10898

10999

110-
class ScramClient:
111-
MECHANISMS = {
112-
'SCRAM-SHA-256': hashlib.sha256,
113-
'SCRAM-SHA-512': hashlib.sha512
114-
}
115-
116-
def __init__(self, user, password, mechanism):
117-
self.nonce = str(uuid4()).replace('-', '')
118-
self.auth_message = ''
119-
self.salted_password = None
120-
self.user = user
121-
self.password = password.encode()
122-
self.hashfunc = self.MECHANISMS[mechanism]
123-
self.hashname = ''.join(mechanism.lower().split('-')[1:3])
124-
self.stored_key = None
125-
self.client_key = None
126-
self.client_signature = None
127-
self.client_proof = None
128-
self.server_key = None
129-
self.server_signature = None
130-
131-
def first_message(self):
132-
client_first_bare = 'n={},r={}'.format(self.user, self.nonce)
133-
self.auth_message += client_first_bare
134-
return 'n,,' + client_first_bare
135-
136-
def process_server_first_message(self, server_first_message):
137-
self.auth_message += ',' + server_first_message
138-
params = dict(pair.split('=', 1) for pair in server_first_message.split(','))
139-
server_nonce = params['r']
140-
if not server_nonce.startswith(self.nonce):
141-
raise ValueError("Server nonce, did not start with client nonce!")
142-
self.nonce = server_nonce
143-
self.auth_message += ',c=biws,r=' + self.nonce
144-
145-
salt = base64.b64decode(params['s'].encode())
146-
iterations = int(params['i'])
147-
self.create_salted_password(salt, iterations)
148-
149-
self.client_key = self.hmac(self.salted_password, b'Client Key')
150-
self.stored_key = self.hashfunc(self.client_key).digest()
151-
self.client_signature = self.hmac(self.stored_key, self.auth_message.encode())
152-
self.client_proof = xor_bytes(self.client_key, self.client_signature)
153-
self.server_key = self.hmac(self.salted_password, b'Server Key')
154-
self.server_signature = self.hmac(self.server_key, self.auth_message.encode())
155-
156-
def hmac(self, key, msg):
157-
return hmac.new(key, msg, digestmod=self.hashfunc).digest()
158-
159-
def create_salted_password(self, salt, iterations):
160-
self.salted_password = hashlib.pbkdf2_hmac(
161-
self.hashname, self.password, salt, iterations
162-
)
163-
164-
def final_message(self):
165-
client_final_no_proof = 'c=biws,r=' + self.nonce
166-
return 'c=biws,r={},p={}'.format(self.nonce, base64.b64encode(self.client_proof).decode())
167-
168-
def process_server_final_message(self, server_final_message):
169-
params = dict(pair.split('=', 1) for pair in server_final_message.split(','))
170-
if self.server_signature != base64.b64decode(params['v'].encode()):
171-
raise ValueError("Server sent wrong signature!")
172-
173100
class BrokerConnection(object):
174101
"""Initialize a Kafka broker connection
175102
@@ -747,20 +674,20 @@ def _try_authenticate_scram(self, future):
747674
close = False
748675
else:
749676
try:
750-
client_first = scram_client.first_message().encode()
677+
client_first = scram_client.first_message().encode('utf-8')
751678
size = Int32.encode(len(client_first))
752679
self._send_bytes_blocking(size + client_first)
753680

754681
(data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
755-
server_first = self._recv_bytes_blocking(data_len).decode()
682+
server_first = self._recv_bytes_blocking(data_len).decode('utf-8')
756683
scram_client.process_server_first_message(server_first)
757684

758-
client_final = scram_client.final_message().encode()
685+
client_final = scram_client.final_message().encode('utf-8')
759686
size = Int32.encode(len(client_final))
760687
self._send_bytes_blocking(size + client_final)
761688

762689
(data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
763-
server_final = self._recv_bytes_blocking(data_len).decode()
690+
server_final = self._recv_bytes_blocking(data_len).decode('utf-8')
764691
scram_client.process_server_final_message(server_final)
765692

766693
except (ConnectionError, TimeoutError) as e:

kafka/scram.py

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
from __future__ import absolute_import
2+
3+
import base64
4+
import hashlib
5+
import hmac
6+
import uuid
7+
8+
from kafka.vendor import six
9+
10+
11+
if six.PY2:
12+
def xor_bytes(left, right):
13+
return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right))
14+
else:
15+
def xor_bytes(left, right):
16+
return bytes(lb ^ rb for lb, rb in zip(left, right))
17+
18+
19+
class ScramClient:
20+
MECHANISMS = {
21+
'SCRAM-SHA-256': hashlib.sha256,
22+
'SCRAM-SHA-512': hashlib.sha512
23+
}
24+
25+
def __init__(self, user, password, mechanism):
26+
self.nonce = str(uuid.uuid4()).replace('-', '')
27+
self.auth_message = ''
28+
self.salted_password = None
29+
self.user = user
30+
self.password = password.encode('utf-8')
31+
self.hashfunc = self.MECHANISMS[mechanism]
32+
self.hashname = ''.join(mechanism.lower().split('-')[1:3])
33+
self.stored_key = None
34+
self.client_key = None
35+
self.client_signature = None
36+
self.client_proof = None
37+
self.server_key = None
38+
self.server_signature = None
39+
40+
def first_message(self):
41+
client_first_bare = 'n={},r={}'.format(self.user, self.nonce)
42+
self.auth_message += client_first_bare
43+
return 'n,,' + client_first_bare
44+
45+
def process_server_first_message(self, server_first_message):
46+
self.auth_message += ',' + server_first_message
47+
params = dict(pair.split('=', 1) for pair in server_first_message.split(','))
48+
server_nonce = params['r']
49+
if not server_nonce.startswith(self.nonce):
50+
raise ValueError("Server nonce, did not start with client nonce!")
51+
self.nonce = server_nonce
52+
self.auth_message += ',c=biws,r=' + self.nonce
53+
54+
salt = base64.b64decode(params['s'].encode('utf-8'))
55+
iterations = int(params['i'])
56+
self.create_salted_password(salt, iterations)
57+
58+
self.client_key = self.hmac(self.salted_password, b'Client Key')
59+
self.stored_key = self.hashfunc(self.client_key).digest()
60+
self.client_signature = self.hmac(self.stored_key, self.auth_message.encode('utf-8'))
61+
self.client_proof = xor_bytes(self.client_key, self.client_signature)
62+
self.server_key = self.hmac(self.salted_password, b'Server Key')
63+
self.server_signature = self.hmac(self.server_key, self.auth_message.encode('utf-8'))
64+
65+
def hmac(self, key, msg):
66+
return hmac.new(key, msg, digestmod=self.hashfunc).digest()
67+
68+
def create_salted_password(self, salt, iterations):
69+
self.salted_password = hashlib.pbkdf2_hmac(
70+
self.hashname, self.password, salt, iterations
71+
)
72+
73+
def final_message(self):
74+
client_final_no_proof = 'c=biws,r=' + self.nonce
75+
return 'c=biws,r={},p={}'.format(self.nonce, base64.b64encode(self.client_proof).decode('utf-8'))
76+
77+
def process_server_final_message(self, server_final_message):
78+
params = dict(pair.split('=', 1) for pair in server_final_message.split(','))
79+
if self.server_signature != base64.b64decode(params['v'].encode('utf-8')):
80+
raise ValueError("Server sent wrong signature!")
81+
82+

test/fixtures.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,10 @@ def _sasl_config(self):
318318
if not self.sasl_enabled:
319319
return ''
320320

321-
sasl_config = "sasl.enabled.mechanisms={mechanism}\n"
322-
sasl_config += "sasl.mechanism.inter.broker.protocol={mechanism}\n"
321+
sasl_config = (
322+
'sasl.enabled.mechanisms={mechanism}\n'
323+
'sasl.mechanism.inter.broker.protocol={mechanism}\n'
324+
)
323325
return sasl_config.format(mechanism=self.sasl_mechanism)
324326

325327
def _jaas_config(self):
@@ -328,12 +330,12 @@ def _jaas_config(self):
328330

329331
elif self.sasl_mechanism == 'PLAIN':
330332
jaas_config = (
331-
"org.apache.kafka.common.security.plain.PlainLoginModule required\n"
333+
'org.apache.kafka.common.security.plain.PlainLoginModule required\n'
332334
' username="{user}" password="{password}" user_{user}="{password}";\n'
333335
)
334336
elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
335337
jaas_config = (
336-
"org.apache.kafka.common.security.scram.ScramLoginModule required\n"
338+
'org.apache.kafka.common.security.scram.ScramLoginModule required\n'
337339
' username="{user}" password="{password}";\n'
338340
)
339341
else:

0 commit comments

Comments
 (0)