Consumer intermittently hangs when committing offsets #1728

Closed
@jeffwidman

We are intermittently hitting what looks like a deadlock within kafka-python when our applications call commit(). The consumer drops out of the group without the process actually dying, and the only fix is to restart the process.

This is hurting us badly: we are having to restart multiple applications every six hours. However, I have not yet been able to reliably reproduce the problem. Here is what I have so far.

The code is wrapped in a thin layer, so the names in the pseudo-code and configs may differ slightly from the real ones, but it should be relatively clear what's happening.

Kafka Broker Version: 1.0.1

kafka-python Version: commit eed59ba (this is newer than 1.4.4, as I wanted to be sure we picked up the latest deadlock fix from a4f0cb8)

KafkaConsumer config:

{
    "max_poll_records": 5,
    "connections_max_idle_ms":540000,
    "request_timeout_ms":305000,
    "session_timeout_ms":10000,
    "heartbeat_interval_ms":3000,
    "auto_offset_reset": "earliest"
}
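As a sanity check on the timeouts above, here is a minimal sketch that tests two relationships between them: the heartbeat interval should be lower than the session timeout (the Kafka documentation suggests no more than a third of it), and the request timeout should exceed the session timeout. The second rule is treated here as an assumption, and the function name `check_consumer_timeouts` is hypothetical; it is not part of kafka-python.

```python
def check_consumer_timeouts(config):
    """Return a list of warnings for suspicious timeout relationships."""
    warnings = []
    session = config["session_timeout_ms"]
    heartbeat = config["heartbeat_interval_ms"]
    request = config["request_timeout_ms"]

    if heartbeat >= session:
        warnings.append("heartbeat_interval_ms must be lower than session_timeout_ms")
    elif heartbeat * 3 > session:
        warnings.append("heartbeat_interval_ms is usually at most 1/3 of session_timeout_ms")

    # Assumption: the request timeout is expected to exceed the session timeout.
    if request <= session:
        warnings.append("request_timeout_ms should be larger than session_timeout_ms")
    return warnings


# The configuration from this report.
consumer_config = {
    "max_poll_records": 5,
    "connections_max_idle_ms": 540000,
    "request_timeout_ms": 305000,
    "session_timeout_ms": 10000,
    "heartbeat_interval_ms": 3000,
    "auto_offset_reset": "earliest",
}

print(check_consumer_timeouts(consumer_config))
```

Under these assumptions the reported configuration produces no warnings, which suggests the hang is not a simple timeout misconfiguration.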

Application pseudo-code:

while True:
    logging.info("About to call consumer.poll for new messages")
    messages = consumer.poll()
    logging.info("Finished consumer.poll, now process_messages")
    process_messages(messages)
    logging.info("Finished process_messages, now committing new offsets")
    consumer.commit()

I enabled debug logging for kafka-python, and noticed a difference between healthy and unhealthy logs.

Here are healthy logs:

2019-03-05 00:55:35,422     INFO __main__                 ns_consumer.py:0213 NSLog: Finished process_messages, now committing new offsets
# here the application calls consumer.commit()
2019-03-05 00:55:35,422    DEBUG    kafka.coordinator.consumer      consumer.py:0616 NSLog: Sending offset-commit request with {TopicPartition(topic=u'secret_topic', partition=54): OffsetAndMetadata(offset=11822203, metadata='')} for group secret_group.ns_consumer to 5
2019-03-05 00:55:35,422    DEBUG    kafka.protocol.parser           parser.py:0053 NSLog: Sending request OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822203, metadata='')])])
2019-03-05 00:55:35,423    DEBUG    kafka.conn                      conn.py:0768 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Request 401: OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822203, metadata='')])])
2019-03-05 00:55:35,426    DEBUG    kafka.protocol.parser           parser.py:0133 NSLog: Received correlation id: 401
2019-03-05 00:55:35,426    DEBUG    kafka.protocol.parser           parser.py:0160 NSLog: Processing response OffsetCommitResponse_v2
2019-03-05 00:55:35,427    DEBUG    kafka.conn                      conn.py:0816 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 401 (3.74102592468 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=54, error_code=0)])])
2019-03-05 00:55:35,427    DEBUG    kafka.coordinator.consumer      consumer.py:0637 NSLog: Group secret_group.ns_consumer committed offset OffsetAndMetadata(offset=11822203, metadata='') for partition TopicPartition(topic=u'secret_topic', partition=54)
# offset successfully committed, control returns to the application which calls poll() again
2019-03-05 00:55:35,427     INFO    __main__                        ns_consumer.py:0195 NSLog: About to call consumer.poll for new messages

Here are the stuck logs:

2019-03-05 00:55:51,023     INFO    __main__                        ns_consumer.py:0213 NSLog: Finished process_messages, now committing new offsets
# here the application calls consumer.commit()
2019-03-05 00:55:51,023    DEBUG    kafka.coordinator.consumer      consumer.py:0616 NSLog: Sending offset-commit request with {TopicPartition(topic=u'secret_topic', partition=54): OffsetAndMetadata(offset=11822204, metadata='')} for group secret_group.ns_consumer to 5
2019-03-05 00:55:51,023    DEBUG    kafka.protocol.parser           parser.py:0053 NSLog: Sending request OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822204, metadata='')])])
2019-03-05 00:55:51,024    DEBUG    kafka.conn                      conn.py:0768 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Request 402: OffsetCommitRequest_v2(consumer_group='secret_group.ns_consumer', consumer_group_generation_id=3238, consumer_id=u'secret_id', retention_time=-1, topics=[(topic=u'secret_topic', partitions=[(partition=54, offset=11822204, metadata='')])])
2019-03-05 00:55:51,034    DEBUG    kafka.protocol.parser           parser.py:0133 NSLog: Received correlation id: 402
2019-03-05 00:55:51,034    DEBUG    kafka.protocol.parser           parser.py:0160 NSLog: Processing response OffsetCommitResponse_v2
2019-03-05 00:55:51,034    DEBUG    kafka.conn                      conn.py:0816 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 402 (10.4370117188 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=54, error_code=0)])])
2019-03-05 00:32:22,882    DEBUG    kafka.coordinator                        base.py:0801 NSLog: Heartbeat: secret_group.ns_consumer[33844] secret_id
2019-03-05 00:32:22,883    DEBUG    kafka.protocol.parser                      parser.py:0053 NSLog: Sending request HeartbeatRequest_v0(group='secret_group.ns_consumer', generation_id=33844, member_id=u'secret_id')
2019-03-05 00:32:22,883    DEBUG    kafka.conn                        conn.py:0768 NSLog: <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Request 1406: HeartbeatRequest_v0(group='secret_group.ns_consumer', generation_id=33844, member_id=u'secret_id')
2019-03-05 00:32:22,884    DEBUG    kafka.protocol.parser                      parser.py:0133 NSLog: Received correlation id: 1406
2019-03-05 00:32:22,884    DEBUG    kafka.protocol.parser                      parser.py:0160 NSLog: Processing response HeartbeatResponse_v0
2019-03-05 00:32:22,884    DEBUG    kafka.conn                        conn.py:0816 NSLog: <BrokerConnection node_id=3 host=kafka03.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 1406 (1.1990070343 ms): HeartbeatResponse_v0(error_code=0)
2019-03-05 00:32:22,885    DEBUG    kafka.coordinator                        base.py:0814 NSLog: Received successful heartbeat response for group secret_group.ns_consumer
# Logs continue to show heartbeating until the heartbeat timer expires, then the heartbeat thread issues a LeaveGroupRequest
# The rest of the logs also contain intermittent metadata calls. I'm not sure which thread is issuing them; for some reason the Python logging formatter wasn't printing any info for `thread`/`threadName`
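To attribute log lines like the ones above to a thread, the standard `logging` module exposes the `threadName` and `thread` record attributes. The following is a minimal sketch of a formatter that includes both; the format layout, the `configure_logging` helper, and the thread name `heartbeat-demo` are illustrative assumptions, not the application's actual setup.

```python
import io
import logging
import threading

# threadName and thread are standard LogRecord attributes.
LOG_FORMAT = (
    "%(asctime)s %(levelname)8s %(name)s "
    "[%(threadName)s/%(thread)d] %(filename)s:%(lineno)04d %(message)s"
)


def configure_logging(stream=None, level=logging.DEBUG):
    """Attach a handler to the root logger that prints thread info."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(level)
    return handler


def emit_heartbeat_line():
    # Stand-in for a log call made from a background thread.
    logging.getLogger("kafka.coordinator").debug("Heartbeat")


# Demo: capture output in memory and log from a named background thread.
stream = io.StringIO()
handler = configure_logging(stream)
worker = threading.Thread(target=emit_heartbeat_line, name="heartbeat-demo")
worker.start()
worker.join()
logging.getLogger().removeHandler(handler)
print(stream.getvalue())
```

With a format like this, each DEBUG line shows whether it came from the main thread or a background thread, which would help attribute the metadata requests.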

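Diffing the two, it appears that something breaks between the following two lines from the healthy logs. In the stuck logs, the offset commit response is received, but the second line (the "committed offset" message) never appears: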

2019-03-05 00:55:35,427    DEBUG    kafka.conn                      conn.py:0816 NSLog: <BrokerConnection node_id=5 host=kafka05.net:9092 <connected> [IPv4 ('172.xxx', 9092)]> Response 401 (3.74102592468 ms): OffsetCommitResponse_v2(topics=[(topic=u'secret_topic', partitions=[(partition=54, error_code=0)])])
2019-03-05 00:55:35,427    DEBUG    kafka.coordinator.consumer      consumer.py:0637 NSLog: Group secret_group.ns_consumer committed offset OffsetAndMetadata(offset=11822203, metadata='') for partition TopicPartition(topic=u'secret_topic', partition=54)

I am not sure if the problem is in the main thread or the background heartbeat thread:

  • The background heartbeat thread might get swapped in, take a lock, and never release it.
  • The main thread could be getting stuck and simply sitting idle, in which case the background thread is working as expected.
  • The background thread might be reading the offset commit response, in which case the main thread never receives it. I think this is unlikely, but the Python logging formatter wasn't printing anything for thread/threadName, so I cannot completely rule it out yet. I assume that if this happened, the main thread would have thrown an error when the offset commit request timed out, but I have not yet confirmed that the code works that way.
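One way to tell these hypotheses apart is to capture the stack of every thread at the moment the loop stalls. Below is a minimal sketch of a watchdog thread that does this with `sys._current_frames()` from the standard library. `StallWatchdog`, `beat()`, and the thresholds are hypothetical names and values for illustration; nothing here is part of kafka-python.

```python
import sys
import threading
import time
import traceback


def format_all_thread_stacks():
    """Return the current stack of every thread as one string."""
    names = {t.ident: t.name for t in threading.enumerate()}
    chunks = []
    for thread_id, frame in sys._current_frames().items():
        stack = "".join(traceback.format_stack(frame))
        chunks.append("Thread %s (%s):\n%s" % (names.get(thread_id, "unknown"), thread_id, stack))
    return "\n".join(chunks)


class StallWatchdog(object):
    """Reports all thread stacks when beat() has not been called recently."""

    def __init__(self, stall_seconds, report=sys.stderr.write, poll_seconds=1.0):
        self._stall_seconds = stall_seconds
        self._report = report
        self._poll_seconds = poll_seconds
        self._last_beat = time.time()
        self._lock = threading.Lock()
        thread = threading.Thread(target=self._run, name="stall-watchdog")
        thread.daemon = True  # do not keep the process alive on exit
        thread.start()

    def beat(self):
        """Record that the main loop made progress."""
        with self._lock:
            self._last_beat = time.time()

    def _run(self):
        while True:
            time.sleep(self._poll_seconds)
            with self._lock:
                idle = time.time() - self._last_beat
            if idle > self._stall_seconds:
                self._report("No progress for %.1fs\n%s\n" % (idle, format_all_thread_stacks()))
                # Reset the timer so a long stall is reported once per
                # stall_seconds rather than on every poll.
                self.beat()


# Demo: simulate a stalled loop by never calling beat().
reports = []
watchdog = StallWatchdog(stall_seconds=0.2, report=reports.append, poll_seconds=0.05)
time.sleep(0.6)
print(reports[0] if reports else "no stall detected")
```

In the application loop, `watchdog.beat()` would be called at the top of each `while True` iteration, with `stall_seconds` set well above the expected processing time. If the report shows the main thread blocked while acquiring a lock inside `commit()`, that would point to the first hypothesis; if it shows the main thread waiting on a response, that would point to the third.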
