Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,18 @@ Result<std::unique_ptr<LocationProvider>> Table::location_provider() const {
return LocationProvider::Make(metadata_->location, metadata_->properties);
}

Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
return TableScanBuilder::Make(metadata_, io_);
Result<std::unique_ptr<DataTableScanBuilder>> Table::NewScan() const {
return DataTableScanBuilder::Make(metadata_, io_);
}

// Creates a builder for an incremental append scan over this table.
//
// The call simply forwards this table's metadata_ and io_ handles to
// IncrementalAppendScanBuilder::Make and returns whatever that factory
// produces (a builder or an error) without further processing.
Result<std::unique_ptr<IncrementalAppendScanBuilder>> Table::NewIncrementalAppendScan()
const {
return IncrementalAppendScanBuilder::Make(metadata_, io_);
}

// Creates a builder for an incremental changelog scan over this table.
//
// The call simply forwards this table's metadata_ and io_ handles to
// IncrementalChangelogScanBuilder::Make and returns whatever that factory
// produces (a builder or an error) without further processing.
Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
Table::NewIncrementalChangelogScan() const {
return IncrementalChangelogScanBuilder::Make(metadata_, io_);
}

Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
Expand Down Expand Up @@ -247,7 +257,7 @@ Result<std::shared_ptr<StagedTable>> StagedTable::Make(

StagedTable::~StagedTable() = default;

Result<std::unique_ptr<TableScanBuilder>> StagedTable::NewScan() const {
Result<std::unique_ptr<DataTableScanBuilder>> StagedTable::NewScan() const {
return NotSupported("Cannot scan a staged table");
}

Expand Down
12 changes: 10 additions & 2 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,15 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
///
/// Once a table scan builder is created, it can be refined to project columns and
/// filter data.
virtual Result<std::unique_ptr<TableScanBuilder>> NewScan() const;
virtual Result<std::unique_ptr<DataTableScanBuilder>> NewScan() const;

/// \brief Create a new incremental append scan builder for this table
///
/// \return The result of IncrementalAppendScanBuilder::Make for this table's
/// metadata and FileIO in the base Table implementation: either the new
/// builder or the error reported by that factory.
virtual Result<std::unique_ptr<IncrementalAppendScanBuilder>> NewIncrementalAppendScan()
const;

/// \brief Create a new incremental changelog scan builder for this table
///
/// \return The result of IncrementalChangelogScanBuilder::Make for this table's
/// metadata and FileIO in the base Table implementation: either the new
/// builder or the error reported by that factory.
virtual Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
NewIncrementalChangelogScan() const;

/// \brief Create a new Transaction to commit multiple table operations at once.
virtual Result<std::shared_ptr<Transaction>> NewTransaction();
Expand Down Expand Up @@ -196,7 +204,7 @@ class ICEBERG_EXPORT StagedTable final : public Table {

Status Refresh() override { return {}; }

Result<std::unique_ptr<TableScanBuilder>> NewScan() const override;
Result<std::unique_ptr<DataTableScanBuilder>> NewScan() const override;

private:
using Table::Table;
Expand Down
198 changes: 153 additions & 45 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,39 +210,73 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
return MakeArrowArrayStream(std::move(reader));
}

Result<std::unique_ptr<TableScanBuilder>> TableScanBuilder::Make(
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io) {
// Template specialization for DataTableScan (default)
template <>
Result<std::unique_ptr<TableScanBuilder<DataTableScan>>>
TableScanBuilder<DataTableScan>::Make(std::shared_ptr<TableMetadata> metadata,
std::shared_ptr<FileIO> io) {
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
return std::unique_ptr<TableScanBuilder>(
new TableScanBuilder(std::move(metadata), std::move(io)));
return std::unique_ptr<TableScanBuilder<DataTableScan>>(
new TableScanBuilder<DataTableScan>(std::move(metadata), std::move(io)));
}

TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
std::shared_ptr<FileIO> file_io)
// Factory specialization producing a builder for IncrementalAppendScan.
//
// Both `metadata` and `io` are required; each is checked against nullptr
// before a builder is constructed.
// NOTE(review): ICEBERG_PRECHECK presumably returns an error Result when its
// condition is false -- confirm against the macro definition.
template <>
Result<std::unique_ptr<TableScanBuilder<IncrementalAppendScan>>>
TableScanBuilder<IncrementalAppendScan>::Make(std::shared_ptr<TableMetadata> metadata,
                                              std::shared_ptr<FileIO> io) {
  using Builder = TableScanBuilder<IncrementalAppendScan>;

  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");

  // Ownership of both handles is transferred into the new builder.
  return std::unique_ptr<Builder>(new Builder(std::move(metadata), std::move(io)));
}

// Factory specialization producing a builder for IncrementalChangelogScan.
//
// Both `metadata` and `io` are required; each is checked against nullptr
// before a builder is constructed.
// NOTE(review): ICEBERG_PRECHECK presumably returns an error Result when its
// condition is false -- confirm against the macro definition.
template <>
Result<std::unique_ptr<TableScanBuilder<IncrementalChangelogScan>>>
TableScanBuilder<IncrementalChangelogScan>::Make(std::shared_ptr<TableMetadata> metadata,
                                                 std::shared_ptr<FileIO> io) {
  using Builder = TableScanBuilder<IncrementalChangelogScan>;

  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");

  // Ownership of both handles is transferred into the new builder.
  return std::unique_ptr<Builder>(new Builder(std::move(metadata), std::move(io)));
}

// Constructs a builder holding the given table metadata and FileIO handles.
// Both shared_ptr arguments are taken by value and moved into the members.
// No null check happens here; the Make factories validate their arguments
// before invoking this constructor.
template <typename ScanType>
TableScanBuilder<ScanType>::TableScanBuilder(
std::shared_ptr<TableMetadata> table_metadata, std::shared_ptr<FileIO> file_io)
: metadata_(std::move(table_metadata)), io_(std::move(file_io)) {}

TableScanBuilder& TableScanBuilder::Option(std::string key, std::string value) {
// Records a key/value option in the scan context and returns this builder so
// calls can be chained. `key` and `value` are moved into context_.options.
// NOTE(review): assignment through operator[] presumably overwrites an earlier
// value stored under the same key -- confirm the type of `options`.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Option(std::string key,
std::string value) {
context_.options[std::move(key)] = std::move(value);
return *this;
}

TableScanBuilder& TableScanBuilder::Project(std::shared_ptr<Schema> schema) {
// Stores `schema` as the scan context's projected_schema, replacing whatever
// was stored before, and returns this builder for chaining.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Project(
std::shared_ptr<Schema> schema) {
context_.projected_schema = std::move(schema);
return *this;
}

TableScanBuilder& TableScanBuilder::CaseSensitive(bool case_sensitive) {
// Stores the case-sensitivity flag in the scan context and returns this
// builder for chaining.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::CaseSensitive(
bool case_sensitive) {
context_.case_sensitive = case_sensitive;
return *this;
}

TableScanBuilder& TableScanBuilder::IncludeColumnStats() {
// Sets context_.return_column_stats to true and returns this builder for
// chaining. This no-argument overload only flips the flag; it does not modify
// any other field of the scan context.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IncludeColumnStats() {
context_.return_column_stats = true;
return *this;
}

TableScanBuilder& TableScanBuilder::IncludeColumnStats(
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IncludeColumnStats(
const std::vector<std::string>& requested_columns) {
context_.return_column_stats = true;
context_.columns_to_keep_stats.clear();
Expand All @@ -260,27 +294,35 @@ TableScanBuilder& TableScanBuilder::IncludeColumnStats(
return *this;
}

TableScanBuilder& TableScanBuilder::Select(const std::vector<std::string>& column_names) {
// Copies `column_names` into context_.selected_columns, replacing any
// previously stored selection, and returns this builder for chaining.
// The names are not validated here.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Select(
const std::vector<std::string>& column_names) {
context_.selected_columns = column_names;
return *this;
}

TableScanBuilder& TableScanBuilder::Filter(std::shared_ptr<Expression> filter) {
// Stores `filter` as the scan context's filter expression, replacing any
// previously stored one, and returns this builder for chaining.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Filter(
std::shared_ptr<Expression> filter) {
context_.filter = std::move(filter);
return *this;
}

TableScanBuilder& TableScanBuilder::IgnoreResiduals() {
// Sets context_.ignore_residuals to true and returns this builder for
// chaining.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IgnoreResiduals() {
context_.ignore_residuals = true;
return *this;
}

TableScanBuilder& TableScanBuilder::MinRowsRequested(int64_t num_rows) {
// Stores `num_rows` in context_.min_rows_requested and returns this builder
// for chaining. The value is not range-checked in this method.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::MinRowsRequested(
int64_t num_rows) {
context_.min_rows_requested = num_rows;
return *this;
}

TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseSnapshot(int64_t snapshot_id) {
ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
"Cannot override snapshot, already set snapshot id={}",
context_.snapshot_id.value());
Expand All @@ -289,7 +331,8 @@ TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
return *this;
}

TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseRef(const std::string& ref) {
if (ref == SnapshotRef::kMainBranch) {
snapshot_schema_ = nullptr;
context_.snapshot_id.reset();
Expand All @@ -309,38 +352,61 @@ TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
return *this;
}

TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
// Selects the snapshot to scan from a timestamp.
//
// `timestamp_millis` is converted with TimePointMsFromUnixMs, resolved to a
// snapshot id through SnapshotUtil::SnapshotIdAsOfTime against metadata_, and
// the id is then handed to UseSnapshot, so every check UseSnapshot performs
// applies to this call as well.
// NOTE(review): ICEBERG_BUILDER_ASSIGN_OR_RETURN presumably records a failed
// lookup on the builder and returns early -- confirm against the macro.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::AsOfTime(
int64_t timestamp_millis) {
auto time_point_ms = TimePointMsFromUnixMs(timestamp_millis);
ICEBERG_BUILDER_ASSIGN_OR_RETURN(
auto snapshot_id, SnapshotUtil::SnapshotIdAsOfTime(*metadata_, time_point_ms));
return UseSnapshot(snapshot_id);
}

TableScanBuilder& TableScanBuilder::FromSnapshot(
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
return AddError(NotImplemented("Incremental scan is not implemented"));
// Placeholder for choosing the starting snapshot of an incremental scan by id.
//
// Only available when ScanType satisfies IsIncrementalScan. Both arguments are
// currently ignored: the call registers a NotImplemented error through
// AddError and returns this builder.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
requires IsIncrementalScan<ScanType>
{
AddError(NotImplemented("Incremental scan is not implemented"));
return *this;
}

TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref,
[[maybe_unused]] bool inclusive) {
return AddError(NotImplemented("Incremental scan is not implemented"));
// Placeholder for choosing the starting snapshot of an incremental scan by
// reference name.
//
// Only available when ScanType satisfies IsIncrementalScan.
//
// \param ref        currently ignored.
// \param inclusive  currently ignored.
// \return this builder, after a NotImplemented error has been registered
//         through AddError.
//
// The parameters are marked [[maybe_unused]] (matching the id-based overload)
// so the stub does not trigger unused-parameter warnings.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
    [[maybe_unused]] const std::string& ref, [[maybe_unused]] bool inclusive)
  requires IsIncrementalScan<ScanType>
{
  AddError(NotImplemented("Incremental scan is not implemented"));
  return *this;
}

TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) {
return AddError(NotImplemented("Incremental scan is not implemented"));
// Placeholder for choosing the end snapshot of an incremental scan by id.
//
// Only available when ScanType satisfies IsIncrementalScan.
//
// \param to_snapshot_id  currently ignored.
// \return this builder, after a NotImplemented error has been registered
//         through AddError.
//
// The parameter is marked [[maybe_unused]] (matching the id-based
// FromSnapshot overload) so the stub does not trigger unused-parameter
// warnings.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(
    [[maybe_unused]] int64_t to_snapshot_id)
  requires IsIncrementalScan<ScanType>
{
  AddError(NotImplemented("Incremental scan is not implemented"));
  return *this;
}

TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) {
return AddError(NotImplemented("Incremental scan is not implemented"));
// Placeholder for choosing the end snapshot of an incremental scan by
// reference name.
//
// Only available when ScanType satisfies IsIncrementalScan.
//
// \param ref  currently ignored.
// \return this builder, after a NotImplemented error has been registered
//         through AddError.
//
// The parameter is marked [[maybe_unused]] (matching the id-based
// FromSnapshot overload) so the stub does not trigger unused-parameter
// warnings.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(
    [[maybe_unused]] const std::string& ref)
  requires IsIncrementalScan<ScanType>
{
  AddError(NotImplemented("Incremental scan is not implemented"));
  return *this;
}

TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
// Stores `branch` in context_.branch and returns this builder for chaining.
//
// Only available when ScanType satisfies IsIncrementalScan. The branch name is
// copied as given; it is not looked up or validated in this method.
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
const std::string& branch)
requires IsIncrementalScan<ScanType>
{
context_.branch = branch;
return *this;
}

