Skip to content

Commit 6cec0dd

Browse files
committed
KAFKA-3486: fix autocommit when partitions assigned manually
1 parent 506d023 commit 6cec0dd

File tree

1 file changed

+8
-44
lines changed

1 file changed

+8
-44
lines changed

kafka/coordinator/consumer.py

Lines changed: 8 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -99,19 +99,12 @@ def __init__(self, client, subscription, metrics, metric_group_prefix,
9999
else:
100100
interval = self.config['auto_commit_interval_ms'] / 1000.0
101101
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
102-
103-
# When using broker-coordinated consumer groups, auto-commit will
104-
# be automatically enabled on group join (see _on_join_complete)
105-
# Otherwise, we should enable now b/c there will be no group join
106-
if self.config['api_version'] < (0, 9):
107-
self._auto_commit_task.enable()
102+
self._auto_commit_task.reschedule()
108103

109104
self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
110105
self._subscription)
111106

112107
def __del__(self):
113-
if hasattr(self, '_auto_commit_task') and self._auto_commit_task:
114-
self._auto_commit_task.disable()
115108
if hasattr(self, '_cluster') and self._cluster:
116109
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
117110

@@ -199,9 +192,9 @@ def _on_join_complete(self, generation, member_id, protocol,
199192
# based on the received assignment
200193
assignor.on_assignment(assignment)
201194

202-
# restart the autocommit task if needed
195+
# reschedule the auto commit starting from now
203196
if self._auto_commit_task:
204-
self._auto_commit_task.enable()
197+
self._auto_commit_task.reschedule()
205198

206199
assigned = set(self._subscription.assigned_partitions())
207200
log.info("Setting newly assigned partitions %s for group %s",
@@ -378,10 +371,6 @@ def _maybe_auto_commit_offsets_sync(self):
378371
if self._auto_commit_task is None:
379372
return
380373

381-
# disable periodic commits prior to committing synchronously. note that they will
382-
# be re-enabled after a rebalance completes
383-
self._auto_commit_task.disable()
384-
385374
try:
386375
self.commit_offsets_sync(self._subscription.all_consumed_offsets())
387376

@@ -654,47 +643,25 @@ def __init__(self, coordinator, interval):
654643
self._coordinator = coordinator
655644
self._client = coordinator._client
656645
self._interval = interval
657-
self._enabled = False
658-
self._request_in_flight = False
659-
660-
def enable(self):
661-
if self._enabled:
662-
log.warning("AutoCommitTask is already enabled")
663-
return
664-
665-
self._enabled = True
666-
if not self._request_in_flight:
667-
self._client.schedule(self, time.time() + self._interval)
668646

669-
def disable(self):
670-
self._enabled = False
671-
try:
672-
self._client.unschedule(self)
673-
except KeyError:
674-
pass
675-
676-
def _reschedule(self, at):
677-
assert self._enabled, 'AutoCommitTask not enabled'
647+
def reschedule(self, at=None):
648+
if at is None:
649+
at = time.time() + self._interval
678650
self._client.schedule(self, at)
679651

680652
def __call__(self):
681-
if not self._enabled:
682-
return
683-
684653
if self._coordinator.coordinator_unknown():
685654
log.debug("Cannot auto-commit offsets for group %s because the"
686655
" coordinator is unknown", self._coordinator.group_id)
687656
backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
688-
self._client.schedule(self, time.time() + backoff)
657+
self.reschedule(time.time() + backoff)
689658
return
690659

691-
self._request_in_flight = True
692660
self._coordinator.commit_offsets_async(
693661
self._coordinator._subscription.all_consumed_offsets(),
694662
self._handle_commit_response)
695663

696664
def _handle_commit_response(self, offsets, result):
697-
self._request_in_flight = False
698665
if result is True:
699666
log.debug("Successfully auto-committed offsets for group %s",
700667
self._coordinator.group_id)
@@ -713,10 +680,7 @@ def _handle_commit_response(self, offsets, result):
713680
self._coordinator.group_id, result)
714681
next_at = time.time() + self._interval
715682

716-
if not self._enabled:
717-
log.warning("Skipping auto-commit reschedule -- it is disabled")
718-
return
719-
self._reschedule(next_at)
683+
self.reschedule(next_at)
720684

721685

722686
class ConsumerCoordinatorMetrics(object):

0 commit comments

Comments
 (0)