Skip to content

Commit b24a9be

Browse files
authored
Merge pull request #403 from vikinghawk/5.x.x-stable
tweaks to recovery retry
2 parents 8b8c58a + b88f7f5 commit b24a9be

File tree

3 files changed

+10
-13
lines changed

3 files changed

+10
-13
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,11 @@ public RetryResult retryConsumerRecovery(RetryContext context) throws Exception
103103

104104
protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition, RetryOperation<?> operation, RecordedEntity entity, RetryContext context)
105105
throws Exception {
106-
log(entity, context.exception());
107106
int attempts = 0;
108107
Exception exception = context.exception();
109108
while (attempts < retryAttempts) {
110109
if (condition.test(entity, exception)) {
110+
log(entity, exception, attempts);
111111
backoffPolicy.backoff(attempts + 1);
112112
try {
113113
Object result = operation.call(context);
@@ -122,11 +122,11 @@ protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition,
122122
throw exception;
123123
}
124124
}
125-
throw context.exception();
125+
throw exception;
126126
}
127127

128-
protected void log(RecordedEntity entity, Exception exception) {
129-
LOGGER.info("Error while recovering {}, retrying with {} attempt(s).", entity, retryAttempts, exception);
128+
protected void log(RecordedEntity entity, Exception exception, int attempts) {
129+
LOGGER.info("Error while recovering {}, retrying with {} more attempt(s).", entity, retryAttempts - attempts, exception);
130130
}
131131

132132
public interface RetryOperation<T> {

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717

1818
import com.rabbitmq.client.AMQP;
1919
import com.rabbitmq.client.ShutdownSignalException;
20+
import com.rabbitmq.utility.Utility;
2021

21-
import java.util.List;
2222
import java.util.function.BiPredicate;
23-
import java.util.function.Predicate;
24-
2523
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;
2624

2725
/**
@@ -106,7 +104,7 @@ public abstract class TopologyRecoveryRetryLogic {
106104
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CONSUMER_QUEUE_BINDINGS = context -> {
107105
if (context.entity() instanceof RecordedConsumer) {
108106
String queue = context.consumer().getQueue();
109-
for (RecordedBinding recordedBinding : context.connection().getRecordedBindings()) {
107+
for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) {
110108
if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) {
111109
recordedBinding.recover();
112110
}
@@ -121,16 +119,15 @@ public abstract class TopologyRecoveryRetryLogic {
121119
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_CONSUMER = context -> context.consumer().recover();
122120

123121
/**
124-
* Pre-configured {@link DefaultRetryHandler} that retries recovery of bindings and consumers
122+
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers
125123
* when their respective queue is not found.
126124
* This retry handler can be useful for long recovery processes, whereby auto-delete queues
127125
* can be deleted between queue recovery and binding/consumer recovery.
128126
*/
129-
public static final RetryHandler RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
127+
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
130128
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
131129
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
132130
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
133131
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
134-
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)))
135-
.build();
132+
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)));
136133
}

src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void topologyRecoveryRetry() throws Exception {
5151
@Override
5252
protected ConnectionFactory newConnectionFactory() {
5353
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
54-
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER);
54+
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.build());
5555
connectionFactory.setNetworkRecoveryInterval(1000);
5656
return connectionFactory;
5757
}

0 commit comments

Comments
 (0)