Skip to content

Commit 17e794d

Browse files
artembilangaryrussell
authored andcommitted
INT-4471: PubSubChannel: Add errorHandler warn (#2459)
* INT-4471: PubSubChannel: Add errorHandler warn JIRA: https://jira.spring.io/browse/INT-4471 * When an `Executor` is not provided, log warn that the provided `ErrorHandler` is ignored. **Cherry-pick to 5.0.x and 4.3.x** * * Polish warn message
1 parent 9a88140 commit 17e794d

File tree

4 files changed

+45
-51
lines changed

4 files changed

+45
-51
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,19 +35,18 @@
3535
*/
3636
public class PublishSubscribeChannel extends AbstractExecutorChannel {
3737

38-
private volatile ErrorHandler errorHandler;
38+
private ErrorHandler errorHandler;
3939

40-
private volatile boolean ignoreFailures;
40+
private boolean ignoreFailures;
4141

42-
private volatile boolean applySequence;
42+
private boolean applySequence;
4343

44-
private volatile int minSubscribers;
44+
private int minSubscribers;
4545

4646
/**
4747
* Create a PublishSubscribeChannel that will use an {@link Executor}
4848
* to invoke the handlers. If this is null, each invocation will occur in
4949
* the message sender's thread.
50-
*
5150
* @param executor The executor.
5251
*/
5352
public PublishSubscribeChannel(Executor executor) {
@@ -79,9 +78,7 @@ public String getComponentType() {
7978
* a {@link MessagePublishingErrorHandler} that sends error messages to
8079
* the failed request Message's error channel header if available or to
8180
* the default 'errorChannel' otherwise.
82-
*
8381
* @param errorHandler The error handler.
84-
*
8582
* @see #PublishSubscribeChannel(Executor)
8683
*/
8784
public void setErrorHandler(ErrorHandler errorHandler) {
@@ -149,6 +146,14 @@ public final void onInit() throws Exception {
149146
getDispatcher().setApplySequence(this.applySequence);
150147
getDispatcher().setMinSubscribers(this.minSubscribers);
151148
}
149+
else if (this.errorHandler != null) {
150+
if (this.logger.isWarnEnabled()) {
151+
this.logger.warn("The 'errorHandler' is ignored for the '" + getComponentName() +
152+
"' (an 'executor' is not provided) and exceptions will be thrown " +
153+
"directly within the sending Thread");
154+
}
155+
}
156+
152157
if (this.maxSubscribers == null) {
153158
Integer maxSubscribers =
154159
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class);
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
<publish-subscribe-channel id="channelWithApplySequenceEnabledAndTaskExecutor" apply-sequence="true" task-executor="pool"/>
2121

22-
<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler"/>
22+
<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler" task-executor="pool"/>
2323

2424
<task:executor id="pool" pool-size="1"/>
2525

Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,76 +26,71 @@
2626
import java.util.concurrent.Executor;
2727

2828
import org.junit.Test;
29+
import org.junit.runner.RunWith;
2930

3031
import org.springframework.beans.DirectFieldAccessor;
31-
import org.springframework.context.support.ClassPathXmlApplicationContext;
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.context.ApplicationContext;
3234
import org.springframework.integration.channel.PublishSubscribeChannel;
3335
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
3436
import org.springframework.integration.support.utils.IntegrationUtils;
3537
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
3638
import org.springframework.messaging.support.GenericMessage;
39+
import org.springframework.test.context.junit4.SpringRunner;
3740
import org.springframework.util.ErrorHandler;
3841

3942
/**
4043
* @author Mark Fisher
4144
* @author Gary Russell
4245
* @author Artem Bilan
4346
*/
47+
@RunWith(SpringRunner.class)
4448
public class PublishSubscribeChannelParserTests {
4549

50+
@Autowired
51+
private ApplicationContext context;
52+
4653
@Test
4754
public void defaultChannel() {
48-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
49-
"publishSubscribeChannelParserTests.xml", this.getClass());
50-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
51-
context.getBean("defaultChannel");
55+
PublishSubscribeChannel channel = this.context.getBean("defaultChannel", PublishSubscribeChannel.class);
5256
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
5357
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
5458
accessor.getPropertyValue("dispatcher");
5559
dispatcher.setApplySequence(true);
5660
dispatcher.addHandler(message -> { });
57-
dispatcher.dispatch(new GenericMessage<String>("foo"));
61+
dispatcher.dispatch(new GenericMessage<>("foo"));
5862
DirectFieldAccessor dispatcherAccessor = new DirectFieldAccessor(dispatcher);
5963
assertNull(dispatcherAccessor.getPropertyValue("executor"));
6064
assertFalse((Boolean) dispatcherAccessor.getPropertyValue("ignoreFailures"));
6165
assertTrue((Boolean) dispatcherAccessor.getPropertyValue("applySequence"));
62-
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
66+
Object mbf = this.context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
6367
assertSame(mbf, dispatcherAccessor.getPropertyValue("messageBuilderFactory"));
64-
context.close();
6568
}
6669

6770
@Test
6871
public void ignoreFailures() {
69-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
70-
"publishSubscribeChannelParserTests.xml", this.getClass());
71-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
72-
context.getBean("channelWithIgnoreFailures");
72+
PublishSubscribeChannel channel =
73+
this.context.getBean("channelWithIgnoreFailures", PublishSubscribeChannel.class);
7374
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
7475
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
7576
accessor.getPropertyValue("dispatcher");
7677
assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("ignoreFailures"));
77-
context.close();
7878
}
7979

8080
@Test
8181
public void applySequenceEnabled() {
82-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
83-
"publishSubscribeChannelParserTests.xml", this.getClass());
84-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
85-
context.getBean("channelWithApplySequenceEnabled");
82+
PublishSubscribeChannel channel =
83+
this.context.getBean("channelWithApplySequenceEnabled", PublishSubscribeChannel.class);
8684
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
8785
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
8886
accessor.getPropertyValue("dispatcher");
8987
assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("applySequence"));
90-
context.close();
9188
}
9289

9390
@Test
9491
public void channelWithTaskExecutor() {
95-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
96-
"publishSubscribeChannelParserTests.xml", this.getClass());
97-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
98-
context.getBean("channelWithTaskExecutor");
92+
PublishSubscribeChannel channel =
93+
this.context.getBean("channelWithTaskExecutor", PublishSubscribeChannel.class);
9994
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
10095
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
10196
accessor.getPropertyValue("dispatcher");
@@ -106,15 +101,12 @@ public void channelWithTaskExecutor() {
106101
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
107102
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
108103
assertEquals(context.getBean("pool"), innerExecutor);
109-
context.close();
110104
}
111105

112106
@Test
113107
public void ignoreFailuresWithTaskExecutor() {
114-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
115-
"publishSubscribeChannelParserTests.xml", this.getClass());
116-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
117-
context.getBean("channelWithIgnoreFailuresAndTaskExecutor");
108+
PublishSubscribeChannel channel =
109+
this.context.getBean("channelWithIgnoreFailuresAndTaskExecutor", PublishSubscribeChannel.class);
118110
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
119111
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
120112
accessor.getPropertyValue("dispatcher");
@@ -125,16 +117,13 @@ public void ignoreFailuresWithTaskExecutor() {
125117
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
126118
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
127119
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
128-
assertEquals(context.getBean("pool"), innerExecutor);
129-
context.close();
120+
assertEquals(this.context.getBean("pool"), innerExecutor);
130121
}
131122

132123
@Test
133124
public void applySequenceEnabledWithTaskExecutor() {
134-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
135-
"publishSubscribeChannelParserTests.xml", this.getClass());
136-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
137-
context.getBean("channelWithApplySequenceEnabledAndTaskExecutor");
125+
PublishSubscribeChannel channel =
126+
this.context.getBean("channelWithApplySequenceEnabledAndTaskExecutor", PublishSubscribeChannel.class);
138127
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
139128
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
140129
accessor.getPropertyValue("dispatcher");
@@ -145,21 +134,17 @@ public void applySequenceEnabledWithTaskExecutor() {
145134
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
146135
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
147136
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
148-
assertEquals(context.getBean("pool"), innerExecutor);
149-
context.close();
137+
assertEquals(this.context.getBean("pool"), innerExecutor);
150138
}
151139

152140
@Test
153141
public void channelWithErrorHandler() {
154-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
155-
"publishSubscribeChannelParserTests.xml", this.getClass());
156-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
157-
context.getBean("channelWithErrorHandler");
142+
PublishSubscribeChannel channel =
143+
this.context.getBean("channelWithErrorHandler", PublishSubscribeChannel.class);
158144
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
159145
ErrorHandler errorHandler = (ErrorHandler) accessor.getPropertyValue("errorHandler");
160146
assertNotNull(errorHandler);
161-
assertEquals(context.getBean("testErrorHandler"), errorHandler);
162-
context.close();
147+
assertEquals(this.context.getBean("testErrorHandler"), errorHandler);
163148
}
164149

165150
}

src/reference/asciidoc/channel.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,10 @@ When using this element, you can also specify the `task-executor` used for publi
572572
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
573573
----
574574

575+
Alongside with the `Executor`, an `ErrorHandler` can be configured as well.
576+
By default the `PublishSubscribeChannel` uses a `MessagePublishingErrorHandler` implementation to send error to the `MessageChannel` from the `errorChannel` header or a global `errorChannel` instance.
577+
If an `Executor` is not configured, the `ErrorHandler` is ignored and exceptions are thrown directly to the caller's Thread.
578+
575579
If you are providing a _Resequencer_ or _Aggregator_ downstream from a `PublishSubscribeChannel`, then you can set the 'apply-sequence' property on the channel to `true`.
576580
That will indicate that the channel should set the sequence-size and sequence-number Message headers as well as the correlation id prior to passing the Messages along.
577581
For example, if there are 5 subscribers, the sequence-size would be set to 5, and the Messages would have sequence-number header values ranging from 1 to 5.

0 commit comments

Comments
 (0)