Skip to content

Commit 5e3e58a

Browse files
Ayanda-Dmichaelklishin
authored andcommitted
Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster
wide consistency, ensuring only the leader is active/running (cherry picked from commit b675ce2) (cherry picked from commit d9de6d9)
1 parent 00666e2 commit 5e3e58a

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1376,6 +1376,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
13761376
_ = rabbit_amqqueue:update(QName, Fun),
13771377
case ra:force_delete_server(?RA_SYSTEM, ServerId) of
13781378
ok ->
1379+
rabbit_log:info("Deleted a replica of quorum ~ts on node ~ts", [rabbit_misc:rs(QName), Node]),
13791380
ok;
13801381
{error, {badrpc, nodedown}} ->
13811382
ok;
@@ -1957,13 +1958,15 @@ force_shrink_member_to_current_member(VHost, Name) ->
19571958
case rabbit_amqqueue:lookup(QName) of
19581959
{ok, Q} when ?is_amqqueue(Q) ->
19591960
{RaName, _} = amqqueue:get_pid(Q),
1961+
OtherNodes = lists:delete(Node, get_nodes(Q)),
19601962
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
19611963
Fun = fun (Q0) ->
19621964
TS0 = amqqueue:get_type_state(Q0),
19631965
TS = TS0#{nodes => [Node]},
19641966
amqqueue:set_type_state(Q, TS)
19651967
end,
19661968
_ = rabbit_amqqueue:update(QName, Fun),
1969+
_ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes],
19671970
rabbit_log:warning("Disaster recovery procedure: shrinking finished");
19681971
_ ->
19691972
rabbit_log:warning("Disaster recovery procedure: shrinking failed, queue ~p not found at vhost ~p", [Name, VHost]),
@@ -1976,14 +1979,16 @@ force_all_queues_shrink_member_to_current_member() ->
19761979
_ = [begin
19771980
QName = amqqueue:get_name(Q),
19781981
{RaName, _} = amqqueue:get_pid(Q),
1982+
OtherNodes = lists:delete(Node, get_nodes(Q)),
19791983
rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]),
19801984
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
19811985
Fun = fun (QQ) ->
19821986
TS0 = amqqueue:get_type_state(QQ),
19831987
TS = TS0#{nodes => [Node]},
19841988
amqqueue:set_type_state(QQ, TS)
19851989
end,
1986-
_ = rabbit_amqqueue:update(QName, Fun)
1990+
_ = rabbit_amqqueue:update(QName, Fun),
1991+
_ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes]
19871992
end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE],
19881993
rabbit_log:warning("Disaster recovery procedure: shrinking finished"),
19891994
ok.

0 commit comments

Comments
 (0)