Skip to content

Commit 13510df

Browse files
artembilangaryrussell
authored andcommitted
INT-4452: Expire immediately when group timeout<0
JIRA: https://jira.spring.io/browse/INT-4452 This is pretty typical in practice to get a `groupTimeout` to be evaluated to a negative value: some business decisions, sensitive calculations and so on. * Treat any non-positive `groupTimeout` as a signal to expire group immediately without scheduling. Only `null` is considered as a signal do nothing for the current message * Polishing for some tests for better performance - saves some execution time
1 parent ee501c8 commit 13510df

File tree

15 files changed

+161
-121
lines changed

15 files changed

+161
-121
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
100100

101101
private final Map<UUID, ScheduledFuture<?>> expireGroupScheduledFutures = new ConcurrentHashMap<>();
102102

103-
private final Set<Object> groupIds = ConcurrentHashMap.newKeySet();
103+
private final Set<Object> groupIds = ConcurrentHashMap.newKeySet();
104104

105105
private MessageGroupProcessor outputProcessor;
106106

@@ -144,15 +144,23 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
144144

145145
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
146146
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
147+
147148
Assert.notNull(processor, "'processor' must not be null");
148149
Assert.notNull(store, "'store' must not be null");
149150

150151
setMessageStore(store);
151152
this.outputProcessor = processor;
152-
this.correlationStrategy = (correlationStrategy == null
153-
? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
154-
: correlationStrategy);
155-
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
153+
154+
this.correlationStrategy =
155+
correlationStrategy == null
156+
? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
157+
: correlationStrategy;
158+
159+
this.releaseStrategy =
160+
releaseStrategy == null
161+
? new SimpleSequenceSizeReleaseStrategy()
162+
: releaseStrategy;
163+
156164
this.releaseStrategySet = releaseStrategy != null;
157165
this.sequenceAware = this.releaseStrategy instanceof SequenceSizeReleaseStrategy;
158166
}
@@ -195,7 +203,7 @@ public void setGroupTimeoutExpression(Expression groupTimeoutExpression) {
195203
}
196204

197205
public void setForceReleaseAdviceChain(List<Advice> forceReleaseAdviceChain) {
198-
Assert.notNull(forceReleaseAdviceChain, "forceReleaseAdviceChain must not be null");
206+
Assert.notNull(forceReleaseAdviceChain, "'forceReleaseAdviceChain' must not be null");
199207
this.forceReleaseAdviceChain = forceReleaseAdviceChain;
200208
}
201209

@@ -242,7 +250,7 @@ protected void onInit() throws Exception {
242250
}
243251

