Skip to content

Commit 4995e9b

Browse files
authored
KIP-511: Use ApiVersions v4 on initial connect w/ client_software_name + version (#2558)
1 parent e6fa9f3 commit 4995e9b

File tree

5 files changed

+83
-18
lines changed

5 files changed

+83
-18
lines changed

kafka/conn.py

+21-4
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ class BrokerConnection(object):
101101
server-side log entries that correspond to this client. Also
102102
submitted to GroupCoordinator for logging with respect to
103103
consumer group administration. Default: 'kafka-python-{version}'
104+
client_software_name (str): Sent to kafka broker for KIP-511.
105+
Default: 'kafka-python'
106+
client_software_version (str): Sent to kafka broker for KIP-511.
107+
Default: The kafka-python version (via kafka.version).
104108
reconnect_backoff_ms (int): The amount of time in milliseconds to
105109
wait before attempting to reconnect to a given host.
106110
Default: 50.
@@ -191,6 +195,8 @@ class BrokerConnection(object):
191195

192196
DEFAULT_CONFIG = {
193197
'client_id': 'kafka-python-' + __version__,
198+
'client_software_name': 'kafka-python',
199+
'client_software_version': __version__,
194200
'node_id': 0,
195201
'request_timeout_ms': 30000,
196202
'reconnect_backoff_ms': 50,
@@ -242,7 +248,7 @@ def __init__(self, host, port, afi, **configs):
242248
self._api_versions = None
243249
self._api_version = None
244250
self._check_version_idx = None
245-
self._api_versions_idx = 2
251+
self._api_versions_idx = 4 # version of ApiVersionsRequest to try on first connect
246252
self._throttle_time = None
247253
self._socks5_proxy = None
248254

@@ -538,7 +544,14 @@ def _try_api_versions_check(self):
538544
log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self._api_version)
539545
return True
540546
elif self._check_version_idx is None:
541-
request = ApiVersionsRequest[self._api_versions_idx]()
547+
version = self._api_versions_idx
548+
if version >= 3:
549+
request = ApiVersionsRequest[version](
550+
client_software_name=self.config['client_software_name'],
551+
client_software_version=self.config['client_software_version'],
552+
_tagged_fields={})
553+
else:
554+
request = ApiVersionsRequest[version]()
542555
future = Future()
543556
response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
544557
response.add_callback(self._handle_api_versions_response, future)
@@ -573,11 +586,15 @@ def _try_api_versions_check(self):
573586

574587
def _handle_api_versions_response(self, future, response):
575588
error_type = Errors.for_code(response.error_code)
576-
# if error_type i UNSUPPORTED_VERSION: retry w/ latest version from response
577589
if error_type is not Errors.NoError:
578590
future.failure(error_type())
579591
if error_type is Errors.UnsupportedVersionError:
580592
self._api_versions_idx -= 1
593+
for api_key, min_version, max_version, *rest in response.api_versions:
594+
# If broker provides a lower max_version, skip to that
595+
if api_key == response.API_KEY:
596+
self._api_versions_idx = min(self._api_versions_idx, max_version)
597+
break
581598
if self._api_versions_idx >= 0:
582599
self._api_versions_future = None
583600
self.state = ConnectionStates.API_VERSIONS_SEND
@@ -587,7 +604,7 @@ def _handle_api_versions_response(self, future, response):
587604
return
588605
self._api_versions = dict([
589606
(api_key, (min_version, max_version))
590-
for api_key, min_version, max_version in response.api_versions
607+
for api_key, min_version, max_version, *rest in response.api_versions
591608
])
592609
self._api_version = self._infer_broker_version_from_api_versions(self._api_versions)
593610
log.info('Broker version identified as %s', '.'.join(map(str, self._api_version)))

kafka/protocol/admin.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,6 @@ class DescribeConfigsRequest_v2(Request):
737737
class DescribeLogDirsResponse_v0(Response):
738738
API_KEY = 35
739739
API_VERSION = 0
740-
FLEXIBLE_VERSION = True
741740
SCHEMA = Schema(
742741
('throttle_time_ms', Int32),
743742
('log_dirs', Array(
@@ -970,6 +969,7 @@ class AlterPartitionReassignmentsResponse_v0(Response):
970969
)),
971970
("tags", TaggedFields)
972971
)
972+
FLEXIBLE_VERSION = True
973973

974974

975975
class AlterPartitionReassignmentsRequest_v0(Request):
@@ -1017,6 +1017,7 @@ class ListPartitionReassignmentsResponse_v0(Response):
10171017
)),
10181018
("tags", TaggedFields)
10191019
)
1020+
FLEXIBLE_VERSION = True
10201021

10211022

10221023
class ListPartitionReassignmentsRequest_v0(Request):

kafka/protocol/api.py

+8-6
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,15 @@ def expect_response(self):
8282
def to_object(self):
8383
return _to_object(self.SCHEMA, self)
8484

85-
def build_request_header(self, correlation_id, client_id):
85+
def build_header(self, correlation_id, client_id):
8686
if self.FLEXIBLE_VERSION:
8787
return RequestHeaderV2(self, correlation_id=correlation_id, client_id=client_id)
8888
return RequestHeader(self, correlation_id=correlation_id, client_id=client_id)
8989

90-
def parse_response_header(self, read_buffer):
91-
if self.FLEXIBLE_VERSION:
92-
return ResponseHeaderV2.decode(read_buffer)
93-
return ResponseHeader.decode(read_buffer)
94-
9590

9691
@add_metaclass(abc.ABCMeta)
9792
class Response(Struct):
93+
FLEXIBLE_VERSION = False
9894

