Skip to content

Commit da25df6

Browse files
authored
Add private map of api key -> min/max versions to BrokerConnection (#1169)
1 parent 6b97eae commit da25df6

File tree

2 files changed

+58
-11
lines changed

2 files changed

+58
-11
lines changed

kafka/conn.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from kafka.metrics.stats import Avg, Count, Max, Rate
1818
from kafka.protocol.api import RequestHeader
1919
from kafka.protocol.admin import SaslHandShakeRequest
20-
from kafka.protocol.commit import GroupCoordinatorResponse
20+
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
2121
from kafka.protocol.metadata import MetadataRequest
2222
from kafka.protocol.types import Int32
2323
from kafka.version import __version__
@@ -195,6 +195,7 @@ def __init__(self, host, port, afi, **configs):
195195
self._init_port = port
196196
self._init_afi = afi
197197
self.in_flight_requests = collections.deque()
198+
self._api_versions = None
198199

199200
self.config = copy.copy(self.DEFAULT_CONFIG)
200201
for key in self.config:
@@ -874,23 +875,31 @@ def _next_correlation_id(self):
874875
self._correlation_id = (self._correlation_id + 1) % 2**31
875876
return self._correlation_id
876877

877-
def _check_api_version_response(self, response):
878+
def _handle_api_version_response(self, response):
879+
error_type = Errors.for_code(response.error_code)
880+
assert error_type is Errors.NoError, "API version check failed"
881+
self._api_versions = dict([
882+
(api_key, (min_version, max_version))
883+
for api_key, min_version, max_version in response.api_versions
884+
])
885+
return self._api_versions
886+
887+
def _infer_broker_version_from_api_versions(self, api_versions):
878888
# The logic here is to check the list of supported request versions
879889
# in descending order. As soon as we find one that works, return it
880890
test_cases = [
881891
# format (<broker verion>, <needed struct>)
882-
((0, 10, 1), MetadataRequest[2])
892+
((0, 11, 0), MetadataRequest[4]),
893+
((0, 10, 2), OffsetFetchRequest[2]),
894+
((0, 10, 1), MetadataRequest[2]),
883895
]
884896

885-
error_type = Errors.for_code(response.error_code)
886-
assert error_type is Errors.NoError, "API version check failed"
887-
max_versions = dict([
888-
(api_key, max_version)
889-
for api_key, _, max_version in response.api_versions
890-
])
891897
# Get the best match of test cases
892898
for broker_version, struct in sorted(test_cases, reverse=True):
893-
if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
899+
if struct.API_KEY not in api_versions:
900+
continue
901+
min_version, max_version = api_versions[struct.API_KEY]
902+
if min_version <= struct.API_VERSION <= max_version:
894903
return broker_version
895904

896905
# We know that ApiVersionResponse is only supported in 0.10+
@@ -978,7 +987,8 @@ def connect():
978987
if isinstance(request, ApiVersionRequest[0]):
979988
# Starting from 0.10 kafka broker we determine version
980989
# by looking at ApiVersionResponse
981-
version = self._check_api_version_response(f.value)
990+
api_versions = self._handle_api_version_response(f.value)
991+
version = self._infer_broker_version_from_api_versions(api_versions)
982992
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
983993
log.info('Set configuration api_version=%s to skip auto'
984994
' check_version requests on startup', version)

kafka/protocol/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,40 @@
66
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
77
ATTRIBUTE_CODEC_MASK, KafkaProtocol,
88
)
9+
10+
API_KEYS = {
11+
0: 'Produce',
12+
1: 'Fetch',
13+
2: 'ListOffsets',
14+
3: 'Metadata',
15+
4: 'LeaderAndIsr',
16+
5: 'StopReplica',
17+
6: 'UpdateMetadata',
18+
7: 'ControlledShutdown',
19+
8: 'OffsetCommit',
20+
9: 'OffsetFetch',
21+
10: 'FindCoordinator',
22+
11: 'JoinGroup',
23+
12: 'Heartbeat',
24+
13: 'LeaveGroup',
25+
14: 'SyncGroup',
26+
15: 'DescribeGroups',
27+
16: 'ListGroups',
28+
17: 'SaslHandshake',
29+
18: 'ApiVersions',
30+
19: 'CreateTopics',
31+
20: 'DeleteTopics',
32+
21: 'DeleteRecords',
33+
22: 'InitProducerId',
34+
23: 'OffsetForLeaderEpoch',
35+
24: 'AddPartitionsToTxn',
36+
25: 'AddOffsetsToTxn',
37+
26: 'EndTxn',
38+
27: 'WriteTxnMarkers',
39+
28: 'TxnOffsetCommit',
40+
29: 'DescribeAcls',
41+
30: 'CreateAcls',
42+
31: 'DeleteAcls',
43+
32: 'DescribeConfigs',
44+
33: 'AlterConfigs',
45+
}

0 commit comments

Comments
 (0)