Skip to content

Commit 8c6f7eb

Browse files
committed
DATAREDIS-580 - Support Master/Slave connections with ReadFrom settings using Lettuce.
We now allow configuration of ReadFrom settings using the Lettuce driver to enable slave reads. If ReadFrom is configured, we opt-in to Master/Slave connections instead of plain connections. Master/Slave connections route commands to the configured type of node depending on whether the command is a read or write command. LettuceClientConfiguration configuration = LettuceClientConfiguration.builder().readFrom(ReadFrom.SLAVE).build(); ReadFrom is available for: * Static Master/Slave Redis without Redis Sentinel * Sentinel-Managed Master/Slave Redis * Redis Cluster ReadFrom is not configured for Pub/Sub connections or connections to the actual Sentinel servers.
1 parent 3986832 commit 8c6f7eb

13 files changed

+191
-26
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
*/
1616
package org.springframework.data.redis.connection.lettuce;
1717

18+
import io.lettuce.core.ReadFrom;
1819
import io.lettuce.core.api.StatefulConnection;
1920
import io.lettuce.core.cluster.RedisClusterClient;
2021
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
2122
import io.lettuce.core.codec.RedisCodec;
2223
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
24+
import lombok.RequiredArgsConstructor;
25+
26+
import java.util.Optional;
2327

