Closed
Description
Hi,
While browsing through the code of the ReplyingKafkaTemplate, I noticed the following:
- According to the parent specifications of the
sendAndReceive(ProducerRecord<K, V> record, Duration replyTimeout)
method in ReplyingKafkaOperations, the reply timeout should be set to the default value if a null value is passed as argument. However, I don't see any check for that in the implementation and get anNullPointerException
from the scheduler when passing the null value. I guess something is missing here. - The
sendAndReceive(Message<?> message, Duration replyTimeout, @Nullable ParameterizedTypeReference<P> returnType)
method takes the timeout in parameter but never uses it. I guess the first line in the implementation should be
RequestReplyFuture<K, V, R> future = sendAndReceive((ProducerRecord<K, V>) getMessageConverter()
.fromMessage(message, getDefaultTopic()), replyTimeout);
instead of
RequestReplyFuture<K, V, R> future = sendAndReceive((ProducerRecord<K, V>) getMessageConverter()
.fromMessage(message, getDefaultTopic()));
I hope these comments are valuable and I didn't misunderstand something.
Regards,
Jeremy