From a104e657a29b572aee71f838736b33c00d016e35 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 28 Jan 2026 08:43:13 -0600 Subject: [PATCH 1/2] prefactor: Refactor read_payments to be generic across other types --- src/io/utils.rs | 62 ++++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/src/io/utils.rs b/src/io/utils.rs index d2f70377b..5fe74e2ef 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -223,21 +223,17 @@ where }) } -/// Read previously persisted payments information from the store. -pub(crate) async fn read_payments( - kv_store: &DynStore, logger: L, -) -> Result, std::io::Error> +/// Generic helper to read persisted items from a KV store namespace. +async fn read_objects_from_store( + kv_store: &DynStore, logger: L, primary_namespace: &str, secondary_namespace: &str, +) -> Result, std::io::Error> where + T: Readable, L::Target: LdkLogger, { - let mut res = Vec::new(); + let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?; - let mut stored_keys = KVStore::list( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - ) - .await?; + let mut res = Vec::with_capacity(stored_keys.len()); const BATCH_SIZE: usize = 50; @@ -246,52 +242,44 @@ where // Fill JoinSet with tasks if possible while set.len() < BATCH_SIZE && !stored_keys.is_empty() { if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } } + let type_name = std::any::type_name::(); + while let Some(read_res) = set.join_next().await { // Exit early if we get an IO error. let reader = read_res .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {type_name}: {e}"); set.abort_all(); e })? .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {type_name}: {e}"); set.abort_all(); e })?; // Refill set for every finished future, if we still have something to do. if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } // Handle result. - let payment = PaymentDetails::read(&mut &*reader).map_err(|e| { - log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); + let item = T::read(&mut &*reader).map_err(|e| { + log_error!(logger, "Failed to deserialize {type_name}: {e}"); std::io::Error::new( std::io::ErrorKind::InvalidData, - "Failed to deserialize PaymentDetails", + format!("Failed to deserialize {type_name}"), ) })?; - res.push(payment); + res.push(item); } debug_assert!(set.is_empty()); @@ -300,6 +288,22 @@ where Ok(res) } +/// Read previously persisted payments information from the store. 
+pub(crate) async fn read_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + /// Read `OutputSweeper` state from the store. pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, From ad559cc041e0c073049ec924c43f44142ff506ce Mon Sep 17 00:00:00 2001 From: benthecarman Date: Thu, 5 Feb 2026 17:42:27 -0600 Subject: [PATCH 2/2] Add forwarded payment tracking and statistics Routing nodes and LSPs want to track forwarded payments so they can run accounting on fees earned and track profitability across time. We now store these to make it easier to track and allows for future accounting utils in the future. To prevent potential privacy footguns, we only store the individual forwarding events for a given time period, and then we aggregate them into a single event for each channel pair, this prevents potential payment correlation. For lightweight nodes we have the `Stats` mode that only tracks the total forwarding stats per channel. Co-Authored-By: Claude Sonnet 4.5 (1M context) --- bindings/ldk_node.udl | 68 ++++ src/builder.rs | 88 +++- src/config.rs | 34 +- src/event.rs | 125 +++++- src/ffi/types.rs | 42 +- src/io/mod.rs | 14 + src/io/utils.rs | 59 ++- src/lib.rs | 534 +++++++++++++++++++++++- src/payment/mod.rs | 4 +- src/payment/store.rs | 920 +++++++++++++++++++++++++++++++++++++++++- src/types.rs | 7 + 11 files changed, 1859 insertions(+), 36 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 6bd031379..0157a2453 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,6 +13,7 @@ dictionary Config { u64 probing_liquidity_limit_multiplier; AnchorChannelsConfig? anchor_channels_config; RouteParametersConfig? route_parameters; + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode; }; dictionary AnchorChannelsConfig { @@ -189,6 +190,12 @@ interface Node { void remove_payment([ByRef]PaymentId payment_id); BalanceDetails list_balances(); sequence list_payments(); + ForwardedPaymentDetails? forwarded_payment([ByRef]ForwardedPaymentId forwarded_payment_id); + sequence list_forwarded_payments(); + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode(); + ChannelForwardingStats? channel_forwarding_stats([ByRef]ChannelId channel_id); + sequence list_channel_forwarding_stats(); + sequence list_channel_pair_forwarding_stats(); sequence list_peers(); sequence list_channels(); NetworkGraph network_graph(); @@ -486,6 +493,12 @@ enum PaymentStatus { "Failed", }; +[Enum] +interface ForwardedPaymentTrackingMode { + Detailed(u64 retention_minutes); + Stats(); +}; + dictionary LSPFeeLimits { u64? max_total_opening_fee_msat; u64? max_proportional_opening_fee_ppm_msat; @@ -507,6 +520,55 @@ dictionary PaymentDetails { u64 latest_update_timestamp; }; +dictionary ForwardedPaymentDetails { + ForwardedPaymentId id; + ChannelId prev_channel_id; + ChannelId next_channel_id; + UserChannelId? prev_user_channel_id; + UserChannelId? next_user_channel_id; + PublicKey? prev_node_id; + PublicKey? next_node_id; + u64? total_fee_earned_msat; + u64? skimmed_fee_msat; + boolean claim_from_onchain_tx; + u64? outbound_amount_forwarded_msat; + u64 forwarded_at_timestamp; +}; + +dictionary ChannelForwardingStats { + ChannelId channel_id; + PublicKey? 
counterparty_node_id; + u64 inbound_payments_forwarded; + u64 outbound_payments_forwarded; + u64 total_inbound_amount_msat; + u64 total_outbound_amount_msat; + u64 total_fee_earned_msat; + u64 total_skimmed_fee_msat; + u64 onchain_claims_count; + u64 first_forwarded_at_timestamp; + u64 last_forwarded_at_timestamp; +}; + +dictionary ChannelPairForwardingStats { + ChannelPairStatsId id; + ChannelId prev_channel_id; + ChannelId next_channel_id; + u64 bucket_start_timestamp; + PublicKey? prev_node_id; + PublicKey? next_node_id; + u64 payment_count; + u64 total_inbound_amount_msat; + u64 total_outbound_amount_msat; + u64 total_fee_earned_msat; + u64 total_skimmed_fee_msat; + u64 onchain_claims_count; + u64 avg_fee_msat; + u64 avg_inbound_amount_msat; + u64 first_forwarded_at_timestamp; + u64 last_forwarded_at_timestamp; + u64 aggregated_at_timestamp; +}; + dictionary RouteParametersConfig { u64? max_total_routing_fee_msat; u32 max_total_cltv_expiry_delta; @@ -894,6 +956,9 @@ typedef string OfferId; [Custom] typedef string PaymentId; +[Custom] +typedef string ForwardedPaymentId; + [Custom] typedef string PaymentHash; @@ -906,6 +971,9 @@ typedef string PaymentSecret; [Custom] typedef string ChannelId; +[Custom] +typedef string ChannelPairStatsId; + [Custom] typedef string UserChannelId; diff --git a/src/builder.rs b/src/builder.rs index 5d8a5a7a9..dc9fd51b3 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -55,13 +55,20 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ - read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, + read_channel_forwarding_stats, read_channel_pair_forwarding_stats, read_event_queue, + read_external_pathfinding_scores_from_cache, read_forwarded_payments, read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, read_scorer, write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -75,9 +82,10 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, - KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, - Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelForwardingStatsStore, ChannelManager, + ChannelPairForwardingStatsStore, DynStore, DynStoreWrapper, ForwardedPaymentStore, GossipSync, + Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, + PendingPaymentStore, Persister, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1060,14 +1068,23 @@ fn build_with_store_internal( let 
kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = - runtime.block_on(async move { - tokio::join!( - read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), - read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) - ) - }); + let ( + payment_store_res, + forwarded_payment_store_res, + channel_forwarding_stats_res, + channel_pair_forwarding_stats_res, + node_metris_res, + pending_payment_store_res, + ) = runtime.block_on(async move { + tokio::join!( + read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_forwarded_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_channel_forwarding_stats(&*kv_store_ref, Arc::clone(&logger_ref)), + read_channel_pair_forwarding_stats(&*kv_store_ref, Arc::clone(&logger_ref)), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) + ) + }); // Initialize the status fields. let node_metrics = match node_metris_res { @@ -1096,6 +1113,48 @@ fn build_with_store_internal( }, }; + let forwarded_payment_store = match forwarded_payment_store_res { + Ok(forwarded_payments) => Arc::new(ForwardedPaymentStore::new( + forwarded_payments, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read forwarded payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let channel_forwarding_stats_store = match channel_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelForwardingStatsStore::new( + stats, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let channel_pair_forwarding_stats_store = match channel_pair_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelPairForwardingStatsStore::new( + stats, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel pair forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -1782,6 +1841,9 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, + channel_pair_forwarding_stats_store, is_running, node_metrics, om_mailbox, diff --git a/src/config.rs b/src/config.rs index 103b74657..7a0fbb990 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,6 +21,30 @@ use lightning::util::config::{ use crate::logger::LogLevel; +/// The mode used for tracking forwarded payments. +/// +/// This determines how much detail is stored about payment forwarding activity. 
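+///
+/// Aggregating individual forwards into channel-pair statistics after the retention period
+/// limits how long per-payment records are kept, reducing the risk that stored forwarding
+/// events can be correlated with individual payments.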
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub enum ForwardedPaymentTrackingMode {
+	/// Store individual forwarded payments for the specified retention period (in minutes),
+	/// then aggregate into channel-pair statistics.
+	///
+	/// Individual payments older than `retention_minutes` are aggregated by channel pair
+	/// and removed. Set to 0 for unlimited retention.
+	Detailed {
+		/// The retention period for individual forwarded payment records, in minutes.
+		/// Individual payments older than this period are aggregated into channel-pair
+		/// statistics and removed. Set to 0 for unlimited retention.
+		retention_minutes: u64,
+	},
+	/// Track only per-channel aggregate statistics without storing individual payment records.
+	///
+	/// This is the default mode. Use this to reduce storage requirements when you only need
+	/// aggregate metrics like total fees earned per channel.
+	#[default]
+	Stats,
+}
+
 // Config defaults
 const DEFAULT_NETWORK: Network = Network::Bitcoin;
 const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80;
@@ -127,9 +151,10 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5;
 /// | `probing_liquidity_limit_multiplier` | 3 |
 /// | `log_level` | Debug |
 /// | `anchor_channels_config` | Some(..) |
-/// | `route_parameters` | None |
+/// | `route_parameters` | None |
+/// | `forwarded_payment_tracking_mode` | Stats |
 ///
-/// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their
+/// See [`AnchorChannelsConfig`], [`RouteParametersConfig`], and [`ForwardedPaymentTrackingMode`] for more information regarding their
 /// respective default values.
 ///
 /// [`Node`]: crate::Node
@@ -192,6 +217,10 @@ pub struct Config {
 	/// **Note:** If unset, default parameters will be used, and you will be able to override the
 	/// parameters on a per-payment basis in the corresponding method calls.
 	pub route_parameters: Option<RouteParametersConfig>,
+	/// The mode used for tracking forwarded payments.
+	///
+	/// See [`ForwardedPaymentTrackingMode`] for more information on the available modes.
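+	///
+	/// For example, a sketch of opting into detailed tracking with a one-day retention window
+	/// (the concrete retention value is up to the operator):
+	/// ```ignore
+	/// let mut config = Config::default();
+	/// config.forwarded_payment_tracking_mode =
+	///     ForwardedPaymentTrackingMode::Detailed { retention_minutes: 24 * 60 };
+	/// ```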
+ pub forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode, } impl Default for Config { @@ -206,6 +235,7 @@ impl Default for Config { anchor_channels_config: Some(AnchorChannelsConfig::default()), route_parameters: None, node_alias: None, + forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode::default(), } } } diff --git a/src/event.rs b/src/event.rs index 6f0ed8e09..28d1dc825 100644 --- a/src/event.rs +++ b/src/event.rs @@ -10,6 +10,7 @@ use core::task::{Poll, Waker}; use std::collections::VecDeque; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -32,7 +33,7 @@ use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use rand::{rng, Rng}; -use crate::config::{may_announce_channel, Config}; +use crate::config::{may_announce_channel, Config, ForwardedPaymentTrackingMode}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; use crate::fee_estimator::ConfirmationTarget; @@ -45,10 +46,14 @@ use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ - PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ForwardedPaymentDetails, ForwardedPaymentId, PaymentDetails, + PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; use crate::runtime::Runtime; -use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet}; +use crate::types::{ + ChannelForwardingStatsStore, CustomTlvRecord, DynStore, ForwardedPaymentStore, OnionMessenger, + PaymentStore, Sweeper, Wallet, +}; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, UserChannelId, @@ -487,6 +492,8 @@ where network_graph: Arc, liquidity_source: Option>>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, peer_store: Arc>, runtime: Arc, logger: L, @@ -506,10 +513,11 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Option>>>, - payment_store: Arc, peer_store: Arc>, - static_invoice_store: Option, onion_messenger: Arc, - om_mailbox: Option>, runtime: Arc, logger: L, - config: Arc, + payment_store: Arc, forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, + peer_store: Arc>, static_invoice_store: Option, + onion_messenger: Arc, om_mailbox: Option>, + runtime: Arc, logger: L, config: Arc, ) -> Self { Self { event_queue, @@ -521,6 +529,8 @@ where network_graph, liquidity_source, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, peer_store, logger, runtime, @@ -1364,9 +1374,106 @@ where .await; } + let prev_channel_id_value = prev_channel_id + .expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."); + let next_channel_id_value = next_channel_id + .expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."); + + let forwarded_at_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("SystemTime::now() should come after SystemTime::UNIX_EPOCH") + .as_secs(); + + // Calculate inbound amount (outbound + fee) + let 
inbound_amount_msat = outbound_amount_forwarded_msat + .unwrap_or(0) + .saturating_add(total_fee_earned_msat.unwrap_or(0)); + + // Update per-channel forwarding stats for the inbound channel (prev_channel) + // For new entries, this becomes the initial value; for existing entries, + // these values are used as increments via the to_update() -> update() pattern. + let inbound_stats = ChannelForwardingStats { + channel_id: prev_channel_id_value, + counterparty_node_id: prev_node_id, + inbound_payments_forwarded: 1, + outbound_payments_forwarded: 0, + total_inbound_amount_msat: inbound_amount_msat, + total_outbound_amount_msat: 0, + total_fee_earned_msat: total_fee_earned_msat.unwrap_or(0), + total_skimmed_fee_msat: skimmed_fee_msat.unwrap_or(0), + onchain_claims_count: if claim_from_onchain_tx { 1 } else { 0 }, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }; + self.channel_forwarding_stats_store.insert_or_update(inbound_stats).map_err( + |e| { + log_error!( + self.logger, + "Failed to update inbound channel forwarding stats: {e}" + ); + ReplayEvent() + }, + )?; + + // Update per-channel forwarding stats for the outbound channel (next_channel) + let outbound_stats = ChannelForwardingStats { + channel_id: next_channel_id_value, + counterparty_node_id: next_node_id, + inbound_payments_forwarded: 0, + outbound_payments_forwarded: 1, + total_inbound_amount_msat: 0, + total_outbound_amount_msat: outbound_amount_forwarded_msat.unwrap_or(0), + total_fee_earned_msat: 0, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }; + self.channel_forwarding_stats_store.insert_or_update(outbound_stats).map_err( + |e| { + log_error!( + self.logger, + "Failed to update outbound channel forwarding stats: {e}" + ); + ReplayEvent() + }, + )?; + + // Only store individual forwarded payment details in Detailed mode + match self.config.forwarded_payment_tracking_mode { + ForwardedPaymentTrackingMode::Detailed { .. } => { + // PaymentForwarded does not have a unique id, so we generate a random one here. 
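+					// A 32-byte random identifier makes collisions practically impossible and,
+					// unlike `PaymentId`, serves only as a local lookup key with no relation to
+					// the forwarded HTLC or any payment hash.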
+ let mut id_bytes = [0u8; 32]; + rng().fill(&mut id_bytes); + + let forwarded_payment = ForwardedPaymentDetails { + id: ForwardedPaymentId(id_bytes), + prev_channel_id: prev_channel_id_value, + next_channel_id: next_channel_id_value, + prev_user_channel_id: prev_user_channel_id.map(UserChannelId), + next_user_channel_id: next_user_channel_id.map(UserChannelId), + prev_node_id, + next_node_id, + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + forwarded_at_timestamp, + }; + + self.forwarded_payment_store.insert(forwarded_payment).map_err(|e| { + log_error!(self.logger, "Failed to store forwarded payment: {e}"); + ReplayEvent() + })?; + }, + ForwardedPaymentTrackingMode::Stats => { + // Do not store individual payment details + }, + } + let event = Event::PaymentForwarded { - prev_channel_id: prev_channel_id.expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."), - next_channel_id: next_channel_id.expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."), + prev_channel_id: prev_channel_id_value, + next_channel_id: next_channel_id_value, prev_user_channel_id: prev_user_channel_id.map(UserChannelId), next_user_channel_id: next_user_channel_id.map(UserChannelId), prev_node_id, diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 2a349a967..1e4af49d4 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -54,7 +54,8 @@ pub use crate::graph::{ChannelInfo, ChannelUpdateInfo, NodeAnnouncementInfo, Nod pub use crate::liquidity::{LSPS1OrderStatus, LSPS2ServiceConfig}; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; pub use crate::payment::store::{ - ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus, + ConfirmationStatus, ForwardedPaymentId, LSPFeeLimits, PaymentDirection, PaymentKind, + PaymentStatus, }; pub use crate::payment::UnifiedPaymentResult; use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; @@ -722,6 +723,24 @@ impl UniffiCustomTypeConverter for PaymentId { } } +impl UniffiCustomTypeConverter for ForwardedPaymentId { + type Builtin = String; + + fn into_custom(val: Self::Builtin) -> uniffi::Result { + if let Some(bytes_vec) = hex_utils::to_vec(&val) { + let bytes_res = bytes_vec.try_into(); + if let Ok(bytes) = bytes_res { + return Ok(ForwardedPaymentId(bytes)); + } + } + Err(Error::InvalidPaymentId.into()) + } + + fn from_custom(obj: Self) -> Self::Builtin { + hex_utils::to_string(&obj.0) + } +} + impl UniffiCustomTypeConverter for PaymentHash { type Builtin = String; @@ -793,6 +812,27 @@ impl UniffiCustomTypeConverter for ChannelId { } } +use crate::payment::store::ChannelPairStatsId; + +impl UniffiCustomTypeConverter for ChannelPairStatsId { + type Builtin = String; + + fn into_custom(val: Self::Builtin) -> uniffi::Result { + if let Some(hex_vec) = hex_utils::to_vec(&val) { + if hex_vec.len() == 72 { + let mut id = [0u8; 72]; + id.copy_from_slice(&hex_vec[..]); + return Ok(Self(id)); + } + } + Err(Error::InvalidChannelId.into()) // Reuse this error for now + } + + fn from_custom(obj: Self) -> Self::Builtin { + hex_utils::to_string(&obj.0) + } +} + impl UniffiCustomTypeConverter for UserChannelId { type Builtin = String; diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..8bc270d1f 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -27,6 +27,20 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = 
"payments"; pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The forwarded payment information will be persisted under this prefix. +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "forwarded_payments"; +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The channel forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_forwarding_stats"; +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The channel pair forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_pair_forwarding_stats"; +pub(crate) const CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index 5fe74e2ef..40bbf6f80 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -42,11 +42,20 @@ use super::*; use crate::chain::ChainSource; use crate::config::WALLET_KEYS_SEED_LEN; use crate::fee_estimator::OnchainFeeEstimator; +use crate::io::{ + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, +}; use crate::io::{ NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger, Logger}; -use crate::payment::PendingPaymentDetails; +use crate::payment::{ + ChannelForwardingStats, ChannelPairForwardingStats, ForwardedPaymentDetails, + PendingPaymentDetails, +}; use crate::peer_store::PeerStore; use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; @@ -304,6 +313,54 @@ where .await } +/// Read previously persisted forwarded payments information from the store. +pub(crate) async fn read_forwarded_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + +/// Read previously persisted channel forwarding stats from the store. +pub(crate) async fn read_channel_forwarding_stats( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + +/// Read previously persisted channel pair forwarding stats from the store. +pub(crate) async fn read_channel_pair_forwarding_stats( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + /// Read `OutputSweeper` state from the store. 
pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, diff --git a/src/lib.rs b/src/lib.rs index d2222d949..f3aba4bd8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -124,7 +124,8 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, - NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + ForwardedPaymentTrackingMode, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, + RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; pub use error::Error as NodeError; @@ -145,6 +146,7 @@ use lightning::ln::channel_state::{ChannelDetails as LdkChannelDetails, ChannelS use lightning::ln::channelmanager::PaymentId; use lightning::ln::funding::SpliceContribution; use lightning::ln::msgs::SocketAddress; +use lightning::ln::types::ChannelId; use lightning::routing::gossip::NodeAlias; use lightning::util::persist::KVStoreSync; use lightning_background_processor::process_events_async; @@ -152,15 +154,18 @@ use liquidity::{LSPS1Liquidity, LiquiditySource}; use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; +use payment::store::{aggregate_expired_forwarded_payments, ChannelPairStatsId}; use payment::{ - Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment, - UnifiedPayment, + Bolt11Payment, Bolt12Payment, ChannelForwardingStats, ChannelPairForwardingStats, + ForwardedPaymentDetails, ForwardedPaymentId, OnchainPayment, PaymentDetails, + SpontaneousPayment, UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; use rand::Rng; use runtime::Runtime; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelForwardingStatsStore, + ChannelManager, ChannelPairForwardingStatsStore, DynStore, ForwardedPaymentStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; @@ -222,6 +227,9 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, + channel_pair_forwarding_stats_store: Arc, is_running: Arc>, node_metrics: Arc>, om_mailbox: Option>, @@ -264,6 +272,33 @@ impl Node { let chain_source = Arc::clone(&self.chain_source); self.runtime.block_on(async move { chain_source.update_fee_rate_estimates().await })?; + // Check for expired forwarded payments and aggregate them on startup + if let ForwardedPaymentTrackingMode::Detailed { retention_minutes } = + self.config.forwarded_payment_tracking_mode + { + if retention_minutes > 0 { + log_info!(self.logger, "Checking for expired forwarded payments..."); + match aggregate_expired_forwarded_payments( + &self.forwarded_payment_store, + &self.channel_pair_forwarding_stats_store, + retention_minutes, + &self.logger, + ) { + Ok((pair_count, payment_count)) => { + if pair_count > 0 { + log_info!( + self.logger, + "Aggregated {payment_count} payments into {pair_count} channel pairs" + ); + } else { + log_info!(self.logger, "No expired forwarded payments to aggregate"); + } + }, + Err(e) => log_error!(self.logger, "Startup aggregation failed: {e}"), + } + } + } + // Spawn background task continuously syncing onchain, lightning, and fee rate cache. 
let stop_sync_receiver = self.stop_sender.subscribe(); let chain_source = Arc::clone(&self.chain_source); @@ -549,6 +584,54 @@ impl Node { chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await }); + // Spawn background task for periodic forwarded payment aggregation + if let ForwardedPaymentTrackingMode::Detailed { retention_minutes } = + self.config.forwarded_payment_tracking_mode + { + if retention_minutes > 0 { + let stop_aggregation = self.stop_sender.subscribe(); + let forwarded_payment_store = Arc::clone(&self.forwarded_payment_store); + let channel_pair_stats_store = + Arc::clone(&self.channel_pair_forwarding_stats_store); + let logger = Arc::clone(&self.logger); + + self.runtime.spawn_cancellable_background_task(async move { + let mut interval_ticker = + tokio::time::interval(Duration::from_secs(retention_minutes * 60)); + interval_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut stop_aggregation = stop_aggregation; + loop { + tokio::select! { + _ = stop_aggregation.changed() => { + log_trace!(logger, "Stopping forwarded payment aggregation task"); + break; + } + _ = interval_ticker.tick() => { + log_trace!(logger, "Running periodic forwarded payment aggregation"); + match aggregate_expired_forwarded_payments( + &forwarded_payment_store, + &channel_pair_stats_store, + retention_minutes, + &logger, + ) { + Ok((pair_count, payment_count)) if pair_count > 0 => { + log_debug!( + logger, + "Aggregated {} payments into {} channel pairs", + payment_count, + pair_count + ); + }, + Err(e) => log_error!(logger, "Periodic aggregation failed: {}", e), + _ => {}, + } + } + } + } + }); + } + } + let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( Arc::clone(&self.tx_broadcaster), Arc::new(LdkWallet::new(Arc::clone(&self.wallet), Arc::clone(&self.logger))), @@ -573,6 +656,8 @@ impl Node { Arc::clone(&self.network_graph), self.liquidity_source.clone(), Arc::clone(&self.payment_store), + Arc::clone(&self.forwarded_payment_store), + Arc::clone(&self.channel_forwarding_stats_store), Arc::clone(&self.peer_store), static_invoice_store, Arc::clone(&self.onion_messenger), @@ -1214,7 +1299,7 @@ impl Node { /// /// Returns a [`UserChannelId`] allowing to locally keep track of the channel. /// - /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: crate::config::AnchorChannelsConfig::per_channel_reserve_sats + /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: config::AnchorChannelsConfig::per_channel_reserve_sats pub fn open_channel( &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64, push_to_counterparty_msat: Option, channel_config: Option, @@ -1249,7 +1334,7 @@ impl Node { /// /// Returns a [`UserChannelId`] allowing to locally keep track of the channel. /// - /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: crate::config::AnchorChannelsConfig::per_channel_reserve_sats + /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: config::AnchorChannelsConfig::per_channel_reserve_sats pub fn open_announced_channel( &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64, push_to_counterparty_msat: Option, channel_config: Option, @@ -1470,7 +1555,7 @@ impl Node { /// However, if background syncing is disabled (i.e., `background_sync_config` is set to `None`), /// this method must be called manually to keep wallets in sync with the chain state. 
/// - /// [`EsploraSyncConfig::background_sync_config`]: crate::config::EsploraSyncConfig::background_sync_config + /// [`EsploraSyncConfig::background_sync_config`]: config::EsploraSyncConfig::background_sync_config pub fn sync_wallets(&self) -> Result<(), Error> { if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); @@ -1526,7 +1611,7 @@ impl Node { /// counterparty to broadcast for us (see [`AnchorChannelsConfig::trusted_peers_no_reserve`] /// for more information). /// - /// [`AnchorChannelsConfig::trusted_peers_no_reserve`]: crate::config::AnchorChannelsConfig::trusted_peers_no_reserve + /// [`AnchorChannelsConfig::trusted_peers_no_reserve`]: config::AnchorChannelsConfig::trusted_peers_no_reserve pub fn force_close_channel( &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, reason: Option, @@ -1692,6 +1777,171 @@ impl Node { self.payment_store.list_filter(|_| true) } + /// Retrieve the details of a specific forwarded payment with the given id. + /// + /// **Note:** the identifier is a randomly generated id and not the payment hash or any other + /// identifier tied to the payment itself. + /// + /// **Note:** Individual forwarded payment records are only stored in + /// [`ForwardedPaymentTrackingMode::Detailed`] mode. In [`ForwardedPaymentTrackingMode::Stats`] + /// mode, this will return an empty vector since individual payment records are not stored. + /// In `Detailed` mode, payments are only stored until they are aggregated into statistics + /// based on the configured retention period. + /// + /// Returns `Some` if the forwarded payment was known and `None` otherwise. + pub fn forwarded_payment( + &self, forwarded_payment_id: &ForwardedPaymentId, + ) -> Option { + self.forwarded_payment_store.get(forwarded_payment_id) + } + + /// Retrieves all forwarded payments that match the given predicate. + /// + /// **Note:** Individual forwarded payment records are only stored in + /// [`ForwardedPaymentTrackingMode::Detailed`] mode. In [`ForwardedPaymentTrackingMode::Stats`] + /// mode, this will return an empty vector. In `Detailed` mode, payments are only stored until + /// they are aggregated into statistics based on the configured retention period. + /// + /// For example, to list all forwarded payments that earned at least 1000 msat in fees: + /// ```ignore + /// node.list_forwarded_payments_with_filter(|p| { + /// p.total_fee_earned_msat.unwrap_or(0) >= 1000 + /// }); + /// ``` + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + /// [`ForwardedPaymentTrackingMode::Stats`]: ForwardedPaymentTrackingMode::Stats + pub fn list_forwarded_payments_with_filter bool>( + &self, f: F, + ) -> Vec { + self.forwarded_payment_store.list_filter(f) + } + + /// Retrieves all forwarded payments. + /// + /// **Note:** Individual forwarded payment records are only stored in + /// [`ForwardedPaymentTrackingMode::Detailed`] mode. In [`ForwardedPaymentTrackingMode::Stats`] + /// mode, this will return an empty vector since individual payment records are not stored. + /// In `Detailed` mode, payments are only stored until they are aggregated into statistics + /// based on the configured retention period. 
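+	///
+	/// For example, a sketch of summing the routing fees recorded in the currently retained
+	/// detailed records:
+	/// ```ignore
+	/// let earned_msat: u64 = node
+	///     .list_forwarded_payments()
+	///     .iter()
+	///     .filter_map(|fwd| fwd.total_fee_earned_msat)
+	///     .sum();
+	/// ```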
+ /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + /// [`ForwardedPaymentTrackingMode::Stats`]: ForwardedPaymentTrackingMode::Stats + pub fn list_forwarded_payments(&self) -> Vec { + self.forwarded_payment_store.list_filter(|_| true) + } + + /// Returns the configured forwarded payment tracking mode. + pub fn forwarded_payment_tracking_mode(&self) -> ForwardedPaymentTrackingMode { + self.config.forwarded_payment_tracking_mode + } + + /// Retrieve the forwarding statistics for a specific channel. + /// + /// Returns `Some` if statistics exist for the given channel and `None` otherwise. + pub fn channel_forwarding_stats( + &self, channel_id: &ChannelId, + ) -> Option { + self.channel_forwarding_stats_store.get(channel_id) + } + + /// Retrieves all channel forwarding statistics. + pub fn list_channel_forwarding_stats(&self) -> Vec { + self.channel_forwarding_stats_store.list_filter(|_| true) + } + + /// Retrieves all channel forwarding statistics that match the given predicate. + /// + /// For example, to list stats for all channels that have earned at least 10000 msat in fees: + /// ```ignore + /// node.list_channel_forwarding_stats_with_filter(|s| { + /// s.total_fee_earned_msat >= 10000 + /// }); + /// ``` + pub fn list_channel_forwarding_stats_with_filter bool>( + &self, f: F, + ) -> Vec { + self.channel_forwarding_stats_store.list_filter(f) + } + + /// Retrieves all aggregated channel pair forwarding statistics. + /// + /// Returns statistics for forwarded payments that have been aggregated by channel pair. + /// These are created when individual forwarded payments expire based on the retention + /// period configured in [`ForwardedPaymentTrackingMode::Detailed`]. + /// + /// **Note:** Channel pair statistics are only created in `Detailed` mode. In + /// [`ForwardedPaymentTrackingMode::Stats`] mode, this will return an empty vector since + /// individual payments are not tracked or aggregated by channel pair in that mode. + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + /// [`ForwardedPaymentTrackingMode::Stats`]: ForwardedPaymentTrackingMode::Stats + pub fn list_channel_pair_forwarding_stats(&self) -> Vec { + self.channel_pair_forwarding_stats_store.list_filter(|_| true) + } + + /// Retrieves all channel pair forwarding statistics that match the given predicate. + /// + /// **Note:** Channel pair statistics are only created in [`ForwardedPaymentTrackingMode::Detailed`] + /// mode when individual payments are aggregated based on the retention period. In + /// [`ForwardedPaymentTrackingMode::Stats`] mode, this will return an empty vector since + /// individual payments are not tracked or aggregated by channel pair in that mode. + /// + /// For example, to list stats for all channel pairs that have earned at least 50000 msat in fees: + /// ```ignore + /// node.list_channel_pair_forwarding_stats_with_filter(|s| { + /// s.total_fee_earned_msat >= 50000 + /// }); + /// ``` + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + /// [`ForwardedPaymentTrackingMode::Stats`]: ForwardedPaymentTrackingMode::Stats + pub fn list_channel_pair_forwarding_stats_with_filter< + F: FnMut(&&ChannelPairForwardingStats) -> bool, + >( + &self, f: F, + ) -> Vec { + self.channel_pair_forwarding_stats_store.list_filter(f) + } + + /// Retrieves channel pair forwarding statistics within a specific time range. 
+ /// + /// Returns all bucket entries where `bucket_start_timestamp` falls within `[start_timestamp, end_timestamp)`. + /// + /// Will only be available if [`Node::forwarded_payment_tracking_mode`] returns + /// [`ForwardedPaymentTrackingMode::Detailed`], otherwise an empty [`Vec`] will be returned. + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + pub fn list_channel_pair_forwarding_stats_in_range( + &self, start_timestamp: u64, end_timestamp: u64, + ) -> Vec { + self.channel_pair_forwarding_stats_store.list_filter(|stats| { + stats.bucket_start_timestamp >= start_timestamp + && stats.bucket_start_timestamp < end_timestamp + }) + } + + /// Retrieves all forwarding statistics buckets for a specific channel pair. + /// + /// Returns all time buckets for the given inbound→outbound channel pair, + /// ordered by bucket timestamp. + /// + /// Will only be available if [`Node::forwarded_payment_tracking_mode`] returns + /// [`ForwardedPaymentTrackingMode::Detailed`], otherwise an empty [`Vec`] will be returned. + /// + /// [`ForwardedPaymentTrackingMode::Detailed`]: ForwardedPaymentTrackingMode::Detailed + pub fn list_channel_pair_forwarding_stats_for_pair( + &self, prev_channel_id: ChannelId, next_channel_id: ChannelId, + ) -> Vec { + let mut results = self.channel_pair_forwarding_stats_store.list_filter(|stats| { + stats.prev_channel_id == prev_channel_id && stats.next_channel_id == next_channel_id + }); + + // Sort by bucket timestamp for chronological ordering + results.sort_by_key(|stats| stats.bucket_start_timestamp); + results + } + /// Retrieves a list of known peers. pub fn list_peers(&self) -> Vec { let mut peers = Vec::new(); @@ -1878,3 +2128,271 @@ pub(crate) fn total_anchor_channels_reserve_sats( * anchor_channels_config.per_channel_reserve_sats }) } + +/// Aggregates multiple channel pair statistics buckets into cumulative totals. +/// +/// This is useful for calculating cumulative statistics across multiple time buckets. +/// All buckets must be for the same channel pair (prev_channel_id, next_channel_id). +/// +/// # Arguments +/// * `buckets` - Slice of statistics buckets to aggregate (must all be for same channel pair) +/// +/// # Returns +/// A single [`ChannelPairForwardingStats`] with: +/// - Summed payment counts, amounts, and fees +/// - Earliest `first_forwarded_at_timestamp` and latest `last_forwarded_at_timestamp` +/// - Recalculated averages based on total payment count +/// - `bucket_start_timestamp` set to the earliest bucket's timestamp +/// - `aggregated_at_timestamp` set to current time +/// +/// # Panics +/// Panics if buckets is empty or if buckets contain different channel pairs. +pub fn aggregate_channel_pair_stats( + buckets: &[ChannelPairForwardingStats], +) -> ChannelPairForwardingStats { + assert!(!buckets.is_empty(), "Cannot aggregate empty bucket list"); + + // Verify all buckets are for the same channel pair + let first = &buckets[0]; + for bucket in &buckets[1..] 
{ + assert_eq!( + bucket.prev_channel_id, first.prev_channel_id, + "All buckets must have the same prev_channel_id" + ); + assert_eq!( + bucket.next_channel_id, first.next_channel_id, + "All buckets must have the same next_channel_id" + ); + } + + // Aggregate values + let mut payment_count = 0u64; + let mut total_inbound_amount_msat = 0u64; + let mut total_outbound_amount_msat = 0u64; + let mut total_fee_earned_msat = 0u64; + let mut total_skimmed_fee_msat = 0u64; + let mut onchain_claims_count = 0u64; + let mut first_forwarded_at_timestamp = u64::MAX; + let mut last_forwarded_at_timestamp = 0u64; + let mut earliest_bucket_start = u64::MAX; + + for bucket in buckets { + payment_count += bucket.payment_count; + total_inbound_amount_msat += bucket.total_inbound_amount_msat; + total_outbound_amount_msat += bucket.total_outbound_amount_msat; + total_fee_earned_msat += bucket.total_fee_earned_msat; + total_skimmed_fee_msat += bucket.total_skimmed_fee_msat; + onchain_claims_count += bucket.onchain_claims_count; + first_forwarded_at_timestamp = + first_forwarded_at_timestamp.min(bucket.first_forwarded_at_timestamp); + last_forwarded_at_timestamp = + last_forwarded_at_timestamp.max(bucket.last_forwarded_at_timestamp); + earliest_bucket_start = earliest_bucket_start.min(bucket.bucket_start_timestamp); + } + + // Calculate averages + let avg_fee_msat = if payment_count > 0 { total_fee_earned_msat / payment_count } else { 0 }; + let avg_inbound_amount_msat = + if payment_count > 0 { total_inbound_amount_msat / payment_count } else { 0 }; + + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + + // Create aggregated ID using earliest bucket timestamp + let aggregated_id = ChannelPairStatsId::from_channel_pair_and_bucket( + &first.prev_channel_id, + &first.next_channel_id, + earliest_bucket_start, + ); + + ChannelPairForwardingStats { + id: aggregated_id, + prev_channel_id: first.prev_channel_id, + next_channel_id: first.next_channel_id, + bucket_start_timestamp: earliest_bucket_start, + prev_node_id: first.prev_node_id, + next_node_id: first.next_node_id, + payment_count, + total_inbound_amount_msat, + total_outbound_amount_msat, + total_fee_earned_msat, + total_skimmed_fee_msat, + onchain_claims_count, + avg_fee_msat, + avg_inbound_amount_msat, + first_forwarded_at_timestamp, + last_forwarded_at_timestamp, + aggregated_at_timestamp: now, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use lightning::ln::types::ChannelId; + + #[test] + fn test_aggregate_channel_pair_stats() { + let prev_channel = ChannelId([1u8; 32]); + let next_channel = ChannelId([2u8; 32]); + + // Create 3 buckets with different statistics + let bucket1 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + 1738800000, + ), + prev_channel_id: prev_channel, + next_channel_id: next_channel, + bucket_start_timestamp: 1738800000, + prev_node_id: None, + next_node_id: None, + payment_count: 10, + total_inbound_amount_msat: 110000, + total_outbound_amount_msat: 100000, + total_fee_earned_msat: 10000, + total_skimmed_fee_msat: 1000, + onchain_claims_count: 2, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738800000, + last_forwarded_at_timestamp: 1738801000, + aggregated_at_timestamp: 1738802000, + }; + + let bucket2 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + 1738803600, + ), + prev_channel_id: prev_channel, + 
next_channel_id: next_channel, + bucket_start_timestamp: 1738803600, + prev_node_id: None, + next_node_id: None, + payment_count: 5, + total_inbound_amount_msat: 55000, + total_outbound_amount_msat: 50000, + total_fee_earned_msat: 5000, + total_skimmed_fee_msat: 500, + onchain_claims_count: 1, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738803600, + last_forwarded_at_timestamp: 1738804000, + aggregated_at_timestamp: 1738805000, + }; + + let bucket3 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + 1738807200, + ), + prev_channel_id: prev_channel, + next_channel_id: next_channel, + bucket_start_timestamp: 1738807200, + prev_node_id: None, + next_node_id: None, + payment_count: 15, + total_inbound_amount_msat: 165000, + total_outbound_amount_msat: 150000, + total_fee_earned_msat: 15000, + total_skimmed_fee_msat: 1500, + onchain_claims_count: 3, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738807200, + last_forwarded_at_timestamp: 1738808000, + aggregated_at_timestamp: 1738809000, + }; + + // Aggregate the buckets + let buckets = vec![bucket1, bucket2, bucket3]; + let aggregated = aggregate_channel_pair_stats(&buckets); + + // Verify aggregated results + assert_eq!(aggregated.payment_count, 30); // 10 + 5 + 15 + assert_eq!(aggregated.total_inbound_amount_msat, 330000); // 110000 + 55000 + 165000 + assert_eq!(aggregated.total_outbound_amount_msat, 300000); // 100000 + 50000 + 150000 + assert_eq!(aggregated.total_fee_earned_msat, 30000); // 10000 + 5000 + 15000 + assert_eq!(aggregated.total_skimmed_fee_msat, 3000); // 1000 + 500 + 1500 + assert_eq!(aggregated.onchain_claims_count, 6); // 2 + 1 + 3 + + // Verify averages are recalculated + assert_eq!(aggregated.avg_fee_msat, 1000); // 30000 / 30 + assert_eq!(aggregated.avg_inbound_amount_msat, 11000); // 330000 / 30 + + // Verify timestamps + assert_eq!(aggregated.first_forwarded_at_timestamp, 1738800000); // Earliest + assert_eq!(aggregated.last_forwarded_at_timestamp, 1738808000); // Latest + assert_eq!(aggregated.bucket_start_timestamp, 1738800000); // Earliest bucket + + // Verify channel pair is preserved + assert_eq!(aggregated.prev_channel_id, prev_channel); + assert_eq!(aggregated.next_channel_id, next_channel); + } + + #[test] + #[should_panic(expected = "Cannot aggregate empty bucket list")] + fn test_aggregate_channel_pair_stats_empty() { + let buckets: Vec = vec![]; + let _ = aggregate_channel_pair_stats(&buckets); + } + + #[test] + #[should_panic(expected = "All buckets must have the same prev_channel_id")] + fn test_aggregate_channel_pair_stats_different_channels() { + let bucket1 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &ChannelId([1u8; 32]), + &ChannelId([2u8; 32]), + 1738800000, + ), + prev_channel_id: ChannelId([1u8; 32]), + next_channel_id: ChannelId([2u8; 32]), + bucket_start_timestamp: 1738800000, + prev_node_id: None, + next_node_id: None, + payment_count: 10, + total_inbound_amount_msat: 110000, + total_outbound_amount_msat: 100000, + total_fee_earned_msat: 10000, + total_skimmed_fee_msat: 1000, + onchain_claims_count: 2, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738800000, + last_forwarded_at_timestamp: 1738801000, + aggregated_at_timestamp: 1738802000, + }; + + let bucket2 = ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + 
&ChannelId([99u8; 32]), // Different channel! + &ChannelId([2u8; 32]), + 1738803600, + ), + prev_channel_id: ChannelId([99u8; 32]), // Different channel! + next_channel_id: ChannelId([2u8; 32]), + bucket_start_timestamp: 1738803600, + prev_node_id: None, + next_node_id: None, + payment_count: 5, + total_inbound_amount_msat: 55000, + total_outbound_amount_msat: 50000, + total_fee_earned_msat: 5000, + total_skimmed_fee_msat: 500, + onchain_claims_count: 1, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 11000, + first_forwarded_at_timestamp: 1738803600, + last_forwarded_at_timestamp: 1738804000, + aggregated_at_timestamp: 1738805000, + }; + + let buckets = vec![bucket1, bucket2]; + let _ = aggregate_channel_pair_stats(&buckets); + } +} diff --git a/src/payment/mod.rs b/src/payment/mod.rs index 42b5aff3b..7f9a8a1be 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -22,6 +22,8 @@ pub use onchain::OnchainPayment; pub use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ - ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ChannelPairForwardingStats, ConfirmationStatus, + ForwardedPaymentDetails, ForwardedPaymentId, LSPFeeLimits, PaymentDetails, PaymentDirection, + PaymentKind, PaymentStatus, }; pub use unified::{UnifiedPayment, UnifiedPaymentResult}; diff --git a/src/payment/store.rs b/src/payment/store.rs index 15e94190c..0d5a82597 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -5,11 +5,16 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; +use std::fmt::{Debug, Display}; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Txid}; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::DecodeError; +use lightning::ln::types::ChannelId; use lightning::offers::offer::OfferId; use lightning::util::ser::{Readable, Writeable}; use lightning::{ @@ -19,8 +24,11 @@ use lightning::{ use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning_types::string::UntrustedString; -use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::data_store::{DataStore, StorableObject, StorableObjectId, StorableObjectUpdate}; use crate::hex_utils; +use crate::logger::{log_debug, log_error, LdkLogger, Logger}; +use crate::types::UserChannelId; +use crate::Error; /// Represents a payment. 
#[derive(Clone, Debug, PartialEq, Eq)] @@ -761,4 +769,914 @@ mod tests { } } } + + #[test] + fn test_bucket_calculation() { + // Test that bucket timestamps are calculated correctly + let retention_minutes = 60; + let bucket_size_secs = retention_minutes * 60; // 3600 seconds + + // Payment at exactly bucket boundary + let timestamp1 = 1738800000; + let bucket1 = (timestamp1 / bucket_size_secs) * bucket_size_secs; + assert_eq!(bucket1, 1738800000); // Should be unchanged + + // Payment 1 second into bucket + let timestamp2 = 1738800001; + let bucket2 = (timestamp2 / bucket_size_secs) * bucket_size_secs; + assert_eq!(bucket2, 1738800000); // Should round down + + // Payment at end of bucket (3599 seconds in) + let timestamp3 = 1738803599; + let bucket3 = (timestamp3 / bucket_size_secs) * bucket_size_secs; + assert_eq!(bucket3, 1738800000); // Should still be in same bucket + + // Payment at start of next bucket + let timestamp4 = 1738803600; + let bucket4 = (timestamp4 / bucket_size_secs) * bucket_size_secs; + assert_eq!(bucket4, 1738803600); // Should be in next bucket + } + + #[test] + fn test_channel_pair_stats_id_with_bucket() { + use lightning::ln::types::ChannelId; + + let prev_channel = ChannelId([1u8; 32]); + let next_channel = ChannelId([2u8; 32]); + let bucket_timestamp = 1738800000u64; + + let id = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + bucket_timestamp, + ); + + // Verify the ID contains all three components + assert_eq!(&id.0[0..32], &prev_channel.0); + assert_eq!(&id.0[32..64], &next_channel.0); + assert_eq!(&id.0[64..72], &bucket_timestamp.to_be_bytes()); + + // Verify different buckets create different IDs + let id2 = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + bucket_timestamp + 3600, + ); + assert_ne!(id, id2); + } + + #[test] + fn test_payments_grouped_into_correct_buckets() { + use lightning::ln::types::ChannelId; + use std::collections::HashMap; + + let retention_minutes = 60; + let bucket_size_secs = retention_minutes * 60; + let base_time = 3600 * 100; // 360000 + + let prev_channel = ChannelId([1u8; 32]); + let next_channel = ChannelId([2u8; 32]); + + // Create test payments across different time buckets + let mut payments = Vec::new(); + + // 3 payments in bucket 1 + for i in 0..3 { + payments.push(( + base_time + i * 1000, + ForwardedPaymentDetails { + id: ForwardedPaymentId([i as u8; 32]), + prev_channel_id: prev_channel, + next_channel_id: next_channel, + prev_user_channel_id: None, + next_user_channel_id: None, + prev_node_id: None, + next_node_id: None, + total_fee_earned_msat: Some(1000), + skimmed_fee_msat: Some(100), + claim_from_onchain_tx: false, + outbound_amount_forwarded_msat: Some(10000), + forwarded_at_timestamp: base_time + i * 1000, + }, + )); + } + + // 2 payments in bucket 2 + for i in 3..5 { + payments.push(( + base_time + bucket_size_secs + (i - 3) * 1000, + ForwardedPaymentDetails { + id: ForwardedPaymentId([i as u8; 32]), + prev_channel_id: prev_channel, + next_channel_id: next_channel, + prev_user_channel_id: None, + next_user_channel_id: None, + prev_node_id: None, + next_node_id: None, + total_fee_earned_msat: Some(2000), + skimmed_fee_msat: Some(200), + claim_from_onchain_tx: false, + outbound_amount_forwarded_msat: Some(20000), + forwarded_at_timestamp: base_time + bucket_size_secs + (i - 3) * 1000, + }, + )); + } + + // Group payments by bucket (simulating aggregation logic) + let mut bucket_groups: HashMap> = HashMap::new(); + for (_, payment) in 
&payments { + let bucket_start = + (payment.forwarded_at_timestamp / bucket_size_secs) * bucket_size_secs; + bucket_groups.entry(bucket_start).or_insert_with(Vec::new).push(payment); + } + + // Verify we have 2 distinct buckets + assert_eq!(bucket_groups.len(), 2, "Should have 2 distinct buckets"); + + // Verify bucket 1 has 3 payments + let bucket1_start = (base_time / bucket_size_secs) * bucket_size_secs; + assert_eq!( + bucket_groups.get(&bucket1_start).unwrap().len(), + 3, + "Bucket 1 should have 3 payments" + ); + + // Verify bucket 2 has 2 payments + let bucket2_start = ((base_time + bucket_size_secs) / bucket_size_secs) * bucket_size_secs; + assert_eq!( + bucket_groups.get(&bucket2_start).unwrap().len(), + 2, + "Bucket 2 should have 2 payments" + ); + } + + #[test] + fn test_bucket_statistics_calculation() { + use lightning::ln::types::ChannelId; + + let prev_channel = ChannelId([1u8; 32]); + let next_channel = ChannelId([2u8; 32]); + let bucket_timestamp = 1738800000u64; + + // Simulate aggregating 3 payments + let mut total_fee = 0u64; + let mut total_inbound = 0u64; + let mut total_outbound = 0u64; + + for i in 1..=3 { + let fee = 1000 * i; + let outbound = 10000 * i; + let inbound = outbound + fee; + + total_fee += fee; + total_outbound += outbound; + total_inbound += inbound; + } + + let payment_count = 3; + let avg_fee = total_fee / payment_count; + let avg_inbound = total_inbound / payment_count; + + // Verify calculations + assert_eq!(total_fee, 6000); // 1000 + 2000 + 3000 + assert_eq!(total_outbound, 60000); // 10000 + 20000 + 30000 + assert_eq!(total_inbound, 66000); // 11000 + 22000 + 33000 + assert_eq!(avg_fee, 2000); + assert_eq!(avg_inbound, 22000); + + // Create stats entry + let id = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + bucket_timestamp, + ); + + let stats = ChannelPairForwardingStats { + id, + prev_channel_id: prev_channel, + next_channel_id: next_channel, + bucket_start_timestamp: bucket_timestamp, + prev_node_id: None, + next_node_id: None, + payment_count, + total_inbound_amount_msat: total_inbound, + total_outbound_amount_msat: total_outbound, + total_fee_earned_msat: total_fee, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + avg_fee_msat: avg_fee, + avg_inbound_amount_msat: avg_inbound, + first_forwarded_at_timestamp: bucket_timestamp, + last_forwarded_at_timestamp: bucket_timestamp + 1000, + aggregated_at_timestamp: bucket_timestamp + 2000, + }; + + assert_eq!(stats.payment_count, 3); + assert_eq!(stats.avg_fee_msat, 2000); + } + + #[test] + fn test_channel_pair_stats_serialization() { + use lightning::ln::types::ChannelId; + use lightning::util::ser::{Readable, Writeable}; + + let prev_channel = ChannelId([5u8; 32]); + let next_channel = ChannelId([6u8; 32]); + let bucket_timestamp = 1738800000u64; + + let id = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel, + &next_channel, + bucket_timestamp, + ); + + let stats = ChannelPairForwardingStats { + id, + prev_channel_id: prev_channel, + next_channel_id: next_channel, + bucket_start_timestamp: bucket_timestamp, + prev_node_id: None, + next_node_id: None, + payment_count: 10, + total_inbound_amount_msat: 100000, + total_outbound_amount_msat: 90000, + total_fee_earned_msat: 10000, + total_skimmed_fee_msat: 1000, + onchain_claims_count: 2, + avg_fee_msat: 1000, + avg_inbound_amount_msat: 10000, + first_forwarded_at_timestamp: bucket_timestamp, + last_forwarded_at_timestamp: bucket_timestamp + 1000, + aggregated_at_timestamp: 
bucket_timestamp + 2000,
+		};
+
+		// Test serialization/deserialization
+		let encoded = stats.encode();
+		let decoded = ChannelPairForwardingStats::read(&mut &encoded[..]).unwrap();
+
+		assert_eq!(stats, decoded);
+		assert_eq!(decoded.bucket_start_timestamp, bucket_timestamp);
+		assert_eq!(decoded.id.0[64..72], bucket_timestamp.to_be_bytes());
+	}
+}
+
+/// A unique identifier for a forwarded payment.
+///
+/// This will be a randomly generated 32-byte identifier.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ForwardedPaymentId(pub [u8; 32]);
+
+impl StorableObjectId for ForwardedPaymentId {
+	fn encode_to_hex_str(&self) -> String {
+		hex_utils::to_string(&self.0)
+	}
+}
+
+impl Writeable for ForwardedPaymentId {
+	fn write<W: lightning::util::ser::Writer>(
+		&self, writer: &mut W,
+	) -> Result<(), lightning::io::Error> {
+		self.0.write(writer)
+	}
+}
+
+impl Readable for ForwardedPaymentId {
+	fn read<R: lightning::io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
+		Ok(Self(Readable::read(reader)?))
+	}
+}
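// --- Editor's illustrative sketch (not part of the patch): the `Writeable`/`Readable`
// impls above serialize the id as its raw 32 bytes, so it can be round-tripped with
// LDK's `encode()` helper, mirroring the serialization test in `mod tests` above.
#[cfg(test)]
mod forwarded_payment_id_roundtrip_example {
	use super::*;
	use lightning::util::ser::{Readable, Writeable};

	#[test]
	fn roundtrip_forwarded_payment_id() {
		let id = ForwardedPaymentId([7u8; 32]);
		let encoded = id.encode();
		let decoded = ForwardedPaymentId::read(&mut &encoded[..]).unwrap();
		assert_eq!(id, decoded);
	}
}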
+/// Details of a payment that has been forwarded through this node.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct ForwardedPaymentDetails {
+	/// A unique identifier for this forwarded payment.
+	pub id: ForwardedPaymentId,
+	/// The channel id of the incoming channel between the previous node and us.
+	pub prev_channel_id: ChannelId,
+	/// The channel id of the outgoing channel between the next node and us.
+	pub next_channel_id: ChannelId,
+	/// The `user_channel_id` of the incoming channel between the previous node and us.
+	///
+	/// This will only be `None` for events generated or serialized by versions prior to 0.3.0.
+	pub prev_user_channel_id: Option<UserChannelId>,
+	/// The `user_channel_id` of the outgoing channel between the next node and us.
+	///
+	/// This will be `None` if the payment was settled via an on-chain transaction or if the
+	/// event was generated or serialized by versions prior to 0.3.0.
+	pub next_user_channel_id: Option<UserChannelId>,
+	/// The node id of the previous node.
+	pub prev_node_id: Option<PublicKey>,
+	/// The node id of the next node.
+	pub next_node_id: Option<PublicKey>,
+	/// The total fee, in milli-satoshis, which was earned as a result of the payment.
+	///
+	/// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC
+	/// was pending, the amount the next hop claimed will have been rounded down to the nearest
+	/// whole satoshi. Thus, the fee calculated here may be higher than expected as we still
+	/// claimed the full value in millisatoshis from the source.
+	///
+	/// If the channel which sent us the payment has been force-closed, we will claim the funds
+	/// via an on-chain transaction. In that case we do not yet know the on-chain transaction
+	/// fees which we will spend and will instead set this to `None`. It is possible duplicate
+	/// `PaymentForwarded` events are generated for the same payment iff `total_fee_earned_msat`
+	/// is `None`.
+	pub total_fee_earned_msat: Option<u64>,
+	/// The share of the total fee, in milli-satoshis, which was withheld in addition to the
+	/// forwarding fee.
+	///
+	/// This will be `None` if no fee was skimmed from the forwarded HTLC.
+	pub skimmed_fee_msat: Option<u64>,
+	/// If this is `true`, the forwarded HTLC was claimed by our counterparty via an on-chain
+	/// transaction.
+	pub claim_from_onchain_tx: bool,
+	/// The final amount forwarded, in milli-satoshis, after the fee is deducted.
+	///
+	/// The caveat described above the `total_fee_earned_msat` field applies here as well.
+	pub outbound_amount_forwarded_msat: Option<u64>,
+	/// The timestamp, in seconds since start of the UNIX epoch, when the payment was forwarded.
+	pub forwarded_at_timestamp: u64,
+}
+
+impl_writeable_tlv_based!(ForwardedPaymentDetails, {
+	(0, id, required),
+	(2, prev_channel_id, required),
+	(4, next_channel_id, required),
+	(6, prev_user_channel_id, option),
+	(8, next_user_channel_id, option),
+	(10, prev_node_id, option),
+	(12, next_node_id, option),
+	(14, total_fee_earned_msat, option),
+	(16, skimmed_fee_msat, option),
+	(18, claim_from_onchain_tx, required),
+	(20, outbound_amount_forwarded_msat, option),
+	(22, forwarded_at_timestamp, required),
+});
+
+/// A no-op update type for [`ForwardedPaymentDetails`].
+///
+/// Forwarded payments are immutable once stored, so updates are not supported.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub(crate) struct ForwardedPaymentDetailsUpdate {
+	id: ForwardedPaymentId,
+}
+
+impl StorableObjectUpdate for ForwardedPaymentDetailsUpdate {
+	fn id(&self) -> ForwardedPaymentId {
+		self.id
+	}
+}
+
+impl StorableObject for ForwardedPaymentDetails {
+	type Id = ForwardedPaymentId;
+	type Update = ForwardedPaymentDetailsUpdate;
+
+	fn id(&self) -> Self::Id {
+		self.id
+	}
+
+	fn update(&mut self, _update: &Self::Update) -> bool {
+		// Forwarded payments are immutable, so updates are no-ops.
+		false
+	}
+
+	fn to_update(&self) -> Self::Update {
+		ForwardedPaymentDetailsUpdate { id: self.id }
+	}
+}
+
+/// Aggregate statistics for forwarded payments through a single channel.
+///
+/// Each channel has one stats entry tracking all forwards where it was either
+/// the inbound or outbound channel.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct ChannelForwardingStats {
+	/// The channel this stats entry tracks.
+	pub channel_id: ChannelId,
+	/// The counterparty node id for this channel.
+	pub counterparty_node_id: Option<PublicKey>,
+	/// Number of payments forwarded where this was the inbound channel.
+	pub inbound_payments_forwarded: u64,
+	/// Number of payments forwarded where this was the outbound channel.
+	pub outbound_payments_forwarded: u64,
+	/// Total amount received on this channel for forwarding (msat).
+	pub total_inbound_amount_msat: u64,
+	/// Total amount sent on this channel for forwarding (msat).
+	pub total_outbound_amount_msat: u64,
+	/// Total fees earned from forwards where this was the inbound channel (msat).
+	pub total_fee_earned_msat: u64,
+	/// Total skimmed fees (msat).
+	pub total_skimmed_fee_msat: u64,
+	/// Number of forwards claimed via onchain tx.
+	pub onchain_claims_count: u64,
+	/// Timestamp of first forward through this channel.
+	pub first_forwarded_at_timestamp: u64,
+	/// Timestamp of most recent forward through this channel.
+	pub last_forwarded_at_timestamp: u64,
+}
+
+impl_writeable_tlv_based!(ChannelForwardingStats, {
+	(0, channel_id, required),
+	(2, counterparty_node_id, option),
+	(4, inbound_payments_forwarded, required),
+	(6, outbound_payments_forwarded, required),
+	(8, total_inbound_amount_msat, required),
+	(10, total_outbound_amount_msat, required),
+	(12, total_fee_earned_msat, required),
+	(14, total_skimmed_fee_msat, required),
+	(16, onchain_claims_count, required),
+	(18, first_forwarded_at_timestamp, required),
+	(20, last_forwarded_at_timestamp, required),
+});
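// --- Editor's illustrative sketch (not part of the patch): `ChannelForwardingStats` uses
// the same TLV-based encoding as the other stored types, so a round trip through
// `Writeable::encode` / `Readable::read` should reproduce the value, analogous to the
// `ChannelPairForwardingStats` serialization test in `mod tests` above.
#[cfg(test)]
mod channel_forwarding_stats_roundtrip_example {
	use super::*;
	use lightning::ln::types::ChannelId;
	use lightning::util::ser::{Readable, Writeable};

	#[test]
	fn roundtrip_channel_forwarding_stats() {
		let stats = ChannelForwardingStats {
			channel_id: ChannelId([9u8; 32]),
			counterparty_node_id: None,
			inbound_payments_forwarded: 4,
			outbound_payments_forwarded: 2,
			total_inbound_amount_msat: 44_000,
			total_outbound_amount_msat: 20_000,
			total_fee_earned_msat: 4_000,
			total_skimmed_fee_msat: 0,
			onchain_claims_count: 0,
			first_forwarded_at_timestamp: 1_738_800_000,
			last_forwarded_at_timestamp: 1_738_803_599,
		};

		let encoded = stats.encode();
		let decoded = ChannelForwardingStats::read(&mut &encoded[..]).unwrap();
		assert_eq!(stats, decoded);
	}
}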
+/// Channel pair identifier (prev_channel -> next_channel -> time bucket).
+///
+/// Formed by concatenating `prev_channel_id`, `next_channel_id`, and `bucket_start_timestamp`.
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ChannelPairStatsId(pub [u8; 72]);
+
+impl ChannelPairStatsId {
+	/// Create ID by concatenating prev and next channel IDs with bucket timestamp.
+	pub fn from_channel_pair_and_bucket(
+		prev: &ChannelId, next: &ChannelId, bucket_start_timestamp: u64,
+	) -> Self {
+		let mut result = [0u8; 72];
+		result[0..32].copy_from_slice(&prev.0);
+		result[32..64].copy_from_slice(&next.0);
+		result[64..72].copy_from_slice(&bucket_start_timestamp.to_be_bytes());
+		Self(result)
+	}
+}
+
+impl Writeable for ChannelPairStatsId {
+	fn write<W: lightning::util::ser::Writer>(
+		&self, writer: &mut W,
+	) -> Result<(), lightning::io::Error> {
+		writer.write_all(&self.0)
+	}
+}
+
+impl Readable for ChannelPairStatsId {
+	fn read<R: lightning::io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
+		let mut bytes = [0u8; 72];
+		reader.read_exact(&mut bytes)?;
+		Ok(Self(bytes))
+	}
+}
+
+impl Display for ChannelPairStatsId {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		write!(f, "{}", hex_utils::to_string(&self.0))
+	}
+}
+
+impl Debug for ChannelPairStatsId {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		write!(f, "ChannelPairStatsId({})", hex_utils::to_string(&self.0))
+	}
+}
+
+impl StorableObjectId for ChannelPairStatsId {
+	fn encode_to_hex_str(&self) -> String {
+		hex_utils::to_string(&self.0)
+	}
+}
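// --- Editor's illustrative sketch (not part of the patch): the 72-byte composite key laid
// out above is `prev_channel_id (32) || next_channel_id (32) || bucket_start_timestamp
// (8, big-endian)`, so the time bucket can be recovered from the key alone without
// deserializing the stored value.
#[cfg(test)]
mod channel_pair_stats_id_layout_example {
	use super::*;
	use lightning::ln::types::ChannelId;

	#[test]
	fn bucket_timestamp_is_recoverable_from_the_id() {
		let id = ChannelPairStatsId::from_channel_pair_and_bucket(
			&ChannelId([1u8; 32]),
			&ChannelId([2u8; 32]),
			1_738_800_000,
		);

		assert_eq!(&id.0[0..32], &[1u8; 32]);
		assert_eq!(&id.0[32..64], &[2u8; 32]);

		let mut ts_bytes = [0u8; 8];
		ts_bytes.copy_from_slice(&id.0[64..72]);
		assert_eq!(u64::from_be_bytes(ts_bytes), 1_738_800_000);
	}
}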
+/// Aggregated statistics for a specific channel pair.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct ChannelPairForwardingStats {
+	/// The unique identifier for this channel pair (derived from channel IDs and bucket timestamp).
+	pub id: ChannelPairStatsId,
+	/// The previous (inbound) channel ID.
+	pub prev_channel_id: ChannelId,
+	/// The next (outbound) channel ID.
+	pub next_channel_id: ChannelId,
+	/// Start of the time bucket (seconds since UNIX epoch).
+	pub bucket_start_timestamp: u64,
+	/// The previous (inbound) counterparty node id.
+	pub prev_node_id: Option<PublicKey>,
+	/// The next (outbound) counterparty node id.
+	pub next_node_id: Option<PublicKey>,
+	/// Number of payments forwarded through this channel pair.
+	pub payment_count: u64,
+	/// Total amount received on the inbound channel (msat).
+	pub total_inbound_amount_msat: u64,
+	/// Total amount sent on the outbound channel (msat).
+	pub total_outbound_amount_msat: u64,
+	/// Total fees earned from this channel pair (msat).
+	pub total_fee_earned_msat: u64,
+	/// Total skimmed fees (msat).
+	pub total_skimmed_fee_msat: u64,
+	/// Number of forwards claimed via onchain tx.
+	pub onchain_claims_count: u64,
+	/// Average fee per payment (msat).
+	pub avg_fee_msat: u64,
+	/// Average inbound amount per payment (msat).
+	pub avg_inbound_amount_msat: u64,
+	/// Timestamp of first forward through this channel pair.
+	pub first_forwarded_at_timestamp: u64,
+	/// Timestamp of most recent forward through this channel pair.
+	pub last_forwarded_at_timestamp: u64,
+	/// Timestamp when this entry was aggregated from individual payments.
+	pub aggregated_at_timestamp: u64,
+}
+
+impl_writeable_tlv_based!(ChannelPairForwardingStats, {
+	(0, id, required),
+	(2, prev_channel_id, required),
+	(4, next_channel_id, required),
+	(6, prev_node_id, option),
+	(8, next_node_id, option),
+	(10, payment_count, required),
+	(12, total_inbound_amount_msat, required),
+	(14, total_outbound_amount_msat, required),
+	(16, total_fee_earned_msat, required),
+	(18, total_skimmed_fee_msat, required),
+	(20, onchain_claims_count, required),
+	(22, avg_fee_msat, required),
+	(24, avg_inbound_amount_msat, required),
+	(26, first_forwarded_at_timestamp, required),
+	(28, last_forwarded_at_timestamp, required),
+	(30, aggregated_at_timestamp, required),
+	(32, bucket_start_timestamp, required),
+});
+
+/// Update type for [`ChannelForwardingStats`] that supports incrementing counters.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub(crate) struct ChannelForwardingStatsUpdate {
+	/// The channel ID being updated.
+	pub channel_id: ChannelId,
+	/// The counterparty node id (used when creating a new entry).
+	pub counterparty_node_id: Option<PublicKey>,
+	/// Increment for inbound payment count.
+	pub inbound_payments_increment: u64,
+	/// Increment for outbound payment count.
+	pub outbound_payments_increment: u64,
+	/// Increment for total inbound amount.
+	pub inbound_amount_increment_msat: u64,
+	/// Increment for total outbound amount.
+	pub outbound_amount_increment_msat: u64,
+	/// Increment for total fee earned.
+	pub fee_earned_increment_msat: u64,
+	/// Increment for skimmed fee.
+	pub skimmed_fee_increment_msat: u64,
+	/// Increment for onchain claims count.
+	pub onchain_claims_increment: u64,
+	/// Current timestamp for updating first/last timestamps.
+	pub timestamp: u64,
+}
+
+impl StorableObjectUpdate for ChannelForwardingStatsUpdate {
+	fn id(&self) -> ChannelId {
+		self.channel_id
+	}
+}
+
+impl StorableObjectId for ChannelId {
+	fn encode_to_hex_str(&self) -> String {
+		hex_utils::to_string(&self.0)
+	}
+}
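// --- Editor's illustrative sketch (not part of the patch): per-channel stats are mutated by
// applying `ChannelForwardingStatsUpdate` increments through `StorableObject::update`
// (implemented for `ChannelForwardingStats` directly below); counters only grow and the
// first/last timestamps only widen.
#[cfg(test)]
mod channel_forwarding_stats_update_example {
	use super::*;
	use crate::data_store::StorableObject;
	use lightning::ln::types::ChannelId;

	#[test]
	fn update_accumulates_increments() {
		let channel_id = ChannelId([3u8; 32]);
		let mut stats = ChannelForwardingStats {
			channel_id,
			counterparty_node_id: None,
			inbound_payments_forwarded: 1,
			outbound_payments_forwarded: 0,
			total_inbound_amount_msat: 11_000,
			total_outbound_amount_msat: 0,
			total_fee_earned_msat: 1_000,
			total_skimmed_fee_msat: 0,
			onchain_claims_count: 0,
			first_forwarded_at_timestamp: 1_738_800_000,
			last_forwarded_at_timestamp: 1_738_800_000,
		};

		let update = ChannelForwardingStatsUpdate {
			channel_id,
			counterparty_node_id: None,
			inbound_payments_increment: 1,
			outbound_payments_increment: 0,
			inbound_amount_increment_msat: 22_000,
			outbound_amount_increment_msat: 0,
			fee_earned_increment_msat: 2_000,
			skimmed_fee_increment_msat: 0,
			onchain_claims_increment: 0,
			timestamp: 1_738_803_600,
		};

		assert!(stats.update(&update));
		assert_eq!(stats.inbound_payments_forwarded, 2);
		assert_eq!(stats.total_inbound_amount_msat, 33_000);
		assert_eq!(stats.total_fee_earned_msat, 3_000);
		assert_eq!(stats.first_forwarded_at_timestamp, 1_738_800_000);
		assert_eq!(stats.last_forwarded_at_timestamp, 1_738_803_600);
	}
}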
+impl StorableObject for ChannelForwardingStats {
+	type Id = ChannelId;
+	type Update = ChannelForwardingStatsUpdate;
+
+	fn id(&self) -> Self::Id {
+		self.channel_id
+	}
+
+	fn update(&mut self, update: &Self::Update) -> bool {
+		debug_assert_eq!(
+			self.channel_id, update.channel_id,
+			"We should only ever update stats for the same channel id"
+		);
+
+		let mut updated = false;
+
+		// Update counterparty if not already set
+		if self.counterparty_node_id.is_none() && update.counterparty_node_id.is_some() {
+			self.counterparty_node_id = update.counterparty_node_id;
+			updated = true;
+		}
+
+		// Increment counters
+		if update.inbound_payments_increment > 0 {
+			self.inbound_payments_forwarded += update.inbound_payments_increment;
+			updated = true;
+		}
+		if update.outbound_payments_increment > 0 {
+			self.outbound_payments_forwarded += update.outbound_payments_increment;
+			updated = true;
+		}
+		if update.inbound_amount_increment_msat > 0 {
+			self.total_inbound_amount_msat += update.inbound_amount_increment_msat;
+			updated = true;
+		}
+		if update.outbound_amount_increment_msat > 0 {
+			self.total_outbound_amount_msat += update.outbound_amount_increment_msat;
+			updated = true;
+		}
+		if update.fee_earned_increment_msat > 0 {
+			self.total_fee_earned_msat += update.fee_earned_increment_msat;
+			updated = true;
+		}
+		if update.skimmed_fee_increment_msat > 0 {
+			self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat;
+			updated = true;
+		}
+		if update.onchain_claims_increment > 0 {
+			self.onchain_claims_count += update.onchain_claims_increment;
+			updated = true;
+		}
+
+		// Update timestamps
+		if updated {
+			self.first_forwarded_at_timestamp =
+				self.first_forwarded_at_timestamp.min(update.timestamp);
+			self.last_forwarded_at_timestamp =
+				self.last_forwarded_at_timestamp.max(update.timestamp);
+		}
+
+		updated
+	}
+
+	fn to_update(&self) -> Self::Update {
+		// This creates an update representing the current state as increments.
+		// This is primarily used for insert_or_update behavior.
+		ChannelForwardingStatsUpdate {
+			channel_id: self.channel_id,
+			counterparty_node_id: self.counterparty_node_id,
+			inbound_payments_increment: self.inbound_payments_forwarded,
+			outbound_payments_increment: self.outbound_payments_forwarded,
+			inbound_amount_increment_msat: self.total_inbound_amount_msat,
+			outbound_amount_increment_msat: self.total_outbound_amount_msat,
+			fee_earned_increment_msat: self.total_fee_earned_msat,
+			skimmed_fee_increment_msat: self.total_skimmed_fee_msat,
+			onchain_claims_increment: self.onchain_claims_count,
+			timestamp: self.last_forwarded_at_timestamp,
+		}
+	}
+}
+
+/// Update type for [`ChannelPairForwardingStats`] that supports incrementing counters.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub(crate) struct ChannelPairForwardingStatsUpdate {
+	/// The channel pair ID being updated.
+	pub id: ChannelPairStatsId,
+	/// The previous channel ID.
+	pub prev_channel_id: ChannelId,
+	/// The next channel ID.
+	pub next_channel_id: ChannelId,
+	/// Start of the time bucket (seconds since UNIX epoch).
+	pub bucket_start_timestamp: u64,
+	/// The previous node id (used when creating a new entry).
+	pub prev_node_id: Option<PublicKey>,
+	/// The next node id (used when creating a new entry).
+	pub next_node_id: Option<PublicKey>,
+	/// Increment for payment count.
+	pub payment_count_increment: u64,
+	/// Increment for total inbound amount.
+	pub inbound_amount_increment_msat: u64,
+	/// Increment for total outbound amount.
+	pub outbound_amount_increment_msat: u64,
+	/// Increment for total fee earned.
+	pub fee_earned_increment_msat: u64,
+	/// Increment for skimmed fee.
+	pub skimmed_fee_increment_msat: u64,
+	/// Increment for onchain claims count.
+	pub onchain_claims_increment: u64,
+	/// Current timestamp for updating first/last timestamps.
+ pub timestamp: u64, +} + +impl StorableObjectUpdate for ChannelPairForwardingStatsUpdate { + fn id(&self) -> ChannelPairStatsId { + self.id + } +} + +impl StorableObject for ChannelPairForwardingStats { + type Id = ChannelPairStatsId; + type Update = ChannelPairForwardingStatsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, update: &Self::Update) -> bool { + debug_assert_eq!( + self.id, update.id, + "We should only ever update stats for the same channel pair id" + ); + + let mut updated = false; + + // Update node ids if not already set + if self.prev_node_id.is_none() && update.prev_node_id.is_some() { + self.prev_node_id = update.prev_node_id; + updated = true; + } + if self.next_node_id.is_none() && update.next_node_id.is_some() { + self.next_node_id = update.next_node_id; + updated = true; + } + + // Increment counters + if update.payment_count_increment > 0 { + self.payment_count += update.payment_count_increment; + updated = true; + } + if update.inbound_amount_increment_msat > 0 { + self.total_inbound_amount_msat += update.inbound_amount_increment_msat; + updated = true; + } + if update.outbound_amount_increment_msat > 0 { + self.total_outbound_amount_msat += update.outbound_amount_increment_msat; + updated = true; + } + if update.fee_earned_increment_msat > 0 { + self.total_fee_earned_msat += update.fee_earned_increment_msat; + updated = true; + } + if update.skimmed_fee_increment_msat > 0 { + self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat; + updated = true; + } + if update.onchain_claims_increment > 0 { + self.onchain_claims_count += update.onchain_claims_increment; + updated = true; + } + + // Update timestamps + if updated { + if self.first_forwarded_at_timestamp == 0 { + self.first_forwarded_at_timestamp = update.timestamp; + } + self.last_forwarded_at_timestamp = update.timestamp; + + // Recalculate averages + if self.payment_count > 0 { + self.avg_fee_msat = self.total_fee_earned_msat / self.payment_count; + self.avg_inbound_amount_msat = self.total_inbound_amount_msat / self.payment_count; + } + } + + updated + } + + fn to_update(&self) -> Self::Update { + // This creates an update representing the current state as increments. + // This is primarily used for insert_or_update behavior. + ChannelPairForwardingStatsUpdate { + id: self.id, + prev_channel_id: self.prev_channel_id, + next_channel_id: self.next_channel_id, + bucket_start_timestamp: self.bucket_start_timestamp, + prev_node_id: self.prev_node_id, + next_node_id: self.next_node_id, + payment_count_increment: self.payment_count, + inbound_amount_increment_msat: self.total_inbound_amount_msat, + outbound_amount_increment_msat: self.total_outbound_amount_msat, + fee_earned_increment_msat: self.total_fee_earned_msat, + skimmed_fee_increment_msat: self.total_skimmed_fee_msat, + onchain_claims_increment: self.onchain_claims_count, + timestamp: self.last_forwarded_at_timestamp, + } + } +} + +/// Aggregate expired forwarded payments into time-bucketed channel pair statistics. +/// +/// Returns (number of buckets created, number of payments aggregated). 
+pub(crate) fn aggregate_expired_forwarded_payments(
+	forwarded_payment_store: &DataStore<ForwardedPaymentDetails, Arc<Logger>>,
+	channel_pair_stats_store: &DataStore<ChannelPairForwardingStats, Arc<Logger>>,
+	retention_minutes: u64, logger: &Arc<Logger>,
+) -> Result<(u64, u64), Error> {
+	let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+	let cutoff = now.saturating_sub(retention_minutes * 60);
+	let bucket_size_secs = retention_minutes * 60;
+
+	log_debug!(
+		logger,
+		"Aggregating forwarded payments older than {retention_minutes} mins (cutoff timestamp: {cutoff})"
+	);
+
+	// Get all expired payments
+	let expired = forwarded_payment_store.list_filter(|p| p.forwarded_at_timestamp < cutoff);
+
+	if expired.is_empty() {
+		log_debug!(logger, "No expired forwarded payments found");
+		return Ok((0, 0));
+	}
+
+	log_debug!(logger, "Found {} expired forwarded payments to aggregate", expired.len());
+
+	// Group by (channel_pair, bucket_start_timestamp)
+	let mut bucket_groups: HashMap<(ChannelId, ChannelId, u64), Vec<&ForwardedPaymentDetails>> =
+		HashMap::new();
+
+	for payment in &expired {
+		// Calculate which bucket this payment belongs to
+		let bucket_start = (payment.forwarded_at_timestamp / bucket_size_secs) * bucket_size_secs;
+		let key = (payment.prev_channel_id, payment.next_channel_id, bucket_start);
+		bucket_groups.entry(key).or_insert_with(Vec::new).push(payment);
+	}
+
+	log_debug!(logger, "Grouped into {} time buckets", bucket_groups.len());
+
+	// Aggregate each bucket
+	let mut aggregated_bucket_count = 0u64;
+	let mut removed_payment_count = 0u64;
+
+	for ((prev_channel_id, next_channel_id, bucket_start), payments) in bucket_groups {
+		debug_assert!(!payments.is_empty(), "Each bucket group should have at least one payment");
+
+		// Calculate aggregated values
+		let mut total_inbound_amount_msat = 0u64;
+		let mut total_outbound_amount_msat = 0u64;
+		let mut total_fee_earned_msat = 0u64;
+		let mut total_skimmed_fee_msat = 0u64;
+		let mut onchain_claims_count = 0u64;
+		let mut first_timestamp = u64::MAX;
+		let mut last_timestamp = 0u64;
+
+		// Use first payment for node IDs (they should all be the same for a channel pair)
+		let first_payment = payments[0];
+		let prev_node_id = first_payment.prev_node_id;
+		let next_node_id = first_payment.next_node_id;
+
+		for payment in &payments {
+			let outbound = payment.outbound_amount_forwarded_msat.unwrap_or(0);
+			let fee = payment.total_fee_earned_msat.unwrap_or(0);
+			let skimmed = payment.skimmed_fee_msat.unwrap_or(0);
+
+			total_inbound_amount_msat =
+				total_inbound_amount_msat.saturating_add(outbound.saturating_add(fee));
+			total_outbound_amount_msat = total_outbound_amount_msat.saturating_add(outbound);
+			total_fee_earned_msat = total_fee_earned_msat.saturating_add(fee);
+			total_skimmed_fee_msat = total_skimmed_fee_msat.saturating_add(skimmed);
+			if payment.claim_from_onchain_tx {
+				onchain_claims_count += 1;
+			}
+			first_timestamp = first_timestamp.min(payment.forwarded_at_timestamp);
+			last_timestamp = last_timestamp.max(payment.forwarded_at_timestamp);
+		}
+
+		let payment_count = payments.len() as u64;
+		let avg_fee_msat = total_fee_earned_msat / payment_count;
+		let avg_inbound_amount_msat = total_inbound_amount_msat / payment_count;
+
+		let pair_id = ChannelPairStatsId::from_channel_pair_and_bucket(
+			&prev_channel_id,
+			&next_channel_id,
+			bucket_start,
+		);
+
+		// Create the bucket stats entry
+		let stats = ChannelPairForwardingStats {
+			id: pair_id,
+			prev_channel_id,
+			next_channel_id,
+			bucket_start_timestamp: bucket_start,
+			prev_node_id,
+			next_node_id,
+			payment_count,
+			total_inbound_amount_msat,
+			total_outbound_amount_msat,
+			total_fee_earned_msat,
+			total_skimmed_fee_msat,
+			onchain_claims_count,
+			avg_fee_msat,
+			avg_inbound_amount_msat,
+			first_forwarded_at_timestamp: first_timestamp,
+			last_forwarded_at_timestamp: last_timestamp,
+			aggregated_at_timestamp: now,
+		};
+
+		// Insert the bucket (should be unique - no update needed)
+		channel_pair_stats_store.insert(stats).map_err(|e| {
+			log_error!(logger, "Failed to insert channel pair stats bucket for {pair_id:?}: {e}");
+			e
+		})?;
+
+		aggregated_bucket_count += 1;
+
+		// Remove aggregated payments
+		for payment in payments {
+			forwarded_payment_store.remove(&payment.id).map_err(|e| {
+				log_error!(logger, "Failed to remove forwarded payment {:?}: {}", payment.id, e);
+				e
+			})?;
+			removed_payment_count += 1;
+		}
+	}
+
+	log_debug!(
+		logger,
+		"Successfully aggregated {} payments into {} time buckets",
+		removed_payment_count,
+		aggregated_bucket_count
+	);
+
+	Ok((aggregated_bucket_count, removed_payment_count))
 }
diff --git a/src/types.rs b/src/types.rs
index b5b1ffed7..3b7d84169 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -39,6 +39,9 @@ use crate::data_store::DataStore;
 use crate::fee_estimator::OnchainFeeEstimator;
 use crate::logger::Logger;
 use crate::message_handler::NodeCustomMessageHandler;
+use crate::payment::store::{
+	ChannelForwardingStats, ChannelPairForwardingStats, ForwardedPaymentDetails,
+};
 use crate::payment::{PaymentDetails, PendingPaymentDetails};
 use crate::runtime::RuntimeSpawner;
 
@@ -320,6 +323,10 @@ pub(crate) type BumpTransactionEventHandler =
 	>;
 pub(crate) type PaymentStore = DataStore<PaymentDetails, Arc<Logger>>;
+pub(crate) type ForwardedPaymentStore = DataStore<ForwardedPaymentDetails, Arc<Logger>>;
+pub(crate) type ChannelForwardingStatsStore = DataStore<ChannelForwardingStats, Arc<Logger>>;
+pub(crate) type ChannelPairForwardingStatsStore =
+	DataStore<ChannelPairForwardingStats, Arc<Logger>>;
 
 /// A local, potentially user-provided, identifier of a channel.
 ///