Skip to content

KIP-70: Auto-commit offsets on consumer.unsubscribe(), defer assignment changes to rejoin #2560

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,15 @@ def assign(self, partitions):
no rebalance operation triggered when group membership or cluster
and topic metadata change.
"""
self._subscription.assign_from_user(partitions)
self._client.set_topics([tp.topic for tp in partitions])
if not partitions:
self.unsubscribe()
else:
# make sure the offsets of topic partitions the consumer is unsubscribing from
# are committed since there will be no following rebalance
self._coordinator.maybe_auto_commit_offsets_now()
self._subscription.assign_from_user(partitions)
self._client.set_topics([tp.topic for tp in partitions])
log.debug("Subscribed to partition(s): %s", partitions)

def assignment(self):
"""Get the TopicPartitions currently assigned to this consumer.
Expand Down Expand Up @@ -959,8 +966,11 @@ def subscription(self):

def unsubscribe(self):
"""Unsubscribe from all topics and clear all assigned partitions."""
# make sure the offsets of topic partitions the consumer is unsubscribing from
# are committed since there will be no following rebalance
self._coordinator.maybe_auto_commit_offsets_now()
self._subscription.unsubscribe()
self._coordinator.close()
self._coordinator.maybe_leave_group()
self._client.cluster.need_all_topic_metadata = False
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
Expand Down
11 changes: 6 additions & 5 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from __future__ import absolute_import

import abc
try:
from collections import Sequence
except ImportError:
from collections.abc import Sequence
Comment on lines +5 to +7
Copy link

@millerdev millerdev Mar 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This causes DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.10 it will stop working

I think the imports should be reversed here:

Suggested change
from collections import Sequence
except ImportError:
from collections.abc import Sequence
from collections.abc import Sequence
except ImportError:
from collections import Sequence

Or possibly just replace the entire try/except with a single import since the other import is only valid on very old/outdated Pythons.

from collections.abc import Sequence

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, yea we still support very old pythons (2.7). I did try old, except new but that does cause deprecation warnings for interim pythons. I can see how you might prefer try new, except old

import logging
import re

Expand Down Expand Up @@ -114,6 +118,8 @@ def subscribe(self, topics=(), pattern=None, listener=None):
self.subscription = set()
self.subscribed_pattern = re.compile(pattern)
else:
if isinstance(topics, str) or not isinstance(topics, Sequence):
raise TypeError('Topics must be a list (or non-str sequence)')
self.change_subscription(topics)

if listener and not isinstance(listener, ConsumerRebalanceListener):
Expand Down Expand Up @@ -151,11 +157,6 @@ def change_subscription(self, topics):
self.subscription = set(topics)
self._group_subscription.update(topics)

# Remove any assigned partitions which are no longer subscribed to
for tp in set(self.assignment.keys()):
if tp.topic not in self.subscription:
del self.assignment[tp]

