[BAEL-6275] PostgreSQL NOTIFY/LISTEN (#14153)

* [BAEL-4849] Article code

* [BAEL-4968] Article code

* [BAEL-4968] Article code

* [BAEL-4968] Article code

* [BAEL-4968] Remove extra comments

* [BAEL-5258] Article Code

* [BAEL-2765] PKCE Support for Secret Clients

* [BAEL-5698] Article code

* [BAEL-5698] Article code

* [BAEL-5905] Initial code

* [BAEL-5905] Article code

* [BAEL-5905] Relocate article code to new module

* [BAEL-6275] PostgreSQL NOTIFY/LISTEN

* [BAEL-6275] Minor correction

---------

Co-authored-by: Philippe Sevestre <psevestre@gmail.com>
This commit is contained in:
psevestre
2023-06-02 13:24:47 -03:00
committed by GitHub
parent 2e023a6ba2
commit 70acdb27ee
18 changed files with 485 additions and 0 deletions
@@ -0,0 +1,12 @@
package com.baeldung.messaging.postgresql;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@@ -0,0 +1,34 @@
package com.baeldung.messaging.postgresql.config;
import java.util.Arrays;
import java.util.Collection;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cache.concurrent.ConcurrentMapCache;
import org.springframework.cache.support.SimpleCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ConcurrentLruCache;
import com.baeldung.messaging.postgresql.domain.Order;
@Configuration
public class CacheConfiguration {
@Bean
Cache ordersCache(CacheManager cm) {
return cm.getCache("orders");
}
@Bean
@ConditionalOnMissingBean
CacheManager defaultCacheManager() {
SimpleCacheManager cm = new SimpleCacheManager();
Cache cache = new ConcurrentMapCache("orders",false);
cm.setCaches(Arrays.asList(cache));
return cm;
}
}
@@ -0,0 +1,25 @@
package com.baeldung.messaging.postgresql.config;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.baeldung.messaging.postgresql.service.NotifierService;
import com.baeldung.messaging.postgresql.service.NotificationHandler;
import lombok.extern.slf4j.Slf4j;
@Configuration
@Slf4j
public class ListenerConfiguration {
@Bean
CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) {
return (args) -> {
log.info("Starting order listener thread...");
Runnable listener = notifier.createNotificationHandler(handler);
Thread t = new Thread(listener, "order-listener");
t.start();
};
}
}
@@ -0,0 +1,30 @@
package com.baeldung.messaging.postgresql.config;
import java.util.Properties;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import com.baeldung.messaging.postgresql.service.NotifierService;
import com.zaxxer.hikari.util.DriverDataSource;
@Configuration
public class NotifierConfiguration {
@Bean
NotifierService notifier(DataSourceProperties props) {
DriverDataSource ds = new DriverDataSource(
props.determineUrl(),
props.determineDriverClassName(),
new Properties(),
props.determineUsername(),
props.determinePassword());
JdbcTemplate tpl = new JdbcTemplate(ds);
return new NotifierService(tpl);
}
}
@@ -0,0 +1,53 @@
package com.baeldung.messaging.postgresql.controller;
import java.math.BigDecimal;
import java.util.Optional;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import com.baeldung.messaging.postgresql.domain.Order;
import com.baeldung.messaging.postgresql.domain.OrderType;
import com.baeldung.messaging.postgresql.service.OrdersService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@RestController
@RequiredArgsConstructor
@Slf4j
public class OrdersController {
private final OrdersService orders;
@PostMapping("/orders/sell")
public ResponseEntity<Order> postSellOrder(String symbol, BigDecimal quantity, BigDecimal price) {
log.info("postSellOrder: symbol={},quantity={},price={}", symbol,quantity,price);
Order order = orders.createOrder(OrderType.SELL, symbol, quantity, price);
return ResponseEntity.status(HttpStatus.CREATED).body(order);
}
@PostMapping("/orders/buy")
public ResponseEntity<Order> postBuyOrder(String symbol, BigDecimal quantity, BigDecimal price) {
log.info("postBuyOrder: symbol={},quantity={},price={}", symbol,quantity,price);
Order order = orders.createOrder(OrderType.BUY, symbol, quantity, price);
return ResponseEntity.status(HttpStatus.CREATED).body(order);
}
@GetMapping("/orders/{id}")
public ResponseEntity<Order> getOrderById(@PathVariable Long id) {
Optional<Order> o = orders.findById(id);
if (o.isEmpty()) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(o.get());
}
}
@@ -0,0 +1,21 @@
package com.baeldung.messaging.postgresql.domain;
import java.math.BigDecimal;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Table(name = "orders")
public class Order {
@Id
private Long id;
private String symbol;
private OrderType orderType;
private BigDecimal price;
private BigDecimal quantity;
}
@@ -0,0 +1,12 @@
package com.baeldung.messaging.postgresql.domain;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
@Getter
public enum OrderType {
BUY('B'),
SELL('S');
private final char c;
}
@@ -0,0 +1,9 @@
package com.baeldung.messaging.postgresql.repository;
import org.springframework.data.repository.CrudRepository;
import com.baeldung.messaging.postgresql.domain.Order;
public interface OrdersRepository extends CrudRepository<Order, Long>{
}
@@ -0,0 +1,31 @@
package com.baeldung.messaging.postgresql.service;
import java.util.Optional;
import java.util.function.Consumer;
import org.postgresql.PGNotification;
import org.springframework.cache.Cache;
import org.springframework.stereotype.Component;
import com.baeldung.messaging.postgresql.domain.Order;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
@RequiredArgsConstructor
public class NotificationHandler implements Consumer<PGNotification>{
private final OrdersService orders;
@Override
public void accept(PGNotification t) {
log.info("Notification received: pid={}, name={}, param={}",t.getPID(),t.getName(),t.getParameter());
Optional<Order> order = orders.findById(Long.valueOf(t.getParameter()));
if ( !order.isEmpty()) {
log.info("order details: {}", order.get());
}
}
}
@@ -0,0 +1,55 @@
package com.baeldung.messaging.postgresql.service;
import java.sql.Connection;
import java.util.function.Consumer;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baeldung.messaging.postgresql.domain.Order;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class NotifierService {
private static final String ORDERS_CHANNEL = "orders";
private final JdbcTemplate tpl;
@Transactional
public void notifyOrderCreated(Order order) {
tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'");
}
public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {
return () -> {
tpl.execute((Connection c) -> {
log.info("notificationHandler: sending LISTEN command...");
c.createStatement().execute("LISTEN " + ORDERS_CHANNEL);
PGConnection pgconn = c.unwrap(PGConnection.class);
while(!Thread.currentThread().isInterrupted()) {
PGNotification[] nts = pgconn.getNotifications(10000);
if ( nts == null || nts.length == 0 ) {
continue;
}
for( PGNotification nt : nts) {
consumer.accept(nt);
}
}
return 0;
});
};
}
}
@@ -0,0 +1,61 @@
package com.baeldung.messaging.postgresql.service;
import java.math.BigDecimal;
import java.util.Optional;
import javax.sql.DataSource;
import org.springframework.cache.Cache;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baeldung.messaging.postgresql.domain.Order;
import com.baeldung.messaging.postgresql.domain.OrderType;
import com.baeldung.messaging.postgresql.repository.OrdersRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Service
@RequiredArgsConstructor
@Slf4j
public class OrdersService {
private final OrdersRepository repo;
private final NotifierService notifier;
private final Cache ordersCache;
@Transactional
public Order createOrder(OrderType orderType, String symbol, BigDecimal quantity, BigDecimal price) {
Order order = new Order();
order.setOrderType(orderType);
order.setSymbol(symbol);
order.setQuantity(quantity);
order.setPrice(price);
order = repo.save(order);
notifier.notifyOrderCreated(order);
return order;
}
@Transactional(readOnly = true)
public Optional<Order> findById(Long id) {
Optional<Order> o = Optional.ofNullable(ordersCache.get(id, Order.class));
if ( !o.isEmpty() ) {
log.info("findById: cache hit, id={}",id);
return o;
}
log.info("findById: cache miss, id={}",id);
o = repo.findById(id);
if ( o.isEmpty()) {
return o;
}
ordersCache.put(id, o.get());
return o;
}
}
@@ -0,0 +1,5 @@
# Replace the properties below with proper values for your environment
spring.sql.init.mode=ALWAYS
#spring.datasource.url=jdbc:postgresql://some-postgresql-host/some-postgresql-database
#spring.datasource.username=your-postgresql-username
#spring.datasource.password=your-postgresql-password
@@ -0,0 +1,7 @@
create table if not exists orders (
id serial primary key,
symbol varchar(16) not null,
order_type varchar(8) not null,
price NUMERIC(10,2) not null,
quantity NUMERIC(10,2) not null
);