From 3d2f6eff678ba244de9a9b1150e7c82307614fa4 Mon Sep 17 00:00:00 2001 From: Ger Roza Date: Tue, 27 Nov 2018 17:32:34 -0200 Subject: [PATCH] [BAEL-2278] spring-5-reactive | debugging reactive streams (#5789) * First approach for the scenario, not stable yet. * Added 2 services for teh article Consumer is the main example project, with all the debugging functionality * * cleaned unused spring-5-data functionality * cleaning unused spring-5-data pom * *fixed indentation error * addressed Jira comments: * Created live test, and renamed unit test as integrtion test * Tried to fix issue in CI, couldnt reproduce locally --- .../consumer/ConsumerSSEApplication.java | 33 +++++ .../consumer/chronjobs/ChronJobs.java | 122 ++++++++++++++++++ .../ReactiveConfigsToggleRestController.java | 23 ++++ .../debugging/consumer/model/Foo.java | 26 ++++ .../debugging/consumer/model/FooDto.java | 12 ++ .../consumer/service/FooNameHelper.java | 45 +++++++ .../consumer/service/FooQuantityHelper.java | 31 +++++ .../consumer/service/FooReporter.java | 26 ++++ .../consumer/service/FooService.java | 91 +++++++++++++ .../server/ServerSSEApplication.java | 29 +++++ .../server/handlers/ServerHandler.java | 47 +++++++ .../baeldung/debugging/server/model/Foo.java | 16 +++ .../server/routers/ServerRouter.java | 22 ++++ .../src/main/resources/application.properties | 3 +- .../ConsumerFooServiceIntegrationTest.java | 65 ++++++++++ .../consumer/ConsumerFooServiceLiveTest.java | 49 +++++++ .../consumer/utils/ListAppender.java | 25 ++++ .../src/test/resources/logback-test.xml | 16 +++ 18 files changed, 679 insertions(+), 2 deletions(-) create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/ConsumerSSEApplication.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/chronjobs/ChronJobs.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/model/Foo.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/model/FooDto.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooNameHelper.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooQuantityHelper.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooReporter.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooService.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/server/ServerSSEApplication.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/server/handlers/ServerHandler.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/server/model/Foo.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/debugging/server/routers/ServerRouter.java create mode 100644 spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceIntegrationTest.java create mode 100644 spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceLiveTest.java create mode 100644 spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/utils/ListAppender.java create mode 100644 spring-5-reactive/src/test/resources/logback-test.xml diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/ConsumerSSEApplication.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/ConsumerSSEApplication.java new file mode 100644 index 0000000000..55db3d7392 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/ConsumerSSEApplication.java @@ -0,0 +1,33 @@ +package com.baeldung.debugging.consumer; + +import java.util.Collections; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.security.config.web.server.ServerHttpSecurity; +import org.springframework.security.web.server.SecurityWebFilterChain; + +import reactor.core.publisher.Hooks; + +@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class) +@EnableScheduling +public class ConsumerSSEApplication { + + public static void main(String[] args) { + Hooks.onOperatorDebug(); + SpringApplication app = new SpringApplication(ConsumerSSEApplication.class); + app.setDefaultProperties(Collections.singletonMap("server.port", "8082")); + app.run(args); + } + + @Bean + public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) { + http.authorizeExchange() + .anyExchange() + .permitAll(); + return http.build(); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/chronjobs/ChronJobs.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/chronjobs/ChronJobs.java new file mode 100644 index 0000000000..09cbc34a6f --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/chronjobs/ChronJobs.java @@ -0,0 +1,122 @@ +package com.baeldung.debugging.consumer.chronjobs; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; + +import com.baeldung.debugging.consumer.model.Foo; +import com.baeldung.debugging.consumer.model.FooDto; +import com.baeldung.debugging.consumer.service.FooService; + +import reactor.core.publisher.Flux; + +@Component +public class ChronJobs { + + private static Logger logger = LoggerFactory.getLogger(ChronJobs.class); + private WebClient client = WebClient.create("http://localhost:8081"); + + @Autowired + private FooService service; + + @Scheduled(fixedRate = 10000) + public void consumeInfiniteFlux() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + Integer random = ThreadLocalRandom.current() + .nextInt(0, 3); + switch (random) { + case 0: + logger.info("process 1 with approach 1"); + service.processFoo(fluxFoo); + break; + case 1: + logger.info("process 1 with approach 1 EH"); + service.processUsingApproachOneWithErrorHandling(fluxFoo); + break; + default: + logger.info("process 1 with approach 2"); + service.processFooInAnotherScenario(fluxFoo); + break; + + } + } + + @Scheduled(fixedRate = 20000) + public void consumeFiniteFlux2() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + Integer random = ThreadLocalRandom.current() + .nextInt(0, 3); + switch (random) { + case 0: + logger.info("process 2 with approach 1"); + service.processFoo(fluxFoo); + break; + case 1: + logger.info("process 2 with approach 1 EH"); + service.processUsingApproachOneWithErrorHandling(fluxFoo); + break; + default: + logger.info("process 2 with approach 2"); + service.processFooInAnotherScenario(fluxFoo); + break; + + } + } + + @Scheduled(fixedRate = 20000) + public void consumeFiniteFlux3() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + logger.info("process 3 with approach 3"); + service.processUsingApproachThree(fluxFoo); + } + + @Scheduled(fixedRate = 20000) + public void consumeFiniteFluxWithCheckpoint4() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + logger.info("process 4 with approach 4"); + service.processUsingApproachFourWithCheckpoint(fluxFoo); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java new file mode 100644 index 0000000000..3dcdc6c7c0 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java @@ -0,0 +1,23 @@ +package com.baeldung.debugging.consumer.controllers; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import reactor.core.publisher.Hooks; + +@RestController +public class ReactiveConfigsToggleRestController { + + @GetMapping("/debug-hook-on") + public String setReactiveDebugOn() { + Hooks.onOperatorDebug(); + return "DEBUG HOOK ON"; + } + + @GetMapping("/debug-hook-off") + public String setReactiveDebugOff() { + Hooks.resetOnOperatorDebug(); + return "DEBUG HOOK OFF"; + } + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/model/Foo.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/model/Foo.java new file mode 100644 index 0000000000..e101457b84 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/model/Foo.java @@ -0,0 +1,26 @@ +package com.baeldung.debugging.consumer.model; + +import java.util.concurrent.ThreadLocalRandom; + +import org.springframework.data.annotation.Id; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class Foo { + + @Id + private Integer id; + private String formattedName; + private Integer quantity; + + public Foo(FooDto dto) { + this.id = (ThreadLocalRandom.current() + .nextInt(0, 100) == 0) ? null : dto.getId(); + this.formattedName = dto.getName(); + this.quantity = ThreadLocalRandom.current() + .nextInt(0, 10); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/model/FooDto.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/model/FooDto.java new file mode 100644 index 0000000000..50508fd216 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/model/FooDto.java @@ -0,0 +1,12 @@ +package com.baeldung.debugging.consumer.model; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class FooDto { + + private Integer id; + private String name; +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooNameHelper.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooNameHelper.java new file mode 100644 index 0000000000..772b360437 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooNameHelper.java @@ -0,0 +1,45 @@ +package com.baeldung.debugging.consumer.service; + +import java.util.concurrent.ThreadLocalRandom; + +import com.baeldung.debugging.consumer.model.Foo; + +import reactor.core.publisher.Flux; + +public class FooNameHelper { + + public static Flux concatAndSubstringFooName(Flux flux) { + flux = concatFooName(flux); + flux = substringFooName(flux); + return flux; + } + + public static Flux concatFooName(Flux flux) { + flux = flux.map(foo -> { + String processedName = null; + Integer random = ThreadLocalRandom.current() + .nextInt(0, 80); + processedName = (random != 0) ? foo.getFormattedName() : foo.getFormattedName() + "-bael"; + foo.setFormattedName(processedName); + return foo; + }); + return flux; + } + + public static Flux substringFooName(Flux flux) { + return flux.map(foo -> { + String processedName; + Integer random = ThreadLocalRandom.current() + .nextInt(0, 100); + + processedName = (random == 0) ? foo.getFormattedName() + .substring(10, 15) + : foo.getFormattedName() + .substring(0, 5); + + foo.setFormattedName(processedName); + return foo; + }); + } + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooQuantityHelper.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooQuantityHelper.java new file mode 100644 index 0000000000..615239313d --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooQuantityHelper.java @@ -0,0 +1,31 @@ +package com.baeldung.debugging.consumer.service; + +import java.util.concurrent.ThreadLocalRandom; + +import com.baeldung.debugging.consumer.model.Foo; + +import reactor.core.publisher.Flux; + +public class FooQuantityHelper { + + public static Flux processFooReducingQuantity(Flux flux) { + flux = flux.map(foo -> { + Integer result; + Integer random = ThreadLocalRandom.current() + .nextInt(0, 90); + result = (random == 0) ? result = 0 : foo.getQuantity() + 2; + foo.setQuantity(result); + return foo; + }); + return divideFooQuantity(flux); + } + + public static Flux divideFooQuantity(Flux flux) { + return flux.map(foo -> { + Integer result = Math.round(5 / foo.getQuantity()); + foo.setQuantity(result); + return foo; + }); + } + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooReporter.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooReporter.java new file mode 100644 index 0000000000..f53cd238e0 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooReporter.java @@ -0,0 +1,26 @@ +package com.baeldung.debugging.consumer.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baeldung.debugging.consumer.model.Foo; + +import reactor.core.publisher.Flux; + +public class FooReporter { + + private static Logger logger = LoggerFactory.getLogger(FooReporter.class); + + public static Flux reportResult(Flux input, String approach) { + return input.map(foo -> { + if (foo.getId() == null) + throw new IllegalArgumentException("Null id is not valid!"); + logger.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'", approach, foo.getId(), foo.getFormattedName(), foo.getQuantity()); + return foo; + }); + } + + public static Flux reportResult(Flux input) { + return reportResult(input, "default"); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooService.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooService.java new file mode 100644 index 0000000000..937e445ef5 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/consumer/service/FooService.java @@ -0,0 +1,91 @@ +package com.baeldung.debugging.consumer.service; + +import static com.baeldung.debugging.consumer.service.FooNameHelper.concatAndSubstringFooName; +import static com.baeldung.debugging.consumer.service.FooNameHelper.substringFooName; +import static com.baeldung.debugging.consumer.service.FooQuantityHelper.divideFooQuantity; +import static com.baeldung.debugging.consumer.service.FooQuantityHelper.processFooReducingQuantity; +import static com.baeldung.debugging.consumer.service.FooReporter.reportResult; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.baeldung.debugging.consumer.model.Foo; + +import reactor.core.publisher.Flux; + +@Component +public class FooService { + + private static Logger logger = LoggerFactory.getLogger(FooService.class); + + public void processFoo(Flux flux) { + flux = FooNameHelper.concatFooName(flux); + flux = FooNameHelper.substringFooName(flux); + flux = flux.log(); + flux = FooReporter.reportResult(flux); + flux = flux.doOnError(error -> { + logger.error("The following error happened on processFoo method!", error); + }); + flux.subscribe(); + } + + public void processFooInAnotherScenario(Flux flux) { + flux = FooNameHelper.substringFooName(flux); + flux = FooQuantityHelper.divideFooQuantity(flux); + flux.subscribe(); + } + + public void processUsingApproachOneWithErrorHandling(Flux flux) { + logger.info("starting approach one w error handling!"); + flux = concatAndSubstringFooName(flux); + flux = concatAndSubstringFooName(flux); + flux = substringFooName(flux); + flux = processFooReducingQuantity(flux); + flux = processFooReducingQuantity(flux); + flux = processFooReducingQuantity(flux); + flux = reportResult(flux, "ONE w/ EH"); + flux = flux.doOnError(error -> { + logger.error("Approach 1 with Error Handling failed!", error); + }); + flux.subscribe(); + } + + public void processUsingApproachThree(Flux flux) { + logger.info("starting approach three!"); + flux = concatAndSubstringFooName(flux); + flux = reportResult(flux, "THREE"); + flux = flux.doOnError(error -> { + logger.error("Approach 3 failed!", error); + }); + flux.subscribe(); + } + + public void processUsingApproachFourWithCheckpoint(Flux flux) { + logger.info("starting approach four!"); + flux = concatAndSubstringFooName(flux); + flux = flux.checkpoint("CHECKPOINT 1"); + flux = concatAndSubstringFooName(flux); + flux = divideFooQuantity(flux); + flux = flux.checkpoint("CHECKPOINT 2", true); + flux = reportResult(flux, "FOUR"); + flux = concatAndSubstringFooName(flux).doOnError(error -> { + logger.error("Approach 4 failed!", error); + }); + flux.subscribe(); + } + + public void processUsingApproachFourWithInitialCheckpoint(Flux flux) { + logger.info("starting approach four!"); + flux = concatAndSubstringFooName(flux); + flux = flux.checkpoint("CHECKPOINT 1", true); + flux = concatAndSubstringFooName(flux); + flux = divideFooQuantity(flux); + flux = reportResult(flux, "FOUR"); + flux = flux.doOnError(error -> { + logger.error("Approach 4-2 failed!", error); + }); + flux.subscribe(); + } + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/server/ServerSSEApplication.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/server/ServerSSEApplication.java new file mode 100644 index 0000000000..6b24ee39f0 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/server/ServerSSEApplication.java @@ -0,0 +1,29 @@ +package com.baeldung.debugging.server; + +import java.util.Collections; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.security.config.web.server.ServerHttpSecurity; +import org.springframework.security.web.server.SecurityWebFilterChain; +import org.springframework.web.reactive.config.EnableWebFlux; + +@EnableWebFlux +@SpringBootApplication +public class ServerSSEApplication { + + public static void main(String[] args) { + SpringApplication app = new SpringApplication(ServerSSEApplication.class); + app.setDefaultProperties(Collections.singletonMap("server.port", "8081")); + app.run(args); + } + + @Bean + public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) { + http.authorizeExchange() + .anyExchange() + .permitAll(); + return http.build(); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/server/handlers/ServerHandler.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/server/handlers/ServerHandler.java new file mode 100644 index 0000000000..759cd9b01d --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/server/handlers/ServerHandler.java @@ -0,0 +1,47 @@ +package com.baeldung.debugging.server.handlers; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; + +import com.baeldung.debugging.server.model.Foo; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Component +public class ServerHandler { + + private static Logger logger = LoggerFactory.getLogger(ServerHandler.class); + + public Mono useHandler(final ServerRequest request) { + // there are chances that something goes wrong here... + return ServerResponse.ok() + .contentType(MediaType.TEXT_EVENT_STREAM) + .body(Flux.interval(Duration.ofSeconds(1)) + .map(sequence -> { + logger.info("retrieving Foo. Sequence: {}", sequence); + if (ThreadLocalRandom.current() + .nextInt(0, 50) == 1) { + throw new RuntimeException("There was an error retrieving the Foo!"); + } + return new Foo(sequence, "name" + sequence); + + }), Foo.class); + } + + public Mono useHandlerFinite(final ServerRequest request) { + return ServerResponse.ok() + .contentType(MediaType.TEXT_EVENT_STREAM) + .body(Flux.range(0, 50) + .map(sequence -> { + return new Foo(new Long(sequence), "theFooNameNumber" + sequence); + }), Foo.class); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/server/model/Foo.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/server/model/Foo.java new file mode 100644 index 0000000000..a60e468e7f --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/server/model/Foo.java @@ -0,0 +1,16 @@ +package com.baeldung.debugging.server.model; + +import org.springframework.data.annotation.Id; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class Foo { + + @Id + private Long id; + private String name; + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/debugging/server/routers/ServerRouter.java b/spring-5-reactive/src/main/java/com/baeldung/debugging/server/routers/ServerRouter.java new file mode 100644 index 0000000000..6378b2213d --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/debugging/server/routers/ServerRouter.java @@ -0,0 +1,22 @@ +package com.baeldung.debugging.server.routers; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerResponse; + +import com.baeldung.debugging.server.handlers.ServerHandler; + +@Configuration +public class ServerRouter { + + @Bean + public RouterFunction responseRoute(@Autowired ServerHandler handler) { + return RouterFunctions.route(RequestPredicates.GET("/functional-reactive/periodic-foo"), handler::useHandler) + .andRoute(RequestPredicates.GET("/functional-reactive/periodic-foo-2"), handler::useHandlerFinite); + } + +} diff --git a/spring-5-reactive/src/main/resources/application.properties b/spring-5-reactive/src/main/resources/application.properties index 92f3116f84..4b49e8e8a2 100644 --- a/spring-5-reactive/src/main/resources/application.properties +++ b/spring-5-reactive/src/main/resources/application.properties @@ -1,2 +1 @@ -logging.level.root=INFO - +logging.level.root=INFO \ No newline at end of file diff --git a/spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceIntegrationTest.java b/spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceIntegrationTest.java new file mode 100644 index 0000000000..b7ed031ec7 --- /dev/null +++ b/spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceIntegrationTest.java @@ -0,0 +1,65 @@ +package com.baeldung.debugging.consumer; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.baeldung.debugging.consumer.model.Foo; +import com.baeldung.debugging.consumer.service.FooService; +import com.baeldung.debugging.consumer.utils.ListAppender; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; + +public class ConsumerFooServiceIntegrationTest { + + FooService service = new FooService(); + + @BeforeEach + public void clearLogList() { + Hooks.onOperatorDebug(); + ListAppender.clearEventList(); + } + + @Test + public void givenFooWithNullId_whenProcessFoo_thenLogsWithDebugTrace() { + Foo one = new Foo(1, "nameverylong", 8); + Foo two = new Foo(null, "nameverylong", 4); + Flux flux = Flux.just(one, two); + + service.processFoo(flux); + + Collection allLoggedEntries = ListAppender.getEvents() + .stream() + .map(ILoggingEvent::getFormattedMessage) + .collect(Collectors.toList()); + + Collection allSuppressedEntries = ListAppender.getEvents() + .stream() + .map(ILoggingEvent::getThrowableProxy) + .flatMap(t -> { + return Optional.ofNullable(t) + .map(IThrowableProxy::getSuppressed) + .map(Arrays::stream) + .orElse(Stream.empty()); + }) + .map(IThrowableProxy::getMessage) + .collect(Collectors.toList()); + assertThat(allLoggedEntries).anyMatch(entry -> entry.contains("The following error happened on processFoo method!")) + .anyMatch(entry -> entry.contains("| onSubscribe")) + .anyMatch(entry -> entry.contains("| cancel()")); + + assertThat(allSuppressedEntries).anyMatch(entry -> entry.contains("Assembly trace from producer")) + .anyMatch(entry -> entry.contains("Error has been observed by the following operator(s)")); + } + +} diff --git a/spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceLiveTest.java b/spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceLiveTest.java new file mode 100644 index 0000000000..af9bdfbc9b --- /dev/null +++ b/spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceLiveTest.java @@ -0,0 +1,49 @@ +package com.baeldung.debugging.consumer; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.test.web.reactive.server.WebTestClient.ResponseSpec; + +import com.baeldung.debugging.consumer.service.FooService; + +public class ConsumerFooServiceLiveTest { + + FooService service = new FooService(); + + private static final String BASE_URL = "http://localhost:8082"; + private static final String DEBUG_HOOK_ON = BASE_URL + "/debug-hook-on"; + private static final String DEBUG_HOOK_OFF = BASE_URL + "/debug-hook-off"; + + private static WebTestClient client; + + @BeforeAll + public static void setup() { + client = WebTestClient.bindToServer() + .baseUrl(BASE_URL) + .build(); + } + + @Test + public void whenRequestingDebugHookOn_thenObtainExpectedMessage() { + ResponseSpec response = client.get() + .uri(DEBUG_HOOK_ON) + .exchange(); + response.expectStatus() + .isOk() + .expectBody(String.class) + .isEqualTo("DEBUG HOOK ON"); + } + + @Test + public void whenRequestingDebugHookOff_thenObtainExpectedMessage() { + ResponseSpec response = client.get() + .uri(DEBUG_HOOK_OFF) + .exchange(); + response.expectStatus() + .isOk() + .expectBody(String.class) + .isEqualTo("DEBUG HOOK OFF"); + } + +} diff --git a/spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/utils/ListAppender.java b/spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/utils/ListAppender.java new file mode 100644 index 0000000000..c8c1c110bb --- /dev/null +++ b/spring-5-reactive/src/test/java/com/baeldung/debugging/consumer/utils/ListAppender.java @@ -0,0 +1,25 @@ +package com.baeldung.debugging.consumer.utils; + +import java.util.ArrayList; +import java.util.List; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.AppenderBase; + +public class ListAppender extends AppenderBase { + + static private List events = new ArrayList<>(); + + @Override + protected void append(ILoggingEvent eventObject) { + events.add(eventObject); + } + + public static List getEvents() { + return events; + } + + public static void clearEventList() { + events.clear(); + } +} diff --git a/spring-5-reactive/src/test/resources/logback-test.xml b/spring-5-reactive/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..514029e402 --- /dev/null +++ b/spring-5-reactive/src/test/resources/logback-test.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + \ No newline at end of file