From cafee5736f33378a842dff350b68ebaf0316f94b Mon Sep 17 00:00:00 2001 From: Dominik Date: Mon, 8 Jan 2018 00:06:53 +0100 Subject: [PATCH 1/3] BAEL-1409: Rxjava2 Flowable Tests --- rxjava/pom.xml | 7 ++ .../com/baeldung/rxjava/FlowableTest.java | 94 +++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java diff --git a/rxjava/pom.xml b/rxjava/pom.xml index 0f950914ff..a6c5e9d2fb 100644 --- a/rxjava/pom.xml +++ b/rxjava/pom.xml @@ -31,6 +31,13 @@ 1.0.0 + + junit + junit + 4.12 + test + + com.jayway.awaitility awaitility diff --git a/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java b/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java new file mode 100644 index 0000000000..71d4f87b35 --- /dev/null +++ b/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java @@ -0,0 +1,94 @@ +package com.baeldung.rxjava; + +import io.reactivex.BackpressureStrategy; +import io.reactivex.Flowable; +import io.reactivex.FlowableOnSubscribe; +import io.reactivex.Observable; +import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.exceptions.OnErrorNotImplementedException; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class FlowableTest { + + @Test public void whenFlowableIsCreated_thenItIsProperlyInitialized() { + Flowable integerFlowable = Flowable.just(1, 2, 3, 4); + assertNotNull(integerFlowable); + } + + @Test public void whenFlowableIsCreatedFromObservable_thenItIsProperlyInitialized() throws InterruptedException { + Observable integerObservable = Observable.just(1, 2, 3); + Flowable integerFlowable = integerObservable.toFlowable(BackpressureStrategy.BUFFER); + assertNotNull(integerFlowable); + + } + + @Test public void whenFlowableIsCreatedFromFlowableOnSubscribe_thenItIsProperlyInitialized() throws InterruptedException { + FlowableOnSubscribe flowableOnSubscribe = flowableEmitter -> { + flowableEmitter.onNext(1); + }; + Flowable integerFlowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER); + assertNotNull(integerFlowable); + } + + @Test public void givenFlowableWithBufferStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenAllValuesAreBufferedAndReceived() throws InterruptedException { + List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); + List listToFill = new ArrayList(); + + Observable observable = Observable.fromIterable(testList); + observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).subscribe(listToFill::add); + Thread.sleep(5000); + assertEquals(testList, listToFill); + } + + @Test public void givenFlowableWithDropStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenNotAllValuesAreReceived() throws InterruptedException { + List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); + List listToFill = new ArrayList(); + + Observable observable = Observable.fromIterable(testList); + observable.toFlowable(BackpressureStrategy.DROP).observeOn(Schedulers.computation()).subscribe(listToFill::add); + Thread.sleep(5000); + assertThat(listToFill.size() < testList.size()); + assertThat(!listToFill.contains(100000)); + } + + @Test + public void givenFlowableWithMissingStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException { + Observable observable = Observable.range(1, 100000); + TestSubscriber subscriber =observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test(); + subscriber.awaitTerminalEvent(); + subscriber.assertError(MissingBackpressureException.class); + } + + @Test + public void givenFlowableWithErrorStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException { + Observable observable = Observable.range(1, 100000); + TestSubscriber subscriber =observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test(); + + subscriber.awaitTerminalEvent(); + subscriber.assertError(MissingBackpressureException.class); + } + + @Test + public void givenFlowableWithLatestStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException { + List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); + List listToFill = new ArrayList(); + + Observable observable = Observable.fromIterable(testList); + observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).subscribe(listToFill::add); + Thread.sleep(6000); + assertThat(listToFill.size() < testList.size()); + assertThat(listToFill.contains(100000)); + } + +} \ No newline at end of file From 71abd78dff6065b5ceb6c96ce8eae0a857141355 Mon Sep 17 00:00:00 2001 From: Dominik Date: Wed, 10 Jan 2018 22:57:28 +0100 Subject: [PATCH 2/3] Fixes. --- .../com/baeldung/rxjava/FlowableTest.java | 79 +++++++++---------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java b/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java index 71d4f87b35..26b17ec163 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java @@ -5,12 +5,11 @@ import io.reactivex.Flowable; import io.reactivex.FlowableOnSubscribe; import io.reactivex.Observable; import io.reactivex.exceptions.MissingBackpressureException; -import io.reactivex.exceptions.OnErrorNotImplementedException; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; + import org.junit.Test; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -34,61 +33,61 @@ public class FlowableTest { } @Test public void whenFlowableIsCreatedFromFlowableOnSubscribe_thenItIsProperlyInitialized() throws InterruptedException { - FlowableOnSubscribe flowableOnSubscribe = flowableEmitter -> { - flowableEmitter.onNext(1); - }; + FlowableOnSubscribe flowableOnSubscribe = flowableEmitter -> flowableEmitter.onNext(1); Flowable integerFlowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER); assertNotNull(integerFlowable); } - @Test public void givenFlowableWithBufferStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenAllValuesAreBufferedAndReceived() throws InterruptedException { + @Test public void whenFlowableUsesBufferStragegy_thenOnBackpressureAllValuesAreBufferedAndReceived() { + List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); + Observable observable = Observable.fromIterable(testList); + TestSubscriber testSubscriber = observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).test(); + + testSubscriber.awaitTerminalEvent(); + + List receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList()); + + assertEquals(testList, receivedInts); + } + + @Test public void whenFlowableUsesDropStrategy_thenOnBackpressureNotAllValuesAreReceivedAndTheLastElementIsNotReceived() { List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); - List listToFill = new ArrayList(); Observable observable = Observable.fromIterable(testList); - observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).subscribe(listToFill::add); - Thread.sleep(5000); - assertEquals(testList, listToFill); + TestSubscriber testSubscriber = observable.toFlowable(BackpressureStrategy.DROP).observeOn(Schedulers.computation()).test(); + testSubscriber.awaitTerminalEvent(); + List receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList()); + + assertThat(receivedInts.size() < testList.size()); + assertThat(!receivedInts.contains(100000)); } - @Test public void givenFlowableWithDropStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenNotAllValuesAreReceived() throws InterruptedException { - List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); - List listToFill = new ArrayList(); - - Observable observable = Observable.fromIterable(testList); - observable.toFlowable(BackpressureStrategy.DROP).observeOn(Schedulers.computation()).subscribe(listToFill::add); - Thread.sleep(5000); - assertThat(listToFill.size() < testList.size()); - assertThat(!listToFill.contains(100000)); - } - - @Test - public void givenFlowableWithMissingStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException { + @Test public void whenFlowableUsesMissingStrategy_thenExceptionIsThrownOnBackpressure() { Observable observable = Observable.range(1, 100000); - TestSubscriber subscriber =observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test(); - subscriber.awaitTerminalEvent(); - subscriber.assertError(MissingBackpressureException.class); - } - - @Test - public void givenFlowableWithErrorStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException { - Observable observable = Observable.range(1, 100000); - TestSubscriber subscriber =observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test(); + TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test(); subscriber.awaitTerminalEvent(); subscriber.assertError(MissingBackpressureException.class); } - @Test - public void givenFlowableWithLatestStrategy_whenSourceEmitsFasterThanConsumerConsumes_thenExceptionIsThrown() throws InterruptedException { - List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); - List listToFill = new ArrayList(); + @Test public void whenFlowableUsesErrorStrategy_thenExceptionIsThrownOnBackpressure() { + Observable observable = Observable.range(1, 100000); + TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test(); + subscriber.awaitTerminalEvent(); + subscriber.assertError(MissingBackpressureException.class); + } + + @Test public void whenFlowableUsesLatesStrategy_thenNotElementsAreReceivedButTheLastElementIs() { + List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); Observable observable = Observable.fromIterable(testList); - observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).subscribe(listToFill::add); - Thread.sleep(6000); - assertThat(listToFill.size() < testList.size()); - assertThat(listToFill.contains(100000)); + TestSubscriber testSubscriber = observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).test(); + + testSubscriber.awaitTerminalEvent(); + List receivedInts = testSubscriber.getEvents().get(0).stream().mapToInt(object -> (int) object).boxed().collect(Collectors.toList()); + + assertThat(receivedInts.size() < testList.size()); + assertThat(receivedInts.contains(100000)); } } \ No newline at end of file From 9ba2cebdfd294a25567d27d54ee00aaaabab68c1 Mon Sep 17 00:00:00 2001 From: Dominik Date: Mon, 19 Feb 2018 00:26:27 +0100 Subject: [PATCH 3/3] Changed test names. --- .../test/java/com/baeldung/rxjava/FlowableTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java b/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java index 26b17ec163..b9d1d64c24 100644 --- a/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java +++ b/rxjava/src/test/java/com/baeldung/rxjava/FlowableTest.java @@ -38,7 +38,7 @@ public class FlowableTest { assertNotNull(integerFlowable); } - @Test public void whenFlowableUsesBufferStragegy_thenOnBackpressureAllValuesAreBufferedAndReceived() { + @Test public void thenAllValuesAreBufferedAndReceived() { List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable.toFlowable(BackpressureStrategy.BUFFER).observeOn(Schedulers.computation()).test(); @@ -50,7 +50,7 @@ public class FlowableTest { assertEquals(testList, receivedInts); } - @Test public void whenFlowableUsesDropStrategy_thenOnBackpressureNotAllValuesAreReceivedAndTheLastElementIsNotReceived() { + @Test public void whenDropStrategyUsed_thenOnBackpressureDropped() { List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); Observable observable = Observable.fromIterable(testList); @@ -62,7 +62,7 @@ public class FlowableTest { assertThat(!receivedInts.contains(100000)); } - @Test public void whenFlowableUsesMissingStrategy_thenExceptionIsThrownOnBackpressure() { + @Test public void whenMissingStrategyUsed_thenException() { Observable observable = Observable.range(1, 100000); TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test(); @@ -70,15 +70,15 @@ public class FlowableTest { subscriber.assertError(MissingBackpressureException.class); } - @Test public void whenFlowableUsesErrorStrategy_thenExceptionIsThrownOnBackpressure() { - Observable observable = Observable.range(1, 100000); + @Test public void whenErrorStrategyUsed_thenExceptionIsThrown() { + Observable observable = Observable.range(1, 100000); TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.ERROR).observeOn(Schedulers.computation()).test(); subscriber.awaitTerminalEvent(); subscriber.assertError(MissingBackpressureException.class); } - @Test public void whenFlowableUsesLatesStrategy_thenNotElementsAreReceivedButTheLastElementIs() { + @Test public void whenLatestStrategyUsed_thenTheLastElementReceived() { List testList = IntStream.range(0, 100000).boxed().collect(Collectors.toList()); Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable.toFlowable(BackpressureStrategy.LATEST).observeOn(Schedulers.computation()).test();