BAEL-4769 Additions for 'Querying the Query Model in Axon' (#12578)

* Update to latest and include script to start axon server

* Add the two other types of queries

* Add unit and integration tests for the queries

* BAEL-4769 Formatting

Co-authored-by: bipster <openbip@gmail.com>
This commit is contained in:
Gerard Klijs
2022-09-06 20:04:07 +02:00
committed by GitHub
parent c1a759900e
commit 2f9967dcd3
17 changed files with 730 additions and 80 deletions
@@ -0,0 +1,39 @@
package com.baeldung.axon.coreapi.queries;
import java.util.Objects;
public class OrderUpdatesQuery {
private final String orderId;
public OrderUpdatesQuery(String orderId) {
this.orderId = orderId;
}
public String getOrderId() {
return orderId;
}
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OrderUpdatesQuery that = (OrderUpdatesQuery) o;
return Objects.equals(orderId, that.orderId);
}
@Override
public int hashCode() {
return Objects.hash(orderId);
}
@Override
public String toString() {
return "OrderUpdatesQuery{" +
"orderId='" + orderId + '\'' +
'}';
}
}
@@ -0,0 +1,39 @@
package com.baeldung.axon.coreapi.queries;
import java.util.Objects;
public class TotalProductsShippedQuery {
private final String productId;
public TotalProductsShippedQuery(String productId) {
this.productId = productId;
}
public String getProductId() {
return productId;
}
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TotalProductsShippedQuery that = (TotalProductsShippedQuery) o;
return Objects.equals(productId, that.productId);
}
@Override
public int hashCode() {
return Objects.hash(productId);
}
@Override
public String toString() {
return "TotalProductsShippedQuery{" +
"productId='" + productId + '\'' +
'}';
}
}
@@ -6,15 +6,15 @@ import com.baeldung.axon.coreapi.commands.CreateOrderCommand;
import com.baeldung.axon.coreapi.commands.DecrementProductCountCommand;
import com.baeldung.axon.coreapi.commands.IncrementProductCountCommand;
import com.baeldung.axon.coreapi.commands.ShipOrderCommand;
import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
import com.baeldung.axon.coreapi.queries.Order;
import com.baeldung.axon.querymodel.OrderQueryService;
import com.baeldung.axon.querymodel.OrderResponse;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.UUID;
@@ -24,11 +24,11 @@ import java.util.concurrent.CompletableFuture;
public class OrderRestEndpoint {
private final CommandGateway commandGateway;
private final QueryGateway queryGateway;
private final OrderQueryService orderQueryService;
public OrderRestEndpoint(CommandGateway commandGateway, QueryGateway queryGateway) {
public OrderRestEndpoint(CommandGateway commandGateway, OrderQueryService orderQueryService) {
this.commandGateway = commandGateway;
this.queryGateway = queryGateway;
this.orderQueryService = orderQueryService;
}
@PostMapping("/ship-order")
@@ -88,7 +88,17 @@ public class OrderRestEndpoint {
}
@GetMapping("/all-orders")
public CompletableFuture<List<Order>> findAllOrders() {
return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class));
public CompletableFuture<List<OrderResponse>> findAllOrders() {
return orderQueryService.findAllOrders();
}
@GetMapping("/total-shipped/{product-id}")
public Integer totalShipped(@PathVariable("product-id") String productId) {
return orderQueryService.totalShipped(productId);
}
@GetMapping(path = "/order-updates/{order-id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> orderUpdates(@PathVariable("order-id") String orderId) {
return orderQueryService.orderUpdates(orderId);
}
}
@@ -0,0 +1,130 @@
package com.baeldung.axon.querymodel;
import com.baeldung.axon.coreapi.events.OrderConfirmedEvent;
import com.baeldung.axon.coreapi.events.OrderCreatedEvent;
import com.baeldung.axon.coreapi.events.OrderShippedEvent;
import com.baeldung.axon.coreapi.events.ProductAddedEvent;
import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent;
import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
import com.baeldung.axon.coreapi.events.ProductRemovedEvent;
import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
import com.baeldung.axon.coreapi.queries.Order;
import com.baeldung.axon.coreapi.queries.OrderStatus;
import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Service
@ProcessingGroup("orders")
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
private final Map<String, Order> orders = new HashMap<>();
private final QueryUpdateEmitter emitter;
public InMemoryOrdersEventHandler(QueryUpdateEmitter emitter) {
this.emitter = emitter;
}
@EventHandler
public void on(OrderCreatedEvent event) {
String orderId = event.getOrderId();
orders.put(orderId, new Order(orderId));
}
@EventHandler
public void on(ProductAddedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.addProduct(event.getProductId());
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(ProductCountIncrementedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.incrementProductInstance(event.getProductId());
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(ProductCountDecrementedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.decrementProductInstance(event.getProductId());
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(ProductRemovedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.removeProduct(event.getProductId());
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(OrderConfirmedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.setOrderConfirmed();
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(OrderShippedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.setOrderShipped();
emitUpdate(order);
return order;
});
}
@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
return new ArrayList<>(orders.values());
}
@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
return orders.values()
.stream()
.filter(o -> o.getOrderStatus() == OrderStatus.SHIPPED)
.map(o -> Optional.ofNullable(o.getProducts()
.get(query.getProductId()))
.orElse(0))
.reduce(0, Integer::sum);
}
@QueryHandler
public Order handle(OrderUpdatesQuery query) {
return orders.get(query.getOrderId());
}
private void emitUpdate(Order order) {
emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId()
.equals(q.getOrderId()), order);
}
@Override
public void reset(List<Order> orderList) {
orders.clear();
orderList.forEach(o -> orders.put(o.getOrderId(), o));
}
}
@@ -0,0 +1,22 @@
package com.baeldung.axon.querymodel;
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
@Service
public class LegacyQueryHandler {
@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
switch (query.getProductId()) {
case "Deluxe Chair":
return 234;
case "a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":
return 10;
default:
return 0;
}
}
}
@@ -0,0 +1,53 @@
package com.baeldung.axon.querymodel;
import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
import com.baeldung.axon.coreapi.queries.Order;
import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
public class OrderQueryService {
private final QueryGateway queryGateway;
public OrderQueryService(QueryGateway queryGateway) {
this.queryGateway = queryGateway;
}
public CompletableFuture<List<OrderResponse>> findAllOrders() {
return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class))
.thenApply(r -> r.stream()
.map(OrderResponse::new)
.collect(Collectors.toList()));
}
public Integer totalShipped(String productId) {
return queryGateway.scatterGather(new TotalProductsShippedQuery(productId),
ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
.reduce(0, Integer::sum);
}
public Flux<OrderResponse> orderUpdates(String orderId) {
return subscriptionQuery(new OrderUpdatesQuery(orderId), ResponseTypes.instanceOf(Order.class)).map(OrderResponse::new);
}
private <Q, R> Flux<R> subscriptionQuery(Q query, ResponseType<R> resultType) {
SubscriptionQueryResult<R, R> result = queryGateway.subscriptionQuery(query, resultType, resultType);
return result.initialResult()
.concatWith(result.updates())
.doFinally(signal -> result.close());
}
}
@@ -0,0 +1,38 @@
package com.baeldung.axon.querymodel;
import com.baeldung.axon.coreapi.queries.Order;
import java.util.Map;
import static com.baeldung.axon.querymodel.OrderStatusResponse.toResponse;
public class OrderResponse {
private String orderId;
private Map<String, Integer> products;
private OrderStatusResponse orderStatus;
OrderResponse(Order order) {
this.orderId = order.getOrderId();
this.products = order.getProducts();
this.orderStatus = toResponse(order.getOrderStatus());
}
/**
* Added for the integration test, since it's using Jackson for the response
*/
OrderResponse() {
}
public String getOrderId() {
return orderId;
}
public Map<String, Integer> getProducts() {
return products;
}
public OrderStatusResponse getOrderStatus() {
return orderStatus;
}
}
@@ -0,0 +1,16 @@
package com.baeldung.axon.querymodel;
import com.baeldung.axon.coreapi.queries.OrderStatus;
public enum OrderStatusResponse {
CREATED, CONFIRMED, SHIPPED, UNKNOWN;
static OrderStatusResponse toResponse(OrderStatus status) {
for (OrderStatusResponse response : values()) {
if (response.toString().equals(status.toString())) {
return response;
}
}
return UNKNOWN;
}
}
@@ -9,78 +9,32 @@ import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
import com.baeldung.axon.coreapi.events.ProductRemovedEvent;
import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
import com.baeldung.axon.coreapi.queries.Order;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@ProcessingGroup("orders")
public class OrdersEventHandler {
public interface OrdersEventHandler {
private final Map<String, Order> orders = new HashMap<>();
void on(OrderCreatedEvent event);
@EventHandler
public void on(OrderCreatedEvent event) {
String orderId = event.getOrderId();
orders.put(orderId, new Order(orderId));
}
void on(ProductAddedEvent event);
@EventHandler
public void on(ProductAddedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.addProduct(event.getProductId());
return order;
});
}
void on(ProductCountIncrementedEvent event);
@EventHandler
public void on(ProductCountIncrementedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.incrementProductInstance(event.getProductId());
return order;
});
}
void on(ProductCountDecrementedEvent event);
@EventHandler
public void on(ProductCountDecrementedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.decrementProductInstance(event.getProductId());
return order;
});
}
void on(ProductRemovedEvent event);
@EventHandler
public void on(ProductRemovedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.removeProduct(event.getProductId());
return order;
});
}
void on(OrderConfirmedEvent event);
@EventHandler
public void on(OrderConfirmedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.setOrderConfirmed();
return order;
});
}
void on(OrderShippedEvent event);
@EventHandler
public void on(OrderShippedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.setOrderShipped();
return order;
});
}
List<Order> handle(FindAllOrderedProductsQuery query);
@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
return new ArrayList<>(orders.values());
}
}
Integer handle(TotalProductsShippedQuery query);
Order handle(OrderUpdatesQuery query);
void reset(List<Order> orderList);
}
+12
View File
@@ -34,4 +34,16 @@ POST http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/confirm
POST http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/ship
### Retrieve shipped Deluxe Chairs
GET http://localhost:8080/total-shipped/Deluxe Chair
### Retrieve shipped a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3
GET http://localhost:8080/total-shipped/a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3
### Receive updates for 666a1661-474d-4046-8b12-8b5896312768
GET http://localhost:8080/order-updates/666a1661-474d-4046-8b12-8b5896312768
###