From 4a3a005b737e6039bab0009f42803bcd72bb8721 Mon Sep 17 00:00:00 2001 From: liujiwen-up Date: Thu, 21 May 2026 18:06:17 +0800 Subject: [PATCH] fix: classify physical files by path context --- .../src/system_tables/physical_files_size.rs | 3 +- crates/paimon/src/table/referenced_files.rs | 302 ++++++++++++++++-- docs/src/sql.md | 18 +- 3 files changed, 286 insertions(+), 37 deletions(-) diff --git a/crates/integrations/datafusion/src/system_tables/physical_files_size.rs b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs index 72b3d9e3..5380612b 100644 --- a/crates/integrations/datafusion/src/system_tables/physical_files_size.rs +++ b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs @@ -82,7 +82,8 @@ impl TableProvider for PhysicalFilesSizeTable { ) -> DFResult> { let table = self.table.clone(); let summary = crate::runtime::await_with_runtime(async move { - collect_physical_files_summary(table.file_io(), table.location()).await + let partition_depth = table.schema().partition_keys().len(); + collect_physical_files_summary(table.file_io(), table.location(), partition_depth).await }) .await .map_err(to_datafusion_error)?; diff --git a/crates/paimon/src/table/referenced_files.rs b/crates/paimon/src/table/referenced_files.rs index 2c41c675..a170b3c8 100644 --- a/crates/paimon/src/table/referenced_files.rs +++ b/crates/paimon/src/table/referenced_files.rs @@ -614,25 +614,113 @@ pub struct PhysicalFilesSummary { pub index_file_size: i64, } -/// Categorize a file name into a file type. -/// Everything that is not a manifest/statistics or index file is classified as data. -fn classify_file_name(file_name: &str) -> FileType { - if file_name.starts_with("manifest-") - || file_name.starts_with("index-manifest-") - || file_name.starts_with("statistics-") - { - FileType::Manifest - } else if file_name.starts_with("index-") { - FileType::Index +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PhysicalFileKind { + Manifest, + Statistics, + Data, + Index, + Other, +} + +fn table_relative_path<'a>(table_location: &str, path: &'a str) -> Option<&'a str> { + let table_location = table_location.trim_end_matches('/'); + if path == table_location { + Some("") } else { - FileType::Data + path.strip_prefix(table_location) + .and_then(|rest| rest.strip_prefix('/')) } } -enum FileType { - Manifest, - Data, - Index, +fn is_manifest_file_name(file_name: &str) -> bool { + file_name.starts_with("manifest-") + || file_name.starts_with("manifest-list-") + || file_name.starts_with("index-manifest-") +} + +fn is_bucket_dir_name(segment: &str) -> bool { + segment == "bucket-postpone" + || segment + .strip_prefix("bucket-") + .is_some_and(|bucket| !bucket.is_empty() && bucket.chars().all(|c| c.is_ascii_digit())) +} + +fn is_partition_segment(segment: &str) -> bool { + let Some((key, _value)) = segment.split_once('=') else { + return false; + }; + !key.is_empty() +} + +fn is_data_file_in_bucket(segments: &[&str], partition_depth: usize) -> bool { + if segments.len() != partition_depth + 2 { + return false; + } + + segments[..partition_depth] + .iter() + .all(|segment| is_partition_segment(segment)) + && is_bucket_dir_name(segments[partition_depth]) + && !segments[partition_depth + 1].starts_with("index-") +} + +fn is_data_file_in_data_dir( + relative_path: &str, + data_dir_relative_path: &str, + partition_depth: usize, +) -> bool { + let data_dir_relative_path = data_dir_relative_path.trim_matches('/'); + let data_relative_path = if data_dir_relative_path.is_empty() { + relative_path + } else { + let Some(rest) = relative_path + .strip_prefix(data_dir_relative_path) + .and_then(|rest| rest.strip_prefix('/')) + else { + return false; + }; + rest + }; + let segments = data_relative_path.split('/').collect::>(); + is_data_file_in_bucket(&segments, partition_depth) +} + +fn classify_physical_path( + table_location: &str, + path: &str, + partition_depth: usize, + data_file_path_directory: Option<&str>, +) -> PhysicalFileKind { + let Some(relative_path) = table_relative_path(table_location, path) else { + return PhysicalFileKind::Other; + }; + let relative_path = relative_path.trim_matches('/'); + if relative_path.is_empty() { + return PhysicalFileKind::Other; + } + + let segments = relative_path.split('/').collect::>(); + + match segments.as_slice() { + ["manifest", name] if is_manifest_file_name(name) => PhysicalFileKind::Manifest, + ["statistics", _] => PhysicalFileKind::Statistics, + ["index", _] => PhysicalFileKind::Index, + _ => { + if let Some(data_dir) = data_file_path_directory { + let data_dir = table_relative_path(table_location, data_dir).unwrap_or(data_dir); + if is_data_file_in_data_dir(relative_path, data_dir, partition_depth) { + PhysicalFileKind::Data + } else { + PhysicalFileKind::Other + } + } else if is_data_file_in_bucket(&segments, partition_depth) { + PhysicalFileKind::Data + } else { + PhysicalFileKind::Other + } + } + } } const DIR_LIST_CONCURRENCY: usize = 32; @@ -643,13 +731,13 @@ const DIR_LIST_CONCURRENCY: usize = 32; /// subdirectory recursively (up to 32 in parallel) to maximize throughput /// on object stores with many partition directories. /// -/// Files are classified by their file name prefix: -/// - `manifest-*` / `index-manifest-*` → manifest -/// - `index-*` (excluding `index-manifest-*`) → index -/// - Everything else → data +/// Files are classified by their table-relative path. Only recognized Paimon +/// metadata directories and partition/bucket data paths are counted; unknown +/// files are ignored by this summary. pub async fn collect_physical_files_summary( file_io: &FileIO, table_location: &str, + partition_depth: usize, ) -> crate::Result { // List top-level entries to discover subdirectories and top-level files let top_entries = match file_io.list_status(table_location).await { @@ -670,8 +758,13 @@ pub async fn collect_physical_files_summary( if entry.is_dir { sub_dirs.push(entry.path.clone()); } else { - let file_name = entry.path.rsplit('/').next().unwrap_or(&entry.path); - accumulate_file(&mut summary, file_name, entry.size); + accumulate_file( + &mut summary, + table_location, + &entry.path, + partition_depth, + entry.size, + ); } } @@ -695,28 +788,40 @@ pub async fn collect_physical_files_summary( for result in dir_results { let statuses = result?; for status in &statuses { - let file_name = status.path.rsplit('/').next().unwrap_or(&status.path); - accumulate_file(&mut summary, file_name, status.size); + accumulate_file( + &mut summary, + table_location, + &status.path, + partition_depth, + status.size, + ); } } Ok(summary) } -fn accumulate_file(summary: &mut PhysicalFilesSummary, file_name: &str, size: u64) { - match classify_file_name(file_name) { - FileType::Manifest => { +fn accumulate_file( + summary: &mut PhysicalFilesSummary, + table_location: &str, + path: &str, + partition_depth: usize, + size: u64, +) { + match classify_physical_path(table_location, path, partition_depth, None) { + PhysicalFileKind::Manifest | PhysicalFileKind::Statistics => { summary.manifest_file_count += 1; summary.manifest_file_size += size as i64; } - FileType::Data => { + PhysicalFileKind::Data => { summary.data_file_count += 1; summary.data_file_size += size as i64; } - FileType::Index => { + PhysicalFileKind::Index => { summary.index_file_count += 1; summary.index_file_size += size as i64; } + PhysicalFileKind::Other => {} } } @@ -726,11 +831,21 @@ mod tests { use crate::io::FileIOBuilder; use crate::spec::{CommitKind, Snapshot}; use crate::table::{BranchManager, SnapshotManager, TagManager}; + use bytes::Bytes; fn test_file_io() -> FileIO { FileIOBuilder::new("memory").build().unwrap() } + async fn write_test_file(file_io: &FileIO, path: &str, content: &str) { + file_io + .new_output(path) + .unwrap() + .write(Bytes::from(content.to_string())) + .await + .unwrap(); + } + #[tokio::test] async fn test_collect_empty_table() { let file_io = test_file_io(); @@ -788,6 +903,137 @@ mod tests { assert_eq!(result[1].data_file_count, 0); } + #[tokio::test] + async fn test_physical_files_summary_uses_path_context_for_unpartitioned_table() { + let table_path = "memory:/test_physical_files_summary_path_context"; + let file_io = test_file_io(); + + write_test_file( + &file_io, + &format!("{table_path}/manifest/manifest-list-0"), + "manifest list", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/manifest/manifest-0"), + "manifest", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/manifest/index-manifest-0"), + "index manifest", + ) + .await; + write_test_file(&file_io, &format!("{table_path}/index/index-0"), "index").await; + write_test_file(&file_io, &format!("{table_path}/bucket-0/data-0"), "data").await; + write_test_file( + &file_io, + &format!("{table_path}/bucket-0/part-0.parquet"), + "data without prefix", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/bucket-0/index-should-not-be-data"), + "bucket index", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/bucket-postpone/data-u-0"), + "postpone data", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/nested/bucket-0/data-too-deep"), + "not a data bucket", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/data-root-file.parquet"), + "root data prefix", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/snapshot/snapshot-1"), + "snapshot", + ) + .await; + write_test_file(&file_io, &format!("{table_path}/schema/schema-0"), "schema").await; + write_test_file(&file_io, &format!("{table_path}/tag/tag-v1"), "tag").await; + write_test_file(&file_io, &format!("{table_path}/_SUCCESS"), "success").await; + write_test_file(&file_io, &format!("{table_path}/random-file"), "random").await; + write_test_file( + &file_io, + &format!("{table_path}/statistics/stat-0"), + "statistics", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/manifest/stat-0"), + "not classified by statistics prefix", + ) + .await; + + let result = collect_physical_files_summary(&file_io, table_path, 0) + .await + .unwrap(); + + assert_eq!(result.manifest_file_count, 4); + assert_eq!(result.index_file_count, 1); + assert_eq!(result.data_file_count, 3); + } + + #[tokio::test] + async fn test_physical_files_summary_uses_partition_depth() { + let table_path = "memory:/test_physical_files_summary_partitioned"; + let file_io = test_file_io(); + + write_test_file( + &file_io, + &format!("{table_path}/dt=2026-05-21/bucket-0/part-0.parquet"), + "partition data", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/dt=2026-05-21/bucket-0/index-0"), + "partition bucket index", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/dt=2026-05-21/not-bucket/data-0"), + "not bucket", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/not_partition/bucket-0/data-0"), + "not partition", + ) + .await; + write_test_file( + &file_io, + &format!("{table_path}/bucket-0/root-bucket-file"), + "wrong depth", + ) + .await; + + let result = collect_physical_files_summary(&file_io, table_path, 1) + .await + .unwrap(); + + assert_eq!(result.data_file_count, 1); + assert_eq!(result.index_file_count, 0); + } + #[tokio::test] async fn test_branch_tag_referenced_files() { use crate::spec::stats::BinaryTableStats; diff --git a/docs/src/sql.md b/docs/src/sql.md index 325d236e..4a8dc4f7 100644 --- a/docs/src/sql.md +++ b/docs/src/sql.md @@ -806,12 +806,14 @@ Columns: ### $physical_files_size -Scan the table directory recursively and compute the total size of all physical files on disk, categorized by file type. By comparing with `$referenced_files_size`, you can identify orphan files that are no longer referenced by any snapshot. +Scan the table directory recursively and compute the total size of recognized physical files on disk, categorized by file type. This table is a diagnostic size summary; orphan cleanup needs file-level candidates and retention checks, not just aggregate size differences. -Files are classified by their file name prefix: -- `manifest-*` / `index-manifest-*` → manifest -- `index-*` (excluding `index-manifest-*`) → index -- Everything else → data +Files are classified by their table-relative path: +- `manifest/manifest-*`, `manifest/manifest-list-*`, and `manifest/index-manifest-*` → manifest +- `statistics/*` → manifest file counters for the current compatible output schema +- `index/*` → index +- `/bucket-*/*` and `/bucket-postpone/*` → data, using the table's partition depth +- unknown files are ignored by this summary ```sql SELECT * FROM paimon.default.my_table$physical_files_size; @@ -823,8 +825,8 @@ Columns: |---|---|---| | `manifest_file_count` | BIGINT | Number of manifest files on disk | | `manifest_file_size` | BIGINT | Total size of manifest files (bytes) | -| `data_file_count` | BIGINT | Number of data files on disk | -| `data_file_size` | BIGINT | Total size of data files (bytes) | +| `data_file_count` | BIGINT | Number of recognized data files on disk | +| `data_file_size` | BIGINT | Total size of recognized data files (bytes) | | `index_file_count` | BIGINT | Number of index files on disk | | `index_file_size` | BIGINT | Total size of index files (bytes) | @@ -855,7 +857,7 @@ The output contains one row per scope: - `branch:main` — main branch snapshots + tag snapshots - `branch:` — one row per other branch -To identify orphan file size: +To estimate possible orphan file size for recognized data files: ```sql SELECT p.data_file_size - r.data_file_size AS orphan_data_size