Skip to content

Commit 9409d33

Browse files
artembilangaryrussell
authored andcommitted
Use Lettuce Redis client; Redis module fixes (#2435)
* Use Lettuce Redis client; Redis module fixes The Lettuce client is based on the Netty and more stable, than Jedis therefore we get a performance improvement for tests (it saves us at about 30 seconds). Also this client doesn't fail for me on Windows sporadically (very often) with the `ConnectionClosedException` * After switching to the Netty-based client, we expose the interrupted Thread issue in the `LockRegistryLeaderInitiator`. If we interrupted (expected behavior), we try to unlock calling `RedisLockRegistry`, but Netty client reject our request because the thread is interrupted, therefore we never delete the lock when we yield our leadership. Fix the issue with shifting a `RedisTemplate.delete()` operation to the `ExecutorService` when the current thread is interrupted * Allow to configure such an `ExecutorService` and timeout to wait for the `submit()` result * Refactor `RedisAvailableRule` and all the Redis tests do not expose the target `RedisConnectionFactory` implementation. * Make all the test-cases based on the `connectionFactory` created by the `RedisAvailableTests.setupConnectionFactory()` * Tweak some unnecessary timeouts and sleeps for better tests task throughput * Add a `Log4j2LevelAdjuster` into the `RedisLockRegistryLeaderInitiatorTests`
1 parent 87757e1 commit 9409d33

File tree

36 files changed

+392
-319
lines changed

36 files changed

+392
-319
lines changed

build.gradle

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ subprojects { subproject ->
121121
h2Version = '1.4.197'
122122
jackson2Version = '2.9.4'
123123
javaxActivationVersion = '1.1.1'
124-
javaxMailVersion = '1.6.0'
125-
jedisVersion = '2.9.0'
124+
javaxMailVersion = '1.6.1'
126125
jmsApiVersion = '2.0.1'
127126
jpa21ApiVersion = '1.0.0.Final'
128127
jpaApiVersion = '2.1.1'
@@ -135,8 +134,9 @@ subprojects { subproject ->
135134
junitVintageVersion = '5.1.0'
136135
jythonVersion = '2.5.3'
137136
kryoShadedVersion = '3.0.3'
137+
lettuceVersion = '5.0.3.RELEASE'
138138
log4jVersion = '2.10.0'
139-
micrometerVersion = '1.0.2'
139+
micrometerVersion = '1.0.3'
140140
mockitoVersion = '2.11.0'
141141
mysqlVersion = '6.0.6'
142142
pahoMqttClientVersion = '1.2.0'
@@ -385,7 +385,7 @@ project('spring-integration-file') {
385385
testCompile project(":spring-integration-gemfire")
386386
testCompile project(":spring-integration-jdbc")
387387
testCompile "com.h2database:h2:$h2Version"
388-
testCompile "redis.clients:jedis:$jedisVersion"
388+
testCompile "io.lettuce:lettuce-core:$lettuceVersion"
389389
testCompile "io.projectreactor:reactor-test:$reactorVersion"
390390
}
391391
}
@@ -594,7 +594,7 @@ project('spring-integration-redis') {
594594
exclude group: 'org.springframework', module: 'spring-aop'
595595
exclude group: 'org.springframework', module: 'spring-oxm'
596596
}
597-
testCompile "redis.clients:jedis:$jedisVersion"
597+
testCompile "io.lettuce:lettuce-core:$lettuceVersion"
598598
}
599599
}
600600

