Skip to content

Commit 997b07a

Browse files
garyrussellartembilan
authored andcommitted
INT-4476: Fall back for ID and Timestamp Headers
JIRA: https://jira.spring.io/browse/INT-4476 When mapping outbound headers, if the `AmqpHeaders` `ID` and `Timestamp` headers are not present, fall back to mapping the `MessageHeaders` variants (if present). Also fix some PDF overflows.
1 parent 4c0f767 commit 997b07a

File tree

6 files changed

+68
-6
lines changed

6 files changed

+68
-6
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,13 +21,16 @@
2121
import java.util.HashMap;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.UUID;
2425

2526
import org.springframework.amqp.core.MessageDeliveryMode;
2627
import org.springframework.amqp.core.MessageProperties;
2728
import org.springframework.amqp.support.AmqpHeaders;
2829
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2930
import org.springframework.integration.mapping.AbstractHeaderMapper;
3031
import org.springframework.integration.mapping.support.JsonHeaders;
32+
import org.springframework.lang.Nullable;
33+
import org.springframework.messaging.MessageHeaders;
3134
import org.springframework.util.MimeType;
3235
import org.springframework.util.StringUtils;
3336

@@ -213,6 +216,16 @@ protected Map<String, Object> extractUserDefinedHeaders(MessageProperties amqpMe
213216
*/
214217
@Override
215218
protected void populateStandardHeaders(Map<String, Object> headers, MessageProperties amqpMessageProperties) {
219+
populateStandardHeaders(null, headers, amqpMessageProperties);
220+
}
221+
222+
/**
223+
* Maps headers from a Spring Integration MessageHeaders instance to the MessageProperties
224+
* of an AMQP Message.
225+
*/
226+
@Override
227+
protected void populateStandardHeaders(@Nullable Map<String, Object> allHeaders, Map<String, Object> headers,
228+
MessageProperties amqpMessageProperties) {
216229
String appId = getHeaderIfAvailable(headers, AmqpHeaders.APP_ID, String.class);
217230
if (StringUtils.hasText(appId)) {
218231
amqpMessageProperties.setAppId(appId);
@@ -265,6 +278,12 @@ protected void populateStandardHeaders(Map<String, Object> headers, MessagePrope
265278
if (StringUtils.hasText(messageId)) {
266279
amqpMessageProperties.setMessageId(messageId);
267280
}
281+
else if (allHeaders != null) {
282+
UUID id = getHeaderIfAvailable(allHeaders, MessageHeaders.ID, UUID.class);
283+
if (id != null) {
284+
amqpMessageProperties.setMessageId(id.toString());
285+
}
286+
}
268287
Integer priority = getHeaderIfAvailable(headers, IntegrationMessageHeaderAccessor.PRIORITY, Integer.class);
269288
if (priority != null) {
270289
amqpMessageProperties.setPriority(priority);
@@ -289,6 +308,12 @@ protected void populateStandardHeaders(Map<String, Object> headers, MessagePrope
289308
if (timestamp != null) {
290309
amqpMessageProperties.setTimestamp(timestamp);
291310
}
311+
else if (allHeaders != null) {
312+
Long ts = getHeaderIfAvailable(allHeaders, MessageHeaders.TIMESTAMP, Long.class);
313+
if (ts != null) {
314+
amqpMessageProperties.setTimestamp(new Date(ts));
315+
}
316+
}
292317
String type = getHeaderIfAvailable(headers, AmqpHeaders.TYPE, String.class);
293318
if (type != null) {
294319
amqpMessageProperties.setType(type);

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapperTests.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.amqp.support;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
1920
import static org.junit.Assert.assertEquals;
2021
import static org.junit.Assert.assertNull;
2122
import static org.junit.Assert.fail;
@@ -37,6 +38,7 @@
3738
import org.springframework.http.MediaType;
3839
import org.springframework.messaging.MessageChannel;
3940
import org.springframework.messaging.MessageHeaders;
41+
import org.springframework.messaging.support.GenericMessage;
4042
import org.springframework.util.MimeType;
4143
import org.springframework.util.MimeTypeUtils;
4244

@@ -50,6 +52,16 @@
5052
*/
5153
public class DefaultAmqpHeaderMapperTests {
5254

55+
@Test
56+
public void fromHeadersFallbackIdTimestamp() {
57+
DefaultAmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper();
58+
org.springframework.messaging.Message<?> message = new GenericMessage<>("");
59+
MessageProperties messageProperties = new MessageProperties();
60+
headerMapper.fromHeadersToRequest(message.getHeaders(), messageProperties);
61+
assertThat(message.getHeaders().getId().toString()).isEqualTo(messageProperties.getMessageId());
62+
assertThat(message.getHeaders().getTimestamp()).isEqualTo(messageProperties.getTimestamp().getTime());
63+
}
64+
5365
@Test
5466
public void fromHeaders() {
5567
DefaultAmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper();

spring-integration-core/src/main/java/org/springframework/integration/mapping/AbstractHeaderMapper.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import org.apache.commons.logging.Log;
2929
import org.apache.commons.logging.LogFactory;
3030

31+
import org.springframework.lang.Nullable;
3132
import org.springframework.messaging.MessageChannel;
3233
import org.springframework.messaging.MessageHeaders;
3334
import org.springframework.util.Assert;
@@ -201,7 +202,7 @@ private void fromHeaders(MessageHeaders headers, T target, HeaderMatcher headerM
201202
subset.put(headerName, entry.getValue());
202203
}
203204
}
204-
this.populateStandardHeaders(subset, target);
205+
this.populateStandardHeaders(headers, subset, target);
205206
this.populateUserDefinedHeaders(subset, target);
206207
}
207208
catch (Exception e) {
@@ -337,6 +338,20 @@ protected Collection<String> getTransientHeaderNames() {
337338
*/
338339
protected abstract void populateStandardHeaders(Map<String, Object> headers, T target);
339340

341+
/**
342+
* Populate the specified standard headers to the specified source.
343+
* If not implemented, calls {@link #populateStandardHeaders(Map, Object)}.
344+
* @param allHeaders all headers including transient.
345+
* @param subset the map of standard headers to be populated.
346+
* @param target the target object to populate headers.
347+
* @since 5.1
348+
*/
349+
protected void populateStandardHeaders(@Nullable Map<String, Object> allHeaders, Map<String, Object> subset,
350+
T target) {
351+
352+
populateStandardHeaders(subset, target);
353+
}
354+
340355
/**
341356
* Populate the specified user-defined headers to the specified source.
342357
* @param headerName the user defined header name to be populated.

src/reference/asciidoc/amqp.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,6 +1504,10 @@ Negated patterns get priority, so a list such as
15041504
IMPORTANT: If you have a user defined header that begins with `!` that you *do* wish to map, you need to escape it with
15051505
`\` thus: `STANDARD_REQUEST_HEADERS,\!myBangHeader` and it *WILL* be mapped.
15061506

1507+
NOTE: Starting with _version 5.1_, the `DefaultAmqpHeaderMapper` will fall back to mapping `MessageHeaders.ID` and `MessageHeaders.TIMESTAMP` to `MessageProperties.messageId` and `MessageProperties.timestamp` respectively, if the corresponding `amqp_messageId` or `amqp_timestamp` headers are not present on outbound messages.
1508+
Inbound properties will be mapped to the `amqp_*` headers as before.
1509+
It is useful to populate the `messageId` property when message consumers are using stateful retry.
1510+
15071511
[[amqp-strict-ordering]]
15081512
=== Strict Message Ordering
15091513

src/reference/asciidoc/dsl.adoc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -581,8 +581,9 @@ And Lambda flow can't start from `MessageSource` or `MessageProducer`.
581581
Starting _version 5.1_, this kind of `IntegrationFlow` are wrapped to the proxy for exposing lifecycle control and provide access to the `inputChannel` of the internally associated `StandardIntegrationFlow`.
582582

583583
Starting with _version 5.0.6_, the generated bean names for the components in an `IntegrationFlow` include the flow bean followed by a dot as a prefix.
584-
For example the `ConsumerEndpointFactoryBean` for the `.transform("Hello "::concat)` in the sample above, will end up with te bean name like `lambdaFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0`.
585-
The `Transformer` implementation bean for that endpoint will have a bean name such as `lambdaFlow.org.springframework.integration.transformer.MethodInvokingTransformer#0`.
584+
For example the `ConsumerEndpointFactoryBean` for the `.transform("Hello "::concat)` in the sample above, will result with a bean name `lambdaFlow.o.s.integration.config.ConsumerEndpointFactoryBean#0`.
585+
The `Transformer` implementation bean for that endpoint will have a bean name `lambdaFlow.o.s.integration.transformer.MethodInvokingTransformer#0`.
586+
(In both cases, `o.s` is `org.springframework`; shortened here to fit on the page).
586587
These generated bean names are prepended with the flow id prefix for purposes such as parsing logs or grouping components together in some analysis tool, as well as to avoid a race condition when we concurrently register integration flows at runtime.
587588
See <<java-dsl-runtime-flows>> for more information.
588589

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,8 @@ See <<json-transformers>> for more information.
5050
Starting with _version 5.0.5_, generated bean names for the components in an `IntegrationFlow` include the flow bean name, followed by a dot, as a prefix.
5151

5252
See <<java-dsl-flows>> for more information.
53+
54+
==== AMQP Changes
55+
56+
`ID` and `Timestamp` header mapping changes in the `DefaultAmqpHeaderMapper`.
57+
See the note near the bottom of <<amqp-message-headers>> for more information.

0 commit comments

Comments
 (0)