Skip to content

Commit 72fdf39

Browse files
committed
Merge branch 'master' into develop
2 parents 8bcf0f0 + 4abf7ee commit 72fdf39

File tree

4 files changed

+168
-128
lines changed

4 files changed

+168
-128
lines changed

README.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
1717

1818
# Status
1919

20-
The current version of this package is **0.9.0** and is compatible with
20+
The current version of this package is **0.9.0** and is compatible with
2121
Kafka brokers running version **0.8.1**.
2222

2323
# Usage
@@ -32,24 +32,24 @@ from kafka.producer import SimpleProducer, KeyedProducer
3232
kafka = KafkaClient("localhost", 9092)
3333

3434
# To send messages synchronously
35-
producer = SimpleProducer(kafka, "my-topic")
36-
producer.send_messages("some message")
37-
producer.send_messages("this method", "is variadic")
35+
producer = SimpleProducer(kafka)
36+
producer.send_messages("my-topic", "some message")
37+
producer.send_messages("my-topic", "this method", "is variadic")
3838

3939
# To send messages asynchronously
40-
producer = SimpleProducer(kafka, "my-topic", async=True)
41-
producer.send_messages("async message")
40+
producer = SimpleProducer(kafka, async=True)
41+
producer.send_messages("my-topic", "async message")
4242

