Skip to content

Commit 5b75ab2

Browse files
committed
Improve thread safety of RedisMessageListenerContainer
RedisMessageListenerContainer relies on 2 threads for subscription when patterns and channels topics are present. With Jedis, since the subscription thread blocks while listening for messages, an additional thread is used to subscribe to patterns while the subscription threads subscribe to channels and block. There were some race conditions between those two threads that could corrupt the Jedis stream since operations are not synchronized in JedisSubscription. A lock on the JedisSubscription instance has been added to enforce that operations on the Jedis stream cannot be affected by a concurrent thread. Additionaly, there were no error handling and retry mechanism on the pattern subscription thread. Multiple conditions could trigger an unexpected behavior here, exceptions were not handled and logged to stderr with no notice. Also, if the connection was not subscribed after 3 tries, the thread would exit silently with no log. Defensive measure have been added to retry redis connection failures and the subscription will now retry indefinitely, unless canceled on shutdown and on the main subscription thread errors. Fixes spring-projects#964 for versions before spring-projects#2256 was introduced.
1 parent 0538bd5 commit 5b75ab2

File tree

4 files changed

+321
-33
lines changed

4 files changed

+321
-33
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/JedisSubscription.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ class JedisSubscription extends AbstractSubscription {
4242
*/
4343
@Override
4444
protected void doClose() {
45-
if (!getChannels().isEmpty()) {
46-
jedisPubSub.unsubscribe();
47-
}
48-
if (!getPatterns().isEmpty()) {
49-
jedisPubSub.punsubscribe();
45+
synchronized (this) {
46+
if (!getChannels().isEmpty()) {
47+
jedisPubSub.unsubscribe();
48+
}
49+
if (!getPatterns().isEmpty()) {
50+
jedisPubSub.punsubscribe();
51+
}
5052
}
5153
}
5254

@@ -56,7 +58,9 @@ protected void doClose() {
5658
*/
5759
@Override
5860
protected void doPsubscribe(byte[]... patterns) {
59-
jedisPubSub.psubscribe(patterns);
61+
synchronized (this) {
62+
jedisPubSub.psubscribe(patterns);
63+
}
6064
}
6165

6266
/*
@@ -65,10 +69,12 @@ protected void doPsubscribe(byte[]... patterns) {
6569
*/
6670
@Override
6771
protected void doPUnsubscribe(boolean all, byte[]... patterns) {
68-
if (all) {
69-
jedisPubSub.punsubscribe();
70-
} else {
71-
jedisPubSub.punsubscribe(patterns);
72+
synchronized (this) {
73+
if (all) {
74+
jedisPubSub.punsubscribe();
75+
} else {
76+
jedisPubSub.punsubscribe(patterns);
77+
}
7278
}
7379
}
7480

@@ -78,7 +84,9 @@ protected void doPUnsubscribe(boolean all, byte[]... patterns) {
7884
*/
7985
@Override
8086
protected void doSubscribe(byte[]... channels) {
81-
jedisPubSub.subscribe(channels);
87+
synchronized (this) {
88+
jedisPubSub.subscribe(channels);
89+
}
8290
}
8391

8492
/*
@@ -87,10 +95,12 @@ protected void doSubscribe(byte[]... channels) {
8795
*/
8896
@Override
8997
protected void doUnsubscribe(boolean all, byte[]... channels) {
90-
if (all) {
91-
jedisPubSub.unsubscribe();
92-
} else {
93-
jedisPubSub.unsubscribe(channels);
98+
synchronized (this) {
99+
if (all) {
100+
jedisPubSub.unsubscribe();
101+
} else {
102+
jedisPubSub.unsubscribe(channels);
103+
}
94104
}
95105
}
96106
}

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CopyOnWriteArraySet;
2626
import java.util.concurrent.Executor;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829

2930
import org.apache.commons.logging.Log;
3031
import org.apache.commons.logging.LogFactory;
@@ -678,7 +679,6 @@ protected void sleepBeforeRecoveryAttempt() {
678679
* @author Costin Leau
679680
*/
680681
private class SubscriptionTask implements SchedulingAwareRunnable {
681-
682682
/**
683683
* Runnable used, on a parallel thread, to do the initial pSubscribe. This is required since, during initialization,
684684
* both subscribe and pSubscribe might be needed but since the first call is blocking, the second call needs to
@@ -689,36 +689,55 @@ private class SubscriptionTask implements SchedulingAwareRunnable {
689689
private class PatternSubscriptionTask implements SchedulingAwareRunnable {
690690

691691
private long WAIT = 500;
692-
private long ROUNDS = 3;
692+
private AtomicBoolean isThreadRunning = new AtomicBoolean(true);
693+
693694

694695
public boolean isLongLived() {
695696
return false;
696697
}
697698

699+
void cancel() {
700+
isThreadRunning.set(false);
701+
}
702+
698703
public void run() {
699704
// wait for subscription to be initialized
700705
boolean done = false;
701706
// wait 3 rounds for subscription to be initialized
702-
for (int i = 0; i < ROUNDS && !done; i++) {
707+
while(!done && isThreadRunning.get() && !Thread.currentThread().isInterrupted()) {
703708
if (connection != null) {
704-
synchronized (localMonitor) {
705-
if (connection.isSubscribed()) {
706-
done = true;
707-
connection.getSubscription().pSubscribe(unwrap(patternMapping.keySet()));
708-
} else {
709-
try {
710-
Thread.sleep(WAIT);
711-
} catch (InterruptedException ex) {
712-
Thread.currentThread().interrupt();
709+
try {
710+
if (connection.isSubscribed()) {
711+
synchronized (localMonitor) {
712+
connection.getSubscription().pSubscribe(unwrap(patternMapping.keySet()));
713+
done = true;
714+
}
715+
} else {
716+
try {
717+
Thread.sleep(WAIT);
718+
} catch (InterruptedException ex) {
719+
logger.info("PatternSubscriptionTask was interrupted, exiting.");
720+
Thread.currentThread().interrupt();
721+
return;
722+
}
723+
}
724+
} catch(Throwable e) {
725+
if (e instanceof RedisConnectionFailureException) {
726+
if (isRunning() && isThreadRunning.get()) {
727+
logger.error("Connection failure occurred on pattern subscription task. Restarting subscription task after " + recoveryInterval + " ms");
728+
sleepBeforeRecoveryAttempt();
729+
}
730+
} else {
731+
logger.error("PatternSubscriptionTask aborted with exception:", e);
713732
return;
714733
}
715734
}
716-
}
717735
}
718736
}
719737
}
720738
}
721739

740+
private volatile @Nullable PatternSubscriptionTask patternSubscriptionTask;
722741
private volatile @Nullable RedisConnection connection;
723742
private boolean subscriptionTaskRunning = false;
724743
private final Object localMonitor = new Object();
@@ -759,6 +778,7 @@ public void run() {
759778
}
760779
}
761780
} catch (Throwable t) {
781+
cancelPatternSubscriptionTask();
762782
handleSubscriptionException(t);
763783
} finally {
764784
// this block is executed once the subscription thread has ended, this may or may not mean
@@ -789,7 +809,8 @@ private SubscriptionPresentCondition eventuallyPerformSubscription() {
789809
condition = new SubscriptionPresentCondition();
790810
} else {
791811
// schedule the rest of the subscription
792-
subscriptionExecutor.execute(new PatternSubscriptionTask());
812+
patternSubscriptionTask = new PatternSubscriptionTask();
813+
subscriptionExecutor.execute(patternSubscriptionTask);
793814
condition = new PatternSubscriptionPresentCondition();
794815
}
795816

@@ -815,7 +836,7 @@ public boolean passes() {
815836
* Checks whether the current connection has an associated pattern subscription.
816837
*
817838
* @author Thomas Darimont
818-
* @see org.springframework.data.redis.listener.RedisMessageListenerContainer.SubscriptionTask.SubscriptionPresentTestCondition
839+
* @see org.springframework.data.redis.listener.RedisMessageListenerContainer.SubscriptionTask.SubscriptionPresentCondition
819840
*/
820841
private class PatternSubscriptionPresentCondition extends SubscriptionPresentCondition {
821842

@@ -840,7 +861,17 @@ private byte[][] unwrap(Collection<ByteArrayWrapper> holders) {
840861
return unwrapped;
841862
}
842863

864+
private void cancelPatternSubscriptionTask() {
865+
if(patternSubscriptionTask != null) {
866+
synchronized (localMonitor) {
867+
patternSubscriptionTask.cancel();
868+
patternSubscriptionTask = null;
869+
}
870+
}
871+
}
872+
843873
void cancel() {
874+
cancelPatternSubscriptionTask();
844875

845876
if (!listening || connection == null) {
846877
return;

src/test/java/org/springframework/data/redis/listener/PubSubTests.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.core.task.SyncTaskExecutor;
3636
import org.springframework.data.redis.ObjectFactory;
3737
import org.springframework.data.redis.connection.ConnectionUtils;
38+
import org.springframework.data.redis.connection.MessageListener;
3839
import org.springframework.data.redis.connection.RedisConnectionFactory;
3940
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
4041
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
@@ -55,12 +56,14 @@
5556
public class PubSubTests<T> {
5657

5758
private static final String CHANNEL = "pubsub::test";
59+
private static final String PATTERN = "pattern::pubsub::test";
5860

5961
protected RedisMessageListenerContainer container;
6062
protected ObjectFactory<T> factory;
6163
@SuppressWarnings("rawtypes") protected RedisTemplate template;
6264

6365
private final BlockingDeque<Object> bag = new LinkedBlockingDeque<>(99);
66+
private final BlockingDeque<Object> patternBag = new LinkedBlockingDeque<>(99);
6467

6568
private final Object handler = new Object() {
6669
@SuppressWarnings("unused")
@@ -69,7 +72,15 @@ public void handleMessage(Object message) {
6972
}
7073
};
7174

75+
private final Object patternHandler = new Object() {
76+
@SuppressWarnings("unused")
77+
public void handleMessage(Object message) {
78+
patternBag.add(message);
79+
}
80+
};
81+
7282
private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler);
83+
private final MessageListenerAdapter patternAdapter = new MessageListenerAdapter(patternHandler);
7384

7485
@SuppressWarnings("rawtypes")
7586
public PubSubTests(ObjectFactory<T> factory, RedisTemplate template) {
@@ -84,10 +95,14 @@ public static Collection<Object[]> testParams() {
8495
@BeforeEach
8596
void setUp() throws Exception {
8697
bag.clear();
98+
patternBag.clear();
8799

88100
adapter.setSerializer(template.getValueSerializer());
89101
adapter.afterPropertiesSet();
90102

103+
patternAdapter.setSerializer(template.getValueSerializer());
104+
patternAdapter.afterPropertiesSet();
105+
91106
Phaser phaser = new Phaser(1);
92107

93108
container = new RedisMessageListenerContainer();
@@ -198,6 +213,34 @@ void testStartListenersToNoSpecificChannelTest() throws InterruptedException {
198213
assertThat(set).contains(payload);
199214
}
200215

216+
@SuppressWarnings("unchecked")
217+
@ParameterizedRedisTest
218+
void testStartListenersToBothChannelsAndPatternTopics() throws InterruptedException {
219+
220+
assumeThat(isClusterAware(template.getConnectionFactory())).isFalse();
221+
assumeThat(ConnectionUtils.isJedis(template.getConnectionFactory())).isTrue();
222+
223+
PubSubAwaitUtil.runAndAwaitPatternSubscription(template.getRequiredConnectionFactory(), () -> {
224+
225+
container.addMessageListener(patternAdapter, Collections.singletonList(new PatternTopic(PATTERN + "*")));
226+
container.start();
227+
});
228+
229+
T payload = getT();
230+
231+
template.convertAndSend(PATTERN, payload);
232+
template.convertAndSend(CHANNEL, payload);
233+
234+
Set<T> patternSet = new LinkedHashSet<>();
235+
patternSet.add((T) patternBag.poll(3, TimeUnit.SECONDS));
236+
237+
Set<T> channelSet = new LinkedHashSet<>();
238+
channelSet.add((T) bag.poll(3, TimeUnit.SECONDS));
239+
240+
assertThat(channelSet).contains(payload);
241+
assertThat(patternSet).contains(payload);
242+
}
243+
201244
private static boolean isClusterAware(RedisConnectionFactory connectionFactory) {
202245

203246
if (connectionFactory instanceof LettuceConnectionFactory) {

0 commit comments

Comments
 (0)