Skip to content

Commit 81cfa80

Browse files
garyrussellartembilan
authored andcommitted
GH-2471: MQTT: Fix Thread Leak
Fixes #2471 Call `close()` on the client whenever the connection is lost or can't be established, to release resources in the client. **cherry-pick to 5.0.x, 4.3.x** (cherry picked from commit f9cea64) # Conflicts: # spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java
1 parent 37c92a1 commit 81cfa80

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -262,6 +262,14 @@ private synchronized void connectAndSubscribe() throws MqttException {
262262
}
263263
logger.error("Error connecting or subscribing to " + Arrays.toString(topics), e);
264264
this.client.disconnectForcibly(this.completionTimeout);
265+
try {
266+
this.client.setCallback(null);
267+
this.client.close();
268+
}
269+
catch (MqttException e1) {
270+
// NOSONAR
271+
}
272+
this.client = null;
265273
throw e;
266274
}
267275
finally {
@@ -321,6 +329,14 @@ public synchronized void connectionLost(Throwable cause) {
321329
if (isRunning()) {
322330
this.logger.error("Lost connection: " + cause.getMessage() + "; retrying...");
323331
this.connected = false;
332+
try {
333+
this.client.setCallback(null);
334+
this.client.close();
335+
}
336+
catch (MqttException e) {
337+
// NOSONAR
338+
}
339+
this.client = null;
324340
scheduleReconnect();
325341
if (this.applicationEventPublisher != null) {
326342
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.mockito.BDDMockito.given;
2727
import static org.mockito.BDDMockito.willAnswer;
2828
import static org.mockito.BDDMockito.willReturn;
29+
import static org.mockito.BDDMockito.willThrow;
2930
import static org.mockito.Matchers.any;
3031
import static org.mockito.Matchers.anyLong;
3132
import static org.mockito.Matchers.anyString;
@@ -98,6 +99,16 @@ public class MqttAdapterTests {
9899
this.alwaysComplete = (IMqttToken) pfb.getObject();
99100
}
100101

102+
@Test
103+
public void testCloseOnBadConnect() throws Exception {
104+
final IMqttClient client = mock(IMqttClient.class);
105+
willThrow(new MqttException(0)).given(client).connect(any());
106+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
107+
adapter.start();
108+
verify(client).close();
109+
adapter.stop();
110+
}
111+
101112
@Test
102113
public void testOutboundOptionsApplied() throws Exception {
103114
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
@@ -345,6 +356,7 @@ public void testReconnect() throws Exception {
345356
adapter.setTaskScheduler(taskScheduler);
346357
adapter.start();
347358
adapter.connectionLost(new RuntimeException("initial"));
359+
verify(client).close();
348360
Thread.sleep(1000);
349361
// the following assertion should be equalTo, but leq to protect against a slow CI server
350362
assertThat(attemptingReconnectCount.get(), lessThanOrEqualTo(2));

0 commit comments

Comments
 (0)