From 13b639cdff56ee436917c6cb2160892109540eb3 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Sun, 26 Feb 2017 12:32:23 +0000 Subject: [PATCH] Renamed tests and added connectableFlux test --- reactor-core/pom.xml | 15 +++++-- .../com/baeldung/reactor/ReactorTest.java | 43 ++++++++++++------- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/reactor-core/pom.xml b/reactor-core/pom.xml index 017b59f42e..2be8892983 100644 --- a/reactor-core/pom.xml +++ b/reactor-core/pom.xml @@ -24,29 +24,36 @@ io.projectreactor reactor-core - 3.0.4.RELEASE + ${reactor-core.version} junit junit - 4.12 + ${junit.version} test org.assertj assertj-core - 3.6.1 + ${assertj.version} test ch.qos.logback logback-classic - 1.1.3 + ${logback.version} + + 3.0.4.RELEASE + 4.12 + 3.6.1 + 1.1.3 + + diff --git a/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java b/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java index 6c30691f26..46b774c30e 100644 --- a/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java +++ b/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java @@ -5,36 +5,33 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.ConnectableFlux; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Schedulers; -import java.time.Duration; -import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; -import java.util.stream.Stream; -import static java.time.Duration.ofSeconds; -import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; public class ReactorTest { @Test - public void givenFlux_whenSubscribing_shouldStream() throws InterruptedException { + public void givenFlux_whenSubscribing_thenStream() throws InterruptedException { List elements = new ArrayList<>(); Flux.just(1, 2, 3, 4) .log() - .map(i -> i * 2) + .map(i -> { + System.out.println(i + ":" + Thread.currentThread()); + return i * 2; + }) .subscribe(elements::add); assertThat(elements).containsExactly(2, 4, 6, 8); } @Test - public void givenFlux_whenZipping_shouldCombine() { + public void givenFlux_whenZipping_thenCombine() { List elements = new ArrayList<>(); Flux.just(1, 2, 3, 4) @@ -51,7 +48,7 @@ public class ReactorTest { } @Test - public void givenFlux_whenApplyingBackPressure_shouldPushLessElements() throws InterruptedException { + public void givenFlux_whenApplyingBackPressure_thenPushElementsInBatches() throws InterruptedException { List elements = new ArrayList<>(); @@ -92,18 +89,34 @@ public class ReactorTest { } @Test - public void givenFlux_whenInParalle_shouldSubscribeInDifferentThreads() throws InterruptedException { - List elements = new ArrayList<>(); + public void givenFlux_whenInParallel_thenSubscribeInDifferentThreads() throws InterruptedException { + List threadNames = new ArrayList<>(); Flux.just(1, 2, 3, 4) .log() - .map(i -> i * 2) + .map(i -> Thread.currentThread().getName()) .subscribeOn(Schedulers.parallel()) - .subscribe(elements::add); + .subscribe(threadNames::add); Thread.sleep(1000); - assertThat(elements).containsExactly(2, 4, 6, 8); + assertThat(threadNames).containsExactly("parallel-1", "parallel-1", "parallel-1", "parallel-1"); + } + + @Test + public void givenConnectableFlux_thenShouldStream_onConnect() { + + List elements = new ArrayList<>(); + + final ConnectableFlux publish = Flux.just(1, 2, 3, 4).publish(); + + publish.subscribe(elements::add); + + assertThat(elements).isEmpty(); + + publish.connect(); + + assertThat(elements).containsExactly(1, 2, 3, 4); } }