Skip to content

Commit 06294d5

Browse files
authored
Improve test speed (#308)
* Improve test speed - reduce the collect management time, it reduces the time to wait the HTTP calls - remove some wait since there is already the wait_until function - remove the load definition and use the AMQP client to create the super-stream configuration - Add details during the tests --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 6ddbf98 commit 06294d5

File tree

9 files changed

+156
-35
lines changed

9 files changed

+156
-35
lines changed

.ci/install.ps1

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ $env:ERLANG_HOME = $erlang_home
5252

5353
Write-Host "[INFO] Setting RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS..."
5454
$env:RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS = '-rabbitmq_stream advertised_host localhost'
55-
[Environment]::SetEnvironmentVariable('RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS', '-rabbitmq_stream advertised_host localhost', 'Machine')
55+
[Environment]::SetEnvironmentVariable('RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS', '-rabbitmq_stream advertised_host localhost -rabbit collect_statistics_interval 4', 'Machine')
5656

5757
Write-Host '[INFO] Downloading RabbitMQ...'
5858

.github/workflows/build-test.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ jobs:
4545
rabbitmq:
4646
image: rabbitmq:3.13.0-beta.6-management
4747
env:
48-
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbitmq_stream advertised_host localhost
48+
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbitmq_stream advertised_host localhost -rabbit collect_statistics_interval 4
4949
ports:
5050
- 5552:5552
5151
- 5672:5672

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ build:
77
dotnet build $(CURDIR)/Build.csproj
88

99
test: build
10-
dotnet test $(CURDIR)/Tests/Tests.csproj --no-build --logger "console;verbosity=detailed" /p:AltCover=true
10+
dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed" /p:AltCover=true
1111

1212
rabbitmq-server:
1313
docker run -it --rm --name rabbitmq-stream-docker \

RabbitMQ.Stream.Client/RoutingClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public async Task<IClient> CreateClient(ClientParameters clientParameters, ILogg
3636
/// </summary>
3737
public static class RoutingHelper<T> where T : IRouting, new()
3838
{
39-
private static async Task<IClient> LookupConnection(
39+
internal static async Task<IClient> LookupConnection(
4040
ClientParameters clientParameters,
4141
Broker broker,
4242
int maxAttempts,

Tests/ReliableTests.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork()
171171
await producer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
172172
}
173173

174-
SystemUtils.Wait(TimeSpan.FromSeconds(6));
175-
Assert.Equal(1, SystemUtils.HttpKillConnections(clientProvidedName).Result);
174+
SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProvidedName).Result == 1);
175+
176176
await SystemUtils.HttpKillConnections(clientProvidedNameLocator);
177177

178178
for (var i = 0; i < 5; i++)
@@ -339,7 +339,6 @@ public async void FirstConsumeAfterKillConnectionShouldContinueToWork()
339339
await Task.CompletedTask;
340340
}
341341
});
342-
SystemUtils.Wait(TimeSpan.FromSeconds(1));
343342
// in this case we kill the connection before consume consume any message
344343
// so it should use the selected OffsetSpec in this case = new OffsetTypeFirst(),
345344

@@ -387,13 +386,11 @@ await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
387386
await Task.CompletedTask;
388387
}
389388
});
390-
SystemUtils.Wait(TimeSpan.FromSeconds(4));
391389
// kill the first time
392390
SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProviderName).Result == 1);
393391
await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
394392
Guid.NewGuid().ToString(),
395393
_testOutputHelper);
396-
SystemUtils.Wait(TimeSpan.FromSeconds(4));
397394
SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProviderName).Result == 1);
398395
new Utils<bool>(_testOutputHelper).WaitUntilTaskCompletes(testPassed);
399396
// after kill the consumer must be open
@@ -417,7 +414,7 @@ public async void ConsumerHandleDeleteStreamWithMetaDataUpdate()
417414
// When the stream is deleted the consumer has to close the
418415
// connection an become inactive.
419416
await system.DeleteStream(stream);
420-
SystemUtils.Wait(TimeSpan.FromSeconds(5));
417+
SystemUtils.Wait(TimeSpan.FromSeconds(3));
421418
Assert.False(consumer.IsOpen());
422419
await system.Close();
423420
}

