Skip to content

Commit eb7a6c7

Browse files
authored
Merge pull request #13938 from rabbitmq/mergify/bp/v4.1.x/pr-13548
For 4.1.x, by @aaron-seo: introduce a command that would force QQs to take a checkpoint and truncate its segments (backport #13548)
2 parents 413ad94 + 4574b38 commit eb7a6c7

File tree

5 files changed

+301
-0
lines changed

5 files changed

+301
-0
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
% exclusive_owner
3030
get_exclusive_owner/1,
3131
get_leader/1,
32+
get_nodes/1,
3233
% name (#resource)
3334
get_name/1,
3435
set_name/2,
@@ -391,6 +392,24 @@ get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->
391392

392393
%% Returns the second element of the `pid' tuple of a queue record.
%% The only clause matches records whose `type' is `rabbit_quorum_queue'
%% and whose `pid' is a 2-tuple; any other input fails with function_clause.
get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.
393394

395+
%% Introduced in rabbitmq/rabbitmq-server#13905 for 4.2.0,
%% used in v4.1.x as of rabbitmq/rabbitmq-server#13548. MK.
%%
%% Resolves the leader node from the record's `pid' field:
%%   * `none'          -> `none'
%%   * a 2-tuple       -> its second element
%%   * anything else   -> node(Pid), i.e. the value is treated as a pid
-spec get_leader_node(amqqueue_v2()) -> node() | none.

get_leader_node(#amqqueue{pid = none}) -> none;
get_leader_node(#amqqueue{pid = {_Name, LeaderNode}}) -> LeaderNode;
get_leader_node(#amqqueue{pid = QPid}) -> node(QPid).
402+
403+
%% Returns the value stored under the `nodes' key of the queue's type state.
%% When the type state has no such key (or is not a map), falls back to a
%% single-element list holding the result of get_leader_node/1.
-spec get_nodes(amqqueue_v2()) -> [node(),...].

get_nodes(Q) ->
    TypeState = amqqueue:get_type_state(Q),
    case TypeState of
        #{nodes := Members} -> Members;
        _NoNodesKey -> [get_leader_node(Q)]
    end.
412+
394413
% operator_policy
395414

396415
-spec get_operator_policy(amqqueue()) -> binary() | none | undefined.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@
7777
force_vhost_queues_shrink_member_to_current_member/1,
7878
force_all_queues_shrink_member_to_current_member/0]).
7979

80+
-export([force_checkpoint/2, force_checkpoint_on_queue/1]).
81+
8082
%% for backwards compatibility
8183
-export([file_handle_leader_reservation/1,
8284
file_handle_other_reservation/0,
@@ -141,6 +143,7 @@
141143
-define(RPC_TIMEOUT, 1000).
142144
-define(START_CLUSTER_TIMEOUT, 5000).
143145
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
146+
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
144147
-define(TICK_INTERVAL, 5000). %% the ra server tick time
145148
-define(DELETE_TIMEOUT, 5000).
146149
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
@@ -2105,6 +2108,40 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
21052108
rabbit_log:warning("Shrinking finished"),
21062109
ok.
21072110

2111+
%% Asks every member of the quorum queue `QName' to take a checkpoint.
%%
%% The request is sent with ra:cast_aux_command/2 to `{RaName, Node}' for each
%% node returned by amqqueue:get_nodes/1, so `ok' only means the casts were
%% issued, not that a checkpoint was taken.
%%
%% Returns:
%%   ok                                      - the queue is a quorum queue
%%   {error, classic_queue_not_supported}    - the queue is a classic queue
%%   {error, not_quorum_queue}               - the queue is of any other type
%%   {error, _}                              - passed through unchanged from
%%                                             rabbit_db_queue:get_durable/1
force_checkpoint_on_queue(QName) ->
    QNameFmt = rabbit_misc:rs(QName),
    case rabbit_db_queue:get_durable(QName) of
        {error, _} = LookupError ->
            LookupError;
        {ok, Q} when ?amqqueue_is_classic(Q) ->
            {error, classic_queue_not_supported};
        {ok, Q} when ?amqqueue_is_quorum(Q) ->
            {RaName, _} = amqqueue:get_pid(Q),
            rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
            Members = amqqueue:get_nodes(Q),
            lists:foreach(
              fun(Node) ->
                      ra:cast_aux_command({RaName, Node}, force_checkpoint)
              end, Members),
            ok;
        {ok, _OtherType} ->
            {error, not_quorum_queue}
    end.
2128+
2129+
%% Runs force_checkpoint_on_queue/1 for every durable queue of this module's
%% type whose vhost satisfies is_match/2 against `VhostSpec' and whose resource
%% name satisfies is_match/2 against `QueueSpec'.
%% NOTE(review): the CLI passes regular expression strings for both specs;
%% confirm against is_match/2, which is not visible here.
%%
%% Returns one `{QName, Result}' per selected queue, where `Result' is the
%% 1-tuple `{ok}' on success or `{error, Reason}' on failure. Failures are
%% also logged as warnings.
force_checkpoint(VhostSpec, QueueSpec) ->
    IsSelected =
        fun(Q) ->
                is_match(amqqueue:get_vhost(Q), VhostSpec)
                    andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)
        end,
    Checkpoint =
        fun(Q) ->
                QName = amqqueue:get_name(Q),
                case force_checkpoint_on_queue(QName) of
                    ok ->
                        {QName, {ok}};
                    {error, Err} ->
                        rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
                                           [rabbit_misc:rs(QName), Err]),
                        {QName, {error, Err}}
                end
        end,
    [Checkpoint(Q)
     || Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE),
        IsSelected(Q)].
2144+
21082145
%% True when `Up' has fewer elements than a strict majority of `All',
%% i.e. length(Up) < length(All) div 2 + 1.
is_minority(All, Up) ->
    HalfOfAll = length(All) div 2,
    length(Up) =< HalfOfAll.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-include_lib("eunit/include/eunit.hrl").
1111
-include_lib("amqp_client/include/amqp_client.hrl").
1212
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
13+
-include_lib("rabbit/src/rabbit_fifo.hrl").
1314

1415
-import(queue_utils, [wait_for_messages_ready/3,
1516
wait_for_messages_pending_ack/3,
@@ -98,6 +99,8 @@ groups() ->
9899
force_shrink_member_to_current_member,
99100
force_all_queues_shrink_member_to_current_member,
100101
force_vhost_queues_shrink_member_to_current_member,
102+
force_checkpoint_on_queue,
103+
force_checkpoint,
101104
policy_repair,
102105
gh_12635,
103106
replica_states
@@ -1333,6 +1336,96 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
13331336
?assertEqual(3, length(Nodes0))
13341337
end || Q <- QQs, VHost <- VHosts].
13351338

1339+
%% Declares a quorum queue on a three-node cluster, publishes N messages,
%% checks that no member has a checkpoint yet, forces one via
%% rabbit_quorum_queue:force_checkpoint_on_queue/1 and then waits until every
%% member reports a latest checkpoint index of at least N.
force_checkpoint_on_queue(Config) ->
    [Server0, _, _] = Servers =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
    QQ = ?config(queue_name, Config),
    RaName = ra_name(QQ),
    QName = rabbit_misc:r(<<"/">>, queue, QQ),

    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

    N = 20_000,
    rabbit_ct_client_helpers:publish(Ch, QQ, N),
    wait_for_messages_ready([Server0], RaName, N),

    %% Fetches the Ra member overview of the queue from the given node.
    Overview =
        fun(Server) ->
                {ok, MemberState, _} =
                    rpc:call(Server, ra, member_overview, [{RaName, Server}]),
                MemberState
        end,

    %% The state before any checkpoints
    lists:foreach(
      fun(Server) ->
              rabbit_ct_helpers:await_condition(
                fun() ->
                        #{log := #{latest_checkpoint_index := LCI}} = Overview(Server),
                        LCI =:= undefined
                end)
      end, Servers),

    State0 = Overview(Server0),
    ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]),

    %% wait for longer than ?CHECK_MIN_INTERVAL_MS ms
    timer:sleep(?CHECK_MIN_INTERVAL_MS + 1000),
    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                 force_checkpoint_on_queue, [QName]),

    %% Wait for the forced checkpoint on every member and make sure its
    %% index is at least N
    lists:foreach(
      fun(Server) ->
              rabbit_ct_helpers:await_condition(
                fun() ->
                        State = Overview(Server),
                        ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
                        #{log := #{latest_checkpoint_index := LCI}} = State,
                        (LCI =/= undefined) andalso (LCI >= N)
                end)
      end, Servers).
