Description
We noticed a critical bug (or perhaps expected behavior; you tell me, but it is totally unexpected IMHO) with Kafka poll(). I believe it is very easy to reproduce.
We initialize the KafkaConsumer with the following code:
from kafka import KafkaConsumer
# import path below is for kafka-python 2.x
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

self.consumer = KafkaConsumer(*kafka_topics, group_id=self.group_id,
                              bootstrap_servers=BOOTSTRAP_SERVERS,
                              fetch_max_wait_ms=500, reconnect_backoff_ms=100,
                              reconnect_backoff_max_ms=1000,
                              connections_max_idle_ms=540000,
                              request_timeout_ms=15000, heartbeat_interval_ms=2000,
                              auto_offset_reset=self.auto_offset_reset,
                              enable_auto_commit=False,
                              metadata_max_age_ms=1500,
                              partition_assignment_strategy=[StickyPartitionAssignor])
Variables:
self.auto_offset_reset = 'earliest'
Then we consume messages in an infinite loop, calling the function below constantly (a rough sketch of that loop follows the function). When we want to consume something we ask for max_records=N (N is between 1 and 25 in our case) and commit afterwards. When we don't want any messages we simply poll with max_records=1 and don't commit. This keeps Kafka being polled all the time, so heartbeats are sent and the consumer stays active; we take care of the commits ourselves.
def get_messages(self, max_records=1):
    data = self.consumer.poll(timeout_ms=self.timeout, max_records=max_records)
    ready_messages = []
    if data:
        for tp, messages in data.items():
            for message in messages:
                logger.debug("Consumed %s:%d:%d: key=%s value=%s",
                             tp.topic, tp.partition, message.offset,
                             message.key, message.value)
            ready_messages.extend(messages)
    return ready_messages
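In rough pseudocode, the surrounding loop looks like this (want_work() and process() are hypothetical placeholders for our application logic, not real code from our service):

def run(self):
    while True:
        if self.want_work():
            # Fetch a small batch (N is between 1 and 25 for us) and
            # commit only after we have handled it.
            for message in self.get_messages(max_records=25):
                self.process(message)
            self.consumer.commit()
        else:
            # Keep-alive poll: keeps the consumer polling so heartbeats
            # are sent; the result is discarded and nothing is committed.
            self.get_messages(max_records=1)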
Here's the most interesting part. Say we post 1000 messages to an empty topic. We count all the messages consumed since the consumer was started, and we noticed horrible, unforgivable behavior: although all 1000 messages are marked as consumed (the offset has moved to the end), we receive only about 700 of them via get_messages()!
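For reference, a condensed standalone version of this test looks roughly like the sketch below; the broker address, topic name, and group id are placeholders, and the keep-alive polls are interleaved the same way our service does it.

from kafka import KafkaConsumer, KafkaProducer
import time

TOPIC = 'repro-topic'        # placeholder topic name
SERVERS = 'localhost:9092'   # placeholder broker address

# Produce 1000 messages to the (empty) topic.
producer = KafkaProducer(bootstrap_servers=SERVERS)
for i in range(1000):
    producer.send(TOPIC, value=('msg-%d' % i).encode())
producer.flush()

# Consumer configured like the one above (trimmed to the relevant options).
consumer = KafkaConsumer(TOPIC, group_id='repro-group',
                         bootstrap_servers=SERVERS,
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)

total = 0
deadline = time.time() + 60
while time.time() < deadline:
    # "Want messages" path: fetch up to 25 records, count them, commit.
    data = consumer.poll(timeout_ms=500, max_records=25)
    for messages in data.values():
        total += len(messages)
    if data:
        consumer.commit()
    # "Don't want messages" path: poll one record, discard it, no commit.
    consumer.poll(timeout_ms=500, max_records=1)

print('received %d of 1000 messages' % total)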
What happens to the hundreds of messages that are lost forever? I cannot believe nobody has noticed this, but it looks like a critical bug that needs reporting, one which instantly prevents us from using the library.