Tests/Resources/definition_test.json

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,96 @@
1-
{"rabbit_version":"3.12.0-alpha-stream.304","rabbitmq_version":"3.12.0-alpha-stream.304","product_name":"RabbitMQ","product_version":"3.12.0-alpha-stream.304","users":[{"name":"test","password_hash":"DGqKjRKV+0PuybgkIoOwVXBTHShsgd6ysuSbINGv29WfBi/s","hashing_algorithm":"rabbit_password_hashing_sha256","tags":["impersonator"],"limits":{}},{"name":"guest","password_hash":"JETOd3pk4cJRpcPtzyzgy8zODxzOxmRYsZzXQyZPpntD5Uti","hashing_algorithm":"rabbit_password_hashing_sha256","tags":["administrator"],"limits":{}}],"vhosts":[{"name":"/"},{"name":"vhost2"}],"permissions":[{"user":"test","vhost":"/","configure":"test_stream","write":"test_stream","read":"test_stream"},{"user":"guest","vhost":"vhost2","configure":".*","write":".*","read":".*"},{"user":"guest","vhost":"/","configure":".*","write":".*","read":".*"}],"topic_permissions":[],"parameters":[],"global_parameters":[{"name":"internal_cluster_id","value":"rabbitmq-cluster-id-_6hTHBLoOCqgQiJla6WCXw"}],"policies":[],"queues":[{"name":"test_stream","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}},{"name":"invoices-0","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}},{"name":"invoices-1","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}},{"name":"invoices-2","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}},{"name":"no_access_stream","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}}],"exchanges":[{"name":"invoices","vhost":"/","type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{"x-super-stream":true}}],"bindings":[{"source":"invoices","vhost":"/","destination":"invoices-0","destination_type":"queue","routing_key":"0","arguments":{"x-stream-partition-order":0}},{"source":"invoices","vhost":"/","destination":"invoices-1","destination_type":"queue","routing_key":"1","arguments":{"x-stream-partition-order":1}},{"source":"invoices","vhost":"/","destination":"invoices-2","destination_type":"queue","routing_key":"2","arguments":{"x-stream-partition-order":2}}]}
1+
{
2+
"rabbit_version":"3.12.0-alpha-stream.304",
3+
"rabbitmq_version":"3.12.0-alpha-stream.304",
4+
"product_name":"RabbitMQ",
5+
"product_version":"3.12.0-alpha-stream.304",
6+
"users":[
7+
{
8+
"name":"test",
9+
"password_hash":"DGqKjRKV+0PuybgkIoOwVXBTHShsgd6ysuSbINGv29WfBi/s",
10+
"hashing_algorithm":"rabbit_password_hashing_sha256",
11+
"tags":[
12+
"impersonator"
13+
],
14+
"limits":{
15+
16+
}
17+
},
18+
{
19+
"name":"guest",
20+
"password_hash":"JETOd3pk4cJRpcPtzyzgy8zODxzOxmRYsZzXQyZPpntD5Uti",
21+
"hashing_algorithm":"rabbit_password_hashing_sha256",
22+
"tags":[
23+
"administrator"
24+
],
25+
"limits":{
26+
27+
}
28+
}
29+
],
30+
"vhosts":[
31+
{
32+
"name":"/"
33+
},
34+
{
35+
"name":"vhost2"
36+
}
37+
],
38+
"permissions":[
39+
{
40+
"user":"test",
41+
"vhost":"/",
42+
"configure":"test_stream",
43+
"write":"test_stream",
44+
"read":"test_stream"
45+
},
46+
{
47+
"user":"guest",
48+
"vhost":"vhost2",
49+
"configure":".*",
50+
"write":".*",
51+
"read":".*"
52+
},
53+
{
54+
"user":"guest",
55+
"vhost":"/",
56+
"configure":".*",
57+
"write":".*",
58+
"read":".*"
59+
}
60+
],
61+
"topic_permissions":[
62+
63+
],
64+
"parameters":[
65+
66+
],
67+
"global_parameters":[
68+
{
69+
"name":"internal_cluster_id",
70+
"value":"rabbitmq-cluster-id-_6hTHBLoOCqgQiJla6WCXw"
71+
}
72+
],
73+
"policies":[
74+
75+
],
76+
"queues":[
77+
{
78+
"name":"test_stream",
79+
"vhost":"/",
80+
"durable":true,
81+
"auto_delete":false,
82+
"arguments":{
83+
"x-queue-type":"stream"
84+
}
85+
},
86+
{
87+
"name":"no_access_stream",
88+
"vhost":"/",
89+
"durable":true,
90+
"auto_delete":false,
91+
"arguments":{
92+
"x-queue-type":"stream"
93+
}
94+
}
95+
]
96+
}

