Skip to content

Offset enhancement for consumer #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,18 @@ def load_metadata_for_topics(self, *topics):

self.topic_partitions[topic] = []
for partition, meta in partitions.items():
self.topic_partitions[topic].append(partition)
topic_part = TopicAndPartition(topic, partition)
if meta.leader == -1:
log.warning('No leader for topic %s partition %s', topic, partition)
self.topics_to_brokers[topic_part] = None
else:
self.topics_to_brokers[topic_part] = brokers[meta.leader]
try:

self.topic_partitions[topic].append(partition)
topic_part = TopicAndPartition(topic, partition)
if meta.leader == -1:
log.warning('No leader for topic %s partition %s', topic, partition)
self.topics_to_brokers[topic_part] = None
else:
self.topics_to_brokers[topic_part] = brokers[meta.leader]

except Exception, e:
logging.warn("Failed to get partition meta for topic " + topic + "," + str(e))

def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
Expand Down
8 changes: 8 additions & 0 deletions kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,11 @@ class ConsumerFetchSizeTooSmall(KafkaError):

class ConsumerNoMoreData(KafkaError):
    # NOTE(review): raised when the consumer cannot produce further
    # message data — confirm exact raise sites in kafka.consumer.
    """KafkaError subclass signalling that the consumer has no more data."""
    pass

class ClientOffset:
    """Symbolic values selecting where a consumer starts reading.

    Any non-negative integer is also accepted by consumers as an
    explicit absolute starting offset.
    """

    # Resume from the position last stored in Zookeeper.
    PREVIOUS = -1
    # Start at the partition's current beginning offset (may not be 0
    # if older log segments have been deleted).
    CURRENT_BEGINNING = -2
    # Prefer the previously stored position; if it is not available,
    # fall back to the current beginning.
    PREVIOUS_OR_CURRENT_BEGINNING = -3
    # Start from the latest message, like tail.
    LATEST = -4

93 changes: 74 additions & 19 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

from kafka.common import (
ErrorMapping, FetchRequest,
OffsetRequest, OffsetCommitRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
ClientOffset, BrokerResponseError
)

