Skip to content

Allow KafkaClient to take in a list of brokers for bootstrapping #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from

Conversation

mrtheb
Copy link
Collaborator

@mrtheb mrtheb commented Nov 14, 2013

Proposed resolution for enhancement #25

This change breaks the KafkaClient constructor API but replacing the host and port parameters with hosts represented as a comma-separated list of host:port.

E.g.

kafka = KafkaClient("localhost:9092,localhost:9093")

It also introduces the mock library for unit testing.

Note: some unit tests are marked as skipped until disabling recursion on _load_metadata_for_topics, an issue I found during testing (#68).

@turtlesoupy
Copy link
Contributor

What's the status of this pull? This would come in handy right now (I've had to work around it by reconstructing my client with a random host on certain exceptions)...

@mrtheb
Copy link
Collaborator Author

mrtheb commented Jan 8, 2014

Thx for bringing it up.

@mumrah did you have a chance to look at this?

One thing to note with my changes is it only handles failover during initialization. During the client initialization with load_metadata_for_topic, it will cycle through all available hosts. However, once the client receives an error from a broker while doing a produce/fetch request, proper retry mechanism must be implemented on top of the current client.

@turtlesoupy
Copy link
Contributor

I have a diff in #91 that addresses the infinite loop by propagating an exception immediately for no brokers available, and lazily doing an exception when you send to a partition with -1 leader. In my understanding, it will fix your issue - can you take a look?

Also, I'm not sure what you mean by failover during initialization - any time the metadata is reset, you will try to get metadata from all brokers, no?

@mrtheb
Copy link
Collaborator Author

mrtheb commented Jan 8, 2014

Failover during initialization means switching to the next available host in the list provided in hosts if the one just tried fails. It does that until all hosts have been tried. Initialization, hence _load_metadata_for_topic() is the only caller for _send_broker_unaware_request which does cycle through all hosts.

I'll take a look at #91 but my first thought when trying to resolve this was simply to remove the recursion (remove inner calls to _load_metadata_for_topic and lazily retry when an actual caller requests information about a specific topic. My use case was the a topic had a problem while the one I was interested in didn't but the whole thing was blocked. Just removing the recursion was enough for me. I don't see a need to raise an exception up to the caller for the same reasons.

@turtlesoupy turtlesoupy mentioned this pull request Jan 15, 2014
@@ -10,6 +11,26 @@
log = logging.getLogger("kafka")


def collect_hosts(hosts, randomize=True):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think hosts should already be a list of "host:port" strings. The caller should handle creating it from a config or command line or whatever, and KafkaClient shouldn't have to worry about parsing and splitting strings. Also, as it is here, if the caller already gets the hosts as a list, it would have to convert it to a single string to use it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I see where you are going with this argument.

I took the same approach here than what is used for the KazooClient (I actually stole the collect_host method there). Since both libraries may well be used together in many places (it is, in my case), I thought it would make sense to use the same kind of API.

Also, if you are using kafka command line tools, you'll pass a string in the same format as this.

Can you suggest a different API?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that a [list] would be a more pythonic API.

From what I can tell, the KazooClient uses the comma-separated connect string because that's the java zookeeper API (see connectString in http://zookeeper.apache.org/doc/r3.2.2/api/org/apache/zookeeper/ZooKeeper.html)

The current kafka java api also uses a comma-separated string for bootstrapping (metadata.broker.list in http://kafka.apache.org/documentation.html) and it does not appear that that will be changing w/ new Producer API (http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html)

How about supporting both w/ a little polymorphism?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure is more pythonic. I can easily check for a comma-separated string or a list of "host:port" and store the info accordingly.

thanks for your comment, I'll add this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connect string spec for ZooKeeper is a little funky, I'll agree. From their docs "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" is the general form. If we are going to use something other than the raw connect string, we most definitely need to include a chroot in addition to host:port pairs.

E.g.,

def collection_hosts(hosts=[], chroot="", randomize=True):
    ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mumrah this is for the brokers list here, no need for chroot. I simply used kazoo as a good example for this since it is very similar to what I was trying to do.

@mrtheb
Copy link
Collaborator Author

mrtheb commented Feb 1, 2014

I am reworking this, I will probably open another PR with the latest merge but please respond to my comment here. I'll update my next PR accordingly and close this one once I have enough feedback.

@mrtheb
Copy link
Collaborator Author

mrtheb commented Feb 9, 2014

Closing, replaced by #122

@mrtheb mrtheb closed this Feb 9, 2014
rdiomar added a commit that referenced this pull request Feb 27, 2014
Support for multiple hosts on KafkaClient boostrap (improves on #70)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants