Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions s3/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,27 @@ The S3 client requires a JSON configuration file with the following structure:

``` json
{
"bucket_name": "<string> (required)",
"credentials_source": "<string> [static|env_or_profile|none]",
"access_key_id": "<string> (required if credentials_source = 'static')",
"secret_access_key": "<string> (required if credentials_source = 'static')",
"region": "<string> (optional - default: 'us-east-1')",
"host": "<string> (optional)",
"port": <int> (optional),
"ssl_verify_peer": <bool> (optional - default: true),
"use_ssl": <bool> (optional - default: true),
"signature_version": "<string> (optional)",
"server_side_encryption": "<string> (optional)",
"sse_kms_key_id": "<string> (optional)",
"multipart_upload": <bool> (optional - default: true),
"download_concurrency": <int> (optional - default: 5),
"download_part_size": <int64> (optional - default: 5242880), # 5 MB
"upload_concurrency": <int> (optional - default: 5),
"upload_part_size": <int64> (optional - default: 5242880) # 5 MB
"multipart_copy_threshold": <int64> (optional - default: 5368709120) # default 5 GB
"multipart_copy_part_size": <int64> (optional - default: 104857600) # default 100 MB - must be at least 5 MB
"bucket_name": "<string> (required)",
"credentials_source": "<string> [static|env_or_profile|none]",
"access_key_id": "<string> (required if credentials_source = 'static')",
"secret_access_key": "<string> (required if credentials_source = 'static')",
"region": "<string> (optional - default: 'us-east-1')",
"host": "<string> (optional)",
"port": <int> (optional),
"ssl_verify_peer": <bool> (optional - default: true),
"use_ssl": <bool> (optional - default: true),
"signature_version": "<string> (optional)",
"server_side_encryption": "<string> (optional)",
"sse_kms_key_id": "<string> (optional)",
"download_concurrency": <int> (optional - default: 5),
"download_part_size": <int64> (optional - default: 5242880), # 5 MB
"upload_concurrency": <int> (optional - default: 5),
"upload_part_size": <int64> (optional - default: 5242880), # 5 MB
"multipart_copy_threshold": <int64> (optional - default: 5368709120), # 5 GB - files larger than this use multipart copy
"multipart_copy_part_size": <int64> (optional - default: 104857600), # 100 MB - must be at least 5 MB
"single_upload_threshold": <int64> (optional - default: 0) # bytes; files <= this use a single PutObject call, larger files use multipart upload. 0 means always use multipart. Max 5 GB for AWS S3. GCS ignores this and always uses single upload.
}
```
> Note: multipart upload is not supported by Google Cloud Storage - when the provided 'host' is detected as GCS, **single_upload_threshold** is automatically raised internally so every file is uploaded with a single PutObject call

