Skip to content

Commit a8f6d25

Browse files
Merge pull request #678 from vikinghawk/recovery_improvements_master
Recovery related improvements - master
2 parents 3db2425 + c775043 commit a8f6d25

File tree

6 files changed

+132
-40
lines changed

6 files changed

+132
-40
lines changed

src/main/java/com/rabbitmq/client/TopologyRecoveryException.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,28 @@
1515

1616
package com.rabbitmq.client;
1717

18+
import com.rabbitmq.client.impl.recovery.RecordedEntity;
19+
1820
/**
1921
* Indicates an exception thrown during topology recovery.
2022
*
2123
* @see com.rabbitmq.client.ConnectionFactory#setTopologyRecoveryEnabled(boolean)
2224
* @since 3.3.0
2325
*/
2426
public class TopologyRecoveryException extends Exception {
27+
28+
private final RecordedEntity recordedEntity;
29+
2530
public TopologyRecoveryException(String message, Throwable cause) {
31+
this(message, cause, null);
32+
}
33+
34+
public TopologyRecoveryException(String message, Throwable cause, final RecordedEntity recordedEntity) {
2635
super(message, cause);
36+
this.recordedEntity = recordedEntity;
37+
}
38+
39+
public RecordedEntity getRecordedEntity() {
40+
return recordedEntity;
2741
}
2842
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -900,7 +900,11 @@ private void recordConsumer(String result,
900900
this.connection.recordConsumer(result, consumer);
901901
}
902902

903-
private void deleteRecordedConsumer(String consumerTag) {
903+
/**
904+
* Delete the recorded consumer from this channel and accompanying connection
905+
* @param consumerTag consumer tag to delete
906+
*/
907+
public void deleteRecordedConsumer(String consumerTag) {
904908
this.consumerTags.remove(consumerTag);
905909
RecordedConsumer c = this.connection.deleteRecordedConsumer(consumerTag);
906910
if (c != null) {

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ private void setupErrorOnWriteListenerForPotentialRecovery() {
143143
});
144144
}
145145

146-
private TopologyRecoveryFilter letAllPassFilter() {
146+
private static TopologyRecoveryFilter letAllPassFilter() {
147147
return new TopologyRecoveryFilter() {};
148148
}
149149

@@ -644,7 +644,7 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
644644
}
645645
}
646646

647-
void recoverChannel(AutorecoveringChannel channel) throws IOException {
647+
public void recoverChannel(AutorecoveringChannel channel) throws IOException {
648648
channel.automaticallyRecover(this, this.delegate);
649649
}
650650

@@ -666,6 +666,38 @@ private void notifyTopologyRecoveryListenersStarted() {
666666
}
667667
}
668668

669+
/**
670+
* Recover a closed channel and all topology (i.e. RecordedEntities) associated to it.
671+
* Any errors will be sent to the {@link #getExceptionHandler()}.
672+
* @param channel channel to recover
673+
* @throws IllegalArgumentException if this channel is not owned by this connection
674+
*/
675+
public void recoverChannelAndTopology(final AutorecoveringChannel channel) {
676+
if (!channels.containsValue(channel)) {
677+
throw new IllegalArgumentException("This channel is not owned by this connection");
678+
}
679+
try {
680+
LOGGER.debug("Recovering channel={}", channel);
681+
recoverChannel(channel);
682+
LOGGER.debug("Recovered channel={}. Now recovering its topology", channel);
683+
Utility.copy(recordedExchanges).values().stream()
684+
.filter(e -> e.getChannel() == channel)
685+
.forEach(e -> recoverExchange(e, false));
686+
Utility.copy(recordedQueues).values().stream()
687+
.filter(q -> q.getChannel() == channel)
688+
.forEach(q -> recoverQueue(q.getName(), q, false));
689+
Utility.copy(recordedBindings).stream()
690+
.filter(b -> b.getChannel() == channel)
691+
.forEach(b -> recoverBinding(b, false));
692+
Utility.copy(consumers).values().stream()
693+
.filter(c -> c.getChannel() == channel)
694+
.forEach(c -> recoverConsumer(c.getConsumerTag(), c, false));
695+
LOGGER.debug("Recovered topology for channel={}", channel);
696+
} catch (Exception e) {
697+
getExceptionHandler().handleChannelRecoveryException(channel, e);
698+
}
699+
}
700+
669701
private void recoverTopology(final ExecutorService executor) {
670702
// The recovery sequence is the following:
671703
// 1. Recover exchanges
@@ -704,7 +736,7 @@ private void recoverTopology(final ExecutorService executor) {
704736
}
705737
}
706738

707-
private void recoverExchange(RecordedExchange x, boolean retry) {
739+
public void recoverExchange(RecordedExchange x, boolean retry) {
708740
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
709741
try {
710742
if (topologyRecoveryFilter.filterExchange(x)) {
@@ -722,7 +754,7 @@ private void recoverExchange(RecordedExchange x, boolean retry) {
722754
} catch (Exception cause) {
723755
final String message = "Caught an exception while recovering exchange " + x.getName() +
724756
": " + cause.getMessage();
725-
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
757+
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, x);
726758
this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e);
727759
}
728760
}
@@ -766,12 +798,12 @@ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
766798
} catch (Exception cause) {
767799
final String message = "Caught an exception while recovering queue " + oldName +
768800
": " + cause.getMessage();
769-
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
801+
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, q);
770802
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
771803
}
772804
}
773805

