Skip to content

Commit 143f697

Browse files
committed
Follow-up to #1669 - per-channel dispatch concurrency
PR #1669 by @danielmarbach adds the ability to configure consumer dispatch on a per-channel basis. * Test that consumer dispatch concurrency is set on the dispatcher.
1 parent 624cf2e commit 143f697

19 files changed

+106
-53
lines changed

projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ namespace Benchmarks.Networking
99
[MemoryDiagnoser]
1010
public class Networking_BasicDeliver_Commons
1111
{
12-
public static async Task Publish_Hello_World(IConnection connection, uint messageCount, byte[] body)
12+
public static async Task Publish_Hello_World(IConnection connection,
13+
uint messageCount, byte[] body, ushort consumerDispatchConcurrency = 1)
1314
{
14-
using (IChannel channel = await connection.CreateChannelAsync())
15+
using (IChannel channel = await connection.CreateChannelAsync(consumerDispatchConcurrency))
1516
{
1617
QueueDeclareOk queue = await channel.QueueDeclareAsync();
1718
var consumer = new CountingConsumer(channel, messageCount);

projects/Benchmarks/Networking/Networking_BasicDeliver_ConnectionChurn.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@ public void GlobalCleanup()
2929
[Benchmark(Baseline = true)]
3030
public async Task Publish_Hello_World()
3131
{
32-
var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
32+
var cf = new ConnectionFactory();
3333
using (IConnection connection = await cf.CreateConnectionAsync())
3434
{
35-
await Publish_Hello_World(connection);
35+
await Publish_Hello_World(connection, 2);
3636
}
3737
}
3838

39-
public static async Task Publish_Hello_World(IConnection connection)
39+
public static async Task Publish_Hello_World(IConnection connection, ushort consumerDispatchConcurrency)
4040
{
41-
await Networking_BasicDeliver_Commons.Publish_Hello_World(connection, messageCount, _body);
41+
await Networking_BasicDeliver_Commons.Publish_Hello_World(connection, messageCount, _body,
42+
consumerDispatchConcurrency);
4243
}
4344
}
4445
}

projects/Benchmarks/Networking/Networking_BasicDeliver_LongLivedConnection.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ public class Networking_BasicDeliver_LongLivedConnection
1919
public void GlobalSetup()
2020
{
2121
_container = RabbitMQBroker.Start();
22-
23-
var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
22+
var cf = new ConnectionFactory();
2423
// NOTE: https://github.com/dotnet/BenchmarkDotNet/issues/1738
2524
_connection = EnsureCompleted(cf.CreateConnectionAsync());
2625
}
@@ -35,7 +34,8 @@ public void GlobalCleanup()
3534
[Benchmark(Baseline = true)]
3635
public Task Publish_Hello_World()
3736
{
38-
return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body);
37+
return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body,
38+
consumerDispatchConcurrency: 2);
3939
}
4040

