diff --git a/crates/cli/src/commands/ls.rs b/crates/cli/src/commands/ls.rs index 9410a90..0b55820 100644 --- a/crates/cli/src/commands/ls.rs +++ b/crates/cli/src/commands/ls.rs @@ -3,7 +3,10 @@ //! Lists buckets when given an alias only, or lists objects when given a bucket path. use clap::Args; -use rc_core::{AliasManager, ListOptions, ObjectInfo, ObjectStore as _, RemotePath}; +use rc_core::{ + AliasManager, Error, ListOptions, ObjectInfo, ObjectStore as _, ObjectVersionListResult, + RemotePath, +}; use rc_s3::S3Client; use serde::Serialize; use std::collections::HashMap; @@ -52,6 +55,30 @@ struct Summary { total_size_human: String, } +#[derive(Debug, Serialize)] +struct LsVersionOutput { + items: Vec, + truncated: bool, + #[serde(skip_serializing_if = "Option::is_none")] + continuation_token: Option, + #[serde(skip_serializing_if = "Option::is_none")] + version_id_marker: Option, +} + +#[derive(Debug, Serialize)] +struct LsVersionInfo { + key: String, + version_id: String, + is_latest: bool, + is_delete_marker: bool, + #[serde(skip_serializing_if = "Option::is_none")] + last_modified: Option, + #[serde(skip_serializing_if = "Option::is_none")] + size_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + size_human: Option, +} + /// Execute the ls command pub async fn execute(args: LsArgs, output_config: OutputConfig) -> ExitCode { let formatter = Formatter::new(output_config); @@ -104,10 +131,107 @@ pub async fn execute(args: LsArgs, output_config: OutputConfig) -> ExitCode { let bucket = bucket.unwrap(); let path = RemotePath::new(&alias_name, &bucket, prefix.unwrap_or_default()); + if args.versions { + return list_object_versions(&client, &path, args.summarize, &formatter).await; + } + // List objects list_objects(&client, &path, &args, &formatter).await } +async fn list_object_versions( + client: &S3Client, + path: &RemotePath, + summarize: bool, + formatter: &Formatter, +) -> ExitCode { + match client.list_object_versions_page(path, Some(1000)).await { + Ok(result) => { + let versions = result.items.clone(); + let total_size: i64 = versions.iter().filter_map(|v| v.size_bytes).sum(); + + if formatter.is_json() { + formatter.json(&ls_version_output(result)); + } else { + for version in &versions { + let marker = if version.is_delete_marker { + " [DELETE]" + } else { + "" + }; + let latest = if version.is_latest { "*" } else { " " }; + let size = version + .size_bytes + .map(|s| humansize::format_size(s as u64, humansize::BINARY)) + .unwrap_or_default(); + + formatter.println(&format!( + "{latest} {:<40} {:>10} {:>12}{marker}", + version.key, + version.version_id.chars().take(10).collect::(), + size + )); + } + + if summarize { + let total_size_human = + humansize::format_size(total_size as u64, humansize::BINARY); + formatter.println(&format!( + "\nTotal: {} version(s), {}", + formatter.style_size(&versions.len().to_string()), + formatter.style_size(&total_size_human) + )); + } + } + + ExitCode::Success + } + Err(e) => { + formatter.error(&format!("Failed to list versions: {e}")); + exit_code_from_version_listing_error(&e) + } + } +} + +fn ls_version_output(result: ObjectVersionListResult) -> LsVersionOutput { + let items = result + .items + .into_iter() + .map(|v| LsVersionInfo { + key: v.key, + version_id: v.version_id, + is_latest: v.is_latest, + is_delete_marker: v.is_delete_marker, + last_modified: v.last_modified.map(|t| t.to_string()), + size_bytes: v.size_bytes, + size_human: v + .size_bytes + .map(|s| humansize::format_size(s as u64, humansize::BINARY)), + }) + .collect(); + + LsVersionOutput { + items, + truncated: result.truncated, + continuation_token: result.continuation_token, + version_id_marker: result.version_id_marker, + } +} + +fn exit_code_from_version_listing_error(error: &Error) -> ExitCode { + match error { + Error::NotFound(_) => ExitCode::NotFound, + _ => { + let error_text = error.to_string(); + if error_text.contains("NotFound") || error_text.contains("NoSuchBucket") { + ExitCode::NotFound + } else { + ExitCode::GeneralError + } + } + } +} + async fn list_buckets(client: &S3Client, formatter: &Formatter, summarize: bool) -> ExitCode { match client.list_buckets().await { Ok(buckets) => { @@ -433,6 +557,7 @@ fn parse_ls_path(path: &str) -> Result<(String, Option, Option), #[cfg(test)] mod tests { use super::*; + use rc_core::ObjectVersion; #[test] fn test_parse_ls_path_alias_only() { @@ -493,4 +618,52 @@ mod tests { assert_eq!(alias_listing_mode(Some(&bucket), false), None); assert_eq!(alias_listing_mode(Some(&bucket), true), None); } + + #[test] + fn test_ls_version_output_preserves_pagination_metadata() { + let output = ls_version_output(ObjectVersionListResult { + items: vec![ObjectVersion { + key: "logs/a.txt".to_string(), + version_id: "v1".to_string(), + is_latest: true, + is_delete_marker: false, + last_modified: None, + size_bytes: Some(12), + etag: None, + }], + truncated: true, + continuation_token: Some("logs/b.txt".to_string()), + version_id_marker: Some("v2".to_string()), + }); + + let json = serde_json::to_value(output).unwrap(); + assert_eq!(json["truncated"], true); + assert_eq!(json["continuation_token"], "logs/b.txt"); + assert_eq!(json["version_id_marker"], "v2"); + assert_eq!(json["items"][0]["key"], "logs/a.txt"); + } + + #[test] + fn test_version_listing_not_found_errors_use_not_found_exit_code() { + assert_eq!( + exit_code_from_version_listing_error(&Error::NotFound("missing".to_string())), + ExitCode::NotFound + ); + assert_eq!( + exit_code_from_version_listing_error(&Error::General( + "list_object_versions: Service error: NoSuchBucket".to_string() + )), + ExitCode::NotFound + ); + } + + #[test] + fn test_version_listing_other_errors_use_general_exit_code() { + assert_eq!( + exit_code_from_version_listing_error(&Error::General( + "list_object_versions: timeout".to_string() + )), + ExitCode::GeneralError + ); + } } diff --git a/crates/cli/tests/integration.rs b/crates/cli/tests/integration.rs index 8a2bfa9..fa40f60 100644 --- a/crates/cli/tests/integration.rs +++ b/crates/cli/tests/integration.rs @@ -2741,6 +2741,122 @@ mod version_operations { cleanup_bucket(config_dir.path(), &bucket_name); } + #[test] + fn test_ls_versions_lists_multiple_object_versions() { + let (config_dir, bucket_name) = match setup_with_alias("lsversions") { + Some(v) => v, + None => { + eprintln!("Skipping: S3 test config not available"); + return; + } + }; + + let enable_output = run_rc( + &[ + "version", + "enable", + &format!("test/{}", bucket_name), + "--json", + ], + config_dir.path(), + ); + if !enable_output.status.success() { + eprintln!( + "Enable versioning not supported: {}", + String::from_utf8_lossy(&enable_output.stderr) + ); + cleanup_bucket(config_dir.path(), &bucket_name); + return; + } + + let temp_file = tempfile::NamedTempFile::new().expect("Failed to create temp file"); + let object_key = "multi-version.txt"; + + std::fs::write(temp_file.path(), "first version").expect("Failed to write first version"); + let first_upload_output = run_rc( + &[ + "cp", + temp_file + .path() + .to_str() + .expect("Temp file path should be UTF-8"), + &format!("test/{}/{}", bucket_name, object_key), + ], + config_dir.path(), + ); + assert!( + first_upload_output.status.success(), + "Failed to upload first version: {}", + String::from_utf8_lossy(&first_upload_output.stderr) + ); + + std::thread::sleep(Duration::from_secs(1)); + std::fs::write(temp_file.path(), "second version with more bytes") + .expect("Failed to write second version"); + let second_upload_output = run_rc( + &[ + "cp", + temp_file + .path() + .to_str() + .expect("Temp file path should be UTF-8"), + &format!("test/{}/{}", bucket_name, object_key), + ], + config_dir.path(), + ); + assert!( + second_upload_output.status.success(), + "Failed to upload second version: {}", + String::from_utf8_lossy(&second_upload_output.stderr) + ); + + let list_output = run_rc( + &[ + "ls", + &format!("test/{}/", bucket_name), + "--versions", + "--json", + ], + config_dir.path(), + ); + assert!( + list_output.status.success(), + "Failed to list versions through ls: {}", + String::from_utf8_lossy(&list_output.stderr) + ); + + let stdout = String::from_utf8_lossy(&list_output.stdout); + let payload: serde_json::Value = + serde_json::from_str(&stdout).expect("Invalid JSON ls version output"); + let items = payload["items"] + .as_array() + .expect("ls version output should expose an items array"); + let matching_versions: Vec<&serde_json::Value> = items + .iter() + .filter(|entry| entry["key"].as_str() == Some(object_key)) + .collect(); + + assert_eq!( + matching_versions.len(), + 2, + "Expected ls --versions to return both object versions" + ); + assert!( + matching_versions + .iter() + .any(|entry| entry["is_latest"].as_bool() == Some(true)), + "Expected one listed version to be marked as latest" + ); + assert!( + matching_versions + .iter() + .all(|entry| entry["version_id"].as_str().is_some()), + "Expected every listed version to carry a version_id" + ); + + cleanup_bucket(config_dir.path(), &bucket_name); + } + #[test] fn test_rm_recursive_purge_permanently_deletes_versioned_prefix() { let (config_dir, bucket_name) = match setup_with_alias("rmpurgeprefix") { diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 28d249f..5246622 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -38,5 +38,5 @@ pub use retry::{RetryBuilder, is_retryable_error, retry_with_backoff}; pub use select::{SelectCompression, SelectInputFormat, SelectOptions, SelectOutputFormat}; pub use traits::{ BucketNotification, Capabilities, ListOptions, ListResult, NotificationTarget, ObjectInfo, - ObjectStore, ObjectVersion, + ObjectStore, ObjectVersion, ObjectVersionListResult, }; diff --git a/crates/core/src/traits.rs b/crates/core/src/traits.rs index 52d9670..183fec7 100644 --- a/crates/core/src/traits.rs +++ b/crates/core/src/traits.rs @@ -45,6 +45,24 @@ pub struct ObjectVersion { pub etag: Option, } +/// Result of an object version list operation +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ObjectVersionListResult { + /// Listed object versions and delete markers + pub items: Vec, + + /// Whether the result is truncated (more items available) + pub truncated: bool, + + /// Continuation key marker for pagination + #[serde(skip_serializing_if = "Option::is_none")] + pub continuation_token: Option, + + /// Continuation version marker for pagination + #[serde(skip_serializing_if = "Option::is_none")] + pub version_id_marker: Option, +} + /// Metadata for an object or bucket #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ObjectInfo { diff --git a/crates/s3/src/client.rs b/crates/s3/src/client.rs index 9765f0b..02042b3 100644 --- a/crates/s3/src/client.rs +++ b/crates/s3/src/client.rs @@ -22,8 +22,8 @@ use jiff::Timestamp; use quick_xml::de::from_str as from_xml_str; use rc_core::{ Alias, BucketNotification, Capabilities, CorsRule, Error, LifecycleRule, ListOptions, - ListResult, NotificationTarget, ObjectInfo, ObjectStore, ObjectVersion, RemotePath, - ReplicationConfiguration, Result, SelectOptions, + ListResult, NotificationTarget, ObjectInfo, ObjectStore, ObjectVersion, + ObjectVersionListResult, RemotePath, ReplicationConfiguration, Result, SelectOptions, }; use reqwest::Method; use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue}; @@ -744,6 +744,75 @@ impl S3Client { &self.inner } + /// List a single page of object versions and return pagination metadata. + pub async fn list_object_versions_page( + &self, + path: &RemotePath, + max_keys: Option, + ) -> Result { + let mut builder = self.inner.list_object_versions().bucket(&path.bucket); + + if !path.key.is_empty() { + builder = builder.prefix(&path.key); + } + + if let Some(max) = max_keys { + builder = builder.max_keys(max); + } + + let response = builder.send().await.map_err(|e| { + let err_str = e.to_string(); + if err_str.contains("NotFound") || err_str.contains("NoSuchBucket") { + Error::NotFound(format!("Bucket not found: {}", path.bucket)) + } else { + Error::General(format!("list_object_versions: {e}")) + } + })?; + + let mut items = Vec::new(); + + for v in response.versions() { + items.push(ObjectVersion { + key: v.key().unwrap_or_default().to_string(), + version_id: v.version_id().unwrap_or("null").to_string(), + is_latest: v.is_latest().unwrap_or(false), + is_delete_marker: false, + last_modified: v + .last_modified() + .and_then(|dt| Timestamp::from_second(dt.secs()).ok()), + size_bytes: v.size(), + etag: v.e_tag().map(|s| s.trim_matches('"').to_string()), + }); + } + + for m in response.delete_markers() { + items.push(ObjectVersion { + key: m.key().unwrap_or_default().to_string(), + version_id: m.version_id().unwrap_or("null").to_string(), + is_latest: m.is_latest().unwrap_or(false), + is_delete_marker: true, + last_modified: m + .last_modified() + .and_then(|dt| Timestamp::from_second(dt.secs()).ok()), + size_bytes: None, + etag: None, + }); + } + + items.sort_by(|a, b| { + a.key + .cmp(&b.key) + .then_with(|| b.last_modified.cmp(&a.last_modified)) + }); + + Ok(ObjectVersionListResult { + items, + truncated: response.is_truncated().unwrap_or(false), + continuation_token: response.next_key_marker().map(ToString::to_string), + version_id_marker: response.next_version_id_marker().map(ToString::to_string), + }) + } + /// Download object content and report downloaded bytes after each received chunk. pub async fn get_object_with_progress( &self, @@ -1930,61 +1999,7 @@ impl ObjectStore for S3Client { path: &RemotePath, max_keys: Option, ) -> Result> { - let mut builder = self.inner.list_object_versions().bucket(&path.bucket); - - if !path.key.is_empty() { - builder = builder.prefix(&path.key); - } - - if let Some(max) = max_keys { - builder = builder.max_keys(max); - } - - let response = builder - .send() - .await - .map_err(|e| Error::General(format!("list_object_versions: {e}")))?; - - let mut versions = Vec::new(); - - // Add regular versions - for v in response.versions() { - versions.push(ObjectVersion { - key: v.key().unwrap_or_default().to_string(), - version_id: v.version_id().unwrap_or("null").to_string(), - is_latest: v.is_latest().unwrap_or(false), - is_delete_marker: false, - last_modified: v - .last_modified() - .and_then(|dt| Timestamp::from_second(dt.secs()).ok()), - size_bytes: v.size(), - etag: v.e_tag().map(|s| s.trim_matches('"').to_string()), - }); - } - - // Add delete markers - for m in response.delete_markers() { - versions.push(ObjectVersion { - key: m.key().unwrap_or_default().to_string(), - version_id: m.version_id().unwrap_or("null").to_string(), - is_latest: m.is_latest().unwrap_or(false), - is_delete_marker: true, - last_modified: m - .last_modified() - .and_then(|dt| Timestamp::from_second(dt.secs()).ok()), - size_bytes: None, - etag: None, - }); - } - - // Sort by key and then by last_modified (descending) - versions.sort_by(|a, b| { - a.key - .cmp(&b.key) - .then_with(|| b.last_modified.cmp(&a.last_modified)) - }); - - Ok(versions) + Ok(self.list_object_versions_page(path, max_keys).await?.items) } async fn get_object_tags(