From 4cb661819e49e54901ceeb07e0992760c3d79c71 Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Thu, 5 Mar 2026 17:41:48 +0900 Subject: [PATCH 01/10] Add bip157 dependency and CBF builder configuration --- Cargo.toml | 1 + src/builder.rs | 35 +++++++++++++++++- src/chain/cbf.rs | 93 ++++++++++++++++++++++++++++++++++++++++++++++++ src/chain/mod.rs | 78 ++++++++++++++++++++++++++++++++++++++-- src/config.rs | 26 ++++++++++++++ 5 files changed, 230 insertions(+), 3 deletions(-) create mode 100644 src/chain/cbf.rs diff --git a/Cargo.toml b/Cargo.toml index 18947b72f..8e035216f 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls-ring"]} +bip157 = { version = "0.4.2", default-features = false } bdk_wallet = { version = "2.3.0", default-features = false, features = ["std", "keys-bip39"]} bitreq = { version = "0.3", default-features = false, features = ["async-https", "json-using-serde"] } diff --git a/src/builder.rs b/src/builder.rs index 7641a767d..d0319a5df 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -45,7 +45,7 @@ use vss_client::headers::VssHeaderProvider; use crate::chain::ChainSource; use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, - BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, + BitcoindRestClientConfig, CbfSyncConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, }; use crate::connection::ConnectionManager; @@ -105,6 +105,10 @@ enum ChainDataSourceConfig { rpc_password: String, rest_client_config: Option, }, + Cbf { + peers: Vec, + sync_config: Option, + }, } #[derive(Debug, Clone)] @@ -376,6 +380,21 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source its chain data via BIP 157 compact block + /// filters. + /// + /// `peers` is an optional list of peer addresses to connect to for sourcing compact block + /// filters. If empty, the node will discover peers via DNS seeds. + /// + /// If no `sync_config` is given, default values are used. See [`CbfSyncConfig`] for more + /// information. + pub fn set_chain_source_cbf( + &mut self, peers: Vec, sync_config: Option, + ) -> &mut Self { + self.chain_data_source_config = Some(ChainDataSourceConfig::Cbf { peers, sync_config }); + self + } + /// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer /// network. pub fn set_gossip_source_p2p(&mut self) -> &mut Self { @@ -1255,6 +1274,20 @@ fn build_with_store_internal( }), }, + Some(ChainDataSourceConfig::Cbf { peers, sync_config }) => { + let sync_config = sync_config.clone().unwrap_or(CbfSyncConfig::default()); + ChainSource::new_cbf( + peers.clone(), + sync_config, + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + Arc::clone(&logger), + Arc::clone(&node_metrics), + ) + }, + None => { // Default to Esplora client. let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string(); diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs new file mode 100644 index 000000000..a05a9abc5 --- /dev/null +++ b/src/chain/cbf.rs @@ -0,0 +1,93 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::sync::{Arc, RwLock}; + +use bitcoin::{Script, Transaction, Txid}; +use lightning::chain::WatchedOutput; + +use crate::config::{CbfSyncConfig, Config}; +use crate::fee_estimator::OnchainFeeEstimator; +use crate::logger::{log_error, LdkLogger, Logger}; +use crate::runtime::Runtime; +use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::{Error, NodeMetrics}; + +pub(super) struct CbfChainSource { + /// Peer addresses for sourcing compact block filters via P2P. + peers: Vec, + /// User-provided sync configuration (timeouts, background sync intervals). + pub(super) sync_config: CbfSyncConfig, + /// Shared fee rate estimator, updated by this chain source. + fee_estimator: Arc, + /// Persistent key-value store for node metrics. + kv_store: Arc, + /// Node configuration (network, storage path, etc.). + config: Arc, + /// Logger instance. + logger: Arc, + /// Shared node metrics (sync timestamps, etc.). + node_metrics: Arc>, +} + +impl CbfChainSource { + pub(crate) fn new( + peers: Vec, sync_config: CbfSyncConfig, fee_estimator: Arc, + kv_store: Arc, config: Arc, logger: Arc, + node_metrics: Arc>, + ) -> Self { + Self { peers, sync_config, fee_estimator, kv_store, config, logger, node_metrics } + } + + /// Start the bip157 node and spawn background tasks for event processing. + pub(crate) fn start(&self, _runtime: Arc) { + log_error!(self.logger, "CBF chain source start is not yet implemented."); + } + + /// Shut down the bip157 node and stop all background tasks. + pub(crate) fn stop(&self) { + log_error!(self.logger, "CBF chain source stop is not yet implemented."); + } + + /// Sync the on-chain wallet by scanning compact block filters for relevant transactions. + pub(crate) async fn sync_onchain_wallet( + &self, _onchain_wallet: Arc, + ) -> Result<(), Error> { + log_error!(self.logger, "On-chain wallet sync via CBF is not yet implemented."); + Err(Error::WalletOperationFailed) + } + + /// Sync the Lightning wallet by confirming channel transactions via compact block filters. + pub(crate) async fn sync_lightning_wallet( + &self, _channel_manager: Arc, _chain_monitor: Arc, + _output_sweeper: Arc, + ) -> Result<(), Error> { + log_error!(self.logger, "Lightning wallet sync via CBF is not yet implemented."); + Err(Error::TxSyncFailed) + } + + /// Estimate fee rates from recent block data. + pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { + log_error!(self.logger, "Fee rate estimation via CBF is not yet implemented."); + Err(Error::FeerateEstimationUpdateFailed) + } + + /// Broadcast a package of transactions via the P2P network. + pub(crate) async fn process_broadcast_package(&self, _package: Vec) { + log_error!(self.logger, "Transaction broadcasting via CBF is not yet implemented."); + } + + /// Register a transaction script for Lightning channel monitoring. + pub(crate) fn register_tx(&self, _txid: &Txid, _script_pubkey: &Script) { + log_error!(self.logger, "CBF register_tx is not yet implemented."); + } + + /// Register a watched output script for Lightning channel monitoring. + pub(crate) fn register_output(&self, _output: WatchedOutput) { + log_error!(self.logger, "CBF register_output is not yet implemented."); + } +} diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 49c011a78..3315a1fc1 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. pub(crate) mod bitcoind; +mod cbf; mod electrum; mod esplora; @@ -17,11 +18,12 @@ use bitcoin::{Script, Txid}; use lightning::chain::{BestBlock, Filter}; use crate::chain::bitcoind::{BitcoindChainSource, UtxoSourceClient}; +use crate::chain::cbf::CbfChainSource; use crate::chain::electrum::ElectrumChainSource; use crate::chain::esplora::EsploraChainSource; use crate::config::{ - BackgroundSyncConfig, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, - WALLET_SYNC_INTERVAL_MINIMUM_SECS, + BackgroundSyncConfig, BitcoindRestClientConfig, CbfSyncConfig, Config, ElectrumSyncConfig, + EsploraSyncConfig, WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; @@ -93,6 +95,7 @@ enum ChainSourceKind { Esplora(EsploraChainSource), Electrum(ElectrumChainSource), Bitcoind(BitcoindChainSource), + Cbf(CbfChainSource), } impl ChainSource { @@ -184,11 +187,33 @@ impl ChainSource { (Self { kind, registered_txids, tx_broadcaster, logger }, best_block) } + pub(crate) fn new_cbf( + peers: Vec, sync_config: CbfSyncConfig, fee_estimator: Arc, + tx_broadcaster: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, + ) -> (Self, Option) { + let cbf_chain_source = CbfChainSource::new( + peers, + sync_config, + fee_estimator, + kv_store, + config, + Arc::clone(&logger), + node_metrics, + ); + let kind = ChainSourceKind::Cbf(cbf_chain_source); + let registered_txids = Mutex::new(Vec::new()); + (Self { kind, registered_txids, tx_broadcaster, logger }, None) + } + pub(crate) fn start(&self, runtime: Arc) -> Result<(), Error> { match &self.kind { ChainSourceKind::Electrum(electrum_chain_source) => { electrum_chain_source.start(runtime)? }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.start(runtime); + }, _ => { // Nothing to do for other chain sources. }, @@ -199,6 +224,9 @@ impl ChainSource { pub(crate) fn stop(&self) { match &self.kind { ChainSourceKind::Electrum(electrum_chain_source) => electrum_chain_source.stop(), + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.stop(); + }, _ => { // Nothing to do for other chain sources. }, @@ -210,6 +238,7 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { Some(bitcoind_chain_source.as_utxo_source()) }, + ChainSourceKind::Cbf { .. } => None, _ => None, } } @@ -223,6 +252,7 @@ impl ChainSource { ChainSourceKind::Esplora(_) => true, ChainSourceKind::Electrum { .. } => true, ChainSourceKind::Bitcoind { .. } => false, + ChainSourceKind::Cbf { .. } => false, } } @@ -289,6 +319,28 @@ impl ChainSource { ) .await }, + ChainSourceKind::Cbf(cbf_chain_source) => { + if let Some(background_sync_config) = + cbf_chain_source.sync_config.background_sync_config.as_ref() + { + self.start_tx_based_sync_loop( + stop_sync_receiver, + onchain_wallet, + channel_manager, + chain_monitor, + output_sweeper, + background_sync_config, + Arc::clone(&self.logger), + ) + .await + } else { + log_info!( + self.logger, + "Background syncing is disabled. Manual syncing required for onchain wallet, lightning wallet, and fee rate updates.", + ); + return; + } + }, } } @@ -368,6 +420,9 @@ impl ChainSource { // `ChainPoller`. So nothing to do here. unreachable!("Onchain wallet will be synced via chain polling") }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.sync_onchain_wallet(onchain_wallet).await + }, } } @@ -393,6 +448,11 @@ impl ChainSource { // `ChainPoller`. So nothing to do here. unreachable!("Lightning wallet will be synced via chain polling") }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source + .sync_lightning_wallet(channel_manager, chain_monitor, output_sweeper) + .await + }, } } @@ -421,6 +481,10 @@ impl ChainSource { ) .await }, + ChainSourceKind::Cbf { .. } => { + // In CBF mode we sync wallets via compact block filters. + unreachable!("Listeners will be synced via compact block filter syncing") + }, } } @@ -435,6 +499,9 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source.update_fee_rate_estimates().await }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.update_fee_rate_estimates().await + }, } } @@ -463,6 +530,9 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source.process_broadcast_package(next_package).await }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.process_broadcast_package(next_package).await + }, } } } @@ -481,6 +551,9 @@ impl Filter for ChainSource { electrum_chain_source.register_tx(txid, script_pubkey) }, ChainSourceKind::Bitcoind { .. } => (), + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.register_tx(txid, script_pubkey) + }, } } fn register_output(&self, output: lightning::chain::WatchedOutput) { @@ -492,6 +565,7 @@ impl Filter for ChainSource { electrum_chain_source.register_output(output) }, ChainSourceKind::Bitcoind { .. } => (), + ChainSourceKind::Cbf(cbf_chain_source) => cbf_chain_source.register_output(output), } } } diff --git a/src/config.rs b/src/config.rs index 96a6f49d9..6eaed965a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -478,6 +478,32 @@ impl Default for ElectrumSyncConfig { } } +/// Configuration for syncing via BIP 157 compact block filters. +/// +/// Background syncing is enabled by default, using the default values specified in +/// [`BackgroundSyncConfig`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CbfSyncConfig { + /// Background sync configuration. + /// + /// If set to `None`, background syncing will be disabled. Users will need to manually + /// sync via [`Node::sync_wallets`] for the wallets and fee rate updates. + /// + /// [`Node::sync_wallets`]: crate::Node::sync_wallets + pub background_sync_config: Option, + /// Sync timeouts configuration. + pub timeouts_config: SyncTimeoutsConfig, +} + +impl Default for CbfSyncConfig { + fn default() -> Self { + Self { + background_sync_config: Some(BackgroundSyncConfig::default()), + timeouts_config: SyncTimeoutsConfig::default(), + } + } +} + /// Configuration for syncing with Bitcoin Core backend via REST. #[derive(Debug, Clone)] pub struct BitcoindRestClientConfig { From 81bccf4cfa510c53389ad269ea36b6a5539983dc Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Thu, 5 Mar 2026 17:51:54 +0900 Subject: [PATCH 02/10] Implement CbfChainSource lifecycle and P2P broadcast --- src/chain/cbf.rs | 218 ++++++++++++++++++++++++++++++-- src/chain/mod.rs | 2 +- tests/common/mod.rs | 22 +++- tests/integration_tests_rust.rs | 17 +++ 4 files changed, 248 insertions(+), 11 deletions(-) diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index a05a9abc5..6be83442e 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -5,14 +5,19 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::sync::{Arc, RwLock}; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; +use bip157::{BlockHash, Builder, Client, Event, Info, Requester, TrustedPeer, Warning}; use bitcoin::{Script, Transaction, Txid}; use lightning::chain::WatchedOutput; +use lightning::util::ser::Writeable; +use tokio::sync::mpsc; use crate::config::{CbfSyncConfig, Config}; use crate::fee_estimator::OnchainFeeEstimator; -use crate::logger::{log_error, LdkLogger, Logger}; +use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; @@ -22,6 +27,10 @@ pub(super) struct CbfChainSource { peers: Vec, /// User-provided sync configuration (timeouts, background sync intervals). pub(super) sync_config: CbfSyncConfig, + /// Tracks whether the bip157 node is running and holds the command handle. + cbf_runtime_status: Mutex, + /// Latest chain tip hash, updated by the background event processing task. + latest_tip: Arc>>, /// Shared fee rate estimator, updated by this chain source. fee_estimator: Arc, /// Persistent key-value store for node metrics. @@ -34,23 +43,169 @@ pub(super) struct CbfChainSource { node_metrics: Arc>, } +enum CbfRuntimeStatus { + Started { requester: Requester }, + Stopped, +} + impl CbfChainSource { pub(crate) fn new( peers: Vec, sync_config: CbfSyncConfig, fee_estimator: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, ) -> Self { - Self { peers, sync_config, fee_estimator, kv_store, config, logger, node_metrics } + let cbf_runtime_status = Mutex::new(CbfRuntimeStatus::Stopped); + let latest_tip = Arc::new(Mutex::new(None)); + Self { + peers, + sync_config, + cbf_runtime_status, + latest_tip, + fee_estimator, + kv_store, + config, + logger, + node_metrics, + } } /// Start the bip157 node and spawn background tasks for event processing. - pub(crate) fn start(&self, _runtime: Arc) { - log_error!(self.logger, "CBF chain source start is not yet implemented."); + pub(crate) fn start(&self, runtime: Arc) { + let mut status = self.cbf_runtime_status.lock().unwrap(); + if matches!(*status, CbfRuntimeStatus::Started { .. }) { + debug_assert!(false, "We shouldn't call start if we're already started"); + return; + } + + let network = self.config.network; + + let mut builder = Builder::new(network); + + // Configure data directory under the node's storage path. + let data_dir = std::path::PathBuf::from(&self.config.storage_dir_path).join("bip157_data"); + builder = builder.data_dir(data_dir); + + // Add configured peers. + let peers: Vec = self + .peers + .iter() + .filter_map(|peer_str| { + peer_str.parse::().ok().map(TrustedPeer::from_socket_addr) + }) + .collect(); + if !peers.is_empty() { + builder = builder.add_peers(peers); + } + + // Request witness data so segwit transactions include full witnesses, + // required for Lightning channel operations. + builder = builder.fetch_witness_data(); + + // Increase peer response timeout from the default 5 seconds to avoid + // disconnecting slow peers during block downloads. + builder = builder.response_timeout(Duration::from_secs(30)); + + let (node, client) = builder.build(); + + let Client { requester, info_rx, warn_rx, event_rx } = client; + + // Spawn the bip157 node in the background. + runtime.spawn_background_task(async move { + let _ = node.run().await; + }); + + // Spawn a task to log info messages. + let info_logger = Arc::clone(&self.logger); + runtime + .spawn_cancellable_background_task(Self::process_info_messages(info_rx, info_logger)); + + // Spawn a task to log warning messages. + let warn_logger = Arc::clone(&self.logger); + runtime + .spawn_cancellable_background_task(Self::process_warn_messages(warn_rx, warn_logger)); + + // Spawn a task to process events. + let event_logger = Arc::clone(&self.logger); + let event_tip = Arc::clone(&self.latest_tip); + runtime.spawn_cancellable_background_task(Self::process_events( + event_rx, + event_tip, + event_logger, + )); + + log_info!(self.logger, "CBF chain source started."); + + *status = CbfRuntimeStatus::Started { requester }; } /// Shut down the bip157 node and stop all background tasks. pub(crate) fn stop(&self) { - log_error!(self.logger, "CBF chain source stop is not yet implemented."); + let mut status = self.cbf_runtime_status.lock().unwrap(); + match &*status { + CbfRuntimeStatus::Started { requester } => { + let _ = requester.shutdown(); + log_info!(self.logger, "CBF chain source stopped."); + }, + CbfRuntimeStatus::Stopped => {}, + } + *status = CbfRuntimeStatus::Stopped; + } + + async fn process_info_messages(mut info_rx: mpsc::Receiver, logger: Arc) { + while let Some(info) = info_rx.recv().await { + log_debug!(logger, "CBF node info: {}", info); + } + } + + async fn process_warn_messages( + mut warn_rx: mpsc::UnboundedReceiver, logger: Arc, + ) { + while let Some(warning) = warn_rx.recv().await { + log_debug!(logger, "CBF node warning: {}", warning); + } + } + + async fn process_events( + mut event_rx: mpsc::UnboundedReceiver, latest_tip: Arc>>, + logger: Arc, + ) { + while let Some(event) = event_rx.recv().await { + match event { + Event::FiltersSynced(sync_update) => { + let tip = sync_update.tip(); + *latest_tip.lock().unwrap() = Some(tip.hash); + log_info!( + logger, + "CBF filters synced to tip: height={}, hash={}", + tip.height, + tip.hash, + ); + }, + Event::Block(indexed_block) => { + log_trace!(logger, "CBF received block at height {}", indexed_block.height,); + }, + Event::ChainUpdate(header_changes) => { + log_debug!(logger, "CBF chain update: {:?}", header_changes); + }, + Event::IndexedFilter(indexed_filter) => { + log_trace!(logger, "CBF received filter at height {}", indexed_filter.height(),); + }, + } + } + } + + fn requester(&self) -> Result { + let status = self.cbf_runtime_status.lock().unwrap(); + match &*status { + CbfRuntimeStatus::Started { requester } => Ok(requester.clone()), + CbfRuntimeStatus::Stopped => { + debug_assert!( + false, + "We should have started the chain source before using the requester" + ); + Err(Error::ConnectionFailed) + }, + } } /// Sync the on-chain wallet by scanning compact block filters for relevant transactions. @@ -77,8 +232,55 @@ impl CbfChainSource { } /// Broadcast a package of transactions via the P2P network. - pub(crate) async fn process_broadcast_package(&self, _package: Vec) { - log_error!(self.logger, "Transaction broadcasting via CBF is not yet implemented."); + pub(crate) async fn process_broadcast_package(&self, package: Vec) { + let Ok(requester) = self.requester() else { return }; + + for tx in package { + let txid = tx.compute_txid(); + let tx_bytes = tx.encode(); + let timeout_fut = tokio::time::timeout( + Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs), + requester.broadcast_tx(tx), + ); + match timeout_fut.await { + Ok(res) => match res { + Ok(wtxid) => { + log_trace!( + self.logger, + "Successfully broadcast transaction {} (wtxid: {})", + txid, + wtxid + ); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {:?}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx_bytes) + ); + }, + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction due to timeout {}: {}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx_bytes) + ); + }, + } + } } /// Register a transaction script for Lightning channel monitoring. diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 3315a1fc1..9cfbe4abe 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -252,7 +252,7 @@ impl ChainSource { ChainSourceKind::Esplora(_) => true, ChainSourceKind::Electrum { .. } => true, ChainSourceKind::Bitcoind { .. } => false, - ChainSourceKind::Cbf { .. } => false, + ChainSourceKind::Cbf { .. } => true, } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7854a77f2..a08e480aa 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -26,7 +26,9 @@ use bitcoin::{ use electrsd::corepc_node::{Client as BitcoindClient, Node as BitcoinD}; use electrsd::{corepc_node, ElectrsD}; use electrum_client::ElectrumApi; -use ldk_node::config::{AsyncPaymentsRole, Config, ElectrumSyncConfig, EsploraSyncConfig}; +use ldk_node::config::{ + AsyncPaymentsRole, CbfSyncConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, +}; use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; @@ -222,6 +224,11 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) { let mut bitcoind_conf = corepc_node::Conf::default(); bitcoind_conf.network = "regtest"; bitcoind_conf.args.push("-rest"); + + bitcoind_conf.p2p = corepc_node::P2P::Yes; + bitcoind_conf.args.push("-blockfilterindex=1"); + bitcoind_conf.args.push("-peerblockfilters=1"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); let electrs_exe = env::var("ELECTRS_EXE") @@ -326,6 +333,7 @@ pub(crate) enum TestChainSource<'a> { Electrum(&'a ElectrsD), BitcoindRpcSync(&'a BitcoinD), BitcoindRestSync(&'a BitcoinD), + Cbf(&'a BitcoinD), } #[derive(Clone, Copy)] @@ -463,6 +471,13 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> rpc_password, ); }, + TestChainSource::Cbf(bitcoind) => { + let p2p_socket = bitcoind.params.p2p_socket.expect("P2P must be enabled for CBF"); + let peer_addr = format!("{}", p2p_socket); + let sync_config = + CbfSyncConfig { background_sync_config: None, timeouts_config: Default::default() }; + builder.set_chain_source_cbf(vec![peer_addr], Some(sync_config)); + }, } match &config.log_writer { @@ -497,7 +512,10 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> node.start().unwrap(); assert!(node.status().is_running); - assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); + + if !matches!(chain_source, TestChainSource::Cbf(_)) { + assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); + } node } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 3fde52dc4..1c43d5a7c 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -2805,3 +2805,20 @@ async fn splice_in_with_all_balance() { node_a.stop().unwrap(); node_b.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn start_stop_cbf() { + let (bitcoind, _electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + assert!(node.status().is_running); + node.stop().unwrap(); + assert_eq!(node.stop(), Err(NodeError::NotRunning)); + + node.start().unwrap(); + assert_eq!(node.start(), Err(NodeError::AlreadyRunning)); + assert!(node.status().is_running); + + node.stop().unwrap(); +} From 8f2fecde2f22e13bbb54394f1d230136567bdb1b Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Sun, 15 Mar 2026 17:57:08 +0900 Subject: [PATCH 03/10] Add coinbase-derived fee estimation --- src/chain/cbf.rs | 85 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 4 deletions(-) diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index 6be83442e..dfb122208 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -5,9 +5,10 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Mutex, RwLock}; -use std::time::Duration; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use bip157::{BlockHash, Builder, Client, Event, Info, Requester, TrustedPeer, Warning}; use bitcoin::{Script, Transaction, Txid}; @@ -16,7 +17,10 @@ use lightning::util::ser::Writeable; use tokio::sync::mpsc; use crate::config::{CbfSyncConfig, Config}; -use crate::fee_estimator::OnchainFeeEstimator; +use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, OnchainFeeEstimator, +}; +use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; @@ -226,9 +230,70 @@ impl CbfChainSource { } /// Estimate fee rates from recent block data. + // NOTE: This is a single-block fee estimation. A multi-block lookback with + // per-target percentile selection is added later. pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { - log_error!(self.logger, "Fee rate estimation via CBF is not yet implemented."); - Err(Error::FeerateEstimationUpdateFailed) + let requester = self.requester()?; + + let tip_hash = match *self.latest_tip.lock().unwrap() { + Some(hash) => hash, + None => { + log_debug!(self.logger, "No tip available yet for fee rate estimation, skipping."); + return Ok(()); + }, + }; + + let now = Instant::now(); + + let base_fee_rate = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, + ), + requester.average_fee_rate(tip_hash), + ) + .await + .map_err(|e| { + log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); + Error::FeerateEstimationUpdateTimeout + })? + .map_err(|e| { + log_error!(self.logger, "Failed to retrieve fee rate estimate: {:?}", e); + Error::FeerateEstimationUpdateFailed + })?; + + let confirmation_targets = get_all_conf_targets(); + let mut new_fee_rate_cache = HashMap::with_capacity(confirmation_targets.len()); + + for target in confirmation_targets { + let adjusted_fee_rate = apply_post_estimation_adjustments(target, base_fee_rate); + new_fee_rate_cache.insert(target, adjusted_fee_rate); + + log_trace!( + self.logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); + } + + self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache); + + log_debug!( + self.logger, + "Fee rate cache update finished in {}ms.", + now.elapsed().as_millis() + ); + + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_fee_rate_cache_update_timestamp = t; + }, + )?; + + Ok(()) } /// Broadcast a package of transactions via the P2P network. @@ -293,3 +358,15 @@ impl CbfChainSource { log_error!(self.logger, "CBF register_output is not yet implemented."); } } + +/// Record the current timestamp in a `NodeMetrics` field and persist the metrics. +fn update_node_metrics_timestamp( + node_metrics: &RwLock, kv_store: &DynStore, logger: &Logger, + setter: impl FnOnce(&mut NodeMetrics, Option), +) -> Result<(), Error> { + let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + let mut locked = node_metrics.write().unwrap(); + setter(&mut locked, unix_time_secs_opt); + write_node_metrics(&*locked, kv_store, logger)?; + Ok(()) +} From 507bb658404e0ea7498fcbd597a6b8898e9f3bc9 Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Thu, 5 Mar 2026 19:05:56 +0900 Subject: [PATCH 04/10] Implement on-chain wallet sync via compact block filters --- src/chain/cbf.rs | 202 +++++++++++++++++++++++++++++++++++++++++--- src/wallet/mod.rs | 22 +++++ tests/common/mod.rs | 18 ++++ 3 files changed, 229 insertions(+), 13 deletions(-) diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index dfb122208..e06d797f4 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -5,18 +5,23 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use bip157::{BlockHash, Builder, Client, Event, Info, Requester, TrustedPeer, Warning}; -use bitcoin::{Script, Transaction, Txid}; +use bdk_chain::{BlockId, ConfirmationBlockTime, TxUpdate}; +use bdk_wallet::Update; +use bip157::{ + BlockHash, Builder, Client, Event, Info, Requester, SyncUpdate, TrustedPeer, Warning, +}; +use bitcoin::{Script, ScriptBuf, Transaction, Txid}; use lightning::chain::WatchedOutput; use lightning::util::ser::Writeable; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; -use crate::config::{CbfSyncConfig, Config}; +use super::WalletSyncStatus; +use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP}; use crate::fee_estimator::{ apply_post_estimation_adjustments, get_all_conf_targets, OnchainFeeEstimator, }; @@ -35,6 +40,14 @@ pub(super) struct CbfChainSource { cbf_runtime_status: Mutex, /// Latest chain tip hash, updated by the background event processing task. latest_tip: Arc>>, + /// Scripts to match against compact block filters during a scan. + watched_scripts: Arc>>, + /// Block (height, hash) pairs where filters matched watched scripts. + matched_block_hashes: Arc>>, + /// One-shot channel sender to signal filter scan completion. + sync_completion_tx: Arc>>>, + /// Deduplicates concurrent on-chain wallet sync requests. + onchain_wallet_sync_status: Mutex, /// Shared fee rate estimator, updated by this chain source. fee_estimator: Arc, /// Persistent key-value store for node metrics. @@ -52,6 +65,14 @@ enum CbfRuntimeStatus { Stopped, } +/// Shared state passed to the background event processing task. +struct CbfEventState { + latest_tip: Arc>>, + watched_scripts: Arc>>, + matched_block_hashes: Arc>>, + sync_completion_tx: Arc>>>, +} + impl CbfChainSource { pub(crate) fn new( peers: Vec, sync_config: CbfSyncConfig, fee_estimator: Arc, @@ -60,11 +81,19 @@ impl CbfChainSource { ) -> Self { let cbf_runtime_status = Mutex::new(CbfRuntimeStatus::Stopped); let latest_tip = Arc::new(Mutex::new(None)); + let watched_scripts = Arc::new(RwLock::new(Vec::new())); + let matched_block_hashes = Arc::new(Mutex::new(Vec::new())); + let sync_completion_tx = Arc::new(Mutex::new(None)); + let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); Self { peers, sync_config, cbf_runtime_status, latest_tip, + watched_scripts, + matched_block_hashes, + sync_completion_tx, + onchain_wallet_sync_status, fee_estimator, kv_store, config, @@ -129,11 +158,16 @@ impl CbfChainSource { .spawn_cancellable_background_task(Self::process_warn_messages(warn_rx, warn_logger)); // Spawn a task to process events. + let event_state = CbfEventState { + latest_tip: Arc::clone(&self.latest_tip), + watched_scripts: Arc::clone(&self.watched_scripts), + matched_block_hashes: Arc::clone(&self.matched_block_hashes), + sync_completion_tx: Arc::clone(&self.sync_completion_tx), + }; let event_logger = Arc::clone(&self.logger); - let event_tip = Arc::clone(&self.latest_tip); runtime.spawn_cancellable_background_task(Self::process_events( event_rx, - event_tip, + event_state, event_logger, )); @@ -170,20 +204,22 @@ impl CbfChainSource { } async fn process_events( - mut event_rx: mpsc::UnboundedReceiver, latest_tip: Arc>>, - logger: Arc, + mut event_rx: mpsc::UnboundedReceiver, state: CbfEventState, logger: Arc, ) { while let Some(event) = event_rx.recv().await { match event { Event::FiltersSynced(sync_update) => { let tip = sync_update.tip(); - *latest_tip.lock().unwrap() = Some(tip.hash); + *state.latest_tip.lock().unwrap() = Some(tip.hash); log_info!( logger, "CBF filters synced to tip: height={}, hash={}", tip.height, tip.hash, ); + if let Some(tx) = state.sync_completion_tx.lock().unwrap().take() { + let _ = tx.send(sync_update); + } }, Event::Block(indexed_block) => { log_trace!(logger, "CBF received block at height {}", indexed_block.height,); @@ -192,6 +228,14 @@ impl CbfChainSource { log_debug!(logger, "CBF chain update: {:?}", header_changes); }, Event::IndexedFilter(indexed_filter) => { + let scripts = state.watched_scripts.read().unwrap(); + if !scripts.is_empty() && indexed_filter.contains_any(scripts.iter()) { + state + .matched_block_hashes + .lock() + .unwrap() + .push((indexed_filter.height(), indexed_filter.block_hash())); + } log_trace!(logger, "CBF received filter at height {}", indexed_filter.height(),); }, } @@ -212,12 +256,144 @@ impl CbfChainSource { } } + /// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for + /// completion, and return the sync update along with matched block hashes. + async fn run_filter_scan( + &self, scripts: Vec, + ) -> Result<(SyncUpdate, Vec<(u32, BlockHash)>), Error> { + let requester = self.requester()?; + + self.matched_block_hashes.lock().unwrap().clear(); + *self.watched_scripts.write().unwrap() = scripts; + + let (tx, rx) = oneshot::channel(); + *self.sync_completion_tx.lock().unwrap() = Some(tx); + + requester.rescan().map_err(|e| { + log_error!(self.logger, "Failed to trigger CBF rescan: {:?}", e); + Error::WalletOperationFailed + })?; + + let sync_update = rx.await.map_err(|e| { + log_error!(self.logger, "CBF sync completion channel dropped: {:?}", e); + Error::WalletOperationFailed + })?; + + self.watched_scripts.write().unwrap().clear(); + let matched = std::mem::take(&mut *self.matched_block_hashes.lock().unwrap()); + + Ok((sync_update, matched)) + } + /// Sync the on-chain wallet by scanning compact block filters for relevant transactions. pub(crate) async fn sync_onchain_wallet( - &self, _onchain_wallet: Arc, + &self, onchain_wallet: Arc, ) -> Result<(), Error> { - log_error!(self.logger, "On-chain wallet sync via CBF is not yet implemented."); - Err(Error::WalletOperationFailed) + let receiver_res = { + let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_debug!(self.logger, "On-chain wallet sync already in progress, waiting."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } + + let requester = self.requester()?; + let now = Instant::now(); + + let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP); + if scripts.is_empty() { + log_debug!(self.logger, "No wallet scripts to sync via CBF."); + return Ok(()); + } + + let timeout_fut = tokio::time::timeout( + Duration::from_secs(self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs), + self.sync_onchain_wallet_op(requester, scripts), + ); + + let (tx_update, sync_update) = match timeout_fut.await { + Ok(res) => res?, + Err(e) => { + log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e); + return Err(Error::WalletOperationTimeout); + }, + }; + + // Build chain checkpoint extending from the wallet's current tip. + let mut cp = onchain_wallet.latest_checkpoint(); + for (height, header) in sync_update.recent_history() { + if *height > cp.height() { + let block_id = BlockId { height: *height, hash: header.block_hash() }; + cp = cp.push(block_id).unwrap_or_else(|old| old); + } + } + let tip = sync_update.tip(); + if tip.height > cp.height() { + let tip_block_id = BlockId { height: tip.height, hash: tip.hash }; + cp = cp.push(tip_block_id).unwrap_or_else(|old| old); + } + + let update = Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) }; + + // Apply update to wallet. + onchain_wallet.apply_update(update)?; + + log_debug!( + self.logger, + "Sync of on-chain wallet via CBF finished in {}ms.", + now.elapsed().as_millis() + ); + + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_onchain_wallet_sync_timestamp = t; + }, + )?; + + Ok(()) + } + + async fn sync_onchain_wallet_op( + &self, requester: Requester, scripts: Vec, + ) -> Result<(TxUpdate, SyncUpdate), Error> { + let (sync_update, matched) = self.run_filter_scan(scripts).await?; + + log_debug!( + self.logger, + "CBF on-chain filter scan complete: {} matching blocks found.", + matched.len() + ); + + // Fetch matching blocks and include all their transactions. + // The compact block filter already matched our scripts (covering both + // created outputs and spent inputs), so we include every transaction + // from matched blocks and let BDK determine relevance. + let mut tx_update = TxUpdate::default(); + for (height, block_hash) in &matched { + let indexed_block = requester.get_block(*block_hash).await.map_err(|e| { + log_error!(self.logger, "Failed to fetch block {}: {:?}", block_hash, e); + Error::WalletOperationFailed + })?; + let block = indexed_block.block; + let block_id = BlockId { height: *height, hash: block.header.block_hash() }; + let conf_time = + ConfirmationBlockTime { block_id, confirmation_time: block.header.time as u64 }; + for tx in &block.txdata { + let txid = tx.compute_txid(); + tx_update.txs.push(Arc::new(tx.clone())); + tx_update.anchors.insert((conf_time, txid)); + } + } + + Ok((tx_update, sync_update)) } /// Sync the Lightning wallet by confirming channel transactions via compact block filters. diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 0e80a46db..d6c49274b 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -122,6 +122,28 @@ impl Wallet { self.inner.lock().unwrap().start_sync_with_revealed_spks().build() } + pub(crate) fn get_spks_for_cbf_sync(&self, stop_gap: usize) -> Vec { + let wallet = self.inner.lock().unwrap(); + let mut scripts: Vec = + wallet.spk_index().revealed_spks(..).map(|((_, _), spk)| spk).collect(); + + // For first sync when no scripts have been revealed yet, generate + // lookahead scripts up to the stop gap for both keychains. + if scripts.is_empty() { + for keychain in [KeychainKind::External, KeychainKind::Internal] { + for idx in 0..stop_gap as u32 { + scripts.push(wallet.peek_address(keychain, idx).address.script_pubkey()); + } + } + } + + scripts + } + + pub(crate) fn latest_checkpoint(&self) -> bdk_chain::CheckPoint { + self.inner.lock().unwrap().latest_checkpoint() + } + pub(crate) fn get_cached_txs(&self) -> Vec> { self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect() } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a08e480aa..a1d592444 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -603,6 +603,24 @@ pub(crate) async fn wait_for_outpoint_spend(electrs: &E, outpoin .await; } +pub(crate) async fn wait_for_cbf_sync(node: &TestNode) { + let before = node.status().latest_onchain_wallet_sync_timestamp; + let mut delay = Duration::from_millis(200); + for _ in 0..30 { + if node.sync_wallets().is_ok() { + let after = node.status().latest_onchain_wallet_sync_timestamp; + if after > before { + return; + } + } + tokio::time::sleep(delay).await; + if delay < Duration::from_secs(2) { + delay = delay.mul_f32(1.5); + } + } + panic!("wait_for_cbf_sync: timed out waiting for CBF sync to complete"); +} + pub(crate) async fn exponential_backoff_poll(mut poll: F) -> T where F: FnMut() -> Option, From 3dc8eacdaa8db6647cc0c1cc1c454c73dbe93e6a Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Sun, 15 Mar 2026 02:43:01 +0900 Subject: [PATCH 05/10] Add multi-block fee estimation and unit tests --- src/chain/cbf.rs | 322 +++++++++++++++++++++++++++++--- tests/integration_tests_rust.rs | 30 ++- 2 files changed, 330 insertions(+), 22 deletions(-) diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index e06d797f4..de6b537b9 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -15,7 +15,8 @@ use bdk_wallet::Update; use bip157::{ BlockHash, Builder, Client, Event, Info, Requester, SyncUpdate, TrustedPeer, Warning, }; -use bitcoin::{Script, ScriptBuf, Transaction, Txid}; +use bitcoin::constants::SUBSIDY_HALVING_INTERVAL; +use bitcoin::{Amount, FeeRate, Network, Script, ScriptBuf, Transaction, Txid}; use lightning::chain::WatchedOutput; use lightning::util::ser::Writeable; use tokio::sync::{mpsc, oneshot}; @@ -23,7 +24,8 @@ use tokio::sync::{mpsc, oneshot}; use super::WalletSyncStatus; use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP}; use crate::fee_estimator::{ - apply_post_estimation_adjustments, get_all_conf_targets, OnchainFeeEstimator, + apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, + OnchainFeeEstimator, }; use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; @@ -31,6 +33,12 @@ use crate::runtime::Runtime; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; +/// Minimum fee rate: 1 sat/vB = 250 sat/kWU. Used as a floor for computed fee rates. +const MIN_FEERATE_SAT_PER_KWU: u64 = 250; + +/// Number of recent blocks to look back for per-target fee rate estimation. +const FEE_RATE_LOOKBACK_BLOCKS: usize = 6; + pub(super) struct CbfChainSource { /// Peer addresses for sourcing compact block filters via P2P. peers: Vec, @@ -406,8 +414,6 @@ impl CbfChainSource { } /// Estimate fee rates from recent block data. - // NOTE: This is a single-block fee estimation. A multi-block lookback with - // per-target percentile selection is added later. pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { let requester = self.requester()?; @@ -421,26 +427,118 @@ impl CbfChainSource { let now = Instant::now(); - let base_fee_rate = tokio::time::timeout( - Duration::from_secs( - self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, - ), - requester.average_fee_rate(tip_hash), - ) - .await - .map_err(|e| { - log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); - Error::FeerateEstimationUpdateTimeout - })? - .map_err(|e| { - log_error!(self.logger, "Failed to retrieve fee rate estimate: {:?}", e); - Error::FeerateEstimationUpdateFailed - })?; + // Fetch fee rates from the last N blocks for per-target estimation. + // We compute fee rates ourselves rather than using Requester::average_fee_rate, + // so we can sample multiple blocks and select percentiles per confirmation target. + let mut block_fee_rates: Vec = Vec::with_capacity(FEE_RATE_LOOKBACK_BLOCKS); + let mut current_hash = tip_hash; + + let timeout = Duration::from_secs( + self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, + ); + let fetch_start = Instant::now(); + + for idx in 0..FEE_RATE_LOOKBACK_BLOCKS { + // Check if we've exceeded the overall timeout for fee estimation. + let remaining_timeout = timeout.saturating_sub(fetch_start.elapsed()); + if remaining_timeout.is_zero() { + log_error!(self.logger, "Updating fee rate estimates timed out."); + return Err(Error::FeerateEstimationUpdateTimeout); + } + + // Fetch the block via P2P. On the first iteration, a fetch failure + // likely means the cached tip is stale (initial sync or reorg), so + // we clear the tip and skip gracefully instead of returning an error. + let indexed_block = + match tokio::time::timeout(remaining_timeout, requester.get_block(current_hash)) + .await + { + Ok(Ok(indexed_block)) => indexed_block, + Ok(Err(e)) if idx == 0 => { + log_debug!( + self.logger, + "Cached CBF tip {} was unavailable during fee estimation, \ + likely due to initial sync or a reorg: {:?}", + current_hash, + e + ); + *self.latest_tip.lock().unwrap() = None; + return Ok(()); + }, + Ok(Err(e)) => { + log_error!( + self.logger, + "Failed to fetch block for fee estimation: {:?}", + e + ); + return Err(Error::FeerateEstimationUpdateFailed); + }, + Err(e) if idx == 0 => { + log_debug!( + self.logger, + "Timed out fetching cached CBF tip {} during fee estimation, \ + likely due to initial sync or a reorg: {}", + current_hash, + e + ); + *self.latest_tip.lock().unwrap() = None; + return Ok(()); + }, + Err(e) => { + log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); + return Err(Error::FeerateEstimationUpdateTimeout); + }, + }; + + let height = indexed_block.height; + let block = &indexed_block.block; + let weight_kwu = block.weight().to_kwu_floor(); + + // Compute fee rate: (coinbase_output - subsidy) / weight. + // For blocks with zero weight (e.g. coinbase-only in regtest), use the floor rate. + let fee_rate_sat_per_kwu = if weight_kwu == 0 { + MIN_FEERATE_SAT_PER_KWU + } else { + let subsidy = block_subsidy(height); + let revenue = block + .txdata + .first() + .map(|tx| tx.output.iter().map(|o| o.value).sum()) + .unwrap_or(Amount::ZERO); + let block_fees = revenue.checked_sub(subsidy).unwrap_or(Amount::ZERO); + + if block_fees == Amount::ZERO && self.config.network == Network::Bitcoin { + log_error!( + self.logger, + "Failed to retrieve fee rate estimates: zero block fees are disallowed on Mainnet.", + ); + return Err(Error::FeerateEstimationUpdateFailed); + } + + (block_fees.to_sat() / weight_kwu).max(MIN_FEERATE_SAT_PER_KWU) + }; + + block_fee_rates.push(fee_rate_sat_per_kwu); + // Walk backwards through the chain via prev_blockhash. + if height == 0 { + break; + } + current_hash = block.header.prev_blockhash; + } + + if block_fee_rates.is_empty() { + log_error!(self.logger, "No blocks available for fee rate estimation."); + return Err(Error::FeerateEstimationUpdateFailed); + } + + block_fee_rates.sort(); let confirmation_targets = get_all_conf_targets(); let mut new_fee_rate_cache = HashMap::with_capacity(confirmation_targets.len()); for target in confirmation_targets { + let num_blocks = get_num_block_defaults_for_target(target); + let base_fee_rate = select_fee_rate_for_target(&block_fee_rates, num_blocks); let adjusted_fee_rate = apply_post_estimation_adjustments(target, base_fee_rate); new_fee_rate_cache.insert(target, adjusted_fee_rate); @@ -456,8 +554,9 @@ impl CbfChainSource { log_debug!( self.logger, - "Fee rate cache update finished in {}ms.", - now.elapsed().as_millis() + "Fee rate cache update finished in {}ms ({} blocks sampled).", + now.elapsed().as_millis(), + block_fee_rates.len(), ); update_node_metrics_timestamp( @@ -546,3 +645,184 @@ fn update_node_metrics_timestamp( write_node_metrics(&*locked, kv_store, logger)?; Ok(()) } + +/// Compute the block subsidy (mining reward before fees) at the given block height. +fn block_subsidy(height: u32) -> Amount { + let halvings = height / SUBSIDY_HALVING_INTERVAL; + if halvings >= 64 { + return Amount::ZERO; + } + let base = Amount::ONE_BTC.to_sat() * 50; + Amount::from_sat(base >> halvings) +} + +/// Select a fee rate from sorted block fee rates based on confirmation urgency. +/// +/// For urgent targets (1 block), uses the highest observed fee rate. +/// For medium targets (2-6 blocks), uses the 75th percentile. +/// For standard targets (7-12 blocks), uses the median. +/// For low-urgency targets (13+ blocks), uses the 25th percentile. +fn select_fee_rate_for_target(sorted_rates: &[u64], num_blocks: usize) -> FeeRate { + if sorted_rates.is_empty() { + return FeeRate::from_sat_per_kwu(MIN_FEERATE_SAT_PER_KWU); + } + + let len = sorted_rates.len(); + let idx = if num_blocks <= 1 { + len - 1 + } else if num_blocks <= 6 { + (len * 3) / 4 + } else if num_blocks <= 12 { + len / 2 + } else { + len / 4 + }; + + FeeRate::from_sat_per_kwu(sorted_rates[idx.min(len - 1)]) +} + +#[cfg(test)] +mod tests { + use bitcoin::constants::SUBSIDY_HALVING_INTERVAL; + use bitcoin::{Amount, FeeRate}; + + use super::{block_subsidy, select_fee_rate_for_target, MIN_FEERATE_SAT_PER_KWU}; + use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, + }; + + #[test] + fn select_fee_rate_empty_returns_floor() { + let rate = select_fee_rate_for_target(&[], 1); + assert_eq!(rate, FeeRate::from_sat_per_kwu(MIN_FEERATE_SAT_PER_KWU)); + } + + #[test] + fn select_fee_rate_single_element_returns_it_for_all_buckets() { + let rates = [5000u64]; + // Every urgency bucket should return the single available rate. + for num_blocks in [1, 3, 6, 12, 144, 1008] { + let rate = select_fee_rate_for_target(&rates, num_blocks); + assert_eq!( + rate, + FeeRate::from_sat_per_kwu(5000), + "num_blocks={} should return the only available rate", + num_blocks, + ); + } + } + + #[test] + fn select_fee_rate_picks_correct_percentile() { + // 6 sorted rates: indices 0..5 + let rates = [100, 200, 300, 400, 500, 600]; + // 1-block (most urgent): highest → index 5 → 600 + assert_eq!(select_fee_rate_for_target(&rates, 1), FeeRate::from_sat_per_kwu(600)); + // 6-block (medium): 75th percentile → (6*3)/4 = 4 → 500 + assert_eq!(select_fee_rate_for_target(&rates, 6), FeeRate::from_sat_per_kwu(500)); + // 12-block (standard): median → 6/2 = 3 → 400 + assert_eq!(select_fee_rate_for_target(&rates, 12), FeeRate::from_sat_per_kwu(400)); + // 144-block (low): 25th percentile → 6/4 = 1 → 200 + assert_eq!(select_fee_rate_for_target(&rates, 144), FeeRate::from_sat_per_kwu(200)); + } + + #[test] + fn select_fee_rate_monotonic_urgency() { + // More urgent targets should never produce lower rates than less urgent ones. + let rates = [250, 500, 1000, 2000, 4000, 8000]; + let urgent = select_fee_rate_for_target(&rates, 1); + let medium = select_fee_rate_for_target(&rates, 6); + let standard = select_fee_rate_for_target(&rates, 12); + let low = select_fee_rate_for_target(&rates, 144); + + assert!( + urgent >= medium, + "urgent ({}) >= medium ({})", + urgent.to_sat_per_kwu(), + medium.to_sat_per_kwu() + ); + assert!( + medium >= standard, + "medium ({}) >= standard ({})", + medium.to_sat_per_kwu(), + standard.to_sat_per_kwu() + ); + assert!( + standard >= low, + "standard ({}) >= low ({})", + standard.to_sat_per_kwu(), + low.to_sat_per_kwu() + ); + } + + #[test] + fn uniform_rates_match_naive_single_rate() { + // When all blocks have the same fee rate (like the old single-block + // approach), every target should select that same base rate. This + // proves the optimized multi-block approach is backwards-compatible. + + let uniform_rate = 3000u64; + let rates = [uniform_rate; 6]; + for target in get_all_conf_targets() { + let num_blocks = get_num_block_defaults_for_target(target); + let optimized = select_fee_rate_for_target(&rates, num_blocks); + let naive = FeeRate::from_sat_per_kwu(uniform_rate); + assert_eq!( + optimized, naive, + "For target {:?} (num_blocks={}), optimized rate should match naive single-rate", + target, num_blocks, + ); + + // Also verify the post-estimation adjustments produce the same + // result for both approaches. + let adjusted_optimized = apply_post_estimation_adjustments(target, optimized); + let adjusted_naive = apply_post_estimation_adjustments(target, naive); + assert_eq!(adjusted_optimized, adjusted_naive); + } + } + + #[test] + fn block_subsidy_genesis() { + assert_eq!(block_subsidy(0), Amount::from_sat(50 * 100_000_000)); + } + + #[test] + fn block_subsidy_first_halving() { + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL), Amount::from_sat(25 * 100_000_000)); + } + + #[test] + fn block_subsidy_second_halving() { + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL * 2), Amount::from_sat(1_250_000_000)); + } + + #[test] + fn block_subsidy_exhausted_after_64_halvings() { + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL * 64), Amount::ZERO); + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL * 100), Amount::ZERO); + } + + #[test] + fn select_fee_rate_two_elements() { + let rates = [1000, 5000]; + // 1-block: index 1 (highest) → 5000 + assert_eq!(select_fee_rate_for_target(&rates, 1), FeeRate::from_sat_per_kwu(5000)); + // 6-block: (2*3)/4 = 1 → 5000 + assert_eq!(select_fee_rate_for_target(&rates, 6), FeeRate::from_sat_per_kwu(5000)); + // 12-block: 2/2 = 1 → 5000 + assert_eq!(select_fee_rate_for_target(&rates, 12), FeeRate::from_sat_per_kwu(5000)); + // 144-block: 2/4 = 0 → 1000 + assert_eq!(select_fee_rate_for_target(&rates, 144), FeeRate::from_sat_per_kwu(1000)); + } + + #[test] + fn select_fee_rate_all_targets_use_valid_indices() { + for size in 1..=6 { + let rates: Vec = (1..=size).map(|i| i as u64 * 1000).collect(); + for target in get_all_conf_targets() { + let num_blocks = get_num_block_defaults_for_target(target); + let _ = select_fee_rate_for_target(&rates, num_blocks); + } + } + } +} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 1c43d5a7c..b3fe9c752 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -24,7 +24,7 @@ use common::{ open_channel, open_channel_push_amt, open_channel_with_all, premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config, random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, - wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, + wait_for_cbf_sync, wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, }; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -2822,3 +2822,31 @@ async fn start_stop_cbf() { node.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn fee_rate_estimation_after_manual_sync_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + let addr = node.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr], + Amount::from_sat(100_000), + ) + .await; + + wait_for_cbf_sync(&node).await; + let first_fee_update = node.status().latest_fee_rate_cache_update_timestamp; + assert!(first_fee_update.is_some()); + + // Subsequent manual syncs should keep the fee cache populated. + node.sync_wallets().unwrap(); + let second_fee_update = node.status().latest_fee_rate_cache_update_timestamp; + assert!(second_fee_update.is_some()); + assert!(second_fee_update >= first_fee_update); + + node.stop().unwrap(); +} From 93c0c69db8504bfd316e5ba604eea9bb2fa54db8 Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Sun, 15 Mar 2026 03:34:59 +0900 Subject: [PATCH 06/10] Add incremental on-chain wallet sync --- src/chain/cbf.rs | 131 ++++++++++++++++++++------------ tests/integration_tests_rust.rs | 27 +++++++ 2 files changed, 109 insertions(+), 49 deletions(-) diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index de6b537b9..d0f902ace 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -7,6 +7,7 @@ use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -54,6 +55,10 @@ pub(super) struct CbfChainSource { matched_block_hashes: Arc>>, /// One-shot channel sender to signal filter scan completion. sync_completion_tx: Arc>>>, + /// Filters at or below this height are skipped during incremental scans. + filter_skip_height: Arc, + /// Last block height reached by on-chain wallet sync, used for incremental scans. + last_onchain_synced_height: Mutex>, /// Deduplicates concurrent on-chain wallet sync requests. onchain_wallet_sync_status: Mutex, /// Shared fee rate estimator, updated by this chain source. @@ -79,6 +84,7 @@ struct CbfEventState { watched_scripts: Arc>>, matched_block_hashes: Arc>>, sync_completion_tx: Arc>>>, + filter_skip_height: Arc, } impl CbfChainSource { @@ -92,6 +98,8 @@ impl CbfChainSource { let watched_scripts = Arc::new(RwLock::new(Vec::new())); let matched_block_hashes = Arc::new(Mutex::new(Vec::new())); let sync_completion_tx = Arc::new(Mutex::new(None)); + let filter_skip_height = Arc::new(AtomicU32::new(0)); + let last_onchain_synced_height = Mutex::new(None); let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); Self { peers, @@ -101,6 +109,8 @@ impl CbfChainSource { watched_scripts, matched_block_hashes, sync_completion_tx, + filter_skip_height, + last_onchain_synced_height, onchain_wallet_sync_status, fee_estimator, kv_store, @@ -171,6 +181,7 @@ impl CbfChainSource { watched_scripts: Arc::clone(&self.watched_scripts), matched_block_hashes: Arc::clone(&self.matched_block_hashes), sync_completion_tx: Arc::clone(&self.sync_completion_tx), + filter_skip_height: Arc::clone(&self.filter_skip_height), }; let event_logger = Arc::clone(&self.logger); runtime.spawn_cancellable_background_task(Self::process_events( @@ -236,6 +247,10 @@ impl CbfChainSource { log_debug!(logger, "CBF chain update: {:?}", header_changes); }, Event::IndexedFilter(indexed_filter) => { + let skip_height = state.filter_skip_height.load(Ordering::Acquire); + if skip_height > 0 && indexed_filter.height() <= skip_height { + continue; + } let scripts = state.watched_scripts.read().unwrap(); if !scripts.is_empty() && indexed_filter.contains_any(scripts.iter()) { state @@ -266,11 +281,15 @@ impl CbfChainSource { /// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for /// completion, and return the sync update along with matched block hashes. + /// + /// When `skip_before_height` is `Some(h)`, filters at or below height `h` are + /// skipped, making the scan incremental. async fn run_filter_scan( - &self, scripts: Vec, + &self, scripts: Vec, skip_before_height: Option, ) -> Result<(SyncUpdate, Vec<(u32, BlockHash)>), Error> { let requester = self.requester()?; + self.filter_skip_height.store(skip_before_height.unwrap_or(0), Ordering::Release); self.matched_block_hashes.lock().unwrap().clear(); *self.watched_scripts.write().unwrap() = scripts; @@ -287,6 +306,7 @@ impl CbfChainSource { Error::WalletOperationFailed })?; + self.filter_skip_height.store(0, Ordering::Release); self.watched_scripts.write().unwrap().clear(); let matched = std::mem::take(&mut *self.matched_block_hashes.lock().unwrap()); @@ -310,69 +330,79 @@ impl CbfChainSource { })?; } - let requester = self.requester()?; - let now = Instant::now(); + let res = async { + let requester = self.requester()?; + let now = Instant::now(); - let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP); - if scripts.is_empty() { - log_debug!(self.logger, "No wallet scripts to sync via CBF."); - return Ok(()); - } + let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP); + if scripts.is_empty() { + log_debug!(self.logger, "No wallet scripts to sync via CBF."); + return Ok(()); + } - let timeout_fut = tokio::time::timeout( - Duration::from_secs(self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs), - self.sync_onchain_wallet_op(requester, scripts), - ); + let timeout_fut = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs, + ), + self.sync_onchain_wallet_op(requester, scripts), + ); - let (tx_update, sync_update) = match timeout_fut.await { - Ok(res) => res?, - Err(e) => { - log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e); - return Err(Error::WalletOperationTimeout); - }, - }; + let (tx_update, sync_update) = match timeout_fut.await { + Ok(res) => res?, + Err(e) => { + log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e); + return Err(Error::WalletOperationTimeout); + }, + }; - // Build chain checkpoint extending from the wallet's current tip. - let mut cp = onchain_wallet.latest_checkpoint(); - for (height, header) in sync_update.recent_history() { - if *height > cp.height() { - let block_id = BlockId { height: *height, hash: header.block_hash() }; - cp = cp.push(block_id).unwrap_or_else(|old| old); + // Build chain checkpoint extending from the wallet's current tip. + let mut cp = onchain_wallet.latest_checkpoint(); + for (height, header) in sync_update.recent_history() { + if *height > cp.height() { + let block_id = BlockId { height: *height, hash: header.block_hash() }; + cp = cp.push(block_id).unwrap_or_else(|old| old); + } + } + let tip = sync_update.tip(); + if tip.height > cp.height() { + let tip_block_id = BlockId { height: tip.height, hash: tip.hash }; + cp = cp.push(tip_block_id).unwrap_or_else(|old| old); } - } - let tip = sync_update.tip(); - if tip.height > cp.height() { - let tip_block_id = BlockId { height: tip.height, hash: tip.hash }; - cp = cp.push(tip_block_id).unwrap_or_else(|old| old); - } - let update = Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) }; + let update = + Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) }; - // Apply update to wallet. - onchain_wallet.apply_update(update)?; + onchain_wallet.apply_update(update)?; - log_debug!( - self.logger, - "Sync of on-chain wallet via CBF finished in {}ms.", - now.elapsed().as_millis() - ); + log_debug!( + self.logger, + "Sync of on-chain wallet via CBF finished in {}ms.", + now.elapsed().as_millis() + ); - update_node_metrics_timestamp( - &self.node_metrics, - &*self.kv_store, - &*self.logger, - |m, t| { - m.latest_onchain_wallet_sync_timestamp = t; - }, - )?; + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_onchain_wallet_sync_timestamp = t; + }, + )?; - Ok(()) + Ok(()) + } + .await; + + self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res } async fn sync_onchain_wallet_op( &self, requester: Requester, scripts: Vec, ) -> Result<(TxUpdate, SyncUpdate), Error> { - let (sync_update, matched) = self.run_filter_scan(scripts).await?; + let skip_height = *self.last_onchain_synced_height.lock().unwrap(); + let (sync_update, matched) = self.run_filter_scan(scripts, skip_height).await?; log_debug!( self.logger, @@ -401,6 +431,9 @@ impl CbfChainSource { } } + let tip = sync_update.tip(); + *self.last_onchain_synced_height.lock().unwrap() = Some(tip.height); + Ok((tx_update, sync_update)) } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index b3fe9c752..98527e2cf 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -2850,3 +2850,30 @@ async fn fee_rate_estimation_after_manual_sync_cbf() { node.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn repeated_manual_sync_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + let addr = node.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 100_000; + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr], + Amount::from_sat(premine_amount_sat), + ) + .await; + + wait_for_cbf_sync(&node).await; + assert_eq!(node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + // Regression: the second manual sync must not block forever. + node.sync_wallets().unwrap(); + assert_eq!(node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + node.stop().unwrap(); +} From a2b580bff3817aab378917d7e03849d9026604ca Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Thu, 5 Mar 2026 19:11:22 +0900 Subject: [PATCH 07/10] Implement Lightning wallet sync, reorg handling, and UniFFI bindings for CBF --- bindings/ldk_node.udl | 3 + src/builder.rs | 34 +++++ src/chain/cbf.rs | 279 ++++++++++++++++++++++++++++++++++++++---- src/config.rs | 1 + src/ffi/types.rs | 2 +- 5 files changed, 297 insertions(+), 22 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 3ec2919e7..5bde3e183 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -9,6 +9,8 @@ typedef dictionary EsploraSyncConfig; typedef dictionary ElectrumSyncConfig; +typedef dictionary CbfSyncConfig; + typedef interface NodeEntropy; typedef enum WordCount; @@ -36,6 +38,7 @@ interface Builder { constructor(Config config); void set_chain_source_esplora(string server_url, EsploraSyncConfig? config); void set_chain_source_electrum(string server_url, ElectrumSyncConfig? config); + void set_chain_source_cbf(sequence peers, CbfSyncConfig? sync_config); void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_chain_source_bitcoind_rest(string rest_host, u16 rest_port, string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_gossip_source_p2p(); diff --git a/src/builder.rs b/src/builder.rs index d0319a5df..6266c5f60 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -824,6 +824,18 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_chain_source_electrum(server_url, sync_config); } + /// Configures the [`Node`] instance to source its chain data via BIP 157 compact block + /// filters. + /// + /// `peers` is an optional list of peer addresses to connect to for sourcing compact block + /// filters. If empty, the node will discover peers via DNS seeds. + /// + /// If no `sync_config` is given, default values are used. See [`CbfSyncConfig`] for more + /// information. + pub fn set_chain_source_cbf(&self, peers: Vec, sync_config: Option) { + self.inner.write().unwrap().set_chain_source_cbf(peers, sync_config); + } + /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. /// /// This method establishes an RPC connection that enables all essential chain operations including @@ -2005,6 +2017,9 @@ pub(crate) fn sanitize_alias(alias_str: &str) -> Result { #[cfg(test)] mod tests { + #[cfg(feature = "uniffi")] + use crate::config::CbfSyncConfig; + use super::{sanitize_alias, BuildError, NodeAlias}; #[test] @@ -2042,4 +2057,23 @@ mod tests { let node = sanitize_alias(alias); assert_eq!(node.err().unwrap(), BuildError::InvalidNodeAlias); } + + #[cfg(feature = "uniffi")] + #[test] + fn arced_builder_can_set_cbf_chain_source() { + let builder = super::ArcedNodeBuilder::new(); + let sync_config = CbfSyncConfig::default(); + + let peers = vec!["127.0.0.1:8333".to_string()]; + builder.set_chain_source_cbf(peers.clone(), Some(sync_config.clone())); + + let guard = builder.inner.read().unwrap(); + assert!(matches!( + guard.chain_data_source_config.as_ref(), + Some(super::ChainDataSourceConfig::Cbf { + peers: p, + sync_config: Some(config), + }) if config == &sync_config && p == &peers + )); + } } diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index d0f902ace..6a4679778 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -5,25 +5,27 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::net::SocketAddr; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use bdk_chain::{BlockId, ConfirmationBlockTime, TxUpdate}; use bdk_wallet::Update; +use bip157::chain::{BlockHeaderChanges, IndexedHeader}; use bip157::{ BlockHash, Builder, Client, Event, Info, Requester, SyncUpdate, TrustedPeer, Warning, }; use bitcoin::constants::SUBSIDY_HALVING_INTERVAL; use bitcoin::{Amount, FeeRate, Network, Script, ScriptBuf, Transaction, Txid}; -use lightning::chain::WatchedOutput; +use lightning::chain::{Confirm, WatchedOutput}; use lightning::util::ser::Writeable; use tokio::sync::{mpsc, oneshot}; use super::WalletSyncStatus; use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP}; +use crate::error::Error; use crate::fee_estimator::{ apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, OnchainFeeEstimator, @@ -32,7 +34,7 @@ use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; -use crate::{Error, NodeMetrics}; +use crate::NodeMetrics; /// Minimum fee rate: 1 sat/vB = 250 sat/kWU. Used as a floor for computed fee rates. const MIN_FEERATE_SAT_PER_KWU: u64 = 250; @@ -53,14 +55,26 @@ pub(super) struct CbfChainSource { watched_scripts: Arc>>, /// Block (height, hash) pairs where filters matched watched scripts. matched_block_hashes: Arc>>, + /// Queued chain reorganization events, drained at the start of each lightning sync. + reorg_queue: Arc>>, /// One-shot channel sender to signal filter scan completion. sync_completion_tx: Arc>>>, /// Filters at or below this height are skipped during incremental scans. filter_skip_height: Arc, + /// Serializes concurrent filter scans (on-chain and lightning). + scan_lock: tokio::sync::Mutex<()>, + /// Scripts registered by LDK's Filter trait for lightning channel monitoring. + registered_scripts: Mutex>, + /// Set when new scripts are registered; forces a full rescan on next lightning sync. + lightning_scripts_dirty: AtomicBool, /// Last block height reached by on-chain wallet sync, used for incremental scans. last_onchain_synced_height: Mutex>, + /// Last block height reached by lightning wallet sync, used for incremental scans. + last_lightning_synced_height: Mutex>, /// Deduplicates concurrent on-chain wallet sync requests. onchain_wallet_sync_status: Mutex, + /// Deduplicates concurrent lightning wallet sync requests. + lightning_wallet_sync_status: Mutex, /// Shared fee rate estimator, updated by this chain source. fee_estimator: Arc, /// Persistent key-value store for node metrics. @@ -84,9 +98,18 @@ struct CbfEventState { watched_scripts: Arc>>, matched_block_hashes: Arc>>, sync_completion_tx: Arc>>>, + reorg_queue: Arc>>, filter_skip_height: Arc, } +/// A chain reorganization event queued for processing during the next lightning sync. +struct ReorgEvent { + /// Block hashes that were removed from the chain. + reorganized: Vec, + /// Headers of blocks newly accepted on the winning fork. + accepted: Vec, +} + impl CbfChainSource { pub(crate) fn new( peers: Vec, sync_config: CbfSyncConfig, fee_estimator: Arc, @@ -97,10 +120,16 @@ impl CbfChainSource { let latest_tip = Arc::new(Mutex::new(None)); let watched_scripts = Arc::new(RwLock::new(Vec::new())); let matched_block_hashes = Arc::new(Mutex::new(Vec::new())); + let reorg_queue = Arc::new(Mutex::new(Vec::new())); let sync_completion_tx = Arc::new(Mutex::new(None)); let filter_skip_height = Arc::new(AtomicU32::new(0)); + let registered_scripts = Mutex::new(Vec::new()); + let lightning_scripts_dirty = AtomicBool::new(true); + let scan_lock = tokio::sync::Mutex::new(()); let last_onchain_synced_height = Mutex::new(None); + let last_lightning_synced_height = Mutex::new(None); let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); Self { peers, sync_config, @@ -108,10 +137,16 @@ impl CbfChainSource { latest_tip, watched_scripts, matched_block_hashes, + reorg_queue, sync_completion_tx, filter_skip_height, + registered_scripts, + lightning_scripts_dirty, + scan_lock, last_onchain_synced_height, + last_lightning_synced_height, onchain_wallet_sync_status, + lightning_wallet_sync_status, fee_estimator, kv_store, config, @@ -181,6 +216,7 @@ impl CbfChainSource { watched_scripts: Arc::clone(&self.watched_scripts), matched_block_hashes: Arc::clone(&self.matched_block_hashes), sync_completion_tx: Arc::clone(&self.sync_completion_tx), + reorg_queue: Arc::clone(&self.reorg_queue), filter_skip_height: Arc::clone(&self.filter_skip_height), }; let event_logger = Arc::clone(&self.logger); @@ -243,8 +279,27 @@ impl CbfChainSource { Event::Block(indexed_block) => { log_trace!(logger, "CBF received block at height {}", indexed_block.height,); }, - Event::ChainUpdate(header_changes) => { - log_debug!(logger, "CBF chain update: {:?}", header_changes); + Event::ChainUpdate(header_changes) => match header_changes { + BlockHeaderChanges::Reorganized { accepted, reorganized } => { + log_debug!( + logger, + "CBF chain reorg detected: {} blocks removed, {} blocks accepted.", + reorganized.len(), + accepted.len(), + ); + let reorg_hashes = reorganized.iter().map(|h| h.block_hash()).collect(); + state + .reorg_queue + .lock() + .unwrap() + .push(ReorgEvent { reorganized: reorg_hashes, accepted }); + }, + BlockHeaderChanges::Connected(header) => { + log_trace!(logger, "CBF block connected at height {}", header.height,); + }, + BlockHeaderChanges::ForkAdded(header) => { + log_trace!(logger, "CBF fork block observed at height {}", header.height,); + }, }, Event::IndexedFilter(indexed_filter) => { let skip_height = state.filter_skip_height.load(Ordering::Acquire); @@ -279,6 +334,18 @@ impl CbfChainSource { } } + /// Register a transaction script for Lightning channel monitoring. + pub(crate) fn register_tx(&self, _txid: &Txid, script_pubkey: &Script) { + self.registered_scripts.lock().unwrap().push(script_pubkey.to_owned()); + self.lightning_scripts_dirty.store(true, Ordering::Release); + } + + /// Register a watched output script for Lightning channel monitoring. + pub(crate) fn register_output(&self, output: WatchedOutput) { + self.registered_scripts.lock().unwrap().push(output.script_pubkey.clone()); + self.lightning_scripts_dirty.store(true, Ordering::Release); + } + /// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for /// completion, and return the sync update along with matched block hashes. /// @@ -289,6 +356,8 @@ impl CbfChainSource { ) -> Result<(SyncUpdate, Vec<(u32, BlockHash)>), Error> { let requester = self.requester()?; + let _scan_guard = self.scan_lock.lock().await; + self.filter_skip_height.store(skip_before_height.unwrap_or(0), Ordering::Release); self.matched_block_hashes.lock().unwrap().clear(); *self.watched_scripts.write().unwrap() = scripts; @@ -439,14 +508,172 @@ impl CbfChainSource { /// Sync the Lightning wallet by confirming channel transactions via compact block filters. pub(crate) async fn sync_lightning_wallet( - &self, _channel_manager: Arc, _chain_monitor: Arc, - _output_sweeper: Arc, + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + let receiver_res = { + let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_debug!(self.logger, "Lightning wallet sync already in progress, waiting."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::TxSyncFailed + })?; + } + + let res = async { + let requester = self.requester()?; + let now = Instant::now(); + + let scripts: Vec = self.registered_scripts.lock().unwrap().clone(); + if scripts.is_empty() { + log_debug!(self.logger, "No registered scripts for CBF lightning sync."); + return Ok(()); + } + + let timeout_fut = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.lightning_wallet_sync_timeout_secs, + ), + self.sync_lightning_wallet_op( + requester, + channel_manager, + chain_monitor, + output_sweeper, + scripts, + ), + ); + + match timeout_fut.await { + Ok(res) => res?, + Err(e) => { + log_error!(self.logger, "Sync of Lightning wallet timed out: {}", e); + return Err(Error::TxSyncTimeout); + }, + }; + + log_debug!( + self.logger, + "Sync of Lightning wallet via CBF finished in {}ms.", + now.elapsed().as_millis() + ); + + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_lightning_wallet_sync_timestamp = t; + }, + )?; + + Ok(()) + } + .await; + + self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + async fn sync_lightning_wallet_op( + &self, requester: Requester, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, scripts: Vec, ) -> Result<(), Error> { - log_error!(self.logger, "Lightning wallet sync via CBF is not yet implemented."); - Err(Error::TxSyncFailed) + let scripts_dirty = self.lightning_scripts_dirty.load(Ordering::Acquire); + let skip_height = + if scripts_dirty { None } else { *self.last_lightning_synced_height.lock().unwrap() }; + let (sync_update, matched) = self.run_filter_scan(scripts, skip_height).await?; + + log_debug!( + self.logger, + "CBF lightning filter scan complete: {} matching blocks found.", + matched.len() + ); + + let confirmables: Vec<&(dyn Confirm + Sync + Send)> = + vec![&*channel_manager, &*chain_monitor, &*output_sweeper]; + + // Process any queued reorg events before the regular sync. + // A reorg invalidates our last synced height since blocks may have changed. + let pending_reorgs = std::mem::take(&mut *self.reorg_queue.lock().unwrap()); + if !pending_reorgs.is_empty() { + *self.last_lightning_synced_height.lock().unwrap() = None; + } + for reorg in &pending_reorgs { + let reorg_set: HashSet = reorg.reorganized.iter().copied().collect(); + + // Unconfirm txs that were in reorganized blocks. + for confirmable in &confirmables { + for (txid, _height, block_hash_opt) in confirmable.get_relevant_txids() { + if let Some(block_hash) = block_hash_opt { + if reorg_set.contains(&block_hash) { + log_debug!( + self.logger, + "Unconfirming transaction {} due to chain reorg.", + txid, + ); + confirmable.transaction_unconfirmed(&txid); + } + } + } + } + + // Confirm txs in newly accepted blocks (in chain order). + let mut accepted_sorted = reorg.accepted.clone(); + accepted_sorted.sort_by_key(|h| h.height); + for indexed_header in &accepted_sorted { + let block_hash = indexed_header.header.block_hash(); + confirm_block_transactions( + &requester, + block_hash, + indexed_header.height, + &confirmables, + &self.logger, + ) + .await?; + } + + // Update the best block to the last accepted header. + if let Some(last_accepted) = accepted_sorted.last() { + for confirmable in &confirmables { + confirmable.best_block_updated(&last_accepted.header, last_accepted.height); + } + } + } + + // Fetch matching blocks and confirm all their transactions. + // The compact block filter already matched our scripts (covering both + // created outputs and spent inputs), so we confirm every transaction + // from matched blocks and let LDK determine relevance. + for (height, block_hash) in &matched { + confirm_block_transactions( + &requester, + *block_hash, + *height, + &confirmables, + &self.logger, + ) + .await?; + } + + // Update the best block tip. + let tip = sync_update.tip(); + if let Some(tip_header) = sync_update.recent_history().get(&tip.height) { + for confirmable in &confirmables { + confirmable.best_block_updated(tip_header, tip.height); + } + } + + *self.last_lightning_synced_height.lock().unwrap() = Some(tip.height); + self.lightning_scripts_dirty.store(false, Ordering::Release); + + Ok(()) } - /// Estimate fee rates from recent block data. pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { let requester = self.requester()?; @@ -655,16 +882,6 @@ impl CbfChainSource { } } } - - /// Register a transaction script for Lightning channel monitoring. - pub(crate) fn register_tx(&self, _txid: &Txid, _script_pubkey: &Script) { - log_error!(self.logger, "CBF register_tx is not yet implemented."); - } - - /// Register a watched output script for Lightning channel monitoring. - pub(crate) fn register_output(&self, _output: WatchedOutput) { - log_error!(self.logger, "CBF register_output is not yet implemented."); - } } /// Record the current timestamp in a `NodeMetrics` field and persist the metrics. @@ -679,6 +896,26 @@ fn update_node_metrics_timestamp( Ok(()) } +/// Fetch a block by hash and call `transactions_confirmed` on each confirmable. +async fn confirm_block_transactions( + requester: &Requester, block_hash: BlockHash, height: u32, + confirmables: &[&(dyn Confirm + Sync + Send)], logger: &Logger, +) -> Result<(), Error> { + let indexed_block = requester.get_block(block_hash).await.map_err(|e| { + log_error!(logger, "Failed to fetch block {}: {:?}", block_hash, e); + Error::TxSyncFailed + })?; + let block = &indexed_block.block; + let header = &block.header; + let txdata: Vec<(usize, &Transaction)> = block.txdata.iter().enumerate().collect(); + if !txdata.is_empty() { + for confirmable in confirmables { + confirmable.transactions_confirmed(header, &txdata, height); + } + } + Ok(()) +} + /// Compute the block subsidy (mining reward before fees) at the given block height. fn block_subsidy(height: u32) -> Amount { let halvings = height / SUBSIDY_HALVING_INTERVAL; diff --git a/src/config.rs b/src/config.rs index 6eaed965a..06cd9605b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -483,6 +483,7 @@ impl Default for ElectrumSyncConfig { /// Background syncing is enabled by default, using the default values specified in /// [`BackgroundSyncConfig`]. #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] pub struct CbfSyncConfig { /// Background sync configuration. /// diff --git a/src/ffi/types.rs b/src/ffi/types.rs index cc7298cfa..214e9cff6 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -142,7 +142,7 @@ impl VssClientHeaderProvider for VssHeaderProviderAdapter { } use crate::builder::sanitize_alias; -pub use crate::config::{default_config, ElectrumSyncConfig, EsploraSyncConfig}; +pub use crate::config::{default_config, CbfSyncConfig, ElectrumSyncConfig, EsploraSyncConfig}; pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::liquidity::LSPS1OrderStatus; From 7e918fe611c761abef9d80f0c4abe735622ff6d1 Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Sun, 15 Mar 2026 04:42:46 +0900 Subject: [PATCH 08/10] Handle on-chain CBF reorgs in wallet payment tracking --- src/chain/cbf.rs | 277 ++++++++++++++++++++++++++------ src/wallet/mod.rs | 61 ++++++- tests/integration_tests_rust.rs | 107 +++++++++++- 3 files changed, 384 insertions(+), 61 deletions(-) diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index 6a4679778..a35b0d440 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -21,7 +21,7 @@ use bitcoin::constants::SUBSIDY_HALVING_INTERVAL; use bitcoin::{Amount, FeeRate, Network, Script, ScriptBuf, Transaction, Txid}; use lightning::chain::{Confirm, WatchedOutput}; use lightning::util::ser::Writeable; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use super::WalletSyncStatus; use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP}; @@ -48,7 +48,8 @@ pub(super) struct CbfChainSource { /// User-provided sync configuration (timeouts, background sync intervals). pub(super) sync_config: CbfSyncConfig, /// Tracks whether the bip157 node is running and holds the command handle. - cbf_runtime_status: Mutex, + /// Shared with the background restart loop so it can update the requester. + cbf_runtime_status: Arc>, /// Latest chain tip hash, updated by the background event processing task. latest_tip: Arc>>, /// Scripts to match against compact block filters during a scan. @@ -57,8 +58,11 @@ pub(super) struct CbfChainSource { matched_block_hashes: Arc>>, /// Queued chain reorganization events, drained at the start of each lightning sync. reorg_queue: Arc>>, - /// One-shot channel sender to signal filter scan completion. - sync_completion_tx: Arc>>>, + /// Channel sender for filter scan completion signals (FiltersSynced events). + sync_update_tx: mpsc::Sender, + /// Channel receiver for filter scan completion; held under a tokio Mutex + /// so it can be used across await points in `run_filter_scan`. + sync_update_rx: tokio::sync::Mutex>, /// Filters at or below this height are skipped during incremental scans. filter_skip_height: Arc, /// Serializes concurrent filter scans (on-chain and lightning). @@ -71,6 +75,11 @@ pub(super) struct CbfChainSource { last_onchain_synced_height: Mutex>, /// Last block height reached by lightning wallet sync, used for incremental scans. last_lightning_synced_height: Mutex>, + /// Tracks whether the CBF node has reached FiltersSynced state. + /// Set to `false` on Reorganized events (node goes Behind), + /// `true` on FiltersSynced events. Used by `run_filter_scan` to + /// decide whether to wait for a natural sync or call `rescan()`. + is_node_synced: Arc, /// Deduplicates concurrent on-chain wallet sync requests. onchain_wallet_sync_status: Mutex, /// Deduplicates concurrent lightning wallet sync requests. @@ -97,9 +106,10 @@ struct CbfEventState { latest_tip: Arc>>, watched_scripts: Arc>>, matched_block_hashes: Arc>>, - sync_completion_tx: Arc>>>, + sync_update_tx: mpsc::Sender, reorg_queue: Arc>>, filter_skip_height: Arc, + is_node_synced: Arc, } /// A chain reorganization event queued for processing during the next lightning sync. @@ -116,13 +126,15 @@ impl CbfChainSource { kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, ) -> Self { - let cbf_runtime_status = Mutex::new(CbfRuntimeStatus::Stopped); + let cbf_runtime_status = Arc::new(Mutex::new(CbfRuntimeStatus::Stopped)); let latest_tip = Arc::new(Mutex::new(None)); let watched_scripts = Arc::new(RwLock::new(Vec::new())); let matched_block_hashes = Arc::new(Mutex::new(Vec::new())); let reorg_queue = Arc::new(Mutex::new(Vec::new())); - let sync_completion_tx = Arc::new(Mutex::new(None)); + let (sync_update_tx, sync_update_rx) = mpsc::channel(64); + let sync_update_rx = tokio::sync::Mutex::new(sync_update_rx); let filter_skip_height = Arc::new(AtomicU32::new(0)); + let is_node_synced = Arc::new(AtomicBool::new(false)); let registered_scripts = Mutex::new(Vec::new()); let lightning_scripts_dirty = AtomicBool::new(true); let scan_lock = tokio::sync::Mutex::new(()); @@ -138,8 +150,10 @@ impl CbfChainSource { watched_scripts, matched_block_hashes, reorg_queue, - sync_completion_tx, + sync_update_tx, + sync_update_rx, filter_skip_height, + is_node_synced, registered_scripts, lightning_scripts_dirty, scan_lock, @@ -156,6 +170,11 @@ impl CbfChainSource { } /// Start the bip157 node and spawn background tasks for event processing. + /// + /// The node is wrapped in a restart loop: if it exits unexpectedly (e.g. + /// `NoReachablePeers` after a peer disconnect), it is rebuilt and + /// restarted automatically. A normal shutdown via [`Self::stop`] causes + /// the loop to exit. pub(crate) fn start(&self, runtime: Arc) { let mut status = self.cbf_runtime_status.lock().unwrap(); if matches!(*status, CbfRuntimeStatus::Started { .. }) { @@ -164,14 +183,7 @@ impl CbfChainSource { } let network = self.config.network; - - let mut builder = Builder::new(network); - - // Configure data directory under the node's storage path. let data_dir = std::path::PathBuf::from(&self.config.storage_dir_path).join("bip157_data"); - builder = builder.data_dir(data_dir); - - // Add configured peers. let peers: Vec = self .peers .iter() @@ -179,45 +191,125 @@ impl CbfChainSource { peer_str.parse::().ok().map(TrustedPeer::from_socket_addr) }) .collect(); - if !peers.is_empty() { - builder = builder.add_peers(peers); - } - // Request witness data so segwit transactions include full witnesses, - // required for Lightning channel operations. - builder = builder.fetch_witness_data(); + // Build the first node and grab its requester. + let (node, client) = Self::build_cbf_node(network, &data_dir, &peers); + let Client { requester, info_rx, warn_rx, event_rx } = client; + *status = CbfRuntimeStatus::Started { requester }; + drop(status); - // Increase peer response timeout from the default 5 seconds to avoid - // disconnecting slow peers during block downloads. - builder = builder.response_timeout(Duration::from_secs(30)); + log_info!(self.logger, "CBF chain source started."); - let (node, client) = builder.build(); + // Spawn event processing tasks for this node instance. + self.spawn_event_tasks(&runtime, info_rx, warn_rx, event_rx); - let Client { requester, info_rx, warn_rx, event_rx } = client; + // Spawn the node with automatic restart on unexpected exit. + let shared_status = Arc::clone(&self.cbf_runtime_status); + let shared_latest_tip = Arc::clone(&self.latest_tip); + let shared_watched_scripts = Arc::clone(&self.watched_scripts); + let shared_matched_block_hashes = Arc::clone(&self.matched_block_hashes); + let shared_sync_update_tx = self.sync_update_tx.clone(); + let shared_reorg_queue = Arc::clone(&self.reorg_queue); + let shared_filter_skip_height = Arc::clone(&self.filter_skip_height); + let shared_is_node_synced = Arc::clone(&self.is_node_synced); + let node_logger = Arc::clone(&self.logger); - // Spawn the bip157 node in the background. runtime.spawn_background_task(async move { - let _ = node.run().await; + // Run the first node instance. + let run_result = node.run().await; + + // Restart loop: only re-enters on unexpected exits. + let mut result = run_result; + loop { + match result { + Ok(()) => { + // Normal shutdown (via Requester::shutdown), stop. + break; + }, + Err(ref e) => { + log_error!( + node_logger, + "CBF node exited unexpectedly: {:?}. Restarting in 1s.", + e + ); + + // Mark as not synced so run_filter_scan waits for + // the rebuilt node to finish its initial sync. + shared_is_node_synced.store(false, Ordering::Release); + + // Clear the requester so callers know the node is + // temporarily unavailable. + *shared_status.lock().unwrap() = CbfRuntimeStatus::Stopped; + + tokio::time::sleep(Duration::from_secs(1)).await; + + let (new_node, new_client) = + Self::build_cbf_node(network, &data_dir, &peers); + let Client { requester, info_rx, warn_rx, event_rx } = new_client; + *shared_status.lock().unwrap() = CbfRuntimeStatus::Started { requester }; + + // Spawn new event processing tasks for the rebuilt node. + // Old tasks exit naturally when their channels close. + let event_state = CbfEventState { + latest_tip: Arc::clone(&shared_latest_tip), + watched_scripts: Arc::clone(&shared_watched_scripts), + matched_block_hashes: Arc::clone(&shared_matched_block_hashes), + sync_update_tx: shared_sync_update_tx.clone(), + reorg_queue: Arc::clone(&shared_reorg_queue), + filter_skip_height: Arc::clone(&shared_filter_skip_height), + is_node_synced: Arc::clone(&shared_is_node_synced), + }; + let el = Arc::clone(&node_logger); + tokio::spawn(Self::process_events(event_rx, event_state, el)); + let il = Arc::clone(&node_logger); + tokio::spawn(Self::process_info_messages(info_rx, il)); + let wl = Arc::clone(&node_logger); + tokio::spawn(Self::process_warn_messages(warn_rx, wl)); + + log_info!(node_logger, "CBF node restarted."); + + result = new_node.run().await; + }, + } + } }); + } - // Spawn a task to log info messages. + /// Build a bip157 node and client from configuration parameters. + fn build_cbf_node( + network: Network, data_dir: &std::path::Path, peers: &[TrustedPeer], + ) -> (bip157::Node, Client) { + let mut builder = Builder::new(network); + builder = builder.data_dir(data_dir.to_path_buf()); + if !peers.is_empty() { + builder = builder.add_peers(peers.to_vec()); + } + builder = builder.fetch_witness_data(); + builder = builder.response_timeout(Duration::from_secs(30)); + builder.build() + } + + /// Spawn event processing tasks for a given node instance. + fn spawn_event_tasks( + &self, runtime: &Arc, info_rx: mpsc::Receiver, + warn_rx: mpsc::UnboundedReceiver, event_rx: mpsc::UnboundedReceiver, + ) { let info_logger = Arc::clone(&self.logger); runtime .spawn_cancellable_background_task(Self::process_info_messages(info_rx, info_logger)); - // Spawn a task to log warning messages. let warn_logger = Arc::clone(&self.logger); runtime .spawn_cancellable_background_task(Self::process_warn_messages(warn_rx, warn_logger)); - // Spawn a task to process events. let event_state = CbfEventState { latest_tip: Arc::clone(&self.latest_tip), watched_scripts: Arc::clone(&self.watched_scripts), matched_block_hashes: Arc::clone(&self.matched_block_hashes), - sync_completion_tx: Arc::clone(&self.sync_completion_tx), + sync_update_tx: self.sync_update_tx.clone(), reorg_queue: Arc::clone(&self.reorg_queue), filter_skip_height: Arc::clone(&self.filter_skip_height), + is_node_synced: Arc::clone(&self.is_node_synced), }; let event_logger = Arc::clone(&self.logger); runtime.spawn_cancellable_background_task(Self::process_events( @@ -225,10 +317,6 @@ impl CbfChainSource { event_state, event_logger, )); - - log_info!(self.logger, "CBF chain source started."); - - *status = CbfRuntimeStatus::Started { requester }; } /// Shut down the bip157 node and stop all background tasks. @@ -264,6 +352,7 @@ impl CbfChainSource { while let Some(event) = event_rx.recv().await { match event { Event::FiltersSynced(sync_update) => { + state.is_node_synced.store(true, Ordering::Release); let tip = sync_update.tip(); *state.latest_tip.lock().unwrap() = Some(tip.hash); log_info!( @@ -272,15 +361,14 @@ impl CbfChainSource { tip.height, tip.hash, ); - if let Some(tx) = state.sync_completion_tx.lock().unwrap().take() { - let _ = tx.send(sync_update); - } + let _ = state.sync_update_tx.try_send(sync_update); }, Event::Block(indexed_block) => { log_trace!(logger, "CBF received block at height {}", indexed_block.height,); }, Event::ChainUpdate(header_changes) => match header_changes { BlockHeaderChanges::Reorganized { accepted, reorganized } => { + state.is_node_synced.store(false, Ordering::Release); log_debug!( logger, "CBF chain reorg detected: {} blocks removed, {} blocks accepted.", @@ -359,21 +447,61 @@ impl CbfChainSource { let _scan_guard = self.scan_lock.lock().await; self.filter_skip_height.store(skip_before_height.unwrap_or(0), Ordering::Release); - self.matched_block_hashes.lock().unwrap().clear(); *self.watched_scripts.write().unwrap() = scripts; - let (tx, rx) = oneshot::channel(); - *self.sync_completion_tx.lock().unwrap() = Some(tx); + // If the CBF node is not in FiltersSynced state (e.g. initial sync + // or re-syncing after a reorg), wait for it to reach that state + // before calling rescan(). rescan() is a no-op in Behind or + // HeadersSynced states, which would cause us to hang forever + // waiting for a FiltersSynced event that never comes. + if !self.is_node_synced.load(Ordering::Acquire) { + log_debug!( + self.logger, + "CBF node not yet synced, waiting for FiltersSynced before rescan.", + ); + let mut rx = self.sync_update_rx.lock().await; + match tokio::time::timeout(Duration::from_secs(60), rx.recv()).await { + Ok(Some(_)) => { + log_debug!(self.logger, "CBF node reached synced state."); + }, + Ok(None) => { + log_error!(self.logger, "CBF sync update channel closed while waiting."); + return Err(Error::WalletOperationFailed); + }, + Err(_) => { + log_error!(self.logger, "Timed out waiting for CBF node to sync."); + return Err(Error::WalletOperationTimeout); + }, + } + } + + // Drain any buffered FiltersSynced events and clear matches so + // that the upcoming rescan starts from a clean slate. + { + let mut rx = self.sync_update_rx.lock().await; + while rx.try_recv().is_ok() {} + } + self.matched_block_hashes.lock().unwrap().clear(); requester.rescan().map_err(|e| { log_error!(self.logger, "Failed to trigger CBF rescan: {:?}", e); Error::WalletOperationFailed })?; - let sync_update = rx.await.map_err(|e| { - log_error!(self.logger, "CBF sync completion channel dropped: {:?}", e); - Error::WalletOperationFailed - })?; + let sync_update = { + let mut rx = self.sync_update_rx.lock().await; + match tokio::time::timeout(Duration::from_secs(60), rx.recv()).await { + Ok(Some(update)) => update, + Ok(None) => { + log_error!(self.logger, "CBF sync update channel closed."); + return Err(Error::WalletOperationFailed); + }, + Err(_) => { + log_error!(self.logger, "Timed out waiting for CBF rescan to complete."); + return Err(Error::WalletOperationTimeout); + }, + } + }; self.filter_skip_height.store(0, Ordering::Release); self.watched_scripts.write().unwrap().clear(); @@ -403,6 +531,13 @@ impl CbfChainSource { let requester = self.requester()?; let now = Instant::now(); + // If a reorg happened but hasn't been processed yet (e.g. lightning + // sync was skipped because no scripts were registered), reset the + // on-chain sync height so we do a full rescan. + if !self.reorg_queue.lock().unwrap().is_empty() { + *self.last_onchain_synced_height.lock().unwrap() = None; + } + let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP); if scripts.is_empty() { log_debug!(self.logger, "No wallet scripts to sync via CBF."); @@ -425,8 +560,35 @@ impl CbfChainSource { }; // Build chain checkpoint extending from the wallet's current tip. + // If the sync update contains a block at a height we already have + // but with a different hash, that means a reorg happened. We walk + // the wallet's checkpoint back to find a common ancestor, then + // push the new blocks on top so BDK can detect and handle the reorg. let mut cp = onchain_wallet.latest_checkpoint(); - for (height, header) in sync_update.recent_history() { + let recent = sync_update.recent_history(); + + // Detect reorg: find the lowest height where hashes disagree. + let mut fork_height: Option = None; + for check in cp.iter() { + if let Some(new_header) = recent.get(&check.height()) { + if new_header.block_hash() != check.hash() { + fork_height = Some(check.height()); + } + } + } + + // Roll back the checkpoint to below the fork point. + if let Some(fh) = fork_height { + while cp.height() >= fh { + if let Some(prev) = cp.prev() { + cp = prev; + } else { + break; + } + } + } + + for (height, header) in recent { if *height > cp.height() { let block_id = BlockId { height: *height, hash: header.block_hash() }; cp = cp.push(block_id).unwrap_or_else(|old| old); @@ -470,8 +632,11 @@ impl CbfChainSource { async fn sync_onchain_wallet_op( &self, requester: Requester, scripts: Vec, ) -> Result<(TxUpdate, SyncUpdate), Error> { - let skip_height = *self.last_onchain_synced_height.lock().unwrap(); - let (sync_update, matched) = self.run_filter_scan(scripts, skip_height).await?; + // Always do a full scan (skip_height=None) for the on-chain wallet. + // Unlike the Lightning wallet which can rely on reorg_queue events, + // the on-chain wallet needs to see all blocks to correctly detect + // reorgs via checkpoint comparison in the caller. + let (sync_update, matched) = self.run_filter_scan(scripts, None).await?; log_debug!( self.logger, @@ -528,8 +693,17 @@ impl CbfChainSource { let requester = self.requester()?; let now = Instant::now(); + // Process any queued reorg events before checking scripts. + // This must happen even when there are no registered scripts, + // because reorgs also invalidate the on-chain wallet's sync height. + let pending_reorgs = std::mem::take(&mut *self.reorg_queue.lock().unwrap()); + if !pending_reorgs.is_empty() { + *self.last_lightning_synced_height.lock().unwrap() = None; + *self.last_onchain_synced_height.lock().unwrap() = None; + } + let scripts: Vec = self.registered_scripts.lock().unwrap().clone(); - if scripts.is_empty() { + if scripts.is_empty() && pending_reorgs.is_empty() { log_debug!(self.logger, "No registered scripts for CBF lightning sync."); return Ok(()); } @@ -598,10 +772,11 @@ impl CbfChainSource { vec![&*channel_manager, &*chain_monitor, &*output_sweeper]; // Process any queued reorg events before the regular sync. - // A reorg invalidates our last synced height since blocks may have changed. + // A reorg invalidates our last synced heights since blocks may have changed. let pending_reorgs = std::mem::take(&mut *self.reorg_queue.lock().unwrap()); if !pending_reorgs.is_empty() { *self.last_lightning_synced_height.lock().unwrap() = None; + *self.last_onchain_synced_height.lock().unwrap() = None; } for reorg in &pending_reorgs { let reorg_set: HashSet = reorg.reorganized.iter().copied().collect(); diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index d6c49274b..258898528 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -319,14 +319,63 @@ impl Wallet { for mut payment in pending_payments { match payment.details.kind { PaymentKind::Onchain { - status: ConfirmationStatus::Confirmed { height, .. }, - .. + txid, + status: ConfirmationStatus::Confirmed { .. }, } => { + let current_confirmation_status = locked_wallet + .tx_details(txid) + .map(|tx_details| match tx_details.chain_position { + bdk_chain::ChainPosition::Confirmed { anchor, .. } => { + ConfirmationStatus::Confirmed { + block_hash: anchor.block_id.hash, + height: anchor.block_id.height, + timestamp: anchor.confirmation_time, + } + }, + bdk_chain::ChainPosition::Unconfirmed { .. } => { + ConfirmationStatus::Unconfirmed + }, + }); let payment_id = payment.details.id; - if new_tip.height >= height + ANTI_REORG_DELAY - 1 { - payment.details.status = PaymentStatus::Succeeded; - self.payment_store.insert_or_update(payment.details)?; - self.pending_payment_store.remove(&payment_id)?; + match current_confirmation_status { + Some(ConfirmationStatus::Confirmed { + block_hash, + height, + timestamp, + }) => { + payment.details.kind = PaymentKind::Onchain { + txid, + status: ConfirmationStatus::Confirmed { + block_hash, + height, + timestamp, + }, + }; + if new_tip.height >= height + ANTI_REORG_DELAY - 1 { + payment.details.status = PaymentStatus::Succeeded; + self.payment_store.insert_or_update(payment.details)?; + self.pending_payment_store.remove(&payment_id)?; + } else { + self.payment_store + .insert_or_update(payment.details.clone())?; + self.pending_payment_store.insert_or_update(payment)?; + } + }, + Some(ConfirmationStatus::Unconfirmed) | None => { + // Reorg detected: previously confirmed tx is no + // longer confirmed. Revert to unconfirmed/pending. + payment.details.kind = PaymentKind::Onchain { + txid, + status: ConfirmationStatus::Unconfirmed, + }; + payment.details.status = PaymentStatus::Pending; + if payment.details.direction == PaymentDirection::Outbound { + unconfirmed_outbound_txids.push(txid); + } + self.payment_store + .insert_or_update(payment.details.clone())?; + self.pending_payment_store.insert_or_update(payment)?; + }, } }, PaymentKind::Onchain { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 98527e2cf..6dd6b42d6 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -21,10 +21,11 @@ use common::{ expect_channel_pending_event, expect_channel_ready_event, expect_channel_ready_events, expect_event, expect_payment_claimable_event, expect_payment_received_event, expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait, - open_channel, open_channel_push_amt, open_channel_with_all, premine_and_distribute_funds, - premine_blocks, prepare_rbf, random_chain_source, random_config, random_listening_addresses, - setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, - wait_for_cbf_sync, wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, + invalidate_blocks, open_channel, open_channel_push_amt, open_channel_with_all, + premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config, + random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node, + setup_two_nodes, splice_in_with_all, wait_for_block, wait_for_cbf_sync, wait_for_tx, + TestChainSource, TestStoreType, TestSyncStore, }; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -2877,3 +2878,101 @@ async fn repeated_manual_sync_cbf() { node.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_wallet_sync_cbf_after_reorg() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + let first_addr = node.onchain_payment().new_address().unwrap(); + let first_amount_sat = 100_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![first_addr], + Amount::from_sat(first_amount_sat), + ) + .await; + + wait_for_cbf_sync(&node).await; + assert_eq!(node.list_balances().spendable_onchain_balance_sats, first_amount_sat); + + // Advance the tip so the reorg happens below our synced checkpoint. + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 2).await; + wait_for_cbf_sync(&node).await; + + // Replace the last two blocks with a different branch that has no wallet activity. + invalidate_blocks(&bitcoind.client, 2); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 2).await; + wait_for_cbf_sync(&node).await; + + let second_addr = node.onchain_payment().new_address().unwrap(); + let second_amount_sat = 50_000; + distribute_funds_unconfirmed( + &bitcoind.client, + &electrsd.client, + vec![second_addr], + Amount::from_sat(second_amount_sat), + ) + .await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + wait_for_cbf_sync(&node).await; + + assert_eq!( + node.list_balances().spendable_onchain_balance_sats, + first_amount_sat + second_amount_sat + ); + assert_eq!( + node.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Inbound + && matches!(p.kind, PaymentKind::Onchain { .. }) + }) + .len(), + 2 + ); + + node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_wallet_sync_cbf_reorgs_out_confirmed_receive() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + let addr = node.onchain_payment().new_address().unwrap(); + premine_blocks(&bitcoind.client, &electrsd.client).await; + let cur_height = bitcoind.client.get_blockchain_info().unwrap().blocks as usize; + let reward_block_hash = + bitcoind.client.generate_to_address(1, &addr).unwrap().0.pop().unwrap().parse().unwrap(); + wait_for_block(&electrsd.client, cur_height + 1).await; + let reward_block = bitcoind.client.get_block(reward_block_hash).unwrap(); + let txid = reward_block.txdata[0].compute_txid(); + wait_for_cbf_sync(&node).await; + assert_eq!(node.list_balances().total_onchain_balance_sats, 5_000_000_000); + assert_eq!(node.list_balances().spendable_onchain_balance_sats, 0); + + let payment_id = PaymentId(txid.to_byte_array()); + let payment = node.payment(&payment_id).unwrap(); + assert_eq!(payment.status, PaymentStatus::Pending); + match payment.kind { + PaymentKind::Onchain { status: ConfirmationStatus::Confirmed { .. }, .. } => {}, + other => panic!("Unexpected payment state before reorg: {:?}", other), + } + + invalidate_blocks(&bitcoind.client, 1); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 2).await; + wait_for_cbf_sync(&node).await; + + assert_eq!(node.list_balances().total_onchain_balance_sats, 0); + assert_eq!(node.list_balances().spendable_onchain_balance_sats, 0); + let payment = node.payment(&payment_id).unwrap(); + assert_eq!(payment.status, PaymentStatus::Pending); + match payment.kind { + PaymentKind::Onchain { status: ConfirmationStatus::Unconfirmed, .. } => {}, + other => panic!("Unexpected payment state after reorg: {:?}", other), + } + + node.stop().unwrap(); +} From bb45a1c384538eb3dc9db638bc2ff7ce099d8332 Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Thu, 5 Mar 2026 19:13:22 +0900 Subject: [PATCH 09/10] Add CBF documentation to README --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0068b6e07..6eb8e66cf 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ fn main() { LDK Node currently comes with a decidedly opinionated set of design choices: - On-chain data is handled by the integrated [BDK][bdk] wallet. -- Chain data may currently be sourced from the Bitcoin Core RPC interface, or from an [Electrum][electrum] or [Esplora][esplora] server. +- Chain data may currently be sourced from the Bitcoin Core RPC interface, from an [Electrum][electrum] or [Esplora][esplora] server, or via [compact block filters (BIP 157)][bip157] when the `cbf` feature is enabled. - Wallet and channel state may be persisted to an [SQLite][sqlite] database, to file system, or to a custom back-end to be implemented by the user. - Gossip data may be sourced via Lightning's peer-to-peer network or the [Rapid Gossip Sync](https://docs.rs/lightning-rapid-gossip-sync/*/lightning_rapid_gossip_sync/) protocol. - Entropy for the Lightning and on-chain wallets may be sourced from raw bytes or a [BIP39](https://github.com/bitcoin/bips/blob/master/bip-0039.mediawiki) mnemonic. In addition, LDK Node offers the means to generate and persist the entropy bytes to disk. @@ -80,6 +80,7 @@ The Minimum Supported Rust Version (MSRV) is currently 1.85.0. [bdk]: https://bitcoindevkit.org/ [electrum]: https://github.com/spesmilo/electrum-protocol [esplora]: https://github.com/Blockstream/esplora +[bip157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki [sqlite]: https://sqlite.org/ [rust]: https://www.rust-lang.org/ [swift]: https://www.swift.org/ From 3de47771d26be53b91598337a291f3b3942da079 Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Sat, 14 Mar 2026 18:51:26 +0900 Subject: [PATCH 10/10] Add CBF integration tests for restart, recovery, and on-chain send/receive --- .github/workflows/rust.yml | 6 +- tests/integration_tests_rust.rs | 263 ++++++++++++++++++++++++++++++++ 2 files changed, 268 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 1ccade444..fcda2c83e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -80,7 +80,11 @@ jobs: - name: Test on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest'" run: | - RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test + RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test -- --skip cbf + - name: Test CBF on Rust ${{ matrix.toolchain }} + if: "matrix.platform != 'windows-latest'" + run: | + RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test cbf -- --test-threads=1 - name: Test with UniFFI support on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest' && matrix.build-uniffi" run: | diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 6dd6b42d6..69d8ff284 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -2976,3 +2976,266 @@ async fn onchain_wallet_sync_cbf_reorgs_out_confirmed_receive() { node.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn start_stop_reinit_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let config = random_config(true); + + let p2p_socket = bitcoind.params.p2p_socket.expect("P2P must be enabled for CBF"); + let peer_addr = format!("{}", p2p_socket); + let sync_config = ldk_node::config::CbfSyncConfig { + background_sync_config: None, + timeouts_config: Default::default(), + }; + + let test_sync_store = TestSyncStore::new(config.node_config.storage_dir_path.clone().into()); + + setup_builder!(builder, config.node_config); + builder.set_chain_source_cbf(vec![peer_addr.clone()], Some(sync_config.clone())); + + let node = builder + .build_with_store(config.node_entropy.clone().into(), test_sync_store.clone()) + .unwrap(); + node.start().unwrap(); + + let expected_node_id = node.node_id(); + assert_eq!(node.start(), Err(NodeError::AlreadyRunning)); + + let funding_address = node.onchain_payment().new_address().unwrap(); + assert_eq!(node.list_balances().total_onchain_balance_sats, 0); + + let expected_amount = Amount::from_sat(100_000); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![funding_address], + expected_amount, + ) + .await; + + wait_for_cbf_sync(&node).await; + assert_eq!(node.list_balances().spendable_onchain_balance_sats, expected_amount.to_sat()); + + node.stop().unwrap(); + assert_eq!(node.stop(), Err(NodeError::NotRunning)); + + node.start().unwrap(); + assert_eq!(node.start(), Err(NodeError::AlreadyRunning)); + + node.stop().unwrap(); + assert_eq!(node.stop(), Err(NodeError::NotRunning)); + drop(node); + + // Reinitialize from the same config and store. + setup_builder!(builder, config.node_config); + builder.set_chain_source_cbf(vec![peer_addr], Some(sync_config)); + + let reinitialized_node = + builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + reinitialized_node.start().unwrap(); + assert_eq!(reinitialized_node.node_id(), expected_node_id); + + // Balance should be persisted from the previous run. + assert_eq!( + reinitialized_node.list_balances().spendable_onchain_balance_sats, + expected_amount.to_sat() + ); + + wait_for_cbf_sync(&reinitialized_node).await; + assert_eq!( + reinitialized_node.list_balances().spendable_onchain_balance_sats, + expected_amount.to_sat() + ); + + reinitialized_node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_wallet_recovery_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + + let original_config = random_config(true); + let original_node_entropy = original_config.node_entropy.clone(); + let original_node = setup_node(&chain_source, original_config); + + let premine_amount_sat = 100_000; + + let addr_1 = original_node.onchain_payment().new_address().unwrap(); + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_1], + Amount::from_sat(premine_amount_sat), + ) + .await; + + wait_for_cbf_sync(&original_node).await; + assert_eq!(original_node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + let addr_2 = original_node.onchain_payment().new_address().unwrap(); + + let txid = bitcoind + .client + .send_to_address(&addr_2, Amount::from_sat(premine_amount_sat)) + .unwrap() + .0 + .parse() + .unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + + wait_for_cbf_sync(&original_node).await; + assert_eq!( + original_node.list_balances().spendable_onchain_balance_sats, + premine_amount_sat * 2 + ); + + original_node.stop().unwrap(); + drop(original_node); + + // Now we start from scratch, only the seed remains the same. + let mut recovered_config = random_config(true); + recovered_config.node_entropy = original_node_entropy; + recovered_config.recovery_mode = true; + let recovered_node = setup_node(&chain_source, recovered_config); + + wait_for_cbf_sync(&recovered_node).await; + assert_eq!( + recovered_node.list_balances().spendable_onchain_balance_sats, + premine_amount_sat * 2 + ); + + // Check we sync even when skipping some addresses. + let _addr_3 = recovered_node.onchain_payment().new_address().unwrap(); + let _addr_4 = recovered_node.onchain_payment().new_address().unwrap(); + let _addr_5 = recovered_node.onchain_payment().new_address().unwrap(); + let addr_6 = recovered_node.onchain_payment().new_address().unwrap(); + + let txid = bitcoind + .client + .send_to_address(&addr_6, Amount::from_sat(premine_amount_sat)) + .unwrap() + .0 + .parse() + .unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + + wait_for_cbf_sync(&recovered_node).await; + assert_eq!( + recovered_node.list_balances().spendable_onchain_balance_sats, + premine_amount_sat * 3 + ); + + recovered_node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_send_receive_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + let premine_amount_sat = 1_100_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a.clone(), addr_b.clone()], + Amount::from_sat(premine_amount_sat), + ) + .await; + + wait_for_cbf_sync(&node_a).await; + node_b.sync_wallets().unwrap(); + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + assert_eq!(node_b.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + // Check on-chain payment tracking after premine. + let node_a_payments = node_a.list_payments(); + let node_b_payments = node_b.list_payments(); + for payments in [&node_a_payments, &node_b_payments] { + assert_eq!(payments.len(), 1); + } + for p in [node_a_payments.first().unwrap(), node_b_payments.first().unwrap()] { + assert_eq!(p.amount_msat, Some(premine_amount_sat * 1000)); + assert_eq!(p.direction, PaymentDirection::Inbound); + assert_eq!(p.status, PaymentStatus::Pending); + match p.kind { + PaymentKind::Onchain { status, .. } => { + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + } + + // Send from B to A. + let amount_to_send_sats = 54_321; + let txid = + node_b.onchain_payment().send_to_address(&addr_a, amount_to_send_sats, None).unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + // Mine the transaction so CBF can see it. + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_cbf_sync(&node_a).await; + node_b.sync_wallets().unwrap(); + + let payment_id = PaymentId(txid.to_byte_array()); + let payment_a = node_a.payment(&payment_id).unwrap(); + match payment_a.kind { + PaymentKind::Onchain { txid: tx, status } => { + assert_eq!(tx, txid); + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + assert!(payment_a.fee_paid_msat > Some(0)); + assert_eq!(payment_a.amount_msat, Some(amount_to_send_sats * 1000)); + + let payment_b = node_b.payment(&payment_id).unwrap(); + match payment_b.kind { + PaymentKind::Onchain { txid: tx, status } => { + assert_eq!(tx, txid); + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + assert!(payment_b.fee_paid_msat > Some(0)); + assert_eq!(payment_b.amount_msat, Some(amount_to_send_sats * 1000)); + assert_eq!(payment_a.fee_paid_msat, payment_b.fee_paid_msat); + + let onchain_fee_buffer_sat = 1000; + let expected_node_a_balance = premine_amount_sat + amount_to_send_sats; + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, expected_node_a_balance); + assert!( + node_b.list_balances().spendable_onchain_balance_sats + > premine_amount_sat - amount_to_send_sats - onchain_fee_buffer_sat + ); + assert!( + node_b.list_balances().spendable_onchain_balance_sats + < premine_amount_sat - amount_to_send_sats + ); + + // Test send_all_to_address. + let addr_b2 = node_b.onchain_payment().new_address().unwrap(); + let txid = node_a.onchain_payment().send_all_to_address(&addr_b2, false, None).unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_cbf_sync(&node_a).await; + node_b.sync_wallets().unwrap(); + + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, 0); + assert_eq!(node_a.list_balances().total_onchain_balance_sats, 0); + assert!(node_b.list_balances().spendable_onchain_balance_sats > premine_amount_sat); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +}