Kafka versions #133

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions example.py
@@ -40,6 +40,7 @@ def main():

time.sleep(5)


if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
5 changes: 3 additions & 2 deletions kafka/__init__.py
@@ -4,7 +4,7 @@
__license__ = 'Apache License 2.0'
__copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'

from kafka.client import KafkaClient
from kafka.client import KafkaClient, Kafka080Client, Kafka081Client, Kafka082Client
from kafka.conn import KafkaConnection
from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message
@@ -14,7 +14,8 @@
from kafka.consumer import SimpleConsumer, MultiProcessConsumer

__all__ = [
'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
'KafkaClient', 'Kafka080Client', 'Kafka081Client', 'Kafka082Client',
'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
'MultiProcessConsumer', 'create_message', 'create_gzip_message',
'create_snappy_message'
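With the updated exports, the version-specific clients can be imported straight from the package root:

```python
from kafka import Kafka080Client, Kafka081Client, Kafka082Client
```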
49 changes: 47 additions & 2 deletions kafka/client.py
@@ -16,9 +16,9 @@

log = logging.getLogger("kafka")


class KafkaClient(object):

server_version = "unknown"
CLIENT_ID = "kafka-python"
ID_GEN = count()

@@ -181,7 +181,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
return (acc[k] for k in original_keys) if acc else ()

def __repr__(self):
return '<KafkaClient client_id=%s>' % (self.client_id)
return '<KafkaClient version=%s, client_id=%s>' % (self.server_version, self.client_id)

def _raise_on_response_error(self, resp):
if resp.error == ErrorMapping.NO_ERROR:
@@ -198,6 +198,23 @@ def _raise_on_response_error(self, resp):
#################
# Public API #
#################

def keyed_producer(self, **kwargs):
import kafka
return kafka.producer.KeyedProducer(self, **kwargs)

def simple_producer(self, **kwargs):
import kafka
return kafka.producer.SimpleProducer(self, **kwargs)

def simple_consumer(self, group, topic, **kwargs):
import kafka
return kafka.consumer.SimpleConsumer(self, group, topic, **kwargs)

def multiprocess_consumer(self, group, topic, **kwargs):
import kafka
return kafka.consumer.MultiProcessConsumer(self, group, topic, **kwargs)

def reset_topic_metadata(self, *topics):
for topic in topics:
try:
@@ -396,3 +413,31 @@ def send_offset_fetch_request(self, group, payloads=[],
else:
out.append(resp)
return out


class Kafka082Client(KafkaClient):
server_version = "0.8.2"


class Kafka081Client(KafkaClient):
server_version = "0.8.1"


class Kafka080Client(KafkaClient):
server_version = "0.8.0"

def simple_consumer(self, group, topic, **kwargs):
assert not kwargs.get('auto_commit')
kwargs['auto_commit'] = False

consumer = super(Kafka080Client, self).simple_consumer(group, topic, **kwargs)
consumer.seek(0, 2)

return consumer

def multiprocess_consumer(self, group, topic, **kwargs):
assert not kwargs.get('auto_commit')
kwargs['auto_commit'] = False

return super(Kafka080Client, self).multiprocess_consumer(group, topic, **kwargs)
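
For context, a minimal usage sketch of the new factory methods and version-specific clients. The broker address, topic, and group names are placeholders, and the constructor arguments assume the host/port form that `KafkaClient` currently takes; adjust to whatever your checkout accepts.

```python
from kafka import Kafka081Client

# placeholder broker address; adjust to your environment
client = Kafka081Client("localhost", 9092)

# convenience factories added in this PR
producer = client.simple_producer()
producer.send_messages("my-topic", "some message")

# with auto_commit enabled (the default), committed offsets are fetched
# from the broker when the consumer starts (see the consumer.py change below)
consumer = client.simple_consumer("my-group", "my-topic")
for message in consumer:
    print(message)
```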

30 changes: 15 additions & 15 deletions kafka/consumer.py
@@ -10,6 +10,7 @@
from kafka.common import (
ErrorMapping, FetchRequest,
OffsetRequest, OffsetCommitRequest,
OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)

@@ -105,17 +106,16 @@ def get_or_init_offset_callback(resp):
"partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))

# Uncomment for 0.8.1
#
#for partition in partitions:
# req = OffsetFetchRequest(topic, partition)
# (offset,) = self.client.send_offset_fetch_request(group, [req],
# callback=get_or_init_offset_callback,
# fail_on_error=False)
# self.offsets[partition] = offset

for partition in partitions:
self.offsets[partition] = 0
if auto_commit:
for partition in partitions:
req = OffsetFetchRequest(topic, partition)
(offset,) = self.client.send_offset_fetch_request(group, [req],
callback=get_or_init_offset_callback,
fail_on_error=False)
self.offsets[partition] = offset
else:
for partition in partitions:
self.offsets[partition] = 0

def commit(self, partitions=None):
"""
@@ -255,8 +255,8 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
self.queue = Queue()

def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
(self.group, self.topic, str(self.offsets.keys()))
return '<SimpleConsumer version=%s, group=%s, topic=%s, partitions=%s>' % \
(self.client.server_version, self.group, self.topic, str(self.offsets.keys()))

def provide_partition_info(self):
"""
@@ -582,8 +582,8 @@ def __init__(self, client, group, topic, auto_commit=True,
self.procs.append(proc)

def __repr__(self):
return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
(self.group, self.topic, len(self.procs))
return '<MultiProcessConsumer version=%s, group=%s, topic=%s, consumers=%d>' % \
(self.client.server_version, self.group, self.topic, len(self.procs))

def stop(self):
# Set exit and start off all waiting consumers
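
The offset-initialization change above replaces the commented-out 0.8.1 block: when auto_commit is enabled, the consumer now issues OffsetFetchRequests and resumes from the committed offsets instead of always starting at 0. A rough sketch of the resulting behavior per broker version (broker address, group, and topic are placeholders, constructor arguments as in the sketch above):

```python
from kafka import Kafka080Client, Kafka081Client

# 0.8.1+: auto_commit defaults to True, so the consumer resumes from the
# offsets previously committed for this group
resuming = Kafka081Client("localhost", 9092).simple_consumer("my-group", "my-topic")

# 0.8.0: offset commit/fetch is not supported by the broker, so the client
# forces auto_commit=False (asserting the caller did not request it) and
# seeks to the end of each partition
tailing = Kafka080Client("localhost", 9092).simple_consumer("my-group", "my-topic")
```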
4 changes: 2 additions & 2 deletions kafka/producer.py
@@ -205,7 +205,7 @@ def send_messages(self, topic, *msg):
return super(SimpleProducer, self).send_messages(topic, partition, *msg)

def __repr__(self):
return '<SimpleProducer batch=%s>' % self.async
return '<SimpleProducer version=%s, batch=%s>' % (self.client.server_version, self.async)


class KeyedProducer(Producer):
@@ -254,4 +254,4 @@ def send(self, topic, key, msg):
return self.send_messages(topic, partition, msg)

def __repr__(self):
return '<KeyedProducer batch=%s>' % self.async
return '<KeyedProducer version=%s batch=%s>' % (self.client.server_version, self.async)