Skip to content

Commit 902a778

Browse files
authored
Improve socket handling (#414)
- Improve socket handling by using the native API `socket.Connected` instead of a local variable to check if the connection is still open or not - Handle the `close` workflow in a better way - Add ``` reader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true)); writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true)); ``` - Add a `try catch` block to the message handler; in case it fails for any reason, it will be logged. - Remove one test. There was a bug, and it was not deterministic. (Many other tests cover the same functions )
1 parent 166751d commit 902a778

File tree

11 files changed

+187
-218
lines changed

11 files changed

+187
-218
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ public int Write(Span<byte> span)
112112

113113
public class Client : IClient
114114
{
115-
private bool isClosed = true;
116-
117115
private uint correlationId = 0; // allow for some pre-amble
118116

119117
private Connection _connection;
@@ -160,24 +158,13 @@ private Client(ClientParameters parameters, ILogger logger = null)
160158
SendHeartBeat,
161159
Close,
162160
(int)parameters.Heartbeat.TotalSeconds);
163-
IsClosed = false;
164161
_logger = logger ?? NullLogger.Instance;
165162
ClientId = Guid.NewGuid().ToString();
166163
}
167164

168165
public bool IsClosed
169166
{
170-
get
171-
{
172-
if (_connection.IsClosed)
173-
{
174-
isClosed = true;
175-
}
176-
177-
return isClosed;
178-
}
179-
180-
private set => isClosed = value;
167+
get { return _connection.IsClosed; }
181168
}
182169

183170
private void StartHeartBeat()
@@ -758,7 +745,7 @@ private async ValueTask<bool> SendHeartBeat()
758745
private void InternalClose()
759746
{
760747
_heartBeatHandler.Close();
761-
IsClosed = true;
748+
_connection.Close();
762749
}
763750

764751
private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffsetType offsetSpecification)
@@ -773,13 +760,13 @@ private async Task<CloseResponse> Close(string reason, string closedStatus)
773760
return new CloseResponse(0, ResponseCode.Ok);
774761
}
775762

776-
InternalClose();
777763
try
778764
{
779-
_connection.UpdateCloseStatus(closedStatus);
780765
var result =
781766
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
782767
TimeSpan.FromSeconds(10)).ConfigureAwait(false);
768+
InternalClose();
769+
_connection.UpdateCloseStatus(closedStatus);
783770

784771
return result;
785772
}

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ internal static class ConnectionClosedReason
1919
public const string Normal = "TCP connection closed normal";
2020
public const string Unexpected = "TCP connection closed unexpected";
2121
public const string TooManyHeartbeatsMissing = "TCP connection closed by too many heartbeats missing";
22-
2322
}
2423

2524
public class Connection : IDisposable
@@ -32,7 +31,6 @@ public class Connection : IDisposable
3231
private readonly Func<string, Task> closedCallback;
3332
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
3433
private int numFrames;
35-
private bool isClosed = false;
3634
private string _closedReason = ConnectionClosedReason.Unexpected;
3735
private bool _disposedValue;
3836
private readonly ILogger _logger;
@@ -43,7 +41,8 @@ public class Connection : IDisposable
4341

4442
internal int NumFrames => numFrames;
4543
internal string ClientId { get; set; }
46-
public bool IsClosed => isClosed;
44+
public bool IsClosed => !socket.Connected;
45+
4746
public void UpdateCloseStatus(string reason)
4847
{
4948
_closedReason = reason;
@@ -59,17 +58,31 @@ private Connection(Socket socket, Func<Memory<byte>, Task> callback,
5958
{
6059
_logger = logger;
6160
this.socket = socket;
61+
6262
commandCallback = callback;
6363
closedCallback = closedCallBack;
6464
var networkStream = new NetworkStream(socket);
6565
var stream = MaybeTcpUpgrade(networkStream, sslOption);
66-
writer = PipeWriter.Create(stream);
67-
reader = PipeReader.Create(stream);
66+
reader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true));
67+
writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true));
68+
6869
// ProcessIncomingFrames is dropped as soon as the connection is closed
6970
// no need to stop it manually when the connection is closed
7071
_incomingFramesTask = Task.Run(ProcessIncomingFrames);
7172
}
7273

