Skip to content

Commit da65a56

Browse files
authored
Fix coordinator join_future race condition (#1338)
* Fix race condition in coordinator join_future handling
1 parent a69320b commit da65a56

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

kafka/coordinator/base.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,19 +377,23 @@ def ensure_active_group(self):
377377
# before the pending rebalance has completed.
378378
if self.join_future is None:
379379
self.state = MemberState.REBALANCING
380-
self.join_future = self._send_join_group_request()
380+
future = self._send_join_group_request()
381+
382+
self.join_future = future # this should happen before adding callbacks
381383

382384
# handle join completion in the callback so that the
383385
# callback will be invoked even if the consumer is woken up
384386
# before finishing the rebalance
385-
self.join_future.add_callback(self._handle_join_success)
387+
future.add_callback(self._handle_join_success)
386388

387389
# we handle failures below after the request finishes.
388390
# If the join completes after having been woken up, the
389391
# exception is ignored and we will rejoin
390-
self.join_future.add_errback(self._handle_join_failure)
392+
future.add_errback(self._handle_join_failure)
393+
394+
else:
395+
future = self.join_future
391396

392-
future = self.join_future
393397
self._client.poll(future=future)
394398

395399
if future.failed():

test/test_coordinator.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,3 +620,16 @@ def test_lookup_coordinator_failure(mocker, coordinator):
620620
return_value=Future().failure(Exception('foobar')))
621621
future = coordinator.lookup_coordinator()
622622
assert future.failed()
623+
624+
625+
def test_ensure_active_group(mocker, coordinator):
626+
coordinator._subscription.subscribe(topics=['foobar'])
627+
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
628+
mocker.patch.object(coordinator, '_send_join_group_request', return_value=Future().success(True))
629+
mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, True, False])
630+
mocker.patch.object(coordinator, '_on_join_complete')
631+
mocker.patch.object(coordinator, '_heartbeat_thread')
632+
633+
coordinator.ensure_active_group()
634+
635+
coordinator._send_join_group_request.assert_called_once_with()

0 commit comments

Comments
 (0)