Tests/UnitTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ public async Task RoutingHelperShouldThrowIfLoadBalancerIsMisconfigured()
180180
new List<Broker>() { new Broker("replica", 5552) });
181181

182182
await Assert.ThrowsAsync<RoutingClientException>(
183-
() => RoutingHelper<MisconfiguredLoadBalancerRouting>.LookupLeaderConnection(clientParameters,
184-
metaDataInfo));
183+
() => RoutingHelper<MisconfiguredLoadBalancerRouting>.LookupConnection(clientParameters,
184+
metaDataInfo.Leader, 3));
185185
}
186186

187187
[Fact]

Tests/Utils.cs

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using System.Text.Json;
1515
using System.Threading;
1616
using System.Threading.Tasks;
17+
using RabbitMQ.Client;
1718
using RabbitMQ.Stream.Client;
1819
using RabbitMQ.Stream.Client.AMQP;
1920
using Xunit;
@@ -128,7 +129,8 @@ public static async Task PublishMessages(StreamSystem system, string stream, int
128129
public static async Task PublishMessages(StreamSystem system, string stream, int numberOfMessages,
129130
string producerName, ITestOutputHelper testOutputHelper)
130131
{
131-
testOutputHelper.WriteLine("Publishing messages...");
132+
testOutputHelper.WriteLine(
133+
$"Publishing messages to the stream {stream} number of messages {numberOfMessages}");
132134

133135
var testPassed = new TaskCompletionSource<int>();
134136
var count = 0;
@@ -155,11 +157,17 @@ public static async Task PublishMessages(StreamSystem system, string stream, int
155157
await producer.Send(Convert.ToUInt64(i), message);
156158
}
157159

160+
testOutputHelper.WriteLine($"Messages sent to the stream {stream} number of messages {numberOfMessages}");
161+
158162
testPassed.Task.Wait(TimeSpan.FromSeconds(10));
159163
Assert.Equal(numberOfMessages, testPassed.Task.Result);
160164
WaitUntil(() => producer.ConfirmFrames >= 1);
161165
WaitUntil(() => producer.IncomingFrames >= 1);
162166
WaitUntil(() => producer.PublishCommandsSent >= 1);
167+
168+
testOutputHelper.WriteLine(
169+
$"Messages sent to the stream {stream} number of messages {numberOfMessages} " +
170+
$"confirmed {producer.ConfirmFrames} incoming {producer.IncomingFrames} publish commands sent {producer.PublishCommandsSent}");
163171
producer.Dispose();
164172
}
165173

@@ -180,7 +188,8 @@ public static async Task<ConcurrentDictionary<string, IOffsetType>> OffsetsForSu
180188
public static async Task PublishMessagesSuperStream(StreamSystem system, string stream, int numberOfMessages,
181189
string producerName, ITestOutputHelper testOutputHelper)
182190
{
183-
testOutputHelper.WriteLine("Publishing super stream messages...");
191+
testOutputHelper.WriteLine($"Publishing super stream messages...to the stream {stream} " +
192+
$"number of messages {numberOfMessages}");
184193

185194
var testPassed = new TaskCompletionSource<int>();
186195
var count = 0;
@@ -210,11 +219,16 @@ public static async Task PublishMessagesSuperStream(StreamSystem system, string
210219
await producer.Send(Convert.ToUInt64(i), message);
211220
}
212221

222+
testOutputHelper.WriteLine($"Messages sent to the stream {stream} number of messages {numberOfMessages}");
213223
testPassed.Task.Wait(TimeSpan.FromSeconds(10));
214224
Assert.Equal(numberOfMessages, testPassed.Task.Result);
215225
Assert.True(producer.ConfirmFrames >= 1);
216226
Assert.True(producer.IncomingFrames >= 1);
217227
Assert.True(producer.PublishCommandsSent >= 1);
228+
229+
testOutputHelper.WriteLine(
230+
$"Messages sent to the stream {stream} number of messages {numberOfMessages} " +
231+
$"confirmed {producer.ConfirmFrames} incoming {producer.IncomingFrames} publish commands sent {producer.PublishCommandsSent}");
218232
producer.Dispose();
219233
}
220234

@@ -373,18 +387,6 @@ public static void HttpDeleteQueue(string queue)
373387
}
374388
}
375389

