No infinite loops during metadata requests, invalidate metadata more, exception hierarchy #91

Closed. Wants to merge 46 commits.

Commits (the diff below shows changes from 3 of the 46 commits)
fa351d9
Add error hierarchy, remove client infinite loop
turtlesoupy Jan 8, 2014
7d7a587
Better error handling on broker exception, flake8 fixes
turtlesoupy Jan 8, 2014
d842c78
Add back the global load metadata on client initialization
turtlesoupy Jan 8, 2014
5cccd63
No exception for empty partitions, fix topic metadata reset
turtlesoupy Jan 8, 2014
0dd8c3f
Change exception names, make a new "KafkaRequestError"
turtlesoupy Jan 10, 2014
ec23d94
Check for socket status on read as well as send
turtlesoupy Jan 7, 2014
ae44207
Propagate error immediately if dirty
turtlesoupy Jan 7, 2014
6fe9dce
Enable absolute imports for modules using Queue.
Oct 21, 2013
40db4e1
Allow customizing socket timeouts.
rdiomar Dec 19, 2013
6d691b9
Read the correct number of bytes from kafka.
rdiomar Dec 19, 2013
be564d9
* Guarantee reading the expected number of bytes from the socket ever…
rdiomar Dec 19, 2013
d7387cd
Allow None timeout in FetchContext even if block is False
rdiomar Dec 19, 2013
a359a13
Reset consumer fields to original values rather than defaults in Fetc…
rdiomar Dec 19, 2013
6480712
SimpleConsumer flow changes:
rdiomar Dec 19, 2013
8f301af
Remove SimpleConsumer queue size limit since it can cause the iterator
rdiomar Dec 19, 2013
9d9c209
Add buffer_size param description to docstring
rdiomar Dec 19, 2013
e5ed16b
Add iter_timeout option to SimpleConsumer. If not None, it causes the…
rdiomar Dec 19, 2013
b2219ab
Add comments and maintain 80 character line limit
rdiomar Dec 19, 2013
21b46b1
Add and fix comments to protocol.py
rdiomar Jan 3, 2014
f332985
Add note about questionable error handling while decoding messages.
rdiomar Jan 3, 2014
c9205fe
Fix unit tests.
rdiomar Jan 3, 2014
165eb8b
Style fix for imports
rdiomar Jan 3, 2014
ca59b2f
Fix seek offset deltas
rdiomar Jan 3, 2014
2a55bd3
Raise a ConnectionError when a socket.error is raised when receiving …
rdiomar Jan 3, 2014
4ba0943
Fix client error handling
rdiomar Jan 3, 2014
02932af
Add a limit to fetch buffer size, and actually retry requests when fe…
rdiomar Jan 3, 2014
7c6e519
Handle starting/stopping Kafka brokers that are already started/stopp…
rdiomar Jan 3, 2014
f5ffd85
Remove unnecessary brackets
rdiomar Jan 4, 2014
33551ba
Fix client and consumer params in integration tests
rdiomar Jan 4, 2014
78f7caa
Add tests for limited and unlimited consumer max_buffer_size
rdiomar Jan 4, 2014
dbc3d80
Make kafka brokers per-test in failover integration tests
rdiomar Jan 6, 2014
36c3930
Add object type and ID to message prefix in fixtures output for easie…
rdiomar Jan 6, 2014
3a12e5c
Use the same timeout when reinitializing a connection
rdiomar Jan 8, 2014
11b09c1
Handle dirty flag in conn.recv()
rdiomar Jan 8, 2014
adcfff0
Remove unnecessary method
rdiomar Jan 8, 2014
283ce22
Skip snappy/gzip tests if they're not available
rdiomar Jan 8, 2014
27ae269
Some cleanup and easier to read test fixture output
rdiomar Jan 8, 2014
cf4d220
Change BufferUnderflowError to ConnectionError in conn._read_bytes()
rdiomar Jan 8, 2014
6899063
Change log.error() back to log.exception()
rdiomar Jan 8, 2014
adb7a63
Check for socket status on read as well as send
turtlesoupy Jan 7, 2014
4bf32b9
Propagate error immediately if dirty
turtlesoupy Jan 7, 2014
9b7d863
Add error hierarchy, remove client infinite loop
turtlesoupy Jan 8, 2014
ab79531
Better error handling on broker exception, flake8 fixes
turtlesoupy Jan 8, 2014
fbec8ff
No exception for empty partitions, fix topic metadata reset
turtlesoupy Jan 8, 2014
1118bbd
Change exception names, make a new "KafkaRequestError"
turtlesoupy Jan 10, 2014
88b8ff3
Fix test errors after rebase
turtlesoupy Jan 13, 2014
151 changes: 74 additions & 77 deletions kafka/client.py
@@ -1,13 +1,13 @@
import copy
import logging

from collections import defaultdict
from functools import partial
from itertools import count
import logging
import socket
import time

from kafka.common import ErrorMapping, TopicAndPartition
from kafka.common import ConnectionError, FailedPayloadsException
from kafka.common import ErrorMapping, TopicAndPartition, \
Collaborator:

Style: you can combine multiple import lines by enclosing them in parentheses instead of using line continuation with \:

from kafka.common import (
    ErrorMapping, TopicAndPartition,
    ConnectionError, FailedPayloadsException, BrokerErrorResponse,
    PartitionUnavailableError, KafkaException
)

ConnectionError, FailedPayloadsException, BrokerErrorResponse, \
PartitionUnavailableError, KafkaException
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol

@@ -29,7 +29,7 @@ def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
self._load_metadata_for_topics()
self.load_metadata_for_topics() # bootstrap with all metadata

##################
# Private API #
@@ -48,55 +48,13 @@ def _get_conn_for_broker(self, broker):
def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
if key not in self.topics_to_brokers:
self._load_metadata_for_topics(topic)
self.load_metadata_for_topics(topic)

if key not in self.topics_to_brokers:
raise Exception("Partition does not exist: %s" % str(key))
raise BrokerErrorResponse("Partition does not exist: %s" % str(key))

return self.topics_to_brokers[key]

def _load_metadata_for_topics(self, *topics):
"""
Discover brokers and metadata for a set of topics. This method will
recurse in the event of a retry.
"""
request_id = self._next_id()
request = KafkaProtocol.encode_metadata_request(self.client_id,
request_id, topics)

response = self._send_broker_unaware_request(request_id, request)
if response is None:
raise Exception("All servers failed to process request")

(brokers, topics) = KafkaProtocol.decode_metadata_response(response)

log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)

self.brokers = brokers
self.topics_to_brokers = {}

for topic, partitions in topics.items():
# Clear the list once before we add it. This removes stale entries
# and avoids duplicates
self.topic_partitions.pop(topic, None)

if not partitions:
log.info("Partition is unassigned, delay for 1s and retry")
time.sleep(1)
self._load_metadata_for_topics(topic)
break

for partition, meta in partitions.items():
if meta.leader == -1:
log.info("Partition is unassigned, delay for 1s and retry")
time.sleep(1)
self._load_metadata_for_topics(topic)
else:
topic_part = TopicAndPartition(topic, partition)
self.topics_to_brokers[topic_part] = brokers[meta.leader]
self.topic_partitions[topic].append(partition)

def _next_id(self):
"""
Generate a new correlation id
@@ -118,7 +76,7 @@ def _send_broker_unaware_request(self, requestId, request):
"trying next server: %s" % (request, conn, e))
continue

return None
raise KafkaException("All servers failed to process request")

def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -149,6 +107,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
if leader == -1:
raise PartitionUnavailableError("Leader is unassigned for %s-%s" % (payload.topic, payload.partition))
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))

@@ -174,7 +134,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
except ConnectionError, e: # ignore BufferUnderflow for now
log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
failed_payloads += payloads
self.topics_to_brokers = {} # reset metadata
self.reset_all_metadata()
continue

for response in decoder_fn(response):
@@ -186,9 +146,27 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()

def _raise_on_response_error(self, resp):
if resp.error == ErrorMapping.NO_ERROR:
return

if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
ErrorMapping.NOT_LEADER_FOR_PARTITION):
self.reset_topic_metadata(resp.topic)

raise BrokerErrorResponse(
"Request for %s failed with errorcode=%d" %
(TopicAndPartition(resp.topic, resp.partition), resp.error))

#################
# Public API #
#################
def reset_topic_metadata(self, *topics):
for topic in topics:
del self.topics_to_brokers[topic]

def reset_all_metadata(self):
self.topics_to_brokers = {}

def close(self):
for conn in self.conns.values():
@@ -208,6 +186,38 @@ def reinit(self):
for conn in self.conns.values():
conn.reinit()

def load_metadata_for_topics(self, *topics):
"""
Discover brokers and metadata for a set of topics. This function is called
lazily whenever metadata is unavailable.
"""
request_id = self._next_id()
request = KafkaProtocol.encode_metadata_request(self.client_id,
request_id, topics)

response = self._send_broker_unaware_request(request_id, request)

(brokers, topics) = KafkaProtocol.decode_metadata_response(response)

log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)

self.brokers = brokers
self.topics_to_brokers = {}

for topic, partitions in topics.items():
# Clear the list once before we add it. This removes stale entries
# and avoids duplicates
self.topic_partitions.pop(topic, None)

if not partitions:
Collaborator:

If you raise here, the client is unable to work with topics that are in perfectly good shape. Your client might not even care about the topic with unassigned partitions.

You could let it flow: any further call for this topic will trigger a call to this function specifically for that topic, and it will eventually fail if you still don't have a leader for the partitions in question.

Contributor (author):

I see your point; this doesn't work well for wildcard metadata queries. I like the strategy of throwing an exception on demand if you try to send to a bad topic (which is currently done if the leader is -1). Let me see what I can do.
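
To make the trade-off concrete, here is a minimal, self-contained sketch of the lazy approach discussed in this thread: record leaders as reported (including -1 for unassigned partitions) during a metadata load, and only raise when a request is actually routed to a leaderless partition. The class and method names below are illustrative only, not the PR's actual code.

from collections import namedtuple

TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])


class PartitionUnavailableError(RuntimeError):
    pass


class LazyLeaderCache(object):
    """Illustrative only: store leaders as reported (possibly -1) and raise
    only when a request is actually routed to a leaderless partition."""

    def __init__(self):
        self.leaders = {}  # TopicAndPartition -> broker id, -1 if unassigned

    def update(self, topic, partition_leaders):
        # partition_leaders maps partition number -> leader broker id (or -1)
        for partition, leader in partition_leaders.items():
            self.leaders[TopicAndPartition(topic, partition)] = leader

    def leader_for(self, topic, partition):
        key = TopicAndPartition(topic, partition)
        leader = self.leaders.get(key, -1)
        if leader == -1:
            raise PartitionUnavailableError("No leader for %s" % str(key))
        return leader


cache = LazyLeaderCache()
cache.update("good-topic", {0: 1001})
cache.update("new-topic", {0: -1})        # unassigned, but the load still succeeds
print(cache.leader_for("good-topic", 0))  # -> 1001
# cache.leader_for("new-topic", 0) raises PartitionUnavailableError

With this shape, a wildcard metadata query for all topics still succeeds even if some topic has leaderless partitions; only a later produce or fetch against that specific partition fails.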

raise PartitionUnavailableError("Partitions for %s are unassigned!" % topic)

for partition, meta in partitions.items():
topic_part = TopicAndPartition(topic, partition)
self.topics_to_brokers[topic_part] = brokers[meta.leader]
self.topic_partitions[topic].append(partition)

def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
@@ -245,14 +255,9 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,

out = []
for resp in resps:
# Check for errors
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
raise Exception(
"ProduceRequest for %s failed with errorcode=%d" %
(TopicAndPartition(resp.topic, resp.partition),
resp.error))

# Run the callback
if fail_on_error is True:
self._raise_on_response_error(resp)

if callback is not None:
out.append(callback(resp))
else:
@@ -278,14 +283,9 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,

out = []
for resp in resps:
# Check for errors
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
raise Exception(
"FetchRequest for %s failed with errorcode=%d" %
(TopicAndPartition(resp.topic, resp.partition),
resp.error))

# Run the callback
if fail_on_error is True:
self._raise_on_response_error(resp)

if callback is not None:
out.append(callback(resp))
else:
@@ -301,9 +301,8 @@ def send_offset_request(self, payloads=[], fail_on_error=True,

out = []
for resp in resps:
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
raise Exception("OffsetRequest failed with errorcode=%s",
resp.error)
if fail_on_error is True:
self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else:
@@ -319,9 +318,8 @@ def send_offset_commit_request(self, group, payloads=[],

out = []
for resp in resps:
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
raise Exception("OffsetCommitRequest failed with "
"errorcode=%s", resp.error)
if fail_on_error is True:
self._raise_on_response_error(resp)

if callback is not None:
out.append(callback(resp))
@@ -339,9 +337,8 @@ def send_offset_fetch_request(self, group, payloads=[],

out = []
for resp in resps:
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
raise Exception("OffsetCommitRequest failed with errorcode=%s",
resp.error)
if fail_on_error is True:
self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else:
27 changes: 21 additions & 6 deletions kafka/common.py
@@ -69,23 +69,38 @@ class ErrorMapping(object):
# Exceptions #
#################

class FailedPayloadsException(Exception):

class KafkaException(RuntimeError):
pass


class BrokerErrorResponse(KafkaException):
Collaborator:

BrokerErrorException, BrokerResponseError, or just BrokerError? See http://www.python.org/dev/peps/pep-0008/#exception-names

This actually reports an error from the broker; is it too generic?

Contributor (author):

It probably is too generic. I'll update the naming conventions and submit a PR in a bit (anxious to get this merged ;) )
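
For illustration, this is roughly what the hierarchy could look like after the renaming discussed above. KafkaError and BrokerResponseError here just follow the reviewer's PEP 8 suggestion and the later "KafkaRequestError" commit; they are a sketch, not necessarily the names that were finally merged.

class KafkaError(RuntimeError):
    """Base class for every exception raised by this client."""
    pass


class BrokerResponseError(KafkaError):
    """The broker answered a request with a non-zero error code."""
    pass


class KafkaRequestError(KafkaError):
    """A request could not be built or routed (e.g. unknown topic/partition)."""
    pass


class PartitionUnavailableError(KafkaError):
    """The requested partition currently has no leader."""
    pass


class ConnectionError(KafkaError):
    """A socket-level failure while talking to a broker."""
    pass

A single except KafkaError at the call site then catches everything the client raises, while callers that care can still distinguish broker-reported errors from transport failures.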

pass


class PartitionUnavailableError(KafkaException):
pass


class FailedPayloadsException(KafkaException):
pass

class ConnectionError(Exception):

class ConnectionError(KafkaException):
pass

class BufferUnderflowError(Exception):

class BufferUnderflowError(KafkaException):
pass


class ChecksumError(Exception):
class ChecksumError(KafkaException):
pass


class ConsumerFetchSizeTooSmall(Exception):
class ConsumerFetchSizeTooSmall(KafkaException):
pass


class ConsumerNoMoreData(Exception):
class ConsumerNoMoreData(KafkaException):
pass
2 changes: 1 addition & 1 deletion kafka/consumer.py
@@ -67,7 +67,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
self.client = client
self.topic = topic
self.group = group
self.client._load_metadata_for_topics(topic)
self.client.load_metadata_for_topics(topic)
self.offsets = {}

if not partitions:
15 changes: 7 additions & 8 deletions kafka/producer.py
@@ -1,15 +1,14 @@
import logging
import time

from Queue import Empty
from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
from Queue import Empty
import logging
import sys
import time

from kafka.common import ProduceRequest
from kafka.common import FailedPayloadsException
from kafka.protocol import create_message
from kafka.partitioner import HashedPartitioner
from kafka.protocol import create_message

log = logging.getLogger("kafka")

@@ -186,7 +185,7 @@ def __init__(self, client, topic, async=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
self.topic = topic
client._load_metadata_for_topics(topic)
client.load_metadata_for_topics(topic)
self.next_partition = cycle(client.topic_partitions[topic])

super(SimpleProducer, self).__init__(client, async, req_acks,
@@ -223,7 +222,7 @@ def __init__(self, client, topic, partitioner=None, async=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
self.topic = topic
client._load_metadata_for_topics(topic)
client.load_metadata_for_topics(topic)

if not partitioner:
partitioner = HashedPartitioner