Skip to content

Call default_offset_commit_callback after _maybe_auto_commit_offsets_async #2546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, client, subscription, metrics, **configs):
auto_commit_interval_ms (int): milliseconds between automatic
offset commits, if enable_auto_commit is True. Default: 5000.
default_offset_commit_callback (callable): called as
callback(offsets, exception) response will be either an Exception
callback(offsets, response) response will be either an Exception
or None. This callback can be used to trigger custom actions when
a commit request completes.
assignors (list): List of objects to use to distribute partition
Expand Down Expand Up @@ -453,8 +453,8 @@ def close(self, autocommit=True, timeout_ms=None):

def _invoke_completed_offset_commit_callbacks(self):
while self.completed_offset_commits:
callback, offsets, exception = self.completed_offset_commits.popleft()
callback(offsets, exception)
callback, offsets, res_or_exc = self.completed_offset_commits.popleft()
callback(offsets, res_or_exc)

def commit_offsets_async(self, offsets, callback=None):
"""Commit specific offsets asynchronously.
Expand Down Expand Up @@ -859,20 +859,19 @@ def _handle_offset_fetch_response(self, future, response):
" %s", self.group_id, tp)
future.success(offsets)

def _default_offset_commit_callback(self, offsets, exception):
if exception is not None:
log.error("Offset commit failed: %s", exception)

def _commit_offsets_async_on_complete(self, offsets, exception):
if exception is not None:
def _default_offset_commit_callback(self, offsets, res_or_exc):
if isinstance(res_or_exc, Exception):
log.warning("Auto offset commit failed for group %s: %s",
self.group_id, exception)
if getattr(exception, 'retriable', False):
self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline)
self.group_id, res_or_exc)
else:
log.debug("Completed autocommit of offsets %s for group %s",
offsets, self.group_id)

def _commit_offsets_async_on_complete(self, offsets, res_or_exc):
if isinstance(res_or_exc, Exception) and getattr(res_or_exc, 'retriable', False):
self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline)
self.config['default_offset_commit_callback'](offsets, res_or_exc)

def _maybe_auto_commit_offsets_async(self):
if self.config['enable_auto_commit']:
if self.coordinator_unknown():
Expand Down
Loading