File tree Expand file tree Collapse file tree 1 file changed +8
-4
lines changed Expand file tree Collapse file tree 1 file changed +8
-4
lines changed Original file line number Diff line number Diff line change @@ -377,19 +377,23 @@ def ensure_active_group(self):
377
377
# before the pending rebalance has completed.
378
378
if self .join_future is None :
379
379
self .state = MemberState .REBALANCING
380
- self .join_future = self ._send_join_group_request ()
380
+ future = self ._send_join_group_request ()
381
+
382
+ self .join_future = future # this should happen before adding callbacks
381
383
382
384
# handle join completion in the callback so that the
383
385
# callback will be invoked even if the consumer is woken up
384
386
# before finishing the rebalance
385
- self . join_future .add_callback (self ._handle_join_success )
387
+ future .add_callback (self ._handle_join_success )
386
388
387
389
# we handle failures below after the request finishes.
388
390
# If the join completes after having been woken up, the
389
391
# exception is ignored and we will rejoin
390
- self .join_future .add_errback (self ._handle_join_failure )
392
+ future .add_errback (self ._handle_join_failure )
393
+
394
+ else :
395
+ future = self .join_future
391
396
392
- future = self .join_future
393
397
self ._client .poll (future = future )
394
398
395
399
if future .failed ():
You can’t perform that action at this time.
0 commit comments