From 73026b65653fe38fded09d85212d108f5b43ab27 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 22 Feb 2017 20:32:38 +0000 Subject: [PATCH] Added reactor core --- pom.xml | 1 + reactor-core/pom.xml | 52 +++++++++ .../com/baeldung/reactor/ReactorTest.java | 107 ++++++++++++++++++ 3 files changed, 160 insertions(+) create mode 100644 reactor-core/pom.xml create mode 100644 reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java diff --git a/pom.xml b/pom.xml index 9d6c5931e3..014e4016c5 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ querydsl + reactor-core redis rest-assured rest-testing diff --git a/reactor-core/pom.xml b/reactor-core/pom.xml new file mode 100644 index 0000000000..017b59f42e --- /dev/null +++ b/reactor-core/pom.xml @@ -0,0 +1,52 @@ + + 4.0.0 + + org.baeldung + reactor-core + 0.0.1-SNAPSHOT + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + + + + io.projectreactor + reactor-core + 3.0.4.RELEASE + + + + junit + junit + 4.12 + test + + + + org.assertj + assertj-core + 3.6.1 + test + + + + ch.qos.logback + logback-classic + 1.1.3 + + + + + diff --git a/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java b/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java new file mode 100644 index 0000000000..a90346803e --- /dev/null +++ b/reactor-core/src/test/java/com/baeldung/reactor/ReactorTest.java @@ -0,0 +1,107 @@ +package com.baeldung.reactor; + +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.publisher.ConnectableFlux; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +import static java.time.Duration.ofSeconds; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; + +public class ReactorTest { + + @Test + public void givenFlux_whenSubscribing_shouldStream() throws InterruptedException { + + List elements = new ArrayList<>(); + + Flux.just(1, 2, 3, 4) + .log() + .map(i -> i * 2) + .subscribe(elements::add); + + assertThat(elements).containsExactly(2, 4, 6, 8); + } + + @Test + public void givenFlux_whenZipping_shouldCombine() { + List elements = new ArrayList<>(); + + Flux.just(1, 2, 3, 4) + .log() + .map(i -> i * 2) + .zipWith(Flux.range(0, Integer.MAX_VALUE).log(), (two, one) -> String.format("First Flux: %d, Second Flux: %d", one, two)) + .subscribe(elements::add); + + assertThat(elements).containsExactly( + "First Flux: 0, Second Flux: 2", + "First Flux: 1, Second Flux: 4", + "First Flux: 2, Second Flux: 6", + "First Flux: 3, Second Flux: 8"); + } + + @Test + public void givenFlux_whenApplyingBackPressure_shouldPushLessElements() throws InterruptedException { + + List elements = new ArrayList<>(); + + Flux.just(1, 2, 3, 4) + .log() + .map(i -> i * 2) + .onBackpressureBuffer() + .subscribe(new Subscriber() { + private Subscription s; + int onNextAmount; + + @Override + public void onSubscribe(final Subscription s) { + this.s = s; + s.request(2); + } + + @Override + public void onNext(final Integer integer) { + elements.add(integer); + onNextAmount++; + if (onNextAmount % 2 == 0) { + s.request(2); + } + } + + @Override + public void onError(final Throwable t) { + } + + @Override + public void onComplete() { + int ham = 2; + } + }); + + assertThat(elements).containsExactly(2, 4, 6, 8); + } + + @Test + public void givenFlux_whenInParalle_shouldSubscribeInDifferentThreads() { + List elements = new ArrayList<>(); + + Flux.just(1, 2, 3, 4) + .log() + .map(i -> i * 2) + .subscribeOn(Schedulers.parallel()) + .subscribe(elements::add); + + assertThat(elements).containsExactly(2, 4, 6, 8); + } + +}