Skip to content

Commit d9186f1

Browse files
garyrussellartembilan
authored andcommitted
INT-4465: Fix delay in close propagation with NIO
JIRA: https://jira.spring.io/browse/INT-4465 There is a one second delay before a socket close is propagated if there is an active assembler. This is generally only a problem with deserializers that use EOF to signal message end (such as the `ByteArrayElasticRawDeserializer`). Attempt to insert an EOF marker into the buffer queue so that the `getNextBuffer()` will exit immediately on `close()` if it is blocked awaiting a buffer. **cherry-pick to 5.0.x, 4.3.x**
1 parent 081d0d1 commit d9186f1

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -58,6 +58,8 @@ public class TcpNioConnection extends TcpConnectionSupport {
5858

5959
private static final long DEFAULT_PIPE_TIMEOUT = 60000;
6060

61+
private static final byte[] EOF = new byte[0]; // EOF marker buffer
62+
6163
private final SocketChannel socketChannel;
6264

6365
private final ChannelOutputStream channelOutputStream;
@@ -714,7 +716,7 @@ private byte[] getNextBuffer() throws IOException {
714716
while (buffer == null) {
715717
try {
716718
buffer = this.buffers.poll(1, TimeUnit.SECONDS);
717-
if (buffer == null && this.isClosed) {
719+
if (buffer == EOF || (buffer == null && this.isClosed)) {
718720
return null;
719721
}
720722
}
@@ -758,6 +760,12 @@ public void write(ByteBuffer byteBuffer) throws IOException {
758760
public void close() throws IOException {
759761
super.close();
760762
this.isClosed = true;
763+
try {
764+
this.buffers.offer(EOF, TcpNioConnection.this.pipeTimeout, TimeUnit.SECONDS);
765+
}
766+
catch (InterruptedException e) {
767+
Thread.currentThread().interrupt();
768+
}
761769
}
762770

763771
@Override

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.ip.tcp.connection;
1818

1919
import static org.hamcrest.Matchers.containsString;
20+
import static org.hamcrest.Matchers.lessThan;
2021
import static org.hamcrest.Matchers.not;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertNotNull;
@@ -64,6 +65,7 @@
6465

6566
import org.apache.commons.logging.Log;
6667
import org.apache.commons.logging.LogFactory;
68+
import org.junit.Ignore;
6769
import org.junit.Rule;
6870
import org.junit.Test;
6971
import org.junit.rules.TestName;
@@ -90,6 +92,7 @@
9092
import org.springframework.messaging.support.ErrorMessage;
9193
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
9294
import org.springframework.util.ReflectionUtils;
95+
import org.springframework.util.StopWatch;
9396

9497

9598
/**
@@ -130,6 +133,7 @@ public void testWriteTimeout() throws Exception {
130133
Socket s = server.accept();
131134
// block so we fill the buffer
132135
done.await(10, TimeUnit.SECONDS);
136+
s.close();
133137
}
134138
catch (Exception e) {
135139
e.printStackTrace();
@@ -806,6 +810,36 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
806810
te.shutdown();
807811
}
808812

813+
@Test
814+
@Ignore // Timing is too short for CI/Travis
815+
public void testNoDelayOnClose() throws Exception {
816+
TcpNioServerConnectionFactory cf = new TcpNioServerConnectionFactory(0);
817+
final CountDownLatch reading = new CountDownLatch(1);
818+
final StopWatch watch = new StopWatch();
819+
cf.setDeserializer(is -> {
820+
reading.countDown();
821+
watch.start();
822+
is.read();
823+
is.read();
824+
watch.stop();
825+
return null;
826+
});
827+
cf.registerListener(m -> false);
828+
final CountDownLatch listening = new CountDownLatch(1);
829+
cf.setApplicationEventPublisher(e -> {
830+
listening.countDown();
831+
});
832+
cf.afterPropertiesSet();
833+
cf.start();
834+
assertTrue(listening.await(10, TimeUnit.SECONDS));
835+
Socket socket = SocketFactory.getDefault().createSocket("localhost", cf.getPort());
836+
socket.getOutputStream().write("x".getBytes());
837+
assertTrue(reading.await(10, TimeUnit.SECONDS));
838+
socket.close();
839+
cf.stop();
840+
assertThat(watch.getLastTaskTimeMillis(), lessThan(950L));
841+
}
842+
809843
private void readFully(InputStream is, byte[] buff) throws IOException {
810844
for (int i = 0; i < buff.length; i++) {
811845
buff[i] = (byte) is.read();

0 commit comments

Comments
 (0)