Skip to content

Commit 14d0107

Browse files
Merge pull request #1084 from rabbitmq/close-async-if-in-nio-loop-thread
Close asynchronously if called in NIO loop thread
2 parents b70bfc5 + 8d853c8 commit 14d0107

File tree

4 files changed

+75
-2
lines changed

4 files changed

+75
-2
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
5858
private final ScheduledExecutorService heartbeatExecutor;
5959
private final ExecutorService shutdownExecutor;
6060
private Thread mainLoopThread;
61+
private final AtomicBoolean ioLoopThreadSet = new AtomicBoolean(false);
62+
private volatile Thread ioLoopThread;
6163
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
6264
private String id;
6365

@@ -504,6 +506,7 @@ public void startMainLoop() {
504506
MainLoop loop = new MainLoop();
505507
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
506508
mainLoopThread = Environment.newThread(threadFactory, loop, name);
509+
ioLoopThread(mainLoopThread);
507510
mainLoopThread.start();
508511
}
509512

@@ -1104,7 +1107,7 @@ public void close(int closeCode,
11041107
boolean abort)
11051108
throws IOException
11061109
{
1107-
boolean sync = !(Thread.currentThread() == mainLoopThread);
1110+
boolean sync = !(Thread.currentThread() == ioLoopThread);
11081111

11091112
try {
11101113
AMQP.Connection.Close reason =
@@ -1195,6 +1198,12 @@ public void setId(String id) {
11951198
this.id = id;
11961199
}
11971200

1201+
public void ioLoopThread(Thread thread) {
1202+
if (this.ioLoopThreadSet.compareAndSet(false, true)) {
1203+
this.ioLoopThread = thread;
1204+
}
1205+
}
1206+
11981207
public int getChannelRpcTimeout() {
11991208
return channelRpcTimeout;
12001209
}

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

+1
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public void run() {
157157

158158
if (frame != null) {
159159
try {
160+
state.getConnection().ioLoopThread(Thread.currentThread());
160161
boolean noProblem = state.getConnection().handleReadFrame(frame);
161162
if (noProblem && (!state.getConnection().isRunning() || state.getConnection().hasBrokerInitiatedShutdown())) {
162163
// looks like the frame was Close-Ok or Close
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.test;
17+
18+
import static com.rabbitmq.client.test.TestUtils.LatchConditions.completed;
19+
import static com.rabbitmq.client.test.TestUtils.waitAtMost;
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import com.rabbitmq.client.Channel;
23+
import com.rabbitmq.client.Connection;
24+
import com.rabbitmq.client.ConnectionFactory;
25+
import java.util.concurrent.CountDownLatch;
26+
import org.junit.jupiter.params.ParameterizedTest;
27+
import org.junit.jupiter.params.provider.ValueSource;
28+
29+
public class BlockedConnectionTest extends BrokerTestCase {
30+
31+
@ParameterizedTest
32+
@ValueSource(booleans = {true, false})
33+
void errorInBlockListenerShouldCloseConnection(boolean nio) throws Exception {
34+
ConnectionFactory cf = TestUtils.connectionFactory();
35+
if (nio) {
36+
cf.useNio();
37+
} else {
38+
cf.useBlockingIo();
39+
}
40+
Connection c = cf.newConnection();
41+
CountDownLatch shutdownLatch = new CountDownLatch(1);
42+
c.addShutdownListener(cause -> shutdownLatch.countDown());
43+
CountDownLatch blockedLatch = new CountDownLatch(1);
44+
c.addBlockedListener(
45+
reason -> {
46+
blockedLatch.countDown();
47+
throw new RuntimeException("error in blocked listener!");
48+
},
49+
() -> {});
50+
try {
51+
block();
52+
Channel ch = c.createChannel();
53+
ch.basicPublish("", "", null, "dummy".getBytes());
54+
assertThat(blockedLatch).is(completed());
55+
} finally {
56+
unblock();
57+
}
58+
assertThat(shutdownLatch).is(completed());
59+
waitAtMost(() -> !c.isOpen());
60+
}
61+
62+
}

src/test/java/com/rabbitmq/client/test/ClientTestSuite.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@
7373
OAuth2ClientCredentialsGrantCredentialsProviderTest.class,
7474
RefreshCredentialsTest.class,
7575
AMQConnectionRefreshCredentialsTest.class,
76-
ValueWriterTest.class
76+
ValueWriterTest.class,
77+
BlockedConnectionTest.class
7778
})
7879
public class ClientTestSuite {
7980

0 commit comments

Comments
 (0)