Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ This product includes code from Apache Iceberg C++.
* Avro direct decoder/encoder:
* src/paimon/format/avro/avro_direct_decoder.cpp
* src/paimon/format/avro/avro_direct_decoder.h
* src/paimon/format/avro/avro_direct_encoder.cpp
* src/paimon/format/avro/avro_direct_encoder.h

Copyright: 2024-2025 The Apache Software Foundation.
Home page: https://iceberg.apache.org/
Expand Down
2 changes: 1 addition & 1 deletion cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
define_option_string(PAIMON_CXXFLAGS "Compiler flags to append when compiling Paimon"
"")

define_option(PAIMON_BUILD_STATIC "Build static libraries" OFF)
define_option(PAIMON_BUILD_STATIC "Build static libraries" ON)

define_option(PAIMON_BUILD_SHARED "Build shared libraries" ON)
#----------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ struct PAIMON_EXPORT Options {
static const char MANIFEST_TARGET_FILE_SIZE[];

/// "manifest.format" - Specify the message format of manifest files.
/// Default value is orc.
/// Default value is avro.
static const char MANIFEST_FORMAT[];

/// "manifest.compression" - File compression for manifest, default value is zstd.
Expand Down
7 changes: 3 additions & 4 deletions include/paimon/read_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "paimon/result.h"
#include "paimon/type_fwd.h"
#include "paimon/utils/read_ahead_cache.h"
#include "paimon/utils/special_field_ids.h"
#include "paimon/visibility.h"