template <typename ScanType>
Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
TableScanBuilder::ResolveSnapshotSchema() {
TableScanBuilder<ScanType>::ResolveSnapshotSchema() {
if (snapshot_schema_ == nullptr) {
if (context_.snapshot_id.has_value()) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot,
Expand All @@ -355,22 +421,32 @@ TableScanBuilder::ResolveSnapshotSchema() {
return snapshot_schema_;
}

bool TableScanBuilder::IsIncrementalScan() const {
return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value();
}

Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
template <>
Result<std::unique_ptr<DataTableScan>> TableScanBuilder<DataTableScan>::Build() {
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
ICEBERG_RETURN_UNEXPECTED(context_.Validate());

if (IsIncrementalScan()) {
return NotImplemented("Incremental scan is not yet implemented");
}

ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
return DataTableScan::Make(metadata_, schema.get(), io_, std::move(context_));
}

// Build() specialization for IncrementalAppendScan.
//
// Stub: unconditionally returns a NotImplemented error. No builder state
// (collected errors, context, metadata) is consulted before returning.
template <>
Result<std::unique_ptr<IncrementalAppendScan>>
TableScanBuilder<IncrementalAppendScan>::Build() {
return NotImplemented("IncrementalAppendScanBuilder is not implemented");
}

// Build() specialization for IncrementalChangelogScan.
//
// Stub: unconditionally returns a NotImplemented error. No builder state
// (collected errors, context, metadata) is consulted before returning.
template <>
Result<std::unique_ptr<IncrementalChangelogScan>>
TableScanBuilder<IncrementalChangelogScan>::Build() {
return NotImplemented("IncrementalChangelogScanBuilder is not implemented");
}

// Explicit template instantiations
template class TableScanBuilder<DataTableScan>;
template class TableScanBuilder<IncrementalAppendScan>;
template class TableScanBuilder<IncrementalChangelogScan>;

TableScan::TableScan(std::shared_ptr<TableMetadata> metadata,
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> file_io,
internal::TableScanContext context)
Expand Down Expand Up @@ -466,12 +542,6 @@ Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
std::move(metadata), std::move(schema), std::move(io), std::move(context)));
}

DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
internal::TableScanContext context)
: TableScan(std::move(metadata), std::move(schema), std::move(io),
std::move(context)) {}

Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
if (!snapshot) {
Expand Down Expand Up @@ -501,4 +571,42 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
return manifest_group->PlanFiles();
}

// Generic file-planning entry point for incremental scans, parameterized on
// the task type it would produce.
//
// Stub: unconditionally returns a NotImplemented error and does not call any
// other planning routine.
template <typename ScanTaskType>
Result<std::vector<std::shared_ptr<ScanTaskType>>>
IncrementalScan<ScanTaskType>::PlanFiles() const {
return NotImplemented("IncrementalScan::PlanFiles is not implemented");
}

// IncrementalAppendScan implementation

// Factory for IncrementalAppendScan.
//
// Stub: every argument is accepted but ignored (hence [[maybe_unused]]) and a
// NotImplemented error is returned; no scan object is constructed.
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
[[maybe_unused]] std::shared_ptr<Schema> schema,
[[maybe_unused]] std::shared_ptr<FileIO> io,
[[maybe_unused]] internal::TableScanContext context) {
return NotImplemented("IncrementalAppendScan is not implemented");
}

// Plans file scan tasks for a snapshot range.
//
// Stub: always returns a NotImplemented error.
//
// \param from_snapshot_id_exclusive  currently ignored.
// \param to_snapshot_id_inclusive    currently ignored.
// \return a NotImplemented error.
//
// The parameters are marked [[maybe_unused]], as in IncrementalAppendScan::Make,
// so the stub does not trigger unused-parameter warnings.
Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
    [[maybe_unused]] std::optional<int64_t> from_snapshot_id_exclusive,
    [[maybe_unused]] int64_t to_snapshot_id_inclusive) const {
  return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
}

// IncrementalChangelogScan implementation

// Factory for IncrementalChangelogScan.
//
// Stub: every argument is accepted but ignored (hence [[maybe_unused]]) and a
// NotImplemented error is returned; no scan object is constructed.
Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make(
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
[[maybe_unused]] std::shared_ptr<Schema> schema,
[[maybe_unused]] std::shared_ptr<FileIO> io,
[[maybe_unused]] internal::TableScanContext context) {
return NotImplemented("IncrementalChangelogScan is not implemented");
}

// Plans changelog scan tasks for a snapshot range.
//
// Stub: always returns a NotImplemented error.
//
// \param from_snapshot_id_exclusive  currently ignored.
// \param to_snapshot_id_inclusive    currently ignored.
// \return a NotImplemented error.
//
// The parameters are marked [[maybe_unused]], as in
// IncrementalChangelogScan::Make, so the stub does not trigger
// unused-parameter warnings.
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
IncrementalChangelogScan::PlanFiles(
    [[maybe_unused]] std::optional<int64_t> from_snapshot_id_exclusive,
    [[maybe_unused]] int64_t to_snapshot_id_inclusive) const {
  return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
}

} // namespace iceberg
Loading
Loading