Skip to content

Commit b7fb631

Browse files
committed
feat(reactive): Add idle argument to xPending
1 parent cd612b2 commit b7fb631

File tree

2 files changed

+103
-4
lines changed

2 files changed

+103
-4
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

+93-4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* @author Dengliming
6161
* @author Mark John Moreno
6262
* @author jinkshower
63+
* @author Jeonggyu Choi
6364
* @since 2.2
6465
*/
6566
public interface ReactiveStreamCommands {
@@ -747,6 +748,25 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?
747748
.map(CommandResponse::getOutput);
748749
}
749750

751+
/**
752+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
753+
* {@literal consumer group} and over a given {@link Duration} of idle time.
754+
*
755+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
756+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
757+
* @param range the range of messages ids to search within. Must not be {@literal null}.
758+
* @param count limit the number of results. Must not be {@literal null}.
759+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
760+
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
761+
* transaction.
762+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
763+
* @since 3.5
764+
*/
765+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count, Duration idle) {
766+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).idle(idle))).next()
767+
.map(CommandResponse::getOutput);
768+
}
769+
750770
/**
751771
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
752772
* {@link Consumer} within a {@literal consumer group}.
@@ -763,6 +783,23 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
763783
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
764784
}
765785

786+
/**
787+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
788+
* {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
789+
*
790+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
791+
* @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
792+
* @param range the range of messages ids to search within. Must not be {@literal null}.
793+
* @param count limit the number of results. Must not be {@literal null}.
794+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
795+
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
796+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
797+
* @since 3.5
798+
*/
799+
default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<?> range, Long count, Duration idle) {
800+
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle);
801+
}
802+
766803
/**
767804
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
768805
* {@literal consumer} within a {@literal consumer group}.
@@ -783,6 +820,27 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
783820
.next().map(CommandResponse::getOutput);
784821
}
785822

823+
/**
824+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
825+
* {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
826+
*
827+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
828+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
829+
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
830+
* @param range the range of messages ids to search within. Must not be {@literal null}.
831+
* @param count limit the number of results. Must not be {@literal null}.
832+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
833+
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
834+
* when used in pipeline / transaction.
835+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
836+
* @since 3.5
837+
*/
838+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
839+
Long count, Duration idle) {
840+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)
841+
.idle(idle))).next().map(CommandResponse::getOutput);
842+
}
843+
786844
/**
787845
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
788846
* options}.
@@ -798,6 +856,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
798856
* Value Object holding parameters for obtaining pending messages.
799857
*
800858
* @author Christoph Strobl
859+
* @author Jeonggyu Choi
801860
* @since 2.3
802861
*/
803862
class PendingRecordsCommand extends KeyCommand {
@@ -806,16 +865,18 @@ class PendingRecordsCommand extends KeyCommand {
806865
private final @Nullable String consumerName;
807866
private final Range<?> range;
808867
private final @Nullable Long count;
868+
private final @Nullable Duration idle;
809869

810870
private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
811-
@Nullable Long count) {
871+
@Nullable Long count, @Nullable Duration idle) {
812872

813873
super(key);
814874

815875
this.groupName = groupName;
816876
this.consumerName = consumerName;
817877
this.range = range;
818878
this.count = count;
879+
this.idle = idle;
819880
}
820881

821882
/**
@@ -826,7 +887,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String
826887
* @return new instance of {@link PendingRecordsCommand}.
827888
*/
828889
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
829-
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
890+
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null);
830891
}
831892

832893
/**
@@ -841,7 +902,7 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
841902
Assert.notNull(range, "Range must not be null");
842903
Assert.isTrue(count > -1, "Count must not be negative");
843904

844-
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
905+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, null);
845906
}
846907

847908
/**
@@ -851,7 +912,20 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
851912
* @return new instance of {@link PendingRecordsCommand}.
852913
*/
853914
public PendingRecordsCommand consumer(String consumerName) {
854-
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
915+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle);
916+
}
917+
918+
/**
919+
* Append given idle time.
920+
*
921+
* @param idle must not be {@literal null}.
922+
* @return new instance of {@link PendingRecordsCommand}.
923+
*/
924+
public PendingRecordsCommand idle(Duration idle) {
925+
926+
Assert.notNull(idle, "Idle must not be null");
927+
928+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle);
855929
}
856930

857931
public String getGroupName() {
@@ -881,6 +955,14 @@ public Long getCount() {
881955
return count;
882956
}
883957

958+
/**
959+
* @return can be {@literal null}.
960+
*/
961+
@Nullable
962+
public Duration getIdle() {
963+
return idle;
964+
}
965+
884966
/**
885967
* @return {@literal true} if a consumer name is present.
886968
*/
@@ -894,6 +976,13 @@ public boolean hasConsumer() {
894976
public boolean isLimited() {
895977
return count != null;
896978
}
979+
980+
/**
981+
* @return {@literal true} if idle is set.
982+
*/
983+
public boolean hasIdle() {
984+
return idle != null;
985+
}
897986
}
898987

899988
/**

src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java

+10
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,14 @@ void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() {
5353

5454
assertThatIllegalArgumentException().isThrownBy(() -> command.range(range, -1L));
5555
}
56+
57+
@Test // GH-2049
58+
void pendingRecordsCommandIdleShouldThrowExceptionWhenIdleIsNull() {
59+
ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
60+
String groupName = "my-group";
61+
62+
PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
63+
64+
assertThatIllegalArgumentException().isThrownBy(() -> command.idle(null));
65+
}
5666
}

0 commit comments

Comments
 (0)