Skip to content

Commit e06ea70

Browse files
TylerLubeckdpkp
authored andcommitted
Admin protocol updates (#1948)
1 parent e3362ac commit e06ea70

File tree

3 files changed

+284
-30
lines changed

3 files changed

+284
-30
lines changed

kafka/admin/client.py

+27-10
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
435435
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
436436
timeout=timeout_ms
437437
)
438-
elif version <= 2:
438+
elif version <= 3:
439439
request = CreateTopicsRequest[version](
440440
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
441441
timeout=timeout_ms,
@@ -459,7 +459,7 @@ def delete_topics(self, topics, timeout_ms=None):
459459
"""
460460
version = self._matching_api_version(DeleteTopicsRequest)
461461
timeout_ms = self._validate_timeout(timeout_ms)
462-
if version <= 1:
462+
if version <= 3:
463463
request = DeleteTopicsRequest[version](
464464
topics=topics,
465465
timeout=timeout_ms
@@ -803,7 +803,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
803803
DescribeConfigsRequest[version](resources=topic_resources)
804804
))
805805

806-
elif version == 1:
806+
elif version <= 2:
807807
if len(broker_resources) > 0:
808808
for broker_resource in broker_resources:
809809
try:
@@ -853,7 +853,7 @@ def alter_configs(self, config_resources):
853853
:return: Appropriate version of AlterConfigsResponse class.
854854
"""
855855
version = self._matching_api_version(AlterConfigsRequest)
856-
if version == 0:
856+
if version <= 1:
857857
request = AlterConfigsRequest[version](
858858
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
859859
)
@@ -901,7 +901,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
901901
"""
902902
version = self._matching_api_version(CreatePartitionsRequest)
903903
timeout_ms = self._validate_timeout(timeout_ms)
904-
if version == 0:
904+
if version <= 1:
905905
request = CreatePartitionsRequest[version](
906906
topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
907907
timeout=timeout_ms,
@@ -928,7 +928,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
928928
# describe delegation_token protocol not yet implemented
929929
# Note: send the request to the least_loaded_node()
930930

931-
def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id):
931+
def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, include_authorized_operations=False):
932932
"""Send a DescribeGroupsRequest to the group's coordinator.
933933
934934
:param group_id: The group name as a string
@@ -937,13 +937,24 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id)
937937
:return: A message future.
938938
"""
939939
version = self._matching_api_version(DescribeGroupsRequest)
940-
if version <= 1:
940+
if version <= 2:
941+
if include_authorized_operations:
942+
raise IncompatibleBrokerVersion(
943+
"include_authorized_operations requests "
944+
"DescribeGroupsRequest >= v3, which is not "
945+
"supported by Kafka {}".format(version)
946+
)
941947
# Note: KAFKA-6788 A potential optimization is to group the
942948
# request per coordinator and send one request with a list of
943949
# all consumer groups. Java still hasn't implemented this
944950
# because the error checking is hard to get right when some
945951
# groups error and others don't.
946952
request = DescribeGroupsRequest[version](groups=(group_id,))
953+
elif version <= 3:
954+
request = DescribeGroupsRequest[version](
955+
groups=(group_id,),
956+
include_authorized_operations=include_authorized_operations
957+
)
947958
else:
948959
raise NotImplementedError(
949960
"Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient."
@@ -952,7 +963,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id)
952963

953964
def _describe_consumer_groups_process_response(self, response):
954965
"""Process a DescribeGroupsResponse into a group description."""
955-
if response.API_VERSION <= 1:
966+
if response.API_VERSION <= 3:
956967
assert len(response.groups) == 1
957968
# TODO need to implement converting the response tuple into
958969
# a more accessible interface like a namedtuple and then stop
@@ -976,7 +987,7 @@ def _describe_consumer_groups_process_response(self, response):
976987
.format(response.API_VERSION))
977988
return group_description
978989

979-
def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
990+
def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
980991
"""Describe a set of consumer groups.
981992
982993
Any errors are immediately raised.
@@ -989,6 +1000,9 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
9891000
useful for avoiding extra network round trips if you already know
9901001
the group coordinator. This is only useful when all the group_ids
9911002
have the same coordinator, otherwise it will error. Default: None.
1003+
:param include_authorized_operatoins: Whether or not to include
1004+
information about the operations a group is allowed to perform.
1005+
Only supported on API version >= v3. Default: False.
9921006
:return: A list of group descriptions. For now the group descriptions
9931007
are the raw results from the DescribeGroupsResponse. Long-term, we
9941008
plan to change this to return namedtuples as well as decoding the
@@ -1001,7 +1015,10 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
10011015
this_groups_coordinator_id = group_coordinator_id
10021016
else:
10031017
this_groups_coordinator_id = self._find_coordinator_id(group_id)
1004-
f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
1018+
f = self._describe_consumer_groups_send_request(
1019+
group_id,
1020+
this_groups_coordinator_id,
1021+
include_authorized_operations)
10051022
futures.append(f)
10061023

10071024
self._wait_for_futures(futures)

0 commit comments

Comments
 (0)