Skip to content

Commit b547c92

Browse files
committed
Use Flux.handle() instead of doOnNext() in FluxMC
For better back-pressure handling do not perform active operations directly on the current `Flux` (e.g. via `doOnNext()`). It's better to postpone such a handling to back-pressure ready operators Like in this `FluxMessageChannel` case, the `.handle()` operator is wrapping the "hard" `send()` operation. Also include `.errorStrategyContinue()` do not stop during message processing
1 parent a62832c commit b547c92

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class FluxMessageChannel extends AbstractMessageChannel
4545

4646
private final List<Subscriber<? super Message<?>>> subscribers = new ArrayList<>();
4747

48-
private final Map<Publisher<Message<?>>, ConnectableFlux<Message<?>>> publishers = new ConcurrentHashMap<>();
48+
private final Map<Publisher<Message<?>>, ConnectableFlux<?>> publishers = new ConcurrentHashMap<>();
4949

5050
private final Flux<Message<?>> flux;
5151

@@ -79,10 +79,11 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) {
7979

8080
@Override
8181
public void subscribeTo(Publisher<Message<?>> publisher) {
82-
ConnectableFlux<Message<?>> connectableFlux =
82+
ConnectableFlux<?> connectableFlux =
8383
Flux.from(publisher)
84+
.handle((message, sink) -> sink.next(send(message)))
85+
.errorStrategyContinue()
8486
.doOnComplete(() -> this.publishers.remove(publisher))
85-
.doOnNext(this::send)
8687
.publish();
8788

8889
this.publishers.put(publisher, connectableFlux);

0 commit comments

Comments
 (0)