No infinite loops during metadata requests, invalidate metadata more, exception hierarchy #91
@@ -1,13 +1,13 @@
 import copy
+import logging
 
 from collections import defaultdict
 from functools import partial
 from itertools import count
-import logging
 import socket
 import time
 
-from kafka.common import ErrorMapping, TopicAndPartition
-from kafka.common import ConnectionError, FailedPayloadsException
+from kafka.common import ErrorMapping, TopicAndPartition, \
+    ConnectionError, FailedPayloadsException, BrokerErrorResponse, \
+    PartitionUnavailableError, KafkaException
 from kafka.conn import KafkaConnection
 from kafka.protocol import KafkaProtocol
@@ -29,7 +29,7 @@ def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
-        self._load_metadata_for_topics()
+        self.load_metadata_for_topics()  # bootstrap with all metadata
 
     ##################
     #   Private API  #
@@ -48,55 +48,13 @@ def _get_conn_for_broker(self, broker):
     def _get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
         if key not in self.topics_to_brokers:
-            self._load_metadata_for_topics(topic)
+            self.load_metadata_for_topics(topic)
 
         if key not in self.topics_to_brokers:
-            raise Exception("Partition does not exist: %s" % str(key))
+            raise BrokerErrorResponse("Partition does not exist: %s" % str(key))
 
         return self.topics_to_brokers[key]
 
-    def _load_metadata_for_topics(self, *topics):
-        """
-        Discover brokers and metadata for a set of topics. This method will
-        recurse in the event of a retry.
-        """
-        request_id = self._next_id()
-        request = KafkaProtocol.encode_metadata_request(self.client_id,
-                                                        request_id, topics)
-
-        response = self._send_broker_unaware_request(request_id, request)
-        if response is None:
-            raise Exception("All servers failed to process request")
-
-        (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
-
-        log.debug("Broker metadata: %s", brokers)
-        log.debug("Topic metadata: %s", topics)
-
-        self.brokers = brokers
-        self.topics_to_brokers = {}
-
-        for topic, partitions in topics.items():
-            # Clear the list once before we add it. This removes stale entries
-            # and avoids duplicates
-            self.topic_partitions.pop(topic, None)
-
-            if not partitions:
-                log.info("Partition is unassigned, delay for 1s and retry")
-                time.sleep(1)
-                self._load_metadata_for_topics(topic)
-                break
-
-            for partition, meta in partitions.items():
-                if meta.leader == -1:
-                    log.info("Partition is unassigned, delay for 1s and retry")
-                    time.sleep(1)
-                    self._load_metadata_for_topics(topic)
-                else:
-                    topic_part = TopicAndPartition(topic, partition)
-                    self.topics_to_brokers[topic_part] = brokers[meta.leader]
-                    self.topic_partitions[topic].append(partition)
-
     def _next_id(self):
         """
         Generate a new correlation id
@@ -118,7 +76,7 @@ def _send_broker_unaware_request(self, requestId, request):
                             "trying next server: %s" % (request, conn, e))
                 continue
 
-        return None
+        raise KafkaException("All servers failed to process request")
 
     def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         """
@@ -149,6 +107,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
                                                     payload.partition)
+            if leader == -1:
+                raise PartitionUnavailableError("Leader is unassigned for %s-%s" % (payload.topic, payload.partition))
             payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))
 
@@ -174,7 +134,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
             except ConnectionError, e:  # ignore BufferUnderflow for now
                 log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
                 failed_payloads += payloads
-                self.topics_to_brokers = {}  # reset metadata
+                self.reset_all_metadata()
                 continue
 
             for response in decoder_fn(response):
@@ -186,9 +146,27 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys) if acc else ()
 
+    def _raise_on_response_error(self, resp):
+        if resp.error == ErrorMapping.NO_ERROR:
+            return
+
+        if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
+                          ErrorMapping.NOT_LEADER_FOR_PARTITION):
+            self.reset_topic_metadata(resp.topic)
+
+        raise BrokerErrorResponse(
+            "Request for %s failed with errorcode=%d" %
+            (TopicAndPartition(resp.topic, resp.partition), resp.error))
+
     #################
     #   Public API  #
     #################
+    def reset_topic_metadata(self, *topics):
+        for topic in topics:
+            del self.topics_to_brokers[topic]
+
+    def reset_all_metadata(self):
+        self.topics_to_brokers = {}
+
     def close(self):
         for conn in self.conns.values():
@@ -208,6 +186,38 @@ def reinit(self):
         for conn in self.conns.values():
             conn.reinit()
 
+    def load_metadata_for_topics(self, *topics):
+        """
+        Discover brokers and metadata for a set of topics. This function is
+        called lazily whenever metadata is unavailable.
+        """
+        request_id = self._next_id()
+        request = KafkaProtocol.encode_metadata_request(self.client_id,
+                                                        request_id, topics)
+
+        response = self._send_broker_unaware_request(request_id, request)
+
+        (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+
+        log.debug("Broker metadata: %s", brokers)
+        log.debug("Topic metadata: %s", topics)
+
+        self.brokers = brokers
+        self.topics_to_brokers = {}
+
+        for topic, partitions in topics.items():
+            # Clear the list once before we add it. This removes stale entries
+            # and avoids duplicates
+            self.topic_partitions.pop(topic, None)
+
+            if not partitions:
+                raise PartitionUnavailableError("Partitions for %s are unassigned!" % topic)

Reviewer: Raise here and the client is unable to work with topics that are in perfectly good shape. Your client might not even care about topics with unassigned partitions. You could let it flow: any further call for this topic will generate a call to this function specifically for that topic, and it will eventually fail if you still don't have a leader for those partitions.

Author: I see your point; this doesn't work well for wildcard metadata queries. I like the strategy of throwing an exception on demand if you try to send to a bad topic (which is currently done if the leader is -1). Let me see what I can do.
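A minimal sketch of the fail-on-demand approach the author describes (the helper names here are hypothetical, not code from this PR): leaderless partitions are recorded rather than raised on at metadata-load time, and the error surfaces only when a request actually targets one.

    from collections import defaultdict

    class PartitionUnavailableError(Exception):
        pass

    def build_leader_map(brokers, topic_metadata):
        # brokers: broker_id -> broker; topic_metadata: topic -> {partition: meta}.
        # A leader of -1 is not in the brokers dict, so it maps to None instead
        # of raising, and metadata for healthy topics still loads.
        leaders = {}
        partitions_by_topic = defaultdict(list)
        for topic, partitions in topic_metadata.items():
            for partition, meta in partitions.items():
                leaders[(topic, partition)] = brokers.get(meta.leader)
                partitions_by_topic[topic].append(partition)
        return leaders, partitions_by_topic

    def get_leader(leaders, topic, partition):
        # Fail on demand: only a request that hits a leaderless partition raises.
        broker = leaders.get((topic, partition))
        if broker is None:
            raise PartitionUnavailableError("No leader for %s-%s" % (topic, partition))
        return broker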
+            for partition, meta in partitions.items():
+                topic_part = TopicAndPartition(topic, partition)
+                self.topics_to_brokers[topic_part] = brokers[meta.leader]
+                self.topic_partitions[topic].append(partition)
+
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
         """
@@ -245,14 +255,9 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
 
         out = []
         for resp in resps:
-            # Check for errors
-            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception(
-                    "ProduceRequest for %s failed with errorcode=%d" %
-                    (TopicAndPartition(resp.topic, resp.partition),
-                     resp.error))
-
-            # Run the callback
+            if fail_on_error is True:
+                self._raise_on_response_error(resp)
+
             if callback is not None:
                 out.append(callback(resp))
             else:
@@ -278,14 +283,9 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
 
         out = []
         for resp in resps:
-            # Check for errors
-            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception(
-                    "FetchRequest for %s failed with errorcode=%d" %
-                    (TopicAndPartition(resp.topic, resp.partition),
-                     resp.error))
-
-            # Run the callback
+            if fail_on_error is True:
+                self._raise_on_response_error(resp)
+
             if callback is not None:
                 out.append(callback(resp))
             else:
@@ -301,9 +301,8 @@ def send_offset_request(self, payloads=[], fail_on_error=True,
 
         out = []
         for resp in resps:
-            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetRequest failed with errorcode=%s",
-                                resp.error)
+            if fail_on_error is True:
+                self._raise_on_response_error(resp)
             if callback is not None:
                 out.append(callback(resp))
             else:
@@ -319,9 +318,8 @@ def send_offset_commit_request(self, group, payloads=[],
 
         out = []
         for resp in resps:
-            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with "
-                                "errorcode=%s", resp.error)
+            if fail_on_error is True:
+                self._raise_on_response_error(resp)
 
             if callback is not None:
                 out.append(callback(resp))
@@ -339,9 +337,8 @@ def send_offset_fetch_request(self, group, payloads=[],
 
         out = []
         for resp in resps:
-            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with errorcode=%s",
-                                resp.error)
+            if fail_on_error is True:
+                self._raise_on_response_error(resp)
             if callback is not None:
                 out.append(callback(resp))
             else:
@@ -69,23 +69,38 @@ class ErrorMapping(object):
 #################
 #  Exceptions   #
 #################
 
-class FailedPayloadsException(Exception):
+class KafkaException(RuntimeError):
     pass
 
 
+class BrokerErrorResponse(KafkaException):
+    pass

Reviewer: BrokerErrorException, BrokerResponseError, or just BrokerError? See http://www.python.org/dev/peps/pep-0008/#exception-names — this actually reports an error from the broker; is it too generic?

Author: It probably is too generic. I'll update the naming conventions and submit a PR in a bit (anxious to get this merged ;) ).
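A sketch of what the reviewer's PEP 8-style renaming might look like (names are illustrative only; per the author's reply, the actual renaming was deferred to a follow-up PR):

    class KafkaError(RuntimeError):
        # Hypothetical root of the hierarchy, using PEP 8's "Error" suffix
        pass

    class BrokerResponseError(KafkaError):
        # One of the reviewer's suggested names for BrokerErrorResponse:
        # an error code carried in a broker's response
        pass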
+
+
+class PartitionUnavailableError(KafkaException):
+    pass
+
+
+class FailedPayloadsException(KafkaException):
+    pass
 
-class ConnectionError(Exception):
+
+class ConnectionError(KafkaException):
     pass
 
-class BufferUnderflowError(Exception):
+
+class BufferUnderflowError(KafkaException):
     pass
 
 
-class ChecksumError(Exception):
+class ChecksumError(KafkaException):
     pass
 
 
-class ConsumerFetchSizeTooSmall(Exception):
+class ConsumerFetchSizeTooSmall(KafkaException):
     pass
 
 
-class ConsumerNoMoreData(Exception):
+class ConsumerNoMoreData(KafkaException):
     pass
Reviewer: Style: you can combine multiple import lines by enclosing them within parentheses instead of line continuation with \.
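Applying that suggestion to the combined kafka.common import in this diff would look like:

    from kafka.common import (ErrorMapping, TopicAndPartition,
                              ConnectionError, FailedPayloadsException,
                              BrokerErrorResponse, PartitionUnavailableError,
                              KafkaException)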