from kafka.util import ReentrantTimer
Expand Down Expand Up @@ -67,8 +68,17 @@ class Consumer(object):
* initialization and fetching metadata of partitions
* Auto-commit logic
* APIs for fetching pending message count
Offset (pass a ClientOffset constant or a non-negative integer):
ClientOffset.PREVIOUS or -1;
ClientOffset.CURRENT_BEGINNING or -2;
ClientOffset.PREVIOUS_OR_CURRENT_BEGINNING or -3; Default.
ClientOffset.LATEST or -4;
Other value >= 0 (explicit absolute offset);
"""
def __init__(self, client, group, topic, partitions=None, auto_commit=True,
def __init__(self, client, group, topic, partitions=None,
offset = ClientOffset.PREVIOUS_OR_CURRENT_BEGINNING,
auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):

Expand All @@ -95,27 +105,64 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
self.commit)
self.commit_timer.start()

def get_or_init_offset_callback(resp):
def get_current_offsets_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
return resp.offset
return resp.offsets
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
return 0
else:
raise Exception("OffsetFetchRequest for topic=%s, "
raise BrokerResponseError("OffsetRequest for topic=%s, "
"partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))

# Uncomment for 0.8.1
#
#for partition in partitions:
# req = OffsetFetchRequest(topic, partition)
# (offset,) = self.client.send_offset_fetch_request(group, [req],
# callback=get_or_init_offset_callback,
# fail_on_error=False)
# self.offsets[partition] = offset

# callback for fetching on zookeeper
def get_or_init_previous_offset_callback(resp):
    # Resolve an OffsetFetch (Zookeeper-stored offset) response into a
    # usable starting offset for one partition.
    #
    # An unknown topic/partition simply means no offset has been
    # committed yet, so start from position 0.
    if resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
        return 0
    if resp.error == ErrorMapping.NO_ERROR:
        return resp.offset
    raise BrokerResponseError("OffsetFetchRequest for topic=%s, "
                              "partition=%d failed with errorcode=%s" % (
                                  resp.topic, resp.partition, resp.error))

currTimeMs = int(time.time()*1000)
PAYLOAD_MAX_OFFSET = 2147483647
for partition in partitions:
self.offsets[partition] = 0
# current stream
req = OffsetRequest(topic, partition,currTimeMs,PAYLOAD_MAX_OFFSET)
(raw_offsets,) = self.client.send_offset_request([req],
fail_on_error=False,
callback=get_current_offsets_callback)
offset_start = raw_offsets[-1]
offset_end = raw_offsets[0]

# zookeeper
req = OffsetFetchRequest(topic, partition)
(last_offset,) = self.client.send_offset_fetch_request(group, [req],
callback=get_or_init_previous_offset_callback,
fail_on_error=False)

if offset == ClientOffset.PREVIOUS_OR_CURRENT_BEGINNING:
if offset_start <= last_offset <= offset_end:
self.offsets[partition] = last_offset
else:
self.offsets[partition] = offset_start
elif offset == ClientOffset.PREVIOUS:
self.offsets[partition] = last_offset
elif offset == ClientOffset.CURRENT_BEGINNING:
self.offsets[partition] = offset_start
elif offset == ClientOffset.LATEST:
self.offsets[partition] = offset_end
elif offset >=0:
if offset_start <= offset <= offset_end:
for partition in partitions:
self.offsets[partition] = offset
else:
raise ValueError("Invalid parameter value offset=%d,"
"allowed range %d to %d"
% (offset,offset_start,offset_end))

def commit(self, partitions=None):
"""
Expand Down Expand Up @@ -205,6 +252,7 @@ class SimpleConsumer(Consumer):
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
topic: the topic to consume
offset: default to previous position if available, or the current beginning
partitions: An optional list of partitions to consume the data from

auto_commit: default True. Whether or not to auto commit the offsets
Expand All @@ -227,7 +275,9 @@ class SimpleConsumer(Consumer):
commit method on this class. A manual call to commit will also reset
these triggers
"""
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
def __init__(self, client, group, topic,
offset = ClientOffset.PREVIOUS_OR_CURRENT_BEGINNING,
auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
fetch_size_bytes=FETCH_MIN_BYTES,
Expand All @@ -236,6 +286,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
iter_timeout=None):
super(SimpleConsumer, self).__init__(
client, group, topic,
offset=offset,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
Expand Down Expand Up @@ -450,7 +501,7 @@ def _fetch(self):
log.debug("Done iterating over partition %s" % partition)
partitions = retry_partitions

def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
def _mp_consume(client, group, topic, offset, chunk, queue, start, exit, pause, size):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
Expand All @@ -467,6 +518,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# done by the master controller process.
consumer = SimpleConsumer(client, group, topic,
partitions=chunk,
offset = offset,
auto_commit=False,
auto_commit_every_n=None,
auto_commit_every_t=None)
Expand Down Expand Up @@ -532,14 +584,17 @@ class MultiProcessConsumer(Consumer):
commit method on this class. A manual call to commit will also reset
these triggers
"""
def __init__(self, client, group, topic, auto_commit=True,
def __init__(self, client, group, topic,
offset = ClientOffset.PREVIOUS_OR_CURRENT_BEGINNING,
auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
num_procs=1, partitions_per_proc=0):

# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(
client, group, topic,
offset=offset,
partitions=None,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
Expand Down Expand Up @@ -572,7 +627,7 @@ def __init__(self, client, group, topic, auto_commit=True,
for chunk in chunks:
chunk = filter(lambda x: x is not None, chunk)
args = (client.copy(),
group, topic, chunk,
group, topic, offset, chunk,
self.queue, self.start, self.exit,
self.pause, self.size)

Expand Down