1404+
1405+
%% Declares one quorum queue and one classic queue, then checks that
%% rabbit_quorum_queue:force_checkpoint/2 with match-all patterns reports
%% a result for the quorum queue only.
force_checkpoint(Config) ->
    [Server0, _Server1, _Server2] =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
    QQ = ?config(queue_name, Config),
    CQ = <<"force_checkpoint_cq">>,

    QuorumArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
    ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, QuorumArgs)),

    ClassicArgs = [{<<"x-queue-type">>, longstr, <<"classic">>}],
    ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, ClassicArgs)),

    rabbit_ct_client_helpers:publish(Ch, QQ, 3),
    wait_for_messages_ready([Server0], ra_name(QQ), 3),

    MatchAll = <<".*">>,
    Actual = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                          force_checkpoint, [MatchAll, MatchAll]),

    % Result should only have quorum queue
    Expected = [{rabbit_misc:r(<<"/">>, queue, QQ), {ok}}],
    ?assertEqual(Expected, Actual).
13361429

13371430
% Tests that, if the process of a QQ is dead in the moment of declaring a policy
13381431
% that affects such queue, when the process is made available again, the policy
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
  # CLI command that asks the target node to force a checkpoint on every
  # quorum queue whose virtual host and name match the given patterns.
  # The work is done by :rabbit_quorum_queue.force_checkpoint/2 on the
  # target node; this module only forwards the patterns and shapes the output.
  alias RabbitMQ.CLI.Core.{DocGuide}

  @behaviour RabbitMQ.CLI.CommandBehaviour

  # Match-all patterns and full (not errors-only) reporting by default.
  defp default_opts,
    do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}

  def switches(),
    do: [
      vhost_pattern: :string,
      queue_pattern: :string,
      errors_only: :boolean
    ]

  def merge_defaults(args, opts) do
    {args, Map.merge(default_opts(), opts)}
  end

  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments

  # Returns `{:badrpc, _}` unchanged when the RPC fails; otherwise a list of
  # keyword rows (`:vhost`, `:name`, `:result`), one per reported queue.
  def run([], %{
        node: node_name,
        vhost_pattern: vhost_pat,
        queue_pattern: queue_pat,
        errors_only: errors_only
      }) do
    args = [vhost_pat, queue_pat]

    case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
      {:badrpc, _} = error ->
        error

      results when errors_only ->
        # force_checkpoint/2 reports failures as the 2-tuple {:error, reason}.
        # A 3-element pattern here would never match, and comprehension
        # generators silently skip non-matching elements, so every error
        # would be dropped from the output.
        for {{:resource, vhost, _kind, name}, {:error, _} = res} <- results,
            do: [
              {:vhost, vhost},
              {:name, name},
              {:result, res}
            ]

      results ->
        for {{:resource, vhost, _kind, name}, res} <- results,
            do: [
              {:vhost, vhost},
              {:name, name},
              {:result, res}
            ]
    end
  end

  use RabbitMQ.CLI.DefaultOutput

  def formatter(), do: RabbitMQ.CLI.Formatters.Table

  def usage,
    do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--errors-only]"

  def usage_additional do
    [
      ["--queue-pattern <pattern>", "regular expression to match queue names"],
      ["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
      ["--errors-only", "only list queues which reported an error"]
    ]
  end

  def usage_doc_guides() do
    [
      DocGuide.quorum_queues()
    ]
  end

  def help_section, do: :replication

  def description,
    do: "Forces checkpoints for all matching quorum queues"

  def banner([], _) do
    "Forcing checkpoint for all matching quorum queues..."
  end
end
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
  # Unit tests for the force_checkpoint CLI command: option defaults,
  # positional-argument validation, and the unreachable-node RPC path.
  use ExUnit.Case, async: false
  import TestHelper

  @command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand

  setup_all do
    RabbitMQ.CLI.Core.Distribution.start()

    :ok
  end

  setup context do
    # A per-test timeout can be supplied with `@tag test_timeout: ...`.
    opts = %{
      node: get_rabbit_hostname(),
      timeout: context[:test_timeout] || 30000,
      vhost_pattern: ".*",
      queue_pattern: ".*",
      errors_only: false
    }

    {:ok, opts: opts}
  end

  test "merge_defaults: defaults to reporting complete results" do
    expected_opts = %{
      vhost_pattern: ".*",
      queue_pattern: ".*",
      errors_only: false
    }

    assert @command.merge_defaults([], %{}) == {[], expected_opts}
  end

  test "validate: accepts no positional arguments" do
    assert @command.validate([], %{}) == :ok
  end

  test "validate: any positional arguments fail validation" do
    rejected_arg_lists = [
      ["quorum-queue-a"],
      ["quorum-queue-a", "two"],
      ["quorum-queue-a", "two", "three"]
    ]

    for positional_args <- rejected_arg_lists do
      assert @command.validate(positional_args, %{}) ==
               {:validation_failure, :too_many_args}
    end
  end

  @tag test_timeout: 3000
  test "run: targeting an unreachable node throws a badrpc", context do
    unreachable_opts = Map.merge(context[:opts], %{node: :jake@thedog})

    assert match?({:badrpc, _}, @command.run([], unreachable_opts))
  end
end

0 commit comments

Comments
 (0)