Skip to content

Commit d34c45b

Browse files
committed
Polishing.
Allow Jedis to exist pub/sub loop gracefully. Await unsubscribe. Reduce retry backoff to allow from broken connection recovery. Do not use pooling to avoid using broken connections. See #2256
1 parent bc07540 commit d34c45b

File tree

4 files changed

+74
-26
lines changed

4 files changed

+74
-26
lines changed

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

Lines changed: 70 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ private void lazyListen() {
265265
}
266266

267267
try {
268-
futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.SECONDS);
268+
futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
269269
} catch (InterruptedException e) {
270270
Thread.currentThread().interrupt();
271271
} catch (ExecutionException e) {
@@ -391,15 +391,13 @@ private boolean doUnsubscribe() {
391391
return true;
392392
}
393393

394-
try {
395-
listenFuture.join();
396-
} catch (Exception e) {
397-
// ignore, just await completion here.
398-
}
394+
awaitRegistrationTime(listenFuture);
399395

400396
if (this.state.compareAndSet(state, State.prepareUnsubscribe())) {
401397

402-
getRequiredSubscriber().cancel();
398+
getRequiredSubscriber().unsubscribeAll();
399+
400+
awaitRegistrationTime(this.unsubscribeFuture);
403401

404402
this.state.set(State.notListening());
405403

@@ -416,6 +414,16 @@ private boolean doUnsubscribe() {
416414
}
417415
}
418416

417+
private void awaitRegistrationTime(CompletableFuture<Void> future) {
418+
try {
419+
future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
420+
} catch (InterruptedException e) {
421+
Thread.currentThread().interrupt();
422+
} catch (ExecutionException | TimeoutException e) {
423+
// ignore
424+
}
425+
}
426+
419427
@Override
420428
public boolean isRunning() {
421429
return this.started.get();
@@ -876,7 +884,8 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
876884
long recoveryInterval = backOffExecution.nextBackOff();
877885

878886
if (recoveryInterval != BackOffExecution.STOP) {
879-
logger.error(String.format("Connection failure occurred: %s. Restarting subscription task after %s ms.", ex, recoveryInterval));
887+
logger.error(String.format("Connection failure occurred: %s. Restarting subscription task after %s ms.", ex,
888+
recoveryInterval), ex);
880889
}
881890

882891
return recoveryInterval;
@@ -897,7 +906,9 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
897906
return;
898907
}
899908

900-
logger.error("SubscriptionTask aborted with exception:", ex);
909+
if (isRunning()) { // log only if the container is still running to prevent close errors from logging
910+
logger.error("SubscriptionTask aborted with exception:", ex);
911+
}
901912
future.completeExceptionally(ex);
902913
}
903914

@@ -1216,6 +1227,26 @@ void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronizion s
12161227
this.synchronizingMessageListener.addSynchronization(synchronizer);
12171228
}
12181229

1230+
public void unsubscribeAll() {
1231+
1232+
synchronized (localMonitor) {
1233+
1234+
RedisConnection connection = this.connection;
1235+
if (connection == null) {
1236+
return;
1237+
}
1238+
1239+
doUnsubscribe(connection);
1240+
}
1241+
}
1242+
1243+
void doUnsubscribe(RedisConnection connection) {
1244+
closeSubscription(connection);
1245+
closeConnection();
1246+
1247+
unsubscribeFuture.complete(null);
1248+
}
1249+
12191250
/**
12201251
* Cancel all subscriptions and close the connection.
12211252
*/
@@ -1228,26 +1259,34 @@ public void cancel() {
12281259
return;
12291260
}
12301261

1231-
if (logger.isTraceEnabled()) {
1232-
logger.trace("Cancelling Redis subscription...");
1233-
}
1262+
doCancel(connection);
1263+
}
1264+
}
12341265

1235-
Subscription sub = connection.getSubscription();
1266+
void doCancel(RedisConnection connection) {
1267+
closeSubscription(connection);
1268+
closeConnection();
1269+
}
12361270

1237-
if (sub != null) {
1271+
void closeSubscription(RedisConnection connection) {
12381272

1239-
if (logger.isTraceEnabled()) {
1240-
logger.trace("Unsubscribing from all channels");
1241-
}
1273+
if (logger.isTraceEnabled()) {
1274+
logger.trace("Cancelling Redis subscription...");
1275+
}
12421276

1243-
try {
1244-
sub.close();
1245-
} catch (Exception e) {
1246-
logger.warn("Unable to unsubscribe from subscriptions", e);
1247-
}
1277+
Subscription sub = connection.getSubscription();
1278+
1279+
if (sub != null) {
1280+
1281+
if (logger.isTraceEnabled()) {
1282+
logger.trace("Unsubscribing from all channels");
12481283
}
12491284

1250-
closeConnection();
1285+
try {
1286+
sub.close();
1287+
} catch (Exception e) {
1288+
logger.warn("Unable to unsubscribe from subscriptions", e);
1289+
}
12511290
}
12521291
}
12531292

@@ -1324,6 +1363,7 @@ private void doWithSubscription(byte[][] data, BiConsumer<Subscription, byte[][]
13241363
}
13251364
}
13261365
}
1366+
13271367
}
13281368

13291369
/**
@@ -1341,6 +1381,11 @@ class BlockingSubscriber extends Subscriber {
13411381
this.executor = executor;
13421382
}
13431383

1384+
@Override
1385+
void doUnsubscribe(RedisConnection connection) {
1386+
closeSubscription(connection); // connection will be closed after exiting the doSubscribe method
1387+
}
1388+
13441389
@Override
13451390
protected void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
13461391
CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {
@@ -1369,6 +1414,8 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
13691414

13701415
try {
13711416
doSubscribe(connection, patterns, initiallySubscribeToChannels);
1417+
closeConnection();
1418+
unsubscribeFuture.complete(null);
13721419
} catch (Throwable t) {
13731420
handleSubscriptionException(subscriptionDone, backOffExecution, t);
13741421
}

src/test/java/org/springframework/data/redis/connection/jedis/extension/JedisConnectionFactoryExtension.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class JedisConnectionFactoryExtension implements ParameterResolver {
5656
.create(JedisConnectionFactoryExtension.class);
5757

5858
private static final JedisClientConfiguration CLIENT_CONFIGURATION = JedisClientConfiguration.builder()
59-
.clientName("jedis-client").usePooling().build();
59+
.clientName("jedis-client").build();
6060

6161
private static final NewableLazy<JedisConnectionFactory> STANDALONE = NewableLazy.of(() -> {
6262

src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerIntegrationTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public RedisMessageListenerContainerIntegrationTests(RedisConnectionFactory conn
6363
void setUp() {
6464

6565
container = new RedisMessageListenerContainer();
66+
container.setRecoveryInterval(100);
6667
container.setConnectionFactory(connectionFactory);
6768
container.setBeanName("container");
6869
container.afterPropertiesSet();

src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void containerShouldStopGracefullyOnUnsubscribeErrors() {
8181
doAnswer(it -> {
8282

8383
Runnable r = it.getArgument(0);
84-
new Thread(r).start();
84+
r.run();
8585
return null;
8686
}).when(executorMock).execute(any());
8787

@@ -103,7 +103,7 @@ void containerShouldStopGracefullyOnUnsubscribeErrors() {
103103
container.stop();
104104

105105
assertThat(container.isRunning()).isFalse();
106-
verify(subscriptionMock).close();
106+
verify(connectionMock).close();
107107
}
108108

109109
@Test // GH-964

0 commit comments

Comments
 (0)