Skip to content

Commit 9775734

Browse files
committed
DATAREDIS-693 - Polishing.
Add Template methods for unlink(…). Align unlink with del/mDel methods on reactive API using concatMap to retain order and provide batching/streaming. Reword Javadoc. Extend tests. Replace blocking calls with StepVerifier in LettuceReactiveKeyCommandsTests. Original pull request: #294.
1 parent c11b8d8 commit 9775734

File tree

11 files changed

+326
-91
lines changed

11 files changed

+326
-91
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveKeyCommands.java

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -265,52 +265,81 @@ default Mono<Long> del(ByteBuffer key) {
265265
Flux<NumericResponse<KeyCommand, Long>> del(Publisher<KeyCommand> keys);
266266

267267
/**
268-
* Unlink {@literal key}.
268+
* Delete multiple {@literal keys} one in one batch.
269269
*
270270
* @param keys must not be {@literal null}.
271271
* @return
272+
* @see <a href="http://redis.io/commands/del">Redis Documentation: DEL</a>
273+
*/
274+
default Mono<Long> mDel(List<ByteBuffer> keys) {
275+
276+
Assert.notEmpty(keys, "Keys must not be empty or null!");
277+
278+
return mDel(Mono.just(keys)).next().map(NumericResponse::getOutput);
279+
}
280+
281+
/**
282+
* Delete multiple {@literal keys} in batches.
283+
*
284+
* @param keys must not be {@literal null}.
285+
* @return {@link Flux} of {@link NumericResponse} holding the {@literal keys} removed along with the deletion result.
286+
* @see <a href="http://redis.io/commands/del">Redis Documentation: DEL</a>
287+
*/
288+
Flux<NumericResponse<List<ByteBuffer>, Long>> mDel(Publisher<List<ByteBuffer>> keys);
289+
290+
/**
291+
* Unlink the {@code key} from the keyspace. Unlike with {@link #del(ByteBuffer)} the actual memory reclaiming here
292+
* happens asynchronously.
293+
*
294+
* @param key must not be {@literal null}.
295+
* @return
272296
* @see <a href="http://redis.io/commands/unlink">Redis Documentation: UNLINK</a>
273297
* @since 2.1
274298
*/
275-
default Mono<Long> unlink(List<ByteBuffer> keys) {
299+
default Mono<Long> unlink(ByteBuffer key) {
276300

277-
Assert.notNull(keys, "Key must not be null!");
301+
Assert.notNull(key, "Keys must not be null!");
278302

279-
return unlink(Mono.just(keys)).next().map(NumericResponse::getOutput);
303+
return unlink(Mono.just(key).map(KeyCommand::new)).next().map(NumericResponse::getOutput);
280304
}
281305

282306
/**
283-
* Unlink {@literal keys}.
307+
* Unlink the {@code key} from the keyspace. Unlike with {@link #del(ByteBuffer)} the actual memory reclaiming here
308+
* happens asynchronously.
284309
*
285310
* @param keys must not be {@literal null}.
286-
* @return {@link Flux} of {@link NumericResponse} holding the {@literal key} removed along with the deletion result.
311+
* @return {@link Flux} of {@link NumericResponse} holding the {@literal key} removed along with the unlink result.
287312
* @see <a href="http://redis.io/commands/unlink">Redis Documentation: UNLINK</a>
288313
* @since 2.1
289314
*/
290-
Flux<NumericResponse<List<ByteBuffer>, Long>> unlink(Publisher<List<ByteBuffer>> keys);
315+
Flux<NumericResponse<KeyCommand, Long>> unlink(Publisher<KeyCommand> keys);
291316

292317
/**
293-
* Delete multiple {@literal keys} one in one batch.
318+
* Unlink the {@code keys} from the keyspace. Unlike with {@link #mDel(List)} the actual memory reclaiming here
319+
* happens asynchronously.
294320
*
295321
* @param keys must not be {@literal null}.
296322
* @return
297-
* @see <a href="http://redis.io/commands/del">Redis Documentation: DEL</a>
323+
* @see <a href="http://redis.io/commands/unlink">Redis Documentation: UNLINK</a>
324+
* @since 2.1
298325
*/
299-
default Mono<Long> mDel(List<ByteBuffer> keys) {
326+
default Mono<Long> mUnlink(List<ByteBuffer> keys) {
300327

301-
Assert.notEmpty(keys, "Keys must not be empty or null!");
328+
Assert.notNull(keys, "Keys must not be null!");
302329

303-
return mDel(Mono.just(keys)).next().map(NumericResponse::getOutput);
330+
return mUnlink(Mono.just(keys)).next().map(NumericResponse::getOutput);
304331
}
305332

306333
/**
307-
* Delete multiple {@literal keys} in batches.
334+
* Unlink the {@code keys} from the keyspace. Unlike with {@link #mDel(Publisher)} the actual memory reclaiming here
335+
* happens asynchronously.
308336
*
309337
* @param keys must not be {@literal null}.
310-
* @return {@link Flux} of {@link NumericResponse} holding the {@literal keys} removed along with the deletion result.
311-
* @see <a href="http://redis.io/commands/del">Redis Documentation: DEL</a>
338+
* @return {@link Flux} of {@link NumericResponse} holding the {@literal key} removed along with the deletion result.
339+
* @see <a href="http://redis.io/commands/unlink">Redis Documentation: UNLINK</a>
340+
* @since 2.1
312341
*/
313-
Flux<NumericResponse<List<ByteBuffer>, Long>> mDel(Publisher<List<ByteBuffer>> keys);
342+
Flux<NumericResponse<List<ByteBuffer>, Long>> mUnlink(Publisher<List<ByteBuffer>> keys);
314343

315344
/**
316345
* {@code EXPIRE}/{@code PEXPIRE} command parameters.

src/main/java/org/springframework/data/redis/connection/RedisKeyCommands.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ default Boolean exists(byte[] key) {
7171
Long del(byte[]... keys);
7272

7373
/**
74-
* Unlinks the {@code keys} from the keyspace. Unlike with {@link #del(byte[]...)} the actual removal here happens
75-
* asynchronously.
74+
* Unlink the {@code keys} from the keyspace. Unlike with {@link #del(byte[]...)} the actual memory reclaiming here
75+
* happens asynchronously.
7676
*
7777
* @param keys must not be {@literal null}.
7878
* @return {@literal null} when used in pipeline / transaction.

src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ interface StringTuple extends Tuple {
114114
Long del(String... keys);
115115

116116
/**
117-
* Unlinks the {@code keys} from the keyspace. Unlike with {@link #del(byte[]...)} the actual removal here happens
118-
* asynchronously.
117+
* Unlink the {@code keys} from the keyspace. Unlike with {@link #del(String...)} the actual memory reclaiming here
118+
* happens asynchronously.
119119
*
120120
* @param keys must not be {@literal null}.
121121
* @return {@literal null} when used in pipeline / transaction.

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveKeyCommands.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.nio.ByteBuffer;
2323
import java.util.Collection;
2424
import java.util.List;
25-
import java.util.stream.Collectors;
2625

2726
import org.reactivestreams.Publisher;
2827
import org.springframework.data.redis.connection.DataType;
@@ -177,33 +176,47 @@ public Flux<NumericResponse<KeyCommand, Long>> del(Publisher<KeyCommand> command
177176

178177
/*
179178
* (non-Javadoc)
180-
* @see org.springframework.data.redis.connection.ReactiveRedisConnection.ReactiveKeyCommands#unlink(org.reactivestreams.Publisher)
179+
* @see org.springframework.data.redis.connection.ReactiveRedisConnection.ReactiveKeyCommands#mDel(org.reactivestreams.Publisher)
181180
*/
182181
@Override
183-
public Flux<NumericResponse<List<ByteBuffer>, Long>> unlink(Publisher<List<ByteBuffer>> keysCollection) {
182+
public Flux<NumericResponse<List<ByteBuffer>, Long>> mDel(Publisher<List<ByteBuffer>> keysCollection) {
184183

185-
return connection.execute(cmd -> Flux.from(keysCollection).flatMap((keys) -> {
184+
return connection.execute(cmd -> Flux.from(keysCollection).concatMap((keys) -> {
186185

187186
Assert.notEmpty(keys, "Keys must not be null!");
188187

189-
return cmd.unlink(keys.stream().collect(Collectors.toList()).toArray(new ByteBuffer[keys.size()]))
188+
return cmd.del(keys.toArray(new ByteBuffer[keys.size()]))
190189
.map((value) -> new NumericResponse<>(keys, value));
191190
}));
192191
}
193192

194193
/*
195194
* (non-Javadoc)
196-
* @see org.springframework.data.redis.connection.ReactiveRedisConnection.ReactiveKeyCommands#mDel(org.reactivestreams.Publisher)
195+
* @see org.springframework.data.redis.connection.ReactiveRedisConnection.ReactiveKeyCommands#unlink(org.reactivestreams.Publisher)
197196
*/
198197
@Override
199-
public Flux<NumericResponse<List<ByteBuffer>, Long>> mDel(Publisher<List<ByteBuffer>> keysCollection) {
198+
public Flux<NumericResponse<KeyCommand, Long>> unlink(Publisher<KeyCommand> commands) {
199+
200+
return connection.execute(cmd -> Flux.from(commands).concatMap((command) -> {
201+
202+
Assert.notNull(command.getKey(), "Key must not be null!");
203+
204+
return cmd.unlink(command.getKey()).map((value) -> new NumericResponse<>(command, value));
205+
}));
206+
}
207+
208+
/*
209+
* (non-Javadoc)
210+
* @see org.springframework.data.redis.connection.ReactiveRedisConnection.ReactiveKeyCommands#mUnlink(org.reactivestreams.Publisher)
211+
*/
212+
@Override
213+
public Flux<NumericResponse<List<ByteBuffer>, Long>> mUnlink(Publisher<List<ByteBuffer>> keysCollection) {
200214

201215
return connection.execute(cmd -> Flux.from(keysCollection).concatMap((keys) -> {
202216

203217
Assert.notEmpty(keys, "Keys must not be null!");
204218

205-
return cmd.del(keys.stream().collect(Collectors.toList()).toArray(new ByteBuffer[keys.size()]))
206-
.map((value) -> new NumericResponse<>(keys, value));
219+
return cmd.unlink(keys.toArray(new ByteBuffer[keys.size()])).map((value) -> new NumericResponse<>(keys, value));
207220
}));
208221
}
209222

src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,28 @@ public interface ReactiveRedisOperations<K, V> {
132132
*/
133133
Mono<Long> delete(Publisher<K> keys);
134134

135+
/**
136+
* Unlink the {@code key} from the keyspace. Unlike with {@link #delete(Object[])} the actual memory reclaiming here
137+
* happens asynchronously.
138+
*
139+
* @param key must not be {@literal null}.
140+
* @return The number of keys that were removed. {@literal null} when used in pipeline / transaction.
141+
* @see <a href="http://redis.io/commands/unlink">Redis Documentation: UNLINK</a>
142+
* @since 2.1
143+
*/
144+
Mono<Long> unlink(K... key);
145+
146+
/**
147+
* Unlink the {@code keys} from the keyspace. Unlike with {@link #delete(Publisher)} the actual memory reclaiming here
148+
* happens asynchronously.
149+
*
150+
* @param keys must not be {@literal null}.
151+
* @return The number of keys that were removed. {@literal null} when used in pipeline / transaction.
152+
* @see <a href="http://redis.io/commands/unlink">Redis Documentation: UNLINK</a>
153+
* @since 2.1
154+
*/
155+
Mono<Long> unlink(Publisher<K> keys);
156+
135157
/**
136158
* Set time to live for given {@code key}.
137159
*

src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,40 @@ public Mono<Long> delete(Publisher<K> keys) {
304304
.map(CommandResponse::getOutput));
305305
}
306306

307+
/*
308+
* (non-Javadoc)
309+
* @see org.springframework.data.redis.core.ReactiveRedisOperations#unlink(java.lang.Object[])
310+
*/
311+
@Override
312+
@SafeVarargs
313+
public final Mono<Long> unlink(K... keys) {
314+
315+
Assert.notNull(keys, "Keys must not be null!");
316+
Assert.notEmpty(keys, "Keys must not be empty!");
317+
Assert.noNullElements(keys, "Keys must not contain null elements!");
318+
319+
if (keys.length == 1) {
320+
return createMono(connection -> connection.keyCommands().unlink(rawKey(keys[0])));
321+
}
322+
323+
Mono<List<ByteBuffer>> listOfKeys = Flux.fromArray(keys).map(this::rawKey).collectList();
324+
return createMono(connection -> listOfKeys.flatMap(rawKeys -> connection.keyCommands().mUnlink(rawKeys)));
325+
}
326+
327+
/*
328+
* (non-Javadoc)
329+
* @see org.springframework.data.redis.core.ReactiveRedisOperations#unlink(org.reactivestreams.Publisher)
330+
*/
331+
@Override
332+
public Mono<Long> unlink(Publisher<K> keys) {
333+
334+
Assert.notNull(keys, "Keys must not be null!");
335+
336+
return createMono(connection -> connection.keyCommands() //
337+
.unlink(Flux.from(keys).map(this::rawKey).map(KeyCommand::new)) //
338+
.map(CommandResponse::getOutput));
339+
}
340+
307341
/*
308342
* (non-Javadoc)
309343
* @see org.springframework.data.redis.core.ReactiveRedisOperations#expire(java.lang.Object, java.time.Duration)

src/main/java/org/springframework/data/redis/core/RedisOperations.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,30 @@ <T> T execute(RedisScript<T> script, RedisSerializer<?> argsSerializer, RedisSer
194194
@Nullable
195195
Long delete(Collection<K> keys);
196196

197+
/**
198+
* Unlink the {@code key} from the keyspace. Unlike with {@link #delete(Object)} the actual memory reclaiming here
199+
* happens asynchronously.
200+
*
201+
* @param key must not be {@literal null}.
202+
* @return The number of keys that were removed. {@literal null} when used in pipeline / transaction.
203+
* @see <a href="http://redis.io/commands/unlink">Redis Documentation: UNLINK</a>
204+
* @since 2.1
205+
*/
206+
@Nullable
207+
Boolean unlink(K key);
208+
209+
/**
210+
* Unlink the {@code keys} from the keyspace. Unlike with {@link #delete(Collection)} the actual memory reclaiming
211+
* here happens asynchronously.
212+
*
213+
* @param keys must not be {@literal null}.
214+
* @return The number of keys that were removed. {@literal null} when used in pipeline / transaction.
215+
* @see <a href="http://redis.io/commands/unlink">Redis Documentation: UNLINK</a>
216+
* @since 2.1
217+
*/
218+
@Nullable
219+
Long unlink(Collection<K> keys);
220+
197221
/**
198222
* Determine the type stored at {@code key}.
199223
*

src/main/java/org/springframework/data/redis/core/RedisTemplate.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,36 @@ public Long delete(Collection<K> keys) {
727727
return execute(connection -> connection.del(rawKeys), true);
728728
}
729729

730+
/*
731+
* (non-Javadoc)
732+
* @see org.springframework.data.redis.core.RedisOperations#unlink(java.lang.Object)
733+
*/
734+
@Override
735+
public Boolean unlink(K key) {
736+
737+
byte[] rawKey = rawKey(key);
738+
739+
Long result = execute(connection -> connection.unlink(rawKey), true);
740+
741+
return result != null && result.intValue() == 1;
742+
}
743+
744+
/*
745+
* (non-Javadoc)
746+
* @see org.springframework.data.redis.core.RedisOperations#unlink(java.util.Collection)
747+
*/
748+
@Override
749+
public Long unlink(Collection<K> keys) {
750+
751+
if (CollectionUtils.isEmpty(keys)) {
752+
return 0L;
753+
}
754+
755+
byte[][] rawKeys = rawKeys(keys);
756+
757+
return execute(connection -> connection.unlink(rawKeys), true);
758+
}
759+
730760
/*
731761
* (non-Javadoc)
732762
* @see org.springframework.data.redis.core.RedisOperations#hasKey(java.lang.Object)

src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,38 @@ public void testDel() {
11281128
verifyResults(Arrays.asList(true, 1L, false));
11291129
}
11301130

1131+
@Test // DATAREDIS-693
1132+
@IfProfileValue(name = "redisVersion", value = "4.0+")
1133+
public void unlinkReturnsNrOfKeysRemoved() {
1134+
1135+
connection.set("unlink.this", "Can't track this!");
1136+
1137+
actual.add(connection.unlink("unlink.this", "unlink.that"));
1138+
1139+
verifyResults(Arrays.asList(new Object[] { 1L }));
1140+
}
1141+
1142+
@Test // DATAREDIS-693
1143+
@IfProfileValue(name = "redisVersion", value = "4.0+")
1144+
public void testUnlinkBatch() {
1145+
1146+
actual.add(connection.set("testing", "123"));
1147+
actual.add(connection.set("foo", "bar"));
1148+
actual.add(connection.unlink("testing", "foo"));
1149+
actual.add(connection.exists("testing"));
1150+
1151+
verifyResults(Arrays.asList(true, true, 2L, false));
1152+
}
1153+
1154+
@Test // DATAREDIS-693
1155+
@IfProfileValue(name = "redisVersion", value = "4.0+")
1156+
public void unlinkReturnsZeroIfNoKeysRemoved() {
1157+
1158+
actual.add(connection.unlink("unlink.this"));
1159+
1160+
verifyResults(Arrays.asList(new Object[] { 0L }));
1161+
}
1162+
11311163
@Test
11321164
public void testType() {
11331165

@@ -2790,26 +2822,6 @@ public void touchReturnsZeroIfNoKeysTouched() {
27902822
verifyResults(Arrays.asList(new Object[] { 0L }));
27912823
}
27922824

2793-
@Test // DATAREDIS-693
2794-
@IfProfileValue(name = "redisVersion", value = "4.0+")
2795-
public void unlinkReturnsNrOfKeysRemoved() {
2796-
2797-
connection.set("unlink.this", "Can't track this!");
2798-
2799-
actual.add(connection.unlink("unlink.this", "unlink.that"));
2800-
2801-
verifyResults(Arrays.asList(new Object[] { 1L }));
2802-
}
2803-
2804-
@Test // DATAREDIS-693
2805-
@IfProfileValue(name = "redisVersion", value = "4.0+")
2806-
public void unlinkReturnsZeroIfNoKeysRemoved() {
2807-
2808-
actual.add(connection.unlink("unlink.this"));
2809-
2810-
verifyResults(Arrays.asList(new Object[] { 0L }));
2811-
}
2812-
28132825
protected void verifyResults(List<Object> expected) {
28142826
assertEquals(expected, getResults());
28152827
}

0 commit comments

Comments
 (0)