Skip to content

Admin - Implement perform leader election #2536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 86 additions & 13 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest)
DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType)
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.find_coordinator import FindCoordinatorRequest
from kafka.protocol.metadata import MetadataRequest
Expand Down Expand Up @@ -393,27 +393,55 @@ def _send_request_to_controller(self, request):
# So this is a little brittle in that it assumes all responses have
# one of these attributes and that they always unpack into
# (topic, error_code) tuples.
topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
else response.topic_error_codes)
# Also small py2/py3 compatibility -- py3 can ignore extra values
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
# So for now we have to map across the list and explicitly drop any
# extra values (usually the error_message)
for topic, error_code in map(lambda e: e[:2], topic_error_tuples):
topic_error_tuples = getattr(response, 'topic_errors', getattr(response, 'topic_error_codes', None))
if topic_error_tuples is not None:
success = self._parse_topic_request_response(topic_error_tuples, request, response, tries)
else:
# Leader Election request has a two layer error response (topic and partition)
success = self._parse_topic_partition_request_response(request, response, tries)

if success:
return response
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")

def _parse_topic_request_response(self, topic_error_tuples, request, response, tries):
    """Inspect per-topic error codes and report whether the request succeeded.

    :param topic_error_tuples: Iterable of sequences whose first two items are
        (topic, error_code); any further items are ignored.
    :param request: The request that was sent; only used in the error message.
    :param response: The response being checked; only used in the error message.
    :param tries: When truthy, a NotControllerError refreshes the controller id
        and makes this method return False instead of raising.

    :return: True if every entry carries NoError, False after the controller
        id has been refreshed.
    """
    for entry in topic_error_tuples:
        # Slice rather than star-unpack so this also runs on py2; trailing
        # values (usually the error_message) are dropped.
        _topic, code = entry[:2]
        outcome = Errors.for_code(code)
        if outcome is Errors.NoError:
            continue
        if tries and outcome is NotControllerError:
            # The remaining entries are not inspected: NotControllerError
            # should either be reported for all entries or for none.
            self._refresh_controller_id()
            return False
        raise outcome(
            "Request '{}' failed with response '{}'."
            .format(request, response))
    return True

def _parse_topic_partition_request_response(self, request, response, tries):
    """Check the per-partition error codes of a two-level (topic, partition) response.

    :param request: The request that was sent; only used in the error message.
    :param response: A response exposing ``replication_election_results``, an
        iterable of (topic, partition_results) pairs.
    :param tries: When truthy, a NotControllerError refreshes the controller id
        and makes this method return False instead of raising.

    :return: True if every partition reported NoError or ElectionNotNeeded,
        False after the controller id has been refreshed.
    """
    # Also small py2/py3 compatibility -- py3 can ignore extra values
    # during unpack via: for x, y, *rest in list_of_values. py2 cannot.
    # So for now we have to map across the list and explicitly drop any
    # extra values (usually the error_message)
    for _topic, partition_results in response.replication_election_results:
        for _partition_id, error_code in map(lambda e: e[:2], partition_results):
            error_type = Errors.for_code(error_code)
            if tries and error_type is NotControllerError:
                # No need to inspect the rest of the errors for
                # non-retriable errors because NotControllerError should
                # either be thrown for all errors or no errors.
                self._refresh_controller_id()
                return False
            elif error_type not in (Errors.NoError, Errors.ElectionNotNeeded):
                raise error_type(
                    "Request '{}' failed with response '{}'."
                    .format(request, response))
    return True

@staticmethod
def _convert_new_topic_request(new_topic):
Expand Down Expand Up @@ -1651,6 +1679,51 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
.format(version))
return self._send_request_to_node(group_coordinator_id, request)

@staticmethod
def _convert_topic_partitions(topic_partitions):
return [
(
topic,
partition_ids
)
for topic, partition_ids in topic_partitions.items()
]

def _get_all_topic_partitions(self):
return [
(
topic,
[partition_info.partition for partition_info in self._client.cluster._partitions[topic].values()]
)
for topic in self._client.cluster.topics()
]

def _get_topic_partitions(self, topic_partitions):
if topic_partitions is None:
return self._get_all_topic_partitions()
return self._convert_topic_partitions(topic_partitions)

