Skip to content

Commit afb0698

Browse files
committed
feat(impv): Support Lettuce
1 parent 3a4ffcb commit afb0698

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.lettuce.core.XAddArgs;
1919
import io.lettuce.core.XClaimArgs;
2020
import io.lettuce.core.XGroupCreateArgs;
21+
import io.lettuce.core.XPendingArgs;
2122
import io.lettuce.core.XReadArgs;
2223
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
2324
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
@@ -50,6 +51,7 @@
5051
* @author Dejan Jankov
5152
* @author Dengliming
5253
* @author Mark John Moreno
54+
* @author Jeonggyu Choi
5355
* @since 2.2
5456
*/
5557
class LettuceStreamCommands implements RedisStreamCommands {
@@ -217,15 +219,17 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op
217219
io.lettuce.core.Limit limit = options.isLimited() ? io.lettuce.core.Limit.from(options.getCount())
218220
: io.lettuce.core.Limit.unlimited();
219221

222+
XPendingArgs<byte[]> xPendingArgs = XPendingArgs.Builder.xpending(group, range, limit);
220223
if (options.hasConsumer()) {
221-
222-
return connection.invoke()
223-
.from(RedisStreamAsyncCommands::xpending, key,
224-
io.lettuce.core.Consumer.from(group, LettuceConverters.toBytes(options.getConsumerName())), range, limit)
225-
.get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it));
224+
io.lettuce.core.Consumer<byte[]> consumer = io.lettuce.core.Consumer.from(group,
225+
LettuceConverters.toBytes(options.getConsumerName()));
226+
xPendingArgs.consumer(consumer);
227+
}
228+
if (options.hasIdle()) {
229+
xPendingArgs.idle(options.getIdle());
226230
}
227231

228-
return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, group, range, limit)
232+
return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, xPendingArgs)
229233
.get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it));
230234
}
231235

0 commit comments

Comments
 (0)