4141
private static T EnsureCompleted<T>(Task<T> task) => task.GetAwaiter().GetResult();

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,7 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G
700700
readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary<string, object>
701701
readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string
702702
readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan
703+
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
703704
readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan
704705
readonly RabbitMQ.Client.ConnectionConfig.HeartbeatInterval -> System.TimeSpan
705706
readonly RabbitMQ.Client.ConnectionConfig.MaxChannelCount -> ushort
@@ -891,8 +892,6 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
891892
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
892893
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
893894
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
895+
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
894896
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
895-
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
896-
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
897-
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
898-
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
897+
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
150150
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
151151
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
152152
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
153-
ushort consumerDispatchConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
153+
ushort consumerDispatchConcurrency,
154+
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
154155
{
155156
VirtualHost = virtualHost;
156157
UserName = userName;

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ namespace RabbitMQ.Client
9292
///hosts with an empty name are not addressable. </para></remarks>
9393
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
9494
{
95-
/// <summary>
96-
/// Default value for consumer dispatch concurrency.
97-
/// </summary>
98-
public const ushort DefaultConsumerDispatchConcurrency = 1;
99-
10095
/// <summary>
10196
/// Default value for the desired maximum channel number. Default: 2047.
10297
/// </summary>
@@ -180,7 +175,8 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
180175
/// </summary>
181176
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
182177
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
183-
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
178+
public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency;
179+
184180

185181
/// <summary>The host to connect to.</summary>
186182
public string HostName { get; set; } = "localhost";

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
239239
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
240240
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
241241
///
242-
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
242+
/// The default value is <see cref="Constants.DefaultConsumerDispatchConcurrency"/>
243243
///
244244
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
245245
/// In addition to that consumers need to be thread/concurrency safe.
246246
/// </param>
247247
/// <param name="cancellationToken">Cancellation token</param>
248-
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
248+
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
249+
CancellationToken cancellationToken = default);
249250
}
250251
}

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,6 @@ namespace RabbitMQ.Client
77
{
88
public static class IConnectionExtensions
99
{
10-
/// <summary>
11-
/// Asynchronously create and return a fresh channel, session, and channel.
12-
/// </summary>
13-
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
14-
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);
15-
1610
/// <summary>
1711
/// Asynchronously close this connection and all its channels.
1812
/// </summary>

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal class Channel : ChannelBase
4040
{
41-
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
41+
public Channel(ConnectionConfig config, ISession session,
42+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
4243
: base(config, session, consumerDispatchConcurrency)
4344
{
4445
}

projects/RabbitMQ.Client/client/framing/Constants.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,13 @@ public static class Constants
8383
public const int NotImplemented = 540;
8484
///<summary>(= 541)</summary>
8585
public const int InternalError = 541;
86+
87+
/// <summary>
88+
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
89+
/// to set this value for every channel created on a connection,
90+
/// and <see cref="IConnection.CreateChannelAsync(ushort, System.Threading.CancellationToken)"/>
91+
/// for setting this value for a particular channel.
92+
/// </summary>
93+
public const ushort DefaultConsumerDispatchConcurrency = 1;
8694
}
8795
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,12 +240,14 @@ await CloseInnerConnectionAsync()
240240
}
241241
}
242242