namespace paimon {
Expand Down Expand Up @@ -179,9 +178,9 @@ class PAIMON_EXPORT ReadContextBuilder {
/// @param read_field_ids Vector of field ids to read from the table.
/// @return Reference to this builder for method chaining.
/// @note Currently supports top-level field selection. Future versions may support
/// nested field selection using ArrowSchema for more granular projection,
/// If SetReadFieldIds() call and SetReadSchema() are natually are mutually
/// exclusive. Calling both will ignore the read schema set by SetReadSchema().
/// nested field selection using ArrowSchema for more granular projection.
/// @note SetReadFieldIds() and SetReadSchema() are mutually exclusive.
/// Calling both will ignore the read schema set by SetReadSchema().
ReadContextBuilder& SetReadFieldIds(const std::vector<int32_t>& read_field_ids);

/// Set a configuration options map to set some option entries which are not defined in the
Expand Down
2 changes: 1 addition & 1 deletion include/paimon/reader/file_batch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader {
virtual uint64_t GetPreviousBatchFirstRowNumber() const = 0;

/// Get the number of rows in the file.
virtual uint64_t GetNumberOfRows() const = 0;
virtual Result<uint64_t> GetNumberOfRows() const = 0;

/// Get whether or not support read precisely while bitmap pushed down.
virtual bool SupportPreciseBitmapSelection() const = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/reader/delegating_prefetch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class DelegatingPrefetchReader : public FileBatchReader {
return GetReader()->GetPreviousBatchFirstRowNumber();
}

uint64_t GetNumberOfRows() const override {
Result<uint64_t> GetNumberOfRows() const override {
return GetReader()->GetNumberOfRows();
}

Expand Down
25 changes: 16 additions & 9 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,11 @@ Status PrefetchFileBatchReaderImpl::SetReadRanges(
read_ranges_.push_back(read_range);
}
// Note: append a sentinel read range beyond the file row count to trigger an EOF access.
read_ranges_.push_back(EofRange());
std::pair<uint64_t, uint64_t> eof_range;
PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
read_ranges_.push_back(eof_range);
for (auto& read_ranges : read_ranges_in_group_) {
read_ranges.push_back(EofRange());
read_ranges.push_back(eof_range);
}
return Status::OK();
}
Expand Down Expand Up @@ -420,7 +422,9 @@ Status PrefetchFileBatchReaderImpl::HandleReadResult(
}
prefetch_queue->push({read_range, std::move(read_batch_with_bitmap), first_row_number});
} else {
prefetch_queue->push({EofRange(), std::move(read_batch_with_bitmap), first_row_number});
std::pair<uint64_t, uint64_t> eof_range;
PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), first_row_number});
readers_pos_[reader_idx]->store(std::numeric_limits<uint64_t>::max());
}
return Status::OK();
Expand Down Expand Up @@ -490,7 +494,8 @@ Result<BatchReader::ReadBatchWithBitmap> PrefetchFileBatchReaderImpl::NextBatchW
}
}
value_count++;
if (IsEofRange(peek_batch->read_range)) {
PAIMON_ASSIGN_OR_RAISE(bool is_eof_range, IsEofRange(peek_batch->read_range));
if (is_eof_range) {
eof_count++;
continue;
}
Expand Down Expand Up @@ -550,7 +555,7 @@ uint64_t PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const {
return previous_batch_first_row_num_;
}

uint64_t PrefetchFileBatchReaderImpl::GetNumberOfRows() const {
/// Return the total number of rows in the underlying file.
/// All sub-readers open the same file, so the first reader is authoritative.
/// @return Row count on success, or an error Status if no readers exist or
///         the underlying reader fails to report its row count.
Result<uint64_t> PrefetchFileBatchReaderImpl::GetNumberOfRows() const {
    // The assert documents the invariant for debug builds, but it compiles out
    // in release builds; without the explicit check below, an empty readers_
    // would cause undefined behavior via out-of-bounds access.
    assert(!readers_.empty());
    if (readers_.empty()) {
        return Status::Invalid("no underlying readers available to get number of rows");
    }
    return readers_[0]->GetNumberOfRows();
}
Expand All @@ -569,13 +574,15 @@ Status PrefetchFileBatchReaderImpl::GetReadStatus() const {
std::shared_lock<std::shared_mutex> lock(rw_mutex_);
return read_status_;
}
bool PrefetchFileBatchReaderImpl::IsEofRange(
Result<bool> PrefetchFileBatchReaderImpl::IsEofRange(
const std::pair<uint64_t, uint64_t>& read_range) const {
return read_range.first >= GetNumberOfRows();
PAIMON_ASSIGN_OR_RAISE(uint64_t num_rows, GetNumberOfRows());
return read_range.first >= num_rows;
}

std::pair<uint64_t, uint64_t> PrefetchFileBatchReaderImpl::EofRange() const {
return {GetNumberOfRows(), GetNumberOfRows() + 1};
/// Build the sentinel one-row range that starts just past the last row of the
/// file; pushing it through the read pipeline triggers an EOF access.
/// @return The [row_count, row_count + 1) sentinel range, or an error Status
///         if the row count cannot be determined.
Result<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::EofRange() const {
    PAIMON_ASSIGN_OR_RAISE(uint64_t row_count, GetNumberOfRows());
    std::pair<uint64_t, uint64_t> sentinel_range{row_count, row_count + 1};
    return sentinel_range;
}

void PrefetchFileBatchReaderImpl::Close() {
Expand Down
6 changes: 3 additions & 3 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {

Status SeekToRow(uint64_t row_number) override;
uint64_t GetPreviousBatchFirstRowNumber() const override;
uint64_t GetNumberOfRows() const override;
Result<uint64_t> GetNumberOfRows() const override;
uint64_t GetNextRowToRead() const override;
void Close() override;
Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override;
Expand Down Expand Up @@ -117,7 +117,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
void Workloop();
void SetReadStatus(const Status& status);
Status GetReadStatus() const;
bool IsEofRange(const std::pair<uint64_t, uint64_t>& read_range) const;
Result<bool> IsEofRange(const std::pair<uint64_t, uint64_t>& read_range) const;
Status DoReadBatch(size_t reader_idx);
void ReadBatch(size_t reader_idx);
size_t GetEnabledReaderSize() const;
Expand All @@ -128,7 +128,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
static std::vector<std::vector<std::pair<uint64_t, uint64_t>>> DispatchReadRanges(
const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t reader_count);

std::pair<uint64_t, uint64_t> EofRange() const;
Result<std::pair<uint64_t, uint64_t>> EofRange() const;
std::optional<std::pair<uint64_t, uint64_t>> GetCurrentReadRange(size_t reader_idx) const;
Status EnsureReaderPosition(size_t reader_idx,
const std::pair<uint64_t, uint64_t>& read_range) const;
Expand Down
7 changes: 5 additions & 2 deletions src/paimon/common/utils/arrow/arrow_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@ class ArrowUtils {
return std::make_shared<arrow::Schema>(struct_type->fields());
}

static std::vector<int32_t> CreateProjection(
static Result<std::vector<int32_t>> CreateProjection(
const std::shared_ptr<::arrow::Schema>& file_schema,
const arrow::FieldVector& read_fields) {
std::vector<int32_t> target_to_src_mapping;
target_to_src_mapping.reserve(read_fields.size());
for (const auto& field : read_fields) {
auto src_field_idx = file_schema->GetFieldIndex(field->name());
assert(src_field_idx >= 0);
if (src_field_idx < 0) {
return Status::Invalid(
fmt::format("Field '{}' not found or duplicate in file schema", field->name()));
}
target_to_src_mapping.push_back(src_field_idx);
}
return target_to_src_mapping;
Expand Down
101 changes: 82 additions & 19 deletions src/paimon/common/utils/arrow/arrow_utils_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,93 @@

#include "arrow/api.h"
#include "gtest/gtest.h"
#include "paimon/common/types/data_field.h"
#include "paimon/testing/utils/testharness.h"

namespace paimon::test {

TEST(ArrowUtilsTest, TestCreateProjection) {
std::vector<DataField> read_fields = {DataField(1, arrow::field("k1", arrow::int32())),
DataField(3, arrow::field("p1", arrow::int32())),
DataField(5, arrow::field("s1", arrow::utf8())),
DataField(6, arrow::field("v0", arrow::float64())),
DataField(7, arrow::field("v1", arrow::boolean()))};
auto read_schema = DataField::ConvertDataFieldsToArrowSchema(read_fields);
arrow::FieldVector file_fields = {
arrow::field("k0", arrow::int32()), arrow::field("k1", arrow::int32()),
arrow::field("p1", arrow::int32()), arrow::field("s1", arrow::utf8()),
arrow::field("v0", arrow::float64()), arrow::field("v1", arrow::boolean()),
arrow::field("s0", arrow::utf8())};
auto file_schema = arrow::schema(file_fields);

std::vector<DataField> file_fields = {DataField(0, arrow::field("k0", arrow::int32())),
DataField(1, arrow::field("k1", arrow::int32())),
DataField(3, arrow::field("p1", arrow::int32())),
DataField(5, arrow::field("s1", arrow::utf8())),
DataField(6, arrow::field("v0", arrow::float64())),
DataField(7, arrow::field("v1", arrow::boolean())),
DataField(4, arrow::field("s0", arrow::utf8()))};
auto file_schema = DataField::ConvertDataFieldsToArrowSchema(file_fields);

auto projection = ArrowUtils::CreateProjection(file_schema, read_schema->fields());
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 5};
ASSERT_EQ(projection, expected_projection);
{
// normal case
arrow::FieldVector read_fields = {
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
arrow::field("s1", arrow::utf8()), arrow::field("v0", arrow::float64()),
arrow::field("v1", arrow::boolean())};
auto read_schema = arrow::schema(read_fields);
ASSERT_OK_AND_ASSIGN(std::vector<int32_t> projection,
ArrowUtils::CreateProjection(file_schema, read_schema->fields()));
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 5};
ASSERT_EQ(projection, expected_projection);
}
{
// duplicate read field
arrow::FieldVector read_fields = {
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
arrow::field("s1", arrow::utf8()), arrow::field("v0", arrow::float64()),
arrow::field("v0", arrow::float64()), arrow::field("v1", arrow::boolean())};
auto read_schema = arrow::schema(read_fields);
ASSERT_OK_AND_ASSIGN(std::vector<int32_t> projection,
ArrowUtils::CreateProjection(file_schema, read_schema->fields()));
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 4, 5};
ASSERT_EQ(projection, expected_projection);
}
{
// duplicate read fields, and read_fields has more entries than file_fields
arrow::FieldVector read_fields = {
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
arrow::field("s1", arrow::utf8()), arrow::field("v0", arrow::float64()),
arrow::field("v0", arrow::float64()), arrow::field("v0", arrow::float64()),
arrow::field("v0", arrow::float64()), arrow::field("v0", arrow::float64()),
arrow::field("v1", arrow::boolean())};
auto read_schema = arrow::schema(read_fields);
ASSERT_OK_AND_ASSIGN(std::vector<int32_t> projection,
ArrowUtils::CreateProjection(file_schema, read_schema->fields()));
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 4, 4, 4, 4, 5};
ASSERT_EQ(projection, expected_projection);
}
{
// read field not found in file schema
arrow::FieldVector read_fields = {
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
arrow::field("s1", arrow::utf8()), arrow::field("v2", arrow::float64()),
arrow::field("v1", arrow::boolean())};
auto read_schema = arrow::schema(read_fields);
ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema, read_schema->fields()),
"Field 'v2' not found or duplicate in file schema");
}
{
// duplicate field in file schema
arrow::FieldVector file_fields_dup = {
arrow::field("k0", arrow::int32()), arrow::field("k1", arrow::int32()),
arrow::field("p1", arrow::int32()), arrow::field("s1", arrow::utf8()),
arrow::field("v0", arrow::float64()), arrow::field("v1", arrow::boolean()),
arrow::field("v1", arrow::boolean()), arrow::field("s0", arrow::utf8())};
auto file_schema_dup = arrow::schema(file_fields_dup);
arrow::FieldVector read_fields = {
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
arrow::field("s1", arrow::utf8()), arrow::field("v1", arrow::float64()),
arrow::field("v1", arrow::boolean())};
auto read_schema = arrow::schema(read_fields);
ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema_dup, read_schema->fields()),
"Field 'v1' not found or duplicate in file schema");
}
{
arrow::FieldVector read_fields = {
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
arrow::field("s1", arrow::utf8()), arrow::field("v0", arrow::float64()),
arrow::field("v1", arrow::boolean())};
auto read_schema = arrow::schema(read_fields);
ASSERT_OK_AND_ASSIGN(std::vector<int32_t> projection,
ArrowUtils::CreateProjection(file_schema, read_schema->fields()));
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 5};
ASSERT_EQ(projection, expected_projection);
}
}

} // namespace paimon::test
16 changes: 16 additions & 0 deletions src/paimon/common/utils/date_time_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,22 @@ class DateTimeUtils {
return tz ? tz->name() : "UTC";
}

