Skip to content

Seeking on a consumer from the rebalance listener causes consumer to silently discard records #2310

Closed
@rvesse

Description

@rvesse

Discovered what appears to be a multi-threading bug that can be provoked by performing a seek from a rebalance listener. After the seek, the consumer returns the record at the target offset and then silently resets the position to some later offset, potentially skipping a batch of records.

Turning on Python logging shows that the fetcher is incorrectly thinking that there are compacted/deleted records to skip and resetting the position i.e. the code block at

# advance position for any deleted compacted messages if required
# (quoted from kafka-python's fetcher; indentation restored — the paste had
# flattened it, which made the snippet syntactically invalid Python)
if self._subscriptions.assignment[partition].last_offset_from_message_batch:
    next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1
    if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
        log.debug(
            "Advance position for partition %s from %s to %s (last message batch location plus one)"
            " to correct for deleted compacted messages",
            partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
        self._subscriptions.assignment[partition].position = next_offset_from_batch_header
is triggered

I understand that the package is not fully thread-safe, so this probably should not be expected to work, but I wanted to document it in case anyone else runs into this.

Code to Reproduce

def str_to_bytes(value: str) -> bytes:
    """Serialize *value* to its UTF-8 byte representation (producer serializer)."""
    return bytes(value, "UTF-8")


def bytes_to_str(value: bytes) -> str:
    """Deserialize UTF-8 *value* back into text (consumer deserializer)."""
    return str(value, "UTF-8")


def consumer_fetch_misses_records():
    """Reproduce the bug: seeking from a rebalance listener makes the consumer
    silently skip records after returning the one at the target offset.

    Publishes ten keyed records to the ``test`` topic, then consumes them with
    a listener that seeks back to offset 0 on assignment, printing each record.
    """
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                             key_serializer=str_to_bytes,
                             value_serializer=str_to_bytes)
    for n in range(1, 11):
        producer.send(topic="test", key=str(n), value=str(n))
    producer.close()

    consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test",
                             key_deserializer=bytes_to_str,
                             value_deserializer=bytes_to_str)
    partition = TopicPartition("test", 0)  # NOTE(review): unused in this variant
    consumer.subscribe("test", listener=TestRebalanceListener(consumer))

    for record in consumer:
        print(f"[{record.topic}-{record.partition}#{record.offset}] {record.key}: {record.value}")

class TestRebalanceListener(ConsumerRebalanceListener):
    """Listener that rewinds every newly assigned partition to offset 0 —
    this is the seek-from-the-callback pattern that triggers the bug."""

    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        # Nothing to clean up before losing partitions.
        pass

    def on_partitions_assigned(self, assigned):
        # On every (re)assignment, jump back to the start of each partition
        # and record offset 0 as the committed position.
        for tp in assigned:
            self.consumer.seek(tp, 0)
            self.consumer.commit(offsets={tp: OffsetAndMetadata(0, "Reset to beginning")})
            self.consumer.commit(offsets=dict([(partition, OffsetAndMetadata(0, "Reset to beginning"))]))


if __name__ == "__main__":
    consumer_fetch_misses_records()

Produces the following output:

[test-0#0] 1: 1

When it should produce all 10 records (the record at offset 0 appears twice because it is consumed once before the seek rewinds the position and again afterwards), i.e.:

[test-0#0] 1: 1
[test-0#0] 1: 1
[test-0#1] 2: 2
[test-0#2] 3: 3
[test-0#3] 4: 4
[test-0#4] 5: 5
[test-0#5] 6: 6
[test-0#6] 7: 7
[test-0#7] 8: 8
[test-0#8] 9: 9
[test-0#9] 10: 10

Workaround

The workaround I found is to have the rebalance listener set a boolean flag when the assignment changes and then have my main loop seek appropriately e.g.

def consumer_fetch_misses_records():
    """Workaround variant: the rebalance listener only raises a flag, and the
    main consume loop — not the callback — performs the seek and commit."""
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                             key_serializer=str_to_bytes,
                             value_serializer=str_to_bytes)
    for n in range(1, 11):
        producer.send(topic="test", key=str(n), value=str(n))
    producer.close()

    consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test",
                             key_deserializer=bytes_to_str,
                             value_deserializer=bytes_to_str)
    partition = TopicPartition("test", 0)
    listener = TestRebalanceListener()
    consumer.subscribe("test", listener=listener)

    for record in consumer:
        if listener.should_seek:
            # Seek from the polling loop instead of the rebalance callback.
            consumer.seek(partition, 0)
            consumer.commit(offsets={partition: OffsetAndMetadata(0, "Reset to beginning")})
            listener.should_seek = False
        print(f"[{record.topic}-{record.partition}#{record.offset}] {record.key}: {record.value}")


class TestRebalanceListener(ConsumerRebalanceListener):
    """Listener that merely records that a rebalance happened; the consuming
    loop is responsible for performing the actual seek."""

    def __init__(self):
        # Set on (re)assignment; cleared by the main loop after it seeks.
        self.should_seek = False

    def on_partitions_revoked(self, revoked):
        pass

    def on_partitions_assigned(self, assigned):
        # Any non-None assignment (even an empty one) requests a seek.
        if assigned is None:
            self.should_seek = False
        else:
            self.should_seek = True

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions