Problem Description
- In certain concurrent scenarios, we have observed that the pub/sub mechanism can fail to subscribe and subsequently leak memory. #2425
Code Execution Process
addListener Method
1. Initially, the listener is added to the `listenerTopics` collection.
2. Subsequently, the `subscribeChannel` method is invoked to establish the subscription.
org.springframework.data.redis.connection.util.AbstractSubscription#subscribe Method
- The method checks whether the `alive` flag is set to true. If it is not, a `RedisInvalidSubscriptionException` is thrown.
removeMessageListener Method
1. The listener is first removed from the `listenerTopics` collection.
2. Next, it is verified whether `listenerTopics` is empty. If so, the `stopListening` method is called.
3. Following this, the channel is removed from the subscription's channels, and the `closeIfUnsubscribed` method is invoked.
org.springframework.data.redis.connection.util.AbstractSubscription#closeIfUnsubscribed Method
- The method checks whether the channels collection is empty. If it is, the `alive` flag is set to false using `alive.compareAndSet(true, false)`.
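The life cycle of the `alive` flag described in the two `AbstractSubscription` sections can be sketched with a toy model. This is only an illustration under stated assumptions: `ToySubscription` and its methods are hypothetical names, not the Spring Data Redis class, and `IllegalStateException` stands in for `RedisInvalidSubscriptionException`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for AbstractSubscription; not the real class.
final class ToySubscription {

    private final AtomicBoolean alive = new AtomicBoolean(true);
    private final Set<String> channels = ConcurrentHashMap.newKeySet();

    // Mirrors the alive check described for AbstractSubscription#subscribe.
    void subscribe(String channel) {
        if (!alive.get()) {
            // The real code throws RedisInvalidSubscriptionException at this point.
            throw new IllegalStateException("subscription is no longer alive");
        }
        channels.add(channel);
    }

    // Removes the channel, then closes the subscription if nothing is left.
    void unsubscribe(String channel) {
        channels.remove(channel);
        closeIfUnsubscribed();
    }

    // Mirrors closeIfUnsubscribed: an empty channel set flips alive to false.
    private void closeIfUnsubscribed() {
        if (channels.isEmpty()) {
            alive.compareAndSet(true, false);
        }
    }

    boolean isAlive() {
        return alive.get();
    }
}
```

In this model, once the last channel is removed the flag never returns to true, so every later `subscribe` call on the same instance is rejected.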
Bug Analysis
- Steps 2 and 3 of the `removeMessageListener` method are not atomic with respect to a concurrent `addListener` call, which introduces a race condition. If another thread has already added its listener to `listenerTopics` but has not yet subscribed its channel, step 2 detects that `listenerTopics` is not empty and skips `stopListening`, while step 3 detects that the subscription's channels are empty and sets `alive` to false.
- Consequently, the pending subscription and any subsequent calls to `addListener` fail, because the subscription is no longer alive and `subscribe` throws a `RedisInvalidSubscriptionException`. Since adding the listener failed, applications typically do not invoke `removeMessageListener` for it, so the listener remains in the `listenerTopics` collection indefinitely.
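The interleaving can be replayed deterministically on a single thread with a heavily simplified model. This is a sketch, not the container's real code: `RaceReplaySketch`, `addListenerStep1`, `addListenerStep2`, and `stopListeningCalled` are hypothetical names, listeners and channels are plain strings, and `IllegalStateException` stands in for `RedisInvalidSubscriptionException`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the container plus its subscription; names follow the issue text only.
final class RaceReplaySketch {

    final Map<String, Set<String>> listenerTopics = new ConcurrentHashMap<>(); // listener -> channels
    final Set<String> channels = ConcurrentHashMap.newKeySet();                // subscription channels
    final AtomicBoolean alive = new AtomicBoolean(true);
    boolean stopListeningCalled = false;

    // addListener step 1: register the listener.
    void addListenerStep1(String listener, String channel) {
        listenerTopics.computeIfAbsent(listener, k -> ConcurrentHashMap.newKeySet()).add(channel);
    }

    // addListener step 2: subscribe the channel; fails once alive is false.
    void addListenerStep2(String channel) {
        if (!alive.get()) {
            throw new IllegalStateException("subscription is no longer alive");
        }
        channels.add(channel);
    }

    void removeMessageListener(String listener) {
        Set<String> topics = listenerTopics.remove(listener); // step 1
        if (listenerTopics.isEmpty()) {                        // step 2
            stopListeningCalled = true;
        }
        if (topics != null) {                                  // step 3
            channels.removeAll(topics);
        }
        if (channels.isEmpty()) {                              // closeIfUnsubscribed
            alive.compareAndSet(true, false);
        }
    }

    /** Replays the problematic ordering; returns true if listener2 fails to subscribe and leaks. */
    static boolean replay() {
        RaceReplaySketch c = new RaceReplaySketch();
        c.addListenerStep1("listener1", "a");
        c.addListenerStep2("a");               // first add completes normally
        c.addListenerStep1("listener2", "b");  // second add pauses after step 1
        c.removeMessageListener("listener1");  // step 2 sees listener2, step 3 kills the subscription
        boolean subscribeFailed = false;
        try {
            c.addListenerStep2("b");           // second add resumes and fails
        } catch (IllegalStateException e) {
            subscribeFailed = true;
        }
        return subscribeFailed && !c.stopListeningCalled && c.listenerTopics.containsKey("listener2");
    }

    public static void main(String[] args) {
        System.out.println("listener2 leaked: " + replay());
    }
}
```

The replay only demonstrates the ordering problem; in the real container the same ordering arises from two threads rather than from sequential calls.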
Reproduction
- I made some changes to the code to make the issue easier to reproduce.
Modify Code (RedisMessageListenerContainer)
- The first invocation of `addListener` executes successfully and promptly.
- The second invocation of `addListener` is delayed between Step 1 and Step 2. Execution resumes after the first call to `removeMessageListener`.
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {

    // ============== add code start
    // Counts addListener calls so that only the second call is delayed.
    private static final AtomicInteger counter = new AtomicInteger(0);
    // Presumably set to true once the first removeMessageListener call has finished; that change is not shown here.
    private static final AtomicBoolean firstListenerHasRemoved = new AtomicBoolean(false);
    // ============== add code end

    private void addListener(MessageListener listener, Collection<? extends Topic> topics) {
        // ....
        for (Topic topic : topics) {
            // ...
        }
        // ============== add code start
        boolean wasListening = isListening();
        counter.incrementAndGet();
        // Busy-wait: hold the second call between Step 1 and Step 2 until the first listener has been removed.
        while (counter.get() == 2 && !firstListenerHasRemoved.get()) { }
        // ============== add code end
        if (isRunning()) {
            lazyListen();
            // .....
        }
    }
    // ...
}
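As an aside, the busy-wait gate above could also be expressed with a `CountDownLatch`, which avoids spinning and bounds the wait. The following is a minimal sketch under stated assumptions: `SecondCallGate`, `pass`, `release`, and `demo` are hypothetical names that do not exist in `RedisMessageListenerContainer`, and the timeouts are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical test-only gate; not part of RedisMessageListenerContainer.
final class SecondCallGate {

    private final AtomicInteger calls = new AtomicInteger();
    private final CountDownLatch released = new CountDownLatch(1);

    /** Would be called between step 1 and step 2 of addListener; only the second caller waits. */
    int pass(long timeout, TimeUnit unit) throws InterruptedException {
        int n = calls.incrementAndGet();
        if (n == 2 && !released.await(timeout, unit)) {
            throw new IllegalStateException("gate was not released in time");
        }
        return n;
    }

    /** Would be called once the first removeMessageListener call has finished. */
    void release() {
        released.countDown();
    }

    /** Small self-check: the second caller stays blocked until release() is called. */
    static boolean demo() {
        SecondCallGate gate = new SecondCallGate();
        AtomicBoolean secondPassed = new AtomicBoolean(false);
        try {
            gate.pass(1, TimeUnit.SECONDS); // first caller is not delayed
            Thread second = new Thread(() -> {
                try {
                    gate.pass(5, TimeUnit.SECONDS);
                    secondPassed.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            second.start();
            Thread.sleep(100);
            boolean blockedBeforeRelease = !secondPassed.get();
            gate.release();
            second.join(5000);
            return blockedBeforeRelease && secondPassed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The timeout means a missed `release()` surfaces as an exception in the waiting thread instead of a hung test.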
Add test case to RedisMessageListenerContainerIntegrationTests
@ParameterizedRedisTest
void subscribeErrorAfterStart() throws Exception {

    CompositeListener listener1 = (message, pattern) -> { };
    CompositeListener listener2 = (message, pattern) -> { };

    container.start();

    // the first subscribe will succeed
    container.addMessageListener(listener1, new ChannelTopic("a"));

    // completes with null on success, or with the exception thrown by addMessageListener
    CompletableFuture<Exception> subscribeExceptionFuture = new CompletableFuture<>();
    new Thread(() -> {
        try {
            container.addMessageListener(listener2, new ChannelTopic("b"));
            subscribeExceptionFuture.complete(null);
        } catch (Exception e) {
            subscribeExceptionFuture.complete(e);
        }
    }).start();

    // remove the first listener while the second add is paused between its two steps
    new Thread(() -> {
        container.removeMessageListener(listener1);
    }).start();

    // the second subscribe will fail (not expected)
    subscribeExceptionFuture.get();

    CompositeListener listener3 = (message, pattern) -> { };

    // after the exception, we should still be able to subscribe to channels
    CompletableFuture<Exception> otherSubscribeExceptionFuture = new CompletableFuture<>();
    new Thread(() -> {
        try {
            container.addMessageListener(listener3, new ChannelTopic("c"));
            otherSubscribeExceptionFuture.complete(null);
        } catch (Exception e) {
            otherSubscribeExceptionFuture.complete(e);
        }
    }).start();

    assertThat(otherSubscribeExceptionFuture.get()).isNull();

    container.destroy();
}
