
Bug: KafkaConsumer process CPU usage reaches 100% when the Kafka cluster restarts #2272

Closed
@shijie3

Description


Hello,

Steps to reproduce:
1. Initially, the Kafka cluster is running normally, and the consumer is created at this point.

Kafka cluster:

| Node ID | Port |
|---------|------|
| 1       | 9092 |
| 2       | 9093 |
| 3       | 9094 |

Consumer code:

```python
from kafka import KafkaConsumer


class KafkaMonitor(object):
    def __init__(self):
        self.consumer = KafkaConsumer("btest", bootstrap_servers="172.20.68.142:9093")

    def run(self):
        for message in self.consumer:
            print(str(message))


if __name__ == "__main__":
    KafkaMonitor().run()
```

2. Next, the Kafka cluster was killed, terminating all broker processes.
When I restarted it, the mapping between node IDs and ports had changed:

Kafka cluster:

| Node ID | Port |
|---------|------|
| 1       | 9094 |
| 2       | 9092 |
| 3       | 9093 |
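Comparing the two tables shows that every node ID now maps to a different port, so every connection cached before the restart points at the wrong broker. A minimal sketch of that comparison, using only the values from the tables (`stale_node_ids` is a hypothetical helper, not part of kafka-python):

```python
# Node ID -> port mapping before and after the restart (from the tables above).
before = {1: 9092, 2: 9093, 3: 9094}
after = {1: 9094, 2: 9092, 3: 9093}


def stale_node_ids(old, new):
    """Return node IDs whose port changed or that disappeared (hypothetical helper)."""
    return sorted(node for node, port in old.items() if new.get(node) != port)


print(stale_node_ids(before, after))  # [1, 2, 3]
```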

3. The consumer process's CPU usage rises to 100%.

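Root cause:
I found that when the Kafka cluster restarts, the cluster metadata has not yet been refreshed, but the broker nodes still accept TCP connections normally. `self._conns` keeps caching the existing TCP connection even though it no longer matches the node ID. The consumer looks up the node ID, sends a request over that connection (a Kafka Fetch v4 request, via `self.epoll.poll()`), and immediately gets an error back (NotLeaderForPartition). Because the error arrives quickly and the stale connection is never replaced, the consumer repeats this cycle in a tight loop, which drives the CPU to 100%.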
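The failure mode described above can be modelled without a real cluster. The sketch below is a toy, not kafka-python code: `cached_port`, `node_on_port`, `leader` and `fetch` are hypothetical names, the port mappings come from the two tables, and the assumption that node 1 leads partition 0 of `btest` is made up for illustration.

```python
# Toy model, not kafka-python code: connections are cached by node ID, but
# after the restart each cached address is served by a different broker.

# Port cached for each node ID before the restart (first table).
cached_port = {1: 9092, 2: 9093, 3: 9094}

# Which node really listens on each port after the restart (second table).
node_on_port = {9094: 1, 9092: 2, 9093: 3}

# Hypothetical leadership: node 1 leads partition 0 of "btest".
leader = {("btest", 0): 1}


def fetch(node_id, topic, partition):
    """Send a fetch to `node_id` over its cached connection (toy model)."""
    serving_node = node_on_port[cached_port[node_id]]
    if leader[(topic, partition)] != serving_node:
        return "NotLeaderForPartition"
    return "OK"


# The error returns immediately and the cache is never corrected, so naive
# retries in this toy repeat the same round trip with no delay in between.
results = [fetch(1, "btest", 0) for _ in range(5)]
print(results)

# Pointing node 1 at the address from refreshed metadata (port 9094) fixes it.
cached_port[1] = 9094
print(fetch(1, "btest", 0))  # OK
```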

Proposed fix:
File: `kafka/client_async.py`

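```python
# kafka/client_async.py, inside class KafkaClient.
# Assumes get_ip_port_afi is imported from kafka.conn and that `selectors`
# is the selectors module client_async.py already imports.

def poll(self, timeout_ms=None, future=None):
    # ... existing poll() logic ...

    # Close any connection whose host/port no longer matches the broker
    # metadata for its node ID. Iterate over a snapshot of the dict because
    # self.close(node_id) can remove the entry from self._conns.
    for node_id, conn in list(self._conns.items()):
        broker = self.cluster.broker_metadata(conn.node_id)
        if broker is None:
            self.close(node_id)
            continue

        host, _, _ = get_ip_port_afi(broker.host)
        if conn.host != host or conn.port != broker.port:
            self.close(node_id)

def _register_send_sockets(self):
    while self._sending:
        conn = self._sending.pop()
        # Skip connections that were closed after being queued for sending.
        if conn.disconnected():
            continue
        try:
            key = self._selector.get_key(conn._sock)
            events = key.events | selectors.EVENT_WRITE
            self._selector.modify(key.fileobj, events, key.data)
        except KeyError:
            self._selector.register(conn._sock, selectors.EVENT_WRITE, conn)
```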
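The patch above lives inside kafka-python's `KafkaClient`, so it cannot run on its own. The sketch below models the same two ideas outside the library, under stated assumptions: `FakeConn`, `close_stale_conns` and `register_send_sockets` are hypothetical names, broker metadata is reduced to a `node_id -> (host, port)` dict, and the selector is the standard-library `selectors.DefaultSelector`.

```python
import selectors
import socket


class FakeConn:
    """Hypothetical stand-in for kafka-python's BrokerConnection."""

    def __init__(self, node_id, host, port, sock):
        self.node_id = node_id
        self.host = host
        self.port = port
        self._sock = sock

    def disconnected(self):
        return self._sock is None

    def close(self):
        if self._sock is not None:
            self._sock.close()
            self._sock = None


def close_stale_conns(conns, metadata):
    """Close and drop connections whose (host, port) differs from metadata.

    Iterates over a snapshot because entries are removed inside the loop.
    """
    closed = []
    for node_id, conn in list(conns.items()):
        broker = metadata.get(node_id)
        if broker is None or (conn.host, conn.port) != broker:
            conns.pop(node_id).close()
            closed.append(node_id)
    return closed


def register_send_sockets(selector, sending):
    """Register queued connections for EVENT_WRITE, skipping closed ones."""
    while sending:
        conn = sending.pop()
        if conn.disconnected():
            continue  # socket already closed; nothing to register
        try:
            key = selector.get_key(conn._sock)
            selector.modify(key.fileobj, key.events | selectors.EVENT_WRITE, key.data)
        except KeyError:
            selector.register(conn._sock, selectors.EVENT_WRITE, conn)


# Usage: cached connections follow the first table, metadata the second.
a, b = socket.socketpair()
c, d = socket.socketpair()
e, f = socket.socketpair()
host = "172.20.68.142"
conns = {1: FakeConn(1, host, 9092, a), 2: FakeConn(2, host, 9093, c)}
metadata = {1: (host, 9094), 2: (host, 9092), 3: (host, 9093)}

sending = set(conns.values())  # queued before the restart was noticed
closed = close_stale_conns(conns, metadata)

# Reconnect node 1 using the refreshed metadata and queue it as well.
conns[1] = FakeConn(1, *metadata[1], e)
sending.add(conns[1])

sel = selectors.DefaultSelector()
register_send_sockets(sel, sending)
registered = sorted((k.data.node_id, k.data.port) for k in sel.get_map().values())
print(closed, registered)  # [1, 2] [(1, 9094)]

sel.close()
conns[1].close()
for s in (b, d, f):
    s.close()
```

Both old connections are closed even though they were still queued for sending; the `disconnected()` guard skips them, so only the fresh connection to port 9094 reaches the selector.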

After testing, the bug has been resolved. Could you please review this fix and let me know whether it can be merged into the next release?

Thanks

Metadata


Assignees

No one assigned

    Labels

    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
