Skip to content

Make StreamListener read batch messages at once #3138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

Danden1
Copy link

@Danden1 Danden1 commented Apr 24, 2025

  • You have read the Spring Data contribution guidelines.
  • You use the code formatters provided here and have them applied to your changes. Don’t submit any formatting related changes.
  • You submit test cases (unit or integration tests) that back your changes.
  • You added yourself as author in the headers of the classes you touched. Amend the date range in the Apache license header if needed. For new types, add the license header (copy from another file and set the current year only).

Resolves spring-projects/spring-data-redis#3078

This pull request implements support for batch message consumption in StreamListener,
allowing it to receive and process multiple messages at once.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Apr 24, 2025
@@ -33,5 +36,5 @@ public interface StreamListener<K, V extends Record<K, ?>> {
*
* @param message never {@literal null}.
*/
void onMessage(V message);
void onMessage(List<V> message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That won't work as this is a breaking change.

pollState.updateReadOffset(raw.getId().getValue());
V record = convertRecord(raw);
listener.onMessage(record);
messages.add(record);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Batching introduces an error category of message loss if one of the messages causes an exception. In such a case, the read offset has been updated but the message was never delivered to a listener.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. I will revise the PR based on your suggestions.
I'll try implementing a separate class called StreamBatchListener to handle batch processing.

@mp911de mp911de added the status: on-hold We cannot start working on this issue yet label Apr 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status: on-hold We cannot start working on this issue yet status: waiting-for-triage An issue we've not yet triaged
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Why does the StreamListener consume only one record?
3 participants