Skip to content

Commit e79579a

Browse files
committed
* Introduce CloudEventHeaders with well-known constant for Cloud Event headers in the message
* Introduce a `ContentTypeDelegatingDataMarshaller` based on the `org.springframework.core.codec.Encoder` abstraction to delegate * Use `ContentTypeDelegatingDataMarshaller` from the `ToCloudEventTransformer` * Modify `ToCloudEventTransformerTests` to use constants from the `CloudEventHeaders` and verify that `text/plain` marshalling works well for cloud events
1 parent 962ba1a commit e79579a

File tree

6 files changed

+239
-21
lines changed

6 files changed

+239
-21
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.cloudevents;
18+
19+
/**
20+
* Message headers for basic cloud event attributes.
21+
* These headers might be remapped to respective attributes/headers
22+
* in the target protocol binder.
23+
*
24+
* @author Artem Bilan
25+
*
26+
* @since 5.3
27+
*/
28+
public final class CloudEventHeaders {
29+
30+
private CloudEventHeaders() {
31+
}
32+
33+
/**
34+
* Header prefix as a {@value PREFIX} for cloud event attributes.
35+
*/
36+
public static final String PREFIX = "ce_";
37+
38+
/**
39+
* The header name for cloud event {@code id} attribute.
40+
*/
41+
public static final String ID = PREFIX + "id";
42+
43+
/**
44+
* The header name for cloud event {@code source} attribute.
45+
*/
46+
public static final String SOURCE = PREFIX + "source";
47+
48+
/**
49+
* The header name for cloud event {@code specversion} attribute.
50+
*/
51+
public static final String SPEC_VERSION = PREFIX + "specversion";
52+
53+
/**
54+
* The header name for cloud event {@code type} attribute.
55+
*/
56+
public static final String TYPE = PREFIX + "type";
57+
58+
/**
59+
* The header name for cloud event {@code datacontenttype} attribute.
60+
*/
61+
public static final String DATA_CONTENT_TYPE = PREFIX + "datacontenttype";
62+
63+
/**
64+
* The header name for cloud event {@code dataschema} attribute.
65+
*/
66+
public static final String DATA_SCHEMA = PREFIX + "dataschema";
67+
68+
/**
69+
* The header name for cloud event {@code subject} attribute.
70+
*/
71+
public static final String SUBJECT = PREFIX + "subject";
72+
73+
/**
74+
* The header name for cloud event {@code time} attribute.
75+
*/
76+
public static final String TIME = PREFIX + "time";
77+
78+
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.cloudevents;
18+
19+
import java.nio.ByteBuffer;
20+
import java.util.ArrayList;
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
import org.springframework.core.ResolvableType;
26+
import org.springframework.core.codec.CharSequenceEncoder;
27+
import org.springframework.core.codec.Encoder;
28+
import org.springframework.core.io.buffer.DataBuffer;
29+
import org.springframework.core.io.buffer.DataBufferFactory;
30+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
31+
import org.springframework.messaging.MessageHeaders;
32+
import org.springframework.util.Assert;
33+
import org.springframework.util.MimeType;
34+
35+
import io.cloudevents.fun.DataMarshaller;
36+
import io.cloudevents.json.Json;
37+
38+
/**
39+
* A {@link DataMarshaller} implementation for delegating
40+
* to the provided {@link Encoder}s according a {@link MessageHeaders#CONTENT_TYPE}
41+
* header value.
42+
*
43+
* @author Artem Bilan
44+
*
45+
* @since 5.3
46+
*/
47+
public class ContentTypeDelegatingDataMarshaller<T> implements DataMarshaller<byte[], T, String> {
48+
49+
private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
50+
51+
private final List<Encoder<?>> encoders = new ArrayList<>();
52+
53+
@SafeVarargs
54+
public ContentTypeDelegatingDataMarshaller(Encoder<T>... encoders) {
55+
this.encoders.add(CharSequenceEncoder.allMimeTypes());
56+
setEncoders(encoders);
57+
}
58+
59+
@SafeVarargs
60+
public final void setEncoders(Encoder<T>... encoders) {
61+
Assert.notNull(encoders, "'encoders' must not be null");
62+
Assert.noNullElements(encoders, "'encoders' must not contain null elements");
63+
this.encoders.addAll(Arrays.asList(encoders));
64+
}
65+
66+
@Override
67+
@SuppressWarnings({ "unchecked", "rawtypes" })
68+
public byte[] marshal(T data, Map<String, String> headers) throws RuntimeException {
69+
String contentType = headers.get(MessageHeaders.CONTENT_TYPE);
70+
if (contentType == null) { // Assume JSON by default
71+
return Json.binaryMarshal(data, headers);
72+
}
73+
else {
74+
ResolvableType elementType = ResolvableType.forClass(data.getClass());
75+
MimeType mimeType = MimeType.valueOf(contentType);
76+
Encoder<T> encoder = encoder(elementType, mimeType);
77+
DataBuffer dataBuffer =
78+
encoder.encodeValue(data, this.dataBufferFactory, elementType,
79+
mimeType, (Map<String, Object>) (Map) headers);
80+
81+
ByteBuffer buf = dataBuffer.asByteBuffer();
82+
byte[] result = new byte[buf.remaining()];
83+
buf.get(result);
84+
return result;
85+
}
86+
}
87+
88+
@SuppressWarnings("unchecked")
89+
private Encoder<T> encoder(ResolvableType elementType, MimeType mimeType) {
90+
for (Encoder<?> encoder : this.encoders) {
91+
if (encoder.canEncode(elementType, mimeType)) {
92+
return (Encoder<T>) encoder;
93+
}
94+
}
95+
throw new IllegalArgumentException("No encoder for " + elementType);
96+
}
97+
98+
}

spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,7 @@
3434
*
3535
* @since 5.3
3636
*/
37-
public class HeaderMapper {
38-
39-
/**
40-
* Cloud event headers prefix as a {@value HEADER_PREFIX}.
41-
*/
42-
public static final String HEADER_PREFIX = "ce_";
37+
public final class HeaderMapper {
4338

4439
/**
4540
* Following the signature of {@link io.cloudevents.fun.FormatHeaderMapper}
@@ -59,7 +54,7 @@ public static Map<String, String> map(Map<String, String> attributes, Map<String
5954
&& !ContextAttributes.datacontenttype.name().equals(attribute.getKey()))
6055
.map(header ->
6156
new AbstractMap.SimpleEntry<>(
62-
HEADER_PREFIX + header.getKey().toLowerCase(Locale.US),
57+
CloudEventHeaders.PREFIX + header.getKey().toLowerCase(Locale.US),
6358
header.getValue()))
6459
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
6560

spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.cloudevents.format.StructuredMarshaller;
2727
import io.cloudevents.format.Wire;
2828
import io.cloudevents.format.builder.EventStep;
29+
import io.cloudevents.fun.DataMarshaller;
2930
import io.cloudevents.json.Json;
3031
import io.cloudevents.v1.Accessor;
3132
import io.cloudevents.v1.AttributesImpl;
@@ -49,12 +50,27 @@ public final class Marshallers {
4950
* @see BinaryMarshaller
5051
*/
5152
public static <T> EventStep<AttributesImpl, T, byte[], String> binary() {
53+
return binary(Json::binaryMarshal);
54+
}
55+
56+
/**
57+
* Builds a Binary Content Mode marshaller to marshal cloud events as a {@code byte[]} for
58+
* any Transport Binding.
59+
* The data marshalling is based on the provided {@link DataMarshaller}.
60+
* @param marshaller the {@link DataMarshaller} for cloud event payload.
61+
* @param <T> The data type
62+
* @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON
63+
* @see BinaryMarshaller
64+
*/
65+
public static <T> EventStep<AttributesImpl, T, byte[], String> binary(
66+
DataMarshaller<byte[], T, String> marshaller) {
67+
5268
return BinaryMarshaller.<AttributesImpl, T, byte[], String>builder()
5369
.map(AttributesImpl::marshal)
5470
.map(Accessor::extensionsOf)
5571
.map(ExtensionFormat::marshal)
5672
.map(HeaderMapper::map)
57-
.map(Json::binaryMarshal)
73+
.map(marshaller)
5874
.builder(Wire::new);
5975
}
6076

spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
import java.time.ZonedDateTime;
2121
import java.util.UUID;
2222

23+
import org.springframework.core.codec.Encoder;
2324
import org.springframework.expression.EvaluationContext;
2425
import org.springframework.expression.Expression;
2526
import org.springframework.integration.StaticMessageHeaderAccessor;
2627
import org.springframework.integration.expression.ExpressionUtils;
2728
import org.springframework.integration.expression.FunctionExpression;
29+
import org.springframework.integration.support.cloudevents.ContentTypeDelegatingDataMarshaller;
2830
import org.springframework.integration.support.cloudevents.Marshallers;
2931
import org.springframework.lang.Nullable;
3032
import org.springframework.messaging.Message;
@@ -70,6 +72,9 @@ public enum Result {
7072

7173
private final URI source;
7274

75+
private final ContentTypeDelegatingDataMarshaller<Object> dataMarshaller =
76+
new ContentTypeDelegatingDataMarshaller<>();
77+
7378
@Nullable
7479
private final EventStep<AttributesImpl, Object, byte[], String> wireBuilder;
7580

@@ -97,7 +102,7 @@ public ToCloudEventTransformer(URI source, Result resultMode) {
97102
this.source = source;
98103
switch (resultMode) {
99104
case BINARY:
100-
this.wireBuilder = Marshallers.binary();
105+
this.wireBuilder = Marshallers.binary(this.dataMarshaller);
101106
break;
102107
case STRUCTURED:
103108
this.wireBuilder = Marshallers.structured();
@@ -108,7 +113,7 @@ public ToCloudEventTransformer(URI source, Result resultMode) {
108113
}
109114

110115
public void setTypeExpression(Expression typeExpression) {
111-
Assert.notNull(source, "'typeExpression' must not be null");
116+
Assert.notNull(typeExpression, "'typeExpression' must not be null");
112117
this.typeExpression = typeExpression;
113118
}
114119

@@ -124,6 +129,18 @@ public void setExtensionExpression(@Nullable Expression extensionExpression) {
124129
this.extensionExpression = extensionExpression;
125130
}
126131

132+
/**
133+
* Configure a set of {@link Encoder}s for content type based data marshalling.
134+
* They are used only for the the {@link Result#BINARY} mode and when inbound payload
135+
* is not a {@code byte[]} already.
136+
* Plus {@link MessageHeaders#CONTENT_TYPE} must be present in the request message.
137+
* @param encoders the {@link Encoder}s to use.
138+
*/
139+
@SafeVarargs
140+
public final void setEncoders(Encoder<Object>... encoders) {
141+
this.dataMarshaller.setEncoders(encoders);
142+
}
143+
127144
@Override
128145
protected void onInit() {
129146
super.onInit();
@@ -140,7 +157,7 @@ protected Object doTransform(Message<?> message) {
140157
.marshal();
141158

142159
return getMessageBuilderFactory()
143-
.withPayload(wire.getPayload().orElse(new byte[0]))
160+
.withPayload(wire.getPayload().orElse(cloudEvent.getDataBase64()))
144161
.copyHeaders(wire.getHeaders())
145162
.copyHeadersIfAbsent(message.getHeaders())
146163
.build();

spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java

+23-10
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,13 @@
2929
import org.springframework.context.ConfigurableApplicationContext;
3030
import org.springframework.expression.common.LiteralExpression;
3131
import org.springframework.integration.json.JsonPathUtils;
32+
import org.springframework.integration.support.MessageBuilder;
33+
import org.springframework.integration.support.cloudevents.CloudEventHeaders;
3234
import org.springframework.integration.test.util.TestUtils;
3335
import org.springframework.messaging.Message;
3436
import org.springframework.messaging.MessageHeaders;
3537
import org.springframework.messaging.support.GenericMessage;
38+
import org.springframework.util.MimeTypeUtils;
3639

3740
import io.cloudevents.CloudEvent;
3841

@@ -74,18 +77,24 @@ void testBinary() {
7477
ToCloudEventTransformer transformer =
7578
new ToCloudEventTransformer(SOURCE, ToCloudEventTransformer.Result.BINARY);
7679
transformer.setSubjectExpression(new LiteralExpression("some_subject"));
77-
GenericMessage<String> message = new GenericMessage<>("test");
80+
Message<String> message =
81+
MessageBuilder.withPayload("test")
82+
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
83+
.build();
7884
Message<?> result = transformer.transform(message);
7985
assertThat(result.getHeaders())
80-
.containsEntry("ce_type", String.class.getName())
81-
.containsEntry("ce_source", SOURCE.toString())
82-
.containsEntry("ce_id", message.getHeaders().getId().toString())
83-
.containsEntry("ce_subject", "some_subject")
84-
.containsKeys("ce_time", "ce_specversion")
85-
.doesNotContainKeys("ce_datacontenttype", MessageHeaders.CONTENT_TYPE, "ce_content_type");
86+
.containsEntry(CloudEventHeaders.TYPE, String.class.getName())
87+
.containsEntry(CloudEventHeaders.SOURCE, SOURCE.toString())
88+
.containsEntry(CloudEventHeaders.ID, message.getHeaders().getId().toString())
89+
.containsEntry(CloudEventHeaders.SUBJECT, "some_subject")
90+
.containsEntry(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE)
91+
.containsKeys(CloudEventHeaders.TIME, CloudEventHeaders.SPEC_VERSION)
92+
.doesNotContainKeys(
93+
CloudEventHeaders.DATA_CONTENT_TYPE,
94+
"ce_content_type");
8695
assertThat(result.getPayload())
8796
.isInstanceOf(byte[].class)
88-
.isEqualTo("\"test\"".getBytes());
97+
.isEqualTo("test".getBytes());
8998
}
9099

91100
@Test
@@ -96,7 +105,12 @@ void testStructured() throws IOException {
96105
Message<?> result = transformer.transform(message);
97106
assertThat(result.getHeaders())
98107
.containsEntry(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json")
99-
.doesNotContainKeys("ce_id", "ce_source", "ce_datacontenttype", "ce_time", "ce_specversion");
108+
.doesNotContainKeys(
109+
CloudEventHeaders.ID,
110+
CloudEventHeaders.SOURCE,
111+
CloudEventHeaders.DATA_CONTENT_TYPE,
112+
CloudEventHeaders.TIME,
113+
CloudEventHeaders.SPEC_VERSION);
100114
Object payload = result.getPayload();
101115
assertThat(payload).isInstanceOf(byte[].class);
102116

@@ -108,7 +122,6 @@ void testStructured() throws IOException {
108122

109123
jsonPath = JsonPathUtils.evaluate(payload, "$..type");
110124
assertThat(jsonPath.get(0)).isEqualTo(String.class.getName());
111-
112125
}
113126

114127
}

0 commit comments

Comments
 (0)