Skip to content

GH-2302: Enable consumer seek only on matching group Id #3318

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 25, 2024

Conversation

bky373
Copy link
Contributor

@bky373 bky373 commented Jun 18, 2024

Backgrounds

Changes

  • Add a new default method (matchGroupId()) to ConsumerSeekAware. This will allow setting the callback and performing seeks only for the desired group ID. (default: false)
  • Override matchGroupId() in AbstractConsumerSeekAware to return true. It would be nice to set this to false so that users should specify the groupId themselves, but in this case the callback from KafkaMessageListenerContainer.ListenerConsumer.setupSeeks() would not be registered, so I set it to true. And this is also to ensure backwards compatibility.

@sobychacko
Copy link
Contributor

@bky373 I see that the general idea is what we discussed. We need to add some documentation, explaining how end users can match the group id.

@sobychacko
Copy link
Contributor

sobychacko commented Jun 19, 2024

@bky373 I think we may need to take a slightly different approach in this PR. While the solution you have in the PR currently satisfies the immediate use cases at hand, this may not be flexible enough for other wider set of requirements. Take a look at this code snippet you provided in the issue.

@Component
    public static class Listener extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener received: " + payload);
        }

        @KafkaListener(id = "seekExample3", topics = "seekExample", concurrency = "3")
        public void listen3(String payload) {
            System.out.println("Listener3 received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    }

While you register the callbacks based on the new method's boolean return value, which works as expected, this is not flexible enough. What if the user wants to seek other listeners based on some other conditions during runtime? The callback registration happens at startup only once, and the callbacks cannot be re-registered during the runtime. Moreover, the seeks must happen on the listener threads, which the PR is doing currently. A better solution is a hybrid of what we discussed in the issue.

After the getSeekCallbacks(), the user needs to do some filtering operations to get the group IDs. Currently, the ConsumerSeekCallback returned doesn't give you the group ID. This is where you need to make the core changes so that the ConsumerSeekCallback adds a new public API for retrieving the group ID. Could you rework the PR to accommodate this new design? Thanks!

CC @artembilan for any additional comments.

@bky373
Copy link
Contributor Author

bky373 commented Jun 21, 2024

Okay, I see. I guess we'll need a flexible implementation.
I'll have to think about this a bit more and work on it
Thank you so much for working with me on this.

@bky373
Copy link
Contributor Author

bky373 commented Jun 22, 2024

@sobychacko
a81a06d (It is better to see Files changed)
I have added a groupId getter to ConsumerSeekCallback. This approach also minimizes changes, and based on the new tests created, users can utilize it as needed in their preferred manner. Your feedback on this would be greatly appreciated. Thank you.

@sobychacko
Copy link
Contributor

@bky373 I took a look at your latest changes. It looks good to me. The changes look very simple, and it also makes the API flexible. I also get the other issue you have encountered, and it is a good idea to work on it in a separate PR. I added some review comments in the new test class.

@bky373
Copy link
Contributor Author

bky373 commented Jun 24, 2024

@bky373 I see that the general idea is what we discussed. We need to add some documentation, explaining how end users can match the group id.

@sobychacko The implementation has changed, but should we add documentation on how to enable seek for only certain groupIds in seek.adoc?

@sobychacko
Copy link
Contributor

Yes, I think we need to add this in the seek section of the ref docs. Also, since we are adding a new public API, we need to mention this in the whats-new section.

@bky373
Copy link
Contributor Author

bky373 commented Jun 24, 2024

Yes, I think we need to add this in the seek section of the ref docs. Also, since we are adding a new public API, we need to mention this in the whats-new section.

@sobychacko
Please check the documentation.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, find some nit-picks from me.
In general it looks good and very close to merge.

Thank you!

@sobychacko sobychacko merged commit ab2037c into spring-projects:main Jun 25, 2024
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

consumer group specific offset seeking for AbstractConsumerSeekAware
3 participants