Description
Describe the bug
With a super stream, no single active consumer is elected after the consumers are restarted.
The consumers stop consuming and stay in the waiting status indefinitely.
Reproduction steps
- Create a super stream with ten partitions:
rabbitmq-streams add_super_stream invoices --partitions 10
- Publish a few thousand messages to it (70k is enough)
- Stop the producer
- Start ten instances of this Java client:
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;

import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// The class name is arbitrary; the original snippet showed only the main method.
public class SuperStreamSacRepro {
    public static void main(String[] args) throws IOException {
        System.out.println("Connecting...");
        Address entryPoint = new Address("127.0.0.1", 5552);
        Environment environment = Environment.builder()
                // .host(entryPoint.host())
                // .port(entryPoint.port())
                // .username("test")
                // .password("test")
                .addressResolver(address -> entryPoint)
                // One consumer per connection, so each partition consumer gets its own connection.
                .maxConsumersByConnection(1)
                .build();
        String appName = "reference";
        String stream = "invoices";
        Date start = new Date();
        // Each iteration creates a consumer, lets it run for a minute, then closes it.
        for (int i = 0; i < 500; i++) {
            // Thread-safe map, since handlers for different partitions may run concurrently.
            Map<String, Integer> consumedMap = new ConcurrentHashMap<>();
            Consumer consumer = environment.consumerBuilder()
                    .superStream(stream)
                    .name(appName)
                    .offset(OffsetSpecification.first())
                    .singleActiveConsumer()
                    .messageHandler((context, message) -> {
                        // Count every message under the super stream name.
                        int count = consumedMap.merge(stream, 1, Integer::sum);
                        if (count % 10 == 0) {
                            Date end = new Date();
                            System.out.println("Stream: " + context.stream() + " - Consumed " + count
                                    + " - Time " + (end.getTime() - start.getTime()));
                        }
                        try {
                            // Simulate slow processing: 200 to 1000 ms per message.
                            Thread.sleep(ThreadLocalRandom.current().nextInt(200, 1000));
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }).build();
            try {
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Restarting");
            consumer.close();
        }
    }
}
- Wait for a couple of restarts
- The issue then appears
Expected behavior
Exactly one consumer per partition should be elected as the single active consumer and keep consuming after the restarts.
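To make the expected invariant concrete, here is a minimal sketch in plain Java. It is not part of the RabbitMQ Stream Java client: the class `SacInvariantCheck`, its methods, and the boolean "active" flags are all hypothetical, and the consumer states would have to be collected by hand (for example from the management UI). It only encodes the rule "each partition has exactly one active consumer in the group" and flags the partitions that break it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not an API of the stream client.
class SacInvariantCheck {

    // Partition streams of the super stream are named "<superStream>-<index>",
    // e.g. invoices-0 ... invoices-9 for ten partitions.
    static List<String> partitionNames(String superStream, int partitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(superStream + "-" + i);
        }
        return names;
    }

    // Returns the partitions that do NOT have exactly one active consumer.
    // activeFlags maps a partition to one flag per registered consumer
    // (true = active, false = waiting).
    static List<String> violations(Map<String, List<Boolean>> activeFlags) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<String, List<Boolean>> entry : activeFlags.entrySet()) {
            long active = entry.getValue().stream().filter(Boolean::booleanValue).count();
            if (active != 1) {
                bad.add(entry.getKey());
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        Map<String, List<Boolean>> observed = new LinkedHashMap<>();
        for (String partition : partitionNames("invoices", 10)) {
            // Healthy state: ten consumers, the first one active, the rest waiting.
            List<Boolean> flags = new ArrayList<>(Collections.nCopies(10, false));
            flags.set(0, true);
            observed.put(partition, flags);
        }
        // Reported symptom: every consumer of one partition is stuck in waiting.
        observed.put("invoices-1", new ArrayList<>(Collections.nCopies(10, false)));

        System.out.println(violations(observed)); // prints [invoices-1]
    }
}
```

In the failing state described in this report, such a check would list every partition whose consumers are all waiting.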
Additional context
I noticed that invoices-1 is usually the first partition to have problems.
The invoices-0 partition usually works.
The other partitions will, at some point, have the same issue.
RabbitMQ 3.11.11
Java RabbitMQ Stream / Java 0.10.0-SNAPSHOT