Skip to content

Commit 62895a8

Browse files
authored
Support FindCoordinatorRequest v2 in consumer and admin client (#2502)
1 parent 1886cac commit 62895a8

File tree

8 files changed

+99
-79
lines changed

8 files changed

+99
-79
lines changed

kafka/admin/client.py

+14-21
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
2424
DeleteGroupsRequest, DescribeLogDirsRequest
2525
)
26-
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
26+
from kafka.protocol.commit import OffsetFetchRequest
27+
from kafka.protocol.find_coordinator import FindCoordinatorRequest
2728
from kafka.protocol.metadata import MetadataRequest
2829
from kafka.protocol.types import Array
2930
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
@@ -285,17 +286,14 @@ def _find_coordinator_id_send_request(self, group_id):
285286
Returns:
286287
A message future
287288
"""
288-
# TODO add support for dynamically picking version of
289-
# GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
290-
# When I experimented with this, the coordinator value returned in
291-
# GroupCoordinatorResponse_v1 didn't match the value returned by
292-
# GroupCoordinatorResponse_v0 and I couldn't figure out why.
293-
version = self._client.api_version(GroupCoordinatorRequest, max_version=0)
289+
version = self._client.api_version(FindCoordinatorRequest, max_version=2)
294290
if version <= 0:
295-
request = GroupCoordinatorRequest[version](group_id)
291+
request = FindCoordinatorRequest[version](group_id)
292+
elif version <= 2:
293+
request = FindCoordinatorRequest[version](group_id, 0)
296294
else:
297295
raise NotImplementedError(
298-
"Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
296+
"Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
299297
.format(version))
300298
return self._send_request_to_node(self._client.least_loaded_node(), request)
301299

@@ -308,18 +306,13 @@ def _find_coordinator_id_process_response(self, response):
308306
Returns:
309307
The node_id of the broker that is the coordinator.
310308
"""
311-
if response.API_VERSION <= 0:
312-
error_type = Errors.for_code(response.error_code)
313-
if error_type is not Errors.NoError:
314-
# Note: When error_type.retriable, Java will retry... see
315-
# KafkaAdminClient's handleFindCoordinatorError method
316-
raise error_type(
317-
"FindCoordinatorRequest failed with response '{}'."
318-
.format(response))
319-
else:
320-
raise NotImplementedError(
321-
"Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
322-
.format(response.API_VERSION))
309+
error_type = Errors.for_code(response.error_code)
310+
if error_type is not Errors.NoError:
311+
# Note: When error_type.retriable, Java will retry... see
312+
# KafkaAdminClient's handleFindCoordinatorError method
313+
raise error_type(
314+
"FindCoordinatorRequest failed with response '{}'."
315+
.format(response))
323316
return response.coordinator_id
324317

325318
def _find_coordinator_ids(self, group_ids):

kafka/cluster.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class ClusterMetadata(object):
2121
A class to manage kafka cluster metadata.
2222
2323
This class does not perform any IO. It simply updates internal state
24-
given API responses (MetadataResponse, GroupCoordinatorResponse).
24+
given API responses (MetadataResponse, FindCoordinatorResponse).
2525
2626
Keyword Arguments:
2727
retry_backoff_ms (int): Milliseconds to backoff when retrying on
@@ -367,16 +367,16 @@ def add_group_coordinator(self, group, response):
367367
"""Update with metadata for a group coordinator
368368
369369
Arguments:
370-
group (str): name of group from GroupCoordinatorRequest
371-
response (GroupCoordinatorResponse): broker response
370+
group (str): name of group from FindCoordinatorRequest
371+
response (FindCoordinatorResponse): broker response
372372
373373
Returns:
374374
string: coordinator node_id if metadata is updated, None on error
375375
"""
376376
log.debug("Updating coordinator for %s: %s", group, response)
377377
error_type = Errors.for_code(response.error_code)
378378
if error_type is not Errors.NoError:
379-
log.error("GroupCoordinatorResponse error: %s", error_type)
379+
log.error("FindCoordinatorResponse error: %s", error_type)
380380
self._groups[group] = -1
381381
return
382382

