From e6548fbc1f157329b383acc61d74a211b93d69cd Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 7 Apr 2026 11:27:40 +0100 Subject: [PATCH 1/3] Implement tiered storage This commit adds `TierStore`, a tiered `KVStore` implementation that routes node persistence across three storage roles: - a primary store for durable, authoritative data - an optional backup store for a second durable copy of primary-backed data - an optional ephemeral store for rebuildable cached data such as the network graph and scorer TierStore routes ephemeral cache data to the ephemeral store when configured, while durable data remains primary+backup. Reads and lists do not consult the backup store during normal operation. For primary+backup writes and removals, this implementation treats the backup store as part of the persistence success path rather than as a best-effort background mirror. Earlier designs used asynchronous backup queueing to avoid blocking the primary path, but that weakens the durability contract by allowing primary success to be reported before backup persistence has completed. TierStore now issues primary and backup operations together and only returns success once both complete. This gives callers a clearer persistence guarantee when a backup store is configured: acknowledged primary+backup mutations have been attempted against both durable stores. The tradeoff is that dual-store operations are not atomic across stores, so an error may still be returned after one store has already been updated. Additionally, adds unit coverage for the current contract, including: - basic read/write/remove/list persistence - routing of ephemeral data away from the primary store - backup participation in the foreground success path for writes and removals --- src/io/mod.rs | 1 + src/io/tier_store.rs | 817 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 818 insertions(+) create mode 100644 src/io/tier_store.rs diff --git a/src/io/mod.rs b/src/io/mod.rs index a01aa59a8..f0bbaee46 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -12,6 +12,7 @@ pub mod postgres_store; pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub mod vss_store; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 000000000..9d4b6603e --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,817 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. +#![allow(dead_code)] // TODO: Temporal warning silencer. Will be removed in later commit. + +use std::collections::HashMap; +use std::future::Future; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; + +use lightning::util::persist::{ + KVStore, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::{io, log_error}; +use tokio::sync::Mutex as TokioMutex; + +use crate::io::utils::check_namespace_key_validity; +use crate::logger::{LdkLogger, Logger}; +use crate::types::DynStore; + +/// A 3-tiered [`KVStore`] implementation that routes data across +/// storage backends that may be local or remote: +/// - a primary store for durable, authoritative persistence, +/// - an optional backup store that maintains an additional durable copy of +/// primary-backed data, and +/// - an optional ephemeral store for non-critical, rebuildable cached data. +/// +/// When a backup store is configured, writes and removals for primary-backed data +/// are issued to the primary and backup stores concurrently and only succeed once +/// both stores complete successfully. +/// +/// Reads and lists do not consult the backup store during normal operation. +/// Ephemeral data is read from and written to the ephemeral store when configured. +/// +/// Note that dual-store writes and removals are not atomic across the primary and +/// backup stores. If one store succeeds and the other fails, the operation +/// returns an error even though one store may already reflect the change. +pub(crate) struct TierStore { + inner: Arc, +} + +impl TierStore { + pub fn new(primary_store: Arc, logger: Arc) -> Self { + let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger))); + + Self { inner } + } + + /// Configures a backup store for primary-backed data. + /// + /// Once set, writes and removals targeting the primary tier succeed only if both + /// the primary and backup stores succeed. The two operations are issued + /// concurrently, and any failure is returned to the caller. + /// + /// Note: dual-store writes/removals are not atomic. An error may be returned + /// after the primary store has already been updated if the backup store fails. + /// + /// The backup store is not consulted for normal reads or lists. + pub fn set_backup_store(&mut self, backup: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.backup_store = Some(backup); + } + + /// Configures the ephemeral store for non-critical, rebuildable data. + /// + /// When configured, selected cache-like data is routed to this store instead of + /// the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + let locking_key = inner.build_locking_key(primary_namespace, secondary_namespace, key); + let (lock_ref, version) = inner.get_new_version_and_lock_ref(locking_key.clone()); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { + inner + .write_internal( + primary_namespace, + secondary_namespace, + key, + buf, + lock_ref, + locking_key, + version, + ) + .await + } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + let locking_key = inner.build_locking_key(primary_namespace, secondary_namespace, key); + let (lock_ref, version) = inner.get_new_version_and_lock_ref(locking_key.clone()); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { + inner + .remove_internal( + primary_namespace, + secondary_namespace, + key, + lazy, + lock_ref, + locking_key, + version, + ) + .await + } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { inner.list_internal(primary_namespace, secondary_namespace).await } + } +} + +struct TierStoreInner { + /// The authoritative store for durable data. + primary_store: Arc, + /// The store used for non-critical, rebuildable cached data. + ephemeral_store: Option>, + /// An optional second durable store for primary-backed data. + backup_store: Option>, + /// Per-key locks for serializing primary+backup operations and skipping stale writes. + locks: Mutex>>>, + next_write_version: AtomicU64, + logger: Arc, +} + +impl TierStoreInner { + /// Creates a tier store with the primary data store. + pub fn new(primary_store: Arc, logger: Arc) -> Self { + Self { + primary_store, + ephemeral_store: None, + backup_store: None, + locks: Mutex::new(HashMap::new()), + next_write_version: AtomicU64::new(1), + logger, + } + } + + fn get_new_version_and_lock_ref(&self, locking_key: String) -> (Arc>, u64) { + let version = self.next_write_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("TierStore version counter overflowed"); + } + + let mut locks = self.locks.lock().expect("lock"); + let lock_ref = + Arc::clone(locks.entry(locking_key).or_insert_with(|| Arc::new(TokioMutex::new(0)))); + + (lock_ref, version) + } + + fn clean_locks(&self, lock_ref: &Arc>, locking_key: String) { + let mut locks = self.locks.lock().expect("lock"); + let strong_count = Arc::strong_count(lock_ref); + debug_assert!(strong_count >= 2, "Unexpected TierStore lock strong count"); + if strong_count == 2 { + locks.remove(&locking_key); + } + } + + fn build_locking_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { + if primary_namespace.is_empty() { + key.to_owned() + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, key) + } + } + + /// Reads from the primary data store. + async fn read_primary( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => Ok(data), + Err(e) => Err(e), + } + } + + /// Lists keys from the primary data store. + async fn list_primary( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list from primary store for namespace {}/{}: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + async fn write_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_fut = KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + + let backup_fut = KVStore::write( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + } + } + + async fn remove_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_fut = KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + async fn execute_locked_write( + &self, lock_ref: Arc>, locking_key: String, version: u64, callback: F, + ) -> io::Result<()> + where + F: FnOnce() -> Fut, + Fut: Future>, + { + let res = { + let mut last_written_version = lock_ref.lock().await; + + if version <= *last_written_version { + Ok(()) + } else { + let res = callback().await; + if res.is_ok() { + *last_written_version = version; + } + res + } + }; + + self.clean_locks(&lock_ref, locking_key); + res + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store reads here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + return KVStore::read( + eph_store.as_ref(), + &primary_namespace, + &secondary_namespace, + &key, + ) + .await; + } + } + + self.read_primary(&primary_namespace, &secondary_namespace, &key).await + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + lock_ref: Arc>, locking_key: String, version: u64, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + let res = KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await; + self.clean_locks(&lock_ref, locking_key); + return res; + } + } + + self.execute_locked_write(lock_ref, locking_key, version, || async move { + self.write_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + }) + .await + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + lock_ref: Arc>, locking_key: String, version: u64, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + let res = KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await; + self.clean_locks(&lock_ref, locking_key); + return res; + } + } + + self.execute_locked_write(lock_ref, locking_key, version, || async move { + self.remove_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + }) + .await + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store lists here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + .await + } else { + self.list_primary(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary(&primary_namespace, &secondary_namespace).await, + } + } + + fn handle_primary_backup_results( + &self, op: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, + primary_res: io::Result<()>, backup_res: io::Result<()>, + ) -> io::Result<()> { + match (primary_res, backup_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(primary_err), Ok(())) => { + log_error!( + self.logger, + "Primary {} failed after backup {} succeeded for key {}/{}/{}; primary and backup may have diverged: {}", + op, + op, + primary_namespace, + secondary_namespace, + key, + primary_err + ); + Err(primary_err) + }, + (Ok(()), Err(backup_err)) => { + log_error!( + self.logger, + "Backup {} failed after primary {} succeeded for key {}/{}/{}; primary and backup may have diverged: {}", + op, + op, + primary_namespace, + secondary_namespace, + key, + backup_err + ); + Err(backup_err) + }, + (Err(primary_err), Err(backup_err)) => { + log_error!( + self.logger, + "Primary and backup {}s both failed for key {}/{}/{}: primary={}, backup={}", + op, + primary_namespace, + secondary_namespace, + key, + primary_err, + backup_err + ); + Err(primary_err) + }, + } + } +} + +fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool { + matches!( + (pn, sn, key), + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) + ) +} + +#[cfg(test)] +mod tests { + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + + use lightning::util::logger::Level; + use lightning::util::persist::{ + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use lightning_persister::fs_store::v2::FilesystemStoreV2; + + use super::*; + use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path}; + use crate::io::tier_store::TierStore; + use crate::logger::Logger; + use crate::types::{DynStore, DynStoreWrapper}; + + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn setup_tier_store(primary_store: Arc, logger: Arc) -> TierStore { + TierStore::new(primary_store, logger) + } + + #[tokio::test] + async fn write_read_list_remove() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("primary")).unwrap())); + let tier = setup_tier_store(primary_store, logger); + + do_read_write_remove_list_persist(&tier).await; + } + + #[tokio::test] + async fn ephemeral_routing() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("primary")).unwrap())); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("ephemeral")).unwrap())); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + + let data = vec![42u8; 32]; + + tier.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .await + .unwrap(); + + tier.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .await + .unwrap(); + + let primary_read_ng = primary_store + .read( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await; + let ephemeral_read_ng = ephemeral_store + .read( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await; + + let primary_read_cm = primary_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + let ephemeral_read_cm = ephemeral_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + + assert!(primary_read_ng.is_err()); + assert_eq!(ephemeral_read_ng.unwrap(), data); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!(primary_read_cm.unwrap(), data); + } + + #[tokio::test] + async fn primary_backed_writes_preserve_latest_call_order() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("primary")).unwrap())); + let tier = setup_tier_store(primary_store, logger); + + let old_data = vec![1u8; 32]; + let new_data = vec![2u8; 32]; + + let old_write = tier.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + old_data, + ); + let new_write = tier.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + new_data.clone(), + ); + + new_write.await.unwrap(); + old_write.await.unwrap(); + + // Stale data doesn't overwrite latest + let persisted = tier + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await + .unwrap(); + assert_eq!(persisted, new_data); + } + + #[tokio::test] + async fn backup_write_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("primary")).unwrap())); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("backup")).unwrap())); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + tier.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .await + .unwrap(); + + let primary_read = primary_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + let backup_read = backup_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + + assert_eq!(primary_read.unwrap(), data); + assert_eq!(backup_read.unwrap(), data); + } + + #[tokio::test] + async fn backup_remove_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("primary")).unwrap())); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("backup")).unwrap())); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + + tier.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data, + ) + .await + .unwrap(); + + tier.remove( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ) + .await + .unwrap(); + + let primary_read = primary_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ) + .await; + let backup_read = backup_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ) + .await; + + assert!(primary_read.is_err()); + assert!(backup_read.is_err()); + } +} From 0094d33977adddcb78adc0e8df266eab92d0fdac Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 7 Apr 2026 19:17:22 +0100 Subject: [PATCH 2/3] Integrate TierStore into NodeBuilder Add native builder support for configuring ephemeral storage and a local SQLite backup mirror. Wrap the primary store in TierStore during node construction and create configured secondary stores using dedicated SQLite database files. Implement paginated listing through TierStore and update filesystem-backed tests to use FilesystemStoreV2. Add full-cycle integration coverage verifying durable backup mirroring. --- src/builder.rs | 87 ++++++++++++++++++- src/io/sqlite_store/mod.rs | 4 + src/io/tier_store.rs | 144 +++++++++++++++++++++++++++++++- tests/common/mod.rs | 6 +- tests/integration_tests_rust.rs | 74 ++++++++++++++++ 5 files changed, 306 insertions(+), 9 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index d142f51af..caeec2bd0 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -57,6 +57,7 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::TierStore; use crate::io::utils::{ open_or_migrate_fs_store, read_all_objects, read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, read_node_metrics, @@ -150,6 +151,12 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default, Debug)] +struct TierStoreConfig { + ephemeral_storage_dir_path: Option, + backup_storage_dir_path: Option, +} + /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -285,6 +292,7 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, @@ -303,6 +311,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; @@ -312,6 +321,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, @@ -612,6 +622,39 @@ impl NodeBuilder { self } + /// Configures a local SQLite backup store for disaster recovery. + /// + /// When building with tiered storage, a SQLite store will be created at the + /// given directory path using [`SQLITE_BACKUP_DB_FILE_NAME`] as its database + /// file name. It receives a second durable copy of data written to the + /// primary store. + /// + /// Writes and removals for primary-backed data only succeed once both the + /// primary and backup SQLite stores complete successfully. + /// + /// If not set, durable data will be stored only in the primary store. + /// + /// [`SQLITE_BACKUP_DB_FILE_NAME`]: crate::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME + pub fn set_backup_storage_dir_path(&mut self, backup_storage_dir_path: String) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.backup_storage_dir_path = Some(backup_storage_dir_path.into()); + self + } + + /// Configures the ephemeral storage directory path for non-critical, frequently-accessed data. + /// + /// When set, a local SQLite store is created at this path for ephemeral data like + /// the network graph and scorer. Data stored here can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_storage_dir_path( + &mut self, ephemeral_storage_dir_path: String, + ) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral_storage_dir_path = Some(ephemeral_storage_dir_path.into()); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -813,11 +856,18 @@ impl NodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a local SQLite backup store for disaster recovery can be configured via + /// [`set_ephemeral_storage_dir_path`] and [`set_backup_storage_dir_path`]. + /// + /// [`set_ephemeral_storage_dir_path`]: Self::set_ephemeral_storage_dir_path + /// [`set_backup_storage_dir_path`]: Self::set_backup_storage_dir_path pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; - self.build_with_store_and_logger(node_entropy, kv_store, logger) } @@ -842,6 +892,39 @@ impl NodeBuilder { fn build_with_store_runtime_and_logger( &self, node_entropy: NodeEntropy, kv_store: S, runtime: Arc, logger: Arc, ) -> Result { + let ts_config = self.tier_store_config.as_ref(); + let primary_store = Arc::new(DynStoreWrapper(kv_store)); + let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); + if let Some(config) = ts_config { + if let Some(ephemeral_storage_dir_path) = config.ephemeral_storage_dir_path.as_ref() { + let ephemeral_store = SqliteStore::new( + ephemeral_storage_dir_path.clone(), + Some(io::sqlite_store::SQLITE_EPHEMERAL_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|e| { + log_error!(logger, "Failed to setup ephemeral SQLite store: {}", e); + BuildError::KVStoreSetupFailed + })?; + let ephemeral_store: Arc = Arc::new(DynStoreWrapper(ephemeral_store)); + tier_store.set_ephemeral_store(ephemeral_store); + } + + if let Some(backup_storage_dir_path) = config.backup_storage_dir_path.as_ref() { + let backup_store = SqliteStore::new( + backup_storage_dir_path.clone(), + Some(io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|e| { + log_error!(logger, "Failed to setup backup SQLite store: {}", e); + BuildError::KVStoreSetupFailed + })?; + let backup_store: Arc = Arc::new(DynStoreWrapper(backup_store)); + tier_store.set_backup_store(backup_store); + } + } + let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); @@ -856,7 +939,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(kv_store)), + Arc::new(DynStoreWrapper(tier_store)), ) } } diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 076aeef9b..ddafc3748 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -24,6 +24,10 @@ mod migrations; /// LDK Node's database file name. pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite"; +/// LDK Node's backup database file name. +pub const SQLITE_BACKUP_DB_FILE_NAME: &str = "ldk_node_data_backup.sqlite"; +/// LDK Node's ephemeral database file name. +pub const SQLITE_EPHEMERAL_DB_FILE_NAME: &str = "ldk_node_data_ephemeral.sqlite"; /// LDK Node's table in which we store all data. pub const KV_TABLE_NAME: &str = "ldk_node_data"; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 9d4b6603e..294bd8395 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -4,7 +4,6 @@ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license , at your option. You may not use this file except in // accordance with one or both of these licenses. -#![allow(dead_code)] // TODO: Temporal warning silencer. Will be removed in later commit. use std::collections::HashMap; use std::future::Future; @@ -12,9 +11,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use lightning::util::persist::{ - KVStore, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStore, PageToken, PaginatedKVStore, PaginatedListResponse, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, }; use lightning::{io, log_error}; use tokio::sync::Mutex as TokioMutex; @@ -163,6 +162,21 @@ impl KVStore for TierStore { } } +impl PaginatedKVStore for TierStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { + inner.list_paginated_internal(primary_namespace, secondary_namespace, page_token).await + } + } +} + struct TierStoreInner { /// The authoritative store for durable data. primary_store: Arc, @@ -493,6 +507,47 @@ impl TierStoreInner { } } + async fn list_paginated_internal( + &self, primary_namespace: String, secondary_namespace: String, + page_token: Option, + ) -> io::Result { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list_paginated", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store lists here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + return PaginatedKVStore::list_paginated( + eph_store.as_ref(), + &primary_namespace, + &secondary_namespace, + page_token, + ) + .await; + } + }, + _ => {}, + } + + PaginatedKVStore::list_paginated( + self.primary_store.as_ref(), + &primary_namespace, + &secondary_namespace, + page_token, + ) + .await + } + fn handle_primary_backup_results( &self, op: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, primary_res: io::Result<()>, backup_res: io::Result<()>, @@ -560,6 +615,8 @@ mod tests { use lightning::util::persist::{ CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning_persister::fs_store::v2::FilesystemStoreV2; @@ -670,6 +727,85 @@ mod tests { assert_eq!(primary_read_cm.unwrap(), data); } + #[tokio::test] + async fn list_paginated_routes_to_selected_tier() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("primary")).unwrap())); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("ephemeral")).unwrap())); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + + tier.write( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + "monitor-key", + vec![1u8; 32], + ) + .await + .unwrap(); + + // This decoy uses the same namespace but the opposite physical store, so it + // would show up if paginated listing routed to the wrong tier. + ephemeral_store + .write( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + "ephemeral-decoy", + vec![2u8; 32], + ) + .await + .unwrap(); + + // Same decoy check in the other direction: this key should be ignored + // because network graph listings route to the ephemeral store when set. + primary_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + "primary-decoy", + vec![3u8; 32], + ) + .await + .unwrap(); + + tier.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + vec![4u8; 32], + ) + .await + .unwrap(); + + let primary_response = PaginatedKVStore::list_paginated( + &tier, + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + None, + ) + .await + .unwrap(); + assert_eq!(primary_response.keys, vec!["monitor-key".to_string()]); + + let ephemeral_response = PaginatedKVStore::list_paginated( + &tier, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + None, + ) + .await + .unwrap(); + assert_eq!(ephemeral_response.keys, vec![NETWORK_GRAPH_PERSISTENCE_KEY.to_string()]); + } + #[tokio::test] async fn primary_backed_writes_preserve_latest_call_order() { let base_dir = random_storage_path(); diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 1f5753e55..a84f62bbb 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -52,7 +52,7 @@ use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore, PaginatedListResponse}; use lightning_invoice::{Bolt11InvoiceDescription, Description}; -use lightning_persister::fs_store::v1::FilesystemStore; +use lightning_persister::fs_store::v2::FilesystemStoreV2; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use logging::TestLogWriter; use rand::distr::Alphanumeric; @@ -1720,7 +1720,7 @@ impl PaginatedKVStore for TestSyncStore { struct TestSyncStoreInner { serializer: tokio::sync::RwLock<()>, test_store: InMemoryStore, - fs_store: FilesystemStore, + fs_store: FilesystemStoreV2, sqlite_store: SqliteStore, } @@ -1729,7 +1729,7 @@ impl TestSyncStoreInner { let serializer = tokio::sync::RwLock::new(()); let mut fs_dir = dest_dir.clone(); fs_dir.push("fs_store"); - let fs_store = FilesystemStore::new(fs_dir); + let fs_store = FilesystemStoreV2::new(fs_dir).unwrap(); let mut sql_dir = dest_dir.clone(); sql_dir.push("sqlite_store"); let sqlite_store = SqliteStore::new( diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 521cb74ca..ea622b67f 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -31,6 +31,8 @@ use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; +#[cfg(not(feature = "uniffi"))] +use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::liquidity::LSPS2ServiceConfig; use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, @@ -40,6 +42,8 @@ use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +#[cfg(not(feature = "uniffi"))] +use lightning::util::persist::KVStore; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -3356,3 +3360,73 @@ async fn do_lsps2_multi_lsp_picks_cheapest(reverse_order: bool) { cheap.stop().unwrap(); expensive.stop().unwrap(); } + +// Builder backup-store configuration is not yet exposed via FFI (see #871) +#[cfg(not(feature = "uniffi"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn builder_configures_sqlite_backup_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let primary_dir = config_a.node_config.storage_dir_path.clone(); + let backup_dir = common::random_storage_path(); + + // Build node_a with backup storage configured + setup_builder!(builder_a, config_a.node_config.clone()); + builder_a.set_chain_source_esplora( + format!("http://{}", electrsd.esplora_url.as_ref().unwrap()), + None, + ); + builder_a.set_filesystem_logger(None, None); + builder_a.set_backup_storage_dir_path(backup_dir.to_str().unwrap().to_owned()); + + let node_a = builder_a.build(config_a.node_entropy.into()).unwrap(); + node_a.start().unwrap(); + assert!(node_a.status().is_running); + assert!(node_a.status().latest_fee_rate_cache_update_timestamp.is_some()); + + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + do_channel_full_cycle( + node_a, + node_b, + &bitcoind.client, + &electrsd.client, + false, + true, + true, + false, + ) + .await; + + let primary_store = SqliteStore::new( + primary_dir.into(), + Some(ldk_node::io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), + Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .unwrap(); + + let backup_store = SqliteStore::new( + backup_dir, + Some(ldk_node::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), + Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .unwrap(); + + for (pn, sn, key) in [ + ("bdk_wallet", "", "descriptor"), + ("bdk_wallet", "", "change_descriptor"), + ("bdk_wallet", "", "network"), + ("", "", "node_metrics"), + ("", "", "events"), + ("", "", "peers"), + ] { + let primary = primary_store.read(pn, sn, key).await.unwrap(); + let backup = backup_store.read(pn, sn, key).await.unwrap(); + + assert_eq!(backup, primary, "backup mismatch for {pn}/{sn}/{key}"); + } +} From 0a240d4fd33acf0f722974b19ebab10a5de8d7b5 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Sat, 27 Jun 2026 09:18:21 +0100 Subject: [PATCH 3/3] fixup! Implement tiered storage Preserve call-time ordering for ephemeral writes and removes by routing them through the same versioned lock path as primary-backed mutations. Add regression coverage for stale ephemeral writes and removes. --- src/io/tier_store.rs | 137 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 117 insertions(+), 20 deletions(-) diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 294bd8395..047a30477 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -415,16 +415,19 @@ impl TierStoreInner { if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { if let Some(eph_store) = self.ephemeral_store.as_ref() { - let res = KVStore::write( - eph_store.as_ref(), - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - .await; - self.clean_locks(&lock_ref, locking_key); - return res; + let eph_store = Arc::clone(eph_store); + return self + .execute_locked_write(lock_ref, locking_key, version, || async move { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + }) + .await; } } @@ -453,16 +456,19 @@ impl TierStoreInner { if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { if let Some(eph_store) = self.ephemeral_store.as_ref() { - let res = KVStore::remove( - eph_store.as_ref(), - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - .await; - self.clean_locks(&lock_ref, locking_key); - return res; + let eph_store = Arc::clone(eph_store); + return self + .execute_locked_write(lock_ref, locking_key, version, || async move { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + }) + .await; } } @@ -849,6 +855,97 @@ mod tests { assert_eq!(persisted, new_data); } + #[tokio::test] + async fn ephemeral_writes_preserve_latest_call_order() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("primary")).unwrap())); + let mut tier = setup_tier_store(primary_store, logger); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("ephemeral")).unwrap())); + tier.set_ephemeral_store(ephemeral_store); + + let old_data = vec![1u8; 32]; + let new_data = vec![2u8; 32]; + + let old_write = tier.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + old_data, + ); + let new_write = tier.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + new_data.clone(), + ); + + new_write.await.unwrap(); + old_write.await.unwrap(); + + let persisted = tier + .read( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await + .unwrap(); + assert_eq!(persisted, new_data); + } + + #[tokio::test] + async fn ephemeral_removes_preserve_latest_call_order() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("primary")).unwrap())); + let mut tier = setup_tier_store(primary_store, logger); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStoreV2::new(base_dir.join("ephemeral")).unwrap())); + tier.set_ephemeral_store(ephemeral_store); + + let data = vec![2u8; 32]; + + let stale_remove = tier.remove( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + true, + ); + let new_write = tier.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ); + + new_write.await.unwrap(); + stale_remove.await.unwrap(); + + let persisted = tier + .read( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await + .unwrap(); + assert_eq!(persisted, data); + } + #[tokio::test] async fn backup_write_is_part_of_success_path() { let base_dir = random_storage_path();