Skip to content

Commit bbc33f1

Browse files
authored
Revert "Follow-up to #1669 - per-channel dispatch concurrency"
1 parent 735bbca commit bbc33f1

15 files changed

+44
-87
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,7 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
892892
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
893893
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
894894
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
895-
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
895+
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
896+
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
897+
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
896898
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
897-
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ namespace RabbitMQ.Client
9292
///hosts with an empty name are not addressable. </para></remarks>
9393
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
9494
{
95+
/// <summary>
96+
/// Default value for consumer dispatch concurrency.
97+
/// </summary>
98+
public const ushort DefaultConsumerDispatchConcurrency = 1;
99+
95100
/// <summary>
96101
/// Default value for the desired maximum channel number. Default: 2047.
97102
/// </summary>
@@ -175,7 +180,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
175180
/// </summary>
176181
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
177182
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
178-
public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency;
183+
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
179184

180185
/// <summary>The host to connect to.</summary>
181186
public string HostName { get; set; } = "localhost";

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,12 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
239239
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
240240
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
241241
///
242-
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
242+
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
243243
///
244244
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
245245
/// In addition to that consumers need to be thread/concurrency safe.
246246
/// </param>
247247
/// <param name="cancellationToken">Cancellation token</param>
248-
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
249-
CancellationToken cancellationToken = default);
248+
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
250249
}
251250
}

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ namespace RabbitMQ.Client
77
{
88
public static class IConnectionExtensions
99
{
10+
/// <summary>
11+
/// Asynchronously create and return a fresh channel, session, and channel.
12+
/// </summary>
13+
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
14+
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);
15+
1016
/// <summary>
1117
/// Asynchronously close this connection and all its channels.
1218
/// </summary>

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal class Channel : ChannelBase
4040
{
41-
public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
41+
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
4242
: base(config, session, consumerDispatchConcurrency)
4343
{
4444
}

projects/RabbitMQ.Client/client/framing/Constants.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,5 @@ public static class Constants
8383
public const int NotImplemented = 540;
8484
///<summary>(= 541)</summary>
8585
public const int InternalError = 541;
86-
87-
/// <summary>
88-
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
89-
/// to set this value for every channel created on a connection,
90-
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
91-
/// for setting this value for a particular channel.
92-
/// </summary>
93-
public const ushort DefaultConsumerDispatchConcurrency = 1;
9486
}
9587
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,12 @@ await CloseInnerConnectionAsync()
240240
}
241241
}
242242

243-
public async Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
244-
CancellationToken cancellationToken = default)
243+
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
245244
{
246245
EnsureIsOpen();
247-
ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
248-
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
246+
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
249247
.ConfigureAwait(false);
250-
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
248+
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
251249
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
252250
.ConfigureAwait(false);
253251
return channel;

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,10 @@ internal abstract class ChannelBase : IChannel, IRecoverable
7373

7474
internal readonly IConsumerDispatcher ConsumerDispatcher;
7575

76-
protected ChannelBase(ConnectionConfig config, ISession session,
77-
ushort? perChannelConsumerDispatchConcurrency = null)
76+
protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
7877
{
7978
ContinuationTimeout = config.ContinuationTimeout;
80-
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
81-
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
79+
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
8280
Action<Exception, string> onException = (exception, context) =>
8381
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
8482
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
7272

7373
_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
7474
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
75-
_channel0 = new Channel(_config, _session0);
75+
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
7676

7777
ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
7878
{
@@ -253,8 +253,7 @@ await CloseAsync(ea, true,
253253
}
254254
}
255255

256-
public Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
257-
CancellationToken cancellationToken = default)
256+
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
258257
{
259258
EnsureIsOpen();
260259
ISession session = CreateSession();

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,42 +14,44 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
1414
protected readonly ChannelReader<WorkStruct> _reader;
1515
private readonly ChannelWriter<WorkStruct> _writer;
1616
private readonly Task _worker;
17-
private readonly ushort _concurrency;
1817
private bool _quiesce = false;
1918
private bool _disposed;
2019

2120
internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
2221
{
2322
_channel = channel;
24-
_concurrency = concurrency;
2523
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
2624
{
27-
SingleReader = _concurrency == 1,
25+
SingleReader = concurrency == 1,
2826
SingleWriter = false,
2927
AllowSynchronousContinuations = false
3028
});
3129
_reader = workChannel.Reader;
3230
_writer = workChannel.Writer;
3331

3432
Func<Task> loopStart = ProcessChannelAsync;
35-
if (_concurrency == 1)
33+
if (concurrency == 1)
3634
{
3735
_worker = Task.Run(loopStart);
3836
}
3937
else
4038
{
41-
var tasks = new Task[_concurrency];
42-
for (int i = 0; i < _concurrency; i++)
39+
var tasks = new Task[concurrency];
40+
for (int i = 0; i < concurrency; i++)
4341
{
4442
tasks[i] = Task.Run(loopStart);
4543
}
4644
_worker = Task.WhenAll(tasks);
4745
}
4846
}
4947

50-
public bool IsShutdown => _quiesce;
51-
52-
public ushort Concurrency => _concurrency;
48+
public bool IsShutdown
49+
{
50+
get
51+
{
52+
return _quiesce;
53+
}
54+
}
5355

5456
public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
5557
{

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ internal interface IConsumerDispatcher : IDisposable
4141

4242
bool IsShutdown { get; }
4343

44-
ushort Concurrency { get; }
45-
4644
IAsyncBasicConsumer GetAndRemoveConsumer(string tag);
4745

4846
ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken);

projects/Test/Common/IntegrationFixture.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public abstract class IntegrationFixture : IAsyncLifetime
7171
protected readonly ITestOutputHelper _output;
7272
protected readonly string _testDisplayName;
7373

74-
protected readonly ushort _consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency;
74+
protected readonly ushort _consumerDispatchConcurrency = 1;
7575
protected readonly bool _openChannel = true;
7676

7777
public static readonly TimeSpan ShortSpan;
@@ -109,7 +109,7 @@ static IntegrationFixture()
109109
}
110110

