From 830f586b7d0d68bf26411f7bca7b10158904c77b Mon Sep 17 00:00:00 2001 From: andreatp Date: Wed, 25 Feb 2026 17:36:03 +0000 Subject: [PATCH] Add an OpaPolicy pool --- Readme.md | 45 +++++ .../java/com/styra/opa/wasm/OpaPolicy.java | 12 ++ .../com/styra/opa/wasm/OpaPolicyPool.java | 161 +++++++++++++++++ .../com/styra/opa/wasm/OpaPolicyPoolTest.java | 168 ++++++++++++++++++ 4 files changed, 386 insertions(+) create mode 100644 core/src/main/java/com/styra/opa/wasm/OpaPolicyPool.java create mode 100644 core/src/test/java/com/styra/opa/wasm/OpaPolicyPoolTest.java diff --git a/Readme.md b/Readme.md index ef7c52c..cbf507f 100644 --- a/Readme.md +++ b/Readme.md @@ -117,6 +117,51 @@ Files.write(Paths.get("TestReadme.result"), (result + "\n").getBytes()); > [https://www.openpolicyagent.org/docs/latest/wasm/](https://www.openpolicyagent.org/docs/latest/wasm/) > for more details. +## Policy Pool + +`OpaPolicyPool` manages a bounded set of `OpaPolicy` instances for concurrent +use. It uses lock-free data structures internally, so it is safe to use with +virtual threads (no carrier-thread pinning). + +```java +import com.styra.opa.wasm.OpaPolicyPool; + +var pool = OpaPolicyPool.create( + () -> OpaPolicy.builder().withPolicy(policyWasm).build(), + 4); // at most 4 concurrent instances + +try (var loan = pool.borrow()) { // blocks if all 4 are in use + loan.policy() + .data("{\"role\": {\"alice\": \"admin\"}}") + .input("{\"user\": \"alice\"}"); + String result = loan.policy().evaluate(); +} +// policy is automatically returned to the pool + +pool.close(); +``` + +Each policy is reset to a clean state when returned to the pool (data, input +and entrypoint are cleared), so the next borrower always starts fresh. + +If a processing error may have left the policy in a bad state, call +`loan.discard()` instead of letting `close()` return it: + +```java +try (var loan = pool.borrow()) { + try { + loan.policy().data(data).input(input); + result = loan.policy().evaluate(); + } catch (RuntimeException ex) { + loan.discard(); // destroys this instance; pool will create a fresh one + // handle error ... + } +} +``` + +> **Note:** A single `OpaPolicy` instance is **not thread-safe**. +> Without the pool, use one instance per thread. + ## Builtins support: At the moment the following builtins are supported(and, by default, automatically injected when needed): diff --git a/core/src/main/java/com/styra/opa/wasm/OpaPolicy.java b/core/src/main/java/com/styra/opa/wasm/OpaPolicy.java index e08d8d6..f676754 100644 --- a/core/src/main/java/com/styra/opa/wasm/OpaPolicy.java +++ b/core/src/main/java/com/styra/opa/wasm/OpaPolicy.java @@ -180,6 +180,18 @@ public String evaluate(JsonNode input) { return evaluate(); } + /** Package-private: used by {@link OpaPolicyPool} on return. */ + void reset() { + if (dataAddr != -1) { + wasm.exports().opaValueFree(dataAddr); + } + wasm.exports().opaHeapPtrSet(baseHeapPtr); + dataHeapPtr = baseHeapPtr; + dataAddr = -1; + inputAddr = -1; + entrypoint = 0; + } + public static Builder builder() { return new Builder(); } diff --git a/core/src/main/java/com/styra/opa/wasm/OpaPolicyPool.java b/core/src/main/java/com/styra/opa/wasm/OpaPolicyPool.java new file mode 100644 index 0000000..836928c --- /dev/null +++ b/core/src/main/java/com/styra/opa/wasm/OpaPolicyPool.java @@ -0,0 +1,161 @@ +package com.styra.opa.wasm; + +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +/** + * Thread-safe pool of {@link OpaPolicy} instances. + * + *

Each {@link #borrow()} returns a {@link Loan} that must be {@linkplain Loan#close() closed} + * (ideally via try-with-resources) to return the policy to the pool. The pool caps the number of + * live instances and blocks callers when the limit is reached. + * + *

Uses only lock-free data structures and {@link Semaphore} internally — no {@code synchronized} + * blocks — so it is safe to use with virtual threads (no carrier-thread pinning). + * + *

{@code
+ * var pool = OpaPolicyPool.create(
+ *         () -> OpaPolicy.builder().withPolicy(policyBytes).build(),
+ *         4);
+ *
+ * try (var loan = pool.borrow()) {
+ *     loan.policy()
+ *         .data("{\"role\":{\"alice\":\"admin\"}}")
+ *         .input("{\"user\":\"alice\"}");
+ *     String result = loan.policy().evaluate();
+ * }
+ *
+ * pool.close();
+ * }
+ */ +public final class OpaPolicyPool implements AutoCloseable { + + private final ConcurrentLinkedDeque idle; + private final Semaphore permits; + private final Supplier factory; + private final AtomicBoolean closed = new AtomicBoolean(); + + private OpaPolicyPool(Supplier factory, int maxSize) { + this.factory = factory; + this.idle = new ConcurrentLinkedDeque<>(); + this.permits = new Semaphore(maxSize); + } + + /** + * Creates a pool that allows at most {@code maxSize} concurrent policy instances. + * + * @param factory supplies new {@link OpaPolicy} instances when the pool is empty; called + * outside any lock so it may safely do expensive work (WASM parsing, compilation) + * @param maxSize maximum number of concurrent instances + */ + public static OpaPolicyPool create(Supplier factory, int maxSize) { + if (maxSize <= 0) { + throw new IllegalArgumentException("maxSize must be positive, got: " + maxSize); + } + return new OpaPolicyPool(factory, maxSize); + } + + /** + * Borrows a policy from the pool, blocking if the pool is at capacity. + * + *

The returned {@link Loan} must be closed when done (preferably via + * try-with-resources) to return the policy to the pool. + * + * @throws InterruptedException if the calling thread is interrupted while waiting for an + * available permit + * @throws IllegalStateException if the pool has been closed + */ + public Loan borrow() throws InterruptedException { + if (closed.get()) { + throw new IllegalStateException("Pool is closed"); + } + permits.acquire(); + try { + OpaPolicy policy = idle.pollFirst(); + if (policy == null) { + policy = factory.get(); + } + return new Loan(this, policy); + } catch (RuntimeException t) { + permits.release(); + throw t; + } + } + + private void release(OpaPolicy policy) { + try { + policy.reset(); + if (!closed.get()) { + idle.offerFirst(policy); + } + } finally { + permits.release(); + } + } + + private void discard() { + permits.release(); + } + + /** + * Closes the pool. Outstanding {@link Loan}s are not forcibly closed — they will be cleaned up + * as they are returned. + */ + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + idle.clear(); + } + } + + /** + * A loan of an {@link OpaPolicy} from the pool. + * + *

Must be closed to return the policy to the pool. If a processing error leaves the policy + * in a bad state, call {@link #discard()} instead of {@link #close()} to destroy the instance + * rather than returning it. + */ + public static final class Loan implements AutoCloseable { + private final OpaPolicyPool pool; + private OpaPolicy policy; + + Loan(OpaPolicyPool pool, OpaPolicy policy) { + this.pool = pool; + this.policy = policy; + } + + /** + * Returns the borrowed policy for configuration and evaluation. + * + * @throws IllegalStateException if the loan has already been closed or discarded + */ + public OpaPolicy policy() { + if (policy == null) { + throw new IllegalStateException("Loan already returned"); + } + return policy; + } + + /** Returns the policy to the pool for reuse. */ + @Override + public void close() { + if (policy != null) { + pool.release(policy); + policy = null; + } + } + + /** + * Destroys the policy instead of returning it to the pool. Use this when a processing error + * may have left the policy in a corrupt state. + */ + public void discard() { + if (policy != null) { + pool.discard(); + policy = null; + } + } + } +} diff --git a/core/src/test/java/com/styra/opa/wasm/OpaPolicyPoolTest.java b/core/src/test/java/com/styra/opa/wasm/OpaPolicyPoolTest.java new file mode 100644 index 0000000..a01dd4b --- /dev/null +++ b/core/src/test/java/com/styra/opa/wasm/OpaPolicyPoolTest.java @@ -0,0 +1,168 @@ +package com.styra.opa.wasm; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class OpaPolicyPoolTest { + static Path wasmFile; + + @BeforeAll + public static void beforeAll() throws Exception { + wasmFile = OpaCli.compile("base", "opa/wasm/test/allowed").resolve("policy.wasm"); + } + + @Test + public void basicBorrowAndReturn() throws InterruptedException { + var pool = OpaPolicyPool.create(() -> OpaPolicy.builder().withPolicy(wasmFile).build(), 2); + + try (var loan = pool.borrow()) { + loan.policy() + .data("{ \"role\" : { \"alice\" : \"admin\" } }") + .input("{\"user\": \"alice\"}"); + assertTrue(Utils.getResult(loan.policy().evaluate()).asBoolean()); + } + + pool.close(); + } + + @Test + public void instanceIsReusedAcrossBorrows() throws InterruptedException { + var pool = OpaPolicyPool.create(() -> OpaPolicy.builder().withPolicy(wasmFile).build(), 1); + + try (var loan = pool.borrow()) { + loan.policy() + .data("{ \"role\" : { \"alice\" : \"admin\" } }") + .input("{\"user\": \"alice\"}"); + assertTrue(Utils.getResult(loan.policy().evaluate()).asBoolean()); + } + + // Second borrow should reuse the same instance; state was reset + try (var loan = pool.borrow()) { + loan.policy() + .data("{ \"role\" : { \"bob\" : \"admin\" } }") + .input("{\"user\": \"bob\"}"); + assertTrue(Utils.getResult(loan.policy().evaluate()).asBoolean()); + + // Previous data should not be visible + loan.policy().input("{\"user\": \"alice\"}"); + assertFalse(Utils.getResult(loan.policy().evaluate()).asBoolean()); + } + + pool.close(); + } + + @Test + public void concurrentAccess() throws Exception { + int poolSize = 4; + int tasks = 16; + var pool = + OpaPolicyPool.create( + () -> OpaPolicy.builder().withPolicy(wasmFile).build(), poolSize); + var barrier = new CyclicBarrier(tasks); + List errors = Collections.synchronizedList(new ArrayList<>()); + + ExecutorService executor = Executors.newFixedThreadPool(tasks); + List> futures = new ArrayList<>(); + + for (int i = 0; i < tasks; i++) { + final int idx = i; + futures.add( + executor.submit( + () -> { + try { + barrier.await(5, TimeUnit.SECONDS); + try (var loan = pool.borrow()) { + loan.policy() + .data( + "{ \"role\" : { \"user" + + idx + + "\" : \"admin\" } }") + .input("{\"user\": \"user" + idx + "\"}"); + var result = + Utils.getResult(loan.policy().evaluate()) + .asBoolean(); + if (!result) { + errors.add("task " + idx + ": expected true"); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + errors.add("task " + idx + ": " + e.getMessage()); + } catch (BrokenBarrierException + | TimeoutException + | RuntimeException e) { + errors.add("task " + idx + ": " + e.getMessage()); + } + })); + } + + for (Future f : futures) { + f.get(30, TimeUnit.SECONDS); + } + executor.shutdown(); + pool.close(); + + assertEquals(List.of(), errors); + } + + @Test + public void discardAndRecreate() throws InterruptedException { + var pool = OpaPolicyPool.create(() -> OpaPolicy.builder().withPolicy(wasmFile).build(), 1); + + try (var loan = pool.borrow()) { + loan.policy() + .data("{ \"role\" : { \"alice\" : \"admin\" } }") + .input("{\"user\": \"alice\"}"); + assertTrue(Utils.getResult(loan.policy().evaluate()).asBoolean()); + loan.discard(); + } + + // After discard, a new policy is created for the next borrow + try (var loan = pool.borrow()) { + loan.policy() + .data("{ \"role\" : { \"bob\" : \"admin\" } }") + .input("{\"user\": \"bob\"}"); + assertTrue(Utils.getResult(loan.policy().evaluate()).asBoolean()); + } + + pool.close(); + } + + @Test + public void borrowAfterCloseThrows() { + var pool = OpaPolicyPool.create(() -> OpaPolicy.builder().withPolicy(wasmFile).build(), 2); + pool.close(); + + assertThrows(IllegalStateException.class, pool::borrow); + } + + @Test + public void invalidMaxSizeThrows() { + assertThrows( + IllegalArgumentException.class, + () -> + OpaPolicyPool.create( + () -> OpaPolicy.builder().withPolicy(wasmFile).build(), 0)); + assertThrows( + IllegalArgumentException.class, + () -> + OpaPolicyPool.create( + () -> OpaPolicy.builder().withPolicy(wasmFile).build(), -1)); + } +}