Skip to content

Commit ef1c97a

Browse files
committed
Ensure that the underlying timer for Task.Delay is canceled.
Part of the fix to #1425 * Add EnsureCompleted task extensions. * Don't use compiler shenanigans for AsyncRpcContinuation * Add Ignore() extension to ignore the results of a Task
1 parent 10a3499 commit ef1c97a

File tree

9 files changed

+142
-72
lines changed

9 files changed

+142
-72
lines changed

projects/RabbitMQ.Client/client/TaskExtensions.cs

Lines changed: 83 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ internal static class TaskExtensions
3939
{
4040
#if !NET6_0_OR_GREATER
4141
private static readonly TaskContinuationOptions s_tco = TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously;
42-
private static void continuation(Task t, object s) => t.Exception.Handle(e => true);
42+
private static void IgnoreTaskContinuation(Task t, object s) => t.Exception.Handle(e => true);
4343
#endif
4444

4545
public static Task TimeoutAfter(this Task task, TimeSpan timeout)
@@ -59,60 +59,112 @@ public static Task TimeoutAfter(this Task task, TimeSpan timeout)
5959

6060
return DoTimeoutAfter(task, timeout);
6161

62+
// https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#using-a-timeout
6263
static async Task DoTimeoutAfter(Task task, TimeSpan timeout)
6364
{
64-
if (task == await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false))
65+
using (var cts = new CancellationTokenSource())
6566
{
67+
Task delayTask = Task.Delay(timeout, cts.Token);
68+
Task resultTask = await Task.WhenAny(task, delayTask).ConfigureAwait(false);
69+
if (resultTask == delayTask)
70+
{
71+
task.Ignore();
72+
throw new TimeoutException();
73+
}
74+
else
75+
{
76+
cts.Cancel();
77+
}
78+
6679
await task.ConfigureAwait(false);
6780
}
68-
else
69-
{
70-
Task supressErrorTask = task.ContinueWith(
71-
continuationAction: continuation,
72-
state: null,
73-
cancellationToken: CancellationToken.None,
74-
continuationOptions: s_tco,
75-
scheduler: TaskScheduler.Default);
76-
throw new TimeoutException();
77-
}
7881
}
7982
#endif
8083
}
8184

