KafkaConsumer.position can block forever when single broker is unreachable #1346

Closed
@lconnell

Description

I am trying to get the offset of the next record for each partition in a topic. When one of my brokers is down or unreachable over the network, the call blocks indefinitely. I can't figure out which of these timeout options I can tune to break out of it.

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers=config.brokers, api_version=(0, 10, 1),
        reconnect_backoff_max_ms=100, consumer_timeout_ms=2000,
        fetch_max_wait_ms=config.session_timeout_ms,
        session_timeout_ms=config.session_timeout_ms,
        request_timeout_ms=config.request_timeout_ms,
        auto_offset_reset='latest',
    )
    ps = [TopicPartition(config.topic, p) for p in consumer.partitions_for_topic(config.topic)]
    print("Assigning...")
    consumer.assign(ps)
    for p in ps:
        print(p)
        pos = consumer.position(p)  # blocks here when the partition's broker is unreachable

This blocks at pos = consumer.position(p) as soon as it tries to retrieve the position of a partition that belongs to the unreachable broker. The stack at that point:

  File "/usr/lib64/nagios/plugins/kafka/consumer/fetcher.py", line 194, in update_fetch_positions
    self._reset_offset(tp)
  File "/usr/lib64/nagios/plugins/kafka/consumer/fetcher.py", line 245, in _reset_offset
    offsets = self._retrieve_offsets({partition: timestamp})
  File "/usr/lib64/nagios/plugins/kafka/consumer/fetcher.py", line 295, in _retrieve_offsets
    time.sleep(self.config['retry_backoff_ms'] / 1000.0)
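
Since the traceback shows the call sleeping on retry_backoff_ms inside _retrieve_offsets, one possible caller-side workaround is to run the blocking call in a daemon thread and give up after a deadline. The following is only a sketch under assumptions: call_with_timeout is a hypothetical helper (not part of kafka-python), and because KafkaConsumer is not thread-safe, the consumer should be treated as unusable and discarded once a timeout fires, since the abandoned thread may still be using it.

```python
import threading


def call_with_timeout(fn, timeout_s, *args, **kwargs):
    """Run fn(*args, **kwargs) in a daemon thread; raise TimeoutError after timeout_s.

    Hypothetical helper for illustration. The worker thread is NOT cancelled
    on timeout; it is simply abandoned, and being a daemon thread it will not
    keep the process alive at exit.
    """
    outcome = {}

    def target():
        try:
            outcome["value"] = fn(*args, **kwargs)
        except Exception as exc:  # hand the error back to the calling thread
            outcome["error"] = exc

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout_s)

    if worker.is_alive():
        raise TimeoutError("call did not finish within %s seconds" % timeout_s)
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


# Intended use with the snippet above (assumes `consumer` and `ps` exist):
#
#     for p in ps:
#         try:
#             pos = call_with_timeout(consumer.position, 5.0, p)
#         except TimeoutError:
#             print("no position for", p)
#             break  # consumer may still be busy in the worker; stop using it
```

This does not address the underlying retry loop; it only bounds how long the caller waits, which may be enough for a monitoring check like the one described here.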
