[BAEL-3348] Moved code to algorithm-4

This commit is contained in:
dupirefr
2019-11-01 00:35:30 +01:00
parent db85c8f275
commit fee1da6091
20514 changed files with 1642355 additions and 0 deletions
+14
View File
@@ -0,0 +1,14 @@
## RxJava
This module contains articles about RxJava.
### Relevant articles:
- [RxJava and Error Handling](https://www.baeldung.com/rxjava-error-handling)
- [RxJava 2 Flowable](https://www.baeldung.com/rxjava-2-flowable)
- [RxJava Maybe](https://www.baeldung.com/rxjava-maybe)
- [Introduction to RxRelay for RxJava](https://www.baeldung.com/rx-relay)
- [Combining RxJava Completables](https://www.baeldung.com/rxjava-completable)
- [Converting Synchronous and Asynchronous APIs to Observables using RxJava2](https://www.baeldung.com/rxjava-apis-to-observables)
- [RxJava Hooks](https://www.baeldung.com/rxjava-hooks)
- More articles: [[<-- prev]](/rxjava)
+53
View File
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rxjava-2</artifactId>
<name>rxjava-2</name>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-java</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java2.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
<dependency>
<groupId>com.jakewharton.rxrelay2</groupId>
<artifactId>rxrelay</artifactId>
<version>${rxrelay.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.akarnokd/rxjava2-extensions -->
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>${rxjava2.ext.version}</version>
</dependency>
</dependencies>
<properties>
<assertj.version>3.8.0</assertj.version>
<rx.java2.version>2.2.2</rx.java2.version>
<awaitility.version>1.7.0</awaitility.version>
<rxrelay.version>2.0.0</rxrelay.version>
<rxjava2.ext.version>0.20.4</rxjava2.ext.version>
</properties>
</project>
@@ -0,0 +1,32 @@
package com.baeldung.rxjava;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposables;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class RandomRelay extends Relay<Integer> {
Random random = new Random();
List<Observer<? super Integer>> observers = new ArrayList<>();
@Override
public void accept(Integer integer) {
int observerIndex = random.nextInt(observers.size()) & Integer.MAX_VALUE;
observers.get(observerIndex).onNext(integer);
}
@Override
public boolean hasObservers() {
return observers.isEmpty();
}
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observers.add(observer);
observer.onSubscribe(Disposables.fromRunnable(() -> System.out.println("Disposed")));
}
}
+13
View File
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
@@ -0,0 +1,107 @@
package com.baeldung.rxjava;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import hu.akarnokd.rxjava2.async.AsyncObservable;
import io.reactivex.Observable;
public class AsyncAndSyncToObservableIntegrationTest {
AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();
/* Method will execute every time it gets subscribed*/
@Test
public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() {
Observable<Integer> source = Observable.fromCallable(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
}
/* Method will execute only once and cache its result.*/
@Test
public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() {
Observable<Integer> source = AsyncObservable.start(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
}
/* Method will execute only once and cache its result.*/
@Test
public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);
Observable<Integer> source = Observable.fromFuture(future);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
executor.shutdown();
}
/* Method will execute every time it gets subscribed*/
@Test
public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() {
ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
executor.shutdown();
}
/*Method will execute only once and cache its result.*/
@Test
public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() {
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3);
}
exec.shutdown();
}
}
@@ -0,0 +1,112 @@
package com.baeldung.rxjava;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.observers.DisposableCompletableObserver;
import org.junit.Before;
import org.junit.Test;
public class CompletableUnitTest {
Completable first;
Completable second;
Completable error;
Throwable throwable = new RuntimeException();
@Before
public void setUpCompletables() {
first = Completable.fromSingle(Single.just(1));
second = Completable.fromRunnable(() -> {});
error = Single.error(throwable)
.ignoreElement();
}
@Test
public void whenCompletableConstructed_thenCompletedSuccessfully() {
Completable completed = Completable.complete();
completed.subscribe(new DisposableCompletableObserver() {
@Override
public void onComplete() {
System.out.println("Completed!");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
});
Flowable<String> flowable = Flowable.just("request received", "user logged in");
Completable flowableCompletable = Completable.fromPublisher(flowable);
Completable singleCompletable = Single.just(1)
.ignoreElement();
completed.andThen(flowableCompletable)
.andThen(singleCompletable)
.test()
.assertComplete();
}
@Test
public void whenCombiningCompletables_thenCompletedSuccessfully() {
first.andThen(second)
.test()
.assertComplete();
}
@Test
public void whenCombinedWithError_thenCompletedWithError() {
first.andThen(second)
.andThen(error)
.test()
.assertError(throwable);
}
@Test
public void whenCombinedWithNever_thenDoesNotComplete() {
first.andThen(second)
.andThen(Completable.never())
.test()
.assertNotComplete();
}
@Test
public void whenMergedCompletables_thenCompletedSuccessfully() {
Completable.mergeArray(first, second)
.test()
.assertComplete();
}
@Test
public void whenMergedWithError_thenCompletedWithError() {
Completable.mergeArray(first, second, error)
.test()
.assertError(throwable);
}
@Test
public void whenFlatMaped_thenCompletedSuccessfully() {
Completable allElementsCompletable = Flowable.just("request received", "user logged in")
.flatMapCompletable(message -> Completable
.fromRunnable(() -> System.out.println(message))
);
allElementsCompletable
.test()
.assertComplete();
}
@Test
public void whenAmbWithNever_thenCompletedSuccessfully() {
Completable.ambArray(first, Completable.never(), second)
.test()
.assertComplete();
}
@Test
public void whenAmbWithError_thenCompletedWithError() {
Completable.ambArray(error, first, second)
.test()
.assertError(throwable);
}
}
@@ -0,0 +1,93 @@
package com.baeldung.rxjava;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class FlowableIntegrationTest {
@Test public void whenFlowableIsCreated_thenItIsProperlyInitialized() {
Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);
assertNotNull(integerFlowable);
}
@Test public void whenFlowableIsCreatedFromObservable_thenItIsProperlyInitialized() throws InterruptedException {
Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable.toFlowable(BackpressureStrategy.BUFFER);
assertNotNull(integerFlowable);
}
@Test public void whenFlowableIsCreatedFromFlowableOnSubscribe_thenItIsProperlyInitialized() throws InterruptedException {
FlowableOnSubscribe<Integer> flowableOnSubscribe = flowableEmitter -> flowableEmitter.onNext(1);
Flowable<Integer> integerFlowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER);
assertNotNull(integerFlowable);
}
@Test public void thenAllValuesAreBufferedAndReceived() {
List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());
assertEquals(testList, receivedInts);
}
@Test public void whenDropStrategyUsed_thenOnBackpressureDropped() {
List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.DROP).observeOn(Schedulers.computation()).test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());
assertThat(receivedInts.size() < testList.size());
assertThat(!receivedInts.contains(100000));
}
@Test public void whenMissingStrategyUsed_thenException() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
@Test public void whenErrorStrategyUsed_thenExceptionIsThrown() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
@Test public void whenLatestStrategyUsed_thenTheLastElementReceived() {
List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList());
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList());
assertThat(receivedInts.size() < testList.size());
assertThat(receivedInts.contains(100000));
}
}
@@ -0,0 +1,40 @@
package com.baeldung.rxjava;
import org.junit.Test;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
public class MaybeUnitTest {
@Test
public void whenEmitsSingleValue_thenItIsObserved() {
Maybe<Integer> maybe = Flowable.just(1, 2, 3, 4, 5)
.firstElement();
maybe.map(x -> x + 7)
.filter(x -> x > 0)
.test()
.assertResult(8)
.assertComplete();
}
@Test
public void whenEmitsNoValue_thenSignalsCompletionAndNoValueObserved() {
Maybe<Integer> maybe = Flowable.just(1, 2, 3, 4, 5)
.skip(5)
.firstElement();
maybe.test()
.assertComplete()
.assertNoValues();
}
@Test
public void whenThrowsError_thenErrorIsRaised() {
Maybe<Integer> maybe = Flowable.<Integer> error(new Exception("msg"))
.firstElement();
maybe.test()
.assertErrorMessage("msg");
}
}
@@ -0,0 +1,84 @@
package com.baeldung.rxjava;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class RxJavaHooksManualTest {
private boolean initHookCalled = false;
private boolean hookCalled = false;
@Test
public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.io())
.test();
assertTrue(hookCalled && initHookCalled);
}
@Test
public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() {
RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 15)
.map(v -> v * 2)
.subscribeOn(Schedulers.newThread())
.test();
assertTrue(hookCalled && initHookCalled);
}
@Test
public void givenSingleScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
assertTrue(hookCalled && initHookCalled);
}
@After
public void reset() {
hookCalled = false;
initHookCalled = false;
RxJavaPlugins.reset();
}
}
@@ -0,0 +1,244 @@
package com.baeldung.rxjava;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Test;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class RxJavaHooksUnitTest {
private boolean initHookCalled = false;
private boolean hookCalled = false;
@Test
public void givenObservable_whenError_shouldExecuteTheHook() {
RxJavaPlugins.setErrorHandler(throwable -> {
hookCalled = true;
});
Observable.error(new IllegalStateException())
.subscribe();
assertTrue(hookCalled);
}
@Test
public void givenCompletable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnCompletableAssembly(completable -> {
hookCalled = true;
return completable;
});
Completable.fromSingle(Single.just(1));
assertTrue(hookCalled);
}
@Test
public void givenCompletable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
hookCalled = true;
return observer;
});
Completable.fromSingle(Single.just(1))
.test();
assertTrue(hookCalled);
}
@Test
public void givenObservable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnObservableAssembly(observable -> {
hookCalled = true;
return observable;
});
Observable.range(1, 10);
assertTrue(hookCalled);
}
@Test
public void givenObservable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
hookCalled = true;
return observer;
});
Observable.range(1, 10)
.test();
assertTrue(hookCalled);
}
@Test
public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
hookCalled = true;
return connectableObservable;
});
ConnectableObservable.range(1, 10)
.publish()
.connect();
assertTrue(hookCalled);
}
@Test
public void givenFlowable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnFlowableAssembly(flowable -> {
hookCalled = true;
return flowable;
});
Flowable.range(1, 10);
assertTrue(hookCalled);
}
@Test
public void givenFlowable_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
hookCalled = true;
return observer;
});
Flowable.range(1, 10)
.test();
assertTrue(hookCalled);
}
@Test
public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
hookCalled = true;
return connectableFlowable;
});
ConnectableFlowable.range(1, 10)
.publish()
.connect();
assertTrue(hookCalled);
}
@Test
public void givenParallel_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
hookCalled = true;
return parallelFlowable;
});
Flowable.range(1, 10)
.parallel();
assertTrue(hookCalled);
}
@Test
public void givenMaybe_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnMaybeAssembly(maybe -> {
hookCalled = true;
return maybe;
});
Maybe.just(1);
assertTrue(hookCalled);
}
@Test
public void givenMaybe_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
hookCalled = true;
return observer;
});
Maybe.just(1)
.test();
assertTrue(hookCalled);
}
@Test
public void givenSingle_whenAssembled_shouldExecuteTheHook() {
RxJavaPlugins.setOnSingleAssembly(single -> {
hookCalled = true;
return single;
});
Single.just(1);
assertTrue(hookCalled);
}
@Test
public void givenSingle_whenSubscribed_shouldExecuteTheHook() {
RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
hookCalled = true;
return observer;
});
Single.just(1)
.test();
assertTrue(hookCalled);
}
@Test
public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() {
RxJavaPlugins.setScheduleHandler((runnable) -> {
hookCalled = true;
return runnable;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.single())
.test();
hookCalled = false;
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled);
}
@Test
public void givenComputationScheduler_whenCalled_shouldExecuteTheHooks() {
RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
initHookCalled = true;
return scheduler.call();
});
RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
hookCalled = true;
return scheduler;
});
Observable.range(1, 10)
.map(v -> v * 2)
.subscribeOn(Schedulers.computation())
.test();
assertTrue(hookCalled && initHookCalled);
}
@After
public void reset() {
initHookCalled = false;
hookCalled = false;
RxJavaPlugins.reset();
}
}
@@ -0,0 +1,120 @@
package com.baeldung.rxjava;
import com.jakewharton.rxrelay2.BehaviorRelay;
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.ReplayRelay;
import io.reactivex.internal.schedulers.SingleScheduler;
import io.reactivex.observers.TestObserver;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class RxRelayIntegrationTest {
@Test
public void whenObserverSubscribedToPublishRelay_thenItReceivesEmittedEvents () {
PublishRelay<Integer> publishRelay = PublishRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
publishRelay.subscribe(firstObserver);
firstObserver.assertSubscribed();
publishRelay.accept(5);
publishRelay.accept(10);
publishRelay.subscribe(secondObserver);
secondObserver.assertSubscribed();
publishRelay.accept(15);
//First Observer will receive all events
firstObserver.assertValues(5, 10, 15);
//Second Observer will receive only last event
secondObserver.assertValue(15);
}
@Test
public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_thenItIsEmpty() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}
@Test
public void whenObserverSubscribedToBehaviorRelay_thenItReceivesDefaultValue() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertValue(1);
}
@Test
public void whenObserverSubscribedToBehaviorRelay_thenItReceivesEmittedEvents () {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
behaviorRelay.accept(5);
behaviorRelay.subscribe(firstObserver);
behaviorRelay.accept(10);
behaviorRelay.subscribe(secondObserver);
behaviorRelay.accept(15);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(10, 15);
}
@Test
public void whenObserverSubscribedToReplayRelay_thenItReceivesEmittedEvents () {
ReplayRelay<Integer> replayRelay = ReplayRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
replayRelay.subscribe(firstObserver);
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.subscribe(secondObserver);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(5, 10, 15);
}
@Test
public void whenObserverSubscribedToReplayRelayWithLimitedSize_thenItReceivesEmittedEvents () {
ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
replayRelay.subscribe(firstObserver);
firstObserver.assertValues(15, 20);
}
@Test
public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents () throws InterruptedException {
ReplayRelay<Integer> replayRelay = ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, new SingleScheduler());
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
Thread.sleep(3000);
replayRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}
@Test
public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent () {
RandomRelay randomRelay = new RandomRelay();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
randomRelay.subscribe(firstObserver);
randomRelay.subscribe(secondObserver);
randomRelay.accept(5);
if(firstObserver.values().isEmpty()) {
secondObserver.assertValue(5);
} else {
firstObserver.assertValue(5);
secondObserver.assertEmpty();
}
}
}
@@ -0,0 +1,138 @@
package com.baeldung.rxjava.onerror;
import io.reactivex.Observable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.observers.TestObserver;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue;
public class ExceptionHandlingIntegrationTest {
private Error UNKNOWN_ERROR = new Error("unknown error");
private Exception UNKNOWN_EXCEPTION = new Exception("unknown exception");
@Test
public void givenSubscriberAndError_whenHandleOnErrorReturn_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorReturn(Throwable::getMessage)
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("unknown error");
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResume_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorResumeNext(Observable.just("one", "two"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("one", "two");
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResumeItem_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorReturnItem("singleValue")
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("singleValue");
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResumeFunc_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorResumeNext(throwable -> {
return Observable.just(throwable.getMessage(), "nextValue");
})
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("unknown error", "nextValue");
}
@Test
public void givenSubscriberAndError_whenChangeStateOnError_thenErrorThrown() {
TestObserver<String> testObserver = new TestObserver<>();
final AtomicBoolean state = new AtomicBoolean(false);
Observable
.<String>error(UNKNOWN_ERROR)
.doOnError(throwable -> state.set(true))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("state should be changed", state.get());
}
@Test
public void givenSubscriberAndError_whenExceptionOccurOnError_thenCompositeExceptionThrown() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.doOnError(throwable -> {
throw new RuntimeException("unexcepted");
})
.subscribe(testObserver);
testObserver.assertError(CompositeException.class);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
@Test
public void givenSubscriberAndException_whenHandleOnException_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_EXCEPTION)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("exceptionResumed");
}
@Test
public void givenSubscriberAndError_whenHandleOnException_thenNotResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
}
}
@@ -0,0 +1,146 @@
package com.baeldung.rxjava.onerror;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
public class OnErrorRetryIntegrationTest {
private Error UNKNOWN_ERROR = new Error("unknown error");
@Test
public void givenSubscriberAndError_whenRetryOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry(1)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call twice", atomicCounter.get() == 2);
}
@Test
public void givenSubscriberAndError_whenRetryConditionallyOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry((integer, throwable) -> integer < 4)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
@Test
public void givenSubscriberAndError_whenRetryUntilOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(UNKNOWN_ERROR)
.retryUntil(() -> atomicCounter.incrementAndGet() > 3)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
@Test
public void givenSubscriberAndError_whenRetryWhenOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
Exception noretryException = new Exception("don't retry");
Observable
.<String>error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> Observable.<String>error(noretryException))
.subscribe(testObserver);
testObserver.assertError(noretryException);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
@Test
public void givenSubscriberAndError_whenRetryWhenOnError_thenCompleted() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.empty())
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should not retry", atomicCounter.get() == 0);
}
@Test
public void givenSubscriberAndError_whenRetryWhenOnError_thenResubscribed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.just("anything"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should retry once", atomicCounter.get() == 1);
}
@Test
public void givenSubscriberAndError_whenRetryWhenForMultipleTimesOnError_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
long before = System.currentTimeMillis();
Observable
.<String>error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> throwableObservable
.zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
.flatMap(integer -> {
System.out.println("retried " + integer + " times");
return Observable.timer(integer, TimeUnit.SECONDS);
}))
.blockingSubscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
long secondsElapsed = (System.currentTimeMillis() - before) / 1000;
assertTrue("6 seconds should elapse", secondsElapsed == 6);
}
}
@@ -0,0 +1,65 @@
package com.baeldung.rxjava.operators;
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class RxFlatmapAndSwitchmapUnitTest {
@Test
public void givenObservable_whenFlatmap_shouldAssertAllItemsReturned() {
//given
List<String> actualOutput = new ArrayList<>();
final TestScheduler scheduler = new TestScheduler();
final List<String> keywordToSearch = Arrays.asList("b", "bo", "boo", "book", "books");
//when
Observable.fromIterable(keywordToSearch)
.flatMap(s -> Observable
.just(s + " FirstResult", s + " SecondResult")
.delay(10, TimeUnit.SECONDS, scheduler))
.toList()
.doOnSuccess(s -> actualOutput.addAll(s))
.subscribe();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
//then
assertThat(actualOutput, hasItems("b FirstResult", "b SecondResult",
"boo FirstResult", "boo SecondResult",
"bo FirstResult", "bo SecondResult",
"book FirstResult", "book SecondResult",
"books FirstResult", "books SecondResult"));
}
@Test
public void givenObservable_whenSwitchmap_shouldAssertLatestItemReturned() {
//given
List<String> actualOutput = new ArrayList<>();
final TestScheduler scheduler = new TestScheduler();
final List<String> keywordToSearch = Arrays.asList("b", "bo", "boo", "book", "books");
//when
Observable.fromIterable(keywordToSearch)
.switchMap(s -> Observable
.just(s + " FirstResult", s + " SecondResult")
.delay(10, TimeUnit.SECONDS, scheduler))
.toList()
.doOnSuccess(s -> actualOutput.addAll(s))
.subscribe();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
//then
assertEquals(2, actualOutput.size());
assertThat(actualOutput, hasItems("books FirstResult", "books SecondResult"));
}
}