diff --git a/rxjava-2/pom.xml b/rxjava-2/pom.xml index 3b4c167f3a..3038c20037 100644 --- a/rxjava-2/pom.xml +++ b/rxjava-2/pom.xml @@ -1,45 +1,53 @@ - - 4.0.0 - rxjava-2 - 1.0-SNAPSHOT - rxjava-2 - - - com.baeldung - parent-java - 0.0.1-SNAPSHOT - ../parent-java - + + 4.0.0 - - - io.reactivex.rxjava2 - rxjava - ${rx.java2.version} - - - com.jayway.awaitility - awaitility - ${awaitility.version} - - - org.assertj - assertj-core - ${assertj.version} - - - com.jakewharton.rxrelay2 - rxrelay - ${rxrelay.version} - - + rxjava-2 + 1.0-SNAPSHOT - - 3.8.0 - 2.2.2 - 1.7.0 - 2.0.0 - + + com.baeldung + parent-java + 0.0.1-SNAPSHOT + ../parent-java + + + + + io.reactivex.rxjava2 + rxjava + ${rx.java2.version} + + + com.jayway.awaitility + awaitility + ${awaitility.version} + + + org.assertj + assertj-core + ${assertj.version} + + + com.jakewharton.rxrelay2 + rxrelay + ${rxrelay.version} + + + + com.github.akarnokd + rxjava2-extensions + ${rxjava2.ext.version} + + + + + 3.8.0 + 2.2.2 + 1.7.0 + 2.0.0 + 0.20.4 + \ No newline at end of file diff --git a/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java b/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java new file mode 100644 index 0000000000..90f4fe94ae --- /dev/null +++ b/rxjava-2/src/test/java/com/baeldung/rxjava/AsyncAndSyncToObservableIntegrationTest.java @@ -0,0 +1,107 @@ +package com.baeldung.rxjava; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import hu.akarnokd.rxjava2.async.AsyncObservable; +import io.reactivex.Observable; + +public class AsyncAndSyncToObservableIntegrationTest { + + AtomicInteger counter = new AtomicInteger(); + Callable callable = () -> counter.incrementAndGet(); + + /* Method will execute every time it gets subscribed*/ + @Test + public void givenSyncMethod_whenConvertedWithFromCallable_thenReturnObservable() { + + Observable source = Observable.fromCallable(callable); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(i); + + assertEquals(i, counter.get()); + } + } + + /* Method will execute only once and cache its result.*/ + @Test + public void givenSyncMethod_whenConvertedWithStart_thenReturnObservable() { + + Observable source = AsyncObservable.start(callable); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + + assertEquals(1, counter.get()); + } + } + + /* Method will execute only once and cache its result.*/ + @Test + public void givenAsyncMethod_whenConvertedWithFromFuture_thenRetrunObservble() { + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(callable); + Observable source = Observable.fromFuture(future); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + + assertEquals(1, counter.get()); + } + + executor.shutdown(); + } + + /* Method will execute every time it gets subscribed*/ + @Test + public void givenAsyncMethod_whenConvertedWithStartFuture_thenRetrunObservble() { + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Observable source = AsyncObservable.startFuture(() -> executor.submit(callable)); + + for (int i = 1; i < 5; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(i); + + assertEquals(i, counter.get()); + } + + executor.shutdown(); + } + + /*Method will execute only once and cache its result.*/ + @Test + public void givenAsyncMethod_whenConvertedWithDeferFuture_thenRetrunObservble() { + List list = Arrays.asList(new Integer[] { counter.incrementAndGet(), counter.incrementAndGet(), counter.incrementAndGet() }); + ExecutorService exec = Executors.newSingleThreadExecutor(); + Callable> callable = () -> Observable.fromIterable(list); + Observable source = AsyncObservable.deferFuture(() -> exec.submit(callable)); + for (int i = 1; i < 4; i++) { + source.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3); + } + + exec.shutdown(); + } + +}