Skip to content

Commit b697808

Browse files
authored
Fake api_versions for old brokers, rename to ApiVersionsRequest, and handle error decoding (#2494)
1 parent fe98d6c commit b697808

File tree

12 files changed

+226
-111
lines changed

12 files changed

+226
-111
lines changed

kafka/client_async.py

+24-6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from kafka.metrics import AnonMeasurable
2626
from kafka.metrics.stats import Avg, Count, Rate
2727
from kafka.metrics.stats.rate import TimeUnit
28+
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
2829
from kafka.protocol.metadata import MetadataRequest
2930
from kafka.util import Dict, WeakMethod
3031
# Although this looks unused, it actually monkey-patches socket.socketpair()
@@ -239,6 +240,25 @@ def __init__(self, **configs):
239240
if self.config['api_version'] is None:
240241
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
241242
self.config['api_version'] = self.check_version(timeout=check_timeout)
243+
elif self.config['api_version'] in BROKER_API_VERSIONS:
244+
self._api_versions = BROKER_API_VERSIONS[self.config['api_version']]
245+
elif (self.config['api_version'] + (0,)) in BROKER_API_VERSIONS:
246+
log.warning('Configured api_version %s is ambiguous; using %s',
247+
self.config['api_version'], self.config['api_version'] + (0,))
248+
self.config['api_version'] = self.config['api_version'] + (0,)
249+
self._api_versions = BROKER_API_VERSIONS[self.config['api_version']]
250+
else:
251+
compatible_version = None
252+
for v in sorted(BROKER_API_VERSIONS.keys(), reverse=True):
253+
if v <= self.config['api_version']:
254+
compatible_version = v
255+
break
256+
if compatible_version:
257+
log.warning('Configured api_version %s not supported; using %s',
258+
self.config['api_version'], compatible_version)
259+
self._api_versions = BROKER_API_VERSIONS[compatible_version]
260+
else:
261+
raise Errors.UnrecognizedBrokerVersion(self.config['api_version'])
242262

243263
def _init_wakeup_socketpair(self):
244264
self._wake_r, self._wake_w = socket.socketpair()
@@ -849,8 +869,8 @@ def _maybe_refresh_metadata(self, wakeup=False):
849869
topics = list(self.config['bootstrap_topics_filter'])
850870

851871
if self.cluster.need_all_topic_metadata or not topics:
852-
topics = [] if self.config['api_version'] < (0, 10) else None
853-
api_version = 0 if self.config['api_version'] < (0, 10) else 1
872+
topics = [] if self.config['api_version'] < (0, 10, 0) else None
873+
api_version = 0 if self.config['api_version'] < (0, 10, 0) else 1
854874
request = MetadataRequest[api_version](topics)
855875
log.debug("Sending metadata request %s to node %s", request, node_id)
856876
future = self.send(node_id, request, wakeup=wakeup)
@@ -898,7 +918,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
898918
is down and the client enters a bootstrap backoff sleep.
899919
This is only possible if node_id is None.
900920
901-
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
921+
Returns: version tuple, i.e. (3, 9), (2, 0), (0, 10, 2) etc
902922
903923
Raises:
904924
NodeNotReadyError (if node_id is provided)
@@ -925,9 +945,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
925945
try:
926946
remaining = end - time.time()
927947
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
928-
if version >= (0, 10, 0):
929-
# cache the api versions map if it's available (starting
930-
# in 0.10 cluster version)
948+
if not self._api_versions:
931949
self._api_versions = conn.get_api_versions()
932950
self._lock.release()
933951
return version

kafka/conn.py

+36-31
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ class BrokerConnection(object):
165165
or other configuration forbids use of all the specified ciphers),
166166
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
167167
api_version (tuple): Specify which Kafka API version to use.
168-
Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9),
169-
(0, 10). Default: (0, 8, 2)
168+
Must be None or >= (0, 10, 0) to enable SASL authentication.
169+
Default: None
170170
api_version_auto_timeout_ms (int): number of milliseconds to throw a
171171
timeout exception from the constructor when checking the broker
172172
api version. Only applies if api_version is None
@@ -214,7 +214,7 @@ class BrokerConnection(object):
214214
'ssl_crlfile': None,
215215
'ssl_password': None,
216216
'ssl_ciphers': None,
217-
'api_version': (0, 8, 2), # default to most restrictive
217+
'api_version': None,
218218
'selector': selectors.DefaultSelector,
219219
'state_change_callback': lambda node_id, sock, conn: True,
220220
'metrics': None,
@@ -522,7 +522,7 @@ def _try_handshake(self):
522522
return False
523523

524524
def _try_authenticate(self):
525-
assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10)
525+
assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10, 0)
526526

527527
if self._sasl_auth_future is None:
528528
# Build a SaslHandShakeRequest message
@@ -1154,9 +1154,10 @@ def next_ifr_request_timeout_ms(self):
11541154
else:
11551155
return float('inf')
11561156

1157-
def _handle_api_version_response(self, response):
1157+
def _handle_api_versions_response(self, response):
11581158
error_type = Errors.for_code(response.error_code)
1159-
assert error_type is Errors.NoError, "API version check failed"
1159+
if error_type is not Errors.NoError:
1160+
return False
11601161
self._api_versions = dict([
11611162
(api_key, (min_version, max_version))
11621163
for api_key, min_version, max_version in response.api_versions
@@ -1168,12 +1169,7 @@ def get_api_versions(self):
11681169
return self._api_versions
11691170

11701171
version = self.check_version()
1171-
if version < (0, 10, 0):
1172-
raise Errors.UnsupportedVersionError(
1173-
"ApiVersion not supported by cluster version {} < 0.10.0"
1174-
.format(version))
1175-
# _api_versions is set as a side effect of check_versions() on a cluster
1176-
# that supports 0.10.0 or later
1172+
# _api_versions is set as a side effect of check_versions()
11771173
return self._api_versions
11781174

11791175
def _infer_broker_version_from_api_versions(self, api_versions):
@@ -1182,16 +1178,16 @@ def _infer_broker_version_from_api_versions(self, api_versions):
11821178
test_cases = [
11831179
# format (<broker version>, <needed struct>)
11841180
# Make sure to update consumer_integration test check when adding newer versions.
1185-
((2, 6, 0), DescribeClientQuotasRequest[0]),
1186-
((2, 5, 0), DescribeAclsRequest_v2),
1187-
((2, 4, 0), ProduceRequest[8]),
1188-
((2, 3, 0), FetchRequest[11]),
1189-
((2, 2, 0), OffsetRequest[5]),
1190-
((2, 1, 0), FetchRequest[10]),
1191-
((2, 0, 0), FetchRequest[8]),
1192-
((1, 1, 0), FetchRequest[7]),
1193-
((1, 0, 0), MetadataRequest[5]),
1194-
((0, 11, 0), MetadataRequest[4]),
1181+
((2, 6), DescribeClientQuotasRequest[0]),
1182+
((2, 5), DescribeAclsRequest_v2),
1183+
((2, 4), ProduceRequest[8]),
1184+
((2, 3), FetchRequest[11]),
1185+
((2, 2), OffsetRequest[5]),
1186+
((2, 1), FetchRequest[10]),
1187+
((2, 0), FetchRequest[8]),
1188+
((1, 1), FetchRequest[7]),
1189+
((1, 0), MetadataRequest[5]),
1190+
((0, 11), MetadataRequest[4]),
11951191
((0, 10, 2), OffsetFetchRequest[2]),
11961192
((0, 10, 1), MetadataRequest[2]),
11971193
]
@@ -1204,7 +1200,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
12041200
if min_version <= struct.API_VERSION <= max_version:
12051201
return broker_version
12061202

1207-
# We know that ApiVersionResponse is only supported in 0.10+
1203+
# We know that ApiVersionsResponse is only supported in 0.10+
12081204
# so if all else fails, choose that
12091205
return (0, 10, 0)
12101206

@@ -1213,7 +1209,7 @@ def check_version(self, timeout=2, strict=False, topics=[]):
12131209
12141210
Note: This is a blocking call.
12151211
1216-
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
1212+
Returns: version tuple, i.e. (3, 9), (2, 4), etc ...
12171213
"""
12181214
timeout_at = time.time() + timeout
12191215
log.info('Probing node %s broker version', self.node_id)
@@ -1236,12 +1232,15 @@ def reset_override_configs():
12361232
# vanilla MetadataRequest. If the server did not recognize the first
12371233
# request, both will be failed with a ConnectionError that wraps
12381234
# socket.error (32, 54, or 104)
1239-
from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest
1235+
from kafka.protocol.admin import ListGroupsRequest
1236+
from kafka.protocol.api_versions import ApiVersionsRequest
1237+
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
12401238
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
12411239

12421240
test_cases = [
1243-
# All cases starting from 0.10 will be based on ApiVersionResponse
1244-
((0, 10), ApiVersionRequest[0]()),
1241+
# All cases starting from 0.10 will be based on ApiVersionsResponse
1242+
((0, 11), ApiVersionsRequest[1]()),
1243+
((0, 10, 0), ApiVersionsRequest[0]()),
12451244
((0, 9), ListGroupsRequest[0]()),
12461245
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
12471246
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
@@ -1274,11 +1273,17 @@ def reset_override_configs():
12741273
selector.close()
12751274

12761275
if f.succeeded():
1277-
if isinstance(request, ApiVersionRequest[0]):
1276+
if version >= (0, 10, 0):
12781277
# Starting from 0.10 kafka broker we determine version
1279-
# by looking at ApiVersionResponse
1280-
api_versions = self._handle_api_version_response(f.value)
1278+
# by looking at ApiVersionsResponse
1279+
api_versions = self._handle_api_versions_response(f.value)
1280+
if not api_versions:
1281+
continue
12811282
version = self._infer_broker_version_from_api_versions(api_versions)
1283+
else:
1284+
if version not in BROKER_API_VERSIONS:
1285+
raise Errors.UnrecognizedBrokerVersion(version)
1286+
self._api_versions = BROKER_API_VERSIONS[version]
12821287
log.info('Broker version identified as %s', '.'.join(map(str, version)))
12831288
log.info('Set configuration api_version=%s to skip auto'
12841289
' check_version requests on startup', version)
@@ -1298,7 +1303,7 @@ def reset_override_configs():
12981303
# requests (bug...). In this case we expect to see a correlation
12991304
# id mismatch
13001305
elif (isinstance(f.exception, Errors.CorrelationIdError) and
1301-
version == (0, 10)):
1306+
version > (0, 9)):
13021307
pass
13031308
elif six.PY2:
13041309
assert isinstance(f.exception.args[0], socket.error)

kafka/consumer/fetcher.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -702,11 +702,11 @@ def _create_fetch_requests(self):
702702
log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
703703
partition, node_id)
704704

705-
if self.config['api_version'] >= (0, 11, 0):
705+
if self.config['api_version'] >= (0, 11):
706706
version = 4
707707
elif self.config['api_version'] >= (0, 10, 1):
708708
version = 3
709-
elif self.config['api_version'] >= (0, 10):
709+
elif self.config['api_version'] >= (0, 10, 0):
710710
version = 2
711711
elif self.config['api_version'] == (0, 9):
712712
version = 1

kafka/coordinator/base.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ def _send_join_group_request(self):
461461
self._generation.member_id,
462462
self.protocol_type(),
463463
member_metadata)
464-
elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
464+
elif (0, 10, 1) <= self.config['api_version'] < (0, 11):
465465
request = JoinGroupRequest[1](
466466
self.group_id,
467467
self.config['session_timeout_ms'],
@@ -562,7 +562,7 @@ def _handle_join_group_response(self, future, send_time, response):
562562

563563
def _on_join_follower(self):
564564
# send follower's sync group with an empty assignment
565-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
565+
version = 0 if self.config['api_version'] < (0, 11) else 1
566566
request = SyncGroupRequest[version](
567567
self.group_id,
568568
self._generation.generation_id,
@@ -590,7 +590,7 @@ def _on_join_leader(self, response):
590590
except Exception as e:
591591
return Future().failure(e)
592592

593-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
593+
version = 0 if self.config['api_version'] < (0, 11) else 1
594594
request = SyncGroupRequest[version](
595595
self.group_id,
596596
self._generation.generation_id,
@@ -771,7 +771,7 @@ def maybe_leave_group(self):
771771
# this is a minimal effort attempt to leave the group. we do not
772772
# attempt any resending if the request fails or times out.
773773
log.info('Leaving consumer group (%s).', self.group_id)
774-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
774+
version = 0 if self.config['api_version'] < (0, 11) else 1
775775
request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
776776
future = self._client.send(self.coordinator_id, request)
777777
future.add_callback(self._handle_leave_group_response)
@@ -799,7 +799,7 @@ def _send_heartbeat_request(self):
799799
e = Errors.NodeNotReadyError(self.coordinator_id)
800800
return Future().failure(e)
801801

802-
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
802+
version = 0 if self.config['api_version'] < (0, 11) else 1
803803
request = HeartbeatRequest[version](self.group_id,
804804
self._generation.generation_id,
805805
self._generation.member_id)

kafka/producer/kafka.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ def __init__(self, **configs):
393393
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
394394

395395
if self.config['compression_type'] == 'zstd':
396-
assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
396+
assert self.config['api_version'] >= (2, 1), 'Zstd Requires >= Kafka 2.1 Brokers'
397397

398398
# Check compression_type for library support
399399
ct = self.config['compression_type']
@@ -524,7 +524,7 @@ def partitions_for(self, topic):
524524
def _max_usable_produce_magic(self):
525525
if self.config['api_version'] >= (0, 11):
526526
return 2
527-
elif self.config['api_version'] >= (0, 10):
527+
elif self.config['api_version'] >= (0, 10, 0):
528528
return 1
529529
else:
530530
return 0

kafka/producer/sender.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ def _produce_request(self, node_id, acks, timeout, batches):
313313
elif self.config['api_version'] >= (0, 11):
314314
version = 3
315315
kwargs = dict(transactional_id=None)
316-
elif self.config['api_version'] >= (0, 10):
316+
elif self.config['api_version'] >= (0, 10, 0):
317317
version = 2
318318
elif self.config['api_version'] == (0, 9):
319319
version = 1

kafka/protocol/admin.py

-60
Original file line numberDiff line numberDiff line change
@@ -4,66 +4,6 @@
44
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields
55

66

7-
class ApiVersionResponse_v0(Response):
8-
API_KEY = 18
9-
API_VERSION = 0
10-
SCHEMA = Schema(
11-
('error_code', Int16),
12-
('api_versions', Array(
13-
('api_key', Int16),
14-
('min_version', Int16),
15-
('max_version', Int16)))
16-
)
17-
18-
19-
class ApiVersionResponse_v1(Response):
20-
API_KEY = 18
21-
API_VERSION = 1
22-
SCHEMA = Schema(
23-
('error_code', Int16),
24-
('api_versions', Array(
25-
('api_key', Int16),
26-
('min_version', Int16),
27-
('max_version', Int16))),
28-
('throttle_time_ms', Int32)
29-
)
30-
31-
32-
class ApiVersionResponse_v2(Response):
33-
API_KEY = 18
34-
API_VERSION = 2
35-
SCHEMA = ApiVersionResponse_v1.SCHEMA
36-
37-
38-
class ApiVersionRequest_v0(Request):
39-
API_KEY = 18
40-
API_VERSION = 0
41-
RESPONSE_TYPE = ApiVersionResponse_v0
42-
SCHEMA = Schema()
43-
44-
45-
class ApiVersionRequest_v1(Request):
46-
API_KEY = 18
47-
API_VERSION = 1
48-
RESPONSE_TYPE = ApiVersionResponse_v1
49-
SCHEMA = ApiVersionRequest_v0.SCHEMA
50-
51-
52-
class ApiVersionRequest_v2(Request):
53-
API_KEY = 18
54-
API_VERSION = 2
55-
RESPONSE_TYPE = ApiVersionResponse_v1
56-
SCHEMA = ApiVersionRequest_v0.SCHEMA
57-
58-
59-
ApiVersionRequest = [
60-
ApiVersionRequest_v0, ApiVersionRequest_v1, ApiVersionRequest_v2,
61-
]
62-
ApiVersionResponse = [
63-
ApiVersionResponse_v0, ApiVersionResponse_v1, ApiVersionResponse_v2,
64-
]
65-
66-
677
class CreateTopicsResponse_v0(Response):
688
API_KEY = 19
699
API_VERSION = 0

0 commit comments

Comments
 (0)