@@ -704,7 +704,7 @@ project('spring-integration-twitter') {
704704
compile("javax.activation:activation:$javaxActivationVersion", optional)
705705
testCompile project(":spring-integration-redis")
706706
testCompile project(":spring-integration-redis").sourceSets.test.output
707-
testCompile "redis.clients:jedis:$jedisVersion"
707+
testCompile "io.lettuce:lettuce-core:$lettuceVersion"
708708
}
709709
}
710710

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,8 @@ else if (acquired) {
386386
this.lock.unlock();
387387
}
388388
catch (Exception e1) {
389-
logger.debug("Could not unlock - treat as broken. " +
390-
"Revoking " + (isRunning() ? " and retrying..." : "..."), e);
389+
logger.debug("Could not unlock - treat as broken: " + this.context +
390+
". Revoking " + (isRunning() ? " and retrying..." : "..."), e);
391391

392392
}
393393
// The lock was broken and we are no longer leader

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,8 @@
2323
import java.util.Map;
2424
import java.util.UUID;
2525
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.locks.Condition;
2830
import java.util.concurrent.locks.Lock;
@@ -31,14 +33,17 @@
3133
import org.apache.commons.logging.Log;
3234
import org.apache.commons.logging.LogFactory;
3335

36+
import org.springframework.beans.factory.DisposableBean;
3437
import org.springframework.dao.CannotAcquireLockException;
3538
import org.springframework.data.redis.connection.RedisConnectionFactory;
3639
import org.springframework.data.redis.core.StringRedisTemplate;
3740
import org.springframework.data.redis.core.script.DefaultRedisScript;
3841
import org.springframework.data.redis.core.script.RedisScript;
3942
import org.springframework.integration.support.locks.ExpirableLockRegistry;
4043
import org.springframework.integration.support.locks.LockRegistry;
44+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
4145
import org.springframework.util.Assert;
46+
import org.springframework.util.ReflectionUtils;
4247

4348
/**
4449
* Implementation of {@link LockRegistry} providing a distributed lock using Redis.
@@ -68,11 +73,13 @@
6873
* @since 4.0
6974
*
7075
*/
71-
public final class RedisLockRegistry implements ExpirableLockRegistry {
76+
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
7277

7378
private static final Log logger = LogFactory.getLog(RedisLockRegistry.class);
7479

75-
private static final long DEFAULT_EXPIRE_AFTER = 60000;
80+
private static final long DEFAULT_EXPIRE_AFTER = 60000L;
81+
82+
public static final long DEFAULT_DELETE_TIMEOUT = 10000L;
7683

7784
private static final String OBTAIN_LOCK_SCRIPT =
7885
"local lockClientId = redis.call('GET', KEYS[1])\n" +
@@ -85,6 +92,27 @@ public final class RedisLockRegistry implements ExpirableLockRegistry {
8592
"end\n" +
8693
"return false";
8794

95+
96+
/**
97+
* An {@link ExecutorService} to call {@link StringRedisTemplate#delete(Object)} in
98+
* the separate thread when the current one is interrupted.
99+
*/
100+
private ExecutorService executorService =
101+
Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));
102+
103+
/**
104+
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
105+
* thus should not be shutdown when {@link #destroy()} is called
106+
*/
107+
private boolean executorServiceExplicitlySet;
108+
109+
/**
110+
* Time in milliseconds to wait for the {@link StringRedisTemplate#delete(Object)} in
111+
* the background thread.
112+
*/
113+
private long deleteTimeoutMillis = DEFAULT_DELETE_TIMEOUT;
114+
115+
88116
private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
89117

90118
private final String clientId = UUID.randomUUID().toString();
@@ -141,6 +169,13 @@ public void expireUnusedOlderThan(long age) {
141169
}
142170
}
143171