def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None):
    """Perform leader election on the topic partitions.

    :param election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean
    :param topic_partitions: A map of topic name strings to lists of partition ids.
        When omitted, the election runs on all topic partitions.
    :param timeout_ms: Milliseconds to wait for the leader election process to complete
        before the broker returns.

    :return: Appropriate version of ElectLeadersResponse class.
    """
    api_version = self._client.api_version(ElectLeadersRequest, max_version=1)
    validated_timeout = self._validate_timeout(timeout_ms)
    request_cls = ElectLeadersRequest[api_version]
    # ElectionType() rejects values that are not a known election type.
    kind = ElectionType(election_type)
    targets = self._get_topic_partitions(topic_partitions)
    # TODO convert structs to a more pythonic interface
    return self._send_request_to_controller(
        request_cls(
            election_type=kind,
            topic_partitions=targets,
            timeout=validated_timeout,
        )
    )

def _wait_for_futures(self, futures):
"""Block until all futures complete. If any fail, raise the encountered exception.

Expand Down
81 changes: 81 additions & 0 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields

Expand Down Expand Up @@ -1031,3 +1038,77 @@ class ListPartitionReassignmentsRequest_v0(Request):
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]


class ElectLeadersResponse_v0(Response):
    """ElectLeaders response, version 0 (API key 43)."""
    API_KEY = 43
    # Agrees with the _v0 suffix and with this class being the first entry
    # of the ElectLeadersResponse list.
    API_VERSION = 0
    # NOTE(review): this layout is the same as ElectLeadersResponse_v1; the
    # Kafka protocol guide appears to define v0 without the top-level
    # error_code -- confirm before relying on v0 against an old broker.
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('error_code', Int16),
        ('replication_election_results', Array(
            ('topic', String('utf-8')),
            ('partition_result', Array(
                ('partition_id', Int32),
                ('error_code', Int16),
                ('error_message', String('utf-8'))
            ))
        ))
    )


class ElectLeadersRequest_v0(Request):
    """ElectLeaders request, version 0 (API key 43)."""
    API_KEY = 43
    # Agrees with the _v0 suffix and with this class being the first entry
    # of the ElectLeadersRequest list.
    API_VERSION = 0
    RESPONSE_TYPE = ElectLeadersResponse_v0
    # NOTE(review): this layout is the same as ElectLeadersRequest_v1; the
    # Kafka protocol guide appears to define v0 without election_type --
    # confirm before relying on v0 against an old broker.
    SCHEMA = Schema(
        ('election_type', Int8),
        ('topic_partitions', Array(
            ('topic', String('utf-8')),
            ('partition_ids', Array(Int32))
        )),
        ('timeout', Int32),
    )


# ElectLeaders response, version 1 (API key 43).
# Layout: a throttle time, a top-level error code, then one entry per topic
# holding that topic's list of (partition_id, error_code, error_message)
# results.
class ElectLeadersResponse_v1(Response):
API_KEY = 43
API_VERSION = 1
SCHEMA = Schema(
('throttle_time_ms', Int32),
('error_code', Int16),
('replication_election_results', Array(
('topic', String('utf-8')),
('partition_result', Array(
('partition_id', Int32),
('error_code', Int16),
('error_message', String('utf-8'))
))
))
)


# ElectLeaders request, version 1 (API key 43); answered by
# ElectLeadersResponse_v1.
# Layout: the election type as an Int8, one entry per topic listing the
# partition ids to elect, and a timeout.
class ElectLeadersRequest_v1(Request):
API_KEY = 43
API_VERSION = 1
RESPONSE_TYPE = ElectLeadersResponse_v1
SCHEMA = Schema(
('election_type', Int8),
('topic_partitions', Array(
('topic', String('utf-8')),
('partition_ids', Array(Int32))
)),
('timeout', Int32),
)


class ElectionType(IntEnum):
    """Leader election type, used for the ``election_type`` request field."""

    # Plain ints: a trailing comma here would turn the value into a tuple
    # literal that only works because Enum unpacks tuples into int().
    PREFERRED = 0
    UNCLEAN = 1


# Version tables, ordered by the _vN class suffix. The admin client selects a
# request class with ElectLeadersRequest[version].
ElectLeadersRequest = [ElectLeadersRequest_v0, ElectLeadersRequest_v1]
ElectLeadersResponse = [ElectLeadersResponse_v0, ElectLeadersResponse_v1]
Loading