Added reactor core

This commit is contained in:
Andrew Morgan
2017-02-22 20:32:38 +00:00
parent 86029a3487
commit 73026b6565
3 changed files with 160 additions and 0 deletions
@@ -0,0 +1,107 @@
package com.baeldung.reactor;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import static java.time.Duration.ofSeconds;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
public class ReactorTest {
@Test
public void givenFlux_whenSubscribing_shouldStream() throws InterruptedException {
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribe(elements::add);
assertThat(elements).containsExactly(2, 4, 6, 8);
}
@Test
public void givenFlux_whenZipping_shouldCombine() {
List<String> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE).log(), (two, one) -> String.format("First Flux: %d, Second Flux: %d", one, two))
.subscribe(elements::add);
assertThat(elements).containsExactly(
"First Flux: 0, Second Flux: 2",
"First Flux: 1, Second Flux: 4",
"First Flux: 2, Second Flux: 6",
"First Flux: 3, Second Flux: 8");
}
@Test
public void givenFlux_whenApplyingBackPressure_shouldPushLessElements() throws InterruptedException {
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.onBackpressureBuffer()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(final Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(final Integer integer) {
elements.add(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(final Throwable t) {
}
@Override
public void onComplete() {
int ham = 2;
}
});
assertThat(elements).containsExactly(2, 4, 6, 8);
}
@Test
public void givenFlux_whenInParalle_shouldSubscribeInDifferentThreads() {
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
assertThat(elements).containsExactly(2, 4, 6, 8);
}
}