Skip to content

Commit 212bd46

Browse files
GH-3763: Add handleReactive() for Java DSL (#8605)
* GH-3763: Add `handleReactive()` for Java DSL Fixes #3763 Add a convenient terminal operator to `BaseIntegrationFlowDefinition` based on a `ReactiveMessageHandler`. Also add an overload like `handleReactive(ReactiveMessageHandlerSpec)` to let end-user to choose a protocol-specific channel adapter * * Fix `Namespace Factory` wording in the `BaseIntegrationFlowDefinition` Javadocs * Fix language in Docs Co-authored-by: Gary Russell <[email protected]> --------- Co-authored-by: Gary Russell <[email protected]>
1 parent 8291fb9 commit 212bd46

File tree

7 files changed

+109
-17
lines changed

7 files changed

+109
-17
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java

+70-4
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.springframework.integration.handler.LoggingHandler;
6666
import org.springframework.integration.handler.MessageProcessor;
6767
import org.springframework.integration.handler.MessageTriggerAction;
68+
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
6869
import org.springframework.integration.handler.ServiceActivatingHandler;
6970
import org.springframework.integration.router.AbstractMessageRouter;
7071
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
@@ -91,6 +92,7 @@
9192
import org.springframework.messaging.Message;
9293
import org.springframework.messaging.MessageChannel;
9394
import org.springframework.messaging.MessageHandler;
95+
import org.springframework.messaging.ReactiveMessageHandler;
9496
import org.springframework.messaging.support.ChannelInterceptor;
9597
import org.springframework.messaging.support.InterceptableChannel;
9698
import org.springframework.util.Assert;
@@ -891,7 +893,8 @@ public <P> B filter(@Nullable Class<P> expectedType, GenericSelector<P> genericS
891893

892894
/**
893895
* Populate a {@link ServiceActivatingHandler} for the selected protocol specific
894-
* {@link MessageHandler} implementation from {@code Namespace Factory}:
896+
* {@link MessageHandler} implementation
897+
* from the respective namespace factory (e.g. {@code Http, Kafka, Files}):
895898
* <pre class="code">
896899
* {@code
897900
* .handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey"))
@@ -1097,7 +1100,8 @@ public B handle(MessageProcessorSpec<?> messageProcessorSpec,
10971100

10981101
/**
10991102
* Populate a {@link ServiceActivatingHandler} for the selected protocol specific
1100-
* {@link MessageHandler} implementation from {@code Namespace Factory}:
1103+
* {@link MessageHandler} implementation
1104+
* from the respective namespace factory (e.g. {@code Http, Kafka, Files}).
11011105
* In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}.
11021106
* Typically, used with a Lambda expression:
11031107
* <pre class="code">
@@ -1221,7 +1225,7 @@ public B enrich(Consumer<EnricherSpec> enricherConfigurer) {
12211225
* Populate a {@link MessageTransformingHandler} for
12221226
* a {@link org.springframework.integration.transformer.HeaderEnricher}
12231227
* using header values from provided {@link MapBuilder}.
1224-
* Can be used together with {@code Namespace Factory}:
1228+
* Can be used together with a namespace factory:
12251229
* <pre class="code">
12261230
* {@code
12271231
* .enrichHeaders(Mail.headers()
@@ -1242,7 +1246,7 @@ public B enrichHeaders(MapBuilder<?, String, Object> headers) {
12421246
* a {@link org.springframework.integration.transformer.HeaderEnricher}
12431247
* using header values from provided {@link MapBuilder}.
12441248
* In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}.
1245-
* Can be used together with {@code Namespace Factory}:
1249+
* Can be used together with a namespace factory:
12461250
* <pre class="code">
12471251
* {@code
12481252
* .enrichHeaders(Mail.headers()
@@ -2916,6 +2920,68 @@ public IntegrationFlow nullChannel() {
29162920
.get();
29172921
}
29182922

2923+
/**
2924+
* Populate a terminal consumer endpoint for the selected protocol specific
2925+
* {@link MessageHandler} implementation
2926+
* from the respective namespace factory (e.g. {@code Http, Kafka, Files}).
2927+
* In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}.
2928+
* @param messageHandlerSpec the {@link MessageHandlerSpec} to configure the protocol specific
2929+
* {@link MessageHandler}.
2930+
* @param <H> the {@link MessageHandler} type.
2931+
* @return the current {@link BaseIntegrationFlowDefinition}.
2932+
* @since 6.1
2933+
*/
2934+
public <H extends ReactiveMessageHandler> IntegrationFlow handleReactive(
2935+
ReactiveMessageHandlerSpec<?, H> messageHandlerSpec) {
2936+
2937+
return handleReactive(messageHandlerSpec, null);
2938+
}
2939+
2940+
/**
2941+
* Populate a terminal consumer endpoint for the selected protocol specific
2942+
* {@link MessageHandler} implementation
2943+
* from the respective namespace factory (e.g. {@code Http, Kafka, Files}).
2944+
* In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}.
2945+
* @param messageHandlerSpec the {@link MessageHandlerSpec} to configure the protocol specific
2946+
* {@link MessageHandler}.
2947+
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
2948+
* @param <H> the {@link MessageHandler} type.
2949+
* @return the current {@link BaseIntegrationFlowDefinition}.
2950+
* @since 6.1
2951+
*/
2952+
public <H extends ReactiveMessageHandler> IntegrationFlow handleReactive(
2953+
ReactiveMessageHandlerSpec<?, H> messageHandlerSpec,
2954+
@Nullable Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer) {
2955+
2956+
return
2957+
addComponents(messageHandlerSpec.getComponentsToRegister()).
2958+
handleReactive(messageHandlerSpec.getObject().getDelegate(), endpointConfigurer);
2959+
}
2960+
2961+
/**
2962+
* Add a {@link ReactiveMessageHandler} as a terminal {@link IntegrationFlow} operator.
2963+
* @param reactiveMessageHandler the {@link ReactiveMessageHandler} to finish the flow.
2964+
* @return The {@link IntegrationFlow} instance based on this definition.
2965+
* @since 6.1
2966+
*/
2967+
public IntegrationFlow handleReactive(ReactiveMessageHandler reactiveMessageHandler) {
2968+
return handleReactive(reactiveMessageHandler, null);
2969+
}
2970+
2971+
/**
2972+
* Add a {@link ReactiveMessageHandler} as a terminal {@link IntegrationFlow} operator.
2973+
* @param reactiveMessageHandler the {@link ReactiveMessageHandler} to finish the flow.
2974+
* @param endpointConfigurer the {@link Consumer} to configure a target endpoint for the handler.
2975+
* @return The {@link IntegrationFlow} instance based on this definition.
2976+
* @since 6.1
2977+
*/
2978+
public IntegrationFlow handleReactive(ReactiveMessageHandler reactiveMessageHandler,
2979+
@Nullable Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer) {
2980+
2981+
return handle(new ReactiveMessageHandlerAdapter(reactiveMessageHandler), endpointConfigurer)
2982+
.get();
2983+
}
2984+
29192985
/**
29202986
* Finish this flow with delegation to other {@link IntegrationFlow} instance.
29212987
* @param other the {@link IntegrationFlow} to compose with.

spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
7171
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
7272
import org.springframework.integration.handler.LoggingHandler;
73-
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
7473
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
7574
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
7675
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
@@ -718,8 +717,7 @@ public MessageHandler loggingMessageHandler() {
718717
public IntegrationFlow wireTapFlow1() {
719718
return IntegrationFlow.from("tappedChannel1")
720719
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
721-
.handle(new ReactiveMessageHandlerAdapter((message) -> Mono.just(message).log().then()))
722-
.get();
720+
.handleReactive((message) -> Mono.just(message).log().then());
723721
}
724722

725723
@Bean

spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/dsl/MongoDbTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway(
425425
public IntegrationFlow reactiveStore() {
426426
return f -> f
427427
.channel(MessageChannels.flux())
428-
.handle(MongoDb.reactiveOutboundChannelAdapter(REACTIVE_MONGO_DATABASE_FACTORY));
428+
.handleReactive(MongoDb.reactiveOutboundChannelAdapter(REACTIVE_MONGO_DATABASE_FACTORY));
429429
}
430430

431431
}

spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/dsl/R2dbcDslTests.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -114,12 +114,12 @@ IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
114114
e -> e.poller(p -> p.fixedDelay(100)).autoStartup(false).id("r2dbcInboundChannelAdapter"))
115115
.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
116116
.channel(MessageChannels.flux())
117-
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
118-
.queryType(R2dbcMessageHandler.Type.UPDATE)
119-
.tableNameExpression("payload.class.simpleName")
120-
.criteria((message) -> Criteria.where("id").is(2))
121-
.values("{age:36}"))
122-
.get();
117+
.handleReactive(
118+
R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
119+
.queryType(R2dbcMessageHandler.Type.UPDATE)
120+
.tableNameExpression("payload.class.simpleName")
121+
.criteria((message) -> Criteria.where("id").is(2))
122+
.values("{age:36}"));
123123
}
124124

125125
}

src/reference/asciidoc/r2dbc.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,10 @@ With Java DSL a configuration for this channel adapter is like this:
110110
====
111111
[source, java]
112112
----
113-
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
113+
.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
114114
.queryType(R2dbcMessageHandler.Type.UPDATE)
115115
.tableNameExpression("payload.class.simpleName")
116116
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
117117
.values("{age:36}"))
118118
----
119-
====
119+
====

src/reference/asciidoc/reactive-streams.adoc

+23
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,29 @@ However, when a `ReactiveStreamsConsumer` is involved in the flow (e.g. when cha
154154
One of the out-of-the-box `ReactiveMessageHandler` implementation is a `ReactiveMongoDbStoringMessageHandler` for Outbound Channel Adapter.
155155
See <<./mongodb.adoc#mongodb-reactive-channel-adapters,MongoDB Reactive Channel Adapters>> for more information.
156156

157+
Starting with version 6.1, the `IntegrationFlowDefinition` exposes a convenient `handleReactive(ReactiveMessageHandler)` terminal operator.
158+
Any `ReactiveMessageHandler` implementation (even just a plain lambda using the `Mono` API) can be used for this operator.
159+
The framework subscribes to the returned `Mono<Void>` automatically.
160+
Here is a simple sample of possible configuration for this operator:
161+
162+
====
163+
[source, java]
164+
----
165+
@Bean
166+
public IntegrationFlow wireTapFlow1() {
167+
return IntegrationFlow.from("tappedChannel1")
168+
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
169+
.handleReactive((message) -> Mono.just(message).log().then());
170+
}
171+
----
172+
====
173+
174+
An overloaded version of this operator accepts a `Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>` to customize a consumer endpoint around the provided `ReactiveMessageHandler`.
175+
176+
In addition, a `ReactiveMessageHandlerSpec`-based variants are also provided.
177+
In most cases they are used for protocol-specific channel adapter implementations.
178+
See the next section following links to the target technologies with respective reactive channel adapters.
179+
157180
[[reactive-channel-adapters]]
158181
=== Reactive Channel Adapters
159182

src/reference/asciidoc/whats-new.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ See <<./zip.adoc#zip,Zip Support>> for more information.
2929
The `ContextHolderRequestHandlerAdvice` allows to store a value from a request message into some context around `MessageHandler` execution.
3030
See <<./handler-advice.adoc#context-holder-advice, Context Holder Advice>> for more information.
3131

32+
[[x6.1-handle-reactive]]
33+
==== The `handleReactive()` operator for Java DSL
34+
The `IntegrationFlow` can now end with a convenient `handleReactive(ReactiveMessageHandler)` operator.
35+
See <<./reactive-streams.adoc#reactive-message-handler, `ReactiveMessageHandler`>> for more information.
36+
3237
[[x6.1-general]]
3338
=== General Changes
3439

0 commit comments

Comments
 (0)