Skip to content

Commit ab89a44

Browse files
committed
Merge pull request #122 from mrtheb/multihosts
Support for multiple hosts on KafkaClient bootstrap (improves on #70)
2 parents e5fdc1c + 51910f9 commit ab89a44

File tree

8 files changed

+154
-31
lines changed

8 files changed

+154
-31
lines changed

README.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ from kafka.client import KafkaClient
2929
from kafka.consumer import SimpleConsumer
3030
from kafka.producer import SimpleProducer, KeyedProducer
3131

32-
kafka = KafkaClient("localhost", 9092)
32+
kafka = KafkaClient("localhost:9092")
3333

3434
# To send messages synchronously
3535
producer = SimpleProducer(kafka)
@@ -80,7 +80,7 @@ from kafka.client import KafkaClient
8080
from kafka.producer import KeyedProducer
8181
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
8282

83-
kafka = KafkaClient("localhost", 9092)
83+
kafka = KafkaClient("localhost:9092")
8484

8585
# HashedPartitioner is default
8686
producer = KeyedProducer(kafka)
@@ -95,7 +95,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
9595
from kafka.client import KafkaClient
9696
from kafka.consumer import MultiProcessConsumer
9797

98-
kafka = KafkaClient("localhost", 9092)
98+
kafka = KafkaClient("localhost:9092")
9999

100100
# This will split the number of partitions among two processes
101101
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -115,7 +115,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
115115

116116
```python
117117
from kafka.client import KafkaClient
118-
kafka = KafkaClient("localhost", 9092)
118+
kafka = KafkaClient("localhost:9092")
119119
req = ProduceRequest(topic="my-topic", partition=1,
120120
messages=[KafkaProtocol.encode_message("some message")])
121121
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)

example.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def consume_example(client):
1414
print(message)
1515

1616
def main():
17-
client = KafkaClient("localhost", 9092)
17+
client = KafkaClient("localhost:9092")
1818
produce_example(client)
1919
consume_example(client)
2020

kafka/NOTES.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ There are a few levels of abstraction:
1818

1919
# Possible API
2020

21-
client = KafkaClient("localhost", 9092)
21+
client = KafkaClient("localhost:9092")
2222

2323
producer = KafkaProducer(client, "topic")
2424
producer.send_string("hello")

kafka/client.py

+18-7
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
BrokerResponseError, PartitionUnavailableError,
1111
KafkaUnavailableError, KafkaRequestError)
1212

13-
from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
13+
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
1414
from kafka.protocol import KafkaProtocol
1515

1616
log = logging.getLogger("kafka")
@@ -24,14 +24,15 @@ class KafkaClient(object):
2424
# NOTE: The timeout given to the client should always be greater than the
2525
# one passed to SimpleConsumer.get_message(), otherwise you can get a
2626
# socket timeout.
27-
def __init__(self, host, port, client_id=CLIENT_ID,
27+
def __init__(self, hosts, client_id=CLIENT_ID,
2828
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
2929
# We need one connection to bootstrap
3030
self.client_id = client_id
3131
self.timeout = timeout
32-
self.conns = { # (host, port) -> KafkaConnection
33-
(host, port): KafkaConnection(host, port, timeout=timeout)
34-
}
32+
self.hosts = collect_hosts(hosts)
33+
34+
# create connections only when we need them
35+
self.conns = {}
3536
self.brokers = {} # broker_id -> BrokerMetadata
3637
self.topics_to_brokers = {} # topic_id -> broker_id
3738
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
@@ -41,6 +42,15 @@ def __init__(self, host, port, client_id=CLIENT_ID,
4142
# Private API #
4243
##################
4344

45+
def _get_conn(self, host, port):
    """
    Get or create a connection to a broker using host and port.

    Connections are cached in ``self.conns`` keyed by ``(host, port)``, so
    repeated calls for the same broker return the same KafkaConnection.
    New connections are created with the client's configured timeout,
    matching what ``_get_conn_for_broker`` does.
    """
    host_key = (host, port)
    if host_key not in self.conns:
        # Pass the client's timeout along; without it the connection would
        # fall back to KafkaConnection's own default.
        self.conns[host_key] = KafkaConnection(
            host, port, timeout=self.timeout)

    return self.conns[host_key]
53+
4454
def _get_conn_for_broker(self, broker):
4555
"""
4656
Get or create a connection to a broker
@@ -49,7 +59,7 @@ def _get_conn_for_broker(self, broker):
4959
self.conns[(broker.host, broker.port)] = \
5060
KafkaConnection(broker.host, broker.port, timeout=self.timeout)
5161

52-
return self.conns[(broker.host, broker.port)]
62+
return self._get_conn(broker.host, broker.port)
5363

5464
def _get_leader_for_partition(self, topic, partition):
5565
key = TopicAndPartition(topic, partition)
@@ -72,7 +82,8 @@ def _send_broker_unaware_request(self, requestId, request):
7282
Attempt to send a broker-agnostic request to one of the available
7383
brokers. Keep trying until you succeed.
7484
"""
75-
for conn in self.conns.values():
85+
for (host, port) in self.hosts:
86+
conn = self._get_conn(host, port)
7687
try:
7788
conn.send(requestId, request)
7889
response = conn.recv(requestId)

kafka/conn.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,39 @@
22
import logging
33
import socket
44
import struct
5+
from random import shuffle
56
from threading import local
67

78
from kafka.common import ConnectionError
89

910
log = logging.getLogger("kafka")
1011

1112
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
13+
DEFAULT_KAFKA_PORT = 9092


def collect_hosts(hosts, randomize=True):
    """
    Parse a bootstrap host specification into a list of (host, port) tuples.

    ``hosts`` is either a comma-separated string ("kafka01:9092,kafka02") or
    an iterable of "host[:port]" strings. An entry with no port, or with an
    empty port ("kafka01:"), gets DEFAULT_KAFKA_PORT. Blank entries (for
    example the one left behind by a trailing comma) are skipped.

    If ``randomize`` is true the resulting list is shuffled before it is
    returned; otherwise the input order is preserved.

    Raises ValueError when a port is present but is not an integer.
    """
    try:
        string_types = basestring  # Python 2: matches both str and unicode
    except NameError:
        string_types = str  # Python 3: str is the only text type

    # A single text value is a comma-separated list; anything else is
    # assumed to already be an iterable of "host[:port]" strings.
    if isinstance(hosts, string_types):
        hosts = hosts.split(',')

    result = []
    for host_port in hosts:
        host_port = host_port.strip()
        if not host_port:
            # Skip empty entries instead of emitting a bogus ('', port).
            continue

        res = host_port.split(':')
        host = res[0].strip()
        port_text = res[1].strip() if len(res) > 1 else ''
        port = int(port_text) if port_text else DEFAULT_KAFKA_PORT
        result.append((host, port))

    if randomize:
        shuffle(result)

    return result
37+
1238

1339
class KafkaConnection(local):
1440
"""
@@ -81,7 +107,7 @@ def send(self, request_id, payload):
81107
sent = self._sock.sendall(payload)
82108
if sent is not None:
83109
self._raise_connection_error()
84-
except socket.error, e:
110+
except socket.error:
85111
log.exception('Unable to send payload to Kafka')
86112
self._raise_connection_error()
87113

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import os.path
21
import sys
32

43
from setuptools import setup, Command
54

65

76
class Tox(Command):
7+
88
user_options = []
99

1010
def initialize_options(self):

test/test_integration.py

+19-15
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):
3333

3434
class KafkaTestCase(unittest.TestCase):
3535
def setUp(self):
36-
self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
36+
self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
3737
ensure_topic_creation(self.client, self.topic)
3838

3939

@@ -42,7 +42,7 @@ class TestKafkaClient(KafkaTestCase):
4242
def setUpClass(cls): # noqa
4343
cls.zk = ZookeeperFixture.instance()
4444
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
45-
cls.client = KafkaClient(cls.server.host, cls.server.port)
45+
cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
4646

4747
@classmethod
4848
def tearDownClass(cls): # noqa
@@ -578,7 +578,7 @@ def setUpClass(cls):
578578
cls.zk = ZookeeperFixture.instance()
579579
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
580580
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
581-
cls.client = KafkaClient(cls.server2.host, cls.server2.port)
581+
cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))
582582

583583
@classmethod
584584
def tearDownClass(cls): # noqa
@@ -826,23 +826,26 @@ def test_large_messages(self):
826826

827827
class TestFailover(KafkaTestCase):
828828

829-
def setUp(self):
829+
@classmethod
830+
def setUpClass(cls): # noqa
830831
zk_chroot = random_string(10)
831832
replicas = 2
832833
partitions = 2
833834

834835
# mini zookeeper, 2 kafka brokers
835-
self.zk = ZookeeperFixture.instance()
836-
kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
837-
self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
838-
self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
839-
super(TestFailover, self).setUp()
840-
841-
def tearDown(self):
842-
self.client.close()
843-
for broker in self.brokers:
836+
cls.zk = ZookeeperFixture.instance()
837+
kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
838+
cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
839+
840+
hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
841+
cls.client = KafkaClient(hosts)
842+
843+
@classmethod
844+
def tearDownClass(cls):
845+
cls.client.close()
846+
for broker in cls.brokers:
844847
broker.close()
845-
self.zk.close()
848+
cls.zk.close()
846849

847850
def test_switch_leader(self):
848851
key, topic, partition = random_string(5), self.topic, 0
@@ -918,7 +921,8 @@ def _kill_leader(self, topic, partition):
918921
return broker
919922

920923
def _count_messages(self, group, topic):
921-
client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
924+
hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
925+
client = KafkaClient(hosts)
922926
consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
923927
all_messages = []
924928
for message in consumer:

test/test_unit.py

+83-1
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@
33
import struct
44
import unittest
55

6+
from mock import MagicMock, patch
7+
8+
9+
from kafka import KafkaClient
610
from kafka.common import (
711
ProduceRequest, FetchRequest, Message, ChecksumError,
812
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
913
OffsetAndMessage, BrokerMetadata, PartitionMetadata
1014
)
15+
from kafka.common import KafkaUnavailableError
1116
from kafka.codec import (
1217
has_gzip, has_snappy, gzip_encode, gzip_decode,
1318
snappy_encode, snappy_decode
@@ -405,7 +410,6 @@ def test_encode_offset_request(self):
405410
def test_decode_offset_response(self):
406411
pass
407412

408-
409413
@unittest.skip("Not Implemented")
410414
def test_encode_offset_commit_request(self):
411415
pass
@@ -423,5 +427,83 @@ def test_decode_offset_fetch_response(self):
423427
pass
424428

425429

430+
class TestKafkaClient(unittest.TestCase):
    """Unit tests for KafkaClient bootstrap-host handling using mocks."""

    def test_init_with_list(self):
        """Hosts given as a list are stored as (host, port) tuples."""

        # patch to avoid making requests while constructing the client
        with patch.object(KafkaClient, 'load_metadata_for_topics'):
            client = KafkaClient(
                hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])

        # client.hosts is shuffled by collect_hosts, so compare sorted.
        self.assertEqual(
            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
            sorted(client.hosts))

    def test_init_with_csv(self):
        """Hosts given as a comma-separated string are parsed the same way."""

        # patch to avoid making requests while constructing the client
        with patch.object(KafkaClient, 'load_metadata_for_topics'):
            client = KafkaClient(
                hosts='kafka01:9092,kafka02:9092,kafka03:9092')

        # client.hosts is shuffled by collect_hosts, so compare sorted.
        self.assertEqual(
            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
            sorted(client.hosts))

    def test_send_broker_unaware_request_fail(self):
        """Tests that call fails when all hosts are unavailable"""

        mocked_conns = {
            ('kafka01', 9092): MagicMock(),
            ('kafka02', 9092): MagicMock()
        }
        # inject KafkaConnection side effects
        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
        mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")

        def mock_get_conn(host, port):
            return mocked_conns[(host, port)]

        # patch to avoid making requests before we want it
        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):

            client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])

            self.assertRaises(
                KafkaUnavailableError,
                client._send_broker_unaware_request,
                1, 'fake request')

            # every configured host must have been tried before giving up
            for conn in mocked_conns.values():
                conn.send.assert_called_with(1, 'fake request')

    def test_send_broker_unaware_request(self):
        """Tests that call works when at least one of the host is available"""

        mocked_conns = {
            ('kafka01', 9092): MagicMock(),
            ('kafka02', 9092): MagicMock(),
            ('kafka03', 9092): MagicMock()
        }
        # inject KafkaConnection side effects
        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
        mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
        mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")

        def mock_get_conn(host, port):
            return mocked_conns[(host, port)]

        # patch to avoid making requests before we want it
        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):

            # list all three mocked brokers so the failing kafka03 mock is
            # actually part of the bootstrap set being exercised
            client = KafkaClient(
                hosts='kafka01:9092,kafka02:9092,kafka03:9092')

            resp = client._send_broker_unaware_request(1, 'fake request')

            self.assertEqual('valid response', resp)
            mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
507+
426508
if __name__ == '__main__':
427509
unittest.main()

0 commit comments

Comments
 (0)