Skip to content

chore(x-goog-spanner-request-id): plumb for BatchCreateSessions #3815

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.Statement.StatementFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opentelemetry.api.common.Attributes;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import javax.annotation.Nullable;

class DatabaseClientImpl implements DatabaseClient {
Expand All @@ -45,6 +48,9 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
@VisibleForTesting final boolean useMultiplexedSessionForRW;
private final int dbId;
private final AtomicInteger nthRequest;
private final Map<String, Integer> clientIdToOrdinalMap;

final boolean useMultiplexedSessionBlindWrite;

Expand Down Expand Up @@ -91,6 +97,20 @@ class DatabaseClientImpl implements DatabaseClient {
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;

this.clientIdToOrdinalMap = new HashMap<String, Integer>();
this.dbId = this.dbIdFromClientId(this.clientId);
this.nthRequest = new AtomicInteger(0);
}

@VisibleForTesting
synchronized int dbIdFromClientId(String clientId) {
Integer id = this.clientIdToOrdinalMap.get(clientId);
if (id == null) {
id = this.clientIdToOrdinalMap.size() + 1;
this.clientIdToOrdinalMap.put(clientId, id);
}
return id;
}

@VisibleForTesting
Expand Down Expand Up @@ -188,7 +208,11 @@ public CommitResponse writeWithOptions(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
}
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));

return runWithSessionRetry(
(session, reqId) -> {
return session.writeWithOptions(mutations, withReqId(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -213,7 +237,8 @@ public CommitResponse writeAtLeastOnceWithOptions(
.writeAtLeastOnceWithOptions(mutations, options);
}
return runWithSessionRetry(
session -> session.writeAtLeastOnceWithOptions(mutations, options));
(session, reqId) ->
session.writeAtLeastOnceWithOptions(mutations, withReqId(reqId, options)));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -222,6 +247,15 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

private int nextNthRequest() {
return this.nthRequest.incrementAndGet();
}

@VisibleForTesting
int getNthRequest() {
return this.nthRequest.get();
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
Expand All @@ -231,7 +265,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
}
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
return runWithSessionRetry(
(session, reqId) ->
session.batchWriteAtLeastOnce(mutationGroups, withReqId(reqId, options)));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down Expand Up @@ -383,27 +419,65 @@ private Future<Dialect> getDialectAsync() {
return pool.getDialectAsync();
}

private UpdateOption[] withReqId(
final XGoogSpannerRequestId reqId, final UpdateOption... options) {
if (reqId == null) {
return options;
}
if (options == null || options.length == 0) {
return new UpdateOption[] {new Options.RequestIdOption(reqId)};
}
UpdateOption[] allOptions = new UpdateOption[options.length + 1];
System.arraycopy(options, 0, allOptions, 0, options.length);
allOptions[options.length] = new Options.RequestIdOption(reqId);
return allOptions;
}

private TransactionOption[] withReqId(
final XGoogSpannerRequestId reqId, final TransactionOption... options) {
if (reqId == null) {
return options;
}
if (options == null || options.length == 0) {
return new TransactionOption[] {new Options.RequestIdOption(reqId)};
}
TransactionOption[] allOptions = new TransactionOption[options.length + 1];
System.arraycopy(options, 0, allOptions, 0, options.length);
allOptions[options.length] = new Options.RequestIdOption(reqId);
return allOptions;
}

private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
return runWithSessionRetry(
(session, reqId) -> {
return session.executePartitionedUpdate(stmt, withReqId(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
throw e;
}
}

private <T> T runWithSessionRetry(Function<Session, T> callable) {
@VisibleForTesting
<T> T runWithSessionRetry(BiFunction<Session, XGoogSpannerRequestId, T> callable) {
PooledSessionFuture session = getSession();
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 1);
while (true) {
try {
return callable.apply(session);
return callable.apply(session, reqId);
} catch (SessionNotFoundException e) {
session =
(PooledSessionFuture)
pool.getPooledSessionReplacementHandler().replaceSession(e, session);
reqId =
XGoogSpannerRequestId.of(
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public static UpdateTransactionOption excludeTxnFromChangeStreams() {
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
}

public static RequestIdOption requestId(XGoogSpannerRequestId reqId) {
return new RequestIdOption(reqId);
}

/**
* Specifying this will cause the read to yield at most this many rows. This should be greater
* than 0.
Expand Down Expand Up @@ -535,6 +539,7 @@ void appendToOptions(Options options) {
private RpcLockHint lockHint;
private Boolean lastStatement;
private IsolationLevel isolationLevel;
private XGoogSpannerRequestId reqId;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -599,6 +604,14 @@ String filter() {
return filter;
}

boolean hasReqId() {
return reqId != null;
}

XGoogSpannerRequestId reqId() {
return reqId;
}

boolean hasPriority() {
return priority != null;
}
Expand Down Expand Up @@ -756,6 +769,9 @@ public String toString() {
if (isolationLevel != null) {
b.append("isolationLevel: ").append(isolationLevel).append(' ');
}
if (reqId != null) {
b.append("requestId: ").append(reqId.toString());
}
return b.toString();
}

Expand Down Expand Up @@ -798,7 +814,8 @@ public boolean equals(Object o) {
&& Objects.equals(orderBy(), that.orderBy())
&& Objects.equals(isLastStatement(), that.isLastStatement())
&& Objects.equals(lockHint(), that.lockHint())
&& Objects.equals(isolationLevel(), that.isolationLevel());
&& Objects.equals(isolationLevel(), that.isolationLevel())
&& Objects.equals(reqId(), that.reqId());
}

@Override
Expand Down Expand Up @@ -867,6 +884,9 @@ public int hashCode() {
if (isolationLevel != null) {
result = 31 * result + isolationLevel.hashCode();
}
if (reqId != null) {
result = 31 * result + reqId.hashCode();
}
return result;
}

Expand Down Expand Up @@ -1052,4 +1072,28 @@ public boolean equals(Object o) {
return o instanceof LastStatementUpdateOption;
}
}

static final class RequestIdOption extends InternalOption
implements ReadOption, TransactionOption, UpdateOption {
private final XGoogSpannerRequestId reqId;

RequestIdOption(XGoogSpannerRequestId reqId) {
this.reqId = reqId;
}

@Override
void appendToOptions(Options options) {
options.reqId = this.reqId;
}

@Override
public int hashCode() {
return RequestIdOption.class.hashCode();
}

@Override
public boolean equals(Object o) {
return o instanceof RequestIdOption;
}
Comment on lines +1090 to +1097
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding #3815 (comment)

LastStatementOption does not contain any state, and hence the hashCode and equals methods just returns the hash code of the class / checks if the other object is also an instance of the same class.

This option does contain state, and the hashCode and equals methods should use that state to calculate a hash value and check equality. Let's fix that in a follow-up PR.

}
}
Loading
Loading