Skip to content

Commit d807fc3

Browse files
committed
Fix some sporadic tests failures
* Increase timeouts in the `RoundRobinDispatcherConcurrentTests` and `ManualFlowTests` * Fix `PollerAdviceTests` to re-use `TaskScheduler` from the ctx instead of local, not closed instance * Use `OnlyOnceTrigger` instead of local implementations * Change the `primary` `Trigger` bean to the `PeriodicTrigger` as well. The minimum interval for the `CronTrigger` is 1 seconds - it doesn't matter for this test-case
1 parent b940e38 commit d807fc3

File tree

4 files changed

+44
-63
lines changed

4 files changed

+44
-63
lines changed

spring-integration-core/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherConcurrentTests.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.dispatcher;
1818

1919
import static org.junit.Assert.assertFalse;
20+
import static org.junit.Assert.assertTrue;
2021
import static org.junit.Assert.fail;
2122
import static org.mockito.Mockito.doThrow;
2223
import static org.mockito.Mockito.times;
@@ -42,6 +43,7 @@
4243

4344
/**
4445
* @author Iwein Fuld
46+
* @author Artem Bilan
4547
*/
4648
@RunWith(MockitoJUnitRunner.class)
4749
public class RoundRobinDispatcherConcurrentTests {
@@ -68,7 +70,7 @@ public class RoundRobinDispatcherConcurrentTests {
6870
private Message<?> message;
6971

7072
@Before
71-
public void initialize() throws Exception {
73+
public void initialize() {
7274
dispatcher.setLoadBalancingStrategy(new RoundRobinLoadBalancingStrategy());
7375
executor.setCorePoolSize(10);
7476
executor.setMaxPoolSize(10);
@@ -80,7 +82,7 @@ public void tearDown() {
8082
this.executor.shutdown();
8183
}
8284

83-
@Test(timeout = 1000)
85+
@Test
8486
public void noHandlerExhaustion() throws Exception {
8587
dispatcher.addHandler(handler1);
8688
dispatcher.addHandler(handler2);
@@ -106,15 +108,15 @@ public void noHandlerExhaustion() throws Exception {
106108
executor.execute(messageSenderTask);
107109
}
108110
start.countDown();
109-
allDone.await();
111+
assertTrue(allDone.await(10, TimeUnit.SECONDS));
110112
assertFalse("not all messages were accepted", failed.get());
111113
verify(handler1, times(TOTAL_EXECUTIONS / 4)).handleMessage(message);
112114
verify(handler2, times(TOTAL_EXECUTIONS / 4)).handleMessage(message);
113115
verify(handler3, times(TOTAL_EXECUTIONS / 4)).handleMessage(message);
114116
verify(handler4, times(TOTAL_EXECUTIONS / 4)).handleMessage(message);
115117
}
116118

117-
@Test(timeout = 2000)
119+
@Test
118120
public void unlockOnFailure() throws Exception {
119121
// dispatcher has no subscribers (shouldn't lead to deadlock)
120122
final CountDownLatch start = new CountDownLatch(1);
@@ -140,7 +142,7 @@ public void unlockOnFailure() throws Exception {
140142
executor.execute(messageSenderTask);
141143
}
142144
start.countDown();
143-
allDone.await();
145+
assertTrue(allDone.await(10, TimeUnit.SECONDS));
144146
}
145147

146148
@Test
@@ -152,30 +154,28 @@ public void noHandlerSkipUnderConcurrentFailureWithFailover() throws Exception {
152154
final CountDownLatch allDone = new CountDownLatch(TOTAL_EXECUTIONS);
153155
final Message<?> message = this.message;
154156
final AtomicBoolean failed = new AtomicBoolean(false);
155-
Runnable messageSenderTask = new Runnable() {
156-
@Override
157-
public void run() {
158-
try {
159-
start.await();
160-
}
161-
catch (InterruptedException e) {
162-
Thread.currentThread().interrupt();
163-
}
164-
if (!dispatcher.dispatch(message)) {
165-
failed.set(true);
166-
}
167-
else {
168-
allDone.countDown();
169-
}
157+
Runnable messageSenderTask = () -> {
158+
try {
159+
start.await();
160+
}
161+
catch (InterruptedException e) {
162+
Thread.currentThread().interrupt();
163+
}
164+
if (!dispatcher.dispatch(message)) {
165+
failed.set(true);
166+
}
167+
else {
168+
allDone.countDown();
170169
}
171170
};
172171
for (int i = 0; i < TOTAL_EXECUTIONS; i++) {
173172
executor.execute(messageSenderTask);
174173
}
175174
start.countDown();
176-
allDone.await(5000, TimeUnit.MILLISECONDS);
175+
assertTrue(allDone.await(10, TimeUnit.SECONDS));
177176
assertFalse("not all messages were accepted", failed.get());
178177
verify(handler1, times(TOTAL_EXECUTIONS / 2)).handleMessage(message);
179178
verify(handler2, times(TOTAL_EXECUTIONS)).handleMessage(message);
180179
}
180+
181181
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.hamcrest.Matchers.containsString;
2020
import static org.hamcrest.Matchers.instanceOf;
21+
import static org.hamcrest.Matchers.lessThan;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertFalse;
2324
import static org.junit.Assert.assertNotNull;
@@ -191,8 +192,12 @@ public void testManualFlowRegistration() throws InterruptedException {
191192
assertFalse(this.beanFactory.containsBean(flowRegistration.getId() + BeanFactoryHandler.class.getName() + "#0"));
192193

193194
ThreadPoolTaskScheduler taskScheduler = this.beanFactory.getBean(ThreadPoolTaskScheduler.class);
194-
Thread.sleep(100);
195-
assertEquals(0, taskScheduler.getActiveCount());
195+
196+
int n = 0;
197+
while (taskScheduler.getActiveCount() > 0 && n++ < 100) {
198+
Thread.sleep(100);
199+
}
200+
assertThat(n, lessThan(100));
196201

197202
assertTrue(additionalBean.destroyed);
198203
}

spring-integration-core/src/test/java/org/springframework/integration/endpoint/PollerAdviceTests.java

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,11 @@
2525
import static org.junit.Assert.assertTrue;
2626
import static org.mockito.ArgumentMatchers.any;
2727
import static org.mockito.Mockito.atLeast;
28-
import static org.mockito.Mockito.mock;
2928
import static org.mockito.Mockito.spy;
3029
import static org.mockito.Mockito.verify;
3130

3231
import java.util.ArrayList;
3332
import java.util.Collections;
34-
import java.util.Date;
3533
import java.util.LinkedList;
3634
import java.util.List;
3735
import java.util.concurrent.CountDownLatch;
@@ -42,7 +40,6 @@
4240
import org.aopalliance.aop.Advice;
4341
import org.aopalliance.intercept.Joinpoint;
4442
import org.aopalliance.intercept.MethodInterceptor;
45-
import org.junit.Rule;
4643
import org.junit.Test;
4744
import org.junit.runner.RunWith;
4845

@@ -66,7 +63,6 @@
6663
import org.springframework.integration.core.MessageSource;
6764
import org.springframework.integration.scheduling.PollSkipAdvice;
6865
import org.springframework.integration.scheduling.SimplePollSkipStrategy;
69-
import org.springframework.integration.test.rule.Log4j2LevelAdjuster;
7066
import org.springframework.integration.test.util.OnlyOnceTrigger;
7167
import org.springframework.integration.test.util.TestUtils;
7268
import org.springframework.integration.util.CompoundTrigger;
@@ -94,15 +90,18 @@
9490
@DirtiesContext
9591
public class PollerAdviceTests {
9692

97-
@Rule
98-
public Log4j2LevelAdjuster adjuster = Log4j2LevelAdjuster.trace();
99-
10093
@Autowired
10194
private MessageChannel control;
10295

10396
@Autowired
10497
private SimplePollSkipStrategy skipper;
10598

99+
@Autowired
100+
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
101+
102+
@Autowired
103+
private BeanFactory beanFactory;
104+
106105
@Test
107106
public void testDefaultDontSkip() throws Exception {
108107
SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
@@ -111,19 +110,9 @@ public void testDefaultDontSkip() throws Exception {
111110
latch.countDown();
112111
return null;
113112
});
114-
adapter.setTrigger(new Trigger() {
115-
116-
private boolean done;
117-
118-
@Override
119-
public Date nextExecutionTime(TriggerContext triggerContext) {
120-
Date date = done ? null : new Date(System.currentTimeMillis() + 10);
121-
done = true;
122-
return date;
123-
}
124-
});
113+
adapter.setTrigger(new OnlyOnceTrigger());
125114
configure(adapter);
126-
List<Advice> adviceChain = new ArrayList<Advice>();
115+
List<Advice> adviceChain = new ArrayList<>();
127116
PollSkipAdvice advice = new PollSkipAdvice();
128117
adviceChain.add(advice);
129118
adapter.setAdviceChain(adviceChain);
@@ -153,18 +142,7 @@ public Message<Object> receive() {
153142
}
154143
CountDownLatch latch = new CountDownLatch(1);
155144
adapter.setSource(new LocalSource(latch));
156-
class OneAndDone10msTrigger implements Trigger {
157-
158-
private boolean done;
159-
160-
@Override
161-
public Date nextExecutionTime(TriggerContext triggerContext) {
162-
Date date = done ? null : new Date(System.currentTimeMillis() + 10);
163-
done = true;
164-
return date;
165-
}
166-
}
167-
adapter.setTrigger(new OneAndDone10msTrigger());
145+
adapter.setTrigger(new OnlyOnceTrigger());
168146
configure(adapter);
169147
List<Advice> adviceChain = new ArrayList<>();
170148
SimplePollSkipStrategy skipper = new SimplePollSkipStrategy();
@@ -179,7 +157,7 @@ public Date nextExecutionTime(TriggerContext triggerContext) {
179157
skipper.reset();
180158
latch = new CountDownLatch(1);
181159
adapter.setSource(new LocalSource(latch));
182-
adapter.setTrigger(new OneAndDone10msTrigger());
160+
adapter.setTrigger(new OnlyOnceTrigger());
183161
adapter.start();
184162
assertTrue(latch.await(10, TimeUnit.SECONDS));
185163
adapter.stop();
@@ -276,7 +254,7 @@ public Message<?> afterReceive(Message<?> result, MessageSource<?> target) {
276254
public void testActiveIdleAdvice() throws Exception {
277255
SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
278256
final CountDownLatch latch = new CountDownLatch(5);
279-
final LinkedList<Long> triggerPeriods = new LinkedList<Long>();
257+
final LinkedList<Long> triggerPeriods = new LinkedList<>();
280258
final DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(10);
281259
adapter.setSource(() -> {
282260
triggerPeriods.add(trigger.getPeriod());
@@ -307,7 +285,7 @@ public void testActiveIdleAdvice() throws Exception {
307285
public void testCompoundTriggerAdvice() throws Exception {
308286
SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
309287
final CountDownLatch latch = new CountDownLatch(5);
310-
final LinkedList<Object> overridePresent = new LinkedList<Object>();
288+
final LinkedList<Object> overridePresent = new LinkedList<>();
311289
final CompoundTrigger compoundTrigger = new CompoundTrigger(new PeriodicTrigger(10));
312290
Trigger override = spy(new PeriodicTrigger(5));
313291
final CompoundTriggerAdvice advice = new CompoundTriggerAdvice(compoundTrigger, override);
@@ -336,10 +314,8 @@ public void testCompoundTriggerAdvice() throws Exception {
336314

337315
private void configure(SourcePollingChannelAdapter adapter) {
338316
adapter.setOutputChannel(new NullChannel());
339-
adapter.setBeanFactory(mock(BeanFactory.class));
340-
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
341-
scheduler.afterPropertiesSet();
342-
adapter.setTaskScheduler(scheduler);
317+
adapter.setBeanFactory(this.beanFactory);
318+
adapter.setTaskScheduler(this.threadPoolTaskScheduler);
343319
}
344320

345321
@Test

spring-integration-core/src/test/java/org/springframework/integration/endpoint/compound-trigger-context.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
<constructor-arg ref="primary" />
2323
</bean>
2424

25-
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
26-
<constructor-arg value="*/1 * * * * *" />
25+
<bean id="primary" class="org.springframework.scheduling.support.PeriodicTrigger">
26+
<constructor-arg value="10" />
2727
</bean>
2828

2929
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">

0 commit comments

Comments
 (0)