JAVA-15787 Created new messaging-modules and saas-modules

- Moved jgroups, rabbitmq, spring-amqp, spring-apache-camel, spring-jms to messaging-modules
- Moved twilio, twitter4j, strip to saas-modules
- Renamed existing saas to jira-rest-integration
This commit is contained in:
Dhawal Kapil
2022-11-26 11:54:53 +05:30
parent 8ee7dcd350
commit ad9ddfc6e3
142 changed files with 91 additions and 57 deletions
@@ -0,0 +1,52 @@
package com.baeldung.springamqp.broadcast;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BroadcastConfig {
private static final boolean NON_DURABLE = false;
public final static String FANOUT_QUEUE_1_NAME = "com.baeldung.spring-amqp-simple.fanout.queue1";
public final static String FANOUT_QUEUE_2_NAME = "com.baeldung.spring-amqp-simple.fanout.queue2";
public final static String FANOUT_EXCHANGE_NAME = "com.baeldung.spring-amqp-simple.fanout.exchange";
public final static String TOPIC_QUEUE_1_NAME = "com.baeldung.spring-amqp-simple.topic.queue1";
public final static String TOPIC_QUEUE_2_NAME = "com.baeldung.spring-amqp-simple.topic.queue2";
public final static String TOPIC_EXCHANGE_NAME = "com.baeldung.spring-amqp-simple.topic.exchange";
public static final String BINDING_PATTERN_IMPORTANT = "*.important.*";
public static final String BINDING_PATTERN_ERROR = "#.error";
@Bean
public Declarables topicBindings() {
Queue topicQueue1 = new Queue(TOPIC_QUEUE_1_NAME, NON_DURABLE);
Queue topicQueue2 = new Queue(TOPIC_QUEUE_2_NAME, NON_DURABLE);
TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE_NAME, NON_DURABLE, false);
return new Declarables(topicQueue1, topicQueue2, topicExchange, BindingBuilder
.bind(topicQueue1)
.to(topicExchange)
.with(BINDING_PATTERN_IMPORTANT), BindingBuilder
.bind(topicQueue2)
.to(topicExchange)
.with(BINDING_PATTERN_ERROR));
}
@Bean
public Declarables fanoutBindings() {
Queue fanoutQueue1 = new Queue(FANOUT_QUEUE_1_NAME, NON_DURABLE);
Queue fanoutQueue2 = new Queue(FANOUT_QUEUE_2_NAME, NON_DURABLE);
FanoutExchange fanoutExchange = new FanoutExchange(FANOUT_EXCHANGE_NAME, NON_DURABLE, false);
return new Declarables(fanoutQueue1, fanoutQueue2, fanoutExchange, BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange), BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange));
}
}
@@ -0,0 +1,59 @@
package com.baeldung.springamqp.broadcast;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import static com.baeldung.springamqp.broadcast.BroadcastConfig.*;
/**
* Simple test application to send messages to rabbitMQ.
*
*<p>To run this particular application with mvn you use the following command:</p>
* {@code
* mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.broadcast.BroadcastMessageApp
* }
*/
@SpringBootApplication
public class BroadcastMessageApp {
private static String ROUTING_KEY_USER_IMPORTANT_WARN = "user.important.warn";
private static String ROUTING_KEY_USER_IMPORTANT_ERROR = "user.important.error";
public static void main(String[] args) {
SpringApplication.run(BroadcastMessageApp.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate rabbitTemplate) {
String message = " payload is broadcast";
return args -> {
rabbitTemplate.convertAndSend(BroadcastConfig.FANOUT_EXCHANGE_NAME, "", "fanout" + message);
rabbitTemplate.convertAndSend(BroadcastConfig.TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic important warn" + message);
rabbitTemplate.convertAndSend(BroadcastConfig.TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "topic important error" + message);
};
}
@RabbitListener(queues = { FANOUT_QUEUE_1_NAME })
public void receiveMessageFromFanout1(String message) {
System.out.println("Received fanout 1 message: " + message);
}
@RabbitListener(queues = { FANOUT_QUEUE_2_NAME })
public void receiveMessageFromFanout2(String message) {
System.out.println("Received fanout 2 message: " + message);
}
@RabbitListener(queues = { TOPIC_QUEUE_1_NAME })
public void receiveMessageFromTopic1(String message) {
System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
}
@RabbitListener(queues = { TOPIC_QUEUE_2_NAME })
public void receiveMessageFromTopic2(String message) {
System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
}
}
@@ -0,0 +1,26 @@
package com.baeldung.springamqp.errorhandling;
import com.baeldung.springamqp.errorhandling.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class ErrorHandlingApp {
@Autowired
MessageProducer messageProducer;
public static void main(String[] args) {
SpringApplication.run(ErrorHandlingApp.class, args);
}
@EventListener(ApplicationReadyEvent.class)
public void doSomethingAfterStartup() {
messageProducer.sendMessage();
}
}
@@ -0,0 +1,55 @@
package com.baeldung.springamqp.errorhandling.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "dlx-custom")
public class DLXCustomAmqpConfiguration {
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
}
@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}
@Bean
DirectExchange messagesExchange() {
return new DirectExchange(EXCHANGE_MESSAGES);
}
@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}
}
@@ -0,0 +1,72 @@
package com.baeldung.springamqp.errorhandling.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "parking-lot-dlx")
public class DLXParkingLotAmqpConfiguration {
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
@Bean
FanoutExchange parkingLotExchange() {
return new FanoutExchange(EXCHANGE_PARKING_LOT);
}
@Bean
Queue parkingLotQueue() {
return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}
@Bean
Binding parkingLotBinding() {
return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
}
@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}
@Bean
DirectExchange messagesExchange() {
return new DirectExchange(EXCHANGE_MESSAGES);
}
@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}
}
@@ -0,0 +1,63 @@
package com.baeldung.springamqp.errorhandling.configuration;
import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "fatal-error-strategy")
public class FatalExceptionStrategyAmqpConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}
@Bean
FatalExceptionStrategy customExceptionStrategy() {
return new CustomFatalExceptionStrategy();
}
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.build();
}
@Bean
DirectExchange messagesExchange() {
return new DirectExchange(EXCHANGE_MESSAGES);
}
@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}
}
@@ -0,0 +1,55 @@
package com.baeldung.springamqp.errorhandling.configuration;
import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "listener-error")
public class ListenerErrorHandlerAmqpConfiguration {
@Bean
public ErrorHandler errorHandler() {
return new CustomErrorHandler();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.build();
}
@Bean
DirectExchange messagesExchange() {
return new DirectExchange(EXCHANGE_MESSAGES);
}
@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}
}
@@ -0,0 +1,55 @@
package com.baeldung.springamqp.errorhandling.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "routing-dlq")
public class RoutingKeyDLQAmqpConfiguration {
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
}
@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}
@Bean
DirectExchange messagesExchange() {
return new DirectExchange(EXCHANGE_MESSAGES);
}
@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}
}
@@ -0,0 +1,43 @@
package com.baeldung.springamqp.errorhandling.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "simple-dlq")
public class SimpleDLQAmqpConfiguration {
public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq";
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
.build();
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
@Bean
DirectExchange messagesExchange() {
return new DirectExchange(EXCHANGE_MESSAGES);
}
@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}
}
@@ -0,0 +1,35 @@
package com.baeldung.springamqp.errorhandling.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT;
public class DLQCustomAmqpContainer {
private static final Logger log = LoggerFactory.getLogger(DLQCustomAmqpContainer.class);
private final RabbitTemplate rabbitTemplate;
public static final int MAX_RETRIES_COUNT = 2;
public DLQCustomAmqpContainer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryHeaders(Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null)
retriesCnt = 1;
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Discarding message");
return;
}
log.info("Retrying message for the {} time", retriesCnt);
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
}
@@ -0,0 +1,63 @@
package com.baeldung.springamqp.errorhandling.consumer;
import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration;
import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessagesConsumer {
public static final String HEADER_X_RETRIES_COUNT = "x-retries-count";
private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class);
private final RabbitTemplate rabbitTemplate;
public MessagesConsumer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
log.info("Received message: {}", message.toString());
throw new BusinessException();
}
@Bean
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "simple-dlq")
public SimpleDLQAmqpContainer simpleAmqpContainer() {
return new SimpleDLQAmqpContainer(rabbitTemplate);
}
@Bean
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "routing-dlq")
public RoutingDLQAmqpContainer routingDLQAmqpContainer() {
return new RoutingDLQAmqpContainer(rabbitTemplate);
}
@Bean
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "dlx-custom")
public DLQCustomAmqpContainer dlqAmqpContainer() {
return new DLQCustomAmqpContainer(rabbitTemplate);
}
@Bean
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "parking-lot-dlx")
public ParkingLotDLQAmqpContainer parkingLotDLQAmqpContainer() {
return new ParkingLotDLQAmqpContainer(rabbitTemplate);
}
}
@@ -0,0 +1,43 @@
package com.baeldung.springamqp.errorhandling.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT;
import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.QUEUE_PARKING_LOT;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT;
public class ParkingLotDLQAmqpContainer {
private static final Logger log = LoggerFactory.getLogger(ParkingLotDLQAmqpContainer.class);
private final RabbitTemplate rabbitTemplate;
public static final int MAX_RETRIES_COUNT = 2;
public ParkingLotDLQAmqpContainer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null)
retriesCnt = 1;
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Sending message to the parking lot queue");
rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
return;
}
log.info("Retrying message for the {} time", retriesCnt);
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
log.info("Received message in parking lot queue {}", failedMessage.toString());
}
}
@@ -0,0 +1,33 @@
package com.baeldung.springamqp.errorhandling.consumer;
import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration;
import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
public class RoutingDLQAmqpContainer {
private static final Logger log = LoggerFactory.getLogger(RoutingDLQAmqpContainer.class);
private final RabbitTemplate rabbitTemplate;
public RoutingDLQAmqpContainer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
log.info("Received message: {}", message.toString());
throw new BusinessException();
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
log.info("Received failed message, requeueing: {}", failedMessage.toString());
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
}
@@ -0,0 +1,31 @@
package com.baeldung.springamqp.errorhandling.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
public class SimpleDLQAmqpContainer {
private static final Logger log = LoggerFactory.getLogger(SimpleDLQAmqpContainer.class);
private final RabbitTemplate rabbitTemplate;
public SimpleDLQAmqpContainer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
log.info("Received failed message: {}", message.toString());
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
log.info("Received failed message, requeueing: {}", failedMessage.toString());
log.info("Received failed message, requeueing: {}", failedMessage.getMessageProperties().getReceivedRoutingKey());
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
}
@@ -0,0 +1,4 @@
package com.baeldung.springamqp.errorhandling.errorhandler;
public class BusinessException extends Exception {
}
@@ -0,0 +1,14 @@
package com.baeldung.springamqp.errorhandling.errorhandler;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.util.ErrorHandler;
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
if (!(t.getCause() instanceof BusinessException)) {
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
}
}
}
@@ -0,0 +1,10 @@
package com.baeldung.springamqp.errorhandling.errorhandler;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
public class CustomFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
@Override
public boolean isFatal(Throwable t) {
return !(t.getCause() instanceof BusinessException);
}
}
@@ -0,0 +1,24 @@
package com.baeldung.springamqp.errorhandling.producer;
import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
private static final Logger log = LoggerFactory.getLogger(MessageProducer.class);
private int messageNumber = 0;
private final RabbitTemplate rabbitTemplate;
public MessageProducer(final RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage() {
log.info("Sending message...");
rabbitTemplate.convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES, SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
}
}
@@ -0,0 +1,11 @@
package com.baeldung.springamqp.exponentialbackoff;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ExponentialBackoffApp {
public static void main(String[] args) {
SpringApplication.run(ExponentialBackoffApp.class, args);
}
}
@@ -0,0 +1,21 @@
package com.baeldung.springamqp.exponentialbackoff;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
public class ObservableRejectAndDontRequeueRecoverer extends RejectAndDontRequeueRecoverer {
private Runnable observer;
@Override
public void recover(Message message, Throwable cause) {
if(observer != null) {
observer.run();
}
super.recover(message, cause);
}
void setObserver(Runnable observer){
this.observer = observer;
}
}
@@ -0,0 +1,162 @@
package com.baeldung.springamqp.exponentialbackoff;
import org.aopalliance.aop.Advice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import com.rabbitmq.client.Channel;
@EnableRabbit
@Configuration
public class RabbitConfiguration {
private static Logger logger = LoggerFactory.getLogger(RabbitConfiguration.class);
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue blockingQueue() {
return QueueBuilder.nonDurable("blocking-queue")
.build();
}
@Bean
public Queue nonBlockingQueue() {
return QueueBuilder.nonDurable("non-blocking-queue")
.build();
}
@Bean
public Queue retryWaitEndedQueue() {
return QueueBuilder.nonDurable("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue1() {
return QueueBuilder.nonDurable("retry-queue-1")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue2() {
return QueueBuilder.nonDurable("retry-queue-2")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue3() {
return QueueBuilder.nonDurable("retry-queue-3")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public RetryQueues retryQueues() {
return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}
@Bean
public ObservableRejectAndDontRequeueRecoverer observableRecoverer() {
return new ObservableRejectAndDontRequeueRecoverer();
}
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.backOffOptions(1000, 3.0, 10000)
.maxAttempts(5)
.recoverer(observableRecoverer())
.build();
}
@Bean
public RetryQueuesInterceptor retryQueuesInterceptor(RabbitTemplate rabbitTemplate, RetryQueues retryQueues) {
return new RetryQueuesInterceptor(rabbitTemplate, retryQueues);
}
@Bean
public SimpleRabbitListenerContainerFactory defaultContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}
@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
logger.info("Processing message from blocking-queue: {}", payload);
throw new Exception("exception occured!");
}
@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
logger.info("Processing message from non-blocking-queue: {}", payload);
throw new Exception("Error occured!");
}
@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception {
MessageProperties props = message.getMessageProperties();
rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), props.getHeader("x-original-routing-key"), message);
}
}
@@ -0,0 +1,34 @@
package com.baeldung.springamqp.exponentialbackoff;
import org.springframework.amqp.core.Queue;
public class RetryQueues {
private Queue[] queues;
private long initialInterval;
private double factor;
private long maxWait;
public RetryQueues(long initialInterval, double factor, long maxWait, Queue... queues) {
this.queues = queues;
this.initialInterval = initialInterval;
this.factor = factor;
this.maxWait = maxWait;
}
public boolean retriesExhausted(int retry) {
return retry >= queues.length;
}
public String getQueueName(int retry) {
return queues[retry].getName();
}
public long getTimeToWait(int retry) {
double time = initialInterval * Math.pow(factor, (double) retry);
if (time > maxWait) {
return maxWait;
}
return (long) time;
}
}
@@ -0,0 +1,109 @@
package com.baeldung.springamqp.exponentialbackoff;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import com.rabbitmq.client.Channel;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class RetryQueuesInterceptor implements MethodInterceptor {
private RabbitTemplate rabbitTemplate;
private RetryQueues retryQueues;
private Runnable observer;
public RetryQueuesInterceptor(RabbitTemplate rabbitTemplate, RetryQueues retryQueues) {
this.rabbitTemplate = rabbitTemplate;
this.retryQueues = retryQueues;
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return tryConsume(invocation, this::ack, (mac, e) -> {
try {
int retryCount = tryGetRetryCountOrFail(mac, e);
sendToNextRetryQueue(mac, retryCount);
} catch (Throwable t) {
if (observer != null) {
observer.run();
}
throw new RuntimeException(t);
}
});
}
void setObserver(Runnable observer) {
this.observer = observer;
}
private Object tryConsume(MethodInvocation invocation, Consumer<MessageAndChannel> successHandler, BiConsumer<MessageAndChannel, Throwable> errorHandler) throws Throwable {
MessageAndChannel mac = new MessageAndChannel((Message) invocation.getArguments()[1], (Channel) invocation.getArguments()[0]);
Object ret = null;
try {
ret = invocation.proceed();
successHandler.accept(mac);
} catch (Throwable e) {
errorHandler.accept(mac, e);
}
return ret;
}
private void ack(MessageAndChannel mac) {
try {
mac.channel.basicAck(mac.message.getMessageProperties()
.getDeliveryTag(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private int tryGetRetryCountOrFail(MessageAndChannel mac, Throwable originalError) throws Throwable {
MessageProperties props = mac.message.getMessageProperties();
String xRetriedCountHeader = (String) props.getHeader("x-retried-count");
final int xRetriedCount = xRetriedCountHeader == null ? 0 : Integer.valueOf(xRetriedCountHeader);
if (retryQueues.retriesExhausted(xRetriedCount)) {
mac.channel.basicReject(props.getDeliveryTag(), false);
throw originalError;
}
return xRetriedCount;
}
private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
String retryQueueName = retryQueues.getQueueName(retryCount);
rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
MessageProperties props = m.getMessageProperties();
props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
props.setHeader("x-original-exchange", props.getReceivedExchange());
props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());
return m;
});
mac.channel.basicReject(mac.message.getMessageProperties()
.getDeliveryTag(), false);
}
private class MessageAndChannel {
private Message message;
private Channel channel;
private MessageAndChannel(Message message, Channel channel) {
this.message = message;
this.channel = channel;
}
}
}
@@ -0,0 +1,38 @@
package com.baeldung.springamqp.simple;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class HelloWorldMessageApp {
private static final boolean NON_DURABLE = false;
private static final String MY_QUEUE_NAME = "myQueue";
public static void main(String[] args) {
SpringApplication.run(HelloWorldMessageApp.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("myQueue", "Hello, world!");
};
}
@Bean
public Queue myQueue() {
return new Queue(MY_QUEUE_NAME, NON_DURABLE);
}
@RabbitListener(queues = MY_QUEUE_NAME)
public void listen(String in) {
System.out.println("Message read from myQueue : " + in);
}
}
@@ -0,0 +1,3 @@
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.main.allow-bean-definition-overriding=true
amqp.configuration.current=parking-lot-dlx
@@ -0,0 +1,58 @@
package com.baeldung.springamqp.exponentialbackoff;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* This live test requires:
*
* - A running RabbitMQ instance on localhost (e.g. docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management)
*
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { RabbitConfiguration.class })
public class ExponentialBackoffLiveTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ObservableRejectAndDontRequeueRecoverer observableRecoverer;
@Autowired
private RetryQueuesInterceptor retryQueues;
@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
observableRecoverer.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
}
latch.await();
}
@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
retryQueues.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
}
latch.await();
}
}
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include
resource="org/springframework/boot/logging/logback/base.xml" />
<logger name="org.springframework" level="INFO" />
</configuration>