Skip to content

Commit b8d4e6b

Browse files
artembilangaryrussell
authored andcommitted
Use Lettuce Redis client; Redis module fixes (#2435)
* Use Lettuce Redis client; Redis module fixes The Lettuce client is based on the Netty and more stable, than Jedis therefore we get a performance improvement for tests (it saves us at about 30 seconds). Also this client doesn't fail for me on Windows sporadically (very often) with the `ConnectionClosedException` * After switching to the Netty-based client, we expose the interrupted Thread issue in the `LockRegistryLeaderInitiator`. If we interrupted (expected behavior), we try to unlock calling `RedisLockRegistry`, but Netty client reject our request because the thread is interrupted, therefore we never delete the lock when we yield our leadership. Fix the issue with shifting a `RedisTemplate.delete()` operation to the `ExecutorService` when the current thread is interrupted * Allow to configure such an `ExecutorService` and timeout to wait for the `submit()` result * Refactor `RedisAvailableRule` and all the Redis tests do not expose the target `RedisConnectionFactory` implementation. * Make all the test-cases based on the `connectionFactory` created by the `RedisAvailableTests.setupConnectionFactory()` * Tweak some unnecessary timeouts and sleeps for better tests task throughput * Add a `Log4j2LevelAdjuster` into the `RedisLockRegistryLeaderInitiatorTests`
1 parent 664d67f commit b8d4e6b

File tree

36 files changed

+390
-317
lines changed

36 files changed

+390
-317
lines changed

build.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ subprojects { subproject ->
108108
jackson2Version = '2.9.5'
109109
javaxActivationVersion = '1.1.1'
110110
javaxMailVersion = '1.6.1'
111-
jedisVersion = '2.9.0'
112111
jmsApiVersion = '2.0.1'
113112
jpa21ApiVersion = '1.0.0.Final'
114113
jpaApiVersion = '2.1.1'
@@ -120,6 +119,7 @@ subprojects { subproject ->
120119
junitPlatformVersion = '1.1.1'
121120
jythonVersion = '2.5.3'
122121
kryoShadedVersion = '3.0.3'
122+
lettuceVersion = '5.0.3.RELEASE'
123123
log4jVersion = '2.11.0'
124124
micrometerVersion = '1.0.3'
125125
mockitoVersion = '2.18.0'
@@ -364,7 +364,7 @@ project('spring-integration-file') {
364364
testCompile project(":spring-integration-gemfire")
365365
testCompile project(":spring-integration-jdbc")
366366
testCompile "com.h2database:h2:$h2Version"
367-
testCompile "redis.clients:jedis:$jedisVersion"
367+
testCompile "io.lettuce:lettuce-core:$lettuceVersion"
368368
testCompile "io.projectreactor:reactor-test:$reactorVersion"
369369
}
370370
}
@@ -542,7 +542,7 @@ project('spring-integration-redis') {
542542
compile project(":spring-integration-core")
543543
compile ("org.springframework.data:spring-data-redis:$springDataRedisVersion")
544544

545-
testCompile "redis.clients:jedis:$jedisVersion"
545+
testCompile "io.lettuce:lettuce-core:$lettuceVersion"
546546
}
547547
}
548548

@@ -642,7 +642,7 @@ project('spring-integration-twitter') {
642642
compile("javax.activation:activation:$javaxActivationVersion", optional)
643643
testCompile project(":spring-integration-redis")
644644
testCompile project(":spring-integration-redis").sourceSets.test.output
645-
testCompile "redis.clients:jedis:$jedisVersion"
645+
testCompile "io.lettuce:lettuce-core:$lettuceVersion"
646646
}
647647
}
648648

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,8 @@ else if (acquired) {
386386
this.lock.unlock();
387387
}
388388
catch (Exception e1) {
389-
logger.debug("Could not unlock - treat as broken. " +
390-
"Revoking " + (isRunning() ? " and retrying..." : "..."), e);
389+
logger.debug("Could not unlock - treat as broken: " + this.context +
390+
". Revoking " + (isRunning() ? " and retrying..." : "..."), e);
391391

392392
}
393393
// The lock was broken and we are no longer leader

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,8 @@
2323
import java.util.Map;
2424
import java.util.UUID;
2525
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.locks.Condition;
2830
import java.util.concurrent.locks.Lock;
@@ -31,14 +33,17 @@
3133
import org.apache.commons.logging.Log;
3234
import org.apache.commons.logging.LogFactory;
3335

