diff --git a/settings.gradle b/settings.gradle index 918ceaa28e..d64929e8b0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -12,3 +12,4 @@ include 'temporal-remote-data-encoder' include 'temporal-shaded' include 'temporal-workflowcheck' include 'temporal-envconfig' +include 'temporal-s3-external-storage' diff --git a/temporal-s3-external-storage/build.gradle b/temporal-s3-external-storage/build.gradle new file mode 100644 index 0000000000..838e513fb7 --- /dev/null +++ b/temporal-s3-external-storage/build.gradle @@ -0,0 +1,14 @@ +description = '''Temporal S3 External Storage Driver''' + +ext { + awsSdkVersion = '2.25.0' +} + +dependencies { + api project(':temporal-sdk') + implementation "software.amazon.awssdk:s3:${awsSdkVersion}" + + testImplementation "junit:junit:${junitVersion}" + testImplementation "org.mockito:mockito-core:${mockitoVersion}" + testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}" +} diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java new file mode 100644 index 0000000000..cf15b3fe64 --- /dev/null +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java @@ -0,0 +1,92 @@ +package io.temporal.contrib.aws.s3driver; + +import io.temporal.common.Experimental; +import io.temporal.common.converter.StorageDriverException; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +/** + * Adapter wrapping the AWS SDK for Java v2 S3 client into the {@link S3Client} interface used by + * {@link S3StorageDriver}. + * + *
Retry behavior: The AWS SDK v2 has built-in retry logic that is configurable on the + * {@link software.amazon.awssdk.services.s3.S3Client} instance. Users should configure + * appropriate retry policies for production use. For example: + * + *
{@code
+ * software.amazon.awssdk.services.s3.S3Client awsS3 =
+ * software.amazon.awssdk.services.s3.S3Client.builder()
+ * .region(Region.US_EAST_1)
+ * .overrideConfiguration(o -> o.retryPolicy(
+ * RetryPolicy.builder().numRetries(3).build()))
+ * .build();
+ * S3Client client = new AwsSdkV2S3Client(awsS3);
+ * }
+ *
+ * Impact on workflows: S3 failures during workflow replay will block workflow progress + * until retries succeed or the operation is abandoned. Configure retry policies and timeouts + * accordingly to avoid indefinite blocking. + * + *
Example usage: + * + *
{@code
+ * software.amazon.awssdk.services.s3.S3Client awsS3 = software.amazon.awssdk.services.s3.S3Client.builder()
+ * .region(Region.US_EAST_1)
+ * .build();
+ * S3Client client = new AwsSdkV2S3Client(awsS3);
+ * }
+ */
+@Experimental
+public final class AwsSdkV2S3Client implements S3Client {
+
+ private static final Logger log = LoggerFactory.getLogger(AwsSdkV2S3Client.class);
+
+ private final software.amazon.awssdk.services.s3.S3Client delegate;
+
+ /**
+ * Creates a new adapter wrapping the given AWS SDK v2 S3 client.
+ *
+ * @param delegate the AWS SDK v2 S3 client to delegate to
+ */
+ public AwsSdkV2S3Client(software.amazon.awssdk.services.s3.S3Client delegate) {
+ this.delegate = Objects.requireNonNull(delegate, "delegate");
+ }
+
+ @Override
+ public void putObject(String bucket, String key, byte[] data) {
+ log.debug("S3 PutObject: bucket={}, key={}, size={} bytes", bucket, key, data.length);
+ try {
+ delegate.putObject(
+ PutObjectRequest.builder().bucket(bucket).key(key).build(), RequestBody.fromBytes(data));
+ } catch (Exception e) {
+ log.warn("S3 PutObject failed: bucket={}, key={}", bucket, key, e);
+ throw new StorageDriverException(
+ "Failed to upload object to S3: bucket=" + bucket + ", key=" + key, e);
+ }
+ }
+
+ @Override
+ public byte[] getObject(String bucket, String key) {
+ log.debug("S3 GetObject: bucket={}, key={}", bucket, key);
+ try (InputStream is =
+ delegate.getObject(GetObjectRequest.builder().bucket(bucket).key(key).build())) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] buffer = new byte[8192];
+ int bytesRead;
+ while ((bytesRead = is.read(buffer)) != -1) {
+ baos.write(buffer, 0, bytesRead);
+ }
+ return baos.toByteArray();
+ } catch (Exception e) {
+ log.warn("S3 GetObject failed: bucket={}, key={}", bucket, key, e);
+ throw new StorageDriverException(
+ "Failed to download object from S3: bucket=" + bucket + ", key=" + key, e);
+ }
+ }
+}
diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
new file mode 100644
index 0000000000..5398a93386
--- /dev/null
+++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
@@ -0,0 +1,35 @@
+package io.temporal.contrib.aws.s3driver;
+
+import io.temporal.common.Experimental;
+
+/**
+ * Abstraction over S3 client operations used by {@link S3StorageDriver}. This interface allows
+ * plugging in different S3 client implementations and enables testing with mocks.
+ *
+ * Implementations must be thread-safe. A single {@code S3Client} instance may be shared + * across multiple workflow threads and activity workers concurrently. + * + * @see AwsSdkV2S3Client + */ +@Experimental +public interface S3Client { + /** + * Uploads an object to S3. + * + * @param bucket the S3 bucket name + * @param key the S3 object key + * @param data the object content + * @throws io.temporal.common.converter.StorageDriverException if the upload fails + */ + void putObject(String bucket, String key, byte[] data); + + /** + * Downloads an object from S3. + * + * @param bucket the S3 bucket name + * @param key the S3 object key + * @return the object content + * @throws io.temporal.common.converter.StorageDriverException if the download fails + */ + byte[] getObject(String bucket, String key); +} diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java new file mode 100644 index 0000000000..c06ce17651 --- /dev/null +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java @@ -0,0 +1,216 @@ +package io.temporal.contrib.aws.s3driver; + +import io.temporal.common.Experimental; +import io.temporal.common.converter.StorageDriver; +import io.temporal.common.converter.StorageDriverClaim; +import io.temporal.common.converter.StorageDriverException; +import io.temporal.common.converter.StorageDriverRetrieveContext; +import io.temporal.common.converter.StorageDriverStoreContext; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link StorageDriver} implementation that stores payloads in Amazon S3. Uses content-addressed + * storage with SHA-256 hashing for natural deduplication. + * + *
S3 object keys are constructed as: + * + *
+ * {keyPrefix}{namespace}/{workflowId}/{sha256-hash}
+ *
+ *
+ * Example usage: + * + *
{@code
+ * software.amazon.awssdk.services.s3.S3Client awsS3 = S3Client.builder()
+ * .region(Region.US_EAST_1)
+ * .build();
+ *
+ * S3StorageDriver driver = new S3StorageDriver(
+ * S3StorageDriverOptions.newBuilder()
+ * .setClient(new AwsSdkV2S3Client(awsS3))
+ * .setBucket("my-temporal-payloads")
+ * .build());
+ *
+ * ExternalStorage externalStorage = ExternalStorage.newBuilder()
+ * .addDriver(driver)
+ * .build();
+ * }
+ *
+ * Important: Temporal does not manage the lifecycle of stored objects. Users must
+ * configure S3 Lifecycle Policies or similar mechanisms for cleanup. Objects should be retained at
+ * least for the workflow lifetime plus the namespace retention period.
+ *
+ * @implNote Each payload is held in memory as a {@code byte[]}, so peak memory usage during store
+ * and retrieve operations is approximately 2–3× the payload size (original bytes, hash
+ * computation buffer, and S3 request/response buffer). S3 provides data integrity via checksums
+ * automatically. Content-addressed storage means that duplicate payloads produce identical
+ * object keys, making writes idempotent — re-uploading the same payload simply overwrites the
+ * same S3 object with identical content.
+ */
+@Experimental
+public final class S3StorageDriver implements StorageDriver {
+
+ private static final Logger log = LoggerFactory.getLogger(S3StorageDriver.class);
+
+ private static final String DRIVER_TYPE = "aws.s3driver";
+ static final String CLAIM_KEY_BUCKET = "bucket";
+ static final String CLAIM_KEY_OBJECT_KEY = "key";
+
+ private final S3Client client;
+ private final String bucket;
+ private final String keyPrefix;
+ private final String driverName;
+
+ /**
+ * Creates a new S3 storage driver with the given options.
+ *
+ * @param options the driver configuration options
+ */
+ public S3StorageDriver(@Nonnull S3StorageDriverOptions options) {
+ Objects.requireNonNull(options, "options");
+ this.client = options.getClient();
+ this.bucket = options.getBucket();
+ this.keyPrefix = options.getKeyPrefix();
+ this.driverName = options.getDriverName();
+ }
+
+ @Override
+ public String name() {
+ return driverName;
+ }
+
+ @Override
+ public String type() {
+ return DRIVER_TYPE;
+ }
+
+ @Override
+ public List Security note: This method always uses the bucket configured at construction time
+ * ({@code this.bucket}) and ignores any bucket value stored in claim data. Although claims
+ * include a bucket field for informational purposes, honoring it at retrieval time would be a
+ * confused-deputy vulnerability — an attacker who can craft or tamper with claim tokens could
+ * redirect reads to an arbitrary bucket.
+ */
+ @Override
+ public List Each path segment derived from user-controlled input (namespace, workflowId, activityType)
+ * is URL-encoded to prevent path-injection attacks (e.g., a workflowId containing {@code "../"}
+ * could escape the intended key hierarchy).
+ *
+ * @param context the store context providing namespace, workflowId, and optional activityType
+ * @param hash the SHA-256 hex digest of the payload
+ * @return the fully-qualified S3 object key
+ */
+ private String buildObjectKey(StorageDriverStoreContext context, String hash) {
+ StringBuilder sb = new StringBuilder();
+ if (!keyPrefix.isEmpty()) {
+ sb.append(keyPrefix);
+ if (!keyPrefix.endsWith("/")) {
+ sb.append('/');
+ }
+ }
+ sb.append(encodePathSegment(context.getNamespace())).append('/');
+ sb.append(encodePathSegment(context.getWorkflowId())).append('/');
+ if (context.getActivityType() != null) {
+ sb.append(encodePathSegment(context.getActivityType())).append('/');
+ }
+ sb.append(hash);
+ return sb.toString();
+ }
+
+ /**
+ * URL-encodes a single path segment, replacing {@code +} with {@code %20} so that spaces are
+ * represented correctly in S3 object keys.
+ */
+ private static String encodePathSegment(String segment) {
+ try {
+ return URLEncoder.encode(segment, "UTF-8").replace("+", "%20");
+ } catch (UnsupportedEncodingException e) {
+ // UTF-8 is guaranteed to be available on all JVMs
+ throw new AssertionError("UTF-8 encoding not available", e);
+ }
+ }
+
+ static String sha256Hex(byte[] data) {
+ try {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ byte[] hash = digest.digest(data);
+ StringBuilder hexString = new StringBuilder(hash.length * 2);
+ for (byte b : hash) {
+ String hex = Integer.toHexString(0xff & b);
+ if (hex.length() == 1) {
+ hexString.append('0');
+ }
+ hexString.append(hex);
+ }
+ return hexString.toString();
+ } catch (NoSuchAlgorithmException e) {
+ throw new StorageDriverException("SHA-256 algorithm not available", e);
+ }
+ }
+}
diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java
new file mode 100644
index 0000000000..3a0f0067bb
--- /dev/null
+++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java
@@ -0,0 +1,126 @@
+package io.temporal.contrib.aws.s3driver;
+
+import com.google.common.base.Preconditions;
+import io.temporal.common.Experimental;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Configuration options for {@link S3StorageDriver}.
+ *
+ * Example:
+ *
+ * Retry configuration: The AWS SDK v2 has built-in retry logic that is configurable on
+ * the {@code S3Client} passed to {@link AwsSdkV2S3Client}. Users should configure
+ * appropriate retry policies for production use. For example:
+ *
+ * S3 failures during workflow replay will block workflow progress until retries succeed.
+ *
+ * Data confidentiality: Payload data is stored as-is in S3. To protect data at rest,
+ * configure S3 bucket-level encryption (SSE-S3 or SSE-KMS) on the target bucket.
+ *
+ * Data cleanup: Temporal does not manage the lifecycle of stored S3 objects. Configure S3
+ * Lifecycle Policies on the target bucket to automatically expire or transition objects. Objects
+ * should be retained at least for the workflow lifetime plus the namespace retention period.
+ */
+@Experimental
+public final class S3StorageDriverOptions {
+
+ private static final String DEFAULT_DRIVER_NAME = "aws.s3driver";
+
+ private final S3Client client;
+ private final String bucket;
+ private final String keyPrefix;
+ private final String driverName;
+
+ private S3StorageDriverOptions(
+ S3Client client, String bucket, String keyPrefix, String driverName) {
+ this.client = client;
+ this.bucket = bucket;
+ this.keyPrefix = keyPrefix;
+ this.driverName = driverName;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public S3Client getClient() {
+ return client;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ @Nonnull
+ public String getKeyPrefix() {
+ return keyPrefix;
+ }
+
+ @Nonnull
+ public String getDriverName() {
+ return driverName;
+ }
+
+ public static final class Builder {
+ private S3Client client;
+ private String bucket;
+ private String keyPrefix = "";
+ private String driverName = DEFAULT_DRIVER_NAME;
+
+ private Builder() {}
+
+ /** Sets the S3 client to use for storage operations. Required. */
+ public Builder setClient(@Nonnull S3Client client) {
+ this.client = Objects.requireNonNull(client, "client");
+ return this;
+ }
+
+ /** Sets the S3 bucket name. Required. */
+ public Builder setBucket(@Nonnull String bucket) {
+ this.bucket = Objects.requireNonNull(bucket, "bucket");
+ return this;
+ }
+
+ /**
+ * Sets an optional prefix prepended to all S3 object keys. Useful for organizing payloads
+ * within a shared bucket. Default is empty string (no prefix).
+ */
+ public Builder setKeyPrefix(@Nullable String keyPrefix) {
+ this.keyPrefix = keyPrefix == null ? "" : keyPrefix;
+ return this;
+ }
+
+ /**
+ * Sets the driver instance name. This is stored in claim tokens. Default is "aws.s3driver".
+ * Changing this after payloads are stored will break retrieval.
+ */
+ public Builder setDriverName(@Nonnull String driverName) {
+ this.driverName = Objects.requireNonNull(driverName, "driverName");
+ return this;
+ }
+
+ public S3StorageDriverOptions build() {
+ Preconditions.checkState(client != null, "S3Client must be set");
+ Preconditions.checkState(bucket != null && !bucket.isEmpty(), "Bucket must be set");
+ return new S3StorageDriverOptions(client, bucket, keyPrefix, driverName);
+ }
+ }
+}
diff --git a/temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java b/temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java
new file mode 100644
index 0000000000..67ad920e23
--- /dev/null
+++ b/temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java
@@ -0,0 +1,111 @@
+package io.temporal.contrib.aws.s3driver;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+import io.temporal.common.converter.StorageDriverClaim;
+import io.temporal.common.converter.StorageDriverRetrieveContext;
+import io.temporal.common.converter.StorageDriverStoreContext;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class S3StorageDriverTest {
+
+ private S3Client mockClient;
+ private S3StorageDriver driver;
+
+ @Before
+ public void setUp() {
+ mockClient = mock(S3Client.class);
+ S3StorageDriverOptions options =
+ S3StorageDriverOptions.newBuilder()
+ .setClient(mockClient)
+ .setBucket("test-bucket")
+ .setKeyPrefix("temporal/")
+ .build();
+ driver = new S3StorageDriver(options);
+ }
+
+ @Test
+ public void testNameAndType() {
+ assertEquals("aws.s3driver", driver.name());
+ assertEquals("aws.s3driver", driver.type());
+ }
+
+ @Test
+ public void testStoreAndRetrieve() {
+ byte[] payload = "test payload data".getBytes(StandardCharsets.UTF_8);
+ StorageDriverStoreContext storeCtx =
+ StorageDriverStoreContext.forWorkflow("default", "workflow-123", "run-456");
+
+ // Store
+ List This is completely transparent to workflow and activity business logic.
+ *
+ * @param externalStorage the external storage configuration, or null to disable
+ * @see ExternalStorage
+ */
+ @Experimental
+ public Builder setExternalStorage(@Nullable ExternalStorage externalStorage) {
+ this.externalStorage = externalStorage;
+ return this;
+ }
+
/**
* Interceptor used to intercept workflow client calls.
*
@@ -157,6 +178,7 @@ public WorkflowClientOptions build() {
return new WorkflowClientOptions(
namespace,
dataConverter,
+ externalStorage,
interceptors,
identity,
binaryChecksum,
@@ -181,6 +203,7 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
return new WorkflowClientOptions(
namespace == null ? DEFAULT_NAMESPACE : namespace,
dataConverter == null ? GlobalDataConverter.get() : dataConverter,
+ externalStorage,
interceptors == null ? EMPTY_INTERCEPTOR_ARRAY : interceptors,
name,
binaryChecksum == null ? DEFAULT_BINARY_CHECKSUM : binaryChecksum,
@@ -203,6 +226,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
private final DataConverter dataConverter;
+ private final @Nullable ExternalStorage externalStorage;
+
private final WorkflowClientInterceptor[] interceptors;
private final String identity;
@@ -218,6 +243,7 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
private WorkflowClientOptions(
String namespace,
DataConverter dataConverter,
+ @Nullable ExternalStorage externalStorage,
WorkflowClientInterceptor[] interceptors,
String identity,
String binaryChecksum,
@@ -225,7 +251,9 @@ private WorkflowClientOptions(
QueryRejectCondition queryRejectCondition,
WorkflowClientPlugin[] plugins) {
this.namespace = namespace;
- this.dataConverter = dataConverter;
+ // Wire external storage into the data converter if configured.
+ this.dataConverter = wrapWithExternalStorage(dataConverter, externalStorage);
+ this.externalStorage = externalStorage;
this.interceptors = interceptors;
this.identity = identity;
this.binaryChecksum = binaryChecksum;
@@ -234,6 +262,24 @@ private WorkflowClientOptions(
this.plugins = plugins;
}
+ /**
+ * If external storage is configured, ensures the data converter is a {@link CodecDataConverter}
+ * with external storage wired in. If the data converter is already a {@link CodecDataConverter},
+ * it is replaced with a new instance that includes external storage. Otherwise, it is wrapped in
+ * a new {@link CodecDataConverter} with an empty codec chain.
+ */
+ private static DataConverter wrapWithExternalStorage(
+ DataConverter dataConverter, @Nullable ExternalStorage externalStorage) {
+ if (externalStorage == null) {
+ return dataConverter;
+ }
+ if (dataConverter instanceof CodecDataConverter) {
+ CodecDataConverter cdc = (CodecDataConverter) dataConverter;
+ return cdc.withExternalStorage(externalStorage);
+ }
+ return new CodecDataConverter(dataConverter, Collections.emptyList(), false, externalStorage);
+ }
+
/**
* Should be non-null on a valid instance
*
@@ -247,6 +293,17 @@ public DataConverter getDataConverter() {
return dataConverter;
}
+ /**
+ * Returns the external storage configuration, or null if external storage is not configured.
+ *
+ * @see ExternalStorage
+ */
+ @Experimental
+ @Nullable
+ public ExternalStorage getExternalStorage() {
+ return externalStorage;
+ }
+
public WorkflowClientInterceptor[] getInterceptors() {
return interceptors;
}
@@ -297,6 +354,8 @@ public String toString() {
+ '\''
+ ", dataConverter="
+ dataConverter
+ + ", externalStorage="
+ + externalStorage
+ ", interceptors="
+ Arrays.toString(interceptors)
+ ", identity='"
@@ -321,6 +380,7 @@ public boolean equals(Object o) {
WorkflowClientOptions that = (WorkflowClientOptions) o;
return com.google.common.base.Objects.equal(namespace, that.namespace)
&& com.google.common.base.Objects.equal(dataConverter, that.dataConverter)
+ && com.google.common.base.Objects.equal(externalStorage, that.externalStorage)
&& Arrays.equals(interceptors, that.interceptors)
&& com.google.common.base.Objects.equal(identity, that.identity)
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
@@ -334,6 +394,7 @@ public int hashCode() {
return com.google.common.base.Objects.hashCode(
namespace,
dataConverter,
+ externalStorage,
Arrays.hashCode(interceptors),
identity,
binaryChecksum,
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
index a82172348b..4649bfc1f6 100644
--- a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
@@ -2,6 +2,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.failure.v1.ApplicationFailureInfo;
@@ -11,11 +12,17 @@
import io.temporal.api.failure.v1.TimeoutFailureInfo;
import io.temporal.payload.codec.ChainCodec;
import io.temporal.payload.codec.PayloadCodec;
+import io.temporal.payload.context.ActivitySerializationContext;
+import io.temporal.payload.context.HasWorkflowSerializationContext;
import io.temporal.payload.context.SerializationContext;
import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -31,9 +38,30 @@
public class CodecDataConverter implements DataConverter, PayloadCodec {
private static final String ENCODED_FAILURE_MESSAGE = "Encoded failure";
+ /**
+ * Metadata encoding value for claim check payloads stored in external storage. When a payload is
+ * stored externally, the payload in Event History has this encoding and contains the serialized
+ * claim data.
+ */
+ static final String EXTERNAL_STORAGE_ENCODING_NAME = "temporal-external-storage/claim";
+
+ static final ByteString EXTERNAL_STORAGE_ENCODING =
+ ByteString.copyFrom(EXTERNAL_STORAGE_ENCODING_NAME, StandardCharsets.UTF_8);
+
+ /** Metadata key for the driver name in a claim payload. */
+ static final String CLAIM_DRIVER_NAME_KEY = "driver-name";
+
+ /**
+ * Prefix for claim data keys in payload metadata. All claim data keys are prefixed with this
+ * string to prevent collisions with reserved metadata keys ({@code encoding}, {@code
+ * driver-name}).
+ */
+ static final String CLAIM_DATA_PREFIX = "claim.";
+
private final DataConverter dataConverter;
private final ChainCodec chainCodec;
private final boolean encodeFailureAttributes;
+ private final @Nullable ExternalStorage externalStorage;
private final @Nullable SerializationContext serializationContext;
/**
@@ -95,17 +123,39 @@ public CodecDataConverter(
DataConverter dataConverter,
Collection When {@code externalStorage} is configured, payloads exceeding the configured size threshold
+ * are offloaded to external storage after codec encoding. On deserialization, claim tokens are
+ * detected and the original payloads are retrieved from external storage before codec decoding.
+ *
+ * @param dataConverter to delegate data conversion to
+ * @param codecs to delegate bytes encoding/decoding to
+ * @param encodeFailureAttributes enable encoding of Failure attributes
+ * @param externalStorage optional external storage configuration
+ */
+ public CodecDataConverter(
+ DataConverter dataConverter,
+ Collection The default threshold of {@link #DEFAULT_PAYLOAD_SIZE_THRESHOLD} (256 KiB) matches the
+ * Temporal server's gRPC warning threshold. Note that the Temporal server's hard limit for a single
+ * payload is typically 2 MiB.
+ *
+ * External storage is the last stage of the data conversion pipeline, running after
+ * {@link io.temporal.payload.codec.PayloadCodec} (compression/encryption):
+ *
+ * This is completely transparent to workflow and activity business logic.
+ *
+ * Example usage:
+ *
+ * Default is {@link #DEFAULT_PAYLOAD_SIZE_THRESHOLD} (256 KiB). Set to 1 to externalize all
+ * payloads.
+ */
+ public int getPayloadSizeThreshold() {
+ return payloadSizeThreshold;
+ }
+
+ /**
+ * Selects the appropriate driver for the given store context. If only one driver is registered,
+ * returns that driver. Otherwise, delegates to the configured {@link StorageDriverSelector}.
+ *
+ * @param context the store context
+ * @return the selected driver, or null if the payload should remain inline
+ */
+ @Nullable
+ StorageDriver selectDriver(StorageDriverStoreContext context) {
+ if (drivers.size() == 1) {
+ return drivers.get(0);
+ }
+ if (driverSelector != null) {
+ return driverSelector.select(context, drivers);
+ }
+ throw new IllegalStateException(
+ "Multiple drivers configured without a selector. This should not happen — please report this as a bug.");
+ }
+
+ /**
+ * Finds a driver by name for retrieval operations.
+ *
+ * @param driverName the driver name stored in the claim
+ * @return the matching driver
+ * @throws StorageDriverException if no driver with the given name is found
+ */
+ @Nonnull
+ StorageDriver findDriverByName(String driverName) {
+ for (StorageDriver driver : drivers) {
+ if (driver.name().equals(driverName)) {
+ return driver;
+ }
+ }
+ throw new StorageDriverException(
+ "No storage driver found with name '"
+ + driverName
+ + "'. Registered drivers: "
+ + driverNames());
+ }
+
+ private String driverNames() {
+ StringBuilder sb = new StringBuilder("[");
+ for (int i = 0; i < drivers.size(); i++) {
+ if (i > 0) sb.append(", ");
+ sb.append(drivers.get(i).name());
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ExternalStorage that = (ExternalStorage) o;
+ return payloadSizeThreshold == that.payloadSizeThreshold
+ && Objects.equals(drivers, that.drivers)
+ && Objects.equals(driverSelector, that.driverSelector);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(payloadSizeThreshold, drivers, driverSelector);
+ }
+
+ @Override
+ public String toString() {
+ return "ExternalStorage{"
+ + "drivers="
+ + driverNames()
+ + ", payloadSizeThreshold="
+ + payloadSizeThreshold
+ + '}';
+ }
+
+ /** Builder for {@link ExternalStorage}. */
+ public static final class Builder {
+ private final List Default is {@link #DEFAULT_PAYLOAD_SIZE_THRESHOLD} (256 KiB). Set to 1 to externalize all
+ * payloads.
+ *
+ * @param payloadSizeThreshold threshold in bytes, must be positive
+ * @return this builder
+ */
+ public Builder setPayloadSizeThreshold(int payloadSizeThreshold) {
+ Preconditions.checkArgument(
+ payloadSizeThreshold > 0,
+ "payloadSizeThreshold must be positive, got %s",
+ payloadSizeThreshold);
+ this.payloadSizeThreshold = payloadSizeThreshold;
+ return this;
+ }
+
+ /**
+ * Builds the {@link ExternalStorage} configuration.
+ *
+ * @throws IllegalStateException if no drivers are configured or if multiple drivers are
+ * configured without a selector
+ */
+ public ExternalStorage build() {
+ Preconditions.checkState(!drivers.isEmpty(), "At least one StorageDriver must be configured");
+ Preconditions.checkState(
+ drivers.size() <= 1 || driverSelector != null,
+ "A StorageDriverSelector is required when multiple drivers are configured");
+ return new ExternalStorage(drivers, driverSelector, payloadSizeThreshold);
+ }
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java
new file mode 100644
index 0000000000..355ea2ff85
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java
@@ -0,0 +1,70 @@
+package io.temporal.common.converter;
+
+import io.temporal.common.Experimental;
+import java.util.List;
+
+/**
+ * Interface for external storage driver implementations. A storage driver is responsible for
+ * uploading large payloads to external storage (e.g., S3, GCS) and retrieving them using claim
+ * references.
+ *
+ * The driver's {@link #name()} is stored in claim tokens persisted in workflow history. Changing
+ * a driver's name after payloads have been stored will break retrieval of those payloads.
+ *
+ * @implSpec Implementations must be thread-safe since the SDK may call {@link #store} and {@link
+ * #retrieve} concurrently from multiple workflow task threads.
+ * @see ExternalStorage
+ * @see StorageDriverClaim
+ */
+@Experimental
+public interface StorageDriver {
+
+ /**
+ * Returns a unique string that identifies this specific driver instance. This name is stored in
+ * the claim reference in workflow history to route retrieval requests to the correct driver.
+ *
+ * Critical: Changing this after payloads are stored will break retrieval.
+ *
+ * @return unique driver instance name, never null
+ */
+ String name();
+
+ /**
+ * Returns a string identifying the driver implementation type (e.g., "aws.s3driver"). Must be the
+ * same across all instances of the same driver type.
+ *
+ * @return driver type identifier, never null
+ */
+ String type();
+
+ /**
+ * Uploads payloads to external storage and returns a {@link StorageDriverClaim} for each payload.
+ * The claims contain addressing information needed to retrieve the data later.
+ *
+ * Each element in the returned list corresponds to the payload at the same index in the input
+ * list. Each {@code byte[]} in the input is the serialized protobuf {@code Payload} message (not
+ * raw user data) — it has already passed through {@code PayloadConverter} and {@code
+ * PayloadCodec}.
+ *
+ * @param context context with identity information about the workflow/activity owning the
+ * payloads
+ * @param payloads serialized payload bytes to store; each byte array is one serialized Payload
+ * protobuf
+ * @return list of claims, one per input payload, in the same order
+ * @throws StorageDriverException if the store operation fails
+ */
+ List Each element in the returned list corresponds to the claim at the same index in the input
+ * list.
+ *
+ * @param context context with identity information about the retrieval environment
+ * @param claims list of claims identifying the stored payloads
+ * @return list of serialized payload bytes, one per input claim, in the same order
+ * @throws StorageDriverException if the retrieve operation fails
+ */
+ List For example, an S3 driver might store {@code {"key": " Reserved keys: Claim data keys should not use the reserved key names {@code
+ * encoding}, {@code driver-name}, or any key starting with {@code claim.}, as these are used
+ * internally by the SDK.
+ *
+ * Note: {@link #toString()} outputs the full claim data map, which may include sensitive
+ * information such as S3 bucket names and object keys. Use caution when logging.
+ */
+@Experimental
+public final class StorageDriverClaim {
+
+ private final Map This exception wraps underlying storage errors (e.g., network failures, permission errors)
+ * into a common type that the SDK can handle uniformly.
+ *
+ * This is an unchecked exception (extends {@link RuntimeException}) for consistency with {@link
+ * io.temporal.common.converter.DataConverterException}.
+ */
+@Experimental
+public class StorageDriverException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public StorageDriverException(String message) {
+ super(message);
+ }
+
+ public StorageDriverException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StorageDriverException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java
new file mode 100644
index 0000000000..e4b5cd5a1c
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java
@@ -0,0 +1,62 @@
+package io.temporal.common.converter;
+
+import io.temporal.common.Experimental;
+import javax.annotation.Nullable;
+
+/**
+ * Context passed to {@link StorageDriver#retrieve} providing identity information about the
+ * retrieval environment. This may be used by drivers for logging, metrics, or access control.
+ *
+ * Unlike {@link StorageDriverStoreContext}, fields in this context are nullable because
+ * retrieval may happen in contexts where the full workflow identity is not available (e.g., during
+ * query handling).
+ *
+ * Instances of this class are immutable and therefore thread-safe.
+ */
+@Experimental
+public final class StorageDriverRetrieveContext {
+
+ private final @Nullable String namespace;
+ private final @Nullable String workflowId;
+
+ private StorageDriverRetrieveContext(@Nullable String namespace, @Nullable String workflowId) {
+ this.namespace = namespace;
+ this.workflowId = workflowId;
+ }
+
+ /**
+ * Creates a retrieve context with available identity information.
+ *
+ * @param namespace the namespace, may be null if not available
+ * @param workflowId the workflow execution ID, may be null if not available
+ * @return a new retrieve context
+ */
+ public static StorageDriverRetrieveContext create(
+ @Nullable String namespace, @Nullable String workflowId) {
+ return new StorageDriverRetrieveContext(namespace, workflowId);
+ }
+
+ /** Returns the namespace, or null if not available in the retrieval context. */
+ @Nullable
+ public String getNamespace() {
+ return namespace;
+ }
+
+ /** Returns the workflow execution ID, or null if not available in the retrieval context. */
+ @Nullable
+ public String getWorkflowId() {
+ return workflowId;
+ }
+
+ @Override
+ public String toString() {
+ return "StorageDriverRetrieveContext{"
+ + "namespace='"
+ + namespace
+ + '\''
+ + ", workflowId='"
+ + workflowId
+ + '\''
+ + '}';
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java
new file mode 100644
index 0000000000..2b99d341fd
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java
@@ -0,0 +1,33 @@
+package io.temporal.common.converter;
+
+import io.temporal.common.Experimental;
+import java.util.List;
+
+/**
+ * Functional interface for selecting which {@link StorageDriver} should be used to store a specific
+ * payload. Required when multiple drivers are registered in {@link ExternalStorage}.
+ *
+ * Drivers not selected for storage remain available for retrieval, which supports migration
+ * scenarios (e.g., switching from one S3 bucket to another).
+ *
+ * Returning {@code null} from {@link #select} indicates that the payload should not be
+ * externalized and should remain inline in Event History. This can be used to implement conditional
+ * externalization logic (e.g., only externalize payloads for specific namespaces or workflow
+ * types).
+ *
+ * @see ExternalStorage
+ */
+@Experimental
+@FunctionalInterface
+public interface StorageDriverSelector {
+
+ /**
+ * Selects a storage driver for the given store context.
+ *
+ * @param context context with identity information about the workflow/activity owning the payload
+ * @param drivers the list of available drivers
+ * @return the selected driver to use for storage, or null to keep the payload inline (not
+ * externalized)
+ */
+ StorageDriver select(StorageDriverStoreContext context, List Instances of this class are immutable and therefore thread-safe.
+ */
+@Experimental
+public final class StorageDriverStoreContext {
+
+ private final @Nonnull String namespace;
+ private final @Nonnull String workflowId;
+ private final @Nullable String runId;
+ private final @Nullable String activityType;
+
+ private StorageDriverStoreContext(
+ @Nonnull String namespace,
+ @Nonnull String workflowId,
+ @Nullable String runId,
+ @Nullable String activityType) {
+ this.namespace = Objects.requireNonNull(namespace, "namespace");
+ this.workflowId = Objects.requireNonNull(workflowId, "workflowId");
+ this.runId = runId;
+ this.activityType = activityType;
+ }
+
+ /**
+ * Creates a store context for a workflow-level payload.
+ *
+ * @param namespace the namespace the workflow belongs to
+ * @param workflowId the workflow execution ID
+ * @param runId the workflow run ID, may be null
+ * @return a new store context
+ */
+ public static StorageDriverStoreContext forWorkflow(
+ @Nonnull String namespace, @Nonnull String workflowId, @Nullable String runId) {
+ return new StorageDriverStoreContext(namespace, workflowId, runId, null);
+ }
+
+ /**
+ * Creates a store context for an activity-level payload.
+ *
+ * @param namespace the namespace the workflow belongs to
+ * @param workflowId the workflow execution ID
+ * @param runId the workflow run ID, may be null
+ * @param activityType the activity type name
+ * @return a new store context
+ */
+ public static StorageDriverStoreContext forActivity(
+ @Nonnull String namespace,
+ @Nonnull String workflowId,
+ @Nullable String runId,
+ @Nonnull String activityType) {
+ return new StorageDriverStoreContext(namespace, workflowId, runId, activityType);
+ }
+
+ /** Returns the namespace the workflow execution belongs to. */
+ @Nonnull
+ public String getNamespace() {
+ return namespace;
+ }
+
+ /** Returns the workflow execution ID. */
+ @Nonnull
+ public String getWorkflowId() {
+ return workflowId;
+ }
+
+ /** Returns the workflow run ID, or null if not available. */
+ @Nullable
+ public String getRunId() {
+ return runId;
+ }
+
+ /** Returns the activity type name, or null if this is a workflow-level context. */
+ @Nullable
+ public String getActivityType() {
+ return activityType;
+ }
+
+ /** Returns true if this context is for an activity payload (vs. a workflow payload). */
+ public boolean isActivityContext() {
+ return activityType != null;
+ }
+
+ @Override
+ public String toString() {
+ return "StorageDriverStoreContext{"
+ + "namespace='"
+ + namespace
+ + '\''
+ + ", workflowId='"
+ + workflowId
+ + '\''
+ + ", runId='"
+ + runId
+ + '\''
+ + ", activityType='"
+ + activityType
+ + '\''
+ + '}';
+ }
+}
diff --git a/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java b/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java
new file mode 100644
index 0000000000..88e21d79de
--- /dev/null
+++ b/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java
@@ -0,0 +1,301 @@
+package io.temporal.common.converter;
+
+import static org.junit.Assert.*;
+
+import io.temporal.api.common.v1.Payload;
+import io.temporal.api.common.v1.Payloads;
+import io.temporal.client.WorkflowClientOptions;
+import io.temporal.payload.context.HasWorkflowSerializationContext;
+import java.util.*;
+import org.junit.Test;
+
+public class ExternalStorageTest {
+
+ @Test
+ public void testBuilderDefaults() {
+ StorageDriver driver = new InMemoryStorageDriver("test");
+ ExternalStorage es = ExternalStorage.newBuilder().addDriver(driver).build();
+ assertEquals(ExternalStorage.DEFAULT_PAYLOAD_SIZE_THRESHOLD, es.getPayloadSizeThreshold());
+ assertEquals(1, es.getDrivers().size());
+ assertNull(es.getDriverSelector());
+ }
+
+ @Test
+ public void testCustomThreshold() {
+ StorageDriver driver = new InMemoryStorageDriver("test");
+ ExternalStorage es =
+ ExternalStorage.newBuilder().addDriver(driver).setPayloadSizeThreshold(1024).build();
+ assertEquals(1024, es.getPayloadSizeThreshold());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testNoDriversFails() {
+ ExternalStorage.newBuilder().build();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMultiDriversWithoutSelectorFails() {
+ ExternalStorage.newBuilder()
+ .addDriver(new InMemoryStorageDriver("a"))
+ .addDriver(new InMemoryStorageDriver("b"))
+ .build();
+ }
+
+ @Test
+ public void testMultiDriversWithSelector() {
+ StorageDriver driverA = new InMemoryStorageDriver("a");
+ StorageDriver driverB = new InMemoryStorageDriver("b");
+ ExternalStorage es =
+ ExternalStorage.newBuilder()
+ .addDriver(driverA)
+ .addDriver(driverB)
+ .setDriverSelector((context, drivers) -> drivers.get(0))
+ .build();
+ assertEquals(2, es.getDrivers().size());
+ }
+
+ @Test
+ public void testSelectDriverSingleDriver() {
+ StorageDriver driver = new InMemoryStorageDriver("test");
+ ExternalStorage es = ExternalStorage.newBuilder().addDriver(driver).build();
+ StorageDriverStoreContext ctx = StorageDriverStoreContext.forWorkflow("ns", "wf", null);
+ assertSame(driver, es.selectDriver(ctx));
+ }
+
+ @Test
+ public void testFindDriverByName() {
+ StorageDriver driverA = new InMemoryStorageDriver("a");
+ StorageDriver driverB = new InMemoryStorageDriver("b");
+ ExternalStorage es =
+ ExternalStorage.newBuilder()
+ .addDriver(driverA)
+ .addDriver(driverB)
+ .setDriverSelector((context, drivers) -> drivers.get(0))
+ .build();
+ assertSame(driverB, es.findDriverByName("b"));
+ }
+
+ @Test(expected = StorageDriverException.class)
+ public void testFindDriverByNameNotFound() {
+ StorageDriver driver = new InMemoryStorageDriver("test");
+ ExternalStorage es = ExternalStorage.newBuilder().addDriver(driver).build();
+ es.findDriverByName("nonexistent");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidThreshold() {
+ ExternalStorage.newBuilder()
+ .addDriver(new InMemoryStorageDriver("test"))
+ .setPayloadSizeThreshold(0);
+ }
+
+ @Test
+ public void testClaimToPayloadAndBack() {
+ Map{@code
+ * S3StorageDriverOptions options = S3StorageDriverOptions.newBuilder()
+ * .setClient(new AwsSdkV2S3Client(awsS3Client))
+ * .setBucket("my-temporal-payloads")
+ * .setKeyPrefix("temporal/payloads/")
+ * .build();
+ * }
+ *
+ * {@code
+ * S3Client.builder()
+ * .overrideConfiguration(o -> o.retryPolicy(
+ * RetryPolicy.builder().numRetries(3).build()))
+ * .build();
+ * }
+ *
+ *
+ * PayloadConverter → PayloadCodec → ExternalStorage
+ *
+ *
+ * {@code
+ * ExternalStorage externalStorage = ExternalStorage.newBuilder()
+ * .addDriver(s3Driver)
+ * .setPayloadSizeThreshold(256 * 1024) // 256 KiB
+ * .build();
+ * }
+ *
+ * @see StorageDriver
+ */
+@Experimental
+public final class ExternalStorage {
+
+ /** Default payload size threshold: 256 KiB (262144 bytes). */
+ public static final int DEFAULT_PAYLOAD_SIZE_THRESHOLD = 256 * 1024;
+
+ private final List