SuperStream doesn't elect the single active consumer #7743

Closed
@Gsantomaggio

Description

Describe the bug

The super stream does not elect a single active consumer after the consumers are restarted.
The consumers stop consuming and their status stays at waiting (see the screenshot):
[Screenshot 2023-03-27 at 10 42 15]

Reproduction steps

  1. Create a super stream with ten partitions: rabbitmq-streams add_super_stream invoices --partitions 10
  2. Publish a few thousand messages to it (70k is enough).
  3. Stop the producer.
  4. Start ten instances of this Java client:
    import com.rabbitmq.stream.Address;
    import com.rabbitmq.stream.Consumer;
    import com.rabbitmq.stream.Environment;
    import com.rabbitmq.stream.OffsetSpecification;

    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;

    // Wrapper class added so the snippet compiles; the name is arbitrary.
    public class SuperStreamSacRepro {

        public static void main(String[] args) {
            System.out.println("Connecting...");
            Address entryPoint = new Address("127.0.0.1", 5552);

            // Every resolved address maps to the entry point; one consumer per connection.
            Environment environment = Environment.builder()
    //                .host(entryPoint.host())
    //                .port(entryPoint.port())
    //                .username("test")
    //                .password("test")
                    .addressResolver(address -> entryPoint)
                    .maxConsumersByConnection(1)
                    .build();
            String appName = "reference";
            String stream = "invoices";

            Date start = new Date();
            // Create a consumer, let it run for 60 seconds, close it, and repeat.
            for (int i = 0; i < 500; i++) {
                // Single counter keyed by the super stream name (all partitions together).
                Map<String, Integer> consumedMap = new HashMap<>();
                Consumer consumer = environment.consumerBuilder()
                        .superStream(stream)
                        .name(appName)
                        .offset(OffsetSpecification.first())
                        .singleActiveConsumer()
                        .messageHandler((context, message) -> {
                            int count = consumedMap.merge(stream, 1, Integer::sum);

                            // Log progress every ten messages.
                            if (count % 10 == 0) {
                                Date end = new Date();
                                System.out.println("Stream: " + context.stream() + " - Consumed " + count + " - Time " + (end.getTime() - start.getTime()));
                            }

                            // Simulate slow processing (200 to 1000 ms per message).
                            try {
                                Thread.sleep(ThreadLocalRandom.current().nextInt(200, 1000));
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }).build();

                try {
                    Thread.sleep(60000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("Restarting");

                consumer.close();
            }
        }
    }
  5. Wait for a couple of restarts.
  6. The issue appears.

Expected behavior

After each restart, one consumer should be elected as the single active consumer for each partition.
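
The expected invariant can be sketched with a toy model of one partition's consumer group. This is not RabbitMQ code: the class `SacGroupSketch` and its promotion rule (the oldest registered consumer is active) are assumptions made for illustration, and the broker may pick the active consumer differently. What matters is the invariant: as long as the group has at least one member, exactly one of them is active.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Toy model of single-active-consumer election for one partition (hypothetical, not broker code). */
final class SacGroupSketch {
    // Consumers in registration order; by assumption the head of the queue is the active one.
    private final Deque<String> consumers = new ArrayDeque<>();

    /** Registers a consumer; it is active only if no other consumer is registered. */
    void register(String consumerId) {
        consumers.addLast(consumerId);
    }

    /** Removes a consumer; if it was the active one, the next waiting consumer takes over. */
    void unregister(String consumerId) {
        consumers.remove(consumerId);
    }

    /** Returns the active consumer, or empty when the group has no members. */
    Optional<String> active() {
        return Optional.ofNullable(consumers.peekFirst());
    }

    /** Number of registered consumers that are not active. */
    int waiting() {
        return Math.max(0, consumers.size() - 1);
    }

    public static void main(String[] args) {
        SacGroupSketch group = new SacGroupSketch();
        group.register("c1");
        group.register("c2");
        group.register("c3");
        System.out.println("active=" + group.active().orElse("none") + " waiting=" + group.waiting());

        // Closing the active consumer must promote one of the waiting consumers.
        group.unregister("c1");
        System.out.println("active=" + group.active().orElse("none") + " waiting=" + group.waiting());
    }
}
```

In terms of this model, the bug described here corresponds to a group that still has members but reports no active one after a consumer is closed.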

Additional context

I noticed that invoices-1 is usually the first partition to have problems.
The invoices-0 partition usually keeps working.
The other partitions eventually show the same issue.
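
To track which partitions are stuck across restarts, a small helper along these lines may be useful. It is only a sketch: the class `StuckPartitionCheck` is hypothetical, the status snapshot is assumed to be collected by hand (for example from the management UI), the status strings "active" and "waiting" are assumptions about how that snapshot is written down, and the partition names assume the `invoices-0` ... `invoices-9` pattern seen in this report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that lists partitions with no active consumer in a status snapshot. */
final class StuckPartitionCheck {

    /** Partition stream names, assuming the "<superStream>-<index>" naming seen in this report. */
    static List<String> partitionNames(String superStream, int partitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(superStream + "-" + i);
        }
        return names;
    }

    /**
     * Returns the partitions whose consumer statuses contain no "active" entry.
     * A partition missing from the snapshot is also reported.
     */
    static List<String> partitionsWithoutActive(
            String superStream, int partitions, Map<String, List<String>> statusesByPartition) {
        List<String> stuck = new ArrayList<>();
        for (String partition : partitionNames(superStream, partitions)) {
            List<String> statuses = statusesByPartition.getOrDefault(partition, List.of());
            if (!statuses.contains("active")) {
                stuck.add(partition);
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        // Hand-written snapshot: invoices-0 is healthy, invoices-1 has only waiting consumers.
        Map<String, List<String>> snapshot = Map.of(
                "invoices-0", List.of("active", "waiting"),
                "invoices-1", List.of("waiting", "waiting"));
        System.out.println(partitionsWithoutActive("invoices", 2, snapshot));
    }
}
```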

RabbitMQ 3.11.11
RabbitMQ Stream Java client 0.10.0-SNAPSHOT
