Allow KafkaClient to take in a list of brokers for bootstrapping #70

Closed · wants to merge 6 commits

8 changes: 4 additions & 4 deletions README.md
@@ -30,7 +30,7 @@ from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
 from kafka.producer import SimpleProducer, KeyedProducer
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 
 # To send messages synchronously
 producer = SimpleProducer(kafka, "my-topic")
@@ -81,7 +81,7 @@ from kafka.client import KafkaClient
 from kafka.producer import KeyedProducer
 from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 
 # HashedPartitioner is default
 producer = KeyedProducer(kafka, "my-topic")
@@ -96,7 +96,7 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
 from kafka.client import KafkaClient
 from kafka.consumer import MultiProcessConsumer
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 
 # This will split the number of partitions among two processes
 consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -116,7 +116,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
 
 ```python
 from kafka.client import KafkaClient
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 req = ProduceRequest(topic="my-topic", partition=1,
                      messages=[KafkaProtocol.encode_message("some message")])
 resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
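The point of this PR is that the bootstrap string may now name several brokers, not just one. For illustration, a minimal sketch of the new call (the broker addresses are placeholders):

```python
from kafka.client import KafkaClient

# Any one reachable broker is enough to bootstrap cluster metadata:
# collect_hosts() parses (and by default shuffles) this list, and the
# client tries each entry in turn. A missing port defaults to 9092.
kafka = KafkaClient("kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com")
```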
2 changes: 1 addition & 1 deletion example.py
@@ -14,7 +14,7 @@ def consume_example(client):
         print(message)
 
 def main():
-    client = KafkaClient("localhost", 9092)
+    client = KafkaClient("localhost:9092")
     produce_example(client)
     consume_example(client)
 
2 changes: 1 addition & 1 deletion kafka/NOTES.md
@@ -18,7 +18,7 @@ There are a few levels of abstraction:
 
 # Possible API
 
-    client = KafkaClient("localhost", 9092)
+    client = KafkaClient("localhost:9092")
 
     producer = KafkaProducer(client, "topic")
     producer.send_string("hello")
35 changes: 21 additions & 14 deletions kafka/client.py
@@ -8,7 +8,7 @@
 
 from kafka.common import ErrorMapping, TopicAndPartition
 from kafka.common import ConnectionError, FailedPayloadsException
-from kafka.conn import KafkaConnection
+from kafka.conn import collect_hosts, KafkaConnection
 from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
@@ -19,13 +19,15 @@ class KafkaClient(object):
     CLIENT_ID = "kafka-python"
     ID_GEN = count()
 
-    def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+    def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
+        # We need one connection to bootstrap
         self.bufsize = bufsize
         self.client_id = client_id
-        self.conns = {  # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, bufsize)
-        }
+
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {}  # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
@@ -35,15 +37,19 @@ def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
     # Private API #
     ##################
 
+    def _get_conn(self, host, port):
+        "Get or create a connection to a broker using host and port"
+
+        host_key = (host, port)
+        if host_key not in self.conns:
+            self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+
+        return self.conns[host_key]
+
     def _get_conn_for_broker(self, broker):
-        """
-        Get or create a connection to a broker
-        """
-        if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, self.bufsize)
+        "Get or create a connection to a broker"
 
-        return self.conns[(broker.host, broker.port)]
+        return self._get_conn(broker.host, broker.port)
 
     def _get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
@@ -108,7 +114,8 @@ def _send_broker_unaware_request(self, requestId, request):
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
-        for conn in self.conns.values():
+        for (host, port) in self.hosts:
+            conn = self._get_conn(host, port)
             try:
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
@@ -174,7 +181,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
             except ConnectionError, e:  # ignore BufferUnderflow for now
                 log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
                 failed_payloads += payloads
-                self.topics_to_brokers = {} # reset metadata
+                self.topics_to_brokers = {}  # reset metadata
                 continue
 
         for response in decoder_fn(response):
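Taken together, `collect_hosts` and the lazy `_get_conn` turn bootstrapping into "try each seed host until one answers". A condensed sketch of that flow, simplified from the diff above (real error handling, logging, and exception types omitted):

```python
def send_broker_unaware_request(client, request_id, request):
    # client.hosts was parsed (and shuffled) by collect_hosts() in __init__
    for (host, port) in client.hosts:
        conn = client._get_conn(host, port)  # created on first use, then cached
        try:
            conn.send(request_id, request)
            return conn.recv(request_id)
        except Exception:
            continue  # broker down or unreachable: try the next seed host
    raise Exception("All servers failed to process request")
```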
40 changes: 28 additions & 12 deletions kafka/conn.py
@@ -2,6 +2,7 @@
 import logging
 import socket
 import struct
+from random import shuffle
 from threading import local
 
 from kafka.common import BufferUnderflowError
@@ -10,6 +11,26 @@
 log = logging.getLogger("kafka")
 
 
+def collect_hosts(hosts, randomize=True):
+    """
+    Collects a comma-separated set of hosts (host:port) and optionally
+    randomizes the returned list.
+    """
+
+    result = []
+    for host_port in hosts.split(","):
+
+        res = host_port.split(':')
+        host = res[0]
+        port = int(res[1]) if len(res) > 1 else 9092
+        result.append((host.strip(), port))
+
+    if randomize:
+        shuffle(result)
+
+    return result

Review discussion on collect_hosts:

Collaborator: I think hosts should already be a list of "host:port" strings. The caller should handle creating it from a config or command line or whatever, and KafkaClient shouldn't have to worry about parsing and splitting strings. Also, as it is here, if the caller already has the hosts as a list, it would have to convert it to a single string to use it.

Collaborator (author): I am not sure I see where you are going with this argument. I took the same approach here as the one used for the KazooClient (I actually stole the collect_hosts method from there). Since both libraries may well be used together in many places (they are, in my case), I thought it would make sense to use the same kind of API. Also, if you are using the Kafka command line tools, you'll pass a string in the same format as this. Can you suggest a different API?

Owner: I agree that a [list] would be a more pythonic API. From what I can tell, the KazooClient uses the comma-separated connect string because that's the Java ZooKeeper API (see connectString in http://zookeeper.apache.org/doc/r3.2.2/api/org/apache/zookeeper/ZooKeeper.html). The current Kafka Java API also uses a comma-separated string for bootstrapping (metadata.broker.list in http://kafka.apache.org/documentation.html), and it does not appear that this will change with the new Producer API (http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html). How about supporting both with a little polymorphism?

Collaborator (author): Sure, that is more pythonic. I can easily check for a comma-separated string or a list of "host:port" and store the info accordingly. Thanks for your comment, I'll add this.

Collaborator: The connect string spec for ZooKeeper is a little funky, I'll agree. From their docs, "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" is the general form. If we are going to use something other than the raw connect string, we most definitely need to include a chroot in addition to host:port pairs. E.g.,

    def collect_hosts(hosts=[], chroot="", randomize=True):
        ...

Collaborator (author): @mumrah this is for the brokers list here, no need for a chroot. I simply used kazoo as a good example since it is very similar to what I was trying to do.
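Following the "support both" suggestion from the thread, a polymorphic variant could look like the sketch below (not part of this diff; assumes Python 2, matching the codebase):

```python
from random import shuffle


def collect_hosts(hosts, randomize=True):
    """
    Accept either a comma-separated "host:port" string or a list of
    "host:port" strings; return a list of (host, port) tuples.
    """
    if isinstance(hosts, basestring):  # Python 2 string check
        hosts = hosts.strip().split(",")

    result = []
    for host_port in hosts:
        res = host_port.split(":")
        host = res[0].strip()
        port = int(res[1]) if len(res) > 1 else 9092  # default broker port
        result.append((host, port))

    if randomize:
        shuffle(result)

    return result
```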


 class KafkaConnection(local):
     """
     A socket connection to a single Kafka broker
@@ -19,14 +40,14 @@ class KafkaConnection(local):
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
     """
-    def __init__(self, host, port, bufsize=4096):
+    def __init__(self, host, port, bufsize=4096, timeout=10):

Collaborator: socket timeouts were added in #88
Collaborator (author): Of course; I should have done a separate PR for that, I would have been first ;-)

         super(KafkaConnection, self).__init__()
         self.host = host
         self.port = port
         self.bufsize = bufsize
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((host, port))
-        self._sock.settimeout(10)
+        self.timeout = timeout
+
+        self._sock = socket.create_connection((host, port), timeout=timeout)
         self._dirty = False
 
     def __str__(self):
@@ -38,12 +59,9 @@ def __str__(self):
 
     def _consume_response(self):
         """
-        Fully consumer the response iterator
+        Fully consume the response iterator
         """
-        data = ""
-        for chunk in self._consume_response_iter():
-            data += chunk
-        return data
+        return "".join(self._consume_response_iter())

Collaborator: This was also changed in #88

     def _consume_response_iter(self):
         """
@@ -125,7 +143,5 @@ def reinit(self):
         Re-initialize the socket connection
         """
         self.close()
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((self.host, self.port))
-        self._sock.settimeout(10)
+        self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
         self._dirty = False
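One design note on the connection changes above: `socket.create_connection` from the standard library folds the manual create/connect/settimeout sequence into a single call, resolves the address via getaddrinfo (so IPv6 hosts work), and applies the timeout to the connect attempt itself. A minimal before/after sketch with placeholder address values:

```python
import socket

# Before: the timeout is set only after connect() has already blocked.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 9092))
sock.settimeout(10)

# After: one call, and the 10-second timeout also bounds the connect.
sock = socket.create_connection(("localhost", 9092), timeout=10)
```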
5 changes: 3 additions & 2 deletions setup.py
@@ -1,11 +1,12 @@
 import os.path
 import sys
 
 from setuptools import setup, Command
 
 
 class Tox(Command):
 
     user_options = []
 
     def initialize_options(self):
         pass
 
@@ -21,7 +22,7 @@ def run(self):
     name="kafka-python",
     version="0.8.1-1",
 
-    install_requires=["distribute", "tox"],
+    install_requires=["distribute", "mock"],
     tests_require=["tox"],
     cmdclass={"test": Tox},
23 changes: 14 additions & 9 deletions test/test_integration.py
@@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase):
     def setUpClass(cls):  # noqa
         cls.zk = ZookeeperFixture.instance()
         cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server.host, cls.server.port)
+        cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -554,7 +554,7 @@ def setUpClass(cls):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -770,20 +770,23 @@ def test_large_messages(self):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)
 
+
 class TestFailover(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
 
         zk_chroot = random_string(10)
-        replicas = 2
+        replicas = 2
         partitions = 2
 
         # mini zookeeper, 2 kafka brokers
-        cls.zk = ZookeeperFixture.instance()
-        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
         cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-        cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+        hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+        cls.client = KafkaClient(hosts)
 
     @classmethod
     def tearDownClass(cls):
@@ -858,17 +861,19 @@ def _send_random_messages(self, producer, n):
             resp = producer.send_messages(random_string(10))
             if len(resp) > 0:
                 self.assertEquals(resp[0].error, 0)
-            time.sleep(1) # give it some time
+            time.sleep(1)  # give it some time
 
     def _kill_leader(self, topic, partition):
         leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
         broker = self.brokers[leader.nodeId]
         broker.close()
-        time.sleep(1) # give it some time
+        time.sleep(1)  # give it some time
         return broker
 
     def _count_messages(self, group, topic):
-        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+
+        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+        client = KafkaClient(hosts)
         consumer = SimpleConsumer(client, group, topic, auto_commit=False)
         all_messages = []
         for message in consumer: