Skip to content

Commit 2a41fa1

Browse files
kaiterramikedpkp
authored andcommitted
Deal with brokers that disappear, reappear with different IP address (#1085)
When KafkaClient connects to a broker in _maybe_connect, it inserts into self._conns a BrokerConnection configured with the current host/port for that node. The BrokerConnection remains there forever, though, so if the broker's IP or host ever changes, KafkaClient has no way to deal with this. The fix is to compare the latest metadata with the current node's connection, and if the host/IP has changed, decommission the old connection and allow a new one to be created. There's also a common race condition on broker startup where the initial metadata request sometimes returns an empty list of brokers, but subsequent requests behave normally. So, we must deal with broker being None here. This change is conservative in that it doesn't remove the connection from self._conns unless the new broker metadata contains an entry for that same node with a new IP/port.
1 parent 26a8102 commit 2a41fa1

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

kafka/client_async.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,19 @@ def _conn_state_change(self, node_id, conn):
324324

325325
def _maybe_connect(self, node_id):
326326
"""Idempotent non-blocking connection attempt to the given node id."""
327+
broker = self.cluster.broker_metadata(node_id)
328+
329+
# If broker metadata indicates that a node's host/port has changed, remove it
330+
if node_id in self._conns and broker is not None:
331+
conn = self._conns[node_id]
332+
host, _, __ = get_ip_port_afi(broker.host)
333+
if conn.host != host or conn.port != broker.port:
334+
log.debug("Closing connection to decommissioned node %s at %s:%s",
335+
node_id, conn.host, conn.port)
336+
conn.close()
337+
self._conns.pop(node_id)
338+
327339
if node_id not in self._conns:
328-
broker = self.cluster.broker_metadata(node_id)
329340
assert broker, 'Broker id %s not in current metadata' % node_id
330341

331342
log.debug("Initiating connection to node %s at %s:%s",

0 commit comments

Comments
 (0)