Skip to content

Commit f1fac3d

Browse files
committed
DATAREDIS-719 - Use asynchronous API for transactions for Lettuce.
We now use the asynchronous API to dispatch commands within a transaction using the Lettuce driver. This allows us dropping our additional result wrappers and use a single, uniform api for deferred results.
1 parent 635f305 commit f1fac3d

12 files changed

+222
-299
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@
6767
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware;
6868
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceResultBuilder;
6969
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceStatusResult;
70-
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceTxResult;
71-
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceTxStatusResult;
7270
import org.springframework.data.redis.core.RedisCommand;
7371
import org.springframework.data.redis.core.ScanOptions;
7472
import org.springframework.lang.Nullable;
@@ -138,29 +136,6 @@ LettuceStatusResult newLettuceStatusResult(Future<?> resultHolder) {
138136
return new LettuceStatusResult(resultHolder);
139137
}
140138

141-
<T> LettuceTxResult<T, Object> newLettuceTxResult(T resultHolder) {
142-
return newLettuceTxResult(resultHolder, (val) -> val);
143-
}
144-
145-
@SuppressWarnings("unchecked")
146-
<T> LettuceTxResult<T, Object> newLettuceTxResult(T resultHolder, Converter<T, ?> converter) {
147-
148-
return LettuceResultBuilder.forResponse(resultHolder).mappedWith((Converter) converter)
149-
.convertPipelineAndTxResults(convertPipelineAndTxResults).buildTxResult();
150-
}
151-
152-
@SuppressWarnings("unchecked")
153-
<T> LettuceTxResult<T, Object> newLettuceTxResult(T resultHolder, Converter<T, ?> converter,
154-
Supplier<?> defaultValue) {
155-
156-
return LettuceResultBuilder.forResponse(resultHolder).mappedWith((Converter) converter)
157-
.convertPipelineAndTxResults(convertPipelineAndTxResults).defaultNullTo(defaultValue).buildTxResult();
158-
}
159-
160-
LettuceTxStatusResult newLettuceTxStatusResult(Object resultHolder) {
161-
return new LettuceTxStatusResult(resultHolder);
162-
}
163-
164139
private class LettuceTransactionResultConverter<T> extends TransactionResultConverter<T> {
165140
public LettuceTransactionResultConverter(Queue<FutureResult<T>> txResults,
166141
Converter<Exception, DataAccessException> exceptionConverter) {
@@ -424,7 +399,7 @@ public Object execute(String command, @Nullable CommandOutput commandOutputTypeH
424399
return null;
425400
} else if (isQueueing()) {
426401

427-
transaction(newLettuceTxResult(connectionImpl.dispatch(cmd.getType(), cmd.getOutput(), cmd.getArgs())));
402+
transaction(newLettuceResult(connectionImpl.dispatch(cmd.getType(), cmd.getOutput(), cmd.getArgs())));
428403
return null;
429404
} else {
430405
return await(connectionImpl.dispatch(cmd.getType(), cmd.getOutput(), cmd.getArgs()));
@@ -564,7 +539,7 @@ public byte[] echo(byte[] message) {
564539
return null;
565540
}
566541
if (isQueueing()) {
567-
transaction(new LettuceTxResult<>(getConnection().echo(message)));
542+
transaction(newLettuceResult(getAsyncConnection().echo(message)));
568543
return null;
569544
}
570545
return getConnection().echo(message);
@@ -581,7 +556,7 @@ public String ping() {
581556
return null;
582557
}
583558
if (isQueueing()) {
584-
transaction(new LettuceTxResult<>(getConnection().ping()));
559+
transaction(newLettuceResult(getAsyncConnection().ping()));
585560
return null;
586561
}
587562
return getConnection().ping();
@@ -644,10 +619,10 @@ public void multi() {
644619
isMulti = true;
645620
try {
646621
if (isPipelined()) {
647-
((RedisAsyncCommands) getAsyncDedicatedConnection()).multi();
622+
getAsyncDedicatedRedisCommands().multi();
648623
return;
649624
}
650-
(getDedicatedRedisCommands()).multi();
625+
getDedicatedRedisCommands().multi();
651626
} catch (Exception ex) {
652627
throw convertLettuceAccessException(ex);
653628
}
@@ -660,14 +635,20 @@ public void select(int dbIndex) {
660635
throw new UnsupportedOperationException("Selecting a new database not supported due to shared connection. "
661636
+ "Use separate ConnectionFactorys to work with multiple databases");
662637
}
663-
if (isPipelined()) {
664-
throw new UnsupportedOperationException("Lettuce blocks for #select");
665-
}
638+
666639
try {
667640

668641
this.dbIndex = dbIndex;
642+
643+
if (isPipelined()) {
644+
pipeline(new LettuceStatusResult(getAsyncConnection().dispatch(CommandType.SELECT,
645+
new StatusOutput<>(ByteArrayCodec.INSTANCE), new CommandArgs<>(ByteArrayCodec.INSTANCE).add(dbIndex))));
646+
return;
647+
}
648+
669649
if (isQueueing()) {
670-
transaction(new LettuceTxStatusResult(((RedisCommands) getAsyncConnection()).select(dbIndex)));
650+
transaction(new LettuceStatusResult(getAsyncConnection().dispatch(CommandType.SELECT,
651+
new StatusOutput<>(ByteArrayCodec.INSTANCE), new CommandArgs<>(ByteArrayCodec.INSTANCE).add(dbIndex))));
671652
return;
672653
}
673654
((RedisCommands) getConnection()).select(dbIndex);
@@ -680,11 +661,11 @@ public void select(int dbIndex) {
680661
public void unwatch() {
681662
try {
682663
if (isPipelined()) {
683-
pipeline(new LettuceStatusResult(((RedisAsyncCommands) getAsyncDedicatedConnection()).unwatch()));
664+
pipeline(new LettuceStatusResult(getAsyncDedicatedRedisCommands().unwatch()));
684665
return;
685666
}
686667
if (isQueueing()) {
687-
transaction(new LettuceTxStatusResult(getDedicatedRedisCommands().unwatch()));
668+
transaction(new LettuceStatusResult(getAsyncDedicatedRedisCommands().unwatch()));
688669
return;
689670
}
690671
getDedicatedRedisCommands().unwatch();
@@ -704,7 +685,7 @@ public void watch(byte[]... keys) {
704685
return;
705686
}
706687
if (isQueueing()) {
707-
transaction(new LettuceTxStatusResult(getDedicatedRedisCommands().watch(keys)));
688+
transaction(new LettuceStatusResult(getAsyncDedicatedRedisCommands().watch(keys)));
708689
return;
709690
}
710691
getDedicatedRedisCommands().watch(keys);
@@ -725,7 +706,7 @@ public Long publish(byte[] channel, byte[] message) {
725706
return null;
726707
}
727708
if (isQueueing()) {
728-
transaction(newLettuceTxResult(getConnection().publish(channel, message)));
709+
transaction(newLettuceResult(getAsyncConnection().publish(channel, message)));
729710
return null;
730711
}
731712
return getConnection().publish(channel, message);

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceGeoCommands.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.springframework.data.geo.Point;
4141
import org.springframework.data.redis.connection.RedisGeoCommands;
4242
import org.springframework.data.redis.connection.convert.ListConverter;
43-
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceTxResult;
4443
import org.springframework.lang.Nullable;
4544
import org.springframework.util.Assert;
4645

@@ -71,7 +70,7 @@ public Long geoAdd(byte[] key, Point point, byte[] member) {
7170
return null;
7271
}
7372
if (isQueueing()) {
74-
transaction(connection.newLettuceTxResult(getConnection().geoadd(key, point.getX(), point.getY(), member)));
73+
transaction(connection.newLettuceResult(getAsyncConnection().geoadd(key, point.getX(), point.getY(), member)));
7574
return null;
7675
}
7776
return getConnection().geoadd(key, point.getX(), point.getY(), member);
@@ -132,7 +131,7 @@ private Long geoAdd(byte[] key, Collection<Object> values) {
132131
return null;
133132
}
134133
if (isQueueing()) {
135-
transaction(connection.newLettuceTxResult(getConnection().geoadd(key, values.toArray())));
134+
transaction(connection.newLettuceResult(getAsyncConnection().geoadd(key, values.toArray())));
136135
return null;
137136
}
138137
return getConnection().geoadd(key, values.toArray());
@@ -172,8 +171,8 @@ public Distance geoDist(byte[] key, byte[] member1, byte[] member2, Metric metri
172171
return null;
173172
}
174173
if (isQueueing()) {
175-
transaction(
176-
connection.newLettuceTxResult(getConnection().geodist(key, member1, member2, geoUnit), distanceConverter));
174+
transaction(connection.newLettuceResult(getAsyncConnection().geodist(key, member1, member2, geoUnit),
175+
distanceConverter));
177176
return null;
178177
}
179178
return distanceConverter.convert(getConnection().geodist(key, member1, member2, geoUnit));
@@ -199,7 +198,7 @@ public List<String> geoHash(byte[] key, byte[]... members) {
199198
return null;
200199
}
201200
if (isQueueing()) {
202-
transaction(connection.newLettuceTxResult(getConnection().geohash(key, members)));
201+
transaction(connection.newLettuceResult(getAsyncConnection().geohash(key, members)));
203202
return null;
204203
}
205204
return getConnection().geohash(key, members).stream().map(value -> value.getValueOrElse(null))
@@ -228,7 +227,7 @@ public List<Point> geoPos(byte[] key, byte[]... members) {
228227
return null;
229228
}
230229
if (isQueueing()) {
231-
transaction(connection.newLettuceTxResult(getConnection().geopos(key, members), converter));
230+
transaction(connection.newLettuceResult(getAsyncConnection().geopos(key, members), converter));
232231
return null;
233232
}
234233
return converter.convert(getConnection().geopos(key, members));
@@ -259,8 +258,8 @@ public GeoResults<GeoLocation<byte[]>> geoRadius(byte[] key, Circle within) {
259258
return null;
260259
}
261260
if (isQueueing()) {
262-
transaction(connection.newLettuceTxResult(
263-
getConnection().georadius(key, within.getCenter().getX(), within.getCenter().getY(),
261+
transaction(connection.newLettuceResult(
262+
getAsyncConnection().georadius(key, within.getCenter().getX(), within.getCenter().getY(),
264263
within.getRadius().getValue(), LettuceConverters.toGeoArgsUnit(within.getRadius().getMetric())),
265264
geoResultsConverter));
266265
return null;
@@ -296,7 +295,7 @@ public GeoResults<GeoLocation<byte[]>> geoRadius(byte[] key, Circle within, GeoR
296295
return null;
297296
}
298297
if (isQueueing()) {
299-
transaction(connection.newLettuceTxResult(getConnection().georadius(key, within.getCenter().getX(),
298+
transaction(connection.newLettuceResult(getAsyncConnection().georadius(key, within.getCenter().getX(),
300299
within.getCenter().getY(), within.getRadius().getValue(),
301300
LettuceConverters.toGeoArgsUnit(within.getRadius().getMetric()), geoArgs), geoResultsConverter));
302301
return null;
@@ -340,8 +339,8 @@ public GeoResults<GeoLocation<byte[]>> geoRadiusByMember(byte[] key, byte[] memb
340339
return null;
341340
}
342341
if (isQueueing()) {
343-
transaction(connection
344-
.newLettuceTxResult(getConnection().georadiusbymember(key, member, radius.getValue(), geoUnit), converter));
342+
transaction(connection.newLettuceResult(
343+
getAsyncConnection().georadiusbymember(key, member, radius.getValue(), geoUnit), converter));
345344
return null;
346345
}
347346
return converter.convert(getConnection().georadiusbymember(key, member, radius.getValue(), geoUnit));
@@ -376,8 +375,9 @@ public GeoResults<GeoLocation<byte[]>> geoRadiusByMember(byte[] key, byte[] memb
376375
return null;
377376
}
378377
if (isQueueing()) {
379-
transaction(connection.newLettuceTxResult(
380-
getConnection().georadiusbymember(key, member, radius.getValue(), geoUnit, geoArgs), geoResultsConverter));
378+
transaction(connection.newLettuceResult(
379+
getAsyncConnection().georadiusbymember(key, member, radius.getValue(), geoUnit, geoArgs),
380+
geoResultsConverter));
381381
return null;
382382
}
383383
return geoResultsConverter
@@ -408,7 +408,7 @@ private void pipeline(LettuceResult result) {
408408
connection.pipeline(result);
409409
}
410410

411-
private void transaction(LettuceTxResult result) {
411+
private void transaction(LettuceResult result) {
412412
connection.transaction(result);
413413
}
414414

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceHashCommands.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import org.springframework.dao.DataAccessException;
3131
import org.springframework.data.redis.connection.RedisHashCommands;
32-
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceTxResult;
3332
import org.springframework.data.redis.core.Cursor;
3433
import org.springframework.data.redis.core.KeyBoundCursor;
3534
import org.springframework.data.redis.core.ScanIteration;
@@ -64,7 +63,7 @@ public Boolean hSet(byte[] key, byte[] field, byte[] value) {
6463
return null;
6564
}
6665
if (isQueueing()) {
67-
transaction(connection.newLettuceTxResult(getConnection().hset(key, field, value)));
66+
transaction(connection.newLettuceResult(getAsyncConnection().hset(key, field, value)));
6867
return null;
6968
}
7069
return getConnection().hset(key, field, value);
@@ -90,7 +89,7 @@ public Boolean hSetNX(byte[] key, byte[] field, byte[] value) {
9089
return null;
9190
}
9291
if (isQueueing()) {
93-
transaction(connection.newLettuceTxResult(getConnection().hsetnx(key, field, value)));
92+
transaction(connection.newLettuceResult(getAsyncConnection().hsetnx(key, field, value)));
9493
return null;
9594
}
9695
return getConnection().hsetnx(key, field, value);
@@ -115,7 +114,7 @@ public Long hDel(byte[] key, byte[]... fields) {
115114
return null;
116115
}
117116
if (isQueueing()) {
118-
transaction(connection.newLettuceTxResult(getConnection().hdel(key, fields)));
117+
transaction(connection.newLettuceResult(getAsyncConnection().hdel(key, fields)));
119118
return null;
120119
}
121120
return getConnection().hdel(key, fields);
@@ -140,7 +139,7 @@ public Boolean hExists(byte[] key, byte[] field) {
140139
return null;
141140
}
142141
if (isQueueing()) {
143-
transaction(connection.newLettuceTxResult(getConnection().hexists(key, field)));
142+
transaction(connection.newLettuceResult(getAsyncConnection().hexists(key, field)));
144143
return null;
145144
}
146145
return getConnection().hexists(key, field);
@@ -165,7 +164,7 @@ public byte[] hGet(byte[] key, byte[] field) {
165164
return null;
166165
}
167166
if (isQueueing()) {
168-
transaction(connection.newLettuceTxResult(getConnection().hget(key, field)));
167+
transaction(connection.newLettuceResult(getAsyncConnection().hget(key, field)));
169168
return null;
170169
}
171170
return getConnection().hget(key, field);
@@ -189,7 +188,7 @@ public Map<byte[], byte[]> hGetAll(byte[] key) {
189188
return null;
190189
}
191190
if (isQueueing()) {
192-
transaction(connection.newLettuceTxResult(getConnection().hgetall(key)));
191+
transaction(connection.newLettuceResult(getAsyncConnection().hgetall(key)));
193192
return null;
194193
}
195194
return getConnection().hgetall(key);
@@ -214,7 +213,7 @@ public Long hIncrBy(byte[] key, byte[] field, long delta) {
214213
return null;
215214
}
216215
if (isQueueing()) {
217-
transaction(connection.newLettuceTxResult(getConnection().hincrby(key, field, delta)));
216+
transaction(connection.newLettuceResult(getAsyncConnection().hincrby(key, field, delta)));
218217
return null;
219218
}
220219
return getConnection().hincrby(key, field, delta);
@@ -239,7 +238,7 @@ public Double hIncrBy(byte[] key, byte[] field, double delta) {
239238
return null;
240239
}
241240
if (isQueueing()) {
242-
transaction(connection.newLettuceTxResult(getConnection().hincrbyfloat(key, field, delta)));
241+
transaction(connection.newLettuceResult(getAsyncConnection().hincrbyfloat(key, field, delta)));
243242
return null;
244243
}
245244
return getConnection().hincrbyfloat(key, field, delta);
@@ -263,7 +262,8 @@ public Set<byte[]> hKeys(byte[] key) {
263262
return null;
264263
}
265264
if (isQueueing()) {
266-
transaction(connection.newLettuceTxResult(getConnection().hkeys(key), LettuceConverters.bytesListToBytesSet()));
265+
transaction(
266+
connection.newLettuceResult(getAsyncConnection().hkeys(key), LettuceConverters.bytesListToBytesSet()));
267267
return null;
268268
}
269269
return LettuceConverters.toBytesSet(getConnection().hkeys(key));
@@ -287,7 +287,7 @@ public Long hLen(byte[] key) {
287287
return null;
288288
}
289289
if (isQueueing()) {
290-
transaction(connection.newLettuceTxResult(getConnection().hlen(key)));
290+
transaction(connection.newLettuceResult(getAsyncConnection().hlen(key)));
291291
return null;
292292
}
293293
return getConnection().hlen(key);
@@ -313,7 +313,7 @@ public List<byte[]> hMGet(byte[] key, byte[]... fields) {
313313
return null;
314314
}
315315
if (isQueueing()) {
316-
transaction(connection.newLettuceTxResult(getConnection().hmget(key, fields),
316+
transaction(connection.newLettuceResult(getAsyncConnection().hmget(key, fields),
317317
LettuceConverters.keyValueListUnwrapper()));
318318
return null;
319319
}
@@ -339,7 +339,7 @@ public void hMSet(byte[] key, Map<byte[], byte[]> hashes) {
339339
return;
340340
}
341341
if (isQueueing()) {
342-
transaction(connection.newLettuceTxStatusResult(getConnection().hmset(key, hashes)));
342+
transaction(connection.newLettuceStatusResult(getAsyncConnection().hmset(key, hashes)));
343343
return;
344344
}
345345
getConnection().hmset(key, hashes);
@@ -363,7 +363,7 @@ public List<byte[]> hVals(byte[] key) {
363363
return null;
364364
}
365365
if (isQueueing()) {
366-
transaction(connection.newLettuceTxResult(getConnection().hvals(key)));
366+
transaction(connection.newLettuceResult(getAsyncConnection().hvals(key)));
367367
return null;
368368
}
369369
return getConnection().hvals(key);
@@ -434,7 +434,7 @@ public Long hStrLen(byte[] key, byte[] field) {
434434
return null;
435435
}
436436
if (isQueueing()) {
437-
transaction(connection.newLettuceTxResult(getConnection().hstrlen(key, field)));
437+
transaction(connection.newLettuceResult(getAsyncConnection().hstrlen(key, field)));
438438
return null;
439439
}
440440
return getConnection().hstrlen(key, field);
@@ -455,7 +455,7 @@ private void pipeline(LettuceResult result) {
455455
connection.pipeline(result);
456456
}
457457

458-
private void transaction(LettuceTxResult result) {
458+
private void transaction(LettuceResult result) {
459459
connection.transaction(result);
460460
}
461461

0 commit comments

Comments
 (0)