Skip to content

Commit bd1dfc2

Browse files
authored
Merge pull request #1549 from rabbitmq/lukebakken/connectionrecovery-test-speedup
Separate out connection recovery tests
2 parents bf86447 + 10e58e4 commit bd1dfc2

30 files changed

+1404
-799
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ else
3131
readonly run_toxiproxy='false'
3232
fi
3333

34+
if [[ $2 == 'pull' ]]
35+
then
36+
readonly docker_pull_args='--pull always'
37+
else
38+
readonly docker_pull_args=''
39+
fi
40+
3441
set -o nounset
3542

3643
declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
@@ -43,7 +50,8 @@ function start_toxiproxy
4350
# sudo ss -4nlp
4451
echo "[INFO] starting Toxiproxy server docker container"
4552
docker rm --force "$toxiproxy_docker_name" 2>/dev/null || echo "[INFO] $toxiproxy_docker_name was not running"
46-
docker run --detach \
53+
# shellcheck disable=SC2086
54+
docker run --detach $docker_pull_args \
4755
--name "$toxiproxy_docker_name" \
4856
--hostname "$toxiproxy_docker_name" \
4957
--publish 8474:8474 \
@@ -58,7 +66,8 @@ function start_rabbitmq
5866
echo "[INFO] starting RabbitMQ server docker container"
5967
chmod 0777 "$GITHUB_WORKSPACE/.ci/ubuntu/log"
6068
docker rm --force "$rabbitmq_docker_name" 2>/dev/null || echo "[INFO] $rabbitmq_docker_name was not running"
61-
docker run --detach \
69+
# shellcheck disable=SC2086
70+
docker run --detach $docker_pull_args \
6271
--name "$rabbitmq_docker_name" \
6372
--hostname "$rabbitmq_docker_name" \
6473
--publish 5671:5671 \
@@ -101,7 +110,8 @@ function wait_rabbitmq
101110

