Closed as not planned
Description
I discovered that there is an option called batch size.
StreamMessageListenerContainerOptions<String, ObjectRecord<String, Long>> containerOptions = StreamMessageListenerContainerOptions
.builder().batchSize(3).pollTimeout(Duration.ofMillis(100)).targetType(Long.class).build();
So, I expected that using this option would fetch all the data at once.
However, onMessage
method cannot take a list as a parameter.
class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
//here
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
}
}
It reads the data in batch size at once, but uses a for loop to deliver the data one by one.
class StreamPollTask<K, V extends Record<K, ?>> implements Task {
....
private void deserializeAndEmitRecords(List<ByteRecord> records) {
for (ByteRecord raw : records) {
try {
pollState.updateReadOffset(raw.getId().getValue());
V record = convertRecord(raw);
listener.onMessage(record);
} catch (RuntimeException ex) {
if (cancelSubscriptionOnError.test(ex)) {
cancel();
errorHandler.handleError(ex);
return;
}
errorHandler.handleError(ex);
}
}
}
}
I'm curious why it doesn't provide the data as a list at once, but instead delivers it this way.
(KafkaListener allow list, https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#batch-listeners)