2428
/**
2529
* Connection provider for Cluster connections.
@@ -28,16 +32,12 @@
2832
* @author Christoph Strobl
2933
* @since 2.0
3034
*/
35+
@RequiredArgsConstructor
3136
class ClusterConnectionProvider implements LettuceConnectionProvider {
3237

3338
private final RedisClusterClient client;
3439
private final RedisCodec<?, ?> codec;
35-
36-
ClusterConnectionProvider(RedisClusterClient client, RedisCodec<?, ?> codec) {
37-
38-
this.client = client;
39-
this.codec = codec;
40-
}
40+
private final Optional<ReadFrom> readFrom;
4141

4242
/*
4343
* (non-Javadoc)
@@ -52,7 +52,11 @@ class ClusterConnectionProvider implements LettuceConnectionProvider {
5252

5353
if (StatefulRedisClusterConnection.class.isAssignableFrom(connectionType)
5454
|| connectionType.equals(StatefulConnection.class)) {
55-
return connectionType.cast(client.connect(codec));
55+
56+
StatefulRedisClusterConnection<?, ?> connection = client.connect(codec);
57+
readFrom.ifPresent(connection::setReadFrom);
58+
59+
return connectionType.cast(connection);
5660
}
5761

5862
throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");

src/main/java/org/springframework/data/redis/connection/lettuce/DefaultLettuceClientConfiguration.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.redis.connection.lettuce;
1717

1818
import io.lettuce.core.ClientOptions;
19+
import io.lettuce.core.ReadFrom;
1920
import io.lettuce.core.resource.ClientResources;
2021

2122
import java.time.Duration;
@@ -38,11 +39,13 @@ class DefaultLettuceClientConfiguration implements LettuceClientConfiguration {
3839
private final Optional<ClientResources> clientResources;
3940
private final Optional<ClientOptions> clientOptions;
4041
private final Optional<String> clientName;
42+
private final Optional<ReadFrom> readFrom;
4143
private final Duration timeout;
4244
private final Duration shutdownTimeout;
4345

4446
DefaultLettuceClientConfiguration(boolean useSsl, boolean verifyPeer, boolean startTls,
4547
@Nullable ClientResources clientResources, @Nullable ClientOptions clientOptions, @Nullable String clientName,
48+
@Nullable ReadFrom readFrom,
4649
Duration timeout, Duration shutdownTimeout) {
4750

4851
this.useSsl = useSsl;
@@ -51,6 +54,7 @@ class DefaultLettuceClientConfiguration implements LettuceClientConfiguration {
5154
this.clientResources = Optional.ofNullable(clientResources);
5255
this.clientOptions = Optional.ofNullable(clientOptions);
5356
this.clientName = Optional.ofNullable(clientName);
57+
this.readFrom = Optional.ofNullable(readFrom);
5458
this.timeout = timeout;
5559
this.shutdownTimeout = shutdownTimeout;
5660
}
@@ -91,8 +95,7 @@ public Optional<ClientResources> getClientResources() {
9195
return clientResources;
9296
}
9397

94-
/*
95-
* (non-Javadoc)
98+
/* (non-Javadoc)
9699
* @see org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration#getClientOptions()
97100
*/
98101
@Override
@@ -109,6 +112,15 @@ public Optional<String> getClientName() {
109112
return clientName;
110113
}
111114

115+
/*
116+
* (non-Javadoc)
117+
* @see org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration#getReadFrom()
118+
*/
119+
@Override
120+
public Optional<ReadFrom> getReadFrom() {
121+
return readFrom;
122+
}
123+
112124
/*
113125
* (non-Javadoc)
114126
* @see org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration#getTimeout()

src/main/java/org/springframework/data/redis/connection/lettuce/DefaultLettucePoolingClientConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.redis.connection.lettuce;
1717

1818
import io.lettuce.core.ClientOptions;
19+
import io.lettuce.core.ReadFrom;
1920
import io.lettuce.core.resource.ClientResources;
2021

2122
import java.time.Duration;
@@ -96,6 +97,15 @@ public Optional<String> getClientName() {
9697
return clientConfiguration.getClientName();
9798
}
9899

100+
/*
101+
* (non-Javadoc)
102+
* @see org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration#getReadFrom()
103+
*/
104+
@Override
105+
public Optional<ReadFrom> getReadFrom() {
106+
return clientConfiguration.getReadFrom();
107+
}
108+
99109
/*
100110
* (non-Javadoc)
101111
* @see org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration#getCommandTimeout()

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClientConfiguration.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.redis.connection.lettuce;
1717

1818
import io.lettuce.core.ClientOptions;
19+
import io.lettuce.core.ReadFrom;
1920
import io.lettuce.core.RedisURI;
2021
import io.lettuce.core.resource.ClientResources;
2122

@@ -38,6 +39,7 @@
3839
* <li>Optional {@link ClientResources}</li>
3940
* <li>Optional {@link ClientOptions}</li>
4041
* <li>Optional client name</li>
42+
* <li>Optional {@link ReadFrom}. Enables Master/Slave operations if configured.</li>
4143
* <li>Client {@link Duration timeout}</li>
4244
* <li>Shutdown {@link Duration timeout}</li>
4345
* </ul>
@@ -82,6 +84,12 @@ public interface LettuceClientConfiguration {
8284
*/
8385
Optional<String> getClientName();
8486

87+
/**
88+
* @return the optional {@link io.lettuce.core.ReadFrom} setting.
89+
* @since 2.1
90+
*/
91+
Optional<ReadFrom> getReadFrom();
92+
8593
/**
8694
* @return the timeout.
8795
*/
@@ -118,6 +126,8 @@ static LettuceClientConfigurationBuilder builder() {
118126
* <dd>none</dd>
119127
* <dt>Client name</dt>
120128
* <dd>none</dd>
129+
* <dt>Read From</dt>
130+
* <dd>none</dd>
121131
* <dt>Connect Timeout</dt>
122132
* <dd>60 Seconds</dd>
123133
* <dt>Shutdown Timeout</dt>
@@ -142,6 +152,7 @@ class LettuceClientConfigurationBuilder {
142152
@Nullable ClientResources clientResources;
143153
@Nullable ClientOptions clientOptions;
144154
@Nullable String clientName;
155+
@Nullable ReadFrom readFrom;
145156
Duration timeout = Duration.ofSeconds(RedisURI.DEFAULT_TIMEOUT);
146157
Duration shutdownTimeout = Duration.ofMillis(100);
147158

@@ -188,6 +199,22 @@ public LettuceClientConfigurationBuilder clientOptions(ClientOptions clientOptio
188199
return this;
189200
}
190201

202+
/**
203+
* Configure {@link ReadFrom}. Enables Master/Slave operations if configured.
204+
*
205+
* @param readFrom must not be {@literal null}.
206+
* @return {@literal this} builder.
207+
* @throws IllegalArgumentException if clientOptions is {@literal null}.
208+
* @since 2.1
209+
*/
210+
public LettuceClientConfigurationBuilder readFrom(ReadFrom readFrom) {
211+
212+
Assert.notNull(readFrom, "ReadFrom must not be null!");
213+
214+
this.readFrom = readFrom;
215+
return this;
216+
}
217+
191218
/**
192219
* Configure a {@code clientName} to be set with {@code CLIENT SETNAME}.
193220
*
@@ -242,7 +269,7 @@ public LettuceClientConfigurationBuilder shutdownTimeout(Duration shutdownTimeou
242269
public LettuceClientConfiguration build() {
243270

244271
return new DefaultLettuceClientConfiguration(useSsl, verifyPeer, startTls, clientResources, clientOptions,
245-
clientName, timeout, shutdownTimeout);
272+
clientName, readFrom, timeout, shutdownTimeout);
246273
}
247274
}
248275

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.LinkedHashMap;
3131
import java.util.List;
3232
import java.util.Map;
33+
import java.util.Optional;
3334
import java.util.Set;
3435

3536
import org.apache.commons.logging.Log;
@@ -74,7 +75,7 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau
7475
* @param clusterClient must not be {@literal null}.
7576
*/
7677
public LettuceClusterConnection(RedisClusterClient clusterClient) {
77-
this(new ClusterConnectionProvider(clusterClient, CODEC));
78+
this(new ClusterConnectionProvider(clusterClient, CODEC, Optional.empty()));
7879
}
7980

8081
/**
@@ -98,7 +99,7 @@ public LettuceClusterConnection(RedisClusterClient clusterClient, ClusterCommand
9899
* @since 2.0
99100
*/
100101
public LettuceClusterConnection(RedisClusterClient clusterClient, ClusterCommandExecutor executor, Duration timeout) {
101-
this(new ClusterConnectionProvider(clusterClient, CODEC), executor, timeout);
102+
this(new ClusterConnectionProvider(clusterClient, CODEC, Optional.empty()), executor, timeout);
102103
}
103104

104105
/**

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.LinkedList;
5050
import java.util.List;
5151
import java.util.Map;
52+
import java.util.Optional;
5253
import java.util.Queue;
5354
import java.util.concurrent.ConcurrentHashMap;
5455
import java.util.concurrent.Future;
@@ -224,7 +225,7 @@ public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> share
224225
if (pool != null) {
225226
this.connectionProvider = new LettucePoolConnectionProvider(pool);
226227
} else {
227-
this.connectionProvider = new StandaloneConnectionProvider((RedisClient) client, CODEC);
228+
this.connectionProvider = new StandaloneConnectionProvider((RedisClient) client, CODEC, Optional.empty());
228229
}
229230

230231
this.asyncSharedConn = sharedConnection;

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.lettuce.core.AbstractRedisClient;
1919
import io.lettuce.core.ClientOptions;
20+
import io.lettuce.core.ReadFrom;
2021
import io.lettuce.core.RedisClient;
2122
import io.lettuce.core.RedisException;
2223
import io.lettuce.core.RedisURI;
@@ -822,7 +823,7 @@ protected StatefulConnection<ByteBuffer, ByteBuffer> getSharedReactiveConnection
822823

823824
private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
824825

825-
LettuceConnectionProvider connectionProvider = doConnectionProvider(client, codec);
826+
LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);
826827

827828
if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
828829
return new LettucePoolingConnectionProvider(connectionProvider,
@@ -832,12 +833,29 @@ private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient c
832833
return connectionProvider;
833834
}
834835

835-
private LettuceConnectionProvider doConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
836+
/**
837+
* Create a {@link LettuceConnectionProvider} given {@link AbstractRedisClient} and {@link RedisCodec}. Configuration
838+
* of this connection factory specifies the type of the created connection provider. This method creates either a
839+
* {@link LettuceConnectionProvider} for either {@link RedisClient} or {@link RedisClusterClient}. Subclasses may
840+
* override this method to decorate the connection provider.
841+
*
842+
* @param client either {@link RedisClient} or {@link RedisClusterClient}, must not be {@literal null}.
843+
* @param codec used for connection creation, must not be {@literal null}. By default, a {@code byte[]} codec.
844+
* Reactive connections require a {@link java.nio.ByteBuffer} codec.
845+
* @return the connection provider.
846+
* @since 2.1
847+
* @see io.lettuce.core.codec.ByteArrayCodec
848+
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection.ByteBufferCodec
849+
*/
850+
protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
851+
852+
Optional<ReadFrom> readFrom = getClientConfiguration().getReadFrom();
853+
836854
if (isClusterAware()) {
837-
return new ClusterConnectionProvider((RedisClusterClient) client, codec);
855+
return new ClusterConnectionProvider((RedisClusterClient) client, codec, readFrom);
838856
}
839857

840-
return new StandaloneConnectionProvider((RedisClient) client, codec);
858+
return new StandaloneConnectionProvider((RedisClient) client, codec, readFrom);
841859
}
842860

843861
private AbstractRedisClient createClient() {
@@ -1124,6 +1142,14 @@ public Optional<ClientOptions> getClientOptions() {
11241142
return Optional.empty();
11251143
}
11261144

1145+
/* (non-Javadoc)
1146+
* @see org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration#getReadFrom()
1147+
*/
1148+
@Override
1149+
public Optional<ReadFrom> getReadFrom() {
1150+
return Optional.empty();
1151+
}
1152+
11271153
/*
11281154
* (non-Javadoc)
11291155
* @see org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration#getClientName()

src/main/java/org/springframework/data/redis/connection/lettuce/StandaloneConnectionProvider.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@
1515
*/
1616
package org.springframework.data.redis.connection.lettuce;
1717

18+
import io.lettuce.core.ReadFrom;
1819
import io.lettuce.core.RedisClient;
1920
import io.lettuce.core.RedisURI;
2021
import io.lettuce.core.api.StatefulConnection;
2122
import io.lettuce.core.codec.RedisCodec;
23+
import io.lettuce.core.masterslave.MasterSlave;
24+
import io.lettuce.core.masterslave.StatefulRedisMasterSlaveConnection;
2225
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
2326
import io.lettuce.core.sentinel.api.StatefulRedisSentinelConnection;
2427
import lombok.RequiredArgsConstructor;
2528

29+
import java.util.Optional;
30+
31+
import org.springframework.beans.DirectFieldAccessor;
2632
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware;
2733

2834
/**
@@ -35,6 +41,7 @@ class StandaloneConnectionProvider implements LettuceConnectionProvider, TargetA
3541

3642
private final RedisClient client;
3743
private final RedisCodec<?, ?> codec;
44+
private final Optional<ReadFrom> readFrom;
3845

3946
/*
4047
* (non-Javadoc)
@@ -53,7 +60,17 @@ class StandaloneConnectionProvider implements LettuceConnectionProvider, TargetA
5360
}
5461

5562
if (StatefulConnection.class.isAssignableFrom(connectionType)) {
56-
return connectionType.cast(client.connect(codec));
63+
64+
return readFrom.map(it -> {
65+
66+
DirectFieldAccessor fieldAccessor = new DirectFieldAccessor(client);
67+
RedisURI redisURI = RedisURI.class.cast(fieldAccessor.getPropertyValue("redisURI"));
68+
69+
StatefulRedisMasterSlaveConnection<?, ?> connection = MasterSlave.connect(client, codec, redisURI);
70+
connection.setReadFrom(it);
71+
72+
return connectionType.cast(connection);
73+
}).orElseGet(() -> connectionType.cast(client.connect(codec)));
5774
}
5875

5976
throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.hamcrest.core.IsNull.*;
2121
import static org.junit.Assert.*;
2222

23+
import io.lettuce.core.ReadFrom;
2324
import io.lettuce.core.RedisException;
2425
import io.lettuce.core.api.async.RedisAsyncCommands;
2526
import io.lettuce.core.api.reactive.BaseRedisReactiveCommands;
@@ -362,6 +363,28 @@ public void factoryCreatesPooledConnections() {
362363
factory.destroy();
363364
}
364365

366+
@Test // DATAREDIS-580
367+
public void factoryUsesMasterSlaveConnections() {
368+
369+
LettuceClientConfiguration configuration = LettuceTestClientConfiguration.builder().readFrom(ReadFrom.SLAVE)
370+
.build();
371+
372+
LettuceConnectionFactory factory = new LettuceConnectionFactory(SettingsUtils.standaloneConfiguration(),
373+
configuration);
374+
factory.afterPropertiesSet();
375+
376+
RedisConnection connection = factory.getConnection();
377+
378+
try {
379+
assertThat(connection.ping(), is(equalTo("PONG")));
380+
assertThat(connection.info().getProperty("role"), is(equalTo("slave")));
381+
} finally {
382+
this.connection.close();
383+
}
384+
385+
factory.destroy();
386+
}
387+
365388
@Test // DATAREDIS-576
366389
public void connectionAppliesClientName() {
367390

0 commit comments

Comments
 (0)