Skip to content

KAFKA-3486: fix autocommit when partitions assigned manually #767

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 8 additions & 44 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,12 @@ def __init__(self, client, subscription, metrics, metric_group_prefix,
else:
interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)

# When using broker-coordinated consumer groups, auto-commit will
# be automatically enabled on group join (see _on_join_complete)
# Otherwise, we should enable now b/c there will be no group join
if self.config['api_version'] < (0, 9):
self._auto_commit_task.enable()
self._auto_commit_task.reschedule()

self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
self._subscription)

def __del__(self):
if hasattr(self, '_auto_commit_task') and self._auto_commit_task:
self._auto_commit_task.disable()
if hasattr(self, '_cluster') and self._cluster:
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))

Expand Down Expand Up @@ -211,9 +204,9 @@ def _on_join_complete(self, generation, member_id, protocol,
# based on the received assignment
assignor.on_assignment(assignment)

# restart the autocommit task if needed
# reschedule the auto commit starting from now
if self._auto_commit_task:
self._auto_commit_task.enable()
self._auto_commit_task.reschedule()

assigned = set(self._subscription.assigned_partitions())
log.info("Setting newly assigned partitions %s for group %s",
Expand Down Expand Up @@ -396,10 +389,6 @@ def _maybe_auto_commit_offsets_sync(self):
if self._auto_commit_task is None:
return

# disable periodic commits prior to committing synchronously. note that they will
# be re-enabled after a rebalance completes
self._auto_commit_task.disable()

try:
self.commit_offsets_sync(self._subscription.all_consumed_offsets())

Expand Down Expand Up @@ -672,47 +661,25 @@ def __init__(self, coordinator, interval):
self._coordinator = coordinator
self._client = coordinator._client
self._interval = interval
self._enabled = False
self._request_in_flight = False

def enable(self):
if self._enabled:
log.warning("AutoCommitTask is already enabled")
return

self._enabled = True
if not self._request_in_flight:
self._client.schedule(self, time.time() + self._interval)

def disable(self):
self._enabled = False
try:
self._client.unschedule(self)
except KeyError:
pass

def _reschedule(self, at):
assert self._enabled, 'AutoCommitTask not enabled'
def reschedule(self, at=None):
if at is None:
at = time.time() + self._interval
self._client.schedule(self, at)

def __call__(self):
if not self._enabled:
return

if self._coordinator.coordinator_unknown():
log.debug("Cannot auto-commit offsets for group %s because the"
" coordinator is unknown", self._coordinator.group_id)
backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
self._client.schedule(self, time.time() + backoff)
self.reschedule(time.time() + backoff)
return

self._request_in_flight = True
self._coordinator.commit_offsets_async(
self._coordinator._subscription.all_consumed_offsets(),
self._handle_commit_response)

def _handle_commit_response(self, offsets, result):
self._request_in_flight = False
if result is True:
log.debug("Successfully auto-committed offsets for group %s",
self._coordinator.group_id)
Expand All @@ -731,10 +698,7 @@ def _handle_commit_response(self, offsets, result):
self._coordinator.group_id, result)
next_at = time.time() + self._interval

if not self._enabled:
log.warning("Skipping auto-commit reschedule -- it is disabled")
return
self._reschedule(next_at)
self.reschedule(next_at)


class ConsumerCoordinatorMetrics(object):
Expand Down
5 changes: 0 additions & 5 deletions test/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,18 +370,13 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
side_effect=error)
if has_auto_commit:
assert coordinator._auto_commit_task is not None
# auto-commit enable is defered until after group join in 0.9+
if api_version >= (0, 9):
coordinator._auto_commit_task.enable()
assert coordinator._auto_commit_task._enabled is True
else:
assert coordinator._auto_commit_task is None

assert coordinator._maybe_auto_commit_offsets_sync() is None

if has_auto_commit:
assert coordinator._auto_commit_task is not None
assert coordinator._auto_commit_task._enabled is False

assert commit_sync.call_count == (1 if commit_offsets else 0)
assert mock_warn.call_count == (1 if warn else 0)
Expand Down