Skip to content

Commit 6cddcee

Browse files
committed
Refine and clarify operations in asynchronous caching implementation.
Uses more descriptive names for operations, especially Reactive operations, by either calling a local, private method or introducing a variable with strongly typed parameters. Original Pull Request: spring-projects#2717 Closes spring-projects#2743
1 parent cc668b4 commit 6cddcee

File tree

1 file changed

+103
-85
lines changed

1 file changed

+103
-85
lines changed

src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java

Lines changed: 103 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
20+
import reactor.core.publisher.SignalType;
2021

2122
import java.nio.ByteBuffer;
2223
import java.nio.charset.StandardCharsets;
2324
import java.time.Duration;
2425
import java.util.concurrent.CompletableFuture;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.atomic.AtomicLong;
28+
import java.util.function.Consumer;
2729
import java.util.function.Function;
2830

2931
import org.springframework.dao.PessimisticLockingFailureException;
@@ -37,7 +39,6 @@
3739
import org.springframework.data.redis.util.ByteUtils;
3840
import org.springframework.lang.Nullable;
3941
import org.springframework.util.Assert;
40-
import org.springframework.util.ClassUtils;
4142
import org.springframework.util.ObjectUtils;
4243

4344
/**
@@ -47,11 +48,11 @@
4748
* <p>
4849
* {@link DefaultRedisCacheWriter} can be used in
4950
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
50-
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
51-
* {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
52-
* operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
53-
* command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
54-
* requests and potential command wait times.
51+
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
52+
* aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
53+
* multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
54+
* by setting an explicit lock key and checking against presence of this key which leads to additional requests
55+
* and potential command wait times.
5556
*
5657
* @author Christoph Strobl
5758
* @author Mark Paluch
@@ -61,8 +62,7 @@
6162
*/
6263
class DefaultRedisCacheWriter implements RedisCacheWriter {
6364

64-
private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
65-
.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
65+
private final AsyncCacheWriter asyncCacheWriter;
6666

6767
private final BatchStrategy batchStrategy;
6868

@@ -74,33 +74,21 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
7474

7575
private final TtlFunction lockTtl;
7676

77-
private final AsyncCacheWriter asyncCacheWriter;
78-
79-
/**
80-
* @param connectionFactory must not be {@literal null}.
81-
* @param batchStrategy must not be {@literal null}.
82-
*/
8377
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy) {
8478
this(connectionFactory, Duration.ZERO, batchStrategy);
8579
}
8680

8781
/**
88-
* @param connectionFactory must not be {@literal null}.
89-
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
90-
* to disable locking.
91-
* @param batchStrategy must not be {@literal null}.
82+
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
83+
* Use {@link Duration#ZERO} to disable locking.
9284
*/
9385
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) {
9486
this(connectionFactory, sleepTime, TtlFunction.persistent(), CacheStatisticsCollector.none(), batchStrategy);
9587
}
9688

9789
/**
98-
* @param connectionFactory must not be {@literal null}.
99-
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
100-
* to disable locking.
101-
* @param lockTtl Lock TTL function must not be {@literal null}.
102-
* @param cacheStatisticsCollector must not be {@literal null}.
103-
* @param batchStrategy must not be {@literal null}.
90+
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
91+
* Use {@link Duration#ZERO} to disable locking.
10492
*/
10593
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, TtlFunction lockTtl,
10694
CacheStatisticsCollector cacheStatisticsCollector, BatchStrategy batchStrategy) {
@@ -116,12 +104,12 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
116104
this.lockTtl = lockTtl;
117105
this.statistics = cacheStatisticsCollector;
118106
this.batchStrategy = batchStrategy;
107+
this.asyncCacheWriter = isAsyncCacheSupportEnabled() ? new AsynchronousCacheWriterDelegate()
108+
: UnsupportedAsyncCacheWriter.INSTANCE;
109+
}
119110

120-
if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) {
121-
asyncCacheWriter = new AsynchronousCacheWriterDelegate();
122-
} else {
123-
asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE;
124-
}
111+
private boolean isAsyncCacheSupportEnabled() {
112+
return this.connectionFactory instanceof ReactiveRedisConnectionFactory;
125113
}
126114

127115
@Override
@@ -162,18 +150,19 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
162150
Assert.notNull(key, "Key must not be null");
163151

164152
return asyncCacheWriter.retrieve(name, key, ttl) //
165-
.thenApply(cachedValue -> {
153+
.thenApply(cachedValue -> {
166154

167-
statistics.incGets(name);
155+
statistics.incGets(name);
168156

169-
if (cachedValue != null) {
170-
statistics.incHits(name);
171-
} else {
172-
statistics.incMisses(name);
173-
}
157+
if (cachedValue != null) {
158+
statistics.incHits(name);
159+
}
160+
else {
161+
statistics.incMisses(name);
162+
}
174163

175-
return cachedValue;
176-
});
164+
return cachedValue;
165+
});
177166
}
178167

