Skip to content

Commit 61b272d

Browse files
artembilangaryrussell
authored andcommitted
Wrap non-StandardIntegrationFlow into Proxy (#2440)
* Wrap non-`StandardIntegrationFlow` into Proxy In previous version all the `IntegrationFlow` beans have been replaced by the `StandardIntegrationFlow` i the `IntegrationFlowBeanPostProcessor` if they are lambda. That works for Java, but doesn't with Kotlin, since lambdas i last one are not synthetic classes. Therefore some Java DSL definitions (especially `.subFlowMapping()`) don't work consistently in two languages. * Introduce `IntegrationFlowLifecycleAdvice` to wrap all the non-`StandardIntegrationFlow`s (excluding `IntegrationFlowAdapter`) into the `Proxy` to expose `SmartLifecycle` and `getInputChannel()` operations and delegate them to the internal `StandardIntegrationFlow` created by the `IntegrationFlowBeanPostProcessor`. * This way any custom `IntegrationFlow` implementations can be used for manual flow registration via `IntegrationFlowContext` * Polish `RouterDslTests.kt` for the `@Bean`s for sub-flows. * Document in the `dsl.adoc` a request-reply approach for the case when `.subFlowMapping()` refers to an `IntegrationFlow` `@Bean`. * Polishing for the `FlowServiceTests` since all the non-`StandardIntegrationFlow`s and not-`IntegrationFlowAdapter`s are wrapped now to the Proxy. * Add missing `from()` delegations into the `IntegrationFlowAdapter` * Polishing for the `ManualFlowTests` since all the `IntegrationFlow` now are `Lifecycle` after wrapping to the Proxy. * Add JavaDocs to the IntegrationFlowLifecycleAdvice and polishing for the dsl.adoc * * Add JavaDocs for the `IntegrationFlowBeanPostProcessor.processIntegrationFlowImpl()` * Improve JavaDoc for the `RouterSpec.subFlowMapping()` * Assert in the `FlowServiceTests` that proxied custom flow implements all the expected interfaces
1 parent 8fa1ea7 commit 61b272d

File tree

8 files changed

+300
-39
lines changed

8 files changed

+300
-39
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowAdapter.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818

1919
import java.util.concurrent.atomic.AtomicBoolean;
2020
import java.util.function.Consumer;
21+
import java.util.function.Supplier;
22+
23+
import org.reactivestreams.Publisher;
2124

2225
import org.springframework.context.SmartLifecycle;
2326
import org.springframework.integration.core.MessageSource;
2427
import org.springframework.integration.endpoint.MessageProducerSupport;
2528
import org.springframework.integration.gateway.MessagingGatewaySupport;
29+
import org.springframework.messaging.Message;
2630
import org.springframework.messaging.MessageChannel;
2731
import org.springframework.util.Assert;
2832

@@ -178,6 +182,27 @@ protected IntegrationFlowBuilder from(Object service, String methodName,
178182
return IntegrationFlows.from(service, methodName, endpointConfigurer);
179183
}
180184

185+
protected <T> IntegrationFlowBuilder from(Supplier<T> messageSource) {
186+
return IntegrationFlows.from(messageSource);
187+
}
188+
189+
protected <T> IntegrationFlowBuilder from(Supplier<T> messageSource,
190+
Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {
191+
return IntegrationFlows.from(messageSource, endpointConfigurer);
192+
}
193+
194+
protected IntegrationFlowBuilder from(Class<?> serviceInterface) {
195+
return IntegrationFlows.from(serviceInterface);
196+
}
197+
198+
protected IntegrationFlowBuilder from(Class<?> serviceInterface, String beanName) {
199+
return IntegrationFlows.from(serviceInterface, beanName);
200+
}
201+
202+
protected IntegrationFlowBuilder from(Publisher<Message<?>> publisher) {
203+
return IntegrationFlows.from(publisher);
204+
}
205+
181206
protected abstract IntegrationFlowDefinition<?> buildFlow();
182207

183208
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBeanPostProcessor.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.LinkedHashMap;
2121
import java.util.Map;
2222

23+
import org.springframework.aop.framework.ProxyFactory;
24+
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
2325
import org.springframework.beans.BeansException;
2426
import org.springframework.beans.factory.BeanCreationNotAllowedException;
2527
import org.springframework.beans.factory.BeanFactory;
@@ -33,6 +35,7 @@
3335
import org.springframework.beans.factory.support.AbstractBeanDefinition;
3436
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
3537
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
38+
import org.springframework.context.SmartLifecycle;
3639
import org.springframework.core.io.DescriptiveResource;
3740
import org.springframework.integration.channel.AbstractMessageChannel;
3841
import org.springframework.integration.channel.DirectChannel;
@@ -250,11 +253,44 @@ else if (component instanceof AnnotationGatewayProxyFactoryBean) {
250253
return flow;
251254
}
252255

256+
/**
257+
* Only invoked for {@link IntegrationFlow} instances that are not
258+
* {@link StandardIntegrationFlow}s; typically lambdas. Creates a new
259+
* {@link StandardIntegrationFlow} with an input channel named {@code beanName.input}
260+
* and the flow defined by the flow parameter. If the flow is not an
261+
* {@link IntegrationFlowAdapter} the original, user-provided {@link IntegrationFlow}
262+
* is wrapped in a proxy and advised with a {@link IntegrationFlowLifecycleAdvice};
263+
* see its javadocs for more information.
264+
*/
253265
private Object processIntegrationFlowImpl(IntegrationFlow flow, String beanName) {
254266
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(beanName + ".input");
267+
255268
flow.configure(flowBuilder);
256-
Object standardIntegrationFlow = processStandardIntegrationFlow(flowBuilder.get(), beanName);
257-
return isLambda(flow) ? standardIntegrationFlow : flow;
269+
270+
StandardIntegrationFlow target = flowBuilder.get();
271+
processStandardIntegrationFlow(target, beanName);
272+
273+
if (!(flow instanceof IntegrationFlowAdapter)) {
274+
NameMatchMethodPointcutAdvisor integrationFlowAdvice =
275+
new NameMatchMethodPointcutAdvisor(new IntegrationFlowLifecycleAdvice(target));
276+
integrationFlowAdvice.setMappedNames(
277+
"getInputChannel",
278+
"start",
279+
"stop",
280+
"isRunning",
281+
"isAutoStartup",
282+
"getPhase");
283+
284+
ProxyFactory proxyFactory = new ProxyFactory(flow);
285+
proxyFactory.addAdvisor(integrationFlowAdvice);
286+
if (!(flow instanceof SmartLifecycle)) {
287+
proxyFactory.addInterface(SmartLifecycle.class);
288+
}
289+
return proxyFactory.getProxy(this.beanFactory.getBeanClassLoader());
290+
}
291+
else {
292+
return flow;
293+
}
258294
}
259295

260296
private void processIntegrationComponentSpec(IntegrationComponentSpec<?, ?> bean) {
@@ -321,9 +357,4 @@ else if (fallbackId != null) {
321357
return id;
322358
}
323359

324-
private static boolean isLambda(Object o) {
325-
Class<?> aClass = o.getClass();
326-
return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass();
327-
}
328-
329360
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dsl;
18+
19+
import org.aopalliance.intercept.MethodInterceptor;
20+
import org.aopalliance.intercept.MethodInvocation;
21+
22+
import org.springframework.context.SmartLifecycle;
23+
import org.springframework.util.ObjectUtils;
24+
25+
/**
26+
* An AOP {@link MethodInterceptor} for the {@link IntegrationFlow} proxies
27+
* with a delegation to an associated {@link StandardIntegrationFlow} instance, which
28+
* is not exposed as a bean during a target {@link IntegrationFlow} bean processing.
29+
*
30+
* <p> In most cases an associated internal {@link StandardIntegrationFlow}
31+
* exposes an {@code inputChannel} bean for the target {@link IntegrationFlow},
32+
* which doesn't start from the channel, e.g. instantiated from lambda.
33+
* This way the advice first tries to obtain an {@code inputChannel} from the
34+
* target {@link IntegrationFlow} and then falls back to the {@link #delegate}.
35+
*
36+
* <p> Another aspect of this advice is to control and delegate {@link SmartLifecycle}
37+
* of the target {@link IntegrationFlow} and associated {@link #delegate}.
38+
* The {@link SmartLifecycle#start()} and {@link SmartLifecycle#stop()} operations
39+
* are delegated to the {@link StandardIntegrationFlow} as is because that instance
40+
* isn't controlled by the standard application context lifecycle.
41+
* The {@link SmartLifecycle#isAutoStartup()}, {@link SmartLifecycle#getPhase()}
42+
* and {@link SmartLifecycle#isRunning()} are called on the {@link #delegate}
43+
* only in case when {@link MethodInvocation#proceed()} returns {@code null}
44+
* or isn't called at all, e.g. when target {@link IntegrationFlow} doesn't
45+
* implement {@link SmartLifecycle}.
46+
*
47+
* @author Artem Bilan
48+
*
49+
* @since 5.1
50+
*/
51+
class IntegrationFlowLifecycleAdvice implements MethodInterceptor {
52+
53+
private final StandardIntegrationFlow delegate;
54+
55+
IntegrationFlowLifecycleAdvice(StandardIntegrationFlow delegate) {
56+
this.delegate = delegate;
57+
}
58+
59+
@Override
60+
public Object invoke(MethodInvocation invocation) throws Throwable {
61+
Object target = invocation.getThis();
62+
String method = invocation.getMethod().getName();
63+
Object result = null;
64+
65+
if ("getInputChannel".equals(method)) {
66+
result = invocation.proceed();
67+
if (result == null) {
68+
result = this.delegate.getInputChannel();
69+
}
70+
}
71+
else {
72+
if (target instanceof SmartLifecycle) {
73+
result = invocation.proceed();
74+
}
75+
76+
switch (method) {
77+
78+
case "start":
79+
this.delegate.start();
80+
break;
81+
82+
case "stop":
83+
Object[] arguments = invocation.getArguments();
84+
if (!ObjectUtils.isEmpty(arguments)) {
85+
this.delegate.stop((Runnable) arguments[0]);
86+
}
87+
else {
88+
this.delegate.stop();
89+
}
90+
break;
91+
92+
case "isRunning":
93+
if (result == null) {
94+
result = this.delegate.isRunning();
95+
}
96+
break;
97+
98+
case "isAutoStartup":
99+
if (result == null) {
100+
result = this.delegate.isAutoStartup();
101+
}
102+
break;
103+
104+
case "getPhase":
105+
if (result == null) {
106+
result = this.delegate.getPhase();
107+
}
108+
break;
109+
}
110+
}
111+
112+
return result;
113+
}
114+
115+
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/RouterSpec.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,14 @@ public String getComponentType() {
143143
* Add a subflow as an alternative to a {@link #channelMapping(Object, String)}.
144144
* {@link #prefix(String)} and {@link #suffix(String)} cannot be used when subflow
145145
* mappings are used.
146+
* <p> If subflow should refer to the external {@link IntegrationFlow} bean and
147+
* there is a requirement to expect reply from there, such a reference should be
148+
* wrapped with a {@code .gateway()}:
149+
* <pre class="code">
150+
* {@code
151+
* .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
152+
* }
153+
* </pre>
146154
* @param key the key.
147155
* @param subFlow the subFlow.
148156
* @return the router spec.

spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616

1717
package org.springframework.integration.dsl.flowservices;
1818

19+
import static org.hamcrest.Matchers.instanceOf;
1920
import static org.junit.Assert.assertEquals;
2021
import static org.junit.Assert.assertNotNull;
22+
import static org.junit.Assert.assertThat;
23+
import static org.junit.Assert.assertTrue;
2124

2225
import java.util.Collection;
2326
import java.util.Collections;
@@ -30,11 +33,15 @@
3033
import org.junit.Test;
3134
import org.junit.runner.RunWith;
3235

36+
import org.springframework.aop.framework.Advised;
37+
import org.springframework.aop.support.AopUtils;
3338
import org.springframework.beans.factory.annotation.Autowired;
3439
import org.springframework.beans.factory.annotation.Qualifier;
40+
import org.springframework.context.SmartLifecycle;
3541
import org.springframework.context.annotation.Bean;
3642
import org.springframework.context.annotation.ComponentScan;
3743
import org.springframework.context.annotation.Configuration;
44+
import org.springframework.core.Ordered;
3845
import org.springframework.integration.annotation.Aggregator;
3946
import org.springframework.integration.annotation.CorrelationStrategy;
4047
import org.springframework.integration.annotation.Filter;
@@ -70,21 +77,29 @@
7077
@DirtiesContext
7178
public class FlowServiceTests {
7279

80+
@Autowired(required = false)
81+
@Qualifier("flowServiceTests.MyFlow")
82+
private IntegrationFlow myFlow;
83+
7384
@Autowired
7485
@Qualifier("flowServiceTests.MyFlow.input")
7586
private MessageChannel input;
7687

77-
@Autowired(required = false)
78-
private MyFlow myFlow;
79-
8088
@Autowired
8189
private PollableChannel myFlowAdapterOutput;
8290

8391
@Test
84-
public void testFlowServiceAndLogAsLastNoError() {
92+
public void testFlowServiceAndLogAsLastNoError() throws Exception {
8593
assertNotNull(this.myFlow);
94+
assertTrue(AopUtils.isAopProxy(this.myFlow));
95+
assertThat(this.myFlow, instanceOf(Advised.class));
96+
assertThat(this.myFlow, instanceOf(Ordered.class));
97+
assertThat(this.myFlow, instanceOf(SmartLifecycle.class));
98+
8699
this.input.send(MessageBuilder.withPayload("foo").build());
87-
Object result = this.myFlow.resultOverLoggingHandler.get();
100+
101+
MyFlow myFlow = (MyFlow) ((Advised) this.myFlow).getTargetSource().getTarget();
102+
Object result = myFlow.resultOverLoggingHandler.get();
88103
assertNotNull(result);
89104
assertEquals("FOO", result);
90105
}
@@ -134,7 +149,7 @@ public IntegrationFlow subFlow() {
134149
}
135150

136151
@Component
137-
public static class MyFlow implements IntegrationFlow {
152+
public static class MyFlow implements IntegrationFlow, Ordered {
138153

139154
private final AtomicReference<Object> resultOverLoggingHandler = new AtomicReference<>();
140155

@@ -147,6 +162,11 @@ public void configure(IntegrationFlowDefinition<?> f) {
147162
});
148163
}
149164

165+
@Override
166+
public int getOrder() {
167+
return 0;
168+
}
169+
150170
}
151171

152172
@Component

spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -204,32 +204,6 @@ public void testManualFlowRegistration() throws InterruptedException {
204204

205205
@Test
206206
public void testWrongLifecycle() {
207-
208-
class MyIntegrationFlow implements IntegrationFlow {
209-
210-
@Override
211-
public void configure(IntegrationFlowDefinition<?> flow) {
212-
flow.bridge();
213-
}
214-
215-
}
216-
217-
IntegrationFlow testFlow = new MyIntegrationFlow();
218-
219-
// This is fine because we are not going to start it automatically.
220-
assertNotNull(this.integrationFlowContext.registration(testFlow)
221-
.autoStartup(false)
222-
.register());
223-
224-
try {
225-
this.integrationFlowContext.registration(testFlow).register();
226-
fail("IllegalStateException expected");
227-
}
228-
catch (Exception e) {
229-
assertThat(e, instanceOf(IllegalStateException.class));
230-
assertThat(e.getMessage(), containsString("Consider to implement it for [" + testFlow + "]."));
231-
}
232-
233207
try {
234208
this.integrationFlowContext.remove("foo");
235209
fail("IllegalStateException expected");

0 commit comments

Comments
 (0)