Skip to content

Commit bbbac3d

Browse files
committed
Fixup for #1085 -- only check for changed metadata on disconnected nodes
1 parent 2a41fa1 commit bbbac3d

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

kafka/client_async.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -325,31 +325,37 @@ def _conn_state_change(self, node_id, conn):
325325
def _maybe_connect(self, node_id):
326326
"""Idempotent non-blocking connection attempt to the given node id."""
327327
broker = self.cluster.broker_metadata(node_id)
328+
conn = self._conns.get(node_id)
328329

329-
# If broker metadata indicates that a node's host/port has changed, remove it
330-
if node_id in self._conns and broker is not None:
331-
conn = self._conns[node_id]
332-
host, _, __ = get_ip_port_afi(broker.host)
333-
if conn.host != host or conn.port != broker.port:
334-
log.debug("Closing connection to decommissioned node %s at %s:%s",
335-
node_id, conn.host, conn.port)
336-
conn.close()
337-
self._conns.pop(node_id)
338-
339-
if node_id not in self._conns:
330+
if conn is None:
340331
assert broker, 'Broker id %s not in current metadata' % node_id
341332

342333
log.debug("Initiating connection to node %s at %s:%s",
343334
node_id, broker.host, broker.port)
344335
host, port, afi = get_ip_port_afi(broker.host)
345336
cb = functools.partial(self._conn_state_change, node_id)
346-
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
347-
state_change_callback=cb,
348-
node_id=node_id,
349-
**self.config)
350-
conn = self._conns[node_id]
351-
if conn.connected():
337+
conn = BrokerConnection(host, broker.port, afi,
338+
state_change_callback=cb,
339+
node_id=node_id,
340+
**self.config)
341+
self._conns[node_id] = conn
342+
343+
# Check if existing connection should be recreated because host/port changed
344+
elif conn.disconnected() and broker is not None:
345+
host, _, __ = get_ip_port_afi(broker.host)
346+
if conn.host != host or conn.port != broker.port:
347+
log.info("Broker metadata change detected for node %s"
348+
" from %s:%s to %s:%s", node_id, conn.host, conn.port,
349+
broker.host, broker.port)
350+
351+
# Drop old connection object.
352+
# It will be recreated on next _maybe_connect
353+
self._conns.pop(node_id)
354+
return False
355+
356+
elif conn.connected():
352357
return True
358+
353359
conn.connect()
354360
return conn.connected()
355361

0 commit comments

Comments
 (0)