New unit test format

This commit is contained in:
Nick
2019-08-30 21:11:18 +01:00
parent db85c8f275
commit 6cd385e4c0
19972 changed files with 1626600 additions and 0 deletions
@@ -0,0 +1,22 @@
package com.baeldung.reactor;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.function.Consumer;
public class ItemProducerCreate {
Consumer<List<String>> listener;
public Flux<String> create() {
Flux<String> articlesFlux = Flux.create((sink) -> {
ItemProducerCreate.this.listener = (items) -> {
items.stream()
.forEach(article -> sink.next(article));
};
});
return articlesFlux;
}
}
@@ -0,0 +1,23 @@
package com.baeldung.reactor;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
public class NetworkTrafficProducerPush {
Consumer<String> listener;
public void subscribe(Consumer<String> consumer) {
Flux<String> flux = Flux.push(sink -> {
NetworkTrafficProducerPush.this.listener = (t) -> sink.next(t);
}, OverflowStrategy.DROP);
flux.subscribe(consumer);
}
public void onPacket(String packet) {
listener.accept(packet);
}
}
@@ -0,0 +1,38 @@
package com.baeldung.reactor;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
public class ProgrammaticSequences {
public Flux<String> statefulImutableGenerate() {
return Flux.generate(() -> 1, (state, sink) -> {
sink.next("2 + " + state + " = " + (2 + state));
if (state == 101)
sink.complete();
return state + 1;
});
}
public Flux<String> statefulMutableGenerate() {
return Flux.generate(AtomicInteger::new, (state, sink) -> {
int i = state.getAndIncrement();
sink.next("2 + " + i + " = " + (2 + i));
if (i == 101)
sink.complete();
return state;
});
}
public Flux<String> handle() {
return Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.handle((i, sink) -> {
String animal = "Elephant nr " + i;
if (i % 2 == 0) {
sink.next(animal);
}
});
}
}
@@ -0,0 +1,12 @@
package com.baeldung.reactor;
import reactor.core.publisher.Flux;
public class StatelessGenerate {
public Flux<String> statelessGenerate() {
return Flux.generate((sink) -> {
sink.next("hello");
});
}
}
@@ -0,0 +1,27 @@
package com.baeldung.reactor.creation;
public class FibonacciState {
private int former;
private int latter;
public FibonacciState(int former, int latter) {
this.former = former;
this.latter = latter;
}
public int getFormer() {
return former;
}
public void setFormer(int former) {
this.former = former;
}
public int getLatter() {
return latter;
}
public void setLatter(int latter) {
this.latter = latter;
}
}
@@ -0,0 +1,14 @@
package com.baeldung.reactor.creation;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.function.Consumer;
public class SequenceCreator {
public Consumer<List<Integer>> consumer;
public Flux<Integer> createNumberSequence() {
return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
}
}
@@ -0,0 +1,31 @@
package com.baeldung.reactor.creation;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
public class SequenceGenerator {
public Flux<Integer> generateFibonacciWithTuples() {
return Flux.generate(
() -> Tuples.of(0, 1),
(state, sink) -> {
sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
}
);
}
public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
return Flux.generate(
() -> new FibonacciState(0, 1),
(state, sink) -> {
sink.next(state.getFormer());
if (state.getLatter() > limit) {
sink.complete();
}
int temp = state.getFormer();
state.setFormer(state.getLatter());
state.setLatter(temp + state.getLatter());
return state;
});
}
}
@@ -0,0 +1,13 @@
package com.baeldung.reactor.creation;
import reactor.core.publisher.Flux;
public class SequenceHandler {
public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
return sequence.handle((number, sink) -> {
if (number % 2 == 0) {
sink.next(number / 2);
}
});
}
}
@@ -0,0 +1,33 @@
package com.baeldung.reactor;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class ItemProducerCreateUnitTest {
@Test
public void givenFluxWithAsynchronousCreate_whenProduceItemsFromDifferentThread_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
ItemProducerCreate producer = new ItemProducerCreate();
producer.create()
.subscribe(elements::add);
Thread producerThread = new Thread(() -> {
List<String> items = new ArrayList<>();
items.add("Item 1");
items.add("Item 2");
items.add("Item 3");
producer.listener.accept(items);
});
producerThread.start();
producerThread.join();
assertThat(elements).containsExactly("Item 1", "Item 2", "Item 3");
}
}
@@ -0,0 +1,23 @@
package com.baeldung.reactor;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class NetworkTrafficProducerPushUnitTest {
@Test
public void givenFluxWithAsynchronousPushWithListener_whenListenerIsInvoked_thenItemCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
NetworkTrafficProducerPush trafficProducer = new NetworkTrafficProducerPush();
trafficProducer.subscribe(elements::add);
trafficProducer.onPacket("Packet[A18]");
assertThat(elements).containsExactly("Packet[A18]");
}
}
@@ -0,0 +1,42 @@
package com.baeldung.reactor;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class ProgrammaticSequencesUnitTest {
@Test
public void givenFluxWithStatefulImmutableGenerate_whenSubscribeAddItemsToCollect_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
ProgrammaticSequences producer = new ProgrammaticSequences();
producer.statefulImutableGenerate()
.subscribe(elements::add);
assertThat(elements).hasSize(101);
assertThat(elements).contains("2 + 1 = 3", "2 + 101 = 103");
}
@Test
public void givenFluxWithStatefulMutableGenerate_whenSubscribeAddItemsToCollect_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
ProgrammaticSequences producer = new ProgrammaticSequences();
producer.statefulMutableGenerate()
.subscribe(elements::add);
assertThat(elements).hasSize(102);
assertThat(elements).contains("2 + 0 = 2", "2 + 101 = 103");
}
@Test
public void givenFluxWithHandle_whenSubscribeAddItemsToCollect_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
ProgrammaticSequences producer = new ProgrammaticSequences();
producer.handle()
.subscribe(elements::add);
assertThat(elements).hasSize(5);
assertThat(elements).contains("Elephant nr 2", "Elephant nr 10");
}
}
@@ -0,0 +1,122 @@
package com.baeldung.reactor;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class ReactorIntegrationTest {
@Test
public void givenFlux_whenSubscribing_thenStream() throws InterruptedException {
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> {
System.out.println(i + ":" + Thread.currentThread());
return i * 2;
})
.subscribe(elements::add);
assertThat(elements).containsExactly(2, 4, 6, 8);
}
@Test
public void givenFlux_whenZipping_thenCombine() {
List<String> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE).log(), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
.subscribe(elements::add);
assertThat(elements).containsExactly(
"First Flux: 2, Second Flux: 0",
"First Flux: 4, Second Flux: 1",
"First Flux: 6, Second Flux: 2",
"First Flux: 8, Second Flux: 3");
}
@Test
public void givenFlux_whenApplyingBackPressure_thenPushElementsInBatches() throws InterruptedException {
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.onBackpressureBuffer()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(final Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(final Integer integer) {
elements.add(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(final Throwable t) {
}
@Override
public void onComplete() {
int ham = 2;
}
});
assertThat(elements).containsExactly(2, 4, 6, 8);
}
@Test
public void givenFlux_whenInParallel_thenSubscribeInDifferentThreads() throws InterruptedException {
List<String> threadNames = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> Thread.currentThread().getName())
.subscribeOn(Schedulers.parallel())
.subscribe(threadNames::add);
Thread.sleep(1000);
assertThat(threadNames).containsExactly("parallel-1", "parallel-1", "parallel-1", "parallel-1");
}
@Test
public void givenConnectableFlux_whenConnected_thenShouldStream() {
List<Integer> elements = new ArrayList<>();
final ConnectableFlux<Integer> publish = Flux.just(1, 2, 3, 4).publish();
publish.subscribe(elements::add);
assertThat(elements).isEmpty();
publish.connect();
assertThat(elements).containsExactly(1, 2, 3, 4);
}
}
@@ -0,0 +1,180 @@
package com.baeldung.reactor.core;
import java.time.Duration;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class CombiningPublishersIntegrationTest {
private static Integer min = 1;
private static Integer max = 5;
private static Flux<Integer> evenNumbers = Flux.range(min, max).filter(x -> x % 2 == 0);
private static Flux<Integer> oddNumbers = Flux.range(min, max).filter(x -> x % 2 > 0);
@Test
public void givenFluxes_whenMergeDelayErrorIsInvoked_thenMergeDelayError() {
Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1,
evenNumbers.delayElements(Duration.ofMillis(2000L)),
oddNumbers.delayElements(Duration.ofMillis(1000L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}
/*@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers.delayElements(Duration.ofMillis(2000L)),
oddNumbers.delayElements(Duration.ofMillis(1000L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}*/
@Test
public void givenFluxes_whenConcatIsInvoked_thenConcat() {
Flux<Integer> fluxOfIntegers = Flux.concat(
evenNumbers.delayElements(Duration.ofMillis(2000L)),
oddNumbers.delayElements(Duration.ofMillis(1000L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
@Test
public void givenFluxes_whenMergeIsInvoked_thenMerge() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
@Test
public void givenFluxes_whenConcatWithIsInvoked_thenConcatWith() {
Flux<Integer> fluxOfIntegers = evenNumbers
.concatWith(oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
@Test
public void givenFluxes_whenCombineLatestIsInvoked_thenCombineLatest() {
Flux<Integer> fluxOfIntegers = Flux.combineLatest(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(5)
.expectNext(7)
.expectNext(9)
.expectComplete()
.verify();
}
@Test
public void givenFluxes_whenCombineLatestIsInvoked_thenCombineLatest1() {
StepVerifier.create(Flux.combineLatest(obj -> (int) obj[1], evenNumbers, oddNumbers))
.expectNext(1)
.expectNext(3)
.expectNext(5)
.verifyComplete();
}
@Test
public void givenFluxes_whenMergeSequentialIsInvoked_thenMergeSequential() {
Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
@Test
public void givenFluxes_whenMergeWithIsInvoked_thenMergeWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
@Test
public void givenFluxes_whenZipIsInvoked_thenZip() {
Flux<Integer> fluxOfIntegers = Flux.zip(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(3)
.expectNext(7)
.expectComplete()
.verify();
}
@Test
public void givenFluxes_whenZipWithIsInvoked_thenZipWith() {
Flux<Integer> fluxOfIntegers = evenNumbers
.zipWith(oddNumbers,
(a, b) -> a * b);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(12)
.expectComplete()
.verify();
}
}
@@ -0,0 +1,70 @@
package com.baeldung.reactor.creation;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class SequenceUnitTest {
@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);
StepVerifier.create(fibonacciFlux)
.expectNext(0, 1, 1, 2, 3)
.expectComplete()
.verify();
}
@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
.expectNext(0, 1, 1, 2, 3, 5, 8)
.expectComplete()
.verify();
}
@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();
SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
() -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
() -> sequenceCreator.consumer.accept(sequence2)
);
List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);
producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();
assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);
}
@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
SequenceHandler sequenceHandler = new SequenceHandler();
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);
StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
.expectNext(0, 1, 4, 17)
.expectComplete()
.verify();
}
}