Skip to content

GH-2482: Option for Containers to Stop Immediately #2483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,12 @@ public abstract class AbstractMessageListenerContainer extends ObservableListene

private final Map<String, Object> consumerArgs = new HashMap<>();

private ContainerDelegate proxy = this.delegate;

private final AtomicBoolean logDeclarationException = new AtomicBoolean(true);

protected final AtomicBoolean stopNow = new AtomicBoolean(); // NOSONAR

private ContainerDelegate proxy = this.delegate;

private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;

private ApplicationEventPublisher applicationEventPublisher;
Expand Down Expand Up @@ -245,6 +247,8 @@ public abstract class AbstractMessageListenerContainer extends ObservableListene
@Nullable
private RabbitListenerObservationConvention observationConvention;

private boolean forceStop;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -1153,6 +1157,25 @@ protected MessageAckListener getMessageAckListener() {
return this.messageAckListener;
}

/**
* Stop container after current message(s) are processed and requeue any prefetched.
* @return true to stop when current message(s) are processed.
* @since 2.4.14
*/
protected boolean isForceStop() {
return this.forceStop;
}

/**
* Set to true to stop the container after the current message(s) are processed and
* requeue any prefetched. Useful when using exclusive or single-active consumers.
* @param forceStop true to stop when current messsage(s) are processed.
* @since 2.4.14
*/
public void setForceStop(boolean forceStop) {
this.forceStop = forceStop;
}

/**
* Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
*/
Expand Down Expand Up @@ -1302,7 +1325,21 @@ protected void setNotRunning() {
* A shared Rabbit Connection, if any, will automatically be closed <i>afterwards</i>.
* @see #shutdown()
*/
protected abstract void doShutdown();
protected void doShutdown() {
shutdownAndWaitOrCallback(null);
}

@Override
public void stop(Runnable callback) {
shutdownAndWaitOrCallback(() -> {
setNotRunning();
callback.run();
});
}

protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
}


/**
* @return Whether this container is currently active, that is, whether it has been set up but not shut down yet.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -784,11 +784,17 @@ public synchronized void stop() {
if (logger.isDebugEnabled()) {
logger.debug("Closing Rabbit Channel: " + this.channel);
}
RabbitUtils.setPhysicalCloseRequired(this.channel, true);
ConnectionFactoryUtils.releaseResources(this.resourceHolder);
this.deliveryTags.clear();
this.consumers.clear();
this.queue.clear(); // in case we still have a client thread blocked
forceCloseAndClearQueue();
}

public void forceCloseAndClearQueue() {
if (this.channel != null && this.channel.isOpen()) {
RabbitUtils.setPhysicalCloseRequired(this.channel, true);
ConnectionFactoryUtils.releaseResources(this.resourceHolder);
this.deliveryTags.clear();
this.consumers.clear();
this.queue.clear(); // in case we still have a client thread blocked
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -813,7 +813,7 @@ else if (this.logger.isWarnEnabled()) {
}

@Override
protected void doShutdown() {
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
LinkedList<SimpleConsumer> canceledConsumers = null;
boolean waitForConsumers = false;
synchronized (this.consumersMonitor) {
Expand All @@ -826,44 +826,66 @@ protected void doShutdown() {
}
}
if (waitForConsumers) {
try {
if (this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
this.logger.info("Successfully waited for consumers to finish.");
}
else {
this.logger.info("Consumers not finished.");
if (isForceCloseChannel()) {
canceledConsumers.forEach(consumer -> {
String eventMessage = "Closing channel for unresponsive consumer: " + consumer;
if (logger.isWarnEnabled()) {
logger.warn(eventMessage);
}
consumer.cancelConsumer(eventMessage);
});
LinkedList<SimpleConsumer> consumersToWait = canceledConsumers;
Runnable awaitShutdown = () -> {
try {
if (this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
this.logger.info("Successfully waited for consumers to finish.");
}
else {
this.logger.info("Consumers not finished.");
if (isForceCloseChannel() || this.stopNow.get()) {
consumersToWait.forEach(consumer -> {
String eventMessage = "Closing channel for unresponsive consumer: " + consumer;
if (logger.isWarnEnabled()) {
logger.warn(eventMessage);
}
consumer.cancelConsumer(eventMessage);
});
}
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
this.logger.warn("Interrupted waiting for consumers. Continuing with shutdown.");
}
finally {
this.startedLatch = new CountDownLatch(1);
this.started = false;
this.aborted = false;
this.hasStopped = true;
}
this.stopNow.set(false);
runCallbackIfNotNull(callback);
};
if (callback == null) {
awaitShutdown.run();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
this.logger.warn("Interrupted waiting for consumers. Continuing with shutdown.");
}
finally {
this.startedLatch = new CountDownLatch(1);
this.started = false;
this.aborted = false;
this.hasStopped = true;
else {
getTaskExecutor().execute(awaitShutdown);
}
}
}

private void runCallbackIfNotNull(@Nullable Runnable callback) {
if (callback != null) {
callback.run();
}
}

/**
* Must hold this.consumersMonitor.
* @param consumers a copy of this.consumers.
*/
private void actualShutDown(List<SimpleConsumer> consumers) {
Assert.state(getTaskExecutor() != null, "Cannot shut down if not initialized");
this.logger.debug("Shutting down");
consumers.forEach(this::cancelConsumer);
if (isForceStop()) {
this.stopNow.set(true);
}
else {
consumers.forEach(this::cancelConsumer);
}
this.consumers.clear();
this.consumersByQueue.clear();
this.logger.debug("All consumers canceled");
Expand Down Expand Up @@ -1031,6 +1053,10 @@ int incrementAndGetEpoch() {
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) {

if (!getChannel().isOpen()) {
this.logger.debug("Discarding prefetch, channel closed");
return;
}
MessageProperties messageProperties =
getMessagePropertiesConverter().toMessageProperties(properties, envelope, "UTF-8");
messageProperties.setConsumerTag(consumerTag);
Expand Down Expand Up @@ -1072,6 +1098,9 @@ public void handleDelivery(String consumerTag, Envelope envelope,
// NOSONAR
}
}
if (DirectMessageListenerContainer.this.stopNow.get()) {
closeChannel();
}
}

