Skip to content

Commit adb8970

Browse files
kurthongkurthong77
andauthored
Remove RetryListener from KafkaInbounds
* KafkaMessageDrivenChannelAdapter's ATTRIBUTES_HOLDER should be isolated. In order to achieve custom retry in batch mode, we may to use a RetryTemplate in listener itself. But if the RetryTemplate is shared with another KafkaMessageDrivenChannelAdapter, batch mode's ATTRIBUTES_HOLDER might be over-written by another KafkaMessageDrivenChannelAdapter's IntegrationRecordMessageListener. The situation is like shown below. - There is only one RetryTemple bean in the application. - There are two KafkaMessageDrivenChannelAdapters(A,B) in the application. - A KafkaMessageDrivenChannelAdapter is batch mode and utilizing the retryTemplate in the listener. - B KafkaMessageDrivenChannelAdapter is record mode and using the retryTemplate itself. - (B KafkaMessageDrivenChannelAdapter's recordListener is registered in the retryTemplate.) - When A retry is attempted in the listener, it will trigger B KafkaMessageDrivenChannelAdapter's recordListener. - B KafkaMessageDrivenChannelAdapter's recordListener will overwrite A KafkaMessageDrivenChannelAdapter's ATTRIBUTES_HOLDER. * should not mutate an externally provided RetryTemplate * should not expose KafkaInboundEndpoint's methods outside the class and fix a checkstyle error. * removing unused code and polishing * restore retry around batch and bring back KafkaInboundEndpoint to endpoints. * remove retry logic in batch mode and move ATTRIBUTES_HOLDER into KafkaInboundEndpoint * remove generic type parameters of KafkaInboundEndpoint * fix style error --------- Co-authored-by: kurt <[email protected]>
1 parent f0561b6 commit adb8970

File tree

3 files changed

+25
-109
lines changed

3 files changed

+25
-109
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundEndpoint.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,9 @@
1717
package org.springframework.integration.kafka.inbound;
1818

1919
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2021