243-
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
243+
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
244+
CancellationToken cancellationToken = default)
244245
{
245246
EnsureIsOpen();
246-
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
247+
ushort cdc = DetermineConsumerDispatchConcurrency(_config, consumerDispatchConcurrency);
248+
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
247249
.ConfigureAwait(false);
248-
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
250+
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
249251
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
250252
.ConfigureAwait(false);
251253
return channel;
@@ -279,6 +281,16 @@ public void Dispose()
279281
private void EnsureIsOpen()
280282
=> InnerConnection.EnsureIsOpen();
281283

284+
private static ushort DetermineConsumerDispatchConcurrency(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency)
285+
{
286+
ushort cdc = config.ConsumerDispatchConcurrency;
287+
if (perChannelConsumerDispatchConcurrency > Constants.DefaultConsumerDispatchConcurrency)
288+
{
289+
cdc = perChannelConsumerDispatchConcurrency;
290+
}
291+
return cdc;
292+
}
293+
282294
[MethodImpl(MethodImplOptions.AggressiveInlining)]
283295
private void ThrowIfDisposed()
284296
{

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,12 @@ internal abstract class ChannelBase : IChannel, IRecoverable
7373

7474
internal readonly IConsumerDispatcher ConsumerDispatcher;
7575

76-
protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
76+
protected ChannelBase(ConnectionConfig config, ISession session,
77+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
7778
{
7879
ContinuationTimeout = config.ContinuationTimeout;
79-
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
80+
ConsumerDispatcher = BuildConsumerDispatcher(config, consumerDispatchConcurrency);
81+
8082
Action<Exception, string> onException = (exception, context) =>
8183
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
8284
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
@@ -92,6 +94,16 @@ protected ChannelBase(ConnectionConfig config, ISession session, ushort consumer
9294
Session = session;
9395
}
9496

97+
private IConsumerDispatcher BuildConsumerDispatcher(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency)
98+
{
99+
ushort cdc = config.ConsumerDispatchConcurrency;
100+
if (perChannelConsumerDispatchConcurrency > Constants.DefaultConsumerDispatchConcurrency)
101+
{
102+
cdc = perChannelConsumerDispatchConcurrency;
103+
}
104+
return new AsyncConsumerDispatcher(this, cdc);
105+
}
106+
95107
internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);
96108
public TimeSpan ContinuationTimeout { get; set; }
97109

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
7272

7373
_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
7474
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
75-
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
75+
_channel0 = new Channel(_config, _session0);
7676

7777
ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
7878
{
@@ -253,7 +253,8 @@ await CloseAsync(ea, true,
253253
}
254254
}
255255

256-
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
256+
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
257+
CancellationToken cancellationToken = default)
257258
{
258259
EnsureIsOpen();
259260
ISession session = CreateSession();

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
1616
private readonly Task _worker;
1717
private bool _quiesce = false;
1818
private bool _disposed;
19+
private ushort _concurrency;
1920

2021
internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
2122
{
2223
_channel = channel;
24+
_concurrency = concurrency;
2325
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
2426
{
25-
SingleReader = concurrency == 1,
27+
SingleReader = _concurrency == 1,
2628
SingleWriter = false,
2729
AllowSynchronousContinuations = false
2830
});
@@ -36,22 +38,18 @@ internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
3638
}
3739
else
3840
{
39-
var tasks = new Task[concurrency];
40-
for (int i = 0; i < concurrency; i++)
41+
var tasks = new Task[_concurrency];
42+
for (int i = 0; i < _concurrency; i++)
4143
{
4244
tasks[i] = Task.Run(loopStart);
4345
}
4446
_worker = Task.WhenAll(tasks);
4547
}
4648
}
4749

48-
public bool IsShutdown
49-
{
50-
get
51-
{
52-
return _quiesce;
53-
}
54-
}
50+
public bool IsShutdown => _quiesce;
51+
52+
public ushort Concurrency => _concurrency;
5553

5654
public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
5755
{

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ internal interface IConsumerDispatcher : IDisposable
4141

4242
bool IsShutdown { get; }
4343

44+
ushort Concurrency { get; }
45+
4446
IAsyncBasicConsumer GetAndRemoveConsumer(string tag);
4547

4648
ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken);

projects/Test/Common/IntegrationFixture.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public abstract class IntegrationFixture : IAsyncLifetime
7171
protected readonly ITestOutputHelper _output;
7272
protected readonly string _testDisplayName;
7373

74-
protected readonly ushort _consumerDispatchConcurrency = 1;
74+
protected readonly ushort _consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency;
7575
protected readonly bool _openChannel = true;
7676

7777
public static readonly TimeSpan ShortSpan;
@@ -109,7 +109,7 @@ static IntegrationFixture()
109109
}
110110

111111
public IntegrationFixture(ITestOutputHelper output,
112-
ushort consumerDispatchConcurrency = 1,
112+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
113113
bool openChannel = true)
114114
{
115115
_consumerDispatchConcurrency = consumerDispatchConcurrency;
@@ -144,7 +144,6 @@ public virtual async Task InitializeAsync()
144144
if (_connFactory == null)
145145
{
146146
_connFactory = CreateConnectionFactory();
147-
_connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency;
148147
}
149148

150149
if (_conn == null)
@@ -153,7 +152,8 @@ public virtual async Task InitializeAsync()
153152

154153
if (_openChannel)
155154
{
156-
_channel = await _conn.CreateChannelAsync();
155+
_channel = await _conn.CreateChannelAsync(
156+
consumerDispatchConcurrency: _consumerDispatchConcurrency);
157157
}
158158

159159
if (IsVerbose)

0 commit comments

Comments
 (0)