kafka/conn.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1255,14 +1255,15 @@ def reset_override_configs():
12551255
from kafka.protocol.admin import ListGroupsRequest
12561256
from kafka.protocol.api_versions import ApiVersionsRequest
12571257
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
1258-
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
1258+
from kafka.protocol.commit import OffsetFetchRequest
1259+
from kafka.protocol.find_coordinator import FindCoordinatorRequest
12591260

12601261
test_cases = [
12611262
# All cases starting from 0.10 will be based on ApiVersionsResponse
12621263
((0, 11), ApiVersionsRequest[1]()),
12631264
((0, 10, 0), ApiVersionsRequest[0]()),
12641265
((0, 9), ListGroupsRequest[0]()),
1265-
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
1266+
((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')),
12661267
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
12671268
((0, 8, 0), MetadataRequest[0](topics)),
12681269
]

kafka/coordinator/base.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
from kafka.future import Future
1515
from kafka.metrics import AnonMeasurable
1616
from kafka.metrics.stats import Avg, Count, Max, Rate
17-
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
17+
from kafka.protocol.commit import OffsetCommitRequest
18+
from kafka.protocol.find_coordinator import FindCoordinatorRequest
1819
from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
1920
LeaveGroupRequest, SyncGroupRequest)
2021

@@ -660,14 +661,21 @@ def _send_group_coordinator_request(self):
660661

661662
log.debug("Sending group coordinator request for group %s to broker %s",
662663
self.group_id, node_id)
663-
request = GroupCoordinatorRequest[0](self.group_id)
664+
version = self._client.api_version(FindCoordinatorRequest, max_version=2)
665+
if version == 0:
666+
request = FindCoordinatorRequest[version](self.group_id)
667+
else:
668+
request = FindCoordinatorRequest[version](self.group_id, 0)
664669
future = Future()
665670
_f = self._client.send(node_id, request)
666671
_f.add_callback(self._handle_group_coordinator_response, future)
667672
_f.add_errback(self._failed_request, node_id, request, future)
668673
return future
669674

670675
def _handle_group_coordinator_response(self, future, response):
676+
if response.API_VERSION >= 1 and response.throttle_time_ms > 0:
677+
log.warning("FindCoordinatorRequest throttled by broker (%d ms)", response.throttle_time_ms)
678+
671679
log.debug("Received group coordinator response %s", response)
672680

673681
error_type = Errors.for_code(response.error_code)

kafka/protocol/commit.py

-46
Original file line numberDiff line numberDiff line change
@@ -207,49 +207,3 @@ class OffsetFetchRequest_v3(Request):
207207
OffsetFetchResponse_v0, OffsetFetchResponse_v1,
208208
OffsetFetchResponse_v2, OffsetFetchResponse_v3,
209209
]
210-
211-
212-
class GroupCoordinatorResponse_v0(Response):
213-
API_KEY = 10
214-
API_VERSION = 0
215-
SCHEMA = Schema(
216-
('error_code', Int16),
217-
('coordinator_id', Int32),
218-
('host', String('utf-8')),
219-
('port', Int32)
220-
)
221-
222-
223-
class GroupCoordinatorResponse_v1(Response):
224-
API_KEY = 10
225-
API_VERSION = 1
226-
SCHEMA = Schema(
227-
('error_code', Int16),
228-
('error_message', String('utf-8')),
229-
('coordinator_id', Int32),
230-
('host', String('utf-8')),
231-
('port', Int32)
232-
)
233-
234-
235-
class GroupCoordinatorRequest_v0(Request):
236-
API_KEY = 10
237-
API_VERSION = 0
238-
RESPONSE_TYPE = GroupCoordinatorResponse_v0
239-
SCHEMA = Schema(
240-
('consumer_group', String('utf-8'))
241-
)
242-
243-
244-
class GroupCoordinatorRequest_v1(Request):
245-
API_KEY = 10
246-
API_VERSION = 1
247-
RESPONSE_TYPE = GroupCoordinatorResponse_v1
248-
SCHEMA = Schema(
249-
('coordinator_key', String('utf-8')),
250-
('coordinator_type', Int8)
251-
)
252-
253-
254-
GroupCoordinatorRequest = [GroupCoordinatorRequest_v0, GroupCoordinatorRequest_v1]
255-
GroupCoordinatorResponse = [GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1]

