From f26397db348108b9df36dc3118bc61c3f81ef294 Mon Sep 17 00:00:00 2001
From: benthecarman
Date: Mon, 26 Jan 2026 08:31:17 -0600
Subject: [PATCH 1/3] Add PaginatedKVStore traits upstreamed from ldk-server

Adds traits for a paginated KV store, allowing more efficient listing of
keys so that callers don't need to fetch them all at once. A monotonic
counter or timestamp tracks the creation order of keys and allows for
pagination.

The traits are largely just copy-pasted from ldk-server.

Adds some basic tests that were generated using claude code.
---
 lightning/src/util/persist.rs    | 265 ++++++++++++++++++++++++++++++-
 lightning/src/util/test_utils.rs | 119 +++++++++++++-
 2 files changed, 382 insertions(+), 2 deletions(-)

diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs
index cb4bdeb6a51..9f8bd0dd1e1 100644
--- a/lightning/src/util/persist.rs
+++ b/lightning/src/util/persist.rs
@@ -367,6 +367,195 @@ where
 	}
 }
 
+/// An opaque token used for paginated listing operations.
+///
+/// This token should be treated as an opaque value by callers. Pass the token returned from
+/// one `list_paginated` call to the next call to continue pagination. The internal format
+/// is implementation-defined and may change between versions.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PageToken(pub String);
+
+/// Represents the response from a paginated `list` operation.
+///
+/// Contains the list of keys and a token for retrieving the next page of results.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PaginatedListResponse {
+	/// A vector of keys, ordered from most recently created to least recently created.
+	pub keys: Vec<String>,
+
+	/// A token that can be passed to the next call to continue pagination.
+	///
+	/// Is `None` if there are no more pages to retrieve.
+	pub next_page_token: Option<PageToken>,
+}
+
+/// Provides an interface that allows storage and retrieval of persisted values that are associated
+/// with given keys, with support for pagination.
+///
+/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s
+/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different
+/// ways, as long as per-namespace key uniqueness is asserted.
+///
+/// Keys and namespaces are required to be valid ASCII strings in the range of
+/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty
+/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if
+/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns
+/// should always be separated by primary namespace first, before secondary namespaces are used.
+/// While the number of primary namespaces will be relatively small and determined at compile time,
+/// there may be many secondary namespaces per primary namespace. Note that per-namespace uniqueness
+/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys
+/// and equally named primary or secondary namespaces must be avoided.
+///
+/// **Note:** This trait extends the functionality of [`KVStoreSync`] by adding support for
+/// paginated listing of keys in creation order. This is useful when dealing with a large number
+/// of keys that cannot be efficiently retrieved all at once.
+///
+/// For an asynchronous version of this trait, see [`PaginatedKVStore`].
+pub trait PaginatedKVStoreSync: KVStoreSync {
+	/// Returns a paginated list of keys that are stored under the given `secondary_namespace` in
+	/// `primary_namespace`, ordered from most recently created to least recently created.
+	///
+	/// Implementations must return keys in reverse creation order (newest first). How creation
+	/// order is tracked is implementation-defined (e.g., storing creation timestamps, using an
+	/// incrementing ID, or another mechanism). Creation order (not last-updated order) is used
+	/// to prevent race conditions during pagination: if keys were ordered by update time, a key
+	/// updated mid-pagination could shift position, causing it to be skipped or returned twice
+	/// across pages.
+	///
+	/// If `page_token` is provided, listing continues from where the previous page left off.
+	/// If `None`, listing starts from the most recently created entry. The `next_page_token`
+	/// in the returned [`PaginatedListResponse`] can be passed to subsequent calls to fetch
+	/// the next page.
+	///
+	/// Implementations must generate a [`PageToken`] that encodes enough information to resume
+	/// listing from the correct position. The token should encode the creation timestamp (or
+	/// sequence number) and key name of the last returned entry. Tokens must remain valid across
+	/// multiple calls within a reasonable timeframe. If the entry referenced by a token has been
+	/// deleted, implementations should resume from the next valid position rather than failing.
+	/// Tokens are scoped to a specific `(primary_namespace, secondary_namespace)` pair and should
+	/// not be used across different namespace pairs.
+	///
+	/// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if
+	/// there are no more keys to return.
+	fn list_paginated(
+		&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+	) -> Result<PaginatedListResponse, io::Error>;
+}
+
+/// A wrapper around a [`PaginatedKVStoreSync`] that implements the [`PaginatedKVStore`] trait.
+/// It is not necessary to use this type directly.
+#[derive(Clone)]
+pub struct PaginatedKVStoreSyncWrapper<K: Deref>(pub K)
+where
+	K::Target: PaginatedKVStoreSync;
+
+/// This is not exported to bindings users as async is only supported in Rust.
+impl<K: Deref> KVStore for PaginatedKVStoreSyncWrapper<K>
+where
+	K::Target: PaginatedKVStoreSync,
+{
+	fn read(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> impl Future<Output = Result<Vec<u8>, io::Error>> + 'static + MaybeSend {
+		let res = self.0.read(primary_namespace, secondary_namespace, key);
+
+		async move { res }
+	}
+
+	fn write(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+	) -> impl Future<Output = Result<(), io::Error>> + 'static + MaybeSend {
+		let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
+
+		async move { res }
+	}
+
+	fn remove(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> impl Future<Output = Result<(), io::Error>> + 'static + MaybeSend {
+		let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
+
+		async move { res }
+	}
+
+	fn list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + MaybeSend {
+		let res = self.0.list(primary_namespace, secondary_namespace);
+
+		async move { res }
+	}
+}
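[Editor's note: to make the intended call pattern concrete, here is a minimal sketch (not part of the patch) of how a caller might drain every page from any `PaginatedKVStoreSync` implementation. The helper name `collect_all_keys` is illustrative only; it mirrors how `TestPaginatedStore::list` is implemented later in this patch.]

```rust
use lightning::io;
use lightning::util::persist::{PaginatedKVStoreSync, PaginatedListResponse};

/// Collects every key in the given namespace pair by following
/// `next_page_token` until the store reports that no further pages remain.
fn collect_all_keys<S: PaginatedKVStoreSync>(
	store: &S, primary_namespace: &str, secondary_namespace: &str,
) -> Result<Vec<String>, io::Error> {
	let mut keys = Vec::new();
	let mut page_token = None;
	loop {
		let PaginatedListResponse { keys: page, next_page_token } =
			store.list_paginated(primary_namespace, secondary_namespace, page_token)?;
		keys.extend(page);
		match next_page_token {
			// Pass the returned token back in to continue from where we left off.
			Some(token) => page_token = Some(token),
			// `None` signals that the final page has been returned.
			None => break,
		}
	}
	Ok(keys)
}
```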
+/// This is not exported to bindings users as async is only supported in Rust.
+impl<K: Deref> PaginatedKVStore for PaginatedKVStoreSyncWrapper<K>
+where
+	K::Target: PaginatedKVStoreSync,
+{
+	fn list_paginated(
+		&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+	) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + MaybeSend {
+		let res = self.0.list_paginated(primary_namespace, secondary_namespace, page_token);
+
+		async move { res }
+	}
+}
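[Editor's note: the trait docs above recommend encoding the creation timestamp (or sequence number) plus the key name in the token. As a non-normative illustration of that recommendation — the `seq:key` format and both function names below are invented for this sketch, not mandated by the trait:]

```rust
use lightning::util::persist::PageToken;

/// Encodes a (creation sequence number, key) pair as an opaque token.
fn encode_page_token(creation_seq: u64, key: &str) -> PageToken {
	// ':' is assumed here not to occur in valid keys, so the token can be
	// split unambiguously when parsing.
	PageToken(format!("{creation_seq}:{key}"))
}

/// Decodes a token produced by `encode_page_token`, returning `None` for
/// tokens that don't match the expected format.
fn decode_page_token(token: &PageToken) -> Option<(u64, String)> {
	let (seq, key) = token.0.split_once(':')?;
	Some((seq.parse().ok()?, key.to_string()))
}
```

With such an encoding, an implementation can resume after the encoded position even if the exact entry was deleted, by seeking to the first entry strictly older than the encoded sequence number.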
+/// Provides an interface that allows storage and retrieval of persisted values that are associated
+/// with given keys, with support for pagination.
+///
+/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s
+/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different
+/// ways, as long as per-namespace key uniqueness is asserted.
+///
+/// Keys and namespaces are required to be valid ASCII strings in the range of
+/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty
+/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if
+/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns
+/// should always be separated by primary namespace first, before secondary namespaces are used.
+/// While the number of primary namespaces will be relatively small and determined at compile time,
+/// there may be many secondary namespaces per primary namespace. Note that per-namespace uniqueness
+/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys
+/// and equally named primary or secondary namespaces must be avoided.
+///
+/// **Note:** This trait extends the functionality of [`KVStore`] by adding support for
+/// paginated listing of keys in creation order. This is useful when dealing with a large number
+/// of keys that cannot be efficiently retrieved all at once.
+///
+/// For a synchronous version of this trait, see [`PaginatedKVStoreSync`].
+///
+/// This is not exported to bindings users as async is only supported in Rust.
+pub trait PaginatedKVStore: KVStore {
+	/// Returns a paginated list of keys that are stored under the given `secondary_namespace` in
+	/// `primary_namespace`, ordered from most recently created to least recently created.
+	///
+	/// Implementations must return keys in reverse creation order (newest first). How creation
+	/// order is tracked is implementation-defined (e.g., storing creation timestamps, using an
+	/// incrementing ID, or another mechanism). Creation order (not last-updated order) is used
+	/// to prevent race conditions during pagination: if keys were ordered by update time, a key
+	/// updated mid-pagination could shift position, causing it to be skipped or returned twice
+	/// across pages.
+	///
+	/// If `page_token` is provided, listing continues from where the previous page left off.
+	/// If `None`, listing starts from the most recently created entry. The `next_page_token`
+	/// in the returned [`PaginatedListResponse`] can be passed to subsequent calls to fetch
+	/// the next page.
+	///
+	/// Implementations must generate a [`PageToken`] that encodes enough information to resume
+	/// listing from the correct position. The token should encode the creation timestamp (or
+	/// sequence number) and key name of the last returned entry. Tokens must remain valid across
+	/// multiple calls within a reasonable timeframe. If the entry referenced by a token has been
+	/// deleted, implementations should resume from the next valid position rather than failing.
+	/// Tokens are scoped to a specific `(primary_namespace, secondary_namespace)` pair and should
+	/// not be used across different namespace pairs.
+	///
+	/// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if
+	/// there are no more keys to return.
+	fn list_paginated(
+		&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+	) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + MaybeSend;
+}
+
 /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`]
 /// data migration.
 pub trait MigratableKVStore: KVStoreSync {
@@ -1539,7 +1728,7 @@ mod tests {
 	use crate::ln::msgs::BaseMessageHandler;
 	use crate::sync::Arc;
 	use crate::util::test_channel_signer::TestChannelSigner;
-	use crate::util::test_utils::{self, TestStore};
+	use crate::util::test_utils::{self, TestPaginatedStore, TestStore};
 
 	use bitcoin::hashes::hex::FromHex;
 	use core::cmp;
@@ -1951,4 +2140,78 @@ mod tests {
 		let store: Arc<TestStore> = Arc::new(TestStore::new(false));
 		assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store)));
 	}
+
+	#[test]
+	fn paginated_store_basic_operations() {
+		let store = TestPaginatedStore::new(10);
+
+		// Write some data
+		store.write("ns1", "ns2", "key1", vec![1, 2, 3]).unwrap();
+		store.write("ns1", "ns2", "key2", vec![4, 5, 6]).unwrap();
+
+		// Read it back
+		assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key1").unwrap(), vec![1, 2, 3]);
+		assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key2").unwrap(), vec![4, 5, 6]);
+
+		// List should return keys in descending order
+		let response = store.list_paginated("ns1", "ns2", None).unwrap();
+		assert_eq!(response.keys, vec!["key2", "key1"]);
+		assert!(response.next_page_token.is_none());
+
+		// Remove a key
+		KVStoreSync::remove(&store, "ns1", "ns2", "key1", false).unwrap();
+		assert!(KVStoreSync::read(&store, "ns1", "ns2", "key1").is_err());
+	}
+
+	#[test]
+	fn paginated_store_pagination() {
+		let store = TestPaginatedStore::new(2);
+
+		// Write 5 items with different order values
+		for i in 0..5i64 {
+			store.write("ns", "", &format!("key{i}"), vec![i as u8]).unwrap();
+		}
+
+		// First page should have 2 items (most recently created first: key4, key3)
+		let page1 = store.list_paginated("ns", "", None).unwrap();
+		assert_eq!(page1.keys.len(), 2);
+		assert_eq!(page1.keys, vec!["key4", "key3"]);
+		assert!(page1.next_page_token.is_some());
+
+		// Second page
+		let page2 = store.list_paginated("ns", "", page1.next_page_token).unwrap();
+		assert_eq!(page2.keys.len(), 2);
+		assert_eq!(page2.keys, vec!["key2", "key1"]);
+		assert!(page2.next_page_token.is_some());
+
+		// Third page (last item)
+		let page3 = store.list_paginated("ns", "", page2.next_page_token).unwrap();
+		assert_eq!(page3.keys.len(), 1);
+		assert_eq!(page3.keys, vec!["key0"]);
+		assert!(page3.next_page_token.is_none());
+	}
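[Editor's note: for completeness, the async `PaginatedKVStore` trait above is driven the same way as the sync loop shown earlier; a minimal sketch (the helper name is illustrative, and an async executor such as Tokio is assumed):]

```rust
use lightning::io;
use lightning::util::persist::{PageToken, PaginatedKVStore};

/// Async counterpart of the pagination loop: await each page, then pass the
/// returned token back in to request the next one.
async fn collect_all_keys_async<S: PaginatedKVStore>(
	store: &S, primary_namespace: &str, secondary_namespace: &str,
) -> Result<Vec<String>, io::Error> {
	let mut keys = Vec::new();
	let mut page_token: Option<PageToken> = None;
	loop {
		let resp =
			store.list_paginated(primary_namespace, secondary_namespace, page_token).await?;
		keys.extend(resp.keys);
		match resp.next_page_token {
			Some(token) => page_token = Some(token),
			None => break,
		}
	}
	Ok(keys)
}
```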
+
+	#[test]
+	fn paginated_store_update_preserves_order() {
+		let store = TestPaginatedStore::new(10);
+
+		// Write items with specific order values
+		store.write("ns", "", "key1", vec![1]).unwrap();
+		store.write("ns", "", "key2", vec![2]).unwrap();
+		store.write("ns", "", "key3", vec![3]).unwrap();
+
+		// Verify initial order (newest first)
+		let response = store.list_paginated("ns", "", None).unwrap();
+		assert_eq!(response.keys, vec!["key3", "key2", "key1"]);
+
+		// Update key1 with a new order value that would put it first if used
+		store.write("ns", "", "key1", vec![1, 1]).unwrap();
+
+		// Verify data was updated
+		assert_eq!(KVStoreSync::read(&store, "ns", "", "key1").unwrap(), vec![1, 1]);
+
+		// Verify order is unchanged - creation order should have been preserved
+		let response = store.list_paginated("ns", "", None).unwrap();
+		assert_eq!(response.keys, vec!["key3", "key2", "key1"]);
+	}
 }
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index bcf39fde482..79ad4182323 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -51,6 +51,7 @@ use crate::sign::{ChannelSigner, PeerStorageKey};
 use crate::sync::RwLock;
 use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures};
 use crate::util::async_poll::MaybeSend;
+use crate::util::atomic_counter::AtomicCounter;
 use crate::util::config::UserConfig;
 use crate::util::dyn_signer::{
 	DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner,
@@ -58,7 +59,7 @@ use crate::util::dyn_signer::{
 use crate::util::logger::{Logger, Record};
 #[cfg(feature = "std")]
 use crate::util::mut_global::MutGlobal;
-use crate::util::persist::{KVStore, KVStoreSync, MonitorName};
+use crate::util::persist::{KVStore, KVStoreSync, MonitorName, PageToken, PaginatedListResponse};
 use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer};
 use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner};
 use crate::util::wakers::Notifier;
@@ -1129,6 +1130,122 @@ impl KVStoreSync for TestStore {
 unsafe impl Sync for TestStore {}
 unsafe impl Send for TestStore {}
 
+/// A simple in-memory implementation of [`PaginatedKVStoreSync`] for testing.
+///
+/// [`PaginatedKVStoreSync`]: crate::util::persist::PaginatedKVStoreSync
+pub struct TestPaginatedStore {
+	data: Mutex<HashMap<(String, String, String), (i64, Vec<u8>)>>,
+	page_size: usize,
+	time_counter: AtomicCounter,
+}
+
+impl TestPaginatedStore {
+	/// Creates a new `TestPaginatedStore` with the given page size.
+	pub fn new(page_size: usize) -> Self {
+		Self { data: Mutex::new(new_hash_map()), page_size, time_counter: AtomicCounter::new() }
+	}
+}
+
+impl KVStoreSync for TestPaginatedStore {
+	fn read(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> Result<Vec<u8>, io::Error> {
+		let data = self.data.lock().unwrap();
+		data.get(&(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string()))
+			.map(|(_, v)| v.clone())
+			.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
+	}
+
+	fn write(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+	) -> Result<(), io::Error> {
+		let mut data = self.data.lock().unwrap();
+		let order = self.time_counter.next() as i64;
+		let key_tuple =
+			(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string());
+		// Only use order for new entries; preserve existing order on updates
+		let order_to_use =
+			data.get(&key_tuple).map(|(existing_order, _)| *existing_order).unwrap_or(order);
+		data.insert(key_tuple, (order_to_use, buf));
+		Ok(())
+	}
+
+	fn remove(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
+	) -> Result<(), io::Error> {
+		let mut data = self.data.lock().unwrap();
+		data.remove(&(
+			primary_namespace.to_string(),
+			secondary_namespace.to_string(),
+			key.to_string(),
+		));
+		Ok(())
+	}
+
+	fn list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> Result<Vec<String>, io::Error> {
+		let mut all_keys = Vec::new();
+		let mut page_token = None;
+		loop {
+			let response = crate::util::persist::PaginatedKVStoreSync::list_paginated(
+				self,
+				primary_namespace,
+				secondary_namespace,
+				page_token,
+			)?;
+			all_keys.extend(response.keys);
+			match response.next_page_token {
+				Some(token) => page_token = Some(token),
+				None => break,
+			}
+		}
+		Ok(all_keys)
+	}
+}
+
+impl crate::util::persist::PaginatedKVStoreSync for TestPaginatedStore {
+	fn list_paginated(
+		&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+	) -> Result<PaginatedListResponse, io::Error> {
+		let data = self.data.lock().unwrap();
+		let mut entries: Vec<_> = data
+			.iter()
+			.filter(|((pn, sn, _), _)| pn == primary_namespace && sn == secondary_namespace)
+			.map(|((_, _, k), (t, _))| (k.clone(), *t))
+			.collect();
+
+		// Sort by time descending, then by key
+		entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
+
+		// Apply pagination: find the first entry AFTER the given key in sort order.
+		// This implementation uses the last key as the page token.
+		let start_idx = if let Some(PageToken(ref last_key)) = page_token {
+			// Find the position of this key and start after it
+			entries.iter().position(|(k, _)| k == last_key).map(|pos| pos + 1).unwrap_or(0)
+		} else {
+			0
+		};
+
+		let page_entries: Vec<_> =
+			entries.into_iter().skip(start_idx).take(self.page_size).collect();
+
+		let next_page_token = if page_entries.len() == self.page_size {
+			page_entries.last().map(|(k, _)| PageToken(k.clone()))
+		} else {
+			None
+		};
+
+		Ok(PaginatedListResponse {
+			keys: page_entries.into_iter().map(|(k, _)| k).collect(),
+			next_page_token,
+		})
+	}
+}
+
+unsafe impl Sync for TestPaginatedStore {}
+unsafe impl Send for TestPaginatedStore {}
+
 pub struct TestBroadcaster {
 	pub txn_broadcasted: Mutex<Vec<Transaction>>,
 	pub txn_types: Mutex>,

From 8cfd5ccb4fb5ab2fddc2807e04675b10641ce15e Mon Sep 17 00:00:00 2001
From: benthecarman
Date: Fri, 13 Feb 2026 14:17:54 -0600
Subject: [PATCH 2/3] prefactor: move FilesystemStore utilities into
 fs_store_common.rs

Ahead of adding the FilesystemStoreV2, we move some common utilities into
a shared file.
---
 lightning-persister/src/fs_store.rs        | 641 +-------------------
 lightning-persister/src/fs_store_common.rs | 712 +++++++++++++++++++++
 lightning-persister/src/lib.rs             |   1 +
 3 files changed, 731 insertions(+), 623 deletions(-)
 create mode 100644 lightning-persister/src/fs_store_common.rs

diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs
index 3129748afda..b7d09ac675a 100644
--- a/lightning-persister/src/fs_store.rs
+++ b/lightning-persister/src/fs_store.rs
@@ -1,98 +1,37 @@
 //! Objects related to [`FilesystemStore`] live here.
 
-use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
+use crate::fs_store_common::FilesystemStoreState;
 
-use lightning::types::string::PrintableString;
 use lightning::util::persist::{KVStoreSync, MigratableKVStore};
 
-use std::collections::HashMap;
-use std::fs;
-use std::io::{Read, Write};
-use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, RwLock};
+use std::path::PathBuf;
 
 #[cfg(feature = "tokio")]
 use core::future::Future;
 #[cfg(feature = "tokio")]
 use lightning::util::persist::KVStore;
 
-#[cfg(target_os = "windows")]
-use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
-
-#[cfg(target_os = "windows")]
-macro_rules! call {
-	($e: expr) => {
-		if $e != 0 {
-			Ok(())
-		} else {
-			Err(std::io::Error::last_os_error())
-		}
-	};
-}
-
-#[cfg(target_os = "windows")]
-fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
-	path.as_ref().encode_wide().chain(Some(0)).collect()
-}
-
-// The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching
-// a consistent view and error out.
-const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
-
-struct FilesystemStoreInner {
-	data_dir: PathBuf,
-	tmp_file_counter: AtomicUsize,
-
-	// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
-	// latest written version per key.
-	locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
-}
-
 /// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system.
 ///
 /// [`KVStore`]: lightning::util::persist::KVStore
 pub struct FilesystemStore {
-	inner: Arc<FilesystemStoreInner>,
-
-	// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
-	// operations aren't sensitive to the order of execution.
-	next_version: AtomicU64,
+	state: FilesystemStoreState,
 }
 
 impl FilesystemStore {
 	/// Constructs a new [`FilesystemStore`].
 	pub fn new(data_dir: PathBuf) -> Self {
-		let locks = Mutex::new(HashMap::new());
-		let tmp_file_counter = AtomicUsize::new(0);
-		Self {
-			inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }),
-			next_version: AtomicU64::new(1),
-		}
+		Self { state: FilesystemStoreState::new(data_dir) }
 	}
 
 	/// Returns the data directory.
 	pub fn get_data_dir(&self) -> PathBuf {
-		self.inner.data_dir.clone()
-	}
-
-	fn get_new_version_and_lock_ref(&self, dest_file_path: PathBuf) -> (Arc<RwLock<u64>>, u64) {
-		let version = self.next_version.fetch_add(1, Ordering::Relaxed);
-		if version == u64::MAX {
-			panic!("FilesystemStore version counter overflowed");
-		}
-
-		// Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
-		// cleaning up unused locks.
-		let inner_lock_ref = self.inner.get_inner_lock_ref(dest_file_path);
-
-		(inner_lock_ref, version)
+		self.state.get_data_dir()
 	}
 
 	#[cfg(any(all(feature = "tokio", test), fuzzing))]
 	/// Returns the size of the async state.
 	pub fn state_size(&self) -> usize {
-		let outer_lock = self.inner.locks.lock().unwrap();
-		outer_lock.len()
+		self.state.state_size()
 	}
 }
 
@@ -100,361 +39,25 @@ impl KVStoreSync for FilesystemStore {
 	fn read(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
 	) -> Result<Vec<u8>, lightning::io::Error> {
-		let path = self.inner.get_checked_dest_file_path(
-			primary_namespace,
-			secondary_namespace,
-			Some(key),
-			"read",
-		)?;
-		self.inner.read(path)
+		self.state.read_impl(primary_namespace, secondary_namespace, key)
 	}
 
 	fn write(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
 	) -> Result<(), lightning::io::Error> {
-		let path = self.inner.get_checked_dest_file_path(
-			primary_namespace,
-			secondary_namespace,
-			Some(key),
-			"write",
-		)?;
-		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
-		self.inner.write_version(inner_lock_ref, path, buf, version)
+		self.state.write_impl(primary_namespace, secondary_namespace, key, buf)
 	}
 
 	fn remove(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
 	) -> Result<(), lightning::io::Error> {
-		let path = self.inner.get_checked_dest_file_path(
-			primary_namespace,
-			secondary_namespace,
-			Some(key),
-			"remove",
-		)?;
-		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
-		self.inner.remove_version(inner_lock_ref, path, lazy, version)
+		self.state.remove_impl(primary_namespace, secondary_namespace, key, lazy)
 	}
 
 	fn list(
 		&self, primary_namespace: &str, secondary_namespace: &str,
 	) -> Result<Vec<String>, lightning::io::Error> {
-		let path = self.inner.get_checked_dest_file_path(
-			primary_namespace,
-			secondary_namespace,
-			None,
-			"list",
-		)?;
-		self.inner.list(path)
-	}
-}
-
-impl FilesystemStoreInner {
-	fn get_inner_lock_ref(&self, path: PathBuf) -> Arc<RwLock<u64>> {
-		let mut outer_lock = self.locks.lock().unwrap();
-		Arc::clone(&outer_lock.entry(path).or_default())
-	}
-
-	fn get_dest_dir_path(
-		&self, primary_namespace: &str, secondary_namespace: &str,
-	) -> std::io::Result<PathBuf> {
-		let mut dest_dir_path = {
-			#[cfg(target_os = "windows")]
-			{
-				let data_dir = self.data_dir.clone();
-				fs::create_dir_all(data_dir.clone())?;
-				fs::canonicalize(data_dir)?
-			}
-			#[cfg(not(target_os = "windows"))]
-			{
-				self.data_dir.clone()
-			}
-		};
-
-		dest_dir_path.push(primary_namespace);
-		if !secondary_namespace.is_empty() {
-			dest_dir_path.push(secondary_namespace);
-		}
-
-		Ok(dest_dir_path)
-	}
-
-	fn get_checked_dest_file_path(
-		&self, primary_namespace: &str, secondary_namespace: &str, key: Option<&str>,
-		operation: &str,
-	) -> lightning::io::Result<PathBuf> {
-		check_namespace_key_validity(primary_namespace, secondary_namespace, key, operation)?;
-
-		let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
-		if let Some(key) = key {
-			dest_file_path.push(key);
-		}
-
-		Ok(dest_file_path)
-	}
-
-	fn read(&self, dest_file_path: PathBuf) -> lightning::io::Result<Vec<u8>> {
-		let mut buf = Vec::new();
-
-		self.execute_locked_read(dest_file_path.clone(), || {
-			let mut f = fs::File::open(dest_file_path)?;
-			f.read_to_end(&mut buf)?;
-			Ok(())
-		})?;
-
-		Ok(buf)
-	}
-
-	fn execute_locked_write<F: FnOnce() -> Result<(), lightning::io::Error>>(
-		&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, version: u64, callback: F,
-	) -> Result<(), lightning::io::Error> {
-		let res = {
-			let mut last_written_version = inner_lock_ref.write().unwrap();
-
-			// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
-			// consistency.
-			let is_stale_version = version <= *last_written_version;
-
-			// If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
-			if is_stale_version {
-				Ok(())
-			} else {
-				callback().map(|_| {
-					*last_written_version = version;
-				})
-			}
-		};
-
-		self.clean_locks(&inner_lock_ref, dest_file_path);
-
-		res
-	}
-
-	fn execute_locked_read<F: FnOnce() -> Result<(), lightning::io::Error>>(
-		&self, dest_file_path: PathBuf, callback: F,
-	) -> Result<(), lightning::io::Error> {
-		let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone());
-		let res = {
-			let _guard = inner_lock_ref.read().unwrap();
-			callback()
-		};
-		self.clean_locks(&inner_lock_ref, dest_file_path);
-		res
-	}
-
-	fn clean_locks(&self, inner_lock_ref: &Arc<RwLock<u64>>, dest_file_path: PathBuf) {
-		// If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
-		// to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
-		// inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
-		// counted.
-		let mut outer_lock = self.locks.lock().unwrap();
-
-		let strong_count = Arc::strong_count(&inner_lock_ref);
-		debug_assert!(strong_count >= 2, "Unexpected FilesystemStore strong count");
-
-		if strong_count == 2 {
-			outer_lock.remove(&dest_file_path);
-		}
-	}
-
-	/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
-	/// returns early without writing.
-	fn write_version(
-		&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, buf: Vec<u8>,
-		version: u64,
-	) -> lightning::io::Result<()> {
-		let parent_directory = dest_file_path.parent().ok_or_else(|| {
-			let msg =
-				format!("Could not retrieve parent directory of {}.", dest_file_path.display());
-			std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
-		})?;
-		fs::create_dir_all(&parent_directory)?;
-
-		// Do a crazy dance with lots of fsync()s to be overly cautious here...
-		// We never want to end up in a state where we've lost the old data, or end up using the
-		// old data on power loss after we've returned.
-		// The way to atomically write a file on Unix platforms is:
-		// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
-		let mut tmp_file_path = dest_file_path.clone();
-		let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
-		tmp_file_path.set_extension(tmp_file_ext);
-
-		{
-			let mut tmp_file = fs::File::create(&tmp_file_path)?;
-			tmp_file.write_all(&buf)?;
-			tmp_file.sync_all()?;
-		}
-
-		self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || {
-			#[cfg(not(target_os = "windows"))]
-			{
-				fs::rename(&tmp_file_path, &dest_file_path)?;
-				let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
-				dir_file.sync_all()?;
-				Ok(())
-			}
-
-			#[cfg(target_os = "windows")]
-			{
-				let res = if dest_file_path.exists() {
-					call!(unsafe {
-						windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
-							path_to_windows_str(&dest_file_path).as_ptr(),
-							path_to_windows_str(&tmp_file_path).as_ptr(),
-							std::ptr::null(),
-							windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
-							std::ptr::null_mut() as *const core::ffi::c_void,
-							std::ptr::null_mut() as *const core::ffi::c_void,
-						)
-					})
-				} else {
-					call!(unsafe {
-						windows_sys::Win32::Storage::FileSystem::MoveFileExW(
-							path_to_windows_str(&tmp_file_path).as_ptr(),
-							path_to_windows_str(&dest_file_path).as_ptr(),
-							windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
-								| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
-						)
-					})
-				};
-
-				match res {
-					Ok(()) => {
-						// We fsync the dest file in hopes this will also flush the metadata to disk.
-						let dest_file =
-							fs::OpenOptions::new().read(true).write(true).open(&dest_file_path)?;
-						dest_file.sync_all()?;
-						Ok(())
-					},
-					Err(e) => Err(e.into()),
-				}
-			}
-		})
-	}
-
-	fn remove_version(
-		&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, lazy: bool, version: u64,
-	) -> lightning::io::Result<()> {
-		self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || {
-			if !dest_file_path.is_file() {
-				return Ok(());
-			}
-
-			if lazy {
-				// If we're lazy we just call remove and be done with it.
-				fs::remove_file(&dest_file_path)?;
-			} else {
-				// If we're not lazy we try our best to persist the updated metadata to ensure
-				// atomicity of this call.
-				#[cfg(not(target_os = "windows"))]
-				{
-					fs::remove_file(&dest_file_path)?;
-
-					let parent_directory = dest_file_path.parent().ok_or_else(|| {
-						let msg = format!(
-							"Could not retrieve parent directory of {}.",
-							dest_file_path.display()
-						);
-						std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
-					})?;
-					let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
-					// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
-					// to the inode might get cached (and hence possibly lost on crash), depending on
-					// the target platform and file system.
-					//
-					// In order to assert we permanently removed the file in question we therefore
-					// call `fsync` on the parent directory on platforms that support it.
-					dir_file.sync_all()?;
-				}
-
-				#[cfg(target_os = "windows")]
-				{
-					// Since Windows `DeleteFile` API is not persisted until the last open file handle
-					// is dropped, and there seemingly is no reliable way to flush the directory
-					// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
-					// file to be deleted to a temporary trash file and remove the latter file
-					// afterwards.
-					//
-					// This should be marginally better, as, according to the documentation,
-					// `MoveFileExW` APIs should offer stronger persistence guarantees,
-					// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
-					// However, all this is partially based on assumptions and local experiments, as
-					// Windows API is horribly underdocumented.
-					let mut trash_file_path = dest_file_path.clone();
-					let trash_file_ext =
-						format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
-					trash_file_path.set_extension(trash_file_ext);
-
-					call!(unsafe {
-						windows_sys::Win32::Storage::FileSystem::MoveFileExW(
-							path_to_windows_str(&dest_file_path).as_ptr(),
-							path_to_windows_str(&trash_file_path).as_ptr(),
-							windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
-								| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
-						)
-					})?;
-
-					{
-						// We fsync the trash file in hopes this will also flush the original's file
-						// metadata to disk.
-						let trash_file = fs::OpenOptions::new()
-							.read(true)
-							.write(true)
-							.open(&trash_file_path.clone())?;
-						trash_file.sync_all()?;
-					}
-
-					// We're fine if this remove would fail as the trash file will be cleaned up in
-					// list eventually.
-					fs::remove_file(trash_file_path).ok();
-				}
-			}
-
-			Ok(())
-		})
-	}
-
-	fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result<Vec<String>> {
-		if !Path::new(&prefixed_dest).exists() {
-			return Ok(Vec::new());
-		}
-
-		let mut keys;
-		let mut retries = LIST_DIR_CONSISTENCY_RETRIES;
-
-		'retry_list: loop {
-			keys = Vec::new();
-			'skip_entry: for entry in fs::read_dir(&prefixed_dest)? {
-				let entry = entry?;
-				let p = entry.path();
-
-				let res = dir_entry_is_key(&entry);
-				match res {
-					Ok(true) => {
-						let key = get_key_from_dir_entry_path(&p, &prefixed_dest)?;
-						keys.push(key);
-					},
-					Ok(false) => {
-						// We didn't error, but the entry is not a valid key (e.g., a directory,
-						// or a temp file).
-						continue 'skip_entry;
-					},
-					Err(e) => {
-						if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 {
-							// We had found the entry in `read_dir` above, so some race happend.
-							// Retry the `read_dir` to get a consistent view.
-							retries -= 1;
-							continue 'retry_list;
-						} else {
-							// For all errors or if we exhausted retries, bubble up.
-							return Err(e.into());
-						}
-					},
-				}
-			}
-			break 'retry_list;
-		}
-
-		Ok(keys)
+		self.state.list_impl(primary_namespace, secondary_namespace)
 	}
 }
 
@@ -463,241 +66,31 @@ impl KVStore for FilesystemStore {
 	fn read(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
 	) -> impl Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send {
-		let this = Arc::clone(&self.inner);
-		let path = this.get_checked_dest_file_path(
-			primary_namespace,
-			secondary_namespace,
-			Some(key),
-			"read",
-		);
-
-		async move {
-			let path = match path {
-				Ok(path) => path,
-				Err(e) => return Err(e),
-			};
-			tokio::task::spawn_blocking(move || this.read(path)).await.unwrap_or_else(|e| {
-				Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
-			})
-		}
+		self.state.read_async(primary_namespace, secondary_namespace, key)
 	}
 
 	fn write(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
 	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
-		let this = Arc::clone(&self.inner);
-		let path = this
-			.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "write")
-			.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
-
-		async move {
-			let ((inner_lock_ref, version), path) = match path {
-				Ok(res) => res,
-				Err(e) => return Err(e),
-			};
-			tokio::task::spawn_blocking(move || {
-				this.write_version(inner_lock_ref, path, buf, version)
-			})
-			.await
-			.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
-		}
+		self.state.write_async(primary_namespace, secondary_namespace, key, buf)
 	}
 
 	fn remove(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
 	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
-		let this = Arc::clone(&self.inner);
-		let path = this
-			.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "remove")
-			.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
-
-		async move {
-			let ((inner_lock_ref, version), path) = match path {
-				Ok(res) => res,
-				Err(e) => return Err(e),
-			};
-			tokio::task::spawn_blocking(move || {
-				this.remove_version(inner_lock_ref, path, lazy, version)
-			})
-			.await
-			.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
-		}
+		self.state.remove_async(primary_namespace, secondary_namespace, key, lazy)
 	}
 
 	fn list(
 		&self, primary_namespace: &str, secondary_namespace: &str,
 	) -> impl Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send {
-		let this = Arc::clone(&self.inner);
-
-		let path =
-			this.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list");
-
-		async move {
-			let path = match path {
-				Ok(path) => path,
-				Err(e) => return Err(e),
-			};
-			tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| {
-				Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
-			})
-		}
-	}
-}
-
-fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
-	let p = dir_entry.path();
-	if let Some(ext) = p.extension() {
-		#[cfg(target_os = "windows")]
-		{
-			// Clean up any trash files lying around.
-			if ext == "trash" {
-				fs::remove_file(p).ok();
-				return Ok(false);
-			}
-		}
-		if ext == "tmp" {
-			return Ok(false);
-		}
-	}
-
-	let file_type = dir_entry.file_type()?;
-
-	// We allow the presence of directories in the empty primary namespace and just skip them.
-	if file_type.is_dir() {
-		return Ok(false);
-	}
-
-	// If we otherwise don't find a file at the given path something went wrong.
-	if !file_type.is_file() {
-		debug_assert!(
-			false,
-			"Failed to list keys at path {}: file couldn't be accessed.",
-			PrintableString(p.to_str().unwrap_or_default())
-		);
-		let msg = format!(
-			"Failed to list keys at path {}: file couldn't be accessed.",
-			PrintableString(p.to_str().unwrap_or_default())
-		);
-		return Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, msg));
-	}
-
-	Ok(true)
-}
-
-fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result<String, lightning::io::Error> {
-	match p.strip_prefix(&base_path) {
-		Ok(stripped_path) => {
-			if let Some(relative_path) = stripped_path.to_str() {
-				if is_valid_kvstore_str(relative_path) {
-					return Ok(relative_path.to_string());
-				} else {
-					debug_assert!(
-						false,
-						"Failed to list keys of path {}: file path is not valid key",
-						PrintableString(p.to_str().unwrap_or_default())
-					);
-					let msg = format!(
-						"Failed to list keys of path {}: file path is not valid key",
-						PrintableString(p.to_str().unwrap_or_default())
-					);
-					return Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, msg));
-				}
-			} else {
-				debug_assert!(
-					false,
-					"Failed to list keys of path {}: file path is not valid UTF-8",
-					PrintableString(p.to_str().unwrap_or_default())
-				);
-				let msg = format!(
-					"Failed to list keys of path {}: file path is not valid UTF-8",
-					PrintableString(p.to_str().unwrap_or_default())
-				);
-				return Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, msg));
-			}
-		},
-		Err(e) => {
-			debug_assert!(
-				false,
-				"Failed to list keys of path {}: {}",
-				PrintableString(p.to_str().unwrap_or_default()),
-				e
-			);
-			let msg = format!(
-				"Failed to list keys of path {}: {}",
-				PrintableString(p.to_str().unwrap_or_default()),
-				e
-			);
-			return Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, msg));
-		},
 	}
 }
 
 impl MigratableKVStore for FilesystemStore {
 	fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
-		let prefixed_dest = &self.inner.data_dir;
-		if !prefixed_dest.exists() {
-			return Ok(Vec::new());
-		}
-
-		let mut keys = Vec::new();
-
-		'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? {
-			let primary_entry = primary_entry?;
-			let primary_path = primary_entry.path();
-
-			if dir_entry_is_key(&primary_entry)? {
-				let primary_namespace = String::new();
-				let secondary_namespace = String::new();
-				let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
-				keys.push((primary_namespace, secondary_namespace, key));
-				continue 'primary_loop;
-			}
-
-			// The primary_entry is actually also a directory.
-			'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? {
-				let secondary_entry = secondary_entry?;
-				let secondary_path = secondary_entry.path();
-
-				if dir_entry_is_key(&secondary_entry)? {
-					let primary_namespace =
-						get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
-					let secondary_namespace = String::new();
-					let key = get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
-					keys.push((primary_namespace, secondary_namespace, key));
-					continue 'secondary_loop;
-				}
-
-				// The secondary_entry is actually also a directory.
-				for tertiary_entry in fs::read_dir(&secondary_path)? {
-					let tertiary_entry = tertiary_entry?;
-					let tertiary_path = tertiary_entry.path();
-
-					if dir_entry_is_key(&tertiary_entry)? {
-						let primary_namespace =
-							get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
-						let secondary_namespace =
-							get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
-						let key = get_key_from_dir_entry_path(&tertiary_path, &secondary_path)?;
-						keys.push((primary_namespace, secondary_namespace, key));
-					} else {
-						debug_assert!(
-							false,
-							"Failed to list keys of path {}: only two levels of namespaces are supported",
-							PrintableString(tertiary_path.to_str().unwrap_or_default())
-						);
-						let msg = format!(
-							"Failed to list keys of path {}: only two levels of namespaces are supported",
-							PrintableString(tertiary_path.to_str().unwrap_or_default())
-						);
-						return Err(lightning::io::Error::new(
-							lightning::io::ErrorKind::Other,
-							msg,
-						));
-					}
-				}
-			}
-		}
-		Ok(keys)
+		self.state.list_all_keys_impl()
 	}
 }
 
@@ -716,11 +109,13 @@ mod tests {
 	use lightning::util::persist::read_channel_monitors;
 	use lightning::util::test_utils;
 
+	use std::fs;
+
 	impl Drop for FilesystemStore {
 		fn drop(&mut self) {
 			// We test for invalid directory names, so it's OK if directory removal
 			// fails.
-			match fs::remove_dir_all(&self.inner.data_dir) {
+			match fs::remove_dir_all(&self.get_data_dir()) {
 				Err(e) => println!("Failed to remove test persister directory: {}", e),
 				_ => {},
 			}
diff --git a/lightning-persister/src/fs_store_common.rs b/lightning-persister/src/fs_store_common.rs
new file mode 100644
index 00000000000..6b0d1213db6
--- /dev/null
+++ b/lightning-persister/src/fs_store_common.rs
@@ -0,0 +1,712 @@
+//! Common utilities used by [`FilesystemStore`] implementations.
+//!
+//! [`FilesystemStore`]: crate::fs_store::FilesystemStore
+
+use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
+
+use lightning::types::string::PrintableString;
+
+use std::collections::HashMap;
+use std::fs;
+use std::io::{Read, Write};
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex, RwLock};
+
+#[cfg(target_os = "windows")]
+use std::ffi::OsStr;
+#[cfg(feature = "tokio")]
+use std::future::Future;
+#[cfg(target_os = "windows")]
+use std::os::windows::ffi::OsStrExt;
+
+/// Calls a Windows API function and returns Ok(()) on success or the last OS error on failure.
+#[cfg(target_os = "windows")]
+macro_rules! call {
+	($e: expr) => {
+		if $e != 0 {
+			Ok(())
+		} else {
+			Err(std::io::Error::last_os_error())
+		}
+	};
+}
+
+#[cfg(target_os = "windows")]
+use call;
+
+/// Converts a path to a null-terminated wide string for Windows API calls.
+#[cfg(target_os = "windows")]
+fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
+	path.as_ref().encode_wide().chain(Some(0)).collect()
+}
+
+// The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching
+// a consistent view and error out.
+const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
+
+/// Inner state shared between sync and async operations for filesystem stores.
+///
+/// This struct manages the data directory, temporary file counter, and per-path locks
+/// that ensure we don't have concurrent writes to the same file.
+struct FilesystemStoreInner {
+	data_dir: PathBuf,
+	tmp_file_counter: AtomicUsize,
+
+	// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
+	// latest written version per key.
+	locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
+}
+
+pub(crate) struct FilesystemStoreState {
+	inner: Arc<FilesystemStoreInner>,
+
+	// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
+	// operations aren't sensitive to the order of execution.
+	next_version: AtomicU64,
+}
+
+impl FilesystemStoreState {
+	/// Creates a new [`FilesystemStoreState`] with the given data directory.
+	pub(crate) fn new(data_dir: PathBuf) -> Self {
+		Self {
+			inner: Arc::new(FilesystemStoreInner {
+				data_dir,
+				tmp_file_counter: AtomicUsize::new(0),
+				locks: Mutex::new(HashMap::new()),
+			}),
+			next_version: AtomicU64::new(1),
+		}
+	}
+
+	/// Returns the data directory.
+	pub fn get_data_dir(&self) -> PathBuf {
+		self.inner.data_dir.clone()
+	}
+
+	fn get_new_version_and_lock_ref(&self, dest_file_path: PathBuf) -> (Arc<RwLock<u64>>, u64) {
+		let version = self.next_version.fetch_add(1, Ordering::Relaxed);
+		if version == u64::MAX {
+			panic!("FilesystemStore version counter overflowed");
+		}
+
+		// Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
+		// cleaning up unused locks.
+		let inner_lock_ref = self.inner.get_inner_lock_ref(dest_file_path);
+
+		(inner_lock_ref, version)
+	}
+
+	#[cfg(any(all(feature = "tokio", test), fuzzing))]
+	/// Returns the size of the async state.
+	pub fn state_size(&self) -> usize {
+		let outer_lock = self.inner.locks.lock().unwrap();
+		outer_lock.len()
+	}
+}
+
+impl FilesystemStoreInner {
+	fn get_inner_lock_ref(&self, path: PathBuf) -> Arc<RwLock<u64>> {
+		let mut outer_lock = self.locks.lock().unwrap();
+		Arc::clone(&outer_lock.entry(path).or_default())
+	}
+
+	fn get_dest_dir_path(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> std::io::Result<PathBuf> {
+		let mut dest_dir_path = {
+			#[cfg(target_os = "windows")]
+			{
+				let data_dir = self.data_dir.clone();
+				fs::create_dir_all(data_dir.clone())?;
+				fs::canonicalize(data_dir)?
+			}
+			#[cfg(not(target_os = "windows"))]
+			{
+				self.data_dir.clone()
+			}
+		};
+
+		dest_dir_path.push(primary_namespace);
+		if !secondary_namespace.is_empty() {
+			dest_dir_path.push(secondary_namespace);
+		}
+
+		Ok(dest_dir_path)
+	}
+
+	fn get_checked_dest_file_path(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: Option<&str>,
+		operation: &str,
+	) -> lightning::io::Result<PathBuf> {
+		check_namespace_key_validity(primary_namespace, secondary_namespace, key, operation)?;
+
+		let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
+		if let Some(key) = key {
+			dest_file_path.push(key);
+		}
+
+		Ok(dest_file_path)
+	}
+
+	fn read(&self, dest_file_path: PathBuf) -> lightning::io::Result<Vec<u8>> {
+		let mut buf = Vec::new();
+
+		self.execute_locked_read(dest_file_path.clone(), || {
+			let mut f = fs::File::open(dest_file_path)?;
+			f.read_to_end(&mut buf)?;
+			Ok(())
+		})?;
+
+		Ok(buf)
+	}
+
+	fn execute_locked_write<F: FnOnce() -> Result<(), lightning::io::Error>>(
+		&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, version: u64, callback: F,
+	) -> Result<(), lightning::io::Error> {
+		let res = {
+			let mut last_written_version = inner_lock_ref.write().unwrap();
+
+			// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
+			// consistency.
+			let is_stale_version = version <= *last_written_version;
+
+			// If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
+			if is_stale_version {
+				Ok(())
+			} else {
+				callback().map(|_| {
+					*last_written_version = version;
+				})
+			}
+		};
+
+		self.clean_locks(&inner_lock_ref, dest_file_path);
+
+		res
+	}
+
+	fn execute_locked_read<F: FnOnce() -> Result<(), lightning::io::Error>>(
+		&self, dest_file_path: PathBuf, callback: F,
+	) -> Result<(), lightning::io::Error> {
+		let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone());
+		let res = {
+			let _guard = inner_lock_ref.read().unwrap();
+			callback()
+		};
+		self.clean_locks(&inner_lock_ref, dest_file_path);
+		res
+	}
+
+	fn clean_locks(&self, inner_lock_ref: &Arc<RwLock<u64>>, dest_file_path: PathBuf) {
+		// If there are no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map
+		// entry to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here
+		// in inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
+		// counted.
+		let mut outer_lock = self.locks.lock().unwrap();
+
+		let strong_count = Arc::strong_count(&inner_lock_ref);
+		debug_assert!(strong_count >= 2, "Unexpected FilesystemStore strong count");
+
+		if strong_count == 2 {
+			outer_lock.remove(&dest_file_path);
+		}
+	}
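[Editor's note: to illustrate the versioning scheme the comments above describe — a standalone sketch of the idea only, independent of the surrounding types. Each write is assigned a monotonically increasing version before any blocking work is scheduled, and a per-path lock drops any write whose version is at or below the highest one already applied, so a delayed older write can never clobber newer state:]

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;

/// Minimal model of the last-writer-wins versioning used above: versions are
/// handed out in submission order, and a stale version is silently skipped.
struct VersionedSlot {
	last_written_version: RwLock<u64>,
}

impl VersionedSlot {
	fn apply_write(&self, version: u64, do_write: impl FnOnce()) {
		let mut last = self.last_written_version.write().unwrap();
		// A version at or below the last applied one is stale: a newer write
		// already ran, so skipping is required for eventual consistency.
		if version > *last {
			do_write();
			*last = version;
		}
	}
}

fn main() {
	let counter = AtomicU64::new(1);
	let slot = VersionedSlot { last_written_version: RwLock::new(0) };
	let v1 = counter.fetch_add(1, Ordering::Relaxed);
	let v2 = counter.fetch_add(1, Ordering::Relaxed);
	// The newer write lands first (e.g., its blocking task was scheduled earlier)...
	slot.apply_write(v2, || println!("write v{v2} applied"));
	// ...and the delayed older write is then correctly discarded.
	slot.apply_write(v1, || println!("write v{v1} applied"));
}
```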
+
+	/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
+	/// returns early without writing.
+	fn write_version(
+		&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, buf: Vec<u8>,
+		version: u64,
+	) -> lightning::io::Result<()> {
+		let parent_directory = dest_file_path.parent().ok_or_else(|| {
+			let msg =
+				format!("Could not retrieve parent directory of {}.", dest_file_path.display());
+			std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
+		})?;
+		fs::create_dir_all(&parent_directory)?;
+
+		// Do a crazy dance with lots of fsync()s to be overly cautious here...
+		// We never want to end up in a state where we've lost the old data, or end up using the
+		// old data on power loss after we've returned.
+		// The way to atomically write a file on Unix platforms is:
+		// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
+		let mut tmp_file_path = dest_file_path.clone();
+		let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
+		tmp_file_path.set_extension(tmp_file_ext);
+
+		{
+			let mut tmp_file = fs::File::create(&tmp_file_path)?;
+			tmp_file.write_all(&buf)?;
+			tmp_file.sync_all()?;
+		}
+
+		self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || {
+			#[cfg(not(target_os = "windows"))]
+			{
+				fs::rename(&tmp_file_path, &dest_file_path)?;
+				let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
+				dir_file.sync_all()?;
+				Ok(())
+			}
+
+			#[cfg(target_os = "windows")]
+			{
+				let res = if dest_file_path.exists() {
+					call!(unsafe {
+						windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
+							path_to_windows_str(&dest_file_path).as_ptr(),
+							path_to_windows_str(&tmp_file_path).as_ptr(),
+							std::ptr::null(),
+							windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
+							std::ptr::null_mut() as *const core::ffi::c_void,
+							std::ptr::null_mut() as *const core::ffi::c_void,
+						)
+					})
+				} else {
+					call!(unsafe {
+						windows_sys::Win32::Storage::FileSystem::MoveFileExW(
+							path_to_windows_str(&tmp_file_path).as_ptr(),
+							path_to_windows_str(&dest_file_path).as_ptr(),
+							windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
+								| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
+						)
+					})
+				};
+
+				match res {
+					Ok(()) => {
+						// We fsync the dest file in hopes this will also flush the metadata to disk.
+						let dest_file =
+							fs::OpenOptions::new().read(true).write(true).open(&dest_file_path)?;
+						dest_file.sync_all()?;
+						Ok(())
+					},
+					Err(e) => Err(e.into()),
+				}
+			}
+		})
+	}
+
+	fn remove_version(
+		&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, lazy: bool, version: u64,
+	) -> lightning::io::Result<()> {
+		self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || {
+			if !dest_file_path.is_file() {
+				return Ok(());
+			}
+
+			if lazy {
+				// If we're lazy we just call remove and be done with it.
+				fs::remove_file(&dest_file_path)?;
+			} else {
+				// If we're not lazy we try our best to persist the updated metadata to ensure
+				// atomicity of this call.
+				#[cfg(not(target_os = "windows"))]
+				{
+					fs::remove_file(&dest_file_path)?;
+
+					let parent_directory = dest_file_path.parent().ok_or_else(|| {
+						let msg = format!(
+							"Could not retrieve parent directory of {}.",
+							dest_file_path.display()
+						);
+						std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
+					})?;
+					let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
+					// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
+					// to the inode might get cached (and hence possibly lost on crash), depending on
+					// the target platform and file system.
+					//
+					// In order to assert we permanently removed the file in question we therefore
+					// call `fsync` on the parent directory on platforms that support it.
+					dir_file.sync_all()?;
+				}
+
+				#[cfg(target_os = "windows")]
+				{
+					// Since Windows `DeleteFile` API is not persisted until the last open file handle
+					// is dropped, and there seemingly is no reliable way to flush the directory
+					// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
+					// file to be deleted to a temporary trash file and remove the latter file
+					// afterwards.
+					//
+					// This should be marginally better, as, according to the documentation,
+					// `MoveFileExW` APIs should offer stronger persistence guarantees,
+					// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
+					// However, all this is partially based on assumptions and local experiments, as
+					// Windows API is horribly underdocumented.
+					let mut trash_file_path = dest_file_path.clone();
+					let trash_file_ext =
+						format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
+					trash_file_path.set_extension(trash_file_ext);
+
+					call!(unsafe {
+						windows_sys::Win32::Storage::FileSystem::MoveFileExW(
+							path_to_windows_str(&dest_file_path).as_ptr(),
+							path_to_windows_str(&trash_file_path).as_ptr(),
+							windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
+								| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
+						)
+					})?;
+
+					{
+						// We fsync the trash file in hopes this will also flush the original's file
+						// metadata to disk.
+						let trash_file = fs::OpenOptions::new()
+							.read(true)
+							.write(true)
+							.open(&trash_file_path.clone())?;
+						trash_file.sync_all()?;
+					}
+
+					// We're fine if this remove would fail as the trash file will be cleaned up in
+					// list eventually.
+					fs::remove_file(trash_file_path).ok();
+				}
+			}
+
+			Ok(())
+		})
+	}
+
+	fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result<Vec<String>> {
+		if !Path::new(&prefixed_dest).exists() {
+			return Ok(Vec::new());
+		}
+
+		let mut keys;
+		let mut retries = LIST_DIR_CONSISTENCY_RETRIES;
+
+		'retry_list: loop {
+			keys = Vec::new();
+			'skip_entry: for entry in fs::read_dir(&prefixed_dest)? {
+				let entry = entry?;
+				let p = entry.path();
+
+				let res = dir_entry_is_key(&entry);
+				match res {
+					Ok(true) => {
+						let key = get_key_from_dir_entry_path(&p, &prefixed_dest)?;
+						keys.push(key);
+					},
+					Ok(false) => {
+						// We didn't error, but the entry is not a valid key (e.g., a directory,
+						// or a temp file).
+						continue 'skip_entry;
+					},
+					Err(e) => {
+						if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 {
+							// We had found the entry in `read_dir` above, so some race happened.
+							// Retry the `read_dir` to get a consistent view.
+							retries -= 1;
+							continue 'retry_list;
+						} else {
+							// For all other errors, or if we exhausted retries, bubble up.
+							return Err(e.into());
+						}
+					},
+				}
+			}
+			break 'retry_list;
+		}
+
+		Ok(keys)
+	}
+}
+
+impl FilesystemStoreState {
+	pub(crate) fn read_impl(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> Result<Vec<u8>, lightning::io::Error> {
+		let path = self.inner.get_checked_dest_file_path(
+			primary_namespace,
+			secondary_namespace,
+			Some(key),
+			"read",
+		)?;
+		self.inner.read(path)
+	}
+
+	pub(crate) fn write_impl(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+	) -> Result<(), lightning::io::Error> {
+		let path = self.inner.get_checked_dest_file_path(
+			primary_namespace,
+			secondary_namespace,
+			Some(key),
+			"write",
+		)?;
+		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
+		self.inner.write_version(inner_lock_ref, path, buf, version)
+	}
+
+	pub(crate) fn remove_impl(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> Result<(), lightning::io::Error> {
+		let path = self.inner.get_checked_dest_file_path(
+			primary_namespace,
+			secondary_namespace,
+			Some(key),
+			"remove",
+		)?;
+		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
+		self.inner.remove_version(inner_lock_ref, path, lazy, version)
+	}
+
+	pub(crate) fn list_impl(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> Result<Vec<String>, lightning::io::Error> {
+		let path = self.inner.get_checked_dest_file_path(
+			primary_namespace,
+			secondary_namespace,
+			None,
+			"list",
+		)?;
+		self.inner.list(path)
+	}
+
+	#[cfg(feature = "tokio")]
+	pub(crate) fn read_async(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> impl Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send {
+		let this = Arc::clone(&self.inner);
+		let path = this.get_checked_dest_file_path(
+			primary_namespace,
+			secondary_namespace,
+			Some(key),
+			"read",
+		);
+
+		async move {
+			let path = match path {
+				Ok(path) => path,
+				Err(e) => return Err(e),
+			};
+			tokio::task::spawn_blocking(move || this.read(path)).await.unwrap_or_else(|e| {
+				Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
+			})
+		}
+	}
+
+	#[cfg(feature = "tokio")]
+	pub(crate) fn write_async(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
+		let this = Arc::clone(&self.inner);
+		let path = this
+			.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "write")
+			.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
+
+		async move {
+			let ((inner_lock_ref, version), path) = match path {
+				Ok(res) => res,
+				Err(e) => return Err(e),
+			};
+			tokio::task::spawn_blocking(move || {
+				this.write_version(inner_lock_ref, path, buf, version)
+			})
+			.await
+			.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
+		}
+	}
+
+	#[cfg(feature = "tokio")]
+	pub(crate) fn remove_async(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
+		let this = Arc::clone(&self.inner);
+		let path = this
+			.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "remove")
+			.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
+
+		async move {
+			let ((inner_lock_ref, version), path) = match path {
+				Ok(res) => res,
+				Err(e) => return Err(e),
+			};
+			tokio::task::spawn_blocking(move || {
+				this.remove_version(inner_lock_ref, path, lazy, version)
+			})
+			.await
+			.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
+		}
+	}
+
+	#[cfg(feature = "tokio")]
+	pub(crate) fn list_async(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> impl Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send {
+		let this = Arc::clone(&self.inner);
+
+		let path =
+			this.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list");
+
+		async move {
+			let path = match path {
+				Ok(path) => path,
+				Err(e) => return Err(e),
+			};
+			tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| {
+				Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
+			})
+		}
+	}
+
+	pub(crate) fn list_all_keys_impl(
+		&self,
+	) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
+		let prefixed_dest = &self.inner.data_dir;
+		if !prefixed_dest.exists() {
+			return Ok(Vec::new());
+		}
+
+		let mut keys = Vec::new();
+
+		'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? {
+			let primary_entry = primary_entry?;
+			let primary_path = primary_entry.path();
+
+			if dir_entry_is_key(&primary_entry)? {
+				let primary_namespace = String::new();
+				let secondary_namespace = String::new();
+				let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
+				keys.push((primary_namespace, secondary_namespace, key));
+				continue 'primary_loop;
+			}
+
+			// The primary_entry was not a key, so it must be a directory.
+			'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? {
+				let secondary_entry = secondary_entry?;
+				let secondary_path = secondary_entry.path();
+
+				if dir_entry_is_key(&secondary_entry)? {
+					let primary_namespace =
+						get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
+					let secondary_namespace = String::new();
+					let key = get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
+					keys.push((primary_namespace, secondary_namespace, key));
+					continue 'secondary_loop;
+				}
+
+				// The secondary_entry was not a key, so it must be a directory.
+				for tertiary_entry in fs::read_dir(&secondary_path)? {
+					let tertiary_entry = tertiary_entry?;
+					let tertiary_path = tertiary_entry.path();
+
+					if dir_entry_is_key(&tertiary_entry)? {
+						let primary_namespace =
+							get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
+						let secondary_namespace =
+							get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
+						let key = get_key_from_dir_entry_path(&tertiary_path, &secondary_path)?;
+						keys.push((primary_namespace, secondary_namespace, key));
+					} else {
+						debug_assert!(
+							false,
+							"Failed to list keys of path {}: only two levels of namespaces are supported",
+							PrintableString(tertiary_path.to_str().unwrap_or_default())
+						);
+						let msg = format!(
+							"Failed to list keys of path {}: only two levels of namespaces are supported",
+							PrintableString(tertiary_path.to_str().unwrap_or_default())
+						);
+						return Err(lightning::io::Error::new(
+							lightning::io::ErrorKind::Other,
+							msg,
+						));
+					}
+				}
+			}
+		}
+		Ok(keys)
+	}
+}
+
+fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
+	let p = dir_entry.path();
+	if let Some(ext) = p.extension() {
+		#[cfg(target_os = "windows")]
+		{
+			// Clean up any trash files lying around.
+			if ext == "trash" {
+				fs::remove_file(p).ok();
+				return Ok(false);
+			}
+		}
+		if ext == "tmp" {
+			return Ok(false);
+		}
+	}
+
+	let file_type = dir_entry.file_type()?;
+
+	// We allow the presence of directories in the empty primary namespace and just skip them.
+	if file_type.is_dir() {
+		return Ok(false);
+	}
+
+	// If we otherwise don't find a file at the given path, something went wrong.
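+	// (A non-file entry here could be, e.g., a symlink, FIFO, or socket, none of which
+	// are valid key entries.)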
+	if !file_type.is_file() {
+		debug_assert!(
+			false,
+			"Failed to list keys at path {}: file couldn't be accessed.",
+			PrintableString(p.to_str().unwrap_or_default())
+		);
+		let msg = format!(
+			"Failed to list keys at path {}: file couldn't be accessed.",
+			PrintableString(p.to_str().unwrap_or_default())
+		);
+		return Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, msg));
+	}
+
+	Ok(true)
+}
+
+fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result<String, lightning::io::Error> {
+	match p.strip_prefix(&base_path) {
+		Ok(stripped_path) => {
+			if let Some(relative_path) = stripped_path.to_str() {
+				if is_valid_kvstore_str(relative_path) {
+					return Ok(relative_path.to_string());
+				} else {
+					debug_assert!(
+						false,
+						"Failed to list keys of path {}: file path is not a valid key",
+						PrintableString(p.to_str().unwrap_or_default())
+					);
+					let msg = format!(
+						"Failed to list keys of path {}: file path is not a valid key",
+						PrintableString(p.to_str().unwrap_or_default())
+					);
+					return Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, msg));
+				}
+			} else {
+				debug_assert!(
+					false,
+					"Failed to list keys of path {}: file path is not valid UTF-8",
+					PrintableString(p.to_str().unwrap_or_default())
+				);
+				let msg = format!(
+					"Failed to list keys of path {}: file path is not valid UTF-8",
+					PrintableString(p.to_str().unwrap_or_default())
+				);
+				return Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, msg));
+			}
+		},
+		Err(e) => {
+			debug_assert!(
+				false,
+				"Failed to list keys of path {}: {}",
+				PrintableString(p.to_str().unwrap_or_default()),
+				e
+			);
+			let msg = format!(
+				"Failed to list keys of path {}: {}",
+				PrintableString(p.to_str().unwrap_or_default()),
+				e
+			);
+			return Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, msg));
+		},
+	}
+}
diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs
index 0e3541e1b27..e4973f532e8 100644
--- a/lightning-persister/src/lib.rs
+++ b/lightning-persister/src/lib.rs
@@ -10,6 +10,7 @@ extern crate criterion;
 
 pub mod fs_store;
+mod fs_store_common;
 mod utils;
 
 #[cfg(test)]

From e969258cc4bcd4aa4851e00b44c78ac107a1d53c Mon Sep 17 00:00:00 2001
From: benthecarman
Date: Fri, 13 Feb 2026 14:35:02 -0600
Subject: [PATCH 3/3] Add FilesystemStoreV2 with paginated listing support

Implements the PaginatedKVStore traits using file modification times
(preserved across key updates) for newest-first pagination, and [empty]
directory markers for a consistent namespace hierarchy.
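For reviewers, a rough sketch of the intended consumption pattern (the data
directory and namespace/key names below are illustrative only, not part of
the API):

    use lightning::util::persist::{KVStoreSync, PageToken, PaginatedKVStoreSync};
    use lightning_persister::fs_store_v2::FilesystemStoreV2;

    let store = FilesystemStoreV2::new("/tmp/ldk-data".into())?;
    let mut page_token: Option<PageToken> = None;
    loop {
        // Each page holds at most PAGE_SIZE (50) keys, newest first.
        let page = store.list_paginated("payments", "outbound", page_token)?;
        for key in &page.keys {
            let _value = store.read("payments", "outbound", key)?;
        }
        match page.next_page_token {
            Some(token) => page_token = Some(token),
            None => break,
        }
    }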
Co-Authored-By: Claude Opus 4.5
---
 lightning-persister/src/fs_store.rs        |  18 +-
 lightning-persister/src/fs_store_common.rs | 293 ++++++++++--
 lightning-persister/src/fs_store_v2.rs     | 526 +++++++++++++++++++++
 lightning-persister/src/lib.rs             |   1 +
 4 files changed, 801 insertions(+), 37 deletions(-)
 create mode 100644 lightning-persister/src/fs_store_v2.rs

diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs
index b7d09ac675a..beb69274dc6 100644
--- a/lightning-persister/src/fs_store.rs
+++ b/lightning-persister/src/fs_store.rs
@@ -39,25 +39,25 @@ impl KVStoreSync for FilesystemStore {
 	fn read(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
 	) -> Result<Vec<u8>, lightning::io::Error> {
-		self.state.read_impl(primary_namespace, secondary_namespace, key)
+		self.state.read_impl(primary_namespace, secondary_namespace, key, false)
 	}
 
 	fn write(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
 	) -> Result<(), lightning::io::Error> {
-		self.state.write_impl(primary_namespace, secondary_namespace, key, buf)
+		self.state.write_impl(primary_namespace, secondary_namespace, key, buf, false)
 	}
 
 	fn remove(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
 	) -> Result<(), lightning::io::Error> {
-		self.state.remove_impl(primary_namespace, secondary_namespace, key, lazy)
+		self.state.remove_impl(primary_namespace, secondary_namespace, key, lazy, false)
 	}
 
 	fn list(
 		&self, primary_namespace: &str, secondary_namespace: &str,
 	) -> Result<Vec<String>, lightning::io::Error> {
-		self.state.list_impl(primary_namespace, secondary_namespace)
+		self.state.list_impl(primary_namespace, secondary_namespace, false)
 	}
 }
 
@@ -66,31 +66,31 @@ impl KVStore for FilesystemStore {
 	fn read(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
 	) -> impl Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send {
-		self.state.read_async(primary_namespace, secondary_namespace, key)
+		self.state.read_async(primary_namespace, secondary_namespace, key, false)
 	}
 
 	fn write(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
 	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
-		self.state.write_async(primary_namespace, secondary_namespace, key, buf)
+		self.state.write_async(primary_namespace, secondary_namespace, key, buf, false)
 	}
 
 	fn remove(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
 	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
-		self.state.remove_async(primary_namespace, secondary_namespace, key, lazy)
+		self.state.remove_async(primary_namespace, secondary_namespace, key, lazy, false)
 	}
 
 	fn list(
 		&self, primary_namespace: &str, secondary_namespace: &str,
 	) -> impl Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send {
-		self.state.list_async(primary_namespace, secondary_namespace)
+		self.state.list_async(primary_namespace, secondary_namespace, false)
 	}
 }
 
 impl MigratableKVStore for FilesystemStore {
 	fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
-		self.state.list_all_keys_impl()
+		self.state.list_all_keys_impl(false)
 	}
 }
 
diff --git a/lightning-persister/src/fs_store_common.rs b/lightning-persister/src/fs_store_common.rs
index 6b0d1213db6..88aa9490ba5 100644
--- a/lightning-persister/src/fs_store_common.rs
+++ b/lightning-persister/src/fs_store_common.rs
@@ -12,6 +12,7 @@ use std::io::{Read, Write};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, RwLock};
+use std::time::UNIX_EPOCH;
 
 #[cfg(target_os = "windows")]
 use std::ffi::OsStr;
 
@@ -34,6 +35,7 @@ macro_rules! call {
 #[cfg(target_os = "windows")]
 use call;
 
+use lightning::util::persist::{PageToken, PaginatedListResponse};
 
 /// Converts a path to a null-terminated wide string for Windows API calls.
 #[cfg(target_os = "windows")]
@@ -45,6 +47,11 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
 // a consistent view and error out.
 const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
 
+// The directory name used for empty namespaces in v2.
+// Uses brackets, which are not in KVSTORE_NAMESPACE_KEY_ALPHABET, preventing collisions
+// with valid namespace names.
+pub(crate) const EMPTY_NAMESPACE_DIR: &str = "[empty]";
+
 /// Inner state shared between sync and async operations for filesystem stores.
 ///
 /// This struct manages the data directory, temporary file counter, and per-path locks
@@ -112,7 +119,7 @@ impl FilesystemStoreInner {
 	}
 
 	fn get_dest_dir_path(
-		&self, primary_namespace: &str, secondary_namespace: &str,
+		&self, primary_namespace: &str, secondary_namespace: &str, use_empty_ns_dir: bool,
 	) -> std::io::Result<PathBuf> {
 		let mut dest_dir_path = {
 			#[cfg(target_os = "windows")]
@@ -127,9 +134,22 @@ impl FilesystemStoreInner {
 			}
 		};
 
-		dest_dir_path.push(primary_namespace);
-		if !secondary_namespace.is_empty() {
-			dest_dir_path.push(secondary_namespace);
+		if use_empty_ns_dir {
+			dest_dir_path.push(if primary_namespace.is_empty() {
+				EMPTY_NAMESPACE_DIR
+			} else {
+				primary_namespace
+			});
+			dest_dir_path.push(if secondary_namespace.is_empty() {
+				EMPTY_NAMESPACE_DIR
+			} else {
+				secondary_namespace
+			});
+		} else {
+			dest_dir_path.push(primary_namespace);
+			if !secondary_namespace.is_empty() {
+				dest_dir_path.push(secondary_namespace);
+			}
 		}
 
 		Ok(dest_dir_path)
@@ -137,11 +157,12 @@ impl FilesystemStoreInner {
 
 	fn get_checked_dest_file_path(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: Option<&str>,
-		operation: &str,
+		operation: &str, use_empty_ns_dir: bool,
 	) -> lightning::io::Result<PathBuf> {
 		check_namespace_key_validity(primary_namespace, secondary_namespace, key, operation)?;
 
-		let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
+		let mut dest_file_path =
+			self.get_dest_dir_path(primary_namespace, secondary_namespace, use_empty_ns_dir)?;
 		if let Some(key) = key {
 			dest_file_path.push(key);
 		}
@@ -217,8 +238,13 @@ impl FilesystemStoreInner {
 	/// returns early without writing.
 	fn write_version(
 		&self, inner_lock_ref: Arc<Mutex<u64>>, dest_file_path: PathBuf, buf: Vec<u8>,
-		version: u64,
+		version: u64, preserve_mtime: bool,
 	) -> lightning::io::Result<()> {
+		let mtime = if preserve_mtime {
+			fs::metadata(&dest_file_path).ok().and_then(|m| m.modified().ok())
+		} else {
+			None
+		};
 		let parent_directory = dest_file_path.parent().ok_or_else(|| {
 			let msg =
 				format!("Could not retrieve parent directory of {}.", dest_file_path.display());
@@ -238,6 +264,13 @@ impl FilesystemStoreInner {
 		{
 			let mut tmp_file = fs::File::create(&tmp_file_path)?;
 			tmp_file.write_all(&buf)?;
+
+			// If we need to preserve the original mtime (for updates), set it before fsync.
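+			// (Setting it on the tmp file before the atomic rename means the contents and
+			// the preserved timestamp land together; a concurrent lister never observes the
+			// key with a transiently "new" mtime, which would break creation-order paging.)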
+			if let Some(mtime) = mtime {
+				let times = fs::FileTimes::new().set_modified(mtime);
+				tmp_file.set_times(times)?;
+			}
+
 			tmp_file.sync_all()?;
 		}
 
@@ -370,13 +403,76 @@ impl FilesystemStoreInner {
 		})
 	}
 
-	fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result<Vec<String>> {
+	fn list_paginated(
+		&self, prefixed_dest: PathBuf, page_token: Option<PageToken>,
+	) -> lightning::io::Result<PaginatedListResponse> {
+		if !prefixed_dest.exists() {
+			return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None });
+		}
+
+		// Collect all entries with their modification times.
+		let mut entries: Vec<(u64, String)> = Vec::new();
+		for dir_entry in fs::read_dir(&prefixed_dest)? {
+			let dir_entry = dir_entry?;
+
+			let key = get_key_from_dir_entry_path(&dir_entry.path(), prefixed_dest.as_path())?;
+			// Get the modification time as millis since the epoch.
+			let mtime_millis = dir_entry
+				.metadata()
+				.ok()
+				.and_then(|m| m.modified().ok())
+				.and_then(|t| t.duration_since(UNIX_EPOCH).ok())
+				.map(|d| d.as_millis() as u64)
+				.unwrap_or(0);
+
+			entries.push((mtime_millis, key));
+		}
+
+		// Sort by mtime descending (newest first), then by key descending for the same mtime.
+		entries.sort_by(|a, b| b.0.cmp(&a.0).then_with(|| b.1.cmp(&a.1)));
+
+		// Find the starting position based on the page token.
+		let start_idx = if let Some(token) = page_token {
+			let (token_mtime, token_key) = parse_page_token(&token.0)?;
+
+			// Find entries that come after the token (older entries = lower mtime),
+			// or the same mtime but a lexicographically smaller key (since we sort
+			// descending).
+			entries
+				.iter()
+				.position(|(mtime, key)| {
+					*mtime < token_mtime
+						|| (*mtime == token_mtime && key.as_str() < token_key.as_str())
+				})
+				.unwrap_or(entries.len())
+		} else {
+			0
+		};
+
+		// Take PAGE_SIZE entries starting from start_idx.
+		let page_entries: Vec<_> =
+			entries.iter().skip(start_idx).take(PAGE_SIZE).cloned().collect();
+
+		let keys: Vec<String> = page_entries.iter().map(|(_, key)| key.clone()).collect();
+
+		// Determine the next page token.
+		let next_page_token = if start_idx + PAGE_SIZE < entries.len() {
+			page_entries.last().map(|(mtime, key)| PageToken(format_page_token(*mtime, key)))
+		} else {
+			None
+		};
+
+		Ok(PaginatedListResponse { keys, next_page_token })
+	}
+
+	fn list(
+		&self, prefixed_dest: PathBuf, retry_on_race: bool,
+	) -> lightning::io::Result<Vec<String>> {
 		if !Path::new(&prefixed_dest).exists() {
 			return Ok(Vec::new());
 		}
 
 		let mut keys;
-		let mut retries = LIST_DIR_CONSISTENCY_RETRIES;
+		let mut retries = if retry_on_race { LIST_DIR_CONSISTENCY_RETRIES } else { 0 };
 
 		'retry_list: loop {
 			keys = Vec::new();
@@ -418,57 +514,65 @@ impl FilesystemStoreInner {
 impl FilesystemStoreState {
 	pub(crate) fn read_impl(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+		use_empty_ns_dir: bool,
 	) -> Result<Vec<u8>, lightning::io::Error> {
 		let path = self.inner.get_checked_dest_file_path(
 			primary_namespace,
 			secondary_namespace,
 			Some(key),
 			"read",
+			use_empty_ns_dir,
 		)?;
 		self.inner.read(path)
 	}
 
 	pub(crate) fn write_impl(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+		use_empty_ns_dir: bool,
 	) -> Result<(), lightning::io::Error> {
 		let path = self.inner.get_checked_dest_file_path(
 			primary_namespace,
 			secondary_namespace,
 			Some(key),
 			"write",
+			use_empty_ns_dir,
 		)?;
 		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
-		self.inner.write_version(inner_lock_ref, path, buf, version)
+		self.inner.write_version(inner_lock_ref, path, buf, version, use_empty_ns_dir)
 	}
 
 	pub(crate) fn remove_impl(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+		use_empty_ns_dir: bool,
 	) -> Result<(), lightning::io::Error> {
 		let path = self.inner.get_checked_dest_file_path(
 			primary_namespace,
 			secondary_namespace,
 			Some(key),
 			"remove",
+			use_empty_ns_dir,
 		)?;
 		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
 		self.inner.remove_version(inner_lock_ref, path, lazy, version)
 	}
 
 	pub(crate) fn list_impl(
-		&self, primary_namespace: &str, secondary_namespace: &str,
+		&self, primary_namespace: &str, secondary_namespace: &str, use_empty_ns_dir: bool,
 	) -> Result<Vec<String>, lightning::io::Error> {
 		let path = self.inner.get_checked_dest_file_path(
 			primary_namespace,
 			secondary_namespace,
 			None,
 			"list",
+			use_empty_ns_dir,
 		)?;
-		self.inner.list(path)
+		self.inner.list(path, !use_empty_ns_dir)
 	}
 
 	#[cfg(feature = "tokio")]
 	pub(crate) fn read_async(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+		use_empty_ns_dir: bool,
 	) -> impl Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send {
 		let this = Arc::clone(&self.inner);
 		let path = this.get_checked_dest_file_path(
@@ -476,6 +580,7 @@ impl FilesystemStoreState {
 			secondary_namespace,
 			Some(key),
 			"read",
+			use_empty_ns_dir,
 		);
 
 		async move {
@@ -492,10 +597,17 @@ impl FilesystemStoreState {
 	#[cfg(feature = "tokio")]
 	pub(crate) fn write_async(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+		use_empty_ns_dir: bool,
 	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
 		let this = Arc::clone(&self.inner);
 		let path = this
-			.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "write")
+			.get_checked_dest_file_path(
+				primary_namespace,
+				secondary_namespace,
+				Some(key),
+				"write",
+				use_empty_ns_dir,
+			)
 			.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
 
 		async move {
@@ -504,7 +616,7 @@ impl FilesystemStoreState {
 				Err(e) => return Err(e),
 			};
 			tokio::task::spawn_blocking(move || {
-				this.write_version(inner_lock_ref, path, buf, version)
+				this.write_version(inner_lock_ref, path, buf, version, use_empty_ns_dir)
 			})
 			.await
 			.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
@@ -514,10 +626,17 @@ impl FilesystemStoreState {
 	#[cfg(feature = "tokio")]
 	pub(crate) fn remove_async(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+		use_empty_ns_dir: bool,
 	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
 		let this = Arc::clone(&self.inner);
 		let path = this
-			.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "remove")
+			.get_checked_dest_file_path(
+				primary_namespace,
+				secondary_namespace,
+				Some(key),
+				"remove",
+				use_empty_ns_dir,
+			)
 			.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
 
 		async move {
@@ -535,32 +654,50 @@ impl FilesystemStoreState {
 
 	#[cfg(feature = "tokio")]
 	pub(crate) fn list_async(
-		&self, primary_namespace: &str, secondary_namespace: &str,
+		&self, primary_namespace: &str, secondary_namespace: &str, use_empty_ns_dir: bool,
 	) -> impl Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send {
 		let this = Arc::clone(&self.inner);
 
-		let path =
-			this.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list");
+		let path = this.get_checked_dest_file_path(
+			primary_namespace,
+			secondary_namespace,
+			None,
+			"list",
+			use_empty_ns_dir,
+		);
 
 		async move {
 			let path = match path {
 				Ok(path) => path,
 				Err(e) => return Err(e),
 			};
-			tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| {
-				Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
-			})
+			tokio::task::spawn_blocking(move || this.list(path, !use_empty_ns_dir))
+				.await
+				.unwrap_or_else(|e| {
+					Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
+				})
 		}
 	}
 
 	pub(crate) fn list_all_keys_impl(
-		&self,
+		&self, use_empty_ns_dir: bool,
 	) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
 		let prefixed_dest = &self.inner.data_dir;
 		if !prefixed_dest.exists() {
 			return Ok(Vec::new());
 		}
 
+		// When use_empty_ns_dir is true (v2), namespace directories may be named
+		// [empty] to represent empty namespaces, so we resolve via namespace_from_dir_path.
+		// When false (v1), directory names are always valid kvstore strings.
+		let resolve_ns = |path: &Path, base: &Path| -> Result<String, lightning::io::Error> {
+			if use_empty_ns_dir {
+				namespace_from_dir_path(path)
+			} else {
+				get_key_from_dir_entry_path(path, base)
+			}
+		};
+
 		let mut keys = Vec::new();
 
 		'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? {
@@ -581,8 +718,7 @@ impl FilesystemStoreState {
 			let secondary_path = secondary_entry.path();
 
 			if dir_entry_is_key(&secondary_entry)? {
-				let primary_namespace =
-					get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
+				let primary_namespace = resolve_ns(&primary_path, prefixed_dest)?;
 				let secondary_namespace = String::new();
 				let key = get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
 				keys.push((primary_namespace, secondary_namespace, key));
@@ -595,10 +731,8 @@ impl FilesystemStoreState {
 				let tertiary_path = tertiary_entry.path();
 
 				if dir_entry_is_key(&tertiary_entry)? {
-					let primary_namespace =
-						get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
-					let secondary_namespace =
-						get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
+					let primary_namespace = resolve_ns(&primary_path, prefixed_dest)?;
+					let secondary_namespace = resolve_ns(&secondary_path, &primary_path)?;
 					let key = get_key_from_dir_entry_path(&tertiary_path, &secondary_path)?;
 					keys.push((primary_namespace, secondary_namespace, key));
 				} else {
@@ -621,6 +755,109 @@ impl FilesystemStoreState {
 		}
 		Ok(keys)
 	}
+
+	/// Sync entry point for paginated listing.
+	///
+	/// Collects all entries with their modification times, sorts by mtime descending
+	/// (newest first), and returns a page of results with an optional next page token.
+	pub(crate) fn list_paginated_impl(
+		&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+	) -> lightning::io::Result<PaginatedListResponse> {
+		let path = self.inner.get_checked_dest_file_path(
+			primary_namespace,
+			secondary_namespace,
+			None,
+			"list_paginated",
+			true,
+		)?;
+		self.inner.list_paginated(path, page_token)
+	}
+
+	#[cfg(feature = "tokio")]
+	pub(crate) fn list_paginated_async(
+		&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+	) -> impl Future<Output = lightning::io::Result<PaginatedListResponse>> + 'static + Send
+	{
+		let this = Arc::clone(&self.inner);
+
+		let path = this.get_checked_dest_file_path(
+			primary_namespace,
+			secondary_namespace,
+			None,
+			"list_paginated",
+			true,
+		);
+
+		async move {
+			let path = match path {
+				Ok(path) => path,
+				Err(e) => return Err(e),
+			};
+			tokio::task::spawn_blocking(move || this.list_paginated(path, page_token))
+				.await
+				.unwrap_or_else(|e| {
+					Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
+				})
+		}
+	}
+}
+
+/// The fixed page size for paginated listing operations.
+pub(crate) const PAGE_SIZE: usize = 50;
+
+/// The length of the timestamp in a page token (milliseconds since epoch as a 16-digit decimal).
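+/// A full token thus has the shape `{mtime_millis:016}:{key}`, e.g.
+/// `0001706500000000:somekey` (the key shown here is illustrative).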
+const PAGE_TOKEN_TIMESTAMP_LEN: usize = 16;
+
+/// Formats a page token from an mtime (millis since epoch) and a key.
+pub(crate) fn format_page_token(mtime_millis: u64, key: &str) -> String {
+	format!("{:016}:{}", mtime_millis, key)
+}
+
+/// Parses a page token into an mtime (millis since epoch) and a key.
+pub(crate) fn parse_page_token(token: &str) -> lightning::io::Result<(u64, String)> {
+	let colon_pos = token.find(':').ok_or_else(|| {
+		lightning::io::Error::new(
+			lightning::io::ErrorKind::InvalidInput,
+			"Invalid page token format",
+		)
+	})?;
+
+	if colon_pos != PAGE_TOKEN_TIMESTAMP_LEN {
+		return Err(lightning::io::Error::new(
+			lightning::io::ErrorKind::InvalidInput,
+			"Invalid page token format",
+		));
+	}
+
+	let mtime = token[..colon_pos].parse::<u64>().map_err(|_| {
+		lightning::io::Error::new(
+			lightning::io::ErrorKind::InvalidInput,
+			"Invalid page token timestamp",
+		)
+	})?;
+
+	let key = token[colon_pos + 1..].to_string();
+
+	Ok((mtime, key))
+}
+
+/// Extracts a namespace string from a directory path, converting [`EMPTY_NAMESPACE_DIR`] to an
+/// empty string.
+fn namespace_from_dir_path(path: &Path) -> Result<String, lightning::io::Error> {
+	let name = path.file_name().and_then(|n| n.to_str()).ok_or_else(|| {
+		lightning::io::Error::new(
+			lightning::io::ErrorKind::Other,
+			format!(
+				"Failed to extract namespace from path {}",
+				PrintableString(path.to_str().unwrap_or_default())
+			),
+		)
+	})?;
+	if name == EMPTY_NAMESPACE_DIR {
+		Ok(String::new())
+	} else {
+		Ok(name.to_string())
+	}
 }
 
 fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
diff --git a/lightning-persister/src/fs_store_v2.rs b/lightning-persister/src/fs_store_v2.rs
new file mode 100644
index 00000000000..44a1a4bed01
--- /dev/null
+++ b/lightning-persister/src/fs_store_v2.rs
@@ -0,0 +1,526 @@
+//! Objects related to [`FilesystemStoreV2`] live here.
+use crate::fs_store_common::FilesystemStoreState;
+
+use lightning::util::persist::{
+	KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStoreSync, PaginatedListResponse,
+};
+
+use std::fs;
+use std::path::PathBuf;
+
+#[cfg(feature = "tokio")]
+use core::future::Future;
+#[cfg(feature = "tokio")]
+use lightning::util::persist::{KVStore, PaginatedKVStore};
+
+/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system.
+///
+/// This is version 2 of the filesystem store, which provides:
+/// - A consistent directory structure using `[empty]` for empty namespaces
+/// - File modification times for creation-order pagination
+/// - Support for [`PaginatedKVStoreSync`] with newest-first ordering
+///
+/// ## Directory Structure
+///
+/// Files are stored with a consistent two-level namespace hierarchy:
+/// ```text
+/// data_dir/
+///   [empty]/          # empty primary namespace
+///     [empty]/        # empty secondary namespace
+///       {key}
+///   primary_ns/
+///     [empty]/        # empty secondary namespace
+///       {key}
+///     secondary_ns/
+///       {key}
+/// ```
+///
+/// ## File Ordering
+///
+/// Files are ordered by their modification time (mtime). When a file is created, it gets
+/// the current time. When updated, the original creation time is preserved by setting
+/// the mtime of the new file to match the original before the atomic rename.
+///
+/// [`KVStore`]: lightning::util::persist::KVStore
+pub struct FilesystemStoreV2 {
+	inner: FilesystemStoreState,
+}
+
+impl FilesystemStoreV2 {
+	/// Constructs a new [`FilesystemStoreV2`].
+	///
+	/// Returns an error if the data directory already exists and contains files at the top
+	/// level, which would indicate it was previously used by a [`FilesystemStore`] (v1). The
+	/// v2 store expects only directories (namespaces) at the top level.
+	///
+	/// [`FilesystemStore`]: crate::fs_store::FilesystemStore
+	pub fn new(data_dir: PathBuf) -> std::io::Result<Self> {
+		if data_dir.exists() {
+			for entry in fs::read_dir(&data_dir)? {
+				let entry = entry?;
+				if entry.file_type()?.is_file() {
+					return Err(std::io::Error::new(
+						std::io::ErrorKind::InvalidData,
+						format!(
+							"Found file `{}` in the top-level data directory. \
+							This indicates the directory was previously used by FilesystemStore (v1). \
+							Please migrate your data or use a different directory.",
+							entry.path().display()
+						),
+					));
+				}
+			}
+		}
+
+		Ok(Self { inner: FilesystemStoreState::new(data_dir) })
+	}
+
+	/// Returns the data directory.
+	pub fn get_data_dir(&self) -> PathBuf {
+		self.inner.get_data_dir()
+	}
+
+	#[cfg(any(all(feature = "tokio", test), fuzzing))]
+	/// Returns the size of the async state.
+	pub fn state_size(&self) -> usize {
+		self.inner.state_size()
+	}
+}
+
+impl KVStoreSync for FilesystemStoreV2 {
+	fn read(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> Result<Vec<u8>, lightning::io::Error> {
+		self.inner.read_impl(primary_namespace, secondary_namespace, key, true)
+	}
+
+	fn write(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+	) -> Result<(), lightning::io::Error> {
+		self.inner.write_impl(primary_namespace, secondary_namespace, key, buf, true)
+	}
+
+	fn remove(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> Result<(), lightning::io::Error> {
+		self.inner.remove_impl(primary_namespace, secondary_namespace, key, lazy, true)
+	}
+
+	fn list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> Result<Vec<String>, lightning::io::Error> {
+		self.inner.list_impl(primary_namespace, secondary_namespace, true)
+	}
+}
+
+impl PaginatedKVStoreSync for FilesystemStoreV2 {
+	fn list_paginated(
+		&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+	) -> Result<PaginatedListResponse, lightning::io::Error> {
+		self.inner.list_paginated_impl(primary_namespace, secondary_namespace, page_token)
+	}
+}
+
+#[cfg(feature = "tokio")]
+impl KVStore for FilesystemStoreV2 {
+	fn read(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> impl Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send {
+		self.inner.read_async(primary_namespace, secondary_namespace, key, true)
+	}
+
+	fn write(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
+		self.inner.write_async(primary_namespace, secondary_namespace, key, buf, true)
+	}
+
+	fn remove(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
+		self.inner.remove_async(primary_namespace, secondary_namespace, key, lazy, true)
+	}
+
+	fn list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> impl Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send {
+		self.inner.list_async(primary_namespace, secondary_namespace, true)
+	}
+}
+
+#[cfg(feature = "tokio")]
+impl PaginatedKVStore for FilesystemStoreV2 {
+	fn list_paginated(
+		&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
+	) -> impl Future<Output = Result<PaginatedListResponse, lightning::io::Error>> + 'static + Send
+	{
+		self.inner.list_paginated_async(primary_namespace, secondary_namespace, page_token)
+	}
+}
+
+impl MigratableKVStore for FilesystemStoreV2 {
+	fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
+		self.inner.list_all_keys_impl(true)
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use crate::fs_store_common::{
+		format_page_token, parse_page_token, EMPTY_NAMESPACE_DIR, PAGE_SIZE,
+	};
+	use crate::test_utils::{
+		do_read_write_remove_list_persist, do_test_data_migration, do_test_store,
+	};
+	use std::fs::FileTimes;
+	use std::time::UNIX_EPOCH;
+
+	impl Drop for FilesystemStoreV2 {
+		fn drop(&mut self) {
+			// We test for invalid directory names, so it's OK if directory removal
+			// fails.
+			match fs::remove_dir_all(&self.inner.get_data_dir()) {
+				Err(e) => println!("Failed to remove test persister directory: {}", e),
+				_ => {},
+			}
+		}
+	}
+
+	#[test]
+	fn read_write_remove_list_persist() {
+		let mut temp_path = std::env::temp_dir();
+		temp_path.push("test_read_write_remove_list_persist_v2");
+		let fs_store = FilesystemStoreV2::new(temp_path).unwrap();
+		do_read_write_remove_list_persist(&fs_store);
+	}
+
+	#[cfg(feature = "tokio")]
+	#[tokio::test]
+	async fn read_write_remove_list_persist_async() {
+		use lightning::util::persist::KVStore;
+		use std::sync::Arc;
+
+		let mut temp_path = std::env::temp_dir();
+		temp_path.push("test_read_write_remove_list_persist_async_v2");
+		let fs_store = Arc::new(FilesystemStoreV2::new(temp_path).unwrap());
+		assert_eq!(fs_store.state_size(), 0);
+
+		let async_fs_store = Arc::clone(&fs_store);
+
+		let data1 = vec![42u8; 32];
+		let data2 = vec![43u8; 32];
+
+		let primary = "testspace";
+		let secondary = "testsubspace";
+		let key = "testkey";
+
+		// Test writing the same key twice with different data. Execute the asynchronous part
+		// out of order to ensure that eventual consistency works.
+		let fut1 = KVStore::write(&*async_fs_store, primary, secondary, key, data1);
+		assert_eq!(fs_store.state_size(), 1);
+
+		let fut2 = KVStore::remove(&*async_fs_store, primary, secondary, key, false);
+		assert_eq!(fs_store.state_size(), 1);
+
+		let fut3 = KVStore::write(&*async_fs_store, primary, secondary, key, data2.clone());
+		assert_eq!(fs_store.state_size(), 1);
+
+		fut3.await.unwrap();
+		assert_eq!(fs_store.state_size(), 1);
+
+		fut2.await.unwrap();
+		assert_eq!(fs_store.state_size(), 1);
+
+		fut1.await.unwrap();
+		assert_eq!(fs_store.state_size(), 0);
+
+		// Test list.
+		let listed_keys = KVStore::list(&*async_fs_store, primary, secondary).await.unwrap();
+		assert_eq!(listed_keys.len(), 1);
+		assert_eq!(listed_keys[0], key);
+
+		// Test read. We expect to read data2, as the write call was initiated later.
+		let read_data = KVStore::read(&*async_fs_store, primary, secondary, key).await.unwrap();
+		assert_eq!(data2, &*read_data);
+
+		// Test remove.
+ KVStore::remove(&*async_fs_store, primary, secondary, key, false).await.unwrap(); + + let listed_keys = KVStore::list(&*async_fs_store, primary, secondary).await.unwrap(); + assert_eq!(listed_keys.len(), 0); + } + + #[test] + fn test_data_migration() { + let mut source_temp_path = std::env::temp_dir(); + source_temp_path.push("test_data_migration_source_v2"); + let mut source_store = FilesystemStoreV2::new(source_temp_path).unwrap(); + + let mut target_temp_path = std::env::temp_dir(); + target_temp_path.push("test_data_migration_target_v2"); + let mut target_store = FilesystemStoreV2::new(target_temp_path).unwrap(); + + do_test_data_migration(&mut source_store, &mut target_store); + } + + #[test] + fn test_filesystem_store_v2() { + // Create the nodes, giving them FilesystemStoreV2s for data stores. + let store_0 = FilesystemStoreV2::new("test_filesystem_store_v2_0".into()).unwrap(); + let store_1 = FilesystemStoreV2::new("test_filesystem_store_v2_1".into()).unwrap(); + do_test_store(&store_0, &store_1) + } + + #[test] + fn test_page_token_format() { + let mtime: u64 = 1706500000000; + let key = "test_key"; + let token = format_page_token(mtime, key); + assert_eq!(token, "0001706500000000:test_key"); + + let parsed = parse_page_token(&token).unwrap(); + assert_eq!(parsed, (mtime, key.to_string())); + + // Test invalid tokens + assert!(parse_page_token("invalid").is_err()); + assert!(parse_page_token("0001706500000000_key").is_err()); // wrong separator + } + + #[test] + fn test_directory_structure() { + use lightning::util::persist::KVStoreSync; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_directory_structure_v2"); + let fs_store = FilesystemStoreV2::new(temp_path.clone()).unwrap(); + + let data = vec![42u8; 32]; + + // Write with empty namespaces + KVStoreSync::write(&fs_store, "", "", "key1", data.clone()).unwrap(); + assert!(temp_path.join(EMPTY_NAMESPACE_DIR).join(EMPTY_NAMESPACE_DIR).exists()); + + // Write with non-empty primary, empty secondary + KVStoreSync::write(&fs_store, "primary", "", "key2", data.clone()).unwrap(); + assert!(temp_path.join("primary").join(EMPTY_NAMESPACE_DIR).exists()); + + // Write with both non-empty + KVStoreSync::write(&fs_store, "primary", "secondary", "key3", data.clone()).unwrap(); + assert!(temp_path.join("primary").join("secondary").exists()); + + // Verify we can read them back + assert_eq!(KVStoreSync::read(&fs_store, "", "", "key1").unwrap(), data); + assert_eq!(KVStoreSync::read(&fs_store, "primary", "", "key2").unwrap(), data); + assert_eq!(KVStoreSync::read(&fs_store, "primary", "secondary", "key3").unwrap(), data); + + // Verify files are named just by key (no timestamp prefix) + assert!(temp_path + .join(EMPTY_NAMESPACE_DIR) + .join(EMPTY_NAMESPACE_DIR) + .join("key1") + .exists()); + assert!(temp_path.join("primary").join(EMPTY_NAMESPACE_DIR).join("key2").exists()); + assert!(temp_path.join("primary").join("secondary").join("key3").exists()); + } + + #[test] + fn test_update_preserves_mtime() { + use lightning::util::persist::KVStoreSync; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_update_preserves_mtime_v2"); + let fs_store = FilesystemStoreV2::new(temp_path.clone()).unwrap(); + + let data1 = vec![42u8; 32]; + let data2 = vec![43u8; 32]; + + // Write initial data + KVStoreSync::write(&fs_store, "ns", "sub", "key", data1).unwrap(); + + // Get the original mtime + let file_path = temp_path.join("ns").join("sub").join("key"); + let original_mtime = 
fs::metadata(&file_path).unwrap().modified().unwrap();
+
+		// Sleep briefly to ensure a different timestamp if the original is not preserved.
+		std::thread::sleep(std::time::Duration::from_millis(50));
+
+		// Update with new data.
+		KVStoreSync::write(&fs_store, "ns", "sub", "key", data2.clone()).unwrap();
+
+		// Verify the mtime is preserved.
+		let updated_mtime = fs::metadata(&file_path).unwrap().modified().unwrap();
+		assert_eq!(original_mtime, updated_mtime);
+
+		// Verify the data was updated.
+		assert_eq!(KVStoreSync::read(&fs_store, "ns", "sub", "key").unwrap(), data2);
+	}
+
+	#[test]
+	fn test_paginated_listing() {
+		use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync};
+
+		let mut temp_path = std::env::temp_dir();
+		temp_path.push("test_paginated_listing_v2");
+		let fs_store = FilesystemStoreV2::new(temp_path).unwrap();
+
+		let data = vec![42u8; 32];
+
+		// Write several keys with small delays to ensure different mtimes.
+		let keys: Vec<String> = (0..5).map(|i| format!("key{}", i)).collect();
+		for key in &keys {
+			KVStoreSync::write(&fs_store, "ns", "sub", key, data.clone()).unwrap();
+			std::thread::sleep(std::time::Duration::from_millis(10));
+		}
+
+		// List paginated - should return newest first.
+		let response = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap();
+		assert_eq!(response.keys.len(), 5);
+		// The newest key (key4) should be first.
+		assert_eq!(response.keys[0], "key4");
+		assert_eq!(response.keys[4], "key0");
+		assert!(response.next_page_token.is_none()); // Fewer than PAGE_SIZE items.
+	}
+
+	#[test]
+	fn test_paginated_listing_with_pagination() {
+		use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync};
+
+		let mut temp_path = std::env::temp_dir();
+		temp_path.push("test_paginated_listing_with_pagination_v2");
+		let fs_store = FilesystemStoreV2::new(temp_path).unwrap();
+
+		let data = vec![42u8; 32];
+
+		// Write more than PAGE_SIZE keys.
+		let num_keys = PAGE_SIZE + 50;
+		for i in 0..num_keys {
+			let key = format!("key{:04}", i);
+			KVStoreSync::write(&fs_store, "ns", "sub", &key, data.clone()).unwrap();
+			// A small delay to ensure ordering.
+			if i % 10 == 0 {
+				std::thread::sleep(std::time::Duration::from_millis(1));
+			}
+		}
+
+		// First page.
+		let response1 = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap();
+		assert_eq!(response1.keys.len(), PAGE_SIZE);
+		assert!(response1.next_page_token.is_some());
+
+		// Second page.
+		let response2 =
+			PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", response1.next_page_token)
+				.unwrap();
+		assert_eq!(response2.keys.len(), 50);
+		assert!(response2.next_page_token.is_none());
+
+		// Verify there are no duplicates between pages.
+		let all_keys: std::collections::HashSet<_> =
+			response1.keys.iter().chain(response2.keys.iter()).collect();
+		assert_eq!(all_keys.len(), num_keys);
+	}
+
+	#[test]
+	fn test_page_token_after_deletion() {
+		use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync};
+
+		let mut temp_path = std::env::temp_dir();
+		temp_path.push("test_page_token_after_deletion_v2");
+		let fs_store = FilesystemStoreV2::new(temp_path).unwrap();
+
+		let data = vec![42u8; 32];
+
+		// Write keys.
+		for i in 0..10 {
+			let key = format!("key{}", i);
+			KVStoreSync::write(&fs_store, "ns", "sub", &key, data.clone()).unwrap();
+			std::thread::sleep(std::time::Duration::from_millis(10));
+		}
+
+		// Verify the initial listing.
+		let response1 = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap();
+		assert_eq!(response1.keys.len(), 10);
+
+		// Delete some keys.
KVStoreSync::remove(&fs_store, "ns", "sub", "key5", false).unwrap(); + KVStoreSync::remove(&fs_store, "ns", "sub", "key3", false).unwrap(); + + // List again - should work fine with deleted keys + let response2 = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response2.keys.len(), 8); // 10 - 2 deleted + } + + #[test] + fn test_same_mtime_sorted_by_key() { + use lightning::util::persist::PaginatedKVStoreSync; + use std::time::Duration; + + // Create files directly on disk first with the same mtime + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_same_mtime_sorted_by_key_v2"); + let _ = fs::remove_dir_all(&temp_path); + + let data = vec![42u8; 32]; + let dir = temp_path.join("ns").join("sub"); + fs::create_dir_all(&dir).unwrap(); + + // Write files with the same mtime but different keys + let keys = vec!["zebra", "apple", "mango", "banana"]; + let fixed_time = UNIX_EPOCH + Duration::from_secs(1706500000); + + for key in &keys { + let file_path = dir.join(key); + let file = fs::File::create(&file_path).unwrap(); + std::io::Write::write_all(&mut &file, &data).unwrap(); + file.set_times(FileTimes::new().set_modified(fixed_time)).unwrap(); + } + + // Open the store + let fs_store = FilesystemStoreV2::new(temp_path.clone()).unwrap(); + + // List paginated - should return keys sorted by key in reverse order + // (for same mtime, keys are sorted reverse alphabetically) + let response = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response.keys.len(), 4); + + // Same mtime means sorted by key in reverse order (z > m > b > a) + assert_eq!(response.keys[0], "zebra"); + assert_eq!(response.keys[1], "mango"); + assert_eq!(response.keys[2], "banana"); + assert_eq!(response.keys[3], "apple"); + } + + #[test] + fn test_rejects_v1_data_directory() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_rejects_v1_data_directory"); + let _ = fs::remove_dir_all(&temp_path); + fs::create_dir_all(&temp_path).unwrap(); + + // Create a file at the top level, as v1 would for an empty primary namespace + fs::write(temp_path.join("some_key"), b"data").unwrap(); + + // V2 construction should fail + match FilesystemStoreV2::new(temp_path.clone()) { + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); + assert!(err.to_string().contains("FilesystemStore (v1)")); + }, + Ok(_) => panic!("Expected error for directory with top-level files"), + } + + // Clean up + let _ = fs::remove_dir_all(&temp_path); + + // An empty directory should succeed + fs::create_dir_all(&temp_path).unwrap(); + let result = FilesystemStoreV2::new(temp_path.clone()); + assert!(result.is_ok()); + + // A directory with only subdirectories should succeed + fs::create_dir_all(temp_path.join("some_namespace")).unwrap(); + let result = FilesystemStoreV2::new(temp_path); + assert!(result.is_ok()); + } +} diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index e4973f532e8..ba64e916335 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -9,6 +9,7 @@ extern crate criterion; pub mod fs_store; +pub mod fs_store_v2; mod fs_store_common; mod utils;
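A note for reviewers: because `FilesystemStoreV2::new` rejects v1-style data
directories, existing deployments need a one-time copy into a fresh directory.
A minimal sketch of such a migration using the traits touched by this series
(the helper name and paths are illustrative, not part of the patch):

    use lightning::util::persist::{KVStoreSync, MigratableKVStore};
    use lightning_persister::fs_store::FilesystemStore;
    use lightning_persister::fs_store_v2::FilesystemStoreV2;
    use std::path::PathBuf;

    fn migrate_v1_to_v2(v1_dir: PathBuf, v2_dir: PathBuf) -> Result<(), lightning::io::Error> {
        let v1 = FilesystemStore::new(v1_dir);
        let v2 = FilesystemStoreV2::new(v2_dir)?;
        // Copy every key. Note that v2 assigns fresh mtimes on write, so the
        // paginated order after migration reflects migration order rather than
        // the original creation order.
        for (primary, secondary, key) in v1.list_all_keys()? {
            let value = v1.read(&primary, &secondary, &key)?;
            v2.write(&primary, &secondary, &key, value)?;
        }
        Ok(())
    }

(The existing `migrate_kv_store_data` helper in `lightning::util::persist`
should cover the same loop for any pair of `MigratableKVStore`s.)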