Skip to content

Commit e71e5d4

Browse files
authored
Merge pull request #338 from rabbitmq/rabbitmq-dotnet-client-337
Dispose of sockets that failed to connect
2 parents 6a1bf83 + d79f406 commit e71e5d4

File tree

5 files changed

+54
-127
lines changed

5 files changed

+54
-127
lines changed

projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
<PackageReference Include="System.Net.Sockets" Version="4.3.0" />
6262
<PackageReference Include="System.Reflection.Extensions" Version="4.3.0" />
6363
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.3.0" />
64+
<PackageReference Include="System.Runtime" Version="4.3.0" />
6465
<PackageReference Include="System.Runtime.Extensions" Version="4.3.0" />
6566
<PackageReference Include="System.Text.RegularExpressions" Version="4.3.0" />
6667
<PackageReference Include="System.Threading" Version="4.3.0" />

projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace RabbitMQ.Client
1010
/// Wrapper interface for standard TCP-client. Provides socket for socket frame handler class.
1111
/// </summary>
1212
/// <remarks>Contains all methods that are currenty in use in rabbitmq client.</remarks>
13-
public interface ITcpClient
13+
public interface ITcpClient : IDisposable
1414
{
1515
bool Connected { get; }
1616

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -84,29 +84,20 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
8484
int connectionTimeout, int readTimeout, int writeTimeout)
8585
{
8686
Endpoint = endpoint;
87-
m_socket = null;
88-
if (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork)
87+
88+
if (ShouldTryIPv6(endpoint))
8989
{
90-
try
91-
{
92-
m_socket = socketFactory(AddressFamily.InterNetworkV6);
93-
Connect(m_socket, endpoint, connectionTimeout);
94-
}
95-
catch (ConnectFailureException) // could not connect using IPv6
96-
{
97-
m_socket = null;
98-
}
99-
// Mono might raise a SocketException when using IPv4 addresses on
100-
// an OS that supports IPv6
101-
catch (SocketException)
90+
try {
91+
m_socket = ConnectUsingIPv6(endpoint, socketFactory, connectionTimeout);
92+
} catch (ConnectFailureException)
10293
{
10394
m_socket = null;
10495
}
10596
}
97+
10698
if (m_socket == null && endpoint.AddressFamily != AddressFamily.InterNetworkV6)
10799
{
108-
m_socket = socketFactory(AddressFamily.InterNetwork);
109-
Connect(m_socket, endpoint, connectionTimeout);
100+
m_socket = ConnectUsingIPv4(endpoint, socketFactory, connectionTimeout);
110101
}
111102

112103
Stream netstream = m_socket.GetStream();
@@ -164,12 +155,10 @@ public int ReadTimeout
164155
m_socket.ReceiveTimeout = value;
165156
}
166157
}
167-
#pragma warning disable 0168
168-
catch (SocketException _)
158+
catch (SocketException)
169159
{
170160
// means that the socket is already closed
171161
}
172-
#pragma warning restore 0168
173162
}
174163
}
175164

@@ -190,13 +179,6 @@ public void Close()
190179
{
191180
try
192181
{
193-
try
194-
{
195-
196-
} catch (ArgumentException)
197-
{
198-
// ignore, we are closing anyway
199-
};
200182
m_socket.Close();
201183
}
202184
catch (Exception)
@@ -273,14 +255,48 @@ public void Flush()
273255
}
274256
}
275257

276-
private void Connect(ITcpClient socket, AmqpTcpEndpoint endpoint, int timeout)
258+
private bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
259+
{
260+
return (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork);
261+
}
262+
263+
private ITcpClient ConnectUsingIPv6(AmqpTcpEndpoint endpoint,
264+
Func<AddressFamily, ITcpClient> socketFactory,
265+
int timeout)
266+
{
267+
return ConnectUsingAddressFamily(endpoint, socketFactory, timeout, AddressFamily.InterNetworkV6);
268+
}
269+
270+
private ITcpClient ConnectUsingIPv4(AmqpTcpEndpoint endpoint,
271+
Func<AddressFamily, ITcpClient> socketFactory,
272+
int timeout)
273+
{
274+
return ConnectUsingAddressFamily(endpoint, socketFactory, timeout, AddressFamily.InterNetwork);
275+
}
276+
277+
private ITcpClient ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint,
278+
Func<AddressFamily, ITcpClient> socketFactory,
279+
int timeout, AddressFamily family)
280+
{
281+
ITcpClient socket = socketFactory(family);
282+
try {
283+
ConnectOrFail(socket, endpoint, timeout);
284+
return socket;
285+
} catch (ConnectFailureException e) {
286+
socket.Dispose();
287+
throw e;
288+
}
289+
}
290+
291+
private void ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, int timeout)
277292
{
278293
try
279294
{
280295
socket.ConnectAsync(endpoint.HostName, endpoint.Port)
281296
.TimeoutAfter(timeout)
282297
.ConfigureAwait(false)
283-
.GetAwaiter()//this ensures exceptions aren't wrapped in an AggregateException
298+
// this ensures exceptions aren't wrapped in an AggregateException
299+
.GetAwaiter()
284300
.GetResult();
285301
}
286302
catch (ArgumentException e)

projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,15 @@ public virtual async Task ConnectAsync(string host, int port)
4141

4242
public virtual void Close()
4343
{
44-
if(sock != null)
44+
this.Dispose();
45+
}
46+
47+
public virtual void Dispose()
48+
{
49+
if (sock != null)
50+
{
4551
sock.Dispose();
52+
}
4653
sock = null;
4754
}
4855

projects/client/Unit/src/unit/TestConnectionChurnHandleLeak.cs

Lines changed: 0 additions & 97 deletions
This file was deleted.

0 commit comments

Comments
 (0)