Closed
Description
The code below in kafka-python-cluster.py at line 280 supposes that the API_VERSION of MetadataResponse doesn't contain version 5. So it forgets a field 'offline_replicas'. So it won't work when PartitionMetadata is of API_VERSION 5.
So I purpose a correction like below around line 280 in 'https://github.com/dpkp/kafka-python/blob/master/kafka/cluster.py':
if metadata.API_VERSION==5:
for p_error, partition, leader, replicas, isr, offline_replicas in partitions:
_new_partitions[topic][partition] = PartitionMetadata(
topic=topic, partition=partition, leader=leader,
replicas=replicas, isr=isr, offline_replicas = offline_replicas, error=p_error)
if leader != -1:
_new_broker_partitions[leader].add(
TopicPartition(topic, partition))
elif metadata.API_VERSION<5:
for p_error, partition, leader, replicas, isr in partitions:
_new_partitions[topic][partition] = PartitionMetadata(
topic=topic, partition=partition, leader=leader,
replicas=replicas, isr=isr, offline_replicas = None, error=p_error)
if leader != -1:
_new_broker_partitions[leader].add(
TopicPartition(topic, partition))
else:
raise ValueError(f"error unpacking: topic: {topic}, partitions: {partitions}, API VERSION:{metadata.API_VERSION}")
And add PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr", "offline_replicas", "error"])
to kafka/structs.py
Hope this will help!
Metadata
Metadata
Assignees
Labels
No labels