Closed
Description
While using the `seek` method of `kafka.consumer.group.KafkaConsumer` for a given partition and offset, we see inconsistent results in the messages returned by the subsequent `poll` call.
The issue is easily reproducible with the given (compacted) topic.
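Part of the workflow:
```python
from kafka.consumer.group import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'my-compacted-topic'  # placeholder: the affected compacted topic
consumer_config = {'bootstrap_servers': 'localhost:9092'}  # placeholder config

topic_partition = TopicPartition(topic, 0)
consumer = KafkaConsumer(**consumer_config)  # configs are keyword arguments
consumer.assign([topic_partition])
start_offset = 100  # Example value: highwatermark - 10
consumer.seek(partition=topic_partition, offset=start_offset)
messages = consumer.poll(timeout_ms=1000, max_records=1)[topic_partition]
message = messages[0]
print('Offset found:', message.offset, 'Expected offset:', start_offset)
```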
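One possible workaround, sketched under assumptions: Kafka brokers return whole record batches, so a fetch that starts at offset 100 can include earlier records from the same batch, and the client is expected to discard them. An offset of 80 after `seek(..., 100)` would be consistent with that filtering being skipped on the `poll` path, though this is an unconfirmed guess. The sketch below seeks, polls without `max_records=1` (so stale records cannot use up the whole batch), and drops anything below the sought offset. `poll_from`, `Record`, and `FakeConsumer` are hypothetical names; `FakeConsumer` only stands in for a `KafkaConsumer` so the logic runs without a broker.

```python
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord; only .offset is used.
Record = namedtuple("Record", ["offset", "value"])


def poll_from(consumer, tp, start_offset, timeout_ms=1000, max_polls=5):
    """Hypothetical workaround: seek, then poll and drop records below start_offset.

    `consumer` only needs seek(partition, offset) and poll(timeout_ms=...)
    returning a dict of {partition: [records]}. Returns the first non-empty
    list of valid records, or [] if none arrive within max_polls polls.
    """
    consumer.seek(tp, start_offset)
    for _ in range(max_polls):
        batch = consumer.poll(timeout_ms=timeout_ms).get(tp, [])
        # Records before the sought offset are treated as stale and skipped.
        valid = [r for r in batch if r.offset >= start_offset]
        if valid:
            return valid
    return []


class FakeConsumer:
    """Hypothetical test double that replays canned poll() results in order."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.seeks = []

    def seek(self, partition, offset):
        self.seeks.append((partition, offset))

    def poll(self, timeout_ms=0, max_records=None):
        return self._batches.pop(0) if self._batches else {}


# Simulate the reported symptom: a stale batch first, then the right one.
tp = "topic-0"  # placeholder for TopicPartition(topic, 0)
fake = FakeConsumer([
    {tp: [Record(80, b"old"), Record(81, b"old")]},
    {tp: [Record(99, b"old"), Record(100, b"new"), Record(101, b"new")]},
])
print([r.offset for r in poll_from(fake, tp, 100)])
```

This only masks the symptom in application code; it does not correct whatever the client does internally, so it is a stopgap rather than a fix.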
Sample Output:
```
$ Offset found:80 Expected offset:100
```
Observations:
- If the iterator interface is used instead of the `poll` interface, the issue does not occur. My guess is that somewhere in the polling path the fetch position is not updated after the seek, or previously fetched messages are not skipped. The iterator path does not appear to use the `fetched_records` API, which may be why it works correctly.
- Sometimes `poll` does return the correct message, especially when the given offset is close to the high watermark.
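A note on interpreting the sample output for a compacted topic: compaction removes records but never renumbers the survivors, so after seeking to offset 100 the first record may legitimately have a higher offset (if offset 100 was compacted away) but never a lower one. The sketch below encodes that check; `expected_first_offset` and `check_seek_result` are hypothetical helper names, and `surviving_offsets` is assumed to be the list of offsets still present in the partition.

```python
def expected_first_offset(surviving_offsets, seek_offset):
    """Smallest surviving offset >= seek_offset in a compacted log, or None."""
    candidates = [o for o in surviving_offsets if o >= seek_offset]
    return min(candidates) if candidates else None


def check_seek_result(found_offset, seek_offset):
    """Classify the first offset returned after seeking to seek_offset.

    "exact": the sought record itself was returned.
    "ahead": a later record; plausible if earlier offsets were compacted away.
    "stale": an earlier record; compaction alone cannot explain this.
    """
    if found_offset == seek_offset:
        return "exact"
    if found_offset > seek_offset:
        return "ahead"
    return "stale"


print(check_seek_result(80, 100))
```

By this check the reported `Offset found:80` for `Expected offset:100` is a stale record, so compaction does not account for it.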
Please let me know if any other details are required.