private void executeListenerInTransaction(Object data, long deliveryTag) {
Expand Down Expand Up @@ -1308,11 +1337,15 @@ void cancelConsumer(final String eventMessage) {
}

private void finalizeConsumer() {
closeChannel();
DirectMessageListenerContainer.this.cancellationLock.release(this);
consumerRemoved(this);
}

private void closeChannel() {
RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
RabbitUtils.closeChannel(getChannel());
RabbitUtils.closeConnection(this.connection);
DirectMessageListenerContainer.this.cancellationLock.release(this);
consumerRemoved(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,19 +607,7 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process
}

@Override
protected void doShutdown() {
shutdownAndWaitOrCallback(null);
}

@Override
public void stop(Runnable callback) {
shutdownAndWaitOrCallback(() -> {
setNotRunning();
callback.run();
});
}

private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
Thread thread = this.containerStoppingForAbort.get();
if (thread != null && !thread.equals(Thread.currentThread())) {
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
Expand All @@ -631,9 +619,14 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
if (isForceStop()) {
this.stopNow.set(true);
}
while (consumerIterator.hasNext()) {
BlockingQueueConsumer consumer = consumerIterator.next();
consumer.basicCancel(true);
if (!isForceStop()) {
consumer.basicCancel(true);
}
canceledConsumers.add(consumer);
consumerIterator.remove();
if (consumer.declaring) {
Expand All @@ -657,7 +650,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
}
else {
logger.info("Workers not finished.");
if (isForceCloseChannel()) {
if (isForceCloseChannel() || this.stopNow.get()) {
canceledConsumers.forEach(consumer -> {
if (logger.isWarnEnabled()) {
logger.warn("Closing channel for unresponsive consumer: " + consumer);
Expand All @@ -676,7 +669,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
this.consumers = null;
this.cancellationLock.deactivate();
}

this.stopNow.set(false);
runCallbackIfNotNull(callback);
};
if (callback == null) {
Expand Down Expand Up @@ -1323,6 +1316,10 @@ public void run() { // NOSONAR - line count

private void mainLoop() throws Exception { // NOSONAR Exception
try {
if (SimpleMessageListenerContainer.this.stopNow.get()) {
this.consumer.forceCloseAndClearQueue();
return;
}
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
checkAdjust(receivedOk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueInformation;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
Expand Down Expand Up @@ -90,6 +91,7 @@
*/
@RabbitAvailable(queues = { DirectMessageListenerContainerIntegrationTests.Q1,
DirectMessageListenerContainerIntegrationTests.Q2,
DirectMessageListenerContainerIntegrationTests.Q3,
DirectMessageListenerContainerIntegrationTests.EQ1,
DirectMessageListenerContainerIntegrationTests.EQ2,
DirectMessageListenerContainerIntegrationTests.DLQ1 })
Expand All @@ -102,6 +104,8 @@ public class DirectMessageListenerContainerIntegrationTests {

public static final String Q2 = "testQ2.DirectMessageListenerContainerIntegrationTests";

public static final String Q3 = "testQ3.DirectMessageListenerContainerIntegrationTests";

public static final String EQ1 = "eventTestQ1.DirectMessageListenerContainerIntegrationTests";

public static final String EQ2 = "eventTestQ2.DirectMessageListenerContainerIntegrationTests";
Expand Down Expand Up @@ -792,6 +796,48 @@ public void onMessage(Message message) {
assertThat(ackDeliveryTag.get()).isEqualTo(1);
}

@Test
void forceStop() {
CountDownLatch latch1 = new CountDownLatch(1);
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
container.setMessageListener((ChannelAwareMessageListener) (msg, chan) -> {
latch1.await(10, TimeUnit.SECONDS);
});
RabbitTemplate template = new RabbitTemplate(cf);
try {
container.setQueueNames(Q3);
container.setForceStop(true);
template.convertAndSend(Q3, "one");
template.convertAndSend(Q3, "two");
template.convertAndSend(Q3, "three");
template.convertAndSend(Q3, "four");
template.convertAndSend(Q3, "five");
await().untilAsserted(() -> {
QueueInformation queueInfo = admin.getQueueInfo(Q3);
assertThat(queueInfo).isNotNull();
assertThat(queueInfo.getMessageCount()).isEqualTo(5);
});
container.start();
await().untilAsserted(() -> {
QueueInformation queueInfo = admin.getQueueInfo(Q3);
assertThat(queueInfo).isNotNull();
assertThat(queueInfo.getMessageCount()).isEqualTo(0);
});
container.stop(() -> {
});
latch1.countDown();
await().untilAsserted(() -> {
QueueInformation queueInfo = admin.getQueueInfo(Q3);
assertThat(queueInfo).isNotNull();
assertThat(queueInfo.getMessageCount()).isEqualTo(4);
});
}
finally {
container.stop();
}
}

@Test
public void testMessageAckListenerWithBatchAck() throws Exception {
final AtomicInteger calledTimes = new AtomicInteger();
Expand Down
Loading