From be7326e4d186ca21d5b73eafda62fc96c0adfd14 Mon Sep 17 00:00:00 2001 From: Bruce Arctor <5032356+brucearctor@users.noreply.github.com> Date: Sat, 20 Jun 2026 20:40:13 -0700 Subject: [PATCH 1/2] feat: Add S3 External Storage Driver (Claim Check pattern) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements temporalio/sdk-java#2882 — porting the External Storage feature from the Go and Python SDKs to Java. Core interfaces (temporal-sdk): - StorageDriver: interface for external storage drivers - StorageDriverClaim: claim reference token stored in Event History - StorageDriverStoreContext/RetrieveContext: identity context for drivers - StorageDriverSelector: multi-driver selection strategy - ExternalStorage: configuration with builder pattern - StorageDriverException: driver error type Pipeline integration: - CodecDataConverter: wired external storage as last pipeline stage (after PayloadCodec). Payloads >= threshold are stored externally and replaced with claim tokens. Claims are auto-detected and resolved on deserialization. 
- WorkflowClientOptions: added setExternalStorage() builder method S3 Driver (temporal-s3-external-storage module): - S3Client: abstract interface for S3 operations (testability) - AwsSdkV2S3Client: AWS SDK v2 adapter - S3StorageDriver: content-addressed storage using SHA-256 hashing - S3StorageDriverOptions: builder-based configuration Tests: - 11 core unit tests (ExternalStorageTest) - 6 S3 driver unit tests (S3StorageDriverTest) --- settings.gradle | 1 + temporal-s3-external-storage/build.gradle | 14 ++ .../aws/s3driver/AwsSdkV2S3Client.java | 70 ++++++ .../contrib/aws/s3driver/S3Client.java | 32 +++ .../contrib/aws/s3driver/S3StorageDriver.java | 156 ++++++++++++ .../aws/s3driver/S3StorageDriverOptions.java | 106 ++++++++ .../aws/s3driver/S3StorageDriverTest.java | 111 +++++++++ .../client/WorkflowClientOptions.java | 41 +++ .../common/converter/CodecDataConverter.java | 233 +++++++++++++++++- .../common/converter/ExternalStorage.java | 223 +++++++++++++++++ .../common/converter/StorageDriver.java | 71 ++++++ .../common/converter/StorageDriverClaim.java | 56 +++++ .../converter/StorageDriverException.java | 25 ++ .../StorageDriverRetrieveContext.java | 56 +++++ .../converter/StorageDriverSelector.java | 28 +++ .../converter/StorageDriverStoreContext.java | 107 ++++++++ .../common/converter/ExternalStorageTest.java | 175 +++++++++++++ 17 files changed, 1499 insertions(+), 6 deletions(-) create mode 100644 temporal-s3-external-storage/build.gradle create mode 100644 temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java create mode 100644 temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java create mode 100644 temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java create mode 100644 temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java create mode 100644 
temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/ExternalStorage.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverClaim.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java create mode 100644 temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java diff --git a/settings.gradle b/settings.gradle index 918ceaa28e..d64929e8b0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -12,3 +12,4 @@ include 'temporal-remote-data-encoder' include 'temporal-shaded' include 'temporal-workflowcheck' include 'temporal-envconfig' +include 'temporal-s3-external-storage' diff --git a/temporal-s3-external-storage/build.gradle b/temporal-s3-external-storage/build.gradle new file mode 100644 index 0000000000..838e513fb7 --- /dev/null +++ b/temporal-s3-external-storage/build.gradle @@ -0,0 +1,14 @@ +description = '''Temporal S3 External Storage Driver''' + +ext { + awsSdkVersion = '2.25.0' +} + +dependencies { + api project(':temporal-sdk') + implementation "software.amazon.awssdk:s3:${awsSdkVersion}" + + testImplementation "junit:junit:${junitVersion}" + testImplementation "org.mockito:mockito-core:${mockitoVersion}" + testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}" +} diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java 
b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java new file mode 100644 index 0000000000..2c554994c7 --- /dev/null +++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/AwsSdkV2S3Client.java @@ -0,0 +1,70 @@ +package io.temporal.contrib.aws.s3driver; + +import io.temporal.common.Experimental; +import io.temporal.common.converter.StorageDriverException; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +/** + * Adapter wrapping the AWS SDK for Java v2 S3 client into the {@link S3Client} interface used by + * {@link S3StorageDriver}. + * + *
Example usage: + * + *
{@code
+ * software.amazon.awssdk.services.s3.S3Client awsS3 = software.amazon.awssdk.services.s3.S3Client.builder()
+ * .region(Region.US_EAST_1)
+ * .build();
+ * S3Client client = new AwsSdkV2S3Client(awsS3);
+ * }
+ */
+@Experimental
+public final class AwsSdkV2S3Client implements S3Client {
+
+  private final software.amazon.awssdk.services.s3.S3Client delegate;
+
+  /**
+   * Creates a new adapter wrapping the given AWS SDK v2 S3 client.
+   *
+   * @param delegate the AWS SDK v2 S3 client to delegate to; must not be null
+   */
+  public AwsSdkV2S3Client(software.amazon.awssdk.services.s3.S3Client delegate) {
+    this.delegate = Objects.requireNonNull(delegate, "delegate");
+  }
+
+  @Override
+  public void putObject(String bucket, String key, byte[] data) {
+    try {
+      delegate.putObject(
+          PutObjectRequest.builder().bucket(bucket).key(key).build(), RequestBody.fromBytes(data));
+    } catch (Exception e) {
+      throw new StorageDriverException(
+          "Failed to upload object to S3: bucket=" + bucket + ", key=" + key, e);
+    }
+  }
+
+  @Override
+  public byte[] getObject(String bucket, String key) {
+    try (InputStream is =
+        delegate.getObject(GetObjectRequest.builder().bucket(bucket).key(key).build())) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      byte[] buffer = new byte[8192];
+      int bytesRead;
+      while ((bytesRead = is.read(buffer)) != -1) {
+        baos.write(buffer, 0, bytesRead);
+      }
+      return baos.toByteArray();
+    } catch (IOException | RuntimeException e) {
+      // One handler for both failure sources: IOException from reading or closing the
+      // response stream, and unchecked exceptions raised by the delegate client. Both are
+      // reported the same way, so callers only ever deal with StorageDriverException.
+      throw new StorageDriverException(
+          "Failed to download object from S3: bucket=" + bucket + ", key=" + key, e);
+    }
+  }
+}
diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
new file mode 100644
index 0000000000..cb5b5318cf
--- /dev/null
+++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
@@ -0,0 +1,32 @@
+package io.temporal.contrib.aws.s3driver;
+
+import io.temporal.common.Experimental;
+
+/**
+ * Abstraction over S3 client operations used by {@link S3StorageDriver}. This interface allows
+ * plugging in different S3 client implementations and enables testing with mocks.
+ *
+ * @see AwsSdkV2S3Client
+ */
+@Experimental
+public interface S3Client {
+ /**
+ * Uploads {@code data} as the object stored under {@code key} in {@code bucket}.
+ *
+ * @param bucket the S3 bucket name
+ * @param key the S3 object key
+ * @param data the complete object content, supplied as a single in-memory byte array
+ * @throws io.temporal.common.converter.StorageDriverException if the upload fails
+ */
+ void putObject(String bucket, String key, byte[] data);
+
+ /**
+ * Downloads the object stored under {@code key} in {@code bucket}.
+ *
+ * @param bucket the S3 bucket name
+ * @param key the S3 object key
+ * @return the complete object content as a byte array
+ * @throws io.temporal.common.converter.StorageDriverException if the download fails
+ */
+ byte[] getObject(String bucket, String key);
+}
diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java
new file mode 100644
index 0000000000..813df0f016
--- /dev/null
+++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java
@@ -0,0 +1,156 @@
+package io.temporal.contrib.aws.s3driver;
+
+import io.temporal.common.Experimental;
+import io.temporal.common.converter.StorageDriver;
+import io.temporal.common.converter.StorageDriverClaim;
+import io.temporal.common.converter.StorageDriverException;
+import io.temporal.common.converter.StorageDriverRetrieveContext;
+import io.temporal.common.converter.StorageDriverStoreContext;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+
+/**
+ * {@link StorageDriver} implementation that stores payloads in Amazon S3. Uses content-addressed
+ * storage with SHA-256 hashing for natural deduplication.
+ *
+ * S3 object keys are constructed as: + * + *
+ * {keyPrefix}{namespace}/{workflowId}/{sha256-hash}
+ *
+ *
+ * Example usage: + * + *
{@code
+ * software.amazon.awssdk.services.s3.S3Client awsS3 = S3Client.builder()
+ * .region(Region.US_EAST_1)
+ * .build();
+ *
+ * S3StorageDriver driver = new S3StorageDriver(
+ * S3StorageDriverOptions.newBuilder()
+ * .setClient(new AwsSdkV2S3Client(awsS3))
+ * .setBucket("my-temporal-payloads")
+ * .build());
+ *
+ * ExternalStorage externalStorage = ExternalStorage.newBuilder()
+ * .addDriver(driver)
+ * .build();
+ * }
+ *
+ * Important: Temporal does not manage the lifecycle of stored objects. Users must
+ * configure S3 Lifecycle Policies or similar mechanisms for cleanup. Objects should be retained at
+ * least for the workflow lifetime plus the namespace retention period.
+ */
+@Experimental
+public final class S3StorageDriver implements StorageDriver {
+
+ private static final String DRIVER_TYPE = "aws.s3driver";
+ static final String CLAIM_KEY_BUCKET = "bucket";
+ static final String CLAIM_KEY_OBJECT_KEY = "key";
+
+ private final S3Client client;
+ private final String bucket;
+ private final String keyPrefix;
+ private final String driverName;
+
+ /**
+ * Creates a new S3 storage driver with the given options.
+ *
+ * @param options the driver configuration options
+ */
+ public S3StorageDriver(@Nonnull S3StorageDriverOptions options) {
+ Objects.requireNonNull(options, "options");
+ this.client = options.getClient();
+ this.bucket = options.getBucket();
+ this.keyPrefix = options.getKeyPrefix();
+ this.driverName = options.getDriverName();
+ }
+
+ @Override
+ public String name() {
+ return driverName;
+ }
+
+ @Override
+ public String type() {
+ return DRIVER_TYPE;
+ }
+
+ @Override
+ public List Example:
+ *
+ * This is completely transparent to workflow and activity business logic.
+ *
+ * @param externalStorage the external storage configuration, or null to disable
+ * @see ExternalStorage
+ */
+ @Experimental
+ public Builder setExternalStorage(@Nullable ExternalStorage externalStorage) {
+ this.externalStorage = externalStorage;
+ return this;
+ }
+
/**
* Interceptor used to intercept workflow client calls.
*
@@ -157,6 +177,7 @@ public WorkflowClientOptions build() {
return new WorkflowClientOptions(
namespace,
dataConverter,
+ externalStorage,
interceptors,
identity,
binaryChecksum,
@@ -181,6 +202,7 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
return new WorkflowClientOptions(
namespace == null ? DEFAULT_NAMESPACE : namespace,
dataConverter == null ? GlobalDataConverter.get() : dataConverter,
+ externalStorage,
interceptors == null ? EMPTY_INTERCEPTOR_ARRAY : interceptors,
name,
binaryChecksum == null ? DEFAULT_BINARY_CHECKSUM : binaryChecksum,
@@ -203,6 +225,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
private final DataConverter dataConverter;
+ private final @Nullable ExternalStorage externalStorage;
+
private final WorkflowClientInterceptor[] interceptors;
private final String identity;
@@ -218,6 +242,7 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
private WorkflowClientOptions(
String namespace,
DataConverter dataConverter,
+ @Nullable ExternalStorage externalStorage,
WorkflowClientInterceptor[] interceptors,
String identity,
String binaryChecksum,
@@ -226,6 +251,7 @@ private WorkflowClientOptions(
WorkflowClientPlugin[] plugins) {
this.namespace = namespace;
this.dataConverter = dataConverter;
+ this.externalStorage = externalStorage;
this.interceptors = interceptors;
this.identity = identity;
this.binaryChecksum = binaryChecksum;
@@ -247,6 +273,17 @@ public DataConverter getDataConverter() {
return dataConverter;
}
+ /**
+ * Returns the external storage configuration, or null if external storage is not configured.
+ *
+ * @see ExternalStorage
+ */
+ @Experimental
+ @Nullable
+ public ExternalStorage getExternalStorage() {
+ return externalStorage;
+ }
+
public WorkflowClientInterceptor[] getInterceptors() {
return interceptors;
}
@@ -297,6 +334,8 @@ public String toString() {
+ '\''
+ ", dataConverter="
+ dataConverter
+ + ", externalStorage="
+ + externalStorage
+ ", interceptors="
+ Arrays.toString(interceptors)
+ ", identity='"
@@ -321,6 +360,7 @@ public boolean equals(Object o) {
WorkflowClientOptions that = (WorkflowClientOptions) o;
return com.google.common.base.Objects.equal(namespace, that.namespace)
&& com.google.common.base.Objects.equal(dataConverter, that.dataConverter)
+ && com.google.common.base.Objects.equal(externalStorage, that.externalStorage)
&& Arrays.equals(interceptors, that.interceptors)
&& com.google.common.base.Objects.equal(identity, that.identity)
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
@@ -334,6 +374,7 @@ public int hashCode() {
return com.google.common.base.Objects.hashCode(
namespace,
dataConverter,
+ externalStorage,
Arrays.hashCode(interceptors),
identity,
binaryChecksum,
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
index a82172348b..69840acc34 100644
--- a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
@@ -2,6 +2,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.failure.v1.ApplicationFailureInfo;
@@ -11,11 +12,17 @@
import io.temporal.api.failure.v1.TimeoutFailureInfo;
import io.temporal.payload.codec.ChainCodec;
import io.temporal.payload.codec.PayloadCodec;
+import io.temporal.payload.context.ActivitySerializationContext;
+import io.temporal.payload.context.HasWorkflowSerializationContext;
import io.temporal.payload.context.SerializationContext;
import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -31,9 +38,23 @@
public class CodecDataConverter implements DataConverter, PayloadCodec {
private static final String ENCODED_FAILURE_MESSAGE = "Encoded failure";
+ /**
+ * Metadata encoding value for claim check payloads stored in external storage. When a payload is
+ * stored externally, the payload in Event History has this encoding and contains the serialized
+ * claim data.
+ */
+ static final String EXTERNAL_STORAGE_ENCODING_NAME = "temporal-external-storage/claim";
+
+ static final ByteString EXTERNAL_STORAGE_ENCODING =
+ ByteString.copyFrom(EXTERNAL_STORAGE_ENCODING_NAME, StandardCharsets.UTF_8);
+
+ /** Metadata key for the driver name in a claim payload. */
+ static final String CLAIM_DRIVER_NAME_KEY = "driver-name";
+
private final DataConverter dataConverter;
private final ChainCodec chainCodec;
private final boolean encodeFailureAttributes;
+ private final @Nullable ExternalStorage externalStorage;
private final @Nullable SerializationContext serializationContext;
/**
@@ -95,17 +116,39 @@ public CodecDataConverter(
DataConverter dataConverter,
Collection When {@code externalStorage} is configured, payloads exceeding the configured size threshold
+ * are offloaded to external storage after codec encoding. On deserialization, claim tokens are
+ * detected and the original payloads are retrieved from external storage before codec decoding.
+ *
+ * @param dataConverter to delegate data conversion to
+ * @param codecs to delegate bytes encoding/decoding to
+ * @param encodeFailureAttributes enable encoding of Failure attributes
+ * @param externalStorage optional external storage configuration
+ */
+ public CodecDataConverter(
+ DataConverter dataConverter,
+ Collection External storage is the last stage of the data conversion pipeline, running after
+ * {@link io.temporal.payload.codec.PayloadCodec} (compression/encryption):
+ *
+ * This is completely transparent to workflow and activity business logic.
+ *
+ * Example usage:
+ *
+ * Default is {@link #DEFAULT_PAYLOAD_SIZE_THRESHOLD} (256 KiB). Set to 1 to externalize all
+ * payloads.
+ */
+ public int getPayloadSizeThreshold() {
+ return payloadSizeThreshold;
+ }
+
+  /**
+   * Picks the driver that should store a payload for the given store context. A lone
+   * registered driver is always used; otherwise the configured {@link StorageDriverSelector}
+   * makes the choice.
+   *
+   * @param context the store context
+   * @return the selected driver, or null if the payload should remain inline
+   */
+  @Nullable
+  StorageDriver selectDriver(StorageDriverStoreContext context) {
+    boolean delegateToSelector = drivers.size() != 1 && driverSelector != null;
+    if (delegateToSelector) {
+      return driverSelector.select(context, drivers);
+    }
+    // Reached with exactly one driver, or when no selector is set. Builder#build rejects
+    // several drivers without a selector, so that second case is purely defensive.
+    return drivers.get(0);
+  }
+
+  /**
+   * Returns the registered driver whose name matches the one recorded in a claim.
+   *
+   * @param driverName the driver name stored in the claim
+   * @return the matching driver
+   * @throws StorageDriverException if no driver with the given name is found
+   */
+  @Nonnull
+  StorageDriver findDriverByName(String driverName) {
+    return drivers.stream()
+        .filter(candidate -> candidate.name().equals(driverName))
+        .findFirst()
+        .orElseThrow(
+            () ->
+                new StorageDriverException(
+                    "No storage driver found with name '"
+                        + driverName
+                        + "'. Registered drivers: "
+                        + driverNames()));
+  }
+
+  /** Renders the registered driver names as {@code [a, b, c]}. */
+  private String driverNames() {
+    // StringJoiner supplies the brackets and the ", " separators ("[]" when empty).
+    java.util.StringJoiner joined = new java.util.StringJoiner(", ", "[", "]");
+    for (StorageDriver driver : drivers) {
+      joined.add(driver.name());
+    }
+    return joined.toString();
+  }
+
+  @Override
+  public String toString() {
+    // Only driver names are printed (via driverNames()), not the driver objects themselves.
+    StringBuilder text = new StringBuilder("ExternalStorage{");
+    text.append("drivers=").append(driverNames());
+    text.append(", payloadSizeThreshold=").append(payloadSizeThreshold);
+    text.append('}');
+    return text.toString();
+  }
+
+ /** Builder for {@link ExternalStorage}. */
+ public static final class Builder {
+ private final List Default is {@link #DEFAULT_PAYLOAD_SIZE_THRESHOLD} (256 KiB). Set to 1 to externalize all
+ * payloads.
+ *
+ * @param payloadSizeThreshold threshold in bytes, must be positive
+ * @return this builder
+ */
+    public Builder setPayloadSizeThreshold(int payloadSizeThreshold) {
+      // Zero and negative thresholds are rejected before the builder state is touched.
+      boolean positive = payloadSizeThreshold > 0;
+      Preconditions.checkArgument(
+          positive, "payloadSizeThreshold must be positive, got %s", payloadSizeThreshold);
+      this.payloadSizeThreshold = payloadSizeThreshold;
+      return this;
+    }
+
+    /**
+     * Validates the collected settings and creates the {@link ExternalStorage} instance.
+     *
+     * @throws IllegalStateException if no driver was added, or several without a selector
+     */
+    public ExternalStorage build() {
+      Preconditions.checkState(drivers.size() > 0, "At least one StorageDriver must be configured");
+      boolean needsSelector = drivers.size() > 1;
+      Preconditions.checkState(
+          !needsSelector || driverSelector != null,
+          "A StorageDriverSelector is required when multiple drivers are configured");
+      return new ExternalStorage(drivers, driverSelector, payloadSizeThreshold);
+    }
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java
new file mode 100644
index 0000000000..f8947b7c82
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriver.java
@@ -0,0 +1,71 @@
+package io.temporal.common.converter;
+
+import io.temporal.common.Experimental;
+import java.util.List;
+
+/**
+ * Interface for external storage driver implementations. A storage driver is responsible for
+ * uploading large payloads to external storage (e.g., S3, GCS) and retrieving them using claim
+ * references.
+ *
+ * Implementations must be thread-safe as the SDK may call {@link #store} and {@link #retrieve}
+ * concurrently from multiple threads.
+ *
+ * The driver's {@link #name()} is stored in claim tokens persisted in workflow history. Changing
+ * a driver's name after payloads have been stored will break retrieval of those payloads.
+ *
+ * @see ExternalStorage
+ * @see StorageDriverClaim
+ */
+@Experimental
+public interface StorageDriver {
+
+ /**
+ * Returns a unique string that identifies this specific driver instance. This name is stored in
+ * the claim reference in workflow history to route retrieval requests to the correct driver.
+ *
+ * Critical: Changing this after payloads are stored will break retrieval.
+ *
+ * @return unique driver instance name, never null
+ */
+ String name();
+
+ /**
+ * Returns a string identifying the driver implementation type (e.g., "aws.s3driver"). Must be the
+ * same across all instances of the same driver type.
+ *
+ * @return driver type identifier, never null
+ */
+ String type();
+
+ /**
+ * Uploads payloads to external storage and returns a {@link StorageDriverClaim} for each payload.
+ * The claims contain addressing information needed to retrieve the data later.
+ *
+ * Each element in the returned list corresponds to the payload at the same index in the input
+ * list.
+ *
+ * @param context context with identity information about the workflow/activity owning the
+ * payloads
+ * @param payloads serialized payload bytes to store; each byte array is one serialized Payload
+ * protobuf
+ * @return list of claims, one per input payload, in the same order
+ * @throws StorageDriverException if the store operation fails
+ */
+ List Each element in the returned list corresponds to the claim at the same index in the input
+ * list.
+ *
+ * @param context context with identity information about the retrieval environment
+ * @param claims list of claims identifying the stored payloads
+ * @return list of serialized payload bytes, one per input claim, in the same order
+ * @throws StorageDriverException if the retrieve operation fails
+ */
+ List For example, an S3 driver might store {@code {"key": " This exception wraps underlying storage errors (e.g., network failures, permission errors)
+ * into a common type that the SDK can handle uniformly.
+ */
+@Experimental
+public class StorageDriverException extends RuntimeException {
+
+ public StorageDriverException(String message) {
+ super(message);
+ }
+
+ public StorageDriverException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StorageDriverException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java
new file mode 100644
index 0000000000..a81f1c8ac2
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java
@@ -0,0 +1,56 @@
+package io.temporal.common.converter;
+
+import io.temporal.common.Experimental;
+import javax.annotation.Nullable;
+
+/**
+ * Identity information handed to {@link StorageDriver#retrieve} describing where a retrieval
+ * happens. Drivers may use it for logging, metrics, or access control.
+ */
+@Experimental
+public final class StorageDriverRetrieveContext {
+
+  private final @Nullable String namespace;
+  private final @Nullable String workflowId;
+
+  /** Instances are obtained through {@link #create(String, String)}. */
+  private StorageDriverRetrieveContext(@Nullable String namespace, @Nullable String workflowId) {
+    this.namespace = namespace;
+    this.workflowId = workflowId;
+  }
+
+  /**
+   * Builds a retrieve context from whatever identity information is available.
+   *
+   * @param namespace the namespace, may be null if not available
+   * @param workflowId the workflow execution ID, may be null if not available
+   * @return a new retrieve context
+   */
+  public static StorageDriverRetrieveContext create(
+      @Nullable String namespace, @Nullable String workflowId) {
+    return new StorageDriverRetrieveContext(namespace, workflowId);
+  }
+
+  /** Returns the namespace, or null if it is not known in the retrieval context. */
+  @Nullable
+  public String getNamespace() {
+    return namespace;
+  }
+
+  /** Returns the workflow execution ID, or null if it is not known in the retrieval context. */
+  @Nullable
+  public String getWorkflowId() {
+    return workflowId;
+  }
+
+  @Override
+  public String toString() {
+    // Output shape: StorageDriverRetrieveContext{namespace='...', workflowId='...'}.
+    // A null field is rendered as the text null.
+    StringBuilder text = new StringBuilder("StorageDriverRetrieveContext{");
+    text.append("namespace='").append(namespace).append('\'');
+    text.append(", workflowId='").append(workflowId).append('\'');
+    text.append('}');
+    return text.toString();
+  }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java
new file mode 100644
index 0000000000..69cdb54dae
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java
@@ -0,0 +1,28 @@
+package io.temporal.common.converter;
+
+import io.temporal.common.Experimental;
+import java.util.List;
+
+/**
+ * Functional interface for selecting which {@link StorageDriver} should be used to store a specific
+ * payload. Required when multiple drivers are registered in {@link ExternalStorage}.
+ *
+ * Drivers not selected for storage remain available for retrieval, which supports migration
+ * scenarios (e.g., switching from one S3 bucket to another).
+ *
+ * @see ExternalStorage
+ */
+@Experimental
+@FunctionalInterface
+public interface StorageDriverSelector {
+
+ /**
+ * Selects a storage driver for the given store context.
+ *
+ * @param context context with identity information about the workflow/activity owning the payload
+ * @param drivers the list of available drivers
+ * @return the selected driver to use for storage, or null to keep the payload inline (not
+ * externalized)
+ */
+ StorageDriver select(StorageDriverStoreContext context, List Retry behavior: The AWS SDK v2 has built-in retry logic that is configurable on the
+ * {@link software.amazon.awssdk.services.s3.S3Client} instance. Users should configure
+ * appropriate retry policies for production use. For example:
+ *
+ * Impact on workflows: S3 failures during workflow replay will block workflow progress
+ * until retries succeed or the operation is abandoned. Configure retry policies and timeouts
+ * accordingly to avoid indefinite blocking.
+ *
* Example usage:
*
* Implementations must be thread-safe. A single {@code S3Client} instance may be shared
+ * across multiple workflow threads and activity workers concurrently.
+ *
* @see AwsSdkV2S3Client
*/
@Experimental
diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java
index 813df0f016..c06ce17651 100644
--- a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java
+++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriver.java
@@ -6,6 +6,8 @@
import io.temporal.common.converter.StorageDriverException;
import io.temporal.common.converter.StorageDriverRetrieveContext;
import io.temporal.common.converter.StorageDriverStoreContext;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -14,6 +16,8 @@
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@link StorageDriver} implementation that stores payloads in Amazon S3. Uses content-addressed
@@ -46,10 +50,19 @@
* Important: Temporal does not manage the lifecycle of stored objects. Users must
* configure S3 Lifecycle Policies or similar mechanisms for cleanup. Objects should be retained at
* least for the workflow lifetime plus the namespace retention period.
+ *
+ * @implNote Each payload is held in memory as a {@code byte[]}, so peak memory usage during store
+ * and retrieve operations is approximately 2–3× the payload size (original bytes, hash
+ * computation buffer, and S3 request/response buffer). S3 provides data integrity via checksums
+ * automatically. Content-addressed storage means that duplicate payloads produce identical
+ * object keys, making writes idempotent — re-uploading the same payload simply overwrites the
+ * same S3 object with identical content.
*/
@Experimental
public final class S3StorageDriver implements StorageDriver {
+ private static final Logger log = LoggerFactory.getLogger(S3StorageDriver.class);
+
private static final String DRIVER_TYPE = "aws.s3driver";
static final String CLAIM_KEY_BUCKET = "bucket";
static final String CLAIM_KEY_OBJECT_KEY = "key";
@@ -90,7 +103,17 @@ public List Each path segment derived from user-controlled input (namespace, workflowId, activityType)
+ * is URL-encoded to prevent path-injection attacks (e.g., a workflowId containing {@code "../"}
+ * could escape the intended key hierarchy).
+ *
+ * @param context the store context providing namespace, workflowId, and optional activityType
+ * @param hash the SHA-256 hex digest of the payload
+ * @return the fully-qualified S3 object key
+ */
private String buildObjectKey(StorageDriverStoreContext context, String hash) {
StringBuilder sb = new StringBuilder();
if (!keyPrefix.isEmpty()) {
@@ -127,15 +174,28 @@ private String buildObjectKey(StorageDriverStoreContext context, String hash) {
sb.append('/');
}
}
- sb.append(context.getNamespace()).append('/');
- sb.append(context.getWorkflowId()).append('/');
+ sb.append(encodePathSegment(context.getNamespace())).append('/');
+ sb.append(encodePathSegment(context.getWorkflowId())).append('/');
if (context.getActivityType() != null) {
- sb.append(context.getActivityType()).append('/');
+ sb.append(encodePathSegment(context.getActivityType())).append('/');
}
sb.append(hash);
return sb.toString();
}
+  /**
+   * Percent-encodes a single path segment of an S3 object key.
+   *
+   * <p>{@link URLEncoder} performs form encoding, which writes a space as {@code +}; that {@code +}
+   * is rewritten to {@code %20} so the key carries a conventional percent-encoded space. A literal
+   * {@code +} in the input is unaffected because the encoder has already turned it into {@code
+   * %2B}. Slashes are encoded as {@code %2F}, so a segment can never introduce an extra level in
+   * the key hierarchy.
+   *
+   * <p>{@link URLEncoder} leaves {@code .} untouched, so a segment consisting solely of {@code .}
+   * or {@code ..} would otherwise pass through verbatim. Those two values are emitted as {@code
+   * %2E} / {@code %2E%2E} so that no key segment can be read as a relative-path reference.
+   *
+   * @param segment the raw segment value; must not be {@code null}
+   * @return the encoded segment, safe to join with {@code '/'}
+   */
+  private static String encodePathSegment(String segment) {
+    String encoded;
+    try {
+      encoded = URLEncoder.encode(segment, "UTF-8").replace("+", "%20");
+    } catch (UnsupportedEncodingException e) {
+      // UTF-8 is guaranteed to be available on all JVMs
+      throw new AssertionError("UTF-8 encoding not available", e);
+    }
+    // A literal "%2E" in the input is encoded to "%252E", so this mapping stays collision-free.
+    if (encoded.equals(".") || encoded.equals("..")) {
+      return encoded.replace(".", "%2E");
+    }
+    return encoded;
+  }
+
static String sha256Hex(byte[] data) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java
index 472f639d42..3a0f0067bb 100644
--- a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java
+++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3StorageDriverOptions.java
@@ -18,6 +18,26 @@
* .setKeyPrefix("temporal/payloads/")
* .build();
* } Retry configuration: The AWS SDK v2 has built-in retry logic that is configurable on
+ * the {@code S3Client} passed to {@link AwsSdkV2S3Client}. Users should configure
+ * appropriate retry policies for production use. For example:
+ *
+ * <p>S3 failures during workflow replay will block workflow progress until retries succeed.
+ *
+ * <p><b>Data confidentiality:</b> Payload data is stored as-is in S3. To protect data at rest,
+ * configure S3 bucket-level encryption (SSE-S3 or SSE-KMS) on the target bucket.
+ *
+ * <p><b>Data cleanup:</b> Temporal does not manage the lifecycle of stored S3 objects. Configure S3
+ * Lifecycle Policies on the target bucket to automatically expire or transition objects. Objects
+ * should be retained at least for the workflow lifetime plus the namespace retention period.
*/
@Experimental
public final class S3StorageDriverOptions {
diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java
index 967bf05452..23e5dd4a21 100644
--- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java
+++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java
@@ -3,6 +3,7 @@
import io.temporal.api.enums.v1.QueryRejectCondition;
import io.temporal.common.Experimental;
import io.temporal.common.context.ContextPropagator;
+import io.temporal.common.converter.CodecDataConverter;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.ExternalStorage;
import io.temporal.common.converter.GlobalDataConverter;
@@ -250,7 +251,8 @@ private WorkflowClientOptions(
QueryRejectCondition queryRejectCondition,
WorkflowClientPlugin[] plugins) {
this.namespace = namespace;
- this.dataConverter = dataConverter;
+ // Wire external storage into the data converter if configured.
+ this.dataConverter = wrapWithExternalStorage(dataConverter, externalStorage);
this.externalStorage = externalStorage;
this.interceptors = interceptors;
this.identity = identity;
@@ -260,6 +262,24 @@ private WorkflowClientOptions(
this.plugins = plugins;
}
+  /**
+   * Produces the data converter this options instance exposes, attaching external storage when it
+   * has been configured.
+   *
+   * <ul>
+   *   <li>No external storage: the supplied converter is returned untouched.
+   *   <li>The converter is already a {@link CodecDataConverter}: it is replaced by the copy
+   *       returned from {@link CodecDataConverter#withExternalStorage}.
+   *   <li>Any other converter: it is wrapped in a new {@link CodecDataConverter} with an empty
+   *       codec chain.
+   * </ul>
+   */
+  private static DataConverter wrapWithExternalStorage(
+      DataConverter dataConverter, @Nullable ExternalStorage externalStorage) {
+    if (externalStorage != null) {
+      if (!(dataConverter instanceof CodecDataConverter)) {
+        return new CodecDataConverter(
+            dataConverter, Collections.emptyList(), false, externalStorage);
+      }
+      return ((CodecDataConverter) dataConverter).withExternalStorage(externalStorage);
+    }
+    return dataConverter;
+  }
+
/**
* Should be non-null on a valid instance
*
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
index 69840acc34..4649bfc1f6 100644
--- a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
@@ -51,6 +51,13 @@ public class CodecDataConverter implements DataConverter, PayloadCodec {
/** Metadata key for the driver name in a claim payload. */
static final String CLAIM_DRIVER_NAME_KEY = "driver-name";
+ /**
+ * Prefix for claim data keys in payload metadata. All claim data keys are prefixed with this
+ * string to prevent collisions with reserved metadata keys ({@code encoding}, {@code
+ * driver-name}).
+ */
+ static final String CLAIM_DATA_PREFIX = "claim.";
+
private final DataConverter dataConverter;
private final ChainCodec chainCodec;
private final boolean encodeFailureAttributes;
@@ -236,6 +243,20 @@ public CodecDataConverter withContext(@Nonnull SerializationContext context) {
dataConverter, chainCodec, encodeFailureAttributes, externalStorage, context);
}
+  /**
+   * Creates a copy of this converter that uses the supplied external storage configuration. The
+   * wrapped data converter, codec chain, failure-attribute encoding flag and serialization context
+   * are carried over unchanged. {@code WorkflowClientOptions} uses this to attach external storage
+   * to a converter the user has already built, so the user need not pass it to the constructor.
+   *
+   * @param externalStorage the external storage configuration
+   * @return a new CodecDataConverter with external storage configured
+   */
+  @Nonnull
+  public CodecDataConverter withExternalStorage(@Nonnull ExternalStorage externalStorage) {
+    final CodecDataConverter copy =
+        new CodecDataConverter(
+            dataConverter,
+            chainCodec,
+            encodeFailureAttributes,
+            externalStorage,
+            serializationContext);
+    return copy;
+  }
+
@Nonnull
@Override
public List The default threshold of {@link #DEFAULT_PAYLOAD_SIZE_THRESHOLD} (256 KiB) matches the
+ * Temporal server's gRPC warning threshold. Note that the Temporal server's hard limit for a single
+ * payload is typically 2 MiB.
*
* External storage is the last stage of the data conversion pipeline, running after
* {@link io.temporal.payload.codec.PayloadCodec} (compression/encryption):
@@ -99,8 +105,8 @@ StorageDriver selectDriver(StorageDriverStoreContext context) {
if (driverSelector != null) {
return driverSelector.select(context, drivers);
}
- // Should not happen if builder validates correctly.
- return drivers.get(0);
+ throw new IllegalStateException(
+ "Multiple drivers configured without a selector. This should not happen — please report this as a bug.");
}
/**
@@ -134,6 +140,21 @@ private String driverNames() {
return sb.toString();
}
+  /**
+   * Two configurations are equal when they are of the same class and agree on the payload size
+   * threshold, the driver list and the driver selector.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ExternalStorage other = (ExternalStorage) o;
+    if (payloadSizeThreshold != other.payloadSizeThreshold) {
+      return false;
+    }
+    return Objects.equals(drivers, other.drivers)
+        && Objects.equals(driverSelector, other.driverSelector);
+  }
+
+  /** Hashes the same three fields that {@link #equals(Object)} compares, in the same order. */
+  @Override
+  public int hashCode() {
+    // Same 31-multiplier fold that Objects.hash(a, b, c) performs, written out explicitly.
+    int result = 31 + Objects.hashCode(payloadSizeThreshold);
+    result = 31 * result + Objects.hashCode(drivers);
+    result = 31 * result + Objects.hashCode(driverSelector);
+    return result;
+  }
+
@Override
public String toString() {
return "ExternalStorage{"
@@ -170,8 +191,12 @@ public Builder addDriver(@Nonnull StorageDriver driver) {
* @return this builder
*/
public Builder setDrivers(@Nonnull List Implementations must be thread-safe as the SDK may call {@link #store} and {@link #retrieve}
- * concurrently from multiple threads.
- *
* <p>The driver's {@link #name()} is stored in claim tokens persisted in workflow history. Changing
* a driver's name after payloads have been stored will break retrieval of those payloads.
*
+ * @implSpec Implementations must be thread-safe since the SDK may call {@link #store} and {@link
+ * #retrieve} concurrently from multiple workflow task threads.
* @see ExternalStorage
* @see StorageDriverClaim
*/
@@ -43,7 +42,9 @@ public interface StorageDriver {
* The claims contain addressing information needed to retrieve the data later.
*
* Each element in the returned list corresponds to the payload at the same index in the input
- * list.
+ * list. Each {@code byte[]} in the input is the serialized protobuf {@code Payload} message (not
+ * raw user data) — it has already passed through {@code PayloadConverter} and {@code
+ * PayloadCodec}.
*
* @param context context with identity information about the workflow/activity owning the
* payloads
@@ -52,8 +53,7 @@ public interface StorageDriver {
* @return list of claims, one per input payload, in the same order
* @throws StorageDriverException if the store operation fails
*/
- List For example, an S3 driver might store {@code {"key": " Reserved keys: Claim data keys should not use the reserved key names {@code
+ * encoding}, {@code driver-name}, or any key starting with {@code claim.}, as these are used
+ * internally by the SDK.
+ *
+ * <p><b>Note:</b> {@link #toString()} outputs the full claim data map, which may include sensitive
+ * information such as S3 bucket names and object keys. Use caution when logging.
*/
@Experimental
public final class StorageDriverClaim {
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java
index 04bb327290..9dabcb0396 100644
--- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverException.java
@@ -7,10 +7,15 @@
*
* This exception wraps underlying storage errors (e.g., network failures, permission errors)
* into a common type that the SDK can handle uniformly.
+ *
+ * <p>This is an unchecked exception (extends {@link RuntimeException}) for consistency with {@link
+ * io.temporal.common.converter.DataConverterException}.
*/
@Experimental
public class StorageDriverException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
public StorageDriverException(String message) {
super(message);
}
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java
index a81f1c8ac2..e4b5cd5a1c 100644
--- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverRetrieveContext.java
@@ -6,6 +6,12 @@
/**
* Context passed to {@link StorageDriver#retrieve} providing identity information about the
* retrieval environment. This may be used by drivers for logging, metrics, or access control.
+ *
+ * <p>Unlike {@link StorageDriverStoreContext}, fields in this context are nullable because
+ * retrieval may happen in contexts where the full workflow identity is not available (e.g., during
+ * query handling).
+ *
+ * <p>Instances of this class are immutable and therefore thread-safe.
*/
@Experimental
public final class StorageDriverRetrieveContext {
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java
index 69cdb54dae..2b99d341fd 100644
--- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverSelector.java
@@ -10,6 +10,11 @@
* Drivers not selected for storage remain available for retrieval, which supports migration
* scenarios (e.g., switching from one S3 bucket to another).
*
+ * <p>Returning {@code null} from {@link #select} indicates that the payload should not be
+ * externalized and should remain inline in Event History. This can be used to implement conditional
+ * externalization logic (e.g., only externalize payloads for specific namespaces or workflow
+ * types).
+ *
* @see ExternalStorage
*/
@Experimental
diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java
index a1da82c920..c0ff6b3e4d 100644
--- a/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java
+++ b/temporal-sdk/src/main/java/io/temporal/common/converter/StorageDriverStoreContext.java
@@ -1,6 +1,7 @@
package io.temporal.common.converter;
import io.temporal.common.Experimental;
+import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -8,6 +9,8 @@
* Context passed to {@link StorageDriver#store} providing identity information about the workflow
* or activity that owns the payload being stored. Drivers can use this context to organize stored
* objects (e.g., by namespace and workflow ID in the object key path).
+ *
+ * <p>Instances of this class are immutable and therefore thread-safe.
*/
@Experimental
public final class StorageDriverStoreContext {
@@ -22,8 +25,8 @@ private StorageDriverStoreContext(
@Nonnull String workflowId,
@Nullable String runId,
@Nullable String activityType) {
- this.namespace = namespace;
- this.workflowId = workflowId;
+ this.namespace = Objects.requireNonNull(namespace, "namespace");
+ this.workflowId = Objects.requireNonNull(workflowId, "workflowId");
this.runId = runId;
this.activityType = activityType;
}
diff --git a/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java b/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java
index 2ece09de29..88e21d79de 100644
--- a/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/common/converter/ExternalStorageTest.java
@@ -3,6 +3,9 @@
import static org.junit.Assert.*;
import io.temporal.api.common.v1.Payload;
+import io.temporal.api.common.v1.Payloads;
+import io.temporal.client.WorkflowClientOptions;
+import io.temporal.payload.context.HasWorkflowSerializationContext;
import java.util.*;
import org.junit.Test;
@@ -124,6 +127,129 @@ public void testStorageDriverClaimEquality() {
assertEquals(claim1.hashCode(), claim2.hashCode());
}
+ /**
+ * Integration test: full CodecDataConverter pipeline roundtrip with external storage. Verifies
+ * that large payloads are stored and retrieved correctly through the complete pipeline.
+ */
+ @Test
+ public void testFullPipelineRoundtrip() {
+ InMemoryStorageDriver driver = new InMemoryStorageDriver("test");
+ ExternalStorage es =
+ ExternalStorage.newBuilder()
+ .addDriver(driver)
+ .setPayloadSizeThreshold(10) // Low threshold to trigger externalization
+ .build();
+
+ // Create a simple serialization context
+ HasWorkflowSerializationContext ctx =
+ new HasWorkflowSerializationContext() {
+ @Override
+ public String getNamespace() {
+ return "test-ns";
+ }
+
+ @Override
+ public String getWorkflowId() {
+ return "test-wf";
+ }
+ };
+
+ CodecDataConverter converter =
+ new CodecDataConverter(
+ DefaultDataConverter.STANDARD_INSTANCE, Collections.emptyList(), false, es);
+ CodecDataConverter contextualConverter = converter.withContext(ctx);
+
+ // Serialize a large string (will exceed threshold)
+ String largeValue = new String(new char[1000]).replace('\0', 'A');
+ Optional{@code
+ * S3StorageDriverOptions options = S3StorageDriverOptions.newBuilder()
+ * .setClient(new AwsSdkV2S3Client(awsS3Client))
+ * .setBucket("my-temporal-payloads")
+ * .setKeyPrefix("temporal/payloads/")
+ * .build();
+ * }
+ */
+@Experimental
+public final class S3StorageDriverOptions {
+
+  private static final String DEFAULT_DRIVER_NAME = "aws.s3driver";
+
+  private final S3Client client;
+  private final String bucket;
+  private final String keyPrefix;
+  private final String driverName;
+
+  private S3StorageDriverOptions(
+      S3Client client, String bucket, String keyPrefix, String driverName) {
+    this.client = client;
+    this.bucket = bucket;
+    this.keyPrefix = keyPrefix;
+    this.driverName = driverName;
+  }
+
+  /** Returns a new builder with an empty key prefix and the default driver name. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /** Returns the S3 client used for storage operations; never {@code null} on a built instance. */
+  @Nonnull
+  public S3Client getClient() {
+    return client;
+  }
+
+  /** Returns the S3 bucket name; never {@code null} or empty on a built instance. */
+  @Nonnull
+  public String getBucket() {
+    return bucket;
+  }
+
+  /** Returns the prefix prepended to all object keys, or an empty string when none was set. */
+  @Nonnull
+  public String getKeyPrefix() {
+    return keyPrefix;
+  }
+
+  /** Returns the driver instance name that is stored in claim tokens. */
+  @Nonnull
+  public String getDriverName() {
+    return driverName;
+  }
+
+  /** Builder for {@link S3StorageDriverOptions}. Client and bucket are required. */
+  public static final class Builder {
+    private S3Client client;
+    private String bucket;
+    private String keyPrefix = "";
+    private String driverName = DEFAULT_DRIVER_NAME;
+
+    private Builder() {}
+
+    /** Sets the S3 client to use for storage operations. Required. */
+    public Builder setClient(@Nonnull S3Client client) {
+      this.client = Objects.requireNonNull(client, "client");
+      return this;
+    }
+
+    /** Sets the S3 bucket name. Required. */
+    public Builder setBucket(@Nonnull String bucket) {
+      this.bucket = Objects.requireNonNull(bucket, "bucket");
+      return this;
+    }
+
+    /**
+     * Sets an optional prefix prepended to all S3 object keys. Useful for organizing payloads
+     * within a shared bucket. Default is empty string (no prefix); {@code null} is treated as
+     * empty.
+     */
+    public Builder setKeyPrefix(@Nullable String keyPrefix) {
+      this.keyPrefix = keyPrefix == null ? "" : keyPrefix;
+      return this;
+    }
+
+    /**
+     * Sets the driver instance name. This is stored in claim tokens. Default is "aws.s3driver".
+     * Changing this after payloads are stored will break retrieval. Must not be empty.
+     */
+    public Builder setDriverName(@Nonnull String driverName) {
+      this.driverName = Objects.requireNonNull(driverName, "driverName");
+      return this;
+    }
+
+    /**
+     * Validates the configuration and creates the immutable options object.
+     *
+     * @return the configured options
+     * @throws IllegalStateException if the client or bucket is missing, or if the bucket or driver
+     *     name is empty
+     */
+    public S3StorageDriverOptions build() {
+      Preconditions.checkState(client != null, "S3Client must be set");
+      Preconditions.checkState(bucket != null && !bucket.isEmpty(), "Bucket must be set");
+      // The driver name is persisted in claim tokens, so an empty name is rejected the same way
+      // an empty bucket is. It cannot be null here: the default and the setter both guarantee it.
+      Preconditions.checkState(!driverName.isEmpty(), "Driver name must not be empty");
+      return new S3StorageDriverOptions(client, bucket, keyPrefix, driverName);
+    }
+  }
+}
diff --git a/temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java b/temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java
new file mode 100644
index 0000000000..67ad920e23
--- /dev/null
+++ b/temporal-s3-external-storage/src/test/java/io/temporal/contrib/aws/s3driver/S3StorageDriverTest.java
@@ -0,0 +1,111 @@
+package io.temporal.contrib.aws.s3driver;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+import io.temporal.common.converter.StorageDriverClaim;
+import io.temporal.common.converter.StorageDriverRetrieveContext;
+import io.temporal.common.converter.StorageDriverStoreContext;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class S3StorageDriverTest {
+
+ private S3Client mockClient;
+ private S3StorageDriver driver;
+
+ @Before
+ public void setUp() {
+ mockClient = mock(S3Client.class);
+ S3StorageDriverOptions options =
+ S3StorageDriverOptions.newBuilder()
+ .setClient(mockClient)
+ .setBucket("test-bucket")
+ .setKeyPrefix("temporal/")
+ .build();
+ driver = new S3StorageDriver(options);
+ }
+
+ @Test
+ public void testNameAndType() {
+ assertEquals("aws.s3driver", driver.name());
+ assertEquals("aws.s3driver", driver.type());
+ }
+
+ @Test
+ public void testStoreAndRetrieve() {
+ byte[] payload = "test payload data".getBytes(StandardCharsets.UTF_8);
+ StorageDriverStoreContext storeCtx =
+ StorageDriverStoreContext.forWorkflow("default", "workflow-123", "run-456");
+
+ // Store
+ List
+ * PayloadConverter → PayloadCodec → ExternalStorage
+ *
+ *
+ * {@code
+ * ExternalStorage externalStorage = ExternalStorage.newBuilder()
+ * .addDriver(s3Driver)
+ * .setPayloadSizeThreshold(256 * 1024) // 256 KiB
+ * .build();
+ * }
+ *
+ * @see StorageDriver
+ */
+@Experimental
+public final class ExternalStorage {
+
+ /** Default payload size threshold: 256 KiB (262144 bytes). */
+ public static final int DEFAULT_PAYLOAD_SIZE_THRESHOLD = 256 * 1024;
+
+ private final List{@code
+ * software.amazon.awssdk.services.s3.S3Client awsS3 =
+ * software.amazon.awssdk.services.s3.S3Client.builder()
+ * .region(Region.US_EAST_1)
+ * .overrideConfiguration(o -> o.retryPolicy(
+ * RetryPolicy.builder().numRetries(3).build()))
+ * .build();
+ * S3Client client = new AwsSdkV2S3Client(awsS3);
+ * }
+ *
+ * {@code
@@ -26,6 +45,8 @@
@Experimental
public final class AwsSdkV2S3Client implements S3Client {
+ private static final Logger log = LoggerFactory.getLogger(AwsSdkV2S3Client.class);
+
private final software.amazon.awssdk.services.s3.S3Client delegate;
/**
@@ -39,10 +60,12 @@ public AwsSdkV2S3Client(software.amazon.awssdk.services.s3.S3Client delegate) {
@Override
public void putObject(String bucket, String key, byte[] data) {
+ log.debug("S3 PutObject: bucket={}, key={}, size={} bytes", bucket, key, data.length);
try {
delegate.putObject(
PutObjectRequest.builder().bucket(bucket).key(key).build(), RequestBody.fromBytes(data));
} catch (Exception e) {
+ log.warn("S3 PutObject failed: bucket={}, key={}", bucket, key, e);
throw new StorageDriverException(
"Failed to upload object to S3: bucket=" + bucket + ", key=" + key, e);
}
@@ -50,6 +73,7 @@ public void putObject(String bucket, String key, byte[] data) {
@Override
public byte[] getObject(String bucket, String key) {
+ log.debug("S3 GetObject: bucket={}, key={}", bucket, key);
try (InputStream is =
delegate.getObject(GetObjectRequest.builder().bucket(bucket).key(key).build())) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -59,10 +83,8 @@ public byte[] getObject(String bucket, String key) {
baos.write(buffer, 0, bytesRead);
}
return baos.toByteArray();
- } catch (IOException e) {
- throw new StorageDriverException(
- "Failed to download object from S3: bucket=" + bucket + ", key=" + key, e);
} catch (Exception e) {
+ log.warn("S3 GetObject failed: bucket={}, key={}", bucket, key, e);
throw new StorageDriverException(
"Failed to download object from S3: bucket=" + bucket + ", key=" + key, e);
}
diff --git a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
index cb5b5318cf..5398a93386 100644
--- a/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
+++ b/temporal-s3-external-storage/src/main/java/io/temporal/contrib/aws/s3driver/S3Client.java
@@ -6,6 +6,9 @@
* Abstraction over S3 client operations used by {@link S3StorageDriver}. This interface allows
* plugging in different S3 client implementations and enables testing with mocks.
*
+ *
+ *
+ * {@code
+ * S3Client.builder()
+ * .overrideConfiguration(o -> o.retryPolicy(
+ * RetryPolicy.builder().numRetries(3).build()))
+ * .build();
+ * }
+ *
+ *