Skip to content

Commit e83e2ca

Browse files
committed
Fix JDBC tests
https://build.spring.io/browse/INT-MJATS41-1279 Looks like there is a race condition when our polling rate around data base is too fast and we have access to the DB files even during application context close. * Stop polling channel adapter in the tests explicitly after test methods * Increase some tests performance decreasing timeouts to wait * Increase timeouts and performance in the `StreamTransformerParserTests` # Conflicts: # build.gradle # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelIntegrationTests-context.xml # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelIntegrationTests.java # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelOnePollerIntegrationTests-context.xml # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelOnePollerIntegrationTests.java # spring-integration-stomp/src/test/java/org/springframework/integration/stomp/client/StompServerIntegrationTests.java Cherry-picked from cb0d43d
1 parent 0f47fd2 commit e83e2ca

File tree

6 files changed

+100
-79
lines changed

6 files changed

+100
-79
lines changed

spring-integration-core/src/test/java/org/springframework/integration/config/xml/StreamTransformerParserTests-context.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<stream-transformer input-channel="directInput" output-channel="output"/>
2121

2222
<stream-transformer input-channel="queueInput" output-channel="output">
23-
<poller fixed-delay="10000"/>
23+
<poller fixed-delay="1"/>
2424
</stream-transformer>
2525

2626
<chain input-channel="charsetChannel" output-channel="output">

