diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index f20f93c789c..04d92901f32 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -41,6 +41,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; +use lightning::chain::chainmonitor::MonitorEventSource; use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent}; use lightning::chain::transaction::OutPoint; use lightning::chain::{ @@ -364,9 +365,13 @@ impl chain::Watch for TestChainMonitor { fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } struct KeyProvider { diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index ca01e95c054..79ada7c70e6 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -66,6 +66,21 @@ use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// Identifies the source of a [`MonitorEvent`] for acknowledgment via +/// [`chain::Watch::ack_monitor_event`] once the event has been processed. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct MonitorEventSource { + /// The event ID assigned by the [`ChannelMonitor`]. + pub event_id: u64, + /// The channel from which the [`MonitorEvent`] originated. + pub channel_id: ChannelId, +} + +impl_writeable_tlv_based!(MonitorEventSource, { + (1, event_id, required), + (3, channel_id, required), +}); + /// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. enum PendingMonitorOp { /// A new monitor to insert and persist. 
@@ -366,9 +381,6 @@ pub struct ChainMonitor< fee_estimator: F, persister: P, _entropy_source: ES, - /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly - /// from the user and not from a [`ChannelMonitor`]. - pending_monitor_events: Mutex, PublicKey)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, @@ -436,7 +448,6 @@ where logger, fee_estimator: feeest, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::clone(&event_notifier), persister: AsyncPersister { persister, event_notifier }, @@ -657,7 +668,6 @@ where fee_estimator: feeest, persister, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::new(Notifier::new()), pending_send_only_events: Mutex::new(Vec::new()), @@ -802,16 +812,11 @@ where return Ok(()); } let funding_txo = monitor_data.monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( + monitor_data.monitor.push_monitor_event(MonitorEvent::Completed { funding_txo, channel_id, - vec![MonitorEvent::Completed { - funding_txo, - channel_id, - monitor_update_id: monitor_data.monitor.get_latest_update_id(), - }], - monitor_data.monitor.get_counterparty_node_id(), - )); + monitor_update_id: monitor_data.monitor.get_latest_update_id(), + }); self.event_notifier.notify(); Ok(()) @@ -824,14 +829,11 @@ where pub fn force_channel_monitor_updated(&self, channel_id: ChannelId, monitor_update_id: u64) { let monitors = self.monitors.read().unwrap(); let monitor = &monitors.get(&channel_id).unwrap().monitor; - let counterparty_node_id = monitor.get_counterparty_node_id(); - let funding_txo = monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( - funding_txo, + monitor.push_monitor_event(MonitorEvent::Completed { + funding_txo: monitor.get_funding_txo(), 
channel_id, - vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id }], - counterparty_node_id, - )); + monitor_update_id, + }); self.event_notifier.notify(); } @@ -1266,21 +1268,13 @@ where // The channel is post-close (funding spend seen, lockdown, or // holder tx signed). Return InProgress so ChannelManager freezes // the channel until the force-close MonitorEvents are processed. - // Push a Completed event into pending_monitor_events so it gets - // picked up after the per-monitor events in the next - // release_pending_monitor_events call. - let funding_txo = monitor.get_funding_txo(); - let channel_id = monitor.channel_id(); - self.pending_monitor_events.lock().unwrap().push(( - funding_txo, - channel_id, - vec![MonitorEvent::Completed { - funding_txo, - channel_id, - monitor_update_id: monitor.get_latest_update_id(), - }], - monitor.get_counterparty_node_id(), - )); + // Push a Completed event into the monitor so it gets picked up + // in the next release_pending_monitor_events call. + monitor.push_monitor_event(MonitorEvent::Completed { + funding_txo: monitor.get_funding_txo(), + channel_id: monitor.channel_id(), + monitor_update_id: monitor.get_latest_update_id(), + }); log_debug!( logger, "Deferring completion of ChannelMonitorUpdate id {:?} (channel is post-close)", @@ -1645,7 +1639,7 @@ where fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() { let _ = self.channel_monitor_updated(channel_id, update_id); } @@ -1665,12 +1659,17 @@ where )); } } - // Drain pending_monitor_events (which includes deferred post-close - // completions) after per-monitor events so that force-close - // MonitorEvents are processed by ChannelManager first. 
- pending_monitor_events.extend(self.pending_monitor_events.lock().unwrap().split_off(0)); pending_monitor_events } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + let monitors = self.monitors.read().unwrap(); + if let Some(monitor_state) = monitors.get(&source.channel_id) { + monitor_state.monitor.ack_monitor_event(source.event_id); + } else { + debug_assert!(false, "Ack'd monitor events should always have a corresponding monitor"); + } + } } impl< diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 810de80da95..66681466069 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -68,8 +68,8 @@ use crate::util::byte_utils; use crate::util::logger::{Logger, WithContext}; use crate::util::persist::MonitorName; use crate::util::ser::{ - MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, Writeable, Writer, - U48, + Iterable, MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, + Writeable, Writer, U48, }; #[allow(unused_imports)] @@ -183,6 +183,15 @@ impl Readable for ChannelMonitorUpdate { } } +fn push_monitor_event( + pending_monitor_events: &mut Vec<(u64, MonitorEvent)>, event: MonitorEvent, + next_monitor_event_id: &mut u64, +) { + let id = *next_monitor_event_id; + *next_monitor_event_id += 1; + pending_monitor_events.push((id, event)); +} + /// An event to be processed by the ChannelManager. #[derive(Clone, PartialEq, Eq)] pub enum MonitorEvent { @@ -226,8 +235,6 @@ pub enum MonitorEvent { }, } impl_writeable_tlv_based_enum_upgradable_legacy!(MonitorEvent, - // Note that Completed is currently never serialized to disk as it is generated only in - // ChainMonitor. 
(0, Completed) => { (0, funding_txo, required), (2, monitor_update_id, required), @@ -254,12 +261,14 @@ pub struct HTLCUpdate { pub(crate) payment_preimage: Option, pub(crate) source: HTLCSource, pub(crate) htlc_value_satoshis: Option, + pub(crate) user_channel_id: Option, } impl_writeable_tlv_based!(HTLCUpdate, { (0, payment_hash, required), (1, htlc_value_satoshis, option), (2, source, required), (4, payment_preimage, option), + (5, user_channel_id, option), }); /// If an output goes from claimable only by us to claimable by us or our counterparty within this @@ -1204,6 +1213,8 @@ pub(crate) struct ChannelMonitorImpl { /// True if this channel was configured for manual funding broadcasts. Monitors written by /// versions prior to LDK 0.2 load with `false` until a new update persists it. is_manual_broadcast: bool, + /// The user-provided channel ID, used to populate events generated by the monitor. + user_channel_id: Option, /// True once we've observed either funding transaction on-chain. Older monitors prior to LDK 0.2 /// assume this is `true` when absent during upgrade so holder broadcasts aren't gated unexpectedly. /// In manual-broadcast channels we also use this to trigger deferred holder @@ -1280,7 +1291,19 @@ pub(crate) struct ChannelMonitorImpl { // Note that because the `event_lock` in `ChainMonitor` is only taken in // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. - pending_monitor_events: Vec, + pending_monitor_events: Vec<(u64, MonitorEvent)>, + // `MonitorEvent`s that have been provided to the `ChannelManager` via + // [`ChannelMonitor::get_and_clear_pending_monitor_events`] and are awaiting + // [`ChannelMonitor::ack_monitor_event`] for removal. If an event in this queue is not ack'd, it + // will be re-provided to the `ChannelManager` on startup. 
+ provided_monitor_events: Vec<(u64, MonitorEvent)>, + /// When set, monitor events are retained until explicitly acked rather than cleared on read. + /// + /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on + /// startup, and make the monitor responsible for both off- and on-chain payment resolution. Will + /// be always set once support for this feature is complete. + persistent_events_enabled: bool, + next_monitor_event_id: u64, pub(super) pending_events: Vec, pub(super) is_processing_pending_events: bool, @@ -1661,32 +1684,38 @@ pub(crate) fn write_chanmon_internal( writer.write_all(&payment_preimage.0[..])?; } - writer.write_all( - &(channel_monitor - .pending_monitor_events - .iter() - .filter(|ev| match ev { - MonitorEvent::HTLCEvent(_) => true, - MonitorEvent::HolderForceClosed(_) => true, - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) - .count() as u64) - .to_be_bytes(), - )?; - for event in channel_monitor.pending_monitor_events.iter() { - match event { - MonitorEvent::HTLCEvent(upd) => { - 0u8.write(writer)?; - upd.write(writer)?; - }, - MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, - // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep - // backwards compatibility, we write a `HolderForceClosed` event along with the - // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. - MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, - _ => {}, // Covered in the TLV writes below + if !channel_monitor.persistent_events_enabled { + writer.write_all( + &(channel_monitor + .pending_monitor_events + .iter() + .filter(|(_, ev)| match ev { + MonitorEvent::HTLCEvent(_) => true, + MonitorEvent::HolderForceClosed(_) => true, + MonitorEvent::HolderForceClosedWithInfo { .. 
} => true, + _ => false, + }) + .count() as u64) + .to_be_bytes(), + )?; + for (_, event) in channel_monitor.pending_monitor_events.iter() { + match event { + MonitorEvent::HTLCEvent(upd) => { + 0u8.write(writer)?; + upd.write(writer)?; + }, + MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, + // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep + // backwards compatibility, we write a `HolderForceClosed` event along with the + // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. + MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, + _ => {}, // Covered in the TLV writes below + } } + } else { + // If `persistent_events_enabled` is set, we'll write the events with their event ids in the + // TLV section below. + writer.write_all(&(0u64).to_be_bytes())?; } writer.write_all(&(channel_monitor.pending_events.len() as u64).to_be_bytes())?; @@ -1719,24 +1748,47 @@ pub(crate) fn write_chanmon_internal( channel_monitor.lockdown_from_offchain.write(writer)?; channel_monitor.holder_tx_signed.write(writer)?; - // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` for backwards compatibility. - let pending_monitor_events = - match channel_monitor.pending_monitor_events.iter().find(|ev| match ev { - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) { - Some(MonitorEvent::HolderForceClosedWithInfo { outpoint, .. }) => { - let mut pending_monitor_events = channel_monitor.pending_monitor_events.clone(); - pending_monitor_events.push(MonitorEvent::HolderForceClosed(*outpoint)); - pending_monitor_events - }, - _ => channel_monitor.pending_monitor_events.clone(), - }; + // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` + // for backwards compatibility. 
+ let holder_force_closed_compat = + channel_monitor.pending_monitor_events.iter().find_map(|(_, ev)| { + if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { + Some(MonitorEvent::HolderForceClosed(*outpoint)) + } else { + None + } + }); + let pending_monitor_events_legacy = if !channel_monitor.persistent_events_enabled { + Some(Iterable( + channel_monitor + .pending_monitor_events + .iter() + .map(|(_, ev)| ev) + .chain(holder_force_closed_compat.as_ref()), + )) + } else { + None + }; + + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. + let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(()); + let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() { + Some(Iterable( + channel_monitor + .provided_monitor_events + .iter() + .chain(channel_monitor.pending_monitor_events.iter()), + )) + } else { + None + }; write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), + (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events, required_vec), + (4, pending_mon_evs_with_ids, option), + (5, pending_monitor_events_legacy, option), // Equivalent to optional_vec because Iterable also writes as WithoutLength (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), @@ -1756,6 +1808,8 @@ pub(crate) fn write_chanmon_internal( (35, channel_monitor.is_manual_broadcast, required), (37, channel_monitor.funding_seen_onchain, required), (39, channel_monitor.best_block.previous_blocks, required), + (41, channel_monitor.next_monitor_event_id, required), + (43, channel_monitor.user_channel_id, option), // Added in 0.4 }); Ok(()) @@ -1860,7 +1914,7 @@ impl ChannelMonitor { commitment_transaction_number_obscure_factor: u64, initial_holder_commitment_tx: 
HolderCommitmentTransaction, best_block: BestBlock, counterparty_node_id: PublicKey, channel_id: ChannelId, - is_manual_broadcast: bool, + is_manual_broadcast: bool, user_channel_id: u128, ) -> ChannelMonitor { assert!(commitment_transaction_number_obscure_factor <= (1 << 48)); @@ -1909,6 +1963,7 @@ impl ChannelMonitor { pending_funding: vec![], is_manual_broadcast, + user_channel_id: Some(user_channel_id), funding_seen_onchain: false, latest_update_id: 0, @@ -1939,6 +1994,9 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), + provided_monitor_events: Vec::new(), + persistent_events_enabled: false, + next_monitor_event_id: 0, pending_events: Vec::new(), is_processing_pending_events: false, @@ -2152,10 +2210,50 @@ impl ChannelMonitor { /// Get the list of HTLCs who's status has been updated on chain. This should be called by /// ChannelManager via [`chain::Watch::release_pending_monitor_events`]. - pub fn get_and_clear_pending_monitor_events(&self) -> Vec { + pub fn get_and_clear_pending_monitor_events(&self) -> Vec<(u64, MonitorEvent)> { self.inner.lock().unwrap().get_and_clear_pending_monitor_events() } + pub(super) fn push_monitor_event(&self, event: MonitorEvent) { + self.inner.lock().unwrap().push_monitor_event(event); + } + + /// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed. + /// Generally called by [`chain::Watch::ack_monitor_event`]. + pub fn ack_monitor_event(&self, event_id: u64) { + let inner = &mut *self.inner.lock().unwrap(); + inner.ack_monitor_event(event_id); + } + + /// Enables persistent monitor events mode. When enabled, monitor events are retained until + /// explicitly acked rather than cleared on read. 
+ pub(crate) fn set_persistent_events_enabled(&self, enabled: bool) { + self.inner.lock().unwrap().persistent_events_enabled = enabled; + } + + pub(crate) fn has_pending_event_for_htlc(&self, source: &HTLCSource) -> bool { + self.inner.lock().unwrap().has_pending_event_for_htlc(source) + } + + /// Copies [`MonitorEvent`] state from `other` into `self`. + /// Used in tests to align transient runtime state before equality comparison after a + /// serialization round-trip. + #[cfg(any(test, feature = "_test_utils"))] + pub fn copy_monitor_event_state(&self, other: &ChannelMonitor) { + let (provided, pending, next_id) = { + let other_inner = other.inner.lock().unwrap(); + ( + other_inner.provided_monitor_events.clone(), + other_inner.pending_monitor_events.clone(), + other_inner.next_monitor_event_id, + ) + }; + let mut self_inner = self.inner.lock().unwrap(); + self_inner.provided_monitor_events = provided; + self_inner.pending_monitor_events = pending; + self_inner.next_monitor_event_id = next_id; + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] @@ -3202,7 +3300,11 @@ impl ChannelMonitor { // The HTLC was not included in the confirmed commitment transaction, // which has now reached ANTI_REORG_DELAY confirmations and thus the // HTLC has been failed. - res.insert(source.clone(), candidate_htlc.payment_hash); + // However, if we have the preimage, the HTLC was fulfilled off-chain + // and should not be reported as failed. 
+ if !us.counterparty_fulfilled_htlcs.contains_key(&htlc_id) { + res.insert(source.clone(), candidate_htlc.payment_hash); + } } } }; @@ -3730,20 +3832,34 @@ impl ChannelMonitorImpl { self.prev_holder_htlc_data = Some(htlc_data); for (claimed_htlc_id, claimed_preimage) in claimed_htlcs { - #[cfg(debug_assertions)] - { - let cur_counterparty_htlcs = self - .funding - .counterparty_claimable_outpoints - .get(&self.funding.current_counterparty_commitment_txid.unwrap()) - .unwrap(); - assert!(cur_counterparty_htlcs.iter().any(|(_, source_opt)| { + let htlc_opt = self + .funding + .counterparty_claimable_outpoints + .get(&self.funding.current_counterparty_commitment_txid.unwrap()) + .unwrap() + .iter() + .find_map(|(htlc, source_opt)| { if let Some(source) = source_opt { - SentHTLCId::from_source(source) == *claimed_htlc_id - } else { - false + if SentHTLCId::from_source(source) == *claimed_htlc_id { + return Some((htlc, source)); + } } - })); + None + }); + debug_assert!(htlc_opt.is_some()); + if self.persistent_events_enabled { + if let Some((htlc, source)) = htlc_opt { + // If persistent_events_enabled is set, the ChannelMonitor is responsible for providing + // off-chain resolutions of HTLCs to the ChannelManager, will re-provide this event on + // startup until it is explicitly acked. 
+ self.push_monitor_event(MonitorEvent::HTLCEvent(HTLCUpdate { + payment_hash: htlc.payment_hash, + payment_preimage: Some(*claimed_preimage), + source: *source.clone(), + htlc_value_satoshis: Some(htlc.amount_msat), + user_channel_id: self.user_channel_id, + })); + } } self.counterparty_fulfilled_htlcs.insert(*claimed_htlc_id, *claimed_preimage); } @@ -3881,7 +3997,7 @@ impl ChannelMonitorImpl { outpoint: funding_outpoint, channel_id: self.channel_id, }; - self.pending_monitor_events.push(event); + push_monitor_event(&mut self.pending_monitor_events, event, &mut self.next_monitor_event_id); } // Although we aren't signing the transaction directly here, the transaction will be signed @@ -4472,12 +4588,17 @@ impl ChannelMonitorImpl { "Failing HTLC from late counterparty commitment update immediately \ (funding spend already confirmed)" ); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { - payment_hash, - payment_preimage: None, - source: source.clone(), - htlc_value_satoshis, - })); + push_monitor_event( + &mut self.pending_monitor_events, + MonitorEvent::HTLCEvent(HTLCUpdate { + payment_hash, + payment_preimage: None, + source: source.clone(), + htlc_value_satoshis, + user_channel_id: self.user_channel_id, + }), + &mut self.next_monitor_event_id, + ); self.htlcs_resolved_on_chain.push(IrrevocablyResolvedHTLC { commitment_tx_output_idx: None, resolving_txid: Some(confirmed_txid), @@ -4542,10 +4663,41 @@ impl ChannelMonitorImpl { &self.outputs_to_watch } - fn get_and_clear_pending_monitor_events(&mut self) -> Vec { - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut self.pending_monitor_events); - ret + fn push_monitor_event(&mut self, event: MonitorEvent) { + push_monitor_event( + &mut self.pending_monitor_events, + event, + &mut self.next_monitor_event_id, + ); + } + + fn ack_monitor_event(&mut self, event_id: u64) { + self.provided_monitor_events.retain(|(id, _)| *id != event_id); + // If this event was generated prior to a restart, it 
may be in this queue instead + self.pending_monitor_events.retain(|(id, _)| *id != event_id); + } + + fn has_pending_event_for_htlc(&self, htlc: &HTLCSource) -> bool { + let htlc_id = SentHTLCId::from_source(htlc); + self.pending_monitor_events.iter().any(|(_, ev)| { + if let MonitorEvent::HTLCEvent(upd) = ev { + return htlc_id == SentHTLCId::from_source(&upd.source); + } + false + }) + } + + fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> { + if self.persistent_events_enabled { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + self.provided_monitor_events.extend(ret.iter().cloned()); + ret + } else { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + ret + } } /// Gets the set of events that are repeated regularly (e.g. those which RBF bump @@ -5601,7 +5753,7 @@ impl ChannelMonitorImpl { ); log_info!(logger, "Channel closed by funding output spend in txid {txid}"); if !self.funding_spend_seen { - self.pending_monitor_events.push(MonitorEvent::CommitmentTxConfirmed(())); + self.push_monitor_event(MonitorEvent::CommitmentTxConfirmed(())); } self.funding_spend_seen = true; @@ -5776,11 +5928,12 @@ impl ChannelMonitorImpl { log_debug!(logger, "HTLC {} failure update in {} has got enough confirmations to be passed upstream", &payment_hash, entry.txid); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + self.push_monitor_event(MonitorEvent::HTLCEvent(HTLCUpdate { payment_hash, payment_preimage: None, source, htlc_value_satoshis, + user_channel_id: self.user_channel_id, })); self.htlcs_resolved_on_chain.push(IrrevocablyResolvedHTLC { commitment_tx_output_idx, @@ -5872,8 +6025,8 @@ impl ChannelMonitorImpl { if inbound_htlc_expiry > max_expiry_height { continue; } - let duplicate_event = self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + let duplicate_event = 
self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()) + .any(|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == *source } else { false }); if duplicate_event { @@ -5886,12 +6039,13 @@ impl ChannelMonitorImpl { log_error!(logger, "Failing back HTLC {} upstream to preserve the \ channel as the forward HTLC hasn't resolved and our backward HTLC \ expires soon at {}", log_bytes!(htlc.payment_hash.0), inbound_htlc_expiry); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source: source.clone(), payment_preimage: None, payment_hash: htlc.payment_hash, htlc_value_satoshis: Some(htlc.amount_msat / 1000), - })); + user_channel_id: self.user_channel_id, + }), &mut self.next_monitor_event_id); } } } @@ -6289,8 +6443,8 @@ impl ChannelMonitorImpl { // HTLC resolution backwards to and figure out whether we learned a preimage from it. if let Some((source, payment_hash, amount_msat)) = payment_data { if accepted_preimage_claim { - if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { + if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any( + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid: tx.compute_txid(), height, @@ -6303,16 +6457,17 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: Some(amount_msat / 1000), - })); + user_channel_id: 
self.user_channel_id, + }), &mut self.next_monitor_event_id); } } else if offered_preimage_claim { - if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any( + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { @@ -6327,12 +6482,13 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: Some(amount_msat / 1000), - })); + user_channel_id: self.user_channel_id, + }), &mut self.next_monitor_event_id); } } else { self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { @@ -6625,16 +6781,16 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } - let pending_monitor_events_len: u64 = Readable::read(reader)?; - let mut pending_monitor_events = Some( - Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); - for _ in 0..pending_monitor_events_len { + let pending_monitor_events_legacy_len: u64 = Readable::read(reader)?; + let mut pending_monitor_events_legacy = Some( + Vec::with_capacity(cmp::min(pending_monitor_events_legacy_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); + for _ in 0..pending_monitor_events_legacy_len { let ev = match ::read(reader)? 
{ 0 => MonitorEvent::HTLCEvent(Readable::read(reader)?), 1 => MonitorEvent::HolderForceClosed(outpoint), _ => return Err(DecodeError::InvalidValue) }; - pending_monitor_events.as_mut().unwrap().push(ev); + pending_monitor_events_legacy.as_mut().unwrap().push(ev); } let pending_events_len: u64 = Readable::read(reader)?; @@ -6696,10 +6852,16 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut is_manual_broadcast = RequiredWrapper(None); let mut funding_seen_onchain = RequiredWrapper(None); let mut best_block_previous_blocks = None; + let mut user_channel_id: Option = None; + let mut persistent_events_enabled: Option<()> = None; + let mut next_monitor_event_id: Option = None; + let mut pending_mon_evs_with_ids: Option> = None; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), + (2, persistent_events_enabled, option), (3, htlcs_resolved_on_chain, optional_vec), - (5, pending_monitor_events, optional_vec), + (4, pending_mon_evs_with_ids, optional_vec), + (5, pending_monitor_events_legacy, optional_vec), (7, funding_spend_seen, option), (9, counterparty_node_id, option), (11, confirmed_commitment_tx_counterparty_output, option), @@ -6719,11 +6881,19 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (35, is_manual_broadcast, (default_value, false)), (37, funding_seen_onchain, (default_value, true)), (39, best_block_previous_blocks, option), // Added and always set in 0.3 + (41, next_monitor_event_id, option), + (43, user_channel_id, option), // Added in 0.4 }); if let Some(previous_blocks) = best_block_previous_blocks { best_block.previous_blocks = previous_blocks; } + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_events_enabled.is_some() { + // This feature isn't supported yet so error if the writer expected it to be. 
+ return Err(DecodeError::InvalidValue) + } + // Note that `payment_preimages_with_info` was added (and is always written) in LDK 0.1, so // we can use it to determine if this monitor was last written by LDK 0.1 or later. let written_by_0_1_or_later = payment_preimages_with_info.is_some(); @@ -6746,13 +6916,29 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both // events, we can remove the `HolderForceClosed` event and just keep the `HolderForceClosedWithInfo`. - if let Some(ref mut pending_monitor_events) = pending_monitor_events { - if pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && - pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. })) + if let Some(ref mut evs) = pending_monitor_events_legacy { + if evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && + evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. })) { - pending_monitor_events.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); - } - } + evs.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); + } + } + + // If persistent events are enabled, use the events with their persisted IDs from TLV 4. + // Otherwise, use the legacy events from TLV 5 and assign sequential IDs. 
+ let (next_monitor_event_id, pending_monitor_events): (u64, Vec<(u64, MonitorEvent)>) = + if persistent_events_enabled.is_some() { + let evs = pending_mon_evs_with_ids.unwrap_or_default() + .into_iter().map(|ev| (ev.0, ev.1)).collect(); + (next_monitor_event_id.unwrap_or(0), evs) + } else if let Some(events) = pending_monitor_events_legacy { + let next_id = next_monitor_event_id.unwrap_or(events.len() as u64); + let evs = events.into_iter().enumerate() + .map(|(i, ev)| (i as u64, ev)).collect(); + (next_id, evs) + } else { + (next_monitor_event_id.unwrap_or(0), Vec::new()) + }; let channel_parameters = channel_parameters.unwrap_or_else(|| { onchain_tx_handler.channel_parameters().clone() @@ -6840,6 +7026,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP }, pending_funding: pending_funding.unwrap_or(vec![]), is_manual_broadcast: is_manual_broadcast.0.unwrap(), + user_channel_id, // Older monitors prior to LDK 0.2 assume this is `true` when absent // during upgrade so holder broadcasts aren't gated unexpectedly. funding_seen_onchain: funding_seen_onchain.0.unwrap(), @@ -6871,7 +7058,10 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP current_holder_commitment_number, payment_preimages, - pending_monitor_events: pending_monitor_events.unwrap(), + pending_monitor_events, + provided_monitor_events: Vec::new(), + persistent_events_enabled: persistent_events_enabled.is_some(), + next_monitor_event_id, pending_events, is_processing_pending_events: false, @@ -6920,6 +7110,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } +/// Deserialization wrapper for reading a `(u64, MonitorEvent)`. +/// Necessary because we can't deserialize a (Readable, MaybeReadable) tuple due to trait +/// conflicts. 
+struct ReadableIdMonitorEvent(u64, MonitorEvent); + +impl MaybeReadable for ReadableIdMonitorEvent { + fn read(reader: &mut R) -> Result, DecodeError> { + let id: u64 = Readable::read(reader)?; + let event_opt: Option = MaybeReadable::read(reader)?; + match event_opt { + Some(ev) => Ok(Some(ReadableIdMonitorEvent(id, ev))), + None => Ok(None), + } + } +} + #[cfg(test)] pub(super) fn dummy_monitor( channel_id: ChannelId, wrap_signer: impl FnOnce(crate::sign::InMemorySigner) -> S, @@ -6982,6 +7188,7 @@ pub(super) fn dummy_monitor( dummy_key, channel_id, false, + 0, ) } diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 9692558cf7c..b7ff2cb917c 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -18,6 +18,7 @@ use bitcoin::network::Network; use bitcoin::script::{Script, ScriptBuf}; use bitcoin::secp256k1::PublicKey; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, ANTI_REORG_DELAY, }; @@ -416,6 +417,10 @@ pub trait Watch { /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. /// + /// Each event comes with a corresponding id. Once the event is processed, call + /// [`Watch::ack_monitor_event`] with the corresponding id and channel id. Unacknowledged events + /// will be re-provided by this method after startup. + /// /// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no /// further events may be returned here until the [`ChannelMonitor`] has been fully persisted /// to disk. @@ -424,7 +429,16 @@ pub trait Watch { /// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`]. 
fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)>; + + /// Acknowledges and removes a [`MonitorEvent`] previously returned by + /// [`Watch::release_pending_monitor_events`] by its event ID. + /// + /// Once acknowledged, the event will no longer be returned by future calls to + /// [`Watch::release_pending_monitor_events`] and will not be replayed on restart. + /// + /// Events may be acknowledged in any order. + fn ack_monitor_event(&self, source: MonitorEventSource); } impl + ?Sized, W: Deref> @@ -444,9 +458,13 @@ impl + ?Sized, W: Der fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { self.deref().release_pending_monitor_events() } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.deref().ack_monitor_event(source) + } } /// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to diff --git a/lightning/src/ln/blinded_payment_tests.rs b/lightning/src/ln/blinded_payment_tests.rs index d62f79957eb..9fd97717bf7 100644 --- a/lightning/src/ln/blinded_payment_tests.rs +++ b/lightning/src/ln/blinded_payment_tests.rs @@ -854,7 +854,7 @@ fn do_blinded_intercept_payment(intercept_node_fails: bool) { do_claim_payment_along_route( ClaimAlongRouteArgs::new(&nodes[0], &[&[&nodes[1], &nodes[2]]], payment_preimage) ); - expect_payment_sent(&nodes[0], payment_preimage, Some(Some(1000)), true, true); + expect_payment_sent!(nodes[0], payment_preimage, Some(1000)); } #[test] @@ -1399,7 +1399,7 @@ fn conditionally_round_fwd_amt() { let mut args = ClaimAlongRouteArgs::new(&nodes[0], &expected_route[..], payment_preimage) .allow_1_msat_fee_overpay(); let expected_fee = pass_claimed_payment_along_route(args); - expect_payment_sent(&nodes[0], payment_preimage, Some(Some(expected_fee)), true, true); + 
expect_payment_sent!(nodes[0], payment_preimage, Some(expected_fee)); } #[test] diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index af4d1569d0c..0b1a9d0ca2c 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -2108,7 +2108,7 @@ fn monitor_update_claim_fail_no_response() { let mut bs_updates = get_htlc_update_msgs(&nodes[1], &node_a_id); nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_updates.update_fulfill_htlcs.remove(0)); do_commitment_signed_dance(&nodes[0], &nodes[1], &bs_updates.commitment_signed, false, false); - expect_payment_sent!(nodes[0], payment_preimage_1); + expect_payment_sent!(&nodes[0], payment_preimage_1); claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_2); } @@ -3450,7 +3450,10 @@ fn do_test_blocked_chan_preimage_release(completion_mode: BlockedUpdateComplMode let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); let persister; let new_chain_mon; - let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let mut cfg = test_default_channel_config(); + // If persistent_monitor_events is enabled, monitor updates will never be blocked. + cfg.override_persistent_monitor_events = Some(false); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, Some(cfg.clone()), Some(cfg)]); let nodes_1_reload; let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); @@ -3763,7 +3766,7 @@ fn do_test_inverted_mon_completion_order( ); // Finally, check that the payment was, ultimately, seen as sent by node A. 
- expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); } #[test] @@ -3951,7 +3954,7 @@ fn do_test_durable_preimages_on_closed_channel( mine_transactions(&nodes[0], &[&as_closing_tx[0], bs_preimage_tx]); check_closed_broadcast(&nodes[0], 1, false); - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); if close_chans_before_reload && !hold_post_reload_mon_update { // For close_chans_before_reload with hold=false, the deferred completions @@ -4256,7 +4259,7 @@ fn do_test_glacial_peer_cant_hang(hold_chan_a: bool) { nodes[0].node.handle_update_fulfill_htlc(node_b_id, update_fulfill); let commitment = &a_update[0].commitment_signed; do_commitment_signed_dance(&nodes[0], &nodes[1], commitment, false, false); - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(nodes[0], payment_preimage); expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false); pass_along_path( @@ -4936,6 +4939,7 @@ fn native_async_persist() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let persistent_monitor_events = nodes[0].node.test_persistent_monitor_events_enabled(); let (_, _, chan_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 1); @@ -5021,7 +5025,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Now test two async `ChannelMonitorUpdate`s in flight at once, completing them in-order but // separately. 
@@ -5069,7 +5073,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Finally, test two async `ChanelMonitorUpdate`s in flight at once, completing them // out-of-order and ensuring that no `MonitorEvent::Completed` is generated until they are both @@ -5081,7 +5085,16 @@ fn native_async_persist() { assert_eq!(update_status, ChannelMonitorUpdateStatus::InProgress); persist_futures.poll_futures(); - assert_eq!(async_chain_monitor.release_pending_monitor_events().len(), 0); + let events = async_chain_monitor.release_pending_monitor_events(); + if persistent_monitor_events { + // With persistent monitor events, the LatestHolderCommitmentTXInfo update containing + // claimed_htlcs generates an HTLCEvent with the preimage. + assert_eq!(events.len(), 1); + assert_eq!(events[0].2.len(), 1); + assert!(matches!(events[0].2[0].1, MonitorEvent::HTLCEvent(..))); + } else { + assert!(events.is_empty()); + } let pending_writes = kv_store.list_pending_async_writes( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, @@ -5115,7 +5128,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - if let MonitorEvent::Completed { monitor_update_id, .. } = &completed_persist[0].2[0] { + if let (_, MonitorEvent::Completed { monitor_update_id, .. 
}) = &completed_persist[0].2[0] { assert_eq!(*monitor_update_id, 4); } else { panic!(); @@ -5223,7 +5236,7 @@ fn test_mpp_claim_to_holding_cell() { let claims = vec![(b_claim_msgs, node_b_id), (c_claim_msgs, node_c_id)]; pass_claimed_payment_along_route_from_ev(250_000, claims, args); - expect_payment_sent(&nodes[0], preimage_1, None, true, true); + expect_payment_sent!(nodes[0], preimage_1); expect_and_process_pending_htlcs(&nodes[3], false); expect_payment_claimable!(nodes[3], paymnt_hash_2, payment_secret_2, 400_000); diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 32c0e94bdc8..485d3c320d5 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -3617,7 +3617,7 @@ trait InitialRemoteCommitmentReceiver { funding.get_holder_selected_contest_delay(), &context.destination_script, &funding.channel_transaction_parameters, funding.is_outbound(), obscure_factor, holder_commitment_tx, best_block, context.counterparty_node_id, context.channel_id(), - context.is_manual_broadcast, + context.is_manual_broadcast, context.get_user_id(), ); channel_monitor.provide_initial_counterparty_commitment_tx( counterparty_initial_commitment_tx.clone(), diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 2c97e4adaa1..c10a067b1ed 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -42,6 +42,7 @@ use crate::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, WithChannelMonitor, ANTI_REORG_DELAY, CLTV_CLAIM_BUFFER, HTLC_FAIL_BACK_BUFFER, @@ -1552,6 +1553,11 @@ pub(crate) enum EventCompletionAction { /// fully-resolved in the [`ChannelMonitor`], which we do via this action. 
/// Note that this action will be dropped on downgrade to LDK prior to 0.2! ReleasePaymentCompleteChannelMonitorUpdate(PaymentCompleteUpdate), + /// If [`ChannelManager::persistent_monitor_events`] is enabled, we may want to avoid acking a + /// monitor event via [`Watch::ack_monitor_event`] until after an [`Event`] is processed by the + /// user. For example, we may want a [`MonitorEvent::HTLCEvent`] to keep being re-provided to us + /// until after an [`Event::PaymentSent`] is processed. + AckMonitorEvent { event_id: MonitorEventSource }, } impl_writeable_tlv_based_enum!(EventCompletionAction, (0, ReleaseRAAChannelMonitorUpdate) => { @@ -1563,6 +1569,9 @@ impl_writeable_tlv_based_enum!(EventCompletionAction, } ChannelId::v1_from_funding_outpoint(channel_funding_outpoint.unwrap()) })), + }, + (3, AckMonitorEvent) => { + (1, event_id, required), } {1, ReleasePaymentCompleteChannelMonitorUpdate} => (), ); @@ -2928,6 +2937,15 @@ pub struct ChannelManager< /// [`ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee`] estimate. last_days_feerates: Mutex>, + /// When set, monitors will repeatedly provide an event back to the `ChannelManager` on restart + /// until the event is explicitly acknowledged as processed. + /// + /// Allows us to reconstruct pending HTLC state by replaying monitor events on startup, rather + /// than from complex polling and reconciliation of Channel{Monitor} APIs, as well as to move the + /// responsibility of off-chain payment resolution from the Channel to the monitor. Will + /// always be set once support is complete. 
+ persistent_monitor_events: bool, + #[cfg(test)] pub(super) entropy_source: ES, #[cfg(not(test))] @@ -3603,6 +3621,8 @@ impl< our_network_pubkey, current_timestamp, expanded_inbound_key, node_signer.get_receive_auth_key(), secp_ctx.clone(), message_router, logger.clone(), ); + #[cfg(any(test, feature = "_test_utils"))] + let override_persistent_monitor_events = config.override_persistent_monitor_events; ChannelManager { config: RwLock::new(config), @@ -3658,6 +3678,28 @@ impl< signer_provider, logger, + + persistent_monitor_events: { + #[cfg(not(any(test, feature = "_test_utils")))] + { false } + #[cfg(any(test, feature = "_test_utils"))] + { + override_persistent_monitor_events.unwrap_or_else(|| { + use core::hash::{BuildHasher, Hasher}; + match std::env::var("LDK_TEST_PERSISTENT_MON_EVENTS") { + Ok(val) => match val.as_str() { + "1" => true, + "0" => false, + _ => panic!("LDK_TEST_PERSISTENT_MON_EVENTS must be 0 or 1, got: {}", val), + }, + Err(_) => { + let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish(); + rand_val % 2 == 0 + }, + } + }) + } + }, } } @@ -9958,6 +10000,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ next_channel_counterparty_node_id: PublicKey, next_channel_outpoint: OutPoint, next_channel_id: ChannelId, next_user_channel_id: Option, attribution_data: Option, send_timestamp: Option, + monitor_event_id: Option, ) { let startup_replay = !self.background_events_processed_since_startup.load(Ordering::Acquire); @@ -9970,7 +10013,18 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ "We don't support claim_htlc claims during startup - monitors may not be available yet"); debug_assert_eq!(next_channel_counterparty_node_id, path.hops[0].pubkey); - let mut ev_completion_action = if from_onchain { + let mut ev_completion_action = if self.persistent_monitor_events { + // If persistent monitor events is enabled: + // * If the channel is on-chain, we don't need to use ReleasePaymentComplete like we do + // below because we will stop hearing about this payment after the relevant monitor event + // is acked + // * If the channel is open, we don't need to block the preimage-removing monitor update + // like we do below because we will keep hearing about the preimage until we explicitly + // ack the monitor event for this payment + monitor_event_id.map(|event_id| EventCompletionAction::AckMonitorEvent { + event_id: MonitorEventSource { channel_id: next_channel_id, event_id }, + }) + } else if from_onchain { let release = PaymentCompleteUpdate { counterparty_node_id: next_channel_counterparty_node_id, channel_funding_outpoint: next_channel_outpoint, @@ -9992,6 +10046,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(payment_preimage.into()), payment_id, ); + let best_block_height = self.best_block.read().unwrap().height; self.pending_outbound_payments.claim_htlc( payment_id, payment_preimage, @@ -9999,10 +10054,30 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ session_priv, path, from_onchain, + best_block_height, &mut ev_completion_action, &self.pending_events, &logger, ); + + if matches!( + ev_completion_action, + Some(EventCompletionAction::AckMonitorEvent { .. 
}) + ) { + // If the `PaymentSent` for this redundant claim is still pending, add the event + // completion action here to ensure the `PaymentSent` will always be regenerated until it + // is processed by the user -- as long as the monitor event corresponding to this + // completion action is not acked, it will continue to be re-provided on startup. + let mut pending_events = self.pending_events.lock().unwrap(); + for (ev, act_opt) in pending_events.iter_mut() { + let found_payment_sent = matches!(ev, Event::PaymentSent { payment_id: Some(id), .. } if *id == payment_id); + if found_payment_sent && act_opt.is_none() { + *act_opt = ev_completion_action.take(); + break; + } + } + } + // If an event was generated, `claim_htlc` set `ev_completion_action` to None, if // not, we should go ahead and run it now (as the claim was duplicative), at least // if a PaymentClaimed event with the same action isn't already pending. @@ -11602,6 +11677,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ fail_chan!("Already had channel with the new channel_id"); }, hash_map::Entry::Vacant(e) => { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { // There's no problem signing a counterparty's funding transaction if our monitor @@ -11772,6 +11848,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ match chan .funding_signed(&msg, best_block, &self.signer_provider, &self.logger) .and_then(|(funded_chan, monitor)| { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); self.chain_monitor .watch_channel(funded_chan.context.channel_id(), monitor) .map_err(|()| { @@ -12602,6 +12679,7 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ Some(next_user_channel_id), msg.attribution_data, send_timestamp, + None, ); Ok(()) @@ -12686,6 +12764,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(chan) = chan.as_funded_mut() { if let Some(monitor) = monitor_opt { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { @@ -12833,6 +12912,13 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }) } + /// Returns `true` if `ChannelManager::persistent_monitor_events` is enabled. This flag will only + /// be set randomly in tests for now. + #[cfg(any(test, feature = "_test_utils"))] + pub fn test_persistent_monitor_events_enabled(&self) -> bool { + self.persistent_monitor_events + } + #[cfg(any(test, feature = "_test_utils"))] pub(crate) fn test_raa_monitor_updates_held( &self, counterparty_node_id: PublicKey, channel_id: ChannelId, @@ -13511,7 +13597,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) { - for monitor_event in monitor_events.drain(..) { + for (event_id, monitor_event) in monitor_events.drain(..) { + let monitor_event_source = MonitorEventSource { event_id, channel_id }; match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { let logger = WithContext::from( @@ -13526,21 +13613,41 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ "Claiming HTLC with preimage {} from our monitor", preimage ); - // Claim the funds from the previous hop, if there is one. Because this is in response to a - // chain event, no attribution data is available. 
- self.claim_funds_internal( - htlc_update.source, - preimage, - htlc_update.htlc_value_satoshis.map(|v| v * 1000), - None, - true, - counterparty_node_id, - funding_outpoint, - channel_id, - None, - None, - None, - ); + let from_onchain = self + .per_peer_state + .read() + .unwrap() + .get(&counterparty_node_id) + .map_or(true, |peer_state_mtx| { + !peer_state_mtx + .lock() + .unwrap() + .channel_by_id + .contains_key(&channel_id) + }); + let we_are_sender = + matches!(htlc_update.source, HTLCSource::OutboundRoute { .. }); + if from_onchain | we_are_sender { + // Claim the funds from the previous hop, if there is one. In the future we can + // store attribution data in the `ChannelMonitor` and provide it here. + self.claim_funds_internal( + htlc_update.source, + preimage, + htlc_update.htlc_value_satoshis.map(|v| v * 1000), + None, + from_onchain, + counterparty_node_id, + funding_outpoint, + channel_id, + htlc_update.user_channel_id, + None, + None, + Some(event_id), + ); + } + if !we_are_sender { + self.chain_monitor.ack_monitor_event(monitor_event_source); + } } else { log_trace!(logger, "Failing HTLC from our monitor"); let failure_reason = LocalHTLCFailureReason::OnChainTimeout; @@ -13560,6 +13667,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failure_type, completion_update, ); + self.chain_monitor.ack_monitor_event(monitor_event_source); } }, MonitorEvent::HolderForceClosed(_) @@ -13594,6 +13702,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::CommitmentTxConfirmed(_) => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -13615,6 +13726,9 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::Completed { channel_id, monitor_update_id, .. } => { self.channel_monitor_updated( @@ -13622,6 +13736,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(monitor_update_id), &counterparty_node_id, ); + self.chain_monitor.ack_monitor_event(monitor_event_source); }, } } @@ -15312,6 +15427,9 @@ impl< } } }, + EventCompletionAction::AckMonitorEvent { event_id } => { + self.chain_monitor.ack_monitor_event(event_id); + }, } } } @@ -18102,6 +18220,9 @@ impl< } } + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. + let persistent_monitor_events = self.persistent_monitor_events.then_some(()); + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -18114,6 +18235,7 @@ impl< (9, htlc_purposes, required_vec), (10, legacy_in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), + (12, persistent_monitor_events, option), (13, htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_opt, option), (15, self.inbound_payment_id_secret, required), @@ -18213,6 +18335,7 @@ pub(super) struct ChannelManagerData { forward_htlcs_legacy: HashMap>, pending_intercepted_htlcs_legacy: HashMap, decode_update_add_htlcs_legacy: HashMap>, + persistent_monitor_events: bool, // The `ChannelManager` version that was written. 
version: u8, } @@ -18399,6 +18522,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> let mut peer_storage_dir: Option)>> = None; let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new(); let mut best_block_previous_blocks = None; + let mut persistent_monitor_events: Option<()> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs_legacy, option), @@ -18411,6 +18535,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (9, claimable_htlc_purposes, optional_vec), (10, legacy_in_flight_monitor_updates, option), (11, probing_cookie_secret, option), + (12, persistent_monitor_events, option), (13, amountless_claimable_htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_legacy, option), (15, inbound_payment_id_secret, option), @@ -18420,6 +18545,12 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (23, best_block_previous_blocks, option), }); + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_monitor_events.is_some() { + // This feature isn't supported yet so error if the writer expected it to be. + return Err(DecodeError::InvalidValue); + } + // Merge legacy pending_outbound_payments fields into a single HashMap. 
// Priority: pending_outbound_payments (TLV 3) > pending_outbound_payments_no_retry (TLV 1) // > pending_outbound_payments_compat (non-TLV legacy) @@ -18539,6 +18670,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> peer_storage_dir: peer_storage_dir.unwrap_or_default(), async_receive_offer_cache, version, + persistent_monitor_events: persistent_monitor_events.is_some(), }) } } @@ -18839,6 +18971,7 @@ impl< mut in_flight_monitor_updates, peer_storage_dir, async_receive_offer_cache, + persistent_monitor_events, version: _version, } = data; @@ -18966,6 +19099,14 @@ impl< break; } } + if persistent_monitor_events { + // This will not be necessary once we have persistent events for HTLC failures, we + // can delete this whole loop and wait to re-process the pending monitor events + // rather than failing them proactively below. + if monitor.has_pending_event_for_htlc(&channel_htlc_source) { + found_htlc = true; + } + } if !found_htlc { // If we have some HTLCs in the channel which are not present in the newer // ChannelMonitor, they have been removed and should be failed back to @@ -19594,6 +19735,12 @@ impl< .. } => { if let Some(preimage) = preimage_opt { + if persistent_monitor_events { + // If persistent_monitor_events is enabled, then the monitor will keep + // providing us with a monitor event for this claim until the ChannelManager + // explicitly marks it as resolved, no need to explicitly handle it here. 
+ continue; + } let pending_events = Mutex::new(pending_events_read); let update = PaymentCompleteUpdate { counterparty_node_id: monitor.get_counterparty_node_id(), @@ -19611,6 +19758,7 @@ impl< session_priv, path, true, + best_block.height, &mut compl_action, &pending_events, &logger, @@ -20100,6 +20248,8 @@ impl< logger: args.logger, config: RwLock::new(args.config), + + persistent_monitor_events, }; let mut processed_claims: HashSet> = new_hash_set(); @@ -20430,6 +20580,7 @@ impl< downstream_user_channel_id, None, None, + None, ); } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index d39cee78b0f..3ae7d893c27 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -1260,7 +1260,20 @@ pub fn check_added_monitors>(node: & if let Some(chain_monitor) = node.chain_monitor() { let mut added_monitors = chain_monitor.added_monitors.lock().unwrap(); let n = added_monitors.len(); - assert_eq!(n, count, "expected {} monitors to be added, not {}", count, n); + if n != count { + let recent = chain_monitor.recent_monitor_updates.lock().unwrap(); + let mut desc = String::new(); + for (i, (chan_id, update)) in recent.iter().take(n).enumerate() { + desc += &format!( + "\n [{}] chan={} update_id={} steps={:?}", + i, chan_id, update.update_id, update.updates + ); + } + panic!( + "expected {} monitors to be added, not {}. Last {} updates (most recent first):{}", + count, n, n, desc + ); + } added_monitors.clear(); } } @@ -3061,7 +3074,7 @@ macro_rules! 
expect_payment_sent { $expected_payment_preimage, $expected_fee_msat_opt.map(|o| Some(o)), $expect_paths, - true, + if $node.node.test_persistent_monitor_events_enabled() { false } else { true }, ) }; } @@ -4222,7 +4235,15 @@ pub fn claim_payment_along_route( do_claim_payment_along_route(args) + expected_extra_total_fees_msat; if !skip_last { - expect_payment_sent!(origin_node, payment_preimage, Some(expected_total_fee_msat)) + let expect_post_ev_mon_update = + if origin_node.node.test_persistent_monitor_events_enabled() { false } else { true }; + expect_payment_sent( + origin_node, + payment_preimage, + Some(Some(expected_total_fee_msat)), + true, + expect_post_ev_mon_update, + ) } else { (None, Vec::new()) } diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 8d9df062868..bf5c54c86b6 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -1358,6 +1358,10 @@ pub fn do_test_multiple_package_conflicts(p2a_anchor: bool) { }; assert_eq!(updates.update_fulfill_htlcs.len(), 1); nodes[0].node.handle_update_fulfill_htlc(node_b_id, updates.update_fulfill_htlcs.remove(0)); + if nodes[0].node.test_persistent_monitor_events_enabled() { + // If persistent_monitor_events is enabled, the RAA monitor update is not blocked. 
+ check_added_monitors(&nodes[0], 1); + } do_commitment_signed_dance(&nodes[0], &nodes[1], &updates.commitment_signed, false, false); expect_payment_sent!(nodes[0], preimage_2); @@ -1625,7 +1629,10 @@ pub fn test_htlc_on_chain_success() { check_closed_broadcast(&nodes[0], 1, true); check_added_monitors(&nodes[0], 1); let events = nodes[0].node.get_and_clear_pending_events(); - check_added_monitors(&nodes[0], 2); + check_added_monitors( + &nodes[0], + if nodes[0].node.test_persistent_monitor_events_enabled() { 0 } else { 2 }, + ); assert_eq!(events.len(), 5); let mut first_claimed = false; for event in events { @@ -2672,7 +2679,10 @@ pub fn test_simple_peer_disconnect() { _ => panic!("Unexpected event"), } } - check_added_monitors(&nodes[0], 1); + check_added_monitors( + &nodes[0], + if nodes[0].node.test_persistent_monitor_events_enabled() { 0 } else { 1 }, + ); claim_payment(&nodes[0], &[&nodes[1], &nodes[2]], payment_preimage_4); fail_payment(&nodes[0], &[&nodes[1], &nodes[2]], payment_hash_6); @@ -4295,7 +4305,7 @@ pub fn test_duplicate_payment_hash_one_failure_one_success() { nodes[0].node.handle_update_fulfill_htlc(node_b_id, updates.update_fulfill_htlcs.remove(0)); do_commitment_signed_dance(&nodes[0], &nodes[1], &updates.commitment_signed, false, false); - expect_payment_sent(&nodes[0], our_payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], our_payment_preimage); } #[xtest(feature = "_externalize_tests")] @@ -8574,7 +8584,9 @@ pub fn test_inconsistent_mpp_params() { pass_along_path(&nodes[0], path_b, real_amt, hash, Some(payment_secret), event, true, None); do_claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], &[path_a, path_b], preimage)); - expect_payment_sent(&nodes[0], preimage, Some(None), true, true); + let expect_post_ev_mon_update = + if nodes[0].node.test_persistent_monitor_events_enabled() { false } else { true }; + expect_payment_sent(&nodes[0], preimage, Some(None), true, expect_post_ev_mon_update); } 
#[xtest(feature = "_externalize_tests")] @@ -9932,7 +9944,10 @@ fn do_test_multi_post_event_actions(do_reload: bool) { let chanmon_cfgs = create_chanmon_cfgs(3); let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); let (persister, chain_monitor); - let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let mut cfg = test_default_channel_config(); + // If persistent_monitor_events is enabled, RAAs will not be blocked on events. + cfg.override_persistent_monitor_events = Some(false); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[Some(cfg), None, None]); let node_a_reload; let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); diff --git a/lightning/src/ln/invoice_utils.rs b/lightning/src/ln/invoice_utils.rs index 63ad110bba0..7a0c0b2b8db 100644 --- a/lightning/src/ln/invoice_utils.rs +++ b/lightning/src/ln/invoice_utils.rs @@ -1344,7 +1344,7 @@ mod test { &[&[&nodes[fwd_idx]]], payment_preimage, )); - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); } #[test] diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index f52f093917b..5dca3c4a582 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -274,7 +274,7 @@ fn archive_fully_resolved_monitors() { // Finally, we process the pending `MonitorEvent` from nodes[0], allowing the `ChannelMonitor` // to be archived `ARCHIVAL_DELAY_BLOCKS` blocks later. 
- expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors(); assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1); connect_blocks(&nodes[0], ARCHIVAL_DELAY_BLOCKS - 1); @@ -747,9 +747,9 @@ fn do_test_claim_value_force_close(keyed_anchors: bool, p2a_anchor: bool, prev_c mine_transaction(&nodes[0], &b_broadcast_txn[0]); if prev_commitment_tx { expect_payment_path_successful!(nodes[0]); - check_added_monitors(&nodes[0], 1); + check_added_monitors(&nodes[0], if nodes[0].node.test_persistent_monitor_events_enabled() { 0 } else { 1 }); } else { - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); } assert_eq!(sorted_vec(vec![sent_htlc_balance.clone(), sent_htlc_timeout_balance.clone()]), sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(chan_id).unwrap().get_claimable_balances())); @@ -1015,7 +1015,7 @@ fn do_test_balances_on_local_commitment_htlcs(keyed_anchors: bool, p2a_anchor: b // Now confirm nodes[1]'s HTLC claim, giving nodes[0] the preimage. Note that the "maybe // claimable" balance remains until we see ANTI_REORG_DELAY blocks. 
mine_transaction(&nodes[0], &bs_htlc_claim_txn[0]); - expect_payment_sent(&nodes[0], payment_preimage_2, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage_2); assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations { amount_satoshis: 1_000_000 - 10_000 - 20_000 - commitment_tx_fee - anchor_outputs_value, confirmation_height: node_a_commitment_claimable, @@ -2097,7 +2097,7 @@ fn do_test_revoked_counterparty_aggregated_claims(keyed_anchors: bool, p2a_ancho as_revoked_txn[1].clone() }; mine_transaction(&nodes[1], &htlc_success_claim); - expect_payment_sent(&nodes[1], claimed_payment_preimage, None, true, true); + expect_payment_sent!(&nodes[1], claimed_payment_preimage); let mut claim_txn_2 = nodes[1].tx_broadcaster.txn_broadcast(); // Once B sees the HTLC-Success transaction it splits its claim transaction into two, though in @@ -3250,14 +3250,15 @@ fn test_event_replay_causing_monitor_replay() { // Now process the `PaymentSent` to get the final RAA `ChannelMonitorUpdate`, checking that it // resulted in a `ChannelManager` persistence request. 
+    let per_path_evs = nodes[0].node.test_persistent_monitor_events_enabled();
+ check_added_monitors(&nodes[1], 2); + } nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id()); let mut reconnect_args = ReconnectArgs::new(&nodes[0], &nodes[1]); reconnect_args.pending_cell_htlc_claims = (1, 0); reconnect_nodes(reconnect_args); - expect_payment_sent(&nodes[0], preimage_a, None, true, true); + expect_payment_sent!(&nodes[0], preimage_a); } #[test] @@ -3594,6 +3599,9 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b let mut cfg = test_default_channel_config(); cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; cfg.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor; + // This test specifically tests lost monitor events, which requires the legacy + // (non-persistent) monitor event behavior. + cfg.override_persistent_monitor_events = Some(false); let cfgs = [Some(cfg.clone()), Some(cfg.clone()), Some(cfg.clone())]; let chanmon_cfgs = create_chanmon_cfgs(3); @@ -3803,27 +3811,83 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b } #[test] -fn test_lost_timeout_monitor_events() { +fn test_lost_timeout_monitor_events_a() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_b() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_c() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_d() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_e() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_f() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, false); +} +#[test] +fn 
test_lost_timeout_monitor_events_g() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_h() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_i() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_j() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, false); - +} +#[test] +fn test_lost_timeout_monitor_events_k() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_l() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_m() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_n() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_o() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_p() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_q() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_r() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_s() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_t() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, true); } @@ -3883,8 +3947,7 @@ fn 
+		session_priv: SecretKey, path: Path, from_onchain: bool, best_block_height: u32, ev_completion_action: &mut Option<EventCompletionAction>,
+ self.insert_from_monitor_on_startup( + payment_id, payment_preimage.into(), session_priv_bytes, &path, best_block_height, logger + ); + } + let mut outbounds = self.pending_outbound_payments.lock().unwrap(); let mut pending_events = pending_events.lock().unwrap(); if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) { diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index 807d1a1af39..a9a9998b1b0 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -957,7 +957,7 @@ fn do_retry_with_no_persist(confirm_before_reload: bool) { assert_eq!(txn[0].compute_txid(), as_commitment_tx.compute_txid()); } mine_transaction(&nodes[0], &bs_htlc_claim_txn); - expect_payment_sent(&nodes[0], payment_preimage_1, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage_1); connect_blocks(&nodes[0], TEST_FINAL_CLTV * 4 + 20); let (first_htlc_timeout_tx, second_htlc_timeout_tx) = { let mut txn = nodes[0].tx_broadcaster.unique_txn_broadcast(); @@ -1368,7 +1368,7 @@ fn do_test_dup_htlc_onchain_doesnt_fail_on_reload( let conditions = PaymentFailedConditions::new().from_mon_update(); expect_payment_failed_conditions(&nodes[0], payment_hash, false, conditions); } else { - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); } // Note that if we persist the monitor before processing the events, above, we'll always get // them replayed on restart no matter what @@ -1406,18 +1406,22 @@ fn do_test_dup_htlc_onchain_doesnt_fail_on_reload( } else { expect_payment_sent(&nodes[0], payment_preimage, None, true, false); } - if persist_manager_post_event { - // After reload, the ChannelManager identified the failed payment and queued up the - // PaymentSent (or not, if `persist_manager_post_event` resulted in us detecting we - // already did that) and corresponding ChannelMonitorUpdate to mark the payment - // handled, but while 
processing the pending `MonitorEvent`s (which were not processed - // before the monitor was persisted) we will end up with a duplicate - // ChannelMonitorUpdate. - check_added_monitors(&nodes[0], 2); + if !nodes[0].node.test_persistent_monitor_events_enabled() { + if persist_manager_post_event { + // After reload, the ChannelManager identified the failed payment and queued up the + // PaymentSent (or not, if `persist_manager_post_event` resulted in us detecting we + // already did that) and corresponding ChannelMonitorUpdate to mark the payment + // handled, but while processing the pending `MonitorEvent`s (which were not processed + // before the monitor was persisted) we will end up with a duplicate + // ChannelMonitorUpdate. + check_added_monitors(&nodes[0], 2); + } else { + // ...unless we got the PaymentSent event, in which case we have de-duplication logic + // preventing a redundant monitor event. + check_added_monitors(&nodes[0], 1); + } } else { - // ...unless we got the PaymentSent event, in which case we have de-duplication logic - // preventing a redundant monitor event. 
- check_added_monitors(&nodes[0], 1); + check_added_monitors(&nodes[0], 0); } } @@ -1430,15 +1434,39 @@ fn do_test_dup_htlc_onchain_doesnt_fail_on_reload( } #[test] -fn test_dup_htlc_onchain_doesnt_fail_on_reload() { +fn test_dup_htlc_onchain_doesnt_fail_on_reload_a() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_b() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_c() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_d() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_e() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_f() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_g() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_h() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_i() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, false, false); } @@ -2381,7 +2409,11 @@ fn do_test_intercepted_payment(test: InterceptTest) { }, _ => panic!("Unexpected event"), } - check_added_monitors(&nodes[0], 1); + if nodes[0].node.test_persistent_monitor_events_enabled() { + check_added_monitors(&nodes[0], 0); + } else { + check_added_monitors(&nodes[0], 1); + } } else if test == InterceptTest::Timeout { let mut block = create_dummy_block(nodes[0].best_block_hash(), 42, Vec::new()); connect_block(&nodes[0], &block); @@ -2586,7 +2618,7 @@ fn 
do_accept_underpaying_htlcs_config(num_mpp_parts: usize) { let total_fee_msat = pass_claimed_payment_along_route(args); // The sender doesn't know that the penultimate hop took an extra fee. let amt = total_fee_msat - skimmed_fee_msat * num_mpp_parts as u64; - expect_payment_sent(&nodes[0], payment_preimage, Some(Some(amt)), true, true); + expect_payment_sent!(nodes[0], payment_preimage, Some(amt)); } #[derive(PartialEq)] @@ -4170,10 +4202,14 @@ fn do_no_missing_sent_on_reload(persist_manager_with_payment: bool, at_midpoint: check_added_monitors(&nodes[0], 0); nodes[0].node.test_process_background_events(); check_added_monitors(&nodes[0], 1); - // Then once we process the PaymentSent event we'll apply a monitor update to remove the - // pending payment from being re-hydrated on the next startup. let events = nodes[0].node.get_and_clear_pending_events(); - check_added_monitors(&nodes[0], 1); + if nodes[0].node.test_persistent_monitor_events_enabled() { + check_added_monitors(&nodes[0], 0); + } else { + // Once we process the PaymentSent event we'll apply a monitor update to remove the + // pending payment from being re-hydrated on the next startup. + check_added_monitors(&nodes[0], 1); + } assert_eq!(events.len(), 3, "{events:?}"); if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. 
} = events[0] { } else { @@ -4767,7 +4803,7 @@ fn do_test_custom_tlvs_consistency( ClaimAlongRouteArgs::new(&nodes[0], &[path_a, &[&nodes[2], &nodes[3]]], preimage) .with_custom_tlvs(expected_tlvs), ); - expect_payment_sent(&nodes[0], preimage, Some(Some(2000)), true, true); + expect_payment_sent!(nodes[0], preimage, Some(2000)); } else { // Expect fail back let expected_destinations = [HTLCHandlingFailureType::Receive { payment_hash: hash }]; @@ -5565,7 +5601,10 @@ fn do_bolt11_multi_node_mpp(use_bolt11_pay: bool) { } let payment_sent = nodes[0].node.get_and_clear_pending_events(); - check_added_monitors(&nodes[0], 1); + check_added_monitors( + &nodes[0], + if nodes[0].node.test_persistent_monitor_events_enabled() { 0 } else { 1 }, + ); assert_eq!(payment_sent.len(), 2, "{payment_sent:?}"); if let Event::PaymentSent { payment_id, payment_hash, amount_msat, fee_paid_msat, .. } = @@ -5594,7 +5633,10 @@ fn do_bolt11_multi_node_mpp(use_bolt11_pay: bool) { } let payment_sent = nodes[1].node.get_and_clear_pending_events(); - check_added_monitors(&nodes[1], 1); + check_added_monitors( + &nodes[1], + if nodes[1].node.test_persistent_monitor_events_enabled() { 0 } else { 1 }, + ); assert_eq!(payment_sent.len(), 2, "{payment_sent:?}"); if let Event::PaymentSent { payment_id, payment_hash, amount_msat, fee_paid_msat, .. } = @@ -5850,7 +5892,10 @@ fn bolt11_multi_node_mpp_with_retry() { } let payment_sent_a = nodes[0].node.get_and_clear_pending_events(); - check_added_monitors(&nodes[0], 1); + check_added_monitors( + &nodes[0], + if nodes[0].node.test_persistent_monitor_events_enabled() { 0 } else { 1 }, + ); assert_eq!(payment_sent_a.len(), 2, "{payment_sent_a:?}"); if let Event::PaymentSent { payment_id, payment_hash, amount_msat, .. 
+		// The latest commitment transaction update from the `revoke_and_ack` was previously
chain_monitor.channel_monitor_updated(chan_id, latest_update).unwrap(); + check_added_monitors(&nodes[0], 1); + let events = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + if let Event::PaymentPathSuccessful { .. } = &events[0] { + } else { + panic!("{events:?}"); + } } // With the updates completed, we can now become quiescent. @@ -438,7 +459,10 @@ fn quiescence_updates_go_to_holding_cell(fail_htlc: bool) { } else { assert!(events.iter().find(|e| matches!(e, Event::PaymentSent { .. })).is_some()); assert!(events.iter().find(|e| matches!(e, Event::PaymentPathSuccessful { .. })).is_some()); - check_added_monitors(&nodes[0], 1); + check_added_monitors( + &nodes[0], + if nodes[0].node.test_persistent_monitor_events_enabled() { 0 } else { 1 }, + ); } nodes[0].node.process_pending_htlc_forwards(); expect_payment_claimable!(nodes[0], payment_hash1, payment_secret1, payment_amount); @@ -467,7 +491,7 @@ fn quiescence_updates_go_to_holding_cell(fail_htlc: bool) { let conditions = PaymentFailedConditions::new(); expect_payment_failed_conditions(&nodes[1], payment_hash1, true, conditions); } else { - expect_payment_sent(&nodes[1], payment_preimage1, None, true, true); + expect_payment_sent!(&nodes[1], payment_preimage1); } } @@ -673,6 +697,9 @@ fn do_test_quiescence_during_disconnection(with_pending_claim: bool, propose_dis assert_eq!(bs_raa_stfu.len(), 2); if let MessageSendEvent::SendRevokeAndACK { msg, .. 
} = &bs_raa_stfu[0] { nodes[0].node.handle_revoke_and_ack(node_b_id, &msg); + if nodes[0].node.test_persistent_monitor_events_enabled() { + check_added_monitors(&nodes[0], 1); + } expect_payment_sent!(&nodes[0], preimage); } else { panic!("Unexpected first message {bs_raa_stfu:?}"); diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 9e992467ecd..16ae8380d26 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -1296,7 +1296,7 @@ fn do_manager_persisted_pre_outbound_edge_forward(intercept_htlc: bool) { expect_payment_claimable!(nodes[2], payment_hash, payment_secret, amt_msat, None, nodes[2].node.get_our_node_id()); let path: &[&[_]] = &[&[&nodes[1], &nodes[2]]]; do_claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], path, payment_preimage)); - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); } #[test] @@ -1354,7 +1354,7 @@ fn test_manager_persisted_post_outbound_edge_forward() { expect_payment_claimable!(nodes[2], payment_hash, payment_secret, amt_msat, None, nodes[2].node.get_our_node_id()); let path: &[&[_]] = &[&[&nodes[1], &nodes[2]]]; do_claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], path, payment_preimage)); - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(nodes[0], payment_preimage); } #[test] @@ -1458,7 +1458,7 @@ fn test_manager_persisted_post_outbound_edge_holding_cell() { // Claim the a->b->c payment on node_c. let path: &[&[_]] = &[&[&nodes[1], &nodes[2]]]; do_claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], path, payment_preimage)); - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); // Claim the c->b payment on node_b. 
nodes[1].node.claim_funds(payment_preimage_2); @@ -1467,7 +1467,7 @@ fn test_manager_persisted_post_outbound_edge_holding_cell() { let mut update = get_htlc_update_msgs(&nodes[1], &nodes[2].node.get_our_node_id()); nodes[2].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), update.update_fulfill_htlcs.remove(0)); do_commitment_signed_dance(&nodes[2], &nodes[1], &update.commitment_signed, false, false); - expect_payment_sent(&nodes[2], payment_preimage_2, None, true, true); + expect_payment_sent!(&nodes[2], payment_preimage_2); } #[test] @@ -1879,7 +1879,7 @@ fn outbound_removed_holding_cell_resolved_no_double_forward() { reconnect_nodes(reconnect_args); // nodes[0] should now have received the fulfill and generate PaymentSent. - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); } #[test] @@ -1983,7 +1983,7 @@ fn test_reload_node_with_preimage_in_monitor_claims_htlc() { reconnect_nodes(reconnect_args); // nodes[0] should now have received the fulfill and generate PaymentSent. - expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); } #[test] @@ -2244,5 +2244,5 @@ fn test_reload_with_mpp_claims_on_same_channel() { reconnect_nodes(reconnect_args); // nodes[0] should now have received both fulfills and generate PaymentSent. 
- expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + expect_payment_sent!(&nodes[0], payment_preimage); } diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index 5b5160148d7..b99c5aa936a 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -254,7 +254,7 @@ fn test_counterparty_revoked_reorg() { // Connect the HTLC claim transaction for HTLC 3 mine_transaction(&nodes[1], &unrevoked_local_txn[2]); - expect_payment_sent(&nodes[1], payment_preimage_3, None, true, true); + expect_payment_sent!(&nodes[1], payment_preimage_3); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); // Connect blocks to confirm the unrevoked commitment transaction @@ -1228,7 +1228,10 @@ fn do_test_split_htlc_expiry_tracking(use_third_htlc: bool, reorg_out: bool, p2a check_added_monitors(&nodes[0], 0); let sent_events = nodes[0].node.get_and_clear_pending_events(); - check_added_monitors(&nodes[0], 2); + check_added_monitors( + &nodes[0], + if nodes[0].node.test_persistent_monitor_events_enabled() { 0 } else { 2 }, + ); assert_eq!(sent_events.len(), 4, "{sent_events:?}"); let mut found_expected_events = [false, false, false, false]; for event in sent_events { diff --git a/lightning/src/ln/shutdown_tests.rs b/lightning/src/ln/shutdown_tests.rs index d70b240e4e4..5363b9b3b82 100644 --- a/lightning/src/ln/shutdown_tests.rs +++ b/lightning/src/ln/shutdown_tests.rs @@ -1864,7 +1864,10 @@ fn test_pending_htlcs_arent_lost_on_mon_delay() { let chanmon_cfgs = create_chanmon_cfgs(3); let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); - let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let mut cfg = test_default_channel_config(); + // When persistent_monitor_events is enabled, monitor updates will never be blocked. 
+ cfg.override_persistent_monitor_events = Some(false); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, Some(cfg), None]); let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); let node_a_id = nodes[0].node.get_our_node_id(); diff --git a/lightning/src/ln/splicing_tests.rs b/lightning/src/ln/splicing_tests.rs index 9adccd17627..e92cd245045 100644 --- a/lightning/src/ln/splicing_tests.rs +++ b/lightning/src/ln/splicing_tests.rs @@ -1984,7 +1984,12 @@ fn do_test_splice_commitment_broadcast(splice_status: SpliceStatus, claim_htlcs: 2 ); } - check_added_monitors(&nodes[0], 2); // Two `ReleasePaymentComplete` monitor updates + let expected_mons = if nodes[0].node.test_persistent_monitor_events_enabled() && claim_htlcs { + 0 + } else { + 2 // Two `ReleasePaymentComplete` monitor updates + }; + check_added_monitors(&nodes[0], expected_mons); // When the splice never confirms and we see a commitment transaction broadcast and confirm for // the current funding instead, we should expect to see an `Event::DiscardFunding` for the diff --git a/lightning/src/util/config.rs b/lightning/src/util/config.rs index ebef8c27bca..b6b70554b3f 100644 --- a/lightning/src/util/config.rs +++ b/lightning/src/util/config.rs @@ -1132,6 +1132,10 @@ pub struct UserConfig { /// /// [`ChannelManager::splice_channel`]: crate::ln::channelmanager::ChannelManager::splice_channel pub reject_inbound_splices: bool, + /// If set to `Some`, overrides the random selection of whether to use persistent monitor + /// events. Only available in tests. 
+	pub override_persistent_monitor_events: Option<bool>,
+	pub recent_monitor_updates: Mutex<Vec<(ChannelId, ChannelMonitorUpdate)>>,
@@ -734,6 +747,10 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { } return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } #[cfg(any(test, feature = "_externalize_tests"))]