Skip to content

Commit dc92ebe

Browse files
committed
feat: Add incremental scan API with IncrementalAppendScan and IncrementalChangelogScan
1 parent cd93b99 commit dc92ebe

File tree

6 files changed

+310
-95
lines changed

6 files changed

+310
-95
lines changed

src/iceberg/table.cc

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,18 @@ Result<std::unique_ptr<LocationProvider>> Table::location_provider() const {
149149
return LocationProvider::Make(metadata_->location, metadata_->properties);
150150
}
151151

152-
Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
153-
return TableScanBuilder::Make(metadata_, io_);
152+
// Creates a builder for a regular data scan of this table by forwarding the
// table's metadata_ and io_ to DataTableScanBuilder::Make; whatever Make returns
// (builder or error) is returned unchanged.
// NOTE(review): DataTableScanBuilder is presumably an alias for
// TableScanBuilder<DataTableScan> -- confirm in table_scan.h.
Result<std::unique_ptr<DataTableScanBuilder>> Table::NewScan() const {
153+
return DataTableScanBuilder::Make(metadata_, io_);
154+
}
155+
156+
// Creates a builder for an incremental append scan by forwarding the table's
// metadata_ and io_ to IncrementalAppendScanBuilder::Make.
// NOTE(review): in this change TableScanBuilder<IncrementalAppendScan>::Build()
// returns NotImplemented, so -- assuming IncrementalAppendScanBuilder aliases that
// specialization -- the builder returned here cannot yet produce a scan.
Result<std::unique_ptr<IncrementalAppendScanBuilder>> Table::NewIncrementalAppendScan()
157+
const {
158+
return IncrementalAppendScanBuilder::Make(metadata_, io_);
159+
}
160+
161+
// Creates a builder for an incremental changelog scan by forwarding the table's
// metadata_ and io_ to IncrementalChangelogScanBuilder::Make.
// NOTE(review): in this change TableScanBuilder<IncrementalChangelogScan>::Build()
// returns NotImplemented, so -- assuming IncrementalChangelogScanBuilder aliases
// that specialization -- the builder returned here cannot yet produce a scan.
Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
162+
Table::NewIncrementalChangelogScan() const {
163+
return IncrementalChangelogScanBuilder::Make(metadata_, io_);
154164
}
155165

