Skip to content

Commit ab2037c

Browse files
authored
GH-2302: Enable consumer seek based on matching group Id (#3318)
Fixes: #2302 * GH-2302: Enable consumer seek only on matching group Id * Add groupId getter API in `ConsumerSeekCallback` so end-user applications can seek based on group IDs when multiple `KafkaListener` methods exist in the same class. * Adding tests to verify the new API. * Resolve checkstyle violations and add missing copyright sections, author tags, etc. * Update "seek.adoc" and "whats-new.adoc".
1 parent c35a2bb commit ab2037c

File tree

5 files changed

+192
-8
lines changed

5 files changed

+192
-8
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ void registerSeekCallback(ConsumerSeekCallback callback);
99
1010
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
1111
12-
void onPartitionsRevoked(Collection<TopicPartition> partitions)
12+
void onPartitionsRevoked(Collection<TopicPartition> partitions);
1313
1414
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
1515
----
@@ -49,6 +49,8 @@ void seekRelative(String topic, int partition, long offset, boolean toCurrent);
4949
void seekToTimestamp(String topic, int partition, long timestamp);
5050
5151
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
52+
53+
String getGroupId();
5254
----
5355

5456
The two different variants of the `seek` methods provide a way to seek to an arbitrary offset.
@@ -232,4 +234,11 @@ public class SomeOtherBean {
232234
233235
----
234236

237+
As of version 3.3, a new method `getGroupId()` was introduced in the `ConsumerSeekAware.ConsumerSeekCallback` interface.
238+
This method is particularly useful when you need to identify the consumer group associated with a specific seek callback.
239+
240+
NOTE: When using a class that extends `AbstractConsumerSeekAware`, a seek operation performed in one listener may impact all listeners in the same class.
241+
This might not always be the desired behavior.
242+
To address this, you can use the `getGroupId()` method provided by the callback.
243+
This allows you to perform seek operations selectively, targeting only the consumer group of interest.
235244

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@
77
This section covers the changes made from version 3.2 to version 3.3.
88
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].
99

10-
1110
[[x33-dlt-topic-naming]]
1211
=== DLT Topic Naming Convention
1312

1413
The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently. This change ensures compatibility and avoids conflicts when transitioning between different retry solutions. Users who wish to retain the ".DLT" suffix behavior need to opt-in explicitly by setting the appropriate DLT name property.
1514

15+
[[x33-seek-with-group-id]]
16+
=== Enhanced Seek Operations for Consumer Groups
1617

17-
18+
A new method, `getGroupId()`, has been added to the `ConsumerSeekCallback` interface.
19+
This method allows for more selective seek operations by targeting only the desired consumer group.
20+
For more details, see xref:kafka/seek.adoc#seek[Seek API Docs].
1821

1922

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@
2222

2323
import org.apache.kafka.common.TopicPartition;
2424

25+
import org.springframework.lang.Nullable;
26+
2527
/**
2628
* Listeners that implement this interface are provided with a
2729
* {@link ConsumerSeekCallback} which can be used to perform a
2830
* seek operation.
2931
*
3032
* @author Gary Russell
3133
* @author Soby Chacko
34+
* @author Borahm Lee
3235
* @since 1.1
33-
*
3436
*/
3537
public interface ConsumerSeekAware {
3638

@@ -229,6 +231,17 @@ default void seekToEnd(Collection<TopicPartition> partitions) {
229231
*/
230232
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
231233

234+
/**
235+
* Retrieve the group ID associated with this consumer seek callback, if available.
236+
* This method returns {@code null} by default, indicating that the group ID is not specified.
237+
* Implementations may override this method to provide a specific group ID value.
238+
* @return the consumer group ID.
239+
* @since 3.3
240+
*/
241+
@Nullable
242+
default String getGroupId() {
243+
return null;
244+
}
232245
}
233246

234247
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@
165165
* @author Raphael Rösch
166166
* @author Christian Mergenthaler
167167
* @author Mikael Carlstedt
168+
* @author Borahm Lee
168169
*/
169170
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
170171
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -681,7 +682,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
681682

682683
private final TransactionTemplate transactionTemplate;
683684

684-
private final String consumerGroupId = getGroupId();
685+
private final String consumerGroupId = KafkaMessageListenerContainer.this.getGroupId();
685686

686687
private final TaskScheduler taskScheduler;
687688

@@ -1362,8 +1363,8 @@ protected void initialize() {
13621363
}
13631364
publishConsumerStartingEvent();
13641365
this.consumerThread = Thread.currentThread();
1365-
setupSeeks();
13661366
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
1367+
setupSeeks();
13671368
this.count = 0;
13681369
this.last = System.currentTimeMillis();
13691370
initAssignedPartitions();
@@ -1906,7 +1907,7 @@ private void wrapUp(@Nullable Throwable throwable) {
19061907
this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
19071908
this.consumerSeekAwareListener.unregisterSeekCallback();
19081909
}
1909-
this.logger.info(() -> getGroupId() + ": Consumer stopped");
1910+
this.logger.info(() -> this.consumerGroupId + ": Consumer stopped");
19101911
publishConsumerStoppedEvent(throwable);
19111912
}
19121913

@@ -2693,7 +2694,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
26932694
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
26942695
this.containerProperties.getObservationConvention(),
26952696
DefaultKafkaListenerObservationConvention.INSTANCE,
2696-
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(),
2697+
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), this.consumerGroupId,
26972698
this::clusterId),
26982699
this.observationRegistry);
26992700
return observation.observe(() -> {
@@ -3327,6 +3328,11 @@ public void seekToTimestamp(Collection<TopicPartition> topicParts, long timestam
33273328
topicParts.forEach(tp -> seekToTimestamp(tp.topic(), tp.partition(), timestamp));
33283329
}
33293330

3331+
@Override
3332+
public String getGroupId() {
3333+
return this.consumerGroupId;
3334+
}
3335+
33303336
@Override
33313337
public String toString() {
33323338
return "KafkaMessageListenerContainer.ListenerConsumer ["
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.kafka.annotation.EnableKafka;
30+
import org.springframework.kafka.annotation.KafkaListener;
31+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
32+
import org.springframework.kafka.core.ConsumerFactory;
33+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
34+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
35+
import org.springframework.kafka.core.KafkaTemplate;
36+
import org.springframework.kafka.core.ProducerFactory;
37+
import org.springframework.kafka.listener.AbstractConsumerSeekAwareTests.Config.MultiGroupListener;
38+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
39+
import org.springframework.kafka.test.context.EmbeddedKafka;
40+
import org.springframework.kafka.test.utils.KafkaTestUtils;
41+
import org.springframework.stereotype.Component;
42+
import org.springframework.test.annotation.DirtiesContext;
43+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
44+
45+
/**
46+
* @author Borahm Lee
47+
* @since 3.3
48+
*/
49+
@DirtiesContext
50+
@SpringJUnitConfig
51+
@EmbeddedKafka(topics = {AbstractConsumerSeekAwareTests.TOPIC}, partitions = 3)
52+
class AbstractConsumerSeekAwareTests {
53+
54+
static final String TOPIC = "Seek";
55+
56+
@Autowired
57+
Config config;
58+
59+
@Autowired
60+
KafkaTemplate<String, String> template;
61+
62+
@Autowired
63+
MultiGroupListener multiGroupListener;
64+
65+
@Test
66+
void seekForAllGroups() throws Exception {
67+
template.send(TOPIC, "test-data");
68+
template.send(TOPIC, "test-data");
69+
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
70+
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
71+
72+
MultiGroupListener.latch1 = new CountDownLatch(2);
73+
MultiGroupListener.latch2 = new CountDownLatch(2);
74+
75+
multiGroupListener.seekToBeginning();
76+
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
77+
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
78+
}
79+
80+
@Test
81+
void seekForSpecificGroup() throws Exception {
82+
template.send(TOPIC, "test-data");
83+
template.send(TOPIC, "test-data");
84+
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
85+
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
86+
87+
MultiGroupListener.latch1 = new CountDownLatch(2);
88+
MultiGroupListener.latch2 = new CountDownLatch(2);
89+
90+
multiGroupListener.seekToBeginningForGroup("group2");
91+
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
92+
assertThat(MultiGroupListener.latch1.await(100, TimeUnit.MICROSECONDS)).isFalse();
93+
assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2);
94+
}
95+
96+
@EnableKafka
97+
@Configuration
98+
static class Config {
99+
100+
@Autowired
101+
EmbeddedKafkaBroker broker;
102+
103+
@Bean
104+
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
105+
ConsumerFactory<String, String> consumerFactory) {
106+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
107+
factory.setConsumerFactory(consumerFactory);
108+
return factory;
109+
}
110+
111+
@Bean
112+
ConsumerFactory<String, String> consumerFactory() {
113+
return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("test-group", "false", this.broker));
114+
}
115+
116+
@Bean
117+
ProducerFactory<String, String> producerFactory() {
118+
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(this.broker));
119+
}
120+
121+
@Bean
122+
KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
123+
return new KafkaTemplate<>(pf);
124+
}
125+
126+
@Component
127+
static class MultiGroupListener extends AbstractConsumerSeekAware {
128+
129+
static CountDownLatch latch1 = new CountDownLatch(2);
130+
131+
static CountDownLatch latch2 = new CountDownLatch(2);
132+
133+
@KafkaListener(groupId = "group1", topics = TOPIC)
134+
void listenForGroup1(String in) {
135+
latch1.countDown();
136+
}
137+
138+
@KafkaListener(groupId = "group2", topics = TOPIC)
139+
void listenForGroup2(String in) {
140+
latch2.countDown();
141+
}
142+
143+
void seekToBeginningForGroup(String groupIdForSeek) {
144+
getCallbacksAndTopics().forEach((cb, topics) -> {
145+
if (groupIdForSeek.equals(cb.getGroupId())) {
146+
topics.forEach(tp -> cb.seekToBeginning(tp.topic(), tp.partition()));
147+
}
148+
});
149+
}
150+
}
151+
}
152+
153+
}

0 commit comments

Comments
 (0)