Description
There is some sort of deadlock that we are intermittently hitting within `kafka-python` when our applications call `commit()`. The consumer drops out of the group without the process actually dying, and the only fix is to restart the process.
This is hurting us badly: we are having to restart multiple applications every six hours. However, I have not yet been able to reliably reproduce the problem. Here is what I have so far.
The code is wrapped in a thin layer, so the pseudo-code/configs may be named slightly differently, but it should be relatively clear what's happening.
Kafka Broker Version: 1.0.1
`kafka-python` Version: commit eed59ba (this is newer than 1.4.4, as I wanted to be sure we picked up the latest deadlock fix from a4f0cb8)
`KafkaConsumer` config:
```json
{
    "max_poll_records": 5,
    "connections_max_idle_ms": 540000,
    "request_timeout_ms": 305000,
    "session_timeout_ms": 10000,
    "heartbeat_interval_ms": 3000,
    "auto_offset_reset": "earliest"
}
```
Application pseudo-code:
```python
while True:
    logging.info("About to call consumer.poll for new messages")
    messages = consumer.poll()
    logging.info("Finished consumer.poll, now process_messages")
    process_messages(messages)
    logging.info("Finished process_messages, now committing new offsets")
    consumer.commit()
```
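For reference, the consumer underneath the wrapper is constructed roughly like this. This is a minimal sketch: the topic and group names match the (redacted) names in the logs below, the bootstrap server list is a placeholder, and `enable_auto_commit=False` is my addition for clarity since we commit manually (it is not in the config above).

```python
from kafka import KafkaConsumer

# Placeholder topic/group/broker names; the real values come from our wrapper's config.
consumer = KafkaConsumer(
    "secret_topic",
    group_id="secret_group.ns_consumer",
    bootstrap_servers=["kafka05.net:9092"],
    max_poll_records=5,
    connections_max_idle_ms=540000,
    request_timeout_ms=305000,
    session_timeout_ms=10000,
    heartbeat_interval_ms=3000,
    auto_offset_reset="earliest",
    enable_auto_commit=False,  # assumption: not listed in the config above, but we commit manually
)
```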
I enabled debug logging for `kafka-python` and noticed a difference between healthy and unhealthy logs.
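For completeness, debug logging was enabled roughly like this (a minimal sketch; our wrapper wires logging up slightly differently):

```python
import logging

# kafka-python emits protocol-level detail (requests, responses, heartbeats)
# on the "kafka" logger hierarchy at DEBUG.
logging.basicConfig(level=logging.INFO)
logging.getLogger("kafka").setLevel(logging.DEBUG)
```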
Here are healthy logs:
```
2019-03-05 00:55:35,422 INFO __main__ ns_consumer.py:0213 NSLog: Finished process_messages, now committing new offsets
# here the application calls consumer.commit()
2019-03-05 00:55:35,422 DEBUG kafka.coordinator.consumer consumer.py:0616 NSLog: Sending offset-commit request with {TopicPartition(topic=u'secret_topic', partition=54): OffsetAndMetadata(offset=11822203, metadata='')} for group secret_group.ns_consumer to 5
2019-03-05 00:55:35,422 DEBUG kafka.protocol.parser parser.py:0053 NSLog: Sending request OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822203, metadata='')])])
2019-03-05 00:55:35,423 DEBUG kafka.conn conn.py:0768 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Request 401: OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822203, metadata='')])])
2019-03-05 00:55:35,426 DEBUG kafka.protocol.parser parser.py:0133 NSLog: Received correlation id: 401
2019-03-05 00:55:35,426 DEBUG kafka.protocol.parser parser.py:0160 NSLog: Processing response OffsetCommitResponse_v2
2019-03-05 00:55:35,427 DEBUG kafka.conn conn.py:0816 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 401 (3.74102592468 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=54, error_code=0)])])
2019-03-05 00:55:35,427 DEBUG kafka.coordinator.consumer consumer.py:0637 NSLog: Group secret_group.ns_consumer committed offset OffsetAndMetadata(offset=11822203, metadata='') for partition TopicPartition(topic=u'secret_topic', partition=54)
# offset successfully committed, control returns to the application which calls poll() again
2019-03-05 00:55:35,427 INFO __main__ ns_consumer.py:0195 NSLog: About to call consumer.poll for new messages
```
Here are the stuck logs:
```
2019-03-05 00:55:51,023 INFO __main__ ns_consumer.py:0213 NSLog: Finished process_messages, now committing new offsets
# here the application calls consumer.commit()
2019-03-05 00:55:51,023 DEBUG kafka.coordinator.consumer consumer.py:0616 NSLog: Sending offset-commit request with {TopicPartition(topic=u'secret_topic', partition=54): OffsetAndMetadata(offset=11822204, metadata='')} for group secret_group.ns_consumer to 5
2019-03-05 00:55:51,023 DEBUG kafka.protocol.parser parser.py:0053 NSLog: Sending request OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822204, metadata='')])])
2019-03-05 00:55:51,024 DEBUG kafka.conn conn.py:0768 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Request 402: OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822204, metadata='')])])
2019-03-05 00:55:51,034 DEBUG kafka.protocol.parser parser.py:0133 NSLog: Received correlation id: 402
2019-03-05 00:55:51,034 DEBUG kafka.protocol.parser parser.py:0160 NSLog: Processing response OffsetCommitResponse_v2
2019-03-05 00:55:51,034 DEBUG kafka.conn conn.py:0816 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 402 (10.4370117188 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=54, error_code=0)])])
2019-03-05 00:32:22,882 DEBUG kafka.coordinator base.py:0801 NSLog: Heartbeat: secret_group.ns_consumer[33844] secret_id
2019-03-05 00:32:22,883 DEBUG kafka.protocol.parser parser.py:0053 NSLog: Sending request HeartbeatRequest_v0(group='secret_group.ns_consumer', generation_id=33844, member_id=u'secret_id')
2019-03-05 00:32:22,883 DEBUG kafka.conn conn.py:0768 NSLog: <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Request 1406: HeartbeatRequest_v0(group='secret_group.ns_consumer', generation_id=33844, member_id=u'secret_id')
2019-03-05 00:32:22,884 DEBUG kafka.protocol.parser parser.py:0133 NSLog: Received correlation id: 1406
2019-03-05 00:32:22,884 DEBUG kafka.protocol.parser parser.py:0160 NSLog: Processing response HeartbeatResponse_v0
2019-03-05 00:32:22,884 DEBUG kafka.conn conn.py:0816 NSLog: <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 1406 (1.1990070343 ms): HeartbeatResponse_v0(error_code=0)
2019-03-05 00:32:22,885 DEBUG kafka.coordinator base.py:0814 NSLog: Received successful heartbeat response for group secret_group.ns_consumer
# Logs continue to show heartbeating until the heartbeat timer expires, at which point the heartbeat thread issues a LeaveGroupRequest
# Also within the rest of the logs are intermittent metadata calls. I'm not sure which thread is issuing them; for some reason the python logging formatter wasn't printing any info for thread/threadName
```
Diff'ing the two, it appears that something is breaking between the following two lines:
```
2019-03-05 00:55:35,427 DEBUG kafka.conn conn.py:0816 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 401 (3.74102592468 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=54, error_code=0)])])
2019-03-05 00:55:35,427 DEBUG kafka.coordinator.consumer consumer.py:0637 NSLog: Group secret_group.ns_consumer committed offset OffsetAndMetadata(offset=11822203, metadata='') for partition TopicPartition(topic=u'secret_topic', partition=54)
```
I am not sure if the problem is in the main thread or the background heartbeat thread:
- The background heartbeat thread might be getting swapped in and then taking a lock that it never releases.
- The main thread could be getting stuck and simply sitting idle, in which case the background thread is working as expected.
- The background thread might be reading the offset-commit response, in which case the main thread never receives it. I think this is unlikely, but the python logging formatter wasn't printing anything for `thread`/`threadName`, so I cannot completely rule it out yet (see the logging sketch after this list). I assume that if this happened the main thread would have thrown an error timing out the offset-commit request, but I have not yet confirmed that the code works that way.
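As a next step I plan to add the thread name/ID to the log format so it is clear whether the heartbeat thread or the main thread emits each line. A minimal sketch, assuming a plain `logging.basicConfig` setup rather than our wrapper's actual logging configuration:

```python
import logging

# %(threadName)s / %(thread)d make heartbeat-thread vs. main-thread activity distinguishable.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(name)s [%(threadName)s/%(thread)d] %(message)s",
)
```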