244252
if (this.releasePartialSequences) {
245-
Assert.isInstanceOf(SequenceSizeReleaseStrategy.class, this.releaseStrategy,
253+
Assert.isInstanceOf(SequenceSizeReleaseStrategy.class, this.releaseStrategy, () ->
246254
"Release strategy of type [" + this.releaseStrategy.getClass().getSimpleName() +
247255
"] cannot release partial sequences. Use a SequenceSizeReleaseStrategy instead.");
248256
((SequenceSizeReleaseStrategy) this.releaseStrategy)
@@ -519,7 +527,7 @@ private void scheduleGroupToForceComplete(MessageGroup messageGroup) {
519527
* When 'groupTimeout' is evaluated to 'null' we do nothing.
520528
* The 'MessageGroupStoreReaper' can be used to 'forceComplete' message groups.
521529
*/
522-
if (groupTimeout != null && groupTimeout >= 0) {
530+
if (groupTimeout != null) {
523531
if (groupTimeout > 0) {
524532
final Object groupId = messageGroup.getGroupId();
525533
final long timestamp = messageGroup.getTimestamp();

spring-integration-core/src/test/java/org/springframework/integration/aggregator/ConcurrentAggregatorTests.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,6 +47,7 @@
4747
* @author Mark Fisher
4848
* @author Marius Bogoevici
4949
* @author Iwein Fuld
50+
* @author Artem Bilan
5051
*/
5152
public class ConcurrentAggregatorTests {
5253

@@ -90,8 +91,7 @@ public void testCompleteGroupWithinTimeout() throws InterruptedException {
9091
@Test
9192
@Ignore
9293
// dropped backwards compatibility for duplicate ID's
93-
public void testCompleteGroupWithinTimeoutWithSameId()
94-
throws InterruptedException {
94+
public void testCompleteGroupWithinTimeoutWithSameId() {
9595
QueueChannel replyChannel = new QueueChannel();
9696
Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel,
9797
"ID#1");
@@ -111,8 +111,8 @@ public void testCompleteGroupWithinTimeoutWithSameId()
111111
}
112112

113113
@Test
114-
public void testShouldNotSendPartialResultOnTimeoutByDefault()
115-
throws InterruptedException {
114+
public void testShouldNotSendPartialResultOnTimeoutByDefault() throws InterruptedException {
115+
116116
QueueChannel discardChannel = new QueueChannel();
117117
this.aggregator.setDiscardChannel(discardChannel);
118118
QueueChannel replyChannel = new QueueChannel();
@@ -126,17 +126,16 @@ public void testShouldNotSendPartialResultOnTimeoutByDefault()
126126

127127
assertEquals("Task should have completed within timeout", 0, latch
128128
.getCount());
129-
Message<?> reply = replyChannel.receive(1000);
129+
Message<?> reply = replyChannel.receive(10);
130130
assertNull("No message should have been sent normally", reply);
131131
this.store.expireMessageGroups(-10000);
132-
Message<?> discardedMessage = discardChannel.receive(1000);
132+
Message<?> discardedMessage = discardChannel.receive(10000);
133133
assertNotNull("A message should have been discarded", discardedMessage);
134134
assertEquals(message, discardedMessage);
135135
}
136136

137137
@Test
138-
public void testShouldSendPartialResultOnTimeoutTrue()
139-
throws InterruptedException {
138+
public void testShouldSendPartialResultOnTimeoutTrue() throws InterruptedException {
140139
this.aggregator.setSendPartialResultOnExpiry(true);
141140
QueueChannel replyChannel = new QueueChannel();
142141
Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
@@ -252,16 +251,14 @@ public void testTrackedCorrelationIdsCapacityPassesLimit() {
252251
}
253252

254253
@Test(expected = MessageHandlingException.class)
255-
public void testExceptionThrownIfNoCorrelationId()
256-
throws InterruptedException {
254+
public void testExceptionThrownIfNoCorrelationId() {
257255
Message<?> message = createMessage(3, null, 2, 1, new QueueChannel(),
258256
null);
259257
this.aggregator.handleMessage(message);
260258
}
261259

262260
@Test
263-
public void testAdditionalMessageAfterCompletion()
264-
throws InterruptedException {
261+
public void testAdditionalMessageAfterCompletion() throws InterruptedException {
265262
QueueChannel replyChannel = new QueueChannel();
266263
Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
267264
Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
@@ -291,6 +288,7 @@ public void testAdditionalMessageAfterCompletion()
291288
private static Message<?> createMessage(Object payload,
292289
Object correlationId, int sequenceSize, int sequenceNumber,
293290
MessageChannel replyChannel, String predefinedId) {
291+
294292
MessageBuilder<Object> builder = MessageBuilder.withPayload(payload)
295293
.setCorrelationId(correlationId).setSequenceSize(sequenceSize)
296294
.setSequenceNumber(sequenceNumber)

spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/AggregatorIntegrationTests-context.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
<aggregator input-channel="groupTimeoutExpressionAggregatorInput" output-channel="output" discard-channel="discard"
4444
send-partial-result-on-expiry="true"
45-
group-timeout-expression="size() ge 2 ? 100 : -1"
45+
group-timeout-expression="size() ge 2 ? 100 : null"
4646
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
4747

4848
<aggregator input-channel="zeroGroupTimeoutExpressionAggregatorInput" output-channel="output" discard-channel="discard"

spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/AggregatorIntegrationTests.java

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -45,8 +45,7 @@
4545
import org.springframework.messaging.support.ErrorMessage;
4646
import org.springframework.messaging.support.GenericMessage;
4747
import org.springframework.test.annotation.DirtiesContext;
48-
import org.springframework.test.context.ContextConfiguration;
49-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
48+
import org.springframework.test.context.junit4.SpringRunner;
5049

5150
/**
5251
* @author Iwein Fuld
@@ -55,8 +54,7 @@
5554
* @author Artem Bilan
5655
* @author Gary Russell
5756
*/
58-
@RunWith(SpringJUnit4ClassRunner.class)
59-
@ContextConfiguration
57+
@RunWith(SpringRunner.class)
6058
@DirtiesContext
6159
public class AggregatorIntegrationTests {
6260

@@ -91,29 +89,29 @@ public class AggregatorIntegrationTests {
9189
private QueueChannel errors;
9290

9391
@Test
94-
public void testVanillaAggregation() throws Exception {
92+
public void testVanillaAggregation() {
9593
for (int i = 0; i < 5; i++) {
9694
Map<String, Object> headers = stubHeaders(i, 5, 1);
97-
input.send(new GenericMessage<Integer>(i, headers));
95+
input.send(new GenericMessage<>(i, headers));
9896
}
9997
Message<?> receive = output.receive(10000);
10098
assertNotNull(receive);
10199
assertEquals(1 + 2 + 3 + 4, receive.getPayload());
102100
}
103101

104102
@Test
105-
public void testNonExpiringAggregator() throws Exception {
103+
public void testNonExpiringAggregator() {
106104
for (int i = 0; i < 5; i++) {
107105
Map<String, Object> headers = stubHeaders(i, 5, 1);
108-
nonExpiringAggregatorInput.send(new GenericMessage<Integer>(i, headers));
106+
nonExpiringAggregatorInput.send(new GenericMessage<>(i, headers));
109107
}
110108
assertNotNull(output.receive(0));
111109

112110
assertNull(discard.receive(0));
113111

114112
for (int i = 5; i < 10; i++) {
115113
Map<String, Object> headers = stubHeaders(i, 5, 1);
116-
nonExpiringAggregatorInput.send(new GenericMessage<Integer>(i, headers));
114+
nonExpiringAggregatorInput.send(new GenericMessage<>(i, headers));
117115
}
118116
assertNull(output.receive(0));
119117

@@ -125,18 +123,18 @@ public void testNonExpiringAggregator() throws Exception {
125123
}
126124

127125
@Test
128-
public void testExpiringAggregator() throws Exception {
126+
public void testExpiringAggregator() {
129127
for (int i = 0; i < 5; i++) {
130128
Map<String, Object> headers = stubHeaders(i, 5, 1);
131-
expiringAggregatorInput.send(new GenericMessage<Integer>(i, headers));
129+
expiringAggregatorInput.send(new GenericMessage<>(i, headers));
132130
}
133131
assertNotNull(output.receive(0));
134132

135133
assertNull(discard.receive(0));
136134

137135
for (int i = 5; i < 10; i++) {
138136
Map<String, Object> headers = stubHeaders(i, 5, 1);
139-
expiringAggregatorInput.send(new GenericMessage<Integer>(i, headers));
137+
expiringAggregatorInput.send(new GenericMessage<>(i, headers));
140138
}
141139
assertNotNull(output.receive(0));
142140

@@ -148,7 +146,7 @@ public void testExpiringAggregator() throws Exception {
148146
public void testGroupTimeoutScheduling() throws Exception {
149147
for (int i = 0; i < 5; i++) {
150148
Map<String, Object> headers = stubHeaders(i, 5, 1);
151-
this.groupTimeoutAggregatorInput.send(new GenericMessage<Integer>(i, headers));
149+
this.groupTimeoutAggregatorInput.send(new GenericMessage<>(i, headers));
152150

153151
//Wait until 'group-timeout' does its stuff.
154152
MessageGroupStore mgs = TestUtils.getPropertyValue(this.context.getBean("gta.handler"), "messageStore",
@@ -166,11 +164,11 @@ public void testGroupTimeoutScheduling() throws Exception {
166164
@Test
167165
public void testGroupTimeoutReschedulingOnMessageDeliveryException() throws Exception {
168166
for (int i = 0; i < 5; i++) {
169-
this.output.send(new GenericMessage<String>("fake message"));
167+
this.output.send(new GenericMessage<>("fake message"));
170168
}
171169

172170
Map<String, Object> headers = stubHeaders(1, 2, 1);
173-
this.groupTimeoutAggregatorInput.send(new GenericMessage<Integer>(1, headers));
171+
this.groupTimeoutAggregatorInput.send(new GenericMessage<>(1, headers));
174172

175173
//Wait until 'group-timeout' does its stuff.
176174
MessageGroupStore mgs = TestUtils.getPropertyValue(this.context.getBean("gta.handler"), "messageStore",
@@ -190,56 +188,56 @@ public void testGroupTimeoutReschedulingOnMessageDeliveryException() throws Exce
190188
}
191189

192190
@Test
193-
public void testGroupTimeoutExpressionScheduling() throws Exception {
194-
// Since group-timeout-expression="size() >= 2 ? 100 : -1". The first message won't be scheduled to 'forceComplete'
195-
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<Integer>(1, stubHeaders(1, 6, 1)));
191+
public void testGroupTimeoutExpressionScheduling() {
192+
// Since group-timeout-expression="size() >= 2 ? 100 : null". The first message won't be scheduled to 'forceComplete'
193+
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<>(1, stubHeaders(1, 6, 1)));
196194
assertNull(this.output.receive(0));
197195
assertNull(this.discard.receive(0));
198196

199197
// As far as 'group.size() >= 2' it will be scheduled to 'forceComplete'
200-
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<Integer>(2, stubHeaders(2, 6, 1)));
198+
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<>(2, stubHeaders(2, 6, 1)));
201199
assertNull(this.output.receive(0));
202200
Message<?> receive = this.output.receive(10000);
203201
assertNotNull(receive);
204202
assertEquals(2, ((Collection<?>) receive.getPayload()).size());
205203
assertNull(this.discard.receive(0));
206204

207205
// The same with these three messages
208-
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<Integer>(3, stubHeaders(3, 6, 1)));
206+
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<>(3, stubHeaders(3, 6, 1)));
209207
assertNull(this.output.receive(0));
210208
assertNull(this.discard.receive(0));
211209

212-
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<Integer>(4, stubHeaders(4, 6, 1)));
210+
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<>(4, stubHeaders(4, 6, 1)));
213211
assertNull(this.output.receive(0));
214212
assertNull(this.discard.receive(0));
215213

216-
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<Integer>(5, stubHeaders(5, 6, 1)));
214+
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<>(5, stubHeaders(5, 6, 1)));
217215
assertNull(this.output.receive(0));
218216
receive = this.output.receive(10000);
219217
assertNotNull(receive);
220218
assertEquals(3, ((Collection<?>) receive.getPayload()).size());
221219
assertNull(this.discard.receive(0));
222220

223221
// The last message in the sequence - normal release by provided 'ReleaseStrategy'
224-
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<Integer>(6, stubHeaders(6, 6, 1)));
222+
this.groupTimeoutExpressionAggregatorInput.send(new GenericMessage<>(6, stubHeaders(6, 6, 1)));
225223
receive = this.output.receive(10000);
226224
assertNotNull(receive);
227225
assertEquals(1, ((Collection<?>) receive.getPayload()).size());
228226
assertNull(this.discard.receive(0));
229227
}
230228

231229
@Test
232-
public void testZeroGroupTimeoutExpressionScheduling() throws Exception {
230+
public void testZeroGroupTimeoutExpressionScheduling() {
233231
try {
234232
this.output.purge(null);
235233
this.errors.purge(null);
236-
GenericMessage<String> message = new GenericMessage<String>("foo");
234+
GenericMessage<String> message = new GenericMessage<>("foo");
237235
this.output.send(message);
238236
this.output.send(message);
239237
this.output.send(message);
240238
this.output.send(message);
241239
this.output.send(message);
242-
this.zeroGroupTimeoutExpressionAggregatorInput.send(new GenericMessage<Integer>(1, stubHeaders(1, 2, 1)));
240+
this.zeroGroupTimeoutExpressionAggregatorInput.send(new GenericMessage<>(1, stubHeaders(1, 2, 1)));
243241
ErrorMessage em = (ErrorMessage) this.errors.receive(10000);
244242
assertNotNull(em);
245243
assertThat(em.getPayload().getMessage().toLowerCase(),
@@ -263,7 +261,7 @@ public Integer sum(List<Integer> numbers) {
263261
}
264262

265263
private Map<String, Object> stubHeaders(int sequenceNumber, int sequenceSize, int correlationId) {
266-
Map<String, Object> headers = new HashMap<String, Object>();
264+
Map<String, Object> headers = new HashMap<>();
267265
headers.put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, sequenceNumber);
268266
headers.put(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, sequenceSize);
269267
headers.put(IntegrationMessageHeaderAccessor.CORRELATION_ID, correlationId);

0 commit comments

Comments
 (0)