diff --git a/reactor-core/pom.xml b/reactor-core/pom.xml
index 017b59f42e..2be8892983 100644
--- a/reactor-core/pom.xml
+++ b/reactor-core/pom.xml
@@ -24,29 +24,36 @@
io.projectreactor
reactor-core
- 3.0.4.RELEASE
+ ${reactor-core.version}
junit
junit
- 4.12
+ ${junit.version}
test
org.assertj
assertj-core
- 3.6.1
+ ${assertj.version}
test
ch.qos.logback
logback-classic
- 1.1.3
+ ${logback.version}
+
+ 3.0.4.RELEASE
+ 4.12
+ 3.6.1
+ 1.1.3
+
+
diff --git a/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java b/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java
index 6c30691f26..46b774c30e 100644
--- a/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java
+++ b/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java
@@ -5,36 +5,33 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
-import java.time.Duration;
-import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Stream;
-import static java.time.Duration.ofSeconds;
-import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
public class ReactorTest {
@Test
- public void givenFlux_whenSubscribing_shouldStream() throws InterruptedException {
+ public void givenFlux_whenSubscribing_thenStream() throws InterruptedException {
List elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
- .map(i -> i * 2)
+ .map(i -> {
+ System.out.println(i + ":" + Thread.currentThread());
+ return i * 2;
+ })
.subscribe(elements::add);
assertThat(elements).containsExactly(2, 4, 6, 8);
}
@Test
- public void givenFlux_whenZipping_shouldCombine() {
+ public void givenFlux_whenZipping_thenCombine() {
List elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
@@ -51,7 +48,7 @@ public class ReactorTest {
}
@Test
- public void givenFlux_whenApplyingBackPressure_shouldPushLessElements() throws InterruptedException {
+ public void givenFlux_whenApplyingBackPressure_thenPushElementsInBatches() throws InterruptedException {
List elements = new ArrayList<>();
@@ -92,18 +89,34 @@ public class ReactorTest {
}
@Test
- public void givenFlux_whenInParalle_shouldSubscribeInDifferentThreads() throws InterruptedException {
- List elements = new ArrayList<>();
+ public void givenFlux_whenInParallel_thenSubscribeInDifferentThreads() throws InterruptedException {
+ List threadNames = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
- .map(i -> i * 2)
+ .map(i -> Thread.currentThread().getName())
.subscribeOn(Schedulers.parallel())
- .subscribe(elements::add);
+ .subscribe(threadNames::add);
Thread.sleep(1000);
- assertThat(elements).containsExactly(2, 4, 6, 8);
+ assertThat(threadNames).containsExactly("parallel-1", "parallel-1", "parallel-1", "parallel-1");
+ }
+
+ @Test
+ public void givenConnectableFlux_thenShouldStream_onConnect() {
+
+ List elements = new ArrayList<>();
+
+ final ConnectableFlux publish = Flux.just(1, 2, 3, 4).publish();
+
+ publish.subscribe(elements::add);
+
+ assertThat(elements).isEmpty();
+
+ publish.connect();
+
+ assertThat(elements).containsExactly(1, 2, 3, 4);
}
}