Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,6 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
auto current = internal::MakeImmutableOptions(std::move(p.options));
auto request = p.request;
std::int64_t persisted_size = 0;
std::shared_ptr<storage::internal::HashFunction> hash_function =
CreateHashFunction(*current);
auto retry =
std::shared_ptr<storage::AsyncRetryPolicy>(retry_policy(*current));
auto backoff =
Expand Down Expand Up @@ -404,17 +402,28 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
auto pending = factory(std::move(request));
return pending.then(
[current, request = std::move(p.request), persisted_size,
hash = std::move(hash_function), fa = std::move(factory)](auto f) mutable
-> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
fa = std::move(factory)](auto f) mutable
-> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
auto rpc = f.get();
if (!rpc) return std::move(rpc).status();

std::shared_ptr<storage::internal::HashFunction> hash;
std::unique_ptr<AsyncWriterConnectionImpl> impl;

if (rpc->first_response.has_resource()) {
auto const& resource = rpc->first_response.resource();
if (current->get<storage::EnableCrc32cValidationOption>() &&
resource.has_checksums() && resource.checksums().has_crc32c()) {
hash = std::make_shared<storage::internal::Crc32cHashFunction>(
resource.checksums().crc32c(), resource.size());
} else {
hash = CreateHashFunction(*current);
}
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash,
rpc->first_response.resource(), false);
current, request, std::move(rpc->stream), hash, resource, false);
} else {
persisted_size = rpc->first_response.persisted_size();
hash = CreateHashFunction(*current);
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash, persisted_size,
false);
Expand Down Expand Up @@ -461,7 +470,7 @@ AsyncConnectionImpl::StartBufferedUpload(UploadParams p) {
return StartUnbufferedUpload(std::move(p))
.then([current = std::move(current),
async_write_object = std::move(async_write_object)](auto f) mutable
-> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
-> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
auto w = f.get();
if (!w) return std::move(w).status();
auto factory = [upload_id = (*w)->UploadId(),
Expand Down Expand Up @@ -495,14 +504,15 @@ AsyncConnectionImpl::ResumeBufferedUpload(ResumeUploadParams p) {
};

auto f = make_unbuffered();
return f.then([current = std::move(current),
make_unbuffered = std::move(make_unbuffered)](auto f) mutable
-> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
auto w = f.get();
if (!w) return std::move(w).status();
return MakeWriterConnectionBuffered(std::move(make_unbuffered),
*std::move(w), *current);
});
return f.then(
[current = std::move(current),
make_unbuffered = std::move(make_unbuffered)](auto f) mutable
-> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
auto w = f.get();
if (!w) return std::move(w).status();
return MakeWriterConnectionBuffered(std::move(make_unbuffered),
*std::move(w), *current);
});
}

future<StatusOr<google::storage::v2::Object>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/storage/async/options.h"
#include "google/cloud/storage/async/retry_policy.h"
#include "google/cloud/storage/async/writer_connection.h"
#include "google/cloud/storage/internal/async/connection_impl.h"
Expand Down Expand Up @@ -627,6 +628,135 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) {
next.first.set_value(true);
}

TEST_F(AsyncConnectionImplAppendableTest,
StartAppendableObjectUploadWithChecksum) {
auto constexpr kRequestText = R"pb(
write_object_spec {
resource {
bucket: "projects/_/buckets/test-bucket"
name: "test-object"
content_type: "text/plain"
}
}
)pb";
AsyncSequencer<bool> sequencer;
auto mock = std::make_shared<storage::testing::MockStorageStub>();

google::storage::v2::Object initial_resource;
initial_resource.set_bucket("projects/_/buckets/test-bucket");
initial_resource.set_name("test-object");
initial_resource.set_size(1024);
initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC

auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return sequencer.PushBack("Start");
});

EXPECT_CALL(*stream, Read)
.WillOnce([&, initial_resource] {
return sequencer.PushBack("Read(Takeover)")
.then([initial_resource](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
*response.mutable_resource() = initial_resource;
return absl::make_optional(std::move(response));
});
})
.WillOnce([&, initial_resource] {
return sequencer.PushBack("Read(FinalObject)")
.then([initial_resource](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
*response.mutable_resource() = initial_resource;
response.mutable_resource()->set_size(
initial_resource.size() + 9); // "some data" size is 9
return absl::make_optional(std::move(response));
});
});

EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
});

EXPECT_CALL(*stream, Write)
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_TRUE(request.state_lookup());
EXPECT_FALSE(wopt.is_last_message());
return sequencer.PushBack("Write(StateLookup)");
})
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_FALSE(wopt.is_last_message());
return sequencer.PushBack("Write(data)");
})
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_TRUE(request.finish_write());
EXPECT_TRUE(wopt.is_last_message());
// Here we expect full checksums to be set because we had the resource
// in takeover.
EXPECT_TRUE(request.has_object_checksums());
return sequencer.PushBack("Write(Finalize)");
});

EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] {
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
});

internal::AutomaticallyCreatedBackgroundThreads pool(1);
// Enable CRC32C validation in options
auto options = TestOptions().set<storage::EnableCrc32cValidationOption>(true);
auto connection = MakeTestConnection(pool.cq(), mock, options);

auto request = google::storage::v2::BidiWriteObjectRequest{};
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
request.mutable_write_object_spec()->set_appendable(true);

auto pending = connection->StartAppendableObjectUpload(
{std::move(request), connection->options()});

auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(StateLookup)");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read(Takeover)");
next.first.set_value(true);

auto r = pending.get();
ASSERT_STATUS_OK(r);
auto writer = *std::move(r);

// Write some data.
auto w1 = writer->Write(storage::WritePayload("some data"));
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(data)");
next.first.set_value(true);
EXPECT_STATUS_OK(w1.get());

// Finalize the upload.
auto w2 = writer->Finalize({});
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(Finalize)");
next.first.set_value(true);
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read(FinalObject)");
next.first.set_value(true);

auto response = w2.get();
ASSERT_STATUS_OK(response);

writer.reset();
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
next.first.set_value(true);
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
31 changes: 27 additions & 4 deletions google/cloud/storage/internal/async/writer_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,14 @@ AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) {

auto p = WritePayloadImpl::GetImpl(payload);
auto size = p.size();
auto action = request_.has_append_object_spec() ||
request_.write_object_spec().appendable()
? PartialUpload::kFinalize
: PartialUpload::kFinalizeWithChecksum;
auto action = PartialUpload::kFinalizeWithChecksum;
if (request_.has_append_object_spec() ||
request_.write_object_spec().appendable()) {
if (!absl::holds_alternative<google::storage::v2::Object>(
persisted_state_)) {
action = PartialUpload::kFinalize;
}
}
auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write),
std::move(p), std::move(action));
return coro->Start().then([coro, size, this](auto f) mutable {
Expand Down Expand Up @@ -256,7 +260,26 @@ future<StatusOr<std::int64_t>> AsyncWriterConnectionImpl::OnQuery(
latest_write_handle_ = response->write_handle();
}
if (response->has_persisted_size()) {
absl::optional<google::storage::v2::Object> old_obj;
if (absl::holds_alternative<google::storage::v2::Object>(
persisted_state_)) {
old_obj = absl::get<google::storage::v2::Object>(persisted_state_);
}

persisted_state_ = response->persisted_size();

if (response->has_persisted_data_checksums()) {
auto const& checksums = response->persisted_data_checksums();
if (checksums.has_crc32c()) {
google::storage::v2::Object obj;
obj.set_size(response->persisted_size());
*obj.mutable_checksums() = checksums;
if (old_obj) {
obj.set_generation(old_obj->generation());
}
persisted_state_ = obj;
}
}
return make_ready_future(make_status_or(response->persisted_size()));
}
if (response->has_resource()) {
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/storage/internal/hash_function_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class MD5HashFunction : public HashFunction {
class Crc32cHashFunction : public HashFunction {
public:
Crc32cHashFunction() = default;
explicit Crc32cHashFunction(std::uint32_t initial_crc, std::int64_t initial_offset)
: current_(initial_crc), minimum_offset_(initial_offset) {}

Crc32cHashFunction(Crc32cHashFunction const&) = delete;
Crc32cHashFunction& operator=(Crc32cHashFunction const&) = delete;
Expand Down
Loading