JAVA-15787 Moved spring-rector and spring-webflux-amqp to spring-reactive-modules
This commit is contained in:
+20
@@ -0,0 +1,20 @@
|
||||
package com.baeldung.reactorbus;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import reactor.Environment;
|
||||
import reactor.bus.EventBus;
|
||||
|
||||
@Configuration
|
||||
public class Config {
|
||||
|
||||
@Bean
|
||||
public Environment env() {
|
||||
return Environment.initializeIfEmpty().assignErrorJournal();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public EventBus createEventBus(Environment env) {
|
||||
return EventBus.create(env, Environment.THREAD_POOL);
|
||||
}
|
||||
}
|
||||
+29
@@ -0,0 +1,29 @@
|
||||
package com.baeldung.reactorbus;
|
||||
|
||||
import com.baeldung.reactorbus.consumer.NotificationConsumer;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import reactor.bus.EventBus;
|
||||
|
||||
import static reactor.bus.selector.Selectors.$;
|
||||
|
||||
@SpringBootApplication
|
||||
public class NotificationApplication implements CommandLineRunner {
|
||||
|
||||
@Autowired
|
||||
private EventBus eventBus;
|
||||
|
||||
@Autowired
|
||||
private NotificationConsumer notificationConsumer;
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
eventBus.on($("notificationConsumer"), notificationConsumer);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(NotificationApplication.class, args);
|
||||
}
|
||||
}
|
||||
+29
@@ -0,0 +1,29 @@
|
||||
package com.baeldung.reactorbus.consumer;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.baeldung.reactorbus.domain.NotificationData;
|
||||
import com.baeldung.reactorbus.service.NotificationService;
|
||||
|
||||
import reactor.bus.Event;
|
||||
import reactor.fn.Consumer;
|
||||
|
||||
@Service
|
||||
public class NotificationConsumer implements Consumer<Event<NotificationData>> {
|
||||
|
||||
@Autowired
|
||||
private NotificationService notificationService;
|
||||
|
||||
@Override
|
||||
public void accept(Event<NotificationData> notificationDataEvent) {
|
||||
|
||||
NotificationData notificationData = notificationDataEvent.getData();
|
||||
try {
|
||||
notificationService.initiateNotification(notificationData);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
+34
@@ -0,0 +1,34 @@
|
||||
package com.baeldung.reactorbus.controller;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import com.baeldung.reactorbus.domain.NotificationData;
|
||||
|
||||
import reactor.bus.Event;
|
||||
import reactor.bus.EventBus;
|
||||
|
||||
@RestController
|
||||
public class NotificationController {
|
||||
|
||||
@Autowired
|
||||
private EventBus eventBus;
|
||||
|
||||
@GetMapping("/startNotification/{param}")
|
||||
public void startNotification(@PathVariable Integer param) {
|
||||
|
||||
for (int i = 0; i < param; i++) {
|
||||
|
||||
NotificationData data = new NotificationData();
|
||||
data.setId(i);
|
||||
|
||||
eventBus.notify("notificationConsumer", Event.wrap(data));
|
||||
|
||||
System.out.println("Notification " + i + ": notification task submitted successfully");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
+42
@@ -0,0 +1,42 @@
|
||||
package com.baeldung.reactorbus.domain;
|
||||
|
||||
public class NotificationData {
|
||||
|
||||
private long id;
|
||||
private String name;
|
||||
private String email;
|
||||
private String mobile;
|
||||
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getEmail() {
|
||||
return email;
|
||||
}
|
||||
|
||||
public void setEmail(String email) {
|
||||
this.email = email;
|
||||
}
|
||||
|
||||
public String getMobile() {
|
||||
return mobile;
|
||||
}
|
||||
|
||||
public void setMobile(String mobile) {
|
||||
this.mobile = mobile;
|
||||
}
|
||||
|
||||
}
|
||||
+9
@@ -0,0 +1,9 @@
|
||||
package com.baeldung.reactorbus.service;
|
||||
|
||||
import com.baeldung.reactorbus.domain.NotificationData;
|
||||
|
||||
public interface NotificationService {
|
||||
|
||||
void initiateNotification(NotificationData notificationData) throws InterruptedException;
|
||||
|
||||
}
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.reactorbus.service.impl;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.baeldung.reactorbus.domain.NotificationData;
|
||||
import com.baeldung.reactorbus.service.NotificationService;
|
||||
|
||||
@Service
|
||||
public class NotificationServiceimpl implements NotificationService {
|
||||
|
||||
@Override
|
||||
public void initiateNotification(NotificationData notificationData) throws InterruptedException {
|
||||
|
||||
System.out.println("Notification service started for Notification ID: " + notificationData.getId());
|
||||
|
||||
Thread.sleep(5000);
|
||||
|
||||
System.out.println("Notification service ended for Notification ID: " + notificationData.getId());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
||||
+17
@@ -0,0 +1,17 @@
|
||||
package com.baeldung;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import com.baeldung.reactorbus.NotificationApplication;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = NotificationApplication.class)
|
||||
public class SpringContextTest {
|
||||
|
||||
@Test
|
||||
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
|
||||
}
|
||||
}
|
||||
+22
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.reactorbus;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.web.server.LocalServerPort;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||
public class NotificationApplicationIntegrationTest {
|
||||
|
||||
@LocalServerPort
|
||||
private int port;
|
||||
|
||||
@Test
|
||||
public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
|
||||
RestTemplate restTemplate = new RestTemplate();
|
||||
restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user