179168
@Override
@@ -186,8 +175,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
186175
execute(name, connection -> {
187176

188177
if (shouldExpireWithin(ttl)) {
189-
connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
190-
SetOption.upsert());
178+
connection.stringCommands().set(key, value, toExpiration(ttl), SetOption.upsert());
191179
} else {
192180
connection.stringCommands().set(key, value);
193181
}
@@ -224,16 +212,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
224212

225213
try {
226214

227-
boolean put;
228-
229-
if (shouldExpireWithin(ttl)) {
230-
put = ObjectUtils.nullSafeEquals(
231-
connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true);
232-
} else {
233-
put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true);
234-
}
215+
Boolean wasSet = shouldExpireWithin(ttl)
216+
? connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent())
217+
: connection.stringCommands().setNX(key, value);
235218

236-
if (put) {
219+
if (Boolean.TRUE.equals(wasSet)) {
237220
statistics.incPuts(name);
238221
return null;
239222
}
@@ -378,12 +361,10 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
378361
Thread.sleep(this.sleepTime.toMillis());
379362
}
380363
} catch (InterruptedException cause) {
381-
382364
// Re-interrupt current Thread to allow other participants to react.
383365
Thread.currentThread().interrupt();
384-
385-
throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
386-
cause);
366+
String message = "Interrupted while waiting to unlock cache %s".formatted(name);
367+
throw new PessimisticLockingFailureException(message, cause);
387368
} finally {
388369
this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
389370
}
@@ -401,6 +382,14 @@ private static boolean shouldExpireWithin(@Nullable Duration ttl) {
401382
return ttl != null && !ttl.isZero() && !ttl.isNegative();
402383
}
403384

385+
private Expiration toExpiration(Duration ttl) {
386+
return Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS);
387+
}
388+
389+
private Expiration toExpiration(Object key, @Nullable Object value) {
390+
return Expiration.from(this.lockTtl.getTimeToLive(key, value));
391+
}
392+
404393
/**
405394
* Interface for asynchronous cache retrieval.
406395
*
@@ -419,8 +408,8 @@ interface AsyncCacheWriter {
419408
* @param name the cache name from which to retrieve the cache entry.
420409
* @param key the cache entry key.
421410
* @param ttl optional TTL to set for Time-to-Idle eviction.
422-
* @return a future that completes either with a value if the value exists or completing with {@code null} if the
423-
* cache does not contain an entry.
411+
* @return a future that completes either with a value if the value exists or completing with {@code null}
412+
* if the cache does not contain an entry.
424413
*/
425414
CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl);
426415

@@ -463,8 +452,8 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
463452
}
464453

465454
/**
466-
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
467-
* {@link ReactiveRedisConnectionFactory}.
455+
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
456+
* using {@link ReactiveRedisConnectionFactory}.
468457
*
469458
* @since 3.2
470459
*/
@@ -481,11 +470,13 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
481470
return doWithConnection(connection -> {
482471

483472
ByteBuffer wrappedKey = ByteBuffer.wrap(key);
473+
484474
Mono<?> cacheLockCheck = isLockingCacheWriter() ? waitForLock(connection, name) : Mono.empty();
475+
485476
ReactiveStringCommands stringCommands = connection.stringCommands();
486477

487478
Mono<ByteBuffer> get = shouldExpireWithin(ttl)
488-
? stringCommands.getEx(wrappedKey, Expiration.from(ttl))
479+
? stringCommands.getEx(wrappedKey, toExpiration(ttl))
489480
: stringCommands.get(wrappedKey);
490481

491482
return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture();
@@ -498,41 +489,44 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
498489
return doWithConnection(connection -> {
499490

500491
Mono<?> mono = isLockingCacheWriter()
501-
? doStoreWithLocking(name, key, value, ttl, connection)
492+
? doLockStoreUnlock(name, key, value, ttl, connection)
502493
: doStore(key, value, ttl, connection);
503494

504495
return mono.then().toFuture();
505496
});
506497
}
507498

508-
private Mono<Boolean> doStoreWithLocking(String name, byte[] key, byte[] value, @Nullable Duration ttl,
509-
ReactiveRedisConnection connection) {
510-
511-
return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection),
512-
unused -> doUnlock(name, connection));
513-
}
514-
515499
private Mono<Boolean> doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl,
516500
ReactiveRedisConnection connection) {
517501

518502
ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey);
519503
ByteBuffer wrappedValue = ByteBuffer.wrap(value);
520504

521-
if (shouldExpireWithin(ttl)) {
522-
return connection.stringCommands().set(wrappedKey, wrappedValue,
523-
Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
524-
} else {
525-
return connection.stringCommands().set(wrappedKey, wrappedValue);
526-
}
505+
ReactiveStringCommands stringCommands = connection.stringCommands();
506+
507+
return shouldExpireWithin(ttl)
508+
? stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert())
509+
: stringCommands.set(wrappedKey, wrappedValue);
527510
}
528511

