JAVA-1525: Moved 1 article to reactive-2 module

This commit is contained in:
sampadawagde
2020-07-30 18:07:49 +05:30
parent 06ac0b5589
commit 4dc2535bc5
5 changed files with 3 additions and 1 deletions
@@ -1,31 +0,0 @@
package com.baeldung.reactive.serversentevents.consumer;
import java.util.Collections;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
@SpringBootApplication(exclude = { RedisReactiveAutoConfiguration.class })
@EnableAsync
public class ConsumerSSEApplication {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(ConsumerSSEApplication.class);
app.setDefaultProperties(Collections.singletonMap("server.port", "8082"));
app.run(args);
}
@Bean
public SecurityWebFilterChain sseConsumerSpringSecurityFilterChain(ServerHttpSecurity http) {
http.authorizeExchange()
.anyExchange()
.permitAll();
return http.build();
}
}
@@ -1,83 +0,0 @@
package com.baeldung.reactive.serversentevents.consumer.controller;
import java.time.LocalTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/sse-consumer")
public class ClientController {
private static Logger logger = LoggerFactory.getLogger(ClientController.class);
private WebClient client = WebClient.create("http://localhost:8081/sse-server");
@GetMapping("/launch-sse-client")
public String launchSSEFromSSEWebClient() {
consumeSSE();
return "LAUNCHED EVENT CLIENT!!! Check the logs...";
}
@GetMapping("/launch-flux-client")
public String launchcFluxFromSSEWebClient() {
consumeFlux();
return "LAUNCHED EVENT CLIENT!!! Check the logs...";
}
@GetMapping("/launch-sse-from-flux-endpoint-client")
public String launchFluxFromFluxWebClient() {
consumeSSEFromFluxEndpoint();
return "LAUNCHED EVENT CLIENT!!! Check the logs...";
}
@Async
public void consumeSSE() {
ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<ServerSentEvent<String>>() {
};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
@Async
public void consumeFlux() {
Flux<String> stringStream = client.get()
.uri("/stream-flux")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class);
stringStream.subscribe(content -> logger.info("Current time: {} - Received content: {} ", LocalTime.now(), content), error -> logger.error("Error retrieving content: {}", error), () -> logger.info("Completed!!!"));
}
@Async
public void consumeSSEFromFluxEndpoint() {
ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<ServerSentEvent<String>>() {
};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-flux")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
}
@@ -1,29 +0,0 @@
package com.baeldung.reactive.serversentevents.server;
import java.util.Collections;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
@SpringBootApplication(exclude = { RedisReactiveAutoConfiguration.class })
public class ServerSSEApplication {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(ServerSSEApplication.class);
app.setDefaultProperties(Collections.singletonMap("server.port", "8081"));
app.run(args);
}
@Bean
public SecurityWebFilterChain sseServerSpringSecurityFilterChain(ServerHttpSecurity http) {
http.authorizeExchange()
.anyExchange()
.permitAll();
return http.build();
}
}
@@ -1,35 +0,0 @@
package com.baeldung.reactive.serversentevents.server.controllers;
import java.time.Duration;
import java.time.LocalTime;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("/sse-server")
public class ServerController {
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now()
.toString())
.build());
}
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now()
.toString());
}
}