Skip to content

Commit 935da81

Browse files
committed
INT-4396: Add retrying lock in case of exception
JIRA: https://jira.spring.io/browse/INT-4396 When target distributed `Lock` implementation throws an exception, e.g. in case of no connection to the service, the `LockRegistryLeaderInitiator` exists the loop and can come back to the elections only after restart * Catch all the exception on `this.lock.tryLock()` and resubmit `LeaderSelector` for a new locking cycle if `LockRegistryLeaderInitiator.isRunning()` and `InterruptedException` * Remove diagnostics from the `JdbcLockRegistryLeaderInitiatorTests` since this fix confirms that we just didn't have a reconnect logic before when this test failed sporadically **Cherry-pick to 4.3.x** # Conflicts: # spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java
1 parent 4f0f3d1 commit 935da81

File tree

3 files changed

+56
-21
lines changed

3 files changed

+56
-21
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -312,7 +312,7 @@ protected class LeaderSelector implements Callable<Void> {
312312
@Override
313313
public Void call() throws Exception {
314314
try {
315-
while (LockRegistryLeaderInitiator.this.running) {
315+
while (isRunning()) {
316316
try {
317317
// We always try to acquire the lock, in case it expired
318318
// TODO obtain(this.lockKey) because of INT-4248. Should be fixed in 5.0
@@ -330,27 +330,41 @@ else if (acquired) {
330330
// should release it
331331
LockRegistryLeaderInitiator.this.locks.obtain(this.lockKey)
332332
.unlock();
333-
// Give it a chance to expire.
334-
Thread.sleep(LockRegistryLeaderInitiator.this.heartBeatMillis);
333+
if (isRunning()) {
334+
// Give it a chance to expire.
335+
Thread.sleep(LockRegistryLeaderInitiator.this.heartBeatMillis);
336+
}
335337
}
336338
else {
337339
this.locked = false;
338340
// We were not able to acquire it, therefore not leading any more
339341
handleRevoked();
340-
// Try again quickly in case the lock holder dropped it
341-
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
342+
if (isRunning()) {
343+
// Try again quickly in case the lock holder dropped it
344+
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
345+
}
342346
}
343347
}
344-
catch (InterruptedException e) {
348+
catch (Exception e) {
345349
if (this.locked) {
346350
LockRegistryLeaderInitiator.this.locks.obtain(this.lockKey)
347351
.unlock();
348352
this.locked = false;
349353
// The lock was broken and we are no longer leader
350354
handleRevoked();
351-
// Give it a chance to elect some other leader.
352-
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
355+
if (isRunning()) {
356+
// Give it a chance to elect some other leader.
357+
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
358+
}
359+
}
360+
361+
if (e instanceof InterruptedException) {
353362
Thread.currentThread().interrupt();
363+
if (isRunning()) {
364+
logger.warn("Restarting LeaderSelector because of error.", e);
365+
LockRegistryLeaderInitiator.this.future =
366+
LockRegistryLeaderInitiator.this.executorService.submit(this);
367+
}
354368
return null;
355369
}
356370
}
@@ -405,11 +419,6 @@ public boolean isLeader() {
405419
public void yield() {
406420
if (LockRegistryLeaderInitiator.this.future != null) {
407421
LockRegistryLeaderInitiator.this.future.cancel(true);
408-
if (isRunning()) {
409-
LockRegistryLeaderInitiator.this.future =
410-
LockRegistryLeaderInitiator.this.executorService
411-
.submit(LockRegistryLeaderInitiator.this.leaderSelector);
412-
}
413422
}
414423
}
415424

spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2017 the original author or authors.
2+
* Copyright 2012-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -214,6 +214,38 @@ public void testGracefulLeaderSelectorExit() throws Exception {
214214
assertNull(throwable);
215215
}
216216

217+
@Test
218+
public void testExceptionFromLock() throws Exception {
219+
Lock mockLock = mock(Lock.class);
220+
221+
AtomicBoolean exceptionThrown = new AtomicBoolean();
222+
223+
willAnswer(invocation -> {
224+
if (!exceptionThrown.getAndSet(true)) {
225+
throw new RuntimeException("lock is broken");
226+
}
227+
else {
228+
return true;
229+
}
230+
}).given(mockLock).tryLock(anyLong(), any(TimeUnit.class));
231+
232+
LockRegistry registry = lockKey -> mockLock;
233+
234+
CountDownLatch onGranted = new CountDownLatch(1);
235+
236+
LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(registry);
237+
238+
initiator.setLeaderEventPublisher(new CountingPublisher(onGranted));
239+
240+
initiator.start();
241+
242+
assertTrue(onGranted.await(10, TimeUnit.SECONDS));
243+
assertTrue(initiator.getContext().isLeader());
244+
assertTrue(exceptionThrown.get());
245+
246+
initiator.stop();
247+
}
248+
217249
private static class CountingPublisher implements LeaderEventPublisher {
218250

219251
private final CountDownLatch granted;

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/leader/JdbcLockRegistryLeaderInitiatorTests.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.log4j.Level;
2929
import org.junit.AfterClass;
3030
import org.junit.BeforeClass;
31-
import org.junit.Rule;
3231
import org.junit.Test;
3332

3433
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
@@ -37,7 +36,6 @@
3736
import org.springframework.integration.leader.DefaultCandidate;
3837
import org.springframework.integration.leader.event.LeaderEventPublisher;
3938
import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
40-
import org.springframework.integration.test.rule.Log4jLevelAdjuster;
4139
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
4240
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
4341
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@@ -52,10 +50,6 @@ public class JdbcLockRegistryLeaderInitiatorTests {
5250

5351
public static EmbeddedDatabase dataSource;
5452

55-
@Rule
56-
public Log4jLevelAdjuster adjuster = new Log4jLevelAdjuster(Level.DEBUG, "org.springframework.integration",
57-
"org.springframework.integration.jdbc", "org.springframework.jdbc", "org.apache.derby");
58-
5953
@BeforeClass
6054
public static void init() {
6155
dataSource = new EmbeddedDatabaseBuilder()

0 commit comments

Comments
 (0)