/// Convert an Arrow time unit enum value to its upper-case spelled-out name
/// (e.g. MILLI -> "MILLISECOND"); returns "UNKNOWN" for unrecognized values.
static std::string GetArrowTimeUnitStr(arrow::TimeUnit::type unit) {
    if (unit == arrow::TimeUnit::SECOND) {
        return "SECOND";
    }
    if (unit == arrow::TimeUnit::MILLI) {
        return "MILLISECOND";
    }
    if (unit == arrow::TimeUnit::MICRO) {
        return "MICROSECOND";
    }
    if (unit == arrow::TimeUnit::NANO) {
        return "NANOSECOND";
    }
    return "UNKNOWN";
}

// there may be a precision loss for nano
static Result<Timestamp> ToUTCTimestamp(const Timestamp& timestamp) {
int64_t micro_second = timestamp.ToMicrosecond();
Expand Down
5 changes: 1 addition & 4 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,8 @@ Result<CoreOptions> CoreOptions::FromMap(
// Parse file format and file system configurations
PAIMON_RETURN_NOT_OK(parser.ParseObject<FileFormatFactory>(
Options::FILE_FORMAT, /*default_identifier=*/"parquet", &impl->file_format));
if (impl->file_format->Identifier() == "avro") {
return Status::NotImplemented("not support avro as file format");
}
PAIMON_RETURN_NOT_OK(parser.ParseObject<FileFormatFactory>(
Options::MANIFEST_FORMAT, /*default_identifier=*/"orc", &impl->manifest_file_format));
Options::MANIFEST_FORMAT, /*default_identifier=*/"avro", &impl->manifest_file_format));
PAIMON_RETURN_NOT_OK(parser.ParseFileSystem(fs_scheme_to_identifier_map, specified_file_system,
&impl->file_system));

Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace paimon::test {

TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap({}));
ASSERT_EQ(core_options.GetManifestFormat()->Identifier(), "orc");
ASSERT_EQ(core_options.GetManifestFormat()->Identifier(), "avro");
ASSERT_EQ(core_options.GetWriteFileFormat()->Identifier(), "parquet");
ASSERT_TRUE(core_options.GetFileSystem());
ASSERT_EQ(-1, core_options.GetBucket());
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/io/complete_row_tracking_fields_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class CompleteRowTrackingFieldsBatchReader : public FileBatchReader {
return reader_->GetPreviousBatchFirstRowNumber();
}

