Skip to content

Commit 1ce656e

Browse files
artembilangaryrussell
authored andcommitted
INT-4486: Properly implement stop(Runnable)
JIRA: https://jira.spring.io/browse/INT-4486 The `SmartLifecycle.stop(Runnable callback)` must always call the `callback` in the end independently of the internal state * Revise all the `SmartLifecycle` implementations for the proper `callback` handling **Cherry-pick to 5.0.x and 4.3.x**
1 parent 125cc86 commit 1ce656e

File tree

11 files changed

+67
-35
lines changed

11 files changed

+67
-35
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/AbstractSubscribableAmqpChannel.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -234,6 +234,9 @@ public void stop(Runnable callback) {
234234
this.container.stop(callback);
235235
this.declared = false;
236236
}
237+
else {
238+
callback.run();
239+
}
237240
}
238241

239242
@Override

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AmqpChannelFactoryBean.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,10 +66,11 @@
6666
* @author Mark Fisher
6767
* @author Gary Russell
6868
* @author Artem Bilan
69+
*
6970
* @since 2.1
7071
*/
71-
public class AmqpChannelFactoryBean extends AbstractFactoryBean<AbstractAmqpChannel> implements SmartLifecycle,
72-
DisposableBean, BeanNameAware {
72+
public class AmqpChannelFactoryBean extends AbstractFactoryBean<AbstractAmqpChannel>
73+
implements SmartLifecycle, DisposableBean, BeanNameAware {
7374

7475
private volatile AbstractAmqpChannel channel;
7576

@@ -516,13 +517,14 @@ public void stop(Runnable callback) {
516517
if (this.channel instanceof SmartLifecycle) {
517518
((SmartLifecycle) this.channel).stop(callback);
518519
}
520+
else {
521+
callback.run();
522+
}
519523
}
520524

521525
@Override
522526
protected void destroyInstance(AbstractAmqpChannel instance) throws Exception {
523-
if (instance instanceof DisposableBean) {
524-
((DisposableBean) this.channel).destroy();
525-
}
527+
this.channel.destroy();
526528
}
527529

528530
}

spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,9 @@ public void stop(Runnable callback) {
362362
if (this.endpoint != null) {
363363
this.endpoint.stop(callback);
364364
}
365+
else {
366+
callback.run();
367+
}
365368
}
366369

367370
@Override

spring-integration-core/src/main/java/org/springframework/integration/config/SourcePollingChannelAdapterFactoryBean.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -247,6 +247,9 @@ public void stop(Runnable callback) {
247247
if (this.adapter != null) {
248248
this.adapter.stop(callback);
249249
}
250+
else {
251+
callback.run();
252+
}
250253
}
251254

252255
@Override

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowAdapter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public void stop(Runnable callback) {
9595
if (this.running.getAndSet(false)) {
9696
this.targetIntegrationFlow.stop(callback);
9797
}
98+
else {
99+
callback.run();
100+
}
98101
}
99102

100103
@Override

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -199,6 +199,9 @@ public final void stop(Runnable callback) {
199199
logger.info("stopped " + this);
200200
}
201201
}
202+
else {
203+
callback.run();
204+
}
202205
}
203206
finally {
204207
this.lifecycleLock.unlock();

spring-integration-file/src/main/java/org/springframework/integration/file/config/FileTailInboundChannelAdapterFactoryBean.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
* @author Gary Russell
3737
* @author Artem Bilan
3838
* @author Ali Shahbour
39+
*
3940
* @since 3.0
4041
*
4142
*/
@@ -192,6 +193,9 @@ public void stop(Runnable callback) {
192193
if (this.adapter != null) {
193194
this.adapter.stop(callback);
194195
}
196+
else {
197+
callback.run();
198+
}
195199
}
196200

197201
@Override

spring-integration-jms/src/main/java/org/springframework/integration/jms/SubscribableJmsChannel.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,9 +43,12 @@
4343
/**
4444
* @author Mark Fisher
4545
* @author Gary Russell
46+
* @author Artem Bilan
47+
*
4648
* @since 2.0
4749
*/
48-
public class SubscribableJmsChannel extends AbstractJmsChannel implements SubscribableChannel, SmartLifecycle, DisposableBean {
50+
public class SubscribableJmsChannel extends AbstractJmsChannel
51+
implements SubscribableChannel, SmartLifecycle, DisposableBean {
4952

5053
private final AbstractMessageListenerContainer container;
5154

@@ -72,13 +75,15 @@ public void setMaxSubscribers(int maxSubscribers) {
7275

7376
@Override
7477
public boolean subscribe(MessageHandler handler) {
75-
Assert.state(this.dispatcher != null, "'MessageDispatcher' must not be null. This channel might not have been initialized");
78+
Assert.state(this.dispatcher != null,
79+
"'MessageDispatcher' must not be null. This channel might not have been initialized");
7680
return this.dispatcher.addHandler(handler);
7781
}
7882

7983
@Override
8084
public boolean unsubscribe(MessageHandler handler) {
81-
Assert.state(this.dispatcher != null, "'MessageDispatcher' must not be null. This channel might not have been initialized");
85+
Assert.state(this.dispatcher != null,
86+
"'MessageDispatcher' must not be null. This channel might not have been initialized");
8287
return this.dispatcher.removeHandler(handler);
8388
}
8489

@@ -126,7 +131,7 @@ private void configureDispatcher(boolean isPubSub) {
126131

127132
@Override
128133
public boolean isAutoStartup() {
129-
return (this.container != null) ? this.container.isAutoStartup() : false;
134+
return (this.container != null) && this.container.isAutoStartup();
130135
}
131136

132137
@Override
@@ -136,7 +141,7 @@ public int getPhase() {
136141

137142
@Override
138143
public boolean isRunning() {
139-
return (this.container != null) ? this.container.isRunning() : false;
144+
return (this.container != null) && this.container.isRunning();
140145
}
141146

142147
@Override
@@ -158,6 +163,9 @@ public void stop(Runnable callback) {
158163
if (this.container != null) {
159164
this.container.stop(callback);
160165
}
166+
else {
167+
callback.run();
168+
}
161169
}
162170

163171
@Override
@@ -226,6 +234,7 @@ else if (this.logger.isWarnEnabled()) {
226234
throw new MessagingException("failed to handle incoming JMS Message", e);
227235
}
228236
}
237+
229238
}
230239

231240
}

spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsChannelFactoryBean.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -548,6 +548,9 @@ public void stop(Runnable callback) {
548548
if (this.channel instanceof SubscribableJmsChannel) {
549549
((SubscribableJmsChannel) this.channel).stop(callback);
550550
}
551+
else {
552+
callback.run();
553+
}
551554
}
552555

553556
@Override

spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
* @author Oleg Zhurakousky
5252
* @author Gary Russell
5353
* @author Artem Bilan
54+
*
5455
* @since 2.0
5556
*/
5657
@SuppressWarnings("rawtypes")
@@ -105,7 +106,6 @@ public void setSerializer(RedisSerializer<?> serializer) {
105106
/**
106107
* Specify the maximum number of subscribers supported by the
107108
* channel's dispatcher.
108-
*
109109
* @param maxSubscribers The maximum number of subscribers allowed.
110110
*/
111111
public void setMaxSubscribers(int maxSubscribers) {
@@ -168,45 +168,37 @@ public void onInit() throws Exception {
168168

169169
@Override
170170
public boolean isAutoStartup() {
171-
return (this.container != null) && this.container.isAutoStartup();
171+
return this.container.isAutoStartup();
172172
}
173173

174174
@Override
175175
public int getPhase() {
176-
return (this.container != null) ? this.container.getPhase() : 0;
176+
return this.container.getPhase();
177177
}
178178

179179
@Override
180180
public boolean isRunning() {
181-
return (this.container != null) && this.container.isRunning();
181+
return this.container.isRunning();
182182
}
183183

184184
@Override
185185
public void start() {
186-
if (this.container != null) {
187-
this.container.start();
188-
}
186+
this.container.start();
189187
}
190188

191189
@Override
192190
public void stop() {
193-
if (this.container != null) {
194-
this.container.stop();
195-
}
191+
this.container.stop();
196192
}
197193

198194
@Override
199195
public void stop(Runnable callback) {
200-
if (this.container != null) {
201-
this.container.stop(callback);
202-
}
196+
this.container.stop(callback);
203197
}
204198

205199
@Override
206200
public void destroy() throws Exception {
207-
if (this.container != null) {
208-
this.container.destroy();
209-
}
201+
this.container.destroy();
210202
}
211203

212204
private class MessageListenerDelegate {
@@ -230,6 +222,7 @@ public void handleMessage(Object payload) {
230222
+ "' (" + SubscribableRedisChannel.this.getFullChannelName() + ").", e);
231223
}
232224
}
225+
233226
}
234227

235228
}

spring-integration-syslog/src/main/java/org/springframework/integration/syslog/config/SyslogReceivingChannelAdapterFactoryBean.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,7 +32,10 @@
3232

3333
/**
3434
* Factory bean to create syslog inbound adapters (UDP or TCP).
35+
*
3536
* @author Gary Russell
37+
* @author Artem Bilan
38+
*
3639
* @since 3.0
3740
*
3841
*/
@@ -163,6 +166,9 @@ public void stop(Runnable callback) {
163166
if (this.adapter != null) {
164167
this.adapter.stop(callback);
165168
}
169+
else {
170+
callback.run();
171+
}
166172
}
167173

168174
@Override

0 commit comments

Comments
 (0)