Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,11 @@ public void setup() {

Observable<Integer> source = Observable.fromArray(sourceArray);

observablePlain = source.concatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Observable.empty();
}
});

observableConvert = source.concatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Completable.complete().toObservable();
}
});

observableDedicated = source.concatMapCompletable(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v) {
return Completable.complete();
}
});
observablePlain = source.concatMap((Function<Integer, Observable<Integer>>) _ -> Observable.empty());

observableConvert = source.concatMap((Function<Integer, Observable<Integer>>) _ -> Completable.complete().toObservable());

observableDedicated = source.concatMapCompletable((Function<Integer, Completable>) _ -> Completable.complete());
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,11 @@ public void setup() {

Observable<Integer> source = Observable.fromArray(sourceArray);

observablePlain = source.concatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Observable.just(v);
}
});

observableConvert = source.concatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Maybe.just(v).toObservable();
}
});

observableDedicated = source.concatMapMaybe(new Function<Integer, Maybe<Integer>>() {
@Override
public Maybe<Integer> apply(Integer v) {
return Maybe.just(v);
}
});
observablePlain = source.concatMap((Function<Integer, Observable<Integer>>) Observable::just);

observableConvert = source.concatMap((Function<Integer, Observable<Integer>>) v -> Maybe.just(v).toObservable());

observableDedicated = source.concatMapMaybe((Function<Integer, Maybe<Integer>>) Maybe::just);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,11 @@ public void setup() {

Observable<Integer> source = Observable.fromArray(sourceArray);

observablePlain = source.concatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Observable.just(v);
}
});

observableConvert = source.concatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Single.just(v).toObservable();
}
});

observableDedicated = source.concatMapSingle(new Function<Integer, Single<Integer>>() {
@Override
public Single<Integer> apply(Integer v) {
return Single.just(v);
}
});
observablePlain = source.concatMap((Function<Integer, Observable<Integer>>) Observable::just);

observableConvert = source.concatMap((Function<Integer, Observable<Integer>>) v -> Single.just(v).toObservable());

observableDedicated = source.concatMapSingle((Function<Integer, Single<Integer>>) Single::just);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,11 @@ public void setup() {

Observable<Integer> source = Observable.fromArray(sourceArray);

observablePlain = source.flatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Observable.empty();
}
});

observableConvert = source.flatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Completable.complete().toObservable();
}
});

observableDedicated = source.flatMapCompletable(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v) {
return Completable.complete();
}
});
observablePlain = source.flatMap((Function<Integer, Observable<Integer>>) _ -> Observable.empty());

observableConvert = source.flatMap((Function<Integer, Observable<Integer>>) _ -> Completable.complete().toObservable());

observableDedicated = source.flatMapCompletable((Function<Integer, Completable>) _ -> Completable.complete());
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,11 @@ public void setup() {

Observable<Integer> source = Observable.fromArray(sourceArray);

observablePlain = source.flatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Observable.just(v);
}
});

observableConvert = source.flatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Maybe.just(v).toObservable();
}
});

observableDedicated = source.flatMapMaybe(new Function<Integer, Maybe<Integer>>() {
@Override
public Maybe<Integer> apply(Integer v) {
return Maybe.just(v);
}
});
observablePlain = source.flatMap((Function<Integer, Observable<Integer>>) Observable::just);

observableConvert = source.flatMap((Function<Integer, Observable<Integer>>) v -> Maybe.just(v).toObservable());

observableDedicated = source.flatMapMaybe((Function<Integer, Maybe<Integer>>) Maybe::just);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,11 @@ public void setup() {

Observable<Integer> source = Observable.fromArray(sourceArray);

observablePlain = source.flatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Observable.just(v);
}
});

observableConvert = source.flatMap(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) {
return Single.just(v).toObservable();
}
});

observableDedicated = source.flatMapSingle(new Function<Integer, Single<Integer>>() {
@Override
public Single<Integer> apply(Integer v) {
return Single.just(v);
}
});
observablePlain = source.flatMap((Function<Integer, Observable<Integer>>) Observable::just);

observableConvert = source.flatMap((Function<Integer, Observable<Integer>>) v -> Single.just(v).toObservable());

observableDedicated = source.flatMapSingle((Function<Integer, Single<Integer>>) Single::just);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,7 @@ public void simple() {
public void simple2() {
final AtomicInteger counter = new AtomicInteger();
Observable.range(1, 5)
.concatMapCompletable(Functions.justFunction(Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
counter.incrementAndGet();
}
})))
.concatMapCompletable(Functions.justFunction(Completable.fromAction(counter::incrementAndGet)))
.test()
.assertResult();

