diff --git a/parent-spring-6/pom.xml b/parent-spring-6/pom.xml index 77afe2072a..7b61ac524b 100644 --- a/parent-spring-6/pom.xml +++ b/parent-spring-6/pom.xml @@ -1,7 +1,7 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 parent-spring-6 0.0.1-SNAPSHOT diff --git a/pom.xml b/pom.xml index 060f158888..e9a6da387b 100644 --- a/pom.xml +++ b/pom.xml @@ -892,6 +892,7 @@ spring-5 spring-5-webflux spring-5-webflux-2 + spring-6-rsocket spring-activiti spring-actuator spring-core-2 diff --git a/spring-6-rsocket/README.md b/spring-6-rsocket/README.md new file mode 100644 index 0000000000..21ab282bd1 --- /dev/null +++ b/spring-6-rsocket/README.md @@ -0,0 +1,7 @@ +## RSocket + +This module contains articles about RSocket in Spring Framework 6. + +### Relevant articles + +- [Introduction to RSocket](#) \ No newline at end of file diff --git a/spring-6-rsocket/pom.xml b/spring-6-rsocket/pom.xml new file mode 100644 index 0000000000..5d15a605ae --- /dev/null +++ b/spring-6-rsocket/pom.xml @@ -0,0 +1,74 @@ + + + 4.0.0 + com.bealdung + rsocket + 0.0.1-SNAPSHOT + rsocket + + + com.baeldung + parent-spring-6 + 0.0.1-SNAPSHOT + ../parent-spring-6 + + + + + + org.springframework.boot + spring-boot-starter-rsocket + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit + junit-bom + ${junit-jupiter.version} + pom + import + + + ch.qos.logback + logback-classic + ${logback.version} + + + ch.qos.logback + logback-core + ${logback.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + jcl-over-slf4j + ${slf4j.version} + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + 3.1.3 + 1.4.11 + 2.0.9 + + diff --git a/spring-6-rsocket/src/main/java/com/bealdung/rsocket/requester/MessageClient.java b/spring-6-rsocket/src/main/java/com/bealdung/rsocket/requester/MessageClient.java new file mode 100644 index 0000000000..8fed6bee9b --- /dev/null +++ b/spring-6-rsocket/src/main/java/com/bealdung/rsocket/requester/MessageClient.java @@ -0,0 +1,21 @@ +package com.bealdung.rsocket.requester; + +import org.springframework.messaging.rsocket.service.RSocketExchange; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface MessageClient { + + @RSocketExchange("MyDestination") + Mono sendMessage(Mono input); + + @RSocketExchange("Counter") + Flux Counter(); + + @RSocketExchange("Warning") + Mono Warning(Mono warning); + + @RSocketExchange("channel") + Flux channel(Flux input); +} diff --git a/spring-6-rsocket/src/main/java/com/bealdung/rsocket/responder/MessageController.java b/spring-6-rsocket/src/main/java/com/bealdung/rsocket/responder/MessageController.java new file mode 100644 index 0000000000..a2cfcc69ca --- /dev/null +++ b/spring-6-rsocket/src/main/java/com/bealdung/rsocket/responder/MessageController.java @@ -0,0 +1,45 @@ +package com.bealdung.rsocket.responder; + +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.stereotype.Controller; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Controller +public class MessageController { + + @MessageMapping("MyDestination") + public Mono message(Mono input) { + return input.doOnNext(msg -> System.out.println("Request is:" + msg + ",Request!")) + .map(msg -> msg + ",Response!"); + } + + @MessageMapping("Counter") + public Flux Counter() { + return Flux.range(1, 10) + .map(i -> "Count is: " + i); + } + + @MessageMapping("Warning") + public Mono Warning(Mono error) { + error.doOnNext(e -> System.out.println("warning is :" + e)) + .subscribe(); + return Mono.empty(); + } + + @MessageMapping("channel") + public Flux channel(Flux input) { + return input.doOnNext(i -> { + System.out.println("Received message is : " + i); + }) + .map(m -> m.toUpperCase()) + .doOnNext(r -> { + System.out.println("RESPONSE IS :" + r); + }); + } + +} + + + diff --git a/spring-6-rsocket/src/main/java/com/bealdung/rsocket/responder/RSocketApplication.java b/spring-6-rsocket/src/main/java/com/bealdung/rsocket/responder/RSocketApplication.java new file mode 100644 index 0000000000..9e763007fe --- /dev/null +++ b/spring-6-rsocket/src/main/java/com/bealdung/rsocket/responder/RSocketApplication.java @@ -0,0 +1,73 @@ +package com.bealdung.rsocket.responder; + +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.messaging.rsocket.service.RSocketServiceProxyFactory; + +import com.bealdung.rsocket.requester.MessageClient; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@SpringBootApplication +public class RSocketApplication { + public static void main(String[] args) { + SpringApplication.run(RSocketApplication.class, args); + } + + @Bean + public RSocketServiceProxyFactory getRSocketServiceProxyFactory(RSocketRequester.Builder requestBuilder) { + RSocketRequester requester = requestBuilder.tcp("localhost", 7000); + return RSocketServiceProxyFactory.builder(requester) + .build(); + } + + @Bean + public MessageClient getClient(RSocketServiceProxyFactory factory) { + return factory.createClient(MessageClient.class); + } + + @Bean + public ApplicationRunner runRequestResponseModel(MessageClient client) { + return args -> { + client.sendMessage(Mono.just("Request-Response test ")) + .doOnNext(message -> { + System.out.println("Response is :" + message); + }) + .subscribe(); + }; + } + + @Bean + public ApplicationRunner runStreamModel(MessageClient client) { + return args -> { + client.Counter() + .doOnNext(t -> { + System.out.println("message is :" + t); + }) + .subscribe(); + }; + } + + @Bean + public ApplicationRunner runFireAndForget(MessageClient client) { + return args -> { + client.Warning(Mono.just("Important Warning")) + .subscribe(); + }; + } + + @Bean + public ApplicationRunner runChannel(MessageClient client) { + return args -> { + client.channel(Flux.just("a", "b", "c", "d", "e")) + .doOnNext(i -> { + System.out.println(i); + }) + .subscribe(); + }; + } +} \ No newline at end of file diff --git a/spring-6-rsocket/src/main/resources/application.properties b/spring-6-rsocket/src/main/resources/application.properties new file mode 100644 index 0000000000..cab786cd30 --- /dev/null +++ b/spring-6-rsocket/src/main/resources/application.properties @@ -0,0 +1 @@ +spring.rsocket.server.port=7000 \ No newline at end of file diff --git a/spring-6-rsocket/src/test/java/com/bealdung/rsocket/RSocketRequestResponseIntegrationTest.java b/spring-6-rsocket/src/test/java/com/bealdung/rsocket/RSocketRequestResponseIntegrationTest.java new file mode 100644 index 0000000000..4b85c4c6fc --- /dev/null +++ b/spring-6-rsocket/src/test/java/com/bealdung/rsocket/RSocketRequestResponseIntegrationTest.java @@ -0,0 +1,35 @@ +package com.bealdung.rsocket; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.messaging.rsocket.service.RSocketServiceProxyFactory; + +import com.bealdung.rsocket.requester.MessageClient; +import com.bealdung.rsocket.responder.RSocketApplication; + +import reactor.core.publisher.Mono; + +@SpringBootTest(classes = RSocketApplication.class) +public class RSocketRequestResponseIntegrationTest { + + MessageClient client; + + public RSocketRequestResponseIntegrationTest() { + RSocketRequester.Builder requesterBuilder = RSocketRequester.builder(); + RSocketRequester requester = requesterBuilder.tcp("localhost", 7000); + RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester) + .build(); + client = factory.createClient(MessageClient.class); + } + + @Test + public void whenSendingStream_thenReceiveTheSameStream() { + String message = "test message"; + assertEquals(message, client.sendMessage(Mono.just(message)) + .block()); + } + +}