82-
public static async ValueTask TimeoutAfter(this ValueTask task, TimeSpan timeout)
85+
public static async ValueTask TimeoutAfter(this ValueTask valueTask, TimeSpan timeout)
8386
{
84-
if (task.IsCompletedSuccessfully)
87+
if (valueTask.IsCompletedSuccessfully)
8588
{
8689
return;
8790
}
8891

8992
#if NET6_0_OR_GREATER
90-
Task actualTask = task.AsTask();
91-
await actualTask.WaitAsync(timeout)
93+
Task task = valueTask.AsTask();
94+
await task.WaitAsync(timeout)
9295
.ConfigureAwait(false);
9396
#else
94-
await DoTimeoutAfter(task, timeout)
97+
await DoTimeoutAfter(valueTask, timeout)
9598
.ConfigureAwait(false);
9699

97-
async static ValueTask DoTimeoutAfter(ValueTask task, TimeSpan timeout)
100+
// https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#using-a-timeout
101+
static async ValueTask DoTimeoutAfter(ValueTask valueTask, TimeSpan timeout)
98102
{
99-
Task actualTask = task.AsTask();
100-
if (actualTask == await Task.WhenAny(actualTask, Task.Delay(timeout)).ConfigureAwait(false))
103+
Task task = valueTask.AsTask();
104+
using (var cts = new CancellationTokenSource())
101105
{
102-
await actualTask.ConfigureAwait(false);
103-
}
104-
else
105-
{
106-
Task supressErrorTask = actualTask.ContinueWith(
107-
continuationAction: continuation,
108-
state: null,
109-
cancellationToken: CancellationToken.None,
110-
continuationOptions: s_tco,
111-
scheduler: TaskScheduler.Default);
112-
throw new TimeoutException();
106+
Task delayTask = Task.Delay(timeout, cts.Token);
107+
Task resultTask = await Task.WhenAny(task, delayTask).ConfigureAwait(false);
108+
if (resultTask == delayTask)
109+
{
110+
task.Ignore();
111+
throw new TimeoutException();
112+
}
113+
else
114+
{
115+
cts.Cancel();
116+
}
117+
118+
await valueTask.ConfigureAwait(false);
113119
}
114120
}
115121
#endif
116122
}
123+
124+
/*
125+
* https://devblogs.microsoft.com/dotnet/configureawait-faq/
126+
* I'm using GetAwaiter().GetResult(). Do I need to use ConfigureAwait(false)?
127+
* Answer: No
128+
*/
129+
public static void EnsureCompleted(this Task task)
130+
{
131+
task.GetAwaiter().GetResult();
132+
}
133+
134+
public static T EnsureCompleted<T>(this Task<T> task)
135+
{
136+
return task.GetAwaiter().GetResult();
137+
}
138+
139+
public static T EnsureCompleted<T>(this ValueTask<T> task)
140+
{
141+
return task.GetAwaiter().GetResult();
142+
}
143+
144+
public static void EnsureCompleted(this ValueTask task)
145+
{
146+
task.GetAwaiter().GetResult();
147+
}
148+
149+
#if !NET6_0_OR_GREATER
150+
// https://github.com/dotnet/runtime/issues/23878
151+
// https://github.com/dotnet/runtime/issues/23878#issuecomment-1398958645
152+
public static void Ignore(this Task task)
153+
{
154+
if (task.IsCompleted)
155+
{
156+
_ = task.Exception;
157+
}
158+
else
159+
{
160+
_ = task.ContinueWith(
161+
continuationAction: IgnoreTaskContinuation,
162+
state: null,
163+
cancellationToken: CancellationToken.None,
164+
continuationOptions: s_tco,
165+
scheduler: TaskScheduler.Default);
166+
}
167+
}
168+
#endif
117169
}
118170
}

projects/RabbitMQ.Client/client/api/ITcpClient.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ public interface ITcpClient : IDisposable
1818

1919
Socket Client { get; }
2020

21+
// TODO CancellationToken
2122
Task ConnectAsync(string host, int port);
23+
// TODO CancellationToken
2224
Task ConnectAsync(IPAddress host, int port);
2325

2426
NetworkStream GetStream();

projects/RabbitMQ.Client/client/api/TcpClientAdapter.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ await ConnectAsync(ep, port)
3434
.ConfigureAwait(false);
3535
}
3636

37-
public virtual Task ConnectAsync(IPAddress ep, int port)
37+
public virtual async Task ConnectAsync(IPAddress ep, int port)
3838
{
3939
AssertSocket();
40-
return _sock.ConnectAsync(ep, port);
40+
await _sock.ConnectAsync(ep, port)
41+
.ConfigureAwait(false);
4142
}
4243

4344
public virtual void Close()

projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131

3232
using System;
3333
using System.Diagnostics;
34-
using System.Runtime.CompilerServices;
3534
using System.Threading;
3635
using System.Threading.Tasks;
3736
using RabbitMQ.Client.client.framing;
@@ -44,7 +43,6 @@ namespace RabbitMQ.Client.Impl
4443
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation, IDisposable
4544
{
4645
private readonly CancellationTokenSource _cancellationTokenSource;
47-
private readonly ConfiguredTaskAwaitable<T> _taskAwaitable;
4846

4947
protected readonly TaskCompletionSource<T> _tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
5048

@@ -68,13 +66,11 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout)
6866
// in the same manner as BlockingCell?
6967
}
7068
}, useSynchronizationContext: false);
71-
72-
_taskAwaitable = _tcs.Task.ConfigureAwait(false);
7369
}
7470

75-
public ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter GetAwaiter()
71+
public Task<T> WaitAsync()
7672
{
77-
return _taskAwaitable.GetAwaiter();
73+
return _tcs.Task;
7874
}
7975

8076
public abstract void HandleCommand(in IncomingCommand cmd);

0 commit comments

Comments
 (0)