Seek method returning incorrect messages on compressed topic when using max_poll_records #1214

Closed
@88manpreet

Description

When using the `seek` method of `kafka.consumer.group.KafkaConsumer` with a given partition and offset, we see inconsistent behavior in the messages returned by the subsequent `poll` call.
The issue is easily reproducible on the given topic (compacted).

Part of Workflow:

from kafka import TopicPartition
from kafka.consumer.group import KafkaConsumer

# `topic` and `consumer_config` are defined elsewhere; consumer_config is assumed to be a dict
topic_partition = TopicPartition(topic, 0)
consumer = KafkaConsumer(**consumer_config)
consumer.assign([topic_partition])
start_offset = 100  # Example value: highwatermark - 10
consumer.seek(partition=topic_partition, offset=start_offset)
messages = consumer.poll(timeout_ms=1000, max_records=1)[topic_partition]
message = messages[0]
print('Offset found:', message.offset, 'Expected offset:', start_offset)

Sample Output:
Offset found: 80 Expected offset: 100

Observations:

  • If the iterator interface is used instead of the poll interface, the issue does not occur. My guess is that somewhere during polling, the fetched offsets are not updated or the already-fetched messages are not skipped. It looks like the iterator path does not use the fetched_records API, which would explain why it works correctly.

  • At times poll does return the correct message, especially when the given offset is close to the high watermark.
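
Until the cause is found, one possible stopgap on the caller's side is to discard any polled records whose offset is below the one passed to seek. The sketch below is only an illustration of that idea, not a fix in the library: `drop_records_before` and `FakeRecord` are hypothetical names, and the only assumption about real records is that they expose an `offset` attribute, as in the workflow above.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumer record; only the fields used here.
FakeRecord = namedtuple("FakeRecord", ["offset", "value"])


def drop_records_before(records, start_offset):
    """Return only the records at or after start_offset, preserving order."""
    return [r for r in records if r.offset >= start_offset]


# Simulate a poll() result that starts before the offset passed to seek().
polled = [FakeRecord(offset=o, value=b"v") for o in (80, 90, 100, 101)]
kept = drop_records_before(polled, start_offset=100)
print([r.offset for r in kept])  # prints [100, 101]
```

With max_records=1 this filter can leave an empty list, so the caller would need to keep polling until a record at or after the requested offset arrives.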

Please let me know if any other details are required.
