diff --git a/Readme.md b/Readme.md index ef7c52c..cbf507f 100644 --- a/Readme.md +++ b/Readme.md @@ -117,6 +117,51 @@ Files.write(Paths.get("TestReadme.result"), (result + "\n").getBytes()); > [https://www.openpolicyagent.org/docs/latest/wasm/](https://www.openpolicyagent.org/docs/latest/wasm/) > for more details. +## Policy Pool + +`OpaPolicyPool` manages a bounded set of `OpaPolicy` instances for concurrent +use. It uses lock-free data structures internally, so it is safe to use with +virtual threads (no carrier-thread pinning). + +```java +import com.styra.opa.wasm.OpaPolicyPool; + +var pool = OpaPolicyPool.create( + () -> OpaPolicy.builder().withPolicy(policyWasm).build(), + 4); // at most 4 concurrent instances + +try (var loan = pool.borrow()) { // blocks if all 4 are in use + loan.policy() + .data("{\"role\": {\"alice\": \"admin\"}}") + .input("{\"user\": \"alice\"}"); + String result = loan.policy().evaluate(); +} +// policy is automatically returned to the pool + +pool.close(); +``` + +Each policy is reset to a clean state when returned to the pool (data, input +and entrypoint are cleared), so the next borrower always starts fresh. + +If a processing error may have left the policy in a bad state, call +`loan.discard()` instead of letting `close()` return it: + +```java +try (var loan = pool.borrow()) { + try { + loan.policy().data(data).input(input); + result = loan.policy().evaluate(); + } catch (RuntimeException ex) { + loan.discard(); // destroys this instance; pool will create a fresh one + // handle error ... + } +} +``` + +> **Note:** A single `OpaPolicy` instance is **not thread-safe**. +> Without the pool, use one instance per thread. + ## Builtins support: At the moment the following builtins are supported(and, by default, automatically injected when needed): diff --git a/core/src/main/java/com/styra/opa/wasm/OpaPolicy.java b/core/src/main/java/com/styra/opa/wasm/OpaPolicy.java index e08d8d6..f676754 100644 --- a/core/src/main/java/com/styra/opa/wasm/OpaPolicy.java +++ b/core/src/main/java/com/styra/opa/wasm/OpaPolicy.java @@ -180,6 +180,18 @@ public String evaluate(JsonNode input) { return evaluate(); } + /** Package-private: used by {@link OpaPolicyPool} on return. */ + void reset() { + if (dataAddr != -1) { + wasm.exports().opaValueFree(dataAddr); + } + wasm.exports().opaHeapPtrSet(baseHeapPtr); + dataHeapPtr = baseHeapPtr; + dataAddr = -1; + inputAddr = -1; + entrypoint = 0; + } + public static Builder builder() { return new Builder(); } diff --git a/core/src/main/java/com/styra/opa/wasm/OpaPolicyPool.java b/core/src/main/java/com/styra/opa/wasm/OpaPolicyPool.java new file mode 100644 index 0000000..836928c --- /dev/null +++ b/core/src/main/java/com/styra/opa/wasm/OpaPolicyPool.java @@ -0,0 +1,161 @@ +package com.styra.opa.wasm; + +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +/** + * Thread-safe pool of {@link OpaPolicy} instances. + * + *
Each {@link #borrow()} returns a {@link Loan} that must be {@linkplain Loan#close() closed} + * (ideally via try-with-resources) to return the policy to the pool. The pool caps the number of + * live instances and blocks callers when the limit is reached. + * + *
Uses only lock-free data structures and {@link Semaphore} internally — no {@code synchronized} + * blocks — so it is safe to use with virtual threads (no carrier-thread pinning). + * + *
{@code
+ * var pool = OpaPolicyPool.create(
+ * () -> OpaPolicy.builder().withPolicy(policyBytes).build(),
+ * 4);
+ *
+ * try (var loan = pool.borrow()) {
+ * loan.policy()
+ * .data("{\"role\":{\"alice\":\"admin\"}}")
+ * .input("{\"user\":\"alice\"}");
+ * String result = loan.policy().evaluate();
+ * }
+ *
+ * pool.close();
+ * }
+ */
+public final class OpaPolicyPool implements AutoCloseable {
+
+ private final ConcurrentLinkedDequeThe returned {@link Loan} must be closed when done (preferably via + * try-with-resources) to return the policy to the pool. + * + * @throws InterruptedException if the calling thread is interrupted while waiting for an + * available permit + * @throws IllegalStateException if the pool has been closed + */ + public Loan borrow() throws InterruptedException { + if (closed.get()) { + throw new IllegalStateException("Pool is closed"); + } + permits.acquire(); + try { + OpaPolicy policy = idle.pollFirst(); + if (policy == null) { + policy = factory.get(); + } + return new Loan(this, policy); + } catch (RuntimeException t) { + permits.release(); + throw t; + } + } + + private void release(OpaPolicy policy) { + try { + policy.reset(); + if (!closed.get()) { + idle.offerFirst(policy); + } + } finally { + permits.release(); + } + } + + private void discard() { + permits.release(); + } + + /** + * Closes the pool. Outstanding {@link Loan}s are not forcibly closed — they will be cleaned up + * as they are returned. + */ + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + idle.clear(); + } + } + + /** + * A loan of an {@link OpaPolicy} from the pool. + * + *
Must be closed to return the policy to the pool. If a processing error leaves the policy
+ * in a bad state, call {@link #discard()} instead of {@link #close()} to destroy the instance
+ * rather than returning it.
+ */
+ public static final class Loan implements AutoCloseable {
+ private final OpaPolicyPool pool;
+ private OpaPolicy policy;
+
+ Loan(OpaPolicyPool pool, OpaPolicy policy) {
+ this.pool = pool;
+ this.policy = policy;
+ }
+
+ /**
+ * Returns the borrowed policy for configuration and evaluation.
+ *
+ * @throws IllegalStateException if the loan has already been closed or discarded
+ */
+ public OpaPolicy policy() {
+ if (policy == null) {
+ throw new IllegalStateException("Loan already returned");
+ }
+ return policy;
+ }
+
+ /** Returns the policy to the pool for reuse. */
+ @Override
+ public void close() {
+ if (policy != null) {
+ pool.release(policy);
+ policy = null;
+ }
+ }
+
+ /**
+ * Destroys the policy instead of returning it to the pool. Use this when a processing error
+ * may have left the policy in a corrupt state.
+ */
+ public void discard() {
+ if (policy != null) {
+ pool.discard();
+ policy = null;
+ }
+ }
+ }
+}
diff --git a/core/src/test/java/com/styra/opa/wasm/OpaPolicyPoolTest.java b/core/src/test/java/com/styra/opa/wasm/OpaPolicyPoolTest.java
new file mode 100644
index 0000000..a01dd4b
--- /dev/null
+++ b/core/src/test/java/com/styra/opa/wasm/OpaPolicyPoolTest.java
@@ -0,0 +1,168 @@
+package com.styra.opa.wasm;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class OpaPolicyPoolTest {
+ static Path wasmFile;
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ wasmFile = OpaCli.compile("base", "opa/wasm/test/allowed").resolve("policy.wasm");
+ }
+
+ @Test
+ public void basicBorrowAndReturn() throws InterruptedException {
+ var pool = OpaPolicyPool.create(() -> OpaPolicy.builder().withPolicy(wasmFile).build(), 2);
+
+ try (var loan = pool.borrow()) {
+ loan.policy()
+ .data("{ \"role\" : { \"alice\" : \"admin\" } }")
+ .input("{\"user\": \"alice\"}");
+ assertTrue(Utils.getResult(loan.policy().evaluate()).asBoolean());
+ }
+
+ pool.close();
+ }
+
+ @Test
+ public void instanceIsReusedAcrossBorrows() throws InterruptedException {
+ var pool = OpaPolicyPool.create(() -> OpaPolicy.builder().withPolicy(wasmFile).build(), 1);
+
+ try (var loan = pool.borrow()) {
+ loan.policy()
+ .data("{ \"role\" : { \"alice\" : \"admin\" } }")
+ .input("{\"user\": \"alice\"}");
+ assertTrue(Utils.getResult(loan.policy().evaluate()).asBoolean());
+ }
+
+ // Second borrow should reuse the same instance; state was reset
+ try (var loan = pool.borrow()) {
+ loan.policy()
+ .data("{ \"role\" : { \"bob\" : \"admin\" } }")
+ .input("{\"user\": \"bob\"}");
+ assertTrue(Utils.getResult(loan.policy().evaluate()).asBoolean());
+
+ // Previous data should not be visible
+ loan.policy().input("{\"user\": \"alice\"}");
+ assertFalse(Utils.getResult(loan.policy().evaluate()).asBoolean());
+ }
+
+ pool.close();
+ }
+
+ @Test
+ public void concurrentAccess() throws Exception {
+ int poolSize = 4;
+ int tasks = 16;
+ var pool =
+ OpaPolicyPool.create(
+ () -> OpaPolicy.builder().withPolicy(wasmFile).build(), poolSize);
+ var barrier = new CyclicBarrier(tasks);
+ List