diff --git a/order-fulfillment-workflow/README.md b/order-fulfillment-workflow/README.md new file mode 100644 index 00000000..f1db42f5 --- /dev/null +++ b/order-fulfillment-workflow/README.md @@ -0,0 +1,103 @@ +# Order Fulfillment Workflow + +Sample project showing how to model an order fulfillment process using the +**Axoniq Workflow Engine**. The workflow is written as plain imperative Java — +the engine handles event sourcing, crash recovery, and audit trails. + +```java +@Workflow(idProperty = "orderId", startOnEvent = "io.axoniq.demo.orderfulfillment.api.OrderPlaced") +public void execute(SimpleWorkflowContext ctx) { + // Register the wait FIRST, so the workflow is subscribed before any payment-confirmation + // event has a chance to land. + var paymentConfirmation = ctx.waitForEvent("awaitPayment", PaymentConfirmed.class, + associate(payloadProperty("orderId"), equalsTo(orderId)), Duration.ofMinutes(15)); + + var reserved = ctx.awaitExecute("reserveStock", payload, Boolean.class, inventory::reserveStock); + if (!reserved) { + paymentConfirmation.cancel("Stock unavailable"); + ctx.fail(new RuntimeException("Stock unavailable")); + return; + } + ctx.awaitExecute("initiatePayment", payload, payment::initiatePayment, + Duration.ofSeconds(30), + baseName("InitiatingPaymentForCustomer").namespace("io.axoniq.demo.orderfulfillment.api")); + + paymentConfirmation.await(); + + // The Completed event of `shipOrder` (`ShipOrderCompleted`, in the api namespace) is what the + // projection subscribes to — no eventGateway.publish anywhere inside the workflow. + var shipResult = ctx.awaitExecute("shipOrder", payload, shipping::shipOrder, + Duration.ofSeconds(30), namespace("io.axoniq.demo.orderfulfillment.api")); + ctx.awaitExecute("notifyCustomer", Boolean.class, () -> { notifications.sendConfirmation(email); return true; }); +} +``` + +## Prerequisites + +- Java 21+ +- Maven 3.9+ +- Docker (for Axon Server) +- The Axon Workflow Engine (`io.axoniq.framework.workflow:*:1.0.0-SNAPSHOT`) installed in the local Maven repository + +If the workflow engine isn't published yet, build it locally first: + +```bash +git clone git@github.com:AxonIQ/extension-workflow.git +cd extension-workflow +mvn clean install -DskipTests +``` + +## Running the application + +```bash +docker compose up -d +mvn spring-boot:run +``` + +The application starts on port **9090**. + +### REST endpoints + +| Method | Path | Description | +|--------|-------------------------------|---------------------------------------------| +| POST | `/orders?customerId=&email=&amount=` | Place a new order. Returns the order id. | +| POST | `/orders/{orderId}/payment` | Confirm payment and resume the workflow. | +| GET | `/orders/{orderId}` | Read the order's projected status. | + +### Quick demo + +```bash +# 1. place an order +ORDER=$(curl -s -X POST 'http://localhost:9090/orders?customerId=alice&email=alice@example.com&amount=99.95') + +# 2. confirm payment +curl -X POST "http://localhost:9090/orders/$ORDER/payment" + +# 3. observe the projected status +curl "http://localhost:9090/orders/$ORDER" +``` + +## Integration test + +```bash +mvn verify +``` + +`OrderFulfillmentIT` boots the application against a real Axon Server (started +via Testcontainers), places an order, confirms payment, and asserts that the +projection reaches the `SHIPPED` state. + +## What this sample demonstrates + +* `@Workflow` with `idProperty` and `startOnEvent` +* `awaitExecute` with payload and typed return value +* `awaitExecute` with timeout and event-name customization (`baseName(...)`) +* `awaitEvent` correlated to the workflow instance via `associate(payloadProperty(...), equalsTo(...))` +* `ctx.fail(...)` to terminate a workflow with an error +* A standard `@EventHandler` projection consuming events emitted by the workflow's actions + +## Cleanup + +```bash +docker compose down -v +``` diff --git a/order-fulfillment-workflow/docker-compose.yaml b/order-fulfillment-workflow/docker-compose.yaml new file mode 100644 index 00000000..9b4ed015 --- /dev/null +++ b/order-fulfillment-workflow/docker-compose.yaml @@ -0,0 +1,20 @@ +services: + axon-server: + image: docker.axoniq.io/axoniq/axonserver:2026.0.0 + container_name: order-fulfillment-axon-server + ports: + - "8024:8024" + - "8124:8124" + environment: + axoniq.axonserver.standalone-dcb: true + axoniq_axonserver_hostname: axon-server + axoniq_axonserver_devmode_enabled: true + volumes: + - data:/axonserver/data + - events:/axonserver/events + +volumes: + data: + driver: local + events: + driver: local diff --git a/order-fulfillment-workflow/pom.xml b/order-fulfillment-workflow/pom.xml new file mode 100644 index 00000000..9430955b --- /dev/null +++ b/order-fulfillment-workflow/pom.xml @@ -0,0 +1,187 @@ + + + 4.0.0 + + io.axoniq.demo + order-fulfillment-workflow + 0.0.1-SNAPSHOT + Order Fulfillment Workflow + Sample showing how to model an Order Fulfillment process with the Axon Workflow Engine. + + + 21 + 21 + 21 + UTF-8 + + 5.1.0 + 5.1.0 + 1.0.0-SNAPSHOT + 4.0.6 + 2.0.5 + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + org.axonframework + axon-framework-bom + ${axon.version} + pom + import + + + io.axoniq.framework + axoniq-framework-bom + ${axoniq-framework.version} + pom + import + + + org.testcontainers + testcontainers-bom + ${testcontainers.version} + pom + import + + + + + + + + io.axoniq.framework.workflow + axon-workflow-spring-boot + ${axon-workflow.version} + + + + + org.axonframework.extensions.spring + axon-spring-boot-starter + + + io.axoniq.framework + axon-server-connector + + + + + org.springframework.boot + spring-boot-starter-web + + + com.fasterxml.jackson.core + jackson-databind + + + + + io.axoniq.framework.workflow + axon-workflow-test + ${axon-workflow.version} + test + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-resttestclient + test + + + org.springframework.boot + spring-boot-restclient + test + + + io.axoniq.framework + axoniq-testcontainer + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + testcontainers-junit-jupiter + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.14.1 + + ${java.version} + ${java.version} + true + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + org.apache.maven.plugins + maven-surefire-plugin + 3.5.4 + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.5.4 + + + + integration-test + verify + + + + + + + + + + central-portal-snapshots + Central Portal Snapshots + https://central.sonatype.com/repository/maven-snapshots/ + + false + + + true + + + + diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/OrderFulfillmentApplication.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/OrderFulfillmentApplication.java new file mode 100644 index 00000000..6b8027cf --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/OrderFulfillmentApplication.java @@ -0,0 +1,12 @@ +package io.axoniq.demo.orderfulfillment; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class OrderFulfillmentApplication { + + public static void main(String[] args) { + SpringApplication.run(OrderFulfillmentApplication.class, args); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/InitiatingPaymentForCustomerStarted.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/InitiatingPaymentForCustomerStarted.java new file mode 100644 index 00000000..3c6fc569 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/InitiatingPaymentForCustomerStarted.java @@ -0,0 +1,10 @@ +package io.axoniq.demo.orderfulfillment.api; + +/** + * Emitted automatically by the workflow engine when the {@code initiatePayment} step starts. + * Used as a synchronisation point: by the time this event is in the store, the workflow has + * already registered its {@code awaitPayment} wait (the wait is registered at the very top + * of the workflow, before any {@code awaitExecute} step runs). + */ +public record InitiatingPaymentForCustomerStarted(String orderId) { +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderDelivered.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderDelivered.java new file mode 100644 index 00000000..66d513e4 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderDelivered.java @@ -0,0 +1,10 @@ +package io.axoniq.demo.orderfulfillment.api; + +import org.axonframework.messaging.eventhandling.annotation.Event; + +@Event +public record OrderDelivered( + String orderId, + String trackingNumber +) { +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderFailed.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderFailed.java new file mode 100644 index 00000000..43ba23aa --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderFailed.java @@ -0,0 +1,10 @@ +package io.axoniq.demo.orderfulfillment.api; + +import org.axonframework.messaging.eventhandling.annotation.Event; + +@Event +public record OrderFailed( + String orderId, + String reason +) { +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderPlaced.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderPlaced.java new file mode 100644 index 00000000..66bbb494 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderPlaced.java @@ -0,0 +1,19 @@ +package io.axoniq.demo.orderfulfillment.api; + +import org.axonframework.messaging.eventhandling.annotation.Event; + +@Event +public record OrderPlaced( + String orderId, + String customerId, + String email, + double amount, + String originCity, + double originLat, + double originLng, + String destinationCity, + double destinationLat, + double destinationLng, + String scenario +) { +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/PaymentConfirmed.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/PaymentConfirmed.java new file mode 100644 index 00000000..608aae8b --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/PaymentConfirmed.java @@ -0,0 +1,10 @@ +package io.axoniq.demo.orderfulfillment.api; + +import org.axonframework.messaging.eventhandling.annotation.Event; + +@Event +public record PaymentConfirmed( + String orderId, + String transactionId +) { +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/ShipOrderCompleted.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/ShipOrderCompleted.java new file mode 100644 index 00000000..b6572a68 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/ShipOrderCompleted.java @@ -0,0 +1,17 @@ +package io.axoniq.demo.orderfulfillment.api; + +/** + * Emitted automatically by the workflow engine when the {@code shipOrder} step completes. + * The Completed event's payload is whatever the action returned — here {@code orderId}, + * {@code trackingNumber} and the route coordinates so downstream simulators can animate + * truck movement without re-reading the projection. + */ +public record ShipOrderCompleted( + String orderId, + String trackingNumber, + double originLat, + double originLng, + double destinationLat, + double destinationLng +) { +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/TruckLocationUpdated.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/TruckLocationUpdated.java new file mode 100644 index 00000000..7053f336 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/TruckLocationUpdated.java @@ -0,0 +1,16 @@ +package io.axoniq.demo.orderfulfillment.api; + +import org.axonframework.messaging.eventhandling.annotation.Event; + +/** + * Published by the truck-movement simulator while a shipment is in transit. Not part of the + * workflow — purely a visualization signal driving the live map. + */ +@Event +public record TruckLocationUpdated( + String orderId, + double lat, + double lng, + double progress +) { +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/config/AxonConfig.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/config/AxonConfig.java new file mode 100644 index 00000000..ef4f519a --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/config/AxonConfig.java @@ -0,0 +1,15 @@ +package io.axoniq.demo.orderfulfillment.config; + +import org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore; +import org.axonframework.messaging.eventhandling.processing.streaming.token.store.inmemory.InMemoryTokenStore; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class AxonConfig { + + @Bean + public TokenStore tokenStore() { + return new InMemoryTokenStore(); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java new file mode 100644 index 00000000..58885465 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java @@ -0,0 +1,76 @@ +package io.axoniq.demo.orderfulfillment.controller; + +import io.axoniq.demo.orderfulfillment.api.OrderPlaced; +import io.axoniq.demo.orderfulfillment.api.PaymentConfirmed; +import io.axoniq.demo.orderfulfillment.projection.OrderEventStream; +import io.axoniq.demo.orderfulfillment.projection.OrderStatus; +import io.axoniq.demo.orderfulfillment.projection.OrderStatusProjection; +import io.axoniq.demo.orderfulfillment.simulator.Cities; +import org.axonframework.messaging.eventhandling.gateway.EventGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.UUID; + +@RestController +@RequestMapping("/orders") +public class OrderController { + + private static final Logger logger = LoggerFactory.getLogger(OrderController.class); + + private final EventGateway eventGateway; + private final OrderStatusProjection projection; + private final OrderEventStream eventStream; + + public OrderController(EventGateway eventGateway, + OrderStatusProjection projection, + OrderEventStream eventStream) { + this.eventGateway = eventGateway; + this.projection = projection; + this.eventStream = eventStream; + } + + @PostMapping + public String placeOrder(@RequestParam("customerId") String customerId, + @RequestParam("email") String email, + @RequestParam("amount") double amount, + @RequestParam(value = "scenario", required = false) String scenario) { + var orderId = UUID.randomUUID().toString(); + var route = Cities.randomPair(); + logger.info("Publishing OrderPlaced for order {} ({} → {}).", + orderId, route[0].name(), route[1].name()); + eventGateway.publish(null, new OrderPlaced( + orderId, customerId, email, amount, + route[0].name(), route[0].lat(), route[0].lng(), + route[1].name(), route[1].lat(), route[1].lng(), + scenario == null ? "happy" : scenario)); + return orderId; + } + + @PostMapping("/{orderId}/payment") + public void confirmPayment(@PathVariable("orderId") String orderId) { + logger.info("Publishing PaymentConfirmed for order {}.", orderId); + eventGateway.publish(null, new PaymentConfirmed(orderId, "txn-" + UUID.randomUUID())); + } + + @GetMapping("/{orderId}") + public ResponseEntity getStatus(@PathVariable("orderId") String orderId) { + return projection.findById(orderId) + .map(ResponseEntity::ok) + .orElse(ResponseEntity.notFound().build()); + } + + @GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter stream() { + return eventStream.subscribe(); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java new file mode 100644 index 00000000..997f48b4 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java @@ -0,0 +1,130 @@ +package io.axoniq.demo.orderfulfillment.projection; + +import io.axoniq.demo.orderfulfillment.api.InitiatingPaymentForCustomerStarted; +import io.axoniq.demo.orderfulfillment.api.OrderDelivered; +import io.axoniq.demo.orderfulfillment.api.OrderFailed; +import io.axoniq.demo.orderfulfillment.api.OrderPlaced; +import io.axoniq.demo.orderfulfillment.api.ShipOrderCompleted; +import io.axoniq.demo.orderfulfillment.api.TruckLocationUpdated; +import org.axonframework.messaging.eventhandling.annotation.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Pushes workflow Started/Completed updates plus simulator location ticks to subscribed SSE + * clients. New subscribers receive a snapshot of all known orders so the map can render existing + * shipments immediately. + */ +@Component +public class OrderEventStream { + + private static final Logger logger = LoggerFactory.getLogger(OrderEventStream.class); + + private final List emitters = new CopyOnWriteArrayList<>(); + private final OrderStatusProjection projection; + + public OrderEventStream(OrderStatusProjection projection) { + this.projection = projection; + } + + public SseEmitter subscribe() { + var emitter = new SseEmitter(0L); + emitters.add(emitter); + emitter.onCompletion(() -> emitters.remove(emitter)); + emitter.onTimeout(() -> emitters.remove(emitter)); + emitter.onError(e -> emitters.remove(emitter)); + try { + emitter.send(SseEmitter.event().name("snapshot").data(projection.findAll())); + } catch (IOException e) { + emitters.remove(emitter); + } + return emitter; + } + + @EventHandler + public void on(OrderPlaced event) { + var payload = new HashMap(); + payload.put("type", "PLACED"); + payload.put("orderId", event.orderId()); + payload.put("customerId", event.customerId()); + payload.put("email", event.email()); + payload.put("amount", event.amount()); + payload.put("originCity", event.originCity()); + payload.put("originLat", event.originLat()); + payload.put("originLng", event.originLng()); + payload.put("destinationCity", event.destinationCity()); + payload.put("destinationLat", event.destinationLat()); + payload.put("destinationLng", event.destinationLng()); + payload.put("scenario", event.scenario()); + payload.put("timestamp", Instant.now().toString()); + broadcast("order", payload); + } + + @EventHandler + public void on(InitiatingPaymentForCustomerStarted event) { + broadcast("order", Map.of( + "type", "AWAITING_PAYMENT", + "orderId", event.orderId(), + "timestamp", Instant.now().toString() + )); + } + + @EventHandler + public void on(ShipOrderCompleted event) { + broadcast("order", Map.of( + "type", "IN_TRANSIT", + "orderId", event.orderId(), + "trackingNumber", event.trackingNumber(), + "timestamp", Instant.now().toString() + )); + } + + @EventHandler + public void on(TruckLocationUpdated event) { + broadcast("location", Map.of( + "orderId", event.orderId(), + "lat", event.lat(), + "lng", event.lng(), + "progress", event.progress() + )); + } + + @EventHandler + public void on(OrderDelivered event) { + broadcast("order", Map.of( + "type", "DELIVERED", + "orderId", event.orderId(), + "timestamp", Instant.now().toString() + )); + } + + @EventHandler + public void on(OrderFailed event) { + broadcast("order", Map.of( + "type", "FAILED", + "orderId", event.orderId(), + "reason", event.reason(), + "timestamp", Instant.now().toString() + )); + } + + private void broadcast(String name, Object payload) { + for (SseEmitter emitter : emitters) { + try { + emitter.send(SseEmitter.event().name(name).data(payload)); + } catch (IOException | IllegalStateException e) { + logger.debug("Dropping dead SSE emitter: {}", e.getMessage()); + emitters.remove(emitter); + } + } + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatus.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatus.java new file mode 100644 index 00000000..221457a9 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatus.java @@ -0,0 +1,75 @@ +package io.axoniq.demo.orderfulfillment.projection; + +public record OrderStatus( + String orderId, + String customerId, + String email, + double amount, + Status status, + String trackingNumber, + String originCity, + double originLat, + double originLng, + String destinationCity, + double destinationLat, + double destinationLng, + double currentLat, + double currentLng, + double progress, + String scenario, + String failureReason +) { + + public enum Status { + PLACED, + AWAITING_PAYMENT, + IN_TRANSIT, + DELIVERED, + FAILED + } + + public OrderStatus awaitingPayment() { + return new OrderStatus(orderId, customerId, email, amount, Status.AWAITING_PAYMENT, + trackingNumber, + originCity, originLat, originLng, + destinationCity, destinationLat, destinationLng, + currentLat, currentLng, progress, + scenario, failureReason); + } + + public OrderStatus dispatched(String trackingNumber) { + return new OrderStatus(orderId, customerId, email, amount, Status.IN_TRANSIT, + trackingNumber, + originCity, originLat, originLng, + destinationCity, destinationLat, destinationLng, + originLat, originLng, 0.0, + scenario, failureReason); + } + + public OrderStatus moved(double lat, double lng, double progress) { + return new OrderStatus(orderId, customerId, email, amount, status, + trackingNumber, + originCity, originLat, originLng, + destinationCity, destinationLat, destinationLng, + lat, lng, progress, + scenario, failureReason); + } + + public OrderStatus delivered() { + return new OrderStatus(orderId, customerId, email, amount, Status.DELIVERED, + trackingNumber, + originCity, originLat, originLng, + destinationCity, destinationLat, destinationLng, + destinationLat, destinationLng, 1.0, + scenario, failureReason); + } + + public OrderStatus failed(String reason) { + return new OrderStatus(orderId, customerId, email, amount, Status.FAILED, + trackingNumber, + originCity, originLat, originLng, + destinationCity, destinationLat, destinationLng, + currentLat, currentLng, progress, + scenario, reason); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatusProjection.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatusProjection.java new file mode 100644 index 00000000..d7f2cfe9 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatusProjection.java @@ -0,0 +1,84 @@ +package io.axoniq.demo.orderfulfillment.projection; + +import io.axoniq.demo.orderfulfillment.api.InitiatingPaymentForCustomerStarted; +import io.axoniq.demo.orderfulfillment.api.OrderDelivered; +import io.axoniq.demo.orderfulfillment.api.OrderFailed; +import io.axoniq.demo.orderfulfillment.api.OrderPlaced; +import io.axoniq.demo.orderfulfillment.api.ShipOrderCompleted; +import io.axoniq.demo.orderfulfillment.api.TruckLocationUpdated; +import org.axonframework.messaging.eventhandling.annotation.EventHandler; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Builds order status from workflow Started/Completed events plus the simulator's truck-movement + * events. The projection is the system of record for the live UI — every status change here is + * what the SSE stream broadcasts. + */ +@Component +public class OrderStatusProjection { + + private final Map orders = new ConcurrentHashMap<>(); + + @EventHandler + public void on(OrderPlaced event) { + orders.put(event.orderId(), + new OrderStatus(event.orderId(), + event.customerId(), + event.email(), + event.amount(), + OrderStatus.Status.PLACED, + null, + event.originCity(), + event.originLat(), + event.originLng(), + event.destinationCity(), + event.destinationLat(), + event.destinationLng(), + event.originLat(), + event.originLng(), + 0.0, + event.scenario(), + null)); + } + + @EventHandler + public void on(InitiatingPaymentForCustomerStarted event) { + orders.computeIfPresent(event.orderId(), (id, current) -> current.awaitingPayment()); + } + + @EventHandler + public void on(ShipOrderCompleted event) { + orders.computeIfPresent(event.orderId(), + (id, current) -> current.dispatched(event.trackingNumber())); + } + + @EventHandler + public void on(TruckLocationUpdated event) { + orders.computeIfPresent(event.orderId(), + (id, current) -> current.moved(event.lat(), event.lng(), event.progress())); + } + + @EventHandler + public void on(OrderDelivered event) { + orders.computeIfPresent(event.orderId(), (id, current) -> current.delivered()); + } + + @EventHandler + public void on(OrderFailed event) { + orders.computeIfPresent(event.orderId(), (id, current) -> current.failed(event.reason())); + } + + public Optional findById(String orderId) { + return Optional.ofNullable(orders.get(orderId)); + } + + public Collection findAll() { + return orders.values(); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/FailureRecorder.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/FailureRecorder.java new file mode 100644 index 00000000..fe0a74fb --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/FailureRecorder.java @@ -0,0 +1,29 @@ +package io.axoniq.demo.orderfulfillment.service; + +import io.axoniq.demo.orderfulfillment.api.OrderFailed; +import org.axonframework.messaging.eventhandling.gateway.EventGateway; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * Lets the workflow publish an {@link OrderFailed} event from inside an {@code awaitExecute} + * step, so failure broadcasts go through the regular event pipeline instead of being side-effects + * of the workflow body. + */ +@Component +public class FailureRecorder { + + private final EventGateway eventGateway; + + public FailureRecorder(EventGateway eventGateway) { + this.eventGateway = eventGateway; + } + + public Boolean record(Map payload) { + var orderId = (String) payload.get("orderId"); + var reason = (String) payload.get("reason"); + eventGateway.publish(null, new OrderFailed(orderId, reason)); + return true; + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/InventoryService.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/InventoryService.java new file mode 100644 index 00000000..2da3b687 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/InventoryService.java @@ -0,0 +1,25 @@ +package io.axoniq.demo.orderfulfillment.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +public class InventoryService { + + private static final Logger logger = LoggerFactory.getLogger(InventoryService.class); + + public boolean reserveStock(Map payload) { + var customerId = payload.get("customerId"); + var amount = payload.get("amount"); + var scenario = (String) payload.get("scenario"); + if ("out-of-stock".equals(scenario)) { + logger.info("Stock unavailable (forced) for customer {} (amount {}).", customerId, amount); + return false; + } + logger.info("Reserving stock for customer {} (amount {}).", customerId, amount); + return true; + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/NotificationService.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/NotificationService.java new file mode 100644 index 00000000..0a3aa74f --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/NotificationService.java @@ -0,0 +1,15 @@ +package io.axoniq.demo.orderfulfillment.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class NotificationService { + + private static final Logger logger = LoggerFactory.getLogger(NotificationService.class); + + public void sendConfirmation(String email, String trackingNumber) { + logger.info("Sending order confirmation to {} (tracking number {}).", email, trackingNumber); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/PaymentService.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/PaymentService.java new file mode 100644 index 00000000..78337c1b --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/PaymentService.java @@ -0,0 +1,18 @@ +package io.axoniq.demo.orderfulfillment.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +public class PaymentService { + + private static final Logger logger = LoggerFactory.getLogger(PaymentService.class); + + public void initiatePayment(Map payload) { + logger.info("Initiating payment for customer {} (amount {}).", + payload.get("customerId"), payload.get("amount")); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/ShippingService.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/ShippingService.java new file mode 100644 index 00000000..4fdfc15f --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/ShippingService.java @@ -0,0 +1,27 @@ +package io.axoniq.demo.orderfulfillment.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +public class ShippingService { + + private static final Logger logger = LoggerFactory.getLogger(ShippingService.class); + + public Map shipOrder(Map payload) { + var orderId = (String) payload.get("orderId"); + var trackingNumber = (String) payload.get("trackingNumber"); + logger.info("Shipping order {} with tracking number {}.", orderId, trackingNumber); + return Map.of( + "orderId", orderId, + "trackingNumber", trackingNumber, + "originLat", payload.get("originLat"), + "originLng", payload.get("originLng"), + "destinationLat", payload.get("destinationLat"), + "destinationLng", payload.get("destinationLng") + ); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/AutoPaymentRobot.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/AutoPaymentRobot.java new file mode 100644 index 00000000..bbf7696a --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/AutoPaymentRobot.java @@ -0,0 +1,67 @@ +package io.axoniq.demo.orderfulfillment.simulator; + +import io.axoniq.demo.orderfulfillment.api.InitiatingPaymentForCustomerStarted; +import io.axoniq.demo.orderfulfillment.api.PaymentConfirmed; +import io.axoniq.demo.orderfulfillment.projection.OrderStatus; +import io.axoniq.demo.orderfulfillment.projection.OrderStatusProjection; +import org.axonframework.messaging.eventhandling.annotation.EventHandler; +import org.axonframework.messaging.eventhandling.gateway.EventGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +/** + * Auto-confirms payments after a randomized delay so the demo runs end-to-end without manual + * clicks. Skips orders flagged with the {@code payment-timeout} scenario, letting the workflow's + * waitForEvent expire on purpose. + */ +@Component +public class AutoPaymentRobot { + + private static final Logger logger = LoggerFactory.getLogger(AutoPaymentRobot.class); + + private final EventGateway eventGateway; + private final OrderStatusProjection projection; + private final ScheduledExecutorService scheduler = + Executors.newScheduledThreadPool(2, r -> { + var t = new Thread(r, "auto-payment"); + t.setDaemon(true); + return t; + }); + + @Autowired + public AutoPaymentRobot(EventGateway eventGateway, OrderStatusProjection projection) { + this.eventGateway = eventGateway; + this.projection = projection; + } + + @EventHandler + public void on(InitiatingPaymentForCustomerStarted event) { + var orderId = event.orderId(); + var status = projection.findById(orderId).orElse(null); + var scenario = status == null ? null : status.scenario(); + if ("payment-timeout".equals(scenario)) { + logger.info("Skipping auto-payment for {} — scenario forces timeout.", orderId); + return; + } + var delayMs = ThreadLocalRandom.current().nextInt(800, 3500); + scheduler.schedule(() -> confirm(orderId), delayMs, TimeUnit.MILLISECONDS); + } + + private void confirm(String orderId) { + var status = projection.findById(orderId).orElse(null); + if (status == null || status.status() == OrderStatus.Status.FAILED) { + return; + } + var transactionId = "txn-" + UUID.randomUUID(); + logger.info("Auto-confirming payment for {} ({}).", orderId, transactionId); + eventGateway.publish(null, new PaymentConfirmed(orderId, transactionId)); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Cities.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Cities.java new file mode 100644 index 00000000..ab3db0b5 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Cities.java @@ -0,0 +1,48 @@ +package io.axoniq.demo.orderfulfillment.simulator; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +public final class Cities { + + public record City(String name, double lat, double lng) {} + + public static final List ALL = List.of( + new City("New York", 40.7128, -74.0060), + new City("Los Angeles", 34.0522, -118.2437), + new City("Chicago", 41.8781, -87.6298), + new City("Houston", 29.7604, -95.3698), + new City("Phoenix", 33.4484, -112.0740), + new City("Philadelphia", 39.9526, -75.1652), + new City("San Antonio", 29.4241, -98.4936), + new City("San Diego", 32.7157, -117.1611), + new City("Dallas", 32.7767, -96.7970), + new City("Austin", 30.2672, -97.7431), + new City("Boston", 42.3601, -71.0589), + new City("Seattle", 47.6062, -122.3321), + new City("Denver", 39.7392, -104.9903), + new City("Miami", 25.7617, -80.1918), + new City("Atlanta", 33.7490, -84.3880), + new City("Portland", 45.5152, -122.6784), + new City("Minneapolis", 44.9778, -93.2650), + new City("St. Louis", 38.6270, -90.1994), + new City("Nashville", 36.1627, -86.7816), + new City("New Orleans", 29.9511, -90.0715) + ); + + private Cities() {} + + public static City random() { + return ALL.get(ThreadLocalRandom.current().nextInt(ALL.size())); + } + + public static City[] randomPair() { + var rnd = ThreadLocalRandom.current(); + var origin = ALL.get(rnd.nextInt(ALL.size())); + City destination; + do { + destination = ALL.get(rnd.nextInt(ALL.size())); + } while (destination == origin); + return new City[]{origin, destination}; + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/CustomerPool.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/CustomerPool.java new file mode 100644 index 00000000..e5aee31a --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/CustomerPool.java @@ -0,0 +1,33 @@ +package io.axoniq.demo.orderfulfillment.simulator; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +public final class CustomerPool { + + public record Customer(String id, String name, String email) {} + + public static final List ALL = List.of( + new Customer("cust-001", "Alice Johnson", "alice@example.com"), + new Customer("cust-002", "Bob Martinez", "bob@example.com"), + new Customer("cust-003", "Carla Singh", "carla@example.com"), + new Customer("cust-004", "Daniel Park", "daniel@example.com"), + new Customer("cust-005", "Elena Petrova", "elena@example.com"), + new Customer("cust-006", "Felix Tanaka", "felix@example.com"), + new Customer("cust-007", "Grace O'Connor", "grace@example.com"), + new Customer("cust-008", "Hassan Reyes", "hassan@example.com"), + new Customer("cust-009", "Iris Berg", "iris@example.com"), + new Customer("cust-010", "Jonas Weber", "jonas@example.com") + ); + + private CustomerPool() {} + + public static Customer random() { + return ALL.get(ThreadLocalRandom.current().nextInt(ALL.size())); + } + + public static double randomAmount() { + var rnd = ThreadLocalRandom.current(); + return Math.round(rnd.nextDouble(15.0, 499.0) * 100.0) / 100.0; + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/SimulationController.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/SimulationController.java new file mode 100644 index 00000000..df7dc19e --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/SimulationController.java @@ -0,0 +1,45 @@ +package io.axoniq.demo.orderfulfillment.simulator; + +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +@RestController +@RequestMapping("/simulate") +public class SimulationController { + + private final Simulator simulator; + + public SimulationController(Simulator simulator) { + this.simulator = simulator; + } + + @PostMapping("/single") + public Map single(@RequestParam(value = "scenario", required = false) String scenario) { + var orderId = simulator.placeRandom(normalize(scenario)); + return Map.of("orderId", orderId); + } + + @PostMapping("/burst") + public Map burst(@RequestParam(value = "count", defaultValue = "10") int count, + @RequestParam(value = "scenario", required = false) String scenario) { + simulator.burst(count, normalize(scenario)); + return Map.of("queued", count, "scenario", String.valueOf(normalize(scenario))); + } + + @PostMapping("/scenario") + public Map scenario(@RequestParam("type") String type) { + var orderId = simulator.placeRandom(normalize(type)); + return Map.of("orderId", orderId, "scenario", type); + } + + private static String normalize(String scenario) { + if (scenario == null || scenario.isBlank() || "happy".equalsIgnoreCase(scenario)) { + return "happy"; + } + return scenario; + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Simulator.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Simulator.java new file mode 100644 index 00000000..258c8cf2 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Simulator.java @@ -0,0 +1,58 @@ +package io.axoniq.demo.orderfulfillment.simulator; + +import io.axoniq.demo.orderfulfillment.api.OrderPlaced; +import org.axonframework.messaging.eventhandling.gateway.EventGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +@Component +public class Simulator { + + private static final Logger logger = LoggerFactory.getLogger(Simulator.class); + + private final EventGateway eventGateway; + private final ScheduledExecutorService scheduler = + Executors.newScheduledThreadPool(2, r -> { + var t = new Thread(r, "sim-burst"); + t.setDaemon(true); + return t; + }); + + public Simulator(EventGateway eventGateway) { + this.eventGateway = eventGateway; + } + + public String placeRandom(String scenario) { + var customer = CustomerPool.random(); + var route = Cities.randomPair(); + var orderId = UUID.randomUUID().toString(); + var event = new OrderPlaced( + orderId, + customer.id(), + customer.email(), + CustomerPool.randomAmount(), + route[0].name(), route[0].lat(), route[0].lng(), + route[1].name(), route[1].lat(), route[1].lng(), + scenario + ); + logger.info("Placing simulated order {} ({} → {}, scenario={}).", + orderId, route[0].name(), route[1].name(), scenario); + eventGateway.publish(null, event); + return orderId; + } + + public void burst(int count, String scenario) { + var n = Math.max(1, Math.min(count, 100)); + for (int i = 0; i < n; i++) { + var staggerMs = ThreadLocalRandom.current().nextInt(50, 350); + scheduler.schedule(() -> placeRandom(scenario), (long) i * staggerMs, TimeUnit.MILLISECONDS); + } + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/TruckMovementSimulator.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/TruckMovementSimulator.java new file mode 100644 index 00000000..04dfd3b5 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/TruckMovementSimulator.java @@ -0,0 +1,81 @@ +package io.axoniq.demo.orderfulfillment.simulator; + +import io.axoniq.demo.orderfulfillment.api.OrderDelivered; +import io.axoniq.demo.orderfulfillment.api.ShipOrderCompleted; +import io.axoniq.demo.orderfulfillment.api.TruckLocationUpdated; +import org.axonframework.messaging.eventhandling.annotation.EventHandler; +import org.axonframework.messaging.eventhandling.gateway.EventGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Animates truck movement after the workflow's {@code shipOrder} step completes. Publishes + * {@link TruckLocationUpdated} ticks along the great-circle-ish straight line between origin and + * destination, then emits a final {@link OrderDelivered} event. Decoupled from the workflow so + * many shipments can animate concurrently without blocking workflow threads. + */ +@Component +public class TruckMovementSimulator { + + private static final Logger logger = LoggerFactory.getLogger(TruckMovementSimulator.class); + private static final int TICKS = 30; + private static final long TICK_INTERVAL_MS = 400; + + private final EventGateway eventGateway; + private final ScheduledExecutorService scheduler = + Executors.newScheduledThreadPool(8, r -> { + var t = new Thread(r, "truck-sim"); + t.setDaemon(true); + return t; + }); + + public TruckMovementSimulator(EventGateway eventGateway) { + this.eventGateway = eventGateway; + } + + @EventHandler + public void on(ShipOrderCompleted event) { + var jitter = ThreadLocalRandom.current().nextDouble(0.85, 1.25); + var orderId = event.orderId(); + var trackingNumber = event.trackingNumber(); + var originLat = event.originLat(); + var originLng = event.originLng(); + var destLat = event.destinationLat(); + var destLng = event.destinationLng(); + + logger.info("Starting truck animation for order {} ({}).", orderId, trackingNumber); + + var step = new AtomicInteger(1); + var handle = scheduler.scheduleAtFixedRate(() -> { + int i = step.getAndIncrement(); + if (i > TICKS) { + return; + } + double t = (double) i / TICKS; + double lat = originLat + (destLat - originLat) * t; + double lng = originLng + (destLng - originLng) * t; + try { + eventGateway.publish(null, new TruckLocationUpdated(orderId, lat, lng, t)); + } catch (Exception e) { + logger.warn("Failed to publish location for {}: {}", orderId, e.getMessage()); + } + }, 100, (long) (TICK_INTERVAL_MS * jitter), TimeUnit.MILLISECONDS); + + scheduler.schedule(() -> { + handle.cancel(false); + try { + eventGateway.publish(null, new OrderDelivered(orderId, trackingNumber)); + logger.info("Order {} delivered.", orderId); + } catch (Exception e) { + logger.warn("Failed to publish delivery for {}: {}", orderId, e.getMessage()); + } + }, (long) (TICK_INTERVAL_MS * jitter * (TICKS + 1)), TimeUnit.MILLISECONDS); + } +} diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/workflow/OrderFulfillmentWorkflow.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/workflow/OrderFulfillmentWorkflow.java new file mode 100644 index 00000000..685989e6 --- /dev/null +++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/workflow/OrderFulfillmentWorkflow.java @@ -0,0 +1,149 @@ +package io.axoniq.demo.orderfulfillment.workflow; + +import io.axoniq.demo.orderfulfillment.api.PaymentConfirmed; +import io.axoniq.demo.orderfulfillment.service.FailureRecorder; +import io.axoniq.demo.orderfulfillment.service.InventoryService; +import io.axoniq.demo.orderfulfillment.service.NotificationService; +import io.axoniq.demo.orderfulfillment.service.PaymentService; +import io.axoniq.demo.orderfulfillment.service.ShippingService; +import io.axoniq.workflow.dsl.simple.SimpleWorkflowContext; +import io.axoniq.workflow.runtime.api.annotation.Workflow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static io.axoniq.workflow.dsl.api.AssociationsUtils.associate; +import static io.axoniq.workflow.dsl.simple.SimpleWorkflowContext.equalsTo; +import static io.axoniq.workflow.runtime.association.PayloadPropertyValueRetriever.payloadProperty; +import static io.axoniq.workflow.runtime.execution.DefaultEventNameCustomizer.Builder.baseName; +import static io.axoniq.workflow.runtime.execution.DefaultEventNameCustomizer.Builder.namespace; + +@Component +public class OrderFulfillmentWorkflow { + + private static final Logger logger = LoggerFactory.getLogger(OrderFulfillmentWorkflow.class); + + private final InventoryService inventory; + private final PaymentService payment; + private final ShippingService shipping; + private final NotificationService notifications; + private final FailureRecorder failures; + + public OrderFulfillmentWorkflow(InventoryService inventory, + PaymentService payment, + ShippingService shipping, + NotificationService notifications, + FailureRecorder failures) { + this.inventory = inventory; + this.payment = payment; + this.shipping = shipping; + this.notifications = notifications; + this.failures = failures; + } + + @Workflow( + idProperty = "orderId", + startOnEvent = "io.axoniq.demo.orderfulfillment.api.OrderPlaced", + workflowName = "OrderFulfillmentWorkflow" + ) + public void execute(SimpleWorkflowContext ctx) { + var orderId = (String) ctx.workflowPayload().get("orderId"); + var customerId = (String) ctx.workflowPayload().get("customerId"); + var email = (String) ctx.workflowPayload().get("email"); + var amount = ctx.workflowPayload().get("amount"); + var scenario = (String) ctx.workflowPayload().get("scenario"); + var originLat = ((Number) ctx.workflowPayload().get("originLat")).doubleValue(); + var originLng = ((Number) ctx.workflowPayload().get("originLng")).doubleValue(); + var destLat = ((Number) ctx.workflowPayload().get("destinationLat")).doubleValue(); + var destLng = ((Number) ctx.workflowPayload().get("destinationLng")).doubleValue(); + + logger.info("Order {} workflow started for customer {} (amount {}, scenario {}).", + orderId, customerId, amount, scenario); + + var paymentTimeout = "payment-timeout".equals(scenario) + ? Duration.ofSeconds(4) + : Duration.ofSeconds(45); + + var paymentConfirmation = ctx.waitForEvent( + "awaitPayment", + PaymentConfirmed.class, + associate(payloadProperty("orderId"), equalsTo(orderId)), + paymentTimeout + ); + + var reserved = ctx.awaitExecute( + "reserveStock", + Map.of("customerId", customerId, "amount", amount, "scenario", scenario), + Boolean.class, + inventory::reserveStock + ); + if (!reserved) { + paymentConfirmation.cancel("Stock unavailable"); + ctx.awaitExecute( + "recordOutOfStock", + Map.of("orderId", orderId, "reason", "Out of stock"), + Boolean.class, + failures::record + ); + ctx.fail(new RuntimeException("Stock unavailable for order " + orderId)); + return; + } + + ctx.awaitExecute( + "initiatePayment", + Map.of("orderId", orderId, "customerId", customerId, "amount", amount), + (pc, p) -> { + payment.initiatePayment(p); + return Map.of(); + }, + Duration.ofSeconds(30), + baseName("InitiatingPaymentForCustomer").namespace("io.axoniq.demo.orderfulfillment.api") + ); + + paymentConfirmation.await(); + var confirmation = paymentConfirmation.>result(); + if (confirmation.isEmpty()) { + ctx.awaitExecute( + "recordPaymentTimeout", + Map.of("orderId", orderId, "reason", "Payment timed out"), + Boolean.class, + failures::record + ); + ctx.fail(new RuntimeException("Payment timed out for order " + orderId)); + return; + } + var transactionId = (String) confirmation.get().get("transactionId"); + logger.info("Order {} payment confirmed (transaction {}).", orderId, transactionId); + + // The Completed event of this step is `ShipOrderCompleted` — that is the durable signal + // the truck-movement simulator subscribes to. The trackingNumber is generated up front so + // both the Started and Completed events expose it. + var trackingNumber = "TRK-" + UUID.randomUUID(); + var shipPayload = new HashMap(); + shipPayload.put("orderId", orderId); + shipPayload.put("trackingNumber", trackingNumber); + shipPayload.put("originLat", originLat); + shipPayload.put("originLng", originLng); + shipPayload.put("destinationLat", destLat); + shipPayload.put("destinationLng", destLng); + ctx.awaitExecute( + "shipOrder", + shipPayload, + (pc, p) -> shipping.shipOrder(p), + Duration.ofSeconds(30), + namespace("io.axoniq.demo.orderfulfillment.api") + ); + + ctx.awaitExecute("notifyCustomer", Boolean.class, () -> { + notifications.sendConfirmation(email, trackingNumber); + return true; + }); + + logger.info("Order {} workflow completed (tracking {}).", orderId, trackingNumber); + } +} diff --git a/order-fulfillment-workflow/src/main/resources/application.yaml b/order-fulfillment-workflow/src/main/resources/application.yaml new file mode 100644 index 00000000..d51d395f --- /dev/null +++ b/order-fulfillment-workflow/src/main/resources/application.yaml @@ -0,0 +1,10 @@ +server: + port: 9090 + +spring: + application: + name: order-fulfillment-workflow + +axon: + axon-server: + enabled: true diff --git a/order-fulfillment-workflow/src/main/resources/static/index.html b/order-fulfillment-workflow/src/main/resources/static/index.html new file mode 100644 index 00000000..28f7b3b3 --- /dev/null +++ b/order-fulfillment-workflow/src/main/resources/static/index.html @@ -0,0 +1,377 @@ + + + + + Order Fulfillment — Live Simulator + + + + +
+