uint64_t GetNumberOfRows() const override {
Result<uint64_t> GetNumberOfRows() const override {
return reader_->GetNumberOfRows();
}

Expand Down
1 change: 0 additions & 1 deletion src/paimon/core/operation/abstract_split_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ Result<std::unique_ptr<FileBatchReader>> AbstractSplitRead::CreateFileBatchReade
// lance do not support stream build with input stream
return reader_builder->Build(data_file_path);
}
// TODO(zhanyu.fyh): orc format support prefetch
if (context_->EnablePrefetch() && file_format_identifier != "blob" &&
file_format_identifier != "avro") {
PAIMON_ASSIGN_OR_RAISE(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class KeyValueFileStoreScanTest : public testing::Test {
Result<std::unique_ptr<KeyValueFileStoreScan>> CreateFileStoreScan(
const std::string& table_path, const std::shared_ptr<ScanFilter>& scan_filter,
int32_t table_schema_id, int32_t snapshot_id) const {
std::map<std::string, std::string> options_map = {};
std::map<std::string, std::string> options_map = {{Options::MANIFEST_FORMAT, "orc"}};
PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CoreOptions::FromMap(options_map));
auto fs = core_options.GetFileSystem();
auto schema_manager = std::make_shared<SchemaManager>(fs, table_path);
Expand Down
5 changes: 3 additions & 2 deletions src/paimon/core/operation/merge_file_split_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ Result<std::unique_ptr<MergeFileSplitRead>> MergeFileSplitRead::Create(
int32_t key_arity = trimmed_primary_key.size();

// projection is the mapping from value_schema in KeyValue object to raw_read_schema
std::vector<int32_t> projection =
ArrowUtils::CreateProjection(value_schema, context->GetReadSchema()->fields());
PAIMON_ASSIGN_OR_RAISE(
std::vector<int32_t> projection,
ArrowUtils::CreateProjection(value_schema, context->GetReadSchema()->fields()));

return std::unique_ptr<MergeFileSplitRead>(new MergeFileSplitRead(
path_factory, context,
Expand Down
3 changes: 2 additions & 1 deletion src/paimon/core/operation/orphan_files_cleaner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,14 @@ Result<std::unique_ptr<OrphanFilesCleaner>> OrphanFilesCleaner::Create(
if (!table_schema.value()->PrimaryKeys().empty()) {
return Status::NotImplemented("orphan files cleaner only support append table");
}
// Merge the table schema's options with the context options; context entries take precedence.
const auto& schema = table_schema.value();
auto opts = schema->Options();
for (const auto& [key, value] : ctx->GetOptions()) {
opts[key] = value;
}
PAIMON_ASSIGN_OR_RAISE(CoreOptions options,
CoreOptions::FromMap(ctx->GetOptions(), ctx->GetSpecificFileSystem()));
CoreOptions::FromMap(opts, ctx->GetSpecificFileSystem()));
auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(schema->Fields());
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> external_paths, options.CreateExternalPaths());
PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> global_index_external_path,
Expand Down
Loading
Loading