From be7326e4d186ca21d5b73eafda62fc96c0adfd14 Mon Sep 17 00:00:00 2001 From: Bruce Arctor <5032356+brucearctor@users.noreply.github.com> Date: Sat, 20 Jun 2026 20:40:13 -0700 Subject: [PATCH 1/2] feat: Add S3 External Storage Driver (Claim Check pattern) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements temporalio/sdk-java#2882 — porting the External Storage feature from the Go and Python SDKs to Java. Core interfaces (temporal-sdk): - StorageDriver: interface for external storage drivers - StorageDriverClaim: claim reference token stored in Event History - StorageDriverStoreContext/RetrieveContext: identity context for drivers - StorageDriverSelector: multi-driver selection strategy - ExternalStorage: configuration with builder pattern - StorageDriverException: driver error type Pipeline integration: - CodecDataConverter: wired external storage as last pipeline stage (after PayloadCodec). Payloads >= threshold are stored externally and replaced with claim tokens. Claims are auto-detected and resolved on deserialization. - WorkflowClientOptions: added setExternalStorage() builder method S3 Driver (temporal-s3-external-storage module): - S3Client: abstract interface for S3 operations (testability) - AwsSdkV2S3Client: AWS SDK v2 adapter - S3StorageDriver: content-addressed storage using SHA-256 hashing - S3StorageDriverOptions: builder-based configuration Tests: - 11 core unit tests (ExternalStorageTest) - 6 S3 driver unit tests (S3StorageDriverTest) --- settings.gradle | 1 + temporal-s3-external-storage/build.gradle | 14 ++ .../aws/s3driver/AwsSdkV2S3Client.java | 70 ++++++ .../contrib/aws/s3driver/S3Client.java | 32 +++ .../contrib/aws/s3driver/S3StorageDriver.java | 156 ++++++++++++ .../aws/s3driver/S3StorageDriverOptions.java | 106 ++++++++ .../aws/s3driver/S3StorageDriverTest.java | 111 +++++++++ .../client/WorkflowClientOptions.java | 41 +++ .../common/converter/CodecDataConverter.java | 233 +++++++++++++++++- .../common/converter/ExternalStorage.java | 223 +++++++++++++++++ .../common/converter/StorageDriver.java | 71 ++++++ .../common/converter/StorageDriverClaim.java | 56 +++++ .../converter/StorageDriverException.java | 25 ++ .../StorageDriverRetrieveContext.java | 56 +++++ .../converter/StorageDriverSelector.java | 28 +++ .../converter/StorageDriverStoreContext.java | 107 ++++++++ .../common/converter/ExternalStorageTest.java | 175 +++++++++++++ 17 files changed, 1499 insertions(+), 6 deletions(-) create mode 100644 temporal-s3-external-storage/build.gradle create mode 100644 temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java create mode 100644 temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java create mode 100644 temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java create mode 100644 temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java create mode 100644 temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/ExternalStorage.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverClaim.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java create mode 100644 temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java diff --git a/settings.gradle b/settings.gradle index 918ceaa28e..d64929e8b0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -12,3 +12,4 @@ include 'temporal-remote-data-encoder' include 'temporal-shaded' include 'temporal-workflowcheck' include 'temporal-envconfig' +include 'temporal-s3-external-storage' diff --git a/temporal-s3-external-storage/build.gradle b/temporal-s3-external-storage/build.gradle new file mode 100644 index 0000000000..838e513fb7 --- /dev/null +++ b/temporal-s3-external-storage/build.gradle @@ -0,0 +1,14 @@ +description = '''Temporal S3 External Storage Driver''' + +ext { + awsSdkVersion = '2.25.0' +} + +dependencies { + api project(':temporal-sdk') + implementation "software.amazon.awssdk:s3:${awsSdkVersion}" + + testImplementation "junit:junit:${junitVersion}" + testImplementation "org.mockito:mockito-core:${mockitoVersion}" + testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}" +} diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java new file mode 100644 index 0000000000..2c554994c7 --- /dev/null +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java @@ -0,0 +1,70 @@ +package io.temporal.contrib.aws.s3driver; + +import io.temporal.common.Experimental; +import io.temporal.common.converter.StorageDriverException; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +/** + * Adapter wrapping the AWS SDK for Java v2 S3 client into the {@link S3Client} interface used by + * {@link S3StorageDriver}. + * + *

Example usage: + * + *

