Skip to content

Commit ce88b6d

Browse files
committed
KIP-70: Auto-commit offsets on consumer.unsubscribe()
1 parent cd4830a commit ce88b6d

File tree

3 files changed

+22
-10
lines changed

3 files changed

+22
-10
lines changed

kafka/consumer/group.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,15 @@ def assign(self, partitions):
444444
no rebalance operation triggered when group membership or cluster
445445
and topic metadata change.
446446
"""
447-
self._subscription.assign_from_user(partitions)
448-
self._client.set_topics([tp.topic for tp in partitions])
447+
if not partitions:
448+
self.unsubscribe()
449+
else:
450+
# make sure the offsets of topic partitions the consumer is unsubscribing from
451+
# are committed since there will be no following rebalance
452+
self._coordinator.maybe_auto_commit_offsets_now()
453+
self._subscription.assign_from_user(partitions)
454+
self._client.set_topics([tp.topic for tp in partitions])
455+
log.debug("Subscribed to partition(s): %s", partitions)
449456

450457
def assignment(self):
451458
"""Get the TopicPartitions currently assigned to this consumer.
@@ -959,8 +966,11 @@ def subscription(self):
959966

960967
def unsubscribe(self):
961968
"""Unsubscribe from all topics and clear all assigned partitions."""
969+
# make sure the offsets of topic partitions the consumer is unsubscribing from
970+
# are committed since there will be no following rebalance
971+
self._coordinator.maybe_auto_commit_offsets_now()
962972
self._subscription.unsubscribe()
963-
self._coordinator.close()
973+
self._coordinator.maybe_leave_group()
964974
self._client.cluster.need_all_topic_metadata = False
965975
self._client.set_topics([])
966976
log.debug("Unsubscribed all topics or patterns and assigned partitions")

kafka/consumer/subscription_state.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,6 @@ def change_subscription(self, topics):
172172
self.subscription = set(topics)
173173
self._group_subscription.update(topics)
174174

175-
# Remove any assigned partitions which are no longer subscribed to
176-
for tp in set(self.assignment.keys()):
177-
if tp.topic not in self.subscription:
178-
del self.assignment[tp]
179-
180175
def group_subscribe(self, topics):
181176
"""Add topics to the current group subscription.
182177

kafka/coordinator/consumer.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -878,8 +878,15 @@ def _maybe_auto_commit_offsets_async(self):
878878
self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000
879879
elif time.time() > self.next_auto_commit_deadline:
880880
self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
881-
self.commit_offsets_async(self._subscription.all_consumed_offsets(),
882-
self._commit_offsets_async_on_complete)
881+
self._do_auto_commit_offsets_async()
882+
883+
def maybe_auto_commit_offsets_now(self):
884+
if self.config['enable_auto_commit'] and not self.coordinator_unknown():
885+
self._do_auto_commit_offsets_async()
886+
887+
def _do_auto_commit_offsets_async(self):
888+
self.commit_offsets_async(self._subscription.all_consumed_offsets(),
889+
self._commit_offsets_async_on_complete)
883890

884891

885892
class ConsumerCoordinatorMetrics(object):

0 commit comments

Comments
 (0)