102111
function get_rabbitmq_id
103112
{
104-
local rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")"
113+
local rabbitmq_docker_id
114+
rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")"
105115
echo "[INFO] '$rabbitmq_docker_name' docker id is '$rabbitmq_docker_id'"
106116
if [[ -v GITHUB_OUTPUT ]]
107117
then

build.ps1

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ if ($RunTests)
2525
dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
2626
if ($LASTEXITCODE -ne 0)
2727
{
28-
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
29-
Exit 1
28+
Write-Host "[WARNING] tests errored, exiting" -Foreground "Red"
3029
}
3130
else
3231
{

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
912912
~RabbitMQ.Client.IChannel.QueueDeclarePassiveAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
913913
~RabbitMQ.Client.IChannel.QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
914914
~RabbitMQ.Client.IChannel.QueuePurgeAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
915-
~RabbitMQ.Client.IChannel.QueueUnbindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
915+
~RabbitMQ.Client.IChannel.QueueUnbindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
916916
~RabbitMQ.Client.IChannel.TxCommitAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
917917
~RabbitMQ.Client.IChannel.TxRollbackAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
918918
~RabbitMQ.Client.IChannel.TxSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ Task QueueBindAsync(string queue, string exchange, string routingKey,
418418
/// Routing key must be shorter than 255 bytes.
419419
/// </remarks>
420420
Task QueueUnbindAsync(string queue, string exchange, string routingKey,
421-
IDictionary<string, object> arguments,
421+
IDictionary<string, object> arguments = null,
422422
CancellationToken cancellationToken = default);
423423

424424
/// <summary>

projects/RabbitMQ.Client/client/impl/RecordedBinding.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,11 @@ public Task RecoverAsync(IChannel channel)
8181

8282
public bool Equals(RecordedBinding other)
8383
{
84-
return _isQueueBinding == other._isQueueBinding && _destination == other._destination && _source == other._source &&
85-
_routingKey == other._routingKey && _arguments == other._arguments;
84+
return _isQueueBinding == other._isQueueBinding &&
85+
_destination == other._destination &&
86+
_source == other._source &&
87+
_routingKey == other._routingKey &&
88+
_arguments == other._arguments;
8689
}
8790

8891
public override bool Equals(object? obj)

projects/Test/Common/IntegrationFixture.cs

Lines changed: 111 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ public abstract class IntegrationFixture : IAsyncLifetime
5252
private static readonly bool s_isVerbose = false;
5353
private static int _connectionIdx = 0;
5454

55+
private Exception _connectionCallbackException;
56+
private Exception _connectionRecoveryException;
57+
private Exception _channelCallbackException;
58+
5559
protected readonly RabbitMQCtl _rabbitMQCtl;
5660

5761
protected ConnectionFactory _connFactory;
@@ -77,7 +81,12 @@ public abstract class IntegrationFixture : IAsyncLifetime
7781

7882
static IntegrationFixture()
7983
{
84+
85+
#if NET6_0_OR_GREATER
86+
S_Random = Random.Shared;
87+
#else
8088
S_Random = new Random();
89+
#endif
8190
s_isRunningInCI = InitIsRunningInCI();
8291
s_isVerbose = InitIsVerbose();
8392

@@ -146,8 +155,10 @@ public virtual async Task InitializeAsync()
146155

147156
if (IsVerbose)
148157
{
149-
AddCallbackHandlers();
158+
AddCallbackShutdownHandlers();
150159
}
160+
161+
AddCallbackExceptionHandlers();
151162
}
152163

153164
if (_connFactory.AutomaticRecoveryEnabled)
@@ -181,42 +192,122 @@ public virtual async Task DisposeAsync()
181192
_channel = null;
182193
_conn = null;
183194
}
195+
196+
DisposeAssertions();
197+
}
198+
199+
protected virtual void DisposeAssertions()
200+
{
201+
if (_connectionRecoveryException != null)
202+
{
203+
Assert.Fail($"unexpected connection recovery exception: {_connectionRecoveryException}");
204+
}
205+
206+
if (_connectionCallbackException != null)
207+
{
208+
Assert.Fail($"unexpected connection callback exception: {_connectionCallbackException}");
209+
}
210+
211+
if (_channelCallbackException != null)
212+
{
213+
Assert.Fail($"unexpected channel callback exception: {_channelCallbackException}");
214+
}
184215
}
185216

186-
protected virtual void AddCallbackHandlers()
217+
protected void AddCallbackExceptionHandlers()
187218
{
188219
if (_conn != null)
189220
{
190-
_conn.CallbackException += (o, ea) =>
221+
_conn.ConnectionRecoveryError += (s, ea) =>
191222
{
192-
_output.WriteLine("{0} connection callback exception: {1}",
193-
_testDisplayName, ea.Exception);
223+
_connectionRecoveryException = ea.Exception;
224+
225+
if (IsVerbose)
226+
{
227+
try
228+
{
229+
_output.WriteLine($"{0} connection recovery exception: {1}",
230+
_testDisplayName, _connectionRecoveryException);
231+
}
232+
catch (InvalidOperationException)
233+
{
234+
}
235+
}
194236
};
195237

196-
_conn.ConnectionShutdown += (o, ea) =>
238+
_conn.CallbackException += (o, ea) =>
197239
{
198-
HandleConnectionShutdown(_conn, ea, (args) =>
240+
_connectionCallbackException = ea.Exception;
241+
242+
if (IsVerbose)
199243
{
200-
_output.WriteLine("{0} connection shutdown, args: {1}",
201-
_testDisplayName, args);
202-
});
244+
try
245+
{
246+
_output.WriteLine("{0} connection callback exception: {1}",
247+
_testDisplayName, _connectionCallbackException);
248+
}
249+
catch (InvalidOperationException)
250+
{
251+
}
252+
}
203253
};
204254
}
205255

206256
if (_channel != null)
207257
{
208258
_channel.CallbackException += (o, ea) =>
209259
{
210-
_output.WriteLine("{0} channel callback exception: {1}",
211-
_testDisplayName, ea.Exception);
260+
_channelCallbackException = ea.Exception;
261+
262+
if (IsVerbose)
263+
{
264+
try
265+
{
266+
_output.WriteLine("{0} channel callback exception: {1}",
267+
_testDisplayName, _channelCallbackException);
268+
}
269+
catch (InvalidOperationException)
270+
{
271+
}
272+
}
273+
};
274+
}
275+
}
276+
277+
protected void AddCallbackShutdownHandlers()
278+
{
279+
if (_conn != null)
280+
{
281+
_conn.ConnectionShutdown += (o, ea) =>
282+
{
283+
HandleConnectionShutdown(_conn, ea, (args) =>
284+
{
285+
try
286+
{
287+
_output.WriteLine("{0} connection shutdown, args: {1}",
288+
_testDisplayName, args);
289+
}
290+
catch (InvalidOperationException)
291+
{
292+
}
293+
});
212294
};
295+
}
213296

297+
if (_channel != null)
298+
{
214299
_channel.ChannelShutdown += (o, ea) =>
215300
{
216301
HandleChannelShutdown(_channel, ea, (args) =>
217302
{
218-
_output.WriteLine("{0} channel shutdown, args: {1}",
219-
_testDisplayName, args);
303+
try
304+
{
305+
_output.WriteLine("{0} channel shutdown, args: {1}",
306+
_testDisplayName, args);
307+
}
308+
catch (InvalidOperationException)
309+
{
310+
}
220311
});
221312
};
222313
}
@@ -405,6 +496,11 @@ protected static Task AssertRanToCompletion(IEnumerable<Task> tasks)
405496
return DoAssertRanToCompletion(tasks);
406497
}
407498

