Skip to content

Commit 5dac195

Browse files
garyrussellartembilan
authored andcommitted
GH-2471: MQTT: Fix Thread Leak
Fixes #2471 Call `close()` on the client whenever the connection is lost or can't be established, to release resources in the client. **cherry-pick to 5.0.x, 4.3.x** (cherry picked from commit f9cea64)
1 parent 76bbbdd commit 5dac195

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -262,6 +262,14 @@ private synchronized void connectAndSubscribe() throws MqttException {
262262
}
263263
logger.error("Error connecting or subscribing to " + Arrays.toString(topics), e);
264264
this.client.disconnectForcibly(this.completionTimeout);
265+
try {
266+
this.client.setCallback(null);
267+
this.client.close();
268+
}
269+
catch (MqttException e1) {
270+
// NOSONAR
271+
}
272+
this.client = null;
265273
throw e;
266274
}
267275
finally {
@@ -316,6 +324,14 @@ public synchronized void connectionLost(Throwable cause) {
316324
if (isRunning()) {
317325
this.logger.error("Lost connection: " + cause.getMessage() + "; retrying...");
318326
this.connected = false;
327+
try {
328+
this.client.setCallback(null);
329+
this.client.close();
330+
}
331+
catch (MqttException e) {
332+
// NOSONAR
333+
}
334+
this.client = null;
319335
scheduleReconnect();
320336
if (this.applicationEventPublisher != null) {
321337
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.mockito.BDDMockito.given;
3232
import static org.mockito.BDDMockito.willAnswer;
3333
import static org.mockito.BDDMockito.willReturn;
34+
import static org.mockito.BDDMockito.willThrow;
3435
import static org.mockito.Mockito.atLeastOnce;
3536
import static org.mockito.Mockito.mock;
3637
import static org.mockito.Mockito.never;
@@ -111,6 +112,16 @@ public class MqttAdapterTests {
111112
this.alwaysComplete = (IMqttToken) pfb.getObject();
112113
}
113114

115+
@Test
116+
public void testCloseOnBadConnect() throws Exception {
117+
final IMqttClient client = mock(IMqttClient.class);
118+
willThrow(new MqttException(0)).given(client).connect(any());
119+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
120+
adapter.start();
121+
verify(client).close();
122+
adapter.stop();
123+
}
124+
114125
@Test
115126
public void testOutboundOptionsApplied() throws Exception {
116127
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
@@ -383,6 +394,7 @@ public void testReconnect() throws Exception {
383394
adapter.setTaskScheduler(taskScheduler);
384395
adapter.start();
385396
adapter.connectionLost(new RuntimeException("initial"));
397+
verify(client).close();
386398
Thread.sleep(1000);
387399
// the following assertion should be equalTo, but leq to protect against a slow CI server
388400
assertThat(attemptingReconnectCount.get(), lessThanOrEqualTo(2));

0 commit comments

Comments
 (0)