9995
@abc.abstractproperty
10096
def API_KEY(self):
@@ -114,6 +110,12 @@ def SCHEMA(self):
114110
def to_object(self):
115111
return _to_object(self.SCHEMA, self)
116112

113+
@classmethod
114+
def parse_header(cls, read_buffer):
115+
if cls.FLEXIBLE_VERSION:
116+
return ResponseHeaderV2.decode(read_buffer)
117+
return ResponseHeader.decode(read_buffer)
118+
117119

118120
def _to_object(schema, data):
119121
obj = {}

kafka/protocol/api_versions.py

+45-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from io import BytesIO
44

55
from kafka.protocol.api import Request, Response
6-
from kafka.protocol.types import Array, Int16, Int32, Schema
6+
from kafka.protocol.types import Array, CompactArray, CompactString, Int16, Int32, Schema, TaggedFields
77

88

99
class BaseApiVersionsResponse(Response):
@@ -61,6 +61,28 @@ class ApiVersionsResponse_v2(BaseApiVersionsResponse):
6161
SCHEMA = ApiVersionsResponse_v1.SCHEMA
6262

6363

64+
class ApiVersionsResponse_v3(BaseApiVersionsResponse):
65+
API_KEY = 18
66+
API_VERSION = 3
67+
SCHEMA = Schema(
68+
('error_code', Int16),
69+
('api_versions', CompactArray(
70+
('api_key', Int16),
71+
('min_version', Int16),
72+
('max_version', Int16),
73+
('_tagged_fields', TaggedFields))),
74+
('throttle_time_ms', Int32),
75+
('_tagged_fields', TaggedFields)
76+
)
77+
# Note: ApiVersions Response does not send FLEXIBLE_VERSION header!
78+
79+
80+
class ApiVersionsResponse_v4(BaseApiVersionsResponse):
81+
API_KEY = 18
82+
API_VERSION = 4
83+
SCHEMA = ApiVersionsResponse_v3.SCHEMA
84+
85+
6486
class ApiVersionsRequest_v0(Request):
6587
API_KEY = 18
6688
API_VERSION = 0
@@ -82,9 +104,31 @@ class ApiVersionsRequest_v2(Request):
82104
SCHEMA = ApiVersionsRequest_v1.SCHEMA
83105

84106

107+
class ApiVersionsRequest_v3(Request):
108+
API_KEY = 18
109+
API_VERSION = 3
110+
RESPONSE_TYPE = ApiVersionsResponse_v3
111+
SCHEMA = Schema(
112+
('client_software_name', CompactString('utf-8')),
113+
('client_software_version', CompactString('utf-8')),
114+
('_tagged_fields', TaggedFields)
115+
)
116+
FLEXIBLE_VERSION = True
117+
118+
119+
class ApiVersionsRequest_v4(Request):
120+
API_KEY = 18
121+
API_VERSION = 4
122+
RESPONSE_TYPE = ApiVersionsResponse_v4
123+
SCHEMA = ApiVersionsRequest_v3.SCHEMA
124+
FLEXIBLE_VERSION = True
125+
126+
85127
ApiVersionsRequest = [
86128
ApiVersionsRequest_v0, ApiVersionsRequest_v1, ApiVersionsRequest_v2,
129+
ApiVersionsRequest_v3, ApiVersionsRequest_v4,
87130
]
88131
ApiVersionsResponse = [
89132
ApiVersionsResponse_v0, ApiVersionsResponse_v1, ApiVersionsResponse_v2,
133+
ApiVersionsResponse_v3, ApiVersionsResponse_v4,
90134
]

kafka/protocol/parser.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def send_request(self, request, correlation_id=None):
5959
if correlation_id is None:
6060
correlation_id = self._next_correlation_id()
6161

62-
header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id)
62+
header = request.build_header(correlation_id=correlation_id, client_id=self._client_id)
6363
message = b''.join([header.encode(), request.encode()])
6464
size = Int32.encode(len(message))
6565
data = size + message
@@ -136,13 +136,14 @@ def _process_response(self, read_buffer):
136136
if not self.in_flight_requests:
137137
raise Errors.CorrelationIdError('No in-flight-request found for server response')
138138
(correlation_id, request) = self.in_flight_requests.popleft()
139-
response_header = request.parse_response_header(read_buffer)
139+
response_type = request.RESPONSE_TYPE
140+
response_header = response_type.parse_header(read_buffer)
140141
recv_correlation_id = response_header.correlation_id
141142
log.debug('Received correlation id: %d', recv_correlation_id)
142143
# 0.8.2 quirk
143144
if (recv_correlation_id == 0 and
144145
correlation_id != 0 and
145-
request.RESPONSE_TYPE is FindCoordinatorResponse[0] and
146+
response_type is FindCoordinatorResponse[0] and
146147
(self._api_version == (0, 8, 2) or self._api_version is None)):
147148
log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
148149
' Correlation ID does not match request. This'
@@ -156,15 +157,15 @@ def _process_response(self, read_buffer):
156157
% (correlation_id, recv_correlation_id))
157158

158159
# decode response
159-
log.debug('Processing response %s', request.RESPONSE_TYPE.__name__)
160+
log.debug('Processing response %s', response_type.__name__)
160161
try:
161-
response = request.RESPONSE_TYPE.decode(read_buffer)
162+
response = response_type.decode(read_buffer)
162163
except ValueError:
163164
read_buffer.seek(0)
164165
buf = read_buffer.read()
165166
log.error('Response %d [ResponseType: %s Request: %s]:'
166167
' Unable to decode %d-byte buffer: %r',
167-
correlation_id, request.RESPONSE_TYPE,
168+
correlation_id, response_type,
168169
request, len(buf), buf)
169170
raise Errors.KafkaProtocolError('Unable to decode response')
170171

0 commit comments

Comments
 (0)