
Concurrent Pub/Sub use can leave the subscription permanently unusable and ultimately cause a memory leak #3080

Open
@caiuswang

Description


Problem Description

  • In certain concurrent scenarios, we have observed that pub/sub subscriptions can fail and that these failures subsequently lead to memory leaks (#2425).

Code Execution Process

addListener Method

  1. Initially, the listener is added to the listenerTopics collection.
  2. Subsequently, the subscribeChannel method is invoked to establish the subscription.
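Because step 1 runs before step 2, an exception thrown while subscribing leaves the listener registered. The sketch below is a hypothetical, heavily simplified model of that ordering, not the container's actual code: `ListenerRegistry` and `subscriptionAlive` are illustrative names, and `IllegalStateException` stands in for `RedisInvalidSubscriptionException`.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the two steps of addListener (not the real container code).
class ListenerRegistry {
    // stands in for RedisMessageListenerContainer's listenerTopics
    final Map<Object, Set<String>> listenerTopics = new ConcurrentHashMap<>();
    // stands in for the subscription's "alive" state
    volatile boolean subscriptionAlive = true;

    void addListener(Object listener, Collection<String> channels) {
        // step 1: record the listener and its channels
        listenerTopics.computeIfAbsent(listener, k -> ConcurrentHashMap.newKeySet()).addAll(channels);
        // step 2: subscribe; if this throws, step 1 is NOT rolled back
        subscribeChannel(channels);
    }

    private void subscribeChannel(Collection<String> channels) {
        if (!subscriptionAlive) {
            // stand-in for RedisInvalidSubscriptionException
            throw new IllegalStateException("subscription is no longer alive");
        }
    }
}

class AddListenerOrderingDemo {
    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        registry.subscriptionAlive = false; // simulate an already-closed subscription
        Object listener = new Object();
        try {
            registry.addListener(listener, List.of("b"));
        } catch (IllegalStateException e) {
            System.out.println("addListener failed: " + e.getMessage());
        }
        // the failed listener is still registered from step 1
        System.out.println("still registered: " + registry.listenerTopics.containsKey(listener));
    }
}
```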

org.springframework.data.redis.connection.util.AbstractSubscription#subscribe Method

  1. The method checks if the alive flag is set to true. If it is not, a RedisInvalidSubscriptionException is thrown.
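A minimal sketch of that guard, assuming a simplified subscription that tracks only channels. `GuardedSubscription` is a hypothetical stand-in for `AbstractSubscription`, and `IllegalStateException` replaces `RedisInvalidSubscriptionException`:

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the alive check in AbstractSubscription#subscribe.
class GuardedSubscription {
    final AtomicBoolean alive = new AtomicBoolean(true);
    final Set<String> channels = ConcurrentHashMap.newKeySet();

    void subscribe(String... newChannels) {
        // once alive is false, every subscribe call is rejected
        if (!alive.get()) {
            // stand-in for RedisInvalidSubscriptionException
            throw new IllegalStateException("subscription is no longer alive");
        }
        Collections.addAll(channels, newChannels);
    }
}

class GuardDemo {
    public static void main(String[] args) {
        GuardedSubscription s = new GuardedSubscription();
        s.subscribe("a");
        s.alive.set(false); // e.g. after closeIfUnsubscribed has run
        try {
            s.subscribe("b");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```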

removeMessageListener Method

  1. The listener is first removed from the listenerTopics collection.
  2. Next, it is verified whether listenerTopics is empty. If so, the stopListening method is called.
  3. Following this, the channel is removed from the subscription's channels, and the closeIfUnsubscribed method is invoked.
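The three removal steps can be modeled roughly as follows. This is a hypothetical simplification (`RemovalModel` and its fields are illustrative, not Spring Data Redis code), and it assumes, as the bug analysis implies, that step 3 runs only when step 2 finds listeners remaining.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of removeMessageListener's three steps.
class RemovalModel {
    final Map<String, Set<String>> listenerTopics = new ConcurrentHashMap<>();
    final Set<String> channels = ConcurrentHashMap.newKeySet(); // the subscription's channels
    final AtomicBoolean alive = new AtomicBoolean(true);
    boolean stopped = false; // records that stopListening() would have been called

    // normal, non-racy registration: register and subscribe together
    void add(String listener, String channel) {
        listenerTopics.put(listener, Set.of(channel));
        channels.add(channel);
    }

    void removeMessageListener(String listener) {
        // step 1: drop the listener from listenerTopics
        Set<String> topics = listenerTopics.remove(listener);
        // step 2: with no listeners left, stop listening altogether
        if (listenerTopics.isEmpty()) {
            stopped = true;
        } else if (topics != null) {
            // step 3: unsubscribe the listener's channels, then close if none remain
            channels.removeAll(topics);
            closeIfUnsubscribed();
        }
    }

    private void closeIfUnsubscribed() {
        if (channels.isEmpty()) {
            alive.compareAndSet(true, false);
        }
    }
}

class RemovalDemo {
    public static void main(String[] args) {
        RemovalModel m = new RemovalModel();
        m.add("listener1", "a");
        m.add("listener2", "b");
        m.removeMessageListener("listener1"); // "b" is still subscribed, so alive stays true
        System.out.println(m.alive.get() + " " + m.channels);
        m.removeMessageListener("listener2"); // last listener: stopListening path
        System.out.println(m.stopped);
    }
}
```

With two fully subscribed listeners, removing one leaves the other's channel in place, so `alive` stays `true`; trouble only starts when a listener is registered but not yet subscribed.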

org.springframework.data.redis.connection.util.AbstractSubscription#closeIfUnsubscribed Method

  1. The method checks if the channels collection is empty. If it is, the alive flag is set to false using alive.compareAndSet(true, false).
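The important property is that this transition is one-way: `compareAndSet(true, false)` succeeds at most once, and in the simplified model below nothing sets `alive` back to `true`. `ClosingSubscription` is a hypothetical stand-in that tracks channels only.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for AbstractSubscription#closeIfUnsubscribed (channels only).
class ClosingSubscription {
    final AtomicBoolean alive = new AtomicBoolean(true);
    final Set<String> channels = ConcurrentHashMap.newKeySet();

    void unsubscribe(String channel) {
        channels.remove(channel);
        closeIfUnsubscribed();
    }

    // returns true only for the call that actually flips alive from true to false
    boolean closeIfUnsubscribed() {
        return channels.isEmpty() && alive.compareAndSet(true, false);
    }
}

class CloseDemo {
    public static void main(String[] args) {
        ClosingSubscription s = new ClosingSubscription();
        s.channels.add("a");
        s.unsubscribe("a");                           // last channel gone: alive flips to false
        System.out.println(s.alive.get());            // false
        System.out.println(s.closeIfUnsubscribed());  // false: already closed, CAS fails
        s.channels.add("b");                          // a later channel does not revive it
        System.out.println(s.alive.get());            // false
    }
}
```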

Bug Analysis

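  • Steps 2 and 3 of the removeMessageListener method race with a concurrent addListener call. Suppose another thread has completed addListener step 1 (registering a new listener) but has not yet reached step 2 (subscribing its channel). Step 2 of removeMessageListener then finds that listenerTopics is not empty, so stopListening is not called, while step 3 finds that the subscription's channels collection is empty, because the new listener's channel has not been subscribed yet. closeIfUnsubscribed therefore sets alive to false.
  • Consequently, the pending addListener call, and any subsequent one, fails because AbstractSubscription#subscribe throws RedisInvalidSubscriptionException once alive is false. Since the registration appears to have failed, applications typically do not call removeMessageListener for that listener, so it remains in the listenerTopics collection indefinitely. This is the memory leak.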
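The interleaving can be replayed deterministically on a single thread. The sketch below collapses the container and the subscription into one hypothetical class (`RaceReplay`); step numbers follow the description above, and `IllegalStateException` stands in for `RedisInvalidSubscriptionException`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical single-threaded replay of the race; not Spring Data Redis code.
class RaceReplay {
    final Map<String, Set<String>> listenerTopics = new ConcurrentHashMap<>();
    final Set<String> channels = ConcurrentHashMap.newKeySet();
    final AtomicBoolean alive = new AtomicBoolean(true);

    // addListener step 1
    void register(String listener, String channel) {
        listenerTopics.put(listener, Set.of(channel));
    }

    // addListener step 2 (AbstractSubscription#subscribe)
    void subscribe(String channel) {
        if (!alive.get()) {
            throw new IllegalStateException("subscription is no longer alive");
        }
        channels.add(channel);
    }

    // removeMessageListener steps 1-3
    void remove(String listener) {
        Set<String> topics = listenerTopics.remove(listener);   // step 1
        if (listenerTopics.isEmpty()) {                         // step 2
            return; // stopListening() would run here; not reached in this replay
        }
        if (topics != null) {
            channels.removeAll(topics);                         // step 3
        }
        if (channels.isEmpty()) {
            alive.compareAndSet(true, false);                   // closeIfUnsubscribed
        }
    }

    // full addListener; returns true if it fails
    boolean addFails(String listener, String channel) {
        register(listener, channel);
        try {
            subscribe(channel);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}

class RaceReplayDemo {
    public static void main(String[] args) {
        RaceReplay c = new RaceReplay();
        c.register("listener1", "a");
        c.subscribe("a");                 // first addListener completes normally

        c.register("listener2", "b");     // thread A: addListener step 1, then paused
        c.remove("listener1");            // thread B: sees listener2, unsubscribes "a", closes
        boolean failed;
        try {
            c.subscribe("b");             // thread A resumes: addListener step 2
            failed = false;
        } catch (IllegalStateException e) {
            failed = true;
        }
        System.out.println("listener2 subscribe failed: " + failed);
        System.out.println("listener2 leaked: " + c.listenerTopics.containsKey("listener2"));
        System.out.println("listener3 add fails too: " + c.addFails("listener3", "c"));
    }
}
```

Running `RaceReplayDemo` reports that listener2's subscribe failed, that listener2 is still present in `listenerTopics`, and that adding listener3 afterwards fails as well.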

Reproduction

  • I made some changes to the code to make the issue easier to reproduce.

Modify Code (RedisMessageListenerContainer)

  • The first invocation of addListener executes successfully and promptly.
  • The second invocation of addListener is paused between step 1 and step 2 and resumes only after the first call to removeMessageListener has completed.
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {

    // ============== add code start
    // counts addListener invocations so that only the second one is paused
    private static final AtomicInteger counter = new AtomicInteger(0);
    // set once the first removeMessageListener call has completed
    private static final AtomicBoolean firstListenerHasRemoved = new AtomicBoolean(false);
    // ============== add code end

    private void addListener(MessageListener listener, Collection<? extends Topic> topics) {
        // ....
        for (Topic topic : topics) {
            // ...
        }
        // ============== add code start
        boolean wasListening = isListening();
        int invocation = counter.incrementAndGet();
        // pause the second invocation between step 1 and step 2
        // until the first removeMessageListener call has completed
        while (invocation == 2 && !firstListenerHasRemoved.get()) {
            Thread.onSpinWait();
        }
        // ============== add code end
        if (isRunning()) {
            lazyListen();
            // .....
        }
    }

    public void removeMessageListener(@Nullable MessageListener listener, Collection<? extends Topic> topics) {
        // .... existing removal logic
        // ============== add code start
        // assumed: the flag is set here; the snippet reads it but does not show where it is set
        firstListenerHasRemoved.set(true);
        // ============== add code end
    }
}
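The spin loop by itself does not guarantee that the second addListener call has finished step 1 before removeMessageListener runs, since the two test threads are started without any ordering. As a hedged alternative, the interleaving can be forced with two `CountDownLatch`es, one for each direction. The sketch uses a hypothetical, self-contained model (`LatchedRace`) rather than the real container, so it only illustrates the ordering technique.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical two-thread model of the forced interleaving; not container code.
class LatchedRace {
    final Map<String, Set<String>> listenerTopics = new ConcurrentHashMap<>();
    final Set<String> channels = ConcurrentHashMap.newKeySet();
    final AtomicBoolean alive = new AtomicBoolean(true);
    // counted down by the adder once step 1 is done
    final CountDownLatch secondRegistered = new CountDownLatch(1);
    // counted down by the remover once its removal is done
    final CountDownLatch firstRemovalDone = new CountDownLatch(1);

    void addListenerPaused(String listener, String channel) throws InterruptedException {
        listenerTopics.put(listener, Set.of(channel));  // addListener step 1
        secondRegistered.countDown();
        firstRemovalDone.await();                       // blocks instead of spinning
        if (!alive.get()) {                             // addListener step 2 (subscribe)
            throw new IllegalStateException("subscription is no longer alive");
        }
        channels.add(channel);
    }

    void removeMessageListener(String listener) {
        Set<String> topics = listenerTopics.remove(listener);
        // the stopListening() path (no listeners left) is omitted from this model
        if (!listenerTopics.isEmpty() && topics != null) {
            channels.removeAll(topics);
            if (channels.isEmpty()) {
                alive.compareAndSet(true, false);       // closeIfUnsubscribed
            }
        }
        firstRemovalDone.countDown();
    }

    // runs the interleaving once; returns true if the second addListener failed
    static boolean reproduce() throws InterruptedException {
        LatchedRace race = new LatchedRace();
        race.listenerTopics.put("listener1", Set.of("a")); // first addListener, completed
        race.channels.add("a");
        AtomicBoolean addFailed = new AtomicBoolean(false);
        Thread adder = new Thread(() -> {
            try {
                race.addListenerPaused("listener2", "b");
            } catch (IllegalStateException e) {
                addFailed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        adder.start();
        race.secondRegistered.await();   // wait until listener2 is registered
        race.removeMessageListener("listener1");
        adder.join();
        return addFailed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second addListener failed: " + reproduce());
    }
}
```

Because the latches fix the order of step 1, the removal, and step 2, this model reproduces the failure on every run, independent of thread scheduling.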

Add test case to RedisMessageListenerContainerIntegrationTests

	@ParameterizedRedisTest
	void subscribeErrorAfterStart() throws Exception {

		CompositeListener listener1 = (message, pattern) -> { };
		CompositeListener listener2 = (message, pattern) -> { };

		container.start();
		// the first subscribe will succeed
		container.addMessageListener(listener1, new ChannelTopic("a"));
		CompletableFuture<Exception> subscribeExceptionFuture = new CompletableFuture<>();
		new Thread(() -> {
			try {
				container.addMessageListener(listener2, new ChannelTopic("b"));
				subscribeExceptionFuture.complete(null);
			} catch (Exception e) {
				subscribeExceptionFuture.complete(e);
			}
		}).start();
		new Thread(() -> {
			container.removeMessageListener(listener1);
		}).start();

		// with the bug, the second subscribe fails here (unexpected)
		subscribeExceptionFuture.get();
		CompositeListener listener3 = (message, pattern) -> { };

		// after the exception, we should still be able to subscribe to channels
		CompletableFuture<Exception> otherSubscribeExceptionFuture = new CompletableFuture<>();
		new Thread(() -> {
			try {
				container.addMessageListener(listener3, new ChannelTopic("c"));
				otherSubscribeExceptionFuture.complete(null);
			} catch (Exception e) {
				otherSubscribeExceptionFuture.complete(e);
			}
		}).start();
		assertThat(otherSubscribeExceptionFuture.get()).isNull();

		container.destroy();
	}
[Screenshot: SCR-20241220-bbsb]
