@@ -92,7 +92,9 @@ groups() ->
92
92
format ,
93
93
add_member_2 ,
94
94
single_active_consumer_priority_take_over ,
95
- single_active_consumer_priority
95
+ single_active_consumer_priority ,
96
+ force_shrink_member_to_current_member ,
97
+ force_all_queues_shrink_member_to_current_member
96
98
]
97
99
++ all_tests ()},
98
100
{cluster_size_5 , [], [start_queue ,
@@ -1152,6 +1154,85 @@ single_active_consumer_priority(Config) ->
1152
1154
rpc :call (Server0 , ra , local_query , [RaNameQ3 , QueryFun ])),
1153
1155
ok .
1154
1156
1157
+ force_shrink_member_to_current_member (Config ) ->
1158
+ [Server0 , Server1 , Server2 ] =
1159
+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1160
+
1161
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1162
+ QQ = ? config (queue_name , Config ),
1163
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1164
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1165
+
1166
+ RaName = ra_name (QQ ),
1167
+ rabbit_ct_client_helpers :publish (Ch , QQ , 3 ),
1168
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1169
+
1170
+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1171
+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1172
+ ? assertEqual (3 , length (Nodes0 )),
1173
+
1174
+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1175
+ force_shrink_member_to_current_member , [<<" /" >>, QQ ]),
1176
+
1177
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1178
+
1179
+ {ok , Q1 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1180
+ #{nodes := Nodes1 } = amqqueue :get_type_state (Q1 ),
1181
+ ? assertEqual (1 , length (Nodes1 )),
1182
+
1183
+ % % grow queues back to all nodes
1184
+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1185
+
1186
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1187
+ {ok , Q2 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1188
+ #{nodes := Nodes2 } = amqqueue :get_type_state (Q2 ),
1189
+ ? assertEqual (3 , length (Nodes2 )).
1190
+
1191
+ force_all_queues_shrink_member_to_current_member (Config ) ->
1192
+ [Server0 , Server1 , Server2 ] =
1193
+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1194
+
1195
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1196
+ QQ = ? config (queue_name , Config ),
1197
+ AQ = ? config (alt_queue_name , Config ),
1198
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1199
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1200
+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1201
+ declare (Ch , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1202
+
1203
+ QQs = [QQ , AQ ],
1204
+
1205
+ [begin
1206
+ RaName = ra_name (Q ),
1207
+ rabbit_ct_client_helpers :publish (Ch , Q , 3 ),
1208
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1209
+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1210
+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1211
+ ? assertEqual (3 , length (Nodes0 ))
1212
+ end || Q <- QQs ],
1213
+
1214
+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1215
+ force_all_queues_shrink_member_to_current_member , []),
1216
+
1217
+ [begin
1218
+ RaName = ra_name (Q ),
1219
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1220
+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1221
+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1222
+ ? assertEqual (1 , length (Nodes0 ))
1223
+ end || Q <- QQs ],
1224
+
1225
+ % % grow queues back to all nodes
1226
+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1227
+
1228
+ [begin
1229
+ RaName = ra_name (Q ),
1230
+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1231
+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1232
+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1233
+ ? assertEqual (3 , length (Nodes0 ))
1234
+ end || Q <- QQs ].
1235
+
1155
1236
priority_queue_fifo (Config ) ->
1156
1237
% % testing: if hi priority messages are published before lo priority
1157
1238
% % messages they are always consumed first (fifo)
0 commit comments