774-
private void recoverBinding(RecordedBinding b, boolean retry) {
806+
public void recoverBinding(RecordedBinding b, boolean retry) {
775807
try {
776808
if (this.topologyRecoveryFilter.filterBinding(b)) {
777809
if (retry) {
@@ -788,7 +820,7 @@ private void recoverBinding(RecordedBinding b, boolean retry) {
788820
} catch (Exception cause) {
789821
String message = "Caught an exception while recovering binding between " + b.getSource() +
790822
" and " + b.getDestination() + ": " + cause.getMessage();
791-
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
823+
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, b);
792824
this.getExceptionHandler().handleTopologyRecoveryException(delegate, b.getDelegateChannel(), e);
793825
}
794826
}
@@ -800,7 +832,7 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean
800832
String newTag = null;
801833
if (retry) {
802834
final RecordedConsumer entity = consumer;
803-
RetryResult retryResult = wrapRetryIfNecessary(consumer, () -> entity.recover());
835+
RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
804836
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
805837
newTag = (String) retryResult.getResult();
806838
} else {
@@ -824,7 +856,7 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean
824856
} catch (Exception cause) {
825857
final String message = "Caught an exception while recovering consumer " + tag +
826858
": " + cause.getMessage();
827-
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
859+
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, consumer);
828860
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
829861
}
830862
}
@@ -889,14 +921,10 @@ private void recoverEntitiesAsynchronously(ExecutorService executor, Collection<
889921

890922
private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel(final Collection<E> entities) {
891923
// map entities by channel
892-
final Map<AutorecoveringChannel, List<E>> map = new LinkedHashMap<AutorecoveringChannel, List<E>>();
924+
final Map<AutorecoveringChannel, List<E>> map = new LinkedHashMap<>();
893925
for (final E entity : entities) {
894926
final AutorecoveringChannel channel = entity.getChannel();
895-
List<E> list = map.get(channel);
896-
if (list == null) {
897-
map.put(channel, list = new ArrayList<E>());
898-
}
899-
list.add(entity);
927+
map.computeIfAbsent(channel, c -> new ArrayList<>()).add(entity);
900928
}
901929
// now create a runnable per channel
902930
final List<Callable<Object>> callables = new ArrayList<>();
@@ -1083,7 +1111,7 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
10831111
}
10841112

10851113
Set<RecordedBinding> removeBindingsWithDestination(String s) {
1086-
final Set<RecordedBinding> result = new HashSet<RecordedBinding>();
1114+
final Set<RecordedBinding> result = new LinkedHashSet<>();
10871115
synchronized (this.recordedBindings) {
10881116
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
10891117
RecordedBinding b = it.next();

src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,19 @@ public class DefaultRetryHandler implements RetryHandler {
4040

4141
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRetryHandler.class);
4242

43-
private final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;
44-
private final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
45-
private final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
46-
private final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition;
43+
protected final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;
44+
protected final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
45+
protected final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
46+
protected final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition;
4747

48-
private final RetryOperation<?> queueRecoveryRetryOperation;
49-
private final RetryOperation<?> exchangeRecoveryRetryOperation;
50-
private final RetryOperation<?> bindingRecoveryRetryOperation;
51-
private final RetryOperation<?> consumerRecoveryRetryOperation;
48+
protected final RetryOperation<?> queueRecoveryRetryOperation;
49+
protected final RetryOperation<?> exchangeRecoveryRetryOperation;
50+
protected final RetryOperation<?> bindingRecoveryRetryOperation;
51+
protected final RetryOperation<?> consumerRecoveryRetryOperation;
5252

53-
private final int retryAttempts;
53+
protected final int retryAttempts;
5454

55-
private final BackoffPolicy backoffPolicy;
55+
protected final BackoffPolicy backoffPolicy;
5656

5757
public DefaultRetryHandler(BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition,
5858
BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition,

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,19 @@
3030
*/
3131
public class TopologyRecoveryRetryHandlerBuilder {
3232

33-
private BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;
34-
private BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
35-
private BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
36-
private BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;
33+
protected BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;
34+
protected BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
35+
protected BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
36+
protected BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;
3737

38-
private DefaultRetryHandler.RetryOperation<?> queueRecoveryRetryOperation = context -> null;
39-
private DefaultRetryHandler.RetryOperation<?> exchangeRecoveryRetryOperation = context -> null;
40-
private DefaultRetryHandler.RetryOperation<?> bindingRecoveryRetryOperation = context -> null;
41-
private DefaultRetryHandler.RetryOperation<?> consumerRecoveryRetryOperation = context -> null;
38+
protected DefaultRetryHandler.RetryOperation<?> queueRecoveryRetryOperation = context -> null;
39+
protected DefaultRetryHandler.RetryOperation<?> exchangeRecoveryRetryOperation = context -> null;
40+
protected DefaultRetryHandler.RetryOperation<?> bindingRecoveryRetryOperation = context -> null;
41+
protected DefaultRetryHandler.RetryOperation<?> consumerRecoveryRetryOperation = context -> null;
4242

43-
private int retryAttempts = 2;
43+
protected int retryAttempts = 2;
4444

45-
private BackoffPolicy backoffPolicy = nbAttempts -> {
45+
protected BackoffPolicy backoffPolicy = nbAttempts -> {
4646
};
4747

4848
public static TopologyRecoveryRetryHandlerBuilder builder() {

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,18 @@ public abstract class TopologyRecoveryRetryLogic {
5555
}
5656
return null;
5757
};
58+
59+
/**
60+
* Recover a queue
61+
*/
62+
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_QUEUE = context -> {
63+
if (context.entity() instanceof RecordedQueue) {
64+
final RecordedQueue recordedQueue = context.queue();
65+
AutorecoveringConnection connection = context.connection();
66+
connection.recoverQueue(recordedQueue.getName(), recordedQueue, false);
67+
}
68+
return null;
69+
};
5870

5971
/**
6072
* Recover the destination queue of a binding.
@@ -138,18 +150,52 @@ public abstract class TopologyRecoveryRetryLogic {
138150
* Recover a consumer.
139151
*/
140152
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_CONSUMER = context -> context.consumer().recover();
153+
154+
/**
155+
* Recover earlier consumers that share the same channel as this retry context
156+
*/
157+
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_PREVIOUS_CONSUMERS = context -> {
158+
if (context.entity() instanceof RecordedConsumer) {
159+
// recover all consumers for the same channel that were recovered before this current
160+
// consumer. need to do this incase some consumers had already been recovered
161+
// successfully on a different queue before this one failed
162+
final AutorecoveringChannel channel = context.consumer().getChannel();
163+
for (RecordedConsumer consumer : Utility.copy(context.connection().getRecordedConsumers()).values()) {
164+
if (consumer == context.entity()) {
165+
break;
166+
} else if (consumer.getChannel() == channel) {
167+
final RetryContext retryContext = new RetryContext(consumer, context.exception(), context.connection());
168+
RECOVER_CONSUMER_QUEUE.call(retryContext);
169+
consumer.recover();
170+
RECOVER_CONSUMER_QUEUE_BINDINGS.call(retryContext);
171+
}
172+
}
173+
}
174+
return null;
175+
};
141176

142177
/**
143178
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers
144179
* when their respective queue is not found.
180+
*
145181
* This retry handler can be useful for long recovery processes, whereby auto-delete queues
146182
* can be deleted between queue recovery and binding/consumer recovery.
183+
*
184+
* Also useful to retry channel-closed 404 errors that may arise with auto-delete queues during a cluster cycle.
147185
*/
148186
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
187+
.queueRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
149188
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
150189
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
151-
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)
190+
.queueRecoveryRetryOperation(RECOVER_CHANNEL
191+
.andThen(RECOVER_QUEUE))
192+
.bindingRecoveryRetryOperation(RECOVER_CHANNEL
193+
.andThen(RECOVER_BINDING_QUEUE)
194+
.andThen(RECOVER_BINDING)
152195
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS))
153-
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
154-
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)));
196+
.consumerRecoveryRetryOperation(RECOVER_CHANNEL
197+
.andThen(RECOVER_CONSUMER_QUEUE)
198+
.andThen(RECOVER_CONSUMER)
199+
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)
200+
.andThen(RECOVER_PREVIOUS_CONSUMERS));
155201
}

0 commit comments

Comments
 (0)