
Consumption with poll loses some (~30%) of messages forever #2173

Closed
@indywidualny

Description

We noticed a critical bug (or perhaps expected behavior, you will tell, but it is totally unexpected IMHO) with Kafka poll(). I believe it is very easy to reproduce.

We initialize Kafka Consumer with the following code:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

self.consumer = KafkaConsumer(*kafka_topics, group_id=self.group_id, bootstrap_servers=BOOTSTRAP_SERVERS,
    fetch_max_wait_ms=500, reconnect_backoff_ms=100,
    reconnect_backoff_max_ms=1000, connections_max_idle_ms=540000,
    request_timeout_ms=15000, heartbeat_interval_ms=2000,
    auto_offset_reset=self.auto_offset_reset, enable_auto_commit=False,
    metadata_max_age_ms=1500,
    partition_assignment_strategy=[StickyPartitionAssignor])

Variables:

self.auto_offset_reset = 'earliest'

Then we consume messages in an infinite loop, calling the function below constantly. When we want to consume something, we ask for max_records=N (N is between 1 and 25 in our case) and commit afterwards. When we don't want any messages, we simply poll with max_records=1 and don't commit. This keeps Kafka polled all the time, so heartbeats are sent and the consumer stays active; we take care of the commit ourselves.

    def get_messages(self, max_records=1):
        # poll() returns a dict mapping TopicPartition -> list of records
        # (an empty dict when nothing arrived within the timeout).
        data = self.consumer.poll(timeout_ms=self.timeout, max_records=max_records)
        ready_messages = []
        for tp, messages in data.items():
            for message in messages:
                logger.debug("Consumed %s:%d:%d: key=%s value=%s" % (tp.topic, tp.partition, message.offset,
                                                                     message.key, message.value))
            ready_messages.extend(messages)
        return ready_messages
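To make the loop described above concrete, here is a minimal, runnable sketch of one iteration: poll constantly so heartbeats keep flowing, flatten the poll() result, and commit only after a batch we actually wanted. The `FakeConsumer`, `consume_step`, and `want_messages` names are illustrative stand-ins (not part of kafka-python) so the sketch runs without a broker; with a real `KafkaConsumer` the poll/commit calls are the same.

```python
from collections import namedtuple

# Stand-ins mimicking the shapes kafka-python returns from poll().
TopicPartition = namedtuple("TopicPartition", "topic partition")
Record = namedtuple("Record", "offset key value")

class FakeConsumer:
    """Broker-free stand-in for KafkaConsumer, just for this sketch."""
    def __init__(self, batches):
        self.batches = list(batches)  # each batch mimics one poll() return dict
        self.commits = 0

    def poll(self, timeout_ms=0, max_records=1):
        return self.batches.pop(0) if self.batches else {}

    def commit(self):
        self.commits += 1

def consume_step(consumer, want_messages, max_records=25):
    """One loop iteration: poll, flatten, commit only when we wanted a batch."""
    n = max_records if want_messages else 1
    data = consumer.poll(timeout_ms=500, max_records=n)
    ready = [m for msgs in data.values() for m in msgs]
    if want_messages and ready:
        consumer.commit()
    return ready
```

In the real loop, `FakeConsumer` is replaced by the `KafkaConsumer` initialized earlier, and `consume_step` is called forever with `want_messages` reflecting whether downstream work is ready for a batch.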

Here's the most interesting part. Say we post 1000 messages to an empty topic. We count all messages consumed since the consumer was started, and we noticed horrible, unforgivable behavior: although all 1000 messages are marked as consumed (the offset moved), we receive only about 700 of them via get_messages()!

What happens to the hundreds of messages that are lost forever? I cannot believe nobody has noticed this, but it is a critical bug to report, one that prevents us from using the library at all.
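A hedged sketch of the counting check described above: for one poll() result, compare how many records were actually handed back against how far the offsets span. A gap between the two is exactly the "offset moved but messages missing" symptom reported here. The `audit_batch` helper is hypothetical (not a kafka-python API), and plain namedtuples stand in for real records so the check runs without a broker.

```python
from collections import namedtuple

# Stand-ins mimicking the shapes kafka-python returns from poll().
TopicPartition = namedtuple("TopicPartition", "topic partition")
Record = namedtuple("Record", "offset key value")

def audit_batch(data):
    """Return (records_received, offsets_spanned) for one poll() result.

    If offsets_spanned exceeds records_received, the consumer's position
    advanced past records that were never delivered to the caller.
    """
    received = 0
    spanned = 0
    for tp, messages in data.items():
        received += len(messages)
        if messages:
            spanned += messages[-1].offset - messages[0].offset + 1
    return received, spanned
```

Running this per batch and summing both counters over the 1000-message test would show whether get_messages() really returns fewer records than the offset movement implies.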
