Skip to content

Commit febfdac

Browse files
authored
Delay group coordinator until after bootstrap (#2539)
1 parent c3c20cb commit febfdac

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

kafka/coordinator/base.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,12 +252,16 @@ def ensure_coordinator_ready(self, timeout_ms=None):
252252
# so we will just pick a node at random and treat
253253
# it as the "coordinator"
254254
if self.config['api_version'] < (0, 8, 2):
255-
self.coordinator_id = self._client.least_loaded_node()
256-
if self.coordinator_id is not None:
255+
maybe_coordinator_id = self._client.least_loaded_node()
256+
if maybe_coordinator_id is None or self._client.cluster.is_bootstrap(maybe_coordinator_id):
257+
future = Future().failure(Errors.NoBrokersAvailable())
258+
else:
259+
self.coordinator_id = maybe_coordinator_id
257260
self._client.maybe_connect(self.coordinator_id)
258-
continue
261+
continue
262+
else:
263+
future = self.lookup_coordinator()
259264

260-
future = self.lookup_coordinator()
261265
self._client.poll(future=future, timeout_ms=inner_timeout_ms())
262266

263267
if not future.is_done:
@@ -677,7 +681,7 @@ def _send_group_coordinator_request(self):
677681
Future: resolves to the node id of the coordinator
678682
"""
679683
node_id = self._client.least_loaded_node()
680-
if node_id is None:
684+
if node_id is None or self._client.cluster.is_bootstrap(node_id):
681685
return Future().failure(Errors.NoBrokersAvailable())
682686

683687
elif not self._client.ready(node_id, metadata_priority=False):

0 commit comments

Comments
 (0)