Skip to content

Add private map of api key -> min/max versions to BrokerConnection #1169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Int32
from kafka.version import __version__
Expand Down Expand Up @@ -193,6 +193,7 @@ def __init__(self, host, port, afi, **configs):
self._init_port = port
self._init_afi = afi
self.in_flight_requests = collections.deque()
self._api_versions = None

self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
Expand Down Expand Up @@ -872,23 +873,31 @@ def _next_correlation_id(self):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id

def _check_api_version_response(self, response):
def _handle_api_version_response(self, response):
error_type = Errors.for_code(response.error_code)
assert error_type is Errors.NoError, "API version check failed"
self._api_versions = dict([
(api_key, (min_version, max_version))
for api_key, min_version, max_version in response.api_versions
])
return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
# in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker verion>, <needed struct>)
((0, 10, 1), MetadataRequest[2])
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
]

error_type = Errors.for_code(response.error_code)
assert error_type is Errors.NoError, "API version check failed"
max_versions = dict([
(api_key, max_version)
for api_key, _, max_version in response.api_versions
])
# Get the best match of test cases
for broker_version, struct in sorted(test_cases, reverse=True):
if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
if struct.API_KEY not in api_versions:
continue
min_version, max_version = api_versions[struct.API_KEY]
if min_version <= struct.API_VERSION <= max_version:
return broker_version

# We know that ApiVersionResponse is only supported in 0.10+
Expand Down Expand Up @@ -976,7 +985,8 @@ def connect():
if isinstance(request, ApiVersionRequest[0]):
# Starting from 0.10 kafka broker we determine version
# by looking at ApiVersionResponse
version = self._check_api_version_response(f.value)
api_versions = self._handle_api_version_response(f.value)
version = self._infer_broker_version_from_api_versions(api_versions)
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)
Expand Down
37 changes: 37 additions & 0 deletions kafka/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,40 @@
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
ATTRIBUTE_CODEC_MASK, KafkaProtocol,
)

API_KEYS = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for this mapping? I could only think of logging...

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH, I created it primarily just to document the values. But there is some potential to switch the values from strings to the versioned list of class structs we have and use it for decoding api messages. That's obviously not in this PR though.

0: 'Produce',
1: 'Fetch',
2: 'ListOffsets',
3: 'Metadata',
4: 'LeaderAndIsr',
5: 'StopReplica',
6: 'UpdateMetadata',
7: 'ControlledShutdown',
8: 'OffsetCommit',
9: 'OffsetFetch',
10: 'FindCoordinator',
11: 'JoinGroup',
12: 'Heartbeat',
13: 'LeaveGroup',
14: 'SyncGroup',
15: 'DescribeGroups',
16: 'ListGroups',
17: 'SaslHandshake',
18: 'ApiVersions',
19: 'CreateTopics',
20: 'DeleteTopics',
21: 'DeleteRecords',
22: 'InitProducerId',
23: 'OffsetForLeaderEpoch',
24: 'AddPartitionsToTxn',
25: 'AddOffsetsToTxn',
26: 'EndTxn',
27: 'WriteTxnMarkers',
28: 'TxnOffsetCommit',
29: 'DescribeAcls',
30: 'CreateAcls',
31: 'DeleteAcls',
32: 'DescribeConfigs',
33: 'AlterConfigs',
}