No infinite loops during metadata requests, invalidate metadata more, exception hierarchy #91
Conversation
    # and avoids duplicates
    self.topic_partitions.pop(topic, None)

    if not partitions:
Raise here and the client is unable to work with topics that are in perfectly good shape. Your client might not even care about the topics with unassigned partitions.
You could let it flow; any further call for this topic will generate a call to this function specifically for that topic. It will eventually fail if you still don't have a leader for those partitions.
I see your point; this doesn't work well for wildcard metadata queries. I like the strategy of throwing an exception on demand if you try to send to a bad topic (which is currently done if the leader is -1). Let me see what I can do.
@mrtheb thanks for the comments - I've updated the diff so it doesn't throw an exception for empty partitions. I've also fixed some clowniness in my …
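For illustration, a minimal sketch of the on-demand failure approach discussed here, assuming a `PartitionUnavailableError` like the one imported in the diff below; the helper name and the client attributes used are hypothetical:

    from kafka.common import PartitionUnavailableError, TopicAndPartition

    def _get_leader_for_partition(client, topic, partition):
        # Sketch: refresh metadata lazily for just this topic if nothing is cached.
        key = TopicAndPartition(topic, partition)
        if key not in client.topics_to_brokers:
            client.load_metadata_for_topics(topic)

        leader = client.topics_to_brokers.get(key)
        if leader is None:
            # Only callers that actually touch this partition see the failure;
            # wildcard metadata queries for healthy topics keep working.
            raise PartitionUnavailableError(
                "No leader for topic %s partition %d" % (topic, partition))
        return leader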
    from kafka.common import ErrorMapping, TopicAndPartition
    from kafka.common import ConnectionError, FailedPayloadsException
    from kafka.common import ErrorMapping, TopicAndPartition, \
Style: you can combine multiple import lines by enclosing them within parentheses instead of using line continuation with \:
from kafka.common import (
ErrorMapping, TopicAndPartition,
ConnectionError, FailedPayloadsException, BrokerErrorResponse,
PartitionUnavailableError, KafkaException
)
Looks good to me, once you respond to my comments ;-)
@mrtheb all ready to go
When running on Linux with the code on a case-insensitive file system, imports of the `Queue` module fail because Python resolves the wrong file (it tries a relative import of `queue.py` in the kafka directory). This change forces absolute imports via PEP 328.
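As a sketch, the fix amounts to something like the following at the top of the affected module (the imported names here are illustrative):

    # Force absolute imports (PEP 328) so `Queue` resolves to the stdlib module,
    # not a relative kafka/queue.py on a case-insensitive file system.
    from __future__ import absolute_import

    from Queue import Empty, Queue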
Previously, if you tried to consume a message with a timeout greater than 10 seconds but received no data in those 10 seconds, a socket.timeout exception was raised. This change allows a higher socket timeout to be set, or even None for no timeout.
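A hedged sketch of what that looks like at the socket level (the connection helper and its timeout parameter are illustrative, not the client's exact API):

    import socket

    def make_connection(host, port, timeout=None):
        # timeout=None -> socket.settimeout(None), i.e. fully blocking,
        # instead of the previous hard-coded 10-second socket timeout.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        sock.connect((host, port))
        return sock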
According to the protocol documentation, the 4-byte integer at the beginning of a response represents the size of the payload only, not including those 4 bytes. See http://goo.gl/rg5uom
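To illustrate the framing, a reader for that format might look like this sketch (helper names are hypothetical; the error class is a stand-in for the one used elsewhere in this branch):

    import struct

    class BufferUnderflowError(Exception):
        # Stand-in for the exception mentioned in the commits below.
        pass

    def read_response(sock):
        # First 4 bytes: big-endian int32 giving the payload size,
        # which does NOT include these 4 size bytes themselves.
        (size,) = struct.unpack('>i', recv_exactly(sock, 4))
        return recv_exactly(sock, size)

    def recv_exactly(sock, n):
        chunks = []
        while n > 0:
            data = sock.recv(n)
            if not data:
                raise BufferUnderflowError("Socket closed before %d bytes arrived" % n)
            chunks.append(data)
            n -= len(data)
        return ''.join(chunks)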
…y time
* Remove bufsize from client and conn, since they're not actually enforced

Notes: This commit changes behavior a bit by raising a BufferUnderflowError when no data is received for the message size, rather than a ConnectionError. Since bufsize in the socket is not actually enforced, but it is used by the consumer when creating requests, it is moved there until a better solution is implemented.
* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on #74 - don't increase min_bytes if the consumer fetch buffer size is too small

Notes: Change MultiProcessConsumer and _mp_consume() accordingly. Previously, when querying each partition separately, it was possible to block waiting for messages on partition 0 even if there are new ones in partition 1. These changes allow us to block while waiting for messages on all partitions, and reduce the total number of kafka requests. Use Queue.Queue for the single-proc queue instead of the already imported multiprocessing.Queue, because the latter doesn't seem to guarantee immediate availability of items after a put:

    >>> from multiprocessing import Queue
    >>> q = Queue()
    >>> q.put(1); q.get_nowait()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
        return self.get(False)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
        raise Empty
    Queue.Empty
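For contrast, the in-process Queue.Queue (Python 2 naming) does make an item available immediately after a put:

    >>> import Queue
    >>> q = Queue.Queue()
    >>> q.put(1); q.get_nowait()
    1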
…to block forever if it's reached.
… iterator to exit when reached. Also put constant timeout values in pre-defined constants
Will remove once any error handling issues are resolved.
This is pretty much a rewrite. The tests that involve offset requests/responses are not implemented since that API is not supported in kafka 0.8 yet. Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here.
We always store the offset of the next available message, so we shouldn't decrement the offset deltas by an extra 1 when seeking.
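In other words, since the stored offset already points at the next message rather than the last one consumed, seeking back is plain subtraction; a small illustration (not the consumer's actual API):

    # After consuming messages 0..9, the stored offset is 10 (the next message).
    # Seeking back 3 messages should target offset 7, i.e. 10 - 3,
    # with no extra -1 adjustment.
    def seek_back(stored_offset, n_messages):
        return stored_offset - n_messages

    assert seek_back(10, 3) == 7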
…data

Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead.
This differentiates between errors that occur when sending the request and errors that occur when receiving the response, and adds BufferUnderflowError handling.
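A hedged sketch of that split, reusing the exception names from the imports shown earlier (the `conn` methods and the import path of BufferUnderflowError are assumptions):

    import logging

    from kafka.common import ConnectionError, FailedPayloadsException
    from kafka.common import BufferUnderflowError  # import path assumed

    log = logging.getLogger("kafka")

    def send_and_receive(conn, request_id, payload):
        # Sending and receiving are wrapped separately so the caller can tell
        # "the request never left" apart from "the response never fully arrived".
        try:
            conn.send(request_id, payload)
        except ConnectionError:
            log.error("Could not send request %d", request_id)
            raise FailedPayloadsException(payload)

        try:
            return conn.recv(request_id)
        except (ConnectionError, BufferUnderflowError):
            log.error("Could not read response for request %d", request_id)
            raise FailedPayloadsException(payload)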
…tch size is too small

Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyway, and in this case it's the client's fault for not sending a big enough buffer size rather than the Kafka server's. This can be bad if max_fetch_size is None (no limit) and there is some message in Kafka that is crazy huge, but that is why we should have some max_fetch_size.
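A sketch of the grow-the-buffer-and-retry behaviour described here (the exception name and fetch callback are hypothetical stand-ins):

    class FetchSizeTooSmall(Exception):
        # Hypothetical signal meaning "a message exists but didn't fit in the buffer".
        pass

    def fetch_with_retry(do_fetch, fetch_size, max_fetch_size=None):
        # Keep doubling fetch_size until a whole message fits or the cap is hit.
        # This is why a single fetch can overrun its timeout when the initial
        # buffer is too small for the next message.
        while True:
            try:
                return do_fetch(fetch_size)
            except FetchSizeTooSmall:
                if max_fetch_size is not None and fetch_size >= max_fetch_size:
                    raise
                fetch_size *= 2
                if max_fetch_size is not None:
                    fetch_size = min(fetch_size, max_fetch_size)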
…ed in integration tests

If some of the tests stop brokers and then error out, the teardown method will try to close the same brokers and fail. This change allows it to continue.
This is better since the tests stop/start brokers, and if something goes wrong they can affect each other.
* If the connection is dirty, reinit
* If we get a BufferUnderflowError, the server could have gone away, so mark it dirty
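A sketch of that connection hygiene (the class and method names are illustrative, not the real KafkaConnection):

    import socket

    class ConnectionSketch(object):
        def __init__(self, host, port):
            self.host, self.port = host, port
            self._sock = None
            self._dirty = True  # force an initial connect

        def reinit(self):
            # Tear down any old socket and connect fresh.
            if self._sock is not None:
                self._sock.close()
            self._sock = socket.create_connection((self.host, self.port))
            self._dirty = False

        def send(self, payload):
            # If the connection is dirty, reinit before using it.
            if self._dirty:
                self.reinit()
            try:
                self._sock.sendall(payload)
            except socket.error:
                self._dirty = True
                raise

        def mark_dirty(self):
            # Called when a short read (BufferUnderflowError) suggests the
            # server may have gone away.
            self._dirty = True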
Both errors are handled the same way when raised and caught, so this makes sense.
Moving to #100 because I idiotically rebased :(
As issue #68 suggests, you can run into an infinite loop with this client if your partition has no leader assigned. This failure scenario should be pushed up to the caller, since they can handle it in different ways (e.g. retry with a different partition, page the on-call). This pull request primarily addresses that issue (see the new load_metadata_for_topics, which has no sleeps).
I've also created an exception hierarchy so that the client can differentiate between Kafka errors and other errors when they handle issues.
Finally, using this hierarchy, I invalidate the topic metadata properly if we send a message to the wrong broker.
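A sketch of what such a hierarchy could look like, using the exception names visible in the diff (the exact base-class layout here is an assumption):

    class KafkaException(Exception):
        """Base class so callers can catch every Kafka-related error at once."""

    class BrokerErrorResponse(KafkaException):
        """The broker answered, but with an error code in the response."""

    class PartitionUnavailableError(KafkaException):
        """No leader is currently assigned to the requested partition."""

    class ConnectionError(KafkaException):
        """The broker could not be reached, or the connection dropped."""

    class FailedPayloadsException(KafkaException):
        """One or more payloads in a request could not be sent or acknowledged."""

Catching KafkaException instead of a bare Exception is what lets a caller tell Kafka problems apart from unrelated failures, and a broker-level error for a stale leader is the natural point to invalidate the cached metadata for that topic.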
Integration tests pass after creating a setUp() method that warms the partition.