feat: implement DataWriter for Iceberg data files #552
Conversation
Force-pushed 8944a75 to a201953
src/iceberg/data/data_writer.cc (outdated)

```cpp
ICEBERG_ASSIGN_OR_RAISE(writer_,
                        WriterFactoryRegistry::Open(options_.format, writer_options));
return {};
```
It is odd that an empty structure is always returned. Also, since this is initialization, why not do it in the constructor?
Refactored the initialization logic.
```cpp
if (closed_) {
  return InvalidArgument("Writer already closed");
}
```
I could see a case for making Close() idempotent; is there any strong reason why we want to return this error instead of a no-op, for example?
```cpp
  return InvalidArgument("Writer already closed");
}
ICEBERG_RETURN_UNEXPECTED(writer_->Close());
closed_ = true;
```
Should this class address thread safety?
Good question! I've added explicit documentation that this class is not thread-safe.
I don't think a single writer (or reader) should support thread safety, so it is fine not to add a comment like this.
@wgtmac out of curiosity, for my own knowledge: what guarantees that a single writer/reader will be using the class?
These file writers are supposed to be used by a single write task, which can, for example, be a unit of a table sink operator in a SQL job plan. Usually the writer is responsible for partitioned (and sometimes sorted) data chunks.
Agreed. Removed the thread-safety comment from the header.
src/iceberg/test/data_writer_test.cc (outdated)
```cpp
TEST_F(DataWriterTest, CreateWithParquetFormat) {
  DataWriterOptions options{
      .path = "test_data.parquet",
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = FileFormatType::kParquet,
      .io = file_io_,
      .properties = {{"write.parquet.compression-codec", "uncompressed"}},
  };

  auto writer_result = DataWriter::Make(options);
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());
  ASSERT_NE(writer, nullptr);
}

TEST_F(DataWriterTest, CreateWithAvroFormat) {
  DataWriterOptions options{
      .path = "test_data.avro",
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = FileFormatType::kAvro,
      .io = file_io_,
  };

  auto writer_result = DataWriter::Make(options);
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());
  ASSERT_NE(writer, nullptr);
}
```
nit: The two tests are quite similar; it should be possible to use a helper function to reduce the duplication.
Consolidated the two tests using parameterized testing.
```cpp
// Check length before close
auto length_result = writer->Length();
ASSERT_THAT(length_result, IsOk());
EXPECT_GT(length_result.value(), 0);
```
nit: check the size of the data passed to the write function?
src/iceberg/data/data_writer.cc (outdated)
```cpp
}

Result<FileWriter::WriteResult> Metadata() {
  if (!closed_) {
```
nit: use ICEBERG_CHECK here
src/iceberg/test/data_writer_test.cc (outdated)
```cpp
  EXPECT_GT(length.value(), 0);
}

}  // namespace
```
nit: move this closing namespace curly brace before the first TEST_F?
Force-pushed 90d324e to 153d763
Implements DataWriter class for writing Iceberg data files as part of issue apache#441 (task 2).

Implementation:
- Static factory method DataWriter::Make() for creating writer instances
- Support for Parquet and Avro file formats via WriterFactoryRegistry
- Complete DataFile metadata generation including partition info, column statistics, serialized bounds, and sort order ID
- Proper lifecycle management with Write/Close/Metadata methods
- Idempotent Close(): multiple calls succeed (no-op after first)
- PIMPL idiom for ABI stability
- Not thread-safe (documented)

Tests:
- 13 comprehensive unit tests including parameterized format tests
- Coverage: creation, write/close lifecycle, metadata generation, error handling, feature validation, and data size verification
- All tests passing (13/13)

Related to apache#441
Force-pushed 153d763 to 147f25b
src/iceberg/data/data_writer.cc (outdated)
```cpp
}

Result<FileWriter::WriteResult> Metadata() {
  ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
```
Suggested change:

```diff
- ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+ ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
```

We should return invalid state instead of invalid argument in this case.
- Use aggregate initialization for WriterOptions and DataFile
- Change ICEBERG_PRECHECK(writer_) to ICEBERG_DCHECK (can never fail)
- Use ICEBERG_CHECK for closed state check (returns ValidationFailed)
- Use value_or(-1) for missing row count to match the Java implementation
- Use range constructors for metrics map conversion
- Remove unnecessary thread-safety comment
- Use int32()/string() factory functions in tests
- Consolidate test cases and add helpers to reduce boilerplate
wgtmac left a comment:
LGTM. Thanks @shangxinli for working on this and @evindj for the review!