Description
Hello,
Steps to reproduce the bug:
1. At first, the Kafka cluster was running normally, and the consumer was created at this time.
kafka cluster:
node id | node port
1       | 9092
2       | 9093
3       | 9094
consumer code:
from kafka import KafkaConsumer

class KafkaMonitor(object):
    def __init__(self):
        self.consumer = KafkaConsumer("btest", bootstrap_servers="172.20.68.142:9093")

    def run(self):
        for message in self.consumer:
            print(str(message))

if __name__ == "__main__":
    KafkaMonitor().run()
2. Then the Kafka cluster was killed; all node services were terminated.
When I restarted the services, the node IDs and their corresponding ports had changed.
kafka cluster:
node id | node port
1       | 9094
2       | 9092
3       | 9093
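To make the mismatch concrete, here is an illustrative sketch (not kafka-python code) of the node-id-to-port mapping before and after the restart. Because connections are cached by node ID, every cached connection now points at a broker that metadata no longer agrees with:

```python
# Node-id -> port mapping when the consumer first connected,
# and the mapping after the cluster restart.
before = {1: 9092, 2: 9093, 3: 9094}
after = {1: 9094, 2: 9092, 3: 9093}

# Connections cached by node id still use the old ports, so every
# node id now resolves to a different broker than metadata reports.
stale_nodes = [node for node, port in before.items() if after[node] != port]
print(stale_nodes)  # -> [1, 2, 3]
```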
3. The consumer process then occupied 100% of a CPU core.
Bug analysis:
I found that when the Kafka cluster restarts, the cached cluster metadata is not refreshed, yet the broker nodes can still establish TCP connections normally. self._conns keeps caching TCP connections whose host/port no longer match their node IDs. The consumer looks up a node ID, sends a fetch request over the stale connection (self.epoll.poll(), Kafka Fetch V4 Request), and immediately gets back an error response (NotLeaderForPartition). Because the error arrives instantly and the client retries without any backoff or metadata refresh, the loop spins and pegs the CPU at 100%.
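A toy model of that hot loop (an assumption for illustration, not the real client code) shows why the CPU is pegged: each fetch against the stale connection fails instantly, and nothing ever updates the metadata, so the retry loop runs flat out:

```python
# Toy simulation of the retry loop: the broker behind the cached
# connection is no longer the leader, so every fetch fails immediately.
def fetch_from(node_id, metadata):
    if metadata[node_id] != "leader":
        return "NOT_LEADER_FOR_PARTITION"
    return "OK"

metadata = {1: "follower"}  # stale view cached by the client
attempts = 0
while attempts < 100000:    # in the real bug this loop never exits
    resp = fetch_from(1, metadata)
    if resp == "OK":
        break
    attempts += 1           # no sleep, no metadata refresh -> 100% CPU
print(attempts)  # -> 100000: every single attempt failed instantly
```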
Bug fix:
file: kafka/client_async.py

def poll(self, timeout_ms=None, future=None):
    ...
    # check that the host/port of each cached conn is consistent
    # with the host/port in the cluster metadata
    for node_id, conn in list(self._conns.items()):
        broker = self.cluster.broker_metadata(conn.node_id)
        if broker is None:
            self.close(node_id)
            continue
        host, _, afi = get_ip_port_afi(broker.host)
        if conn.host != host or conn.port != broker.port:
            self.close(node_id)
            continue
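The check can be illustrated standalone. In this sketch, Conn and Broker are hypothetical stand-ins for kafka-python's internal connection and metadata objects, and the hostname resolution done by get_ip_port_afi is simplified to a plain string comparison:

```python
import collections

# Hypothetical stand-ins for kafka-python's internal objects.
Conn = collections.namedtuple("Conn", ["node_id", "host", "port"])
Broker = collections.namedtuple("Broker", ["host", "port"])

def stale_connections(conns, metadata):
    """Return node ids whose cached connection no longer matches metadata."""
    stale = []
    for node_id, conn in conns.items():
        broker = metadata.get(node_id)
        if broker is None or conn.host != broker.host or conn.port != broker.port:
            stale.append(node_id)
    return stale

conns = {
    1: Conn(1, "172.20.68.142", 9092),
    2: Conn(2, "172.20.68.142", 9093),
}
metadata = {
    1: Broker("172.20.68.142", 9094),  # node 1 moved to 9094 after restart
    2: Broker("172.20.68.142", 9093),  # node 2 unchanged
}
print(stale_connections(conns, metadata))  # -> [1]
```

In the real patch, each stale node id found this way is passed to self.close(node_id), which drops the mismatched connection so it can be re-established against the correct broker.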
def _register_send_sockets(self):
    while self._sending:
        conn = self._sending.pop()
        if conn.disconnected():
            continue
        try:
            key = self._selector.get_key(conn._sock)
            events = key.events | selectors.EVENT_WRITE
            self._selector.modify(key.fileobj, events, key.data)
        except KeyError:
            self._selector.register(conn._sock, selectors.EVENT_WRITE, conn)
After testing, the bug no longer reproduces. Could you please review this fix and consider whether it can be merged into the next version?
Thanks