Skip to content

KIP-394: handle MEMBER_ID_REQUIRED error w/ second join group request #2598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ def __init__(self, generation_id, member_id, protocol):
self.member_id = member_id
self.protocol = protocol

@property
def is_valid(self):
return self.generation_id != DEFAULT_GENERATION_ID

def __eq__(self, other):
return (self.generation_id == other.generation_id and
self.member_id == other.member_id and
self.protocol == other.protocol)


Generation.NO_GENERATION = Generation(DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, None)


Expand Down Expand Up @@ -461,7 +471,8 @@ def join_group(self, timeout_ms=None):
exception = future.exception
if isinstance(exception, (Errors.UnknownMemberIdError,
Errors.RebalanceInProgressError,
Errors.IllegalGenerationError)):
Errors.IllegalGenerationError,
Errors.MemberIdRequiredError)):
continue
elif not future.retriable():
raise exception # pylint: disable-msg=raising-bad-type
Expand Down Expand Up @@ -491,7 +502,7 @@ def _send_join_group_request(self):
(protocol, metadata if isinstance(metadata, bytes) else metadata.encode())
for protocol, metadata in self.group_protocols()
]
version = self._client.api_version(JoinGroupRequest, max_version=3)
version = self._client.api_version(JoinGroupRequest, max_version=4)
if version == 0:
request = JoinGroupRequest[version](
self.group_id,
Expand Down Expand Up @@ -585,6 +596,11 @@ def _handle_join_group_response(self, future, send_time, response):
future.failure(error)
elif error_type is Errors.GroupAuthorizationFailedError:
future.failure(error_type(self.group_id))
elif error_type is Errors.MemberIdRequiredError:
# Broker requires a concrete member id to be allowed to join the group. Update member id
# and send another join group request in next cycle.
self.reset_generation(response.member_id)
future.failure(error_type())
else:
# unexpected error, throw the exception
error = error_type()
Expand Down Expand Up @@ -762,10 +778,10 @@ def generation(self):
return None
return self._generation

def reset_generation(self):
"""Reset the generation and memberId because we have fallen out of the group."""
def reset_generation(self, member_id=UNKNOWN_MEMBER_ID):
"""Reset the generation and member_id because we have fallen out of the group."""
with self._lock:
self._generation = Generation.NO_GENERATION
self._generation = Generation(DEFAULT_GENERATION_ID, member_id, None)
self.rejoin_needed = True
self.state = MemberState.UNJOINED

Expand Down Expand Up @@ -799,8 +815,10 @@ def _close_heartbeat_thread(self, timeout_ms=None):
self._heartbeat_thread = None

def __del__(self):
if hasattr(self, '_heartbeat_thread'):
try:
self._close_heartbeat_thread()
except (TypeError, AttributeError):
pass

def close(self, timeout_ms=None):
"""Close the coordinator, leave the current group,
Expand All @@ -816,7 +834,7 @@ def maybe_leave_group(self, timeout_ms=None):
with self._client._lock, self._lock:
if (not self.coordinator_unknown()
and self.state is not MemberState.UNJOINED
and self._generation is not Generation.NO_GENERATION):
and self._generation.is_valid):

# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
Expand Down
20 changes: 17 additions & 3 deletions kafka/protocol/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ class JoinGroupResponse_v3(Response):
SCHEMA = JoinGroupResponse_v2.SCHEMA


class JoinGroupResponse_v4(Response):
API_KEY = 11
API_VERSION = 4
SCHEMA = JoinGroupResponse_v3.SCHEMA


class JoinGroupRequest_v0(Request):
API_KEY = 11
API_VERSION = 0
Expand Down Expand Up @@ -95,14 +101,22 @@ class JoinGroupRequest_v3(Request):
API_VERSION = 3
RESPONSE_TYPE = JoinGroupResponse_v3
SCHEMA = JoinGroupRequest_v2.SCHEMA
UNKNOWN_MEMBER_ID = ''


class JoinGroupRequest_v4(Request):
API_KEY = 11
API_VERSION = 4
RESPONSE_TYPE = JoinGroupResponse_v4
SCHEMA = JoinGroupRequest_v3.SCHEMA


JoinGroupRequest = [
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, JoinGroupRequest_v3
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2,
JoinGroupRequest_v3, JoinGroupRequest_v4,
]
JoinGroupResponse = [
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, JoinGroupResponse_v3
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2,
JoinGroupResponse_v3, JoinGroupResponse_v4,
]


Expand Down
2 changes: 1 addition & 1 deletion test/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def test_close(mocker, coordinator):
coordinator._handle_leave_group_response.assert_called_with('foobar')

assert coordinator.generation() is None
assert coordinator._generation is Generation.NO_GENERATION
assert coordinator._generation == Generation.NO_GENERATION
assert coordinator.state is MemberState.UNJOINED
assert coordinator.rejoin_needed is True

Expand Down
Loading