{@code
+ * software.amazon.awssdk.services.s3.S3Client awsS3 = software.amazon.awssdk.services.s3.S3Client.builder()
+ *     .region(Region.US_EAST_1)
+ *     .build();
+ * S3Client client = new AwsSdkV2S3Client(awsS3);
+ * }
+ */ +@Experimental +public final class AwsSdkV2S3Client implements S3Client { + + private final software.amazon.awssdk.services.s3.S3Client delegate; + + /** + * Creates a new adapter wrapping the given AWS SDK v2 S3 client. + * + * @param delegate the AWS SDK v2 S3 client to delegate to + */ + public AwsSdkV2S3Client(software.amazon.awssdk.services.s3.S3Client delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate"); + } + + @Override + public void putObject(String bucket, String key, byte[] data) { + try { + delegate.putObject( + PutObjectRequest.builder().bucket(bucket).key(key).build(), RequestBody.fromBytes(data)); + } catch (Exception e) { + throw new StorageDriverException( + "Failed to upload object to S3: bucket=" + bucket + ", key=" + key, e); + } + } + + @Override + public byte[] getObject(String bucket, String key) { + try (InputStream is = + delegate.getObject(GetObjectRequest.builder().bucket(bucket).key(key).build())) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + baos.write(buffer, 0, bytesRead); + } + return baos.toByteArray(); + } catch (IOException e) { + throw new StorageDriverException( + "Failed to download object from S3: bucket=" + bucket + ", key=" + key, e); + } catch (Exception e) { + throw new StorageDriverException( + "Failed to download object from S3: bucket=" + bucket + ", key=" + key, e); + } + } +} diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java new file mode 100644 index 0000000000..cb5b5318cf --- /dev/null +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java @@ -0,0 +1,32 @@ +package io.temporal.contrib.aws.s3driver; + +import io.temporal.common.Experimental; + +/** + * Abstraction over S3 client operations used by {@link S3StorageDriver}. This interface allows + * plugging in different S3 client implementations and enables testing with mocks. + * + * @see AwsSdkV2S3Client + */ +@Experimental +public interface S3Client { + /** + * Uploads an object to S3. + * + * @param bucket the S3 bucket name + * @param key the S3 object key + * @param data the object content + * @throws io.temporal.common.converter.StorageDriverException if the upload fails + */ + void putObject(String bucket, String key, byte[] data); + + /** + * Downloads an object from S3. + * + * @param bucket the S3 bucket name + * @param key the S3 object key + * @return the object content + * @throws io.temporal.common.converter.StorageDriverException if the download fails + */ + byte[] getObject(String bucket, String key); +} diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java new file mode 100644 index 0000000000..813df0f016 --- /dev/null +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java @@ -0,0 +1,156 @@ +package io.temporal.contrib.aws.s3driver; + +import io.temporal.common.Experimental; +import io.temporal.common.converter.StorageDriver; +import io.temporal.common.converter.StorageDriverClaim; +import io.temporal.common.converter.StorageDriverException; +import io.temporal.common.converter.StorageDriverRetrieveContext; +import io.temporal.common.converter.StorageDriverStoreContext; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nonnull; + +/** + * {@link StorageDriver} implementation that stores payloads in Amazon S3. Uses content-addressed + * storage with SHA-256 hashing for natural deduplication. + * + *

S3 object keys are constructed as: + * + *

+ *   {keyPrefix}{namespace}/{workflowId}/{sha256-hash}
+ * 
+ * + *

Example usage: + * + *

{@code
+ * software.amazon.awssdk.services.s3.S3Client awsS3 = S3Client.builder()
+ *     .region(Region.US_EAST_1)
+ *     .build();
+ *
+ * S3StorageDriver driver = new S3StorageDriver(
+ *     S3StorageDriverOptions.newBuilder()
+ *         .setClient(new AwsSdkV2S3Client(awsS3))
+ *         .setBucket("my-temporal-payloads")
+ *         .build());
+ *
+ * ExternalStorage externalStorage = ExternalStorage.newBuilder()
+ *     .addDriver(driver)
+ *     .build();
+ * }
+ * + *

Important: Temporal does not manage the lifecycle of stored objects. Users must + * configure S3 Lifecycle Policies or similar mechanisms for cleanup. Objects should be retained at + * least for the workflow lifetime plus the namespace retention period. + */ +@Experimental +public final class S3StorageDriver implements StorageDriver { + + private static final String DRIVER_TYPE = "aws.s3driver"; + static final String CLAIM_KEY_BUCKET = "bucket"; + static final String CLAIM_KEY_OBJECT_KEY = "key"; + + private final S3Client client; + private final String bucket; + private final String keyPrefix; + private final String driverName; + + /** + * Creates a new S3 storage driver with the given options. + * + * @param options the driver configuration options + */ + public S3StorageDriver(@Nonnull S3StorageDriverOptions options) { + Objects.requireNonNull(options, "options"); + this.client = options.getClient(); + this.bucket = options.getBucket(); + this.keyPrefix = options.getKeyPrefix(); + this.driverName = options.getDriverName(); + } + + @Override + public String name() { + return driverName; + } + + @Override + public String type() { + return DRIVER_TYPE; + } + + @Override + public List store(StorageDriverStoreContext context, List payloads) + throws StorageDriverException { + List claims = new ArrayList<>(payloads.size()); + for (byte[] payload : payloads) { + String hash = sha256Hex(payload); + String objectKey = buildObjectKey(context, hash); + + client.putObject(bucket, objectKey, payload); + + Map claimData = new HashMap<>(); + claimData.put(CLAIM_KEY_BUCKET, bucket); + claimData.put(CLAIM_KEY_OBJECT_KEY, objectKey); + claims.add(new StorageDriverClaim(claimData)); + } + return claims; + } + + @Override + public List retrieve( + StorageDriverRetrieveContext context, List claims) + throws StorageDriverException { + List results = new ArrayList<>(claims.size()); + for (StorageDriverClaim claim : claims) { + String claimBucket = claim.getClaimData().get(CLAIM_KEY_BUCKET); + String objectKey = claim.getClaimData().get(CLAIM_KEY_OBJECT_KEY); + if (objectKey == null) { + throw new StorageDriverException( + "Claim is missing required '" + CLAIM_KEY_OBJECT_KEY + "' field: " + claim); + } + // Use the bucket from the claim if present, otherwise fall back to configured bucket. + String effectiveBucket = claimBucket != null ? claimBucket : bucket; + results.add(client.getObject(effectiveBucket, objectKey)); + } + return results; + } + + private String buildObjectKey(StorageDriverStoreContext context, String hash) { + StringBuilder sb = new StringBuilder(); + if (!keyPrefix.isEmpty()) { + sb.append(keyPrefix); + if (!keyPrefix.endsWith("/")) { + sb.append('/'); + } + } + sb.append(context.getNamespace()).append('/'); + sb.append(context.getWorkflowId()).append('/'); + if (context.getActivityType() != null) { + sb.append(context.getActivityType()).append('/'); + } + sb.append(hash); + return sb.toString(); + } + + static String sha256Hex(byte[] data) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(data); + StringBuilder hexString = new StringBuilder(hash.length * 2); + for (byte b : hash) { + String hex = Integer.toHexString(0xff & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + return hexString.toString(); + } catch (NoSuchAlgorithmException e) { + throw new StorageDriverException("SHA-256 algorithm not available", e); + } + } +} diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java new file mode 100644 index 0000000000..472f639d42 --- /dev/null +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java @@ -0,0 +1,106 @@ +package io.temporal.contrib.aws.s3driver; + +import com.google.common.base.Preconditions; +import io.temporal.common.Experimental; +import java.util.Objects; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Configuration options for {@link S3StorageDriver}. + * + *

Example: + * + *

{@code
+ * S3StorageDriverOptions options = S3StorageDriverOptions.newBuilder()
+ *     .setClient(new AwsSdkV2S3Client(awsS3Client))
+ *     .setBucket("my-temporal-payloads")
+ *     .setKeyPrefix("temporal/payloads/")
+ *     .build();
+ * }
+ */ +@Experimental +public final class S3StorageDriverOptions { + + private static final String DEFAULT_DRIVER_NAME = "aws.s3driver"; + + private final S3Client client; + private final String bucket; + private final String keyPrefix; + private final String driverName; + + private S3StorageDriverOptions( + S3Client client, String bucket, String keyPrefix, String driverName) { + this.client = client; + this.bucket = bucket; + this.keyPrefix = keyPrefix; + this.driverName = driverName; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public S3Client getClient() { + return client; + } + + public String getBucket() { + return bucket; + } + + @Nonnull + public String getKeyPrefix() { + return keyPrefix; + } + + @Nonnull + public String getDriverName() { + return driverName; + } + + public static final class Builder { + private S3Client client; + private String bucket; + private String keyPrefix = ""; + private String driverName = DEFAULT_DRIVER_NAME; + + private Builder() {} + + /** Sets the S3 client to use for storage operations. Required. */ + public Builder setClient(@Nonnull S3Client client) { + this.client = Objects.requireNonNull(client, "client"); + return this; + } + + /** Sets the S3 bucket name. Required. */ + public Builder setBucket(@Nonnull String bucket) { + this.bucket = Objects.requireNonNull(bucket, "bucket"); + return this; + } + + /** + * Sets an optional prefix prepended to all S3 object keys. Useful for organizing payloads + * within a shared bucket. Default is empty string (no prefix). + */ + public Builder setKeyPrefix(@Nullable String keyPrefix) { + this.keyPrefix = keyPrefix == null ? "" : keyPrefix; + return this; + } + + /** + * Sets the driver instance name. This is stored in claim tokens. Default is "aws.s3driver". + * Changing this after payloads are stored will break retrieval. + */ + public Builder setDriverName(@Nonnull String driverName) { + this.driverName = Objects.requireNonNull(driverName, "driverName"); + return this; + } + + public S3StorageDriverOptions build() { + Preconditions.checkState(client != null, "S3Client must be set"); + Preconditions.checkState(bucket != null && !bucket.isEmpty(), "Bucket must be set"); + return new S3StorageDriverOptions(client, bucket, keyPrefix, driverName); + } + } +} diff --git a/temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java b/temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java new file mode 100644 index 0000000000..67ad920e23 --- /dev/null +++ b/temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java @@ -0,0 +1,111 @@ +package io.temporal.contrib.aws.s3driver; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import io.temporal.common.converter.StorageDriverClaim; +import io.temporal.common.converter.StorageDriverRetrieveContext; +import io.temporal.common.converter.StorageDriverStoreContext; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class S3StorageDriverTest { + + private S3Client mockClient; + private S3StorageDriver driver; + + @Before + public void setUp() { + mockClient = mock(S3Client.class); + S3StorageDriverOptions options = + S3StorageDriverOptions.newBuilder() + .setClient(mockClient) + .setBucket("test-bucket") + .setKeyPrefix("temporal/") + .build(); + driver = new S3StorageDriver(options); + } + + @Test + public void testNameAndType() { + assertEquals("aws.s3driver", driver.name()); + assertEquals("aws.s3driver", driver.type()); + } + + @Test + public void testStoreAndRetrieve() { + byte[] payload = "test payload data".getBytes(StandardCharsets.UTF_8); + StorageDriverStoreContext storeCtx = + StorageDriverStoreContext.forWorkflow("default", "workflow-123", "run-456"); + + // Store + List claims = driver.store(storeCtx, Collections.singletonList(payload)); + assertEquals(1, claims.size()); + + StorageDriverClaim claim = claims.get(0); + assertEquals("test-bucket", claim.getClaimData().get("bucket")); + assertNotNull(claim.getClaimData().get("key")); + assertTrue(claim.getClaimData().get("key").startsWith("temporal/default/workflow-123/")); + + // Verify put was called + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class); + verify(mockClient).putObject(eq("test-bucket"), keyCaptor.capture(), eq(payload)); + String storedKey = keyCaptor.getValue(); + + // Retrieve + when(mockClient.getObject("test-bucket", storedKey)).thenReturn(payload); + StorageDriverRetrieveContext retrieveCtx = + StorageDriverRetrieveContext.create("default", "workflow-123"); + List retrieved = driver.retrieve(retrieveCtx, claims); + assertEquals(1, retrieved.size()); + assertArrayEquals(payload, retrieved.get(0)); + } + + @Test + public void testContentAddressedKeys() { + byte[] payload = "same data".getBytes(StandardCharsets.UTF_8); + StorageDriverStoreContext ctx = StorageDriverStoreContext.forWorkflow("ns", "wf", null); + + // Same payload should produce same hash/key. + List claims1 = driver.store(ctx, Collections.singletonList(payload)); + List claims2 = driver.store(ctx, Collections.singletonList(payload)); + assertEquals( + claims1.get(0).getClaimData().get("key"), claims2.get(0).getClaimData().get("key")); + } + + @Test + public void testActivityContext() { + byte[] payload = "activity data".getBytes(StandardCharsets.UTF_8); + StorageDriverStoreContext ctx = + StorageDriverStoreContext.forActivity("ns", "wf", "run", "MyActivity"); + + List claims = driver.store(ctx, Collections.singletonList(payload)); + String key = claims.get(0).getClaimData().get("key"); + assertTrue("Key should contain activity type", key.contains("MyActivity/")); + } + + @Test + public void testSha256Hex() { + byte[] data = "hello".getBytes(StandardCharsets.UTF_8); + String hash = S3StorageDriver.sha256Hex(data); + assertEquals("2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824", hash); + } + + @Test + public void testCustomDriverName() { + S3StorageDriverOptions options = + S3StorageDriverOptions.newBuilder() + .setClient(mockClient) + .setBucket("bucket") + .setDriverName("my-custom-s3") + .build(); + S3StorageDriver customDriver = new S3StorageDriver(options); + assertEquals("my-custom-s3", customDriver.name()); + assertEquals("aws.s3driver", customDriver.type()); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java index ca67852751..967bf05452 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java @@ -4,6 +4,7 @@ import io.temporal.common.Experimental; import io.temporal.common.context.ContextPropagator; import io.temporal.common.converter.DataConverter; +import io.temporal.common.converter.ExternalStorage; import io.temporal.common.converter.GlobalDataConverter; import io.temporal.common.interceptors.WorkflowClientInterceptor; import java.lang.management.ManagementFactory; @@ -11,6 +12,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import javax.annotation.Nullable; /** Options for WorkflowClient configuration. */ public final class WorkflowClientOptions { @@ -43,6 +45,7 @@ public static final class Builder { private String namespace; private DataConverter dataConverter; + private ExternalStorage externalStorage; private WorkflowClientInterceptor[] interceptors; private String identity; private String binaryChecksum; @@ -58,6 +61,7 @@ private Builder(WorkflowClientOptions options) { } namespace = options.namespace; dataConverter = options.dataConverter; + externalStorage = options.externalStorage; interceptors = options.interceptors; identity = options.identity; binaryChecksum = options.binaryChecksum; @@ -82,6 +86,22 @@ public Builder setDataConverter(DataConverter dataConverter) { return this; } + /** + * Configures external storage for offloading large payloads to external storage (e.g., S3) + * using the Claim Check pattern. Payloads exceeding the configured threshold will be stored + * externally and replaced with small reference tokens in Event History. + * + *

This is completely transparent to workflow and activity business logic. + * + * @param externalStorage the external storage configuration, or null to disable + * @see ExternalStorage + */ + @Experimental + public Builder setExternalStorage(@Nullable ExternalStorage externalStorage) { + this.externalStorage = externalStorage; + return this; + } + /** * Interceptor used to intercept workflow client calls. * @@ -157,6 +177,7 @@ public WorkflowClientOptions build() { return new WorkflowClientOptions( namespace, dataConverter, + externalStorage, interceptors, identity, binaryChecksum, @@ -181,6 +202,7 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { return new WorkflowClientOptions( namespace == null ? DEFAULT_NAMESPACE : namespace, dataConverter == null ? GlobalDataConverter.get() : dataConverter, + externalStorage, interceptors == null ? EMPTY_INTERCEPTOR_ARRAY : interceptors, name, binaryChecksum == null ? DEFAULT_BINARY_CHECKSUM : binaryChecksum, @@ -203,6 +225,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { private final DataConverter dataConverter; + private final @Nullable ExternalStorage externalStorage; + private final WorkflowClientInterceptor[] interceptors; private final String identity; @@ -218,6 +242,7 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { private WorkflowClientOptions( String namespace, DataConverter dataConverter, + @Nullable ExternalStorage externalStorage, WorkflowClientInterceptor[] interceptors, String identity, String binaryChecksum, @@ -226,6 +251,7 @@ private WorkflowClientOptions( WorkflowClientPlugin[] plugins) { this.namespace = namespace; this.dataConverter = dataConverter; + this.externalStorage = externalStorage; this.interceptors = interceptors; this.identity = identity; this.binaryChecksum = binaryChecksum; @@ -247,6 +273,17 @@ public DataConverter getDataConverter() { return dataConverter; } + /** + * Returns the external storage configuration, or null if external storage is not configured. + * + * @see ExternalStorage + */ + @Experimental + @Nullable + public ExternalStorage getExternalStorage() { + return externalStorage; + } + public WorkflowClientInterceptor[] getInterceptors() { return interceptors; } @@ -297,6 +334,8 @@ public String toString() { + '\'' + ", dataConverter=" + dataConverter + + ", externalStorage=" + + externalStorage + ", interceptors=" + Arrays.toString(interceptors) + ", identity='" @@ -321,6 +360,7 @@ public boolean equals(Object o) { WorkflowClientOptions that = (WorkflowClientOptions) o; return com.google.common.base.Objects.equal(namespace, that.namespace) && com.google.common.base.Objects.equal(dataConverter, that.dataConverter) + && com.google.common.base.Objects.equal(externalStorage, that.externalStorage) && Arrays.equals(interceptors, that.interceptors) && com.google.common.base.Objects.equal(identity, that.identity) && com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum) @@ -334,6 +374,7 @@ public int hashCode() { return com.google.common.base.Objects.hashCode( namespace, dataConverter, + externalStorage, Arrays.hashCode(interceptors), identity, binaryChecksum, diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java index a82172348b..69840acc34 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.Payloads; import io.temporal.api.failure.v1.ApplicationFailureInfo; @@ -11,11 +12,17 @@ import io.temporal.api.failure.v1.TimeoutFailureInfo; import io.temporal.payload.codec.ChainCodec; import io.temporal.payload.codec.PayloadCodec; +import io.temporal.payload.context.ActivitySerializationContext; +import io.temporal.payload.context.HasWorkflowSerializationContext; import io.temporal.payload.context.SerializationContext; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -31,9 +38,23 @@ public class CodecDataConverter implements DataConverter, PayloadCodec { private static final String ENCODED_FAILURE_MESSAGE = "Encoded failure"; + /** + * Metadata encoding value for claim check payloads stored in external storage. When a payload is + * stored externally, the payload in Event History has this encoding and contains the serialized + * claim data. + */ + static final String EXTERNAL_STORAGE_ENCODING_NAME = "temporal-external-storage/claim"; + + static final ByteString EXTERNAL_STORAGE_ENCODING = + ByteString.copyFrom(EXTERNAL_STORAGE_ENCODING_NAME, StandardCharsets.UTF_8); + + /** Metadata key for the driver name in a claim payload. */ + static final String CLAIM_DRIVER_NAME_KEY = "driver-name"; + private final DataConverter dataConverter; private final ChainCodec chainCodec; private final boolean encodeFailureAttributes; + private final @Nullable ExternalStorage externalStorage; private final @Nullable SerializationContext serializationContext; /** @@ -95,17 +116,39 @@ public CodecDataConverter( DataConverter dataConverter, Collection codecs, boolean encodeFailureAttributes) { - this(dataConverter, new ChainCodec(codecs), encodeFailureAttributes, null); + this(dataConverter, new ChainCodec(codecs), encodeFailureAttributes, null, null); + } + + /** + * Creates a {@link CodecDataConverter} with external storage support. + * + *

When {@code externalStorage} is configured, payloads exceeding the configured size threshold + * are offloaded to external storage after codec encoding. On deserialization, claim tokens are + * detected and the original payloads are retrieved from external storage before codec decoding. + * + * @param dataConverter to delegate data conversion to + * @param codecs to delegate bytes encoding/decoding to + * @param encodeFailureAttributes enable encoding of Failure attributes + * @param externalStorage optional external storage configuration + */ + public CodecDataConverter( + DataConverter dataConverter, + Collection codecs, + boolean encodeFailureAttributes, + @Nullable ExternalStorage externalStorage) { + this(dataConverter, new ChainCodec(codecs), encodeFailureAttributes, externalStorage, null); } CodecDataConverter( DataConverter dataConverter, ChainCodec codecs, boolean encodeFailureAttributes, + @Nullable ExternalStorage externalStorage, @Nullable SerializationContext serializationContext) { this.dataConverter = dataConverter; this.chainCodec = codecs; this.encodeFailureAttributes = encodeFailureAttributes; + this.externalStorage = externalStorage; this.serializationContext = serializationContext; } @@ -117,14 +160,15 @@ public Optional toPayload(T value) { ConverterUtils.withContext(chainCodec, serializationContext) .encode(Collections.singletonList(payload.get())); Preconditions.checkState(encodedPayloads.size() == 1, "Expected one encoded payload"); + encodedPayloads = storeIfNeeded(encodedPayloads); return Optional.of(encodedPayloads.get(0)); } @Override public T fromPayload(Payload payload, Class valueClass, Type valueType) { + List payloads = retrieveIfNeeded(Collections.singletonList(payload)); List decodedPayload = - ConverterUtils.withContext(chainCodec, serializationContext) - .decode(Collections.singletonList(payload)); + ConverterUtils.withContext(chainCodec, serializationContext).decode(payloads); Preconditions.checkState(decodedPayload.size() == 1, "Expected one decoded payload"); return ConverterUtils.withContext(dataConverter, serializationContext) .fromPayload(decodedPayload.get(0), valueClass, valueType); @@ -138,6 +182,7 @@ public Optional toPayloads(Object... values) throws DataConverterExcep List encodedPayloads = ConverterUtils.withContext(chainCodec, serializationContext) .encode(payloads.get().getPayloadsList()); + encodedPayloads = storeIfNeeded(encodedPayloads); payloads = Optional.of(Payloads.newBuilder().addAllPayloads(encodedPayloads).build()); } return payloads; @@ -148,7 +193,7 @@ public T fromPayloads( int index, Optional content, Class valueType, Type valueGenericType) throws DataConverterException { if (content.isPresent()) { - content = Optional.of(decodePayloads(content.get())); + content = Optional.of(retrieveAndDecodePayloads(content.get())); } return ConverterUtils.withContext(dataConverter, serializationContext) .fromPayloads(index, content, valueType, valueGenericType); @@ -159,7 +204,7 @@ public Object[] fromPayloads( Optional content, Class[] parameterTypes, Type[] genericParameterTypes) throws DataConverterException { if (content.isPresent()) { - content = Optional.of(decodePayloads(content.get())); + content = Optional.of(retrieveAndDecodePayloads(content.get())); } return ConverterUtils.withContext(dataConverter, serializationContext) .fromPayloads(content, parameterTypes, genericParameterTypes); @@ -187,7 +232,8 @@ public RuntimeException failureToException(@Nonnull Failure failure) { @Nonnull @Override public CodecDataConverter withContext(@Nonnull SerializationContext context) { - return new CodecDataConverter(dataConverter, chainCodec, encodeFailureAttributes, context); + return new CodecDataConverter( + dataConverter, chainCodec, encodeFailureAttributes, externalStorage, context); } @Nonnull @@ -327,6 +373,181 @@ private Payloads decodePayloads(Payloads encodedPayloads) { return Payloads.newBuilder().addAllPayloads(decode(encodedPayloads.getPayloadsList())).build(); } + /** + * Retrieves externally stored payloads first, then decodes them through the codec chain. This + * replaces the previous {@code decodePayloads} in the fromPayloads path. + */ + private Payloads retrieveAndDecodePayloads(Payloads payloads) { + List retrieved = retrieveIfNeeded(payloads.getPayloadsList()); + List decoded = + ConverterUtils.withContext(chainCodec, serializationContext).decode(retrieved); + return Payloads.newBuilder().addAllPayloads(decoded).build(); + } + + // ---- External Storage Helpers ---- + + /** + * Checks each encoded payload against the external storage threshold. Payloads exceeding the + * threshold are uploaded to external storage and replaced with claim token payloads. + */ + private List storeIfNeeded(List encodedPayloads) { + if (externalStorage == null) { + return encodedPayloads; + } + + StorageDriverStoreContext storeContext = buildStoreContext(); + if (storeContext == null) { + // No serialization context available; cannot construct store context. + return encodedPayloads; + } + + StorageDriver driver = externalStorage.selectDriver(storeContext); + if (driver == null) { + return encodedPayloads; + } + + int threshold = externalStorage.getPayloadSizeThreshold(); + List result = new ArrayList<>(encodedPayloads.size()); + for (Payload payload : encodedPayloads) { + if (payload.getSerializedSize() >= threshold) { + result.add(storePayload(driver, storeContext, payload)); + } else { + result.add(payload); + } + } + return result; + } + + /** Stores a single payload externally and returns a claim token payload. */ + private Payload storePayload( + StorageDriver driver, StorageDriverStoreContext storeContext, Payload payload) { + byte[] payloadBytes = payload.toByteArray(); + List claims = + driver.store(storeContext, Collections.singletonList(payloadBytes)); + Preconditions.checkState(claims.size() == 1, "Expected one claim from store"); + return claimToPayload(driver.name(), claims.get(0)); + } + + /** + * Checks each payload for claim tokens and retrieves the original payloads from external storage. + */ + private List retrieveIfNeeded(List payloads) { + if (externalStorage == null) { + return payloads; + } + + boolean hasClaims = false; + for (Payload payload : payloads) { + if (isClaimPayload(payload)) { + hasClaims = true; + break; + } + } + if (!hasClaims) { + return payloads; + } + + StorageDriverRetrieveContext retrieveContext = buildRetrieveContext(); + List result = new ArrayList<>(payloads.size()); + for (Payload payload : payloads) { + if (isClaimPayload(payload)) { + result.add(retrievePayload(retrieveContext, payload)); + } else { + result.add(payload); + } + } + return result; + } + + /** + * Retrieves a single payload from external storage using the claim information in the payload. + */ + private Payload retrievePayload( + StorageDriverRetrieveContext retrieveContext, Payload claimPayload) { + String driverName = getClaimDriverName(claimPayload); + StorageDriverClaim claim = payloadToClaim(claimPayload); + StorageDriver driver = externalStorage.findDriverByName(driverName); + List retrieved = driver.retrieve(retrieveContext, Collections.singletonList(claim)); + Preconditions.checkState(retrieved.size() == 1, "Expected one payload from retrieve"); + try { + return Payload.parseFrom(retrieved.get(0)); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw new StorageDriverException("Failed to parse retrieved payload", e); + } + } + + /** Checks if a payload is a claim token by inspecting its encoding metadata. */ + static boolean isClaimPayload(Payload payload) { + ByteString encoding = + payload.getMetadataOrDefault(EncodingKeys.METADATA_ENCODING_KEY, ByteString.EMPTY); + return EXTERNAL_STORAGE_ENCODING.equals(encoding); + } + + /** + * Serializes a claim into a Payload with the external storage encoding. The claim data map is + * stored as metadata entries and the payload body is empty. + */ + static Payload claimToPayload(String driverName, StorageDriverClaim claim) { + Payload.Builder builder = Payload.newBuilder(); + builder.putMetadata(EncodingKeys.METADATA_ENCODING_KEY, EXTERNAL_STORAGE_ENCODING); + builder.putMetadata(CLAIM_DRIVER_NAME_KEY, ByteString.copyFromUtf8(driverName)); + for (Map.Entry entry : claim.getClaimData().entrySet()) { + builder.putMetadata(entry.getKey(), ByteString.copyFromUtf8(entry.getValue())); + } + return builder.build(); + } + + /** Deserializes a claim from a claim token payload's metadata. */ + static StorageDriverClaim payloadToClaim(Payload claimPayload) { + Map claimData = new HashMap<>(); + for (Map.Entry entry : claimPayload.getMetadataMap().entrySet()) { + String key = entry.getKey(); + // Skip the encoding and driver name metadata keys — they are not part of claim data. + if (EncodingKeys.METADATA_ENCODING_KEY.equals(key) || CLAIM_DRIVER_NAME_KEY.equals(key)) { + continue; + } + claimData.put(key, entry.getValue().toStringUtf8()); + } + return new StorageDriverClaim(claimData); + } + + /** Extracts the driver name from a claim token payload. */ + static String getClaimDriverName(Payload claimPayload) { + ByteString driverNameBytes = + claimPayload.getMetadataOrDefault(CLAIM_DRIVER_NAME_KEY, ByteString.EMPTY); + if (driverNameBytes.isEmpty()) { + throw new StorageDriverException( + "Claim payload is missing '" + CLAIM_DRIVER_NAME_KEY + "' metadata"); + } + return driverNameBytes.toStringUtf8(); + } + + @Nullable + private StorageDriverStoreContext buildStoreContext() { + if (serializationContext instanceof ActivitySerializationContext) { + ActivitySerializationContext actCtx = (ActivitySerializationContext) serializationContext; + return StorageDriverStoreContext.forActivity( + actCtx.getNamespace(), actCtx.getWorkflowId(), null, actCtx.getActivityType()); + } else if (serializationContext instanceof HasWorkflowSerializationContext) { + HasWorkflowSerializationContext wfCtx = + (HasWorkflowSerializationContext) serializationContext; + return StorageDriverStoreContext.forWorkflow( + wfCtx.getNamespace(), wfCtx.getWorkflowId(), null); + } + // No context available — fall back to a default context. + return StorageDriverStoreContext.forWorkflow("unknown", "unknown", null); + } + + @Nonnull + private StorageDriverRetrieveContext buildRetrieveContext() { + if (serializationContext instanceof HasWorkflowSerializationContext) { + HasWorkflowSerializationContext wfCtx = + (HasWorkflowSerializationContext) serializationContext; + return StorageDriverRetrieveContext.create(wfCtx.getNamespace(), wfCtx.getWorkflowId()); + } + return StorageDriverRetrieveContext.create(null, null); + } + static class EncodedAttributes { private String message; diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/ExternalStorage.java b/temporal-sdk/src/main/java/io/temporal/common/converter/ExternalStorage.java new file mode 100644 index 0000000000..13787771e1 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/ExternalStorage.java @@ -0,0 +1,223 @@ +package io.temporal.common.converter; + +import com.google.common.base.Preconditions; +import io.temporal.common.Experimental; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Configuration for external storage that offloads large payloads to an external store (e.g., S3) + * using the Claim Check pattern. Payloads exceeding the {@link #getPayloadSizeThreshold()} are + * stored externally and replaced with a small {@link StorageDriverClaim} reference token in Event + * History. + * + *

External storage is the last stage of the data conversion pipeline, running after + * {@link io.temporal.payload.codec.PayloadCodec} (compression/encryption): + * + *

+ *   PayloadConverter → PayloadCodec → ExternalStorage
+ * 
+ * + *

This is completely transparent to workflow and activity business logic. + * + *

Example usage: + * + *

{@code
+ * ExternalStorage externalStorage = ExternalStorage.newBuilder()
+ *     .addDriver(s3Driver)
+ *     .setPayloadSizeThreshold(256 * 1024) // 256 KiB
+ *     .build();
+ * }
+ * + * @see StorageDriver + */ +@Experimental +public final class ExternalStorage { + + /** Default payload size threshold: 256 KiB (262144 bytes). */ + public static final int DEFAULT_PAYLOAD_SIZE_THRESHOLD = 256 * 1024; + + private final List drivers; + private final @Nullable StorageDriverSelector driverSelector; + private final int payloadSizeThreshold; + + private ExternalStorage( + List drivers, + @Nullable StorageDriverSelector driverSelector, + int payloadSizeThreshold) { + this.drivers = Collections.unmodifiableList(new ArrayList<>(drivers)); + this.driverSelector = driverSelector; + this.payloadSizeThreshold = payloadSizeThreshold; + } + + /** Creates a new builder for {@link ExternalStorage}. */ + public static Builder newBuilder() { + return new Builder(); + } + + /** Returns the list of registered storage drivers. */ + @Nonnull + public List getDrivers() { + return drivers; + } + + /** + * Returns the driver selector, or null if only one driver is registered. Required when multiple + * drivers are configured. + */ + @Nullable + public StorageDriverSelector getDriverSelector() { + return driverSelector; + } + + /** + * Returns the payload size threshold in bytes. Payloads with serialized size greater than or + * equal to this threshold will be offloaded to external storage. + * + *

Default is {@link #DEFAULT_PAYLOAD_SIZE_THRESHOLD} (256 KiB). Set to 1 to externalize all + * payloads. + */ + public int getPayloadSizeThreshold() { + return payloadSizeThreshold; + } + + /** + * Selects the appropriate driver for the given store context. If only one driver is registered, + * returns that driver. Otherwise, delegates to the configured {@link StorageDriverSelector}. + * + * @param context the store context + * @return the selected driver, or null if the payload should remain inline + */ + @Nullable + StorageDriver selectDriver(StorageDriverStoreContext context) { + if (drivers.size() == 1) { + return drivers.get(0); + } + if (driverSelector != null) { + return driverSelector.select(context, drivers); + } + // Should not happen if builder validates correctly. + return drivers.get(0); + } + + /** + * Finds a driver by name for retrieval operations. + * + * @param driverName the driver name stored in the claim + * @return the matching driver + * @throws StorageDriverException if no driver with the given name is found + */ + @Nonnull + StorageDriver findDriverByName(String driverName) { + for (StorageDriver driver : drivers) { + if (driver.name().equals(driverName)) { + return driver; + } + } + throw new StorageDriverException( + "No storage driver found with name '" + + driverName + + "'. Registered drivers: " + + driverNames()); + } + + private String driverNames() { + StringBuilder sb = new StringBuilder("["); + for (int i = 0; i < drivers.size(); i++) { + if (i > 0) sb.append(", "); + sb.append(drivers.get(i).name()); + } + sb.append("]"); + return sb.toString(); + } + + @Override + public String toString() { + return "ExternalStorage{" + + "drivers=" + + driverNames() + + ", payloadSizeThreshold=" + + payloadSizeThreshold + + '}'; + } + + /** Builder for {@link ExternalStorage}. */ + public static final class Builder { + private final List drivers = new ArrayList<>(); + private StorageDriverSelector driverSelector; + private int payloadSizeThreshold = DEFAULT_PAYLOAD_SIZE_THRESHOLD; + + private Builder() {} + + /** + * Adds a storage driver. + * + * @param driver the driver to add + * @return this builder + */ + public Builder addDriver(@Nonnull StorageDriver driver) { + this.drivers.add(Preconditions.checkNotNull(driver, "driver")); + return this; + } + + /** + * Sets the list of storage drivers, replacing any previously added. + * + * @param drivers the drivers to use + * @return this builder + */ + public Builder setDrivers(@Nonnull List drivers) { + this.drivers.clear(); + this.drivers.addAll(Preconditions.checkNotNull(drivers, "drivers")); + return this; + } + + /** + * Sets the driver selector for choosing which driver stores a payload. Required when multiple + * drivers are registered. + * + * @param driverSelector the selector + * @return this builder + */ + public Builder setDriverSelector(@Nullable StorageDriverSelector driverSelector) { + this.driverSelector = driverSelector; + return this; + } + + /** + * Sets the payload size threshold in bytes. Payloads with serialized size greater than or equal + * to this value will be offloaded to external storage. + * + *

Default is {@link #DEFAULT_PAYLOAD_SIZE_THRESHOLD} (256 KiB). Set to 1 to externalize all + * payloads. + * + * @param payloadSizeThreshold threshold in bytes, must be positive + * @return this builder + */ + public Builder setPayloadSizeThreshold(int payloadSizeThreshold) { + Preconditions.checkArgument( + payloadSizeThreshold > 0, + "payloadSizeThreshold must be positive, got %s", + payloadSizeThreshold); + this.payloadSizeThreshold = payloadSizeThreshold; + return this; + } + + /** + * Builds the {@link ExternalStorage} configuration. + * + * @throws IllegalStateException if no drivers are configured or if multiple drivers are + * configured without a selector + */ + public ExternalStorage build() { + Preconditions.checkState(!drivers.isEmpty(), "At least one StorageDriver must be configured"); + Preconditions.checkState( + drivers.size() <= 1 || driverSelector != null, + "A StorageDriverSelector is required when multiple drivers are configured"); + return new ExternalStorage(drivers, driverSelector, payloadSizeThreshold); + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java new file mode 100644 index 0000000000..f8947b7c82 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java @@ -0,0 +1,71 @@ +package io.temporal.common.converter; + +import io.temporal.common.Experimental; +import java.util.List; + +/** + * Interface for external storage driver implementations. A storage driver is responsible for + * uploading large payloads to external storage (e.g., S3, GCS) and retrieving them using claim + * references. + * + *

Implementations must be thread-safe as the SDK may call {@link #store} and {@link #retrieve} + * concurrently from multiple threads. + * + *

The driver's {@link #name()} is stored in claim tokens persisted in workflow history. Changing + * a driver's name after payloads have been stored will break retrieval of those payloads. + * + * @see ExternalStorage + * @see StorageDriverClaim + */ +@Experimental +public interface StorageDriver { + + /** + * Returns a unique string that identifies this specific driver instance. This name is stored in + * the claim reference in workflow history to route retrieval requests to the correct driver. + * + *

Critical: Changing this after payloads are stored will break retrieval. + * + * @return unique driver instance name, never null + */ + String name(); + + /** + * Returns a string identifying the driver implementation type (e.g., "aws.s3driver"). Must be the + * same across all instances of the same driver type. + * + * @return driver type identifier, never null + */ + String type(); + + /** + * Uploads payloads to external storage and returns a {@link StorageDriverClaim} for each payload. + * The claims contain addressing information needed to retrieve the data later. + * + *

Each element in the returned list corresponds to the payload at the same index in the input + * list. + * + * @param context context with identity information about the workflow/activity owning the + * payloads + * @param payloads serialized payload bytes to store; each byte array is one serialized Payload + * protobuf + * @return list of claims, one per input payload, in the same order + * @throws StorageDriverException if the store operation fails + */ + List store(StorageDriverStoreContext context, List payloads) + throws StorageDriverException; + + /** + * Downloads payloads from external storage using the claim references produced by {@link #store}. + * + *

Each element in the returned list corresponds to the claim at the same index in the input + * list. + * + * @param context context with identity information about the retrieval environment + * @param claims list of claims identifying the stored payloads + * @return list of serialized payload bytes, one per input claim, in the same order + * @throws StorageDriverException if the retrieve operation fails + */ + List retrieve(StorageDriverRetrieveContext context, List claims) + throws StorageDriverException; +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverClaim.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverClaim.java new file mode 100644 index 0000000000..2dc18dd089 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverClaim.java @@ -0,0 +1,56 @@ +package io.temporal.common.converter; + +import io.temporal.common.Experimental; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * A claim reference token that replaces large payloads in Event History when using external + * storage. The claim contains an opaque map of key-value pairs that the {@link StorageDriver} uses + * to locate the stored payload. The SDK persists this in workflow history and does not interpret + * the contents. + * + *

For example, an S3 driver might store {@code {"key": ""}} in the claim data. + */ +@Experimental +public final class StorageDriverClaim { + + private final Map claimData; + + /** + * Creates a new claim with the given claim data. + * + * @param claimData opaque map of key-value pairs used by the driver to locate the payload + */ + public StorageDriverClaim(Map claimData) { + this.claimData = Collections.unmodifiableMap(Objects.requireNonNull(claimData, "claimData")); + } + + /** + * Returns the opaque map of key-value pairs that the driver uses to locate the stored payload. + * + * @return unmodifiable map of claim data + */ + public Map getClaimData() { + return claimData; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StorageDriverClaim that = (StorageDriverClaim) o; + return Objects.equals(claimData, that.claimData); + } + + @Override + public int hashCode() { + return Objects.hash(claimData); + } + + @Override + public String toString() { + return "StorageDriverClaim{" + "claimData=" + claimData + '}'; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java new file mode 100644 index 0000000000..04bb327290 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java @@ -0,0 +1,25 @@ +package io.temporal.common.converter; + +import io.temporal.common.Experimental; + +/** + * Exception thrown by {@link StorageDriver} implementations when store or retrieve operations fail. + * + *

This exception wraps underlying storage errors (e.g., network failures, permission errors) + * into a common type that the SDK can handle uniformly. + */ +@Experimental +public class StorageDriverException extends RuntimeException { + + public StorageDriverException(String message) { + super(message); + } + + public StorageDriverException(String message, Throwable cause) { + super(message, cause); + } + + public StorageDriverException(Throwable cause) { + super(cause); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java new file mode 100644 index 0000000000..a81f1c8ac2 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java @@ -0,0 +1,56 @@ +package io.temporal.common.converter; + +import io.temporal.common.Experimental; +import javax.annotation.Nullable; + +/** + * Context passed to {@link StorageDriver#retrieve} providing identity information about the + * retrieval environment. This may be used by drivers for logging, metrics, or access control. + */ +@Experimental +public final class StorageDriverRetrieveContext { + + private final @Nullable String namespace; + private final @Nullable String workflowId; + + private StorageDriverRetrieveContext(@Nullable String namespace, @Nullable String workflowId) { + this.namespace = namespace; + this.workflowId = workflowId; + } + + /** + * Creates a retrieve context with available identity information. + * + * @param namespace the namespace, may be null if not available + * @param workflowId the workflow execution ID, may be null if not available + * @return a new retrieve context + */ + public static StorageDriverRetrieveContext create( + @Nullable String namespace, @Nullable String workflowId) { + return new StorageDriverRetrieveContext(namespace, workflowId); + } + + /** Returns the namespace, or null if not available in the retrieval context. */ + @Nullable + public String getNamespace() { + return namespace; + } + + /** Returns the workflow execution ID, or null if not available in the retrieval context. */ + @Nullable + public String getWorkflowId() { + return workflowId; + } + + @Override + public String toString() { + return "StorageDriverRetrieveContext{" + + "namespace='" + + namespace + + '\'' + + ", workflowId='" + + workflowId + + '\'' + + '}'; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java new file mode 100644 index 0000000000..69cdb54dae --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java @@ -0,0 +1,28 @@ +package io.temporal.common.converter; + +import io.temporal.common.Experimental; +import java.util.List; + +/** + * Functional interface for selecting which {@link StorageDriver} should be used to store a specific + * payload. Required when multiple drivers are registered in {@link ExternalStorage}. + * + *

Drivers not selected for storage remain available for retrieval, which supports migration + * scenarios (e.g., switching from one S3 bucket to another). + * + * @see ExternalStorage + */ +@Experimental +@FunctionalInterface +public interface StorageDriverSelector { + + /** + * Selects a storage driver for the given store context. + * + * @param context context with identity information about the workflow/activity owning the payload + * @param drivers the list of available drivers + * @return the selected driver to use for storage, or null to keep the payload inline (not + * externalized) + */ + StorageDriver select(StorageDriverStoreContext context, List drivers); +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java new file mode 100644 index 0000000000..a1da82c920 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java @@ -0,0 +1,107 @@ +package io.temporal.common.converter; + +import io.temporal.common.Experimental; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Context passed to {@link StorageDriver#store} providing identity information about the workflow + * or activity that owns the payload being stored. Drivers can use this context to organize stored + * objects (e.g., by namespace and workflow ID in the object key path). + */ +@Experimental +public final class StorageDriverStoreContext { + + private final @Nonnull String namespace; + private final @Nonnull String workflowId; + private final @Nullable String runId; + private final @Nullable String activityType; + + private StorageDriverStoreContext( + @Nonnull String namespace, + @Nonnull String workflowId, + @Nullable String runId, + @Nullable String activityType) { + this.namespace = namespace; + this.workflowId = workflowId; + this.runId = runId; + this.activityType = activityType; + } + + /** + * Creates a store context for a workflow-level payload. + * + * @param namespace the namespace the workflow belongs to + * @param workflowId the workflow execution ID + * @param runId the workflow run ID, may be null + * @return a new store context + */ + public static StorageDriverStoreContext forWorkflow( + @Nonnull String namespace, @Nonnull String workflowId, @Nullable String runId) { + return new StorageDriverStoreContext(namespace, workflowId, runId, null); + } + + /** + * Creates a store context for an activity-level payload. + * + * @param namespace the namespace the workflow belongs to + * @param workflowId the workflow execution ID + * @param runId the workflow run ID, may be null + * @param activityType the activity type name + * @return a new store context + */ + public static StorageDriverStoreContext forActivity( + @Nonnull String namespace, + @Nonnull String workflowId, + @Nullable String runId, + @Nonnull String activityType) { + return new StorageDriverStoreContext(namespace, workflowId, runId, activityType); + } + + /** Returns the namespace the workflow execution belongs to. */ + @Nonnull + public String getNamespace() { + return namespace; + } + + /** Returns the workflow execution ID. */ + @Nonnull + public String getWorkflowId() { + return workflowId; + } + + /** Returns the workflow run ID, or null if not available. */ + @Nullable + public String getRunId() { + return runId; + } + + /** Returns the activity type name, or null if this is a workflow-level context. */ + @Nullable + public String getActivityType() { + return activityType; + } + + /** Returns true if this context is for an activity payload (vs. a workflow payload). */ + public boolean isActivityContext() { + return activityType != null; + } + + @Override + public String toString() { + return "StorageDriverStoreContext{" + + "namespace='" + + namespace + + '\'' + + ", workflowId='" + + workflowId + + '\'' + + ", runId='" + + runId + + '\'' + + ", activityType='" + + activityType + + '\'' + + '}'; + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java b/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java new file mode 100644 index 0000000000..2ece09de29 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java @@ -0,0 +1,175 @@ +package io.temporal.common.converter; + +import static org.junit.Assert.*; + +import io.temporal.api.common.v1.Payload; +import java.util.*; +import org.junit.Test; + +public class ExternalStorageTest { + + @Test + public void testBuilderDefaults() { + StorageDriver driver = new InMemoryStorageDriver("test"); + ExternalStorage es = ExternalStorage.newBuilder().addDriver(driver).build(); + assertEquals(ExternalStorage.DEFAULT_PAYLOAD_SIZE_THRESHOLD, es.getPayloadSizeThreshold()); + assertEquals(1, es.getDrivers().size()); + assertNull(es.getDriverSelector()); + } + + @Test + public void testCustomThreshold() { + StorageDriver driver = new InMemoryStorageDriver("test"); + ExternalStorage es = + ExternalStorage.newBuilder().addDriver(driver).setPayloadSizeThreshold(1024).build(); + assertEquals(1024, es.getPayloadSizeThreshold()); + } + + @Test(expected = IllegalStateException.class) + public void testNoDriversFails() { + ExternalStorage.newBuilder().build(); + } + + @Test(expected = IllegalStateException.class) + public void testMultiDriversWithoutSelectorFails() { + ExternalStorage.newBuilder() + .addDriver(new InMemoryStorageDriver("a")) + .addDriver(new InMemoryStorageDriver("b")) + .build(); + } + + @Test + public void testMultiDriversWithSelector() { + StorageDriver driverA = new InMemoryStorageDriver("a"); + StorageDriver driverB = new InMemoryStorageDriver("b"); + ExternalStorage es = + ExternalStorage.newBuilder() + .addDriver(driverA) + .addDriver(driverB) + .setDriverSelector((context, drivers) -> drivers.get(0)) + .build(); + assertEquals(2, es.getDrivers().size()); + } + + @Test + public void testSelectDriverSingleDriver() { + StorageDriver driver = new InMemoryStorageDriver("test"); + ExternalStorage es = ExternalStorage.newBuilder().addDriver(driver).build(); + StorageDriverStoreContext ctx = StorageDriverStoreContext.forWorkflow("ns", "wf", null); + assertSame(driver, es.selectDriver(ctx)); + } + + @Test + public void testFindDriverByName() { + StorageDriver driverA = new InMemoryStorageDriver("a"); + StorageDriver driverB = new InMemoryStorageDriver("b"); + ExternalStorage es = + ExternalStorage.newBuilder() + .addDriver(driverA) + .addDriver(driverB) + .setDriverSelector((context, drivers) -> drivers.get(0)) + .build(); + assertSame(driverB, es.findDriverByName("b")); + } + + @Test(expected = StorageDriverException.class) + public void testFindDriverByNameNotFound() { + StorageDriver driver = new InMemoryStorageDriver("test"); + ExternalStorage es = ExternalStorage.newBuilder().addDriver(driver).build(); + es.findDriverByName("nonexistent"); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidThreshold() { + ExternalStorage.newBuilder() + .addDriver(new InMemoryStorageDriver("test")) + .setPayloadSizeThreshold(0); + } + + @Test + public void testClaimToPayloadAndBack() { + Map data = new HashMap<>(); + data.put("bucket", "my-bucket"); + data.put("key", "some/object/key"); + StorageDriverClaim claim = new StorageDriverClaim(data); + + Payload claimPayload = CodecDataConverter.claimToPayload("my-driver", claim); + assertTrue(CodecDataConverter.isClaimPayload(claimPayload)); + assertEquals("my-driver", CodecDataConverter.getClaimDriverName(claimPayload)); + + StorageDriverClaim roundTripped = CodecDataConverter.payloadToClaim(claimPayload); + assertEquals("my-bucket", roundTripped.getClaimData().get("bucket")); + assertEquals("some/object/key", roundTripped.getClaimData().get("key")); + } + + @Test + public void testNonClaimPayloadNotDetected() { + Payload normalPayload = + Payload.newBuilder() + .putMetadata("encoding", com.google.protobuf.ByteString.copyFromUtf8("json/plain")) + .build(); + assertFalse(CodecDataConverter.isClaimPayload(normalPayload)); + } + + @Test + public void testStorageDriverClaimEquality() { + Map data1 = new HashMap<>(); + data1.put("key", "value"); + Map data2 = new HashMap<>(); + data2.put("key", "value"); + + StorageDriverClaim claim1 = new StorageDriverClaim(data1); + StorageDriverClaim claim2 = new StorageDriverClaim(data2); + assertEquals(claim1, claim2); + assertEquals(claim1.hashCode(), claim2.hashCode()); + } + + /** A simple in-memory storage driver for testing. */ + static class InMemoryStorageDriver implements StorageDriver { + private final String driverName; + private final Map storage = new HashMap<>(); + + InMemoryStorageDriver(String name) { + this.driverName = name; + } + + @Override + public String name() { + return driverName; + } + + @Override + public String type() { + return "in-memory"; + } + + @Override + public List store( + StorageDriverStoreContext context, List payloads) { + List claims = new ArrayList<>(); + for (byte[] payload : payloads) { + String key = UUID.randomUUID().toString(); + storage.put(key, payload); + Map claimData = new HashMap<>(); + claimData.put("key", key); + claims.add(new StorageDriverClaim(claimData)); + } + return claims; + } + + @Override + public List retrieve( + StorageDriverRetrieveContext context, List claims) { + List results = new ArrayList<>(); + for (StorageDriverClaim claim : claims) { + String key = claim.getClaimData().get("key"); + byte[] data = storage.get(key); + if (data == null) { + throw new StorageDriverException("Key not found: " + key); + } + results.add(data); + } + return results; + } + } +} From c9e530c736be0e6fe6b9b389eafd0c309c031569 Mon Sep 17 00:00:00 2001 From: Bruce Arctor <5032356+brucearctor@users.noreply.github.com> Date: Sat, 20 Jun 2026 21:09:39 -0700 Subject: [PATCH 2/2] fix: Address all review findings for S3 External Storage Driver Critical fixes: - Wire ExternalStorage from WorkflowClientOptions into CodecDataConverter - Apply external storage to failure encode/decode paths (encodePayloads/decodePayloads) - Namespace claim data keys with 'claim.' prefix to prevent metadata key collisions - Remove dynamic bucket override in S3StorageDriver.retrieve() (confused deputy fix) - URL-encode path segments in S3 key construction (path injection fix) - Return null from buildStoreContext() when no context (instead of 'unknown'/'unknown') Improvements: - Add SLF4J logging to S3StorageDriver and AwsSdkV2S3Client (DEBUG/WARN) - Add serialVersionUID to StorageDriverException - Remove misleading 'throws StorageDriverException' from StorageDriver interface - Add null checks in StorageDriverStoreContext factory methods - Validate setDrivers() list elements for null in ExternalStorage.Builder - Add equals()/hashCode() to ExternalStorage - Replace unreachable fallback with IllegalStateException in selectDriver() - Remove redundant IOException catch block in AwsSdkV2S3Client - Add withExternalStorage() method to CodecDataConverter Documentation: - Add retry configuration guidance (AWS SDK v2 built-in retries) - Document SSE encryption, S3 Lifecycle Policies, memory implications - Add thread-safety notes to StorageDriver, S3Client, context classes - Document Claim Check pattern, threshold rationale, server limits - Warn about reserved claim data key names Tests: - Add full pipeline integration test (serialize -> externalize -> retrieve -> deserialize) - Add small payload inline test (below threshold) - Add claim key collision protection test - Add WorkflowClientOptions wiring test --- .../aws/s3driver/AwsSdkV2S3Client.java | 30 ++++- .../contrib/aws/s3driver/S3Client.java | 3 + .../contrib/aws/s3driver/S3StorageDriver.java | 76 +++++++++-- .../aws/s3driver/S3StorageDriverOptions.java | 20 +++ .../client/WorkflowClientOptions.java | 22 ++- .../common/converter/CodecDataConverter.java | 60 +++++++-- .../common/converter/ExternalStorage.java | 37 ++++- .../common/converter/StorageDriver.java | 15 +-- .../common/converter/StorageDriverClaim.java | 7 + .../converter/StorageDriverException.java | 5 + .../StorageDriverRetrieveContext.java | 6 + .../converter/StorageDriverSelector.java | 5 + .../converter/StorageDriverStoreContext.java | 7 +- .../common/converter/ExternalStorageTest.java | 126 ++++++++++++++++++ 14 files changed, 381 insertions(+), 38 deletions(-) diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java index 2c554994c7..cf15b3fe64 100644 --- a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java @@ -3,9 +3,10 @@ import io.temporal.common.Experimental; import io.temporal.common.converter.StorageDriverException; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.InputStream; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; @@ -14,6 +15,24 @@ * Adapter wrapping the AWS SDK for Java v2 S3 client into the {@link S3Client} interface used by * {@link S3StorageDriver}. * + *

Retry behavior: The AWS SDK v2 has built-in retry logic that is configurable on the + * {@link software.amazon.awssdk.services.s3.S3Client} instance. Users should configure + * appropriate retry policies for production use. For example: + * + *

{@code
+ * software.amazon.awssdk.services.s3.S3Client awsS3 =
+ *     software.amazon.awssdk.services.s3.S3Client.builder()
+ *         .region(Region.US_EAST_1)
+ *         .overrideConfiguration(o -> o.retryPolicy(
+ *             RetryPolicy.builder().numRetries(3).build()))
+ *         .build();
+ * S3Client client = new AwsSdkV2S3Client(awsS3);
+ * }
+ * + *

Impact on workflows: S3 failures during workflow replay will block workflow progress + * until retries succeed or the operation is abandoned. Configure retry policies and timeouts + * accordingly to avoid indefinite blocking. + * *

Example usage: * *

{@code
@@ -26,6 +45,8 @@
 @Experimental
 public final class AwsSdkV2S3Client implements S3Client {
 
+  private static final Logger log = LoggerFactory.getLogger(AwsSdkV2S3Client.class);
+
   private final software.amazon.awssdk.services.s3.S3Client delegate;
 
   /**
@@ -39,10 +60,12 @@ public AwsSdkV2S3Client(software.amazon.awssdk.services.s3.S3Client delegate) {
 
   @Override
   public void putObject(String bucket, String key, byte[] data) {
+    log.debug("S3 PutObject: bucket={}, key={}, size={} bytes", bucket, key, data.length);
     try {
       delegate.putObject(
           PutObjectRequest.builder().bucket(bucket).key(key).build(), RequestBody.fromBytes(data));
     } catch (Exception e) {
+      log.warn("S3 PutObject failed: bucket={}, key={}", bucket, key, e);
       throw new StorageDriverException(
           "Failed to upload object to S3: bucket=" + bucket + ", key=" + key, e);
     }
@@ -50,6 +73,7 @@ public void putObject(String bucket, String key, byte[] data) {
 
   @Override
   public byte[] getObject(String bucket, String key) {
+    log.debug("S3 GetObject: bucket={}, key={}", bucket, key);
     try (InputStream is =
         delegate.getObject(GetObjectRequest.builder().bucket(bucket).key(key).build())) {
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -59,10 +83,8 @@ public byte[] getObject(String bucket, String key) {
         baos.write(buffer, 0, bytesRead);
       }
       return baos.toByteArray();
-    } catch (IOException e) {
-      throw new StorageDriverException(
-          "Failed to download object from S3: bucket=" + bucket + ", key=" + key, e);
     } catch (Exception e) {
+      log.warn("S3 GetObject failed: bucket={}, key={}", bucket, key, e);
       throw new StorageDriverException(
           "Failed to download object from S3: bucket=" + bucket + ", key=" + key, e);
     }
diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
index cb5b5318cf..5398a93386 100644
--- a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
+++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
@@ -6,6 +6,9 @@
  * Abstraction over S3 client operations used by {@link S3StorageDriver}. This interface allows
  * plugging in different S3 client implementations and enables testing with mocks.
  *
+ * 

Implementations must be thread-safe. A single {@code S3Client} instance may be shared + * across multiple workflow threads and activity workers concurrently. + * * @see AwsSdkV2S3Client */ @Experimental diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java index 813df0f016..c06ce17651 100644 --- a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java @@ -6,6 +6,8 @@ import io.temporal.common.converter.StorageDriverException; import io.temporal.common.converter.StorageDriverRetrieveContext; import io.temporal.common.converter.StorageDriverStoreContext; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -14,6 +16,8 @@ import java.util.Map; import java.util.Objects; import javax.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link StorageDriver} implementation that stores payloads in Amazon S3. Uses content-addressed @@ -46,10 +50,19 @@ *

Important: Temporal does not manage the lifecycle of stored objects. Users must * configure S3 Lifecycle Policies or similar mechanisms for cleanup. Objects should be retained at * least for the workflow lifetime plus the namespace retention period. + * + * @implNote Each payload is held in memory as a {@code byte[]}, so peak memory usage during store + * and retrieve operations is approximately 2–3× the payload size (original bytes, hash + * computation buffer, and S3 request/response buffer). S3 provides data integrity via checksums + * automatically. Content-addressed storage means that duplicate payloads produce identical + * object keys, making writes idempotent — re-uploading the same payload simply overwrites the + * same S3 object with identical content. */ @Experimental public final class S3StorageDriver implements StorageDriver { + private static final Logger log = LoggerFactory.getLogger(S3StorageDriver.class); + private static final String DRIVER_TYPE = "aws.s3driver"; static final String CLAIM_KEY_BUCKET = "bucket"; static final String CLAIM_KEY_OBJECT_KEY = "key"; @@ -90,7 +103,17 @@ public List store(StorageDriverStoreContext context, List claimData = new HashMap<>(); claimData.put(CLAIM_KEY_BUCKET, bucket); @@ -100,25 +123,49 @@ public List store(StorageDriverStoreContext context, ListSecurity note: This method always uses the bucket configured at construction time + * ({@code this.bucket}) and ignores any bucket value stored in claim data. Although claims + * include a bucket field for informational purposes, honoring it at retrieval time would be a + * confused-deputy vulnerability — an attacker who can craft or tamper with claim tokens could + * redirect reads to an arbitrary bucket. + */ @Override public List retrieve( StorageDriverRetrieveContext context, List claims) throws StorageDriverException { List results = new ArrayList<>(claims.size()); for (StorageDriverClaim claim : claims) { - String claimBucket = claim.getClaimData().get(CLAIM_KEY_BUCKET); String objectKey = claim.getClaimData().get(CLAIM_KEY_OBJECT_KEY); if (objectKey == null) { throw new StorageDriverException( "Claim is missing required '" + CLAIM_KEY_OBJECT_KEY + "' field: " + claim); } - // Use the bucket from the claim if present, otherwise fall back to configured bucket. - String effectiveBucket = claimBucket != null ? claimBucket : bucket; - results.add(client.getObject(effectiveBucket, objectKey)); + log.debug("Retrieving payload from S3: bucket={}, key={}", bucket, objectKey); + // Always use the configured bucket for security — claim data could be tampered. + try { + results.add(client.getObject(bucket, objectKey)); + } catch (StorageDriverException e) { + log.warn("Failed to retrieve payload from S3: bucket={}, key={}", bucket, objectKey, e); + throw e; + } } return results; } + /** + * Builds the S3 object key for a payload. + * + *

Each path segment derived from user-controlled input (namespace, workflowId, activityType) + * is URL-encoded to prevent path-injection attacks (e.g., a workflowId containing {@code "../"} + * could escape the intended key hierarchy). + * + * @param context the store context providing namespace, workflowId, and optional activityType + * @param hash the SHA-256 hex digest of the payload + * @return the fully-qualified S3 object key + */ private String buildObjectKey(StorageDriverStoreContext context, String hash) { StringBuilder sb = new StringBuilder(); if (!keyPrefix.isEmpty()) { @@ -127,15 +174,28 @@ private String buildObjectKey(StorageDriverStoreContext context, String hash) { sb.append('/'); } } - sb.append(context.getNamespace()).append('/'); - sb.append(context.getWorkflowId()).append('/'); + sb.append(encodePathSegment(context.getNamespace())).append('/'); + sb.append(encodePathSegment(context.getWorkflowId())).append('/'); if (context.getActivityType() != null) { - sb.append(context.getActivityType()).append('/'); + sb.append(encodePathSegment(context.getActivityType())).append('/'); } sb.append(hash); return sb.toString(); } + /** + * URL-encodes a single path segment, replacing {@code +} with {@code %20} so that spaces are + * represented correctly in S3 object keys. + */ + private static String encodePathSegment(String segment) { + try { + return URLEncoder.encode(segment, "UTF-8").replace("+", "%20"); + } catch (UnsupportedEncodingException e) { + // UTF-8 is guaranteed to be available on all JVMs + throw new AssertionError("UTF-8 encoding not available", e); + } + } + static String sha256Hex(byte[] data) { try { MessageDigest digest = MessageDigest.getInstance("SHA-256"); diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java index 472f639d42..3a0f0067bb 100644 --- a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java @@ -18,6 +18,26 @@ * .setKeyPrefix("temporal/payloads/") * .build(); * }

+ * + *

Retry configuration: The AWS SDK v2 has built-in retry logic that is configurable on + * the {@code S3Client} passed to {@link AwsSdkV2S3Client}. Users should configure + * appropriate retry policies for production use. For example: + * + *

{@code
+ * S3Client.builder()
+ *     .overrideConfiguration(o -> o.retryPolicy(
+ *         RetryPolicy.builder().numRetries(3).build()))
+ *     .build();
+ * }
+ * + *

S3 failures during workflow replay will block workflow progress until retries succeed. + * + *

Data confidentiality: Payload data is stored as-is in S3. To protect data at rest, + * configure S3 bucket-level encryption (SSE-S3 or SSE-KMS) on the target bucket. + * + *

Data cleanup: Temporal does not manage the lifecycle of stored S3 objects. Configure S3 + * Lifecycle Policies on the target bucket to automatically expire or transition objects. Objects + * should be retained at least for the workflow lifetime plus the namespace retention period. */ @Experimental public final class S3StorageDriverOptions { diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java index 967bf05452..23e5dd4a21 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java @@ -3,6 +3,7 @@ import io.temporal.api.enums.v1.QueryRejectCondition; import io.temporal.common.Experimental; import io.temporal.common.context.ContextPropagator; +import io.temporal.common.converter.CodecDataConverter; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.ExternalStorage; import io.temporal.common.converter.GlobalDataConverter; @@ -250,7 +251,8 @@ private WorkflowClientOptions( QueryRejectCondition queryRejectCondition, WorkflowClientPlugin[] plugins) { this.namespace = namespace; - this.dataConverter = dataConverter; + // Wire external storage into the data converter if configured. + this.dataConverter = wrapWithExternalStorage(dataConverter, externalStorage); this.externalStorage = externalStorage; this.interceptors = interceptors; this.identity = identity; @@ -260,6 +262,24 @@ private WorkflowClientOptions( this.plugins = plugins; } + /** + * If external storage is configured, ensures the data converter is a {@link CodecDataConverter} + * with external storage wired in. If the data converter is already a {@link CodecDataConverter}, + * it is replaced with a new instance that includes external storage. Otherwise, it is wrapped in + * a new {@link CodecDataConverter} with an empty codec chain. + */ + private static DataConverter wrapWithExternalStorage( + DataConverter dataConverter, @Nullable ExternalStorage externalStorage) { + if (externalStorage == null) { + return dataConverter; + } + if (dataConverter instanceof CodecDataConverter) { + CodecDataConverter cdc = (CodecDataConverter) dataConverter; + return cdc.withExternalStorage(externalStorage); + } + return new CodecDataConverter(dataConverter, Collections.emptyList(), false, externalStorage); + } + /** * Should be non-null on a valid instance * diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java index 69840acc34..4649bfc1f6 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java @@ -51,6 +51,13 @@ public class CodecDataConverter implements DataConverter, PayloadCodec { /** Metadata key for the driver name in a claim payload. */ static final String CLAIM_DRIVER_NAME_KEY = "driver-name"; + /** + * Prefix for claim data keys in payload metadata. All claim data keys are prefixed with this + * string to prevent collisions with reserved metadata keys ({@code encoding}, {@code + * driver-name}). + */ + static final String CLAIM_DATA_PREFIX = "claim."; + private final DataConverter dataConverter; private final ChainCodec chainCodec; private final boolean encodeFailureAttributes; @@ -236,6 +243,20 @@ public CodecDataConverter withContext(@Nonnull SerializationContext context) { dataConverter, chainCodec, encodeFailureAttributes, externalStorage, context); } + /** + * Returns a new {@link CodecDataConverter} with the given external storage configuration. This is + * used internally to wire external storage from {@code WorkflowClientOptions} into the converter + * pipeline without requiring users to pass it to the constructor directly. + * + * @param externalStorage the external storage configuration + * @return a new CodecDataConverter with external storage configured + */ + @Nonnull + public CodecDataConverter withExternalStorage(@Nonnull ExternalStorage externalStorage) { + return new CodecDataConverter( + dataConverter, chainCodec, encodeFailureAttributes, externalStorage, serializationContext); + } + @Nonnull @Override public List encode(@Nonnull List payloads) { @@ -365,12 +386,26 @@ private Failure.Builder decodeFailure(Failure.Builder failure) { return failure; } + /** + * Encodes failure payloads through the codec chain AND external storage. This ensures large + * failure details (heartbeat data, error context) are also offloaded when external storage is + * configured. + */ private Payloads encodePayloads(Payloads decodedPayloads) { - return Payloads.newBuilder().addAllPayloads(encode(decodedPayloads.getPayloadsList())).build(); + List encoded = encode(decodedPayloads.getPayloadsList()); + encoded = storeIfNeeded(encoded); + return Payloads.newBuilder().addAllPayloads(encoded).build(); } + /** + * Decodes failure payloads, retrieving from external storage first if needed, then through the + * codec chain. + */ private Payloads decodePayloads(Payloads encodedPayloads) { - return Payloads.newBuilder().addAllPayloads(decode(encodedPayloads.getPayloadsList())).build(); + List payloads = retrieveIfNeeded(encodedPayloads.getPayloadsList()); + List decoded = + ConverterUtils.withContext(chainCodec, serializationContext).decode(payloads); + return Payloads.newBuilder().addAllPayloads(decoded).build(); } /** @@ -492,7 +527,9 @@ static Payload claimToPayload(String driverName, StorageDriverClaim claim) { builder.putMetadata(EncodingKeys.METADATA_ENCODING_KEY, EXTERNAL_STORAGE_ENCODING); builder.putMetadata(CLAIM_DRIVER_NAME_KEY, ByteString.copyFromUtf8(driverName)); for (Map.Entry entry : claim.getClaimData().entrySet()) { - builder.putMetadata(entry.getKey(), ByteString.copyFromUtf8(entry.getValue())); + // Prefix all claim data keys to prevent collisions with reserved metadata keys. + builder.putMetadata( + CLAIM_DATA_PREFIX + entry.getKey(), ByteString.copyFromUtf8(entry.getValue())); } return builder.build(); } @@ -502,11 +539,10 @@ static StorageDriverClaim payloadToClaim(Payload claimPayload) { Map claimData = new HashMap<>(); for (Map.Entry entry : claimPayload.getMetadataMap().entrySet()) { String key = entry.getKey(); - // Skip the encoding and driver name metadata keys — they are not part of claim data. - if (EncodingKeys.METADATA_ENCODING_KEY.equals(key) || CLAIM_DRIVER_NAME_KEY.equals(key)) { - continue; + // Only include keys with the claim data prefix; strip the prefix. + if (key.startsWith(CLAIM_DATA_PREFIX)) { + claimData.put(key.substring(CLAIM_DATA_PREFIX.length()), entry.getValue().toStringUtf8()); } - claimData.put(key, entry.getValue().toStringUtf8()); } return new StorageDriverClaim(claimData); } @@ -522,6 +558,11 @@ static String getClaimDriverName(Payload claimPayload) { return driverNameBytes.toStringUtf8(); } + /** + * Builds a store context from the current serialization context. Returns {@code null} when no + * serialization context is available, which causes {@link #storeIfNeeded} to skip externalization + * and keep payloads inline. + */ @Nullable private StorageDriverStoreContext buildStoreContext() { if (serializationContext instanceof ActivitySerializationContext) { @@ -534,8 +575,9 @@ private StorageDriverStoreContext buildStoreContext() { return StorageDriverStoreContext.forWorkflow( wfCtx.getNamespace(), wfCtx.getWorkflowId(), null); } - // No context available — fall back to a default context. - return StorageDriverStoreContext.forWorkflow("unknown", "unknown", null); + // No serialization context available — cannot determine where to store payloads. + // Return null so storeIfNeeded() keeps payloads inline. + return null; } @Nonnull diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/ExternalStorage.java b/temporal-sdk/src/main/java/io/temporal/common/converter/ExternalStorage.java index 13787771e1..182b93b7df 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/ExternalStorage.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/ExternalStorage.java @@ -5,14 +5,20 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import javax.annotation.Nonnull; import javax.annotation.Nullable; /** * Configuration for external storage that offloads large payloads to an external store (e.g., S3) - * using the Claim Check pattern. Payloads exceeding the {@link #getPayloadSizeThreshold()} are - * stored externally and replaced with a small {@link StorageDriverClaim} reference token in Event - * History. + * using the Claim + * Check enterprise integration pattern. Payloads exceeding the {@link + * #getPayloadSizeThreshold()} are stored externally and replaced with a small {@link + * StorageDriverClaim} reference token in Event History. + * + *

The default threshold of {@link #DEFAULT_PAYLOAD_SIZE_THRESHOLD} (256 KiB) matches the + * Temporal server's gRPC warning threshold. Note that the Temporal server's hard limit for a single + * payload is typically 2 MiB. * *

External storage is the last stage of the data conversion pipeline, running after * {@link io.temporal.payload.codec.PayloadCodec} (compression/encryption): @@ -99,8 +105,8 @@ StorageDriver selectDriver(StorageDriverStoreContext context) { if (driverSelector != null) { return driverSelector.select(context, drivers); } - // Should not happen if builder validates correctly. - return drivers.get(0); + throw new IllegalStateException( + "Multiple drivers configured without a selector. This should not happen — please report this as a bug."); } /** @@ -134,6 +140,21 @@ private String driverNames() { return sb.toString(); } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ExternalStorage that = (ExternalStorage) o; + return payloadSizeThreshold == that.payloadSizeThreshold + && Objects.equals(drivers, that.drivers) + && Objects.equals(driverSelector, that.driverSelector); + } + + @Override + public int hashCode() { + return Objects.hash(payloadSizeThreshold, drivers, driverSelector); + } + @Override public String toString() { return "ExternalStorage{" @@ -170,8 +191,12 @@ public Builder addDriver(@Nonnull StorageDriver driver) { * @return this builder */ public Builder setDrivers(@Nonnull List drivers) { + Preconditions.checkNotNull(drivers, "drivers"); + for (StorageDriver d : drivers) { + Preconditions.checkNotNull(d, "driver in list must not be null"); + } this.drivers.clear(); - this.drivers.addAll(Preconditions.checkNotNull(drivers, "drivers")); + this.drivers.addAll(drivers); return this; } diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java index f8947b7c82..355ea2ff85 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java @@ -8,12 +8,11 @@ * uploading large payloads to external storage (e.g., S3, GCS) and retrieving them using claim * references. * - *

Implementations must be thread-safe as the SDK may call {@link #store} and {@link #retrieve} - * concurrently from multiple threads. - * *

The driver's {@link #name()} is stored in claim tokens persisted in workflow history. Changing * a driver's name after payloads have been stored will break retrieval of those payloads. * + * @implSpec Implementations must be thread-safe since the SDK may call {@link #store} and {@link + * #retrieve} concurrently from multiple workflow task threads. * @see ExternalStorage * @see StorageDriverClaim */ @@ -43,7 +42,9 @@ public interface StorageDriver { * The claims contain addressing information needed to retrieve the data later. * *

Each element in the returned list corresponds to the payload at the same index in the input - * list. + * list. Each {@code byte[]} in the input is the serialized protobuf {@code Payload} message (not + * raw user data) — it has already passed through {@code PayloadConverter} and {@code + * PayloadCodec}. * * @param context context with identity information about the workflow/activity owning the * payloads @@ -52,8 +53,7 @@ public interface StorageDriver { * @return list of claims, one per input payload, in the same order * @throws StorageDriverException if the store operation fails */ - List store(StorageDriverStoreContext context, List payloads) - throws StorageDriverException; + List store(StorageDriverStoreContext context, List payloads); /** * Downloads payloads from external storage using the claim references produced by {@link #store}. @@ -66,6 +66,5 @@ List store(StorageDriverStoreContext context, List p * @return list of serialized payload bytes, one per input claim, in the same order * @throws StorageDriverException if the retrieve operation fails */ - List retrieve(StorageDriverRetrieveContext context, List claims) - throws StorageDriverException; + List retrieve(StorageDriverRetrieveContext context, List claims); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverClaim.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverClaim.java index 2dc18dd089..c853f8aabc 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverClaim.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverClaim.java @@ -12,6 +12,13 @@ * the contents. * *

For example, an S3 driver might store {@code {"key": ""}} in the claim data. + * + *

Reserved keys: Claim data keys should not use the reserved key names {@code + * encoding}, {@code driver-name}, or any key starting with {@code claim.}, as these are used + * internally by the SDK. + * + *

Note: {@link #toString()} outputs the full claim data map, which may include sensitive + * information such as S3 bucket names and object keys. Use caution when logging. */ @Experimental public final class StorageDriverClaim { diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java index 04bb327290..9dabcb0396 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java @@ -7,10 +7,15 @@ * *

This exception wraps underlying storage errors (e.g., network failures, permission errors) * into a common type that the SDK can handle uniformly. + * + *

This is an unchecked exception (extends {@link RuntimeException}) for consistency with {@link + * io.temporal.common.converter.DataConverterException}. */ @Experimental public class StorageDriverException extends RuntimeException { + private static final long serialVersionUID = 1L; + public StorageDriverException(String message) { super(message); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java index a81f1c8ac2..e4b5cd5a1c 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java @@ -6,6 +6,12 @@ /** * Context passed to {@link StorageDriver#retrieve} providing identity information about the * retrieval environment. This may be used by drivers for logging, metrics, or access control. + * + *

Unlike {@link StorageDriverStoreContext}, fields in this context are nullable because + * retrieval may happen in contexts where the full workflow identity is not available (e.g., during + * query handling). + * + *

Instances of this class are immutable and therefore thread-safe. */ @Experimental public final class StorageDriverRetrieveContext { diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java index 69cdb54dae..2b99d341fd 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java @@ -10,6 +10,11 @@ *

Drivers not selected for storage remain available for retrieval, which supports migration * scenarios (e.g., switching from one S3 bucket to another). * + *

Returning {@code null} from {@link #select} indicates that the payload should not be + * externalized and should remain inline in Event History. This can be used to implement conditional + * externalization logic (e.g., only externalize payloads for specific namespaces or workflow + * types). + * * @see ExternalStorage */ @Experimental diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java index a1da82c920..c0ff6b3e4d 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java @@ -1,6 +1,7 @@ package io.temporal.common.converter; import io.temporal.common.Experimental; +import java.util.Objects; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -8,6 +9,8 @@ * Context passed to {@link StorageDriver#store} providing identity information about the workflow * or activity that owns the payload being stored. Drivers can use this context to organize stored * objects (e.g., by namespace and workflow ID in the object key path). + * + *

Instances of this class are immutable and therefore thread-safe. */ @Experimental public final class StorageDriverStoreContext { @@ -22,8 +25,8 @@ private StorageDriverStoreContext( @Nonnull String workflowId, @Nullable String runId, @Nullable String activityType) { - this.namespace = namespace; - this.workflowId = workflowId; + this.namespace = Objects.requireNonNull(namespace, "namespace"); + this.workflowId = Objects.requireNonNull(workflowId, "workflowId"); this.runId = runId; this.activityType = activityType; } diff --git a/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java b/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java index 2ece09de29..88e21d79de 100644 --- a/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java +++ b/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java @@ -3,6 +3,9 @@ import static org.junit.Assert.*; import io.temporal.api.common.v1.Payload; +import io.temporal.api.common.v1.Payloads; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.payload.context.HasWorkflowSerializationContext; import java.util.*; import org.junit.Test; @@ -124,6 +127,129 @@ public void testStorageDriverClaimEquality() { assertEquals(claim1.hashCode(), claim2.hashCode()); } + /** + * Integration test: full CodecDataConverter pipeline roundtrip with external storage. Verifies + * that large payloads are stored and retrieved correctly through the complete pipeline. + */ + @Test + public void testFullPipelineRoundtrip() { + InMemoryStorageDriver driver = new InMemoryStorageDriver("test"); + ExternalStorage es = + ExternalStorage.newBuilder() + .addDriver(driver) + .setPayloadSizeThreshold(10) // Low threshold to trigger externalization + .build(); + + // Create a simple serialization context + HasWorkflowSerializationContext ctx = + new HasWorkflowSerializationContext() { + @Override + public String getNamespace() { + return "test-ns"; + } + + @Override + public String getWorkflowId() { + return "test-wf"; + } + }; + + CodecDataConverter converter = + new CodecDataConverter( + DefaultDataConverter.STANDARD_INSTANCE, Collections.emptyList(), false, es); + CodecDataConverter contextualConverter = converter.withContext(ctx); + + // Serialize a large string (will exceed threshold) + String largeValue = new String(new char[1000]).replace('\0', 'A'); + Optional serialized = contextualConverter.toPayloads(largeValue); + assertTrue(serialized.isPresent()); + + // The payload in serialized should be a claim token, not the original data + Payload payload = serialized.get().getPayloads(0); + assertTrue("Payload should be a claim token", CodecDataConverter.isClaimPayload(payload)); + + // Deserialize — should retrieve from external storage and return original value + String deserialized = + contextualConverter.fromPayloads(0, serialized, String.class, String.class); + assertEquals(largeValue, deserialized); + } + + /** Integration test: small payloads stay inline (below threshold). */ + @Test + public void testSmallPayloadsStayInline() { + InMemoryStorageDriver driver = new InMemoryStorageDriver("test"); + ExternalStorage es = + ExternalStorage.newBuilder() + .addDriver(driver) + .setPayloadSizeThreshold(1_000_000) // Very high threshold + .build(); + + HasWorkflowSerializationContext ctx = + new HasWorkflowSerializationContext() { + @Override + public String getNamespace() { + return "ns"; + } + + @Override + public String getWorkflowId() { + return "wf"; + } + }; + + CodecDataConverter converter = + new CodecDataConverter( + DefaultDataConverter.STANDARD_INSTANCE, Collections.emptyList(), false, es); + CodecDataConverter contextualConverter = converter.withContext(ctx); + + Optional serialized = contextualConverter.toPayloads("small"); + assertTrue(serialized.isPresent()); + Payload payload = serialized.get().getPayloads(0); + assertFalse( + "Small payload should NOT be a claim token", CodecDataConverter.isClaimPayload(payload)); + } + + /** + * Test that claim data keys with reserved names ("encoding", "driver-name") do not collide + * because of the "claim." prefix namespacing. + */ + @Test + public void testClaimKeyCollisionProtection() { + // A driver that uses "encoding" as a claim data key (would collide without prefixing) + Map data = new HashMap<>(); + data.put("encoding", "some-value"); + data.put("driver-name", "another-value"); + data.put("bucket", "my-bucket"); + StorageDriverClaim claim = new StorageDriverClaim(data); + + Payload claimPayload = CodecDataConverter.claimToPayload("my-driver", claim); + + // Verify the claim is still detected as a claim payload + assertTrue(CodecDataConverter.isClaimPayload(claimPayload)); + // Verify the driver name is still correct (not overwritten) + assertEquals("my-driver", CodecDataConverter.getClaimDriverName(claimPayload)); + + // Verify round-trip preserves all claim data including the 'encoding' key + StorageDriverClaim roundTripped = CodecDataConverter.payloadToClaim(claimPayload); + assertEquals("some-value", roundTripped.getClaimData().get("encoding")); + assertEquals("another-value", roundTripped.getClaimData().get("driver-name")); + assertEquals("my-bucket", roundTripped.getClaimData().get("bucket")); + } + + /** Test that WorkflowClientOptions properly wires ExternalStorage into the data converter. */ + @Test + public void testWorkflowClientOptionsWiring() { + InMemoryStorageDriver driver = new InMemoryStorageDriver("test"); + ExternalStorage es = ExternalStorage.newBuilder().addDriver(driver).build(); + + // Without CodecDataConverter — should auto-wrap + WorkflowClientOptions options = + WorkflowClientOptions.newBuilder().setExternalStorage(es).build(); + assertTrue( + "DataConverter should be wrapped in CodecDataConverter when ExternalStorage is set", + options.getDataConverter() instanceof CodecDataConverter); + } + /** A simple in-memory storage driver for testing. */ static class InMemoryStorageDriver implements StorageDriver { private final String driverName;