4343
# To wait for acknowledgements
4444
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
4545
# a local log before sending response
4646
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
4747
# by all in sync replicas before sending a response
48-
producer = SimpleProducer(kafka, "my-topic", async=False,
48+
producer = SimpleProducer(kafka, async=False,
4949
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
5050
ack_timeout=2000)
5151

52-
response = producer.send_messages("async message")
52+
response = producer.send_messages("my-topic", "async message")
5353

5454
if response:
5555
print(response[0].error)
@@ -62,7 +62,7 @@ if response:
6262
# Notes:
6363
# * If the producer dies before the messages are sent, there will be losses
6464
# * Call producer.stop() to send the messages and cleanup
65-
producer = SimpleProducer(kafka, "my-topic", batch_send=True,
65+
producer = SimpleProducer(kafka, batch_send=True,
6666
batch_send_every_n=20,
6767
batch_send_every_t=60)
6868

@@ -83,11 +83,11 @@ from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
8383
kafka = KafkaClient("localhost", 9092)
8484

8585
# HashedPartitioner is default
86-
producer = KeyedProducer(kafka, "my-topic")
87-
producer.send("key1", "some message")
88-
producer.send("key2", "this methode")
86+
producer = KeyedProducer(kafka)
87+
producer.send("my-topic", "key1", "some message")
88+
producer.send("my-topic", "key2", "this methode")
8989

90-
producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
90+
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
9191
```
9292

9393
## Multiprocess consumer

kafka/consumer.py

Lines changed: 70 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import absolute_import
22

3-
from collections import defaultdict
43
from itertools import izip_longest, repeat
54
import logging
65
import time
@@ -235,6 +234,12 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
235234
buffer_size=FETCH_BUFFER_SIZE_BYTES,
236235
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
237236
iter_timeout=None):
237+
super(SimpleConsumer, self).__init__(
238+
client, group, topic,
239+
partitions=partitions,
240+
auto_commit=auto_commit,
241+
auto_commit_every_n=auto_commit_every_n,
242+
auto_commit_every_t=auto_commit_every_t)
238243

239244
if max_buffer_size is not None and buffer_size > max_buffer_size:
240245
raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +250,10 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
245250
self.partition_info = False # Do not return partition info in msgs
246251
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
247252
self.fetch_min_bytes = fetch_size_bytes
248-
self.fetch_started = defaultdict(bool) # defaults to false
253+
self.fetch_offsets = self.offsets.copy()
249254
self.iter_timeout = iter_timeout
250255
self.queue = Queue()
251256

252-
super(SimpleConsumer, self).__init__(
253-
client, group, topic,
254-
partitions=partitions,
255-
auto_commit=auto_commit,
256-
auto_commit_every_n=auto_commit_every_n,
257-
auto_commit_every_t=auto_commit_every_t)
258-
259257
def __repr__(self):
260258
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
261259
(self.group, self.topic, str(self.offsets.keys()))
@@ -305,6 +303,10 @@ def seek(self, offset, whence):
305303
else:
306304
raise ValueError("Unexpected value for `whence`, %d" % whence)
307305

306+
# Reset queue and fetch offsets since they are invalid
307+
self.fetch_offsets = self.offsets.copy()
308+
self.queue = Queue()
309+
308310
def get_messages(self, count=1, block=True, timeout=0.1):
309311
"""
310312
Fetch the specified number of messages
@@ -316,33 +318,69 @@ def get_messages(self, count=1, block=True, timeout=0.1):
316318
it will block forever.
317319
"""
318320
messages = []
319-
if timeout:
321+
if timeout is not None:
320322
max_time = time.time() + timeout
321323

324+
new_offsets = {}
322325
while count > 0 and (timeout is None or timeout > 0):
323-
message = self.get_message(block, timeout)
324-
if message:
325-
messages.append(message)
326+
result = self._get_message(block, timeout, get_partition_info=True,
327+
update_offset=False)
328+
if result:
329+
partition, message = result
330+
if self.partition_info:
331+
messages.append(result)
332+
else:
333+
messages.append(message)
334+
new_offsets[partition] = message.offset + 1
326335
count -= 1
327336
else:
328337
# Ran out of messages for the last request.
329338
if not block:
330339
# If we're not blocking, break.
331340
break
332-
if timeout:
341+
if timeout is not None:
333342
# If we're blocking and have a timeout, reduce it to the
334343
# appropriate value
335344
timeout = max_time - time.time()
336345

346+
# Update and commit offsets if necessary
347+
self.offsets.update(new_offsets)
348+
self.count_since_commit += len(messages)
349+
self._auto_commit()
337350
return messages
338351

339-
def get_message(self, block=True, timeout=0.1):
352+
def get_message(self, block=True, timeout=0.1, get_partition_info=None):
353+
return self._get_message(block, timeout, get_partition_info)
354+
355+
def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
356+
update_offset=True):
357+
"""
358+
If no messages can be fetched, returns None.
359+
If get_partition_info is None, it defaults to self.partition_info
360+
If get_partition_info is True, returns (partition, message)
361+
If get_partition_info is False, returns message
362+
"""
340363
if self.queue.empty():
341364
# We're out of messages, go grab some more.
342365
with FetchContext(self, block, timeout):
343366
self._fetch()
344367
try:
345-
return self.queue.get_nowait()
368+
partition, message = self.queue.get_nowait()
369+
370+
if update_offset:
371+
# Update partition offset
372+
self.offsets[partition] = message.offset + 1
373+
374+
# Count, check and commit messages if necessary
375+
self.count_since_commit += 1
376+
self._auto_commit()
377+
378+
if get_partition_info is None:
379+
get_partition_info = self.partition_info
380+
if get_partition_info:
381+
return partition, message
382+
else:
383+
return message
346384
except Empty:
347385
return None
348386

@@ -367,11 +405,11 @@ def __iter__(self):
367405
def _fetch(self):
368406
# Create fetch request payloads for all the partitions
369407
requests = []
370-
partitions = self.offsets.keys()
408+
partitions = self.fetch_offsets.keys()
371409
while partitions:
372410
for partition in partitions:
373411
requests.append(FetchRequest(self.topic, partition,
374-
self.offsets[partition],
412+
self.fetch_offsets[partition],
375413
self.buffer_size))
376414
# Send request
377415
responses = self.client.send_fetch_request(
@@ -384,18 +422,9 @@ def _fetch(self):
384422
partition = resp.partition
385423
try:
386424
for message in resp.messages:
387-
# Update partition offset
388-
self.offsets[partition] = message.offset + 1
389-
390-
# Count, check and commit messages if necessary
391-
self.count_since_commit += 1
392-
self._auto_commit()
393-
394425
# Put the message in our queue
395-
if self.partition_info:
396-
self.queue.put((partition, message))
397-
else:
398-
self.queue.put(message)
426+
self.queue.put((partition, message))
427+
self.fetch_offsets[partition] = message.offset + 1
399428
except ConsumerFetchSizeTooSmall, e:
400429
if (self.max_buffer_size is not None and
401430
self.buffer_size == self.max_buffer_size):
@@ -585,12 +614,11 @@ def __iter__(self):
585614
break
586615

587616
# Count, check and commit messages if necessary
588-
self.offsets[partition] = message.offset
617+
self.offsets[partition] = message.offset + 1
589618
self.start.clear()
590-
yield message
591-
592619
self.count_since_commit += 1
593620
self._auto_commit()
621+
yield message
594622

595623
self.start.clear()
596624

@@ -613,9 +641,10 @@ def get_messages(self, count=1, block=True, timeout=10):
613641
self.size.value = count
614642
self.pause.clear()
615643

616-
if timeout:
644+
if timeout is not None:
617645
max_time = time.time() + timeout
618646

647+
new_offsets = {}
619648
while count > 0 and (timeout is None or timeout > 0):
620649
# Trigger consumption only if the queue is empty
621650
# By doing this, we will ensure that consumers do not
@@ -630,16 +659,18 @@ def get_messages(self, count=1, block=True, timeout=10):
630659
break
631660

632661
messages.append(message)
633-
634-
# Count, check and commit messages if necessary
635-
self.offsets[partition] = message.offset
636-
self.count_since_commit += 1
637-
self._auto_commit()
662+
new_offsets[partition] = message.offset + 1
638663
count -= 1
639-
timeout = max_time - time.time()
664+
if timeout is not None:
665+
timeout = max_time - time.time()
640666

641667
self.size.value = 0
642668
self.start.clear()
643669
self.pause.set()
644670

671+
# Update and commit offsets if necessary
672+
self.offsets.update(new_offsets)
673+
self.count_since_commit += len(messages)
674+
self._auto_commit()
675+
645676
return messages

0 commit comments

Comments
 (0)