Skip to content

Commit 168a160

Browse files
garyrussellartembilan
authored andcommitted
INT-4463: Full access to MqttConnectOptions
JIRA: https://jira.spring.io/browse/INT-4463 Certain options, such as `maxInFlight` were not exposed. Deprecate the setters on the factory and allow the user to inject a pre-configured `MqttConnectOptions`, thus making all (and any new) properties available to be configured. # Conflicts: # spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java # Conflicts: # spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java # src/reference/asciidoc/mqtt.adoc
1 parent 238f87f commit 168a160

File tree

4 files changed

+116
-116
lines changed

4 files changed

+116
-116
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/DefaultMqttPahoClientFactory.java

Lines changed: 74 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,65 +41,97 @@
4141
*/
4242
public class DefaultMqttPahoClientFactory implements MqttPahoClientFactory {
4343

44-
private volatile Boolean cleanSession;
44+
private MqttConnectOptions options = new MqttConnectOptions();
4545

46-
private volatile Integer connectionTimeout;
46+
private MqttClientPersistence persistence;
4747

48-
private volatile Integer keepAliveInterval;
49-
50-
private volatile String password;
51-
52-
private volatile SocketFactory socketFactory;
53-
54-
private volatile Properties sslProperties;
55-
56-
private volatile String userName;
57-
58-
private volatile MqttClientPersistence persistence;
59-
60-
private volatile Will will;
61-
62-
private volatile String[] serverURIs;
63-
64-
private volatile ConsumerStopAction consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
48+
private ConsumerStopAction consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
6549

50+
/**
51+
* Set the cleanSession.
52+
* @param cleanSession the cleanSession to set.
53+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
54+
*/
55+
@Deprecated
6656
public void setCleanSession(Boolean cleanSession) {
67-
this.cleanSession = cleanSession;
57+
this.options.setCleanSession(cleanSession);
6858
}
6959

60+
/**
61+
* Set the connectionTimeout.
62+
* @param connectionTimeout the connectionTimeout to set.
63+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
64+
*/
65+
@Deprecated
7066
public void setConnectionTimeout(Integer connectionTimeout) {
71-
this.connectionTimeout = connectionTimeout;
67+
this.options.setConnectionTimeout(connectionTimeout);
7268
}
7369

70+
/**
71+
* Set the keepAliveInterval.
72+
* @param keepAliveInterval the keepAliveInterval to set.
73+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
74+
*/
75+
@Deprecated
7476
public void setKeepAliveInterval(Integer keepAliveInterval) {
75-
this.keepAliveInterval = keepAliveInterval;
77+
this.options.setKeepAliveInterval(keepAliveInterval);
7678
}
7779

80+
/**
81+
* Set the password.
82+
* @param password the password to set.
83+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
84+
*/
85+
@Deprecated
7886
public void setPassword(String password) {
79-
this.password = password;
87+
this.options.setPassword(password.toCharArray());
8088
}
8189

90+
/**
91+
* Set the socketFactory.
92+
* @param socketFactory the socketFactory to set.
93+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
94+
*/
95+
@Deprecated
8296
public void setSocketFactory(SocketFactory socketFactory) {
83-
this.socketFactory = socketFactory;
97+
this.options.setSocketFactory(socketFactory);
8498
}
8599

100+
/**
101+
* Set the sslProperties.
102+
* @param sslProperties the sslProperties to set.
103+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
104+
*/
105+
@Deprecated
86106
public void setSslProperties(Properties sslProperties) {
87-
this.sslProperties = sslProperties;
107+
this.options.setSSLProperties(sslProperties);
88108
}
89109

110+
/**
111+
* Set the userName.
112+
* @param userName the userName to set.
113+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
114+
*/
115+
@Deprecated
90116
public void setUserName(String userName) {
91-
this.userName = userName;
117+
this.options.setUserName(userName);
92118
}
93119

94120
/**
95121
* Will be used to set the "Last Will and Testament" (LWT) for the connection.
96122
* @param will The will.
97123
* @see MqttConnectOptions#setWill
124+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
98125
*/
126+
@Deprecated
99127
public void setWill(Will will) {
100-
this.will = will;
128+
this.options.setWill(will.getTopic(), will.getPayload(), will.getQos(), will.isRetained());
101129
}
102130

131+
/**
132+
* Set the persistence to pass into the client constructor.
133+
* @param persistence the persistence to set.
134+
*/
103135
public void setPersistence(MqttClientPersistence persistence) {
104136
this.persistence = persistence;
105137
}
@@ -109,10 +141,12 @@ public void setPersistence(MqttClientPersistence persistence) {
109141
* @param serverURIs The URIs.
110142
* @see MqttConnectOptions#setServerURIs(String[])
111143
* @since 4.1
144+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
112145
*/
146+
@Deprecated
113147
public void setServerURIs(String... serverURIs) {
114148
Assert.notNull(serverURIs, "'serverURIs' must not be null.");
115-
this.serverURIs = Arrays.copyOf(serverURIs, serverURIs.length);
149+
this.options.setServerURIs(Arrays.copyOf(serverURIs, serverURIs.length));
116150
}
117151

118152
/**
@@ -147,37 +181,19 @@ public IMqttAsyncClient getAsyncClientInstance(String uri, String clientId) thro
147181
return new MqttAsyncClient(uri == null ? "tcp://NO_URL_PROVIDED" : uri, clientId, this.persistence);
148182
}
149183

184+
/**
185+
* Set the preconfigured {@link MqttConnectOptions}.
186+
* @param options the options.
187+
* @since 4.3.16
188+
*/
189+
public void setConnectionOptions(MqttConnectOptions options) {
190+
Assert.notNull(options, "MqttConnectOptions cannot be null");
191+
this.options = options;
192+
}
193+
150194
@Override
151195
public MqttConnectOptions getConnectionOptions() {
152-
MqttConnectOptions options = new MqttConnectOptions();
153-
if (this.cleanSession != null) {
154-
options.setCleanSession(this.cleanSession);
155-
}
156-
if (this.connectionTimeout != null) {
157-
options.setConnectionTimeout(this.connectionTimeout);
158-
}
159-
if (this.keepAliveInterval != null) {
160-
options.setKeepAliveInterval(this.keepAliveInterval);
161-
}
162-
if (this.password != null) {
163-
options.setPassword(this.password.toCharArray());
164-
}
165-
if (this.socketFactory != null) {
166-
options.setSocketFactory(this.socketFactory);
167-
}
168-
if (this.sslProperties != null) {
169-
options.setSSLProperties(this.sslProperties);
170-
}
171-
if (this.userName != null) {
172-
options.setUserName(this.userName);
173-
}
174-
if (this.will != null) {
175-
options.setWill(this.will.getTopic(), this.will.getPayload(), this.will.getQos(), this.will.isRetained());
176-
}
177-
if (this.serverURIs != null) {
178-
options.setServerURIs(this.serverURIs);
179-
}
180-
return options;
196+
return this.options;
181197
}
182198

183199
public static class Will {

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests-context.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
</int:channel>
2121

2222
<bean id="multiUriClientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
23-
<property name="serverURIs" value="tcp://localhost:1883,tcp://localhost:1883"/>
23+
<property name="connectionOptions">
24+
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
25+
<property name="serverURIs" value="tcp://localhost:1883,tcp://localhost:1883"/>
26+
</bean>
27+
</property>
2428
</bean>
2529

2630
</beans>

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 25 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -69,7 +69,6 @@
6969
import org.springframework.integration.channel.QueueChannel;
7070
import org.springframework.integration.mqtt.core.ConsumerStopAction;
7171
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
72-
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory.Will;
7372
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
7473
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
7574
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
@@ -99,51 +98,23 @@ public class MqttAdapterTests {
9998
this.alwaysComplete = (IMqttToken) pfb.getObject();
10099
}
101100

102-
@Test
103-
public void testPahoConnectOptions() {
104-
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
105-
factory.setCleanSession(false);
106-
factory.setConnectionTimeout(23);
107-
factory.setKeepAliveInterval(45);
108-
factory.setPassword("pass");
109-
SocketFactory socketFactory = mock(SocketFactory.class);
110-
factory.setSocketFactory(socketFactory);
111-
Properties props = new Properties();
112-
factory.setSslProperties(props);
113-
factory.setUserName("user");
114-
Will will = new Will("foo", "bar".getBytes(), 2, true);
115-
factory.setWill(will);
116-
117-
MqttConnectOptions options = factory.getConnectionOptions();
118-
119-
assertEquals(23, options.getConnectionTimeout());
120-
assertEquals(45, options.getKeepAliveInterval());
121-
assertEquals("pass", new String(options.getPassword()));
122-
assertSame(socketFactory, options.getSocketFactory());
123-
assertSame(props, options.getSSLProperties());
124-
assertEquals("user", options.getUserName());
125-
assertEquals("foo", options.getWillDestination());
126-
assertEquals("bar", new String(options.getWillMessage().getPayload()));
127-
assertEquals(2, options.getWillMessage().getQos());
128-
129-
}
130-
131101
@Test
132102
public void testOutboundOptionsApplied() throws Exception {
133103
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
134-
factory.setCleanSession(false);
135-
factory.setConnectionTimeout(23);
136-
factory.setKeepAliveInterval(45);
137-
factory.setPassword("pass");
104+
MqttConnectOptions connectOptions = new MqttConnectOptions();
105+
connectOptions.setCleanSession(false);
106+
connectOptions.setConnectionTimeout(23);
107+
connectOptions.setKeepAliveInterval(45);
108+
connectOptions.setPassword("pass".toCharArray());
138109
MemoryPersistence persistence = new MemoryPersistence();
139110
factory.setPersistence(persistence);
140111
final SocketFactory socketFactory = mock(SocketFactory.class);
141-
factory.setSocketFactory(socketFactory);
112+
connectOptions.setSocketFactory(socketFactory);
142113
final Properties props = new Properties();
143-
factory.setSslProperties(props);
144-
factory.setUserName("user");
145-
Will will = new Will("foo", "bar".getBytes(), 2, true);
146-
factory.setWill(will);
114+
connectOptions.setSSLProperties(props);
115+
connectOptions.setUserName("user");
116+
connectOptions.setWill("foo", "bar".getBytes(), 2, true);
117+
factory.setConnectionOptions(connectOptions);
147118

148119
factory = spy(factory);
149120
final MqttAsyncClient client = mock(MqttAsyncClient.class);
@@ -192,19 +163,20 @@ public void testOutboundOptionsApplied() throws Exception {
192163
@Test
193164
public void testInboundOptionsApplied() throws Exception {
194165
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
195-
factory.setCleanSession(false);
196-
factory.setConnectionTimeout(23);
197-
factory.setKeepAliveInterval(45);
198-
factory.setPassword("pass");
166+
MqttConnectOptions connectOptions = new MqttConnectOptions();
167+
connectOptions.setCleanSession(false);
168+
connectOptions.setConnectionTimeout(23);
169+
connectOptions.setKeepAliveInterval(45);
170+
connectOptions.setPassword("pass".toCharArray());
199171
MemoryPersistence persistence = new MemoryPersistence();
200172
factory.setPersistence(persistence);
201173
final SocketFactory socketFactory = mock(SocketFactory.class);
202-
factory.setSocketFactory(socketFactory);
174+
connectOptions.setSocketFactory(socketFactory);
203175
final Properties props = new Properties();
204-
factory.setSslProperties(props);
205-
factory.setUserName("user");
206-
Will will = new Will("foo", "bar".getBytes(), 2, true);
207-
factory.setWill(will);
176+
connectOptions.setSSLProperties(props);
177+
connectOptions.setUserName("user");
178+
connectOptions.setWill("foo", "bar".getBytes(), 2, true);
179+
factory.setConnectionOptions(connectOptions);
208180

209181
factory = spy(factory);
210182
final IMqttClient client = mock(IMqttClient.class);
@@ -390,13 +362,15 @@ public IMqttClient getClientInstance(String uri, String clientId) throws MqttExc
390362
}
391363

392364
};
393-
factory.setServerURIs("tcp://localhost:1883");
365+
MqttConnectOptions connectOptions = new MqttConnectOptions();
366+
connectOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
394367
if (cleanSession != null) {
395-
factory.setCleanSession(cleanSession);
368+
connectOptions.setCleanSession(cleanSession);
396369
}
397370
if (action != null) {
398371
factory.setConsumerStopAction(action);
399372
}
373+
factory.setConnectionOptions(connectOptions);
400374
given(client.isConnected()).willReturn(true);
401375
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("client", factory, "foo");
402376
adapter.setApplicationEventPublisher(mock(ApplicationEventPublisher.class));

src/reference/asciidoc/mqtt.adoc

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ The current implementation uses the http://www.eclipse.org/paho/[Eclipse Paho MQ
1010
Configuration of both adapters is achieved using the `DefaultMqttPahoClientFactory`.
1111
Refer to the Paho documentation for more information about configuration options.
1212

13+
NOTE: It is preferred to configure an `MqttConnectOptions` object and inject it into the factory, instead of setting the (deprecated) options on the factory itself.
14+
1315
[[mqtt-inbound]]
1416
=== Inbound (message-driven) Channel Adapter
1517

@@ -20,9 +22,11 @@ A minimal configuration might be:
2022
[source,xml]
2123
----
2224
<bean id="clientFactory"
23-
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
24-
<property name="userName" value="${mqtt.username}"/>
25-
<property name="password" value="${mqtt.password}"/>
25+
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
26+
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
27+
<property name="userName" value="${mqtt.username}"/>
28+
<property name="password" value="${mqtt.password}"/>
29+
</bean>
2630
</bean>
2731
2832
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
@@ -255,9 +259,11 @@ public class MqttJavaApplication {
255259
@Bean
256260
public MqttPahoClientFactory mqttClientFactory() {
257261
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
258-
factory.setServerURIs("tcp://host1:1883", "tcp://host2:1883");
259-
factory.setUserName("username");
260-
factory.setPassword("password");
262+
MqttConnectOptions options = new MqttConnectOptions();
263+
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
264+
options.setUserName("username");
265+
options.setPassword("password".toCharArray());
266+
factory.setConnectionOptions(options);
261267
return factory;
262268
}
263269

0 commit comments

Comments
 (0)