diff --git a/spring-cloud/pom.xml b/spring-cloud/pom.xml
index 44e72535f8..f3083f69b7 100644
--- a/spring-cloud/pom.xml
+++ b/spring-cloud/pom.xml
@@ -16,6 +16,7 @@
spring-cloud-rest
spring-cloud-zookeeper
spring-cloud-gateway
+ spring-cloud-stream
pom
@@ -37,6 +38,7 @@
1.2.3.RELEASE
1.2.3.RELEASE
1.2.3.RELEASE
+ 1.3.0.RELEASE
1.4.2.RELEASE
3.6.0
1.4.2.RELEASE
diff --git a/spring-cloud/spring-cloud-stream/pom.xml b/spring-cloud/spring-cloud-stream/pom.xml
new file mode 100644
index 0000000000..352b52cc17
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/pom.xml
@@ -0,0 +1,70 @@
+
+
+ 4.0.0
+
+ org.baeldung
+ spring-cloud-stream
+
+ spring-cloud-stream-rabbit
+
+ pom
+
+ spring-cloud-stream
+
+
+ com.baeldung.spring.cloud
+ spring-cloud
+ 1.0.0-SNAPSHOT
+
+
+
+ UTF-8
+ 3.6.0
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-stream-rabbit
+ ${spring-cloud-stream.version}
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+ ${spring-cloud-stream.version}
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-test-support
+ ${spring-cloud-stream.version}
+ test
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ ${maven-compiler-plugin.version}
+
+ 1.8
+ 1.8
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring-boot-maven-plugin.version}
+
+
+
+
+
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml
new file mode 100644
index 0000000000..9eede9e92c
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml
@@ -0,0 +1,33 @@
+
+
+ 4.0.0
+
+ spring-cloud-stream-rabbit
+ jar
+
+ spring-cloud-stream-rabbit
+ Simple Spring Cloud Stream
+
+
+ org.baeldung
+ spring-cloud-stream
+ 1.0.0-SNAPSHOT
+ ..
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-stream-rabbit
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-test-support
+ test
+
+
+
+
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java
new file mode 100644
index 0000000000..375494dfac
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java
@@ -0,0 +1,38 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+
+import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
+
+@SpringBootApplication
+@EnableBinding(MyProcessor.class)
+public class MultipleOutputsServiceApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(MultipleOutputsServiceApplication.class, args);
+ }
+
+ @Autowired
+ private MyProcessor processor;
+
+ @StreamListener(MyProcessor.INPUT)
+ public void routeValues(Integer val) {
+ if (val < 10) {
+ processor.anOutput()
+ .send(message(val));
+ } else {
+ processor.anotherOutput()
+ .send(message(val));
+ }
+ }
+
+ private static final Message message(T val) {
+ return MessageBuilder.withPayload(val)
+ .build();
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java
new file mode 100644
index 0000000000..4729e418b6
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java
@@ -0,0 +1,39 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+
+import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
+
+@SpringBootApplication
+@EnableBinding(MyProcessor.class)
+public class MultipleOutputsWithConditionsServiceApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(MultipleOutputsWithConditionsServiceApplication.class, args);
+ }
+
+ @Autowired
+ private MyProcessor processor;
+
+ @StreamListener(target = MyProcessor.INPUT, condition = "payload < 10")
+ public void routeValuesToAnOutput(Integer val) {
+ processor.anOutput()
+ .send(message(val));
+ }
+
+ @StreamListener(target = MyProcessor.INPUT, condition = "payload >= 10")
+ public void routeValuesToAnotherOutput(Integer val) {
+ processor.anotherOutput()
+ .send(message(val));
+ }
+
+ private static final Message message(T val) {
+ return MessageBuilder.withPayload(val)
+ .build();
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java
new file mode 100644
index 0000000000..aac551e544
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java
@@ -0,0 +1,32 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.handler.annotation.SendTo;
+
+import com.baeldung.spring.cloud.stream.rabbit.messages.TextPlainMessageConverter;
+import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage;
+
+@SpringBootApplication
+@EnableBinding(Processor.class)
+public class MyLoggerServiceApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(MyLoggerServiceApplication.class, args);
+ }
+
+ @Bean
+ public MessageConverter providesTextPlainMessageConverter() {
+ return new TextPlainMessageConverter();
+ }
+
+ @StreamListener(Processor.INPUT)
+ @SendTo(Processor.OUTPUT)
+ public LogMessage enrichLogMessage(LogMessage log) {
+ return new LogMessage(String.format("[1]: %s", log.getMessage()));
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java
new file mode 100644
index 0000000000..d690ee38a9
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java
@@ -0,0 +1,26 @@
+package com.baeldung.spring.cloud.stream.rabbit.messages;
+
+import org.springframework.messaging.Message;
+import org.springframework.messaging.converter.AbstractMessageConverter;
+import org.springframework.util.MimeType;
+
+import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage;
+
+public class TextPlainMessageConverter extends AbstractMessageConverter {
+
+ public TextPlainMessageConverter() {
+ super(new MimeType("text", "plain"));
+ }
+
+ @Override
+ protected boolean supports(Class> clazz) {
+ return (LogMessage.class == clazz);
+ }
+
+ @Override
+ protected Object convertFromInternal(Message> message, Class> targetClass, Object conversionHint) {
+ Object payload = message.getPayload();
+ String text = payload instanceof String ? (String) payload : new String((byte[]) payload);
+ return new LogMessage(text);
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java
new file mode 100644
index 0000000000..44a6ca4d4e
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java
@@ -0,0 +1,32 @@
+package com.baeldung.spring.cloud.stream.rabbit.model;
+
+import java.io.Serializable;
+
+public class LogMessage implements Serializable {
+
+ private static final long serialVersionUID = -5857383701708275796L;
+
+ private String message;
+
+ public LogMessage() {
+
+ }
+
+ public LogMessage(String message) {
+ this.message = message;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public String toString() {
+ return message;
+ }
+
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java
new file mode 100644
index 0000000000..563ce06b6f
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java
@@ -0,0 +1,19 @@
+package com.baeldung.spring.cloud.stream.rabbit.processor;
+
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+
+public interface MyProcessor {
+ String INPUT = "myInput";
+
+ @Input
+ SubscribableChannel myInput();
+
+ @Output("myOutput")
+ MessageChannel anOutput();
+
+ @Output
+ MessageChannel anotherOutput();
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml
new file mode 100644
index 0000000000..3d9d97a736
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml
@@ -0,0 +1,28 @@
+spring:
+ cloud:
+ stream:
+ bindings:
+ input:
+ destination: queue.log.messages
+ binder: local_rabbit
+ group: logMessageConsumers
+ output:
+ destination: queue.pretty.log.messages
+ binder: local_rabbit
+ binders:
+ local_rabbit:
+ type: rabbit
+ environment:
+ spring:
+ rabbitmq:
+ host: localhost
+ port: 5672
+ username: guest
+ password: guest
+ virtual-host: /
+server:
+ port: 0
+management:
+ health:
+ binders:
+ enabled: true
\ No newline at end of file
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationTests.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationTests.java
new file mode 100644
index 0000000000..18dcd8da7c
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationTests.java
@@ -0,0 +1,52 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = MultipleOutputsServiceApplication.class)
+@DirtiesContext
+public class MultipleOutputsServiceApplicationTests {
+
+ @Autowired
+ private MyProcessor pipe;
+
+ @Autowired
+ private MessageCollector messageCollector;
+
+ @Test
+ public void shouldReceiveMessageInAnOutput() {
+ whenSendMessage(1);
+ thenPayloadInChannelIs(pipe.anOutput(), 1);
+ }
+
+ @Test
+ public void shouldReceiveMessageInAnAnotherOutput() {
+ whenSendMessage(11);
+ thenPayloadInChannelIs(pipe.anotherOutput(), 11);
+ }
+
+ private void whenSendMessage(Integer val) {
+ pipe.myInput()
+ .send(MessageBuilder.withPayload(val)
+ .build());
+ }
+
+ private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
+ Object payload = messageCollector.forChannel(channel)
+ .poll()
+ .getPayload();
+ assertEquals(expectedValue, payload);
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplicationTests.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplicationTests.java
new file mode 100644
index 0000000000..02638dea4c
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplicationTests.java
@@ -0,0 +1,52 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = MultipleOutputsWithConditionsServiceApplication.class)
+@DirtiesContext
+public class MultipleOutputsWithConditionsServiceApplicationTests {
+
+ @Autowired
+ private MyProcessor pipe;
+
+ @Autowired
+ private MessageCollector messageCollector;
+
+ @Test
+ public void shouldReceiveMessageInAnOutput() {
+ whenSendMessage(1);
+ thenPayloadInChannelIs(pipe.anotherOutput(), 1);
+ }
+
+ @Test
+ public void shouldReceiveMessageInAnAnotherOutput() {
+ whenSendMessage(11);
+ thenPayloadInChannelIs(pipe.anotherOutput(), 11);
+ }
+
+ private void whenSendMessage(Integer val) {
+ pipe.myInput()
+ .send(MessageBuilder.withPayload(val)
+ .build());
+ }
+
+ private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
+ Object payload = messageCollector.forChannel(channel)
+ .poll()
+ .getPayload();
+ assertEquals(expectedValue, payload);
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationTests.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationTests.java
new file mode 100644
index 0000000000..d004987929
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationTests.java
@@ -0,0 +1,44 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.baeldung.spring.cloud.stream.rabbit.MyLoggerServiceApplication;
+import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = MyLoggerServiceApplication.class)
+@DirtiesContext
+public class MyLoggerApplicationTests {
+
+ @Autowired
+ private Processor pipe;
+
+ @Autowired
+ private MessageCollector messageCollector;
+
+ @Test
+ public void shouldEnrichMessage() {
+ // Send message
+ pipe.input()
+ .send(MessageBuilder.withPayload(new LogMessage("This is my message"))
+ .build());
+
+ // Get response from the service
+ Object payload = messageCollector.forChannel(pipe.output())
+ .poll()
+ .getPayload();
+
+ // Assert
+ assertEquals("[1]: This is my message", payload.toString());
+ }
+}