Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ impl TableProvider for PhysicalFilesSizeTable {
) -> DFResult<Arc<dyn ExecutionPlan>> {
let table = self.table.clone();
let summary = crate::runtime::await_with_runtime(async move {
collect_physical_files_summary(table.file_io(), table.location()).await
let partition_depth = table.schema().partition_keys().len();
collect_physical_files_summary(table.file_io(), table.location(), partition_depth).await
})
.await
.map_err(to_datafusion_error)?;
Expand Down
302 changes: 274 additions & 28 deletions crates/paimon/src/table/referenced_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,25 +614,113 @@ pub struct PhysicalFilesSummary {
pub index_file_size: i64,
}

/// Categorize a file name into a file type.
/// Everything that is not a manifest/statistics or index file is classified as data.
fn classify_file_name(file_name: &str) -> FileType {
if file_name.starts_with("manifest-")
|| file_name.starts_with("index-manifest-")
|| file_name.starts_with("statistics-")
{
FileType::Manifest
} else if file_name.starts_with("index-") {
FileType::Index
/// Category assigned to a physical file by `classify_physical_path`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PhysicalFileKind {
/// A file directly under `manifest/` whose name passes `is_manifest_file_name`.
Manifest,
/// Any file directly under `statistics/`.
Statistics,
/// A file inside a bucket directory, below the expected number of
/// `key=value` partition segments, whose name does not start with `index-`.
Data,
/// Any file directly under `index/`.
Index,
/// Everything else; `accumulate_file` does not count these.
Other,
}

/// Returns `path` expressed relative to `table_location`.
///
/// Trailing `/` characters on `table_location` are ignored. The result is:
/// - `Some("")` when `path` is the table location itself,
/// - `Some(rest)` when `path` is `<table_location>/<rest>`,
/// - `None` when `path` does not live under `table_location`.
fn table_relative_path<'a>(table_location: &str, path: &'a str) -> Option<&'a str> {
    let table_location = table_location.trim_end_matches('/');
    if path == table_location {
        // The path is the table root itself.
        return Some("");
    }
    // Require a `/` right after the prefix so that a sibling location such as
    // `<table_location>_other/...` is not mistaken for a child of the table.
    path.strip_prefix(table_location)?.strip_prefix('/')
}