Expand Down Expand Up @@ -84,12 +79,7 @@ public void innerError() {
public void innerErrorDelayed() {
TestObserverEx<Void> to = Observable.range(1, 5)
.concatMapCompletableDelayError(
new Function<Integer, CompletableSource>() {
@Override
public CompletableSource apply(Integer v) throws Exception {
return Completable.error(new TestException());
}
}
_ -> Completable.error(new TestException())
)
.to(TestHelper.<Void>testConsumer())
.assertFailure(CompositeException.class)
Expand All @@ -101,11 +91,8 @@ public CompletableSource apply(Integer v) throws Exception {
@Test
public void mapperCrash() {
Observable.just(1)
.concatMapCompletable(new Function<Integer, CompletableSource>() {
@Override
public CompletableSource apply(Integer v) throws Exception {
throw new TestException();
}
.concatMapCompletable(_ -> {
throw new TestException();
})
.test()
.assertFailure(TestException.class);
Expand All @@ -114,11 +101,8 @@ public CompletableSource apply(Integer v) throws Exception {
@Test
public void mapperCrashHidden() {
Observable.just(1).hide()
.concatMapCompletable(new Function<Integer, CompletableSource>() {
@Override
public CompletableSource apply(Integer v) throws Exception {
throw new TestException();
}
.concatMapCompletable(_ -> {
throw new TestException();
})
.test()
.assertFailure(TestException.class);
Expand Down Expand Up @@ -207,15 +191,12 @@ public void endError() {
final CompletableSubject cs2 = CompletableSubject.create();

TestObserver<Void> to = ps.concatMapCompletableDelayError(
new Function<Integer, CompletableSource>() {
@Override
public CompletableSource apply(Integer v) throws Exception {
if (v == 1) {
return cs;
}
return cs2;
}
}, true, 32
v -> {
if (v == 1) {
return cs;
}
return cs2;
}, true, 32
)
.test();

Expand Down Expand Up @@ -250,14 +231,8 @@ public CompletableSource apply(Integer v) throws Exception {
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeObservableToCompletable(
new Function<Observable<Object>, Completable>() {
@Override
public Completable apply(Observable<Object> f)
throws Exception {
return f.concatMapCompletable(
Functions.justFunction(Completable.complete()));
}
}
(Function<Observable<Object>, Completable>) f -> f.concatMapCompletable(
Functions.justFunction(Completable.complete()))
);
}

Expand Down Expand Up @@ -286,28 +261,13 @@ public void immediateOuterInnerErrorRace() {

ps.onNext(1);

Runnable r1 = new Runnable() {
@Override
public void run() {
ps.onError(ex);
}
};
Runnable r1 = () -> ps.onError(ex);

Runnable r2 = new Runnable() {
@Override
public void run() {
cs.onError(ex);
}
};
Runnable r2 = () -> cs.onError(ex);

TestHelper.race(r1, r2);

to.assertError(new Predicate<Throwable>() {
@Override
public boolean test(Throwable e) throws Exception {
return e instanceof TestException || e instanceof CompositeException;
}
})
to.assertError(e -> e instanceof TestException || e instanceof CompositeException)
.assertNotComplete();

if (!errors.isEmpty()) {
Expand All @@ -332,19 +292,11 @@ public void disposeInDrainLoop() {

ps.onNext(1);

Runnable r1 = new Runnable() {
@Override
public void run() {
ps.onNext(2);
}
};
Runnable r1 = () -> ps.onNext(2);

Runnable r2 = new Runnable() {
@Override
public void run() {
cs.onComplete();
to.dispose();
}
Runnable r2 = () -> {
cs.onComplete();
to.dispose();
};

TestHelper.race(r1, r2);
Expand Down Expand Up @@ -432,47 +384,20 @@ public void justScalarSource() {

@Test
public void undeliverableUponCancel() {
TestHelper.checkUndeliverableUponCancel(new ObservableConverter<Integer, Completable>() {
@Override
public Completable apply(Observable<Integer> upstream) {
return upstream.concatMapCompletable(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v) throws Throwable {
return Completable.complete().hide();
}
});
}
});
TestHelper.checkUndeliverableUponCancel((ObservableConverter<Integer, Completable>) upstream ->
upstream.concatMapCompletable((Function<Integer, Completable>) _ -> Completable.complete().hide()));
}

@Test
public void undeliverableUponCancelDelayError() {
TestHelper.checkUndeliverableUponCancel(new ObservableConverter<Integer, Completable>() {
@Override
public Completable apply(Observable<Integer> upstream) {
return upstream.concatMapCompletableDelayError(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v) throws Throwable {
return Completable.complete().hide();
}
}, false, 2);
}
});
TestHelper.checkUndeliverableUponCancel((ObservableConverter<Integer, Completable>) upstream ->
upstream.concatMapCompletableDelayError((Function<Integer, Completable>) _ -> Completable.complete().hide(), false, 2));
}

@Test
public void undeliverableUponCancelDelayErrorTillEnd() {
TestHelper.checkUndeliverableUponCancel(new ObservableConverter<Integer, Completable>() {
@Override
public Completable apply(Observable<Integer> upstream) {
return upstream.concatMapCompletableDelayError(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v) throws Throwable {
return Completable.complete().hide();
}
}, true, 2);
}
});
TestHelper.checkUndeliverableUponCancel((ObservableConverter<Integer, Completable>) upstream ->
upstream.concatMapCompletableDelayError((Function<Integer, Completable>) _ -> Completable.complete().hide(), true, 2));
}

@Test
Expand Down
Loading