Skip to content

Commit 6065745

Browse files
garyrussellartembilan
authored andcommitted
INT-4421: Fix failedMessage in some exceptions
JIRA: https://jira.spring.io/browse/INT-4421 If a component invoked by an SPCA threw a `MessagingException` with no `failedMessage`, the resulting ErrorMessage payload had no `failedMessage`. The `UnicastingDispatcher` had a check for this so it wasn't an issue as long as at least one `DirectChannel` was between the poller and the component. Promote the wrapping code to `IntgrationUtils` and invoke it from places that blindly rethrew `MessagingException`s.
1 parent 1c4db65 commit 6065745

File tree

8 files changed

+96
-27
lines changed

8 files changed

+96
-27
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@
3939
import org.springframework.integration.support.management.MetricsContext;
4040
import org.springframework.integration.support.management.Statistics;
4141
import org.springframework.integration.support.management.TrackableComponent;
42+
import org.springframework.integration.support.utils.IntegrationUtils;
4243
import org.springframework.messaging.Message;
4344
import org.springframework.messaging.MessageChannel;
4445
import org.springframework.messaging.MessageDeliveryException;
45-
import org.springframework.messaging.MessagingException;
4646
import org.springframework.messaging.converter.MessageConverter;
4747
import org.springframework.messaging.support.ChannelInterceptor;
4848
import org.springframework.util.Assert;
@@ -473,11 +473,8 @@ public boolean send(Message<?> message, long timeout) {
473473
if (interceptorStack != null) {
474474
interceptors.afterSendCompletion(message, this, sent, e, interceptorStack);
475475
}
476-
if (e instanceof MessagingException) {
477-
throw (MessagingException) e;
478-
}
479-
throw new MessageDeliveryException(message,
480-
"failed to send Message to channel '" + this.getComponentName() + "'", e);
476+
throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
477+
() -> "failed to send Message to channel '" + this.getComponentName() + "'", e);
481478
}
482479
}
483480

spring-integration-core/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -123,6 +123,23 @@ protected boolean tryOptimizedDispatch(Message<?> message) {
123123
return false;
124124
}
125125

126+
/**
127+
* If the exception is not a {@link MessagingException} or does not have a
128+
* {@link MessagingException#getFailedMessage() failedMessage}, wrap it in a new
129+
* {@link MessagingException} with the message. There is some inconsistency here in
130+
* that {@link MessagingException}s are wrapped in a {@link MessagingException} whereas
131+
* {@link Exception}s are wrapped in {@link MessageDeliveryException}. It is retained
132+
* for backwards compatibility and will be resolved in 5.1.
133+
* It also does not wrap other {@link RuntimeException}s.
134+
* TODO: Remove this in favor of
135+
* {@code #wrapInDeliveryExceptionIfNecessary(Message, Supplier, Exception)} in 5.1.
136+
* @deprecated in favor of
137+
* {@code IntegrationUtils#wrapInDeliveryExceptionIfNecessary(Message, Supplier, Exception)}
138+
* @param message the message.
139+
* @param e the exception.
140+
* @return the wrapper, if necessary, or the original exception.
141+
*/
142+
@Deprecated
126143
protected RuntimeException wrapExceptionIfNecessary(Message<?> message, Exception e) {
127144
RuntimeException runtimeException = (e instanceof RuntimeException)
128145
? (RuntimeException) e

spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -145,7 +145,8 @@ private boolean doDispatch(Message<?> message) {
145145
success = true; // we have a winner.
146146
}
147147
catch (Exception e) {
148-
RuntimeException runtimeException = this.wrapExceptionIfNecessary(message, e);
148+
@SuppressWarnings("deprecation")
149+
RuntimeException runtimeException = wrapExceptionIfNecessary(message, e);
149150
exceptions.add(runtimeException);
150151
this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
151152
}

spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
2626
import org.springframework.integration.core.MessageProducer;
2727
import org.springframework.integration.router.MessageRouter;
28+
import org.springframework.integration.support.utils.IntegrationUtils;
2829
import org.springframework.integration.transaction.IntegrationResourceHolder;
2930
import org.springframework.messaging.Message;
3031
import org.springframework.messaging.MessageChannel;
3132
import org.springframework.messaging.MessageDeliveryException;
3233
import org.springframework.messaging.MessageHandler;
33-
import org.springframework.messaging.MessagingException;
3434
import org.springframework.messaging.PollableChannel;
3535
import org.springframework.messaging.support.ChannelInterceptor;
3636
import org.springframework.messaging.support.ExecutorChannelInterceptor;
@@ -135,11 +135,10 @@ protected void handleMessage(Message<?> message) {
135135
if (!CollectionUtils.isEmpty(interceptorStack)) {
136136
triggerAfterMessageHandled(theMessage, ex, interceptorStack);
137137
}
138-
if (ex instanceof MessagingException) {
139-
throw (MessagingException) ex;
140-
}
141-
String description = "Failed to handle " + theMessage + " to " + this + " in " + this.handler;
142-
throw new MessageDeliveryException(theMessage, description, ex);
138+
// TODO: In 5.1 remove this; adding the failed message to the text is redundant
139+
final Message<?> messageForText = theMessage;
140+
throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(theMessage,
141+
() -> "Failed to handle " + messageForText + " to " + this + " in " + this.handler, ex);
143142
}
144143
catch (Error ex) { //NOSONAR - ok, we re-throw below
145144
if (!CollectionUtils.isEmpty(interceptorStack)) {

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.springframework.integration.support.management.MetricsContext;
3131
import org.springframework.integration.support.management.Statistics;
3232
import org.springframework.integration.support.management.TrackableComponent;
33+
import org.springframework.integration.support.utils.IntegrationUtils;
3334
import org.springframework.messaging.Message;
3435
import org.springframework.messaging.MessageHandler;
35-
import org.springframework.messaging.MessageHandlingException;
3636
import org.springframework.messaging.MessagingException;
3737
import org.springframework.util.Assert;
3838

@@ -171,10 +171,8 @@ public void handleMessage(Message<?> message) {
171171
if (countsEnabled) {
172172
handlerMetrics.afterHandle(start, false);
173173
}
174-
if (e instanceof MessagingException) {
175-
throw (MessagingException) e;
176-
}
177-
throw new MessageHandlingException(message, "error occurred in message handler [" + this + "]", e);
174+
throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message,
175+
() -> "error occurred in message handler [" + this + "]", e);
178176
}
179177
}
180178

spring-integration-core/src/main/java/org/springframework/integration/support/utils/IntegrationUtils.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.support.utils;
1818

1919
import java.io.UnsupportedEncodingException;
20+
import java.util.function.Supplier;
2021

2122
import org.apache.commons.logging.Log;
2223
import org.apache.commons.logging.LogFactory;
@@ -25,6 +26,10 @@
2526
import org.springframework.core.convert.ConversionService;
2627
import org.springframework.integration.support.DefaultMessageBuilderFactory;
2728
import org.springframework.integration.support.MessageBuilderFactory;
29+
import org.springframework.messaging.Message;
30+
import org.springframework.messaging.MessageDeliveryException;
31+
import org.springframework.messaging.MessageHandlingException;
32+
import org.springframework.messaging.MessagingException;
2833
import org.springframework.util.Assert;
2934

3035
/**
@@ -139,4 +144,46 @@ public static String bytesToString(byte[] bytes, String encoding) {
139144
}
140145
}
141146

147+
/**
148+
* If the exception is not a {@link MessagingException} or does not have
149+
* a {@link MessagingException#getFailedMessage() failedMessage}, wrap it
150+
* in a new {@link MessageDeliveryException} with the message.
151+
* @param message the message.
152+
* @param text a Supplier for the new exception's message text.
153+
* @param e the exception.
154+
* @return the wrapper, if necessary, or the original exception.
155+
* @since 5.0.4
156+
*/
157+
public static RuntimeException wrapInDeliveryExceptionIfNecessary(Message<?> message, Supplier<String> text, Exception e) {
158+
RuntimeException runtimeException = (e instanceof RuntimeException)
159+
? (RuntimeException) e
160+
: new MessageDeliveryException(message, text.get(), e);
161+
if (!(e instanceof MessagingException) ||
162+
((MessagingException) e).getFailedMessage() == null) {
163+
runtimeException = new MessageDeliveryException(message, text.get(), e);
164+
}
165+
return runtimeException;
166+
}
167+
168+
/**
169+
* If the exception is not a {@link MessagingException} or does not have
170+
* a {@link MessagingException#getFailedMessage() failedMessage}, wrap it
171+
* in a new {@link MessageHandlingException} with the message.
172+
* @param message the message.
173+
* @param text a Supplier for the new exception's message text.
174+
* @param e the exception.
175+
* @return the wrapper, if necessary, or the original exception.
176+
* @since 5.0.4
177+
*/
178+
public static RuntimeException wrapInHandlingExceptionIfNecessary(Message<?> message, Supplier<String> text, Exception e) {
179+
RuntimeException runtimeException = (e instanceof RuntimeException)
180+
? (RuntimeException) e
181+
: new MessageHandlingException(message, text.get(), e);
182+
if (!(e instanceof MessagingException) ||
183+
((MessagingException) e).getFailedMessage() == null) {
184+
runtimeException = new MessageHandlingException(message, text.get(), e);
185+
}
186+
return runtimeException;
187+
}
188+
142189
}

