findById(String orderId) {
+ return Optional.ofNullable(orders.get(orderId));
+ }
+}
diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/OrderProcessStatus.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/OrderProcessStatus.java
new file mode 100644
index 00000000..664ead63
--- /dev/null
+++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/projection/OrderProcessStatus.java
@@ -0,0 +1,18 @@
+package io.axoniq.demo.workflowsaga.projection;
+
+public record OrderProcessStatus(
+ String orderId,
+ Phase phase,
+ boolean paid,
+ boolean delivered
+) {
+
+ public enum Phase {
+ IN_PROGRESS,
+ COMPLETED
+ }
+
+ public OrderProcessStatus completed(boolean paid, boolean delivered) {
+ return new OrderProcessStatus(orderId, Phase.COMPLETED, paid, delivered);
+ }
+}
diff --git a/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/workflow/ProcessOrderWorkflow.java b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/workflow/ProcessOrderWorkflow.java
new file mode 100644
index 00000000..d53ea3e8
--- /dev/null
+++ b/workflow-saga/src/main/java/io/axoniq/demo/workflowsaga/workflow/ProcessOrderWorkflow.java
@@ -0,0 +1,130 @@
+package io.axoniq.demo.workflowsaga.workflow;
+
+import io.axoniq.demo.workflowsaga.api.OrderPaidEvent;
+import io.axoniq.demo.workflowsaga.api.OrderPaymentCancelledEvent;
+import io.axoniq.demo.workflowsaga.api.ShipmentStatus;
+import io.axoniq.demo.workflowsaga.api.ShipmentStatusUpdatedEvent;
+import io.axoniq.workflow.dsl.simple.SimpleWorkflowContext;
+import io.axoniq.workflow.runtime.api.annotation.Workflow;
+import io.axoniq.workflow.runtime.api.execution.context.EventNameCustomizer;
+import io.axoniq.workflow.runtime.api.execution.state.WorkflowStepResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.UUID;
+
+import static io.axoniq.workflow.dsl.api.AssociationsUtils.associate;
+import static io.axoniq.workflow.dsl.api.Payload.payload;
+import static io.axoniq.workflow.dsl.simple.SimpleWorkflowContext.equalsTo;
+import static io.axoniq.workflow.runtime.association.PayloadPropertyValueRetriever.payloadProperty;
+import static io.axoniq.workflow.runtime.execution.DefaultEventNameCustomizer.Builder.namespace;
+
+/**
+ * Workflow rewrite of the legacy {@code ProcessOrderSaga}.
+ *
+ * The workflow does not call {@code eventGateway.publish(...)} anywhere — every step the workflow
+ * runs (the {@code awaitExecute} calls) automatically produces a Started + Completed event in the
+ * event store. Those emitted events are the durable signal external systems and projections
+ * subscribe to.
+ */
+@Component
+public class ProcessOrderWorkflow {
+
+ private static final Logger logger = LoggerFactory.getLogger(ProcessOrderWorkflow.class);
+ private static final Duration ORDER_DEADLINE = Duration.ofDays(5);
+ private static final Duration STEP_TIMEOUT = Duration.ofSeconds(30);
+
+ /** Emit step events under our own namespace so the projection's @{@code @EventHandler} records line up by FQCN. */
+ private static EventNameCustomizer apiNamespace() {
+ return namespace("io.axoniq.demo.workflowsaga.api");
+ }
+
+ @Workflow(
+ idProperty = "orderId",
+ startOnEvent = "io.axoniq.demo.workflowsaga.api.OrderConfirmedEvent",
+ workflowName = "ProcessOrderWorkflow"
+ )
+ public void execute(SimpleWorkflowContext ctx) {
+ var orderId = (String) ctx.workflowPayload().get("orderId");
+ var paymentId = UUID.randomUUID().toString();
+ var shipmentId = UUID.randomUUID().toString();
+ ctx.setPayload("registerIds", payload("paymentId", paymentId, "shipmentId", shipmentId));
+
+ logger.info("Order {} workflow started (payment {}, shipment {}).", orderId, paymentId, shipmentId);
+
+ var paid = ctx.waitForEvent(
+ "paid",
+ OrderPaidEvent.class,
+ associate(payloadProperty("paymentId"), equalsTo(paymentId)),
+ ORDER_DEADLINE
+ );
+ var paymentCancelled = ctx.waitForEvent(
+ "paymentCancelled",
+ OrderPaymentCancelledEvent.class,
+ associate(payloadProperty("paymentId"), equalsTo(paymentId)),
+ ORDER_DEADLINE
+ );
+ var delivered = ctx.waitForEvent(
+ "delivered",
+ ShipmentStatusUpdatedEvent.class,
+ associate(payloadProperty("shipmentId"), equalsTo(shipmentId))
+ .and(payloadProperty("shipmentStatus"), "=", ShipmentStatus.DELIVERED.name()),
+ ORDER_DEADLINE
+ );
+
+ // The Started events of these steps — `RequestPaymentStarted` and `RequestShipmentStarted`
+ // — are the durable signals projections subscribe to. No manual publishing needed.
+ ctx.awaitExecute(
+ "requestPayment",
+ Map.of("orderId", orderId, "paymentId", paymentId),
+ (pc, p) -> Map.of(),
+ STEP_TIMEOUT,
+ apiNamespace()
+ );
+ ctx.awaitExecute(
+ "requestShipment",
+ Map.of("orderId", orderId, "shipmentId", shipmentId),
+ (pc, p) -> Map.of(),
+ STEP_TIMEOUT,
+ apiNamespace()
+ );
+
+ var paymentOutcome = ctx.anyMatch(WorkflowStepResult::success, paid, paymentCancelled);
+ paymentOutcome.await();
+
+ var matched = paymentOutcome.matched().stream().map(WorkflowStepResult::getStepName).toList();
+ if (matched.contains("paymentCancelled")) {
+ logger.info("Order {} payment cancelled — cancelling shipment.", orderId);
+ ctx.awaitExecute(
+ "cancelShipment",
+ Map.of("orderId", orderId, "shipmentId", shipmentId),
+ (pc, p) -> Map.of(),
+ STEP_TIMEOUT,
+ apiNamespace()
+ );
+ delivered.cancel("payment cancelled");
+ ctx.awaitExecute(
+ "completeOrder",
+ Map.of("orderId", orderId, "paid", false, "delivered", false),
+ (pc, p) -> Map.of(),
+ STEP_TIMEOUT,
+ apiNamespace()
+ );
+ return;
+ }
+
+ delivered.await();
+ var isDelivered = delivered.success();
+ logger.info("Order {} completed (paid {}, delivered {}).", orderId, true, isDelivered);
+ ctx.awaitExecute(
+ "completeOrder",
+ Map.of("orderId", orderId, "paid", true, "delivered", isDelivered),
+ (pc, p) -> Map.of(),
+ STEP_TIMEOUT,
+ apiNamespace()
+ );
+ }
+}
diff --git a/workflow-saga/src/main/resources/application.yaml b/workflow-saga/src/main/resources/application.yaml
new file mode 100644
index 00000000..14c682cb
--- /dev/null
+++ b/workflow-saga/src/main/resources/application.yaml
@@ -0,0 +1,10 @@
+server:
+ port: 9091
+
+spring:
+ application:
+ name: workflow-saga
+
+axon:
+ axon-server:
+ enabled: true
diff --git a/workflow-saga/src/test/java/io/axoniq/demo/workflowsaga/ProcessOrderWorkflowIT.java b/workflow-saga/src/test/java/io/axoniq/demo/workflowsaga/ProcessOrderWorkflowIT.java
new file mode 100644
index 00000000..28edd0da
--- /dev/null
+++ b/workflow-saga/src/test/java/io/axoniq/demo/workflowsaga/ProcessOrderWorkflowIT.java
@@ -0,0 +1,81 @@
+package io.axoniq.demo.workflowsaga;
+
+import io.axoniq.demo.workflowsaga.projection.IdRegistry;
+import io.axoniq.demo.workflowsaga.projection.OrderProcessStatus;
+import io.axoniq.framework.testcontainer.AxonServerContainer;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.resttestclient.TestRestTemplate;
+import org.springframework.boot.resttestclient.autoconfigure.AutoConfigureTestRestTemplate;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@AutoConfigureTestRestTemplate
+@Testcontainers
+class ProcessOrderWorkflowIT {
+
+ @Container
+ static final AxonServerContainer AXON_SERVER = new AxonServerContainer()
+ .withAxonServerHostname("localhost")
+ .withDevMode(true)
+ .withDcbContext(true);
+
+ @DynamicPropertySource
+ static void axonProperties(DynamicPropertyRegistry registry) {
+ registry.add("axon.axonserver.servers",
+ () -> AXON_SERVER.getHost() + ":" + AXON_SERVER.getMappedPort(8124));
+ }
+
+ @Autowired
+ private TestRestTemplate restTemplate;
+
+ @Test
+ void happy_path_paid_then_delivered() {
+ var orderId = restTemplate.postForObject("/orders", null, String.class);
+ assertThat(orderId).isNotBlank();
+
+ var ids = await().atMost(20, TimeUnit.SECONDS).until(
+ () -> restTemplate.getForObject("/orders/" + orderId + "/ids", IdRegistry.Ids.class),
+ i -> i != null && i.paymentId() != null && i.shipmentId() != null);
+
+ restTemplate.postForEntity("/payments/" + ids.paymentId() + "/paid", null, Void.class);
+ restTemplate.postForEntity("/shipments/" + ids.shipmentId() + "/status?status=DELIVERED", null, Void.class);
+
+ await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ var status = restTemplate.getForObject("/orders/" + orderId, OrderProcessStatus.class);
+ assertThat(status).isNotNull();
+ assertThat(status.phase()).isEqualTo(OrderProcessStatus.Phase.COMPLETED);
+ assertThat(status.paid()).isTrue();
+ assertThat(status.delivered()).isTrue();
+ });
+ }
+
+ @Test
+ void payment_cancelled_triggers_shipment_cancellation() {
+ var orderId = restTemplate.postForObject("/orders", null, String.class);
+ assertThat(orderId).isNotBlank();
+
+ var ids = await().atMost(20, TimeUnit.SECONDS).until(
+ () -> restTemplate.getForObject("/orders/" + orderId + "/ids", IdRegistry.Ids.class),
+ i -> i != null && i.paymentId() != null && i.shipmentId() != null);
+
+ restTemplate.postForEntity("/payments/" + ids.paymentId() + "/cancel", null, Void.class);
+
+ await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ var status = restTemplate.getForObject("/orders/" + orderId, OrderProcessStatus.class);
+ assertThat(status).isNotNull();
+ assertThat(status.phase()).isEqualTo(OrderProcessStatus.Phase.COMPLETED);
+ assertThat(status.paid()).isFalse();
+ assertThat(status.delivered()).isFalse();
+ });
+ }
+}
From 8760436795dff1dd9a76a5f1e949db5f0f53de58 Mon Sep 17 00:00:00 2001
From: Stefan <91stefan@gmail.com>
Date: Thu, 30 Apr 2026 13:37:42 +0200
Subject: [PATCH 2/5] ui 1
---
.../controller/OrderController.java | 14 ++-
.../projection/OrderEventStream.java | 80 ++++++++++++
.../src/main/resources/static/index.html | 115 ++++++++++++++++++
3 files changed, 208 insertions(+), 1 deletion(-)
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java
create mode 100644 order-fulfillment-workflow/src/main/resources/static/index.html
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java
index 119ccb38..147d5b11 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java
@@ -2,11 +2,13 @@
import io.axoniq.demo.orderfulfillment.api.OrderPlaced;
import io.axoniq.demo.orderfulfillment.api.PaymentConfirmed;
+import io.axoniq.demo.orderfulfillment.projection.OrderEventStream;
import io.axoniq.demo.orderfulfillment.projection.OrderStatus;
import io.axoniq.demo.orderfulfillment.projection.OrderStatusProjection;
import org.axonframework.messaging.eventhandling.gateway.EventGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@@ -14,6 +16,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
@@ -25,10 +28,14 @@ public class OrderController {
private final EventGateway eventGateway;
private final OrderStatusProjection projection;
+ private final OrderEventStream eventStream;
- public OrderController(EventGateway eventGateway, OrderStatusProjection projection) {
+ public OrderController(EventGateway eventGateway,
+ OrderStatusProjection projection,
+ OrderEventStream eventStream) {
this.eventGateway = eventGateway;
this.projection = projection;
+ this.eventStream = eventStream;
}
@PostMapping
@@ -53,4 +60,9 @@ public ResponseEntity getStatus(@PathVariable("orderId") String ord
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
+
+ @GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public SseEmitter stream() {
+ return eventStream.subscribe();
+ }
}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java
new file mode 100644
index 00000000..1892604c
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java
@@ -0,0 +1,80 @@
+package io.axoniq.demo.orderfulfillment.projection;
+
+import io.axoniq.demo.orderfulfillment.api.InitiatingPaymentForCustomerStarted;
+import io.axoniq.demo.orderfulfillment.api.OrderPlaced;
+import io.axoniq.demo.orderfulfillment.api.ShipOrderCompleted;
+import org.axonframework.messaging.eventhandling.annotation.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Pushes workflow Started/Completed updates to subscribed SSE clients. Uses the same workflow
+ * events the {@link OrderStatusProjection} listens to, so the live view reflects the durable
+ * projection state.
+ */
+@Component
+public class OrderEventStream {
+
+ private static final Logger logger = LoggerFactory.getLogger(OrderEventStream.class);
+
+ private final List emitters = new CopyOnWriteArrayList<>();
+
+ public SseEmitter subscribe() {
+ var emitter = new SseEmitter(0L);
+ emitters.add(emitter);
+ emitter.onCompletion(() -> emitters.remove(emitter));
+ emitter.onTimeout(() -> emitters.remove(emitter));
+ emitter.onError(e -> emitters.remove(emitter));
+ return emitter;
+ }
+
+ @EventHandler
+ public void on(OrderPlaced event) {
+ broadcast(Map.of(
+ "type", "PLACED",
+ "orderId", event.orderId(),
+ "customerId", event.customerId(),
+ "email", event.email(),
+ "amount", event.amount(),
+ "timestamp", Instant.now().toString()
+ ));
+ }
+
+ @EventHandler
+ public void on(InitiatingPaymentForCustomerStarted event) {
+ broadcast(Map.of(
+ "type", "AWAITING_PAYMENT",
+ "orderId", event.orderId(),
+ "timestamp", Instant.now().toString()
+ ));
+ }
+
+ @EventHandler
+ public void on(ShipOrderCompleted event) {
+ broadcast(Map.of(
+ "type", "SHIPPED",
+ "orderId", event.orderId(),
+ "trackingNumber", event.trackingNumber(),
+ "timestamp", Instant.now().toString()
+ ));
+ }
+
+ private void broadcast(Map payload) {
+ for (SseEmitter emitter : emitters) {
+ try {
+ emitter.send(SseEmitter.event().name("order").data(payload));
+ } catch (IOException | IllegalStateException e) {
+ logger.debug("Dropping dead SSE emitter: {}", e.getMessage());
+ emitters.remove(emitter);
+ }
+ }
+ }
+}
diff --git a/order-fulfillment-workflow/src/main/resources/static/index.html b/order-fulfillment-workflow/src/main/resources/static/index.html
new file mode 100644
index 00000000..c8ff51fe
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/resources/static/index.html
@@ -0,0 +1,115 @@
+
+
+
+
+ Order Fulfillment — Live Tracking
+
+
+
+Order Fulfillment
+Live workflow tracking via Server-Sent Events.
+
+
+
+
+
+
+
+
Live updates
+ connecting…
+
+
+
+ | Time | Order | Status | Detail |
+
+
+
+
+
+
+
+
From cb50ac2d1a47f42a4de68fd1a6004412efd1e3c2 Mon Sep 17 00:00:00 2001
From: Stefan <91stefan@gmail.com>
Date: Thu, 30 Apr 2026 14:01:08 +0200
Subject: [PATCH 3/5] ui 2 simulation
---
.../orderfulfillment/api/OrderDelivered.java | 10 +
.../orderfulfillment/api/OrderFailed.java | 10 +
.../orderfulfillment/api/OrderPlaced.java | 9 +-
.../api/ShipOrderCompleted.java | 14 +-
.../api/TruckLocationUpdated.java | 16 +
.../controller/OrderController.java | 14 +-
.../projection/OrderEventStream.java | 82 +++-
.../projection/OrderStatus.java | 60 ++-
.../projection/OrderStatusProjection.java | 44 +-
.../service/FailureRecorder.java | 29 ++
.../service/InventoryService.java | 10 +-
.../service/ShippingService.java | 12 +-
.../simulator/AutoPaymentRobot.java | 67 +++
.../orderfulfillment/simulator/Cities.java | 48 ++
.../simulator/CustomerPool.java | 33 ++
.../simulator/SimulationController.java | 45 ++
.../orderfulfillment/simulator/Simulator.java | 58 +++
.../simulator/TruckMovementSimulator.java | 81 ++++
.../workflow/OrderFulfillmentWorkflow.java | 63 ++-
.../src/main/resources/static/index.html | 442 ++++++++++++++----
.../orderfulfillment/OrderFulfillmentIT.java | 17 +-
21 files changed, 1011 insertions(+), 153 deletions(-)
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderDelivered.java
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderFailed.java
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/TruckLocationUpdated.java
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/FailureRecorder.java
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/AutoPaymentRobot.java
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Cities.java
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/CustomerPool.java
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/SimulationController.java
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Simulator.java
create mode 100644 order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/TruckMovementSimulator.java
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderDelivered.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderDelivered.java
new file mode 100644
index 00000000..66d513e4
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderDelivered.java
@@ -0,0 +1,10 @@
+package io.axoniq.demo.orderfulfillment.api;
+
+import org.axonframework.messaging.eventhandling.annotation.Event;
+
+@Event
+public record OrderDelivered(
+ String orderId,
+ String trackingNumber
+) {
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderFailed.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderFailed.java
new file mode 100644
index 00000000..43ba23aa
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderFailed.java
@@ -0,0 +1,10 @@
+package io.axoniq.demo.orderfulfillment.api;
+
+import org.axonframework.messaging.eventhandling.annotation.Event;
+
+@Event
+public record OrderFailed(
+ String orderId,
+ String reason
+) {
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderPlaced.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderPlaced.java
index 2e3190de..66bbb494 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderPlaced.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/OrderPlaced.java
@@ -7,6 +7,13 @@ public record OrderPlaced(
String orderId,
String customerId,
String email,
- double amount
+ double amount,
+ String originCity,
+ double originLat,
+ double originLng,
+ String destinationCity,
+ double destinationLat,
+ double destinationLng,
+ String scenario
) {
}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/ShipOrderCompleted.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/ShipOrderCompleted.java
index 509fff4a..b6572a68 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/ShipOrderCompleted.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/ShipOrderCompleted.java
@@ -2,8 +2,16 @@
/**
* Emitted automatically by the workflow engine when the {@code shipOrder} step completes.
- * The Completed event's payload is whatever the action returned — here {@code orderId} and
- * {@code trackingNumber}. The workflow does not publish this event itself.
+ * The Completed event's payload is whatever the action returned — here {@code orderId},
+ * {@code trackingNumber} and the route coordinates so downstream simulators can animate
+ * truck movement without re-reading the projection.
*/
-public record ShipOrderCompleted(String orderId, String trackingNumber) {
+public record ShipOrderCompleted(
+ String orderId,
+ String trackingNumber,
+ double originLat,
+ double originLng,
+ double destinationLat,
+ double destinationLng
+) {
}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/TruckLocationUpdated.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/TruckLocationUpdated.java
new file mode 100644
index 00000000..7053f336
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/api/TruckLocationUpdated.java
@@ -0,0 +1,16 @@
+package io.axoniq.demo.orderfulfillment.api;
+
+import org.axonframework.messaging.eventhandling.annotation.Event;
+
+/**
+ * Published by the truck-movement simulator while a shipment is in transit. Not part of the
+ * workflow — purely a visualization signal driving the live map.
+ */
+@Event
+public record TruckLocationUpdated(
+ String orderId,
+ double lat,
+ double lng,
+ double progress
+) {
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java
index 147d5b11..58885465 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/controller/OrderController.java
@@ -5,6 +5,7 @@
import io.axoniq.demo.orderfulfillment.projection.OrderEventStream;
import io.axoniq.demo.orderfulfillment.projection.OrderStatus;
import io.axoniq.demo.orderfulfillment.projection.OrderStatusProjection;
+import io.axoniq.demo.orderfulfillment.simulator.Cities;
import org.axonframework.messaging.eventhandling.gateway.EventGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,10 +42,17 @@ public OrderController(EventGateway eventGateway,
@PostMapping
public String placeOrder(@RequestParam("customerId") String customerId,
@RequestParam("email") String email,
- @RequestParam("amount") double amount) {
+ @RequestParam("amount") double amount,
+ @RequestParam(value = "scenario", required = false) String scenario) {
var orderId = UUID.randomUUID().toString();
- logger.info("Publishing OrderPlaced for order {}.", orderId);
- eventGateway.publish(null, new OrderPlaced(orderId, customerId, email, amount));
+ var route = Cities.randomPair();
+ logger.info("Publishing OrderPlaced for order {} ({} → {}).",
+ orderId, route[0].name(), route[1].name());
+ eventGateway.publish(null, new OrderPlaced(
+ orderId, customerId, email, amount,
+ route[0].name(), route[0].lat(), route[0].lng(),
+ route[1].name(), route[1].lat(), route[1].lng(),
+ scenario == null ? "happy" : scenario));
return orderId;
}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java
index 1892604c..997f48b4 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderEventStream.java
@@ -1,8 +1,11 @@
package io.axoniq.demo.orderfulfillment.projection;
import io.axoniq.demo.orderfulfillment.api.InitiatingPaymentForCustomerStarted;
+import io.axoniq.demo.orderfulfillment.api.OrderDelivered;
+import io.axoniq.demo.orderfulfillment.api.OrderFailed;
import io.axoniq.demo.orderfulfillment.api.OrderPlaced;
import io.axoniq.demo.orderfulfillment.api.ShipOrderCompleted;
+import io.axoniq.demo.orderfulfillment.api.TruckLocationUpdated;
import org.axonframework.messaging.eventhandling.annotation.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -11,14 +14,15 @@
import java.io.IOException;
import java.time.Instant;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
/**
- * Pushes workflow Started/Completed updates to subscribed SSE clients. Uses the same workflow
- * events the {@link OrderStatusProjection} listens to, so the live view reflects the durable
- * projection state.
+ * Pushes workflow Started/Completed updates plus simulator location ticks to subscribed SSE
+ * clients. New subscribers receive a snapshot of all known orders so the map can render existing
+ * shipments immediately.
*/
@Component
public class OrderEventStream {
@@ -26,6 +30,11 @@ public class OrderEventStream {
private static final Logger logger = LoggerFactory.getLogger(OrderEventStream.class);
private final List emitters = new CopyOnWriteArrayList<>();
+ private final OrderStatusProjection projection;
+
+ public OrderEventStream(OrderStatusProjection projection) {
+ this.projection = projection;
+ }
public SseEmitter subscribe() {
var emitter = new SseEmitter(0L);
@@ -33,24 +42,36 @@ public SseEmitter subscribe() {
emitter.onCompletion(() -> emitters.remove(emitter));
emitter.onTimeout(() -> emitters.remove(emitter));
emitter.onError(e -> emitters.remove(emitter));
+ try {
+ emitter.send(SseEmitter.event().name("snapshot").data(projection.findAll()));
+ } catch (IOException e) {
+ emitters.remove(emitter);
+ }
return emitter;
}
@EventHandler
public void on(OrderPlaced event) {
- broadcast(Map.of(
- "type", "PLACED",
- "orderId", event.orderId(),
- "customerId", event.customerId(),
- "email", event.email(),
- "amount", event.amount(),
- "timestamp", Instant.now().toString()
- ));
+ var payload = new HashMap();
+ payload.put("type", "PLACED");
+ payload.put("orderId", event.orderId());
+ payload.put("customerId", event.customerId());
+ payload.put("email", event.email());
+ payload.put("amount", event.amount());
+ payload.put("originCity", event.originCity());
+ payload.put("originLat", event.originLat());
+ payload.put("originLng", event.originLng());
+ payload.put("destinationCity", event.destinationCity());
+ payload.put("destinationLat", event.destinationLat());
+ payload.put("destinationLng", event.destinationLng());
+ payload.put("scenario", event.scenario());
+ payload.put("timestamp", Instant.now().toString());
+ broadcast("order", payload);
}
@EventHandler
public void on(InitiatingPaymentForCustomerStarted event) {
- broadcast(Map.of(
+ broadcast("order", Map.of(
"type", "AWAITING_PAYMENT",
"orderId", event.orderId(),
"timestamp", Instant.now().toString()
@@ -59,18 +80,47 @@ public void on(InitiatingPaymentForCustomerStarted event) {
@EventHandler
public void on(ShipOrderCompleted event) {
- broadcast(Map.of(
- "type", "SHIPPED",
+ broadcast("order", Map.of(
+ "type", "IN_TRANSIT",
"orderId", event.orderId(),
"trackingNumber", event.trackingNumber(),
"timestamp", Instant.now().toString()
));
}
- private void broadcast(Map payload) {
+ @EventHandler
+ public void on(TruckLocationUpdated event) {
+ broadcast("location", Map.of(
+ "orderId", event.orderId(),
+ "lat", event.lat(),
+ "lng", event.lng(),
+ "progress", event.progress()
+ ));
+ }
+
+ @EventHandler
+ public void on(OrderDelivered event) {
+ broadcast("order", Map.of(
+ "type", "DELIVERED",
+ "orderId", event.orderId(),
+ "timestamp", Instant.now().toString()
+ ));
+ }
+
+ @EventHandler
+ public void on(OrderFailed event) {
+ broadcast("order", Map.of(
+ "type", "FAILED",
+ "orderId", event.orderId(),
+ "reason", event.reason(),
+ "timestamp", Instant.now().toString()
+ ));
+ }
+
+ private void broadcast(String name, Object payload) {
for (SseEmitter emitter : emitters) {
try {
- emitter.send(SseEmitter.event().name("order").data(payload));
+ emitter.send(SseEmitter.event().name(name).data(payload));
} catch (IOException | IllegalStateException e) {
logger.debug("Dropping dead SSE emitter: {}", e.getMessage());
emitters.remove(emitter);
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatus.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatus.java
index 944e5ede..221457a9 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatus.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatus.java
@@ -6,20 +6,70 @@ public record OrderStatus(
String email,
double amount,
Status status,
- String trackingNumber
+ String trackingNumber,
+ String originCity,
+ double originLat,
+ double originLng,
+ String destinationCity,
+ double destinationLat,
+ double destinationLng,
+ double currentLat,
+ double currentLng,
+ double progress,
+ String scenario,
+ String failureReason
) {
public enum Status {
PLACED,
AWAITING_PAYMENT,
- SHIPPED
+ IN_TRANSIT,
+ DELIVERED,
+ FAILED
}
public OrderStatus awaitingPayment() {
- return new OrderStatus(orderId, customerId, email, amount, Status.AWAITING_PAYMENT, trackingNumber);
+ return new OrderStatus(orderId, customerId, email, amount, Status.AWAITING_PAYMENT,
+ trackingNumber,
+ originCity, originLat, originLng,
+ destinationCity, destinationLat, destinationLng,
+ currentLat, currentLng, progress,
+ scenario, failureReason);
}
- public OrderStatus shipped(String trackingNumber) {
- return new OrderStatus(orderId, customerId, email, amount, Status.SHIPPED, trackingNumber);
+ public OrderStatus dispatched(String trackingNumber) {
+ return new OrderStatus(orderId, customerId, email, amount, Status.IN_TRANSIT,
+ trackingNumber,
+ originCity, originLat, originLng,
+ destinationCity, destinationLat, destinationLng,
+ originLat, originLng, 0.0,
+ scenario, failureReason);
+ }
+
+ public OrderStatus moved(double lat, double lng, double progress) {
+ return new OrderStatus(orderId, customerId, email, amount, status,
+ trackingNumber,
+ originCity, originLat, originLng,
+ destinationCity, destinationLat, destinationLng,
+ lat, lng, progress,
+ scenario, failureReason);
+ }
+
+ public OrderStatus delivered() {
+ return new OrderStatus(orderId, customerId, email, amount, Status.DELIVERED,
+ trackingNumber,
+ originCity, originLat, originLng,
+ destinationCity, destinationLat, destinationLng,
+ destinationLat, destinationLng, 1.0,
+ scenario, failureReason);
+ }
+
+ public OrderStatus failed(String reason) {
+ return new OrderStatus(orderId, customerId, email, amount, Status.FAILED,
+ trackingNumber,
+ originCity, originLat, originLng,
+ destinationCity, destinationLat, destinationLng,
+ currentLat, currentLng, progress,
+ scenario, reason);
}
}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatusProjection.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatusProjection.java
index 86f57e2e..d7f2cfe9 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatusProjection.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/projection/OrderStatusProjection.java
@@ -1,21 +1,24 @@
package io.axoniq.demo.orderfulfillment.projection;
import io.axoniq.demo.orderfulfillment.api.InitiatingPaymentForCustomerStarted;
+import io.axoniq.demo.orderfulfillment.api.OrderDelivered;
+import io.axoniq.demo.orderfulfillment.api.OrderFailed;
import io.axoniq.demo.orderfulfillment.api.OrderPlaced;
import io.axoniq.demo.orderfulfillment.api.ShipOrderCompleted;
+import io.axoniq.demo.orderfulfillment.api.TruckLocationUpdated;
import org.axonframework.messaging.eventhandling.annotation.EventHandler;
import org.springframework.stereotype.Component;
+import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
- * Builds order status by listening to the externally-published {@link OrderPlaced} and to the
- * workflow engine's auto-emitted {@link InitiatingPaymentForCustomerStarted} (the Started event
- * of the {@code initiatePayment} step) and {@link ShipOrderCompleted} (the Completed event of
- * the {@code shipOrder} step).
+ * Builds order status from workflow Started/Completed events plus the simulator's truck-movement
+ * events. The projection is the system of record for the live UI — every status change here is
+ * what the SSE stream broadcasts.
*/
@Component
public class OrderStatusProjection {
@@ -30,6 +33,17 @@ public void on(OrderPlaced event) {
event.email(),
event.amount(),
OrderStatus.Status.PLACED,
+ null,
+ event.originCity(),
+ event.originLat(),
+ event.originLng(),
+ event.destinationCity(),
+ event.destinationLat(),
+ event.destinationLng(),
+ event.originLat(),
+ event.originLng(),
+ 0.0,
+ event.scenario(),
null));
}
@@ -41,10 +55,30 @@ public void on(InitiatingPaymentForCustomerStarted event) {
@EventHandler
public void on(ShipOrderCompleted event) {
orders.computeIfPresent(event.orderId(),
- (id, current) -> current.shipped(event.trackingNumber()));
+ (id, current) -> current.dispatched(event.trackingNumber()));
+ }
+
+ @EventHandler
+ public void on(TruckLocationUpdated event) {
+ orders.computeIfPresent(event.orderId(),
+ (id, current) -> current.moved(event.lat(), event.lng(), event.progress()));
+ }
+
+ @EventHandler
+ public void on(OrderDelivered event) {
+ orders.computeIfPresent(event.orderId(), (id, current) -> current.delivered());
+ }
+
+ @EventHandler
+ public void on(OrderFailed event) {
+ orders.computeIfPresent(event.orderId(), (id, current) -> current.failed(event.reason()));
}
public Optional findById(String orderId) {
return Optional.ofNullable(orders.get(orderId));
}
+
+ public Collection findAll() {
+ return orders.values();
+ }
}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/FailureRecorder.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/FailureRecorder.java
new file mode 100644
index 00000000..fe0a74fb
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/FailureRecorder.java
@@ -0,0 +1,29 @@
+package io.axoniq.demo.orderfulfillment.service;
+
+import io.axoniq.demo.orderfulfillment.api.OrderFailed;
+import org.axonframework.messaging.eventhandling.gateway.EventGateway;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * Lets the workflow publish an {@link OrderFailed} event from inside an {@code awaitExecute}
+ * step, so failure broadcasts go through the regular event pipeline instead of being side-effects
+ * of the workflow body.
+ */
+@Component
+public class FailureRecorder {
+
+ private final EventGateway eventGateway;
+
+ public FailureRecorder(EventGateway eventGateway) {
+ this.eventGateway = eventGateway;
+ }
+
+ public Boolean record(Map payload) {
+ var orderId = (String) payload.get("orderId");
+ var reason = (String) payload.get("reason");
+ eventGateway.publish(null, new OrderFailed(orderId, reason));
+ return true;
+ }
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/InventoryService.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/InventoryService.java
index bb0f7750..2da3b687 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/InventoryService.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/InventoryService.java
@@ -12,8 +12,14 @@ public class InventoryService {
private static final Logger logger = LoggerFactory.getLogger(InventoryService.class);
public boolean reserveStock(Map payload) {
- logger.info("Reserving stock for customer {} (amount {}).",
- payload.get("customerId"), payload.get("amount"));
+ var customerId = payload.get("customerId");
+ var amount = payload.get("amount");
+ var scenario = (String) payload.get("scenario");
+ if ("out-of-stock".equals(scenario)) {
+ logger.info("Stock unavailable (forced) for customer {} (amount {}).", customerId, amount);
+ return false;
+ }
+ logger.info("Reserving stock for customer {} (amount {}).", customerId, amount);
return true;
}
}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/ShippingService.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/ShippingService.java
index d915cb07..4fdfc15f 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/ShippingService.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/service/ShippingService.java
@@ -5,7 +5,6 @@
import org.springframework.stereotype.Component;
import java.util.Map;
-import java.util.UUID;
@Component
public class ShippingService {
@@ -14,8 +13,15 @@ public class ShippingService {
public Map shipOrder(Map payload) {
var orderId = (String) payload.get("orderId");
- var trackingNumber = "TRK-" + UUID.randomUUID();
+ var trackingNumber = (String) payload.get("trackingNumber");
logger.info("Shipping order {} with tracking number {}.", orderId, trackingNumber);
- return Map.of("orderId", orderId, "trackingNumber", trackingNumber);
+ return Map.of(
+ "orderId", orderId,
+ "trackingNumber", trackingNumber,
+ "originLat", payload.get("originLat"),
+ "originLng", payload.get("originLng"),
+ "destinationLat", payload.get("destinationLat"),
+ "destinationLng", payload.get("destinationLng")
+ );
}
}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/AutoPaymentRobot.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/AutoPaymentRobot.java
new file mode 100644
index 00000000..bbf7696a
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/AutoPaymentRobot.java
@@ -0,0 +1,67 @@
+package io.axoniq.demo.orderfulfillment.simulator;
+
+import io.axoniq.demo.orderfulfillment.api.InitiatingPaymentForCustomerStarted;
+import io.axoniq.demo.orderfulfillment.api.PaymentConfirmed;
+import io.axoniq.demo.orderfulfillment.projection.OrderStatus;
+import io.axoniq.demo.orderfulfillment.projection.OrderStatusProjection;
+import org.axonframework.messaging.eventhandling.annotation.EventHandler;
+import org.axonframework.messaging.eventhandling.gateway.EventGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Auto-confirms payments after a randomized delay so the demo runs end-to-end without manual
+ * clicks. Skips orders flagged with the {@code payment-timeout} scenario, letting the workflow's
+ * waitForEvent expire on purpose.
+ */
+@Component
+public class AutoPaymentRobot {
+
+ private static final Logger logger = LoggerFactory.getLogger(AutoPaymentRobot.class);
+
+ private final EventGateway eventGateway;
+ private final OrderStatusProjection projection;
+ private final ScheduledExecutorService scheduler =
+ Executors.newScheduledThreadPool(2, r -> {
+ var t = new Thread(r, "auto-payment");
+ t.setDaemon(true);
+ return t;
+ });
+
+ @Autowired
+ public AutoPaymentRobot(EventGateway eventGateway, OrderStatusProjection projection) {
+ this.eventGateway = eventGateway;
+ this.projection = projection;
+ }
+
+ @EventHandler
+ public void on(InitiatingPaymentForCustomerStarted event) {
+ var orderId = event.orderId();
+ var status = projection.findById(orderId).orElse(null);
+ var scenario = status == null ? null : status.scenario();
+ if ("payment-timeout".equals(scenario)) {
+ logger.info("Skipping auto-payment for {} — scenario forces timeout.", orderId);
+ return;
+ }
+ var delayMs = ThreadLocalRandom.current().nextInt(800, 3500);
+ scheduler.schedule(() -> confirm(orderId), delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ private void confirm(String orderId) {
+ var status = projection.findById(orderId).orElse(null);
+ if (status == null || status.status() == OrderStatus.Status.FAILED) {
+ return;
+ }
+ var transactionId = "txn-" + UUID.randomUUID();
+ logger.info("Auto-confirming payment for {} ({}).", orderId, transactionId);
+ eventGateway.publish(null, new PaymentConfirmed(orderId, transactionId));
+ }
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Cities.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Cities.java
new file mode 100644
index 00000000..ab3db0b5
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Cities.java
@@ -0,0 +1,48 @@
+package io.axoniq.demo.orderfulfillment.simulator;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public final class Cities {
+
+ public record City(String name, double lat, double lng) {}
+
+ public static final List ALL = List.of(
+ new City("New York", 40.7128, -74.0060),
+ new City("Los Angeles", 34.0522, -118.2437),
+ new City("Chicago", 41.8781, -87.6298),
+ new City("Houston", 29.7604, -95.3698),
+ new City("Phoenix", 33.4484, -112.0740),
+ new City("Philadelphia", 39.9526, -75.1652),
+ new City("San Antonio", 29.4241, -98.4936),
+ new City("San Diego", 32.7157, -117.1611),
+ new City("Dallas", 32.7767, -96.7970),
+ new City("Austin", 30.2672, -97.7431),
+ new City("Boston", 42.3601, -71.0589),
+ new City("Seattle", 47.6062, -122.3321),
+ new City("Denver", 39.7392, -104.9903),
+ new City("Miami", 25.7617, -80.1918),
+ new City("Atlanta", 33.7490, -84.3880),
+ new City("Portland", 45.5152, -122.6784),
+ new City("Minneapolis", 44.9778, -93.2650),
+ new City("St. Louis", 38.6270, -90.1994),
+ new City("Nashville", 36.1627, -86.7816),
+ new City("New Orleans", 29.9511, -90.0715)
+ );
+
+ private Cities() {}
+
+ public static City random() {
+ return ALL.get(ThreadLocalRandom.current().nextInt(ALL.size()));
+ }
+
+ public static City[] randomPair() {
+ var rnd = ThreadLocalRandom.current();
+ var origin = ALL.get(rnd.nextInt(ALL.size()));
+ City destination;
+ do {
+ destination = ALL.get(rnd.nextInt(ALL.size()));
+ } while (destination == origin);
+ return new City[]{origin, destination};
+ }
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/CustomerPool.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/CustomerPool.java
new file mode 100644
index 00000000..e5aee31a
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/CustomerPool.java
@@ -0,0 +1,33 @@
+package io.axoniq.demo.orderfulfillment.simulator;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public final class CustomerPool {
+
+ public record Customer(String id, String name, String email) {}
+
+ public static final List ALL = List.of(
+ new Customer("cust-001", "Alice Johnson", "alice@example.com"),
+ new Customer("cust-002", "Bob Martinez", "bob@example.com"),
+ new Customer("cust-003", "Carla Singh", "carla@example.com"),
+ new Customer("cust-004", "Daniel Park", "daniel@example.com"),
+ new Customer("cust-005", "Elena Petrova", "elena@example.com"),
+ new Customer("cust-006", "Felix Tanaka", "felix@example.com"),
+ new Customer("cust-007", "Grace O'Connor", "grace@example.com"),
+ new Customer("cust-008", "Hassan Reyes", "hassan@example.com"),
+ new Customer("cust-009", "Iris Berg", "iris@example.com"),
+ new Customer("cust-010", "Jonas Weber", "jonas@example.com")
+ );
+
+ private CustomerPool() {}
+
+ public static Customer random() {
+ return ALL.get(ThreadLocalRandom.current().nextInt(ALL.size()));
+ }
+
+ public static double randomAmount() {
+ var rnd = ThreadLocalRandom.current();
+ return Math.round(rnd.nextDouble(15.0, 499.0) * 100.0) / 100.0;
+ }
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/SimulationController.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/SimulationController.java
new file mode 100644
index 00000000..df7dc19e
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/SimulationController.java
@@ -0,0 +1,45 @@
+package io.axoniq.demo.orderfulfillment.simulator;
+
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Map;
+
+@RestController
+@RequestMapping("/simulate")
+public class SimulationController {
+
+ private final Simulator simulator;
+
+ public SimulationController(Simulator simulator) {
+ this.simulator = simulator;
+ }
+
+ @PostMapping("/single")
+ public Map single(@RequestParam(value = "scenario", required = false) String scenario) {
+ var orderId = simulator.placeRandom(normalize(scenario));
+ return Map.of("orderId", orderId);
+ }
+
+ @PostMapping("/burst")
+ public Map burst(@RequestParam(value = "count", defaultValue = "10") int count,
+ @RequestParam(value = "scenario", required = false) String scenario) {
+ simulator.burst(count, normalize(scenario));
+ return Map.of("queued", count, "scenario", String.valueOf(normalize(scenario)));
+ }
+
+ @PostMapping("/scenario")
+ public Map scenario(@RequestParam("type") String type) {
+ var orderId = simulator.placeRandom(normalize(type));
+ return Map.of("orderId", orderId, "scenario", type);
+ }
+
+ private static String normalize(String scenario) {
+ if (scenario == null || scenario.isBlank() || "happy".equalsIgnoreCase(scenario)) {
+ return "happy";
+ }
+ return scenario;
+ }
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Simulator.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Simulator.java
new file mode 100644
index 00000000..258c8cf2
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/Simulator.java
@@ -0,0 +1,58 @@
+package io.axoniq.demo.orderfulfillment.simulator;
+
+import io.axoniq.demo.orderfulfillment.api.OrderPlaced;
+import org.axonframework.messaging.eventhandling.gateway.EventGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class Simulator {
+
+ private static final Logger logger = LoggerFactory.getLogger(Simulator.class);
+
+ private final EventGateway eventGateway;
+ private final ScheduledExecutorService scheduler =
+ Executors.newScheduledThreadPool(2, r -> {
+ var t = new Thread(r, "sim-burst");
+ t.setDaemon(true);
+ return t;
+ });
+
+ public Simulator(EventGateway eventGateway) {
+ this.eventGateway = eventGateway;
+ }
+
+ public String placeRandom(String scenario) {
+ var customer = CustomerPool.random();
+ var route = Cities.randomPair();
+ var orderId = UUID.randomUUID().toString();
+ var event = new OrderPlaced(
+ orderId,
+ customer.id(),
+ customer.email(),
+ CustomerPool.randomAmount(),
+ route[0].name(), route[0].lat(), route[0].lng(),
+ route[1].name(), route[1].lat(), route[1].lng(),
+ scenario
+ );
+ logger.info("Placing simulated order {} ({} → {}, scenario={}).",
+ orderId, route[0].name(), route[1].name(), scenario);
+ eventGateway.publish(null, event);
+ return orderId;
+ }
+
+ public void burst(int count, String scenario) {
+ var n = Math.max(1, Math.min(count, 100));
+ for (int i = 0; i < n; i++) {
+ var staggerMs = ThreadLocalRandom.current().nextInt(50, 350);
+ scheduler.schedule(() -> placeRandom(scenario), (long) i * staggerMs, TimeUnit.MILLISECONDS);
+ }
+ }
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/TruckMovementSimulator.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/TruckMovementSimulator.java
new file mode 100644
index 00000000..04dfd3b5
--- /dev/null
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/simulator/TruckMovementSimulator.java
@@ -0,0 +1,81 @@
+package io.axoniq.demo.orderfulfillment.simulator;
+
+import io.axoniq.demo.orderfulfillment.api.OrderDelivered;
+import io.axoniq.demo.orderfulfillment.api.ShipOrderCompleted;
+import io.axoniq.demo.orderfulfillment.api.TruckLocationUpdated;
+import org.axonframework.messaging.eventhandling.annotation.EventHandler;
+import org.axonframework.messaging.eventhandling.gateway.EventGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Animates truck movement after the workflow's {@code shipOrder} step completes. Publishes
+ * {@link TruckLocationUpdated} ticks along the great-circle-ish straight line between origin and
+ * destination, then emits a final {@link OrderDelivered} event. Decoupled from the workflow so
+ * many shipments can animate concurrently without blocking workflow threads.
+ */
+@Component
+public class TruckMovementSimulator {
+
+ private static final Logger logger = LoggerFactory.getLogger(TruckMovementSimulator.class);
+ private static final int TICKS = 30;
+ private static final long TICK_INTERVAL_MS = 400;
+
+ private final EventGateway eventGateway;
+ private final ScheduledExecutorService scheduler =
+ Executors.newScheduledThreadPool(8, r -> {
+ var t = new Thread(r, "truck-sim");
+ t.setDaemon(true);
+ return t;
+ });
+
+ public TruckMovementSimulator(EventGateway eventGateway) {
+ this.eventGateway = eventGateway;
+ }
+
+ @EventHandler
+ public void on(ShipOrderCompleted event) {
+ var jitter = ThreadLocalRandom.current().nextDouble(0.85, 1.25);
+ var orderId = event.orderId();
+ var trackingNumber = event.trackingNumber();
+ var originLat = event.originLat();
+ var originLng = event.originLng();
+ var destLat = event.destinationLat();
+ var destLng = event.destinationLng();
+
+ logger.info("Starting truck animation for order {} ({}).", orderId, trackingNumber);
+
+ var step = new AtomicInteger(1);
+ var handle = scheduler.scheduleAtFixedRate(() -> {
+ int i = step.getAndIncrement();
+ if (i > TICKS) {
+ return;
+ }
+ double t = (double) i / TICKS;
+ double lat = originLat + (destLat - originLat) * t;
+ double lng = originLng + (destLng - originLng) * t;
+ try {
+ eventGateway.publish(null, new TruckLocationUpdated(orderId, lat, lng, t));
+ } catch (Exception e) {
+ logger.warn("Failed to publish location for {}: {}", orderId, e.getMessage());
+ }
+ }, 100, (long) (TICK_INTERVAL_MS * jitter), TimeUnit.MILLISECONDS);
+
+ scheduler.schedule(() -> {
+ handle.cancel(false);
+ try {
+ eventGateway.publish(null, new OrderDelivered(orderId, trackingNumber));
+ logger.info("Order {} delivered.", orderId);
+ } catch (Exception e) {
+ logger.warn("Failed to publish delivery for {}: {}", orderId, e.getMessage());
+ }
+ }, (long) (TICK_INTERVAL_MS * jitter * (TICKS + 1)), TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/workflow/OrderFulfillmentWorkflow.java b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/workflow/OrderFulfillmentWorkflow.java
index 01d6a173..685989e6 100644
--- a/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/workflow/OrderFulfillmentWorkflow.java
+++ b/order-fulfillment-workflow/src/main/java/io/axoniq/demo/orderfulfillment/workflow/OrderFulfillmentWorkflow.java
@@ -1,6 +1,7 @@
package io.axoniq.demo.orderfulfillment.workflow;
import io.axoniq.demo.orderfulfillment.api.PaymentConfirmed;
+import io.axoniq.demo.orderfulfillment.service.FailureRecorder;
import io.axoniq.demo.orderfulfillment.service.InventoryService;
import io.axoniq.demo.orderfulfillment.service.NotificationService;
import io.axoniq.demo.orderfulfillment.service.PaymentService;
@@ -12,7 +13,9 @@
import org.springframework.stereotype.Component;
import java.time.Duration;
+import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import static io.axoniq.workflow.dsl.api.AssociationsUtils.associate;
import static io.axoniq.workflow.dsl.simple.SimpleWorkflowContext.equalsTo;
@@ -29,15 +32,18 @@ public class OrderFulfillmentWorkflow {
private final PaymentService payment;
private final ShippingService shipping;
private final NotificationService notifications;
+ private final FailureRecorder failures;
public OrderFulfillmentWorkflow(InventoryService inventory,
PaymentService payment,
ShippingService shipping,
- NotificationService notifications) {
+ NotificationService notifications,
+ FailureRecorder failures) {
this.inventory = inventory;
this.payment = payment;
this.shipping = shipping;
this.notifications = notifications;
+ this.failures = failures;
}
@Workflow(
@@ -50,24 +56,40 @@ public void execute(SimpleWorkflowContext ctx) {
var customerId = (String) ctx.workflowPayload().get("customerId");
var email = (String) ctx.workflowPayload().get("email");
var amount = ctx.workflowPayload().get("amount");
+ var scenario = (String) ctx.workflowPayload().get("scenario");
+ var originLat = ((Number) ctx.workflowPayload().get("originLat")).doubleValue();
+ var originLng = ((Number) ctx.workflowPayload().get("originLng")).doubleValue();
+ var destLat = ((Number) ctx.workflowPayload().get("destinationLat")).doubleValue();
+ var destLng = ((Number) ctx.workflowPayload().get("destinationLng")).doubleValue();
- logger.info("Order {} workflow started for customer {} (amount {}).", orderId, customerId, amount);
+ logger.info("Order {} workflow started for customer {} (amount {}, scenario {}).",
+ orderId, customerId, amount, scenario);
+
+ var paymentTimeout = "payment-timeout".equals(scenario)
+ ? Duration.ofSeconds(4)
+ : Duration.ofSeconds(45);
var paymentConfirmation = ctx.waitForEvent(
"awaitPayment",
PaymentConfirmed.class,
associate(payloadProperty("orderId"), equalsTo(orderId)),
- Duration.ofMinutes(15)
+ paymentTimeout
);
var reserved = ctx.awaitExecute(
"reserveStock",
- Map.of("customerId", customerId, "amount", amount),
+ Map.of("customerId", customerId, "amount", amount, "scenario", scenario),
Boolean.class,
inventory::reserveStock
);
if (!reserved) {
paymentConfirmation.cancel("Stock unavailable");
+ ctx.awaitExecute(
+ "recordOutOfStock",
+ Map.of("orderId", orderId, "reason", "Out of stock"),
+ Boolean.class,
+ failures::record
+ );
ctx.fail(new RuntimeException("Stock unavailable for order " + orderId));
return;
}
@@ -84,21 +106,38 @@ public void execute(SimpleWorkflowContext ctx) {
);
paymentConfirmation.await();
- var confirmation = paymentConfirmation.