Skip to content

Commit 7d16805

Browse files
artembilangaryrussell
authored andcommitted
INT-4411: DSL: Fix sub-flows for dynamic routers
JIRA: https://jira.spring.io/browse/INT-4411 The `RouterSpec.RouterMappingProvider` relies on the `ContextRefreshedEvent` which happens only during application start up. When we register `IntegrationFlow` at runtime, this event doesn't happen and therefore sub-flow mappings don't populated. * Fix `RouterSpec.RouterMappingProvider` to parse sub-flow mappings in the `onInit()` * Reorder components registration for the router in the `IntegrationFlowDefinition` to let lately sub-flows to start earlier, then lifecycles in the main flow
1 parent f608493 commit 7d16805

File tree

3 files changed

+74
-43
lines changed

3 files changed

+74
-43
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -535,7 +535,7 @@ public <S, T> B transform(GenericTransformer<S, T> genericTransformer) {
535535
* {@link org.springframework.integration.handler.MessageProcessor} from provided {@link MessageProcessorSpec}.
536536
* <pre class="code">
537537
* {@code
538-
* .transform(Scripts.script("classpath:myScript.py").valiable("foo", bar()))
538+
* .transform(Scripts.script("classpath:myScript.py").variable("foo", bar()))
539539
* }
540540
* </pre>
541541
* @param messageProcessorSpec the {@link MessageProcessorSpec} to use.
@@ -552,7 +552,7 @@ public B transform(MessageProcessorSpec<?> messageProcessorSpec) {
552552
* In addition accept options for the integration endpoint using {@link GenericEndpointSpec}.
553553
* <pre class="code">
554554
* {@code
555-
* .transform(Scripts.script("classpath:myScript.py").valiable("foo", bar()),
555+
* .transform(Scripts.script("classpath:myScript.py").variable("foo", bar()),
556556
* e -> e.autoStartup(false))
557557
* }
558558
* </pre>
@@ -1973,7 +1973,16 @@ private <R extends AbstractMessageRouter, S extends AbstractRouterSpec<S, R>> B
19731973

19741974
BridgeHandler bridgeHandler = new BridgeHandler();
19751975
boolean registerSubflowBridge = false;
1976-
Map<Object, String> componentsToRegister = routerSpec.getComponentsToRegister();
1976+
1977+
Map<Object, String> componentsToRegister = null;
1978+
Map<Object, String> routerComponents = routerSpec.getComponentsToRegister();
1979+
if (routerComponents != null) {
1980+
componentsToRegister = new LinkedHashMap<>(routerComponents);
1981+
routerComponents.clear();
1982+
}
1983+
1984+
register(routerSpec, null);
1985+
19771986
if (!CollectionUtils.isEmpty(componentsToRegister)) {
19781987
for (Map.Entry<Object, String> entry : componentsToRegister.entrySet()) {
19791988
Object component = entry.getKey();
@@ -1991,12 +2000,6 @@ private <R extends AbstractMessageRouter, S extends AbstractRouterSpec<S, R>> B
19912000
}
19922001
}
19932002

1994-
if (componentsToRegister != null) {
1995-
componentsToRegister.clear();
1996-
}
1997-
1998-
register(routerSpec, null);
1999-
20002003
if (routerSpec.isDefaultToParentFlow()) {
20012004
routerSpec.defaultOutputChannel(new FixedSubscriberChannel(bridgeHandler));
20022005
registerSubflowBridge = true;

spring-integration-core/src/main/java/org/springframework/integration/dsl/RouterSpec.java

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,10 +18,7 @@
1818

1919
import java.util.HashMap;
2020
import java.util.Map;
21-
import java.util.concurrent.atomic.AtomicBoolean;
2221

23-
import org.springframework.context.ApplicationListener;
24-
import org.springframework.context.event.ContextRefreshedEvent;
2522
import org.springframework.core.convert.ConversionService;
2623
import org.springframework.core.convert.support.DefaultConversionService;
2724
import org.springframework.integration.channel.DirectChannel;
@@ -179,14 +176,11 @@ public Map<Object, String> getComponentsToRegister() {
179176
return super.getComponentsToRegister();
180177
}
181178

182-
private static class RouterMappingProvider extends IntegrationObjectSupport
183-
implements ApplicationListener<ContextRefreshedEvent> {
184-
185-
private final AtomicBoolean initialized = new AtomicBoolean();
179+
private static class RouterMappingProvider extends IntegrationObjectSupport {
186180

187181
private final MappingMessageRouterManagement router;
188182

189-
private final Map<Object, NamedComponent> mapping = new HashMap<Object, NamedComponent>();
183+
private final Map<Object, NamedComponent> mapping = new HashMap<>();
190184

191185
RouterMappingProvider(MappingMessageRouterManagement router) {
192186
this.router = router;
@@ -197,31 +191,30 @@ void addMapping(Object key, NamedComponent channel) {
197191
}
198192

199193
@Override
200-
public void onApplicationEvent(ContextRefreshedEvent event) {
201-
if (event.getApplicationContext() == getApplicationContext() && !this.initialized.getAndSet(true)) {
202-
ConversionService conversionService = getConversionService();
203-
if (conversionService == null) {
204-
conversionService = DefaultConversionService.getSharedInstance();
194+
protected void onInit() throws Exception {
195+
super.onInit();
196+
ConversionService conversionService = getConversionService();
197+
if (conversionService == null) {
198+
conversionService = DefaultConversionService.getSharedInstance();
199+
}
200+
for (Map.Entry<Object, NamedComponent> entry : this.mapping.entrySet()) {
201+
Object key = entry.getKey();
202+
String channelKey;
203+
if (key instanceof String) {
204+
channelKey = (String) key;
205+
}
206+
else if (key instanceof Class) {
207+
channelKey = ((Class<?>) key).getName();
205208
}
206-
for (Map.Entry<Object, NamedComponent> entry : this.mapping.entrySet()) {
207-
Object key = entry.getKey();
208-
String channelKey;
209-
if (key instanceof String) {
210-
channelKey = (String) key;
211-
}
212-
else if (key instanceof Class) {
213-
channelKey = ((Class<?>) key).getName();
214-
}
215-
else if (conversionService.canConvert(key.getClass(), String.class)) {
216-
channelKey = conversionService.convert(key, String.class);
217-
}
218-
else {
219-
throw new MessagingException("Unsupported channel mapping type for router ["
220-
+ key.getClass() + "]");
221-
}
222-
223-
this.router.setChannelMapping(channelKey, entry.getValue().getComponentName());
209+
else if (conversionService.canConvert(key.getClass(), String.class)) {
210+
channelKey = conversionService.convert(key, String.class);
224211
}
212+
else {
213+
throw new MessagingException("Unsupported channel mapping type for router ["
214+
+ key.getClass() + "]");
215+
}
216+
217+
this.router.setChannelMapping(channelKey, entry.getValue().getComponentName());
225218
}
226219
}
227220

spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,11 +21,13 @@
2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.Assert.assertFalse;
2323
import static org.junit.Assert.assertNotNull;
24+
import static org.junit.Assert.assertNull;
2425
import static org.junit.Assert.assertSame;
2526
import static org.junit.Assert.assertThat;
2627
import static org.junit.Assert.assertTrue;
2728
import static org.junit.Assert.fail;
2829

30+
import java.util.Arrays;
2931
import java.util.Date;
3032
import java.util.Objects;
3133
import java.util.concurrent.atomic.AtomicBoolean;
@@ -70,6 +72,8 @@
7072
import org.springframework.test.context.ContextConfiguration;
7173
import org.springframework.test.context.junit4.SpringRunner;
7274

75+
import reactor.core.publisher.Flux;
76+
7377
/**
7478
* @author Artem Bilan
7579
* @author Gary Russell
@@ -356,6 +360,37 @@ public void testRoleControl() {
356360
assertTrue(this.roleController.getEndpointsRunningStatus(testRole).isEmpty());
357361
}
358362

363+
@Test
364+
public void testDynaSubFlowCreation() {
365+
Flux<Message<?>> messageFlux =
366+
Flux.just("1,2,3,4")
367+
.map(v -> v.split(","))
368+
.flatMapIterable(Arrays::asList)
369+
.map(Integer::parseInt)
370+
.map(GenericMessage::new);
371+
372+
QueueChannel resultChannel = new QueueChannel();
373+
374+
IntegrationFlow integrationFlow = IntegrationFlows
375+
.from(messageFlux)
376+
.<Integer, Boolean>route(p -> p % 2 == 0, m -> m
377+
.subFlowMapping(true, sf -> sf.<Integer, String>transform(em -> "even:" + em))
378+
.subFlowMapping(false, sf -> sf.<Integer, String>transform(em -> "odd:" + em))
379+
.defaultOutputToParentFlow()
380+
)
381+
.channel(resultChannel)
382+
.get();
383+
384+
this.integrationFlowContext.registration(integrationFlow).register();
385+
386+
for (int i = 0; i < 4; i++) {
387+
Message<?> receive = resultChannel.receive(10_000);
388+
assertNotNull(receive);
389+
}
390+
391+
assertNull(resultChannel.receive(0));
392+
}
393+
359394
@Configuration
360395
@EnableIntegration
361396
public static class RootConfiguration {

0 commit comments

Comments
 (0)