ClusterMetadata update_metadata method is not complete facing different API_VERSION of MetadataResponse  #2365

Closed
@Jijun-TANG

Description

The code at line 280 of kafka-python's cluster.py assumes that the API_VERSION of MetadataResponse is never 5, so it leaves out the 'offline_replicas' field. As a result, it does not work when the partition metadata comes from an API_VERSION 5 response.
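
To illustrate the failure mode, here is a minimal standalone sketch. It does not import kafka-python, the tuple values are made up, and `unpack_five` is a hypothetical stand-in for a loop that assumes five fields per partition entry. Unpacking a six-field entry into five names raises a ValueError:

```python
# Hypothetical partition entries; the values are made up for illustration.
# Layout before version 5: (error_code, partition, leader, replicas, isr)
v4_partitions = [(0, 0, 1, [1, 2], [1, 2])]
# Version 5 layout adds offline_replicas as a sixth field.
v5_partitions = [(0, 0, 1, [1, 2], [1, 2], [])]


def unpack_five(partitions):
    """Mimic a loop that assumes exactly five fields per partition entry."""
    result = []
    for p_error, partition, leader, replicas, isr in partitions:
        result.append((partition, leader))
    return result


# The five-field layout unpacks without problems.
print(unpack_five(v4_partitions))

try:
    unpack_five(v5_partitions)
except ValueError as exc:
    # The six-field entry cannot be unpacked into five names.
    print("v5 entry failed:", exc)
```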

So I propose a correction like the one below, around line 280 of 'https://github.com/dpkp/kafka-python/blob/master/kafka/cluster.py':

if metadata.API_VERSION == 5:
    for p_error, partition, leader, replicas, isr, offline_replicas in partitions:
        _new_partitions[topic][partition] = PartitionMetadata(
            topic=topic, partition=partition, leader=leader,
            replicas=replicas, isr=isr, offline_replicas=offline_replicas, error=p_error)
        if leader != -1:
            _new_broker_partitions[leader].add(
                TopicPartition(topic, partition))
elif metadata.API_VERSION < 5:
    for p_error, partition, leader, replicas, isr in partitions:
        _new_partitions[topic][partition] = PartitionMetadata(
            topic=topic, partition=partition, leader=leader,
            replicas=replicas, isr=isr, offline_replicas=None, error=p_error)
        if leader != -1:
            _new_broker_partitions[leader].add(
                TopicPartition(topic, partition))
else:
    raise ValueError(f"error unpacking: topic: {topic}, partitions: {partitions}, API VERSION: {metadata.API_VERSION}")

Also change the definition in kafka/structs.py so that it includes the 'offline_replicas' field: PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr", "offline_replicas", "error"])
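
For anyone who wants to try the idea outside the library, here is a self-contained sketch of the same version check. It is an illustration under assumptions, not the library's code: `collect_partitions` is a hypothetical helper, and the two namedtuples are local stand-ins for the ones in kafka/structs.py.

```python
from collections import defaultdict, namedtuple

# Local stand-ins for the kafka-python structs so the sketch runs on its own.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
PartitionMetadata = namedtuple(
    "PartitionMetadata",
    ["topic", "partition", "leader", "replicas", "isr",
     "offline_replicas", "error"])


def collect_partitions(api_version, topic, partitions):
    """Hypothetical helper: build per-partition metadata for one topic."""
    new_partitions = {}
    broker_partitions = defaultdict(set)
    for entry in partitions:
        if api_version == 5:
            # Version 5 entries carry offline_replicas as a sixth field.
            p_error, partition, leader, replicas, isr, offline_replicas = entry
        elif api_version < 5:
            # Older entries have five fields; record offline_replicas as None.
            p_error, partition, leader, replicas, isr = entry
            offline_replicas = None
        else:
            raise ValueError(
                f"error unpacking: topic: {topic}, partitions: {partitions}, "
                f"API VERSION: {api_version}")
        new_partitions[partition] = PartitionMetadata(
            topic=topic, partition=partition, leader=leader,
            replicas=replicas, isr=isr,
            offline_replicas=offline_replicas, error=p_error)
        # A leader of -1 means no leader, so the partition is not indexed by broker.
        if leader != -1:
            broker_partitions[leader].add(TopicPartition(topic, partition))
    return new_partitions, broker_partitions


# Usage with made-up values: one version 5 entry and one older entry.
parts_v5, brokers_v5 = collect_partitions(5, "t", [(0, 0, 1, [1, 2], [1], [2])])
parts_v1, brokers_v1 = collect_partitions(1, "t", [(0, 0, -1, [1], [1])])
```

Unpacking once per entry and sharing the rest of the loop body avoids duplicating the PartitionMetadata construction in both branches.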

Hope this will help!
