Skip to content

Commit 1388e02

Browse files
dpkp88manpreet
authored andcommitted
Add private map of api key -> min/max versions to BrokerConnection (dpkp#1169)
1 parent 7a149fe commit 1388e02

File tree

2 files changed

+58
-11
lines changed

2 files changed

+58
-11
lines changed

kafka/conn.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from kafka.metrics.stats import Avg, Count, Max, Rate
1818
from kafka.protocol.api import RequestHeader
1919
from kafka.protocol.admin import SaslHandShakeRequest
20-
from kafka.protocol.commit import GroupCoordinatorResponse
20+
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
2121
from kafka.protocol.metadata import MetadataRequest
2222
from kafka.protocol.types import Int32
2323
from kafka.version import __version__
@@ -192,6 +192,7 @@ def __init__(self, host, port, afi, **configs):
192192
self.port = port
193193
self.afi = afi
194194
self.in_flight_requests = collections.deque()
195+
self._api_versions = None
195196

196197
self.config = copy.copy(self.DEFAULT_CONFIG)
197198
for key in self.config:
@@ -870,23 +871,31 @@ def _next_correlation_id(self):
870871
self._correlation_id = (self._correlation_id + 1) % 2**31
871872
return self._correlation_id
872873

873-
def _check_api_version_response(self, response):
874+
def _handle_api_version_response(self, response):
875+
error_type = Errors.for_code(response.error_code)
876+
assert error_type is Errors.NoError, "API version check failed"
877+
self._api_versions = dict([
878+
(api_key, (min_version, max_version))
879+
for api_key, min_version, max_version in response.api_versions
880+
])
881+
return self._api_versions
882+
883+
def _infer_broker_version_from_api_versions(self, api_versions):
874884
# The logic here is to check the list of supported request versions
875885
# in descending order. As soon as we find one that works, return it
876886
test_cases = [
877887
# format (<broker verion>, <needed struct>)
878-
((0, 10, 1), MetadataRequest[2])
888+
((0, 11, 0), MetadataRequest[4]),
889+
((0, 10, 2), OffsetFetchRequest[2]),
890+
((0, 10, 1), MetadataRequest[2]),
879891
]
880892

881-
error_type = Errors.for_code(response.error_code)
882-
assert error_type is Errors.NoError, "API version check failed"
883-
max_versions = dict([
884-
(api_key, max_version)
885-
for api_key, _, max_version in response.api_versions
886-
])
887893
# Get the best match of test cases
888894
for broker_version, struct in sorted(test_cases, reverse=True):
889-
if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
895+
if struct.API_KEY not in api_versions:
896+
continue
897+
min_version, max_version = api_versions[struct.API_KEY]
898+
if min_version <= struct.API_VERSION <= max_version:
890899
return broker_version
891900

892901
# We know that ApiVersionResponse is only supported in 0.10+
@@ -974,7 +983,8 @@ def connect():
974983
if isinstance(request, ApiVersionRequest[0]):
975984
# Starting from 0.10 kafka broker we determine version
976985
# by looking at ApiVersionResponse
977-
version = self._check_api_version_response(f.value)
986+
api_versions = self._handle_api_version_response(f.value)
987+
version = self._infer_broker_version_from_api_versions(api_versions)
978988
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
979989
log.info('Set configuration api_version=%s to skip auto'
980990
' check_version requests on startup', version)

kafka/protocol/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,40 @@
66
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
77
ATTRIBUTE_CODEC_MASK, KafkaProtocol,
88
)
9+
10+
API_KEYS = {
11+
0: 'Produce',
12+
1: 'Fetch',
13+
2: 'ListOffsets',
14+
3: 'Metadata',
15+
4: 'LeaderAndIsr',
16+
5: 'StopReplica',
17+
6: 'UpdateMetadata',
18+
7: 'ControlledShutdown',
19+
8: 'OffsetCommit',
20+
9: 'OffsetFetch',
21+
10: 'FindCoordinator',
22+
11: 'JoinGroup',
23+
12: 'Heartbeat',
24+
13: 'LeaveGroup',
25+
14: 'SyncGroup',
26+
15: 'DescribeGroups',
27+
16: 'ListGroups',
28+
17: 'SaslHandshake',
29+
18: 'ApiVersions',
30+
19: 'CreateTopics',
31+
20: 'DeleteTopics',
32+
21: 'DeleteRecords',
33+
22: 'InitProducerId',
34+
23: 'OffsetForLeaderEpoch',
35+
24: 'AddPartitionsToTxn',
36+
25: 'AddOffsetsToTxn',
37+
26: 'EndTxn',
38+
27: 'WriteTxnMarkers',
39+
28: 'TxnOffsetCommit',
40+
29: 'DescribeAcls',
41+
30: 'CreateAcls',
42+
31: 'DeleteAcls',
43+
32: 'DescribeConfigs',
44+
33: 'AlterConfigs',
45+
}

0 commit comments

Comments
 (0)