Allow KafkaClient to take in a list of brokers for bootstrapping #70
Conversation
reduce memory copies when consuming kafka responses
Replace _send_upstream datetime logic with simpler time().
Conflicts: setup.py
What's the status of this pull? This would come in handy right now (I've had to work around it by reconstructing my client with a random host on certain exceptions)...
Thanks for bringing it up. @mumrah, did you have a chance to look at this? One thing to note with my changes is that they only handle failover during initialization. During client initialization, if none of the brokers are available, the client can end up looping indefinitely.
I have a diff in #91 that addresses the infinite loop by propagating an exception immediately when no brokers are available, and lazily raising an exception when you send to a partition with a -1 leader. In my understanding, it will fix your issue - can you take a look? Also, I'm not sure what you mean by failover during initialization - any time the metadata is reset, you will try to get metadata from all brokers, no?
Failover during initialization means switching to the next available host in the list provided to the constructor. I'll take a look at #91, but my first thought when trying to resolve this was simply to remove the recursion (the inner calls to `_load_metadata_for_topics`).
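A minimal sketch of what that non-recursive bootstrap could look like, combined with the fail-fast exception described in #91 (all names here are hypothetical; this is not the code from this PR or from #91):

```python
import random


class NoBrokersAvailableError(Exception):
    """Raised when none of the bootstrap hosts respond."""


def bootstrap_metadata(hosts, load_metadata_from, randomize=True):
    """Try each (host, port) pair once instead of recursing.

    `load_metadata_from` is assumed to be a callable that fetches cluster
    metadata from a single broker, or raises on failure.
    """
    candidates = list(hosts)
    if randomize:
        random.shuffle(candidates)
    for host, port in candidates:
        try:
            return load_metadata_from(host, port)
        except Exception:
            continue  # broker unreachable, try the next one
    raise NoBrokersAvailableError(
        "tried %d bootstrap hosts, none responded" % len(candidates))
```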
@@ -10,6 +11,26 @@
log = logging.getLogger("kafka")

def collect_hosts(hosts, randomize=True):
I think hosts should already be a list of "host:port" strings. The caller should handle creating it from a config or command line or whatever, and KafkaClient shouldn't have to worry about parsing and splitting strings. Also, as it is here, if the caller already gets the hosts as a list, it would have to convert it to a single string to use it.
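For illustration, the call style being argued for here might look like this (the `hosts` keyword and the broker addresses are placeholders, not the API settled on in this thread):

```python
from kafka.client import KafkaClient

# The caller builds the list itself (from its config file, CLI args, etc.);
# the client receives ready-made "host:port" strings and does no parsing.
brokers = ["kafka01:9092", "kafka02:9092", "kafka03:9092"]
client = KafkaClient(hosts=brokers)
```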
I am not sure I see where you are going with this argument.
I took the same approach here as what is used for the KazooClient (I actually stole the collect_hosts function from there). Since both libraries may well be used together in many places (they are, in my case), I thought it would make sense to use the same kind of API.
Also, if you are using the Kafka command line tools, you'll pass a string in the same format as this.
Can you suggest a different API?
I agree that a [list] would be a more pythonic API.
From what I can tell, the KazooClient uses the comma-separated connect string because that's the Java ZooKeeper API (see connectString in http://zookeeper.apache.org/doc/r3.2.2/api/org/apache/zookeeper/ZooKeeper.html).
The current Kafka Java API also uses a comma-separated string for bootstrapping (metadata.broker.list in http://kafka.apache.org/documentation.html), and it does not appear that this will change with the new Producer API (http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html).
How about supporting both with a little polymorphism?
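One way the "support both" idea could be sketched, assuming a default port of 9092 and `(host, port)` tuples as the return type (illustrative only, not the code that was merged):

```python
import random


def collect_hosts(hosts, randomize=True):
    """Accept either a comma-separated "host:port,host:port" string or a
    list of "host:port" strings; return a list of (host, port) tuples."""
    if isinstance(hosts, str):
        hosts = [h for h in hosts.split(",") if h.strip()]
    result = []
    for entry in hosts:
        host, _, port = entry.strip().partition(":")
        result.append((host, int(port) if port else 9092))
    if randomize:
        random.shuffle(result)
    return result
```

Both `collect_hosts("kafka01:9092,kafka02:9092")` and `collect_hosts(["kafka01:9092", "kafka02:9092"])` would then produce the same broker list.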
Sure, a list is more pythonic. I can easily check for either a comma-separated string or a list of "host:port" strings and store the info accordingly.
Thanks for your comment, I'll add this.
The connect string spec for ZooKeeper is a little funky, I'll agree. From their docs "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" is the general form. If we are going to use something other than the raw connect string, we most definitely need to include a chroot in addition to host:port pairs.
E.g.,
def collect_hosts(hosts=[], chroot="", randomize=True):
...
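Purely to illustrate the connect-string form quoted above, a parse might split the optional chroot off the host list like this (the function name and return shape are assumptions, not kazoo's or this PR's code):

```python
def parse_connect_string(connect_string):
    """Split e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
    into ([(host, port), ...], chroot)."""
    hosts_part, slash, chroot = connect_string.partition("/")
    hosts = []
    for entry in hosts_part.split(","):
        host, _, port = entry.strip().partition(":")
        # fall back to the ZooKeeper default port if none is given
        hosts.append((host, int(port) if port else 2181))
    return hosts, ("/" + chroot) if slash else ""
```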
@mumrah this is for the brokers list here, no need for chroot. I simply used kazoo as a good example for this since it is very similar to what I was trying to do.
I am reworking this; I will probably open another PR with the latest merge, but please respond to my comment here. I'll update my next PR accordingly and close this one once I have enough feedback.
Closing, replaced by #122.
Support for multiple hosts on KafkaClient bootstrap (improves on #70)
Proposed resolution for enhancement #25
This change breaks the KafkaClient constructor API by replacing the `host` and `port` parameters with `hosts`, represented as a comma-separated list of `host:port` pairs. E.g.:
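A sketch of the new call style (the broker addresses are placeholders, and passing the value as a keyword argument is an assumption, not confirmed by the description above):

```python
from kafka.client import KafkaClient

# Before: KafkaClient("kafka01", 9092)
# After: a comma-separated bootstrap list in a single `hosts` argument
client = KafkaClient(hosts="kafka01:9092,kafka02:9092,kafka03:9092")
```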
It also introduces the mock library for unit testing.
Note: some unit tests are marked as skipped until recursion is disabled on `_load_metadata_for_topics`, an issue I found during testing (#68).