spring-integration-core/src/test/java/org/springframework/integration/handler/BridgeHandlerTests.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,8 +16,10 @@
1616

1717
package org.springframework.integration.handler;
1818

19+
import static org.hamcrest.CoreMatchers.instanceOf;
1920
import static org.junit.Assert.assertNotNull;
2021
import static org.junit.Assert.assertThat;
22+
import static org.junit.Assert.fail;
2123

2224
import org.hamcrest.Factory;
2325
import org.hamcrest.Matcher;
@@ -27,17 +29,19 @@
2729
import org.springframework.integration.message.MessageMatcher;
2830
import org.springframework.integration.support.MessageBuilder;
2931
import org.springframework.messaging.Message;
32+
import org.springframework.messaging.MessageHandlingException;
3033
import org.springframework.messaging.PollableChannel;
3134
import org.springframework.messaging.core.DestinationResolutionException;
3235
import org.springframework.messaging.support.GenericMessage;
3336

3437
/**
3538
* @author Mark Fisher
3639
* @author Iwein Fuld
40+
* @author Gary Russell
3741
*/
3842
public class BridgeHandlerTests {
3943

40-
private BridgeHandler handler = new BridgeHandler();
44+
private final BridgeHandler handler = new BridgeHandler();
4145

4246
@Factory
4347
public static Matcher<Message<?>> sameExceptImmutableHeaders(Message<?> expected) {
@@ -55,10 +59,16 @@ public void simpleBridge() {
5559
assertThat(reply, sameExceptImmutableHeaders(request));
5660
}
5761

58-
@Test(expected = DestinationResolutionException.class)
62+
@Test
5963
public void missingOutputChannelVerifiedAtRuntime() {
6064
Message<?> request = new GenericMessage<String>("test");
61-
handler.handleMessage(request);
65+
try {
66+
handler.handleMessage(request);
67+
fail("Expected exception");
68+
}
69+
catch (MessageHandlingException e) {
70+
assertThat(e.getCause(), instanceOf(DestinationResolutionException.class));
71+
}
6272
}
6373

6474
@Test(timeout = 1000)

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpOutboundGatewayTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public class TcpOutboundGatewayTests {
9191

9292
private static final Log logger = LogFactory.getLog(TcpOutboundGatewayTests.class);
9393

94-
private AsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
94+
private final AsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
9595

9696
@ClassRule
9797
public static LongRunningIntegrationTest longTests = new LongRunningIntegrationTest();
@@ -730,7 +730,7 @@ private void testGWPropagatesSocketCloseGuts(final int port, AbstractClientConne
730730
fail("expected failure");
731731
}
732732
catch (Exception e) {
733-
assertThat(e.getCause(), instanceOf(EOFException.class));
733+
assertThat(e.getCause().getCause(), instanceOf(EOFException.class));
734734
}
735735
assertEquals(0, TestUtils.getPropertyValue(gateway, "pendingReplies", Map.class).size());
736736
Message<?> reply = replyChannel.receive(0);
@@ -839,7 +839,7 @@ private void testGWPropagatesSocketTimeoutGuts(final int port, AbstractClientCon
839839
fail("expected failure");
840840
}
841841
catch (Exception e) {
842-
assertThat(e.getCause(), instanceOf(SocketTimeoutException.class));
842+
assertThat(e.getCause().getCause(), instanceOf(SocketTimeoutException.class));
843843
}
844844
assertEquals(0, TestUtils.getPropertyValue(gateway, "pendingReplies", Map.class).size());
845845
Message<?> reply = replyChannel.receive(0);

0 commit comments

Comments
 (0)