Skip to content

Commit d2001e4

Browse files
authored
Handle lookup_coordinator send failures (#1279)
1 parent 8b05ee8 commit d2001e4

File tree

2 files changed

+19
-5
lines changed

2 files changed

+19
-5
lines changed

kafka/coordinator/base.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,17 @@ def _reset_find_coordinator_future(self, result):
228228
self._find_coordinator_future = None
229229

230230
def lookup_coordinator(self):
231-
if self._find_coordinator_future is None:
232-
self._find_coordinator_future = self._send_group_coordinator_request()
233-
234-
self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
235-
return self._find_coordinator_future
231+
if self._find_coordinator_future is not None:
232+
return self._find_coordinator_future
233+
234+
# If there is an error sending the group coordinator request
235+
# then _reset_find_coordinator_future will immediately fire and
236+
# set _find_coordinator_future = None
237+
# To avoid returning None, we capture the future in a local variable
238+
self._find_coordinator_future = self._send_group_coordinator_request()
239+
future = self._find_coordinator_future
240+
self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
241+
return future
236242

237243
def need_rejoin(self):
238244
"""Check whether the group should be rejoined (e.g. if metadata changes)

test/test_coordinator.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,3 +590,11 @@ def test_heartbeat(patched_coord):
590590
patched_coord.heartbeat_task()
591591
assert patched_coord._client.schedule.call_count == 1
592592
assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1
593+
594+
595+
def test_lookup_coordinator_failure(mocker, coordinator):
596+
597+
mocker.patch.object(coordinator, '_send_group_coordinator_request',
598+
return_value=Future().failure(Exception('foobar')))
599+
future = coordinator.lookup_coordinator()
600+
assert future.failed()

0 commit comments

Comments
 (0)