From 57a993de64fc0959b0511096f07ac9d59133fa01 Mon Sep 17 00:00:00 2001 From: Jose Carvajal Date: Sun, 8 Oct 2017 08:17:24 +0200 Subject: [PATCH] BAEL-719: Intro to Spring Cloud Stream --- spring-cloud/pom.xml | 2 + spring-cloud/spring-cloud-stream/pom.xml | 70 +++++++++++++++++++ .../spring-cloud-stream-rabbit/pom.xml | 33 +++++++++ .../MultipleOutputsServiceApplication.java | 38 ++++++++++ ...tputsWithConditionsServiceApplication.java | 39 +++++++++++ .../rabbit/MyLoggerServiceApplication.java | 32 +++++++++ .../messages/TextPlainMessageConverter.java | 26 +++++++ .../cloud/stream/rabbit/model/LogMessage.java | 32 +++++++++ .../stream/rabbit/processor/MyProcessor.java | 19 +++++ .../src/main/resources/application.yml | 28 ++++++++ ...ultipleOutputsServiceApplicationTests.java | 52 ++++++++++++++ ...WithConditionsServiceApplicationTests.java | 52 ++++++++++++++ .../rabbit/MyLoggerApplicationTests.java | 44 ++++++++++++ 13 files changed, 467 insertions(+) create mode 100644 spring-cloud/spring-cloud-stream/pom.xml create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationTests.java create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplicationTests.java create mode 100644 spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationTests.java diff --git a/spring-cloud/pom.xml b/spring-cloud/pom.xml index 44e72535f8..f3083f69b7 100644 --- a/spring-cloud/pom.xml +++ b/spring-cloud/pom.xml @@ -16,6 +16,7 @@ spring-cloud-rest spring-cloud-zookeeper spring-cloud-gateway + spring-cloud-stream pom @@ -37,6 +38,7 @@ 1.2.3.RELEASE 1.2.3.RELEASE 1.2.3.RELEASE + 1.3.0.RELEASE 1.4.2.RELEASE 3.6.0 1.4.2.RELEASE diff --git a/spring-cloud/spring-cloud-stream/pom.xml b/spring-cloud/spring-cloud-stream/pom.xml new file mode 100644 index 0000000000..352b52cc17 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/pom.xml @@ -0,0 +1,70 @@ + + + 4.0.0 + + org.baeldung + spring-cloud-stream + + spring-cloud-stream-rabbit + + pom + + spring-cloud-stream + + + com.baeldung.spring.cloud + spring-cloud + 1.0.0-SNAPSHOT + + + + UTF-8 + 3.6.0 + + + + + + org.springframework.cloud + spring-cloud-starter-stream-rabbit + ${spring-cloud-stream.version} + + + + org.springframework.cloud + spring-cloud-stream + ${spring-cloud-stream.version} + + + + org.springframework.cloud + spring-cloud-stream-test-support + ${spring-cloud-stream.version} + test + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven-compiler-plugin.version} + + 1.8 + 1.8 + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot-maven-plugin.version} + + + + + diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml new file mode 100644 index 0000000000..9eede9e92c --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + spring-cloud-stream-rabbit + jar + + spring-cloud-stream-rabbit + Simple Spring Cloud Stream + + + org.baeldung + spring-cloud-stream + 1.0.0-SNAPSHOT + .. + + + + + + org.springframework.cloud + spring-cloud-starter-stream-rabbit + + + + org.springframework.cloud + spring-cloud-stream-test-support + test + + + + diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java new file mode 100644 index 0000000000..375494dfac --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java @@ -0,0 +1,38 @@ +package com.baeldung.spring.cloud.stream.rabbit; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor; + +@SpringBootApplication +@EnableBinding(MyProcessor.class) +public class MultipleOutputsServiceApplication { + public static void main(String[] args) { + SpringApplication.run(MultipleOutputsServiceApplication.class, args); + } + + @Autowired + private MyProcessor processor; + + @StreamListener(MyProcessor.INPUT) + public void routeValues(Integer val) { + if (val < 10) { + processor.anOutput() + .send(message(val)); + } else { + processor.anotherOutput() + .send(message(val)); + } + } + + private static final Message message(T val) { + return MessageBuilder.withPayload(val) + .build(); + } +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java new file mode 100644 index 0000000000..4729e418b6 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java @@ -0,0 +1,39 @@ +package com.baeldung.spring.cloud.stream.rabbit; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor; + +@SpringBootApplication +@EnableBinding(MyProcessor.class) +public class MultipleOutputsWithConditionsServiceApplication { + public static void main(String[] args) { + SpringApplication.run(MultipleOutputsWithConditionsServiceApplication.class, args); + } + + @Autowired + private MyProcessor processor; + + @StreamListener(target = MyProcessor.INPUT, condition = "payload < 10") + public void routeValuesToAnOutput(Integer val) { + processor.anOutput() + .send(message(val)); + } + + @StreamListener(target = MyProcessor.INPUT, condition = "payload >= 10") + public void routeValuesToAnotherOutput(Integer val) { + processor.anotherOutput() + .send(message(val)); + } + + private static final Message message(T val) { + return MessageBuilder.withPayload(val) + .build(); + } +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java new file mode 100644 index 0000000000..aac551e544 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java @@ -0,0 +1,32 @@ +package com.baeldung.spring.cloud.stream.rabbit; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.handler.annotation.SendTo; + +import com.baeldung.spring.cloud.stream.rabbit.messages.TextPlainMessageConverter; +import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage; + +@SpringBootApplication +@EnableBinding(Processor.class) +public class MyLoggerServiceApplication { + public static void main(String[] args) { + SpringApplication.run(MyLoggerServiceApplication.class, args); + } + + @Bean + public MessageConverter providesTextPlainMessageConverter() { + return new TextPlainMessageConverter(); + } + + @StreamListener(Processor.INPUT) + @SendTo(Processor.OUTPUT) + public LogMessage enrichLogMessage(LogMessage log) { + return new LogMessage(String.format("[1]: %s", log.getMessage())); + } +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java new file mode 100644 index 0000000000..d690ee38a9 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java @@ -0,0 +1,26 @@ +package com.baeldung.spring.cloud.stream.rabbit.messages; + +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.util.MimeType; + +import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage; + +public class TextPlainMessageConverter extends AbstractMessageConverter { + + public TextPlainMessageConverter() { + super(new MimeType("text", "plain")); + } + + @Override + protected boolean supports(Class clazz) { + return (LogMessage.class == clazz); + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { + Object payload = message.getPayload(); + String text = payload instanceof String ? (String) payload : new String((byte[]) payload); + return new LogMessage(text); + } +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java new file mode 100644 index 0000000000..44a6ca4d4e --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java @@ -0,0 +1,32 @@ +package com.baeldung.spring.cloud.stream.rabbit.model; + +import java.io.Serializable; + +public class LogMessage implements Serializable { + + private static final long serialVersionUID = -5857383701708275796L; + + private String message; + + public LogMessage() { + + } + + public LogMessage(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public String toString() { + return message; + } + +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java new file mode 100644 index 0000000000..563ce06b6f --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java @@ -0,0 +1,19 @@ +package com.baeldung.spring.cloud.stream.rabbit.processor; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +public interface MyProcessor { + String INPUT = "myInput"; + + @Input + SubscribableChannel myInput(); + + @Output("myOutput") + MessageChannel anOutput(); + + @Output + MessageChannel anotherOutput(); +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml new file mode 100644 index 0000000000..3d9d97a736 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml @@ -0,0 +1,28 @@ +spring: + cloud: + stream: + bindings: + input: + destination: queue.log.messages + binder: local_rabbit + group: logMessageConsumers + output: + destination: queue.pretty.log.messages + binder: local_rabbit + binders: + local_rabbit: + type: rabbit + environment: + spring: + rabbitmq: + host: localhost + port: 5672 + username: guest + password: guest + virtual-host: / +server: + port: 0 +management: + health: + binders: + enabled: true \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationTests.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationTests.java new file mode 100644 index 0000000000..18dcd8da7c --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationTests.java @@ -0,0 +1,52 @@ +package com.baeldung.spring.cloud.stream.rabbit; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = MultipleOutputsServiceApplication.class) +@DirtiesContext +public class MultipleOutputsServiceApplicationTests { + + @Autowired + private MyProcessor pipe; + + @Autowired + private MessageCollector messageCollector; + + @Test + public void shouldReceiveMessageInAnOutput() { + whenSendMessage(1); + thenPayloadInChannelIs(pipe.anOutput(), 1); + } + + @Test + public void shouldReceiveMessageInAnAnotherOutput() { + whenSendMessage(11); + thenPayloadInChannelIs(pipe.anotherOutput(), 11); + } + + private void whenSendMessage(Integer val) { + pipe.myInput() + .send(MessageBuilder.withPayload(val) + .build()); + } + + private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) { + Object payload = messageCollector.forChannel(channel) + .poll() + .getPayload(); + assertEquals(expectedValue, payload); + } +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplicationTests.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplicationTests.java new file mode 100644 index 0000000000..02638dea4c --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplicationTests.java @@ -0,0 +1,52 @@ +package com.baeldung.spring.cloud.stream.rabbit; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = MultipleOutputsWithConditionsServiceApplication.class) +@DirtiesContext +public class MultipleOutputsWithConditionsServiceApplicationTests { + + @Autowired + private MyProcessor pipe; + + @Autowired + private MessageCollector messageCollector; + + @Test + public void shouldReceiveMessageInAnOutput() { + whenSendMessage(1); + thenPayloadInChannelIs(pipe.anotherOutput(), 1); + } + + @Test + public void shouldReceiveMessageInAnAnotherOutput() { + whenSendMessage(11); + thenPayloadInChannelIs(pipe.anotherOutput(), 11); + } + + private void whenSendMessage(Integer val) { + pipe.myInput() + .send(MessageBuilder.withPayload(val) + .build()); + } + + private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) { + Object payload = messageCollector.forChannel(channel) + .poll() + .getPayload(); + assertEquals(expectedValue, payload); + } +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationTests.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationTests.java new file mode 100644 index 0000000000..d004987929 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationTests.java @@ -0,0 +1,44 @@ +package com.baeldung.spring.cloud.stream.rabbit; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import com.baeldung.spring.cloud.stream.rabbit.MyLoggerServiceApplication; +import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = MyLoggerServiceApplication.class) +@DirtiesContext +public class MyLoggerApplicationTests { + + @Autowired + private Processor pipe; + + @Autowired + private MessageCollector messageCollector; + + @Test + public void shouldEnrichMessage() { + // Send message + pipe.input() + .send(MessageBuilder.withPayload(new LogMessage("This is my message")) + .build()); + + // Get response from the service + Object payload = messageCollector.forChannel(pipe.output()) + .poll() + .getPayload(); + + // Assert + assertEquals("[1]: This is my message", payload.toString()); + } +}