Skip to content

Commit a9eb922

Browse files
garyrussellartembilan
authored andcommitted
Fix AMQP MessageSource Tests
https://build.spring.io/browse/INT-MASTER-983/ Race between the poller and template to get the requeued message. Also add trace logging to the acknowledgment. **cherry-pick to 5.0.x**
1 parent 2975255 commit a9eb922

File tree

2 files changed

+37
-2
lines changed

2 files changed

+37
-2
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import java.nio.charset.StandardCharsets;
2121
import java.util.Map;
2222

23+
import org.apache.commons.logging.Log;
24+
import org.apache.commons.logging.LogFactory;
25+
2326
import org.springframework.amqp.core.MessageProperties;
2427
import org.springframework.amqp.rabbit.connection.Connection;
2528
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -199,6 +202,8 @@ public AcknowledgmentCallback createCallback(AmqpAckInfo info) {
199202

200203
public static class AmqpAckCallback implements AcknowledgmentCallback {
201204

205+
private static Log logger = LogFactory.getLog(AmqpAckCallback.class);
206+
202207
private final AmqpAckInfo ackInfo;
203208

204209
private boolean acknowledged;
@@ -235,6 +240,9 @@ public boolean isAutoAck() {
235240
@Override
236241
public void acknowledge(Status status) {
237242
Assert.notNull(status, "'status' cannot be null");
243+
if (logger.isTraceEnabled()) {
244+
logger.trace("acknowledge(" + status.name() + ") for " + this);
245+
}
238246
try {
239247
long deliveryTag = this.ackInfo.getGetResponse().getEnvelope().getDeliveryTag();
240248
switch (status) {
@@ -264,6 +272,12 @@ public void acknowledge(Status status) {
264272
}
265273
}
266274

275+
@Override
276+
public String toString() {
277+
return "AmqpAckCallback [ackInfo=" + this.ackInfo + ", acknowledged=" + this.acknowledged
278+
+ ", autoAckEnabled=" + this.autoAckEnabled + "]";
279+
}
280+
267281
}
268282

269283
/**
@@ -302,6 +316,12 @@ public GetResponse getGetResponse() {
302316
return this.getResponse;
303317
}
304318

319+
@Override
320+
public String toString() {
321+
return "AmqpAckInfo [connection=" + this.connection + ", channel=" + this.channel + ", transacted="
322+
+ this.transacted + ", getResponse=" + this.getResponse + "]";
323+
}
324+
305325
}
306326

307327
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceIntegrationTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.springframework.integration.dsl.IntegrationFlow;
6161
import org.springframework.integration.dsl.IntegrationFlows;
6262
import org.springframework.integration.dsl.Pollers;
63+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
6364
import org.springframework.messaging.handler.annotation.Header;
6465
import org.springframework.messaging.support.MessageBuilder;
6566
import org.springframework.test.annotation.DirtiesContext;
@@ -132,8 +133,9 @@ public void testImplicitNackThenAck() throws Exception {
132133
assertThat(this.config.fromDsl, equalTo("bar"));
133134
assertThat(this.config.fromInterceptedSource, equalTo("BAZ"));
134135
assertNull(template.receive(NOAUTOACK_QUEUE));
136+
assertThat(this.config.requeueLatch.getCount(), equalTo(1L));
135137
this.config.callback.acknowledge(Status.REQUEUE);
136-
assertNotNull(template.receive(NOAUTOACK_QUEUE, 10_000));
138+
assertTrue(this.config.requeueLatch.await(10, TimeUnit.SECONDS));
137139
}
138140

139141
@Configuration
@@ -142,6 +144,8 @@ public static class Config {
142144

143145
private final CountDownLatch latch = new CountDownLatch(5);
144146

147+
private final CountDownLatch requeueLatch = new CountDownLatch(2);
148+
145149
private volatile String received;
146150

147151
private volatile Object fromDsl;
@@ -174,7 +178,18 @@ public void in(String in) {
174178
@InboundChannelAdapter(channel = "noAutoAck", poller = @Poller(fixedDelay = "100"), autoStartup = "false")
175179
@Bean
176180
public MessageSource<?> noAutoAckSource() {
177-
return new AmqpMessageSource(connectionFactory(), NOAUTOACK_QUEUE);
181+
return new AmqpMessageSource(connectionFactory(), NOAUTOACK_QUEUE) {
182+
183+
@Override
184+
protected AbstractIntegrationMessageBuilder<Object> doReceive() {
185+
AbstractIntegrationMessageBuilder<Object> builder = super.doReceive();
186+
if (builder != null) {
187+
Config.this.requeueLatch.countDown();
188+
}
189+
return builder;
190+
}
191+
192+
};
178193
}
179194

180195
@ServiceActivator(inputChannel = "noAutoAck")

0 commit comments

Comments
 (0)