Skip to content

Commit 0d0605b

Browse files
artembilangaryrussell
authored andcommitted
INT-4381: MessageSources refactoring (#2517)
* INT-4381: MessageSources refactoring JIRA: https://jira.spring.io/browse/INT-4381 * Make all the out-of-the-box `MessageSource` implementations based on the `AbstractMessageSource` * Fix `JdbcPollingChannelAdapterIntegrationTests` for sporadic failure because of `fixed-rate` for the poller * Fix HeaderEnricherTests race condition The `errorChannel()` expect an error in the `testErrorChannel` after `1000` ms, but at the same time the `poller` in configured for the `3000` ms. * Increase all the timeouts for replies * Decrease a `fixed-delay` on the `poller` * Some other code style polishing for the `HeaderEnricherTests`
1 parent d6c8baf commit 0d0605b

File tree

28 files changed

+207
-319
lines changed

28 files changed

+207
-319
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MethodInvokingMessageSource.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -68,8 +68,7 @@ public String getComponentType() {
6868
}
6969

7070
@Override
71-
public void afterPropertiesSet() throws Exception {
72-
super.afterPropertiesSet();
71+
protected void onInit() {
7372
synchronized (this.initializationMonitor) {
7473
if (this.initialized) {
7574
return;

spring-integration-core/src/main/java/org/springframework/integration/resource/ResourceRetrievingMessageSource.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,8 @@
3939
* @author Oleg Zhurakousky
4040
* @author Mark Fisher
4141
* @author Gary Russell
42+
* @author Artem Bilan
43+
*
4244
* @since 2.1
4345
*/
4446
public class ResourceRetrievingMessageSource extends AbstractMessageSource<Resource[]>
@@ -79,8 +81,7 @@ public String getComponentType() {
7981

8082

8183
@Override
82-
public void afterPropertiesSet() throws Exception {
83-
super.afterPropertiesSet();
84+
protected void onInit() {
8485
if (this.patternResolver == null) {
8586
this.patternResolver = this.applicationContext;
8687
}
@@ -100,7 +101,7 @@ else if (this.filter != null) {
100101
resources = null;
101102
}
102103
else {
103-
resources = filteredResources.toArray(new Resource[filteredResources.size()]);
104+
resources = filteredResources.toArray(new Resource[0]);
104105
}
105106
}
106107
return resources;

spring-integration-core/src/main/java/org/springframework/integration/util/AbstractExpressionEvaluator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,13 @@ protected MessageBuilderFactory getMessageBuilderFactory() {
8888
}
8989

9090
@Override
91-
public void afterPropertiesSet() throws Exception {
91+
public final void afterPropertiesSet() throws Exception {
9292
getEvaluationContext();
9393
if (this.beanFactory != null) {
9494
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
9595
}
96+
97+
onInit();
9698
}
9799

98100
protected StandardEvaluationContext getEvaluationContext() {
@@ -169,4 +171,8 @@ protected <T> T evaluateExpression(Expression expression, Object input, Class<T>
169171
return expression.getValue(this.getEvaluationContext(), input, expectedType);
170172
}
171173

174+
protected void onInit() {
175+
176+
}
177+
172178
}

spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherTests-context.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
</channel>
3131

3232
<header-enricher input-channel="errorChannelInput" output-channel="failInput">
33-
<poller max-messages-per-poll="1" fixed-delay="3000"/>
33+
<poller max-messages-per-poll="1" fixed-delay="10"/>
3434
<error-channel ref="testErrorChannel"/>
3535
</header-enricher>
3636

spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherTests.java

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,6 +50,7 @@
5050
/**
5151
* @author Mark Fisher
5252
* @author Artem Bilan
53+
*
5354
* @since 2.0
5455
*/
5556
@ContextConfiguration
@@ -63,8 +64,8 @@ public class HeaderEnricherTests {
6364
public void replyChannel() {
6465
PollableChannel replyChannel = context.getBean("testReplyChannel", PollableChannel.class);
6566
MessageChannel inputChannel = context.getBean("replyChannelInput", MessageChannel.class);
66-
inputChannel.send(new GenericMessage<String>("test"));
67-
Message<?> result = replyChannel.receive(0);
67+
inputChannel.send(new GenericMessage<>("test"));
68+
Message<?> result = replyChannel.receive(10000);
6869
assertNotNull(result);
6970
assertEquals("TEST", result.getPayload());
7071
assertEquals(replyChannel, result.getHeaders().getReplyChannel());
@@ -74,8 +75,8 @@ public void replyChannel() {
7475
public void replyChannelName() {
7576
PollableChannel replyChannel = context.getBean("testReplyChannel", PollableChannel.class);
7677
MessageChannel inputChannel = context.getBean("replyChannelNameInput", MessageChannel.class);
77-
inputChannel.send(new GenericMessage<String>("test"));
78-
Message<?> result = replyChannel.receive(0);
78+
inputChannel.send(new GenericMessage<>("test"));
79+
Message<?> result = replyChannel.receive(10000);
7980
assertNotNull(result);
8081
assertEquals("TEST", result.getPayload());
8182
assertEquals("testReplyChannel", result.getHeaders().getReplyChannel());
@@ -85,8 +86,8 @@ public void replyChannelName() {
8586
public void replyChannelExpression() {
8687
PollableChannel replyChannel = context.getBean("testReplyChannel", PollableChannel.class);
8788
MessageChannel inputChannel = context.getBean("replyChannelExpressionInput", MessageChannel.class);
88-
inputChannel.send(new GenericMessage<String>("test"));
89-
Message<?> result = replyChannel.receive(0);
89+
inputChannel.send(new GenericMessage<>("test"));
90+
Message<?> result = replyChannel.receive(10000);
9091
assertNotNull(result);
9192
assertEquals("TEST", result.getPayload());
9293
assertEquals(replyChannel, result.getHeaders().getReplyChannel());
@@ -96,8 +97,8 @@ public void replyChannelExpression() {
9697
public void errorChannel() {
9798
PollableChannel errorChannel = context.getBean("testErrorChannel", PollableChannel.class);
9899
MessageChannel inputChannel = context.getBean("errorChannelInput", MessageChannel.class);
99-
inputChannel.send(new GenericMessage<String>("test"));
100-
Message<?> errorMessage = errorChannel.receive(1000);
100+
inputChannel.send(new GenericMessage<>("test"));
101+
Message<?> errorMessage = errorChannel.receive(10000);
101102
assertNotNull(errorMessage);
102103
Object errorPayload = errorMessage.getPayload();
103104
assertEquals(MessageTransformationException.class, errorPayload.getClass());
@@ -110,7 +111,7 @@ public void errorChannel() {
110111
public void correlationIdValue() {
111112
MessagingTemplate template = new MessagingTemplate();
112113
MessageChannel channel = context.getBean("correlationIdValueInput", MessageChannel.class);
113-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
114+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
114115
assertNotNull(result);
115116
assertEquals("ABC", new IntegrationMessageHeaderAccessor(result).getCorrelationId());
116117
}
@@ -119,7 +120,7 @@ public void correlationIdValue() {
119120
public void correlationIdValueWithType() {
120121
MessagingTemplate template = new MessagingTemplate();
121122
MessageChannel channel = context.getBean("correlationIdValueWithTypeInput", MessageChannel.class);
122-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
123+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
123124
assertNotNull(result);
124125
Object correlationId = new IntegrationMessageHeaderAccessor(result).getCorrelationId();
125126
assertEquals(Long.class, correlationId.getClass());
@@ -130,7 +131,7 @@ public void correlationIdValueWithType() {
130131
public void correlationIdRef() {
131132
MessagingTemplate template = new MessagingTemplate();
132133
MessageChannel channel = context.getBean("correlationIdRefInput", MessageChannel.class);
133-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
134+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
134135
assertNotNull(result);
135136
assertEquals(123, new IntegrationMessageHeaderAccessor(result).getCorrelationId());
136137
}
@@ -139,7 +140,7 @@ public void correlationIdRef() {
139140
public void expirationDateValue() {
140141
MessagingTemplate template = new MessagingTemplate();
141142
MessageChannel channel = context.getBean("expirationDateValueInput", MessageChannel.class);
142-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
143+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
143144
assertNotNull(result);
144145
assertEquals(new Long(1111), new IntegrationMessageHeaderAccessor(result).getExpirationDate());
145146
}
@@ -148,7 +149,7 @@ public void expirationDateValue() {
148149
public void expirationDateRef() {
149150
MessagingTemplate template = new MessagingTemplate();
150151
MessageChannel channel = context.getBean("expirationDateRefInput", MessageChannel.class);
151-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
152+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
152153
assertNotNull(result);
153154
assertEquals(new Long(9999), new IntegrationMessageHeaderAccessor(result).getExpirationDate());
154155
}
@@ -157,7 +158,7 @@ public void expirationDateRef() {
157158
public void priority() {
158159
MessagingTemplate template = new MessagingTemplate();
159160
MessageChannel channel = context.getBean("priorityInput", MessageChannel.class);
160-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
161+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
161162
assertNotNull(result);
162163
assertEquals(new Integer(42), new IntegrationMessageHeaderAccessor(result).getPriority());
163164
}
@@ -167,7 +168,7 @@ public void priorityExpression() {
167168
MessagingTemplate template = new MessagingTemplate();
168169
MessageChannel channel = context.getBean("priorityExpressionInput", MessageChannel.class);
169170
Message<?> result = template.sendAndReceive(channel,
170-
new GenericMessage<Map<String, String>>(Collections.singletonMap("priority", "-10")));
171+
new GenericMessage<>(Collections.singletonMap("priority", "-10")));
171172
assertNotNull(result);
172173
assertEquals(new Integer(-10), new IntegrationMessageHeaderAccessor(result).getPriority());
173174
}
@@ -176,7 +177,7 @@ public void priorityExpression() {
176177
public void expressionUsingPayload() {
177178
MessagingTemplate template = new MessagingTemplate();
178179
MessageChannel channel = context.getBean("payloadExpressionInput", MessageChannel.class);
179-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<TestBean>(new TestBean("foo")));
180+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>(new TestBean("foo")));
180181
assertNotNull(result);
181182
assertEquals("foobar", result.getHeaders().get("testHeader"));
182183
}
@@ -195,7 +196,7 @@ public void expressionUsingHeader() {
195196
public void expressionWithDateType() {
196197
MessagingTemplate template = new MessagingTemplate();
197198
MessageChannel channel = context.getBean("expressionWithDateTypeInput", MessageChannel.class);
198-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
199+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
199200
assertNotNull(result);
200201
Object headerValue = result.getHeaders().get("currentDate");
201202
assertEquals(Date.class, headerValue.getClass());
@@ -207,7 +208,7 @@ public void expressionWithDateType() {
207208
public void expressionWithLongType() {
208209
MessagingTemplate template = new MessagingTemplate();
209210
MessageChannel channel = context.getBean("expressionWithLongTypeInput", MessageChannel.class);
210-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
211+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
211212
assertNotNull(result);
212213
assertEquals(Long.class, result.getHeaders().get("number").getClass());
213214
assertEquals(12345L, result.getHeaders().get("number"));
@@ -217,7 +218,7 @@ public void expressionWithLongType() {
217218
public void refWithMethod() {
218219
MessagingTemplate template = new MessagingTemplate();
219220
MessageChannel channel = context.getBean("refWithMethod", MessageChannel.class);
220-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
221+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
221222
assertNotNull(result);
222223
assertEquals(String.class, result.getHeaders().get("testHeader").getClass());
223224
assertEquals("testBeanForMethodInvoker", result.getHeaders().get("testHeader"));
@@ -227,7 +228,7 @@ public void refWithMethod() {
227228
public void ref() {
228229
MessagingTemplate template = new MessagingTemplate();
229230
MessageChannel channel = context.getBean("ref", MessageChannel.class);
230-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
231+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
231232
assertNotNull(result);
232233
assertEquals(TestBean.class, result.getHeaders().get("testHeader").getClass());
233234
TestBean testBeanForRef = context.getBean("testBean1", TestBean.class);
@@ -238,7 +239,7 @@ public void ref() {
238239
public void innerBean() {
239240
MessagingTemplate template = new MessagingTemplate();
240241
MessageChannel channel = context.getBean("innerBean", MessageChannel.class);
241-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
242+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
242243
assertNotNull(result);
243244
assertEquals(TestBean.class, result.getHeaders().get("testHeader").getClass());
244245
TestBean testBeanForInnerBean = new TestBean("testBeanForInnerBean");
@@ -249,7 +250,7 @@ public void innerBean() {
249250
public void innerBeanWithMethod() {
250251
MessagingTemplate template = new MessagingTemplate();
251252
MessageChannel channel = context.getBean("innerBeanWithMethod", MessageChannel.class);
252-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
253+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
253254
assertNotNull(result);
254255
assertEquals(String.class, result.getHeaders().get("testHeader").getClass());
255256
assertEquals("testBeanForInnerBeanWithMethod", result.getHeaders().get("testHeader"));
@@ -265,7 +266,7 @@ public void testFailConfigUnexpectedSubElement() {
265266
public void testRoutingSlip() {
266267
MessagingTemplate template = new MessagingTemplate();
267268
MessageChannel channel = context.getBean("routingSlipInput", MessageChannel.class);
268-
Message<?> result = template.sendAndReceive(channel, new GenericMessage<String>("test"));
269+
Message<?> result = template.sendAndReceive(channel, new GenericMessage<>("test"));
269270
assertNotNull(result);
270271
Object routingSlip = new IntegrationMessageHeaderAccessor(result)
271272
.getHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP);
@@ -311,6 +312,7 @@ public boolean equals(Object o) {
311312
public int hashCode() {
312313
return name != null ? name.hashCode() : 0;
313314
}
315+
314316
}
315317

316318
}

spring-integration-feed/src/main/java/org/springframework/integration/feed/inbound/FeedEntryMessageSource.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@
2828
import org.springframework.beans.factory.BeanFactory;
2929
import org.springframework.core.io.Resource;
3030
import org.springframework.integration.context.IntegrationContextUtils;
31-
import org.springframework.integration.context.IntegrationObjectSupport;
3231
import org.springframework.integration.core.MessageSource;
32+
import org.springframework.integration.endpoint.AbstractMessageSource;
3333
import org.springframework.integration.metadata.MetadataStore;
3434
import org.springframework.integration.metadata.SimpleMetadataStore;
35-
import org.springframework.messaging.Message;
3635
import org.springframework.messaging.MessagingException;
3736
import org.springframework.util.Assert;
3837
import org.springframework.util.CollectionUtils;
@@ -55,7 +54,7 @@
5554
*
5655
* @since 2.0
5756
*/
58-
public class FeedEntryMessageSource extends IntegrationObjectSupport implements MessageSource<SyndEntry> {
57+
public class FeedEntryMessageSource extends AbstractMessageSource<SyndEntry> {
5958

6059
private final URL feedUrl;
6160

@@ -145,18 +144,7 @@ public String getComponentType() {
145144
}
146145

147146
@Override
148-
public Message<SyndEntry> receive() {
149-
Assert.isTrue(this.initialized,
150-
"'FeedEntryReaderMessageSource' must be initialized before it can produce Messages.");
151-
SyndEntry entry = doReceive();
152-
if (entry == null) {
153-
return null;
154-
}
155-
return this.getMessageBuilderFactory().withPayload(entry).build();
156-
}
157-
158-
@Override
159-
protected void onInit() throws Exception {
147+
protected void onInit() {
160148
if (this.metadataStore == null) {
161149
// first try to look for a 'messageStore' in the context
162150
BeanFactory beanFactory = this.getBeanFactory();
@@ -176,13 +164,16 @@ protected void onInit() throws Exception {
176164
this.initialized = true;
177165
}
178166

179-
private SyndEntry doReceive() {
167+
@Override
168+
protected SyndEntry doReceive() {
169+
Assert.isTrue(this.initialized,
170+
"'FeedEntryReaderMessageSource' must be initialized before it can produce Messages.");
180171
SyndEntry nextEntry = null;
181172
synchronized (this.monitor) {
182173
nextEntry = getNextEntry();
183174
if (nextEntry == null) {
184175
// read feed and try again
185-
this.populateEntryList();
176+
populateEntryList();
186177
nextEntry = getNextEntry();
187178
}
188179
}

spring-integration-feed/src/test/java/org/springframework/integration/feed/inbound/FeedEntryMessageSourceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -75,7 +75,7 @@ public void testReceiveFeedWithNoEntries() throws Exception {
7575
public void testReceiveFeedWithEntriesSorted() throws Exception {
7676
ClassPathResource resource = new ClassPathResource("org/springframework/integration/feed/sample.rss");
7777
FeedEntryMessageSource source = new FeedEntryMessageSource(resource, "foo");
78-
source.setComponentName("feedReader");
78+
source.setBeanName("feedReader");
7979
source.setBeanFactory(mock(BeanFactory.class));
8080
source.afterPropertiesSet();
8181
Message<SyndEntry> message1 = source.receive();

0 commit comments

Comments
 (0)