Add optional timeout_ms kwarg to remaining consumer/coordinator methods #2544

Merged: 1 commit merged on Mar 15, 2025
26 changes: 18 additions & 8 deletions kafka/consumer/fetcher.py
@@ -135,17 +135,21 @@ def send_fetches(self):
self._clean_done_fetch_futures()
return futures

def reset_offsets_if_needed(self, partitions):
def reset_offsets_if_needed(self, partitions, timeout_ms=None):
"""Lookup and set offsets for any partitions which are awaiting an
explicit reset.

Arguments:
partitions (set of TopicPartitions): the partitions to reset

Raises:
KafkaTimeoutError if timeout_ms provided
"""
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout resetting offsets')
for tp in partitions:
# TODO: If there are several offsets to reset, we could submit offset requests in parallel
if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
self._reset_offset(tp, timeout_ms=inner_timeout_ms())

def _clean_done_fetch_futures(self):
while True:
@@ -160,7 +164,7 @@ def in_flight_fetches(self):
self._clean_done_fetch_futures()
return bool(self._fetch_futures)

def update_fetch_positions(self, partitions):
def update_fetch_positions(self, partitions, timeout_ms=None):
"""Update the fetch positions for the provided partitions.

Arguments:
@@ -169,7 +173,7 @@ def update_fetch_positions(self, partitions):
Raises:
NoOffsetForPartitionError: if no offset is stored for a given
partition and no reset policy is available
KafkaTimeoutError if timeout_ms provided.
"""
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
# reset the fetch position to the committed position
for tp in partitions:
if not self._subscriptions.is_assigned(tp):
@@ -182,12 +188,12 @@ def update_fetch_positions(self, partitions):
continue

if self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
self._reset_offset(tp, timeout_ms=inner_timeout_ms())
elif self._subscriptions.assignment[tp].committed is None:
# there's no committed position, so we need to reset with the
# default strategy
self._subscriptions.need_offset_reset(tp)
self._reset_offset(tp)
self._reset_offset(tp, timeout_ms=inner_timeout_ms())
else:
committed = self._subscriptions.assignment[tp].committed.offset
log.debug("Resetting offset for partition %s to the committed"
@@ -216,14 +222,15 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
offsets[tp] = offsets[tp].offset
return offsets

def _reset_offset(self, partition):
def _reset_offset(self, partition, timeout_ms=None):
"""Reset offsets for the given partition using the offset reset strategy.

Arguments:
partition (TopicPartition): the partition that needs reset offset

Raises:
NoOffsetForPartitionError: if no offset reset strategy is defined
KafkaTimeoutError if timeout_ms provided
"""
timestamp = self._subscriptions.assignment[partition].reset_strategy
if timestamp is OffsetResetStrategy.EARLIEST:
@@ -235,7 +242,7 @@ def _reset_offset(self, partition):

log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
offsets = self._retrieve_offsets({partition: timestamp})
offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms=timeout_ms)

if partition in offsets:
offset = offsets[partition].offset
@@ -263,11 +270,14 @@ def _retrieve_offsets(self, timestamps, timeout_ms=None):
retrieved offset, timestamp, and leader_epoch. If offset does not exist for
the provided timestamp, that partition will be missing from
this mapping.

Raises:
KafkaTimeoutError if timeout_ms provided
"""
if not timestamps:
return {}

inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to find coordinator')
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout fetching offsets')
timestamps = copy.copy(timestamps)
while True:
if not timestamps:
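For reference, the timeout handling added in fetcher.py routes every blocking step through kafka.util.timeout_ms_fn so that a single deadline is shared across nested calls. A simplified sketch of that helper (illustration only, not necessarily the exact library implementation):

```python
import time

from kafka.errors import KafkaTimeoutError


def timeout_ms_fn(timeout_ms, error_message):
    """Return a callable reporting the remaining time budget in milliseconds.

    Simplified sketch: if the budget is exhausted and an error_message was
    supplied, the callable raises KafkaTimeoutError instead of returning.
    """
    begin = time.time()

    def inner_timeout_ms(fallback=None):
        if timeout_ms is None:
            # No overall deadline was requested; defer to the per-call fallback
            return fallback
        elapsed_ms = (time.time() - begin) * 1000
        if elapsed_ms >= timeout_ms:
            if error_message is not None:
                raise KafkaTimeoutError(error_message)
            return 0
        remaining_ms = timeout_ms - elapsed_ms
        # A fallback (e.g. time_to_next_poll) can cap, but never extend, the budget
        return min(remaining_ms, fallback) if fallback is not None else remaining_ms

    return inner_timeout_ms
```

Each successive inner_timeout_ms() call returns a smaller remaining budget, which is why the methods above can pass it to several sequential operations and still honor the caller's single timeout_ms.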
84 changes: 45 additions & 39 deletions kafka/consumer/group.py
@@ -5,7 +5,7 @@
import socket
import time

from kafka.errors import KafkaConfigurationError, UnsupportedVersionError
from kafka.errors import KafkaConfigurationError, KafkaTimeoutError, UnsupportedVersionError

from kafka.vendor import six

@@ -18,6 +18,7 @@
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.list_offsets import OffsetResetStrategy
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import timeout_ms_fn
from kafka.version import __version__

log = logging.getLogger(__name__)
@@ -521,7 +522,7 @@ def commit_async(self, offsets=None, callback=None):
offsets, callback=callback)
return future

def commit(self, offsets=None):
def commit(self, offsets=None, timeout_ms=None):
"""Commit offsets to kafka, blocking until success or error.

This commits offsets only to Kafka. The offsets committed using this API
@@ -545,9 +546,9 @@ def commit(self, offsets=None):
assert self.config['group_id'] is not None, 'Requires group_id'
if offsets is None:
offsets = self._subscription.all_consumed_offsets()
self._coordinator.commit_offsets_sync(offsets)
self._coordinator.commit_offsets_sync(offsets, timeout_ms=timeout_ms)

def committed(self, partition, metadata=False):
def committed(self, partition, metadata=False, timeout_ms=None):
"""Get the last committed offset for the given partition.

This offset will be used as the position for the consumer
@@ -564,6 +565,9 @@ def committed(self, partition, metadata=False):

Returns:
The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit.

Raises:
KafkaTimeoutError if timeout_ms provided
"""
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
assert self.config['group_id'] is not None, 'Requires group_id'
@@ -572,10 +576,10 @@ def committed(self, partition, metadata=False):
if self._subscription.is_assigned(partition):
committed = self._subscription.assignment[partition].committed
if committed is None:
self._coordinator.refresh_committed_offsets_if_needed()
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
committed = self._subscription.assignment[partition].committed
else:
commit_map = self._coordinator.fetch_committed_offsets([partition])
commit_map = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms)
if partition in commit_map:
committed = commit_map[partition]
else:
@@ -670,17 +674,13 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
assert not self._closed, 'KafkaConsumer is closed'

# Poll for new data until the timeout expires
start = time.time()
remaining = timeout_ms
inner_timeout_ms = timeout_ms_fn(timeout_ms, None)
while not self._closed:
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
records = self._poll_once(inner_timeout_ms(), max_records, update_offsets=update_offsets)
if records:
return records

elapsed_ms = (time.time() - start) * 1000
remaining = timeout_ms - elapsed_ms

if remaining <= 0:
if inner_timeout_ms() <= 0:
break

return {}
@@ -695,14 +695,14 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
Returns:
dict: Map of topic to list of records (may be empty).
"""
begin = time.time()
if not self._coordinator.poll(timeout_ms=timeout_ms):
inner_timeout_ms = timeout_ms_fn(timeout_ms, None)
if not self._coordinator.poll(timeout_ms=inner_timeout_ms()):
return {}

# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
self._update_fetch_positions(self._subscription.missing_fetch_positions())
self._update_fetch_positions(self._subscription.missing_fetch_positions(), timeout_ms=inner_timeout_ms())

# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
@@ -723,9 +723,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
if len(futures):
self._client.poll(timeout_ms=0)

timeout_ms -= (time.time() - begin) * 1000
timeout_ms = max(0, min(timeout_ms, self._coordinator.time_to_next_poll() * 1000))
self._client.poll(timeout_ms=timeout_ms)
self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000))
# after the long poll, we should check whether the group needs to rebalance
# prior to returning data so that the group can stabilize faster
if self._coordinator.need_rejoin():
@@ -734,7 +732,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
return records

def position(self, partition):
def position(self, partition, timeout_ms=None):
"""Get the offset of the next record that will be fetched

Arguments:
@@ -748,7 +746,7 @@ def position(self, partition):
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
position = self._subscription.assignment[partition].position
if position is None:
self._update_fetch_positions([partition])
self._update_fetch_positions([partition], timeout_ms=timeout_ms)
position = self._subscription.assignment[partition].position
return position.offset if position else None

@@ -1103,35 +1101,43 @@ def _use_consumer_group(self):
return False
return True

def _update_fetch_positions(self, partitions):
def _update_fetch_positions(self, partitions, timeout_ms=None):
"""Set the fetch position to the committed position (if there is one)
or reset it using the offset reset policy the user has configured.

Arguments:
partitions (List[TopicPartition]): The partitions that need
updating fetch positions.

Returns True if fetch positions updated, False if timeout

Raises:
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined.
"""
# Lookup any positions for partitions which are awaiting reset (which may be the
# case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
# this check first to avoid an unnecessary lookup of committed offsets (which
# typically occurs when the user is manually assigning partitions and managing
# their own offsets).
self._fetcher.reset_offsets_if_needed(partitions)

if not self._subscription.has_all_fetch_positions():
# if we still don't have offsets for all partitions, then we should either seek
# to the last committed position or reset using the auto reset policy
if (self.config['api_version'] >= (0, 8, 1) and
self.config['group_id'] is not None):
# first refresh commits for all assigned partitions
self._coordinator.refresh_committed_offsets_if_needed()

# Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
try:
# Lookup any positions for partitions which are awaiting reset (which may be the
# case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
# this check first to avoid an unnecessary lookup of committed offsets (which
# typically occurs when the user is manually assigning partitions and managing
# their own offsets).
self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms())

if not self._subscription.has_all_fetch_positions():
# if we still don't have offsets for all partitions, then we should either seek
# to the last committed position or reset using the auto reset policy
if (self.config['api_version'] >= (0, 8, 1) and
self.config['group_id'] is not None):
# first refresh commits for all assigned partitions
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=inner_timeout_ms())

# Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions, timeout_ms=inner_timeout_ms())
return True

except KafkaTimeoutError:
return False

def _message_generator_v2(self):
timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
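Taken together, the group.py changes let callers bound the blocking consumer calls and handle expiry explicitly. A usage sketch (the broker address, topic, and group names below are placeholders for illustration, not part of this PR):

```python
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaTimeoutError

# Placeholder connection details for illustration only
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id='example-group',
    enable_auto_commit=False,
)
tp = TopicPartition('example-topic', 0)
consumer.assign([tp])

try:
    # With timeout_ms set, these calls are bounded and raise KafkaTimeoutError
    # if the coordinator or offset lookup cannot complete within the budget.
    committed = consumer.committed(tp, timeout_ms=5000)
    consumer.commit(timeout_ms=5000)  # commits all consumed offsets
except KafkaTimeoutError:
    # Budget exhausted; decide whether to retry or surface the error
    pass

# position() does not raise on timeout: _update_fetch_positions() now returns
# False instead, so the result may simply be None if the lookup timed out.
next_offset = consumer.position(tp, timeout_ms=5000)
```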
5 changes: 3 additions & 2 deletions kafka/coordinator/base.py
@@ -163,7 +163,7 @@ def group_protocols(self):
pass

@abc.abstractmethod
def _on_join_prepare(self, generation, member_id):
def _on_join_prepare(self, generation, member_id, timeout_ms=None):
"""Invoked prior to each group join or rejoin.

This is typically used to perform any cleanup from the previous
@@ -415,7 +415,8 @@ def join_group(self, timeout_ms=None):
# while another rebalance is still in progress.
if not self.rejoining:
self._on_join_prepare(self._generation.generation_id,
self._generation.member_id)
self._generation.member_id,
timeout_ms=inner_timeout_ms())
self.rejoining = True

# fence off the heartbeat thread explicitly so that it cannot
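On the coordinator side, _on_join_prepare() is abstract, so concrete coordinators must accept the new kwarg and forward it to whatever blocking cleanup they run before rejoining. A stripped-down hypothetical subclass for illustration (the real override is kafka.coordinator.consumer.ConsumerCoordinator, which also handles autocommit and rebalance-listener callbacks):

```python
from kafka.coordinator.base import BaseCoordinator


class ExampleCoordinator(BaseCoordinator):
    """Illustration only; omits the other abstract methods a real
    coordinator must also implement."""

    def _on_join_prepare(self, generation, member_id, timeout_ms=None):
        # join_group() forwards its remaining budget here, so pre-rebalance
        # cleanup (typically a synchronous offset commit) is bounded rather
        # than blocking on the broker indefinitely.
        self._cleanup_before_rebalance(timeout_ms=timeout_ms)

    def _cleanup_before_rebalance(self, timeout_ms=None):
        # Hypothetical hook; a consumer coordinator would call
        # commit_offsets_sync(offsets, timeout_ms=timeout_ms) here.
        pass
```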