74+
internal void Close()
75+
{
76+
if (!_cancelTokenSource.IsCancellationRequested)
77+
{
78+
_cancelTokenSource.Cancel();
79+
}
80+
81+
writer.Complete();
82+
reader.Complete();
83+
socket.Close();
84+
}
85+
7386
public static async Task<Connection> Create(EndPoint endpoint, Func<Memory<byte>, Task> commandCallback,
7487
Func<string, Task> closedCallBack, SslOption sslOption, ILogger logger)
7588
{
@@ -120,7 +133,7 @@ private async Task WriteCommand<T>(T command) where T : struct, ICommand
120133
throw new OperationCanceledException("Token Cancellation Requested Connection");
121134
}
122135

123-
if (isClosed)
136+
if (!socket.Connected)
124137
{
125138
throw new InvalidOperationException("Connection is closed");
126139
}
@@ -148,7 +161,7 @@ private async Task ProcessIncomingFrames()
148161
Exception caught = null;
149162
try
150163
{
151-
while (!isClosed)
164+
while (socket.Connected)
152165
{
153166
if (!reader.TryRead(out var result))
154167
{
@@ -158,14 +171,13 @@ private async Task ProcessIncomingFrames()
158171
var buffer = result.Buffer;
159172
if (buffer.Length == 0)
160173
{
161-
Debug.WriteLine("TCP Connection Closed!");
174+
_logger?.LogDebug("TCP Connection Closed!");
162175
// We're not going to receive any more bytes from the connection.
163176
break;
164177
}
165178

166179
// Let's try to read some frames!
167-
168-
while (TryReadFrame(ref buffer, out var frame) && !isClosed)
180+
while (TryReadFrame(ref buffer, out var frame) && socket.Connected)
169181
{
170182
// Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled.
171183

@@ -196,20 +208,28 @@ private async Task ProcessIncomingFrames()
196208
// closedCallback event.
197209
// It is useful to trace the error, but at this point
198210
// the socket is closed maybe not in the correct way
199-
if (!isClosed)
211+
if (socket.Connected)
200212
{
201213
_logger?.LogError(e, "Error reading the socket");
202214
}
203215
}
204216
finally
205217
{
206-
isClosed = true;
207218
_logger?.LogDebug(
208219
"TCP Connection Closed ClientId: {ClientId}, Reason {Reason}. IsCancellationRequested {Token} ",
209220
ClientId, _closedReason, Token.IsCancellationRequested);
210221
// Mark the PipeReader as complete
211222
await reader.CompleteAsync(caught).ConfigureAwait(false);
212-
closedCallback?.Invoke(_closedReason)!.ConfigureAwait(false);
223+
if (closedCallback != null)
224+
{
225+
// that's mandatory for the ReliableProducer / ReliableConsumer
226+
// to deal with the connection closed. Null callback won't raise the event
227+
await closedCallback(_closedReason).ConfigureAwait(false);
228+
}
229+
else
230+
{
231+
_logger?.LogWarning("Connection: Closed callback is null. ClientId: {ClientId}", ClientId);
232+
}
213233
}
214234
}
215235

@@ -242,15 +262,6 @@ public void Dispose()
242262
{
243263
try
244264
{
245-
if (!_cancelTokenSource.IsCancellationRequested)
246-
{
247-
_cancelTokenSource.Cancel();
248-
}
249-
250-
isClosed = true;
251-
writer.Complete();
252-
reader.Complete();
253-
socket.Close();
254265
if (!_incomingFramesTask.Wait(Consts.MidWait))
255266
{
256267
_logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}",

RabbitMQ.Stream.Client/Consts.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,10 @@ internal static int RandomMid()
3535
{
3636
return Random.Shared.Next(1000, 2500);
3737
}
38+
39+
internal static int RandomLarge()
40+
{
41+
return Random.Shared.Next(1500, 3000);
42+
}
3843
}
3944
}

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ private async Task<IConsumer> StandardConsumer(bool boot)
4747
// before creating a new consumer, the old one is disposed
4848
// This is just a safety check, the consumer should be already disposed
4949
_consumer?.Dispose();
50-
5150
return await _consumerConfig.StreamSystem.CreateRawConsumer(new RawConsumerConfig(_consumerConfig.Stream)
5251
{
5352
ClientProvidedName = _consumerConfig.ClientProvidedName,
@@ -72,7 +71,7 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
7271
catch (Exception e)
7372
{
7473
BaseLogger?.LogError(e,
75-
$"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}");
74+
$"Stream consumer.ConnectionClosedHandler error. Auto recovery failed for: {_consumerConfig.Stream}");
7675
}
7776
},
7877
MetadataHandler = async _ =>
@@ -88,18 +87,27 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
8887
catch (Exception e)
8988
{
9089
BaseLogger?.LogError(e,
91-
$"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}");
90+
$"Stream consumer.MetadataHandler error. Auto recovery failed for stream: {_consumerConfig.Stream}");
9291
}
9392
},
9493
MessageHandler = async (consumer, ctx, message) =>
9594
{
96-
if (_consumerConfig.MessageHandler != null)
95+
try
96+
{
97+
if (_consumerConfig.MessageHandler != null)
98+
{
99+
await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message)
100+
.ConfigureAwait(false);
101+
}
102+
103+
_consumedFirstTime = true;
104+
}
105+
catch (Exception e)
97106
{
98-
await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message)
99-
.ConfigureAwait(false);
107+
BaseLogger?.LogError("MessageHandler {Error} for stream {Stream} ", e.Message,
108+
_consumerConfig.Stream);
100109
}
101110

