-
Notifications
You must be signed in to change notification settings - Fork 1.6k
GH-2302: Enable consumer seek only on matching group Id #3318
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java
Outdated
Show resolved
Hide resolved
spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java
Outdated
Show resolved
Hide resolved
spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java
Outdated
Show resolved
Hide resolved
@bky373 I see that the general idea is what we discussed. We need to add some documentation, explaining how end users can match the group id. |
@bky373 I think we may need to take a slightly different approach in this PR. While the solution you have in the PR currently satisfies the immediate use cases at hand, this may not be flexible enough for other wider set of requirements. Take a look at this code snippet you provided in the issue.
While you register the callbacks based on the new method's boolean return value, which works as expected, this is not flexible enough. What if the user wants to seek other listeners based on some other conditions during runtime? The callback registration happens at startup only once, and the callbacks cannot be re-registered during the runtime. Moreover, the seeks must happen on the listener threads, which the PR is doing currently. A better solution is a hybrid of what we discussed in the issue. After the CC @artembilan for any additional comments. |
Okay, I see. I guess we'll need a flexible implementation. |
@sobychacko |
...g-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java
Outdated
Show resolved
Hide resolved
...g-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java
Show resolved
Hide resolved
@bky373 I took a look at your latest changes. It looks good to me. The changes look very simple, and it also makes the API flexible. I also get the other issue you have encountered, and it is a good idea to work on it in a separate PR. I added some review comments in the new test class. |
@sobychacko The implementation has changed, but should we add documentation on how to enable seek for only certain groupIds in |
Yes, I think we need to add this in the |
...g-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java
Outdated
Show resolved
Hide resolved
...g-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java
Outdated
Show resolved
Hide resolved
...g-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java
Outdated
Show resolved
Hide resolved
@sobychacko |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, find some nit-picks from me.
In general it looks good and very close to merge.
Thank you!
Backgrounds
@KafkaListener
, listeners with different group IDs can exist within the same class.AbstractConsumerSeekAware
, when seek is performed in one listener, it is executed in all listeners, even if it is not desired.Changes
matchGroupId()
) toConsumerSeekAware
. This will allow setting the callback and performing seeks only for the desired group ID. (default:false
)matchGroupId()
inAbstractConsumerSeekAware
to returntrue
. It would be nice to set this tofalse
so that users should specify the groupId themselves, but in this case the callback fromKafkaMessageListenerContainer.ListenerConsumer.setupSeeks()
would not be registered, so I set it totrue
. And this is also to ensure backwards compatibility.