Skip to content

Commit d11d306

Browse files
authored
Adding additional wait to avoid eventual consistency issues (#6006)
Co-authored-by: Ran Vaknin <[email protected]>
1 parent fe06145 commit d11d306

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisStabilityTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
3939
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
4040
import software.amazon.awssdk.services.kinesis.model.Record;
41+
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
4142
import software.amazon.awssdk.services.kinesis.model.Shard;
4243
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
4344
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
@@ -202,6 +203,21 @@ private void waitForStreamToBeActive() {
202203
.join())
203204
.until(b -> b.streamDescription().streamStatus().equals(StreamStatus.ACTIVE))
204205
.orFailAfter(Duration.ofMinutes(5));
206+
207+
// Additional verification to ensure stream is fully operational
208+
Waiter.run(() -> {
209+
try {
210+
asyncClient.listShards(r -> r.streamName(streamName)).join();
211+
return true;
212+
} catch (Exception e) {
213+
if (e.getCause() instanceof ResourceInUseException) {
214+
return false;
215+
}
216+
throw e;
217+
}
218+
})
219+
.until(Boolean::booleanValue)
220+
.orFailAfter(Duration.ofMinutes(1));
205221
}
206222

207223
private void waitForConsumersToBeActive() {

0 commit comments

Comments
 (0)