Skip to content

Commit 10c3563

Browse files
odeke-emolavloite
andauthored
chore(x-goog-spanner-request-id): plumb for BatchCreateSessions (#3815)
* chore(x-goog-spanner-request-id): plumb for BatchCreateSessions This change plumbs x-goog-spanner-request-id into BatchCreateSessions and asserts that the header is present for that method. Updates #3537 * test: fix failing test cases * Add setRequestIdCreator inside session tests * Plumb in test for DatabaseClientImpl.runWithSessionRetry behavior * Remove unnecessary debugs * Address review feedback * More code review updates * Ensure that requestIdCreator is non-null * More code review updates * More updates * DatabaseClientImpl.runWithSessionRetry update on ID * Move reqId attempt increase out of runWithRetries * Apply allOptions copy everywhere * Update tests for createMultiplexedSession * chore(x-goog-spanner-request-id): plumb for BatchCreateSessions This change plumbs x-goog-spanner-request-id into BatchCreateSessions and asserts that the header is present for that method. Updates #3537 * test: fix failing test cases * Add setRequestIdCreator inside session tests * Remove unnecessary debugs * Address review feedback * More code review updates * More code review updates More updates DatabaseClientImpl.runWithSessionRetry update on ID Move reqId attempt increase out of runWithRetries Apply allOptions copy everywhere Complete testrunWithSessionRetry_withRequestId * chore: fix failing test cases --------- Co-authored-by: Knut Olav Løite <[email protected]>
1 parent 140c02f commit 10c3563

16 files changed

+688
-60
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,17 @@
2424
import com.google.cloud.spanner.SpannerImpl.ClosedException;
2525
import com.google.cloud.spanner.Statement.StatementFactory;
2626
import com.google.common.annotations.VisibleForTesting;
27-
import com.google.common.base.Function;
2827
import com.google.common.util.concurrent.ListenableFuture;
2928
import com.google.spanner.v1.BatchWriteResponse;
3029
import io.opentelemetry.api.common.Attributes;
30+
import java.util.HashMap;
31+
import java.util.Map;
3132
import java.util.concurrent.ExecutionException;
3233
import java.util.concurrent.Future;
3334
import java.util.concurrent.TimeUnit;
3435
import java.util.concurrent.TimeoutException;
36+
import java.util.concurrent.atomic.AtomicInteger;
37+
import java.util.function.BiFunction;
3538
import javax.annotation.Nullable;
3639

3740
class DatabaseClientImpl implements DatabaseClient {
@@ -45,6 +48,9 @@ class DatabaseClientImpl implements DatabaseClient {
4548
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
4649
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
4750
@VisibleForTesting final boolean useMultiplexedSessionForRW;
51+
private final int dbId;
52+
private final AtomicInteger nthRequest;
53+
private final Map<String, Integer> clientIdToOrdinalMap;
4854

4955
final boolean useMultiplexedSessionBlindWrite;
5056

@@ -91,6 +97,20 @@ class DatabaseClientImpl implements DatabaseClient {
9197
this.tracer = tracer;
9298
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
9399
this.commonAttributes = commonAttributes;
100+
101+
this.clientIdToOrdinalMap = new HashMap<String, Integer>();
102+
this.dbId = this.dbIdFromClientId(this.clientId);
103+
this.nthRequest = new AtomicInteger(0);
104+
}
105+
106+
@VisibleForTesting
107+
synchronized int dbIdFromClientId(String clientId) {
108+
Integer id = this.clientIdToOrdinalMap.get(clientId);
109+
if (id == null) {
110+
id = this.clientIdToOrdinalMap.size() + 1;
111+
this.clientIdToOrdinalMap.put(clientId, id);
112+
}
113+
return id;
94114
}
95115

96116
@VisibleForTesting
@@ -188,7 +208,11 @@ public CommitResponse writeWithOptions(
188208
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
189209
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
190210
}
191-
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
211+
212+
return runWithSessionRetry(
213+
(session, reqId) -> {
214+
return session.writeWithOptions(mutations, withReqId(reqId, options));
215+
});
192216
} catch (RuntimeException e) {
193217
span.setStatus(e);
194218
throw e;
@@ -213,7 +237,8 @@ public CommitResponse writeAtLeastOnceWithOptions(
213237
.writeAtLeastOnceWithOptions(mutations, options);
214238
}
215239
return runWithSessionRetry(
216-
session -> session.writeAtLeastOnceWithOptions(mutations, options));
240+
(session, reqId) ->
241+
session.writeAtLeastOnceWithOptions(mutations, withReqId(reqId, options)));
217242
} catch (RuntimeException e) {
218243
span.setStatus(e);
219244
throw e;
@@ -222,6 +247,15 @@ public CommitResponse writeAtLeastOnceWithOptions(
222247
}
223248
}
224249

250+
private int nextNthRequest() {
251+
return this.nthRequest.incrementAndGet();
252+
}
253+
254+
@VisibleForTesting
255+
int getNthRequest() {
256+
return this.nthRequest.get();
257+
}
258+
225259
@Override
226260
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
227261
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
@@ -231,7 +265,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
231265
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
232266
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
233267
}
234-
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
268+
return runWithSessionRetry(
269+
(session, reqId) ->
270+
session.batchWriteAtLeastOnce(mutationGroups, withReqId(reqId, options)));
235271
} catch (RuntimeException e) {
236272
span.setStatus(e);
237273
throw e;
@@ -383,27 +419,65 @@ private Future<Dialect> getDialectAsync() {
383419
return pool.getDialectAsync();
384420
}
385421

422+
private UpdateOption[] withReqId(
423+
final XGoogSpannerRequestId reqId, final UpdateOption... options) {
424+
if (reqId == null) {
425+
return options;
426+
}
427+
if (options == null || options.length == 0) {
428+
return new UpdateOption[] {new Options.RequestIdOption(reqId)};
429+
}
430+
UpdateOption[] allOptions = new UpdateOption[options.length + 1];
431+
System.arraycopy(options, 0, allOptions, 0, options.length);
432+
allOptions[options.length] = new Options.RequestIdOption(reqId);
433+
return allOptions;
434+
}
435+
436+
private TransactionOption[] withReqId(
437+
final XGoogSpannerRequestId reqId, final TransactionOption... options) {
438+
if (reqId == null) {
439+
return options;
440+
}
441+
if (options == null || options.length == 0) {
442+
return new TransactionOption[] {new Options.RequestIdOption(reqId)};
443+
}
444+
TransactionOption[] allOptions = new TransactionOption[options.length + 1];
445+
System.arraycopy(options, 0, allOptions, 0, options.length);
446+
allOptions[options.length] = new Options.RequestIdOption(reqId);
447+
return allOptions;
448+
}
449+
386450
private long executePartitionedUpdateWithPooledSession(
387451
final Statement stmt, final UpdateOption... options) {
388452
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
389453
try (IScope s = tracer.withSpan(span)) {
390-
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
454+
return runWithSessionRetry(
455+
(session, reqId) -> {
456+
return session.executePartitionedUpdate(stmt, withReqId(reqId, options));
457+
});
391458
} catch (RuntimeException e) {
392459
span.setStatus(e);
393460
span.end();
394461
throw e;
395462
}
396463
}
397464

398-
private <T> T runWithSessionRetry(Function<Session, T> callable) {
465+
@VisibleForTesting
466+
<T> T runWithSessionRetry(BiFunction<Session, XGoogSpannerRequestId, T> callable) {
399467
PooledSessionFuture session = getSession();
468+
XGoogSpannerRequestId reqId =
469+
XGoogSpannerRequestId.of(
470+
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 1);
400471
while (true) {
401472
try {
402-
return callable.apply(session);
473+
return callable.apply(session, reqId);
403474
} catch (SessionNotFoundException e) {
404475
session =
405476
(PooledSessionFuture)
406477
pool.getPooledSessionReplacementHandler().replaceSession(e, session);
478+
reqId =
479+
XGoogSpannerRequestId.of(
480+
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 1);
407481
}
408482
}
409483
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ public static UpdateTransactionOption excludeTxnFromChangeStreams() {
177177
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
178178
}
179179

180+
public static RequestIdOption requestId(XGoogSpannerRequestId reqId) {
181+
return new RequestIdOption(reqId);
182+
}
183+
180184
/**
181185
* Specifying this will cause the read to yield at most this many rows. This should be greater
182186
* than 0.
@@ -535,6 +539,7 @@ void appendToOptions(Options options) {
535539
private RpcLockHint lockHint;
536540
private Boolean lastStatement;
537541
private IsolationLevel isolationLevel;
542+
private XGoogSpannerRequestId reqId;
538543

539544
// Construction is via factory methods below.
540545
private Options() {}
@@ -599,6 +604,14 @@ String filter() {
599604
return filter;
600605
}
601606

607+
boolean hasReqId() {
608+
return reqId != null;
609+
}
610+
611+
XGoogSpannerRequestId reqId() {
612+
return reqId;
613+
}
614+
602615
boolean hasPriority() {
603616
return priority != null;
604617
}
@@ -756,6 +769,9 @@ public String toString() {
756769
if (isolationLevel != null) {
757770
b.append("isolationLevel: ").append(isolationLevel).append(' ');
758771
}
772+
if (reqId != null) {
773+
b.append("requestId: ").append(reqId.toString());
774+
}
759775
return b.toString();
760776
}
761777

@@ -798,7 +814,8 @@ public boolean equals(Object o) {
798814
&& Objects.equals(orderBy(), that.orderBy())
799815
&& Objects.equals(isLastStatement(), that.isLastStatement())
800816
&& Objects.equals(lockHint(), that.lockHint())
801-
&& Objects.equals(isolationLevel(), that.isolationLevel());
817+
&& Objects.equals(isolationLevel(), that.isolationLevel())
818+
&& Objects.equals(reqId(), that.reqId());
802819
}
803820

804821
@Override
@@ -867,6 +884,9 @@ public int hashCode() {
867884
if (isolationLevel != null) {
868885
result = 31 * result + isolationLevel.hashCode();
869886
}
887+
if (reqId != null) {
888+
result = 31 * result + reqId.hashCode();
889+
}
870890
return result;
871891
}
872892

@@ -1052,4 +1072,28 @@ public boolean equals(Object o) {
10521072
return o instanceof LastStatementUpdateOption;
10531073
}
10541074
}
1075+
1076+
static final class RequestIdOption extends InternalOption
1077+
implements ReadOption, TransactionOption, UpdateOption {
1078+
private final XGoogSpannerRequestId reqId;
1079+
1080+
RequestIdOption(XGoogSpannerRequestId reqId) {
1081+
this.reqId = reqId;
1082+
}
1083+
1084+
@Override
1085+
void appendToOptions(Options options) {
1086+
options.reqId = this.reqId;
1087+
}
1088+
1089+
@Override
1090+
public int hashCode() {
1091+
return RequestIdOption.class.hashCode();
1092+
}
1093+
1094+
@Override
1095+
public boolean equals(Object o) {
1096+
return o instanceof RequestIdOption;
1097+
}
1098+
}
10551099
}

0 commit comments

Comments
 (0)