
TopicAndPartition fix when partition leader = -1 #109


Merged 10 commits on Mar 22, 2014
33 changes: 26 additions & 7 deletions kafka/client.py
@@ -8,7 +8,8 @@
from kafka.common import (ErrorMapping, TopicAndPartition,
ConnectionError, FailedPayloadsError,
BrokerResponseError, PartitionUnavailableError,
- KafkaUnavailableError, KafkaRequestError)
+ LeaderUnavailableError,
+ KafkaUnavailableError)

from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -62,12 +63,22 @@ def _get_conn_for_broker(self, broker):
return self._get_conn(broker.host, broker.port)

def _get_leader_for_partition(self, topic, partition):
"""
Returns the leader for a partition or None if the partition exists
but has no leader.

PartitionUnavailableError will be raised if the topic or partition
is not part of the metadata.
"""

key = TopicAndPartition(topic, partition)
- if key not in self.topics_to_brokers:
+ # reload metadata whenever the partition is not available
+ # or has no leader (broker is None)
+ if self.topics_to_brokers.get(key) is None:
Collaborator:

This can just be: if key not in self.topics_to_brokers:

Collaborator Author:

No, because I want to handle both the case where the key is not in the dict and the case where the value at the key is None. get() returns None by default in both cases, which suits my need.

Should I clarify the comment? That is what it is supposed to say.
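For illustration, a minimal sketch (not part of the diff; the dict contents are made up) of the two cases that get() collapses into None:

topics_to_brokers = {('topic_noleader', 0): None}  # partition known, leader unknown

missing = ('unknown_topic', 0)      # key absent entirely
leaderless = ('topic_noleader', 0)  # key present, value is None

assert topics_to_brokers.get(missing) is None     # .get() defaults to None for an absent key
assert topics_to_brokers.get(leaderless) is None  # .get() returns the stored None
# only a membership test still tells the two cases apart:
assert missing not in topics_to_brokers
assert leaderless in topics_to_brokers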

self.load_metadata_for_topics(topic)

if key not in self.topics_to_brokers:
- raise KafkaRequestError("Partition does not exist: %s" % str(key))
+ raise PartitionUnavailableError("%s not available" % str(key))

return self.topics_to_brokers[key]
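As an illustration of the contract described in the docstring above, a hedged usage sketch follows; the client instance and the handler functions are hypothetical, and only _get_leader_for_partition and PartitionUnavailableError come from this change:

from kafka.common import PartitionUnavailableError

try:
    leader = client._get_leader_for_partition('some_topic', 0)
except PartitionUnavailableError:
    handle_unknown_partition()   # hypothetical: topic/partition absent even after a metadata reload
else:
    if leader is None:
        retry_later()            # hypothetical: partition exists but is leaderless (leader == -1)
    else:
        send_request_to(leader)  # hypothetical: leader is the BrokerMetadata for the partition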

@@ -124,8 +135,11 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
- if leader == -1:
- raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition)
+ if leader is None:
+ raise LeaderUnavailableError(
+ "Leader not available for topic %s partition %s" %
+ (payload.topic, payload.partition))

payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))

@@ -250,13 +264,18 @@ def load_metadata_for_topics(self, *topics):
self.reset_topic_metadata(topic)

if not partitions:
+ log.warning('No partitions for %s', topic)
continue

self.topic_partitions[topic] = []
for partition, meta in partitions.items():
- topic_part = TopicAndPartition(topic, partition)
- self.topics_to_brokers[topic_part] = brokers[meta.leader]
self.topic_partitions[topic].append(partition)
+ topic_part = TopicAndPartition(topic, partition)
+ if meta.leader == -1:
+ log.warning('No leader for topic %s partition %s', topic, partition)
+ self.topics_to_brokers[topic_part] = None
+ else:
+ self.topics_to_brokers[topic_part] = brokers[meta.leader]

def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
6 changes: 3 additions & 3 deletions kafka/common.py
@@ -74,15 +74,15 @@ class KafkaError(RuntimeError):
pass


- class KafkaRequestError(KafkaError):
+ class KafkaUnavailableError(KafkaError):
pass


- class KafkaUnavailableError(KafkaError):
+ class BrokerResponseError(KafkaError):
pass


- class BrokerResponseError(KafkaError):
+ class LeaderUnavailableError(KafkaError):
Owner:

I think we should stay close to the mainline Scala client error types, so maybe:

LeaderNotAvailableError
UnknownTopicOrPartitionError
BrokerNotAvailableError

looking at the exception classes in
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/common/

and the error codes in
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala
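For comparison, a rough sketch (an assumption, not code from this PR or from the Scala client) of how those suggested names could sit under the existing KafkaError base class:

class KafkaError(RuntimeError):
    pass


class LeaderNotAvailableError(KafkaError):
    pass


class UnknownTopicOrPartitionError(KafkaError):
    pass


class BrokerNotAvailableError(KafkaError):
    pass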

Collaborator Author:

I juggled with that idea, but none of the existing exceptions really maps 1:1 right now, so I preferred keeping the current naming convention (LeaderUnavailableError vs. LeaderNotAvailableError) rather than making this specific one different.

pass


2 changes: 1 addition & 1 deletion setup.py
@@ -23,7 +23,7 @@ def run(self):
version="0.9.0",

install_requires=["distribute"],
tests_require=["tox"],
tests_require=["tox", "mock"],
cmdclass={"test": Tox},

packages=["kafka"],
193 changes: 174 additions & 19 deletions test/test_unit.py
@@ -5,14 +5,14 @@

from mock import MagicMock, patch


from kafka import KafkaClient
from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
- OffsetAndMessage, BrokerMetadata, PartitionMetadata
+ OffsetAndMessage, BrokerMetadata, PartitionMetadata,
+ TopicAndPartition, KafkaUnavailableError,
+ LeaderUnavailableError, PartitionUnavailableError
)
- from kafka.common import KafkaUnavailableError
from kafka.codec import (
has_gzip, has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
@@ -410,6 +410,7 @@ def test_encode_offset_request(self):
def test_decode_offset_response(self):
pass


@unittest.skip("Not Implemented")
def test_encode_offset_commit_request(self):
pass
@@ -464,18 +465,17 @@ def mock_get_conn(host, port):
return mocked_conns[(host, port)]

# patch to avoid making requests before we want it
- with patch.object(KafkaClient, 'load_metadata_for_topics'), \
- patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])

self.assertRaises(
KafkaUnavailableError,
client._send_broker_unaware_request,
1, 'fake request')

for key, conn in mocked_conns.iteritems():
conn.send.assert_called_with(1, 'fake request')

def test_send_broker_unaware_request(self):
'Tests that call works when at least one of the host is available'
@@ -494,16 +494,171 @@ def mock_get_conn(host, port):
return mocked_conns[(host, port)]

# patch to avoid making requests before we want it
- with patch.object(KafkaClient, 'load_metadata_for_topics'), \
- patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
client = KafkaClient(hosts='kafka01:9092,kafka02:9092')

resp = client._send_broker_unaware_request(1, 'fake request')

self.assertEqual('valid response', resp)
mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)

@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_load_metadata(self, protocol, conn):
"Load metadata for all topics"

conn.recv.return_value = 'response' # anything but None

brokers = {}
brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
brokers[1] = BrokerMetadata(2, 'broker_2', 5678)

topics = {}
topics['topic_1'] = {
0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
}
topics['topic_noleader'] = {
0: PartitionMetadata('topic_noleader', 0, -1, [], []),
1: PartitionMetadata('topic_noleader', 1, -1, [], [])
}
topics['topic_no_partitions'] = {}
topics['topic_3'] = {
0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]),
1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]),
2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1])
}
protocol.decode_metadata_response.return_value = (brokers, topics)

# client loads metadata at init
client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual({
TopicAndPartition('topic_1', 0): brokers[1],
TopicAndPartition('topic_noleader', 0): None,
TopicAndPartition('topic_noleader', 1): None,
TopicAndPartition('topic_3', 0): brokers[0],
TopicAndPartition('topic_3', 1): brokers[1],
TopicAndPartition('topic_3', 2): brokers[0]},
client.topics_to_brokers)

@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
"Get leader for partitions reload metadata if it is not available"

conn.recv.return_value = 'response' # anything but None

brokers = {}
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)

topics = {'topic_no_partitions': {}}
protocol.decode_metadata_response.return_value = (brokers, topics)

client = KafkaClient(hosts=['broker_1:4567'])

# topic metadata is loaded but empty
self.assertDictEqual({}, client.topics_to_brokers)

topics['topic_no_partitions'] = {
0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1])
}
protocol.decode_metadata_response.return_value = (brokers, topics)

# calling _get_leader_for_partition (from any broker aware request)
# will try loading metadata again for the same topic
leader = client._get_leader_for_partition('topic_no_partitions', 0)

self.assertEqual(brokers[0], leader)
self.assertDictEqual({
TopicAndPartition('topic_no_partitions', 0): brokers[0]},
client.topics_to_brokers)

@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_for_unassigned_partitions(self, protocol, conn):
"Get leader raises if no partitions is defined for a topic"

conn.recv.return_value = 'response' # anything but None

brokers = {}
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)

topics = {'topic_no_partitions': {}}
protocol.decode_metadata_response.return_value = (brokers, topics)

client = KafkaClient(hosts=['broker_1:4567'])

self.assertDictEqual({}, client.topics_to_brokers)
self.assertRaises(
PartitionUnavailableError,
client._get_leader_for_partition,
'topic_no_partitions', 0)

@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_returns_none_when_noleader(self, protocol, conn):
"Getting leader for partitions returns None when the partiion has no leader"

conn.recv.return_value = 'response' # anything but None

brokers = {}
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)

topics = {}
topics['topic_noleader'] = {
0: PartitionMetadata('topic_noleader', 0, -1, [], []),
1: PartitionMetadata('topic_noleader', 1, -1, [], [])
}
protocol.decode_metadata_response.return_value = (brokers, topics)

client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual(
{
TopicAndPartition('topic_noleader', 0): None,
TopicAndPartition('topic_noleader', 1): None
},
client.topics_to_brokers)
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))

topics['topic_noleader'] = {
0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]),
1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0])
}
protocol.decode_metadata_response.return_value = (brokers, topics)
self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))

@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_send_produce_request_raises_when_noleader(self, protocol, conn):
"Send producer request raises LeaderUnavailableError if leader is not available"

conn.recv.return_value = 'response' # anything but None

brokers = {}
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)

topics = {}
topics['topic_noleader'] = {
0: PartitionMetadata('topic_noleader', 0, -1, [], []),
1: PartitionMetadata('topic_noleader', 1, -1, [], [])
}
protocol.decode_metadata_response.return_value = (brokers, topics)

client = KafkaClient(hosts=['broker_1:4567'])

requests = [ProduceRequest(
"topic_noleader", 0,
[create_message("a"), create_message("b")])]

self.assertRaises(
LeaderUnavailableError,
client.send_produce_request, requests)

if __name__ == '__main__':
unittest.main()
4 changes: 3 additions & 1 deletion tox.ini
@@ -1,7 +1,9 @@
[tox]
envlist = py26, py27
[testenv]
- deps = pytest
+ deps =
+     pytest
+     mock
commands = py.test --basetemp={envtmpdir} []
setenv =
PROJECT_ROOT = {toxinidir}