Skip to content

Fake api_versions for old brokers, rename to ApiVersionsRequest, and handle error decoding #2494

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Rate
from kafka.metrics.stats.rate import TimeUnit
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
from kafka.protocol.metadata import MetadataRequest
from kafka.util import Dict, WeakMethod
# Although this looks unused, it actually monkey-patches socket.socketpair()
Expand Down Expand Up @@ -239,6 +240,25 @@ def __init__(self, **configs):
if self.config['api_version'] is None:
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
self.config['api_version'] = self.check_version(timeout=check_timeout)
elif self.config['api_version'] in BROKER_API_VERSIONS:
self._api_versions = BROKER_API_VERSIONS[self.config['api_version']]
elif (self.config['api_version'] + (0,)) in BROKER_API_VERSIONS:
log.warning('Configured api_version %s is ambiguous; using %s',
self.config['api_version'], self.config['api_version'] + (0,))
self.config['api_version'] = self.config['api_version'] + (0,)
self._api_versions = BROKER_API_VERSIONS[self.config['api_version']]
else:
compatible_version = None
for v in sorted(BROKER_API_VERSIONS.keys(), reverse=True):
if v <= self.config['api_version']:
compatible_version = v
break
if compatible_version:
log.warning('Configured api_version %s not supported; using %s',
self.config['api_version'], compatible_version)
self._api_versions = BROKER_API_VERSIONS[compatible_version]
else:
raise Errors.UnrecognizedBrokerVersion(self.config['api_version'])

def _init_wakeup_socketpair(self):
self._wake_r, self._wake_w = socket.socketpair()
Expand Down Expand Up @@ -849,8 +869,8 @@ def _maybe_refresh_metadata(self, wakeup=False):
topics = list(self.config['bootstrap_topics_filter'])

if self.cluster.need_all_topic_metadata or not topics:
topics = [] if self.config['api_version'] < (0, 10) else None
api_version = 0 if self.config['api_version'] < (0, 10) else 1
topics = [] if self.config['api_version'] < (0, 10, 0) else None
api_version = 0 if self.config['api_version'] < (0, 10, 0) else 1
request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request, wakeup=wakeup)
Expand Down Expand Up @@ -898,7 +918,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
is down and the client enters a bootstrap backoff sleep.
This is only possible if node_id is None.

Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
Returns: version tuple, i.e. (3, 9), (2, 0), (0, 10, 2) etc

Raises:
NodeNotReadyError (if node_id is provided)
Expand All @@ -925,9 +945,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
try:
remaining = end - time.time()
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
if version >= (0, 10, 0):
# cache the api versions map if it's available (starting
# in 0.10 cluster version)
if not self._api_versions:
self._api_versions = conn.get_api_versions()
self._lock.release()
return version
Expand Down
67 changes: 36 additions & 31 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ class BrokerConnection(object):
or other configuration forbids use of all the specified ciphers),
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
api_version (tuple): Specify which Kafka API version to use.
Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9),
(0, 10). Default: (0, 8, 2)
Must be None or >= (0, 10, 0) to enable SASL authentication.
Default: None
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker
api version. Only applies if api_version is None
Expand Down Expand Up @@ -214,7 +214,7 @@ class BrokerConnection(object):
'ssl_crlfile': None,
'ssl_password': None,
'ssl_ciphers': None,
'api_version': (0, 8, 2), # default to most restrictive
'api_version': None,
'selector': selectors.DefaultSelector,
'state_change_callback': lambda node_id, sock, conn: True,
'metrics': None,
Expand Down Expand Up @@ -522,7 +522,7 @@ def _try_handshake(self):
return False

def _try_authenticate(self):
assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10)
assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10, 0)

if self._sasl_auth_future is None:
# Build a SaslHandShakeRequest message
Expand Down Expand Up @@ -1154,9 +1154,10 @@ def next_ifr_request_timeout_ms(self):
else:
return float('inf')