def group_subscribe(self, topics):
"""Add topics to the current group subscription.

Expand Down
11 changes: 9 additions & 2 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,8 +878,15 @@ def _maybe_auto_commit_offsets_async(self):
self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000
elif time.time() > self.next_auto_commit_deadline:
self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
self.commit_offsets_async(self._subscription.all_consumed_offsets(),
self._commit_offsets_async_on_complete)
self._do_auto_commit_offsets_async()

def maybe_auto_commit_offsets_now(self):
if self.config['enable_auto_commit'] and not self.coordinator_unknown():
self._do_auto_commit_offsets_async()

def _do_auto_commit_offsets_async(self):
self.commit_offsets_async(self._subscription.all_consumed_offsets(),
self._commit_offsets_async_on_complete)


class ConsumerCoordinatorMetrics(object):
Expand Down
64 changes: 45 additions & 19 deletions test/test_consumer.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,52 @@
from __future__ import absolute_import

import pytest

from kafka import KafkaConsumer
from kafka.errors import KafkaConfigurationError
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaConfigurationError, IllegalStateError


def test_session_timeout_larger_than_request_timeout_raises():
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, request_timeout_ms=40000)


def test_fetch_max_wait_larger_than_request_timeout_raises():
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=50000, request_timeout_ms=40000)


def test_request_timeout_larger_than_connections_max_idle_ms_raises():
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000)


def test_subscription_copy():
consumer = KafkaConsumer('foo', api_version=(0, 10, 0))
sub = consumer.subscription()
assert sub is not consumer.subscription()
assert sub == set(['foo'])
sub.add('fizz')
assert consumer.subscription() == set(['foo'])

class TestKafkaConsumer:
def test_session_timeout_larger_than_request_timeout_raises(self):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, request_timeout_ms=40000)

def test_fetch_max_wait_larger_than_request_timeout_raises(self):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=50000, request_timeout_ms=40000)
def test_assign():
# Consumer w/ subscription to topic 'foo'
consumer = KafkaConsumer('foo', api_version=(0, 10, 0))
assert consumer.assignment() == set()
# Cannot assign manually
with pytest.raises(IllegalStateError):
consumer.assign([TopicPartition('foo', 0)])

def test_request_timeout_larger_than_connections_max_idle_ms_raises(self):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000)
assert 'foo' in consumer._client._topics

def test_subscription_copy(self):
consumer = KafkaConsumer('foo', api_version=(0, 10, 0))
sub = consumer.subscription()
assert sub is not consumer.subscription()
assert sub == set(['foo'])
sub.add('fizz')
assert consumer.subscription() == set(['foo'])
consumer = KafkaConsumer(api_version=(0, 10, 0))
assert consumer.assignment() == set()
consumer.assign([TopicPartition('foo', 0)])
assert consumer.assignment() == set([TopicPartition('foo', 0)])
assert 'foo' in consumer._client._topics
# Cannot subscribe
with pytest.raises(IllegalStateError):
consumer.subscribe(topics=['foo'])
consumer.assign([])
assert consumer.assignment() == set()
2 changes: 1 addition & 1 deletion test/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_update_fetch_positions(fetcher, topic, mocker):

def test__reset_offset(fetcher, mocker):
tp = TopicPartition("topic", 0)
fetcher._subscriptions.subscribe(topics="topic")
fetcher._subscriptions.subscribe(topics=["topic"])
fetcher._subscriptions.assign_from_subscribed([tp])
fetcher._subscriptions.need_offset_reset(tp)
mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
Expand Down
57 changes: 57 additions & 0 deletions test/test_subscription_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import absolute_import

import pytest

from kafka import TopicPartition
from kafka.consumer.subscription_state import SubscriptionState, TopicPartitionState
from kafka.vendor import six


def test_type_error():
s = SubscriptionState()
with pytest.raises(TypeError):
s.subscribe(topics='foo')

s.subscribe(topics=['foo'])


def test_change_subscription():
s = SubscriptionState()
s.subscribe(topics=['foo'])
assert s.subscription == set(['foo'])
s.change_subscription(['bar'])
assert s.subscription == set(['bar'])


def test_group_subscribe():
s = SubscriptionState()
s.subscribe(topics=['foo'])
assert s.subscription == set(['foo'])
s.group_subscribe(['bar'])
assert s.subscription == set(['foo'])
assert s._group_subscription == set(['foo', 'bar'])

s.reset_group_subscription()
assert s.subscription == set(['foo'])
assert s._group_subscription == set(['foo'])


def test_assign_from_subscribed():
s = SubscriptionState()
s.subscribe(topics=['foo'])
with pytest.raises(ValueError):
s.assign_from_subscribed([TopicPartition('bar', 0)])

s.assign_from_subscribed([TopicPartition('foo', 0), TopicPartition('foo', 1)])
assert set(s.assignment.keys()) == set([TopicPartition('foo', 0), TopicPartition('foo', 1)])
assert all([isinstance(s, TopicPartitionState) for s in six.itervalues(s.assignment)])
assert all([not s.has_valid_position for s in six.itervalues(s.assignment)])


def test_change_subscription_after_assignment():
s = SubscriptionState()
s.subscribe(topics=['foo'])
s.assign_from_subscribed([TopicPartition('foo', 0), TopicPartition('foo', 1)])
# Changing subscription retains existing assignment until next rebalance
s.change_subscription(['bar'])
assert set(s.assignment.keys()) == set([TopicPartition('foo', 0), TopicPartition('foo', 1)])
Loading