From b33baa051263de3944a2dedaeca337cb1cff8d40 Mon Sep 17 00:00:00 2001 From: Haroon Khan Date: Fri, 1 Apr 2022 13:17:58 +0100 Subject: [PATCH] [JAVA-8147] Update and cleanup code for debugging reactive streams --- .../ChronJobs.java => cronjobs/CronJobs.java} | 130 +++++++-------- .../debugging/consumer/model/Foo.java | 20 ++- .../debugging/consumer/model/FooDto.java | 7 +- .../consumer/service/FooNameHelper.java | 50 +++--- .../consumer/service/FooQuantityHelper.java | 28 ++-- .../consumer/service/FooReporter.java | 20 +-- .../consumer/service/FooService.java | 151 +++++++++--------- .../server/handlers/ServerHandler.java | 29 ++-- .../ConsumerFooServiceIntegrationTest.java | 20 +-- .../consumer/ConsumerFooServiceLiveTest.java | 3 - .../consumer/utils/ListAppender.java | 8 +- 11 files changed, 227 insertions(+), 239 deletions(-) rename spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/{chronjobs/ChronJobs.java => cronjobs/CronJobs.java} (52%) diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/cronjobs/CronJobs.java similarity index 52% rename from spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java rename to spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/cronjobs/CronJobs.java index b58648ff9d..e71892d4e7 100644 --- a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/cronjobs/CronJobs.java @@ -1,4 +1,4 @@ -package com.baeldung.reactive.debugging.consumer.chronjobs; +package com.baeldung.reactive.debugging.consumer.cronjobs; import com.baeldung.reactive.debugging.consumer.model.Foo; import com.baeldung.reactive.debugging.consumer.model.FooDto; @@ -16,10 +16,10 @@ import java.time.Duration; import java.util.concurrent.ThreadLocalRandom; @Component -public class ChronJobs { +public class CronJobs { - private static Logger logger = LoggerFactory.getLogger(ChronJobs.class); - private WebClient client = WebClient.create("http://localhost:8081"); + final Logger logger = LoggerFactory.getLogger(CronJobs.class); + final WebClient client = WebClient.create("http://localhost:8081"); @Autowired private FooService service; @@ -27,17 +27,19 @@ public class ChronJobs { @Scheduled(fixedRate = 10000) public void consumeInfiniteFlux() { Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); - Integer random = ThreadLocalRandom.current() - .nextInt(0, 3); + .uri("/functional-reactive/periodic-foo") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + + int random = ThreadLocalRandom.current() + .nextInt(0, 3); + switch (random) { case 0: logger.info("process 1 with approach 1"); @@ -51,24 +53,25 @@ public class ChronJobs { logger.info("process 1 with approach 2"); service.processFooInAnotherScenario(fluxFoo); break; - } } @Scheduled(fixedRate = 20000) public void consumeFiniteFlux2() { Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); - Integer random = ThreadLocalRandom.current() - .nextInt(0, 3); + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + + int random = ThreadLocalRandom.current() + .nextInt(0, 3); + switch (random) { case 0: logger.info("process 2 with approach 1"); @@ -82,22 +85,21 @@ public class ChronJobs { logger.info("process 2 with approach 2"); service.processFooInAnotherScenario(fluxFoo); break; - } } @Scheduled(fixedRate = 20000) public void consumeFiniteFlux3() { Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); logger.info("process 3 with approach 3"); service.processUsingApproachThree(fluxFoo); } @@ -105,15 +107,15 @@ public class ChronJobs { @Scheduled(fixedRate = 20000) public void consumeFiniteFluxWithCheckpoint4() { Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); logger.info("process 4 with approach 4"); service.processUsingApproachFourWithCheckpoint(fluxFoo); } @@ -121,15 +123,15 @@ public class ChronJobs { @Scheduled(fixedRate = 20000) public void consumeFiniteFluxWitParallelScheduler() { Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); logger.info("process 5-parallel with approach 5-parallel"); service.processUsingApproachFivePublishingToDifferentParallelThreads(fluxFoo); } @@ -137,15 +139,15 @@ public class ChronJobs { @Scheduled(fixedRate = 20000) public void consumeFiniteFluxWithSingleSchedulers() { Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); logger.info("process 5-single with approach 5-single"); service.processUsingApproachFivePublishingToDifferentSingleThreads(fluxFoo); } diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java index d20e2c9ba0..b77b8e497a 100644 --- a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java @@ -1,14 +1,12 @@ package com.baeldung.reactive.debugging.consumer.model; import lombok.AllArgsConstructor; -import lombok.Getter; +import lombok.Data; import lombok.NoArgsConstructor; -import lombok.Setter; import java.util.concurrent.ThreadLocalRandom; -@Getter -@Setter +@Data @NoArgsConstructor @AllArgsConstructor public class Foo { @@ -18,10 +16,16 @@ public class Foo { private Integer quantity; public Foo(FooDto dto) { - this.id = (ThreadLocalRandom.current() - .nextInt(0, 100) == 0) ? null : dto.getId(); + this.id = randomId() == 0 ? null : dto.getId(); this.formattedName = dto.getName(); - this.quantity = ThreadLocalRandom.current() - .nextInt(0, 10); + this.quantity = randomQuantity(); + } + + private static int randomId() { + return ThreadLocalRandom.current().nextInt(0, 100); + } + + private static int randomQuantity() { + return ThreadLocalRandom.current().nextInt(0, 10); } } diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java index bf6f614e18..67c916baec 100644 --- a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java @@ -1,16 +1,15 @@ package com.baeldung.reactive.debugging.consumer.model; import lombok.AllArgsConstructor; -import lombok.Getter; +import lombok.Data; import lombok.NoArgsConstructor; -import lombok.Setter; -@Getter -@Setter +@Data @NoArgsConstructor @AllArgsConstructor public class FooDto { private Integer id; private String name; + } diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java index cdd9ca31a6..9a85f2015a 100644 --- a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java @@ -1,44 +1,40 @@ package com.baeldung.reactive.debugging.consumer.service; import com.baeldung.reactive.debugging.consumer.model.Foo; -import reactor.core.publisher.Flux; import java.util.concurrent.ThreadLocalRandom; public class FooNameHelper { - public static Flux concatAndSubstringFooName(Flux flux) { - flux = concatFooName(flux); - flux = substringFooName(flux); - return flux; + public static Foo concatAndSubstringFooName(Foo foo) { + Foo concat = concatFooName(foo); + return substringFooName(concat); } - public static Flux concatFooName(Flux flux) { - flux = flux.map(foo -> { - String processedName = null; - Integer random = ThreadLocalRandom.current() - .nextInt(0, 80); - processedName = (random != 0) ? foo.getFormattedName() : foo.getFormattedName() + "-bael"; - foo.setFormattedName(processedName); - return foo; - }); - return flux; + public static Foo concatFooName(Foo foo) { + + int random = ThreadLocalRandom.current() + .nextInt(0, 80); + + String processedName = (random != 0) + ? foo.getFormattedName() + : foo.getFormattedName() + "-bael"; + + foo.setFormattedName(processedName); + return foo; } - public static Flux substringFooName(Flux flux) { - return flux.map(foo -> { - String processedName; - Integer random = ThreadLocalRandom.current() - .nextInt(0, 100); + public static Foo substringFooName(Foo foo) { + int random = ThreadLocalRandom.current() + .nextInt(0, 100); - processedName = (random == 0) ? foo.getFormattedName() - .substring(10, 15) - : foo.getFormattedName() - .substring(0, 5); + String processedName = (random == 0) + ? foo.getFormattedName().substring(10, 15) + : foo.getFormattedName().substring(0, 5); - foo.setFormattedName(processedName); - return foo; - }); + foo.setFormattedName(processedName); + + return foo; } } diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java index f4600b41b9..038f26dd8c 100644 --- a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java @@ -1,30 +1,24 @@ package com.baeldung.reactive.debugging.consumer.service; import com.baeldung.reactive.debugging.consumer.model.Foo; -import reactor.core.publisher.Flux; import java.util.concurrent.ThreadLocalRandom; public class FooQuantityHelper { - public static Flux processFooReducingQuantity(Flux flux) { - flux = flux.map(foo -> { - Integer result; - Integer random = ThreadLocalRandom.current() - .nextInt(0, 90); - result = (random == 0) ? result = 0 : foo.getQuantity() + 2; - foo.setQuantity(result); - return foo; - }); - return divideFooQuantity(flux); + public static Foo processFooReducingQuantity(Foo foo) { + int random = ThreadLocalRandom.current().nextInt(0, 90); + int result = (random == 0) ? 0 : foo.getQuantity() + 2; + foo.setQuantity(result); + + return divideFooQuantity(foo); } - public static Flux divideFooQuantity(Flux flux) { - return flux.map(foo -> { - Integer result = Math.round(5 / foo.getQuantity()); - foo.setQuantity(result); - return foo; - }); + public static Foo divideFooQuantity(Foo foo) { + + Integer result = (int) Math.round(5.0 / foo.getQuantity()); + foo.setQuantity(result); + return foo; } } diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java index 1a8f9bc783..740c646e8d 100644 --- a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java @@ -3,22 +3,22 @@ package com.baeldung.reactive.debugging.consumer.service; import com.baeldung.reactive.debugging.consumer.model.Foo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; public class FooReporter { - private static Logger logger = LoggerFactory.getLogger(FooReporter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FooReporter.class); - public static Flux reportResult(Flux input, String approach) { - return input.map(foo -> { - if (foo.getId() == null) - throw new IllegalArgumentException("Null id is not valid!"); - logger.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'", approach, foo.getId(), foo.getFormattedName(), foo.getQuantity()); - return foo; - }); + public static Foo reportResult(Foo foo, String approach) { + if (foo.getId() == null) { + throw new IllegalArgumentException("Null id is not valid!"); + } + LOGGER.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'", + approach, foo.getId(), foo.getFormattedName(), foo.getQuantity()); + + return foo; } - public static Flux reportResult(Flux input) { + public static Foo reportResult(Foo input) { return reportResult(input, "default"); } } diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java index bafaa3cfa0..bf630aacb5 100644 --- a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java @@ -7,112 +7,107 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; -import static com.baeldung.reactive.debugging.consumer.service.FooNameHelper.concatAndSubstringFooName; -import static com.baeldung.reactive.debugging.consumer.service.FooNameHelper.substringFooName; -import static com.baeldung.reactive.debugging.consumer.service.FooQuantityHelper.divideFooQuantity; -import static com.baeldung.reactive.debugging.consumer.service.FooQuantityHelper.processFooReducingQuantity; import static com.baeldung.reactive.debugging.consumer.service.FooReporter.reportResult; @Component public class FooService { - private static Logger logger = LoggerFactory.getLogger(FooService.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FooService.class); public void processFoo(Flux flux) { - flux = FooNameHelper.concatFooName(flux); - flux = FooNameHelper.substringFooName(flux); - flux = flux.log(); - flux = FooReporter.reportResult(flux); - flux = flux.doOnError(error -> { - logger.error("The following error happened on processFoo method!", error); - }); - flux.subscribe(); + flux.map(FooNameHelper::concatFooName) + .map(FooNameHelper::substringFooName) + .log() + .map(FooReporter::reportResult) + .doOnError(error -> LOGGER.error("The following error happened on processFoo method!", error)) + .subscribe(); } public void processFooInAnotherScenario(Flux flux) { - flux = FooNameHelper.substringFooName(flux); - flux = FooQuantityHelper.divideFooQuantity(flux); - flux.subscribe(); + flux.map(FooNameHelper::substringFooName) + .map(FooQuantityHelper::divideFooQuantity) + .subscribe(); } public void processUsingApproachOneWithErrorHandling(Flux flux) { - logger.info("starting approach one w error handling!"); - flux = concatAndSubstringFooName(flux); - flux = concatAndSubstringFooName(flux); - flux = substringFooName(flux); - flux = processFooReducingQuantity(flux); - flux = processFooReducingQuantity(flux); - flux = processFooReducingQuantity(flux); - flux = reportResult(flux, "ONE w/ EH"); - flux = flux.doOnError(error -> { - logger.error("Approach 1 with Error Handling failed!", error); - }); - flux.subscribe(); + LOGGER.info("starting approach one w error handling!"); + + flux.map(FooNameHelper::concatAndSubstringFooName) + .map(FooNameHelper::concatAndSubstringFooName) + .map(FooNameHelper::substringFooName) + .map(FooQuantityHelper::processFooReducingQuantity) + .map(FooQuantityHelper::processFooReducingQuantity) + .map(FooQuantityHelper::processFooReducingQuantity) + .map(FooReporter::reportResult) + .doOnError(error -> LOGGER.error("Approach 1 with Error Handling failed!", error)) + .subscribe(); } public void processUsingApproachThree(Flux flux) { - logger.info("starting approach three!"); - flux = concatAndSubstringFooName(flux); - flux = reportResult(flux, "THREE"); - flux = flux.doOnError(error -> { - logger.error("Approach 3 failed!", error); - }); - flux.subscribe(); + LOGGER.info("starting approach three!"); + + flux.map(FooNameHelper::concatAndSubstringFooName) + .map(foo -> reportResult(foo, "THREE")) + .doOnError(error -> LOGGER.error("Approach 3 failed!", error)) + .subscribe(); } public void processUsingApproachFourWithCheckpoint(Flux flux) { - logger.info("starting approach four!"); - flux = concatAndSubstringFooName(flux); - flux = flux.checkpoint("CHECKPOINT 1"); - flux = concatAndSubstringFooName(flux); - flux = divideFooQuantity(flux); - flux = flux.checkpoint("CHECKPOINT 2", true); - flux = reportResult(flux, "FOUR"); - flux = concatAndSubstringFooName(flux).doOnError(error -> { - logger.error("Approach 4 failed!", error); - }); - flux.subscribe(); + LOGGER.info("starting approach four!"); + + flux.map(FooNameHelper::concatAndSubstringFooName) + .checkpoint("CHECKPOINT 1") + .map(FooNameHelper::concatAndSubstringFooName) + .map(FooQuantityHelper::divideFooQuantity) + .checkpoint("CHECKPOINT 2", true) + .map(foo -> reportResult(foo, "FOUR")) + .map(FooNameHelper::concatAndSubstringFooName) + .doOnError(error -> LOGGER.error("Approach 4 failed!", error)) + .subscribe(); } public void processUsingApproachFourWithInitialCheckpoint(Flux flux) { - logger.info("starting approach four!"); - flux = concatAndSubstringFooName(flux); - flux = flux.checkpoint("CHECKPOINT 1", true); - flux = concatAndSubstringFooName(flux); - flux = divideFooQuantity(flux); - flux = reportResult(flux, "FOUR"); - flux = flux.doOnError(error -> { - logger.error("Approach 4-2 failed!", error); - }); - flux.subscribe(); + LOGGER.info("starting approach four!"); + + flux.map(FooNameHelper::concatAndSubstringFooName) + .checkpoint("CHECKPOINT 1", true) + .map(FooNameHelper::concatAndSubstringFooName) + .map(FooQuantityHelper::divideFooQuantity) + .map(foo -> reportResult(foo, "FOUR")) + .map(FooNameHelper::concatAndSubstringFooName) + .doOnError(error -> LOGGER.error("Approach 4-2 failed!", error)) + .subscribe(); } public void processUsingApproachFivePublishingToDifferentParallelThreads(Flux flux) { - logger.info("starting approach five-parallel!"); - flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newParallel("five-parallel-foo")) - .log(); - flux = concatAndSubstringFooName(flux); - flux = divideFooQuantity(flux); - flux = reportResult(flux, "FIVE-PARALLEL").publishOn(Schedulers.newSingle("five-parallel-bar")); - flux = concatAndSubstringFooName(flux).doOnError(error -> { - logger.error("Approach 5-parallel failed!", error); - }); - flux.subscribeOn(Schedulers.newParallel("five-parallel-starter")) - .subscribe(); + LOGGER.info("starting approach five-parallel!"); + + flux.map(FooNameHelper::concatAndSubstringFooName) + .publishOn(Schedulers.newParallel("five-parallel-foo")) + .log() + .map(FooNameHelper::concatAndSubstringFooName) + .map(foo -> reportResult(foo, "FIVE-PARALLEL")) + .publishOn(Schedulers.newSingle("five-parallel-bar")) + .map(FooNameHelper::concatAndSubstringFooName) + .doOnError(error -> LOGGER.error("Approach 5-parallel failed!", error)) + .subscribeOn(Schedulers.newParallel("five-parallel-starter")) + .subscribe(); } public void processUsingApproachFivePublishingToDifferentSingleThreads(Flux flux) { - logger.info("starting approach five-single!"); - flux = flux.log() - .subscribeOn(Schedulers.newSingle("five-single-starter")); - flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newSingle("five-single-foo")); - flux = concatAndSubstringFooName(flux); - flux = divideFooQuantity(flux); - flux = reportResult(flux, "FIVE-SINGLE").publishOn(Schedulers.newSingle("five-single-bar")); - flux = concatAndSubstringFooName(flux).doOnError(error -> { - logger.error("Approach 5-single failed!", error); - }); - flux.subscribe(); + LOGGER.info("starting approach five-single!"); + + flux.log() + .subscribeOn(Schedulers.newSingle("five-single-starter")) + .map(FooNameHelper::concatAndSubstringFooName) + .publishOn(Schedulers.newSingle("five-single-foo")) + .map(FooNameHelper::concatAndSubstringFooName) + .map(FooQuantityHelper::divideFooQuantity) + .map(foo -> reportResult(foo, "FIVE-SINGLE")) + .publishOn(Schedulers.newSingle("five-single-bar")) + .map(FooNameHelper::concatAndSubstringFooName) + .doOnError(error -> LOGGER.error("Approach 5-single failed!", error)) + .subscribe(); } } diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java index 15f9a4b786..a86932c6f9 100644 --- a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java @@ -16,30 +16,31 @@ import java.util.concurrent.ThreadLocalRandom; @Component public class ServerHandler { - private static Logger logger = LoggerFactory.getLogger(ServerHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class); public Mono useHandler(final ServerRequest request) { // there are chances that something goes wrong here... return ServerResponse.ok() .contentType(MediaType.TEXT_EVENT_STREAM) - .body(Flux.interval(Duration.ofSeconds(1)) - .map(sequence -> { - logger.info("retrieving Foo. Sequence: {}", sequence); - if (ThreadLocalRandom.current() - .nextInt(0, 50) == 1) { - throw new RuntimeException("There was an error retrieving the Foo!"); - } - return new Foo(sequence, "name" + sequence); - - }), Foo.class); + .body(getFlux(), Foo.class); } public Mono useHandlerFinite(final ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.TEXT_EVENT_STREAM) .body(Flux.range(0, 50) - .map(sequence -> { - return new Foo(new Long(sequence), "theFooNameNumber" + sequence); - }), Foo.class); + .map(sequence -> new Foo(new Long(sequence), "theFooNameNumber" + sequence) + ), Foo.class); + } + + private static Flux getFlux() { + return Flux.interval(Duration.ofSeconds(1)) + .map(sequence -> { + LOGGER.info("retrieving Foo. Sequence: {}", sequence); + if (ThreadLocalRandom.current().nextInt(0, 50) == 1) { + throw new RuntimeException("There was an error retrieving the Foo!"); + } + return new Foo(sequence, "name" + sequence); + }); } } diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java index 37e2ebe0ac..3635844056 100644 --- a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java +++ b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java @@ -45,19 +45,19 @@ public class ConsumerFooServiceIntegrationTest { Collection allSuppressedEntries = ListAppender.getEvents() .stream() .map(ILoggingEvent::getThrowableProxy) - .flatMap(t -> { - return Optional.ofNullable(t) - .map(IThrowableProxy::getSuppressed) - .map(Arrays::stream) - .orElse(Stream.empty()); - }) + .flatMap(t -> Optional.ofNullable(t) + .map(IThrowableProxy::getSuppressed) + .map(Arrays::stream) + .orElse(Stream.empty())) .map(IThrowableProxy::getClassName) .collect(Collectors.toList()); - assertThat(allLoggedEntries).anyMatch(entry -> entry.contains("The following error happened on processFoo method!")) - .anyMatch(entry -> entry.contains("| onSubscribe")) - .anyMatch(entry -> entry.contains("| cancel()")); + + assertThat(allLoggedEntries) + .anyMatch(entry -> entry.contains("The following error happened on processFoo method!")) + .anyMatch(entry -> entry.contains("| onSubscribe")) + .anyMatch(entry -> entry.contains("| cancel()")); assertThat(allSuppressedEntries) - .anyMatch(entry -> entry.contains("reactor.core.publisher.FluxOnAssembly$OnAssemblyException")); + .anyMatch(entry -> entry.contains("reactor.core.publisher.FluxOnAssembly$OnAssemblyException")); } } diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java index ff6e4b2bd2..89e92f2818 100644 --- a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java +++ b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java @@ -1,6 +1,5 @@ package com.baeldung.reactive.debugging.consumer; -import com.baeldung.reactive.debugging.consumer.service.FooService; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.test.web.reactive.server.WebTestClient; @@ -13,8 +12,6 @@ import org.springframework.test.web.reactive.server.WebTestClient.ResponseSpec; */ public class ConsumerFooServiceLiveTest { - FooService service = new FooService(); - private static final String BASE_URL = "http://localhost:8082"; private static final String DEBUG_HOOK_ON = BASE_URL + "/debug-hook-on"; private static final String DEBUG_HOOK_OFF = BASE_URL + "/debug-hook-off"; diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java index fe8b04e824..9e1fae9135 100644 --- a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java +++ b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java @@ -8,18 +8,18 @@ import java.util.List; public class ListAppender extends AppenderBase { - static private List events = new ArrayList<>(); + private static final List EVENTS = new ArrayList<>(); @Override protected void append(ILoggingEvent eventObject) { - events.add(eventObject); + EVENTS.add(eventObject); } public static List getEvents() { - return events; + return EVENTS; } public static void clearEventList() { - events.clear(); + EVENTS.clear(); } }