Skip to content

Commit 51c4951

Browse files
committed
Fix RedisLockRegLeaderInitTests race condition
https://build.spring.io/browse/INT-FATS5IC-501/ When we try to wait for the `Latch` in the interruptable code flow, it is a fact that we step away from the waiting and end up with the race condition downstream. * Wrap `Latch` in the interruptable `publishOnRevoked()` code to the `Executor.execute()` * Remove `deleteTimeoutMillis` option from the `RedisLockRegistry` since it doesn't make sense in the interruptable code. * Add `RedisLockRegistry.setExecutor()` to allow to inject an external `Executor` * Add more debug logging into the `LockRegistryLeaderInitiator` **Cherry-pick to 5.0.x**
1 parent 3c1b547 commit 51c4951

File tree

3 files changed

+67
-39
lines changed

3 files changed

+67
-39
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public void stop() {
315315
this.future.cancel(true);
316316
}
317317
this.future = null;
318-
logger.debug("Stopped LeaderInitiator");
318+
logger.debug("Stopped LeaderInitiator for " + getContext());
319319
}
320320
}
321321
}
@@ -347,6 +347,11 @@ public Void call() throws Exception {
347347
try {
348348
while (isRunning()) {
349349
try {
350+
351+
if (logger.isDebugEnabled()) {
352+
logger.debug("Acquiring the lock for " + this.context);
353+
}
354+
350355
// We always try to acquire the lock, in case it expired
351356
boolean acquired = this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis,
352357
TimeUnit.MILLISECONDS);
@@ -386,8 +391,8 @@ else if (acquired) {
386391
this.lock.unlock();
387392
}
388393
catch (Exception e1) {
389-
logger.debug("Could not unlock - treat as broken: " + this.context +
390-
". Revoking " + (isRunning() ? " and retrying..." : "..."), e);
394+
logger.debug("Could not unlock - treat as broken " + this.context +
395+
". Revoking " + (isRunning() ? " and retrying..." : "..."), e1);
391396

392397
}
393398
// The lock was broken and we are no longer leader
@@ -401,14 +406,15 @@ else if (acquired) {
401406
if (e instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
402407
Thread.currentThread().interrupt();
403408
if (isRunning()) {
404-
logger.warn("Restarting LeaderSelector because of error.", e);
409+
logger.warn("Restarting LeaderSelector for " + this.context + " because of error.", e);
405410
LockRegistryLeaderInitiator.this.future =
406411
LockRegistryLeaderInitiator.this.executorService.submit(this);
407412
}
408413
return null;
409414
}
410415
else if (logger.isDebugEnabled()) {
411-
logger.debug("Error acquiring the lock. " + (isRunning() ? "Retrying..." : ""), e);
416+
logger.debug("Error acquiring the lock for " + this.context +
417+
". " + (isRunning() ? "Retrying..." : ""), e);
412418
}
413419
}
414420
}
@@ -420,7 +426,8 @@ else if (logger.isDebugEnabled()) {
420426
this.lock.unlock();
421427
}
422428
catch (Exception e) {
423-
logger.debug("Could not unlock during stop - treat as broken. Revoking...", e);
429+
logger.debug("Could not unlock during stop for " + this.context
430+
+ " - treat as broken. Revoking...", e);
424431
}
425432
// We are stopping, therefore not leading any more
426433
handleRevoked();
@@ -492,6 +499,9 @@ public boolean isLeader() {
492499

493500
@Override
494501
public void yield() {
502+
if (logger.isDebugEnabled()) {
503+
logger.debug("Yielding leadership from " + this);
504+
}
495505
if (LockRegistryLeaderInitiator.this.future != null) {
496506
LockRegistryLeaderInitiator.this.future.cancel(true);
497507
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424
import java.util.UUID;
2525
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.Executor;
2627
import java.util.concurrent.ExecutorService;
2728
import java.util.concurrent.Executors;
2829
import java.util.concurrent.TimeUnit;
@@ -79,8 +80,6 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl
7980

8081
private static final long DEFAULT_EXPIRE_AFTER = 60000L;
8182

82-
public static final long DEFAULT_DELETE_TIMEOUT = 10000L;
83-
8483
private static final String OBTAIN_LOCK_SCRIPT =
8584
"local lockClientId = redis.call('GET', KEYS[1])\n" +
8685
"if lockClientId == ARGV[1] then\n" +
@@ -97,21 +96,14 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl
9796
* An {@link ExecutorService} to call {@link StringRedisTemplate#delete(Object)} in
9897
* the separate thread when the current one is interrupted.
9998
*/
100-
private ExecutorService executorService =
99+
private Executor executor =
101100
Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));
102101

103102
/**
104103
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
105104
* thus should not be shutdown when {@link #destroy()} is called
106105
*/
107-
private boolean executorServiceExplicitlySet;
108-
109-
/**
110-
* Time in milliseconds to wait for the {@link StringRedisTemplate#delete(Object)} in
111-
* the background thread.
112-
*/
113-
private long deleteTimeoutMillis = DEFAULT_DELETE_TIMEOUT;
114-
106+
private boolean executorExplicitlySet;
115107

116108
private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
117109

@@ -149,6 +141,17 @@ public RedisLockRegistry(RedisConnectionFactory connectionFactory, String regist
149141
this.expireAfter = expireAfter;
150142
}
151143

144+
/**
145+
* Set the {@link Executor}, where is not provided then a default of
146+
* cached thread pool Executor will be used.
147+
* @param executor the executor service
148+
* @since 5.0.5
149+
*/
150+
public void setExecutor(Executor executor) {
151+
this.executor = executor;
152+
this.executorExplicitlySet = true;
153+
}
154+
152155
@Override
153156
public Lock obtain(Object lockKey) {
154157
Assert.isInstanceOf(String.class, lockKey);
@@ -171,8 +174,8 @@ public void expireUnusedOlderThan(long age) {
171174

172175
@Override
173176
public void destroy() {
174-
if (!this.executorServiceExplicitlySet) {
175-
this.executorService.shutdown();
177+
if (!this.executorExplicitlySet) {
178+
((ExecutorService) this.executor).shutdown();
176179
}
177180
}
178181

@@ -298,11 +301,9 @@ public void unlock() {
298301
return;
299302
}
300303
try {
301-
302304
if (Thread.currentThread().isInterrupted()) {
303-
RedisLockRegistry.this.executorService.submit(() ->
304-
RedisLockRegistry.this.redisTemplate.delete(this.lockKey))
305-
.get(RedisLockRegistry.this.deleteTimeoutMillis, TimeUnit.MILLISECONDS);
305+
RedisLockRegistry.this.executor.execute(() ->
306+
RedisLockRegistry.this.redisTemplate.delete(this.lockKey));
306307
}
307308
else {
308309
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);

spring-integration-redis/src/test/java/org/springframework/integration/redis/leader/RedisLockRegistryLeaderInitiatorTests.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@
2323
import java.util.ArrayList;
2424
import java.util.List;
2525
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.Executor;
27+
import java.util.concurrent.Executors;
2628
import java.util.concurrent.TimeUnit;
2729

30+
import org.apache.commons.logging.Log;
31+
import org.apache.commons.logging.LogFactory;
2832
import org.junit.Rule;
2933
import org.junit.Test;
3034

@@ -36,6 +40,8 @@
3640
import org.springframework.integration.redis.util.RedisLockRegistry;
3741
import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
3842
import org.springframework.integration.test.rule.Log4j2LevelAdjuster;
43+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
44+
import org.springframework.util.ReflectionUtils;
3945

4046
/**
4147
* @author Artem Bilan
@@ -46,10 +52,12 @@
4652
*/
4753
public class RedisLockRegistryLeaderInitiatorTests extends RedisAvailableTests {
4854

55+
private static final Log logger = LogFactory.getLog(RedisLockRegistryLeaderInitiatorTests.class);
56+
4957
@Rule
5058
public Log4j2LevelAdjuster adjuster =
5159
Log4j2LevelAdjuster.trace()
52-
.categories(true, "org.springframework.data.redis");
60+
.categories(true, "org.springframework.integration.redis.leader");
5361

5462
@Test
5563
@RedisAvailable
@@ -62,6 +70,8 @@ public void testDistributedLeaderElection() throws Exception {
6270
for (int i = 0; i < 2; i++) {
6371
LockRegistryLeaderInitiator initiator =
6472
new LockRegistryLeaderInitiator(registry, new DefaultCandidate("foo:" + i, "bar"));
73+
initiator.setExecutorService(
74+
Executors.newSingleThreadExecutor(new CustomizableThreadFactory("lock-leadership-" + i + "-")));
6575
initiator.setLeaderEventPublisher(countingPublisher);
6676
initiators.add(initiator);
6777
}
@@ -95,18 +105,23 @@ public void testDistributedLeaderElection() throws Exception {
95105
CountDownLatch acquireLockFailed1 = new CountDownLatch(1);
96106
CountDownLatch acquireLockFailed2 = new CountDownLatch(1);
97107

108+
Executor latchesExecutor = Executors.newCachedThreadPool();
109+
98110
initiator1.setLeaderEventPublisher(new CountingPublisher(granted1, revoked1, acquireLockFailed1) {
99111

100112
@Override
101113
public void publishOnRevoked(Object source, Context context, String role) {
102-
try {
114+
latchesExecutor.execute(() -> {
103115
// It's difficult to see round-robin election, so block one initiator until the second is elected.
104-
assertThat(granted2.await(10, TimeUnit.SECONDS), is(true));
105-
}
106-
catch (InterruptedException e) {
107-
// No op
108-
}
109-
super.publishOnRevoked(source, context, role);
116+
try {
117+
assertThat(granted2.await(10, TimeUnit.SECONDS), is(true));
118+
}
119+
catch (InterruptedException e) {
120+
ReflectionUtils.rethrowRuntimeException(e);
121+
}
122+
super.publishOnRevoked(source, context, role);
123+
});
124+
110125
}
111126

112127
});
@@ -115,14 +130,16 @@ public void publishOnRevoked(Object source, Context context, String role) {
115130

116131
@Override
117132
public void publishOnRevoked(Object source, Context context, String role) {
118-
try {
119-
// It's difficult to see round-robin election, so block one initiator until the second is elected.
120-
assertThat(granted1.await(10, TimeUnit.SECONDS), is(true));
121-
}
122-
catch (InterruptedException e) {
123-
// No op
124-
}
125-
super.publishOnRevoked(source, context, role);
133+
latchesExecutor.execute(() -> {
134+
try {
135+
// It's difficult to see round-robin election, so block one initiator until the second is elected.
136+
assertThat(granted1.await(10, TimeUnit.SECONDS), is(true));
137+
}
138+
catch (InterruptedException e) {
139+
ReflectionUtils.rethrowRuntimeException(e);
140+
}
141+
super.publishOnRevoked(source, context, role);
142+
});
126143
}
127144

128145
});

0 commit comments

Comments
 (0)