@@ -92,7 +92,9 @@ groups() ->
92
92
format ,
93
93
add_member_2 ,
94
94
single_active_consumer_priority_take_over ,
95
- single_active_consumer_priority
95
+ single_active_consumer_priority ,
96
+ force_shrink_member_to_current_member ,
97
+ force_all_queues_shrink_member_to_current_member
96
98
]
97
99
++ all_tests ()},
98
100
{cluster_size_5 , [], [start_queue ,
@@ -1151,6 +1153,85 @@ single_active_consumer_priority(Config) ->
1151
1153
rpc :call (Server0 , ra , local_query , [RaNameQ3 , QueryFun ])),
1152
1154
ok .
1153
1155
1156
+ force_shrink_member_to_current_member (Config ) ->
1157
+ [Server0 , Server1 , Server2 ] =
1158
+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1159
+
1160
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1161
+ QQ = ? config (queue_name , Config ),
1162
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1163
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1164
+
1165
+ RaName = ra_name (QQ ),
1166
+ rabbit_ct_client_helpers :publish (Ch , QQ , 3 ),
1167
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1168
+
1169
+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1170
+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1171
+ ? assertEqual (3 , length (Nodes0 )),
1172
+
1173
+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1174
+ force_shrink_member_to_current_member , [<<" /" >>, QQ ]),
1175
+
1176
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1177
+
1178
+ {ok , Q1 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1179
+ #{nodes := Nodes1 } = amqqueue :get_type_state (Q1 ),
1180
+ ? assertEqual (1 , length (Nodes1 )),
1181
+
1182
+ % % grow queues back to all nodes
1183
+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1184
+
1185
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1186
+ {ok , Q2 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1187
+ #{nodes := Nodes2 } = amqqueue :get_type_state (Q2 ),
1188
+ ? assertEqual (3 , length (Nodes2 )).
1189
+
1190
+ force_all_queues_shrink_member_to_current_member (Config ) ->
1191
+ [Server0 , Server1 , Server2 ] =
1192
+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1193
+
1194
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1195
+ QQ = ? config (queue_name , Config ),
1196
+ AQ = ? config (alt_queue_name , Config ),
1197
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1198
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1199
+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1200
+ declare (Ch , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1201
+
1202
+ QQs = [QQ , AQ ],
1203
+
1204
+ [begin
1205
+ RaName = ra_name (Q ),
1206
+ rabbit_ct_client_helpers :publish (Ch , Q , 3 ),
1207
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1208
+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1209
+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1210
+ ? assertEqual (3 , length (Nodes0 ))
1211
+ end || Q <- QQs ],
1212
+
1213
+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1214
+ force_all_queues_shrink_member_to_current_member , []),
1215
+
1216
+ [begin
1217
+ RaName = ra_name (Q ),
1218
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1219
+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1220
+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1221
+ ? assertEqual (1 , length (Nodes0 ))
1222
+ end || Q <- QQs ],
1223
+
1224
+ % % grow queues back to all nodes
1225
+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1226
+
1227
+ [begin
1228
+ RaName = ra_name (Q ),
1229
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1230
+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1231
+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1232
+ ? assertEqual (3 , length (Nodes0 ))
1233
+ end || Q <- QQs ].
1234
+
1154
1235
priority_queue_fifo (Config ) ->
1155
1236
% % testing: if hi priority messages are published before lo priority
1156
1237
% % messages they are always consumed first (fifo)
0 commit comments