Skip to content

Commit 7980c85

Browse files
committed
2.x: Fix switchMap not canceling properly during onNext-cancel races
1 parent e2b7816 commit 7980c85

File tree

3 files changed

+143
-3
lines changed

3 files changed

+143
-3
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,6 @@ void drain() {
199199
for (;;) {
200200

201201
if (cancelled) {
202-
active.lazySet(null);
203202
return;
204203
}
205204

src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.util.*;
2021
import java.util.concurrent.*;
21-
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.*;
2223

2324
import org.junit.*;
2425
import org.mockito.InOrder;
@@ -1229,4 +1230,74 @@ public Publisher<Integer> apply(Integer v)
12291230
.test()
12301231
.assertResult(10, 20);
12311232
}
1233+
1234+
@Test
1235+
public void cancellationShouldTriggerInnerCancellationRace() throws Throwable {
1236+
final AtomicInteger outer = new AtomicInteger();
1237+
final AtomicInteger inner = new AtomicInteger();
1238+
1239+
int n = 10000;
1240+
for (int i = 0; i < n; i++) {
1241+
Flowable.<Integer>create(new FlowableOnSubscribe<Integer>() {
1242+
@Override
1243+
public void subscribe(FlowableEmitter<Integer> it)
1244+
throws Exception {
1245+
it.onNext(0);
1246+
}
1247+
}, BackpressureStrategy.MISSING)
1248+
.switchMap(new Function<Integer, Publisher<? extends Object>>() {
1249+
@Override
1250+
public Publisher<? extends Object> apply(Integer v)
1251+
throws Exception {
1252+
return createFlowable(inner);
1253+
}
1254+
})
1255+
.observeOn(Schedulers.computation())
1256+
.doFinally(new Action() {
1257+
@Override
1258+
public void run() throws Exception {
1259+
outer.incrementAndGet();
1260+
}
1261+
})
1262+
.take(1)
1263+
.blockingSubscribe(Functions.emptyConsumer(), new Consumer<Throwable>() {
1264+
@Override
1265+
public void accept(Throwable e) throws Exception {
1266+
e.printStackTrace();
1267+
}
1268+
});
1269+
}
1270+
1271+
Thread.sleep(100);
1272+
assertEquals(inner.get(), outer.get());
1273+
assertEquals(n, inner.get());
1274+
}
1275+
1276+
Flowable<Integer> createFlowable(final AtomicInteger inner) {
1277+
return Flowable.<Integer>unsafeCreate(new Publisher<Integer>() {
1278+
@Override
1279+
public void subscribe(Subscriber<? super Integer> s) {
1280+
final SerializedSubscriber<Integer> it = new SerializedSubscriber<Integer>(s);
1281+
it.onSubscribe(new BooleanSubscription());
1282+
Schedulers.io().scheduleDirect(new Runnable() {
1283+
@Override
1284+
public void run() {
1285+
it.onNext(1);
1286+
}
1287+
}, 0, TimeUnit.MILLISECONDS);
1288+
Schedulers.io().scheduleDirect(new Runnable() {
1289+
@Override
1290+
public void run() {
1291+
it.onNext(2);
1292+
}
1293+
}, 0, TimeUnit.MILLISECONDS);
1294+
}
1295+
})
1296+
.doFinally(new Action() {
1297+
@Override
1298+
public void run() throws Exception {
1299+
inner.incrementAndGet();
1300+
}
1301+
});
1302+
}
12321303
}

src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import io.reactivex.internal.functions.Functions;
3434
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3535
import io.reactivex.internal.util.ExceptionHelper;
36-
import io.reactivex.observers.TestObserver;
36+
import io.reactivex.observers.*;
3737
import io.reactivex.plugins.RxJavaPlugins;
3838
import io.reactivex.schedulers.*;
3939
import io.reactivex.subjects.PublishSubject;
@@ -1222,4 +1222,74 @@ public Observable<Integer> apply(Integer v)
12221222
.test()
12231223
.assertResult(10, 20);
12241224
}
1225+
1226+
@Test
1227+
public void cancellationShouldTriggerInnerCancellationRace() throws Throwable {
1228+
final AtomicInteger outer = new AtomicInteger();
1229+
final AtomicInteger inner = new AtomicInteger();
1230+
1231+
int n = 10000;
1232+
for (int i = 0; i < n; i++) {
1233+
Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
1234+
@Override
1235+
public void subscribe(ObservableEmitter<Integer> it)
1236+
throws Exception {
1237+
it.onNext(0);
1238+
}
1239+
})
1240+
.switchMap(new Function<Integer, ObservableSource<Integer>>() {
1241+
@Override
1242+
public ObservableSource<Integer> apply(Integer v)
1243+
throws Exception {
1244+
return createObservable(inner);
1245+
}
1246+
})
1247+
.observeOn(Schedulers.computation())
1248+
.doFinally(new Action() {
1249+
@Override
1250+
public void run() throws Exception {
1251+
outer.incrementAndGet();
1252+
}
1253+
})
1254+
.take(1)
1255+
.blockingSubscribe(Functions.emptyConsumer(), new Consumer<Throwable>() {
1256+
@Override
1257+
public void accept(Throwable e) throws Exception {
1258+
e.printStackTrace();
1259+
}
1260+
});
1261+
}
1262+
1263+
Thread.sleep(100);
1264+
assertEquals(inner.get(), outer.get());
1265+
assertEquals(n, inner.get());
1266+
}
1267+
1268+
Observable<Integer> createObservable(final AtomicInteger inner) {
1269+
return Observable.<Integer>unsafeCreate(new ObservableSource<Integer>() {
1270+
@Override
1271+
public void subscribe(Observer<? super Integer> observer) {
1272+
final SerializedObserver<Integer> it = new SerializedObserver<Integer>(observer);
1273+
it.onSubscribe(Disposables.empty());
1274+
Schedulers.io().scheduleDirect(new Runnable() {
1275+
@Override
1276+
public void run() {
1277+
it.onNext(1);
1278+
}
1279+
}, 0, TimeUnit.MILLISECONDS);
1280+
Schedulers.io().scheduleDirect(new Runnable() {
1281+
@Override
1282+
public void run() {
1283+
it.onNext(2);
1284+
}
1285+
}, 0, TimeUnit.MILLISECONDS);
1286+
}
1287+
})
1288+
.doFinally(new Action() {
1289+
@Override
1290+
public void run() throws Exception {
1291+
inner.incrementAndGet();
1292+
}
1293+
});
1294+
}
12251295
}

0 commit comments

Comments
 (0)