512+
private Mono<Boolean> doLockStoreUnlock(String name, byte[] key, byte[] value, @Nullable Duration ttl,
513+
ReactiveRedisConnection connection) {
514+
515+
Mono<Object> lock = doLock(name, key, value, connection);
516+
517+
Function<Object, Mono<Boolean>> store = unused -> doStore(key, value, ttl, connection);
518+
Function<Object, Mono<Void>> unlock = unused -> doUnlock(name, connection);
519+
520+
return Mono.usingWhen(lock, store, unlock);
521+
}
529522

530523
private Mono<Object> doLock(String name, Object contextualKey, @Nullable Object contextualValue,
531524
ReactiveRedisConnection connection) {
532525

533526
ByteBuffer key = ByteBuffer.wrap(createCacheLockKey(name));
534527
ByteBuffer value = ByteBuffer.wrap(new byte[0]);
535-
Expiration expiration = Expiration.from(lockTtl.getTimeToLive(contextualKey, contextualValue));
528+
529+
Expiration expiration = toExpiration(contextualKey, contextualValue);
536530

537531
return connection.stringCommands().set(key, value, expiration, SetOption.SET_IF_ABSENT) //
538532
// Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function.
@@ -545,28 +539,52 @@ private Mono<Void> doUnlock(String name, ReactiveRedisConnection connection) {
545539

546540
private Mono<Void> waitForLock(ReactiveRedisConnection connection, String cacheName) {
547541

548-
AtomicLong lockWaitTimeNs = new AtomicLong();
549-
byte[] cacheLockKey = createCacheLockKey(cacheName);
542+
AtomicLong lockWaitNanoTime = new AtomicLong();
543+
544+
Consumer<org.reactivestreams.Subscription> setNanoTimeOnLockWait = subscription ->
545+
lockWaitNanoTime.set(System.nanoTime());
550546

551-
Flux<Long> wait = Flux.interval(Duration.ZERO, sleepTime);
552-
Mono<Boolean> exists = connection.keyCommands().exists(ByteBuffer.wrap(cacheLockKey)).filter(it -> !it);
547+
Consumer<SignalType> recordStatistics = signalType ->
548+
statistics.incLockTime(cacheName, System.nanoTime() - lockWaitNanoTime.get());
553549

554-
return wait.doOnSubscribe(subscription -> lockWaitTimeNs.set(System.nanoTime())) //
555-
.flatMap(it -> exists) //
556-
.doFinally(signalType -> statistics.incLockTime(cacheName, System.nanoTime() - lockWaitTimeNs.get())) //
550+
Function<Long, Mono<Boolean>> doWhileCacheLockExists = lockWaitTime -> connection.keyCommands()
551+
.exists(toCacheLockKey(cacheName)).filter(cacheLockKeyExists -> !cacheLockKeyExists);
552+
553+
return waitInterval(sleepTime) //
554+
.doOnSubscribe(setNanoTimeOnLockWait) //
555+
.flatMap(doWhileCacheLockExists) //
556+
.doFinally(recordStatistics) //
557557
.next() //
558558
.then();
559559
}
560560

561+
private Flux<Long> waitInterval(Duration period) {
562+
return Flux.interval(Duration.ZERO, period);
563+
}
564+
565+
private ByteBuffer toCacheLockKey(String cacheName) {
566+
return ByteBuffer.wrap(createCacheLockKey(cacheName));
567+
}
568+
569+
private ReactiveRedisConnectionFactory getReactiveConnectionFactory() {
570+
return (ReactiveRedisConnectionFactory) DefaultRedisCacheWriter.this.connectionFactory;
571+
}
572+
573+
private Mono<ReactiveRedisConnection> getReactiveConnection() {
574+
return Mono.fromSupplier(getReactiveConnectionFactory()::getReactiveConnection);
575+
}
576+
561577
private <T> CompletableFuture<T> doWithConnection(
562578
Function<ReactiveRedisConnection, CompletableFuture<T>> callback) {
563579

564-
ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory;
580+
Function<ReactiveRedisConnection, Mono<T>> commandExecution = connection ->
581+
Mono.fromCompletionStage(callback.apply(connection));
582+
583+
Function<ReactiveRedisConnection, Mono<Void>> connectionClose = ReactiveRedisConnection::closeLater;
584+
585+
Mono<T> result = Mono.usingWhen(getReactiveConnection(), commandExecution, connectionClose);
565586

566-
return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), //
567-
it -> Mono.fromCompletionStage(callback.apply(it)), //
568-
ReactiveRedisConnection::closeLater) //
569-
.toFuture();
587+
return result.toFuture();
570588
}
571589
}
572590
}

0 commit comments

Comments
 (0)