172+
@Override
173+
public void destroy() {
174+
if (!this.executorServiceExplicitlySet) {
175+
this.executorService.shutdown();
176+
}
177+
}
178+
144179
private final class RedisLock implements Lock {
145180

146181
private final String lockKey;
@@ -172,11 +207,11 @@ public void lock() {
172207
break;
173208
}
174209
catch (InterruptedException e) {
175-
/*
176-
* This method must be uninterruptible so catch and ignore
177-
* interrupts and only break out of the while loop when
178-
* we get the lock.
179-
*/
210+
/*
211+
* This method must be uninterruptible so catch and ignore
212+
* interrupts and only break out of the while loop when
213+
* we get the lock.
214+
*/
180215
}
181216
catch (Exception e) {
182217
this.localLock.unlock();
@@ -263,11 +298,23 @@ public void unlock() {
263298
return;
264299
}
265300
try {
266-
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
301+
302+
if (Thread.currentThread().isInterrupted()) {
303+
RedisLockRegistry.this.executorService.submit(() ->
304+
RedisLockRegistry.this.redisTemplate.delete(this.lockKey))
305+
.get(RedisLockRegistry.this.deleteTimeoutMillis, TimeUnit.MILLISECONDS);
306+
}
307+
else {
308+
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
309+
}
310+
267311
if (logger.isDebugEnabled()) {
268312
logger.debug("Released lock; " + this);
269313
}
270314
}
315+
catch (Exception e) {
316+
ReflectionUtils.rethrowRuntimeException(e);
317+
}
271318
finally {
272319
this.localLock.unlock();
273320
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisChannelParserTests-context.xml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@
22
<beans xmlns="http://www.springframework.org/schema/beans"
33
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
44
xmlns:int-redis="http://www.springframework.org/schema/integration/redis"
5+
xmlns:util="http://www.springframework.org/schema/util"
56
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
6-
http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd">
7+
http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd
8+
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
9+
10+
<util:constant id="redisConnectionFactory"
11+
static-field="org.springframework.integration.redis.rules.RedisAvailableRule.connectionFactory"/>
712

813
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic.parser"
914
serializer="redisSerializer"/>
1015

11-
<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
12-
<property name="port" value="#{T(org.springframework.integration.redis.rules.RedisAvailableRule).REDIS_PORT}"/>
13-
</bean>
14-
1516
<bean id="redisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
1617

1718
<int-redis:publish-subscribe-channel id="redisChannelWithSubLimit" topic-name="si.test.topic"
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,70 +23,91 @@
2323
import java.util.concurrent.CountDownLatch;
2424
import java.util.concurrent.TimeUnit;
2525

26+
import org.junit.After;
27+
import org.junit.Before;
2628
import org.junit.Test;
29+
import org.junit.runner.RunWith;
2730

28-
import org.springframework.context.support.ClassPathXmlApplicationContext;
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.context.ApplicationContext;
2933
import org.springframework.data.redis.connection.RedisConnectionFactory;
3034
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
3135
import org.springframework.data.redis.serializer.RedisSerializer;
36+
import org.springframework.integration.redis.channel.SubscribableRedisChannel;
3237
import org.springframework.integration.redis.rules.RedisAvailable;
3338
import org.springframework.integration.redis.rules.RedisAvailableTests;
3439
import org.springframework.integration.support.utils.IntegrationUtils;
3540
import org.springframework.integration.test.util.TestUtils;
3641
import org.springframework.messaging.Message;
37-
import org.springframework.messaging.SubscribableChannel;
3842
import org.springframework.messaging.support.GenericMessage;
43+
import org.springframework.test.annotation.DirtiesContext;
44+
import org.springframework.test.context.junit4.SpringRunner;
3945

4046
/**
4147
* @author Oleg Zhurakousky
4248
* @author Gary Russell
4349
* @author Gunnar Hillert
4450
* @author Artem Bilan
4551
*/
52+
@RunWith(SpringRunner.class)
53+
@DirtiesContext
4654
public class RedisChannelParserTests extends RedisAvailableTests {
4755

56+
@Autowired
57+
private SubscribableRedisChannel redisChannel;
58+
59+
@Autowired
60+
private SubscribableRedisChannel redisChannelWithSubLimit;
61+
62+
@Autowired
63+
private ApplicationContext context;
64+
65+
@Before
66+
public void setup() {
67+
this.redisChannel.start();
68+
this.redisChannelWithSubLimit.start();
69+
}
70+
71+
@After
72+
public void tearDown() {
73+
this.redisChannel.stop();
74+
this.redisChannelWithSubLimit.stop();
75+
}
76+
4877
@Test
49-
@RedisAvailable
5078
public void testPubSubChannelConfig() {
51-
ClassPathXmlApplicationContext context =
52-
new ClassPathXmlApplicationContext("RedisChannelParserTests-context.xml", this.getClass());
53-
SubscribableChannel redisChannel = context.getBean("redisChannel", SubscribableChannel.class);
5479
RedisConnectionFactory connectionFactory =
55-
TestUtils.getPropertyValue(redisChannel, "connectionFactory", RedisConnectionFactory.class);
80+
TestUtils.getPropertyValue(this.redisChannel, "connectionFactory", RedisConnectionFactory.class);
5681
RedisSerializer<?> redisSerializer = TestUtils.getPropertyValue(redisChannel, "serializer", RedisSerializer.class);
57-
assertEquals(connectionFactory, context.getBean("redisConnectionFactory"));
58-
assertEquals(redisSerializer, context.getBean("redisSerializer"));
82+
assertEquals(connectionFactory, this.context.getBean("redisConnectionFactory"));
83+
assertEquals(redisSerializer, this.context.getBean("redisSerializer"));
5984
assertEquals("si.test.topic.parser", TestUtils.getPropertyValue(redisChannel, "topicName"));
6085
assertEquals(Integer.MAX_VALUE, TestUtils.getPropertyValue(
61-
TestUtils.getPropertyValue(redisChannel, "dispatcher"), "maxSubscribers", Integer.class).intValue());
62-
redisChannel = context.getBean("redisChannelWithSubLimit", SubscribableChannel.class);
63-
assertEquals(1, TestUtils.getPropertyValue(redisChannel, "dispatcher.maxSubscribers", Integer.class).intValue());
64-
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
65-
assertSame(mbf, TestUtils.getPropertyValue(redisChannel, "messageBuilderFactory"));
66-
context.close();
86+
TestUtils.getPropertyValue(this.redisChannel, "dispatcher"), "maxSubscribers", Integer.class).intValue());
87+
88+
assertEquals(1,
89+
TestUtils.getPropertyValue(this.redisChannelWithSubLimit, "dispatcher.maxSubscribers", Integer.class).intValue());
90+
Object mbf = this.context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
91+
assertSame(mbf, TestUtils.getPropertyValue(this.redisChannelWithSubLimit, "messageBuilderFactory"));
6792
}
6893

6994
@Test
7095
@RedisAvailable
7196
public void testPubSubChannelUsage() throws Exception {
72-
ClassPathXmlApplicationContext context =
73-
new ClassPathXmlApplicationContext("RedisChannelParserTests-context.xml", this.getClass());
74-
SubscribableChannel redisChannel = context.getBean("redisChannel", SubscribableChannel.class);
75-
76-
this.awaitContainerSubscribed(TestUtils.getPropertyValue(redisChannel, "container",
97+
this.awaitContainerSubscribed(TestUtils.getPropertyValue(this.redisChannel, "container",
7798
RedisMessageListenerContainer.class));
7899

79-
final Message<?> m = new GenericMessage<String>("Hello Redis");
100+
final Message<?> m = new GenericMessage<>("Hello Redis");
80101

81102
final CountDownLatch latch = new CountDownLatch(1);
82-
redisChannel.subscribe(message -> {
103+
this.redisChannel.subscribe(message -> {
83104
assertEquals(m.getPayload(), message.getPayload());
84105
latch.countDown();
85106
});
86-
redisChannel.send(m);
87107

88-
assertTrue(latch.await(5, TimeUnit.SECONDS));
89-
context.close();
108+
this.redisChannel.send(m);
109+
110+
assertTrue(latch.await(10, TimeUnit.SECONDS));
90111
}
91112

92113
}

0 commit comments

Comments
 (0)