Skip to content

Commit b253166

Browse files
committed
check for broker None in send_broker_aware_request (added test for it)
1 parent 72fdf39 commit b253166

File tree

2 files changed

+42
-5
lines changed

2 files changed

+42
-5
lines changed

kafka/client.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,22 @@ def _get_conn_for_broker(self, broker):
5252
return self.conns[(broker.host, broker.port)]
5353

5454
def _get_leader_for_partition(self, topic, partition):
55+
"""
56+
Returns the leader for a partition or None if the partition exists
57+
but has no leader.
58+
59+
PartitionUnavailableError will be raised if the topic or partition
60+
is not part of the metadata.
61+
"""
62+
5563
key = TopicAndPartition(topic, partition)
5664
# reload metadata whether the partition is not available
57-
# or has not leader (broker is None)
65+
# or has no leader (broker is None)
5866
if self.topics_to_brokers.get(key) is None:
5967
self.load_metadata_for_topics(topic)
6068

6169
if key not in self.topics_to_brokers:
62-
raise PartitionUnavailableError("No leader for %s" % str(key))
70+
raise PartitionUnavailableError("%s not available" % str(key))
6371

6472
return self.topics_to_brokers[key]
6573

@@ -115,8 +123,9 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
115123
for payload in payloads:
116124
leader = self._get_leader_for_partition(payload.topic,
117125
payload.partition)
118-
if leader == -1:
119-
raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition)
126+
if leader is None:
127+
raise PartitionUnavailableError(
128+
"No leader for topic %s partition %s" % (payload.topic, payload.partition))
120129
payloads_by_broker[leader].append(payload)
121130
original_keys.append((payload.topic, payload.partition))
122131

@@ -249,7 +258,7 @@ def load_metadata_for_topics(self, *topics):
249258
self.topic_partitions[topic].append(partition)
250259
topic_part = TopicAndPartition(topic, partition)
251260
if meta.leader == -1:
252-
log.info('No leader for topic %s partition %d', topic, partition)
261+
log.info('No leader for topic %s partition %s', topic, partition)
253262
self.topics_to_brokers[topic_part] = None
254263
else:
255264
self.topics_to_brokers[topic_part] = brokers[meta.leader]

test/test_unit.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,5 +517,33 @@ def test_get_leader_returns_none_when_noleader(self, protocol, conn):
517517
self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
518518
self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
519519

520+
@patch('kafka.client.KafkaConnection')
521+
@patch('kafka.client.KafkaProtocol')
522+
def test_send_produce_request_raises_when_noleader(self, protocol, conn):
523+
"Getting leader for partitions returns None when the partiion has no leader"
524+
525+
conn.recv.return_value = 'response' # anything but None
526+
527+
brokers = {}
528+
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
529+
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
530+
531+
topics = {}
532+
topics['topic_noleader'] = {
533+
0: PartitionMetadata('topic_noleader', 0, -1, [], []),
534+
1: PartitionMetadata('topic_noleader', 1, -1, [], [])
535+
}
536+
protocol.decode_metadata_response.return_value = (brokers, topics)
537+
538+
client = KafkaClient(host='broker_1', port=4567)
539+
540+
requests = [ProduceRequest(
541+
"topic_noleader", 0,
542+
[create_message("a"), create_message("b")])]
543+
544+
self.assertRaises(
545+
PartitionUnavailableError,
546+
client.send_produce_request, requests)
547+
520548
if __name__ == '__main__':
521549
unittest.main()

0 commit comments

Comments
 (0)