376-
private static void HttpDeleteExchange(string exchange)
377-
{
378-
var task = CreateHttpClient().DeleteAsync($"http://localhost:15672/api/exchanges/%2F/{exchange}");
379-
task.Wait();
380-
var result = task.Result;
381-
if (!result.IsSuccessStatusCode && result.StatusCode != HttpStatusCode.NotFound)
382-
{
383-
throw new XunitException(string.Format("HTTP DELETE failed: {0} {1}", result.StatusCode,
384-
result.ReasonPhrase));
385-
}
386-
}
387-
388390
public static byte[] GetFileContent(string fileName)
389391
{
390392
var codeBaseUrl = new Uri(Assembly.GetExecutingAssembly().Location);
@@ -403,14 +405,40 @@ public static byte[] GetFileContent(string fileName)
403405

404406
public static void ResetSuperStreams()
405407
{
406-
HttpDeleteExchange("invoices");
407-
HttpDeleteQueue("invoices-0");
408-
HttpDeleteQueue("invoices-1");
409-
HttpDeleteQueue("invoices-2");
408+
var factory = new ConnectionFactory();
409+
using var connection = factory.CreateConnection();
410+
var channel = connection.CreateModel();
411+
412+
channel.ExchangeDelete(InvoicesExchange);
413+
414+
channel.QueueDelete(InvoicesStream0);
415+
channel.QueueDelete(InvoicesStream1);
416+
channel.QueueDelete(InvoicesStream2);
417+
Wait();
418+
419+
channel.ExchangeDeclare(InvoicesExchange, "direct", true, false,
420+
new Dictionary<string, object>() { { "x-super-stream-enabled", "true" } });
421+
422+
channel.QueueDeclare(InvoicesStream0, true, false, false,
423+
new Dictionary<string, object>() { { "x-queue-type", "stream" }, });
424+
425+
channel.QueueDeclare(InvoicesStream1, true, false, false,
426+
new Dictionary<string, object>() { { "x-queue-type", "stream" }, });
427+
428+
channel.QueueDeclare(InvoicesStream2, true, false, false,
429+
new Dictionary<string, object>() { { "x-queue-type", "stream" }, });
410430
Wait();
411-
HttpPost(
412-
Encoding.Default.GetString(
413-
GetFileContent("definition_test.json")), "definitions");
431+
432+
channel.QueueBind(InvoicesStream0, InvoicesExchange, "0",
433+
new Dictionary<string, object>() { { "x-stream-partition-order", "0" } });
434+
435+
channel.QueueBind(InvoicesStream1, InvoicesExchange, "1",
436+
new Dictionary<string, object>() { { "x-stream-partition-order", "1" } });
437+
438+
channel.QueueBind(InvoicesStream2, InvoicesExchange, "2",
439+
new Dictionary<string, object>() { { "x-stream-partition-order", "2" } });
440+
441+
connection.Close();
414442
}
415443
}
416444
}

Tests/xunit.runner.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
22
"parallelizeAssembly": false,
3-
"parallelizeTestCollections": false
3+
"parallelizeTestCollections": false,
4+
"stopOnFail": true
45
}

0 commit comments

Comments
 (0)