44
44
import com .mongodb .internal .binding .WriteBinding ;
45
45
import com .mongodb .internal .connection .AsyncConnection ;
46
46
import com .mongodb .internal .connection .Connection ;
47
+ import com .mongodb .internal .connection .OperationContext ;
47
48
import com .mongodb .internal .operation .OperationHelper .ResourceSupplierInternalException ;
48
49
import com .mongodb .internal .operation .retry .AttachmentKeys ;
49
50
import com .mongodb .internal .validator .NoOpFieldNameValidator ;
@@ -183,19 +184,20 @@ static RetryState initialRetryState(final boolean retry) {
183
184
return new RetryState (retry ? RetryState .RETRIES : 0 );
184
185
}
185
186
186
- static <R > Supplier <R > decorateReadWithRetries (final RetryState retryState , final Supplier <R > readFunction ) {
187
+ static <R > Supplier <R > decorateReadWithRetries (final RetryState retryState , final OperationContext operationContext ,
188
+ final Supplier <R > readFunction ) {
187
189
return new RetryingSyncSupplier <>(retryState , CommandOperationHelper ::chooseRetryableReadException ,
188
190
CommandOperationHelper ::shouldAttemptToRetryRead , () -> {
189
- logRetryExecute (retryState );
191
+ logRetryExecute (retryState , operationContext );
190
192
return readFunction .get ();
191
193
});
192
194
}
193
195
194
- static <R > AsyncCallbackSupplier <R > decorateReadWithRetries (final RetryState retryState ,
196
+ static <R > AsyncCallbackSupplier <R > decorateReadWithRetries (final RetryState retryState , final OperationContext operationContext ,
195
197
final AsyncCallbackSupplier <R > asyncReadFunction ) {
196
198
return new RetryingAsyncCallbackSupplier <>(retryState , CommandOperationHelper ::chooseRetryableReadException ,
197
199
CommandOperationHelper ::shouldAttemptToRetryRead , callback -> {
198
- logRetryExecute (retryState );
200
+ logRetryExecute (retryState , operationContext );
199
201
asyncReadFunction .get (callback );
200
202
});
201
203
}
@@ -219,7 +221,7 @@ static <D, T> T executeRetryableRead(
219
221
final CommandReadTransformer <D , T > transformer ,
220
222
final boolean retryReads ) {
221
223
RetryState retryState = initialRetryState (retryReads );
222
- Supplier <T > read = decorateReadWithRetries (retryState , () ->
224
+ Supplier <T > read = decorateReadWithRetries (retryState , binding . getOperationContext (), () ->
223
225
withSourceAndConnection (readConnectionSourceSupplier , false , (source , connection ) -> {
224
226
retryState .breakAndThrowIfRetryAnd (() -> !canRetryRead (source .getServerDescription (), binding .getSessionContext ()));
225
227
return createReadCommandAndExecute (retryState , binding , source , database , commandCreator , decoder , transformer , connection );
@@ -288,7 +290,8 @@ static <D, T> void executeRetryableReadAsync(
288
290
final SingleResultCallback <T > callback ) {
289
291
RetryState retryState = initialRetryState (retryReads );
290
292
binding .retain ();
291
- AsyncCallbackSupplier <T > asyncRead = CommandOperationHelper .<T >decorateReadWithRetries (retryState , funcCallback ->
293
+ AsyncCallbackSupplier <T > asyncRead = CommandOperationHelper .<T >decorateReadWithRetries (retryState , binding .getOperationContext (),
294
+ funcCallback ->
292
295
withAsyncSourceAndConnection (sourceAsyncSupplier , false , funcCallback ,
293
296
(source , connection , releasingCallback ) -> {
294
297
if (retryState .breakAndCompleteIfRetryAnd (() -> !canRetryRead (source .getServerDescription (),
@@ -375,19 +378,20 @@ static <T> void executeCommandAsync(final AsyncWriteBinding binding,
375
378
binding , transformingWriteCallback (transformer , connection , addingRetryableLabelCallback ));
376
379
}
377
380
378
- static <R > Supplier <R > decorateWriteWithRetries (final RetryState retryState , final Supplier <R > writeFunction ) {
381
+ static <R > Supplier <R > decorateWriteWithRetries (final RetryState retryState ,
382
+ final OperationContext operationContext , final Supplier <R > writeFunction ) {
379
383
return new RetryingSyncSupplier <>(retryState , CommandOperationHelper ::chooseRetryableWriteException ,
380
384
CommandOperationHelper ::shouldAttemptToRetryWrite , () -> {
381
- logRetryExecute (retryState );
385
+ logRetryExecute (retryState , operationContext );
382
386
return writeFunction .get ();
383
387
});
384
388
}
385
389
386
- static <R > AsyncCallbackSupplier <R > decorateWriteWithRetries (final RetryState retryState ,
390
+ static <R > AsyncCallbackSupplier <R > decorateWriteWithRetries (final RetryState retryState , final OperationContext operationContext ,
387
391
final AsyncCallbackSupplier <R > asyncWriteFunction ) {
388
392
return new RetryingAsyncCallbackSupplier <>(retryState , CommandOperationHelper ::chooseRetryableWriteException ,
389
393
CommandOperationHelper ::shouldAttemptToRetryWrite , callback -> {
390
- logRetryExecute (retryState );
394
+ logRetryExecute (retryState , operationContext );
391
395
asyncWriteFunction .get (callback );
392
396
});
393
397
}
@@ -402,7 +406,7 @@ static <T, R> R executeRetryableWrite(
402
406
final CommandWriteTransformer <T , R > transformer ,
403
407
final Function <BsonDocument , BsonDocument > retryCommandModifier ) {
404
408
RetryState retryState = initialRetryState (true );
405
- Supplier <R > retryingWrite = decorateWriteWithRetries (retryState , () -> {
409
+ Supplier <R > retryingWrite = decorateWriteWithRetries (retryState , binding . getOperationContext (), () -> {
406
410
boolean firstAttempt = retryState .isFirstAttempt ();
407
411
if (!firstAttempt && binding .getSessionContext ().hasActiveTransaction ()) {
408
412
binding .getSessionContext ().clearTransactionContext ();
@@ -451,7 +455,8 @@ static <T, R> void executeRetryableWriteAsync(
451
455
final SingleResultCallback <R > callback ) {
452
456
RetryState retryState = initialRetryState (true );
453
457
binding .retain ();
454
- AsyncCallbackSupplier <R > asyncWrite = CommandOperationHelper .<R >decorateWriteWithRetries (retryState , funcCallback -> {
458
+ AsyncCallbackSupplier <R > asyncWrite = CommandOperationHelper .<R >decorateWriteWithRetries (retryState ,
459
+ binding .getOperationContext (), funcCallback -> {
455
460
boolean firstAttempt = retryState .isFirstAttempt ();
456
461
if (!firstAttempt && binding .getSessionContext ().hasActiveTransaction ()) {
457
462
binding .getSessionContext ().clearTransactionContext ();
@@ -601,15 +606,17 @@ static void addRetryableWriteErrorLabel(final MongoException exception, final in
601
606
}
602
607
}
603
608
604
- static void logRetryExecute (final RetryState retryState ) {
609
+ static void logRetryExecute (final RetryState retryState , final OperationContext operationContext ) {
605
610
if (LOGGER .isDebugEnabled () && !retryState .isFirstAttempt ()) {
606
611
String commandDescription = retryState .attachment (AttachmentKeys .commandDescriptionSupplier ()).map (Supplier ::get ).orElse (null );
607
612
Throwable exception = retryState .exception ().orElseThrow (Assertions ::fail );
608
613
int oneBasedAttempt = retryState .attempt () + 1 ;
614
+ long operationId = operationContext .getId ();
609
615
LOGGER .debug (commandDescription == null
610
- ? format ("Retrying the operation due to the error \" %s\" ; attempt #%d" , exception , oneBasedAttempt )
611
- : format ("Retrying the operation '%s' due to the error \" %s\" ; attempt #%d" ,
612
- commandDescription , exception , oneBasedAttempt ));
616
+ ? format ("Retrying the operation with operation ID %s due to the error \" %s\" . Attempt number: #%d" ,
617
+ operationId , exception , oneBasedAttempt )
618
+ : format ("Retrying the operation '%s' with operation ID %s due to the error \" %s\" . Attempt number: #%d" ,
619
+ commandDescription , operationId , exception , oneBasedAttempt ));
613
620
}
614
621
}
615
622
0 commit comments