Skip to content

Commit 280f5f4

Browse files
committed
Make StreamListener read batch messages at once
1 parent 5e71dbe commit 280f5f4

File tree

4 files changed

+20
-13
lines changed

4 files changed

+20
-13
lines changed

src/main/java/org/springframework/data/redis/stream/StreamListener.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import org.springframework.data.redis.connection.stream.Record;
1919

20+
import java.util.List;
21+
2022
/**
2123
* Listener interface to receive delivery of {@link Record messages}.
2224
*
@@ -33,5 +35,5 @@ public interface StreamListener<K, V extends Record<K, ?>> {
3335
*
3436
* @param message never {@literal null}.
3537
*/
36-
void onMessage(V message);
38+
void onMessage(List<V> message);
3739
}

src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.redis.stream;
1717

1818
import java.time.Duration;
19+
import java.util.List;
1920
import java.util.OptionalInt;
2021
import java.util.concurrent.Executor;
2122
import java.util.function.Predicate;
@@ -82,10 +83,10 @@
8283
* <p>
8384
* {@link StreamMessageListenerContainer} requires a {@link Executor} to fork long-running polling tasks on a different
8485
* {@link Thread}. This thread is used as event loop to poll for stream messages and invoke the
85-
* {@link StreamListener#onMessage(Record) listener callback}.
86+
* {@link StreamListener#onMessage(List)} listener callback}.
8687
* <p>
8788
* {@link StreamMessageListenerContainer} tasks propagate errors during stream reads and
88-
* {@link StreamListener#onMessage(Record) listener notification} to a configurable {@link ErrorHandler}. Errors stop a
89+
* {@link StreamListener#onMessage(List)} listener notification} to a configurable {@link ErrorHandler}. Errors stop a
8990
* {@link Subscription} by default. Configuring a {@link Predicate} for a {@link StreamReadRequest} allows conditional
9091
* subscription cancelling or continuing on all errors.
9192
* <p>

src/main/java/org/springframework/data/redis/stream/StreamPollTask.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.redis.stream;
1717

1818
import java.time.Duration;
19+
import java.util.ArrayList;
1920
import java.util.List;
2021
import java.util.Optional;
2122
import java.util.concurrent.CountDownLatch;
@@ -147,14 +148,14 @@ private List<ByteRecord> readRecords() {
147148
}
148149

149150
private void deserializeAndEmitRecords(List<ByteRecord> records) {
151+
List<V> messages = new ArrayList<>();
150152

151153
for (ByteRecord raw : records) {
152154

153155
try {
154-
155156
pollState.updateReadOffset(raw.getId().getValue());
156157
V record = convertRecord(raw);
157-
listener.onMessage(record);
158+
messages.add(record);
158159
} catch (RuntimeException ex) {
159160

160161
if (cancelSubscriptionOnError.test(ex)) {
@@ -166,8 +167,11 @@ private void deserializeAndEmitRecords(List<ByteRecord> records) {
166167
}
167168

168169
errorHandler.handleError(ex);
170+
return;
169171
}
170172
}
173+
174+
listener.onMessage(messages);
171175
}
172176

173177
private V convertRecord(ByteRecord record) {

src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void shouldReceiveMapMessages() throws InterruptedException {
9191
BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
9292

9393
container.start();
94-
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add);
94+
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll);
9595

9696
subscription.await(DEFAULT_TIMEOUT);
9797

@@ -119,7 +119,7 @@ void shouldReceiveSimpleObjectHashRecords() throws InterruptedException {
119119
BlockingQueue<ObjectRecord<String, String>> queue = new LinkedBlockingQueue<>();
120120

121121
container.start();
122-
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add);
122+
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll);
123123

124124
subscription.await(DEFAULT_TIMEOUT);
125125

@@ -143,7 +143,7 @@ void shouldReceiveObjectHashRecords() throws InterruptedException {
143143
BlockingQueue<ObjectRecord<String, LoginEvent>> queue = new LinkedBlockingQueue<>();
144144

145145
container.start();
146-
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add);
146+
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll);
147147

148148
subscription.await(DEFAULT_TIMEOUT);
149149

@@ -168,7 +168,7 @@ void shouldReceiveMessagesInConsumerGroup() throws InterruptedException {
168168

169169
container.start();
170170
Subscription subscription = container.receive(Consumer.from("my-group", "my-consumer"),
171-
StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::add);
171+
StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::addAll);
172172

173173
subscription.await(DEFAULT_TIMEOUT);
174174

@@ -194,7 +194,7 @@ void shouldReceiveAndAckMessagesInConsumerGroup() throws InterruptedException {
194194

195195
container.start();
196196
Subscription subscription = container.receiveAutoAck(Consumer.from("my-group", "my-consumer"),
197-
StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::add);
197+
StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::addAll);
198198

199199
subscription.await(DEFAULT_TIMEOUT);
200200

@@ -316,7 +316,7 @@ void deserializationShouldContinueStreamRead() throws InterruptedException {
316316
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("payload", "3"));
317317

318318
container.start();
319-
Subscription subscription = container.register(readRequest, records::add);
319+
Subscription subscription = container.register(readRequest, records::addAll);
320320

321321
subscription.await(DEFAULT_TIMEOUT);
322322

@@ -347,7 +347,7 @@ void cancelledStreamShouldNotReceiveMessages() throws InterruptedException {
347347
BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
348348

349349
container.start();
350-
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add);
350+
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll);
351351

352352
subscription.await(DEFAULT_TIMEOUT);
353353
cancelAwait(subscription);
@@ -365,7 +365,7 @@ void containerRestartShouldRestartSubscription() throws InterruptedException {
365365
BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
366366

367367
container.start();
368-
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add);
368+
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll);
369369

370370
subscription.await(DEFAULT_TIMEOUT);
371371

0 commit comments

Comments
 (0)