def _handle_api_version_response(self, response):
def _handle_api_versions_response(self, response):
error_type = Errors.for_code(response.error_code)
assert error_type is Errors.NoError, "API version check failed"
if error_type is not Errors.NoError:
return False
self._api_versions = dict([
(api_key, (min_version, max_version))
for api_key, min_version, max_version in response.api_versions
Expand All @@ -1168,12 +1169,7 @@ def get_api_versions(self):
return self._api_versions

version = self.check_version()
if version < (0, 10, 0):
raise Errors.UnsupportedVersionError(
"ApiVersion not supported by cluster version {} < 0.10.0"
.format(version))
# _api_versions is set as a side effect of check_versions() on a cluster
# that supports 0.10.0 or later
# _api_versions is set as a side effect of check_versions()
return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
Expand All @@ -1182,16 +1178,16 @@ def _infer_broker_version_from_api_versions(self, api_versions):
test_cases = [
# format (<broker version>, <needed struct>)
# Make sure to update consumer_integration test check when adding newer versions.
((2, 6, 0), DescribeClientQuotasRequest[0]),
((2, 5, 0), DescribeAclsRequest_v2),
((2, 4, 0), ProduceRequest[8]),
((2, 3, 0), FetchRequest[11]),
((2, 2, 0), OffsetRequest[5]),
((2, 1, 0), FetchRequest[10]),
((2, 0, 0), FetchRequest[8]),
((1, 1, 0), FetchRequest[7]),
((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((2, 6), DescribeClientQuotasRequest[0]),
((2, 5), DescribeAclsRequest_v2),
((2, 4), ProduceRequest[8]),
((2, 3), FetchRequest[11]),
((2, 2), OffsetRequest[5]),
((2, 1), FetchRequest[10]),
((2, 0), FetchRequest[8]),
((1, 1), FetchRequest[7]),
((1, 0), MetadataRequest[5]),
((0, 11), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
]
Expand All @@ -1204,7 +1200,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
if min_version <= struct.API_VERSION <= max_version:
return broker_version

# We know that ApiVersionResponse is only supported in 0.10+
# We know that ApiVersionsResponse is only supported in 0.10+
# so if all else fails, choose that
return (0, 10, 0)

Expand All @@ -1213,7 +1209,7 @@ def check_version(self, timeout=2, strict=False, topics=[]):

Note: This is a blocking call.

Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
Returns: version tuple, i.e. (3, 9), (2, 4), etc ...
"""
timeout_at = time.time() + timeout
log.info('Probing node %s broker version', self.node_id)
Expand All @@ -1236,12 +1232,15 @@ def reset_override_configs():
# vanilla MetadataRequest. If the server did not recognize the first
# request, both will be failed with a ConnectionError that wraps
# socket.error (32, 54, or 104)
from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest
from kafka.protocol.admin import ListGroupsRequest
from kafka.protocol.api_versions import ApiVersionsRequest
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest

test_cases = [
# All cases starting from 0.10 will be based on ApiVersionResponse
((0, 10), ApiVersionRequest[0]()),
# All cases starting from 0.10 will be based on ApiVersionsResponse
((0, 11), ApiVersionsRequest[1]()),
((0, 10, 0), ApiVersionsRequest[0]()),
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
Expand Down Expand Up @@ -1274,11 +1273,17 @@ def reset_override_configs():
selector.close()

if f.succeeded():
if isinstance(request, ApiVersionRequest[0]):
if version >= (0, 10, 0):
# Starting from 0.10 kafka broker we determine version
# by looking at ApiVersionResponse
api_versions = self._handle_api_version_response(f.value)
# by looking at ApiVersionsResponse
api_versions = self._handle_api_versions_response(f.value)
if not api_versions:
continue
version = self._infer_broker_version_from_api_versions(api_versions)
else:
if version not in BROKER_API_VERSIONS:
raise Errors.UnrecognizedBrokerVersion(version)
self._api_versions = BROKER_API_VERSIONS[version]
log.info('Broker version identified as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)
Expand All @@ -1298,7 +1303,7 @@ def reset_override_configs():
# requests (bug...). In this case we expect to see a correlation
# id mismatch
elif (isinstance(f.exception, Errors.CorrelationIdError) and
version == (0, 10)):
version > (0, 9)):
pass
elif six.PY2:
assert isinstance(f.exception.args[0], socket.error)
Expand Down
4 changes: 2 additions & 2 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,11 +702,11 @@ def _create_fetch_requests(self):
log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
partition, node_id)

if self.config['api_version'] >= (0, 11, 0):
if self.config['api_version'] >= (0, 11):
version = 4
elif self.config['api_version'] >= (0, 10, 1):
version = 3
elif self.config['api_version'] >= (0, 10):
elif self.config['api_version'] >= (0, 10, 0):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
Expand Down
10 changes: 5 additions & 5 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ def _send_join_group_request(self):
self._generation.member_id,
self.protocol_type(),
member_metadata)
elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
elif (0, 10, 1) <= self.config['api_version'] < (0, 11):
request = JoinGroupRequest[1](
self.group_id,
self.config['session_timeout_ms'],
Expand Down Expand Up @@ -562,7 +562,7 @@ def _handle_join_group_response(self, future, send_time, response):

def _on_join_follower(self):
# send follower's sync group with an empty assignment
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
version = 0 if self.config['api_version'] < (0, 11) else 1
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
Expand Down Expand Up @@ -590,7 +590,7 @@ def _on_join_leader(self, response):
except Exception as e:
return Future().failure(e)

version = 0 if self.config['api_version'] < (0, 11, 0) else 1
version = 0 if self.config['api_version'] < (0, 11) else 1
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
Expand Down Expand Up @@ -771,7 +771,7 @@ def maybe_leave_group(self):
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
log.info('Leaving consumer group (%s).', self.group_id)
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
version = 0 if self.config['api_version'] < (0, 11) else 1
request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
future = self._client.send(self.coordinator_id, request)
future.add_callback(self._handle_leave_group_response)
Expand Down Expand Up @@ -799,7 +799,7 @@ def _send_heartbeat_request(self):
e = Errors.NodeNotReadyError(self.coordinator_id)
return Future().failure(e)

version = 0 if self.config['api_version'] < (0, 11, 0) else 1
version = 0 if self.config['api_version'] < (0, 11) else 1
request = HeartbeatRequest[version](self.group_id,
self._generation.generation_id,
self._generation.member_id)
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def __init__(self, **configs):
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'

if self.config['compression_type'] == 'zstd':
assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
assert self.config['api_version'] >= (2, 1), 'Zstd Requires >= Kafka 2.1 Brokers'

# Check compression_type for library support
ct = self.config['compression_type']
Expand Down Expand Up @@ -524,7 +524,7 @@ def partitions_for(self, topic):
def _max_usable_produce_magic(self):
if self.config['api_version'] >= (0, 11):
return 2
elif self.config['api_version'] >= (0, 10):
elif self.config['api_version'] >= (0, 10, 0):
return 1
else:
return 0
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def _produce_request(self, node_id, acks, timeout, batches):
elif self.config['api_version'] >= (0, 11):
version = 3
kwargs = dict(transactional_id=None)
elif self.config['api_version'] >= (0, 10):
elif self.config['api_version'] >= (0, 10, 0):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
Expand Down
60 changes: 0 additions & 60 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,6 @@
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields


class ApiVersionResponse_v0(Response):
API_KEY = 18
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('api_versions', Array(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16)))
)


class ApiVersionResponse_v1(Response):
API_KEY = 18
API_VERSION = 1
SCHEMA = Schema(
('error_code', Int16),
('api_versions', Array(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16))),
('throttle_time_ms', Int32)
)


class ApiVersionResponse_v2(Response):
API_KEY = 18
API_VERSION = 2
SCHEMA = ApiVersionResponse_v1.SCHEMA


class ApiVersionRequest_v0(Request):
API_KEY = 18
API_VERSION = 0
RESPONSE_TYPE = ApiVersionResponse_v0
SCHEMA = Schema()


class ApiVersionRequest_v1(Request):
API_KEY = 18
API_VERSION = 1
RESPONSE_TYPE = ApiVersionResponse_v1
SCHEMA = ApiVersionRequest_v0.SCHEMA


class ApiVersionRequest_v2(Request):
API_KEY = 18
API_VERSION = 2
RESPONSE_TYPE = ApiVersionResponse_v1
SCHEMA = ApiVersionRequest_v0.SCHEMA


ApiVersionRequest = [
ApiVersionRequest_v0, ApiVersionRequest_v1, ApiVersionRequest_v2,
]
ApiVersionResponse = [
ApiVersionResponse_v0, ApiVersionResponse_v1, ApiVersionResponse_v2,
]


class CreateTopicsResponse_v0(Response):
API_KEY = 19
API_VERSION = 0
Expand Down
Loading