Skip to content

Commit 488db15

Browse files
committed
fix: retry on RST_STREAM internal error
1 parent a40bda9 commit 488db15

File tree

3 files changed

+55
-0
lines changed

3 files changed

+55
-0
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsRetryableInternalError.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public class IsRetryableInternalError implements Predicate<Throwable> {
2929
private static final String EOS_ERROR_MESSAGE =
3030
"Received unexpected EOS on DATA frame from server";
3131

32+
private static final String RST_STREAM_ERROR_MESSAGE = "stream terminated by RST_STREAM";
33+
3234
@Override
3335
public boolean apply(Throwable cause) {
3436
if (isInternalError(cause)) {
@@ -38,6 +40,8 @@ public boolean apply(Throwable cause) {
3840
return true;
3941
} else if (cause.getMessage().contains(EOS_ERROR_MESSAGE)) {
4042
return true;
43+
} else if (cause.getMessage().contains(RST_STREAM_ERROR_MESSAGE)) {
44+
return true;
4145
}
4246
}
4347
return false;

google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,18 @@ public void genericInternalStatusRuntimeExceptionIsRetryable() {
114114
assertThat(predicate.apply(e)).isFalse();
115115
}
116116

117+
@Test
118+
public void rstStreamInternalExceptionIsRetryable() {
119+
final InternalException e =
120+
new InternalException(
121+
"INTERNAL: stream terminated by RST_STREAM.",
122+
null,
123+
GrpcStatusCode.of(Code.INTERNAL),
124+
false);
125+
126+
assertThat(predicate.apply(e)).isTrue();
127+
}
128+
117129
@Test
118130
public void genericInternalExceptionIsNotRetryable() {
119131
final InternalException e =

google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,45 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() {
344344
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class));
345345
}
346346

347+
@Test
348+
public void testExecuteStreamingPartitionedUpdateRSTstream() {
349+
ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build();
350+
PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();
351+
PartialResultSet p2 = PartialResultSet.newBuilder().setStats(stats).build();
352+
ServerStream<PartialResultSet> stream1 = mock(ServerStream.class);
353+
Iterator<PartialResultSet> iterator = mock(Iterator.class);
354+
when(iterator.hasNext()).thenReturn(true, true, false);
355+
when(iterator.next())
356+
.thenReturn(p1)
357+
.thenThrow(
358+
new InternalException(
359+
"INTERNAL: stream terminated by RST_STREAM.",
360+
null,
361+
GrpcStatusCode.of(Code.INTERNAL),
362+
true));
363+
when(stream1.iterator()).thenReturn(iterator);
364+
ServerStream<PartialResultSet> stream2 = mock(ServerStream.class);
365+
when(stream2.iterator()).thenReturn(ImmutableList.of(p1, p2).iterator());
366+
when(rpc.executeStreamingPartitionedDml(
367+
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)))
368+
.thenReturn(stream1);
369+
when(rpc.executeStreamingPartitionedDml(
370+
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
371+
.thenReturn(stream2);
372+
373+
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
374+
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));
375+
376+
assertThat(count).isEqualTo(1000L);
377+
verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap());
378+
verify(rpc)
379+
.executeStreamingPartitionedDml(
380+
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class));
381+
verify(rpc)
382+
.executeStreamingPartitionedDml(
383+
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class));
384+
}
385+
347386
@Test
348387
public void testExecuteStreamingPartitionedUpdateGenericInternalException() {
349388
PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();

0 commit comments

Comments
 (0)