Order Fulfillment — Live Simulator

+ connecting… +
+ + + +
+ + + + + + + diff --git a/order-fulfillment-workflow/src/test/java/io/axoniq/demo/orderfulfillment/OrderFulfillmentIT.java b/order-fulfillment-workflow/src/test/java/io/axoniq/demo/orderfulfillment/OrderFulfillmentIT.java new file mode 100644 index 00000000..4cdbced4 --- /dev/null +++ b/order-fulfillment-workflow/src/test/java/io/axoniq/demo/orderfulfillment/OrderFulfillmentIT.java @@ -0,0 +1,59 @@ +package io.axoniq.demo.orderfulfillment; + +import io.axoniq.demo.orderfulfillment.projection.OrderStatus; +import io.axoniq.framework.testcontainer.AxonServerContainer; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.resttestclient.TestRestTemplate; +import org.springframework.boot.resttestclient.autoconfigure.AutoConfigureTestRestTemplate; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@AutoConfigureTestRestTemplate +@Testcontainers +class OrderFulfillmentIT { + + @Container + static final AxonServerContainer AXON_SERVER = new AxonServerContainer("docker.axoniq.io/axoniq/axonserver:2026.0.0") + .withAxonServerHostname("localhost") + .withDevMode(true) + .withDcbContext(true) + .withStartupTimeout(Duration.ofMinutes(3)); + + @DynamicPropertySource + static void axonProperties(DynamicPropertyRegistry registry) { + registry.add("axon.axonserver.servers", + () -> AXON_SERVER.getHost() + ":" + AXON_SERVER.getMappedPort(8124)); + } + + @Autowired + private TestRestTemplate restTemplate; + + @Test + void test_order_fulfillment_process() { + var orderId = restTemplate.postForObject( + "/orders?customerId=customer-1&email=customer-1@example.com&amount=99.95", + null, + String.class); + assertThat(orderId).isNotBlank(); + + // Auto-payment robot confirms shortly after the workflow registers `awaitPayment`, so the + // order should advance through IN_TRANSIT and reach DELIVERED without a manual payment POST. + await().atMost(45, TimeUnit.SECONDS).untilAsserted(() -> { + var status = restTemplate.getForObject("/orders/" + orderId, OrderStatus.class); + assertThat(status).isNotNull(); + assertThat(status.status()).isEqualTo(OrderStatus.Status.DELIVERED); + assertThat(status.trackingNumber()).startsWith("TRK-"); + }); + } +} diff --git a/pom.xml b/pom.xml index 7cb2ebce..f2cc339b 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,7 @@ axon-spring-template distributed-exceptions multitenancy + order-fulfillment-workflow reset-handler saga serialization-avro @@ -28,6 +29,7 @@ subscription-query-rest subscription-query-streaming upcaster + workflow-saga diff --git a/workflow-saga/README.md b/workflow-saga/README.md new file mode 100644 index 00000000..a434a4a3 --- /dev/null +++ b/workflow-saga/README.md @@ -0,0 +1,104 @@ +# Workflow Saga — replacing the legacy `ProcessOrderSaga` + +This module is a one-to-one rewrite of the [`saga`](../saga/README.md) sample, using the **Axoniq +Workflow Engine** instead of `@Saga`. The exact same orchestration — start on order confirmation, +request payment + shipment in parallel, react to either of two payment outcomes, complete the +process — is expressed as plain imperative Java. + +## What got simpler + +Side-by-side: same business behaviour, expressed two different ways. + +| | `saga` module (`@Saga`) | `workflow-saga` module (`@Workflow`) | +|----------------------------------|---|---| +| Lines in the orchestration class | 107 | ~70 | +| Saga state | 4 mutable fields (`orderId`, `shipmentId`, `orderDeadlineId`, `orderIsPaid`, `orderIsDelivered`) | local variables — no fields needed | +| Associating to multiple ids | Manual `SagaLifecycle.associateWith(...)` calls + `@SagaEventHandler(associationProperty = ...)` per handler | One `associate(payloadProperty(...), equalsTo(...))` per `waitForEvent` call | +| Branching on outcomes | Multiple `@SagaEventHandler` methods, with `if (orderIsPaid && orderIsDelivered)` flags; a separate `@DeadlineHandler` for the deadline; explicit `SagaLifecycle.end()` plus `deadlineManager.cancelSchedule(...)` | Standard Java `if`/`else`. The deadline is just `Duration.ofDays(5)` passed to `waitForEvent`. No lifecycle plumbing — when the workflow returns, it ends. | +| Race / parallelism | Implicit, via independent event handlers and shared mutable fields | Explicit, via `ctx.anyMatch(WorkflowStepResult::success, paid, paymentCancelled)` | +| Cancellation of pending waits | Manual: cancel deadlines via `DeadlineManager`, end saga via `SagaLifecycle.end()` | `delivered.cancel("payment cancelled")` and the workflow simply returns | + +The whole orchestration is one `execute(SimpleWorkflowContext ctx)` method: + +```java +@Workflow(idProperty = "orderId", startOnEvent = "io.axoniq.demo.workflowsaga.api.OrderConfirmedEvent") +public void execute(SimpleWorkflowContext ctx) { + var orderId = (String) ctx.workflowPayload().get("orderId"); + var paymentId = UUID.randomUUID().toString(); + var shipmentId = UUID.randomUUID().toString(); + + var paid = ctx.waitForEvent("paid", OrderPaidEvent.class, + associate(payloadProperty("paymentId"), equalsTo(paymentId)), Duration.ofDays(5)); + var cancelled = ctx.waitForEvent("paymentCancelled", OrderPaymentCancelledEvent.class, + associate(payloadProperty("paymentId"), equalsTo(paymentId)), Duration.ofDays(5)); + var delivered = ctx.waitForEvent("delivered", ShipmentStatusUpdatedEvent.class, + associate(payloadProperty("shipmentId"), equalsTo(shipmentId)) + .and(payloadProperty("shipmentStatus"), "=", DELIVERED.name()), + Duration.ofDays(5)); + + // The Started event of each awaitExecute step is the durable signal — `RequestPaymentStarted`, + // `RequestShipmentStarted`, `CancelShipmentStarted`, `CompleteOrderStarted`. The workflow does + // not call eventGateway.publish anywhere; the engine emits those events automatically. + ctx.awaitExecute("requestPayment", payload("orderId", orderId, "paymentId", paymentId).getValues(), + (pc, p) -> Map.of(), STEP_TIMEOUT, apiNamespace()); + ctx.awaitExecute("requestShipment", payload("orderId", orderId, "shipmentId", shipmentId).getValues(), + (pc, p) -> Map.of(), STEP_TIMEOUT, apiNamespace()); + + var outcome = ctx.anyMatch(WorkflowStepResult::success, paid, cancelled); + outcome.await(); + + var matched = outcome.matched().stream().map(WorkflowStepResult::getStepName).toList(); + if (matched.contains("paymentCancelled")) { + ctx.awaitExecute("cancelShipment", payload(...).getValues(), (pc, p) -> Map.of(), + STEP_TIMEOUT, apiNamespace()); + delivered.cancel("payment cancelled"); + ctx.awaitExecute("completeOrder", + payload("orderId", orderId, "paid", false, "delivered", false).getValues(), + (pc, p) -> Map.of(), STEP_TIMEOUT, apiNamespace()); + return; + } + + delivered.await(); + ctx.awaitExecute("completeOrder", + payload("orderId", orderId, "paid", true, "delivered", delivered.success()).getValues(), + (pc, p) -> Map.of(), STEP_TIMEOUT, apiNamespace()); +} +``` + +There is no `@StartSaga`, no `@EndSaga`, no `@DeadlineHandler`, no `SagaLifecycle.associateWith`, no +serialized saga state, and no `eventGateway.publish` calls — the workflow never publishes events +itself. The engine event-sources every step and emits a Started/Completed event pair for each +`awaitExecute`; projections subscribe to those naturally-emitted events. + +## Running the application + +```bash +docker compose up -d +mvn spring-boot:run +``` + +The application listens on port **9091**. Endpoints (provided to drive the demo manually): + +| Method | Path | Description | +|--------|-----------------------------------------------------|----------------------------------------------| +| POST | `/orders` | Confirm a new order. Returns the order id. | +| GET | `/orders/{orderId}` | Read the projected status of the order. | +| GET | `/orders/{orderId}/ids` | Look up the workflow's payment & shipment id.| +| POST | `/payments/{paymentId}/paid` | Simulate the payment system marking paid. | +| POST | `/payments/{paymentId}/cancel` | Simulate the payment system cancelling. | +| POST | `/shipments/{shipmentId}/status?status=DELIVERED` | Push a shipment status update. | + +## Tests + +```bash +mvn verify +``` + +`ProcessOrderWorkflowIT` boots the whole application against a real Axon Server (Testcontainers) +and exercises both the happy path and the payment-cancelled branch. + +## Cleanup + +```bash +docker compose down -v +``` diff --git a/workflow-saga/docker-compose.yaml b/workflow-saga/docker-compose.yaml new file mode 100644 index 00000000..65f6d447 --- /dev/null +++ b/workflow-saga/docker-compose.yaml @@ -0,0 +1,20 @@ +services: + axon-server: + image: docker.axoniq.io/axoniq/axonserver:2026.0.0 + container_name: workflow-saga-axon-server + ports: + - "8024:8024" + - "8124:8124" + environment: + axoniq.axonserver.standalone-dcb: true + axoniq_axonserver_hostname: axon-server + axoniq_axonserver_devmode_enabled: true + volumes: + - data:/axonserver/data + - events:/axonserver/events + +volumes: + data: + driver: local + events: + driver: local diff --git a/workflow-saga/pom.xml b/workflow-saga/pom.xml new file mode 100644 index 00000000..310bcccc --- /dev/null +++ b/workflow-saga/pom.xml @@ -0,0 +1,183 @@ + + + 4.0.0 + + io.axoniq.demo + workflow-saga + 0.0.1-SNAPSHOT + Workflow Saga + Same order-process saga as the `saga` module — rewritten with the Axon Workflow Engine. + + + 21 + 21 + 21 + UTF-8 + + 5.1.0 + 5.1.0 + 1.0.0-SNAPSHOT + 4.0.6 + 2.0.5 + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + org.axonframework + axon-framework-bom + ${axon.version} + pom + import + + + io.axoniq.framework + axoniq-framework-bom + ${axoniq-framework.version} + pom + import + + + org.testcontainers + testcontainers-bom + ${testcontainers.version} + pom + import + + + + + + + io.axoniq.framework.workflow + axon-workflow-spring-boot + ${axon-workflow.version} + + + + org.axonframework.extensions.spring + axon-spring-boot-starter + + + io.axoniq.framework + axon-server-connector + + + + org.springframework.boot + spring-boot-starter-web + + + com.fasterxml.jackson.core + jackson-databind + + + + io.axoniq.framework.workflow + axon-workflow-test + ${axon-workflow.version} + test + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-resttestclient + test + + + org.springframework.boot + spring-boot-restclient + test + + + io.axoniq.framework + axoniq-testcontainer + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + testcontainers-junit-jupiter + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.14.1 + + ${java.version} + ${java.version} + true + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + org.apache.maven.plugins + maven-surefire-plugin + 3.5.4 + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.5.4 + + + + integration-test + verify + + + + + + + + + + central-portal-snapshots + Central Portal Snapshots + https://central.sonatype.com/repository/maven-snapshots/ + + false + + + true + + + + diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/WorkflowSagaApplication.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/WorkflowSagaApplication.java new file mode 100644 index 00000000..8d0f3c24 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/WorkflowSagaApplication.java @@ -0,0 +1,12 @@ +package io.axoniq.demo.workflowsaga; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class WorkflowSagaApplication { + + public static void main(String[] args) { + SpringApplication.run(WorkflowSagaApplication.class, args); + } +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/CompleteOrderStarted.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/CompleteOrderStarted.java new file mode 100644 index 00000000..af939821 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/CompleteOrderStarted.java @@ -0,0 +1,10 @@ +package io.axoniq.demo.workflowsaga.api; + +/** + * Emitted automatically by the workflow engine when the {@code completeOrder} step starts. + * The Started event's payload carries the {@code orderId}/{@code paid}/{@code delivered} + * flags the workflow passed in — the workflow does not publish a separate "process completed" + * event itself. + */ +public record CompleteOrderStarted(String orderId, boolean paid, boolean delivered) { +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/OrderConfirmedEvent.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/OrderConfirmedEvent.java new file mode 100644 index 00000000..47005a20 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/OrderConfirmedEvent.java @@ -0,0 +1,7 @@ +package io.axoniq.demo.workflowsaga.api; + +import org.axonframework.messaging.eventhandling.annotation.Event; + +@Event +public record OrderConfirmedEvent(String orderId) { +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/OrderPaidEvent.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/OrderPaidEvent.java new file mode 100644 index 00000000..01a64b8e --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/OrderPaidEvent.java @@ -0,0 +1,7 @@ +package io.axoniq.demo.workflowsaga.api; + +import org.axonframework.messaging.eventhandling.annotation.Event; + +@Event +public record OrderPaidEvent(String paymentId) { +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/OrderPaymentCancelledEvent.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/OrderPaymentCancelledEvent.java new file mode 100644 index 00000000..24c677e5 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/OrderPaymentCancelledEvent.java @@ -0,0 +1,7 @@ +package io.axoniq.demo.workflowsaga.api; + +import org.axonframework.messaging.eventhandling.annotation.Event; + +@Event +public record OrderPaymentCancelledEvent(String paymentId) { +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/RequestPaymentStarted.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/RequestPaymentStarted.java new file mode 100644 index 00000000..de5ccfe3 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/RequestPaymentStarted.java @@ -0,0 +1,9 @@ +package io.axoniq.demo.workflowsaga.api; + +/** + * Emitted automatically by the workflow engine when the {@code requestPayment} step starts. + * No code publishes this — it is the Started event of the {@code awaitExecute("requestPayment", ...)} + * primitive, with the step's input payload. + */ +public record RequestPaymentStarted(String orderId, String paymentId) { +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/RequestShipmentStarted.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/RequestShipmentStarted.java new file mode 100644 index 00000000..352dcdd2 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/RequestShipmentStarted.java @@ -0,0 +1,7 @@ +package io.axoniq.demo.workflowsaga.api; + +/** + * Emitted automatically by the workflow engine when the {@code requestShipment} step starts. + */ +public record RequestShipmentStarted(String orderId, String shipmentId) { +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/ShipmentStatus.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/ShipmentStatus.java new file mode 100644 index 00000000..ef27be2f --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/ShipmentStatus.java @@ -0,0 +1,8 @@ +package io.axoniq.demo.workflowsaga.api; + +public enum ShipmentStatus { + NEW, + SHIPPED, + DELIVERY_EXCEPTION, + DELIVERED +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/ShipmentStatusUpdatedEvent.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/ShipmentStatusUpdatedEvent.java new file mode 100644 index 00000000..68e38eeb --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/api/ShipmentStatusUpdatedEvent.java @@ -0,0 +1,7 @@ +package io.axoniq.demo.workflowsaga.api; + +import org.axonframework.messaging.eventhandling.annotation.Event; + +@Event +public record ShipmentStatusUpdatedEvent(String shipmentId, ShipmentStatus shipmentStatus) { +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/config/AxonConfig.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/config/AxonConfig.java new file mode 100644 index 00000000..4f744896 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/config/AxonConfig.java @@ -0,0 +1,15 @@ +package io.axoniq.demo.workflowsaga.config; + +import org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore; +import org.axonframework.messaging.eventhandling.processing.streaming.token.store.inmemory.InMemoryTokenStore; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class AxonConfig { + + @Bean + public TokenStore tokenStore() { + return new InMemoryTokenStore(); + } +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/controller/OrderProcessController.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/controller/OrderProcessController.java new file mode 100644 index 00000000..89f9a759 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/controller/OrderProcessController.java @@ -0,0 +1,77 @@ +package io.axoniq.demo.workflowsaga.controller; + +import io.axoniq.demo.workflowsaga.api.OrderConfirmedEvent; +import io.axoniq.demo.workflowsaga.api.OrderPaidEvent; +import io.axoniq.demo.workflowsaga.api.OrderPaymentCancelledEvent; +import io.axoniq.demo.workflowsaga.api.ShipmentStatus; +import io.axoniq.demo.workflowsaga.api.ShipmentStatusUpdatedEvent; +import io.axoniq.demo.workflowsaga.projection.IdRegistry; +import io.axoniq.demo.workflowsaga.projection.OrderProcessProjection; +import io.axoniq.demo.workflowsaga.projection.OrderProcessStatus; +import org.axonframework.messaging.eventhandling.gateway.EventGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.UUID; + +@RestController +public class OrderProcessController { + + private static final Logger logger = LoggerFactory.getLogger(OrderProcessController.class); + + private final EventGateway eventGateway; + private final OrderProcessProjection projection; + private final IdRegistry idRegistry; + + public OrderProcessController(EventGateway eventGateway, + OrderProcessProjection projection, + IdRegistry idRegistry) { + this.eventGateway = eventGateway; + this.projection = projection; + this.idRegistry = idRegistry; + } + + @PostMapping("/orders") + public String confirmOrder() { + var orderId = UUID.randomUUID().toString(); + logger.info("Publishing OrderConfirmedEvent for order {}.", orderId); + eventGateway.publish(null, new OrderConfirmedEvent(orderId)); + return orderId; + } + + @PostMapping("/payments/{paymentId}/paid") + public void markPaid(@PathVariable("paymentId") String paymentId) { + eventGateway.publish(null, new OrderPaidEvent(paymentId)); + } + + @PostMapping("/payments/{paymentId}/cancel") + public void cancelPayment(@PathVariable("paymentId") String paymentId) { + eventGateway.publish(null, new OrderPaymentCancelledEvent(paymentId)); + } + + @PostMapping("/shipments/{shipmentId}/status") + public void updateShipmentStatus(@PathVariable("shipmentId") String shipmentId, + @RequestParam("status") ShipmentStatus status) { + eventGateway.publish(null, new ShipmentStatusUpdatedEvent(shipmentId, status)); + } + + @GetMapping("/orders/{orderId}") + public ResponseEntity getStatus(@PathVariable("orderId") String orderId) { + return projection.findById(orderId) + .map(ResponseEntity::ok) + .orElse(ResponseEntity.notFound().build()); + } + + @GetMapping("/orders/{orderId}/ids") + public ResponseEntity ids(@PathVariable("orderId") String orderId) { + return idRegistry.forOrder(orderId) + .map(ResponseEntity::ok) + .orElse(ResponseEntity.notFound().build()); + } +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/IdRegistry.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/IdRegistry.java new file mode 100644 index 00000000..9846ccf4 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/IdRegistry.java @@ -0,0 +1,43 @@ +package io.axoniq.demo.workflowsaga.projection; + +import io.axoniq.demo.workflowsaga.api.RequestPaymentStarted; +import io.axoniq.demo.workflowsaga.api.RequestShipmentStarted; +import org.axonframework.messaging.eventhandling.annotation.EventHandler; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Captures the {@code paymentId}/{@code shipmentId} that the workflow generates per order, so the + * integration test (or a UI) can correlate domain events back to the original order. The + * subscribed events are emitted automatically by the workflow's {@code awaitExecute} steps — + * nothing publishes them by hand. + */ +@Component +public class IdRegistry { + + public record Ids(String paymentId, String shipmentId) { + } + + private final Map byOrder = new ConcurrentHashMap<>(); + + @EventHandler + public void on(RequestPaymentStarted event) { + byOrder.compute(event.orderId(), (id, current) -> current == null + ? new Ids(event.paymentId(), null) + : new Ids(event.paymentId(), current.shipmentId())); + } + + @EventHandler + public void on(RequestShipmentStarted event) { + byOrder.compute(event.orderId(), (id, current) -> current == null + ? new Ids(null, event.shipmentId()) + : new Ids(current.paymentId(), event.shipmentId())); + } + + public Optional forOrder(String orderId) { + return Optional.ofNullable(byOrder.get(orderId)); + } +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/OrderProcessProjection.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/OrderProcessProjection.java new file mode 100644 index 00000000..cc8a6e61 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/OrderProcessProjection.java @@ -0,0 +1,35 @@ +package io.axoniq.demo.workflowsaga.projection; + +import io.axoniq.demo.workflowsaga.api.CompleteOrderStarted; +import io.axoniq.demo.workflowsaga.api.OrderConfirmedEvent; +import org.axonframework.messaging.eventhandling.annotation.EventHandler; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class OrderProcessProjection { + + private final Map orders = new ConcurrentHashMap<>(); + + @EventHandler + public void on(OrderConfirmedEvent event) { + orders.put(event.orderId(), + new OrderProcessStatus(event.orderId(), + OrderProcessStatus.Phase.IN_PROGRESS, + false, + false)); + } + + @EventHandler + public void on(CompleteOrderStarted event) { + orders.computeIfPresent(event.orderId(), + (id, current) -> current.completed(event.paid(), event.delivered())); + } + + public Optional findById(String orderId) { + return Optional.ofNullable(orders.get(orderId)); + } +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/OrderProcessStatus.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/OrderProcessStatus.java new file mode 100644 index 00000000..664ead63 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/OrderProcessStatus.java @@ -0,0 +1,18 @@ +package io.axoniq.demo.workflowsaga.projection; + +public record OrderProcessStatus( + String orderId, + Phase phase, + boolean paid, + boolean delivered +) { + + public enum Phase { + IN_PROGRESS, + COMPLETED + } + + public OrderProcessStatus completed(boolean paid, boolean delivered) { + return new OrderProcessStatus(orderId, Phase.COMPLETED, paid, delivered); + } +} diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/workflow/ProcessOrderWorkflow.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/workflow/ProcessOrderWorkflow.java new file mode 100644 index 00000000..d53ea3e8 --- /dev/null +++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/workflow/ProcessOrderWorkflow.java @@ -0,0 +1,130 @@ +package io.axoniq.demo.workflowsaga.workflow; + +import io.axoniq.demo.workflowsaga.api.OrderPaidEvent; +import io.axoniq.demo.workflowsaga.api.OrderPaymentCancelledEvent; +import io.axoniq.demo.workflowsaga.api.ShipmentStatus; +import io.axoniq.demo.workflowsaga.api.ShipmentStatusUpdatedEvent; +import io.axoniq.workflow.dsl.simple.SimpleWorkflowContext; +import io.axoniq.workflow.runtime.api.annotation.Workflow; +import io.axoniq.workflow.runtime.api.execution.context.EventNameCustomizer; +import io.axoniq.workflow.runtime.api.execution.state.WorkflowStepResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.Map; +import java.util.UUID; + +import static io.axoniq.workflow.dsl.api.AssociationsUtils.associate; +import static io.axoniq.workflow.dsl.api.Payload.payload; +import static io.axoniq.workflow.dsl.simple.SimpleWorkflowContext.equalsTo; +import static io.axoniq.workflow.runtime.association.PayloadPropertyValueRetriever.payloadProperty; +import static io.axoniq.workflow.runtime.execution.DefaultEventNameCustomizer.Builder.namespace; + +/** + * Workflow rewrite of the legacy {@code ProcessOrderSaga}. + *

+ * The workflow does not call {@code eventGateway.publish(...)} anywhere — every step the workflow + * runs (the {@code awaitExecute} calls) automatically produces a Started + Completed event in the + * event store. Those emitted events are the durable signal external systems and projections + * subscribe to. + */ +@Component +public class ProcessOrderWorkflow { + + private static final Logger logger = LoggerFactory.getLogger(ProcessOrderWorkflow.class); + private static final Duration ORDER_DEADLINE = Duration.ofDays(5); + private static final Duration STEP_TIMEOUT = Duration.ofSeconds(30); + + /** Emit step events under our own namespace so the projection's @{@code @EventHandler} records line up by FQCN. */ + private static EventNameCustomizer apiNamespace() { + return namespace("io.axoniq.demo.workflowsaga.api"); + } + + @Workflow( + idProperty = "orderId", + startOnEvent = "io.axoniq.demo.workflowsaga.api.OrderConfirmedEvent", + workflowName = "ProcessOrderWorkflow" + ) + public void execute(SimpleWorkflowContext ctx) { + var orderId = (String) ctx.workflowPayload().get("orderId"); + var paymentId = UUID.randomUUID().toString(); + var shipmentId = UUID.randomUUID().toString(); + ctx.setPayload("registerIds", payload("paymentId", paymentId, "shipmentId", shipmentId)); + + logger.info("Order {} workflow started (payment {}, shipment {}).", orderId, paymentId, shipmentId); + + var paid = ctx.waitForEvent( + "paid", + OrderPaidEvent.class, + associate(payloadProperty("paymentId"), equalsTo(paymentId)), + ORDER_DEADLINE + ); + var paymentCancelled = ctx.waitForEvent( + "paymentCancelled", + OrderPaymentCancelledEvent.class, + associate(payloadProperty("paymentId"), equalsTo(paymentId)), + ORDER_DEADLINE + ); + var delivered = ctx.waitForEvent( + "delivered", + ShipmentStatusUpdatedEvent.class, + associate(payloadProperty("shipmentId"), equalsTo(shipmentId)) + .and(payloadProperty("shipmentStatus"), "=", ShipmentStatus.DELIVERED.name()), + ORDER_DEADLINE + ); + + // The Started events of these steps — `RequestPaymentStarted` and `RequestShipmentStarted` + // — are the durable signals projections subscribe to. No manual publishing needed. + ctx.awaitExecute( + "requestPayment", + Map.of("orderId", orderId, "paymentId", paymentId), + (pc, p) -> Map.of(), + STEP_TIMEOUT, + apiNamespace() + ); + ctx.awaitExecute( + "requestShipment", + Map.of("orderId", orderId, "shipmentId", shipmentId), + (pc, p) -> Map.of(), + STEP_TIMEOUT, + apiNamespace() + ); + + var paymentOutcome = ctx.anyMatch(WorkflowStepResult::success, paid, paymentCancelled); + paymentOutcome.await(); + + var matched = paymentOutcome.matched().stream().map(WorkflowStepResult::getStepName).toList(); + if (matched.contains("paymentCancelled")) { + logger.info("Order {} payment cancelled — cancelling shipment.", orderId); + ctx.awaitExecute( + "cancelShipment", + Map.of("orderId", orderId, "shipmentId", shipmentId), + (pc, p) -> Map.of(), + STEP_TIMEOUT, + apiNamespace() + ); + delivered.cancel("payment cancelled"); + ctx.awaitExecute( + "completeOrder", + Map.of("orderId", orderId, "paid", false, "delivered", false), + (pc, p) -> Map.of(), + STEP_TIMEOUT, + apiNamespace() + ); + return; + } + + delivered.await(); + var isDelivered = delivered.success(); + logger.info("Order {} completed (paid {}, delivered {}).", orderId, true, isDelivered); + ctx.awaitExecute( + "completeOrder", + Map.of("orderId", orderId, "paid", true, "delivered", isDelivered), + (pc, p) -> Map.of(), + STEP_TIMEOUT, + apiNamespace() + ); + } +} diff --git a/workflow-saga/src/main/resources/application.yaml b/workflow-saga/src/main/resources/application.yaml new file mode 100644 index 00000000..14c682cb --- /dev/null +++ b/workflow-saga/src/main/resources/application.yaml @@ -0,0 +1,10 @@ +server: + port: 9091 + +spring: + application: + name: workflow-saga + +axon: + axon-server: + enabled: true diff --git a/workflow-saga/src/test/java/io/axoniq/demo/workflowsaga/ProcessOrderWorkflowIT.java b/workflow-saga/src/test/java/io/axoniq/demo/workflowsaga/ProcessOrderWorkflowIT.java new file mode 100644 index 00000000..28edd0da --- /dev/null +++ b/workflow-saga/src/test/java/io/axoniq/demo/workflowsaga/ProcessOrderWorkflowIT.java @@ -0,0 +1,81 @@ +package io.axoniq.demo.workflowsaga; + +import io.axoniq.demo.workflowsaga.projection.IdRegistry; +import io.axoniq.demo.workflowsaga.projection.OrderProcessStatus; +import io.axoniq.framework.testcontainer.AxonServerContainer; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.resttestclient.TestRestTemplate; +import org.springframework.boot.resttestclient.autoconfigure.AutoConfigureTestRestTemplate; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@AutoConfigureTestRestTemplate +@Testcontainers +class ProcessOrderWorkflowIT { + + @Container + static final AxonServerContainer AXON_SERVER = new AxonServerContainer() + .withAxonServerHostname("localhost") + .withDevMode(true) + .withDcbContext(true); + + @DynamicPropertySource + static void axonProperties(DynamicPropertyRegistry registry) { + registry.add("axon.axonserver.servers", + () -> AXON_SERVER.getHost() + ":" + AXON_SERVER.getMappedPort(8124)); + } + + @Autowired + private TestRestTemplate restTemplate; + + @Test + void happy_path_paid_then_delivered() { + var orderId = restTemplate.postForObject("/orders", null, String.class); + assertThat(orderId).isNotBlank(); + + var ids = await().atMost(20, TimeUnit.SECONDS).until( + () -> restTemplate.getForObject("/orders/" + orderId + "/ids", IdRegistry.Ids.class), + i -> i != null && i.paymentId() != null && i.shipmentId() != null); + + restTemplate.postForEntity("/payments/" + ids.paymentId() + "/paid", null, Void.class); + restTemplate.postForEntity("/shipments/" + ids.shipmentId() + "/status?status=DELIVERED", null, Void.class); + + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + var status = restTemplate.getForObject("/orders/" + orderId, OrderProcessStatus.class); + assertThat(status).isNotNull(); + assertThat(status.phase()).isEqualTo(OrderProcessStatus.Phase.COMPLETED); + assertThat(status.paid()).isTrue(); + assertThat(status.delivered()).isTrue(); + }); + } + + @Test + void payment_cancelled_triggers_shipment_cancellation() { + var orderId = restTemplate.postForObject("/orders", null, String.class); + assertThat(orderId).isNotBlank(); + + var ids = await().atMost(20, TimeUnit.SECONDS).until( + () -> restTemplate.getForObject("/orders/" + orderId + "/ids", IdRegistry.Ids.class), + i -> i != null && i.paymentId() != null && i.shipmentId() != null); + + restTemplate.postForEntity("/payments/" + ids.paymentId() + "/cancel", null, Void.class); + + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + var status = restTemplate.getForObject("/orders/" + orderId, OrderProcessStatus.class); + assertThat(status).isNotNull(); + assertThat(status.phase()).isEqualTo(OrderProcessStatus.Phase.COMPLETED); + assertThat(status.paid()).isFalse(); + assertThat(status.delivered()).isFalse(); + }); + } +}