Skip to content

Commit b0be46a

Browse files
committed
Fix logic in the PollingTransactionTests
The `transactionWithCommitAndAdvices()` uses `SimpleRepeatAdvice` which essentially performs `doPoll()` twice. In this case we don't have enough messages in the `goodInputWithAdvice` queue * Add one more message to the `goodInputWithAdvice` queue during testing * Increase all the timeouts * Increase capacity in queues for the `transactionWithCommitAndAdvices()` **Cherry-pick to 5.0.x and 4.3.x** (cherry picked from commit 000f297)
1 parent a48d54b commit b0be46a

File tree

2 files changed

+72
-52
lines changed

2 files changed

+72
-52
lines changed

spring-integration-core/src/test/java/org/springframework/integration/dispatcher/PollingTransactionTests.java

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
3434
import org.springframework.aop.Advisor;
3535
import org.springframework.aop.ProxyMethodInvocation;
3636
import org.springframework.aop.framework.Advised;
37+
import org.springframework.context.Lifecycle;
3738
import org.springframework.context.support.ClassPathXmlApplicationContext;
3839
import org.springframework.integration.endpoint.PollingConsumer;
3940
import org.springframework.integration.test.util.TestUtils;
@@ -68,8 +69,9 @@ public void transactionWithCommit() throws InterruptedException {
6869
PollableChannel output = (PollableChannel) context.getBean("output");
6970
assertEquals(0, txManager.getCommitCount());
7071
assertEquals(0, txManager.getRollbackCount());
71-
input.send(new GenericMessage<String>("test"));
72-
txManager.waitForCompletion(1000);
72+
context.getBean("goodService", Lifecycle.class).start();
73+
input.send(new GenericMessage<>("test"));
74+
txManager.waitForCompletion(10000);
7375
Message<?> message = output.receive(0);
7476
assertNotNull(message);
7577
assertEquals(1, txManager.getCommitCount());
@@ -82,11 +84,12 @@ public void transactionWithCommit() throws InterruptedException {
8284
public void transactionWithCommitAndAdvices() throws InterruptedException {
8385
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
8486
"transactionTests.xml", this.getClass());
85-
PollingConsumer advicedPoller = context.getBean("advicedSa", PollingConsumer.class);
87+
PollingConsumer advisedPoller = context.getBean("advisedSa", PollingConsumer.class);
8688

87-
List<Advice> adviceChain = TestUtils.getPropertyValue(advicedPoller, "adviceChain", List.class);
89+
List<Advice> adviceChain = TestUtils.getPropertyValue(advisedPoller, "adviceChain", List.class);
8890
assertEquals(4, adviceChain.size());
89-
Runnable poller = TestUtils.getPropertyValue(advicedPoller, "poller", Runnable.class);
91+
advisedPoller.start();
92+
Runnable poller = TestUtils.getPropertyValue(advisedPoller, "poller", Runnable.class);
9093
Callable<?> pollingTask = TestUtils.getPropertyValue(poller, "pollingTask", Callable.class);
9194
assertTrue("Poller is not Advised", pollingTask instanceof Advised);
9295
Advisor[] advisors = ((Advised) pollingTask).getAdvisors();
@@ -98,10 +101,14 @@ public void transactionWithCommitAndAdvices() throws InterruptedException {
98101
PollableChannel output = (PollableChannel) context.getBean("output");
99102
assertEquals(0, txManager.getCommitCount());
100103
assertEquals(0, txManager.getRollbackCount());
104+
101105
input.send(new GenericMessage<>("test"));
106+
input.send(new GenericMessage<>("test2"));
102107
txManager.waitForCompletion(10000);
103108
Message<?> message = output.receive(0);
104109
assertNotNull(message);
110+
message = output.receive(0);
111+
assertNotNull(message);
105112
assertEquals(0, txManager.getRollbackCount());
106113
context.close();
107114
}
@@ -115,8 +122,11 @@ public void transactionWithRollback() throws InterruptedException {
115122
PollableChannel output = (PollableChannel) context.getBean("output");
116123
assertEquals(0, txManager.getCommitCount());
117124
assertEquals(0, txManager.getRollbackCount());
118-
input.send(new GenericMessage<String>("test"));
119-
txManager.waitForCompletion(1000);
125+
126+
context.getBean("badService", Lifecycle.class).start();
127+
128+
input.send(new GenericMessage<>("test"));
129+
txManager.waitForCompletion(10000);
120130
Message<?> message = output.receive(0);
121131
assertNull(message);
122132
assertEquals(0, txManager.getCommitCount());
@@ -132,10 +142,10 @@ public void propagationRequired() throws InterruptedException {
132142
PollableChannel input = (PollableChannel) context.getBean("input");
133143
PollableChannel output = (PollableChannel) context.getBean("output");
134144
assertEquals(0, txManager.getCommitCount());
135-
input.send(new GenericMessage<String>("test"));
136-
Message<?> reply = output.receive(3000);
145+
input.send(new GenericMessage<>("test"));
146+
Message<?> reply = output.receive(10000);
137147
assertNotNull(reply);
138-
txManager.waitForCompletion(3000);
148+
txManager.waitForCompletion(10000);
139149
assertEquals(1, txManager.getCommitCount());
140150
assertEquals(Propagation.REQUIRED.value(), txManager.getLastDefinition().getPropagationBehavior());
141151
context.close();
@@ -149,58 +159,58 @@ public void propagationRequiresNew() throws InterruptedException {
149159
PollableChannel input = (PollableChannel) context.getBean("input");
150160
PollableChannel output = (PollableChannel) context.getBean("output");
151161
assertEquals(0, txManager.getCommitCount());
152-
input.send(new GenericMessage<String>("test"));
153-
Message<?> reply = output.receive(3000);
162+
input.send(new GenericMessage<>("test"));
163+
Message<?> reply = output.receive(10000);
154164
assertNotNull(reply);
155-
txManager.waitForCompletion(3000);
165+
txManager.waitForCompletion(10000);
156166
assertEquals(1, txManager.getCommitCount());
157167
assertEquals(Propagation.REQUIRES_NEW.value(), txManager.getLastDefinition().getPropagationBehavior());
158168
context.close();
159169
}
160170

161171
@Test
162-
public void propagationSupports() throws InterruptedException {
172+
public void propagationSupports() {
163173
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
164174
"propagationSupportsTests.xml", this.getClass());
165175
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
166176
PollableChannel input = (PollableChannel) context.getBean("input");
167177
PollableChannel output = (PollableChannel) context.getBean("output");
168178
assertEquals(0, txManager.getCommitCount());
169-
input.send(new GenericMessage<String>("test"));
170-
Message<?> reply = output.receive(3000);
179+
input.send(new GenericMessage<>("test"));
180+
Message<?> reply = output.receive(10000);
171181
assertNotNull(reply);
172182
assertEquals(0, txManager.getCommitCount());
173183
assertNull(txManager.getLastDefinition());
174184
context.close();
175185
}
176186

177187
@Test
178-
public void propagationNotSupported() throws InterruptedException {
188+
public void propagationNotSupported() {
179189
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
180190
"propagationNotSupportedTests.xml", this.getClass());
181191
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
182192
PollableChannel input = (PollableChannel) context.getBean("input");
183193
PollableChannel output = (PollableChannel) context.getBean("output");
184194
assertEquals(0, txManager.getCommitCount());
185-
input.send(new GenericMessage<String>("test"));
186-
Message<?> reply = output.receive(3000);
195+
input.send(new GenericMessage<>("test"));
196+
Message<?> reply = output.receive(10000);
187197
assertNotNull(reply);
188198
assertEquals(0, txManager.getCommitCount());
189199
assertNull(txManager.getLastDefinition());
190200
context.close();
191201
}
192202

193203
@Test
194-
public void propagationMandatory() throws Throwable {
204+
public void propagationMandatory() {
195205
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
196206
"propagationMandatoryTests.xml", this.getClass());
197207
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
198208
PollableChannel input = (PollableChannel) context.getBean("input");
199209
PollableChannel output = (PollableChannel) context.getBean("output");
200210
PollableChannel errorChannel = (PollableChannel) context.getBean("errorChannel");
201211
assertEquals(0, txManager.getCommitCount());
202-
input.send(new GenericMessage<String>("test"));
203-
Message<?> errorMessage = errorChannel.receive(3000);
212+
input.send(new GenericMessage<>("test"));
213+
Message<?> errorMessage = errorChannel.receive(10000);
204214
assertNotNull(errorMessage);
205215
Object payload = errorMessage.getPayload();
206216
assertEquals(MessagingException.class, payload.getClass());
@@ -212,7 +222,7 @@ public void propagationMandatory() throws Throwable {
212222
}
213223

214224
@Test
215-
public void commitFailureAndHandlerFailureTest() throws Throwable {
225+
public void commitFailureAndHandlerFailureTest() {
216226
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
217227
"transactionFailureTests.xml", this.getClass());
218228
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManagerBad");
@@ -222,7 +232,7 @@ public void commitFailureAndHandlerFailureTest() throws Throwable {
222232
PollableChannel errorChannel = (PollableChannel) context.getBean("errorChannel");
223233
assertEquals(0, txManager.getCommitCount());
224234
inputTxFail.send(new GenericMessage<>("commitFailureTest"));
225-
Message<?> errorMessage = errorChannel.receive(10000);
235+
Message<?> errorMessage = errorChannel.receive(20000);
226236
assertNotNull(errorMessage);
227237
Object payload = errorMessage.getPayload();
228238
assertEquals(MessagingException.class, payload.getClass());
@@ -232,7 +242,7 @@ public void commitFailureAndHandlerFailureTest() throws Throwable {
232242
assertNotNull(output.receive(0));
233243
assertEquals(0, txManager.getCommitCount());
234244

235-
inputHandlerFail.send(new GenericMessage<>("handlerFalilureTest"));
245+
inputHandlerFail.send(new GenericMessage<>("handlerFailureTest"));
236246
errorMessage = errorChannel.receive(10000);
237247
assertNotNull(errorMessage);
238248
payload = errorMessage.getPayload();
Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,64 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<beans:beans xmlns="http://www.springframework.org/schema/integration"
3-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
4-
xsi:schemaLocation="http://www.springframework.org/schema/beans
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:beans="http://www.springframework.org/schema/beans"
5+
xsi:schemaLocation="http://www.springframework.org/schema/beans
56
http://www.springframework.org/schema/beans/spring-beans.xsd
67
http://www.springframework.org/schema/integration
78
http://www.springframework.org/schema/integration/spring-integration.xsd">
89

910
<channel id="badInput">
10-
<queue capacity="1" />
11+
<queue capacity="1"/>
1112
</channel>
1213

1314
<channel id="goodInput">
14-
<queue capacity="1" />
15+
<queue capacity="1"/>
1516
</channel>
16-
17+
1718
<channel id="goodInputWithAdvice">
18-
<queue capacity="1" />
19+
<queue capacity="2"/>
1920
</channel>
2021

2122
<channel id="output">
22-
<queue capacity="1" />
23+
<queue capacity="2"/>
2324
</channel>
2425

25-
<service-activator input-channel="badInput" ref="testBean"
26-
method="bad" output-channel="output">
26+
<service-activator id="badService"
27+
auto-startup="false"
28+
input-channel="badInput"
29+
ref="testBean" method="bad"
30+
output-channel="output">
2731
<poller max-messages-per-poll="1" fixed-delay="1" receive-timeout="-1">
28-
<transactional transaction-manager="txManager" />
32+
<transactional transaction-manager="txManager"/>
2933
</poller>
3034
</service-activator>
3135

32-
<service-activator input-channel="goodInput" ref="testBean"
33-
method="good" output-channel="output">
36+
<service-activator id="goodService"
37+
auto-startup="false"
38+
input-channel="goodInput"
39+
ref="testBean" method="good"
40+
output-channel="output">
3441
<poller max-messages-per-poll="1" fixed-delay="1" receive-timeout="-1">
35-
<transactional transaction-manager="txManager" />
42+
<transactional transaction-manager="txManager"/>
3643
</poller>
3744
</service-activator>
38-
39-
<service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
40-
method="good" output-channel="output">
45+
46+
<service-activator id="advisedSa"
47+
auto-startup="false"
48+
input-channel="goodInputWithAdvice"
49+
ref="testBean" method="good"
50+
output-channel="output">
4151
<poller max-messages-per-poll="1" fixed-delay="1" receive-timeout="-1">
4252
<advice-chain>
4353
<ref bean="txAdvise"/>
44-
<ref bean="adviceA" />
45-
<beans:bean class="org.springframework.integration.dispatcher.PollingTransactionTests.SampleAdvice"/>
46-
<beans:bean class="org.springframework.integration.dispatcher.PollingTransactionTests.SimpleRepeatAdvice"/>
54+
<ref bean="adviceA"/>
55+
<beans:bean class="org.springframework.integration.dispatcher.PollingTransactionTests$SampleAdvice"/>
56+
<beans:bean
57+
class="org.springframework.integration.dispatcher.PollingTransactionTests$SimpleRepeatAdvice"/>
4758
</advice-chain>
4859
</poller>
4960
</service-activator>
50-
61+
5162
<beans:bean id="txAdvise" class="org.springframework.transaction.interceptor.TransactionInterceptor">
5263
<beans:property name="transactionManager" ref="txManager"/>
5364
<beans:property name="transactionAttributeSource">
@@ -58,12 +69,11 @@
5869
</beans:bean>
5970
</beans:property>
6071
</beans:bean>
61-
62-
<beans:bean id="adviceA" class="org.springframework.integration.dispatcher.PollingTransactionTests.SampleAdvice"/>
6372

64-
<beans:bean id="testBean"
65-
class="org.springframework.integration.dispatcher.TestBean" />
73+
<beans:bean id="adviceA" class="org.springframework.integration.dispatcher.PollingTransactionTests$SampleAdvice"/>
74+
75+
<beans:bean id="testBean" class="org.springframework.integration.dispatcher.TestBean"/>
76+
77+
<beans:bean id="txManager" class="org.springframework.integration.util.TestTransactionManager"/>
6678

67-
<beans:bean id="txManager"
68-
class="org.springframework.integration.util.TestTransactionManager" />
6979
</beans:beans>

0 commit comments

Comments
 (0)