Skip to content

Commit 97d08d9

Browse files
garyrussellartembilan
authored andcommitted
INT-4473: Support prefix bean names with flow id
JIRA: https://jira.spring.io/browse/INT-4473 Previously, dynamic registration of integration flows with components configured with the same `id` would fail with duplicate bean names. Add `useFlowIdAsPrefix()` to the registration builder to enable the option. Then, in the BPP, check the flag before naming the beans. **cherry-pick to 5.0.x** * Polishing according PR comments * Widen `flowNamePrefix` responsibility in the `IntegrationFlowBeanPostProcessor` since we may have many other components in the dynamic flow with the same id, not only consumer endpoints
1 parent 8a61e16 commit 97d08d9

File tree

5 files changed

+117
-13
lines changed

5 files changed

+117
-13
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBeanPostProcessor.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.springframework.integration.config.IntegrationConfigUtils;
4545
import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean;
4646
import org.springframework.integration.core.MessageSource;
47+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
4748
import org.springframework.integration.dsl.support.MessageChannelReference;
4849
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
4950
import org.springframework.integration.support.context.NamedComponent;
@@ -67,6 +68,8 @@ public class IntegrationFlowBeanPostProcessor
6768

6869
private ConfigurableListableBeanFactory beanFactory;
6970

71+
private IntegrationFlowContext flowContext;
72+
7073
@Override
7174
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
7275
Assert.isInstanceOf(ConfigurableListableBeanFactory.class, beanFactory,
@@ -75,6 +78,8 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
7578
);
7679

7780
this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
81+
this.flowContext = this.beanFactory.getBean(IntegrationFlowContext.class);
82+
Assert.notNull(this.flowContext, "There must be an IntegrationFlowContext in the application context");
7883
}
7984

8085
@Override
@@ -100,14 +105,15 @@ public void afterSingletonsInstantiated() {
100105
throw new BeanCreationNotAllowedException(beanName, "IntegrationFlows can not be scoped beans. " +
101106
"Any dependant beans are registered as singletons, meanwhile IntegrationFlow is just a " +
102107
"logical container for them. \n" +
103-
"Consider to use [IntegrationFlowContext] for manual registration of IntegrationFlows.");
108+
"Consider using [IntegrationFlowContext] for manual registration of IntegrationFlows.");
104109
}
105110
}
106111
}
107112
}
108113

109114
private Object processStandardIntegrationFlow(StandardIntegrationFlow flow, String flowBeanName) {
110115
String flowNamePrefix = flowBeanName + ".";
116+
boolean useFlowIdAsPrefix = this.flowContext.isUseIdAsPrefix(flowBeanName);
111117
int subFlowNameIndex = 0;
112118
int channelNameIndex = 0;
113119

@@ -123,7 +129,10 @@ private Object processStandardIntegrationFlow(StandardIntegrationFlow flow, Stri
123129
String id = endpointSpec.getId();
124130

125131
if (id == null) {
126-
id = generateBeanName(endpoint, flowNamePrefix, entry.getValue());
132+
id = generateBeanName(endpoint, flowNamePrefix, entry.getValue(), useFlowIdAsPrefix);
133+
}
134+
else if (useFlowIdAsPrefix) {
135+
id = flowNamePrefix + id;
127136
}
128137

129138
Collection<?> messageHandlers =
@@ -185,13 +194,19 @@ else if (component instanceof SourcePollingChannelAdapterSpec) {
185194
.contains(o.getKey()))
186195
.forEach(o ->
187196
registerComponent(o.getKey(),
188-
generateBeanName(o.getKey(), flowNamePrefix, o.getValue())));
197+
generateBeanName(o.getKey(), flowNamePrefix, o.getValue(),
198+
useFlowIdAsPrefix)));
189199
}
190200
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1();
191201
String id = spec.getId();
192-
if (!StringUtils.hasText(id)) {
193-
id = generateBeanName(pollingChannelAdapterFactoryBean, flowNamePrefix, entry.getValue());
202+
if (id == null) {
203+
id = generateBeanName(pollingChannelAdapterFactoryBean, flowNamePrefix, entry.getValue(),
204+
useFlowIdAsPrefix);
205+
}
206+
else if (useFlowIdAsPrefix) {
207+
id = flowNamePrefix + id;
194208
}
209+
195210
registerComponent(pollingChannelAdapterFactoryBean, id, flowBeanName);
196211
targetIntegrationComponents.put(pollingChannelAdapterFactoryBean, id);
197212

@@ -236,7 +251,9 @@ else if (component instanceof AnnotationGatewayProxyFactoryBean) {
236251
targetIntegrationComponents.put(component, gatewayId);
237252
}
238253
else {
239-
String generatedBeanName = generateBeanName(component, flowNamePrefix, entry.getValue());
254+
String generatedBeanName =
255+
generateBeanName(component, flowNamePrefix, entry.getValue(), useFlowIdAsPrefix);
256+
240257
registerComponent(component, generatedBeanName, flowBeanName);
241258
targetIntegrationComponents.put(component, generatedBeanName);
242259
}
@@ -334,15 +351,19 @@ private void registerComponent(Object component, String beanName, String parentN
334351
}
335352

336353
private String generateBeanName(Object instance, String prefix) {
337-
return generateBeanName(instance, prefix, null);
354+
return generateBeanName(instance, prefix, null, false);
338355
}
339356

340-
private String generateBeanName(Object instance, String prefix, String fallbackId) {
357+
private String generateBeanName(Object instance, String prefix, String fallbackId, boolean useFlowIdAsPrefix) {
341358
if (instance instanceof NamedComponent && ((NamedComponent) instance).getComponentName() != null) {
342-
return ((NamedComponent) instance).getComponentName();
359+
return useFlowIdAsPrefix
360+
? prefix + ((NamedComponent) instance).getComponentName()
361+
: ((NamedComponent) instance).getComponentName();
343362
}
344363
else if (fallbackId != null) {
345-
return fallbackId;
364+
return useFlowIdAsPrefix
365+
? prefix + fallbackId
366+
: fallbackId;
346367
}
347368

348369
String generatedBeanName = prefix + instance.getClass().getName();

spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowContext.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,16 @@ public interface IntegrationFlowContext {
9595
*/
9696
Map<String, IntegrationFlowRegistration> getRegistry();
9797

98+
/**
99+
* Return true to prefix flow bean names with the flow id and a period.
100+
* @param flowId the flow id.
101+
* @return true to use as a prefix.
102+
* @since 5.0.6
103+
*/
104+
default boolean isUseIdAsPrefix(String flowId) {
105+
return false;
106+
}
107+
98108
/**
99109
* @author Gary Russell
100110
* @since 5.1
@@ -209,6 +219,19 @@ interface IntegrationFlowRegistrationBuilder {
209219
*/
210220
IntegrationFlowRegistrationBuilder addBean(String name, Object bean);
211221

222+
/**
223+
* Invoke this method to prefix bean names in the flow with the (required) flow id
224+
* and a period. This is useful if you wish to register the same flow multiple times
225+
* while retaining the ability to reference beans within the flow; adding the unique
226+
* flow id to the bean name makes the name unique.
227+
* @return the current builder instance.
228+
* @see #id(String)
229+
* @since 5.0.6
230+
*/
231+
default IntegrationFlowRegistrationBuilder useFlowIdAsPrefix() {
232+
return this;
233+
}
234+
212235
/**
213236
* Register an {@link IntegrationFlow} and all the dependant and support components
214237
* in the application context and return an associated {@link IntegrationFlowRegistration}

spring-integration-core/src/main/java/org/springframework/integration/dsl/context/StandardIntegrationFlowContext.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.integration.support.context.NamedComponent;
3838
import org.springframework.messaging.MessageChannel;
3939
import org.springframework.util.Assert;
40+
import org.springframework.util.StringUtils;
4041

4142
/**
4243
* Standard implementation of {@link IntegrationFlowContext}.
@@ -51,6 +52,8 @@ public final class StandardIntegrationFlowContext implements IntegrationFlowCont
5152

5253
private final Map<String, IntegrationFlowRegistration> registry = new ConcurrentHashMap<>();
5354

55+
private final Map<String, Boolean> useFlowIdAsPrefix = new ConcurrentHashMap<>();
56+
5457
private final Lock registerFlowsLock = new ReentrantLock();
5558

5659
private ConfigurableListableBeanFactory beanFactory;
@@ -78,6 +81,11 @@ public StandardIntegrationFlowRegistrationBuilder registration(IntegrationFlow i
7881
return new StandardIntegrationFlowRegistrationBuilder(integrationFlow);
7982
}
8083

84+
@Override
85+
public boolean isUseIdAsPrefix(String flowId) {
86+
return Boolean.TRUE.equals(this.useFlowIdAsPrefix.get(flowId));
87+
}
88+
8189
private void register(StandardIntegrationFlowRegistrationBuilder builder) {
8290
IntegrationFlow integrationFlow = builder.integrationFlowRegistration.getIntegrationFlow();
8391
String flowId = builder.integrationFlowRegistration.getId();
@@ -224,6 +232,8 @@ public final class StandardIntegrationFlowRegistrationBuilder implements Integra
224232

225233
private boolean autoStartup = true;
226234

235+
private boolean idAsPrefix;
236+
227237
StandardIntegrationFlowRegistrationBuilder(IntegrationFlow integrationFlow) {
228238
this.integrationFlowRegistration = new StandardIntegrationFlowRegistration(integrationFlow);
229239
this.integrationFlowRegistration.setBeanFactory(StandardIntegrationFlowContext.this.beanFactory);
@@ -282,6 +292,12 @@ public StandardIntegrationFlowRegistrationBuilder addBean(String name, Object be
282292
return this;
283293
}
284294

295+
@Override
296+
public IntegrationFlowRegistrationBuilder useFlowIdAsPrefix() {
297+
this.idAsPrefix = true;
298+
return this;
299+
}
300+
285301
/**
286302
* Register an {@link IntegrationFlow} and all the dependant and support components
287303
* in the application context and return an associated {@link IntegrationFlowRegistration}
@@ -290,6 +306,12 @@ public StandardIntegrationFlowRegistrationBuilder addBean(String name, Object be
290306
*/
291307
@Override
292308
public IntegrationFlowRegistration register() {
309+
String id = this.integrationFlowRegistration.getId();
310+
Assert.state(!this.idAsPrefix || StringUtils.hasText(id),
311+
"An 'id' must be present to use 'useFlowIdAsPrefix'");
312+
if (this.idAsPrefix) {
313+
StandardIntegrationFlowContext.this.useFlowIdAsPrefix.put(id, this.idAsPrefix);
314+
}
293315
StandardIntegrationFlowContext.this.register(this);
294316
return this.integrationFlowRegistration;
295317
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,13 @@ public void testManualFlowRegistration() throws InterruptedException {
171171
.fixedDelay(10)
172172
.maxMessagesPerPoll(1)
173173
.receiveTimeout(10)))
174-
.handle(new BeanFactoryHandler());
174+
.handle(new BeanFactoryHandler(), e -> e.id("anId"));
175175

176176
BeanFactoryHandler additionalBean = new BeanFactoryHandler();
177177
IntegrationFlowRegistration flowRegistration =
178178
this.integrationFlowContext.registration(myFlow)
179179
.id(flowId)
180+
.useFlowIdAsPrefix()
180181
.addBean(additionalBean)
181182
.register();
182183

@@ -185,6 +186,7 @@ public void testManualFlowRegistration() throws InterruptedException {
185186
BeanFactoryHandler.class);
186187
assertSame(additionalBean, bean);
187188
assertSame(this.beanFactory, bean.beanFactory);
189+
bean = this.beanFactory.getBean(flowRegistration.getId() + "." + "anId.handler", BeanFactoryHandler.class);
188190

189191
MessagingTemplate messagingTemplate = flowRegistration.getMessagingTemplate();
190192
messagingTemplate.setReceiveTimeout(10000);

src/reference/asciidoc/dsl.adoc

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,7 @@ And Lambda flow can't start from `MessageSource` or `MessageProducer`.
580580

581581
Starting _version 5.1_, this kind of `IntegrationFlow` are wrapped to the proxy for exposing lifecycle control and provide access to the `inputChannel` of the internally associated `StandardIntegrationFlow`.
582582

583-
Starting with _version 5.0.5_, the generated bean names for the components in an `IntegrationFlow` include the flow bean followed by a dot as a prefix.
583+
Starting with _version 5.0.6_, the generated bean names for the components in an `IntegrationFlow` include the flow bean followed by a dot as a prefix.
584584
For example the `ConsumerEndpointFactoryBean` for the `.transform("Hello "::concat)` in the sample above, will end up with te bean name like `lambdaFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0`.
585585
The `Transformer` implementation bean for that endpoint will have a bean name such as `lambdaFlow.org.springframework.integration.transformer.MethodInvokingTransformer#0`.
586586
These generated bean names are prepended with the flow id prefix for purposes such as parsing logs or grouping components together in some analysis tool, as well as to avoid a race condition when we concurrently register integration flows at runtime.
@@ -933,10 +933,46 @@ Usually those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/
933933
Such a dynamically registered `IntegrationFlow` and all its dependant beans can be removed afterwards using `IntegrationFlowRegistration.destroy()` callback.
934934
See `IntegrationFlowContext` JavaDocs for more information.
935935

936-
NOTE: Starting with _version 5.0.5_, all generated bean names in an `IntegrationFlow` definition are prepended with flow id as a prefix.
936+
NOTE: Starting with _version 5.0.6_, all generated bean names in an `IntegrationFlow` definition are prepended with flow id as a prefix.
937937
It is recommended to always specify an explicit flow id, otherwise a synchronization barrier is initiated in the `IntegrationFlowContext` to generate the bean name for the `IntegrationFlow` and register its beans.
938938
We synchronize on these two operations to avoid a race condition when the same generated bean name may be used for different `IntegrationFlow` instances.
939939

940+
Also, starting with _version 5.0.6_, the registration builder API has a new method `useFlowIdAsPrefix()`.
941+
This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions if components in the flows have the same id.
942+
943+
For example:
944+
945+
[source, java]
946+
----
947+
private void registerFlows() {
948+
IntegrationFlowRegistration flow1 =
949+
this.flowContext.registration(buildFlow(1234))
950+
.id("tcp1")
951+
.useFlowIdAsPrefix()
952+
.register();
953+
954+
IntegrationFlowRegistration flow2 =
955+
this.flowContext.registration(buildFlow(1235))
956+
.id("tcp2")
957+
.useFlowIdAsPrefix()
958+
.register();
959+
}
960+
961+
private IntegrationFlow buildFlow(int port) {
962+
return f -> f
963+
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
964+
.serializer(TcpCodecs.crlf())
965+
.deserializer(TcpCodecs.lengthHeader1())
966+
.id("client"))
967+
.remoteTimeout(m -> 5000))
968+
.transform(Transformers.objectToString());
969+
}
970+
----
971+
972+
In this case, the message handler for the first flow can be referenced with bean name `tcp1.client.handler`.
973+
974+
NOTE: an `id` is required when using `useFlowIdAsPrefix()`.
975+
940976
[[java-dsl-gateway]]
941977
=== IntegrationFlow as Gateway
942978

0 commit comments

Comments
 (0)