diff --git a/s3/README.md b/s3/README.md index 9fdfa65..50b4d58 100644 --- a/s3/README.md +++ b/s3/README.md @@ -12,28 +12,27 @@ The S3 client requires a JSON configuration file with the following structure: ``` json { - "bucket_name": " (required)", - "credentials_source": " [static|env_or_profile|none]", - "access_key_id": " (required if credentials_source = 'static')", - "secret_access_key": " (required if credentials_source = 'static')", - "region": " (optional - default: 'us-east-1')", - "host": " (optional)", - "port": (optional), - "ssl_verify_peer": (optional - default: true), - "use_ssl": (optional - default: true), - "signature_version": " (optional)", - "server_side_encryption": " (optional)", - "sse_kms_key_id": " (optional)", - "multipart_upload": (optional - default: true), - "download_concurrency": (optional - default: 5), - "download_part_size": (optional - default: 5242880), # 5 MB - "upload_concurrency": (optional - default: 5), - "upload_part_size": (optional - default: 5242880) # 5 MB - "multipart_copy_threshold": (optional - default: 5368709120) # default 5 GB - "multipart_copy_part_size": (optional - default: 104857600) # default 100 MB - must be at least 5 MB + "bucket_name": " (required)", + "credentials_source": " [static|env_or_profile|none]", + "access_key_id": " (required if credentials_source = 'static')", + "secret_access_key": " (required if credentials_source = 'static')", + "region": " (optional - default: 'us-east-1')", + "host": " (optional)", + "port": (optional), + "ssl_verify_peer": (optional - default: true), + "use_ssl": (optional - default: true), + "signature_version": " (optional)", + "server_side_encryption": " (optional)", + "sse_kms_key_id": " (optional)", + "download_concurrency": (optional - default: 5), + "download_part_size": (optional - default: 5242880), # 5 MB + "upload_concurrency": (optional - default: 5), + "upload_part_size": (optional - default: 5242880), # 5 MB + "multipart_copy_threshold": (optional - 
default: 5368709120), # 5 GB - files larger than this use multipart copy + "multipart_copy_part_size": (optional - default: 104857600), # 100 MB - must be at least 5 MB + "single_upload_threshold": (optional - default: 0) # bytes; files <= this use a single PutObject call, larger files use multipart upload. 0 means always use multipart. Max 5 GB for AWS S3. GCS ignores this and always uses single upload. } ``` -> Note: **multipart_upload** is not supported by Google - it's automatically set to false by parsing the provided 'host' **Usage examples:** ```shell diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go index 97cb87c..d087b1d 100644 --- a/s3/client/aws_s3_blobstore.go +++ b/s3/client/aws_s3_blobstore.go @@ -21,7 +21,6 @@ import ( ) var errorInvalidCredentialsSourceValue = errors.New("the client operates in read only mode. Change 'credentials_source' parameter value ") -var oneTB = int64(1000 * 1024 * 1024 * 1024) // Default settings for transfer concurrency and part size. // These values are chosen to align with typical AWS CLI and SDK defaults for efficient S3 uploads and downloads. @@ -33,6 +32,7 @@ const ( // AWS CopyObject limit is 5GB, use 100MB parts for multipart copy defaultMultipartCopyThreshold = int64(5 * 1024 * 1024 * 1024) // 5 GB defaultMultipartCopyPartSize = int64(100 * 1024 * 1024) // 100 MB + maxRetries = 3 ) // awsS3Client encapsulates AWS S3 blobstore interactions @@ -84,17 +84,9 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error { u.Concurrency = cfg.UploadConcurrency } - // PartSize: if multipart uploads disabled, force a very large part to avoid multipart. - // Otherwise, use configured upload part size if present, otherwise default. 
- if !cfg.MultipartUpload { - // disable multipart uploads by way of large PartSize configuration - u.PartSize = oneTB - } else { - if cfg.UploadPartSize > 0 { - u.PartSize = cfg.UploadPartSize - } else { - u.PartSize = defaultTransferPartSize - } + u.PartSize = defaultTransferPartSize + if cfg.UploadPartSize > 0 { + u.PartSize = cfg.UploadPartSize } if cfg.ShouldDisableUploaderRequestChecksumCalculation() { @@ -116,7 +108,6 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error { } retry := 0 - maxRetries := 3 for { putResult, err := uploader.Upload(context.TODO(), uploadInput) //nolint:staticcheck if err != nil { @@ -136,6 +127,50 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error { } } +// PutSinglePart uploads a blob using a single PutObject call (no multipart). +// Use this for small files where multipart overhead is unnecessary. +func (b *awsS3Client) PutSinglePart(src io.ReadSeeker, dest string) error { + cfg := b.s3cliConfig + if cfg.CredentialsSource == config.NoneCredentialsSource { + return errorInvalidCredentialsSourceValue + } + + input := &s3.PutObjectInput{ + Body: src, + Bucket: aws.String(cfg.BucketName), + Key: b.key(dest), + } + if cfg.ServerSideEncryption != "" { + input.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption) + } + if cfg.SSEKMSKeyID != "" { + input.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID) + } + + retry := 0 + for { + // Seek back to the start on retries so the full body is re-sent + if retry > 0 { + if _, seekErr := src.Seek(0, io.SeekStart); seekErr != nil { + return fmt.Errorf("failed to seek source for retry: %s", seekErr.Error()) + } + } + + _, err := b.s3Client.PutObject(context.TODO(), input) + if err != nil { + if retry == maxRetries { + return fmt.Errorf("single part upload retry limit exceeded: %s", err.Error()) + } + retry++ + time.Sleep(time.Second * time.Duration(retry)) + continue + } + + slog.Info("Successfully uploaded file (single part)", "key", dest) + return nil 
+ } +} + // Delete removes a blob - no error is returned if the object does not exist func (b *awsS3Client) Delete(dest string) error { if b.s3cliConfig.CredentialsSource == config.NoneCredentialsSource { diff --git a/s3/client/client.go b/s3/client/client.go index 40383b0..d805c70 100644 --- a/s3/client/client.go +++ b/s3/client/client.go @@ -44,6 +44,16 @@ func (c *S3CompatibleClient) Put(src string, dest string) error { return err } defer sourceFile.Close() //nolint:errcheck + + info, err := sourceFile.Stat() + if err != nil { + return err + } + size := info.Size() + + if threshold := c.s3cliConfig.SingleUploadThreshold; threshold > 0 && size <= threshold { + return c.awsS3BlobstoreClient.PutSinglePart(sourceFile, dest) + } return c.awsS3BlobstoreClient.Put(sourceFile, dest) } diff --git a/s3/client/sdk.go b/s3/client/sdk.go index b29ccc5..11f654e 100644 --- a/s3/client/sdk.go +++ b/s3/client/sdk.go @@ -37,7 +37,7 @@ func NewAwsS3ClientWithApiOptions( var httpClient *http.Client if c.SSLVerifyPeer { - httpClient = boshhttp.CreateDefaultClient(nil) + httpClient = boshhttp.CreateKeepAliveDefaultClient(nil) } else { httpClient = boshhttp.CreateDefaultClientInsecureSkipVerify() } diff --git a/s3/config/config.go b/s3/config/config.go index b174f4d..5147805 100644 --- a/s3/config/config.go +++ b/s3/config/config.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "strings" ) @@ -23,7 +24,6 @@ type S3Cli struct { ServerSideEncryption string `json:"server_side_encryption"` SSEKMSKeyID string `json:"sse_kms_key_id"` AssumeRoleArn string `json:"assume_role_arn"` - MultipartUpload bool `json:"multipart_upload"` HostStyle bool `json:"host_style"` SwiftAuthAccount string `json:"swift_auth_account"` SwiftTempURLKey string `json:"swift_temp_url_key"` @@ -39,12 +39,21 @@ type S3Cli struct { UploadPartSize int64 `json:"upload_part_size"` MultipartCopyThreshold int64 `json:"multipart_copy_threshold"` // Default: 5GB - files larger than this use multipart copy MultipartCopyPartSize int64 
`json:"multipart_copy_part_size"` // Default: 100MB - size of each part in multipart copy + + // Files smaller than or equal to this size (in bytes) are uploaded using a single PutObject call. + // Files exceeding this size use multipart upload. Omit or set to 0 to always use multipart upload. + // Must not exceed 5GB (AWS S3 hard limit for PutObject, https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html). + // For GCS, leave this unset (0); it will be automatically set to math.MaxInt64 since GCS requires single put for all uploads but has no size limit. + SingleUploadThreshold int64 `json:"single_upload_threshold"` } const ( // multipartCopyMinPartSize is the AWS minimum part size for multipart operations. // Other providers may have different limits - users should consult their provider's documentation. multipartCopyMinPartSize = 5 * 1024 * 1024 // 5MB - AWS minimum part size + + // singlePutMaxSize is the AWS S3 hard limit for a single PutObject call. + singlePutMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB ) const defaultAWSRegion = "us-east-1" //nolint:unused @@ -85,7 +94,6 @@ func NewFromReader(reader io.Reader) (S3Cli, error) { c := S3Cli{ SSLVerifyPeer: true, UseSSL: true, - MultipartUpload: true, RequestChecksumCalculationEnabled: true, ResponseChecksumCalculationEnabled: true, UploaderRequestChecksumCalculationEnabled: true, @@ -101,6 +109,11 @@ func NewFromReader(reader io.Reader) (S3Cli, error) { return S3Cli{}, errors.New("bucket_name must be set") } + // Validate single put threshold + if c.SingleUploadThreshold < 0 { + return S3Cli{}, errors.New("single_upload_threshold must not be negative") + } + // Validate numeric fields: disallow negative values (zero means "use defaults") if c.DownloadConcurrency < 0 || c.UploadConcurrency < 0 || c.DownloadPartSize < 0 || c.UploadPartSize < 0 { return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative") @@ -158,6 +171,12 @@ func NewFromReader(reader 
io.Reader) (S3Cli, error) { c.configureDefault() } + // Validate SingleUploadThreshold against the 5GB AWS limit, but only for non-GCS providers. + // GCS has no such limit, and configureGoogle() sets math.MaxInt64 internally. + if !c.IsGoogle() && c.SingleUploadThreshold > singlePutMaxSize { + return S3Cli{}, fmt.Errorf("single_upload_threshold must not exceed %d bytes (5GB - AWS S3 PutObject limit)", singlePutMaxSize) + } + return c, nil } @@ -174,8 +193,6 @@ func Provider(host string) string { } func (c *S3Cli) configureAWS() { - c.MultipartUpload = true - if c.Region == "" { if region := AWSHostToRegion(c.Host); region != "" { c.Region = region @@ -186,7 +203,6 @@ func (c *S3Cli) configureAWS() { } func (c *S3Cli) configureAlicloud() { - c.MultipartUpload = true c.HostStyle = true c.Host = strings.Split(c.Host, ":")[0] @@ -198,7 +214,9 @@ func (c *S3Cli) configureAlicloud() { } func (c *S3Cli) configureGoogle() { - c.MultipartUpload = false + // GCS does not support multipart upload, so all files must be uploaded via a single PutObject call. + // Unlike AWS S3, GCS has no 5GB hard limit on single uploads, so math.MaxInt64 is safe here. 
+ c.SingleUploadThreshold = math.MaxInt64 c.RequestChecksumCalculationEnabled = false } diff --git a/s3/config/config_test.go b/s3/config/config_test.go index 70eca8c..ad7923d 100644 --- a/s3/config/config_test.go +++ b/s3/config/config_test.go @@ -112,26 +112,6 @@ var _ = Describe("BlobstoreClient configuration", func() { }) }) - Context("when MultipartUpload have been set", func() { - dummyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "some-host", "region": "some-region", "multipart_upload": false}`) - dummyJSONReader := bytes.NewReader(dummyJSONBytes) - It("sets MultipartUpload to user-specified values", func() { - c, err := config.NewFromReader(dummyJSONReader) - Expect(err).ToNot(HaveOccurred()) - Expect(c.MultipartUpload).To(BeFalse()) - }) - }) - - Context("when MultipartUpload have not been set", func() { - dummyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "some-host", "region": "some-region"}`) - dummyJSONReader := bytes.NewReader(dummyJSONBytes) - It("default MultipartUpload to true", func() { - c, err := config.NewFromReader(dummyJSONReader) - Expect(err).ToNot(HaveOccurred()) - Expect(c.MultipartUpload).To(BeTrue()) - }) - }) - Context("when HostStyle has been set", func() { dummyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "some-host", "region": "some-region", "host_style": true}`) dummyJSONReader := bytes.NewReader(dummyJSONBytes) @@ -633,16 +613,70 @@ var _ = Describe("BlobstoreClient configuration", func() { }) }) - Describe("checking the alibaba cloud MultipartUpload", func() { - emptyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "oss-some-region.aliyuncs.com"}`) - emptyJSONReader := bytes.NewReader(emptyJSONBytes) + Describe("single_upload_threshold", func() { + It("defaults to 0 when not 
set", func() { + dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket"}`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) - It("defaults to support multipart uploading", func() { - c, err := config.NewFromReader(emptyJSONReader) + c, err := config.NewFromReader(dummyJSONReader) Expect(err).ToNot(HaveOccurred()) - Expect(c.MultipartUpload).To(BeTrue()) + Expect(c.SingleUploadThreshold).To(Equal(int64(0))) + }) + + It("accepts a valid positive value", func() { + dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":104857600}`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.SingleUploadThreshold).To(Equal(int64(104857600))) // 100MB + }) + + It("accepts exactly 5GB (AWS maximum)", func() { + dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":5368709120}`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.SingleUploadThreshold).To(Equal(int64(5368709120))) + }) + + It("rejects negative values", func() { + dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":-1}`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + _, err := config.NewFromReader(dummyJSONReader) + Expect(err).To(MatchError("single_upload_threshold must not be negative")) + }) + + It("rejects values above 5GB for non-GCS providers", func() { + dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":5368709121}`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + _, err := config.NewFromReader(dummyJSONReader) + 
Expect(err).To(MatchError(ContainSubstring("single_upload_threshold must not exceed"))) + }) + + It("allows values above 5GB for GCS (no hard limit)", func() { + dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","host":"storage.googleapis.com","single_upload_threshold":10737418240}`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + // configureGoogle() overrides to math.MaxInt64 regardless of user input + Expect(c.SingleUploadThreshold).To(Equal(int64(9223372036854775807))) + }) + + It("automatically sets threshold to MaxInt64 for GCS regardless of config", func() { + dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","host":"storage.googleapis.com"}`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.SingleUploadThreshold).To(Equal(int64(9223372036854775807))) }) }) + }) type explodingReader struct{} diff --git a/s3/integration/assertions.go b/s3/integration/assertions.go index 4fe8feb..967fd1d 100644 --- a/s3/integration/assertions.go +++ b/s3/integration/assertions.go @@ -504,3 +504,59 @@ func AssertOnSignedURLs(s3CLIPath string, cfg *config.S3Cli) { _, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename+"_put_test") Expect(err).ToNot(HaveOccurred()) } + +// AssertSinglePartUploadWorks verifies that when SingleUploadThreshold is set above the file size, +// the upload uses a single PutObject call (PutSinglePart) instead of the multipart manager. 
+func AssertSinglePartUploadWorks(s3CLIPath string, cfg *config.S3Cli) { + s3Filename := GenerateRandomString() + expectedContent := GenerateRandomString(1024) // 1KB — well within the 1MB threshold + sourceFile := MakeContentFile(expectedContent) + defer os.Remove(sourceFile) //nolint:errcheck + + storageType := "s3" + // Set threshold to 1MB — any small test string routes through PutSinglePart + cfg.SingleUploadThreshold = 1 * 1024 * 1024 + + configPath := MakeConfigFile(cfg) + defer os.Remove(configPath) //nolint:errcheck + + configFile, err := os.Open(configPath) + Expect(err).ToNot(HaveOccurred()) + + s3Config, err := config.NewFromReader(configFile) + Expect(err).ToNot(HaveOccurred()) + + // Track API calls to confirm a single PutObject was used (no multipart operations) + calls := []string{} + s3Client, err := CreateTracingS3Client(&s3Config, &calls) + if err != nil { + log.Fatalln(err) + } + + blobstoreClient := client.New(s3Client, &s3Config) + + err = blobstoreClient.Put(sourceFile, s3Filename) + Expect(err).ToNot(HaveOccurred()) + + // A single PutObject call is the fingerprint of PutSinglePart + Expect(calls).To(Equal(expectedPutUploadCalls), "Expected single PutObject (PutSinglePart), got: %v", calls) + + // Download and verify content integrity + tmpLocalFile, err := os.CreateTemp("", "s3cli-download-single-upload") + Expect(err).ToNot(HaveOccurred()) + err = tmpLocalFile.Close() + Expect(err).ToNot(HaveOccurred()) + defer os.Remove(tmpLocalFile.Name()) //nolint:errcheck + + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "get", s3Filename, tmpLocalFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + gottenBytes, err := os.ReadFile(tmpLocalFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(string(gottenBytes)).To(Equal(expectedContent)) + + // Clean up + _, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename) + Expect(err).ToNot(HaveOccurred()) +} diff --git 
a/s3/integration/aws_esc_test.go b/s3/integration/aws_esc_test.go index 930ca62..fc6083f 100644 --- a/s3/integration/aws_esc_test.go +++ b/s3/integration/aws_esc_test.go @@ -118,7 +118,6 @@ var _ = Describe("Testing for AWS European Sovereign Cloud region", func() { SecretAccessKey: secretAccessKey, BucketName: bucketName, Region: region, - MultipartUpload: true, } msg := "upload retry limit exceeded" integration.AssertOnPutFailures(cfg, largeContent, msg) diff --git a/s3/integration/general_aws_test.go b/s3/integration/general_aws_test.go index 2871901..ec9b12c 100644 --- a/s3/integration/general_aws_test.go +++ b/s3/integration/general_aws_test.go @@ -83,6 +83,10 @@ var _ = Describe("General testing for all AWS regions", func() { func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Single part upload works when threshold exceeds file size", + func(cfg *config.S3Cli) { integration.AssertSinglePartUploadWorks(s3CLIPath, cfg) }, + configurations, + ) configurations = []TableEntry{ Entry("with encryption", &config.S3Cli{ @@ -124,7 +128,6 @@ var _ = Describe("General testing for all AWS regions", func() { SecretAccessKey: secretAccessKey, BucketName: bucketName, Region: region, - MultipartUpload: true, } msg := "upload retry limit exceeded" integration.AssertOnPutFailures(cfg, largeContent, msg) diff --git a/s3/integration/s3_compatible_test.go b/s3/integration/s3_compatible_test.go index 698ed74..52cdc5c 100644 --- a/s3/integration/s3_compatible_test.go +++ b/s3/integration/s3_compatible_test.go @@ -36,7 +36,6 @@ var _ = Describe("Testing in any non-AWS, S3 compatible storage service", func() BucketName: bucketName, Host: s3Host, Region: "invalid-region", - MultipartUpload: true, }), Entry("with use_ssl set to false", &config.S3Cli{ AccessKeyID: accessKeyID, @@ -45,7 +44,6 @@ var _ = Describe("Testing in any non-AWS, S3 compatible storage service", func() Host: s3Host, Region: "invalid-region", UseSSL: 
false, - MultipartUpload: true, }), Entry("with the maximal configuration", &config.S3Cli{ CredentialsSource: "static", @@ -57,7 +55,6 @@ var _ = Describe("Testing in any non-AWS, S3 compatible storage service", func() UseSSL: true, SSLVerifyPeer: true, Region: "invalid-region", - MultipartUpload: true, }), }