GH-2302: Enable consumer seek only on matching group Id #3318

Merged (14 commits) on Jun 25, 2024
@@ -9,7 +9,7 @@ void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

-void onPartitionsRevoked(Collection<TopicPartition> partitions)
+void onPartitionsRevoked(Collection<TopicPartition> partitions);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
----
@@ -49,6 +49,8 @@ void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

String getGroupId();
----

The two different variants of the `seek` methods provide a way to seek to an arbitrary offset.
@@ -232,4 +234,11 @@ public class SomeOtherBean {

----

As of version 3.3, a new method `getGroupId()` was introduced in the `ConsumerSeekAware.ConsumerSeekCallback` interface.
This method is particularly useful when you need to identify the consumer group associated with a specific seek callback.

NOTE: When using a class that extends `AbstractConsumerSeekAware`, a seek operation performed in one listener may impact all listeners in the same class.
This might not always be the desired behavior.
To address this, you can use the `getGroupId()` method provided by the callback.
This allows you to perform seek operations selectively, targeting only the consumer group of interest.
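For example, the following sketch (modeled on the test added by this PR; the class name, method name, group IDs, and topic are illustrative only) seeks to the beginning of the assigned partitions only for the callbacks whose group ID matches the requested one:

[source, java]
----
public class MultiGroupSeekListener extends AbstractConsumerSeekAware {

	@KafkaListener(groupId = "group1", topics = "someTopic")
	void listenForGroup1(String in) {
		// ...
	}

	@KafkaListener(groupId = "group2", topics = "someTopic")
	void listenForGroup2(String in) {
		// ...
	}

	public void seekToBeginningFor(String groupIdToSeek) {
		// getCallbacksAndTopics() exposes each registered callback with its assigned partitions
		getCallbacksAndTopics().forEach((callback, topicPartitions) -> {
			if (groupIdToSeek.equals(callback.getGroupId())) {
				topicPartitions.forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
			}
		});
	}

}
----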

@@ -7,13 +7,16 @@
This section covers the changes made from version 3.2 to version 3.3.
For changes in earlier versions, see xref:appendix/change-history.adoc[Change History].


[[x33-dlt-topic-naming]]
=== DLT Topic Naming Convention

The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently.
This change ensures compatibility and avoids conflicts when transitioning between different retry solutions.
Users who wish to retain the ".DLT" suffix behavior need to opt in explicitly by setting the appropriate DLT name property.
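If you configure retries with `@RetryableTopic`, one way this opt-in might look is the following sketch; the `dltTopicSuffix` attribute, topic, and listener here are assumptions for illustration, so verify the exact property that applies to your retry configuration:

[source, java]
----
@RetryableTopic(dltTopicSuffix = ".DLT") // assumption: keeps the legacy ".DLT" suffix instead of the new "-dlt" default
@KafkaListener(topics = "orders", groupId = "orders")
void listenWithRetry(String in) {
	// process the record; exhausted retries would then go to "orders.DLT"
}
----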

[[x33-seek-with-group-id]]
=== Enhanced Seek Operations for Consumer Groups


A new method, `getGroupId()`, has been added to the `ConsumerSeekCallback` interface.
This method allows for more selective seek operations by targeting only the desired consumer group.
For more details, see xref:kafka/seek.adoc#seek[Seek API Docs].


@@ -22,15 +22,17 @@

import org.apache.kafka.common.TopicPartition;

import org.springframework.lang.Nullable;

/**
* Listeners that implement this interface are provided with a
* {@link ConsumerSeekCallback} which can be used to perform a
* seek operation.
*
* @author Gary Russell
* @author Soby Chacko
* @author Borahm Lee
* @since 1.1
*
*/
public interface ConsumerSeekAware {

@@ -229,6 +231,17 @@ default void seekToEnd(Collection<TopicPartition> partitions) {
*/
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

/**
* Retrieve the group ID associated with this consumer seek callback, if available.
* This method returns {@code null} by default, indicating that the group ID is not specified.
* Implementations may override this method to provide a specific group ID value.
* @return the consumer group ID, or {@code null} if none is available.
* @since 3.3
*/
@Nullable
default String getGroupId() {
return null;
}
}

}
@@ -165,6 +165,7 @@
* @author Raphael Rösch
* @author Christian Mergenthaler
* @author Mikael Carlstedt
* @author Borahm Lee
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -681,7 +682,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final TransactionTemplate transactionTemplate;

-private final String consumerGroupId = getGroupId();
+private final String consumerGroupId = KafkaMessageListenerContainer.this.getGroupId();

private final TaskScheduler taskScheduler;

@@ -1362,8 +1363,8 @@ protected void initialize() {
}
publishConsumerStartingEvent();
this.consumerThread = Thread.currentThread();
-setupSeeks();
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
+setupSeeks();
this.count = 0;
this.last = System.currentTimeMillis();
initAssignedPartitions();
@@ -1906,7 +1907,7 @@ private void wrapUp(@Nullable Throwable throwable) {
this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
this.consumerSeekAwareListener.unregisterSeekCallback();
}
-this.logger.info(() -> getGroupId() + ": Consumer stopped");
+this.logger.info(() -> this.consumerGroupId + ": Consumer stopped");
publishConsumerStoppedEvent(throwable);
}

@@ -2693,7 +2694,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
this.containerProperties.getObservationConvention(),
DefaultKafkaListenerObservationConvention.INSTANCE,
-() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(),
+() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), this.consumerGroupId,
this::clusterId),
this.observationRegistry);
return observation.observe(() -> {
@@ -3327,6 +3328,11 @@ public void seekToTimestamp(Collection<TopicPartition> topicParts, long timestam
topicParts.forEach(tp -> seekToTimestamp(tp.topic(), tp.partition(), timestamp));
}

@Override
public String getGroupId() {
return this.consumerGroupId;
}

@Override
public String toString() {
return "KafkaMessageListenerContainer.ListenerConsumer ["
@@ -0,0 +1,153 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractConsumerSeekAwareTests.Config.MultiGroupListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.stereotype.Component;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Borahm Lee
* @since 3.3
*/
@DirtiesContext
@SpringJUnitConfig
@EmbeddedKafka(topics = {AbstractConsumerSeekAwareTests.TOPIC}, partitions = 3)
class AbstractConsumerSeekAwareTests {

static final String TOPIC = "Seek";

@Autowired
Config config;

@Autowired
KafkaTemplate<String, String> template;

@Autowired
MultiGroupListener multiGroupListener;

@Test
void seekForAllGroups() throws Exception {
template.send(TOPIC, "test-data");
template.send(TOPIC, "test-data");
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();

MultiGroupListener.latch1 = new CountDownLatch(2);
MultiGroupListener.latch2 = new CountDownLatch(2);

multiGroupListener.seekToBeginning();
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void seekForSpecificGroup() throws Exception {
template.send(TOPIC, "test-data");
template.send(TOPIC, "test-data");
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();

MultiGroupListener.latch1 = new CountDownLatch(2);
MultiGroupListener.latch2 = new CountDownLatch(2);

multiGroupListener.seekToBeginningForGroup("group2");
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(MultiGroupListener.latch1.await(100, TimeUnit.MICROSECONDS)).isFalse();
assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2);
}

@EnableKafka
@Configuration
static class Config {

@Autowired
EmbeddedKafkaBroker broker;

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}

@Bean
ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("test-group", "false", this.broker));
}

@Bean
ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(this.broker));
}

@Bean
KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}

@Component
static class MultiGroupListener extends AbstractConsumerSeekAware {

static CountDownLatch latch1 = new CountDownLatch(2);

static CountDownLatch latch2 = new CountDownLatch(2);

@KafkaListener(groupId = "group1", topics = TOPIC)
void listenForGroup1(String in) {
latch1.countDown();
}

@KafkaListener(groupId = "group2", topics = TOPIC)
void listenForGroup2(String in) {
latch2.countDown();
}

void seekToBeginningForGroup(String groupIdForSeek) {
getCallbacksAndTopics().forEach((cb, topics) -> {
if (groupIdForSeek.equals(cb.getGroupId())) {
topics.forEach(tp -> cb.seekToBeginning(tp.topic(), tp.partition()));
}
});
}
}
}

}