Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions drivers/115_open/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,30 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
return err
}

// The OSS STS token returned by UploadGetToken expires in about 1 hour.
// A slow or large upload can outlive it, causing SecurityTokenExpired /
// InvalidAccessKeyId. Refresh the token ahead of expiry and rebuild the OSS
// client with the new credentials, reusing the same multipart session (imur)
// to keep uploading the remaining parts.
tokenObtained := time.Now()
refreshOSSToken := func() error {
newToken, err := d.client.UploadGetToken(ctx)
if err != nil {
return err
}
newClient, err := netutil.NewOSSClient(newToken.Endpoint, newToken.AccessKeyId, newToken.AccessKeySecret, oss.SecurityToken(newToken.SecurityToken))
if err != nil {
return err
}
newBucket, err := newClient.Bucket(initResp.Bucket)
if err != nil {
return err
}
bucket = newBucket
tokenObtained = time.Now()
return nil
}

fileSize := stream.GetSize()
chunkSize := calPartSize(fileSize)
ss, err := streamPkg.NewStreamSectionReader(stream, int(chunkSize), &up)
Expand All @@ -100,6 +124,13 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
return ctx.Err()
}

// The token lives ~1 hour; refresh after 45 minutes to keep a safe margin.
if time.Since(tokenObtained) > 45*time.Minute {
if err := refreshOSSToken(); err != nil {
return err
}
}

partSize := chunkSize
if i == partNum {
partSize = fileSize - (i-1)*chunkSize
Expand Down