Skip to content

Allow the dispatcher concurrency to be overriden per channel #1669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
public int Count { get; set; }

[Params(1, 2)]
public int Concurrency { get; set; }
public ushort Concurrency { get; set; }

[GlobalSetup(Target = nameof(AsyncConsumerDispatcher))]
public async Task SetUpAsyncConsumer()
Expand Down
10 changes: 6 additions & 4 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ RabbitMQ.Client.ConnectionFactory.ClientProperties.set -> void
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.get -> string
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.set -> void
RabbitMQ.Client.ConnectionFactory.ConnectionFactory() -> void
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> int
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void
Expand Down Expand Up @@ -472,7 +472,7 @@ RabbitMQ.Client.IConnectionFactory.ClientProperties.get -> System.Collections.Ge
RabbitMQ.Client.IConnectionFactory.ClientProperties.set -> void
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.get -> string
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.set -> void
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> int
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.set -> void
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.set -> void
Expand Down Expand Up @@ -700,7 +700,6 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G
readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary<string, object>
readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string
readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan
readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumerConcurrency -> int
readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan
readonly RabbitMQ.Client.ConnectionConfig.HeartbeatInterval -> System.TimeSpan
readonly RabbitMQ.Client.ConnectionConfig.MaxChannelCount -> ushort
Expand Down Expand Up @@ -826,7 +825,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<bool>
~RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnection.CreateChannelAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint> endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
Expand Down Expand Up @@ -894,3 +892,7 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/api/ConnectionConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public sealed class ConnectionConfig
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
/// </summary>
public readonly int DispatchConsumerConcurrency;
public readonly ushort ConsumerDispatchConcurrency;

internal readonly Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> FrameHandlerFactoryAsync;

Expand All @@ -150,7 +150,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
int dispatchConsumerConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
ushort consumerDispatchConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
{
VirtualHost = virtualHost;
UserName = userName;
Expand All @@ -170,7 +170,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
ContinuationTimeout = continuationTimeout;
HandshakeContinuationTimeout = handshakeContinuationTimeout;
RequestedConnectionTimeout = requestedConnectionTimeout;
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
ConsumerDispatchConcurrency = consumerDispatchConcurrency;
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
}
}
Expand Down
7 changes: 6 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ namespace RabbitMQ.Client
///hosts with an empty name are not addressable. </para></remarks>
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
{
/// <summary>
/// Default value for consumer dispatch concurrency.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;

/// <summary>
/// Default value for the desired maximum channel number. Default: 2047.
/// </summary>
Expand Down Expand Up @@ -175,7 +180,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
public int ConsumerDispatchConcurrency { get; set; } = 1;
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;

/// <summary>The host to connect to.</summary>
public string HostName { get; set; } = "localhost";
Expand Down
13 changes: 11 additions & 2 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,17 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
/// <param name="consumerDispatchConcurrency">
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's great to see this mentioned, it's has been a repeated question with the Java client (which has a comparable setting) for many years.

/// In addition to that consumers need to be thread/concurrency safe.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default);

Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
}
}
6 changes: 6 additions & 0 deletions projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ namespace RabbitMQ.Client
{
public static class IConnectionExtensions
{
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);

/// <summary>
/// Asynchronously close this connection and all its channels.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/IConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,6 @@ Task<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints,
/// </summary>
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
int ConsumerDispatchConcurrency { get; set; }
ushort ConsumerDispatchConcurrency { get; set; }
}
}
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl
{
internal class Channel : ChannelBase
{
public Channel(ConnectionConfig config, ISession session) : base(config, session)
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
: base(config, session, consumerDispatchConcurrency)
{
}

Expand Down
8 changes: 6 additions & 2 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private bool _usesPublisherConfirms;
private bool _tracksPublisherConfirmations;
private bool _usesTransactions;
private ushort _consumerDispatchConcurrency;

internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher;

Expand All @@ -70,10 +71,12 @@ public TimeSpan ContinuationTimeout
set => InnerChannel.ContinuationTimeout = value;
}

public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel)
public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
ushort consumerDispatchConcurrency)
{
_connection = conn;
_innerChannel = innerChannel;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
}

public event EventHandler<BasicAckEventArgs> BasicAcks
Expand Down Expand Up @@ -160,7 +163,8 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con

_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken)
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_consumerDispatchConcurrency,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
newChannel.TakeOver(_innerChannel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private async ValueTask RecoverExchangesAsync(IConnection connection,
{
try
{
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
{
await recordedExchange.RecoverAsync(ch, cancellationToken)
.ConfigureAwait(false);
Expand Down Expand Up @@ -347,7 +347,7 @@ private async Task RecoverQueuesAsync(IConnection connection,
try
{
string newName = string.Empty;
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
{
newName = await recordedQueue.RecoverAsync(ch, cancellationToken)
.ConfigureAwait(false);
Expand Down Expand Up @@ -458,7 +458,7 @@ private async ValueTask RecoverBindingsAsync(IConnection connection,
{
try
{
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
{
await binding.RecoverAsync(ch, cancellationToken)
.ConfigureAwait(false);
Expand Down
11 changes: 6 additions & 5 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer

public IProtocol Protocol => Endpoint.Protocol;

public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(CancellationToken cancellationToken)
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(ushort consumerDispatchConcurrency,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session);
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
return (RecoveryAwareChannel)await result.OpenAsync(cancellationToken).ConfigureAwait(false);
}

Expand Down Expand Up @@ -239,12 +240,12 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
{
EnsureIsOpen();
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cancellationToken)
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
.ConfigureAwait(false);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return channel;
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ internal abstract class ChannelBase : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

protected ChannelBase(ConnectionConfig config, ISession session)
protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, config.DispatchConsumerConcurrency);
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
Action<Exception, string> onException = (exception, context) =>
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
Expand Down
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)

_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
_channel0 = new Channel(_config, _session0); ;
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;

ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
{
Expand Down Expand Up @@ -253,11 +253,11 @@ await CloseAsync(ea, true,
}
}

public Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
{
EnsureIsOpen();
ISession session = CreateSession();
var channel = new Channel(_config, session);
var channel = new Channel(_config, session, consumerDispatchConcurrency);
return channel.OpenAsync(cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
{
internal sealed class AsyncConsumerDispatcher : ConsumerDispatcherChannelBase
{
internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)
internal AsyncConsumerDispatcher(ChannelBase channel, ushort concurrency)
: base(channel, concurrency)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
private bool _quiesce = false;
private bool _disposed;

internal ConsumerDispatcherChannelBase(ChannelBase channel, int concurrency)
internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
{
_channel = channel;
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/RecoveryAwareChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ namespace RabbitMQ.Client.Impl
{
internal sealed class RecoveryAwareChannel : Channel
{
public RecoveryAwareChannel(ConnectionConfig config, ISession session) : base(config, session)
public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
: base(config, session, consumerDispatchConcurrency)
{
ActiveDeliveryTagOffset = 0;
MaxSeenDeliveryTag = 0;
Expand Down