Skip to content

Add operation identifier to retry logs. #1112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.mongodb.internal.binding.WriteBinding;
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.connection.Connection;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.operation.OperationHelper.ResourceSupplierInternalException;
import com.mongodb.internal.operation.retry.AttachmentKeys;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
Expand Down Expand Up @@ -183,19 +184,20 @@ static RetryState initialRetryState(final boolean retry) {
return new RetryState(retry ? RetryState.RETRIES : 0);
}

/**
 * Wraps {@code readFunction} in a {@code RetryingSyncSupplier} that is driven by {@code retryState}.
 * Each time the wrapped supplier runs, {@code logRetryExecute} is called first and then
 * {@code readFunction.get()} is returned; the {@code operationContext} is only forwarded to that logging call.
 * {@code chooseRetryableReadException} and {@code shouldAttemptToRetryRead} are handed to the retrying supplier
 * (presumably as its exception-selection and retry-predicate callbacks; confirm against {@code RetryingSyncSupplier}).
 */
static <R> Supplier<R> decorateReadWithRetries(final RetryState retryState, final Supplier<R> readFunction) {
static <R> Supplier<R> decorateReadWithRetries(final RetryState retryState, final OperationContext operationContext,
final Supplier<R> readFunction) {
return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableReadException,
CommandOperationHelper::shouldAttemptToRetryRead, () -> {
logRetryExecute(retryState);
logRetryExecute(retryState, operationContext);
return readFunction.get();
});
}

/**
 * Asynchronous counterpart of the synchronous read decorator: wraps {@code asyncReadFunction} in a
 * {@code RetryingAsyncCallbackSupplier} driven by {@code retryState}.
 * Each time the wrapped supplier runs, {@code logRetryExecute} is called first and then
 * {@code asyncReadFunction.get(callback)} is invoked with the callback it was given.
 * The {@code operationContext} is only forwarded to the logging call.
 */