499+
internal static void AssertRecordedQueues(AutorecoveringConnection c, int n)
500+
{
501+
Assert.Equal(n, c.RecordedQueuesCount);
502+
}
503+
408504
protected static Task WaitAsync(TaskCompletionSource<bool> tcs, string desc)
409505
{
410506
return WaitAsync(tcs, WaitSpan, desc);
@@ -524,11 +620,7 @@ protected static string GetUniqueString(ushort length)
524620
protected static byte[] GetRandomBody(ushort size = 1024)
525621
{
526622
var body = new byte[size];
527-
#if NET6_0_OR_GREATER
528-
Random.Shared.NextBytes(body);
529-
#else
530623
S_Random.NextBytes(body);
531-
#endif
532624
return body;
533625
}
534626

@@ -543,7 +635,7 @@ protected static TaskCompletionSource<bool> PrepareForRecovery(IConnection conn)
543635
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
544636

545637
AutorecoveringConnection aconn = conn as AutorecoveringConnection;
546-
aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true);
638+
aconn.RecoverySucceeded += (source, ea) => tcs.TrySetResult(true);
547639

548640
return tcs;
549641
}

projects/Test/Common/TestConnectionRecoveryBase.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public class TestConnectionRecoveryBase : IntegrationFixture
4646
protected const ushort TotalMessageCount = 16384;
4747
protected const ushort CloseAtCount = 16;
4848

49-
public TestConnectionRecoveryBase(ITestOutputHelper output) : base(output)
49+
public TestConnectionRecoveryBase(ITestOutputHelper output, bool dispatchConsumersAsync = false)
50+
: base(output, dispatchConsumersAsync: dispatchConsumersAsync)
5051
{
5152
_messageBody = GetRandomBody(4096);
5253
}
@@ -107,11 +108,6 @@ internal void AssertRecordedExchanges(AutorecoveringConnection c, int n)
107108
Assert.Equal(n, c.RecordedExchangesCount);
108109
}
109110

110-
internal void AssertRecordedQueues(AutorecoveringConnection c, int n)
111-
{
112-
Assert.Equal(n, c.RecordedQueuesCount);
113-
}
114-
115111
internal Task<AutorecoveringConnection> CreateAutorecoveringConnectionAsync()
116112
{
117113
return CreateAutorecoveringConnectionAsync(RecoveryInterval);
@@ -226,7 +222,7 @@ protected static TaskCompletionSource<bool> PrepareForShutdown(IConnection conn)
226222
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
227223

228224
AutorecoveringConnection aconn = conn as AutorecoveringConnection;
229-
aconn.ConnectionShutdown += (c, args) => tcs.SetResult(true);
225+
aconn.ConnectionShutdown += (c, args) => tcs.TrySetResult(true);
230226

231227
return tcs;
232228
}

0 commit comments

Comments
 (0)