22+
import org.springframework.core.AttributeAccessor;
2123
import org.springframework.kafka.KafkaException;
2224
import org.springframework.kafka.support.Acknowledgment;
2325
import org.springframework.retry.RecoveryCallback;
@@ -50,6 +52,8 @@ public interface KafkaInboundEndpoint {
5052
*/
5153
String CONTEXT_RECORD = "record";
5254

55+
ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
56+
5357
/**
5458
* Execute the runnable with the retry template and recovery callback.
5559
* @param template the template.
@@ -59,21 +63,27 @@ public interface KafkaInboundEndpoint {
5963
* @param consumer the consumer.
6064
* @param runnable the runnable.
6165
*/
62-
default void doWithRetry(RetryTemplate template, RecoveryCallback<?> callback, Object data,
66+
default void doWithRetry(RetryTemplate template, RecoveryCallback<?> callback, ConsumerRecord<?, ?> record,
6367
Acknowledgment acknowledgment, Consumer<?, ?> consumer, Runnable runnable) {
6468

6569
try {
6670
template.execute(context -> {
67-
context.setAttribute(CONTEXT_RECORD, data);
68-
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
69-
context.setAttribute(CONTEXT_CONSUMER, consumer);
71+
if (context.getRetryCount() == 0) {
72+
context.setAttribute(CONTEXT_RECORD, record);
73+
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
74+
context.setAttribute(CONTEXT_CONSUMER, consumer);
75+
ATTRIBUTES_HOLDER.set(context);
76+
}
7077
runnable.run();
7178
return null;
7279
}, callback);
7380
}
7481
catch (Exception ex) {
7582
throw new KafkaException("Failed to execute runnable", ex);
7683
}
84+
finally {
85+
ATTRIBUTES_HOLDER.remove();
86+
}
7787
}
7888

7989
}

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,9 +55,7 @@
5555
import org.springframework.messaging.MessageChannel;
5656
import org.springframework.messaging.MessageHeaders;
5757
import org.springframework.retry.RecoveryCallback;
58-
import org.springframework.retry.RetryCallback;
5958
import org.springframework.retry.RetryContext;
60-
import org.springframework.retry.RetryListener;
6159
import org.springframework.retry.support.RetryTemplate;
6260
import org.springframework.util.Assert;
6361

@@ -78,8 +76,6 @@
7876
public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport
7977
implements KafkaInboundEndpoint, Pausable, OrderlyShutdownCapable {
8078

81-
private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
82-
8379
private final IntegrationRecordMessageListener listener = new IntegrationRecordMessageListener();
8480

8581
private final AbstractMessageListenerContainer<K, V> messageListenerContainer;
@@ -195,7 +191,6 @@ public void setBindSourceRecord(boolean bindSourceRecord) {
195191
protected void onInit() {
196192
super.onInit();
197193
if (this.retryTemplate != null) {
198-
this.retryTemplate.registerListener(this.listener);
199194
MessageChannel errorChannel = getErrorChannel();
200195
if (this.recoveryCallback != null && errorChannel != null) {
201196
this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy());
@@ -261,7 +256,7 @@ public int afterShutdown() {
261256
private void setAttributesIfNecessary(Object record, @Nullable Message<?> message, boolean conversionError) {
262257
boolean needHolder = ATTRIBUTES_HOLDER.get() == null
263258
&& (getErrorChannel() != null && (this.retryTemplate == null || conversionError));
264-
boolean needAttributes = needHolder | this.retryTemplate != null;
259+
boolean needAttributes = needHolder || this.retryTemplate != null;
265260
if (needHolder) {
266261
ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
267262
}
@@ -285,8 +280,7 @@ protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
285280
}
286281
}
287282

288-
private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V>
289-
implements RetryListener {
283+
private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V> {
290284

291285
IntegrationRecordMessageListener() {
292286
super(null, null); // NOSONAR - out of use
@@ -332,9 +326,8 @@ private void sendAndReceive(ConsumerRecord<K, V> record, Message<?> message, Ack
332326
RetryTemplate template = KafkaInboundGateway.this.retryTemplate;
333327
if (template != null) {
334328
doWithRetry(template, KafkaInboundGateway.this.recoveryCallback, record, acknowledgment, consumer,
335-
() -> {
336-
doSendAndReceive(enhanceHeadersAndSaveAttributes(message, record));
337-
});
329+
() -> doSendAndReceive(enhanceHeadersAndSaveAttributes(message, record))
330+
);
338331
}
339332
else {
340333
doSendAndReceive(enhanceHeadersAndSaveAttributes(message, record));
@@ -427,27 +420,6 @@ private Message<?> enhanceReply(Message<?> message, Message<?> reply) {
427420
return reply;
428421
}
429422

430-
@Override
431-
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
432-
if (KafkaInboundGateway.this.retryTemplate != null) {
433-
ATTRIBUTES_HOLDER.set(context);
434-
}
435-
return true;
436-
}
437-
438-
@Override
439-
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
440-
Throwable throwable) {
441-
442-
ATTRIBUTES_HOLDER.remove();
443-
}
444-
445-
@Override
446-
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
447-
Throwable throwable) {
448-
// Empty
449-
}
450-
451423
}
452424

453425
}

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java

Lines changed: 5 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,7 @@
6363
import org.springframework.messaging.Message;
6464
import org.springframework.messaging.MessageChannel;
6565
import org.springframework.retry.RecoveryCallback;
66-
import org.springframework.retry.RetryCallback;
6766
import org.springframework.retry.RetryContext;
68-
import org.springframework.retry.RetryListener;
6967
import org.springframework.retry.support.RetryTemplate;
7068
import org.springframework.util.Assert;
7169

@@ -85,8 +83,6 @@
8583
public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSupport
8684
implements KafkaInboundEndpoint, OrderlyShutdownCapable, Pausable {
8785

88-
private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
89-
9086
private final AbstractMessageListenerContainer<K, V> messageListenerContainer;
9187

9288
private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();
@@ -298,7 +294,6 @@ protected void onInit() {
298294
if (this.recoveryCallback != null && errorChannel != null) {
299295
this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy());
300296
}
301-
this.retryTemplate.registerListener(this.recordListener);
302297
}
303298
if (!doFilterInRetry && this.recordFilterStrategy != null) {
304299
listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy,
@@ -366,7 +361,7 @@ public int afterShutdown() {
366361
private void setAttributesIfNecessary(Object record, @Nullable Message<?> message, boolean conversionError) {
367362
boolean needHolder = ATTRIBUTES_HOLDER.get() == null
368363
&& (getErrorChannel() != null && (this.retryTemplate == null || conversionError));
369-
boolean needAttributes = needHolder | this.retryTemplate != null;
364+
boolean needAttributes = needHolder || this.retryTemplate != null;
370365
if (needHolder) {
371366
ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
372367
}
@@ -424,8 +419,7 @@ public enum ListenerMode {
424419
batch
425420
}
426421

427-
private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V>
428-
implements RetryListener {
422+
private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V> {
429423

430424
IntegrationRecordMessageListener() {
431425
super(null, null); // NOSONAR - out of use
@@ -511,34 +505,12 @@ else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent)
511505
return messageToReturn;
512506
}
513507

514-
@Override
515-
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
516-
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
517-
ATTRIBUTES_HOLDER.set(context);
518-
}
519-
return true;
520-
}
521-
522-
@Override
523-
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
524-
Throwable throwable) {
525-
526-
ATTRIBUTES_HOLDER.remove();
527-
}
528-
529-
@Override
530-
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
531-
Throwable throwable) {
532-
// Empty
533-
}
534-
535508
}
536509

537-
private class IntegrationBatchMessageListener extends BatchMessagingMessageListenerAdapter<K, V>
538-
implements RetryListener {
510+
private class IntegrationBatchMessageListener extends BatchMessagingMessageListenerAdapter<K, V> {
539511

540512
IntegrationBatchMessageListener() {
541-
super(null, null); // NOSONAR - out if use
513+
super(null, null); // NOSONAR - out of use
542514
}
543515

544516
@Override
@@ -557,33 +529,10 @@ public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowl
557529
message = toMessage(records, acknowledgment, consumer);
558530
}
559531
if (message != null) {
560-
RetryTemplate template = KafkaMessageDrivenChannelAdapter.this.retryTemplate;
561-
if (template != null) {
562-
doWIthRetry(records, acknowledgment, consumer, message, template);
563-
}
564-
else {
565-
sendMessageIfAny(message, records);
566-
}
532+
sendMessageIfAny(message, records);
567533
}
568534
}
569535

570-
private void doWIthRetry(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment,
571-
Consumer<?, ?> consumer, Message<?> message, RetryTemplate template) {
572-
573-
doWithRetry(template, KafkaMessageDrivenChannelAdapter.this.recoveryCallback, records, acknowledgment,
574-
consumer, () -> {
575-
if (KafkaMessageDrivenChannelAdapter.this.filterInRetry) {
576-
List<ConsumerRecord<K, V>> filtered =
577-
KafkaMessageDrivenChannelAdapter.this.recordFilterStrategy.filterBatch(records);
578-
Message<?> toSend = message;
579-
if (filtered.size() != records.size()) {
580-
toSend = toMessage(filtered, acknowledgment, consumer);
581-
}
582-
sendMessageIfAny(toSend, filtered);
583-
}
584-
});
585-
}
586-
587536
@Nullable
588537
private Message<?> toMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment,
589538
Consumer<?, ?> consumer) {
@@ -607,21 +556,6 @@ private Message<?> toMessage(List<ConsumerRecord<K, V>> records, Acknowledgment
607556
return message;
608557
}
609558

610-
@Override
611-
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
612-
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
613-
ATTRIBUTES_HOLDER.set(context);
614-
}
615-
return true;
616-
}
617-
618-
@Override
619-
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
620-
Throwable throwable) {
621-
622-
ATTRIBUTES_HOLDER.remove();
623-
}
624-
625559
}
626560

627561
}

0 commit comments

Comments
 (0)