kafka/protocol/find_coordinator.py

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.protocol.api import Request, Response
4+
from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String
5+
6+
7+
class FindCoordinatorResponse_v0(Response):
8+
API_KEY = 10
9+
API_VERSION = 0
10+
SCHEMA = Schema(
11+
('error_code', Int16),
12+
('coordinator_id', Int32),
13+
('host', String('utf-8')),
14+
('port', Int32)
15+
)
16+
17+
18+
class FindCoordinatorResponse_v1(Response):
19+
API_KEY = 10
20+
API_VERSION = 1
21+
SCHEMA = Schema(
22+
('throttle_time_ms', Int32),
23+
('error_code', Int16),
24+
('error_message', String('utf-8')),
25+
('coordinator_id', Int32),
26+
('host', String('utf-8')),
27+
('port', Int32)
28+
)
29+
30+
31+
class FindCoordinatorResponse_v2(Response):
32+
API_KEY = 10
33+
API_VERSION = 2
34+
SCHEMA = FindCoordinatorResponse_v1.SCHEMA
35+
36+
37+
class FindCoordinatorRequest_v0(Request):
38+
API_KEY = 10
39+
API_VERSION = 0
40+
RESPONSE_TYPE = FindCoordinatorResponse_v0
41+
SCHEMA = Schema(
42+
('consumer_group', String('utf-8'))
43+
)
44+
45+
46+
class FindCoordinatorRequest_v1(Request):
47+
API_KEY = 10
48+
API_VERSION = 1
49+
RESPONSE_TYPE = FindCoordinatorResponse_v1
50+
SCHEMA = Schema(
51+
('coordinator_key', String('utf-8')),
52+
('coordinator_type', Int8) # 0: consumer, 1: transaction
53+
)
54+
55+
56+
class FindCoordinatorRequest_v2(Request):
57+
API_KEY = 10
58+
API_VERSION = 2
59+
RESPONSE_TYPE = FindCoordinatorResponse_v2
60+
SCHEMA = FindCoordinatorRequest_v1.SCHEMA
61+
62+
63+
FindCoordinatorRequest = [FindCoordinatorRequest_v0, FindCoordinatorRequest_v1, FindCoordinatorRequest_v2]
64+
FindCoordinatorResponse = [FindCoordinatorResponse_v0, FindCoordinatorResponse_v1, FindCoordinatorResponse_v2]

kafka/protocol/parser.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import logging
55

66
import kafka.errors as Errors
7-
from kafka.protocol.commit import GroupCoordinatorResponse
7+
from kafka.protocol.find_coordinator import FindCoordinatorResponse
88
from kafka.protocol.frame import KafkaBytes
99
from kafka.protocol.types import Int32, TaggedFields
1010
from kafka.version import __version__
@@ -142,7 +142,7 @@ def _process_response(self, read_buffer):
142142
# 0.8.2 quirk
143143
if (recv_correlation_id == 0 and
144144
correlation_id != 0 and
145-
request.RESPONSE_TYPE is GroupCoordinatorResponse[0] and
145+
request.RESPONSE_TYPE is FindCoordinatorResponse[0] and
146146
(self._api_version == (0, 8, 2) or self._api_version is None)):
147147
log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
148148
' Correlation ID does not match request. This'

test/test_protocol.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import pytest
66

77
from kafka.protocol.api import RequestHeader
8-
from kafka.protocol.commit import GroupCoordinatorRequest
98
from kafka.protocol.fetch import FetchRequest, FetchResponse
9+
from kafka.protocol.find_coordinator import FindCoordinatorRequest
1010
from kafka.protocol.message import Message, MessageSet, PartialMessage
1111
from kafka.protocol.metadata import MetadataRequest
1212
from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes
@@ -168,7 +168,7 @@ def test_encode_message_header():
168168
b'client3', # ClientId
169169
])
170170

171-
req = GroupCoordinatorRequest[0]('foo')
171+
req = FindCoordinatorRequest[0]('foo')
172172
header = RequestHeader(req, correlation_id=4, client_id='client3')
173173
assert header.encode() == expect
174174

0 commit comments

Comments
 (0)