111111
public IntegrationFixture(ITestOutputHelper output,
112-
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
112+
ushort consumerDispatchConcurrency = 1,
113113
bool openChannel = true)
114114
{
115115
_consumerDispatchConcurrency = consumerDispatchConcurrency;
@@ -143,7 +143,8 @@ public virtual async Task InitializeAsync()
143143
*/
144144
if (_connFactory == null)
145145
{
146-
_connFactory = CreateConnectionFactory(_consumerDispatchConcurrency);
146+
_connFactory = CreateConnectionFactory();
147+
_connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency;
147148
}
148149

149150
if (_conn == null)
@@ -516,15 +517,13 @@ protected static async Task WaitAsync(TaskCompletionSource<bool> tcs, TimeSpan t
516517
}
517518
}
518519

519-
protected ConnectionFactory CreateConnectionFactory(
520-
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
520+
protected ConnectionFactory CreateConnectionFactory()
521521
{
522522
return new ConnectionFactory
523523
{
524524
ClientProvidedName = $"{_testDisplayName}:{Util.Now}:{GetConnectionIdx()}",
525525
ContinuationTimeout = WaitSpan,
526526
HandshakeContinuationTimeout = WaitSpan,
527-
ConsumerDispatchConcurrency = consumerDispatchConcurrency
528527
};
529528
}
530529

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -36,28 +36,23 @@
3636
using System.Threading.Tasks;
3737
using RabbitMQ.Client;
3838
using RabbitMQ.Client.Events;
39-
using RabbitMQ.Client.Impl;
4039
using Xunit;
4140
using Xunit.Abstractions;
4241

4342
namespace Test.Integration
4443
{
4544
public class TestAsyncConsumer : IntegrationFixture
4645
{
47-
private const ushort ConsumerDispatchConcurrency = 2;
48-
4946
private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown");
5047

5148
public TestAsyncConsumer(ITestOutputHelper output)
52-
: base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency)
49+
: base(output, consumerDispatchConcurrency: 2)
5350
{
5451
}
5552

5653
[Fact]
5754
public async Task TestBasicRoundtripConcurrent()
5855
{
59-
await ValidateConsumerDispatchConcurrency();
60-
6156
AddCallbackExceptionHandlers();
6257
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);
6358

@@ -151,8 +146,6 @@ public async Task TestBasicRoundtripConcurrent()
151146
[Fact]
152147
public async Task TestBasicRoundtripConcurrentManyMessages()
153148
{
154-
await ValidateConsumerDispatchConcurrency();
155-
156149
AddCallbackExceptionHandlers();
157150
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);
158151

@@ -330,8 +323,6 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
330323
[Fact]
331324
public async Task TestBasicRejectAsync()
332325
{
333-
await ValidateConsumerDispatchConcurrency();
334-
335326
string queueName = GenerateQueueName();
336327

337328
var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -430,8 +421,6 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
430421
[Fact]
431422
public async Task TestBasicAckAsync()
432423
{
433-
await ValidateConsumerDispatchConcurrency();
434-
435424
string queueName = GenerateQueueName();
436425

437426
const int messageCount = 1024;
@@ -499,8 +488,6 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
499488
[Fact]
500489
public async Task TestBasicNackAsync()
501490
{
502-
await ValidateConsumerDispatchConcurrency();
503-
504491
var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
505492

506493
_conn.ConnectionShutdown += (o, ea) =>
@@ -574,8 +561,6 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
574561
[Fact]
575562
public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
576563
{
577-
await ValidateConsumerDispatchConcurrency();
578-
579564
AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
580565
var tasks = new List<Task>();
581566
for (int i = 0; i < 256; i++)
@@ -596,8 +581,6 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
596581
[Fact]
597582
public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
598583
{
599-
await ValidateConsumerDispatchConcurrency();
600-
601584
string exchangeName = GenerateExchangeName();
602585
string queue1Name = GenerateQueueName();
603586
string queue2Name = GenerateQueueName();
@@ -667,8 +650,6 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
667650
[Fact]
668651
public async Task TestCloseWithinEventHandler_GH1567()
669652
{
670-
await ValidateConsumerDispatchConcurrency();
671-
672653
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
673654

674655
QueueDeclareOk q = await _channel.QueueDeclareAsync();
@@ -698,20 +679,6 @@ await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
698679
Assert.True(await tcs.Task);
699680
}
700681

701-
private async Task ValidateConsumerDispatchConcurrency()
702-
{
703-
ushort expectedConsumerDispatchConcurrency = (ushort)S_Random.Next(3, 10);
704-
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
705-
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
706-
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
707-
using (IChannel ch = await _conn.CreateChannelAsync(
708-
consumerDispatchConcurrency: expectedConsumerDispatchConcurrency))
709-
{
710-
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
711-
Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency);
712-
}
713-
}
714-
715682
private static void SetException(Exception ex, params TaskCompletionSource<bool>[] tcsAry)
716683
{
717684
foreach (TaskCompletionSource<bool> tcs in tcsAry)

projects/Test/Integration/TestAsyncEventingBasicConsumer.cs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,13 @@
3434
using System.Threading.Tasks;
3535
using RabbitMQ.Client;
3636
using RabbitMQ.Client.Events;
37-
using RabbitMQ.Client.Impl;
3837
using Xunit;
3938
using Xunit.Abstractions;
4039

4140
namespace Test.Integration
4241
{
4342
public class TestAsyncEventingBasicConsumer : IntegrationFixture
4443
{
45-
private const ushort ConsumerDispatchConcurrency = 2;
46-
4744
private readonly CancellationTokenSource _cts = new CancellationTokenSource(ShortSpan);
4845
private readonly CancellationTokenRegistration _ctr;
4946
private readonly TaskCompletionSource<bool> _onCallbackExceptionTcs =
@@ -52,7 +49,7 @@ public class TestAsyncEventingBasicConsumer : IntegrationFixture
5249
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
5350

5451
public TestAsyncEventingBasicConsumer(ITestOutputHelper output)
55-
: base(output, consumerDispatchConcurrency: ConsumerDispatchConcurrency)
52+
: base(output, consumerDispatchConcurrency: 2)
5653
{
5754
_ctr = _cts.Token.Register(OnTokenCanceled);
5855
}
@@ -84,10 +81,6 @@ private Task AsyncConsumerOnReceived(object sender, BasicDeliverEventArgs @event
8481
[Fact]
8582
public async Task TestAsyncEventingBasicConsumer_GH1038()
8683
{
87-
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
88-
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
89-
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
90-
9184
string exchangeName = GenerateExchangeName();
9285
string queueName = GenerateQueueName();
9386
string routingKey = string.Empty;

0 commit comments

Comments
 (0)