18
18
19
19
import org .reactivestreams .Publisher ;
20
20
import org .reactivestreams .Subscriber ;
21
- import reactor .core .CoreSubscriber ;
22
21
import reactor .core .publisher .Flux ;
23
22
import reactor .core .publisher .FluxSink ;
24
23
import reactor .core .publisher .Mono ;
25
- import reactor .util .context .Context ;
26
24
27
25
import java .util .Objects ;
28
26
import java .util .concurrent .atomic .AtomicBoolean ;
@@ -48,9 +46,9 @@ public void subscribe(final Subscriber<? super T> subscriber) {
48
46
if (calculateDemand (demand ) > 0 && inProgress .compareAndSet (false , true )) {
49
47
if (batchCursor == null ) {
50
48
int batchSize = calculateBatchSize (sink .requestedFromDownstream ());
51
- Context initialContext = subscriber instanceof CoreSubscriber <?>
52
- ? (( CoreSubscriber <?>) subscriber ). currentContext () : null ;
53
- batchCursorPublisher . batchCursor ( batchSize ) .subscribe (bc -> {
49
+ batchCursorPublisher . batchCursor ( batchSize )
50
+ . contextWrite ( sink . contextView ())
51
+ .subscribe (bc -> {
54
52
batchCursor = bc ;
55
53
inProgress .set (false );
56
54
@@ -60,7 +58,7 @@ public void subscribe(final Subscriber<? super T> subscriber) {
60
58
} else {
61
59
recurseCursor ();
62
60
}
63
- }, sink ::error , null , initialContext );
61
+ }, sink ::error );
64
62
} else {
65
63
inProgress .set (false );
66
64
recurseCursor ();
@@ -86,6 +84,7 @@ private void recurseCursor(){
86
84
} else {
87
85
batchCursor .setBatchSize (calculateBatchSize (sink .requestedFromDownstream ()));
88
86
Mono .from (batchCursor .next (() -> sink .isCancelled ()))
87
+ .contextWrite (sink .contextView ())
89
88
.doOnCancel (this ::closeCursor )
90
89
.subscribe (results -> {
91
90
if (!results .isEmpty ()) {
0 commit comments