Skip to content

Commit cb61a50

Browse files
committed
DATAREDIS-719 - Use asynchronous Lettuce API for transactions.
We now use the asynchronous API to dispatch commands within a transaction using the Lettuce driver. This allows us dropping our additional result wrappers and use a single, uniform api for deferred results. Original pull request: #289.
1 parent 3954112 commit cb61a50

12 files changed

+223
-293
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
6767
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware;
6868
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceResultBuilder;
69+
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceStatusResult;
6970
import org.springframework.data.redis.core.RedisCommand;
7071
import org.springframework.data.redis.core.ScanOptions;
7172
import org.springframework.lang.Nullable;
@@ -135,29 +136,6 @@ <T, R> LettuceResult<T, R> newLettuceStatusResult(Future<T> resultHolder) {
135136
return LettuceResultBuilder.<T, R> forResponse(resultHolder).buildStatusResult();
136137
}
137138

138-
<T> LettuceTxResult<T, Object> newLettuceTxResult(T resultHolder) {
139-
return newLettuceTxResult(resultHolder, (val) -> val);
140-
}
141-
142-
@SuppressWarnings("unchecked")
143-
<T> LettuceTxResult<T, Object> newLettuceTxResult(T resultHolder, Converter<T, ?> converter) {
144-
145-
return LettuceResultBuilder.forResponse(resultHolder).mappedWith((Converter) converter)
146-
.convertPipelineAndTxResults(convertPipelineAndTxResults).buildTxResult();
147-
}
148-
149-
@SuppressWarnings("unchecked")
150-
<T> LettuceTxResult<T, Object> newLettuceTxResult(T resultHolder, Converter<T, ?> converter,
151-
Supplier<?> defaultValue) {
152-
153-
return LettuceResultBuilder.forResponse(resultHolder).mappedWith((Converter) converter)
154-
.convertPipelineAndTxResults(convertPipelineAndTxResults).defaultNullTo(defaultValue).buildTxResult();
155-
}
156-
157-
LettuceTxStatusResult newLettuceTxStatusResult(Object resultHolder) {
158-
return new LettuceTxStatusResult(resultHolder);
159-
}
160-
161139
private class LettuceTransactionResultConverter<T> extends TransactionResultConverter<T> {
162140
public LettuceTransactionResultConverter(Queue<FutureResult<T>> txResults,
163141
Converter<Exception, DataAccessException> exceptionConverter) {
@@ -425,7 +403,7 @@ public Object execute(String command, @Nullable CommandOutput commandOutputTypeH
425403

426404
if (isQueueing()) {
427405

428-
transaction(newLettuceTxResult(connectionImpl.dispatch(cmd.getType(), cmd.getOutput(), cmd.getArgs())));
406+
transaction(newLettuceResult(connectionImpl.dispatch(cmd.getType(), cmd.getOutput(), cmd.getArgs())));
429407
return null;
430408
}
431409

@@ -566,7 +544,7 @@ public byte[] echo(byte[] message) {
566544
return null;
567545
}
568546
if (isQueueing()) {
569-
transaction(new LettuceTxResult<>(getConnection().echo(message)));
547+
transaction(newLettuceResult(getAsyncConnection().echo(message)));
570548
return null;
571549
}
572550
return getConnection().echo(message);
@@ -583,7 +561,7 @@ public String ping() {
583561
return null;
584562
}
585563
if (isQueueing()) {
586-
transaction(new LettuceTxResult<>(getConnection().ping()));
564+
transaction(newLettuceResult(getAsyncConnection().ping()));
587565
return null;
588566
}
589567
return getConnection().ping();
@@ -646,10 +624,10 @@ public void multi() {
646624
isMulti = true;
647625
try {
648626
if (isPipelined()) {
649-
((RedisAsyncCommands) getAsyncDedicatedConnection()).multi();
627+
getAsyncDedicatedRedisCommands().multi();
650628
return;
651629
}
652-
(getDedicatedRedisCommands()).multi();
630+
getDedicatedRedisCommands().multi();
653631
} catch (Exception ex) {
654632
throw convertLettuceAccessException(ex);
655633
}
@@ -662,14 +640,20 @@ public void select(int dbIndex) {
662640
throw new UnsupportedOperationException("Selecting a new database not supported due to shared connection. "
663641
+ "Use separate ConnectionFactorys to work with multiple databases");
664642
}
665-
if (isPipelined()) {
666-
throw new UnsupportedOperationException("Lettuce blocks for #select");
667-
}
643+
668644
try {
669645

670646
this.dbIndex = dbIndex;
647+
648+
if (isPipelined()) {
649+
pipeline(new LettuceStatusResult(getAsyncConnection().dispatch(CommandType.SELECT,
650+
new StatusOutput<>(ByteArrayCodec.INSTANCE), new CommandArgs<>(ByteArrayCodec.INSTANCE).add(dbIndex))));
651+
return;
652+
}
653+
671654
if (isQueueing()) {
672-
transaction(newLettuceStatusResult(((RedisCommands)getAsyncConnection()).select(dbIndex)));
655+
transaction(newLettuceStatusResult(getAsyncConnection().dispatch(CommandType.SELECT,
656+
new StatusOutput<>(ByteArrayCodec.INSTANCE), new CommandArgs<>(ByteArrayCodec.INSTANCE).add(dbIndex))));
673657
return;
674658
}
675659
((RedisCommands) getConnection()).select(dbIndex);
@@ -682,11 +666,11 @@ public void select(int dbIndex) {
682666
public void unwatch() {
683667
try {
684668
if (isPipelined()) {
685-
pipeline(newLettuceStatusResult(((RedisAsyncCommands) getAsyncDedicatedConnection()).unwatch()));
669+
pipeline(newLettuceStatusResult(getAsyncDedicatedRedisCommands().unwatch()));
686670
return;
687671
}
688672
if (isQueueing()) {
689-
transaction(newLettuceStatusResult(getDedicatedRedisCommands().unwatch()));
673+
transaction(newLettuceStatusResult(getAsyncDedicatedRedisCommands().unwatch()));
690674
return;
691675
}
692676
getDedicatedRedisCommands().unwatch();
@@ -706,7 +690,7 @@ public void watch(byte[]... keys) {
706690
return;
707691
}
708692
if (isQueueing()) {
709-
transaction(new LettuceTxStatusResult(getDedicatedRedisCommands().watch(keys)));
693+
transaction(new LettuceStatusResult(getAsyncDedicatedRedisCommands().watch(keys)));
710694
return;
711695
}
712696
getDedicatedRedisCommands().watch(keys);
@@ -727,7 +711,7 @@ public Long publish(byte[] channel, byte[] message) {
727711
return null;
728712
}
729713
if (isQueueing()) {
730-
transaction(newLettuceTxResult(getConnection().publish(channel, message)));
714+
transaction(newLettuceResult(getAsyncConnection().publish(channel, message)));
731715
return null;
732716
}
733717
return getConnection().publish(channel, message);

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceGeoCommands.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.springframework.data.geo.Point;
4141
import org.springframework.data.redis.connection.RedisGeoCommands;
4242
import org.springframework.data.redis.connection.convert.ListConverter;
43-
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceTxResult;
4443
import org.springframework.lang.Nullable;
4544
import org.springframework.util.Assert;
4645

@@ -71,7 +70,7 @@ public Long geoAdd(byte[] key, Point point, byte[] member) {
7170
return null;
7271
}
7372
if (isQueueing()) {
74-
transaction(connection.newLettuceTxResult(getConnection().geoadd(key, point.getX(), point.getY(), member)));
73+
transaction(connection.newLettuceResult(getAsyncConnection().geoadd(key, point.getX(), point.getY(), member)));
7574
return null;
7675
}
7776
return getConnection().geoadd(key, point.getX(), point.getY(), member);
@@ -132,7 +131,7 @@ private Long geoAdd(byte[] key, Collection<Object> values) {
132131
return null;
133132
}
134133
if (isQueueing()) {
135-
transaction(connection.newLettuceTxResult(getConnection().geoadd(key, values.toArray())));
134+
transaction(connection.newLettuceResult(getAsyncConnection().geoadd(key, values.toArray())));
136135
return null;
137136
}
138137
return getConnection().geoadd(key, values.toArray());
@@ -172,8 +171,8 @@ public Distance geoDist(byte[] key, byte[] member1, byte[] member2, Metric metri
172171
return null;
173172
}
174173
if (isQueueing()) {
175-
transaction(
176-
connection.newLettuceTxResult(getConnection().geodist(key, member1, member2, geoUnit), distanceConverter));
174+
transaction(connection.newLettuceResult(getAsyncConnection().geodist(key, member1, member2, geoUnit),
175+
distanceConverter));
177176
return null;
178177
}
179178
return distanceConverter.convert(getConnection().geodist(key, member1, member2, geoUnit));
@@ -199,7 +198,7 @@ public List<String> geoHash(byte[] key, byte[]... members) {
199198
return null;
200199
}
201200
if (isQueueing()) {
202-
transaction(connection.newLettuceTxResult(getConnection().geohash(key, members)));
201+
transaction(connection.newLettuceResult(getAsyncConnection().geohash(key, members)));
203202
return null;
204203
}
205204
return getConnection().geohash(key, members).stream().map(value -> value.getValueOrElse(null))
@@ -228,7 +227,7 @@ public List<Point> geoPos(byte[] key, byte[]... members) {
228227
return null;
229228
}
230229
if (isQueueing()) {
231-
transaction(connection.newLettuceTxResult(getConnection().geopos(key, members), converter));
230+
transaction(connection.newLettuceResult(getAsyncConnection().geopos(key, members), converter));
232231
return null;
233232
}
234233
return converter.convert(getConnection().geopos(key, members));
@@ -259,8 +258,8 @@ public GeoResults<GeoLocation<byte[]>> geoRadius(byte[] key, Circle within) {
259258
return null;
260259
}
261260
if (isQueueing()) {
262-
transaction(connection.newLettuceTxResult(
263-
getConnection().georadius(key, within.getCenter().getX(), within.getCenter().getY(),
261+
transaction(connection.newLettuceResult(
262+
getAsyncConnection().georadius(key, within.getCenter().getX(), within.getCenter().getY(),
264263
within.getRadius().getValue(), LettuceConverters.toGeoArgsUnit(within.getRadius().getMetric())),
265264
geoResultsConverter));
266265
return null;
@@ -296,7 +295,7 @@ public GeoResults<GeoLocation<byte[]>> geoRadius(byte[] key, Circle within, GeoR
296295
return null;
297296
}
298297
if (isQueueing()) {
299-
transaction(connection.newLettuceTxResult(getConnection().georadius(key, within.getCenter().getX(),
298+
transaction(connection.newLettuceResult(getAsyncConnection().georadius(key, within.getCenter().getX(),
300299
within.getCenter().getY(), within.getRadius().getValue(),
301300
LettuceConverters.toGeoArgsUnit(within.getRadius().getMetric()), geoArgs), geoResultsConverter));
302301
return null;
@@ -340,8 +339,8 @@ public GeoResults<GeoLocation<byte[]>> geoRadiusByMember(byte[] key, byte[] memb
340339
return null;
341340
}
342341
if (isQueueing()) {
343-
transaction(connection
344-
.newLettuceTxResult(getConnection().georadiusbymember(key, member, radius.getValue(), geoUnit), converter));
342+
transaction(connection.newLettuceResult(
343+
getAsyncConnection().georadiusbymember(key, member, radius.getValue(), geoUnit), converter));
345344
return null;
346345
}
347346
return converter.convert(getConnection().georadiusbymember(key, member, radius.getValue(), geoUnit));
@@ -376,8 +375,9 @@ public GeoResults<GeoLocation<byte[]>> geoRadiusByMember(byte[] key, byte[] memb
376375
return null;
377376
}
378377
if (isQueueing()) {
379-
transaction(connection.newLettuceTxResult(
380-
getConnection().georadiusbymember(key, member, radius.getValue(), geoUnit, geoArgs), geoResultsConverter));
378+
transaction(connection.newLettuceResult(
379+
getAsyncConnection().georadiusbymember(key, member, radius.getValue(), geoUnit, geoArgs),
380+
geoResultsConverter));
381381
return null;
382382
}
383383
return geoResultsConverter
@@ -408,7 +408,7 @@ private void pipeline(LettuceResult result) {
408408
connection.pipeline(result);
409409
}
410410

411-
private void transaction(LettuceTxResult result) {
411+
private void transaction(LettuceResult result) {
412412
connection.transaction(result);
413413
}
414414

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceHashCommands.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import org.springframework.dao.DataAccessException;
3131
import org.springframework.data.redis.connection.RedisHashCommands;
32-
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceTxResult;
3332
import org.springframework.data.redis.core.Cursor;
3433
import org.springframework.data.redis.core.KeyBoundCursor;
3534
import org.springframework.data.redis.core.ScanIteration;
@@ -64,7 +63,7 @@ public Boolean hSet(byte[] key, byte[] field, byte[] value) {
6463
return null;
6564
}
6665
if (isQueueing()) {
67-
transaction(connection.newLettuceTxResult(getConnection().hset(key, field, value)));
66+
transaction(connection.newLettuceResult(getAsyncConnection().hset(key, field, value)));
6867
return null;
6968
}
7069
return getConnection().hset(key, field, value);
@@ -90,7 +89,7 @@ public Boolean hSetNX(byte[] key, byte[] field, byte[] value) {
9089
return null;
9190
}
9291
if (isQueueing()) {
93-
transaction(connection.newLettuceTxResult(getConnection().hsetnx(key, field, value)));
92+
transaction(connection.newLettuceResult(getAsyncConnection().hsetnx(key, field, value)));
9493
return null;
9594
}
9695
return getConnection().hsetnx(key, field, value);
@@ -115,7 +114,7 @@ public Long hDel(byte[] key, byte[]... fields) {
115114
return null;
116115
}
117116
if (isQueueing()) {
118-
transaction(connection.newLettuceTxResult(getConnection().hdel(key, fields)));
117+
transaction(connection.newLettuceResult(getAsyncConnection().hdel(key, fields)));
119118
return null;
120119
}
121120
return getConnection().hdel(key, fields);
@@ -140,7 +139,7 @@ public Boolean hExists(byte[] key, byte[] field) {
140139
return null;
141140
}
142141
if (isQueueing()) {
143-
transaction(connection.newLettuceTxResult(getConnection().hexists(key, field)));
142+
transaction(connection.newLettuceResult(getAsyncConnection().hexists(key, field)));
144143
return null;
145144
}
146145
return getConnection().hexists(key, field);
@@ -165,7 +164,7 @@ public byte[] hGet(byte[] key, byte[] field) {
165164
return null;
166165
}
167166
if (isQueueing()) {
168-
transaction(connection.newLettuceTxResult(getConnection().hget(key, field)));
167+
transaction(connection.newLettuceResult(getAsyncConnection().hget(key, field)));
169168
return null;
170169
}
171170
return getConnection().hget(key, field);
@@ -189,7 +188,7 @@ public Map<byte[], byte[]> hGetAll(byte[] key) {
189188
return null;
190189
}
191190
if (isQueueing()) {
192-
transaction(connection.newLettuceTxResult(getConnection().hgetall(key)));
191+
transaction(connection.newLettuceResult(getAsyncConnection().hgetall(key)));
193192
return null;
194193
}
195194
return getConnection().hgetall(key);
@@ -214,7 +213,7 @@ public Long hIncrBy(byte[] key, byte[] field, long delta) {
214213
return null;
215214
}
216215
if (isQueueing()) {
217-
transaction(connection.newLettuceTxResult(getConnection().hincrby(key, field, delta)));
216+
transaction(connection.newLettuceResult(getAsyncConnection().hincrby(key, field, delta)));
218217
return null;
219218
}
220219
return getConnection().hincrby(key, field, delta);
@@ -239,7 +238,7 @@ public Double hIncrBy(byte[] key, byte[] field, double delta) {
239238
return null;
240239
}
241240
if (isQueueing()) {
242-
transaction(connection.newLettuceTxResult(getConnection().hincrbyfloat(key, field, delta)));
241+
transaction(connection.newLettuceResult(getAsyncConnection().hincrbyfloat(key, field, delta)));
243242
return null;
244243
}
245244
return getConnection().hincrbyfloat(key, field, delta);
@@ -263,7 +262,8 @@ public Set<byte[]> hKeys(byte[] key) {
263262
return null;
264263
}
265264
if (isQueueing()) {
266-
transaction(connection.newLettuceTxResult(getConnection().hkeys(key), LettuceConverters.bytesListToBytesSet()));
265+
transaction(
266+
connection.newLettuceResult(getAsyncConnection().hkeys(key), LettuceConverters.bytesListToBytesSet()));
267267
return null;
268268
}
269269
return LettuceConverters.toBytesSet(getConnection().hkeys(key));
@@ -287,7 +287,7 @@ public Long hLen(byte[] key) {
287287
return null;
288288
}
289289
if (isQueueing()) {
290-
transaction(connection.newLettuceTxResult(getConnection().hlen(key)));
290+
transaction(connection.newLettuceResult(getAsyncConnection().hlen(key)));
291291
return null;
292292
}
293293
return getConnection().hlen(key);
@@ -313,7 +313,7 @@ public List<byte[]> hMGet(byte[] key, byte[]... fields) {
313313
return null;
314314
}
315315
if (isQueueing()) {
316-
transaction(connection.newLettuceTxResult(getConnection().hmget(key, fields),
316+
transaction(connection.newLettuceResult(getAsyncConnection().hmget(key, fields),
317317
LettuceConverters.keyValueListUnwrapper()));
318318
return null;
319319
}
@@ -339,7 +339,7 @@ public void hMSet(byte[] key, Map<byte[], byte[]> hashes) {
339339
return;
340340
}
341341
if (isQueueing()) {
342-
transaction(connection.newLettuceTxStatusResult(getConnection().hmset(key, hashes)));
342+
transaction(connection.newLettuceStatusResult(getAsyncConnection().hmset(key, hashes)));
343343
return;
344344
}
345345
getConnection().hmset(key, hashes);
@@ -363,7 +363,7 @@ public List<byte[]> hVals(byte[] key) {
363363
return null;
364364
}
365365
if (isQueueing()) {
366-
transaction(connection.newLettuceTxResult(getConnection().hvals(key)));
366+
transaction(connection.newLettuceResult(getAsyncConnection().hvals(key)));
367367
return null;
368368
}
369369
return getConnection().hvals(key);
@@ -434,7 +434,7 @@ public Long hStrLen(byte[] key, byte[] field) {
434434
return null;
435435
}
436436
if (isQueueing()) {
437-
transaction(connection.newLettuceTxResult(getConnection().hstrlen(key, field)));
437+
transaction(connection.newLettuceResult(getAsyncConnection().hstrlen(key, field)));
438438
return null;
439439
}
440440
return getConnection().hstrlen(key, field);
@@ -455,7 +455,7 @@ private void pipeline(LettuceResult result) {
455455
connection.pipeline(result);
456456
}
457457

458-
private void transaction(LettuceTxResult result) {
458+
private void transaction(LettuceResult result) {
459459
connection.transaction(result);
460460
}
461461

0 commit comments

Comments
 (0)