No infinite loops during metadata requests, invalidate metadata more, exception hierarchy #91


Closed · wants to merge 46 commits

Conversation

turtlesoupy (Contributor)

As issue #68 suggests, you can run into an infinite loop with this client if your partition has no leader assigned. This failure scenario should be pushed up to the caller, who can handle it in different ways (e.g. retry with a different partition, or page the on-call). This pull request primarily addresses that issue (see the new load_metadata_for_topics, which has no sleeps).

I've also created an exception hierarchy so that client code can differentiate between Kafka errors and other errors when handling failures (see the sketch below).

Finally, using this hierarchy, I invalidate the topic metadata properly if we send a message to the wrong broker.

Integration tests pass after creating a setUp() method that warms the partition.
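
For illustration only, here is a minimal sketch of what that hierarchy and a caller might look like, using the exception names that appear in the imports later in this diff; the actual base classes and client API may differ:

import logging

log = logging.getLogger(__name__)

class KafkaException(Exception):
    """Base class, so callers can catch every Kafka-related error at once."""

class ConnectionError(KafkaException):
    pass

class FailedPayloadsException(KafkaException):
    pass

class BrokerErrorResponse(KafkaException):
    pass

class PartitionUnavailableError(KafkaException):
    pass

def produce_or_page(client, requests):
    # Hypothetical caller: it decides how to react instead of the client
    # sleeping and retrying in an infinite loop.
    try:
        client.send_produce_request(requests)  # hypothetical client call
    except PartitionUnavailableError:
        log.error("no leader for partition; retry elsewhere or page the on-call")
    except KafkaException:
        log.exception("other Kafka error")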

# and avoids duplicates
self.topic_partitions.pop(topic, None)

if not partitions:
mrtheb (Collaborator)

Raise here and the client is unable to work with topics that are in perfectly good shape. Your client might not even care about topics with unassigned partitions.

You could let it flow; any further call for this topic will trigger a call to this function specifically for that topic. It will eventually fail if you still don't have a leader for those partitions.

turtlesoupy (Contributor, Author)

I see your point; this doesn't work well for wildcard metadata queries. I like the strategy of throwing an exception on demand if you try to send to a bad topic (which is currently done if the leader is -1). Let me see what I can do.

turtlesoupy (Contributor, Author)

@mrtheb thanks for the comments - I've updated the diff so it doesn't throw an exception for empty partitions. The _get_leader_for_partition call should fail when a partition you care about actually doesn't exist.

I've also fixed some clowniness in my reset_topic_metadata method (which didn't work before) and changed the defaultdict into a normal dict for safety. Mind taking another look?
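
Roughly, the two pieces fit together like this (a sketch only, not the exact diff; topics_to_brokers is my assumed name for the topic/partition-to-broker cache, and the methods would live on the client class):

from kafka.common import TopicAndPartition, PartitionUnavailableError

def reset_topic_metadata(self, *topics):
    for topic in topics:
        # pop() tolerates unknown topics, so a plain dict is safe here
        self.topic_partitions.pop(topic, None)
        for tp in list(self.topics_to_brokers):
            if tp.topic == topic:
                del self.topics_to_brokers[tp]

def _get_leader_for_partition(self, topic, partition):
    key = TopicAndPartition(topic, partition)
    if key not in self.topics_to_brokers:
        self.load_metadata_for_topics(topic)  # lazy refresh, no sleeps
    if key not in self.topics_to_brokers:
        # only now does a missing leader become the caller's problem
        raise PartitionUnavailableError("%s-%d" % (topic, partition))
    return self.topics_to_brokers[key]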


from kafka.common import ErrorMapping, TopicAndPartition
from kafka.common import ConnectionError, FailedPayloadsException
from kafka.common import ErrorMapping, TopicAndPartition, \
mrtheb (Collaborator)

Style: you can combine multiple import lines by enclosing them in parentheses instead of using line continuation with a backslash:

from kafka.common import (
    ErrorMapping, TopicAndPartition,
    ConnectionError, FailedPayloadsException, BrokerErrorResponse,
    PartitionUnavailableError, KafkaException
)

mrtheb (Collaborator) commented Jan 9, 2014

Looks good to me, once you respond to my comments ;-)

turtlesoupy (Contributor, Author)

@mrtheb all ready to go :suspect:

turtlesoupy and others added 18 commits January 13, 2014 14:42
When running on Linux with code on a case-insensitive file system,
imports of the `Queue` module fail because Python resolves the
wrong file (it tries a relative import of `queue.py` in
the kafka directory). This change forces absolute imports via PEP 328.
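
For reference, forcing absolute imports per PEP 328 is presumably just a __future__ declaration at the top of each affected module, e.g.:

# Make `import Queue` resolve to the standard library module rather than a
# sibling queue.py inside the kafka package (PEP 328 absolute imports).
from __future__ import absolute_import

import Queue  # unambiguous on Python 2, even on case-insensitive filesystems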
Previously, if you tried to consume a message with a timeout greater than 10 seconds
but didn't receive data within those 10 seconds, a socket.timeout exception was raised.
This change allows a higher socket timeout to be set, or even None for no timeout.
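
As a rough illustration of the difference (broker address and values are made up):

import socket

sock = socket.create_connection(("localhost", 9092))  # hypothetical broker
sock.settimeout(10.0)  # old behavior: a fixed 10s timeout fires even if the caller wanted to wait longer
sock.settimeout(None)  # new behavior: the timeout is configurable, including None to block indefinitely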
According to the protocol documentation, the 4-byte integer at the beginning
of a response represents the size of the payload only, not including those 4 bytes themselves.
See http://goo.gl/rg5uom
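
In other words, a response is read as a 4-byte big-endian size followed by exactly that many payload bytes; a minimal sketch (error handling and short reads on the header simplified):

import struct

def read_response(sock):
    # The 4-byte header gives the payload size, excluding these 4 bytes themselves.
    (size,) = struct.unpack('>i', sock.recv(4))
    payload = b''
    while len(payload) < size:
        chunk = sock.recv(size - len(payload))
        if not chunk:
            raise IOError("socket closed before the full payload arrived")
        payload += chunk
    return payload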
…y time

* Remove bufsize from client and conn, since they're not actually enforced

Notes:

This commit changes behavior a bit by raising a BufferUnderflowError when
no data is received for the message size rather than a ConnectionError.

Since bufsize is not actually enforced on the socket but is used by the consumer
when creating requests, it has been moved there until a better solution is implemented.
* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on #74 -
  don't increase min_bytes if the consumer fetch buffer size is too small.

Notes:

Change MultiProcessConsumer and _mp_consume() accordingly.

Previously, when querying each partition separately, it was possible to
block waiting for messages on partition 0 even if there were new ones in partition 1.
These changes allow us to block while waiting for messages on all partitions,
and reduce the total number of Kafka requests.

Use Queue.Queue for the single-process queue instead of the already-imported
multiprocessing.Queue, because the latter doesn't seem to guarantee immediate
availability of items after a put:

>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
    return self.get(False)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty
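
The standard-library queue doesn't have that problem, which is why it is used for the single-process case; the same check, for comparison:

>>> from Queue import Queue   # `from queue import Queue` on Python 3
>>> q = Queue()
>>> q.put(1); q.get_nowait()
1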
… iterator to exit when reached.

Also, put the timeout values in pre-defined constants.
Will remove once any error handling issues are resolved.
This is pretty much a rewrite. The tests that involve offset requests/responses
are not implemented since that API is not supported in kafka 0.8 yet.
Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here.
We always store the offset of the next available message, so we shouldn't decrement
the offset deltas by an extra 1 when seeking.
rdiomar and others added 23 commits January 13, 2014 14:44
…data

Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead.
This differentiates between errors that occur when sending the request
and errors that occur when receiving the response, and adds BufferUnderflowError handling.
…tch size is too small

Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyway, and in this case it's the client's fault for not sending a big enough buffer size, not the Kafka server's. This can be bad if max_fetch_size is None (no limit) and there is some message in Kafka that is crazy huge, but that is why we should have some max_fetch_size.
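
A hedged sketch of the grow-the-buffer-and-retry idea described in this note (the helper and its arguments are made up for illustration, not the actual consumer code):

def fetch_with_growing_buffer(do_fetch, buffer_size, max_fetch_size=None):
    # do_fetch(size) is assumed to return (messages, complete); keep doubling
    # the buffer until a whole message fits or max_fetch_size is exceeded.
    size = buffer_size
    while True:
        messages, complete = do_fetch(size)
        if complete:
            return messages
        if max_fetch_size is not None and size >= max_fetch_size:
            raise RuntimeError("message larger than max_fetch_size")
        size *= 2
        if max_fetch_size is not None:
            size = min(size, max_fetch_size)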
…ed in integration tests

If some of the tests stop brokers and then error out, the teardown method will try to close the
same brokers and fail. This change allows it to continue.
This is better since the tests stop/start brokers, and if something goes wrong
they can affect each other.
* If the connection is dirty, reinit
* If we get a BufferUnderflowError, the server could have gone away, so mark it dirty
Both errors are handled the same way when raised and caught, so this makes sense.
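
A rough sketch of the dirty-connection handling described in the commit notes above (class and attribute names are illustrative, not the actual kafka.conn code; the real change also marks the connection dirty on a BufferUnderflowError while receiving):

import socket

class IllustrativeConnection(object):
    def __init__(self, host, port):
        self.host, self.port = host, port
        self._sock = None
        self._dirty = True
        self.reinit()

    def reinit(self):
        # (re)connect and clear the dirty flag
        if self._sock is not None:
            self._sock.close()
        self._sock = socket.create_connection((self.host, self.port))
        self._dirty = False

    def send(self, payload):
        if self._dirty:
            self.reinit()  # if the connection is dirty, reinit before reusing it
        try:
            self._sock.sendall(payload)
        except socket.error:
            self._dirty = True  # the server may have gone away
            raise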
turtlesoupy (Contributor, Author)

Moving to #100 because I idiotically rebased :(
