Skip to content

Commit 663984a

Browse files
artembilangaryrussell
authored andcommitted
INT-4509: Fix memory leak for static beans stores
JIRA: https://jira.spring.io/browse/INT-4509 To avoid a re-usage of the `AbstractReplyProducingMessageHandler`, the `IntegrationFlowDefinition` and `AbstractStandardMessageHandlerFactoryBean` have a `static Set<>` to store already used producers and check it for newly provided. These stores are not cleaned when beans are destroyed leading to memory leaks * Implements a `DisposableBean` for the `AbstractStandardMessageHandlerFactoryBean` to remove its `replyHandler` from the `referencedReplyProducers` on `destroy()` * Introduce a `IntegrationFlowDefinition.ReplyProducerCleaner` - an `DestructionAwareBeanPostProcessor` to clean up removing `MessageProducer` from the `IntegrationFlowDefinition.REFERENCED_REPLY_PRODUCERS` **Cherry-pick to 5.0.x**
1 parent d4cf440 commit 663984a

File tree

5 files changed

+132
-43
lines changed

5 files changed

+132
-43
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/AbstractStandardMessageHandlerFactoryBean.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121

2222
import org.springframework.aop.TargetSource;
2323
import org.springframework.aop.framework.Advised;
24+
import org.springframework.beans.factory.DisposableBean;
2425
import org.springframework.expression.Expression;
2526
import org.springframework.expression.ExpressionParser;
2627
import org.springframework.expression.spel.standard.SpelExpressionParser;
@@ -41,21 +42,23 @@
4142
* @author David Liu
4243
*/
4344
public abstract class AbstractStandardMessageHandlerFactoryBean
44-
extends AbstractSimpleMessageHandlerFactoryBean<MessageHandler> {
45+
extends AbstractSimpleMessageHandlerFactoryBean<MessageHandler> implements DisposableBean {
4546

4647
private static final ExpressionParser expressionParser = new SpelExpressionParser();
4748

4849
private static final Set<MessageHandler> referencedReplyProducers = new HashSet<>();
4950

50-
private volatile Boolean requiresReply;
51+
private Boolean requiresReply;
5152

52-
private volatile Object targetObject;
53+
private Object targetObject;
5354

54-
private volatile String targetMethodName;
55+
private String targetMethodName;
5556

56-
private volatile Expression expression;
57+
private Expression expression;
5758

58-
private volatile Long sendTimeout;
59+
private Long sendTimeout;
60+
61+
private MessageHandler replyHandler;
5962

6063
/**
6164
* Set the target POJO for the message handler.
@@ -101,6 +104,13 @@ public Long getSendTimeout() {
101104
return this.sendTimeout;
102105
}
103106

107+
@Override
108+
public void destroy() {
109+
if (this.replyHandler != null) {
110+
referencedReplyProducers.remove(this.replyHandler);
111+
}
112+
}
113+
104114
@Override
105115
protected MessageHandler createHandler() {
106116
MessageHandler handler;
@@ -158,6 +168,7 @@ private void checkReuse(AbstractMessageProducingHandler replyHandler) {
158168
"An AbstractMessageProducingMessageHandler may only be referenced once (" +
159169
replyHandler.getComponentName() + ") - use scope=\"prototype\"");
160170
referencedReplyProducers.add(replyHandler);
171+
this.replyHandler = replyHandler;
161172
}
162173

163174
/**

spring-integration-core/src/main/java/org/springframework/integration/config/dsl/DslIntegrationConfigurationInitializer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.beans.factory.support.RootBeanDefinition;
2525
import org.springframework.integration.config.IntegrationConfigurationInitializer;
2626
import org.springframework.integration.dsl.IntegrationComponentSpec;
27+
import org.springframework.integration.dsl.IntegrationFlowDefinition;
2728
import org.springframework.integration.dsl.context.IntegrationFlowContext;
2829
import org.springframework.util.Assert;
2930

@@ -46,6 +47,9 @@ public class DslIntegrationConfigurationInitializer implements IntegrationConfig
4647
private static final String INTEGRATION_FLOW_CONTEXT_BEAN_NAME =
4748
Introspector.decapitalize(IntegrationFlowContext.class.getName());
4849

50+
private static final String INTEGRATION_FLOW_REPLY_PRODUCER_CLEANER_BEAN_NAME =
51+
Introspector.decapitalize(IntegrationFlowDefinition.ReplyProducerCleaner.class.getName());
52+
4953
@Override
5054
public void initialize(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
5155
Assert.isInstanceOf(BeanDefinitionRegistry.class, configurableListableBeanFactory,
@@ -59,6 +63,8 @@ public void initialize(ConfigurableListableBeanFactory configurableListableBeanF
5963
new RootBeanDefinition(IntegrationFlowBeanPostProcessor.class));
6064
registry.registerBeanDefinition(INTEGRATION_FLOW_CONTEXT_BEAN_NAME,
6165
new RootBeanDefinition(IntegrationFlowContext.class));
66+
registry.registerBeanDefinition(INTEGRATION_FLOW_REPLY_PRODUCER_CLEANER_BEAN_NAME,
67+
new RootBeanDefinition(IntegrationFlowDefinition.ReplyProducerCleaner.class));
6268
}
6369
}
6470

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929

3030
import org.springframework.aop.framework.Advised;
3131
import org.springframework.aop.support.AopUtils;
32+
import org.springframework.beans.BeansException;
3233
import org.springframework.beans.factory.BeanCreationException;
34+
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
3335
import org.springframework.expression.Expression;
3436
import org.springframework.expression.spel.standard.SpelExpressionParser;
3537
import org.springframework.integration.aggregator.AggregatingMessageHandler;
@@ -122,9 +124,9 @@ public abstract class IntegrationFlowDefinition<B extends IntegrationFlowDefinit
122124

123125
protected final Map<Object, String> integrationComponents = new LinkedHashMap<>();
124126

125-
protected MessageChannel currentMessageChannel;
127+
private MessageChannel currentMessageChannel;
126128

127-
protected Object currentComponent;
129+
private Object currentComponent;
128130

129131
private StandardIntegrationFlow integrationFlow;
130132

@@ -481,7 +483,9 @@ public B transform(String expression) {
481483
* @return the current {@link IntegrationFlowDefinition}.
482484
* @see ExpressionEvaluatingTransformer
483485
*/
484-
public B transform(String expression, Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
486+
public B transform(String expression,
487+
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
488+
485489
Assert.hasText(expression, "'expression' must not be empty");
486490
return transform(new ExpressionEvaluatingTransformer(PARSER.parseExpression(expression)),
487491
endpointConfigurer);
@@ -2773,4 +2777,21 @@ private void checkReuse(MessageProducer replyHandler) {
27732777
REFERENCED_REPLY_PRODUCERS.add(replyHandler);
27742778
}
27752779

2780+
public static final class ReplyProducerCleaner implements DestructionAwareBeanPostProcessor {
2781+
2782+
private ReplyProducerCleaner() {
2783+
}
2784+
2785+
@Override
2786+
public boolean requiresDestruction(Object bean) {
2787+
return IntegrationFlowDefinition.REFERENCED_REPLY_PRODUCERS.contains(bean);
2788+
}
2789+
2790+
@Override
2791+
public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {
2792+
IntegrationFlowDefinition.REFERENCED_REPLY_PRODUCERS.remove(bean);
2793+
}
2794+
2795+
}
2796+
27762797
}

spring-integration-core/src/test/java/org/springframework/integration/config/xml/DelegatingConsumerParserTests.java

Lines changed: 57 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2017 the original author or authors.
2+
* Copyright 2013-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.config.xml;
1818

1919
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertFalse;
2021
import static org.junit.Assert.assertNotNull;
2122
import static org.junit.Assert.assertTrue;
2223
import static org.junit.Assert.fail;
@@ -25,6 +26,7 @@
2526
import java.util.ArrayList;
2627
import java.util.Collection;
2728
import java.util.List;
29+
import java.util.Set;
2830

2931
import org.junit.Test;
3032
import org.junit.runner.RunWith;
@@ -52,68 +54,89 @@
5254

5355
/**
5456
* @author Gary Russell
57+
* @author Artem Bilan
58+
*
5559
* @since 3.0
5660
*
5761
*/
5862
@ContextConfiguration
5963
@RunWith(SpringJUnit4ClassRunner.class)
6064
public class DelegatingConsumerParserTests {
6165

62-
@Autowired @Qualifier("directFilter.handler")
66+
@Autowired
67+
@Qualifier("directFilter.handler")
6368
private MessageHandler directFilter;
6469

65-
@Autowired @Qualifier("refFilter.handler")
70+
@Autowired
71+
@Qualifier("refFilter.handler")
6672
private MessageHandler refFilter;
6773

68-
@Autowired @Qualifier("filterWithMessageSelectorThatsAlsoAnARPMH.handler")
74+
@Autowired
75+
@Qualifier("filterWithMessageSelectorThatsAlsoAnARPMH.handler")
6976
private MessageHandler filterWithMessageSelectorThatsAlsoAnARPMH;
7077

71-
@Autowired @Qualifier("directRouter.handler")
78+
@Autowired
79+
@Qualifier("directRouter.handler")
7280
private MessageHandler directRouter;
7381

74-
@Autowired @Qualifier("refRouter.handler")
82+
@Autowired
83+
@Qualifier("refRouter.handler")
7584
private MessageHandler refRouter;
7685

77-
@Autowired @Qualifier("directRouterMH.handler")
86+
@Autowired
87+
@Qualifier("directRouterMH.handler")
7888
private MessageHandler directRouterMH;
7989

80-
@Autowired @Qualifier("refRouterMH.handler")
90+
@Autowired
91+
@Qualifier("refRouterMH.handler")
8192
private MessageHandler refRouterMH;
8293

83-
@Autowired @Qualifier("directRouterARPMH.handler")
94+
@Autowired
95+
@Qualifier("directRouterARPMH.handler")
8496
private MessageHandler directRouterARPMH;
8597

86-
@Autowired @Qualifier("refRouterARPMH.handler")
98+
@Autowired
99+
@Qualifier("refRouterARPMH.handler")
87100
private MessageHandler refRouterARPMH;
88101

89-
@Autowired @Qualifier("directServiceARPMH.handler")
102+
@Autowired
103+
@Qualifier("directServiceARPMH.handler")
90104
private MessageHandler directServiceARPMH;
91105

92-
@Autowired @Qualifier("refServiceARPMH.handler")
106+
@Autowired
107+
@Qualifier("refServiceARPMH.handler")
93108
private MessageHandler refServiceARPMH;
94109

95-
@Autowired @Qualifier("directSplitter.handler")
110+
@Autowired
111+
@Qualifier("directSplitter.handler")
96112
private MessageHandler directSplitter;
97113

98-
@Autowired @Qualifier("refSplitter.handler")
114+
@Autowired
115+
@Qualifier("refSplitter.handler")
99116
private MessageHandler refSplitter;
100117

101-
@Autowired @Qualifier("splitterWithARPMH.handler")
118+
@Autowired
119+
@Qualifier("splitterWithARPMH.handler")
102120
private MessageHandler splitterWithARPMH;
103121

104-
@Autowired @Qualifier("splitterWithARPMHWithAtts.handler")
122+
@Autowired
123+
@Qualifier("splitterWithARPMHWithAtts.handler")
105124
private MessageHandler splitterWithARPMHWithAtts;
106125

107-
@Autowired @Qualifier("directTransformer.handler")
126+
@Autowired
127+
@Qualifier("directTransformer.handler")
108128
private MessageHandler directTransformer;
109129

110-
@Autowired @Qualifier("refTransformer.handler")
130+
@Autowired
131+
@Qualifier("refTransformer.handler")
111132
private MessageHandler refTransformer;
112133

113-
@Autowired @Qualifier("directTransformerARPMH.handler")
134+
@Autowired
135+
@Qualifier("directTransformerARPMH.handler")
114136
private MessageHandler directTransformerARPMH;
115137

116-
@Autowired @Qualifier("refTransformerARPMH.handler")
138+
@Autowired
139+
@Qualifier("refTransformerARPMH.handler")
117140
private MessageHandler refTransformerARPMH;
118141

119142
private static QueueChannel replyChannel = new QueueChannel();
@@ -153,7 +176,8 @@ public void testDelegates() {
153176
assertTrue(splitterWithARPMH instanceof MySplitterThatsAnARPMH);
154177
testHandler(splitterWithARPMH);
155178
assertTrue(splitterWithARPMHWithAtts instanceof MySplitterThatsAnARPMH);
156-
assertEquals(Long.valueOf(123), TestUtils.getPropertyValue(splitterWithARPMHWithAtts, "messagingTemplate.sendTimeout", Long.class));
179+
assertEquals(Long.valueOf(123),
180+
TestUtils.getPropertyValue(splitterWithARPMHWithAtts, "messagingTemplate.sendTimeout", Long.class));
157181
testHandler(splitterWithARPMHWithAtts);
158182

159183
assertTrue(directTransformer instanceof MessageTransformingHandler);
@@ -170,24 +194,32 @@ public void testDelegates() {
170194
}
171195

172196
@Test
197+
@SuppressWarnings("unchecked")
173198
public void testOneRefOnly() throws Exception {
174199
ServiceActivatorFactoryBean fb = new ServiceActivatorFactoryBean();
175200
fb.setBeanFactory(mock(BeanFactory.class));
176201
MyServiceARPMH service = new MyServiceARPMH();
177202
service.setBeanName("foo");
178203
fb.setTargetObject(service);
179204
fb.getObject();
180-
fb = new ServiceActivatorFactoryBean();
181-
fb.setBeanFactory(mock(BeanFactory.class));
182-
fb.setTargetObject(service);
205+
206+
assertTrue(TestUtils.getPropertyValue(fb, "referencedReplyProducers", Set.class).contains(service));
207+
208+
ServiceActivatorFactoryBean fb2 = new ServiceActivatorFactoryBean();
209+
fb2.setBeanFactory(mock(BeanFactory.class));
210+
fb2.setTargetObject(service);
183211
try {
184-
fb.getObject();
212+
fb2.getObject();
185213
fail("expected exception");
186214
}
187215
catch (Exception e) {
188216
assertEquals("An AbstractMessageProducingMessageHandler may only be referenced once (foo) - "
189217
+ "use scope=\"prototype\"", e.getMessage());
190218
}
219+
220+
fb.destroy();
221+
222+
assertFalse(TestUtils.getPropertyValue(fb, "referencedReplyProducers", Set.class).contains(service));
191223
}
192224

193225
private void testHandler(MessageHandler handler) {

0 commit comments

Comments
 (0)