Skip to content

Commit a99a738

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-711 - Polishing.
Strengthen non-null requirements when obtaining the native connection from LettuceReactiveRedisConnection. Guard close against multiple calls and use concatMap instead of flatMap to retain publisher sequence. Original Pull Request: #282
1 parent ebc0c5e commit a99a738

20 files changed

+133
-123
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterHyperLogLogCommands.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class LettuceReactiveClusterHyperLogLogCommands extends LettuceReactiveHyperLogL
5353
@Override
5454
public Flux<BooleanResponse<PfMergeCommand>> pfMerge(Publisher<PfMergeCommand> commands) {
5555

56-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
56+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
5757

5858
Assert.notNull(command.getKey(), "Key must not be null for PFMERGE");
5959
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty for PFMERGE!");
@@ -76,7 +76,7 @@ public Flux<BooleanResponse<PfMergeCommand>> pfMerge(Publisher<PfMergeCommand> c
7676
@Override
7777
public Flux<NumericResponse<PfCountCommand, Long>> pfCount(Publisher<PfCountCommand> commands) {
7878

79-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
79+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
8080

8181
Assert.notEmpty(command.getKeys(), "Keys must be null or empty for PFCOUNT!");
8282

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterKeyCommands.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public Mono<ByteBuffer> randomKey(RedisClusterNode node) {
7676
@Override
7777
public Flux<BooleanResponse<RenameCommand>> rename(Publisher<RenameCommand> commands) {
7878

79-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
79+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
8080

8181
Assert.notNull(command.getKey(), "key must not be null.");
8282
Assert.notNull(command.getNewName(), "NewName must not be null!");
@@ -102,7 +102,7 @@ public Flux<BooleanResponse<RenameCommand>> rename(Publisher<RenameCommand> comm
102102
@Override
103103
public Flux<BooleanResponse<RenameCommand>> renameNX(Publisher<RenameCommand> commands) {
104104

105-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
105+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
106106

107107
Assert.notNull(command.getKey(), "Key must not be null.");
108108
Assert.notNull(command.getNewName(), "NewName must not be null!");

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterListCommands.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class LettuceReactiveClusterListCommands extends LettuceReactiveListCommands imp
4949
@Override
5050
public Flux<PopResponse> bPop(Publisher<BPopCommand> commands) {
5151

52-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
52+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
5353

5454
Assert.notNull(command.getKeys(), "Keys must not be null!");
5555
Assert.notNull(command.getDirection(), "Direction must not be null!");
@@ -68,7 +68,7 @@ public Flux<PopResponse> bPop(Publisher<BPopCommand> commands) {
6868
@Override
6969
public Flux<ByteBufferResponse<RPopLPushCommand>> rPopLPush(Publisher<RPopLPushCommand> commands) {
7070

71-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
71+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
7272

7373
Assert.notNull(command.getKey(), "Key must not be null!");
7474
Assert.notNull(command.getDestination(), "Destination key must not be null!");

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterServerCommands.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ public Flux<RedisClientInfo> getClientList() {
280280
public Flux<RedisClientInfo> getClientList(RedisClusterNode node) {
281281

282282
return connection.execute(node, RedisServerReactiveCommands::clientList)
283-
.flatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
283+
.concatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
284284
}
285285

286286
private <T> Collection<Publisher<Tuple2<RedisClusterNode, T>>> executeOnAllNodes(

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterSetCommands.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class LettuceReactiveClusterSetCommands extends LettuceReactiveSetCommands imple
5353
@Override
5454
public Flux<CommandResponse<SUnionCommand, Flux<ByteBuffer>>> sUnion(Publisher<SUnionCommand> commands) {
5555

56-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
56+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
5757

5858
Assert.notNull(command.getKeys(), "Keys must not be null!");
5959

@@ -74,7 +74,7 @@ public Flux<CommandResponse<SUnionCommand, Flux<ByteBuffer>>> sUnion(Publisher<S
7474
@Override
7575
public Flux<NumericResponse<SUnionStoreCommand, Long>> sUnionStore(Publisher<SUnionStoreCommand> commands) {
7676

77-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
77+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
7878

7979
Assert.notNull(command.getKeys(), "Source keys must not be null!");
8080
Assert.notNull(command.getKey(), "Destination key must not be null!");
@@ -99,7 +99,7 @@ public Flux<NumericResponse<SUnionStoreCommand, Long>> sUnionStore(Publisher<SUn
9999
@Override
100100
public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<SInterCommand> commands) {
101101

102-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
102+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
103103

104104
Assert.notNull(command.getKeys(), "Keys must not be null!");
105105

@@ -124,7 +124,7 @@ public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<S
124124
return source;
125125
});
126126

127-
return Mono.just(new CommandResponse<>(command, result.flatMap(v -> Flux.fromStream(v.stream()))));
127+
return Mono.just(new CommandResponse<>(command, result.concatMap(v -> Flux.fromStream(v.stream()))));
128128
}));
129129
}
130130

@@ -134,7 +134,7 @@ public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<S
134134
@Override
135135
public Flux<NumericResponse<SInterStoreCommand, Long>> sInterStore(Publisher<SInterStoreCommand> commands) {
136136

137-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
137+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
138138

139139
Assert.notNull(command.getKeys(), "Source keys must not be null!");
140140
Assert.notNull(command.getKey(), "Destination key must not be null!");
@@ -159,7 +159,7 @@ public Flux<NumericResponse<SInterStoreCommand, Long>> sInterStore(Publisher<SIn
159159
@Override
160160
public Flux<CommandResponse<SDiffCommand, Flux<ByteBuffer>>> sDiff(Publisher<SDiffCommand> commands) {
161161

162-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
162+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
163163

164164
Assert.notNull(command.getKeys(), "Keys must not be null!");
165165

@@ -185,7 +185,7 @@ public Flux<CommandResponse<SDiffCommand, Flux<ByteBuffer>>> sDiff(Publisher<SDi
185185
return source;
186186
});
187187

188-
return Mono.just(new CommandResponse<>(command, result.flatMap(v -> Flux.fromStream(v.stream()))));
188+
return Mono.just(new CommandResponse<>(command, result.concatMap(v -> Flux.fromStream(v.stream()))));
189189

190190
}));
191191
}
@@ -196,7 +196,7 @@ public Flux<CommandResponse<SDiffCommand, Flux<ByteBuffer>>> sDiff(Publisher<SDi
196196
@Override
197197
public Flux<NumericResponse<SDiffStoreCommand, Long>> sDiffStore(Publisher<SDiffStoreCommand> commands) {
198198

199-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
199+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
200200

201201
Assert.notNull(command.getKeys(), "Source keys must not be null!");
202202
Assert.notNull(command.getKey(), "Destination key must not be null!");
@@ -221,7 +221,7 @@ public Flux<NumericResponse<SDiffStoreCommand, Long>> sDiffStore(Publisher<SDiff
221221
@Override
222222
public Flux<BooleanResponse<SMoveCommand>> sMove(Publisher<SMoveCommand> commands) {
223223

224-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
224+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
225225

226226
Assert.notNull(command.getKey(), "Source key must not be null!");
227227
Assert.notNull(command.getDestination(), "Destination key must not be null!");

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterStringCommands.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class LettuceReactiveClusterStringCommands extends LettuceReactiveStringCommands
5151
@Override
5252
public Flux<ReactiveRedisConnection.NumericResponse<BitOpCommand, Long>> bitOp(Publisher<BitOpCommand> commands) {
5353

54-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
54+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
5555

5656
List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
5757
keys.add(command.getDestinationKey());
@@ -71,7 +71,7 @@ public Flux<ReactiveRedisConnection.NumericResponse<BitOpCommand, Long>> bitOp(P
7171
@Override
7272
public Flux<ReactiveRedisConnection.BooleanResponse<MSetCommand>> mSetNX(Publisher<MSetCommand> commands) {
7373

74-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
74+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
7575

7676
if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeyValuePairs().keySet())) {
7777
return super.mSetNX(Mono.just(command));

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterZSetCommands.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class LettuceReactiveClusterZSetCommands extends LettuceReactiveZSetCommands imp
4747
@Override
4848
public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUnionStoreCommand> commands) {
4949

50-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
50+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
5151

5252
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty.");
5353

@@ -65,7 +65,7 @@ public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUn
6565
*/
6666
@Override
6767
public Flux<NumericResponse<ZInterStoreCommand, Long>> zInterStore(Publisher<ZInterStoreCommand> commands) {
68-
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
68+
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
6969

7070
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty.");
7171

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveGeoCommands.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class LettuceReactiveGeoCommands implements ReactiveGeoCommands {
6868
@Override
6969
public Flux<NumericResponse<GeoAddCommand, Long>> geoAdd(Publisher<GeoAddCommand> commands) {
7070

71-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
71+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
7272

7373
Assert.notNull(command.getKey(), "Key must not be null!");
7474
Assert.notNull(command.getGeoLocations(), "Locations must not be null!");
@@ -95,7 +95,7 @@ public Flux<NumericResponse<GeoAddCommand, Long>> geoAdd(Publisher<GeoAddCommand
9595
@Override
9696
public Flux<CommandResponse<GeoDistCommand, Distance>> geoDist(Publisher<GeoDistCommand> commands) {
9797

98-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
98+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
9999

100100
Assert.notNull(command.getKey(), "Key must not be null!");
101101
Assert.notNull(command.getFrom(), "From member must not be null!");
@@ -121,7 +121,7 @@ public Flux<CommandResponse<GeoDistCommand, Distance>> geoDist(Publisher<GeoDist
121121
@Override
122122
public Flux<MultiValueResponse<GeoHashCommand, String>> geoHash(Publisher<GeoHashCommand> commands) {
123123

124-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
124+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
125125

126126
Assert.notNull(command.getKey(), "Key must not be null!");
127127
Assert.notNull(command.getMembers(), "Members must not be null!");
@@ -139,7 +139,7 @@ public Flux<MultiValueResponse<GeoHashCommand, String>> geoHash(Publisher<GeoHas
139139
@Override
140140
public Flux<MultiValueResponse<GeoPosCommand, Point>> geoPos(Publisher<GeoPosCommand> commands) {
141141

142-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
142+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
143143

144144
Assert.notNull(command.getKey(), "Key must not be null!");
145145
Assert.notNull(command.getMembers(), "Members must not be null!");
@@ -160,7 +160,7 @@ public Flux<MultiValueResponse<GeoPosCommand, Point>> geoPos(Publisher<GeoPosCom
160160
public Flux<CommandResponse<GeoRadiusCommand, Flux<GeoResult<GeoLocation<ByteBuffer>>>>> geoRadius(
161161
Publisher<GeoRadiusCommand> commands) {
162162

163-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
163+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
164164

165165
Assert.notNull(command.getKey(), "Key must not be null!");
166166
Assert.notNull(command.getPoint(), "Point must not be null!");
@@ -187,7 +187,7 @@ public Flux<CommandResponse<GeoRadiusCommand, Flux<GeoResult<GeoLocation<ByteBuf
187187
public Flux<CommandResponse<GeoRadiusByMemberCommand, Flux<GeoResult<GeoLocation<ByteBuffer>>>>> geoRadiusByMember(
188188
Publisher<GeoRadiusByMemberCommand> commands) {
189189

190-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
190+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
191191

192192
Assert.notNull(command.getKey(), "Key must not be null!");
193193
Assert.notNull(command.getMember(), "Member must not be null!");

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveHashCommands.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class LettuceReactiveHashCommands implements ReactiveHashCommands {
6363
@Override
6464
public Flux<BooleanResponse<HSetCommand>> hSet(Publisher<HSetCommand> commands) {
6565

66-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
66+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
6767

6868
Assert.notNull(command.getKey(), "Key must not be null!");
6969
Assert.notNull(command.getFieldValueMap(), "FieldValueMap must not be null!");
@@ -95,7 +95,7 @@ public Flux<BooleanResponse<HSetCommand>> hSet(Publisher<HSetCommand> commands)
9595
@Override
9696
public Flux<MultiValueResponse<HGetCommand, ByteBuffer>> hMGet(Publisher<HGetCommand> commands) {
9797

98-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
98+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
9999

100100
Assert.notNull(command.getKey(), "Key must not be null!");
101101
Assert.notNull(command.getFields(), "Fields must not be null!");
@@ -122,7 +122,7 @@ public Flux<MultiValueResponse<HGetCommand, ByteBuffer>> hMGet(Publisher<HGetCom
122122
@Override
123123
public Flux<BooleanResponse<HExistsCommand>> hExists(Publisher<HExistsCommand> commands) {
124124

125-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
125+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
126126

127127
Assert.notNull(command.getKey(), "Key must not be null!");
128128
Assert.notNull(command.getName(), "Name must not be null!");
@@ -138,7 +138,7 @@ public Flux<BooleanResponse<HExistsCommand>> hExists(Publisher<HExistsCommand> c
138138
@Override
139139
public Flux<NumericResponse<HDelCommand, Long>> hDel(Publisher<HDelCommand> commands) {
140140

141-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
141+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
142142

143143
Assert.notNull(command.getKey(), "Key must not be null!");
144144
Assert.notNull(command.getFields(), "Fields must not be null!");
@@ -155,7 +155,7 @@ public Flux<NumericResponse<HDelCommand, Long>> hDel(Publisher<HDelCommand> comm
155155
@Override
156156
public Flux<NumericResponse<KeyCommand, Long>> hLen(Publisher<KeyCommand> commands) {
157157

158-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
158+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
159159

160160
Assert.notNull(command.getKey(), "Command.getKey() must not be null!");
161161

@@ -170,7 +170,7 @@ public Flux<NumericResponse<KeyCommand, Long>> hLen(Publisher<KeyCommand> comman
170170
@Override
171171
public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hKeys(Publisher<KeyCommand> commands) {
172172

173-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
173+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
174174

175175
Assert.notNull(command.getKey(), "Key must not be null!");
176176

@@ -187,7 +187,7 @@ public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hKeys(Publisher<KeyCo
187187
@Override
188188
public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hVals(Publisher<KeyCommand> commands) {
189189

190-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
190+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
191191

192192
Assert.notNull(command.getKey(), "Key must not be null!");
193193

@@ -205,7 +205,7 @@ public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hVals(Publisher<KeyCo
205205
public Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hGetAll(
206206
Publisher<KeyCommand> commands) {
207207

208-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
208+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
209209

210210
Assert.notNull(command.getKey(), "Key must not be null!");
211211

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveHyperLogLogCommands.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class LettuceReactiveHyperLogLogCommands implements ReactiveHyperLogLogCommands
5353
@Override
5454
public Flux<NumericResponse<PfAddCommand, Long>> pfAdd(Publisher<PfAddCommand> commands) {
5555

56-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
56+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
5757

5858
Assert.notNull(command.getKey(), "key must not be null!");
5959

@@ -70,7 +70,7 @@ public Flux<NumericResponse<PfAddCommand, Long>> pfAdd(Publisher<PfAddCommand> c
7070
@Override
7171
public Flux<NumericResponse<PfCountCommand, Long>> pfCount(Publisher<PfCountCommand> commands) {
7272

73-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
73+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
7474

7575
Assert.notEmpty(command.getKeys(), "Keys must not be empty for PFCOUNT.");
7676

@@ -86,7 +86,7 @@ public Flux<NumericResponse<PfCountCommand, Long>> pfCount(Publisher<PfCountComm
8686
@Override
8787
public Flux<BooleanResponse<PfMergeCommand>> pfMerge(Publisher<PfMergeCommand> commands) {
8888

89-
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
89+
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
9090

9191
Assert.notNull(command.getKey(), "Destination key must not be null for PFMERGE.");
9292
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null for PFMERGE.");

0 commit comments

Comments
 (0)