spring-integration-core/src/test/java/org/springframework/integration/config/xml/StreamTransformerParserTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@
3939
/**
4040
* @author Mark Fisher
4141
* @author Gary Russell
42+
* @author Artem Bilan
4243
*/
4344
@ContextConfiguration
4445
@RunWith(SpringJUnit4ClassRunner.class)
@@ -64,23 +65,23 @@ public class StreamTransformerParserTests {
6465
@Test
6566
public void directChannelWithStringMessage() {
6667
this.directInput.send(new GenericMessage<InputStream>(new ByteArrayInputStream("foo".getBytes())));
67-
Message<?> result = output.receive(0);
68+
Message<?> result = output.receive(10000);
6869
assertNotNull(result);
6970
assertArrayEquals("foo".getBytes(), (byte[]) result.getPayload());
7071
}
7172

7273
@Test
7374
public void queueChannelWithStringMessage() {
7475
this.queueInput.send(new GenericMessage<InputStream>(new ByteArrayInputStream("foo".getBytes())));
75-
Message<?> result = output.receive(3000);
76+
Message<?> result = output.receive(10000);
7677
assertNotNull(result);
7778
assertArrayEquals("foo".getBytes(), (byte[]) result.getPayload());
7879
}
7980

8081
@Test
8182
public void charset() {
8283
this.charsetChannel.send(new GenericMessage<InputStream>(new ByteArrayInputStream("foo".getBytes())));
83-
Message<?> result = output.receive(0);
84+
Message<?> result = output.receive(10000);
8485
assertNotNull(result);
8586
assertEquals("foo", result.getPayload());
8687
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelIntegrationTests-context.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
<bean id="transactionManager"
6666
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
6767
<property name="dataSource" ref="dataSource"/>
68+
<property name="defaultTimeout" value="10"/>
6869
</bean>
6970

7071
<int:header-enricher input-channel="routingSlip" output-channel="input">

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelIntegrationTests.java

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.TimeUnit;
3232

3333
import org.hamcrest.Matchers;
34+
import org.junit.After;
3435
import org.junit.Before;
3536
import org.junit.Test;
3637
import org.junit.runner.RunWith;
@@ -39,6 +40,7 @@
3940
import org.springframework.beans.factory.annotation.Qualifier;
4041
import org.springframework.core.serializer.support.SerializationFailedException;
4142
import org.springframework.integration.channel.QueueChannel;
43+
import org.springframework.integration.endpoint.AbstractEndpoint;
4244
import org.springframework.integration.store.MessageGroup;
4345
import org.springframework.messaging.MessageChannel;
4446
import org.springframework.messaging.MessageDeliveryException;
@@ -83,25 +85,36 @@ public class JdbcMessageStoreChannelIntegrationTests {
8385
@Autowired
8486
private MessageChannel routingSlip;
8587

88+
@Autowired
89+
@Qualifier("service-activator")
90+
private AbstractEndpoint serviceActivator;
91+
8692
@Before
8793
public void clear() {
8894
Service.reset(1);
8995
for (MessageGroup group : messageStore) {
9096
messageStore.removeMessageGroup(group.getGroupId());
9197
}
98+
99+
this.serviceActivator.start();
100+
}
101+
102+
@After
103+
public void tearDown() {
104+
this.serviceActivator.stop();
92105
}
93106

94107
@Test
95108
public void testSendAndActivate() throws Exception {
96-
input.send(new GenericMessage<String>("foo"));
109+
input.send(new GenericMessage<>("foo"));
97110
Service.await(10000);
98111
assertEquals(1, Service.messages.size());
99112
}
100113

101114
@Test
102115
public void testSendAndActivateWithRollback() throws Exception {
103116
Service.fail = true;
104-
input.send(new GenericMessage<String>("foo"));
117+
input.send(new GenericMessage<>("foo"));
105118
Service.await(10000);
106119
assertThat(Service.messages.size(), Matchers.greaterThanOrEqualTo(1));
107120
// After a rollback in the poller the message is still waiting to be delivered
@@ -128,28 +141,24 @@ public Void doInTransaction(TransactionStatus status) {
128141
@Repeat(2)
129142
public void testTransactionalSendAndReceive() throws Exception {
130143

131-
boolean result = new TransactionTemplate(transactionManager).execute(new TransactionCallback<Boolean>() {
132-
133-
@Override
134-
public Boolean doInTransaction(TransactionStatus status) {
144+
boolean result = new TransactionTemplate(transactionManager).execute(status -> {
135145

136-
synchronized (storeLock) {
137-
138-
boolean result = input.send(new GenericMessage<String>("foo"), 500L);
139-
// This will time out because the transaction has not committed yet
140-
try {
141-
Service.await(3000);
142-
fail("Expected timeout");
143-
}
144-
catch (Exception e) {
145-
// expected
146-
}
147-
148-
return result;
146+
synchronized (storeLock) {
149147

148+
boolean result1 = input.send(new GenericMessage<>("foo"), 100L);
149+
// This will time out because the transaction has not committed yet
150+
try {
151+
Service.await(100);
152+
fail("Expected timeout");
150153
}
154+
catch (Exception e) {
155+
// expected
156+
}
157+
158+
return result1;
151159

152160
}
161+
153162
});
154163

155164
assertTrue("Could not send message", result);
@@ -184,7 +193,7 @@ protected void waitForMessage() throws InterruptedException {
184193
}
185194

186195
@Test
187-
public void testSameTransactionSendAndReceive() throws Exception {
196+
public void testSameTransactionSendAndReceive() {
188197

189198
final StopWatch stopWatch = new StopWatch();
190199
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
@@ -239,7 +248,7 @@ public Boolean doInTransaction(TransactionStatus status) {
239248
@Test
240249
public void testWithRoutingSlip() {
241250
try {
242-
this.routingSlip.send(new GenericMessage<String>("foo"));
251+
this.routingSlip.send(new GenericMessage<>("foo"));
243252
fail("MessageDeliveryException expected");
244253
}
245254
catch (Exception e) {
@@ -255,7 +264,7 @@ public static class Service {
255264

256265
private static boolean fail = false;
257266

258-
private static List<String> messages = new CopyOnWriteArrayList<String>();
267+
private static List<String> messages = new CopyOnWriteArrayList<>();
259268

260269
private static CountDownLatch latch;
261270

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelOnePollerIntegrationTests-context.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
<bean id="transactionManager"
6060
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
6161
<property name="dataSource" ref="dataSource" />
62+
<property name="defaultTimeout" value="10"/>
6263
</bean>
6364

6465
</beans>

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelOnePollerIntegrationTests.java

Lines changed: 60 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,13 +26,15 @@
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeUnit;
2828

29+
import org.junit.After;
2930
import org.junit.Before;
3031
import org.junit.Test;
3132
import org.junit.runner.RunWith;
3233

3334
import org.springframework.beans.factory.annotation.Autowired;
3435
import org.springframework.beans.factory.annotation.Qualifier;
3536
import org.springframework.integration.channel.QueueChannel;
37+
import org.springframework.integration.endpoint.AbstractEndpoint;
3638
import org.springframework.integration.store.MessageGroup;
3739
import org.springframework.messaging.support.GenericMessage;
3840
import org.springframework.test.annotation.DirtiesContext;
@@ -44,6 +46,11 @@
4446
import org.springframework.transaction.support.TransactionTemplate;
4547
import org.springframework.util.StopWatch;
4648

49+
/**
50+
* @author Dave Syer
51+
* @author Gary Russell
52+
* @author Artem Bilan
53+
*/
4754
@ContextConfiguration
4855
@RunWith(SpringJUnit4ClassRunner.class)
4956
@DirtiesContext // close at the end after class
@@ -65,54 +72,58 @@ public class JdbcMessageStoreChannelOnePollerIntegrationTests {
6572
@Autowired
6673
private PlatformTransactionManager transactionManager;
6774

75+
@Autowired
76+
@Qualifier("service-relay")
77+
private AbstractEndpoint serviceRelay;
78+
6879
@Before
6980
public void clear() {
70-
for (MessageGroup group : messageStore) {
71-
messageStore.removeMessageGroup(group.getGroupId());
81+
for (MessageGroup group : this.messageStore) {
82+
this.messageStore.removeMessageGroup(group.getGroupId());
7283
}
7384
}
7485

86+
@After
87+
public void tearDown() {
88+
this.serviceRelay.stop();
89+
}
90+
7591
@Test
76-
// @Repeat(50)
7792
public void testSameTransactionDifferentChannelSendAndReceive() throws Exception {
78-
7993
Service.reset(1);
80-
assertNull(durable.receive(100L));
81-
assertNull(relay.receive(100L));
94+
assertNull(this.durable.receive(100L));
95+
assertNull(this.relay.receive(100L));
8296
final StopWatch stopWatch = new StopWatch();
8397

84-
boolean result = new TransactionTemplate(transactionManager).execute(new TransactionCallback<Boolean>() {
85-
86-
@Override
87-
public Boolean doInTransaction(TransactionStatus status) {
88-
89-
synchronized (storeLock) {
98+
boolean result =
99+
new TransactionTemplate(this.transactionManager)
100+
.execute(status -> {
90101

91-
boolean result = relay.send(new GenericMessage<String>("foo"), 500L);
92-
// This will time out because the transaction has not committed yet
93-
try {
94-
Service.await(1000);
95-
fail("Expected timeout");
96-
}
97-
catch (Exception e) {
98-
// expected
99-
}
102+
synchronized (this.storeLock) {
100103

101-
try {
102-
stopWatch.start();
103-
// It hasn't arrive yet because we are still in the sending transaction
104-
assertNull(durable.receive(100L));
105-
}
106-
finally {
107-
stopWatch.stop();
108-
}
104+
boolean result1 = this.relay.send(new GenericMessage<>("foo"), 500L);
105+
// This will time out because the transaction has not committed yet
106+
try {
107+
Service.await(100);
108+
fail("Expected timeout");
109+
}
110+
catch (Exception e) {
111+
// expected
112+
}
109113

110-
return result;
114+
try {
115+
stopWatch.start();
116+
// It hasn't arrive yet because we are still in the sending transaction
117+
assertNull(this.durable.receive(100L));
118+
}
119+
finally {
120+
stopWatch.stop();
121+
}
111122

112-
}
123+
return result1;
113124

114-
}
115-
});
125+
}
126+
});
116127

117128
assertTrue("Could not send message", result);
118129
// If the poll blocks in the RDBMS there is no way for the queue to respect the timeout
@@ -130,32 +141,29 @@ public Boolean doInTransaction(TransactionStatus status) {
130141
*
131142
* With the storeLock: It doesn't deadlock as long as the lock is injected into the poller as well.
132143
*/
133-
new TransactionTemplate(transactionManager).execute(new TransactionCallback<Void>() {
144+
new TransactionTemplate(this.transactionManager)
145+
.execute(status -> {
146+
synchronized (this.storeLock) {
147+
148+
try {
149+
stopWatch.start();
150+
this.durable.receive(100L);
151+
return null;
152+
}
153+
finally {
154+
stopWatch.stop();
155+
}
134156

135-
@Override
136-
public Void doInTransaction(TransactionStatus status) {
137-
synchronized (storeLock) {
138-
139-
try {
140-
stopWatch.start();
141-
durable.receive(100L);
142-
return null;
143-
}
144-
finally {
145-
stopWatch.stop();
146157
}
147-
148-
}
149-
}
150-
151-
});
158+
});
152159

153160
// If the poll blocks in the RDBMS there is no way for the queue to respect the timeout
154161
assertTrue("Timed out waiting for receive", stopWatch.getTotalTimeMillis() < 10000);
155162

156163
}
157164

158165
public static class Service {
166+
159167
private static boolean fail = false;
160168

161169
private static List<String> messages = new CopyOnWriteArrayList<String>();
@@ -182,6 +190,7 @@ public String echo(String input) {
182190
}
183191
return input;
184192
}
193+
185194
}
186195

187196
}

0 commit comments

Comments
 (0)