enum FileType {
Manifest,
Data,
Index,
/// Reports whether `file_name` carries one of the manifest file name prefixes
/// (`manifest-`, `manifest-list-` or `index-manifest-`).
fn is_manifest_file_name(file_name: &str) -> bool {
    const MANIFEST_PREFIXES: [&str; 3] = ["manifest-", "manifest-list-", "index-manifest-"];

    MANIFEST_PREFIXES
        .iter()
        .any(|&prefix| file_name.starts_with(prefix))
}

/// Reports whether `segment` names a bucket directory: either the literal
/// `bucket-postpone` or `bucket-` followed by one or more ASCII digits.
fn is_bucket_dir_name(segment: &str) -> bool {
    if segment == "bucket-postpone" {
        return true;
    }

    match segment.strip_prefix("bucket-") {
        // A bare `bucket-` with no number is not a bucket directory.
        Some(bucket_id) => {
            !bucket_id.is_empty() && bucket_id.bytes().all(|byte| byte.is_ascii_digit())
        }
        None => false,
    }
}

/// Reports whether `segment` has the `key=value` shape of a partition
/// directory: it contains `=` and the text before the first `=` is non-empty.
/// The value part may be empty.
fn is_partition_segment(segment: &str) -> bool {
    // The byte offset of the first `=` equals the key length.
    matches!(segment.find('='), Some(key_len) if key_len > 0)
}

/// Reports whether `segments` has the layout of a data file path:
/// exactly `partition_depth` partition segments, then a bucket directory,
/// then a file name that does not start with `index-`.
fn is_data_file_in_bucket(segments: &[&str], partition_depth: usize) -> bool {
    if segments.len() != partition_depth + 2 {
        return false;
    }
    // After the length check the leading sub-slice holds exactly
    // `partition_depth` entries.
    let [partition_segments @ .., bucket_dir, file_name] = segments else {
        return false;
    };

    partition_segments
        .iter()
        .all(|partition| is_partition_segment(partition))
        && is_bucket_dir_name(bucket_dir)
        && !file_name.starts_with("index-")
}

/// Reports whether `relative_path` is a data file located under the data
/// directory `data_dir_relative_path`.
///
/// Leading and trailing `/` on the data directory are ignored. An empty data
/// directory means the whole `relative_path` is checked. Otherwise the path
/// must start with `<data dir>/`, and the remainder is checked with
/// `is_data_file_in_bucket`.
fn is_data_file_in_data_dir(
    relative_path: &str,
    data_dir_relative_path: &str,
    partition_depth: usize,
) -> bool {
    let data_dir = data_dir_relative_path.trim_matches('/');

    let below_data_dir = if data_dir.is_empty() {
        Some(relative_path)
    } else {
        relative_path
            .strip_prefix(data_dir)
            .and_then(|rest| rest.strip_prefix('/'))
    };

    match below_data_dir {
        Some(rest) => {
            let segments: Vec<&str> = rest.split('/').collect();
            is_data_file_in_bucket(&segments, partition_depth)
        }
        // The path is not inside the data directory at all.
        None => false,
    }
}

/// Classifies `path` by its position relative to `table_location`.
///
/// - `manifest/<name>` where `<name>` passes `is_manifest_file_name` is `Manifest`
/// - `statistics/<any>` is `Statistics`
/// - `index/<any>` is `Index`
/// - a partition/bucket data path is `Data`; it is looked up below
///   `data_file_path_directory` when that is given, otherwise directly below
///   the table location
/// - paths outside the table, the table root itself and everything else are `Other`
fn classify_physical_path(
    table_location: &str,
    path: &str,
    partition_depth: usize,
    data_file_path_directory: Option<&str>,
) -> PhysicalFileKind {
    let relative_path = match table_relative_path(table_location, path) {
        Some(rest) => rest.trim_matches('/'),
        None => return PhysicalFileKind::Other,
    };
    if relative_path.is_empty() {
        return PhysicalFileKind::Other;
    }

    let segments: Vec<&str> = relative_path.split('/').collect();

    // The metadata directories are recognised first.
    match segments.as_slice() {
        ["manifest", name] if is_manifest_file_name(name) => return PhysicalFileKind::Manifest,
        ["statistics", _] => return PhysicalFileKind::Statistics,
        ["index", _] => return PhysicalFileKind::Index,
        _ => {}
    }

    let is_data_file = match data_file_path_directory {
        Some(data_dir) => {
            // Accept the data directory either as a full path under the table
            // or as an already table-relative path.
            let data_dir = table_relative_path(table_location, data_dir).unwrap_or(data_dir);
            is_data_file_in_data_dir(relative_path, data_dir, partition_depth)
        }
        None => is_data_file_in_bucket(&segments, partition_depth),
    };

    if is_data_file {
        PhysicalFileKind::Data
    } else {
        PhysicalFileKind::Other
    }
}

const DIR_LIST_CONCURRENCY: usize = 32;
Expand All @@ -643,13 +731,13 @@ const DIR_LIST_CONCURRENCY: usize = 32;
/// subdirectory recursively (up to 32 in parallel) to maximize throughput
/// on object stores with many partition directories.
///
/// Files are classified by their file name prefix:
/// - `manifest-*` / `index-manifest-*` → manifest
/// - `index-*` (excluding `index-manifest-*`) → index
/// - Everything else → data
/// Files are classified by their table-relative path. Only recognized Paimon
/// metadata directories and partition/bucket data paths are counted; unknown
/// files are ignored by this summary.
pub async fn collect_physical_files_summary(
file_io: &FileIO,
table_location: &str,
partition_depth: usize,
) -> crate::Result<PhysicalFilesSummary> {
// List top-level entries to discover subdirectories and top-level files
let top_entries = match file_io.list_status(table_location).await {
Expand All @@ -670,8 +758,13 @@ pub async fn collect_physical_files_summary(
if entry.is_dir {
sub_dirs.push(entry.path.clone());
} else {
let file_name = entry.path.rsplit('/').next().unwrap_or(&entry.path);
accumulate_file(&mut summary, file_name, entry.size);
accumulate_file(
&mut summary,
table_location,
&entry.path,
partition_depth,
entry.size,
);
}
}

Expand All @@ -695,28 +788,40 @@ pub async fn collect_physical_files_summary(
for result in dir_results {
let statuses = result?;
for status in &statuses {
let file_name = status.path.rsplit('/').next().unwrap_or(&status.path);
accumulate_file(&mut summary, file_name, status.size);
accumulate_file(
&mut summary,
table_location,
&status.path,
partition_depth,
status.size,
);
}
}

Ok(summary)
}

fn accumulate_file(summary: &mut PhysicalFilesSummary, file_name: &str, size: u64) {
match classify_file_name(file_name) {
FileType::Manifest => {
fn accumulate_file(
summary: &mut PhysicalFilesSummary,
table_location: &str,
path: &str,
partition_depth: usize,
size: u64,
) {
match classify_physical_path(table_location, path, partition_depth, None) {
PhysicalFileKind::Manifest | PhysicalFileKind::Statistics => {
summary.manifest_file_count += 1;
summary.manifest_file_size += size as i64;
}
FileType::Data => {
PhysicalFileKind::Data => {
summary.data_file_count += 1;
summary.data_file_size += size as i64;
}
FileType::Index => {
PhysicalFileKind::Index => {
summary.index_file_count += 1;
summary.index_file_size += size as i64;
}
PhysicalFileKind::Other => {}
}
}

Expand All @@ -726,11 +831,21 @@ mod tests {
use crate::io::FileIOBuilder;
use crate::spec::{CommitKind, Snapshot};
use crate::table::{BranchManager, SnapshotManager, TagManager};
use bytes::Bytes;

/// Builds the `FileIO` used by these tests via `FileIOBuilder::new("memory")`.
/// Panics if the build fails.
fn test_file_io() -> FileIO {
FileIOBuilder::new("memory").build().unwrap()
}

/// Writes `content` to `path` through `file_io`.
/// Panics if the output cannot be created or the write fails.
async fn write_test_file(file_io: &FileIO, path: &str, content: &str) {
file_io
.new_output(path)
.unwrap()
.write(Bytes::from(content.to_string()))
.await
.unwrap();
}

#[tokio::test]
async fn test_collect_empty_table() {
let file_io = test_file_io();
Expand Down Expand Up @@ -788,6 +903,137 @@ mod tests {
assert_eq!(result[1].data_file_count, 0);
}

// Checks that, for an unpartitioned table (partition depth 0), files are
// counted by where they sit under the table location rather than by file
// name prefix alone.
#[tokio::test]
async fn test_physical_files_summary_uses_path_context_for_unpartitioned_table() {
let table_path = "memory:/test_physical_files_summary_path_context";
let file_io = test_file_io();

// Three files under `manifest/` with manifest name prefixes: expected to
// count as manifest files.
write_test_file(
&file_io,
&format!("{table_path}/manifest/manifest-list-0"),
"manifest list",
)
.await;
write_test_file(
&file_io,
&format!("{table_path}/manifest/manifest-0"),
"manifest",
)
.await;
write_test_file(
&file_io,
&format!("{table_path}/manifest/index-manifest-0"),
"index manifest",
)
.await;
// The only file under the top-level `index/` directory: the one index file.
write_test_file(&file_io, &format!("{table_path}/index/index-0"), "index").await;
// Two files directly inside a bucket directory: data files regardless of
// their name prefix.
write_test_file(&file_io, &format!("{table_path}/bucket-0/data-0"), "data").await;
write_test_file(
&file_io,
&format!("{table_path}/bucket-0/part-0.parquet"),
"data without prefix",
)
.await;
// An `index-` prefixed file inside a bucket is not a data file, and it is
// not under `index/`, so it is expected to be ignored.
write_test_file(
&file_io,
&format!("{table_path}/bucket-0/index-should-not-be-data"),
"bucket index",
)
.await;
// `bucket-postpone` is accepted as a bucket directory: third data file.
write_test_file(
&file_io,
&format!("{table_path}/bucket-postpone/data-u-0"),
"postpone data",
)
.await;
// One directory level too deep for partition depth 0: ignored.
write_test_file(
&file_io,
&format!("{table_path}/nested/bucket-0/data-too-deep"),
"not a data bucket",
)
.await;
// A file at the table root is not inside a bucket: ignored.
write_test_file(
&file_io,
&format!("{table_path}/data-root-file.parquet"),
"root data prefix",
)
.await;
// Snapshot, schema, tag and stray root files fall in none of the counted
// categories.
write_test_file(
&file_io,
&format!("{table_path}/snapshot/snapshot-1"),
"snapshot",
)
.await;
write_test_file(&file_io, &format!("{table_path}/schema/schema-0"), "schema").await;
write_test_file(&file_io, &format!("{table_path}/tag/tag-v1"), "tag").await;
write_test_file(&file_io, &format!("{table_path}/_SUCCESS"), "success").await;
write_test_file(&file_io, &format!("{table_path}/random-file"), "random").await;
// A file under `statistics/` is added to the manifest counters: the fourth
// manifest file.
write_test_file(
&file_io,
&format!("{table_path}/statistics/stat-0"),
"statistics",
)
.await;
// Under `manifest/` but without a manifest name prefix: ignored.
write_test_file(
&file_io,
&format!("{table_path}/manifest/stat-0"),
"not classified by statistics prefix",
)
.await;

let result = collect_physical_files_summary(&file_io, table_path, 0)
.await
.unwrap();

// 3 manifest-named files + 1 statistics file.
assert_eq!(result.manifest_file_count, 4);
assert_eq!(result.index_file_count, 1);
// bucket-0/data-0, bucket-0/part-0.parquet, bucket-postpone/data-u-0.
assert_eq!(result.data_file_count, 3);
}

// Checks that with partition depth 1 a data file must sit at exactly
// `<key=value>/<bucket dir>/<file>` below the table location.
#[tokio::test]
async fn test_physical_files_summary_uses_partition_depth() {
let table_path = "memory:/test_physical_files_summary_partitioned";
let file_io = test_file_io();

// One partition segment, a bucket directory, then the file: the only data file.
write_test_file(
&file_io,
&format!("{table_path}/dt=2026-05-21/bucket-0/part-0.parquet"),
"partition data",
)
.await;
// An `index-` prefixed file inside a bucket is not data, and it is not
// under the top-level `index/` directory, so it is not an index file either.
write_test_file(
&file_io,
&format!("{table_path}/dt=2026-05-21/bucket-0/index-0"),
"partition bucket index",
)
.await;
// The second segment is not a bucket directory name: ignored.
write_test_file(
&file_io,
&format!("{table_path}/dt=2026-05-21/not-bucket/data-0"),
"not bucket",
)
.await;
// The first segment has no `=`, so it is not a partition segment: ignored.
write_test_file(
&file_io,
&format!("{table_path}/not_partition/bucket-0/data-0"),
"not partition",
)
.await;
// Bucket directly under the table root: one level short for depth 1.
write_test_file(
&file_io,
&format!("{table_path}/bucket-0/root-bucket-file"),
"wrong depth",
)
.await;

let result = collect_physical_files_summary(&file_io, table_path, 1)
.await
.unwrap();

assert_eq!(result.data_file_count, 1);
assert_eq!(result.index_file_count, 0);
}

#[tokio::test]
async fn test_branch_tag_referenced_files() {
use crate::spec::stats::BinaryTableStats;
Expand Down
Loading
Loading