
Commit 9bed11d

Merge pull request #134 from wizzat/conn_refactor

conn.py performance improvements, make examples work, add another example

2 parents e937e3f + a6fc260

5 files changed: +133 -39 lines

example.py (file mode 100644 → 100755, +38 -13)

@@ -1,23 +1,48 @@
-import logging
+#!/usr/bin/env python
+import threading, logging, time
 
-from kafka.client import KafkaClient, FetchRequest, ProduceRequest
+from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
 from kafka.producer import SimpleProducer
 
-def produce_example(client):
-    producer = SimpleProducer(client, "my-topic")
-    producer.send_messages("test")
+class Producer(threading.Thread):
+    daemon = True
 
-def consume_example(client):
-    consumer = SimpleConsumer(client, "test-group", "my-topic")
-    for message in consumer:
-        print(message)
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        producer = SimpleProducer(client)
+
+        while True:
+            producer.send_messages('my-topic', "test")
+            producer.send_messages('my-topic', "\xc2Hola, mundo!")
+
+            time.sleep(1)
+
+
+class Consumer(threading.Thread):
+    daemon = True
+
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        consumer = SimpleConsumer(client, "test-group", "my-topic")
+
+        for message in consumer:
+            print(message)
 
 def main():
-    client = KafkaClient("localhost:9092")
-    produce_example(client)
-    consume_example(client)
+    threads = [
+        Producer(),
+        Consumer()
+    ]
+
+    for t in threads:
+        t.start()
+
+    time.sleep(5)
 
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.DEBUG)
+    logging.basicConfig(
+        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+        level=logging.DEBUG
+    )
     main()
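
This rewrite is the "make examples work" part of the commit message: the old produce_example/consume_example functions are replaced by a producer thread and a consumer thread running concurrently. Both are daemon threads, so the example's lifetime is bounded by the final time.sleep(5) in main() rather than by joins. Assuming a broker is listening on localhost:9092, the script can be run directly with python example.py; the host and port are hard-coded, so point them at your own broker if it lives elsewhere.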

kafka/client.py (+3 -3)

@@ -5,7 +5,7 @@
 from functools import partial
 from itertools import count
 
-from kafka.common import (ErrorMapping, TopicAndPartition,
+from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
                           ConnectionError, FailedPayloadsError,
                           BrokerResponseError, PartitionUnavailableError,
                           LeaderUnavailableError,
@@ -199,8 +199,8 @@ def _raise_on_response_error(self, resp):
         self.reset_topic_metadata(resp.topic)
 
         raise BrokerResponseError(
-            "Request for %s failed with errorcode=%d" %
-            (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            "Request for %s failed with errorcode=%d (%s)" %
+            (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
 
     #################
     #   Public API  #
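
The change to _raise_on_response_error appends the symbolic name of the broker error code to the exception message, looked up in the new ErrorStrings table from kafka/common.py below. A minimal sketch of the resulting message format (the error value here is hypothetical, chosen only to show the shape; runs under Python 2 or 3):

    from collections import namedtuple

    TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
    ErrorStrings = {5: 'LEADER_NOT_AVAILABLE'}  # one-entry excerpt of the real table

    # Hypothetical failed response for partition 0 of 'my-topic':
    print("Request for %s failed with errorcode=%d (%s)" %
          (TopicAndPartition("my-topic", 0), 5, ErrorStrings[5]))
    # -> Request for TopicAndPartition(topic='my-topic', partition=0)
    #    failed with errorcode=5 (LEADER_NOT_AVAILABLE)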

kafka/common.py (+21 -15)

@@ -48,22 +48,28 @@
 TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
 
 
+ErrorStrings = {
+    -1 : 'UNKNOWN',
+     0 : 'NO_ERROR',
+     1 : 'OFFSET_OUT_OF_RANGE',
+     2 : 'INVALID_MESSAGE',
+     3 : 'UNKNOWN_TOPIC_OR_PARTITON',
+     4 : 'INVALID_FETCH_SIZE',
+     5 : 'LEADER_NOT_AVAILABLE',
+     6 : 'NOT_LEADER_FOR_PARTITION',
+     7 : 'REQUEST_TIMED_OUT',
+     8 : 'BROKER_NOT_AVAILABLE',
+     9 : 'REPLICA_NOT_AVAILABLE',
+    10 : 'MESSAGE_SIZE_TOO_LARGE',
+    11 : 'STALE_CONTROLLER_EPOCH',
+    12 : 'OFFSET_METADATA_TOO_LARGE',
+}
+
 class ErrorMapping(object):
-    # Many of these are not actually used by the client
-    UNKNOWN = -1
-    NO_ERROR = 0
-    OFFSET_OUT_OF_RANGE = 1
-    INVALID_MESSAGE = 2
-    UNKNOWN_TOPIC_OR_PARTITON = 3
-    INVALID_FETCH_SIZE = 4
-    LEADER_NOT_AVAILABLE = 5
-    NOT_LEADER_FOR_PARTITION = 6
-    REQUEST_TIMED_OUT = 7
-    BROKER_NOT_AVAILABLE = 8
-    REPLICA_NOT_AVAILABLE = 9
-    MESSAGE_SIZE_TO_LARGE = 10
-    STALE_CONTROLLER_EPOCH = 11
-    OFFSET_METADATA_TOO_LARGE = 12
+    pass
+
+for k, v in ErrorStrings.items():
+    setattr(ErrorMapping, v, k)
 
 #################
 #   Exceptions  #
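
The refactor keeps the ErrorMapping constants but derives them from a single ErrorStrings dict, so the code-to-name table used by client.py can never drift out of sync with the constants. A minimal sketch of the pattern, trimmed to two codes (works under Python 2 or 3):

    ErrorStrings = {
        -1: 'UNKNOWN',
         1: 'OFFSET_OUT_OF_RANGE',
    }

    class ErrorMapping(object):
        pass

    # Generate one class attribute per table entry: ErrorMapping.UNKNOWN == -1, etc.
    for code, name in ErrorStrings.items():
        setattr(ErrorMapping, name, code)

    assert ErrorMapping.OFFSET_OUT_OF_RANGE == 1
    assert ErrorStrings[ErrorMapping.UNKNOWN] == 'UNKNOWN'

Note that this also silently renames ErrorMapping.MESSAGE_SIZE_TO_LARGE to MESSAGE_SIZE_TOO_LARGE, fixing the old typo; any code that referenced the misspelled constant would need updating.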

kafka/conn.py (+11 -8)

@@ -54,11 +54,10 @@ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         super(KafkaConnection, self).__init__()
         self.host = host
         self.port = port
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((host, port))
         self.timeout = timeout
-        self._sock.settimeout(self.timeout)
-        self._dirty = False
+        self._sock = None
+
+        self.reinit()
 
     def __repr__(self):
         return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -73,24 +72,28 @@ def _raise_connection_error(self):
 
     def _read_bytes(self, num_bytes):
         bytes_left = num_bytes
-        resp = ''
+        responses = []
+
         log.debug("About to read %d bytes from Kafka", num_bytes)
         if self._dirty:
             self.reinit()
+
         while bytes_left:
             try:
-                data = self._sock.recv(bytes_left)
+                data = self._sock.recv(min(bytes_left, 4096))
             except socket.error:
                 log.exception('Unable to receive data from Kafka')
                 self._raise_connection_error()
+
             if data == '':
                 log.error("Not enough data to read this response")
                 self._raise_connection_error()
+
             bytes_left -= len(data)
             log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
-            resp += data
+            responses.append(data)
 
-        return resp
+        return ''.join(responses)
 
     ##################
     #   Public API   #
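
Two things change in _read_bytes, and together they are the "conn.py performance improvements" of the commit message. First, each recv() now asks for at most 4096 bytes instead of the full remainder. Second, and more importantly, the chunks are collected in a list and joined once at the end: Python strings are immutable, so the old resp += data copied the entire accumulated buffer on every chunk, which is quadratic in the response size, whereas append-then-join is linear. A standalone sketch of the same pattern (read_exact is a hypothetical helper for illustration, not part of kafka-python):

    import socket

    def read_exact(sock, num_bytes, chunk_size=4096):
        # Read exactly num_bytes from sock, accumulating chunks in a list.
        chunks = []
        bytes_left = num_bytes
        while bytes_left:
            data = sock.recv(min(bytes_left, chunk_size))
            if not data:  # peer closed the connection early
                raise socket.error("short read: %d bytes missing" % bytes_left)
            bytes_left -= len(data)
            chunks.append(data)
        return ''.join(chunks)  # one copy total, instead of one per chunk

The constructor change is related housekeeping: __init__ now just records host, port, and timeout and delegates to reinit(), so the initial connection and the reconnect-after-error path (the existing _dirty handling) share one code path.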

load_example.py (new file, +60)

@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+import threading, logging, time, collections
+
+from kafka.client import KafkaClient
+from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer
+
+msg_size = 524288
+
+class Producer(threading.Thread):
+    daemon = True
+    big_msg = "1" * msg_size
+
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        producer = SimpleProducer(client)
+        self.sent = 0
+
+        while True:
+            producer.send_messages('my-topic', self.big_msg)
+            self.sent += 1
+
+
+class Consumer(threading.Thread):
+    daemon = True
+
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        consumer = SimpleConsumer(client, "test-group", "my-topic",
+            max_buffer_size = None,
+        )
+        self.valid = 0
+        self.invalid = 0
+
+        for message in consumer:
+            if len(message.message.value) == msg_size:
+                self.valid += 1
+            else:
+                self.invalid += 1
+
+def main():
+    threads = [
+        Producer(),
+        Consumer()
+    ]
+
+    for t in threads:
+        t.start()
+
+    time.sleep(10)
+    print 'Messages sent: %d' % threads[0].sent
+    print 'Messages recvd: %d' % threads[1].valid
+    print 'Messages invalid: %d' % threads[1].invalid
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+        level=logging.DEBUG
+    )
+    main()
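
This is the "another example" from the commit message: a crude throughput check. The producer pushes 512 KiB messages as fast as it can while the consumer counts how many arrive intact, and after ten seconds main() prints the sent/received/invalid totals. Passing max_buffer_size=None tells SimpleConsumer not to cap its fetch buffer, which would otherwise be limited to a size smaller than these messages. Note the script is Python 2 (print statements), matching the rest of the codebase at the time, and the collections import is unused.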
