@@ -276,14 +276,15 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
276
276
CL = coordinator_leader (Config ),
277
277
278
278
Ss = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
279
- [Partition , _ , _ ] = init_super_stream (Config , CL , Ss ),
279
+
280
+ Partition = init_super_stream (Config , CL , Ss , 1 , CL ),
280
281
[L , F1 , F2 ] = topology (Config , Partition ),
281
282
282
283
wait_for_coordinator_ready (Config ),
283
284
284
285
% % we expect the stream leader and the coordinator leader to be on the same node
285
286
% % another node will be isolated
286
- ? assertEqual (L # node .name , coordinator_leader ( Config ) ),
287
+ ? assertEqual (L # node .name , CL ),
287
288
288
289
{ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
289
290
{ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
@@ -362,15 +363,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
362
363
super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partition (Config ) ->
363
364
init_coordinator (Config ),
364
365
CL = coordinator_leader (Config ),
366
+ [CF1 , _ ] = all_nodes (Config ) -- [CL ],
365
367
Ss = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
366
- [ _ , Partition , _ ] = init_super_stream (Config , CL , Ss ),
368
+ Partition = init_super_stream (Config , CL , Ss , 2 , CF1 ),
367
369
[L , F1 , F2 ] = topology (Config , Partition ),
368
370
369
371
wait_for_coordinator_ready (Config ),
370
372
371
373
% % check stream leader and coordinator are not on the same node
372
374
% % the coordinator leader node will be isolated
373
- ? assertNotEqual (L # node .name , coordinator_leader ( Config ) ),
375
+ ? assertNotEqual (L # node .name , CL ),
374
376
375
377
{ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
376
378
{ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
@@ -533,7 +535,7 @@ delete_stream(Port, St) ->
533
535
{ok , C1 } = stream_test_utils :delete_stream (S , C0 , St ),
534
536
{ok , _ } = stream_test_utils :close (S , C1 ).
535
537
536
- init_super_stream (Config , Node , Ss ) ->
538
+ init_super_stream (Config , Node , Ss , PartitionIndex , ExpectedNode ) ->
537
539
{ok , S , C0 } = stream_test_utils :connect (Config , Node ),
538
540
NC = node_count (Config ),
539
541
Partitions = [unicode :characters_to_binary ([Ss , <<" -" >>, integer_to_binary (N )])
@@ -544,10 +546,46 @@ init_super_stream(Config, Node, Ss) ->
544
546
{Cmd1 , C1 } = receive_commands (S , C0 ),
545
547
? assertMatch ({response , ? CORR_ID , {create_super_stream , ? RESPONSE_CODE_OK }},
546
548
Cmd1 ),
547
- Partition = lists :nth (2 , Partitions ),
548
- wait_for_members (S , C1 , Partition , NC ),
549
+ [wait_for_members (S , C1 , P , NC ) || P <- Partitions ],
550
+ Partition = lists :nth (PartitionIndex , Partitions ),
551
+ [# node {name = LN } | _ ] = topology (Config , Partition ),
552
+ P = case LN of
553
+ ExpectedNode ->
554
+ Partition ;
555
+ _ ->
556
+ enforce_stream_leader_on_node (Config , S , C1 ,
557
+ Partitions , Partition ,
558
+ ExpectedNode , 10 )
559
+ end ,
549
560
{ok , _ } = stream_test_utils :close (S , C1 ),
550
- Partitions .
561
+ P .
562
+
563
+
564
+ enforce_stream_leader_on_node (_ , _ , _ , _ , _ , _ , 0 ) ->
565
+ ct :fail (" could not create super stream partition on chosen node" );
566
+ enforce_stream_leader_on_node (Config , S , C ,
567
+ Partitions , Partition , Node , Count ) ->
568
+ CL = coordinator_leader (Config ),
569
+ NC = node_count (Config ),
570
+ [begin
571
+ case P of
572
+ Partition ->
573
+ restart_stream (Config , CL , P , Node );
574
+ _ ->
575
+ restart_stream (Config , CL , P , undefined )
576
+ end ,
577
+ wait_for_members (S , C , P , NC )
578
+ end || P <- Partitions ],
579
+ [# node {name = LN } | _ ] = topology (Config , Partition ),
580
+ case LN of
581
+ Node ->
582
+ Partition ;
583
+ _ ->
584
+ timer :sleep (500 ),
585
+ enforce_stream_leader_on_node (Config , S , C ,
586
+ Partitions , Partition , Node ,
587
+ Count - 1 )
588
+ end .
551
589
552
590
delete_super_stream (Port , Ss ) ->
553
591
{ok , S , C0 } = stream_test_utils :connect (Port ),
@@ -631,6 +669,13 @@ coordinator_leader(Config) ->
631
669
undefined
632
670
end .
633
671
672
+ restart_stream (Config , Node , S , undefined ) ->
673
+ rpc (Config , Node , rabbit_stream_queue , restart_stream , [<<" /" >>, S , #{}]);
674
+ restart_stream (Config , Node , S , Leader ) ->
675
+ Opts = #{preferred_leader_node => Leader },
676
+ rpc (Config , Node , rabbit_stream_queue , restart_stream , [<<" /" >>, S , Opts ]).
677
+
678
+
634
679
rpc (Config , M , F , A ) ->
635
680
rpc (Config , 0 , M , F , A ).
636
681
0 commit comments