diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 02edcdb3..008a6255 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -790,6 +790,26 @@ class LogScanner: or timeout expires. """ ... + def to_arrow_batch_reader(self) -> pa.RecordBatchReader: + """Create a lazy Arrow RecordBatchReader that reads until latest offsets. + + Returns a ``pyarrow.RecordBatchReader`` that lazily polls batches one at + a time (streaming). Prefer this when you want to process batches without + holding the full result in memory at once. + + Do not call ``poll_arrow`` / ``poll_record_batch`` on this scanner while + iterating the reader; they share the same underlying scanner state. + Overlapping calls are not supported. Use one active + polling/consumption path at a time. + + Requires a batch-based scanner (created with ``new_scan().create_record_batch_log_scanner()``). + You must call ``subscribe()``, ``subscribe_buckets()``, ``subscribe_partition()``, + or ``subscribe_partition_buckets()`` first. + + Returns: + ``pyarrow.RecordBatchReader`` yielding ``RecordBatch`` objects. + """ + ... def to_pandas(self) -> pd.DataFrame: """Convert all data to Pandas DataFrame. @@ -802,6 +822,10 @@ class LogScanner: def to_arrow(self) -> pa.Table: """Convert all data to Arrow Table. + Batches are collected in Rust then combined into one table (no per-batch + Python iteration). Do not interleave with ``poll_arrow`` / ``poll_record_batch`` + for the same subscription session; overlapping use is not supported. + Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()). Reads from currently subscribed buckets until reaching their latest offsets. 
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index c1b46734..f58b09a8 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -18,10 +18,10 @@ use crate::TOKIO_RUNTIME; use crate::*; use arrow::array::RecordBatch as ArrowRecordBatch; +use arrow::record_batch::RecordBatchReader as _; use arrow_pyarrow::{FromPyArrow, ToPyArrow}; use arrow_schema::SchemaRef; use fluss::record::to_arrow_schema; -use fluss::rpc::message::OffsetSpec; use indexmap::IndexMap; use pyo3::IntoPyObjectExt; use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyTypeError}; @@ -1933,6 +1933,38 @@ fn get_type_name(value: &Bound) -> String { .unwrap_or_else(|_| "unknown".to_string()) } +/// Thin Python iterator over [`fcore::client::SyncRecordBatchLogReader`]. +/// Used internally as the backing iterator for +/// ``pa.RecordBatchReader.from_batches()``; not registered on the module. +#[pyclass] +struct PyRecordBatchLogReader { + sync_reader: fcore::client::SyncRecordBatchLogReader, +} + +#[pymethods] +impl PyRecordBatchLogReader { + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + + fn __next__(&mut self, py: Python) -> PyResult>> { + let result = py.detach(|| self.sync_reader.next().transpose()); + + match result { + Ok(Some(batch)) => { + let py_batch = batch + .to_pyarrow(py) + .map_err(|e| FlussError::new_err(format!("Failed to convert batch: {e}")))?; + Ok(Some(py_batch.unbind())) + } + Ok(None) => Ok(None), + Err(arrow_err) => Err(FlussError::new_err(format!( + "Error reading batch: {arrow_err}" + ))), + } + } +} + /// Wraps the two scanner variants so we never have an impossible state /// (both None or both Some). 
enum ScannerKind { @@ -1985,8 +2017,6 @@ pub struct LogScanner { projected_schema: SchemaRef, /// The projected row type to use for record-based scanning projected_row_type: fcore::metadata::RowType, - /// Cache for partition_id -> partition_name mapping (avoids repeated list_partition_infos calls) - partition_name_cache: std::sync::RwLock>>, } #[pymethods] @@ -2231,30 +2261,93 @@ impl LogScanner { Ok(empty_table.into()) } + /// Create a lazy Arrow RecordBatchReader that reads until latest offsets. + /// + /// Returns a PyArrow RecordBatchReader that lazily polls batches one at a + /// time. This is more memory-efficient than ``to_arrow()`` which loads all + /// data into a single table. + /// + /// **Concurrency:** The reader shares the same underlying scanner state as + /// this ``LogScanner``. Do not call ``poll_arrow``, ``poll_record_batch``, + /// or other poll methods while iterating this reader. + /// + /// You must call subscribe(), subscribe_buckets(), subscribe_partition(), + /// or subscribe_partition_buckets() first. + /// + /// Returns: + /// ``pyarrow.RecordBatchReader`` yielding ``RecordBatch`` objects + fn to_arrow_batch_reader(&self, py: Python) -> PyResult> { + let scanner = self.kind.as_batch()?; + + let sync_reader = py + .detach(|| { + TOKIO_RUNTIME.block_on(async { + let reader = fcore::client::RecordBatchLogReader::new_until_latest( + scanner.new_shared_handle(), + &self.admin, + ) + .await?; + Ok::<_, fcore::error::Error>( + reader.to_record_batch_reader(TOKIO_RUNTIME.handle().clone()), + ) + }) + }) + .map_err(|e| FlussError::from_core_error(&e))?; + + let py_schema = sync_reader + .schema() + .to_pyarrow(py) + .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; + + let py_iter = Py::new(py, PyRecordBatchLogReader { sync_reader })?; + + let pyarrow = py.import("pyarrow")?; + let batch_reader = pyarrow + .getattr("RecordBatchReader")? 
+ .call_method1("from_batches", (py_schema, py_iter))?; + + Ok(batch_reader.into()) + } + /// Convert all data to Arrow Table. /// /// Reads from currently subscribed buckets until reaching their latest offsets. /// Works for both partitioned and non-partitioned tables. /// + /// Materializes batches in Rust (``RecordBatchLogReader::collect_all_batches``) + /// then builds one PyArrow table, avoiding per-batch Python iteration. + /// /// You must call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first. /// /// Returns: /// PyArrow Table containing all data from subscribed buckets fn to_arrow(&self, py: Python) -> PyResult> { let scanner = self.kind.as_batch()?; - let subscribed = scanner.get_subscribed_buckets(); - if subscribed.is_empty() { - return Err(FlussError::new_err( - "No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.", - )); - } + let batches: Vec> = py + .detach(|| { + TOKIO_RUNTIME.block_on(async { + let mut reader = fcore::client::RecordBatchLogReader::new_until_latest( + scanner.new_shared_handle(), + &self.admin, + ) + .await?; + let scan_batches = reader.collect_all_batches().await?; + Ok::<_, fcore::error::Error>( + scan_batches + .into_iter() + .map(|sb| std::sync::Arc::new(sb.into_batch())) + .collect(), + ) + }) + }) + .map_err(|e| FlussError::from_core_error(&e))?; - // 2. Query latest offsets for all subscribed buckets - let stopping_offsets = self.query_latest_offsets(py, &subscribed)?; + if batches.is_empty() { + return self.create_empty_table(py); + } - // 3. Poll until all buckets reach their stopping offsets - self.poll_until_offsets(py, stopping_offsets) + Utils::combine_batches_to_table(py, batches) } /// Convert all data to Pandas DataFrame. 
@@ -2423,206 +2516,7 @@ impl LogScanner { table_info, projected_schema, projected_row_type, - partition_name_cache: std::sync::RwLock::new(None), - } - } - - /// Get partition_id -> partition_name mapping, using cache if available - fn get_partition_name_map( - &self, - py: Python, - table_path: &fcore::metadata::TablePath, - ) -> PyResult> { - // Check cache first (read lock) - { - let cache = self.partition_name_cache.read().unwrap(); - if let Some(map) = cache.as_ref() { - return Ok(map.clone()); - } } - - // Fetch partition infos (releases GIL during async call) - let partition_infos: Vec = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { self.admin.list_partition_infos(table_path).await }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - // Build and cache the mapping - let map: HashMap = partition_infos - .into_iter() - .map(|info| (info.get_partition_id(), info.get_partition_name())) - .collect(); - - // Store in cache (write lock) - { - let mut cache = self.partition_name_cache.write().unwrap(); - *cache = Some(map.clone()); - } - - Ok(map) - } - - /// Query latest offsets for subscribed buckets (handles both partitioned and non-partitioned) - fn query_latest_offsets( - &self, - py: Python, - subscribed: &[(fcore::metadata::TableBucket, i64)], - ) -> PyResult> { - let scanner = self.kind.as_batch()?; - let is_partitioned = scanner.is_partitioned(); - let table_path = &self.table_info.table_path; - - if !is_partitioned { - // Non-partitioned: simple case - just query all bucket IDs - let bucket_ids: Vec = subscribed.iter().map(|(tb, _)| tb.bucket_id()).collect(); - - let offsets: HashMap = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { - self.admin - .list_offsets(table_path, &bucket_ids, OffsetSpec::Latest) - .await - }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - // Convert to TableBucket-keyed map - let table_id = self.table_info.table_id; - Ok(offsets - .into_iter() - .filter(|(_, offset)| *offset > 0) - .map(|(bucket_id, 
offset)| { - ( - fcore::metadata::TableBucket::new(table_id, bucket_id), - offset, - ) - }) - .collect()) - } else { - // Partitioned: need to query per partition - self.query_partitioned_offsets(py, subscribed) - } - } - - /// Query offsets for partitioned table subscriptions - fn query_partitioned_offsets( - &self, - py: Python, - subscribed: &[(fcore::metadata::TableBucket, i64)], - ) -> PyResult> { - let table_path = &self.table_info.table_path; - - // Get partition_id -> partition_name mapping (cached) - let partition_id_to_name = self.get_partition_name_map(py, table_path)?; - - // Group subscribed buckets by partition_id - let mut by_partition: HashMap> = HashMap::new(); - for (tb, _) in subscribed { - if let Some(partition_id) = tb.partition_id() { - by_partition - .entry(partition_id) - .or_default() - .push(tb.bucket_id()); - } - } - - // Query offsets for each partition - let mut result: HashMap = HashMap::new(); - let table_id = self.table_info.table_id; - - for (partition_id, bucket_ids) in by_partition { - let partition_name = partition_id_to_name.get(&partition_id).ok_or_else(|| { - FlussError::new_err(format!("Unknown partition_id: {partition_id}")) - })?; - - let offsets: HashMap = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { - self.admin - .list_partition_offsets( - table_path, - partition_name, - &bucket_ids, - OffsetSpec::Latest, - ) - .await - }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - for (bucket_id, offset) in offsets { - if offset > 0 { - let tb = fcore::metadata::TableBucket::new_with_partition( - table_id, - Some(partition_id), - bucket_id, - ); - result.insert(tb, offset); - } - } - } - - Ok(result) - } - - /// Poll until all buckets reach their stopping offsets - fn poll_until_offsets( - &self, - py: Python, - mut stopping_offsets: HashMap, - ) -> PyResult> { - let scanner = self.kind.as_batch()?; - let mut all_batches = Vec::new(); - - while !stopping_offsets.is_empty() { - let scan_batches = py - .detach(|| { - 
TOKIO_RUNTIME.block_on(async { scanner.poll(Duration::from_millis(500)).await }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - if scan_batches.is_empty() { - continue; - } - - for scan_batch in scan_batches { - let table_bucket = scan_batch.bucket().clone(); - - // Check if this bucket is still being tracked - let Some(&stop_at) = stopping_offsets.get(&table_bucket) else { - continue; - }; - - let base_offset = scan_batch.base_offset(); - let last_offset = scan_batch.last_offset(); - - // If the batch starts at or after the stop_at offset, the bucket is exhausted - if base_offset >= stop_at { - stopping_offsets.remove(&table_bucket); - continue; - } - - let batch = if last_offset >= stop_at { - // Slice batch to keep only records where offset < stop_at - let num_to_keep = (stop_at - base_offset) as usize; - let b = scan_batch.into_batch(); - let limit = num_to_keep.min(b.num_rows()); - b.slice(0, limit) - } else { - scan_batch.into_batch() - }; - - all_batches.push(Arc::new(batch)); - - // Check if we're done with this bucket - if last_offset >= stop_at - 1 { - stopping_offsets.remove(&table_bucket); - } - } - } - - Utils::combine_batches_to_table(py, all_batches) } } diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 12807988..6e8906ef 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -28,12 +28,14 @@ mod lookup; mod log_fetch_buffer; mod partition_getter; +mod reader; mod remote_log; mod scanner; mod upsert; pub use append::{AppendWriter, TableAppend}; pub use lookup::{LookupResult, Lookuper, TableLookup}; +pub use reader::{RecordBatchLogReader, SyncRecordBatchLogReader}; pub use remote_log::{ DEFAULT_REMOTE_FILE_DOWNLOAD_THREAD_NUM, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM, }; diff --git a/crates/fluss/src/client/table/reader.rs b/crates/fluss/src/client/table/reader.rs new file mode 100644 index 00000000..48e03ffd --- /dev/null +++ 
b/crates/fluss/src/client/table/reader.rs @@ -0,0 +1,618 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Bounded log reader that polls until stopping offsets, then terminates. +//! +//! Unlike [`RecordBatchLogScanner`] which is unbounded (continuous streaming), +//! [`RecordBatchLogReader`] reads log data up to a finite set of stopping +//! offsets and then signals completion. This enables "snapshot-style" reads +//! from a streaming log: capture the latest offsets, then consume all data +//! up to those offsets. +//! +//! The reader **takes ownership** of the scanner (move, not clone). Once the +//! scanner is moved into a reader, the compiler prevents concurrent polls. +//! +//! The reader also provides a synchronous [`arrow::record_batch::RecordBatchReader`] +//! adapter via [`RecordBatchLogReader::to_record_batch_reader`] for Arrow +//! ecosystem interop and FFI consumers (Python, C++). 
+ +use crate::client::admin::FlussAdmin; +use crate::client::table::RecordBatchLogScanner; +use crate::error::{Error, Result}; +use crate::metadata::TableBucket; +use crate::record::ScanBatch; +use crate::rpc::message::OffsetSpec; +use arrow::record_batch::RecordBatch; +use arrow_schema::SchemaRef; +use log::warn; +use std::collections::{HashMap, VecDeque}; +use std::time::Duration; + +const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(500); + +/// Bounded log reader that consumes log data up to specified stopping offsets. +/// +/// This type wraps a [`RecordBatchLogScanner`] and adds stopping semantics: +/// it polls batches from the scanner, filters/slices them against per-bucket +/// stopping offsets, and signals completion when all buckets are caught up. +/// +/// The reader takes **ownership** of the scanner. Once moved in, no other code +/// can poll the same scanner concurrently. +/// +/// # Construction +/// +/// Use [`RecordBatchLogReader::new_until_latest`] for the common case of +/// reading all currently-available data, or [`RecordBatchLogReader::new_until_offsets`] +/// for custom stopping offsets. +/// +/// # Async iteration +/// +/// Call [`next_batch`](RecordBatchLogReader::next_batch) repeatedly to get +/// [`ScanBatch`]es lazily, one at a time. Returns `None` when all buckets +/// have reached their stopping offsets. +/// +/// # Sync adapter +/// +/// Call [`to_record_batch_reader`](RecordBatchLogReader::to_record_batch_reader) +/// to get a synchronous [`arrow::record_batch::RecordBatchReader`] suitable +/// for Arrow FFI consumers. +pub struct RecordBatchLogReader { + scanner: RecordBatchLogScanner, + stopping_offsets: HashMap, + buffer: VecDeque, + schema: SchemaRef, +} + +impl RecordBatchLogReader { + /// Create a reader that reads until the latest offsets at the time of creation. + /// + /// Queries the server for the current latest offset of each subscribed + /// bucket, then reads until those offsets are reached. 
Buckets whose + /// subscribed offset already meets or exceeds the latest offset are + /// excluded (nothing to read). + /// + /// Partition metadata is fetched once during construction; no caching + /// is needed since each reader is typically short-lived. + pub async fn new_until_latest( + scanner: RecordBatchLogScanner, + admin: &FlussAdmin, + ) -> Result { + let subscribed = scanner.get_subscribed_buckets(); + if subscribed.is_empty() { + return Err(Error::IllegalArgument { + message: "No buckets subscribed. Call subscribe() before creating a reader." + .to_string(), + }); + } + + let stopping_offsets = query_latest_offsets(admin, &scanner, &subscribed).await?; + let schema = scanner.schema(); + + Ok(Self { + scanner, + stopping_offsets, + buffer: VecDeque::new(), + schema, + }) + } + + /// Create a reader with explicit stopping offsets per bucket. + pub fn new_until_offsets( + scanner: RecordBatchLogScanner, + stopping_offsets: HashMap, + ) -> Self { + let schema = scanner.schema(); + Self { + scanner, + stopping_offsets, + buffer: VecDeque::new(), + schema, + } + } + + /// Returns the Arrow schema for batches produced by this reader. + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + /// Drain all remaining batches until stopping offsets are satisfied. + /// + /// This is a convenience for callers (e.g. bindings building a single Arrow + /// table) that want to materialize the full result in Rust without per-batch + /// iteration. + pub async fn collect_all_batches(&mut self) -> Result> { + let mut out = Vec::new(); + while let Some(b) = self.next_batch().await? { + out.push(b); + } + Ok(out) + } + + /// Fetch the next [`ScanBatch`], or `None` if all buckets are caught up. + /// + /// Each call may internally poll multiple batches from the scanner, + /// buffer them, and return one at a time. Batches that cross a stopping + /// offset boundary are sliced to exclude records at or beyond the stop point. 
+ /// + /// Completed buckets are unsubscribed from the scanner to avoid wasting + /// network traffic on data the reader will discard. + pub async fn next_batch(&mut self) -> Result> { + loop { + if let Some(batch) = self.buffer.pop_front() { + return Ok(Some(batch)); + } + + if self.stopping_offsets.is_empty() { + return Ok(None); + } + + let scan_batches = self.scanner.poll(DEFAULT_POLL_TIMEOUT).await?; + + if scan_batches.is_empty() { + continue; + } + + let completed = + filter_batches(scan_batches, &mut self.stopping_offsets, &mut self.buffer); + + for tb in completed { + if let Some(partition_id) = tb.partition_id() { + self.scanner + .unsubscribe_partition(partition_id, tb.bucket_id()) + .await?; + } else { + self.scanner.unsubscribe(tb.bucket_id()).await?; + } + } + } + } + + /// Convert this async reader into a synchronous [`arrow::record_batch::RecordBatchReader`]. + /// + /// The returned adapter calls [`tokio::runtime::Handle::block_on`] on each + /// iterator step. **Do not** call this from inside a Tokio worker thread + /// while the same runtime is driving async work (nested `block_on` can + /// panic or deadlock). Prefer [`next_batch`](RecordBatchLogReader::next_batch) + /// in async Rust code. This is intended for sync/FFI boundaries (C++, some + /// Python call paths). + pub fn to_record_batch_reader( + self, + handle: tokio::runtime::Handle, + ) -> SyncRecordBatchLogReader { + SyncRecordBatchLogReader { + reader: self, + handle, + } + } +} + +/// Synchronous adapter that implements [`arrow::record_batch::RecordBatchReader`]. +/// +/// Created via [`RecordBatchLogReader::to_record_batch_reader`]. +/// Blocks the current thread on each `next()` call using the provided +/// Tokio runtime handle. +/// +/// The iterator yields plain [`RecordBatch`]es (bucket/offset metadata from +/// [`ScanBatch`] is stripped to satisfy the Arrow trait contract). 
+pub struct SyncRecordBatchLogReader { + reader: RecordBatchLogReader, + handle: tokio::runtime::Handle, +} + +impl Iterator for SyncRecordBatchLogReader { + type Item = std::result::Result; + + fn next(&mut self) -> Option { + match self.handle.block_on(self.reader.next_batch()) { + Ok(Some(scan_batch)) => Some(Ok(scan_batch.into_batch())), + Ok(None) => None, + Err(e) => Some(Err(arrow::error::ArrowError::ExternalError(Box::new(e)))), + } + } +} + +impl arrow::record_batch::RecordBatchReader for SyncRecordBatchLogReader { + fn schema(&self) -> SchemaRef { + self.reader.schema() + } +} + +/// Query latest offsets for all subscribed buckets, handling both partitioned +/// and non-partitioned tables. +/// +/// Buckets whose subscribed offset already meets or exceeds the latest offset +/// are excluded from the result (there is nothing to read). A `latest_offset` +/// of `0` means the bucket is empty and is silently skipped; a negative value +/// is unexpected from the server and is logged as a warning before being +/// skipped. +async fn query_latest_offsets( + admin: &FlussAdmin, + scanner: &RecordBatchLogScanner, + subscribed: &[(TableBucket, i64)], +) -> Result> { + let table_path = scanner.table_path(); + + if !scanner.is_partitioned() { + let bucket_ids: Vec = subscribed.iter().map(|(tb, _)| tb.bucket_id()).collect(); + + let offsets = admin + .list_offsets(table_path, &bucket_ids, OffsetSpec::Latest) + .await?; + + let subscribed_offset_by_bucket: HashMap = subscribed + .iter() + .map(|(tb, off)| (tb.bucket_id(), *off)) + .collect(); + + let table_id = scanner.table_id(); + Ok(offsets + .into_iter() + .filter(|(bucket_id, latest_offset)| { + if *latest_offset < 0 { + warn!( + "Server returned negative latest offset {latest_offset} for bucket {bucket_id} of table {table_id}; skipping bucket." 
+ ); + return false; + } + if *latest_offset == 0 { + return false; + } + let subscribed_offset = subscribed_offset_by_bucket + .get(bucket_id) + .copied() + .unwrap_or(0); + subscribed_offset < *latest_offset + }) + .map(|(bucket_id, offset)| (TableBucket::new(table_id, bucket_id), offset)) + .collect()) + } else { + query_partitioned_offsets(admin, scanner, subscribed).await + } +} + +/// Query offsets for partitioned table subscriptions. +/// +/// Partition metadata is fetched once per reader construction (not cached), +/// since each [`RecordBatchLogReader`] is typically short-lived and consumed. +async fn query_partitioned_offsets( + admin: &FlussAdmin, + scanner: &RecordBatchLogScanner, + subscribed: &[(TableBucket, i64)], +) -> Result> { + let table_path = scanner.table_path(); + let table_id = scanner.table_id(); + + let partition_infos = admin.list_partition_infos(table_path).await?; + let partition_id_to_name: HashMap = partition_infos + .into_iter() + .map(|info| (info.get_partition_id(), info.get_partition_name())) + .collect(); + + let subscribed_offset_map: HashMap = subscribed.iter().cloned().collect(); + + let mut by_partition: HashMap> = HashMap::new(); + for (tb, _) in subscribed { + if let Some(partition_id) = tb.partition_id() { + by_partition + .entry(partition_id) + .or_default() + .push(tb.bucket_id()); + } + } + + let mut result: HashMap = HashMap::new(); + + for (partition_id, bucket_ids) in by_partition { + let partition_name = + partition_id_to_name + .get(&partition_id) + .ok_or_else(|| Error::UnexpectedError { + message: format!("Unknown partition_id: {partition_id}"), + source: None, + })?; + + let offsets = admin + .list_partition_offsets(table_path, partition_name, &bucket_ids, OffsetSpec::Latest) + .await?; + + for (bucket_id, latest_offset) in offsets { + if latest_offset < 0 { + warn!( + "Server returned negative latest offset {latest_offset} for bucket {bucket_id} of partition {partition_id} (table {table_id}); skipping bucket." 
+ ); + continue; + } + if latest_offset == 0 { + continue; + } + let tb = TableBucket::new_with_partition(table_id, Some(partition_id), bucket_id); + let subscribed_offset = subscribed_offset_map.get(&tb).copied().unwrap_or(0); + if subscribed_offset < latest_offset { + result.insert(tb, latest_offset); + } + } + } + + Ok(result) +} + +/// Filter and slice scan batches against per-bucket stopping offsets. +/// +/// For each batch: +/// - If the batch's bucket is not in `stopping_offsets`, skip it. +/// - If `base_offset >= stop_at`, the bucket is exhausted; remove from map. +/// - If `last_offset >= stop_at`, slice to keep only records before stop_at. +/// - Otherwise, keep the full batch. +/// +/// Accepted batches with at least one row are pushed to `buffer`; empty +/// batches (e.g. a server-emitted batch containing no rows, or a slice that +/// reduces to zero rows) are dropped so consumers never observe an empty +/// `ScanBatch`. Returns the list of buckets that completed (were removed +/// from `stopping_offsets`). 
+fn filter_batches( + scan_batches: Vec, + stopping_offsets: &mut HashMap, + buffer: &mut VecDeque, +) -> Vec { + let mut completed = Vec::new(); + + for scan_batch in scan_batches { + let bucket = scan_batch.bucket().clone(); + let Some(&stop_at) = stopping_offsets.get(&bucket) else { + continue; + }; + + let base_offset = scan_batch.base_offset(); + let last_offset = scan_batch.last_offset(); + + if base_offset >= stop_at { + stopping_offsets.remove(&bucket); + completed.push(bucket); + continue; + } + + let kept_batch = if last_offset >= stop_at { + let num_to_keep = (stop_at - base_offset) as usize; + let b = scan_batch.into_batch(); + let limit = num_to_keep.min(b.num_rows()); + ScanBatch::new(bucket.clone(), b.slice(0, limit), base_offset) + } else { + scan_batch + }; + + if kept_batch.batch().num_rows() > 0 { + buffer.push_back(kept_batch); + } + + if last_offset >= stop_at - 1 { + stopping_offsets.remove(&bucket); + completed.push(bucket); + } + } + + completed +} + +// TODO: Add an end-to-end test with `FlussTestingCluster` (feature +// `integration_tests`) covering `new_until_latest`, partitioned tables, and +// `new_until_offsets` stopping semantics. 
+#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int32Array; + use arrow_schema::{DataType, Field, Schema}; + use std::sync::Arc; + + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)])) + } + + fn make_batch(values: &[i32]) -> RecordBatch { + RecordBatch::try_new( + test_schema(), + vec![Arc::new(Int32Array::from(values.to_vec()))], + ) + .unwrap() + } + + fn make_scan_batch(bucket: TableBucket, base_offset: i64, values: &[i32]) -> ScanBatch { + ScanBatch::new(bucket, make_batch(values), base_offset) + } + + fn bucket(id: i32) -> TableBucket { + TableBucket::new(1, id) + } + + #[test] + fn filter_batch_entirely_before_stop() { + let mut offsets = HashMap::from([(bucket(0), 100)]); + let mut buffer = VecDeque::new(); + + let batches = vec![make_scan_batch(bucket(0), 10, &[1, 2, 3])]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].batch().num_rows(), 3); + assert!(offsets.contains_key(&bucket(0))); + assert!(completed.is_empty()); + } + + #[test] + fn filter_batch_crossing_stop_offset_is_sliced() { + let mut offsets = HashMap::from([(bucket(0), 12)]); + let mut buffer = VecDeque::new(); + + // base_offset=10, 5 rows -> offsets 10,11,12,13,14; stop_at=12 -> keep 2 + let batches = vec![make_scan_batch(bucket(0), 10, &[1, 2, 3, 4, 5])]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].batch().num_rows(), 2); + assert!(!offsets.contains_key(&bucket(0))); + assert_eq!(completed, vec![bucket(0)]); + } + + #[test] + fn filter_batch_at_or_after_stop_offset_is_skipped() { + let mut offsets = HashMap::from([(bucket(0), 10)]); + let mut buffer = VecDeque::new(); + + // base_offset=10, stop_at=10 -> base >= stop, skip entirely + let batches = vec![make_scan_batch(bucket(0), 10, &[1, 2, 3])]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + 
+ assert!(buffer.is_empty()); + assert!(!offsets.contains_key(&bucket(0))); + assert_eq!(completed, vec![bucket(0)]); + } + + #[test] + fn filter_batch_ending_exactly_at_stop_minus_one() { + let mut offsets = HashMap::from([(bucket(0), 13)]); + let mut buffer = VecDeque::new(); + + // base_offset=10, 3 rows -> offsets 10,11,12; last_offset=12, stop_at=13 + // last_offset (12) >= stop_at - 1 (12) => bucket done + let batches = vec![make_scan_batch(bucket(0), 10, &[1, 2, 3])]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].batch().num_rows(), 3); + assert!(!offsets.contains_key(&bucket(0))); + assert_eq!(completed, vec![bucket(0)]); + } + + #[test] + fn filter_unknown_bucket_is_ignored() { + let mut offsets = HashMap::from([(bucket(0), 100)]); + let mut buffer = VecDeque::new(); + + let batches = vec![make_scan_batch(bucket(99), 0, &[1, 2])]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + + assert!(buffer.is_empty()); + assert!(offsets.contains_key(&bucket(0))); + assert!(completed.is_empty()); + } + + #[test] + fn filter_multiple_buckets_independent_tracking() { + let mut offsets = HashMap::from([(bucket(0), 12), (bucket(1), 5)]); + let mut buffer = VecDeque::new(); + + let batches = vec![ + make_scan_batch(bucket(0), 10, &[1, 2, 3]), // last=12, stop=12 -> keep 2, done + make_scan_batch(bucket(1), 0, &[10, 20, 30]), // last=2, stop=5 -> keep all, not done + ]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 2); + assert_eq!(buffer[0].batch().num_rows(), 2); // bucket 0: sliced + assert_eq!(buffer[1].batch().num_rows(), 3); // bucket 1: full + assert!(!offsets.contains_key(&bucket(0))); // bucket 0: done + assert!(offsets.contains_key(&bucket(1))); // bucket 1: still tracking + assert_eq!(completed, vec![bucket(0)]); + } + + #[test] + fn filter_empty_batch_at_stop() { + let mut offsets = HashMap::from([(bucket(0), 
5)]); + let mut buffer = VecDeque::new(); + + // empty batch: base_offset=5, 0 rows -> last_offset = base-1 = 4 + // base_offset (5) >= stop_at (5) -> skip, remove + let batches = vec![make_scan_batch(bucket(0), 5, &[])]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + + assert!(buffer.is_empty()); + assert!(!offsets.contains_key(&bucket(0))); + assert_eq!(completed, vec![bucket(0)]); + } + + #[test] + fn filter_drops_empty_batch_before_stop() { + // Empty batch well below the stop offset: base=5, 0 rows -> last=4, stop=100. + // base_offset (5) < stop_at (100) and last_offset (4) < stop_at (100), + // so it falls into the "keep full batch" branch but must not surface to + // the consumer because it has zero rows. + let mut offsets = HashMap::from([(bucket(0), 100)]); + let mut buffer = VecDeque::new(); + + let batches = vec![make_scan_batch(bucket(0), 5, &[])]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + + assert!(buffer.is_empty()); + assert!(offsets.contains_key(&bucket(0))); + assert!(completed.is_empty()); + } + + #[test] + fn filter_single_row_batch_before_stop() { + let mut offsets = HashMap::from([(bucket(0), 10)]); + let mut buffer = VecDeque::new(); + + let batches = vec![make_scan_batch(bucket(0), 5, &[42])]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].batch().num_rows(), 1); + assert!(offsets.contains_key(&bucket(0))); + assert!(completed.is_empty()); + } + + #[test] + fn filter_single_row_batch_at_stop_boundary() { + let mut offsets = HashMap::from([(bucket(0), 5)]); + let mut buffer = VecDeque::new(); + + // base_offset=4, 1 row -> last_offset=4, stop=5 + // last < stop -> keep all; last (4) >= stop-1 (4) -> done + let batches = vec![make_scan_batch(bucket(0), 4, &[42])]; + let completed = filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].batch().num_rows(), 1); 
+ assert!(!offsets.contains_key(&bucket(0))); + assert_eq!(completed, vec![bucket(0)]); + } + + #[test] + fn filter_preserves_scan_batch_metadata() { + let mut offsets = HashMap::from([(bucket(3), 100)]); + let mut buffer = VecDeque::new(); + + let batches = vec![make_scan_batch(bucket(3), 42, &[1, 2])]; + filter_batches(batches, &mut offsets, &mut buffer); + + let sb = &buffer[0]; + assert_eq!(*sb.bucket(), bucket(3)); + assert_eq!(sb.base_offset(), 42); + } + + #[test] + fn filter_sliced_batch_preserves_base_offset() { + let mut offsets = HashMap::from([(bucket(0), 12)]); + let mut buffer = VecDeque::new(); + + let batches = vec![make_scan_batch(bucket(0), 10, &[1, 2, 3, 4, 5])]; + filter_batches(batches, &mut offsets, &mut buffer); + + let sb = &buffer[0]; + assert_eq!(sb.base_offset(), 10); + assert_eq!(*sb.bucket(), bucket(0)); + } +} diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 00c5b238..735e7fe5 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -254,6 +254,10 @@ pub struct LogScanner { /// /// More efficient than [`LogScanner`] for batch-level analytics where per-record /// metadata (offsets, timestamps) is not needed. +/// +/// This type is intentionally **not** `Clone`. To perform a bounded read, move +/// the scanner into a [`crate::client::RecordBatchLogReader`] — the compiler +/// then prevents concurrent polls by construction. 
pub struct RecordBatchLogScanner { inner: Arc<LogScannerInner>, } @@ -266,6 +270,7 @@ struct LogScannerInner { log_scanner_status: Arc<LogScannerStatus>, log_fetcher: LogFetcher, is_partitioned_table: bool, + arrow_schema: SchemaRef, } impl LogScannerInner { @@ -277,6 +282,20 @@ impl LogScannerInner { projected_fields: Option<Vec<usize>>, ) -> Result<Self> { let log_scanner_status = Arc::new(LogScannerStatus::new()); + + let full_row_type = table_info.get_row_type(); + let arrow_schema = match &projected_fields { + Some(indices) => { + let projected_fields_vec: Vec<_> = indices + .iter() + .map(|&i| full_row_type.fields()[i].clone()) + .collect(); + let projected_row_type = crate::metadata::RowType::new(projected_fields_vec); + to_arrow_schema(&projected_row_type)? + } + None => to_arrow_schema(full_row_type)?, + }; + Ok(Self { table_path: table_info.table_path.clone(), table_id: table_info.table_id, @@ -285,12 +304,13 @@ impl LogScannerInner { log_scanner_status: log_scanner_status.clone(), log_fetcher: LogFetcher::new( table_info.clone(), - connections.clone(), - metadata.clone(), + connections, + metadata, log_scanner_status.clone(), config, projected_fields, )?, + arrow_schema, }) } @@ -612,6 +632,34 @@ impl RecordBatchLogScanner { ) -> Result<()> { self.inner.unsubscribe_partition(partition_id, bucket).await } + + /// Returns the Arrow schema for batches produced by this scanner. + pub fn schema(&self) -> SchemaRef { + self.inner.arrow_schema.clone() + } + + pub fn table_path(&self) -> &TablePath { + &self.inner.table_path + } + + pub fn table_id(&self) -> TableId { + self.inner.table_id + } + + /// Creates a new handle to the same underlying scanner state. + /// + /// Binding layers (Python, C++) that hold the scanner behind shared + /// ownership (`Arc`) cannot move it into a [`crate::client::RecordBatchLogReader`]. + /// This method produces a second handle so the reader can take ownership + /// while the binding retains its reference for subscription management.
+ /// + /// **Not intended for general use** — prefer moving the scanner directly. + #[doc(hidden)] + pub fn new_shared_handle(&self) -> Self { + RecordBatchLogScanner { + inner: Arc::clone(&self.inner), + } + } } struct LogFetcher { @@ -1993,6 +2041,7 @@ mod tests { let result = validate_scan_support(&table_path, &table_info); assert!(result.is_ok()); } + #[tokio::test] async fn prepare_fetch_log_requests_uses_configured_fetch_params() -> Result<()> { let table_path = TablePath::new("db".to_string(), "tbl".to_string()); diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index 1268d37f..89d67376 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -164,9 +164,12 @@ Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`. | `.poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) | | `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) | | `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) | +| `.to_arrow_batch_reader() -> pa.RecordBatchReader` | Lazy Arrow RecordBatchReader reading until latest offsets (batch scanner only) | | `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) | | `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) | +> **Note:** Overlapping `poll_*` / `to_arrow*` / `to_arrow_batch_reader` calls on the same underlying scanner are not supported. Use only one active polling/consumption path at a time. + ## `ScanRecords` Returned by `LogScanner.poll()`. Records are grouped by bucket. 
diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 15a62c1c..3a6b0bc7 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -148,6 +148,8 @@ Complete API reference for the Fluss Rust client. ## `RecordBatchLogScanner` +Overlapping `poll` calls on shared handles of the same scanner state, or `poll` concurrent with `RecordBatchLogReader::next_batch`, are not supported. Use one active polling/consumption call at a time per underlying scanner state. + | Method | Description | |-----------------------------------------------------------------------------------------------------------|----------------------------------------------------------| | `async fn subscribe(&self, bucket_id: i32, start_offset: i64) -> Result<()>` | Subscribe to a bucket | @@ -159,6 +161,23 @@ Complete API reference for the Fluss Rust client. | `async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>>` | Poll for Arrow record batches | | `fn is_partitioned(&self) -> bool` | Check if the table is partitioned | | `fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)>` | Get all current subscriptions as (bucket, offset) pairs | +| `fn schema(&self) -> SchemaRef` | Get the Arrow schema for batches produced by this scanner| +| `fn table_path(&self) -> &TablePath` | Get the table path | +| `fn table_id(&self) -> TableId` | Get the table ID | + +## `RecordBatchLogReader` + +Bounded log reader that consumes data up to specified stopping offsets, then terminates. +Unlike `RecordBatchLogScanner` which polls indefinitely, this reader stops automatically.
+ +| Method | Description | +|-------------------------------------------------------------------------------------------------------------|----------------------------------------------------------| +| `async fn new_until_latest(scanner: RecordBatchLogScanner, admin: &FlussAdmin) -> Result<Self>` | Read until the latest offsets at time of creation | +| `fn new_until_offsets(scanner: RecordBatchLogScanner, stopping_offsets: HashMap<TableBucket, i64>) -> Self` | Read until custom stopping offsets per bucket | +| `async fn next_batch(&mut self) -> Result<Option<ScanBatch>>` | Get the next batch with bucket/offset metadata, or `None` when all buckets caught up | +| `async fn collect_all_batches(&mut self) -> Result<Vec<ScanBatch>>` | Drain all batches (with metadata) until stopping offsets are satisfied | +| `fn schema(&self) -> SchemaRef` | Arrow schema for produced batches | +| `fn to_record_batch_reader(self, handle) -> SyncRecordBatchLogReader` | Sync adapter implementing `arrow::RecordBatchReader` | ## `ScanRecord`