**Usage examples:**
```shell
Expand Down
61 changes: 48 additions & 13 deletions s3/client/aws_s3_blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
)

var errorInvalidCredentialsSourceValue = errors.New("the client operates in read only mode. Change 'credentials_source' parameter value ")
var oneTB = int64(1000 * 1024 * 1024 * 1024)

// Default settings for transfer concurrency and part size.
// These values are chosen to align with typical AWS CLI and SDK defaults for efficient S3 uploads and downloads.
Expand All @@ -33,6 +32,7 @@ const (
// AWS CopyObject limit is 5GB, use 100MB parts for multipart copy
defaultMultipartCopyThreshold = int64(5 * 1024 * 1024 * 1024) // 5 GB
defaultMultipartCopyPartSize = int64(100 * 1024 * 1024) // 100 MB
maxRetries = 3
)

// awsS3Client encapsulates AWS S3 blobstore interactions
Expand Down Expand Up @@ -84,17 +84,9 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error {
u.Concurrency = cfg.UploadConcurrency
}

// PartSize: if multipart uploads disabled, force a very large part to avoid multipart.
// Otherwise, use configured upload part size if present, otherwise default.
if !cfg.MultipartUpload {
// disable multipart uploads by way of large PartSize configuration
u.PartSize = oneTB
} else {
if cfg.UploadPartSize > 0 {
u.PartSize = cfg.UploadPartSize
} else {
u.PartSize = defaultTransferPartSize
}
u.PartSize = defaultTransferPartSize
if cfg.UploadPartSize > 0 {
u.PartSize = cfg.UploadPartSize
}

if cfg.ShouldDisableUploaderRequestChecksumCalculation() {
Expand All @@ -116,7 +108,6 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error {
}

retry := 0
maxRetries := 3
for {
putResult, err := uploader.Upload(context.TODO(), uploadInput) //nolint:staticcheck
if err != nil {
Expand All @@ -136,6 +127,50 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error {
}
}

// PutSinglePart uploads a blob using a single PutObject call (no multipart).
// Use this for small files where multipart overhead is unnecessary.
//
// src must support seeking: on a retry the body is rewound to the start so the
// full object is re-sent. Returns errorInvalidCredentialsSourceValue when the
// client is configured read-only (credentials_source = "none").
func (b *awsS3Client) PutSinglePart(src io.ReadSeeker, dest string) error {
	cfg := b.s3cliConfig
	if cfg.CredentialsSource == config.NoneCredentialsSource {
		return errorInvalidCredentialsSourceValue
	}

	input := &s3.PutObjectInput{
		Body:   src,
		Bucket: aws.String(cfg.BucketName),
		Key:    b.key(dest),
	}
	if cfg.ServerSideEncryption != "" {
		input.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption)
	}
	if cfg.SSEKMSKeyID != "" {
		input.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID)
	}

	for retry := 0; ; retry++ {
		if retry > 0 {
			// Rewind so the retried request re-sends the entire body.
			if _, seekErr := src.Seek(0, io.SeekStart); seekErr != nil {
				return fmt.Errorf("seeking source for retry: %w", seekErr)
			}
		}

		_, err := b.s3Client.PutObject(context.TODO(), input)
		if err == nil {
			slog.Info("Successfully uploaded file (single part)", "key", dest)
			return nil
		}
		if retry == maxRetries {
			// Wrap with %w so callers can inspect the AWS error via errors.Is/As.
			return fmt.Errorf("single part upload retry limit exceeded: %w", err)
		}
		// Linear backoff between attempts: 1s, 2s, 3s, ...
		time.Sleep(time.Second * time.Duration(retry+1))
	}
}

// Delete removes a blob - no error is returned if the object does not exist
func (b *awsS3Client) Delete(dest string) error {
if b.s3cliConfig.CredentialsSource == config.NoneCredentialsSource {
Expand Down
10 changes: 10 additions & 0 deletions s3/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ func (c *S3CompatibleClient) Put(src string, dest string) error {
return err
}
defer sourceFile.Close() //nolint:errcheck

info, err := sourceFile.Stat()
if err != nil {
return err
}
size := info.Size()

if size <= c.s3cliConfig.SingleUploadThreshold {
return c.awsS3BlobstoreClient.PutSinglePart(sourceFile, dest)
}
return c.awsS3BlobstoreClient.Put(sourceFile, dest)
}

Expand Down
2 changes: 1 addition & 1 deletion s3/client/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewAwsS3ClientWithApiOptions(
var httpClient *http.Client

if c.SSLVerifyPeer {
httpClient = boshhttp.CreateDefaultClient(nil)
httpClient = boshhttp.CreateKeepAliveDefaultClient(nil)
} else {
httpClient = boshhttp.CreateDefaultClientInsecureSkipVerify()
}
Expand Down
30 changes: 24 additions & 6 deletions s3/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"
"strings"
)

Expand All @@ -23,7 +24,6 @@ type S3Cli struct {
ServerSideEncryption string `json:"server_side_encryption"`
SSEKMSKeyID string `json:"sse_kms_key_id"`
AssumeRoleArn string `json:"assume_role_arn"`
MultipartUpload bool `json:"multipart_upload"`
HostStyle bool `json:"host_style"`
SwiftAuthAccount string `json:"swift_auth_account"`
SwiftTempURLKey string `json:"swift_temp_url_key"`
Expand All @@ -39,12 +39,21 @@ type S3Cli struct {
UploadPartSize int64 `json:"upload_part_size"`
MultipartCopyThreshold int64 `json:"multipart_copy_threshold"` // Default: 5GB - files larger than this use multipart copy
MultipartCopyPartSize int64 `json:"multipart_copy_part_size"` // Default: 100MB - size of each part in multipart copy

// Files smaller than or equal to this size (in bytes) are uploaded using a single PutObject call.
// Files exceeding this size use multipart upload. Omit or set to 0 to always use multipart upload.
// Must not exceed 5GB (AWS S3 hard limit for PutObject, https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html).
// For GCS, leave this unset (0); it will be automatically set to math.MaxInt64 since GCS requires single put for all uploads but has no size limit.
SingleUploadThreshold int64 `json:"single_upload_threshold"`
}

const (
// multipartCopyMinPartSize is the AWS minimum part size for multipart operations.
// Other providers may have different limits - users should consult their provider's documentation.
multipartCopyMinPartSize = 5 * 1024 * 1024 // 5MB - AWS minimum part size

// singlePutMaxSize is the AWS S3 hard limit for a single PutObject call.
singlePutMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB
)

const defaultAWSRegion = "us-east-1" //nolint:unused
Expand Down Expand Up @@ -85,7 +94,6 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
c := S3Cli{
SSLVerifyPeer: true,
UseSSL: true,
MultipartUpload: true,
RequestChecksumCalculationEnabled: true,
ResponseChecksumCalculationEnabled: true,
UploaderRequestChecksumCalculationEnabled: true,
Expand All @@ -101,6 +109,11 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
return S3Cli{}, errors.New("bucket_name must be set")
}

// Validate single put threshold
if c.SingleUploadThreshold < 0 {
return S3Cli{}, errors.New("single_upload_threshold must not be negative")
}

// Validate numeric fields: disallow negative values (zero means "use defaults")
if c.DownloadConcurrency < 0 || c.UploadConcurrency < 0 || c.DownloadPartSize < 0 || c.UploadPartSize < 0 {
return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative")
Expand Down Expand Up @@ -158,6 +171,12 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
c.configureDefault()
}

// Validate SingleUploadThreshold against the 5GB AWS limit, but only for non-GCS providers.
// GCS has no such limit, and configureGoogle() sets math.MaxInt64 internally.
if !c.IsGoogle() && c.SingleUploadThreshold > singlePutMaxSize {
return S3Cli{}, fmt.Errorf("single_upload_threshold must not exceed %d bytes (5GB - AWS S3 PutObject limit)", singlePutMaxSize)
}

return c, nil
}

Expand All @@ -174,8 +193,6 @@ func Provider(host string) string {
}

func (c *S3Cli) configureAWS() {
c.MultipartUpload = true

if c.Region == "" {
if region := AWSHostToRegion(c.Host); region != "" {
c.Region = region
Expand All @@ -186,7 +203,6 @@ func (c *S3Cli) configureAWS() {
}

func (c *S3Cli) configureAlicloud() {
c.MultipartUpload = true
c.HostStyle = true

c.Host = strings.Split(c.Host, ":")[0]
Expand All @@ -198,7 +214,9 @@ func (c *S3Cli) configureAlicloud() {
}

// configureGoogle applies Google Cloud Storage specific settings.
//
// GCS does not support S3 multipart upload, so every object must be sent with
// a single PutObject call. Unlike AWS S3 there is no 5GB ceiling on a single
// upload, which makes math.MaxInt64 a safe threshold here.
func (c *S3Cli) configureGoogle() {
	c.RequestChecksumCalculationEnabled = false
	c.SingleUploadThreshold = math.MaxInt64
}

Expand Down
86 changes: 60 additions & 26 deletions s3/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,26 +112,6 @@ var _ = Describe("BlobstoreClient configuration", func() {
})
})

Context("when MultipartUpload have been set", func() {
dummyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "some-host", "region": "some-region", "multipart_upload": false}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
It("sets MultipartUpload to user-specified values", func() {
c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.MultipartUpload).To(BeFalse())
})
})

Context("when MultipartUpload have not been set", func() {
dummyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "some-host", "region": "some-region"}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
It("default MultipartUpload to true", func() {
c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.MultipartUpload).To(BeTrue())
})
})

Context("when HostStyle has been set", func() {
dummyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "some-host", "region": "some-region", "host_style": true}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
Expand Down Expand Up @@ -633,16 +613,70 @@ var _ = Describe("BlobstoreClient configuration", func() {
})
})

Describe("checking the alibaba cloud MultipartUpload", func() {
emptyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "oss-some-region.aliyuncs.com"}`)
emptyJSONReader := bytes.NewReader(emptyJSONBytes)
Describe("single_upload_threshold", func() {
It("defaults to 0 when not set", func() {
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket"}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

It("defaults to support multipart uploading", func() {
c, err := config.NewFromReader(emptyJSONReader)
c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.MultipartUpload).To(BeTrue())
Expect(c.SingleUploadThreshold).To(Equal(int64(0)))
})

It("accepts a valid positive value", func() {
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":104857600}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.SingleUploadThreshold).To(Equal(int64(104857600))) // 100MB
})

It("accepts exactly 5GB (AWS maximum)", func() {
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":5368709120}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.SingleUploadThreshold).To(Equal(int64(5368709120)))
})

It("rejects negative values", func() {
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":-1}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

_, err := config.NewFromReader(dummyJSONReader)
Expect(err).To(MatchError("single_upload_threshold must not be negative"))
})

It("rejects values above 5GB for non-GCS providers", func() {
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":5368709121}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

_, err := config.NewFromReader(dummyJSONReader)
Expect(err).To(MatchError(ContainSubstring("single_upload_threshold must not exceed")))
})

It("allows values above 5GB for GCS (no hard limit)", func() {
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","host":"storage.googleapis.com","single_upload_threshold":10737418240}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
// configureGoogle() overrides to math.MaxInt64 regardless of user input
Expect(c.SingleUploadThreshold).To(Equal(int64(9223372036854775807)))
})

It("automatically sets threshold to MaxInt64 for GCS regardless of config", func() {
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","host":"storage.googleapis.com"}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.SingleUploadThreshold).To(Equal(int64(9223372036854775807)))
})
})

})

type explodingReader struct{}
Expand Down
Loading
Loading