102-
_consumedFirstTime = true;
103111
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
104112
},
105113
}, BaseLogger).ConfigureAwait(false);
@@ -144,9 +152,9 @@ private async Task<IConsumer> SuperConsumer(bool boot)
144152
Identifier = _consumerConfig.Identifier,
145153
ConnectionClosedHandler = async (closeReason, partitionStream) =>
146154
{
147-
await RandomWait().ConfigureAwait(false);
148155
if (IsClosedNormally(closeReason))
149156
return;
157+
await RandomWait().ConfigureAwait(false);
150158
try
151159
{
152160
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
@@ -163,9 +171,9 @@ await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
163171
{
164172
try
165173
{
166-
await RandomWait().ConfigureAwait(false);
167174
if (IsClosedNormally())
168175
return;
176+
await RandomWait().ConfigureAwait(false);
169177

170178
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
171179
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
@@ -180,14 +188,22 @@ await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
180188
},
181189
MessageHandler = async (partitionStream, consumer, ctx, message) =>
182190
{
183-
if (_consumerConfig.MessageHandler != null)
191+
try
184192
{
185-
await _consumerConfig.MessageHandler(partitionStream, consumer, ctx,
186-
message).ConfigureAwait(false);
193+
if (_consumerConfig.MessageHandler != null)
194+
{
195+
await _consumerConfig.MessageHandler(partitionStream, consumer, ctx,
196+
message).ConfigureAwait(false);
197+
}
198+
199+
_consumedFirstTime = true;
200+
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
201+
}
202+
catch (Exception e)
203+
{
204+
BaseLogger?.LogError("MessageHandler {Error} for stream {Stream} ", e.Message,
205+
_consumerConfig.Stream);
187206
}
188-
189-
_consumedFirstTime = true;
190-
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
191207
},
192208
}, BaseLogger).ConfigureAwait(false);
193209
}

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,7 @@ internal async Task OnEntityClosed(StreamSystem system, string stream,
414414
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
415415
UpdateStatus(ReliableEntityStatus.Reconnection, reason,
416416
[stream]);
417+
await Task.Delay(Consts.RandomLarge()).ConfigureAwait(false);
417418
try
418419
{
419420
var (localStreamExists, streamInfo) = await CheckIfStreamIsAvailable(stream, system)
@@ -446,6 +447,7 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, ChangeSta
446447
var streamExists = false;
447448
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
448449
UpdateStatus(ReliableEntityStatus.Reconnection, reason, [stream]);
450+
await Task.Delay(Consts.RandomLarge()).ConfigureAwait(false);
449451
try
450452
{
451453
(streamExists, _) = await CheckIfStreamIsAvailable(stream, system)

Tests/RawConsumerSystemTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,6 @@ public async Task ConsumerQueryOffset()
526526
await Assert.ThrowsAsync<OffsetNotFoundException>(() =>
527527
system.QueryOffset("reference_does_not_exist", stream));
528528

529-
Assert.Null(await system.TryQueryOffset("reference_does_not_exist", stream));
530529
await Assert.ThrowsAsync<GenericProtocolException>(() =>
531530
(system.TryQueryOffset(Reference, "stream_does_not_exist")));
532531

0 commit comments

Comments
 (0)