36+
import org.springframework.beans.factory.DisposableBean;
3437
import org.springframework.dao.CannotAcquireLockException;
3538
import org.springframework.data.redis.connection.RedisConnectionFactory;
3639
import org.springframework.data.redis.core.StringRedisTemplate;
3740
import org.springframework.data.redis.core.script.DefaultRedisScript;
3841
import org.springframework.data.redis.core.script.RedisScript;
3942
import org.springframework.integration.support.locks.ExpirableLockRegistry;
4043
import org.springframework.integration.support.locks.LockRegistry;
44+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
4145
import org.springframework.util.Assert;
46+
import org.springframework.util.ReflectionUtils;
4247

4348
/**
4449
* Implementation of {@link LockRegistry} providing a distributed lock using Redis.
@@ -68,11 +73,13 @@
6873
* @since 4.0
6974
*
7075
*/
71-
public final class RedisLockRegistry implements ExpirableLockRegistry {
76+
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
7277

7378
private static final Log logger = LogFactory.getLog(RedisLockRegistry.class);
7479

75-
private static final long DEFAULT_EXPIRE_AFTER = 60000;
80+
private static final long DEFAULT_EXPIRE_AFTER = 60000L;
81+
82+
public static final long DEFAULT_DELETE_TIMEOUT = 10000L;
7683

7784
private static final String OBTAIN_LOCK_SCRIPT =
7885
"local lockClientId = redis.call('GET', KEYS[1])\n" +
@@ -85,6 +92,27 @@ public final class RedisLockRegistry implements ExpirableLockRegistry {
8592
"end\n" +
8693
"return false";
8794

95+
96+
/**
97+
* An {@link ExecutorService} to call {@link StringRedisTemplate#delete(Object)} in
98+
* the separate thread when the current one is interrupted.
99+
*/
100+
private ExecutorService executorService =
101+
Executors.newCachedThreadPool(new CustomizableThreadFactory("redis-lock-registry-"));
102+
103+
/**
104+
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
105+
* thus should not be shutdown when {@link #destroy()} is called
106+
*/
107+
private boolean executorServiceExplicitlySet;
108+
109+
/**
110+
* Time in milliseconds to wait for the {@link StringRedisTemplate#delete(Object)} in
111+
* the background thread.
112+
*/
113+
private long deleteTimeoutMillis = DEFAULT_DELETE_TIMEOUT;
114+
115+
88116
private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
89117

90118
private final String clientId = UUID.randomUUID().toString();
@@ -141,6 +169,13 @@ public void expireUnusedOlderThan(long age) {
141169
}
142170
}
143171

172+
@Override
173+
public void destroy() {
174+
if (!this.executorServiceExplicitlySet) {
175+
this.executorService.shutdown();
176+
}
177+
}
178+
144179
private final class RedisLock implements Lock {
145180

146181
private final String lockKey;
@@ -172,11 +207,11 @@ public void lock() {
172207
break;
173208
}
174209
catch (InterruptedException e) {
175-
/*
176-
* This method must be uninterruptible so catch and ignore
177-
* interrupts and only break out of the while loop when
178-
* we get the lock.
179-
*/
210+
/*
211+
* This method must be uninterruptible so catch and ignore
212+
* interrupts and only break out of the while loop when
213+
* we get the lock.
214+
*/
180215
}
181216
catch (Exception e) {
182217
this.localLock.unlock();
@@ -263,11 +298,23 @@ public void unlock() {
263298
return;
264299
}
265300
try {
266-
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
301+
302+
if (Thread.currentThread().isInterrupted()) {
303+
RedisLockRegistry.this.executorService.submit(() ->
304+
RedisLockRegistry.this.redisTemplate.delete(this.lockKey))
305+
.get(RedisLockRegistry.this.deleteTimeoutMillis, TimeUnit.MILLISECONDS);
306+
}
307+
else {
308+
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
309+
}
310+
267311
if (logger.isDebugEnabled()) {
268312
logger.debug("Released lock; " + this);
269313
}
270314
}
315+
catch (Exception e) {
316+
ReflectionUtils.rethrowRuntimeException(e);
317+
}
271318
finally {
272319
this.localLock.unlock();
273320
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisChannelParserTests-context.xml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@
22
<beans xmlns="http://www.springframework.org/schema/beans"
33
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
44
xmlns:int-redis="http://www.springframework.org/schema/integration/redis"
5+
xmlns:util="http://www.springframework.org/schema/util"
56
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
6-
http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd">
7+
http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd
8+
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
9+
10+
<util:constant id="redisConnectionFactory"
11+
static-field="org.springframework.integration.redis.rules.RedisAvailableRule.connectionFactory"/>
712

813
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic.parser"
914
serializer="redisSerializer"/>
1015

11-
<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
12-
<property name="port" value="#{T(org.springframework.integration.redis.rules.RedisAvailableRule).REDIS_PORT}"/>
13-
</bean>
14-
1516
<bean id="redisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
1617

1718
<int-redis:publish-subscribe-channel id="redisChannelWithSubLimit" topic-name="si.test.topic"
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,70 +23,91 @@
2323
import java.util.concurrent.CountDownLatch;
2424
import java.util.concurrent.TimeUnit;
2525

26+
import org.junit.After;
27+
import org.junit.Before;
2628
import org.junit.Test;
29+
import org.junit.runner.RunWith;
2730

28-
import org.springframework.context.support.ClassPathXmlApplicationContext;
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.context.ApplicationContext;
2933
import org.springframework.data.redis.connection.RedisConnectionFactory;
3034
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
3135
import org.springframework.data.redis.serializer.RedisSerializer;
36+
import org.springframework.integration.redis.channel.SubscribableRedisChannel;
3237
import org.springframework.integration.redis.rules.RedisAvailable;
3338
import org.springframework.integration.redis.rules.RedisAvailableTests;
3439
import org.springframework.integration.support.utils.IntegrationUtils;
3540
import org.springframework.integration.test.util.TestUtils;
3641
import org.springframework.messaging.Message;
37-
import org.springframework.messaging.SubscribableChannel;
3842
import org.springframework.messaging.support.GenericMessage;
43+
import org.springframework.test.annotation.DirtiesContext;
44+
import org.springframework.test.context.junit4.SpringRunner;
3945

4046
/**
4147
* @author Oleg Zhurakousky
4248
* @author Gary Russell
4349
* @author Gunnar Hillert
4450
* @author Artem Bilan
4551
*/
52+
@RunWith(SpringRunner.class)
53+
@DirtiesContext
4654
public class RedisChannelParserTests extends RedisAvailableTests {
4755

56+
@Autowired
57+
private SubscribableRedisChannel redisChannel;
58+
59+
@Autowired
60+
private SubscribableRedisChannel redisChannelWithSubLimit;
61+
62+
@Autowired
63+
private ApplicationContext context;
64+
65+
@Before
66+
public void setup() {
67+
this.redisChannel.start();
68+
this.redisChannelWithSubLimit.start();
69+
}
70+
71+
@After
72+
public void tearDown() {
73+
this.redisChannel.stop();
74+
this.redisChannelWithSubLimit.stop();
75+
}
76+
4877
@Test
49-
@RedisAvailable
5078
public void testPubSubChannelConfig() {
51-
ClassPathXmlApplicationContext context =
52-
new ClassPathXmlApplicationContext("RedisChannelParserTests-context.xml", this.getClass());
53-
SubscribableChannel redisChannel = context.getBean("redisChannel", SubscribableChannel.class);
5479
RedisConnectionFactory connectionFactory =
55-
TestUtils.getPropertyValue(redisChannel, "connectionFactory", RedisConnectionFactory.class);
80+
TestUtils.getPropertyValue(this.redisChannel, "connectionFactory", RedisConnectionFactory.class);
5681
RedisSerializer<?> redisSerializer = TestUtils.getPropertyValue(redisChannel, "serializer", RedisSerializer.class);
57-
assertEquals(connectionFactory, context.getBean("redisConnectionFactory"));
58-
assertEquals(redisSerializer, context.getBean("redisSerializer"));
82+
assertEquals(connectionFactory, this.context.getBean("redisConnectionFactory"));
83+
assertEquals(redisSerializer, this.context.getBean("redisSerializer"));
5984
assertEquals("si.test.topic.parser", TestUtils.getPropertyValue(redisChannel, "topicName"));
6085
assertEquals(Integer.MAX_VALUE, TestUtils.getPropertyValue(
61-
TestUtils.getPropertyValue(redisChannel, "dispatcher"), "maxSubscribers", Integer.class).intValue());
62-
redisChannel = context.getBean("redisChannelWithSubLimit", SubscribableChannel.class);
63-
assertEquals(1, TestUtils.getPropertyValue(redisChannel, "dispatcher.maxSubscribers", Integer.class).intValue());
64-
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
65-
assertSame(mbf, TestUtils.getPropertyValue(redisChannel, "messageBuilderFactory"));
66-
context.close();
86+
TestUtils.getPropertyValue(this.redisChannel, "dispatcher"), "maxSubscribers", Integer.class).intValue());
87+
88+
assertEquals(1,
89+
TestUtils.getPropertyValue(this.redisChannelWithSubLimit, "dispatcher.maxSubscribers", Integer.class).intValue());
90+
Object mbf = this.context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
91+
assertSame(mbf, TestUtils.getPropertyValue(this.redisChannelWithSubLimit, "messageBuilderFactory"));
6792
}
6893

6994
@Test
7095
@RedisAvailable
7196
public void testPubSubChannelUsage() throws Exception {
72-
ClassPathXmlApplicationContext context =
73-
new ClassPathXmlApplicationContext("RedisChannelParserTests-context.xml", this.getClass());
74-
SubscribableChannel redisChannel = context.getBean("redisChannel", SubscribableChannel.class);
75-
76-
this.awaitContainerSubscribed(TestUtils.getPropertyValue(redisChannel, "container",
97+
this.awaitContainerSubscribed(TestUtils.getPropertyValue(this.redisChannel, "container",
7798
RedisMessageListenerContainer.class));
7899

79-
final Message<?> m = new GenericMessage<String>("Hello Redis");
100+
final Message<?> m = new GenericMessage<>("Hello Redis");
80101

81102
final CountDownLatch latch = new CountDownLatch(1);
82-
redisChannel.subscribe(message -> {
103+
this.redisChannel.subscribe(message -> {
83104
assertEquals(m.getPayload(), message.getPayload());
84105
latch.countDown();
85106
});
86-
redisChannel.send(m);
87107

88-
assertTrue(latch.await(5, TimeUnit.SECONDS));
89-
context.close();
108+
this.redisChannel.send(m);
109+
110+
assertTrue(latch.await(10, TimeUnit.SECONDS));
90111
}
91112

92113
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisInboundChannelAdapterParserTests-context.xml

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,41 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<beans xmlns="http://www.springframework.org/schema/beans"
3-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
4-
xmlns:int-redis="http://www.springframework.org/schema/integration/redis"
5-
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:int="http://www.springframework.org/schema/integration"
5+
xmlns:int-redis="http://www.springframework.org/schema/integration/redis"
6+
xmlns:util="http://www.springframework.org/schema/util"
7+
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
68
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
7-
http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd">
9+
http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd
10+
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
11+
12+
<util:constant id="redisConnectionFactory"
13+
static-field="org.springframework.integration.redis.rules.RedisAvailableRule.connectionFactory"/>
814

915
<int-redis:inbound-channel-adapter
10-
id="adapter" topics="foo" topic-patterns="f*, b*" channel="receiveChannel" error-channel="testErrorChannel"
11-
message-converter="testConverter"
12-
serializer="serializer"
13-
task-executor="executor" />
16+
id="adapter" topics="foo" topic-patterns="f*, b*" channel="receiveChannel" error-channel="testErrorChannel"
17+
message-converter="testConverter"
18+
serializer="serializer"
19+
task-executor="executor"/>
1420

1521
<bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
16-
<property name="corePoolSize" value="5" />
17-
<property name="maxPoolSize" value="10" />
18-
<property name="queueCapacity" value="25" />
22+
<property name="corePoolSize" value="5"/>
23+
<property name="maxPoolSize" value="10"/>
24+
<property name="queueCapacity" value="25"/>
1925
</bean>
2026

2127
<int:channel id="receiveChannel">
22-
<int:queue />
28+
<int:queue/>
2329
</int:channel>
2430

25-
<int:channel id="testErrorChannel" />
26-
27-
<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
28-
<property name="port" value="#{T(org.springframework.integration.redis.rules.RedisAvailableRule).REDIS_PORT}"/>
29-
</bean>
31+
<int:channel id="testErrorChannel"/>
3032

3133
<bean id="testConverter"
32-
class="org.springframework.integration.redis.config.RedisInboundChannelAdapterParserTests$TestMessageConverter" />
34+
class="org.springframework.integration.redis.config.RedisInboundChannelAdapterParserTests$TestMessageConverter"/>
3335

3436
<int-redis:inbound-channel-adapter
35-
id="autoChannel" topics="foo1, bar1" error-channel="testErrorChannel"
36-
message-converter="testConverter" auto-startup="false"/>
37+
id="autoChannel" topics="foo1, bar1" error-channel="testErrorChannel"
38+
message-converter="testConverter" auto-startup="false"/>
3739

3840
<bean id="serializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
3941

0 commit comments

Comments
 (0)