Consumer not recovering connection after timeout #630

Closed
@Hendr-ik-a

Description

Describe the bug


com.rabbitmq.stream.StreamException: Error while creating stream connection to xxxx:5552. connection timed out after 30000 ms: xxxx/xxxx:5552. This may be due to the usage of a load balancer that makes topology discovery fail. Use a custom AddressResolver or the --load-balancer flag if using StreamPerfTest. See https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic and https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer.
at com.rabbitmq.stream.impl.Utils.lambda$coordinatorClientFactory$10(Utils.java:175) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.<init>(ConsumersCoordinator.java:806) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.<init>(ConsumersCoordinator.java:569) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator.addToManager(ConsumersCoordinator.java:194) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator.access$2000(ConsumersCoordinator.java:56) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.recoverSubscription(ConsumersCoordinator.java:909) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.maybeRecoverSubscription(ConsumersCoordinator.java:876) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.lambda$assignConsumersToStream$12(ConsumersCoordinator.java:845) ~[stream-client-0.17.0.jar!/:0.17.0]

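Looking at the implementation of the ConsumersCoordinator.recoverSubscription method, the exception seems to be caught by the general Exception catch block. That block sets the reassignmentCompleted flag to true, which ends the recovery process even though the error message indicates a connection timeout. The exception in the stack trace is a plain com.rabbitmq.stream.StreamException, which is not one of the types handled by the first catch clause (ConnectionStreamException, ClientClosedException, StreamNotAvailableException):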

private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tracker) {
  boolean reassignmentCompleted = false;
  while (!reassignmentCompleted) {
    try {
      if (tracker.consumer.isOpen()) {
        Broker broker = pickBroker(candidates);
        LOGGER.debug("Using {} to resume consuming from {}", broker, tracker.stream);
        synchronized (tracker.consumer) {
          if (tracker.consumer.isOpen()) {
            OffsetSpecification offsetSpecification;
            if (tracker.hasReceivedSomething) {
              offsetSpecification = OffsetSpecification.offset(tracker.offset);
            } else {
              offsetSpecification = tracker.initialOffsetSpecification;
            }
            addToManager(broker, tracker, offsetSpecification, false);
          }
        }
      } else {
        LOGGER.debug(
            "Not re-assigning consumer {} (stream '{}') because it has been closed",
            tracker.consumer.id(),
            tracker.stream);
      }
      reassignmentCompleted = true;
    } catch (ConnectionStreamException
        | ClientClosedException
        | StreamNotAvailableException e) {
      // Only these three exception types lead to a candidate refresh and a retry.
      LOGGER.debug(
          "Consumer {} re-assignment on stream {} timed out or connection closed or stream not available, "
              + "refreshing candidates and retrying",
          tracker.consumer.id(),
          tracker.stream);
      // maybe not a good candidate, let's refresh and retry for this one
      candidates =
          Utils.callAndMaybeRetry(
              findBrokersForStream(tracker.stream),
              ex -> !(ex instanceof StreamDoesNotExistException),
              recoveryBackOffDelayPolicy(),
              "Candidate lookup to consume from '%s' (subscription recovery)",
              tracker.stream);
    } catch (Exception e) {
      // Any other exception, including the plain StreamException from the stack
      // trace above, ends the loop without re-assigning the subscription.
      LOGGER.warn("Error while re-assigning subscription from stream {}", tracker.stream, e);
      reassignmentCompleted = true;
    }
  }
}
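To make this control flow concrete, here is a minimal, self-contained simulation of the loop above. The exception classes are simplified local stand-ins, and they are assumed here to all extend StreamException. The call to addToManager is replaced by a hypothetical list of failures, one thrown per attempt. None of this is the client's real API.

```java
import java.util.List;

public class RecoveryLoopDemo {

  // Simplified stand-ins for the client's exception types (assumption: the
  // three retryable types are subclasses of StreamException).
  static class StreamException extends RuntimeException {
    StreamException(String message) {
      super(message);
    }
  }

  static class ConnectionStreamException extends StreamException {
    ConnectionStreamException(String message) {
      super(message);
    }
  }

  static class ClientClosedException extends StreamException {
    ClientClosedException(String message) {
      super(message);
    }
  }

  static class StreamNotAvailableException extends StreamException {
    StreamNotAvailableException(String message) {
      super(message);
    }
  }

  // Same catch structure as recoverSubscription: attempt i throws
  // failures.get(i - 1) while failures remain, then succeeds.
  // Returns true only if the subscription was actually re-assigned.
  static boolean recover(List<RuntimeException> failures) {
    int attempts = 0;
    boolean resubscribed = false;
    boolean reassignmentCompleted = false;
    while (!reassignmentCompleted) {
      attempts++;
      try {
        if (attempts <= failures.size()) {
          throw failures.get(attempts - 1); // simulated addToManager failure
        }
        resubscribed = true; // simulated successful addToManager call
        reassignmentCompleted = true;
      } catch (ConnectionStreamException
          | ClientClosedException
          | StreamNotAvailableException e) {
        // Retryable: loop again (the real code refreshes the candidates first).
      } catch (Exception e) {
        // Generic branch: the real code logs a warning here and stops retrying.
        reassignmentCompleted = true;
      }
    }
    return resubscribed;
  }

  public static void main(String[] args) {
    boolean afterClosedConnections =
        recover(List.<RuntimeException>of(
            new ConnectionStreamException("connection closed"),
            new ConnectionStreamException("connection closed")));
    boolean afterTimeout =
        recover(List.<RuntimeException>of(
            new StreamException("connection timed out after 30000 ms")));
    System.out.println("ConnectionStreamException x2 -> re-assigned: " + afterClosedConnections);
    System.out.println("plain StreamException -> re-assigned: " + afterTimeout);
  }
}
```

In this simulation, the two ConnectionStreamException failures are retried and the subscription is re-assigned on the third attempt, so the first line reports true. A single plain StreamException, like the one in the stack trace, ends the loop on the first attempt, so the second line reports false.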

Reproduction steps

  1. Set up a 3-node RabbitMQ stream environment with a single active consumer
  2. Cause a connection timeout of more than 30000 ms

...

Expected behavior

The client should not stop trying to reconnect the consumer when a connection timeout occurs.
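One way to get this behaviour is to treat a connection timeout as retryable even when it arrives wrapped in a generic StreamException. The sketch below is hypothetical and is not the client's API or its actual fix. It assumes that the StreamException raised at Utils.java:175 keeps the underlying Netty exception as its cause. It also relies on Netty's io.netty.channel.ConnectTimeoutException being a subclass of java.net.ConnectException. A real fix would presumably reuse the client's recoveryBackOffDelayPolicy() instead of the hard-coded backoff used here.

```java
import java.net.ConnectException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.Callable;

public class RetryClassifier {

  // Hypothetical helper (not part of the client): returns true if any exception
  // in the cause chain is a java.net.ConnectException. Netty's
  // io.netty.channel.ConnectTimeoutException extends ConnectException.
  static boolean isConnectionFailure(Throwable t) {
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    for (Throwable c = t; c != null && seen.add(c); c = c.getCause()) {
      if (c instanceof ConnectException) {
        return true;
      }
    }
    return false;
  }

  // Hypothetical retry loop: retries connection failures with a capped
  // exponential backoff and rethrows anything else. maxAttempts only bounds
  // the demo; the expected behaviour in the report is to keep trying.
  static <T> T retryOnConnectionFailure(Callable<T> attempt, int maxAttempts, long initialDelayMs)
      throws Exception {
    long delayMs = initialDelayMs;
    for (int i = 1; ; i++) {
      try {
        return attempt.call();
      } catch (Exception e) {
        if (!isConnectionFailure(e) || i >= maxAttempts) {
          throw e;
        }
        Thread.sleep(delayMs);
        delayMs = Math.min(delayMs * 2, 30_000L);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    // Two simulated timeouts, each wrapped like the StreamException in the
    // report (a RuntimeException stands in for it), then a successful connection.
    String result =
        retryOnConnectionFailure(
            () -> {
              calls[0]++;
              if (calls[0] <= 2) {
                throw new RuntimeException(
                    "Error while creating stream connection",
                    new ConnectException("connection timed out after 30000 ms"));
              }
              return "connected";
            },
            5,
            10);
    System.out.println(result + " after " + calls[0] + " attempts");
  }
}
```

Running main prints "connected after 3 attempts". Both wrapped timeouts are retried because a ConnectException appears in their cause chain. An exception without one, such as an IllegalStateException, would be rethrown immediately.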

Additional context

RabbitMQ is running on 3 nodes with a single active consumer. Restarting the nodes works as intended: the leader node changes accordingly and the consumer recovers after the node restart (maybe due to a shorter timeout period?).

Labels

bug (Something isn't working)
