Skip to content

Commit 0e30349

Browse files
garyrussellartembilan
authored andcommitted
Protect TcpNetConnectionSupport calls
5.0 introduced the `TcpNetConnectionSupport` user hook to create connection objects. If a user-supplied instance threw an exception, the server socket would remain open but the server `accept()` thread is gone. - wrap the connection initialization code in try/catch - close the server socket if necessary on an exception
1 parent d5303ca commit 0e30349

File tree

3 files changed

+114
-12
lines changed

3 files changed

+114
-12
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetServerConnectionFactory.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -140,22 +140,31 @@ public void run() {
140140
if (isShuttingDown()) {
141141
if (logger.isInfoEnabled()) {
142142
logger.info("New connection from " + socket.getInetAddress().getHostAddress()
143+
+ ":" + socket.getPort()
143144
+ " rejected; the server is in the process of shutting down.");
144145
}
145146
socket.close();
146147
}
147148
else {
148149
if (logger.isDebugEnabled()) {
149-
logger.debug("Accepted connection from " + socket.getInetAddress().getHostAddress());
150+
logger.debug("Accepted connection from " + socket.getInetAddress().getHostAddress()
151+
+ ":" + socket.getPort());
152+
}
153+
try {
154+
setSocketAttributes(socket);
155+
TcpConnectionSupport connection = this.tcpNetConnectionSupport.createNewConnection(socket, true,
156+
isLookupHost(), getApplicationEventPublisher(), getComponentName());
157+
connection = wrapConnection(connection);
158+
initializeConnection(connection, socket);
159+
getTaskExecutor().execute(connection);
160+
harvestClosedConnections();
161+
connection.publishConnectionOpenEvent();
162+
}
163+
catch (Exception e) {
164+
this.logger.error("Failed to create and configure a TcpConnection for the new socket: "
165+
+ socket.getInetAddress().getHostAddress() + ":" + socket.getPort(), e);
166+
socket.close();
150167
}
151-
setSocketAttributes(socket);
152-
TcpConnectionSupport connection = this.tcpNetConnectionSupport.createNewConnection(socket, true,
153-
isLookupHost(), getApplicationEventPublisher(), getComponentName());
154-
connection = wrapConnection(connection);
155-
initializeConnection(connection, socket);
156-
getTaskExecutor().execute(connection);
157-
harvestClosedConnections();
158-
connection.publishConnectionOpenEvent();
159168
}
160169
}
161170
}
@@ -167,6 +176,12 @@ public void run() {
167176
else if (isActive()) {
168177
logger.error("Error on ServerSocket; port = " + getPort(), e);
169178
publishServerExceptionEvent(e);
179+
try {
180+
this.serverSocket.close();
181+
}
182+
catch (IOException e1) {
183+
// empty
184+
}
170185
}
171186
}
172187
finally {

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioServerConnectionFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -203,6 +203,7 @@ protected void doAccept(final Selector selector, ServerSocketChannel server, lon
203203
if (isShuttingDown()) {
204204
if (logger.isInfoEnabled()) {
205205
logger.info("New connection from " + channel.socket().getInetAddress().getHostAddress()
206+
+ ":" + channel.socket().getPort()
206207
+ " rejected; the server is in the process of shutting down.");
207208
}
208209
channel.close();
@@ -226,7 +227,9 @@ protected void doAccept(final Selector selector, ServerSocketChannel server, lon
226227
connection.publishConnectionOpenEvent();
227228
}
228229
catch (Exception e) {
229-
logger.error("Exception accepting new connection", e);
230+
logger.error("Exception accepting new connection from "
231+
+ channel.socket().getInetAddress().getHostAddress()
232+
+ ":" + channel.socket().getPort(), e);
230233
channel.close();
231234
}
232235
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.ip.tcp.connection;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.net.Socket;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
27+
import javax.net.SocketFactory;
28+
29+
import org.junit.Test;
30+
31+
import org.springframework.context.ApplicationEventPublisher;
32+
import org.springframework.messaging.Message;
33+
34+
/**
35+
* @author Gary Russell
36+
* @since 5.0.3
37+
*
38+
*/
39+
public class TcpNetConnectionSupportTests {
40+
41+
@Test
42+
public void testBadCode() throws Exception {
43+
TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0);
44+
AtomicReference<Message<?>> message = new AtomicReference<>();
45+
CountDownLatch latch1 = new CountDownLatch(1);
46+
server.registerListener(m -> {
47+
message.set(m);
48+
latch1.countDown();
49+
return false;
50+
});
51+
AtomicBoolean firstTime = new AtomicBoolean(true);
52+
server.setTcpNetConnectionSupport(new DefaultTcpNetConnectionSupport() {
53+
54+
@Override
55+
public TcpNetConnection createNewConnection(Socket socket, boolean server, boolean lookupHost,
56+
ApplicationEventPublisher applicationEventPublisher, String connectionFactoryName)
57+
throws Exception {
58+
if (firstTime.getAndSet(false)) {
59+
throw new RuntimeException("intended");
60+
}
61+
return super.createNewConnection(socket, server, lookupHost, applicationEventPublisher, connectionFactoryName);
62+
}
63+
64+
});
65+
CountDownLatch latch2 = new CountDownLatch(1);
66+
server.setApplicationEventPublisher(e -> {
67+
if (e instanceof TcpConnectionServerListeningEvent) {
68+
latch2.countDown();
69+
}
70+
});
71+
server.afterPropertiesSet();
72+
server.start();
73+
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
74+
Socket socket = SocketFactory.getDefault().createSocket("localhost", server.getPort());
75+
socket.close();
76+
socket = SocketFactory.getDefault().createSocket("localhost", server.getPort());
77+
socket.getOutputStream().write("foo\r\n".getBytes());
78+
socket.close();
79+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
80+
assertThat(message.get()).isNotNull();
81+
server.stop();
82+
}
83+
84+
}

0 commit comments

Comments
 (0)