From 70acdb27ee5e90718266018b725c5824ff60ac62 Mon Sep 17 00:00:00 2001 From: psevestre Date: Fri, 2 Jun 2023 13:24:47 -0300 Subject: [PATCH] [BAEL-6275] PostgreSQL NOTIFY/LISTEN (#14153) * [BAEL-4849] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Remove extra comments * [BAEL-5258] Article Code * [BAEL-2765] PKCE Support for Secret Clients * [BAEL-5698] Article code * [BAEL-5698] Article code * [BAEL-5905] Initial code * [BAEL-5905] Article code * [BAEL-5905] Relocate article code to new module * [BAEL-6275] PostgreSQL NOTIFY/LISTEN * [BAEL-6275] Minor correction --------- Co-authored-by: Philippe Sevestre --- messaging-modules/pom.xml | 1 + messaging-modules/postgres-notify/.gitignore | 1 + messaging-modules/postgres-notify/pom.xml | 78 +++++++++++++++++++ .../messaging/postgresql/Application.java | 12 +++ .../postgresql/config/CacheConfiguration.java | 34 ++++++++ .../config/ListenerConfiguration.java | 25 ++++++ .../config/NotifierConfiguration.java | 30 +++++++ .../controller/OrdersController.java | 53 +++++++++++++ .../messaging/postgresql/domain/Order.java | 21 +++++ .../postgresql/domain/OrderType.java | 12 +++ .../repository/OrdersRepository.java | 9 +++ .../service/NotificationHandler.java | 31 ++++++++ .../postgresql/service/NotifierService.java | 55 +++++++++++++ .../postgresql/service/OrdersService.java | 61 +++++++++++++++ .../src/main/resources/application.properties | 5 ++ .../src/main/resources/schema.sql | 7 ++ .../postgresql/ApplicationLiveTest.java | 47 +++++++++++ .../src/test/resources/application.properties | 3 + 18 files changed, 485 insertions(+) create mode 100644 messaging-modules/postgres-notify/.gitignore create mode 100644 messaging-modules/postgres-notify/pom.xml create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/Application.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/CacheConfiguration.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/ListenerConfiguration.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/NotifierConfiguration.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/controller/OrdersController.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/domain/Order.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/domain/OrderType.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/repository/OrdersRepository.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/NotificationHandler.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/NotifierService.java create mode 100644 messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/OrdersService.java create mode 100644 messaging-modules/postgres-notify/src/main/resources/application.properties create mode 100644 messaging-modules/postgres-notify/src/main/resources/schema.sql create mode 100644 messaging-modules/postgres-notify/src/test/java/com/baeldung/messaging/postgresql/ApplicationLiveTest.java create mode 100644 messaging-modules/postgres-notify/src/test/resources/application.properties diff --git a/messaging-modules/pom.xml b/messaging-modules/pom.xml index 71ff25d71b..6fd14f7c64 100644 --- a/messaging-modules/pom.xml +++ b/messaging-modules/pom.xml @@ -22,6 +22,7 @@ spring-amqp spring-apache-camel spring-jms + postgres-notify \ No newline at end of file diff --git a/messaging-modules/postgres-notify/.gitignore b/messaging-modules/postgres-notify/.gitignore new file mode 100644 index 0000000000..0776e6f133 --- /dev/null +++ b/messaging-modules/postgres-notify/.gitignore @@ -0,0 +1 @@ +/application-local.properties diff --git a/messaging-modules/postgres-notify/pom.xml b/messaging-modules/postgres-notify/pom.xml new file mode 100644 index 0000000000..174d66b7f5 --- /dev/null +++ b/messaging-modules/postgres-notify/pom.xml @@ -0,0 +1,78 @@ + + + 4.0.0 + postgres-notify + postgres-notify + PostgreSQL as a Message Broker + + + com.baeldung + messaging-modules + 0.0.1-SNAPSHOT + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-data-jdbc + + + org.postgresql + postgresql + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-devtools + runtime + true + + + org.projectlombok + lombok + true + + + + + + + 1.8 + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + instance1 + + + + org.springframework.boot + spring-boot-maven-plugin + + -Dserver.port=8081 + + + + + + + + \ No newline at end of file diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/Application.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/Application.java new file mode 100644 index 0000000000..b3b1f83d2f --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/Application.java @@ -0,0 +1,12 @@ +package com.baeldung.messaging.postgresql; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} \ No newline at end of file diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/CacheConfiguration.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/CacheConfiguration.java new file mode 100644 index 0000000000..73763316c3 --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/CacheConfiguration.java @@ -0,0 +1,34 @@ +package com.baeldung.messaging.postgresql.config; + +import java.util.Arrays; +import java.util.Collection; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.cache.concurrent.ConcurrentMapCache; +import org.springframework.cache.support.SimpleCacheManager; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.ConcurrentLruCache; + +import com.baeldung.messaging.postgresql.domain.Order; + +@Configuration +public class CacheConfiguration { + + @Bean + Cache ordersCache(CacheManager cm) { + return cm.getCache("orders"); + } + + @Bean + @ConditionalOnMissingBean + CacheManager defaultCacheManager() { + SimpleCacheManager cm = new SimpleCacheManager(); + Cache cache = new ConcurrentMapCache("orders",false); + cm.setCaches(Arrays.asList(cache)); + + return cm; + } +} diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/ListenerConfiguration.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/ListenerConfiguration.java new file mode 100644 index 0000000000..b053ba8bc2 --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/ListenerConfiguration.java @@ -0,0 +1,25 @@ +package com.baeldung.messaging.postgresql.config; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.baeldung.messaging.postgresql.service.NotifierService; +import com.baeldung.messaging.postgresql.service.NotificationHandler; + +import lombok.extern.slf4j.Slf4j; + +@Configuration +@Slf4j +public class ListenerConfiguration { + + @Bean + CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) { + return (args) -> { + log.info("Starting order listener thread..."); + Runnable listener = notifier.createNotificationHandler(handler); + Thread t = new Thread(listener, "order-listener"); + t.start(); + }; + } +} diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/NotifierConfiguration.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/NotifierConfiguration.java new file mode 100644 index 0000000000..51d6016c57 --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/config/NotifierConfiguration.java @@ -0,0 +1,30 @@ +package com.baeldung.messaging.postgresql.config; + +import java.util.Properties; + +import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +import com.baeldung.messaging.postgresql.service.NotifierService; +import com.zaxxer.hikari.util.DriverDataSource; + +@Configuration +public class NotifierConfiguration { + + @Bean + NotifierService notifier(DataSourceProperties props) { + + DriverDataSource ds = new DriverDataSource( + props.determineUrl(), + props.determineDriverClassName(), + new Properties(), + props.determineUsername(), + props.determinePassword()); + + JdbcTemplate tpl = new JdbcTemplate(ds); + + return new NotifierService(tpl); + } +} diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/controller/OrdersController.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/controller/OrdersController.java new file mode 100644 index 0000000000..70daa14abd --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/controller/OrdersController.java @@ -0,0 +1,53 @@ +package com.baeldung.messaging.postgresql.controller; + +import java.math.BigDecimal; +import java.util.Optional; + +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; + +import com.baeldung.messaging.postgresql.domain.Order; +import com.baeldung.messaging.postgresql.domain.OrderType; +import com.baeldung.messaging.postgresql.service.OrdersService; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@RestController +@RequiredArgsConstructor +@Slf4j +public class OrdersController { + + private final OrdersService orders; + + @PostMapping("/orders/sell") + public ResponseEntity postSellOrder(String symbol, BigDecimal quantity, BigDecimal price) { + log.info("postSellOrder: symbol={},quantity={},price={}", symbol,quantity,price); + Order order = orders.createOrder(OrderType.SELL, symbol, quantity, price); + return ResponseEntity.status(HttpStatus.CREATED).body(order); + } + + @PostMapping("/orders/buy") + public ResponseEntity postBuyOrder(String symbol, BigDecimal quantity, BigDecimal price) { + log.info("postBuyOrder: symbol={},quantity={},price={}", symbol,quantity,price); + Order order = orders.createOrder(OrderType.BUY, symbol, quantity, price); + return ResponseEntity.status(HttpStatus.CREATED).body(order); + } + + @GetMapping("/orders/{id}") + public ResponseEntity getOrderById(@PathVariable Long id) { + + Optional o = orders.findById(id); + if (o.isEmpty()) { + return ResponseEntity.notFound().build(); + } + + return ResponseEntity.ok(o.get()); + + } + +} diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/domain/Order.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/domain/Order.java new file mode 100644 index 0000000000..d7b19aa10d --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/domain/Order.java @@ -0,0 +1,21 @@ +package com.baeldung.messaging.postgresql.domain; + +import java.math.BigDecimal; + +import org.springframework.data.annotation.Id; +import org.springframework.data.relational.core.mapping.Table; + +import lombok.Data; +import lombok.ToString; + +@Data +@ToString +@Table(name = "orders") +public class Order { + @Id + private Long id; + private String symbol; + private OrderType orderType; + private BigDecimal price; + private BigDecimal quantity; +} diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/domain/OrderType.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/domain/OrderType.java new file mode 100644 index 0000000000..6c053bc25c --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/domain/OrderType.java @@ -0,0 +1,12 @@ +package com.baeldung.messaging.postgresql.domain; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +@Getter +public enum OrderType { + BUY('B'), + SELL('S'); + private final char c; +} diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/repository/OrdersRepository.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/repository/OrdersRepository.java new file mode 100644 index 0000000000..d131017930 --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/repository/OrdersRepository.java @@ -0,0 +1,9 @@ +package com.baeldung.messaging.postgresql.repository; + +import org.springframework.data.repository.CrudRepository; + +import com.baeldung.messaging.postgresql.domain.Order; + +public interface OrdersRepository extends CrudRepository{ + +} diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/NotificationHandler.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/NotificationHandler.java new file mode 100644 index 0000000000..61b970f3a2 --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/NotificationHandler.java @@ -0,0 +1,31 @@ +package com.baeldung.messaging.postgresql.service; + +import java.util.Optional; +import java.util.function.Consumer; + +import org.postgresql.PGNotification; +import org.springframework.cache.Cache; +import org.springframework.stereotype.Component; + +import com.baeldung.messaging.postgresql.domain.Order; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Component +@Slf4j +@RequiredArgsConstructor +public class NotificationHandler implements Consumer{ + + private final OrdersService orders; + + @Override + public void accept(PGNotification t) { + log.info("Notification received: pid={}, name={}, param={}",t.getPID(),t.getName(),t.getParameter()); + Optional order = orders.findById(Long.valueOf(t.getParameter())); + if ( !order.isEmpty()) { + log.info("order details: {}", order.get()); + } + } + +} diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/NotifierService.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/NotifierService.java new file mode 100644 index 0000000000..bc8215c7b3 --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/NotifierService.java @@ -0,0 +1,55 @@ +package com.baeldung.messaging.postgresql.service; + +import java.sql.Connection; +import java.util.function.Consumer; + +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.baeldung.messaging.postgresql.domain.Order; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class NotifierService { + private static final String ORDERS_CHANNEL = "orders"; + private final JdbcTemplate tpl; + + + @Transactional + public void notifyOrderCreated(Order order) { + tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'"); + } + + public Runnable createNotificationHandler(Consumer consumer) { + + return () -> { + tpl.execute((Connection c) -> { + log.info("notificationHandler: sending LISTEN command..."); + c.createStatement().execute("LISTEN " + ORDERS_CHANNEL); + + PGConnection pgconn = c.unwrap(PGConnection.class); + + while(!Thread.currentThread().isInterrupted()) { + PGNotification[] nts = pgconn.getNotifications(10000); + if ( nts == null || nts.length == 0 ) { + continue; + } + + for( PGNotification nt : nts) { + consumer.accept(nt); + } + } + + return 0; + }); + + }; + } +} diff --git a/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/OrdersService.java b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/OrdersService.java new file mode 100644 index 0000000000..cc369c1f3e --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/java/com/baeldung/messaging/postgresql/service/OrdersService.java @@ -0,0 +1,61 @@ +package com.baeldung.messaging.postgresql.service; + +import java.math.BigDecimal; +import java.util.Optional; + +import javax.sql.DataSource; + +import org.springframework.cache.Cache; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.baeldung.messaging.postgresql.domain.Order; +import com.baeldung.messaging.postgresql.domain.OrderType; +import com.baeldung.messaging.postgresql.repository.OrdersRepository; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Service +@RequiredArgsConstructor +@Slf4j +public class OrdersService { + private final OrdersRepository repo; + private final NotifierService notifier; + private final Cache ordersCache; + + @Transactional + public Order createOrder(OrderType orderType, String symbol, BigDecimal quantity, BigDecimal price) { + + Order order = new Order(); + order.setOrderType(orderType); + order.setSymbol(symbol); + order.setQuantity(quantity); + order.setPrice(price); + order = repo.save(order); + + notifier.notifyOrderCreated(order); + + return order; + + } + + @Transactional(readOnly = true) + public Optional findById(Long id) { + Optional o = Optional.ofNullable(ordersCache.get(id, Order.class)); + if ( !o.isEmpty() ) { + log.info("findById: cache hit, id={}",id); + return o; + } + + log.info("findById: cache miss, id={}",id); + o = repo.findById(id); + if ( o.isEmpty()) { + return o; + } + + ordersCache.put(id, o.get()); + return o; + } + +} diff --git a/messaging-modules/postgres-notify/src/main/resources/application.properties b/messaging-modules/postgres-notify/src/main/resources/application.properties new file mode 100644 index 0000000000..836e01cdca --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/resources/application.properties @@ -0,0 +1,5 @@ +# Replace the properties below with proper values for your environment +spring.sql.init.mode=ALWAYS +#spring.datasource.url=jdbc:postgresql://some-postgresql-host/some-postgresql-database +#spring.datasource.username=your-postgresql-username +#spring.datasource.password=your-postgresql-password diff --git a/messaging-modules/postgres-notify/src/main/resources/schema.sql b/messaging-modules/postgres-notify/src/main/resources/schema.sql new file mode 100644 index 0000000000..fad33509bd --- /dev/null +++ b/messaging-modules/postgres-notify/src/main/resources/schema.sql @@ -0,0 +1,7 @@ +create table if not exists orders ( + id serial primary key, + symbol varchar(16) not null, + order_type varchar(8) not null, + price NUMERIC(10,2) not null, + quantity NUMERIC(10,2) not null +); diff --git a/messaging-modules/postgres-notify/src/test/java/com/baeldung/messaging/postgresql/ApplicationLiveTest.java b/messaging-modules/postgres-notify/src/test/java/com/baeldung/messaging/postgresql/ApplicationLiveTest.java new file mode 100644 index 0000000000..5311793fe7 --- /dev/null +++ b/messaging-modules/postgres-notify/src/test/java/com/baeldung/messaging/postgresql/ApplicationLiveTest.java @@ -0,0 +1,47 @@ +package com.baeldung.messaging.postgresql; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.test.context.jdbc.Sql; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import com.baeldung.messaging.postgresql.domain.Order; + +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +@Sql("/schema.sql") +class ApplicationLiveTest { + + + @LocalServerPort + int localPort; + + @Autowired + TestRestTemplate client; + + @Test + void whenCreateBuyOrder_thenSuccess() { + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); + + MultiValueMap data= new LinkedMultiValueMap<>(); + data.add("symbol", "BAEL"); + data.add("price", "14.56"); + data.add("quantity", "100"); + HttpEntity> request = new HttpEntity>(data, headers); + + client.postForEntity("http://localhost:" + localPort + "/orders/buy", data, Order.class); + + } + +} diff --git a/messaging-modules/postgres-notify/src/test/resources/application.properties b/messaging-modules/postgres-notify/src/test/resources/application.properties new file mode 100644 index 0000000000..ebc3ef54e1 --- /dev/null +++ b/messaging-modules/postgres-notify/src/test/resources/application.properties @@ -0,0 +1,3 @@ +spring.datasource.url=jdbc:postgresql://localhost:5432/baeldung +spring.datasource.username=baeldung +spring.datasource.password=SqD64PtsGhDXjn9f \ No newline at end of file