static <R> AsyncCallbackSupplier<R> decorateReadWithRetries(final RetryState retryState,
static <R> AsyncCallbackSupplier<R> decorateReadWithRetries(final RetryState retryState, final OperationContext operationContext,
final AsyncCallbackSupplier<R> asyncReadFunction) {
return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableReadException,
CommandOperationHelper::shouldAttemptToRetryRead, callback -> {
logRetryExecute(retryState);
logRetryExecute(retryState, operationContext);
asyncReadFunction.get(callback);
});
}
Expand All @@ -219,7 +221,7 @@ static <D, T> T executeRetryableRead(
final CommandReadTransformer<D, T> transformer,
final boolean retryReads) {
RetryState retryState = initialRetryState(retryReads);
Supplier<T> read = decorateReadWithRetries(retryState, () ->
Supplier<T> read = decorateReadWithRetries(retryState, binding.getOperationContext(), () ->
withSourceAndConnection(readConnectionSourceSupplier, false, (source, connection) -> {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getSessionContext()));
return createReadCommandAndExecute(retryState, binding, source, database, commandCreator, decoder, transformer, connection);
Expand Down Expand Up @@ -288,7 +290,8 @@ static <D, T> void executeRetryableReadAsync(
final SingleResultCallback<T> callback) {
RetryState retryState = initialRetryState(retryReads);
binding.retain();
AsyncCallbackSupplier<T> asyncRead = CommandOperationHelper.<T>decorateReadWithRetries(retryState, funcCallback ->
AsyncCallbackSupplier<T> asyncRead = CommandOperationHelper.<T>decorateReadWithRetries(retryState, binding.getOperationContext(),
funcCallback ->
withAsyncSourceAndConnection(sourceAsyncSupplier, false, funcCallback,
(source, connection, releasingCallback) -> {
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryRead(source.getServerDescription(),
Expand Down Expand Up @@ -375,19 +378,20 @@ static <T> void executeCommandAsync(final AsyncWriteBinding binding,
binding, transformingWriteCallback(transformer, connection, addingRetryableLabelCallback));
}

/**
 * Wraps {@code writeFunction} in a {@code RetryingSyncSupplier} that is driven by {@code retryState}.
 * Each time the wrapped supplier runs, {@code logRetryExecute} is called first and then
 * {@code writeFunction.get()} is returned; the {@code operationContext} is only forwarded to that logging call.
 * Uses the write-specific {@code chooseRetryableWriteException} and {@code shouldAttemptToRetryWrite}
 * from {@code CommandOperationHelper}.
 */
static <R> Supplier<R> decorateWriteWithRetries(final RetryState retryState, final Supplier<R> writeFunction) {
static <R> Supplier<R> decorateWriteWithRetries(final RetryState retryState,
final OperationContext operationContext, final Supplier<R> writeFunction) {
return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException,
CommandOperationHelper::shouldAttemptToRetryWrite, () -> {
logRetryExecute(retryState);
logRetryExecute(retryState, operationContext);
return writeFunction.get();
});
}

/**
 * Asynchronous counterpart of the synchronous write decorator: wraps {@code asyncWriteFunction} in a
 * {@code RetryingAsyncCallbackSupplier} driven by {@code retryState}.
 * Each time the wrapped supplier runs, {@code logRetryExecute} is called first and then
 * {@code asyncWriteFunction.get(callback)} is invoked with the callback it was given.
 * The {@code operationContext} is only forwarded to the logging call.
 */
static <R> AsyncCallbackSupplier<R> decorateWriteWithRetries(final RetryState retryState,
static <R> AsyncCallbackSupplier<R> decorateWriteWithRetries(final RetryState retryState, final OperationContext operationContext,
final AsyncCallbackSupplier<R> asyncWriteFunction) {
return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException,
CommandOperationHelper::shouldAttemptToRetryWrite, callback -> {
logRetryExecute(retryState);
logRetryExecute(retryState, operationContext);
asyncWriteFunction.get(callback);
});
}
Expand All @@ -402,7 +406,7 @@ static <T, R> R executeRetryableWrite(
final CommandWriteTransformer<T, R> transformer,
final Function<BsonDocument, BsonDocument> retryCommandModifier) {
RetryState retryState = initialRetryState(true);
Supplier<R> retryingWrite = decorateWriteWithRetries(retryState, () -> {
Supplier<R> retryingWrite = decorateWriteWithRetries(retryState, binding.getOperationContext(), () -> {
boolean firstAttempt = retryState.isFirstAttempt();
if (!firstAttempt && binding.getSessionContext().hasActiveTransaction()) {
binding.getSessionContext().clearTransactionContext();
Expand Down Expand Up @@ -451,7 +455,8 @@ static <T, R> void executeRetryableWriteAsync(
final SingleResultCallback<R> callback) {
RetryState retryState = initialRetryState(true);
binding.retain();
AsyncCallbackSupplier<R> asyncWrite = CommandOperationHelper.<R>decorateWriteWithRetries(retryState, funcCallback -> {
AsyncCallbackSupplier<R> asyncWrite = CommandOperationHelper.<R>decorateWriteWithRetries(retryState,
binding.getOperationContext(), funcCallback -> {
boolean firstAttempt = retryState.isFirstAttempt();
if (!firstAttempt && binding.getSessionContext().hasActiveTransaction()) {
binding.getSessionContext().clearTransactionContext();
Expand Down Expand Up @@ -601,15 +606,17 @@ static void addRetryableWriteErrorLabel(final MongoException exception, final in
}
}

/**
 * Emits a debug-level log message describing a retry of an operation.
 * Nothing is logged on the first attempt or when debug logging is disabled.
 * The message contains the exception recorded in {@code retryState}, the attempt number, the operation ID
 * read from {@code operationContext}, and, if one is attached to {@code retryState}, the command description.
 */
static void logRetryExecute(final RetryState retryState) {
static void logRetryExecute(final RetryState retryState, final OperationContext operationContext) {
// Only retries are logged, and the message is built only if it will actually be written.
if (LOGGER.isDebugEnabled() && !retryState.isFirstAttempt()) {
// The description is optional: null when no supplier has been attached to the retry state.
String commandDescription = retryState.attachment(AttachmentKeys.commandDescriptionSupplier()).map(Supplier::get).orElse(null);
// On a retry an exception is expected to be present; its absence is treated as an assertion failure.
Throwable exception = retryState.exception().orElseThrow(Assertions::fail);
// Converted for display; presumably attempt() is zero-based (verify against RetryState).
int oneBasedAttempt = retryState.attempt() + 1;
long operationId = operationContext.getId();
LOGGER.debug(commandDescription == null
? format("Retrying the operation due to the error \"%s\"; attempt #%d", exception, oneBasedAttempt)
: format("Retrying the operation '%s' due to the error \"%s\"; attempt #%d",
commandDescription, exception, oneBasedAttempt));
? format("Retrying the operation with operation ID %s due to the error \"%s\". Attempt number: #%d",
operationId, exception, oneBasedAttempt)
: format("Retrying the operation '%s' with operation ID %s due to the error \"%s\". Attempt number: #%d",
commandDescription, operationId, exception, oneBasedAttempt));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public FindOperation<T> allowDiskUse(@Nullable final Boolean allowDiskUse) {
@Override
public BatchCursor<T> execute(final ReadBinding binding) {
RetryState retryState = initialRetryState(retryReads);
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, () ->
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, binding.getOperationContext(), () ->
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getSessionContext()));
try {
Expand All @@ -338,7 +338,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
RetryState retryState = initialRetryState(retryReads);
binding.retain();
AsyncCallbackSupplier<AsyncBatchCursor<T>> asyncRead = CommandOperationHelper.<AsyncBatchCursor<T>>decorateReadWithRetries(
retryState, funcCallback ->
retryState, binding.getOperationContext(), funcCallback ->
withAsyncSourceAndConnection(binding::getReadConnectionSource, false, funcCallback,
(source, connection, releasingCallback) -> {
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryRead(source.getServerDescription(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public ListCollectionsOperation<T> comment(@Nullable final BsonValue comment) {
@Override
public BatchCursor<T> execute(final ReadBinding binding) {
RetryState retryState = initialRetryState(retryReads);
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, () ->
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, binding.getOperationContext(), () ->
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getSessionContext()));
try {
Expand All @@ -161,7 +161,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
RetryState retryState = initialRetryState(retryReads);
binding.retain();
AsyncCallbackSupplier<AsyncBatchCursor<T>> asyncRead = CommandOperationHelper.<AsyncBatchCursor<T>>decorateReadWithRetries(
retryState, funcCallback ->
retryState, binding.getOperationContext(), funcCallback ->
withAsyncSourceAndConnection(binding::getReadConnectionSource, false, funcCallback,
(source, connection, releasingCallback) -> {
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryRead(source.getServerDescription(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public ListIndexesOperation<T> comment(@Nullable final BsonValue comment) {
@Override
public BatchCursor<T> execute(final ReadBinding binding) {
RetryState retryState = initialRetryState(retryReads);
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, () ->
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, binding.getOperationContext(), () ->
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getSessionContext()));
try {
Expand All @@ -139,7 +139,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
RetryState retryState = initialRetryState(retryReads);
binding.retain();
AsyncCallbackSupplier<AsyncBatchCursor<T>> asyncRead = CommandOperationHelper.<AsyncBatchCursor<T>>decorateReadWithRetries(
retryState, funcCallback ->
retryState, binding.getOperationContext(), funcCallback ->
withAsyncSourceAndConnection(binding::getReadConnectionSource, false, funcCallback,
(source, connection, releasingCallback) -> {
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryRead(source.getServerDescription(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.connection.Connection;
import com.mongodb.internal.connection.MongoWriteConcernWithResponseException;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.ProtocolHelper;
import com.mongodb.internal.operation.retry.AttachmentKeys;
import com.mongodb.internal.session.SessionContext;
Expand Down Expand Up @@ -137,19 +138,20 @@ public Boolean getRetryWrites() {
return retryWrites;
}

/**
 * Instance-level write decorator: wraps {@code writeFunction} in a {@code RetryingSyncSupplier} driven by
 * {@code retryState}. Unlike the static helper in {@code CommandOperationHelper}, the retry predicate passed
 * here is this object's own {@code shouldAttemptToRetryWrite}.
 * Each time the wrapped supplier runs, {@code logRetryExecute} is called first and then
 * {@code writeFunction.get()} is returned; the {@code operationContext} is only forwarded to that logging call.
 */
private <R> Supplier<R> decorateWriteWithRetries(final RetryState retryState, final Supplier<R> writeFunction) {
private <R> Supplier<R> decorateWriteWithRetries(final RetryState retryState, final OperationContext operationContext,
final Supplier<R> writeFunction) {
return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException,
this::shouldAttemptToRetryWrite, () -> {
logRetryExecute(retryState);
logRetryExecute(retryState, operationContext);
return writeFunction.get();
});
}

/**
 * Asynchronous instance-level write decorator: wraps {@code writeFunction} in a
 * {@code RetryingAsyncCallbackSupplier} driven by {@code retryState}, using this object's own
 * {@code shouldAttemptToRetryWrite} as the retry predicate.
 * Each time the wrapped supplier runs, {@code logRetryExecute} is called first and then
 * {@code writeFunction.get(callback)} is invoked with the callback it was given.
 * The {@code operationContext} is only forwarded to the logging call.
 */
private <R> AsyncCallbackSupplier<R> decorateWriteWithRetries(final RetryState retryState,
private <R> AsyncCallbackSupplier<R> decorateWriteWithRetries(final RetryState retryState, final OperationContext operationContext,
final AsyncCallbackSupplier<R> writeFunction) {
return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException,
this::shouldAttemptToRetryWrite, callback -> {
logRetryExecute(retryState);
logRetryExecute(retryState, operationContext);
writeFunction.get(callback);
});
}
Expand Down Expand Up @@ -182,7 +184,7 @@ public BulkWriteResult execute(final WriteBinding binding) {
* and the code related to the attempt tracking in `BulkWriteTracker` will be removed. */
RetryState retryState = new RetryState();
BulkWriteTracker.attachNew(retryState, retryWrites);
Supplier<BulkWriteResult> retryingBulkWrite = decorateWriteWithRetries(retryState, () ->
Supplier<BulkWriteResult> retryingBulkWrite = decorateWriteWithRetries(retryState, binding.getOperationContext(), () ->
withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> {
ConnectionDescription connectionDescription = connection.getDescription();
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
Expand Down Expand Up @@ -214,6 +216,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
BulkWriteTracker.attachNew(retryState, retryWrites);
binding.retain();
AsyncCallbackSupplier<BulkWriteResult> retryingBulkWrite = this.<BulkWriteResult>decorateWriteWithRetries(retryState,
binding.getOperationContext(),
funcCallback ->
withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback,
(source, connection, releasingCallback) -> {
Expand Down