156166
Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
@@ -247,7 +257,7 @@ Result<std::shared_ptr<StagedTable>> StagedTable::Make(
247257

248258
StagedTable::~StagedTable() = default;
249259

250-
Result<std::unique_ptr<TableScanBuilder>> StagedTable::NewScan() const {
260+
// Scanning is rejected for staged tables: this override never creates a builder
// and always returns a NotSupported error.
Result<std::unique_ptr<DataTableScanBuilder>> StagedTable::NewScan() const {
251261
return NotSupported("Cannot scan a staged table");
252262
}
253263

src/iceberg/table.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,15 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
127127
///
128128
/// Once a table scan builder is created, it can be refined to project columns and
129129
/// filter data.
130-
virtual Result<std::unique_ptr<TableScanBuilder>> NewScan() const;
130+
virtual Result<std::unique_ptr<DataTableScanBuilder>> NewScan() const;
131+
132+
/// \brief Create a new incremental append scan builder for this table
133+
virtual Result<std::unique_ptr<IncrementalAppendScanBuilder>> NewIncrementalAppendScan()
134+
const;
135+
136+
/// \brief Create a new incremental changelog scan builder for this table
///
/// The definition in table.cc builds it from this table's metadata and FileIO via
/// IncrementalChangelogScanBuilder::Make.
/// NOTE(review): building the scan itself is still a NotImplemented placeholder in
/// this change -- confirm before exposing to callers.
137+
virtual Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
138+
NewIncrementalChangelogScan() const;
131139

132140
/// \brief Create a new Transaction to commit multiple table operations at once.
133141
virtual Result<std::shared_ptr<Transaction>> NewTransaction();
@@ -196,7 +204,7 @@ class ICEBERG_EXPORT StagedTable final : public Table {
196204

197205
Status Refresh() override { return {}; }
198206

199-
Result<std::unique_ptr<TableScanBuilder>> NewScan() const override;
207+
/// \brief Scanning is not supported for a staged table.
///
/// The definition in table.cc always returns a NotSupported error.
Result<std::unique_ptr<DataTableScanBuilder>> NewScan() const override;
200208

201209
private:
202210
using Table::Table;

src/iceberg/table_scan.cc

Lines changed: 153 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -210,39 +210,73 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
210210
return MakeArrowArrayStream(std::move(reader));
211211
}
212212

213-
Result<std::unique_ptr<TableScanBuilder>> TableScanBuilder::Make(
214-
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io) {
213+
// Explicit specialization of the static factory Make() for DataTableScan (default).
//
// Checks `metadata` and `io` for null with ICEBERG_PRECHECK (presumably returning
// an error Result on failure -- the macro is defined elsewhere, confirm), then
// moves both into a newly allocated builder.
// `new` is used directly instead of std::make_unique, presumably because the
// constructor is not publicly accessible -- TODO confirm.
// NOTE(review): apart from the scan type, this body is identical to the
// IncrementalAppendScan and IncrementalChangelogScan specializations of Make() in
// this file; a single generic definition may be able to replace all three.
214+
template <>
215+
Result<std::unique_ptr<TableScanBuilder<DataTableScan>>>
216+
TableScanBuilder<DataTableScan>::Make(std::shared_ptr<TableMetadata> metadata,
217+
std::shared_ptr<FileIO> io) {
215218
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
216219
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
217-
return std::unique_ptr<TableScanBuilder>(
218-
new TableScanBuilder(std::move(metadata), std::move(io)));
220+
return std::unique_ptr<TableScanBuilder<DataTableScan>>(
221+
new TableScanBuilder<DataTableScan>(std::move(metadata), std::move(io)));
219222
}
220223

221-
TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
222-
std::shared_ptr<FileIO> file_io)
224+
// Explicit specialization of the static factory Make() for IncrementalAppendScan.
//
// Same steps as the DataTableScan specialization: null-check `metadata` and `io`
// with ICEBERG_PRECHECK, then move both into a newly allocated builder.
// NOTE(review): the body duplicates the other two Make() specializations except
// for the scan type.
225+
template <>
226+
Result<std::unique_ptr<TableScanBuilder<IncrementalAppendScan>>>
227+
TableScanBuilder<IncrementalAppendScan>::Make(std::shared_ptr<TableMetadata> metadata,
228+
std::shared_ptr<FileIO> io) {
229+
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
230+
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
231+
return std::unique_ptr<TableScanBuilder<IncrementalAppendScan>>(
232+
new TableScanBuilder<IncrementalAppendScan>(std::move(metadata), std::move(io)));
233+
}
234+
235+
// Explicit specialization of the static factory Make() for IncrementalChangelogScan.
//
// Same steps as the DataTableScan specialization: null-check `metadata` and `io`
// with ICEBERG_PRECHECK, then move both into a newly allocated builder.
// NOTE(review): the body duplicates the other two Make() specializations except
// for the scan type.
236+
template <>
237+
Result<std::unique_ptr<TableScanBuilder<IncrementalChangelogScan>>>
238+
TableScanBuilder<IncrementalChangelogScan>::Make(std::shared_ptr<TableMetadata> metadata,
239+
std::shared_ptr<FileIO> io) {
240+
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
241+
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
242+
return std::unique_ptr<TableScanBuilder<IncrementalChangelogScan>>(
243+
new TableScanBuilder<IncrementalChangelogScan>(std::move(metadata), std::move(io)));
244+
}
245+
246+
// Constructor shared by every ScanType: moves the metadata and FileIO handles into
// metadata_ and io_. No null checks are done here; the Make() factories in this
// file check both arguments before calling it.
template <typename ScanType>
247+
TableScanBuilder<ScanType>::TableScanBuilder(
248+
std::shared_ptr<TableMetadata> table_metadata, std::shared_ptr<FileIO> file_io)
223249
: metadata_(std::move(table_metadata)), io_(std::move(file_io)) {}
224250

225-
TableScanBuilder& TableScanBuilder::Option(std::string key, std::string value) {
251+
// Stores `value` under `key` in context_.options and returns *this for chaining.
// Assignment goes through operator[], so an earlier value for the same key is
// replaced (assuming options is a map-like container -- confirm).
template <typename ScanType>
252+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Option(std::string key,
253+
std::string value) {
226254
context_.options[std::move(key)] = std::move(value);
227255
return *this;
228256
}
229257

230-
TableScanBuilder& TableScanBuilder::Project(std::shared_ptr<Schema> schema) {
258+
// Records `schema` as context_.projected_schema (replacing any earlier value) and
// returns *this for chaining. The schema is not validated here.
template <typename ScanType>
259+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Project(
260+
std::shared_ptr<Schema> schema) {
231261
context_.projected_schema = std::move(schema);
232262
return *this;
233263
}
234264

235-
TableScanBuilder& TableScanBuilder::CaseSensitive(bool case_sensitive) {
265+
// Sets context_.case_sensitive to `case_sensitive` and returns *this for chaining.
template <typename ScanType>
266+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::CaseSensitive(
267+
bool case_sensitive) {
236268
context_.case_sensitive = case_sensitive;
237269
return *this;
238270
}
239271

240-
TableScanBuilder& TableScanBuilder::IncludeColumnStats() {
272+
// Turns on context_.return_column_stats and returns *this for chaining. This
// overload does not touch context_.columns_to_keep_stats.
template <typename ScanType>
273+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IncludeColumnStats() {
241274
context_.return_column_stats = true;
242275
return *this;
243276
}
244277

245-
TableScanBuilder& TableScanBuilder::IncludeColumnStats(
278+
template <typename ScanType>
279+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IncludeColumnStats(
246280
const std::vector<std::string>& requested_columns) {
247281
context_.return_column_stats = true;
248282
context_.columns_to_keep_stats.clear();
@@ -260,27 +294,35 @@ TableScanBuilder& TableScanBuilder::IncludeColumnStats(
260294
return *this;
261295
}
262296

263-
TableScanBuilder& TableScanBuilder::Select(const std::vector<std::string>& column_names) {
297+
// Copies `column_names` into context_.selected_columns (replacing any earlier
// selection) and returns *this for chaining. Names are not resolved against a
// schema here.
template <typename ScanType>
298+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Select(
299+
const std::vector<std::string>& column_names) {
264300
context_.selected_columns = column_names;
265301
return *this;
266302
}
267303

268-
TableScanBuilder& TableScanBuilder::Filter(std::shared_ptr<Expression> filter) {
304+
// Records `filter` as context_.filter and returns *this for chaining. A second
// call replaces the earlier expression rather than combining with it.
template <typename ScanType>
305+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Filter(
306+
std::shared_ptr<Expression> filter) {
269307
context_.filter = std::move(filter);
270308
return *this;
271309
}
272310

273-
TableScanBuilder& TableScanBuilder::IgnoreResiduals() {
311+
// Sets context_.ignore_residuals to true and returns *this for chaining.
template <typename ScanType>
312+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IgnoreResiduals() {
274313
context_.ignore_residuals = true;
275314
return *this;
276315
}
277316

278-
TableScanBuilder& TableScanBuilder::MinRowsRequested(int64_t num_rows) {
317+
// Stores `num_rows` in context_.min_rows_requested and returns *this for chaining.
// The value is not range-checked here.
template <typename ScanType>
318+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::MinRowsRequested(
319+
int64_t num_rows) {
279320
context_.min_rows_requested = num_rows;
280321
return *this;
281322
}
282323

283-
TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
324+
template <typename ScanType>
325+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseSnapshot(int64_t snapshot_id) {
284326
ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
285327
"Cannot override snapshot, already set snapshot id={}",
286328
context_.snapshot_id.value());
@@ -289,7 +331,8 @@ TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
289331
return *this;
290332
}
291333

292-
TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
334+
template <typename ScanType>
335+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseRef(const std::string& ref) {
293336
if (ref == SnapshotRef::kMainBranch) {
294337
snapshot_schema_ = nullptr;
295338
context_.snapshot_id.reset();
@@ -309,38 +352,61 @@ TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
309352
return *this;
310353
}
311354

312-
TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
355+
// Selects the snapshot that was current at `timestamp_millis` (Unix milliseconds,
// per TimePointMsFromUnixMs).
//
// The timestamp is converted to a time point, SnapshotUtil::SnapshotIdAsOfTime
// resolves it to a snapshot id against *metadata_, and the id is handed to
// UseSnapshot(), so UseSnapshot's own checks apply as well.
// ICEBERG_BUILDER_ASSIGN_OR_RETURN presumably records the lookup error on the
// builder and returns early when no snapshot is found -- macro defined elsewhere,
// confirm.
template <typename ScanType>
356+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::AsOfTime(
357+
int64_t timestamp_millis) {
313358
auto time_point_ms = TimePointMsFromUnixMs(timestamp_millis);
314359
ICEBERG_BUILDER_ASSIGN_OR_RETURN(
315360
auto snapshot_id, SnapshotUtil::SnapshotIdAsOfTime(*metadata_, time_point_ms));
316361
return UseSnapshot(snapshot_id);
317362
}
318363

319-
TableScanBuilder& TableScanBuilder::FromSnapshot(
320-
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
321-
return AddError(NotImplemented("Incremental scan is not implemented"));
364+
// Placeholder for setting the start snapshot of an incremental scan by id.
//
// Both parameters are ignored: the call only records a NotImplemented error through
// AddError() and returns *this so chaining still compiles.
// The requires-clause makes this member available only when
// IsIncrementalScan<ScanType> is satisfied (defined elsewhere -- confirm which scan
// types qualify).
template <typename ScanType>
365+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
366+
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
367+
requires IsIncrementalScan<ScanType>
368+
{
369+
AddError(NotImplemented("Incremental scan is not implemented"));
370+
return *this;
322371
}
323372

324-
TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref,
325-
[[maybe_unused]] bool inclusive) {
326-
return AddError(NotImplemented("Incremental scan is not implemented"));
373+
// Placeholder for setting the start snapshot of an incremental scan by ref name.
//
// Both parameters are ignored: the call only records a NotImplemented error through
// AddError() and returns *this. Available only when IsIncrementalScan<ScanType> is
// satisfied.
// NOTE(review): `ref` and `inclusive` are never read but, unlike the snapshot-id
// overload and the pre-change version of this function, no longer carry
// [[maybe_unused]]; this may trigger unused-parameter warnings under strict flags.
template <typename ScanType>
374+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
375+
const std::string& ref, bool inclusive)
376+
requires IsIncrementalScan<ScanType>
377+
{
378+
AddError(NotImplemented("Incremental scan is not implemented"));
379+
return *this;
327380
}
328381

329-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) {
330-
return AddError(NotImplemented("Incremental scan is not implemented"));
382+
// Placeholder for setting the end snapshot of an incremental scan by id.
//
// `to_snapshot_id` is ignored: the call only records a NotImplemented error through
// AddError() and returns *this. Available only when IsIncrementalScan<ScanType> is
// satisfied.
// NOTE(review): the parameter is never read and the [[maybe_unused]] attribute
// present before this change was dropped; may trigger unused-parameter warnings.
template <typename ScanType>
383+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t to_snapshot_id)
384+
requires IsIncrementalScan<ScanType>
385+
{
386+
AddError(NotImplemented("Incremental scan is not implemented"));
387+
return *this;
331388
}
332389

333-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) {
334-
return AddError(NotImplemented("Incremental scan is not implemented"));
390+
// Placeholder for setting the end snapshot of an incremental scan by ref name.
//
// `ref` is ignored: the call only records a NotImplemented error through AddError()
// and returns *this. Available only when IsIncrementalScan<ScanType> is satisfied.
// NOTE(review): the parameter is never read and the [[maybe_unused]] attribute
// present before this change was dropped; may trigger unused-parameter warnings.
template <typename ScanType>
391+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(const std::string& ref)
392+
requires IsIncrementalScan<ScanType>
393+
{
394+
AddError(NotImplemented("Incremental scan is not implemented"));
395+
return *this;
335396
}
336397

337-
TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
398+
// Stores `branch` in context_.branch and returns *this for chaining. The branch
// name is not checked against the table's refs here.
// NOTE(review): with the added requires-clause this setter exists only on builders
// whose ScanType satisfies IsIncrementalScan; before this change it was available
// on every builder -- confirm no DataTableScan caller relied on it.
template <typename ScanType>
399+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
400+
const std::string& branch)
401+
requires IsIncrementalScan<ScanType>
402+
{
338403
context_.branch = branch;
339404
return *this;
340405
}
341406

407+
template <typename ScanType>
342408
Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
343-
TableScanBuilder::ResolveSnapshotSchema() {
409+
TableScanBuilder<ScanType>::ResolveSnapshotSchema() {
344410
if (snapshot_schema_ == nullptr) {
345411
if (context_.snapshot_id.has_value()) {
346412
ICEBERG_ASSIGN_OR_RAISE(auto snapshot,
@@ -355,22 +421,32 @@ TableScanBuilder::ResolveSnapshotSchema() {
355421
return snapshot_schema_;
356422
}
357423

358-
bool TableScanBuilder::IsIncrementalScan() const {
359-
return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value();
360-
}
361-
362-
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
424+
// Explicit specialization of Build() for DataTableScan.
//
// Steps, in order:
//   1. CheckErrors()        - surface any error recorded earlier by a setter.
//   2. context_.Validate()  - validate the accumulated scan context.
//   3. ResolveSnapshotSchema() - pick the schema to scan with.
//   4. DataTableScan::Make(...) - construct the scan.
// ICEBERG_RETURN_UNEXPECTED / ICEBERG_ASSIGN_OR_RAISE presumably return early with
// the failing Result -- macros defined elsewhere, confirm.
// The runtime IsIncrementalScan() guard was removed in this change; the incremental
// setters are now restricted at compile time by their requires-clauses instead.
// NOTE(review): context_ is moved into the scan, so the builder's context is left
// moved-from afterwards; verify that a builder is never built twice.
template <>
425+
Result<std::unique_ptr<DataTableScan>> TableScanBuilder<DataTableScan>::Build() {
363426
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
364427
ICEBERG_RETURN_UNEXPECTED(context_.Validate());
365428

366-
if (IsIncrementalScan()) {
367-
return NotImplemented("Incremental scan is not yet implemented");
368-
}
369-
370429
ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
371430
return DataTableScan::Make(metadata_, schema.get(), io_, std::move(context_));
372431
}
373432

433+
// Explicit specialization of Build() for IncrementalAppendScan: a placeholder that
// unconditionally returns NotImplemented.
// NOTE(review): unlike the DataTableScan specialization, neither CheckErrors() nor
// context_.Validate() is consulted, so errors recorded by earlier setter calls are
// not reported from here.
template <>
434+
Result<std::unique_ptr<IncrementalAppendScan>>
435+
TableScanBuilder<IncrementalAppendScan>::Build() {
436+
return NotImplemented("IncrementalAppendScanBuilder is not implemented");
437+
}
438+
439+
// Explicit specialization of Build() for IncrementalChangelogScan: a placeholder
// that unconditionally returns NotImplemented.
// NOTE(review): unlike the DataTableScan specialization, neither CheckErrors() nor
// context_.Validate() is consulted, so errors recorded by earlier setter calls are
// not reported from here.
template <>
440+
Result<std::unique_ptr<IncrementalChangelogScan>>
441+
TableScanBuilder<IncrementalChangelogScan>::Build() {
442+
return NotImplemented("IncrementalChangelogScanBuilder is not implemented");
443+
}
444+
445+
// Explicit template instantiations of the builder for the three scan types. The
// generic member definitions above live in this .cc file, so these lines make them
// available for each supported ScanType. They are placed after the explicit
// specializations of Make() and Build() for the same types.
446+
template class TableScanBuilder<DataTableScan>;
447+
template class TableScanBuilder<IncrementalAppendScan>;
448+
template class TableScanBuilder<IncrementalChangelogScan>;
449+
374450
TableScan::TableScan(std::shared_ptr<TableMetadata> metadata,
375451
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> file_io,
376452
internal::TableScanContext context)
@@ -466,12 +542,6 @@ Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
466542
std::move(metadata), std::move(schema), std::move(io), std::move(context)));
467543
}
468544

469-
DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
470-
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
471-
internal::TableScanContext context)
472-
: TableScan(std::move(metadata), std::move(schema), std::move(io),
473-
std::move(context)) {}
474-
475545
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
476546
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
477547
if (!snapshot) {
@@ -501,4 +571,42 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
501571
return manifest_group->PlanFiles();
502572
}
503573

574+
// Placeholder for planning an incremental scan: always returns NotImplemented.
// NOTE(review): this is a member of a class template defined in a .cc file, and no
// explicit instantiation of IncrementalScan<...> is visible in this diff; confirm
// that every translation unit calling it can link against an instantiation.
template <typename ScanTaskType>
575+
Result<std::vector<std::shared_ptr<ScanTaskType>>>
576+
IncrementalScan<ScanTaskType>::PlanFiles() const {
577+
return NotImplemented("IncrementalScan::PlanFiles is not implemented");
578+
}
579+
580+
// IncrementalAppendScan implementation
//
// Make() is a placeholder factory: every argument is ignored (all are marked
// [[maybe_unused]]) and the call always returns NotImplemented.
581+
582+
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
583+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
584+
[[maybe_unused]] std::shared_ptr<Schema> schema,
585+
[[maybe_unused]] std::shared_ptr<FileIO> io,
586+
[[maybe_unused]] internal::TableScanContext context) {
587+
return NotImplemented("IncrementalAppendScan is not implemented");
588+
}
589+
590+
// Placeholder for planning files over a snapshot range (exclusive start, inclusive
// end, per the parameter names): always returns NotImplemented.
// NOTE(review): both parameters are unused but, unlike IncrementalAppendScan::Make,
// are not marked [[maybe_unused]]; may trigger unused-parameter warnings.
Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
591+
std::optional<int64_t> from_snapshot_id_exclusive,
592+
int64_t to_snapshot_id_inclusive) const {
593+
return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
594+
}
595+
596+
// IncrementalChangelogScan implementation
//
// Make() is a placeholder factory: every argument is ignored (all are marked
// [[maybe_unused]]) and the call always returns NotImplemented.
597+
598+
Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make(
599+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
600+
[[maybe_unused]] std::shared_ptr<Schema> schema,
601+
[[maybe_unused]] std::shared_ptr<FileIO> io,
602+
[[maybe_unused]] internal::TableScanContext context) {
603+
return NotImplemented("IncrementalChangelogScan is not implemented");
604+
}
605+
606+
// Placeholder for planning changelog tasks over a snapshot range (exclusive start,
// inclusive end, per the parameter names): always returns NotImplemented.
// NOTE(review): both parameters are unused but, unlike
// IncrementalChangelogScan::Make, are not marked [[maybe_unused]]; may trigger
// unused-parameter warnings.
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
607+
IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_exclusive,
608+
int64_t to_snapshot_id_inclusive) const {
609+
return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
610+
}
611+
504612
} // namespace iceberg

0 commit comments

Comments
 (0)