Description
Describe the bug
com.rabbitmq.stream.StreamException: Error while creating stream connection to xxxx:5552. connection timed out after 30000 ms: xxxx/xxxx:5552. This may be due to the usage of a load balancer that makes topology discovery fail. Use a custom AddressResolver or the --load-balancer flag if using StreamPerfTest. See https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic and https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer.
    at com.rabbitmq.stream.impl.Utils.lambda$coordinatorClientFactory$10(Utils.java:175) ~[stream-client-0.17.0.jar!/:0.17.0]
    at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.<init>(ConsumersCoordinator.java:806) ~[stream-client-0.17.0.jar!/:0.17.0]
    at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.<init>(ConsumersCoordinator.java:569) ~[stream-client-0.17.0.jar!/:0.17.0]
    at com.rabbitmq.stream.impl.ConsumersCoordinator.addToManager(ConsumersCoordinator.java:194) ~[stream-client-0.17.0.jar!/:0.17.0]
    at com.rabbitmq.stream.impl.ConsumersCoordinator.access$2000(ConsumersCoordinator.java:56) ~[stream-client-0.17.0.jar!/:0.17.0]
    at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.recoverSubscription(ConsumersCoordinator.java:909) ~[stream-client-0.17.0.jar!/:0.17.0]
    at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.maybeRecoverSubscription(ConsumersCoordinator.java:876) ~[stream-client-0.17.0.jar!/:0.17.0]
    at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.lambda$assignConsumersToStream$12(ConsumersCoordinator.java:845) ~[stream-client-0.17.0.jar!/:0.17.0]
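Looking at the implementation of the ConsumersCoordinator.recoverSubscription method, it seems the exception is caught by the generic Exception catch block, which sets the reassignmentCompleted variable to true and ends the recovery process, even though the error message indicates a connection timeout. The com.rabbitmq.stream.StreamException in the trace is not one of the three exception types handled by the retry branch (ConnectionStreamException, ClientClosedException, StreamNotAvailableException):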
private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tracker) {
  boolean reassignmentCompleted = false;
  while (!reassignmentCompleted) {
    try {
      if (tracker.consumer.isOpen()) {
        Broker broker = pickBroker(candidates);
        LOGGER.debug("Using {} to resume consuming from {}", broker, tracker.stream);
        synchronized (tracker.consumer) {
          if (tracker.consumer.isOpen()) {
            OffsetSpecification offsetSpecification;
            if (tracker.hasReceivedSomething) {
              offsetSpecification = OffsetSpecification.offset(tracker.offset);
            } else {
              offsetSpecification = tracker.initialOffsetSpecification;
            }
            addToManager(broker, tracker, offsetSpecification, false);
          }
        }
      } else {
        LOGGER.debug(
            "Not re-assigning consumer {} (stream '{}') because it has been closed",
            tracker.consumer.id(),
            tracker.stream);
      }
      reassignmentCompleted = true;
    } catch (ConnectionStreamException
        | ClientClosedException
        | StreamNotAvailableException e) {
      LOGGER.debug(
          "Consumer {} re-assignment on stream {} timed out or connection closed or stream not available, "
              + "refreshing candidates and retrying",
          tracker.consumer.id(),
          tracker.stream);
      // maybe not a good candidate, let's refresh and retry for this one
      candidates =
          Utils.callAndMaybeRetry(
              findBrokersForStream(tracker.stream),
              ex -> !(ex instanceof StreamDoesNotExistException),
              recoveryBackOffDelayPolicy(),
              "Candidate lookup to consume from '%s' (subscription recovery)",
              tracker.stream);
    } catch (Exception e) {
      LOGGER.warn("Error while re-assigning subscription from stream {}", tracker.stream, e);
      reassignmentCompleted = true;
    }
  }
}
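To see the control flow in isolation, here is a self-contained Java model of the catch structure above. It is a hypothetical sketch: RecoverySketch, its nested exception classes, the recover helper and the maxAttempts guard are stand-ins, not the client's real types. It assumes, as the stack trace suggests, that the timeout surfaces as a plain StreamException rather than one of the retryable subtypes.

```java
// Hypothetical, self-contained model of the catch structure in recoverSubscription().
// The exception classes below are stand-ins, not the client's real classes.
class RecoverySketch {

  static class StreamException extends RuntimeException {
    StreamException(String message) {
      super(message);
    }
  }

  // Stand-in for one of the exception types the real loop treats as retryable.
  static class ConnectionStreamException extends StreamException {
    ConnectionStreamException(String message) {
      super(message);
    }
  }

  // Same control flow as recoverSubscription(): loop until reassignmentCompleted is true.
  // maxAttempts is a hypothetical guard so the sketch cannot spin forever.
  // Returns how many attempts were made.
  static int recover(Runnable attempt, int maxAttempts) {
    int attempts = 0;
    boolean reassignmentCompleted = false;
    while (!reassignmentCompleted && attempts < maxAttempts) {
      attempts++;
      try {
        attempt.run();
        reassignmentCompleted = true;
      } catch (ConnectionStreamException e) {
        // Retryable: the real code refreshes the broker candidates here and loops again.
      } catch (Exception e) {
        // Generic catch: the real code logs a warning and gives up on the subscription.
        reassignmentCompleted = true;
      }
    }
    return attempts;
  }

  public static void main(String[] args) {
    // A timeout surfacing as a plain StreamException ends recovery after a single attempt.
    int attempts =
        recover(
            () -> {
              throw new StreamException("connection timed out after 30000 ms");
            },
            10);
    System.out.println("attempts: " + attempts);
  }
}
```

Running it prints "attempts: 1": the timeout is handled like any other unexpected error, so the loop exits after the first failure instead of retrying, whereas a ConnectionStreamException would keep the loop going.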
Reproduction steps
- Set up a 3-node RabbitMQ stream environment with a single active consumer
- Cause a connection attempt during consumer recovery to time out (i.e. take longer than 30000 ms)
...
Expected behavior
The client should not stop trying to recover the consumer's subscription when a connection timeout occurs.
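One way to express the expected behavior is to decide retryability from the whole cause chain instead of only the outer exception type. The sketch below is hypothetical: TimeoutAwareRecovery, its nested StreamException, the fixed delay, and the choice of java.net.ConnectException and java.util.concurrent.TimeoutException as markers of a connection timeout are assumptions, not the client's API or its actual fix.

```java
import java.net.ConnectException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: keep retrying while the failure looks like a connection problem.
class TimeoutAwareRecovery {

  // Stand-in for a generic client exception that wraps the real cause.
  static class StreamException extends RuntimeException {
    StreamException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  // Assumption: a ConnectException or TimeoutException anywhere in the cause chain
  // marks a transient connection failure worth retrying.
  static boolean isRetryable(Throwable error) {
    for (Throwable t = error; t != null; t = t.getCause()) {
      if (t instanceof ConnectException || t instanceof TimeoutException) {
        return true;
      }
    }
    return false;
  }

  // Returns true once an attempt succeeds; false on a non-retryable error,
  // after maxAttempts failures, or if the thread is interrupted while waiting.
  static boolean recover(Runnable attempt, int maxAttempts, Duration delay) {
    for (int attempts = 1; ; attempts++) {
      try {
        attempt.run();
        return true;
      } catch (RuntimeException e) {
        if (!isRetryable(e) || attempts >= maxAttempts) {
          return false;
        }
      }
      try {
        Thread.sleep(delay.toMillis()); // fixed delay between attempts
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return false;
      }
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    boolean recovered =
        recover(
            () -> {
              // Fail twice with a wrapped connect timeout, then succeed.
              if (++calls[0] < 3) {
                throw new StreamException(
                    "Error while creating stream connection",
                    new ConnectException("connection timed out after 30000 ms"));
              }
            },
            Integer.MAX_VALUE,
            Duration.ofMillis(10));
    System.out.println("recovered=" + recovered + " after " + calls[0] + " attempts");
  }
}
```

Running it prints "recovered=true after 3 attempts". A real implementation would more likely reuse a back-off policy such as the recoveryBackOffDelayPolicy() call in recoverSubscription than a fixed delay; maxAttempts can be set to Integer.MAX_VALUE to retry until the consumer is closed or the error stops looking transient.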
Additional context
RabbitMQ is running on 3 nodes with a single active consumer. Restarting the nodes works as intended: the leader node changes accordingly and the consumer recovers after the node restart (maybe because the timeout period is shorter in that case?).