JAVA-14288 Rename spring-5-reactive-modules to spring-reactive-modules (#12659)
* JAVA-14288 Rename spring-5-reactive-modules to spring-reactive-modules * JAVA-14288 Remove failing module * JAVA-14288 Revert commenting spring-cloud-openfeign-2 module
This commit is contained in:
+17
@@ -0,0 +1,17 @@
|
||||
package com.baeldung.reactive.concurrency;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
/**
|
||||
* Please note we assume Mongo and Kafka are running in the local machine and on default configuration.
|
||||
* Additionally, if you want to experiment with Tomcat/Jetty instead of Netty, just uncomment the lines in pom.xml and rebuild.
|
||||
*/
|
||||
@SpringBootApplication
|
||||
public class Application {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(Application.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
+127
@@ -0,0 +1,127 @@
|
||||
package com.baeldung.reactive.concurrency;
|
||||
|
||||
import io.reactivex.Observable;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.IntegerDeserializer;
|
||||
import org.apache.kafka.common.serialization.IntegerSerializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.kafka.receiver.KafkaReceiver;
|
||||
import reactor.kafka.receiver.ReceiverOptions;
|
||||
import reactor.kafka.receiver.ReceiverRecord;
|
||||
import reactor.kafka.sender.KafkaSender;
|
||||
import reactor.kafka.sender.SenderOptions;
|
||||
import reactor.kafka.sender.SenderRecord;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/")
|
||||
public class Controller {
|
||||
|
||||
@Autowired
|
||||
private PersonRepository personRepository;
|
||||
|
||||
private Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup");
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(Controller.class);
|
||||
|
||||
@GetMapping("/threads/webflux")
|
||||
public Flux<String> getThreadsWebflux() {
|
||||
return Flux.fromIterable(getThreads());
|
||||
}
|
||||
|
||||
@GetMapping("/threads/webclient")
|
||||
public Flux<String> getThreadsWebClient() {
|
||||
WebClient.create("http://localhost:8080/index")
|
||||
.get()
|
||||
.retrieve()
|
||||
.bodyToMono(String.class)
|
||||
.subscribeOn(scheduler)
|
||||
.publishOn(scheduler)
|
||||
.doOnNext(s -> logger.info("Response: {}", s))
|
||||
.subscribe();
|
||||
return Flux.fromIterable(getThreads());
|
||||
}
|
||||
|
||||
@GetMapping("/threads/rxjava")
|
||||
public Observable<String> getIndexRxJava() {
|
||||
Observable.fromIterable(Arrays.asList("Hello", "World"))
|
||||
.map(s -> s.toUpperCase())
|
||||
.observeOn(io.reactivex.schedulers.Schedulers.trampoline())
|
||||
.doOnNext(s -> logger.info("String: {}", s))
|
||||
.subscribe();
|
||||
return Observable.fromIterable(getThreads());
|
||||
}
|
||||
|
||||
@GetMapping("/threads/mongodb")
|
||||
public Flux<String> getIndexMongo() {
|
||||
personRepository.findAll()
|
||||
.doOnNext(p -> logger.info("Person: {}", p))
|
||||
.subscribe();
|
||||
return Flux.fromIterable(getThreads());
|
||||
}
|
||||
|
||||
@GetMapping("/threads/reactor-kafka")
|
||||
public Flux<String> getIndexKafka() {
|
||||
Map<String, Object> producerProps = new HashMap<>();
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
|
||||
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
|
||||
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux.range(1, 10)
|
||||
.map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
|
||||
sender.send(outboundFlux)
|
||||
.subscribe();
|
||||
|
||||
Map<String, Object> consumerProps = new HashMap<>();
|
||||
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer");
|
||||
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
|
||||
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
|
||||
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(consumerProps);
|
||||
receiverOptions.subscription(Collections.singleton("reactive-test"));
|
||||
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(receiverOptions);
|
||||
Flux<ReceiverRecord<Integer, String>> inboundFlux = receiver.receive();
|
||||
inboundFlux.subscribe(r -> {
|
||||
logger.info("Received message: {}", r.value());
|
||||
r.receiverOffset()
|
||||
.acknowledge();
|
||||
});
|
||||
return Flux.fromIterable(getThreads());
|
||||
}
|
||||
|
||||
@GetMapping("/index")
|
||||
public Mono<String> getIndex() {
|
||||
return Mono.just("Hello world!");
|
||||
}
|
||||
|
||||
private List<String> getThreads() {
|
||||
return Thread.getAllStackTraces()
|
||||
.keySet()
|
||||
.stream()
|
||||
.map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal"))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
+27
@@ -0,0 +1,27 @@
|
||||
package com.baeldung.reactive.concurrency;
|
||||
|
||||
import org.springframework.data.annotation.Id;
|
||||
import org.springframework.data.mongodb.core.mapping.Document;
|
||||
|
||||
@Document
|
||||
public class Person {
|
||||
@Id
|
||||
String id;
|
||||
|
||||
public Person(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Person{" + "id='" + id + '\'' + '}';
|
||||
}
|
||||
}
|
||||
+6
@@ -0,0 +1,6 @@
|
||||
package com.baeldung.reactive.concurrency;
|
||||
|
||||
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
|
||||
|
||||
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
|
||||
}
|
||||
+33
@@ -0,0 +1,33 @@
|
||||
package com.baeldung.reactive.debugging.consumer;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.security.config.web.server.ServerHttpSecurity;
|
||||
import org.springframework.security.web.server.SecurityWebFilterChain;
|
||||
import reactor.core.publisher.Hooks;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
|
||||
@EnableScheduling
|
||||
public class ConsumerDebuggingApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
Hooks.onOperatorDebug();
|
||||
SpringApplication app = new SpringApplication(ConsumerDebuggingApplication.class);
|
||||
app.setDefaultProperties(Collections.singletonMap("server.port", "8082"));
|
||||
app.run(args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SecurityWebFilterChain debuggingConsumerSpringSecurityFilterChain(ServerHttpSecurity http) {
|
||||
http.authorizeExchange()
|
||||
.anyExchange()
|
||||
.permitAll();
|
||||
http.csrf().disable();
|
||||
return http.build();
|
||||
}
|
||||
}
|
||||
+22
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.reactive.debugging.consumer.controllers;
|
||||
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Hooks;
|
||||
|
||||
@RestController
|
||||
public class ReactiveConfigsToggleRestController {
|
||||
|
||||
@GetMapping("/debug-hook-on")
|
||||
public String setReactiveDebugOn() {
|
||||
Hooks.onOperatorDebug();
|
||||
return "DEBUG HOOK ON";
|
||||
}
|
||||
|
||||
@GetMapping("/debug-hook-off")
|
||||
public String setReactiveDebugOff() {
|
||||
Hooks.resetOnOperatorDebug();
|
||||
return "DEBUG HOOK OFF";
|
||||
}
|
||||
|
||||
}
|
||||
+154
@@ -0,0 +1,154 @@
|
||||
package com.baeldung.reactive.debugging.consumer.cronjobs;
|
||||
|
||||
import com.baeldung.reactive.debugging.consumer.model.Foo;
|
||||
import com.baeldung.reactive.debugging.consumer.model.FooDto;
|
||||
import com.baeldung.reactive.debugging.consumer.service.FooService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
@Component
|
||||
public class CronJobs {
|
||||
|
||||
final Logger logger = LoggerFactory.getLogger(CronJobs.class);
|
||||
final WebClient client = WebClient.create("http://localhost:8081");
|
||||
|
||||
@Autowired
|
||||
private FooService service;
|
||||
|
||||
@Scheduled(fixedRate = 10000)
|
||||
public void consumeInfiniteFlux() {
|
||||
Flux<Foo> fluxFoo = client.get()
|
||||
.uri("/functional-reactive/periodic-foo")
|
||||
.accept(MediaType.TEXT_EVENT_STREAM)
|
||||
.retrieve()
|
||||
.bodyToFlux(FooDto.class)
|
||||
.delayElements(Duration.ofMillis(100))
|
||||
.map(dto -> {
|
||||
logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName());
|
||||
return new Foo(dto);
|
||||
});
|
||||
|
||||
int random = ThreadLocalRandom.current()
|
||||
.nextInt(0, 3);
|
||||
|
||||
switch (random) {
|
||||
case 0:
|
||||
logger.info("process 1 with approach 1");
|
||||
service.processFoo(fluxFoo);
|
||||
break;
|
||||
case 1:
|
||||
logger.info("process 1 with approach 1 EH");
|
||||
service.processUsingApproachOneWithErrorHandling(fluxFoo);
|
||||
break;
|
||||
default:
|
||||
logger.info("process 1 with approach 2");
|
||||
service.processFooInAnotherScenario(fluxFoo);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 20000)
|
||||
public void consumeFiniteFlux2() {
|
||||
Flux<Foo> fluxFoo = client.get()
|
||||
.uri("/functional-reactive/periodic-foo-2")
|
||||
.accept(MediaType.TEXT_EVENT_STREAM)
|
||||
.retrieve()
|
||||
.bodyToFlux(FooDto.class)
|
||||
.delayElements(Duration.ofMillis(100))
|
||||
.map(dto -> {
|
||||
logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName());
|
||||
return new Foo(dto);
|
||||
});
|
||||
|
||||
int random = ThreadLocalRandom.current()
|
||||
.nextInt(0, 3);
|
||||
|
||||
switch (random) {
|
||||
case 0:
|
||||
logger.info("process 2 with approach 1");
|
||||
service.processFoo(fluxFoo);
|
||||
break;
|
||||
case 1:
|
||||
logger.info("process 2 with approach 1 EH");
|
||||
service.processUsingApproachOneWithErrorHandling(fluxFoo);
|
||||
break;
|
||||
default:
|
||||
logger.info("process 2 with approach 2");
|
||||
service.processFooInAnotherScenario(fluxFoo);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 20000)
|
||||
public void consumeFiniteFlux3() {
|
||||
Flux<Foo> fluxFoo = client.get()
|
||||
.uri("/functional-reactive/periodic-foo-2")
|
||||
.accept(MediaType.TEXT_EVENT_STREAM)
|
||||
.retrieve()
|
||||
.bodyToFlux(FooDto.class)
|
||||
.delayElements(Duration.ofMillis(100))
|
||||
.map(dto -> {
|
||||
logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName());
|
||||
return new Foo(dto);
|
||||
});
|
||||
logger.info("process 3 with approach 3");
|
||||
service.processUsingApproachThree(fluxFoo);
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 20000)
|
||||
public void consumeFiniteFluxWithCheckpoint4() {
|
||||
Flux<Foo> fluxFoo = client.get()
|
||||
.uri("/functional-reactive/periodic-foo-2")
|
||||
.accept(MediaType.TEXT_EVENT_STREAM)
|
||||
.retrieve()
|
||||
.bodyToFlux(FooDto.class)
|
||||
.delayElements(Duration.ofMillis(100))
|
||||
.map(dto -> {
|
||||
logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName());
|
||||
return new Foo(dto);
|
||||
});
|
||||
logger.info("process 4 with approach 4");
|
||||
service.processUsingApproachFourWithCheckpoint(fluxFoo);
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 20000)
|
||||
public void consumeFiniteFluxWitParallelScheduler() {
|
||||
Flux<Foo> fluxFoo = client.get()
|
||||
.uri("/functional-reactive/periodic-foo-2")
|
||||
.accept(MediaType.TEXT_EVENT_STREAM)
|
||||
.retrieve()
|
||||
.bodyToFlux(FooDto.class)
|
||||
.delayElements(Duration.ofMillis(100))
|
||||
.map(dto -> {
|
||||
logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName());
|
||||
return new Foo(dto);
|
||||
});
|
||||
logger.info("process 5-parallel with approach 5-parallel");
|
||||
service.processUsingApproachFivePublishingToDifferentParallelThreads(fluxFoo);
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 20000)
|
||||
public void consumeFiniteFluxWithSingleSchedulers() {
|
||||
Flux<Foo> fluxFoo = client.get()
|
||||
.uri("/functional-reactive/periodic-foo-2")
|
||||
.accept(MediaType.TEXT_EVENT_STREAM)
|
||||
.retrieve()
|
||||
.bodyToFlux(FooDto.class)
|
||||
.delayElements(Duration.ofMillis(100))
|
||||
.map(dto -> {
|
||||
logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName());
|
||||
return new Foo(dto);
|
||||
});
|
||||
logger.info("process 5-single with approach 5-single");
|
||||
service.processUsingApproachFivePublishingToDifferentSingleThreads(fluxFoo);
|
||||
}
|
||||
}
|
||||
+31
@@ -0,0 +1,31 @@
|
||||
package com.baeldung.reactive.debugging.consumer.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Foo {
|
||||
|
||||
private Integer id;
|
||||
private String formattedName;
|
||||
private Integer quantity;
|
||||
|
||||
public Foo(FooDto dto) {
|
||||
this.id = randomId() == 0 ? null : dto.getId();
|
||||
this.formattedName = dto.getName();
|
||||
this.quantity = randomQuantity();
|
||||
}
|
||||
|
||||
private static int randomId() {
|
||||
return ThreadLocalRandom.current().nextInt(0, 100);
|
||||
}
|
||||
|
||||
private static int randomQuantity() {
|
||||
return ThreadLocalRandom.current().nextInt(0, 10);
|
||||
}
|
||||
}
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.reactive.debugging.consumer.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class FooDto {
|
||||
|
||||
private Integer id;
|
||||
private String name;
|
||||
|
||||
}
|
||||
+40
@@ -0,0 +1,40 @@
|
||||
package com.baeldung.reactive.debugging.consumer.service;
|
||||
|
||||
import com.baeldung.reactive.debugging.consumer.model.Foo;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
public class FooNameHelper {
|
||||
|
||||
public static Foo concatAndSubstringFooName(Foo foo) {
|
||||
Foo concat = concatFooName(foo);
|
||||
return substringFooName(concat);
|
||||
}
|
||||
|
||||
public static Foo concatFooName(Foo foo) {
|
||||
|
||||
int random = ThreadLocalRandom.current()
|
||||
.nextInt(0, 80);
|
||||
|
||||
String processedName = (random != 0)
|
||||
? foo.getFormattedName()
|
||||
: foo.getFormattedName() + "-bael";
|
||||
|
||||
foo.setFormattedName(processedName);
|
||||
return foo;
|
||||
}
|
||||
|
||||
public static Foo substringFooName(Foo foo) {
|
||||
int random = ThreadLocalRandom.current()
|
||||
.nextInt(0, 100);
|
||||
|
||||
String processedName = (random == 0)
|
||||
? foo.getFormattedName().substring(10, 15)
|
||||
: foo.getFormattedName().substring(0, 5);
|
||||
|
||||
foo.setFormattedName(processedName);
|
||||
|
||||
return foo;
|
||||
}
|
||||
|
||||
}
|
||||
+24
@@ -0,0 +1,24 @@
|
||||
package com.baeldung.reactive.debugging.consumer.service;
|
||||
|
||||
import com.baeldung.reactive.debugging.consumer.model.Foo;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
public class FooQuantityHelper {
|
||||
|
||||
public static Foo processFooReducingQuantity(Foo foo) {
|
||||
int random = ThreadLocalRandom.current().nextInt(0, 90);
|
||||
int result = (random == 0) ? 0 : foo.getQuantity() + 2;
|
||||
foo.setQuantity(result);
|
||||
|
||||
return divideFooQuantity(foo);
|
||||
}
|
||||
|
||||
public static Foo divideFooQuantity(Foo foo) {
|
||||
|
||||
Integer result = (int) Math.round(5.0 / foo.getQuantity());
|
||||
foo.setQuantity(result);
|
||||
return foo;
|
||||
}
|
||||
|
||||
}
|
||||
+24
@@ -0,0 +1,24 @@
|
||||
package com.baeldung.reactive.debugging.consumer.service;
|
||||
|
||||
import com.baeldung.reactive.debugging.consumer.model.Foo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class FooReporter {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FooReporter.class);
|
||||
|
||||
public static Foo reportResult(Foo foo, String approach) {
|
||||
if (foo.getId() == null) {
|
||||
throw new IllegalArgumentException("Null id is not valid!");
|
||||
}
|
||||
LOGGER.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'",
|
||||
approach, foo.getId(), foo.getFormattedName(), foo.getQuantity());
|
||||
|
||||
return foo;
|
||||
}
|
||||
|
||||
public static Foo reportResult(Foo input) {
|
||||
return reportResult(input, "default");
|
||||
}
|
||||
}
|
||||
+113
@@ -0,0 +1,113 @@
|
||||
package com.baeldung.reactive.debugging.consumer.service;
|
||||
|
||||
import com.baeldung.reactive.debugging.consumer.model.Foo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
import static com.baeldung.reactive.debugging.consumer.service.FooReporter.reportResult;
|
||||
|
||||
@Component
|
||||
public class FooService {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FooService.class);
|
||||
|
||||
public void processFoo(Flux<Foo> flux) {
|
||||
flux.map(FooNameHelper::concatFooName)
|
||||
.map(FooNameHelper::substringFooName)
|
||||
.log()
|
||||
.map(FooReporter::reportResult)
|
||||
.doOnError(error -> LOGGER.error("The following error happened on processFoo method!", error))
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
public void processFooInAnotherScenario(Flux<Foo> flux) {
|
||||
flux.map(FooNameHelper::substringFooName)
|
||||
.map(FooQuantityHelper::divideFooQuantity)
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
public void processUsingApproachOneWithErrorHandling(Flux<Foo> flux) {
|
||||
LOGGER.info("starting approach one w error handling!");
|
||||
|
||||
flux.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.map(FooNameHelper::substringFooName)
|
||||
.map(FooQuantityHelper::processFooReducingQuantity)
|
||||
.map(FooQuantityHelper::processFooReducingQuantity)
|
||||
.map(FooQuantityHelper::processFooReducingQuantity)
|
||||
.map(FooReporter::reportResult)
|
||||
.doOnError(error -> LOGGER.error("Approach 1 with Error Handling failed!", error))
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
public void processUsingApproachThree(Flux<Foo> flux) {
|
||||
LOGGER.info("starting approach three!");
|
||||
|
||||
flux.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.map(foo -> reportResult(foo, "THREE"))
|
||||
.doOnError(error -> LOGGER.error("Approach 3 failed!", error))
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
public void processUsingApproachFourWithCheckpoint(Flux<Foo> flux) {
|
||||
LOGGER.info("starting approach four!");
|
||||
|
||||
flux.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.checkpoint("CHECKPOINT 1")
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.map(FooQuantityHelper::divideFooQuantity)
|
||||
.checkpoint("CHECKPOINT 2", true)
|
||||
.map(foo -> reportResult(foo, "FOUR"))
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.doOnError(error -> LOGGER.error("Approach 4 failed!", error))
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
public void processUsingApproachFourWithInitialCheckpoint(Flux<Foo> flux) {
|
||||
LOGGER.info("starting approach four!");
|
||||
|
||||
flux.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.checkpoint("CHECKPOINT 1", true)
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.map(FooQuantityHelper::divideFooQuantity)
|
||||
.map(foo -> reportResult(foo, "FOUR"))
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.doOnError(error -> LOGGER.error("Approach 4-2 failed!", error))
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
public void processUsingApproachFivePublishingToDifferentParallelThreads(Flux<Foo> flux) {
|
||||
LOGGER.info("starting approach five-parallel!");
|
||||
|
||||
flux.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.publishOn(Schedulers.newParallel("five-parallel-foo"))
|
||||
.log()
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.map(foo -> reportResult(foo, "FIVE-PARALLEL"))
|
||||
.publishOn(Schedulers.newSingle("five-parallel-bar"))
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.doOnError(error -> LOGGER.error("Approach 5-parallel failed!", error))
|
||||
.subscribeOn(Schedulers.newParallel("five-parallel-starter"))
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
public void processUsingApproachFivePublishingToDifferentSingleThreads(Flux<Foo> flux) {
|
||||
LOGGER.info("starting approach five-single!");
|
||||
|
||||
flux.log()
|
||||
.subscribeOn(Schedulers.newSingle("five-single-starter"))
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.publishOn(Schedulers.newSingle("five-single-foo"))
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.map(FooQuantityHelper::divideFooQuantity)
|
||||
.map(foo -> reportResult(foo, "FIVE-SINGLE"))
|
||||
.publishOn(Schedulers.newSingle("five-single-bar"))
|
||||
.map(FooNameHelper::concatAndSubstringFooName)
|
||||
.doOnError(error -> LOGGER.error("Approach 5-single failed!", error))
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
}
|
||||
+30
@@ -0,0 +1,30 @@
|
||||
package com.baeldung.reactive.debugging.server;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.security.config.web.server.ServerHttpSecurity;
|
||||
import org.springframework.security.web.server.SecurityWebFilterChain;
|
||||
import org.springframework.web.reactive.config.EnableWebFlux;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
@EnableWebFlux
|
||||
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
|
||||
public class ServerDebuggingApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication app = new SpringApplication(ServerDebuggingApplication.class);
|
||||
app.setDefaultProperties(Collections.singletonMap("server.port", "8081"));
|
||||
app.run(args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SecurityWebFilterChain debuggingServerSpringSecurityFilterChain(ServerHttpSecurity http) {
|
||||
http.authorizeExchange()
|
||||
.anyExchange()
|
||||
.permitAll();
|
||||
return http.build();
|
||||
}
|
||||
}
|
||||
+46
@@ -0,0 +1,46 @@
|
||||
package com.baeldung.reactive.debugging.server.handlers;
|
||||
|
||||
import com.baeldung.reactive.debugging.server.model.Foo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.function.server.ServerRequest;
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
@Component
|
||||
public class ServerHandler {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class);
|
||||
|
||||
public Mono<ServerResponse> useHandler(final ServerRequest request) {
|
||||
// there are chances that something goes wrong here...
|
||||
return ServerResponse.ok()
|
||||
.contentType(MediaType.TEXT_EVENT_STREAM)
|
||||
.body(getFlux(), Foo.class);
|
||||
}
|
||||
|
||||
public Mono<ServerResponse> useHandlerFinite(final ServerRequest request) {
|
||||
return ServerResponse.ok()
|
||||
.contentType(MediaType.TEXT_EVENT_STREAM)
|
||||
.body(Flux.range(0, 50)
|
||||
.map(sequence -> new Foo(new Long(sequence), "theFooNameNumber" + sequence)
|
||||
), Foo.class);
|
||||
}
|
||||
|
||||
private static Flux<Foo> getFlux() {
|
||||
return Flux.interval(Duration.ofSeconds(1))
|
||||
.map(sequence -> {
|
||||
LOGGER.info("retrieving Foo. Sequence: {}", sequence);
|
||||
if (ThreadLocalRandom.current().nextInt(0, 50) == 1) {
|
||||
throw new RuntimeException("There was an error retrieving the Foo!");
|
||||
}
|
||||
return new Foo(sequence, "name" + sequence);
|
||||
});
|
||||
}
|
||||
}
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.reactive.debugging.server.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class Foo {
|
||||
|
||||
private Long id;
|
||||
private String name;
|
||||
|
||||
}
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.reactive.debugging.server.routers;
|
||||
|
||||
import com.baeldung.reactive.debugging.server.handlers.ServerHandler;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.reactive.function.server.RequestPredicates;
|
||||
import org.springframework.web.reactive.function.server.RouterFunction;
|
||||
import org.springframework.web.reactive.function.server.RouterFunctions;
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
|
||||
@Configuration
|
||||
public class ServerRouter {
|
||||
|
||||
@Bean
|
||||
public RouterFunction<ServerResponse> responseRoute(@Autowired ServerHandler handler) {
|
||||
return RouterFunctions.route(RequestPredicates.GET("/functional-reactive/periodic-foo"), handler::useHandler)
|
||||
.andRoute(RequestPredicates.GET("/functional-reactive/periodic-foo-2"), handler::useHandlerFinite);
|
||||
}
|
||||
|
||||
}
|
||||
+18
@@ -0,0 +1,18 @@
|
||||
package com.baeldung.reactive.errorhandling;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.security.reactive.ReactiveSecurityAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.security.reactive.ReactiveUserDetailsServiceAutoConfiguration;
|
||||
|
||||
@SpringBootApplication(exclude = {
|
||||
MongoReactiveAutoConfiguration.class,
|
||||
ReactiveSecurityAutoConfiguration.class,
|
||||
ReactiveUserDetailsServiceAutoConfiguration.class })
|
||||
public class ErrorHandlingApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ErrorHandlingApplication.class, args);
|
||||
}
|
||||
}
|
||||
+22
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.reactive.errorhandling;
|
||||
|
||||
import org.springframework.boot.web.error.ErrorAttributeOptions;
|
||||
import org.springframework.boot.web.reactive.error.DefaultErrorAttributes;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.function.server.ServerRequest;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Component
|
||||
public class GlobalErrorAttributes extends DefaultErrorAttributes {
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) {
|
||||
Map<String, Object> map = super.getErrorAttributes(request, options);
|
||||
map.put("status", HttpStatus.BAD_REQUEST);
|
||||
map.put("message", "please provide a name");
|
||||
return map;
|
||||
}
|
||||
|
||||
}
|
||||
+48
@@ -0,0 +1,48 @@
|
||||
package com.baeldung.reactive.errorhandling;
|
||||
|
||||
import org.springframework.boot.autoconfigure.web.WebProperties;
|
||||
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
|
||||
import org.springframework.boot.web.error.ErrorAttributeOptions;
|
||||
import org.springframework.boot.web.reactive.error.ErrorAttributes;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.codec.ServerCodecConfigurer;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.function.BodyInserters;
|
||||
import org.springframework.web.reactive.function.server.RequestPredicates;
|
||||
import org.springframework.web.reactive.function.server.RouterFunction;
|
||||
import org.springframework.web.reactive.function.server.RouterFunctions;
|
||||
import org.springframework.web.reactive.function.server.ServerRequest;
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Component
|
||||
@Order(-2)
|
||||
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
|
||||
|
||||
public GlobalErrorWebExceptionHandler(GlobalErrorAttributes g, ApplicationContext applicationContext,
|
||||
ServerCodecConfigurer serverCodecConfigurer) {
|
||||
super(g, new WebProperties.Resources(), applicationContext);
|
||||
super.setMessageWriters(serverCodecConfigurer.getWriters());
|
||||
super.setMessageReaders(serverCodecConfigurer.getReaders());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RouterFunction<ServerResponse> getRoutingFunction(final ErrorAttributes errorAttributes) {
|
||||
return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
|
||||
}
|
||||
|
||||
private Mono<ServerResponse> renderErrorResponse(final ServerRequest request) {
|
||||
|
||||
final Map<String, Object> errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults());
|
||||
|
||||
return ServerResponse.status(HttpStatus.BAD_REQUEST)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.body(BodyInserters.fromValue(errorPropertiesMap));
|
||||
}
|
||||
|
||||
}
|
||||
+67
@@ -0,0 +1,67 @@
|
||||
package com.baeldung.reactive.errorhandling;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.function.server.ServerRequest;
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Component
|
||||
public class Handler {
|
||||
|
||||
public Mono<ServerResponse> handleWithErrorReturn(ServerRequest request) {
|
||||
return sayHello(request)
|
||||
.onErrorReturn("Hello, Stranger")
|
||||
.flatMap(s -> ServerResponse.ok()
|
||||
.contentType(MediaType.TEXT_PLAIN)
|
||||
.bodyValue(s));
|
||||
}
|
||||
|
||||
public Mono<ServerResponse> handleWithErrorResumeAndDynamicFallback(ServerRequest request) {
|
||||
return sayHello(request)
|
||||
.flatMap(s -> ServerResponse.ok()
|
||||
.contentType(MediaType.TEXT_PLAIN)
|
||||
.bodyValue(s))
|
||||
.onErrorResume(e -> (Mono.just("Hi, I looked around for your name but found: " + e.getMessage()))
|
||||
.flatMap(s -> ServerResponse.ok()
|
||||
.contentType(MediaType.TEXT_PLAIN)
|
||||
.bodyValue(s)));
|
||||
}
|
||||
|
||||
public Mono<ServerResponse> handleWithErrorResumeAndFallbackMethod(ServerRequest request) {
|
||||
return sayHello(request)
|
||||
.flatMap(s -> ServerResponse.ok()
|
||||
.contentType(MediaType.TEXT_PLAIN)
|
||||
.bodyValue(s))
|
||||
.onErrorResume(e -> sayHelloFallback()
|
||||
.flatMap(s -> ServerResponse.ok()
|
||||
.contentType(MediaType.TEXT_PLAIN)
|
||||
.bodyValue(s)));
|
||||
}
|
||||
|
||||
public Mono<ServerResponse> handleWithErrorResumeAndCustomException(ServerRequest request) {
|
||||
return ServerResponse.ok()
|
||||
.body(sayHello(request)
|
||||
.onErrorResume(e -> Mono.error(new NameRequiredException(
|
||||
HttpStatus.BAD_REQUEST,
|
||||
"please provide a name", e))), String.class);
|
||||
}
|
||||
|
||||
public Mono<ServerResponse> handleWithGlobalErrorHandler(ServerRequest request) {
|
||||
return ServerResponse.ok()
|
||||
.body(sayHello(request), String.class);
|
||||
}
|
||||
|
||||
private Mono<String> sayHello(ServerRequest request) {
|
||||
try {
|
||||
return Mono.just("Hello, " + request.queryParam("name").get());
|
||||
} catch (Exception e) {
|
||||
return Mono.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Mono<String> sayHelloFallback() {
|
||||
return Mono.just("Hello, Stranger");
|
||||
}
|
||||
}
|
||||
+11
@@ -0,0 +1,11 @@
|
||||
package com.baeldung.reactive.errorhandling;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.web.server.ResponseStatusException;
|
||||
|
||||
public class NameRequiredException extends ResponseStatusException {
|
||||
|
||||
public NameRequiredException(HttpStatus status, String message, Throwable e) {
|
||||
super(status, message, e);
|
||||
}
|
||||
}
|
||||
+26
@@ -0,0 +1,26 @@
|
||||
package com.baeldung.reactive.errorhandling;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.function.server.RouterFunction;
|
||||
import org.springframework.web.reactive.function.server.RouterFunctions;
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
|
||||
import static org.springframework.http.MediaType.TEXT_PLAIN;
|
||||
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
|
||||
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
|
||||
|
||||
@Component
|
||||
public class Router {
|
||||
|
||||
@Bean
|
||||
public RouterFunction<ServerResponse> routes(Handler handler) {
|
||||
return RouterFunctions
|
||||
.route(GET("/api/endpoint1").and(accept(TEXT_PLAIN)), handler::handleWithErrorReturn)
|
||||
.andRoute(GET("/api/endpoint2").and(accept(TEXT_PLAIN)), handler::handleWithErrorResumeAndFallbackMethod)
|
||||
.andRoute(GET("/api/endpoint3").and(accept(TEXT_PLAIN)), handler::handleWithErrorResumeAndDynamicFallback)
|
||||
.andRoute(GET("/api/endpoint4").and(accept(TEXT_PLAIN)), handler::handleWithErrorResumeAndCustomException)
|
||||
.andRoute(GET("/api/endpoint5").and(accept(TEXT_PLAIN)), handler::handleWithGlobalErrorHandler);
|
||||
}
|
||||
|
||||
}
|
||||
+37
@@ -0,0 +1,37 @@
|
||||
package com.baeldung.reactive.security;
|
||||
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.security.Principal;
|
||||
|
||||
@RestController
|
||||
public class GreetingController {
|
||||
|
||||
private final GreetingService greetingService;
|
||||
|
||||
public GreetingController(GreetingService greetingService) {
|
||||
this.greetingService = greetingService;
|
||||
}
|
||||
|
||||
@GetMapping("/")
|
||||
public Mono<String> greet(Mono<Principal> principal) {
|
||||
return principal
|
||||
.map(Principal::getName)
|
||||
.map(name -> String.format("Hello, %s", name));
|
||||
}
|
||||
|
||||
@GetMapping("/admin")
|
||||
public Mono<String> greetAdmin(Mono<Principal> principal) {
|
||||
return principal
|
||||
.map(Principal::getName)
|
||||
.map(name -> String.format("Admin access: %s", name));
|
||||
}
|
||||
|
||||
@GetMapping("/greetingService")
|
||||
public Mono<String> greetingService() {
|
||||
return greetingService.greet();
|
||||
}
|
||||
|
||||
}
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.reactive.security;
|
||||
|
||||
import org.springframework.security.access.prepost.PreAuthorize;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Service
|
||||
public class GreetingService {
|
||||
|
||||
@PreAuthorize("hasRole('ADMIN')")
|
||||
public Mono<String> greet() {
|
||||
return Mono.just("Hello from service!");
|
||||
}
|
||||
|
||||
}
|
||||
+52
@@ -0,0 +1,52 @@
|
||||
package com.baeldung.reactive.security;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
|
||||
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
|
||||
import org.springframework.security.config.web.server.ServerHttpSecurity;
|
||||
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
|
||||
import org.springframework.security.core.userdetails.User;
|
||||
import org.springframework.security.core.userdetails.UserDetails;
|
||||
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
|
||||
import org.springframework.security.crypto.password.PasswordEncoder;
|
||||
import org.springframework.security.web.server.SecurityWebFilterChain;
|
||||
|
||||
@EnableWebFluxSecurity
|
||||
@EnableReactiveMethodSecurity
|
||||
public class SecurityConfig {
|
||||
|
||||
@Bean
|
||||
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
|
||||
return http.authorizeExchange()
|
||||
.pathMatchers("/admin").hasAuthority("ROLE_ADMIN")
|
||||
.anyExchange().authenticated()
|
||||
.and()
|
||||
.formLogin()
|
||||
.and()
|
||||
.csrf().disable()
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MapReactiveUserDetailsService userDetailsService() {
|
||||
UserDetails user = User
|
||||
.withUsername("user")
|
||||
.password(passwordEncoder().encode("password"))
|
||||
.roles("USER")
|
||||
.build();
|
||||
|
||||
UserDetails admin = User
|
||||
.withUsername("admin")
|
||||
.password(passwordEncoder().encode("password"))
|
||||
.roles("ADMIN")
|
||||
.build();
|
||||
|
||||
return new MapReactiveUserDetailsService(user, admin);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PasswordEncoder passwordEncoder() {
|
||||
return new BCryptPasswordEncoder();
|
||||
}
|
||||
|
||||
}
|
||||
+34
@@ -0,0 +1,34 @@
|
||||
package com.baeldung.reactive.security;
|
||||
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.http.server.reactive.HttpHandler;
|
||||
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
|
||||
import org.springframework.web.reactive.config.EnableWebFlux;
|
||||
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
|
||||
import reactor.netty.DisposableServer;
|
||||
import reactor.netty.http.server.HttpServer;
|
||||
|
||||
@ComponentScan(basePackages = {"com.baeldung.reactive.security"})
|
||||
@EnableWebFlux
|
||||
public class SpringSecurity5Application {
|
||||
|
||||
public static void main(String[] args) {
|
||||
try (AnnotationConfigApplicationContext context =
|
||||
new AnnotationConfigApplicationContext(SpringSecurity5Application.class)) {
|
||||
context.getBean(DisposableServer.class).onDispose().block();
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DisposableServer disposableServer(ApplicationContext context) {
|
||||
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context)
|
||||
.build();
|
||||
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
|
||||
HttpServer httpServer = HttpServer.create().host("localhost").port(8083);
|
||||
return httpServer.handle(adapter).bindNow();
|
||||
}
|
||||
|
||||
}
|
||||
+14
@@ -0,0 +1,14 @@
|
||||
package com.baeldung.reactive.webclient;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Foo {
|
||||
|
||||
private String name;
|
||||
|
||||
}
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.reactive.webclient;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Tweet {
|
||||
private String text;
|
||||
private String username;
|
||||
}
|
||||
+20
@@ -0,0 +1,20 @@
|
||||
package com.baeldung.reactive.webclient;
|
||||
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
@RestController
|
||||
public class TweetsSlowServiceController {
|
||||
|
||||
@GetMapping("/slow-service-tweets")
|
||||
private List<Tweet> getAllTweets() throws Exception {
|
||||
Thread.sleep(2000L); // delay
|
||||
return Arrays.asList(
|
||||
new Tweet("RestTemplate rules", "@user1"),
|
||||
new Tweet("WebClient is better", "@user2"),
|
||||
new Tweet("OK, both are useful", "@user1"));
|
||||
}
|
||||
}
|
||||
+24
@@ -0,0 +1,24 @@
|
||||
package com.baeldung.reactive.webclient;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.security.config.web.server.ServerHttpSecurity;
|
||||
import org.springframework.security.web.server.SecurityWebFilterChain;
|
||||
|
||||
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
|
||||
public class WebClientApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(WebClientApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SecurityWebFilterChain filterChain(ServerHttpSecurity http) {
|
||||
http.csrf().disable()
|
||||
.authorizeExchange()
|
||||
.anyExchange().permitAll();
|
||||
return http.build();
|
||||
}
|
||||
}
|
||||
+46
@@ -0,0 +1,46 @@
|
||||
package com.baeldung.reactive.webclient;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestPart;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@RestController
|
||||
public class WebClientController {
|
||||
|
||||
@ResponseStatus(HttpStatus.OK)
|
||||
@GetMapping("/resource")
|
||||
public Map<String, String> getResource() {
|
||||
Map<String, String> response = new HashMap<>();
|
||||
response.put("field", "value");
|
||||
return response;
|
||||
}
|
||||
|
||||
@PostMapping("/resource")
|
||||
public Mono<String> postStringResource(@RequestBody Mono<String> bodyString) {
|
||||
return bodyString.map(body -> "processed-" + body);
|
||||
}
|
||||
|
||||
@PostMapping("/resource-override")
|
||||
public Mono<String> postStringResourceOverride(@RequestBody Mono<String> bodyString) {
|
||||
return bodyString.map(body -> "override-processed-" + body);
|
||||
}
|
||||
|
||||
@PostMapping("/resource-foo")
|
||||
public Mono<String> postFooResource(@RequestBody Mono<Foo> bodyFoo) {
|
||||
return bodyFoo.map(foo -> "processedFoo-" + foo.getName());
|
||||
}
|
||||
|
||||
@PostMapping(value = "/resource-multipart", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
|
||||
public String handleFormUpload(@RequestPart("key1") String value1, @RequestPart("key2") String value2) {
|
||||
return "processed-" + value1 + '-' + value2;
|
||||
}
|
||||
}
|
||||
+60
@@ -0,0 +1,60 @@
|
||||
package com.baeldung.reactive.webclient;
|
||||
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@RestController
|
||||
public class WebController {
|
||||
|
||||
private static final int DEFAULT_PORT = 8080;
|
||||
|
||||
@Setter
|
||||
private int serverPort = DEFAULT_PORT;
|
||||
|
||||
@GetMapping("/tweets-blocking")
|
||||
public List<Tweet> getTweetsBlocking() {
|
||||
log.info("Starting BLOCKING Controller!");
|
||||
final String uri = getSlowServiceUri();
|
||||
|
||||
RestTemplate restTemplate = new RestTemplate();
|
||||
ResponseEntity<List<Tweet>> response = restTemplate.exchange(
|
||||
uri, HttpMethod.GET, null,
|
||||
new ParameterizedTypeReference<List<Tweet>>(){});
|
||||
|
||||
List<Tweet> result = response.getBody();
|
||||
result.forEach(tweet -> log.info(tweet.toString()));
|
||||
log.info("Exiting BLOCKING Controller!");
|
||||
return result;
|
||||
}
|
||||
|
||||
@GetMapping(value = "/tweets-non-blocking", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
|
||||
public Flux<Tweet> getTweetsNonBlocking() {
|
||||
log.info("Starting NON-BLOCKING Controller!");
|
||||
Flux<Tweet> tweetFlux = WebClient.create()
|
||||
.get()
|
||||
.uri(getSlowServiceUri())
|
||||
.retrieve()
|
||||
.bodyToFlux(Tweet.class);
|
||||
|
||||
tweetFlux.subscribe(tweet -> log.info(tweet.toString()));
|
||||
log.info("Exiting NON-BLOCKING Controller!");
|
||||
return tweetFlux;
|
||||
}
|
||||
|
||||
private String getSlowServiceUri() {
|
||||
return "http://localhost:" + serverPort + "/slow-service-tweets";
|
||||
}
|
||||
|
||||
}
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.reactive.webclientrequests;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
|
||||
|
||||
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
|
||||
public class SpringWebClientRequestsApp {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(SpringWebClientRequestsApp.class, args);
|
||||
}
|
||||
}
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.reactive.webflux;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class Employee {
|
||||
|
||||
private String id;
|
||||
private String name;
|
||||
|
||||
}
|
||||
+44
@@ -0,0 +1,44 @@
|
||||
package com.baeldung.reactive.webflux;
|
||||
|
||||
import org.springframework.stereotype.Repository;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Repository
|
||||
public class EmployeeRepository {
|
||||
|
||||
private static final Map<String, Employee> EMPLOYEE_DATA;
|
||||
|
||||
static {
|
||||
EMPLOYEE_DATA = new HashMap<>();
|
||||
EMPLOYEE_DATA.put("1", new Employee("1", "Employee 1"));
|
||||
EMPLOYEE_DATA.put("2", new Employee("2", "Employee 2"));
|
||||
EMPLOYEE_DATA.put("3", new Employee("3", "Employee 3"));
|
||||
EMPLOYEE_DATA.put("4", new Employee("4", "Employee 4"));
|
||||
EMPLOYEE_DATA.put("5", new Employee("5", "Employee 5"));
|
||||
EMPLOYEE_DATA.put("6", new Employee("6", "Employee 6"));
|
||||
EMPLOYEE_DATA.put("7", new Employee("7", "Employee 7"));
|
||||
EMPLOYEE_DATA.put("8", new Employee("8", "Employee 8"));
|
||||
EMPLOYEE_DATA.put("9", new Employee("9", "Employee 9"));
|
||||
EMPLOYEE_DATA.put("10", new Employee("10", "Employee 10"));
|
||||
}
|
||||
|
||||
public Mono<Employee> findEmployeeById(String id) {
|
||||
return Mono.just(EMPLOYEE_DATA.get(id));
|
||||
}
|
||||
|
||||
public Flux<Employee> findAllEmployees() {
|
||||
return Flux.fromIterable(EMPLOYEE_DATA.values());
|
||||
}
|
||||
|
||||
public Mono<Employee> updateEmployee(Employee employee) {
|
||||
Employee existingEmployee = EMPLOYEE_DATA.get(employee.getId());
|
||||
if (existingEmployee != null) {
|
||||
existingEmployee.setName(employee.getName());
|
||||
}
|
||||
return Mono.just(existingEmployee);
|
||||
}
|
||||
}
|
||||
+39
@@ -0,0 +1,39 @@
|
||||
package com.baeldung.reactive.webflux.annotation;
|
||||
|
||||
import com.baeldung.reactive.webflux.Employee;
|
||||
import com.baeldung.reactive.webflux.EmployeeRepository;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/employees")
|
||||
public class EmployeeController {
|
||||
|
||||
private final EmployeeRepository employeeRepository;
|
||||
|
||||
public EmployeeController(EmployeeRepository employeeRepository) {
|
||||
this.employeeRepository = employeeRepository;
|
||||
}
|
||||
|
||||
@GetMapping("/{id}")
|
||||
private Mono<Employee> getEmployeeById(@PathVariable String id) {
|
||||
return employeeRepository.findEmployeeById(id);
|
||||
}
|
||||
|
||||
@GetMapping
|
||||
private Flux<Employee> getAllEmployees() {
|
||||
return employeeRepository.findAllEmployees();
|
||||
}
|
||||
|
||||
@PostMapping("/update")
|
||||
private Mono<Employee> updateEmployee(@RequestBody Employee employee) {
|
||||
return employeeRepository.updateEmployee(employee);
|
||||
}
|
||||
|
||||
}
|
||||
+24
@@ -0,0 +1,24 @@
|
||||
package com.baeldung.reactive.webflux.annotation;
|
||||
|
||||
import com.baeldung.reactive.webflux.EmployeeRepository;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
|
||||
public class EmployeeSpringApplication {
|
||||
|
||||
@Bean
|
||||
EmployeeRepository employeeRepository() {
|
||||
return new EmployeeRepository();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(EmployeeSpringApplication.class, args);
|
||||
|
||||
EmployeeWebClient employeeWebClient = new EmployeeWebClient();
|
||||
employeeWebClient.consume();
|
||||
}
|
||||
|
||||
}
|
||||
+32
@@ -0,0 +1,32 @@
|
||||
package com.baeldung.reactive.webflux.annotation;
|
||||
|
||||
import com.baeldung.reactive.webflux.Employee;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class EmployeeWebClient {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeWebClient.class);
|
||||
|
||||
WebClient client = WebClient.create("http://localhost:8080");
|
||||
|
||||
public void consume() {
|
||||
|
||||
Mono<Employee> employeeMono = client.get()
|
||||
.uri("/employees/{id}", "1")
|
||||
.retrieve()
|
||||
.bodyToMono(Employee.class);
|
||||
|
||||
employeeMono.subscribe(employee -> LOGGER.info("Employee: {}", employee));
|
||||
|
||||
Flux<Employee> employeeFlux = client.get()
|
||||
.uri("/employees")
|
||||
.retrieve()
|
||||
.bodyToFlux(Employee.class);
|
||||
|
||||
employeeFlux.subscribe(employee -> LOGGER.info("Employee: {}", employee));
|
||||
}
|
||||
}
|
||||
+43
@@ -0,0 +1,43 @@
|
||||
package com.baeldung.reactive.webflux.annotation;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
|
||||
import org.springframework.security.config.web.server.ServerHttpSecurity;
|
||||
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
|
||||
import org.springframework.security.core.userdetails.User;
|
||||
import org.springframework.security.core.userdetails.UserDetails;
|
||||
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
|
||||
import org.springframework.security.crypto.password.PasswordEncoder;
|
||||
import org.springframework.security.web.server.SecurityWebFilterChain;
|
||||
|
||||
@EnableWebFluxSecurity
|
||||
public class EmployeeWebSecurityConfig {
|
||||
|
||||
@Bean
|
||||
public MapReactiveUserDetailsService userDetailsService() {
|
||||
UserDetails user = User
|
||||
.withUsername("admin")
|
||||
.password(passwordEncoder().encode("password"))
|
||||
.roles("ADMIN")
|
||||
.build();
|
||||
return new MapReactiveUserDetailsService(user);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
|
||||
http
|
||||
.csrf().disable()
|
||||
.authorizeExchange()
|
||||
.pathMatchers(HttpMethod.POST, "/employees/update").hasRole("ADMIN")
|
||||
.pathMatchers("/**").permitAll()
|
||||
.and()
|
||||
.httpBasic();
|
||||
return http.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PasswordEncoder passwordEncoder() {
|
||||
return new BCryptPasswordEncoder();
|
||||
}
|
||||
}
|
||||
+63
@@ -0,0 +1,63 @@
|
||||
package com.baeldung.reactive.webflux.functional;
|
||||
|
||||
import com.baeldung.reactive.webflux.Employee;
|
||||
import com.baeldung.reactive.webflux.EmployeeRepository;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.security.config.web.server.ServerHttpSecurity;
|
||||
import org.springframework.security.web.server.SecurityWebFilterChain;
|
||||
import org.springframework.web.reactive.function.server.RouterFunction;
|
||||
import org.springframework.web.reactive.function.server.ServerResponse;
|
||||
|
||||
import static org.springframework.web.reactive.function.BodyExtractors.toMono;
|
||||
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
|
||||
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
|
||||
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
|
||||
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
|
||||
|
||||
@Configuration
|
||||
public class EmployeeFunctionalConfig {
|
||||
|
||||
@Bean
|
||||
EmployeeRepository employeeRepository() {
|
||||
return new EmployeeRepository();
|
||||
}
|
||||
|
||||
@Bean
|
||||
RouterFunction<ServerResponse> getAllEmployeesRoute() {
|
||||
return route(GET("/employees"), req -> ok().body(employeeRepository().findAllEmployees(), Employee.class));
|
||||
}
|
||||
|
||||
@Bean
|
||||
RouterFunction<ServerResponse> getEmployeeByIdRoute() {
|
||||
return route(GET("/employees/{id}"), req -> ok().body(employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class));
|
||||
}
|
||||
|
||||
@Bean
|
||||
RouterFunction<ServerResponse> updateEmployeeRoute() {
|
||||
return route(POST("/employees/update"), req -> req.body(toMono(Employee.class))
|
||||
.doOnNext(employeeRepository()::updateEmployee)
|
||||
.then(ok().build()));
|
||||
}
|
||||
|
||||
@Bean
|
||||
RouterFunction<ServerResponse> composedRoutes() {
|
||||
return route(GET("/employees"), req -> ok().body(employeeRepository().findAllEmployees(), Employee.class))
|
||||
|
||||
.and(route(GET("/employees/{id}"), req -> ok().body(employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class)))
|
||||
|
||||
.and(route(POST("/employees/update"), req -> req.body(toMono(Employee.class))
|
||||
.doOnNext(employeeRepository()::updateEmployee)
|
||||
.then(ok().build())));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
|
||||
http.csrf()
|
||||
.disable()
|
||||
.authorizeExchange()
|
||||
.anyExchange()
|
||||
.permitAll();
|
||||
return http.build();
|
||||
}
|
||||
}
|
||||
+14
@@ -0,0 +1,14 @@
|
||||
package com.baeldung.reactive.webflux.functional;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
|
||||
|
||||
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
|
||||
public class EmployeeSpringFunctionalApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(EmployeeSpringFunctionalApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user