diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 6bd031379..9f7b1ead3 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -361,6 +361,9 @@ enum NodeError { "InvalidBlindedPaths", "AsyncPaymentServicesDisabled", "HrnParsingFailed", + "LiquiditySetWebhookFailed", + "LiquidityRemoveWebhookFailed", + "LiquidityListWebhooksFailed" }; dictionary NodeStatus { diff --git a/src/builder.rs b/src/builder.rs index 5d8a5a7a9..ea26a526e 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -66,7 +66,8 @@ use crate::io::{ PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::liquidity::{ - LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder, + LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LSPS5ClientConfig, + LiquiditySourceBuilder, }; use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; @@ -76,8 +77,8 @@ use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, - KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, - Persister, SyncAndAsyncKVStore, + KeysManager, LSPS5ServiceConfig, MessageRouter, OnionMessenger, PaymentStore, PeerManager, + PendingPaymentStore, Persister, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -125,6 +126,10 @@ struct LiquiditySourceConfig { lsps2_client: Option, // Act as an LSPS2 service. lsps2_service: Option, + // Act as an LSPS5 client connecting to the given service. + lsps5_client: Option, + // Act as an LSPS5 service. + lsps5_service: Option, } #[derive(Clone)] @@ -450,6 +455,36 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source webhook notifications from the given + /// [bLIP-55 / LSPS5] service. + /// + /// This allows the client to register webhook endpoints with the LSP to receive + /// push notifications for Lightning events when the client is offline. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_source_lsps5( + &mut self, node_id: PublicKey, address: SocketAddress, + ) -> &mut Self { + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + let lsps5_client_config = LSPS5ClientConfig { node_id, address }; + liquidity_source_config.lsps5_client = Some(lsps5_client_config); + self + } + + /// Configures the [`Node`] instance to provide an [LSPS5] service, enabling clients + /// to register webhooks for push notifications. + /// + /// [LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_provider_lsps5( + &mut self, service_config: LSPS5ServiceConfig, + ) -> &mut Self { + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + liquidity_source_config.lsps5_service = Some(service_config); + self + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self { self.config.storage_dir_path = storage_dir_path; @@ -851,6 +886,25 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_liquidity_provider_lsps2(service_config); } + /// Configures the [`Node`] instance to source webhook notifications from the given + /// [bLIP-55 / LSPS5] service. + /// + /// This allows the client to register webhook endpoints with the LSP to receive + /// push notifications for Lightning events when the client is offline. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_source_lsps5(&self, node_id: PublicKey, address: SocketAddress) { + self.inner.write().unwrap().set_liquidity_source_lsps5(node_id, address); + } + + /// Configures the [`Node`] instance to provide an [LSPS5] service, enabling clients + /// to register webhooks for push notifications. + /// + /// [LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_provider_lsps5(&self, service_config: LSPS5ServiceConfig) { + self.inner.write().unwrap().set_liquidity_provider_lsps5(service_config); + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&self, storage_dir_path: String) { self.inner.write().unwrap().set_storage_dir_path(storage_dir_path); @@ -1627,6 +1681,14 @@ fn build_with_store_internal( liquidity_source_builder.lsps2_service(promise_secret, config.clone()) }); + lsc.lsps5_client.as_ref().map(|config| { + liquidity_source_builder.lsps5_client(config.node_id, config.address.clone()) + }); + + lsc.lsps5_service + .as_ref() + .map(|config| liquidity_source_builder.lsps5_service(config.clone())); + let liquidity_source = runtime .block_on(async move { liquidity_source_builder.build().await.map(Arc::new) })?; let custom_message_handler = diff --git a/src/error.rs b/src/error.rs index ea0bcca3b..0c3ecc785 100644 --- a/src/error.rs +++ b/src/error.rs @@ -131,6 +131,12 @@ pub enum Error { AsyncPaymentServicesDisabled, /// Parsing a Human-Readable Name has failed. HrnParsingFailed, + /// Failed to set a webhook with the LSP. + LiquiditySetWebhookFailed, + /// Failed to remove a webhook with the LSP. + LiquidityRemoveWebhookFailed, + /// Failed to list webhooks with the LSP. + LiquidityListWebhooksFailed, } impl fmt::Display for Error { @@ -213,6 +219,15 @@ impl fmt::Display for Error { Self::HrnParsingFailed => { write!(f, "Failed to parse a human-readable name.") }, + Self::LiquiditySetWebhookFailed => { + write!(f, "Failed to set a webhook with the LSP.") + }, + Self::LiquidityRemoveWebhookFailed => { + write!(f, "Failed to remove a webhook with the LSP.") + }, + Self::LiquidityListWebhooksFailed => { + write!(f, "Failed to list webhooks with the LSP.") + }, } } } diff --git a/src/lib.rs b/src/lib.rs index d2222d949..3412644a9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -148,7 +148,7 @@ use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; use lightning::util::persist::KVStoreSync; use lightning_background_processor::process_events_async; -use liquidity::{LSPS1Liquidity, LiquiditySource}; +use liquidity::{LSPS1Liquidity, LSPS5Liquidity, LiquiditySource}; use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; @@ -1032,6 +1032,32 @@ impl Node { )) } + /// Returns a liquidity handler allowing to handle webhooks and notifications via the [bLIP-55 / LSPS5] protocol. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + #[cfg(not(feature = "uniffi"))] + pub fn lsps5_liquidity(&self) -> LSPS5Liquidity { + LSPS5Liquidity::new( + Arc::clone(&self.runtime), + Arc::clone(&self.connection_manager), + self.liquidity_source.clone(), + Arc::clone(&self.logger), + ) + } + + /// Returns a liquidity handler allowing to handle webhooks and notifications via the [bLIP-55 / LSPS5] protocol. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + #[cfg(feature = "uniffi")] + pub fn lsps5_liquidity(&self) -> Arc { + Arc::new(LSPS5Liquidity::new( + Arc::clone(&self.runtime), + Arc::clone(&self.connection_manager), + self.liquidity_source.clone(), + Arc::clone(&self.logger), + )) + } + /// Retrieve a list of known channels. pub fn list_channels(&self) -> Vec { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() diff --git a/src/liquidity.rs b/src/liquidity.rs index 2151110b6..18404f76a 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -34,6 +34,12 @@ use lightning_liquidity::lsps2::event::{LSPS2ClientEvent, LSPS2ServiceEvent}; use lightning_liquidity::lsps2::msgs::{LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams}; use lightning_liquidity::lsps2::service::LSPS2ServiceConfig as LdkLSPS2ServiceConfig; use lightning_liquidity::lsps2::utils::compute_opening_fee; +use lightning_liquidity::lsps5::client::LSPS5ClientConfig as LdkLSPS5ClientConfig; +use lightning_liquidity::lsps5::event::{LSPS5ClientEvent, LSPS5ServiceEvent}; +use lightning_liquidity::lsps5::msgs::{ + LSPS5Error, ListWebhooksResponse, RemoveWebhookResponse, SetWebhookResponse, +}; +use lightning_liquidity::lsps5::service::LSPS5ServiceConfig as LdkLSPS5ServiceConfig; use lightning_liquidity::{LiquidityClientConfig, LiquidityServiceConfig}; use lightning_types::payment::PaymentHash; use rand::Rng; @@ -54,6 +60,29 @@ const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; const LSPS2_GETINFO_REQUEST_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24); const LSPS2_CHANNEL_CLTV_EXPIRY_DELTA: u32 = 72; +/// Error type for HTTP client operations. +#[derive(Debug)] +pub enum HttpClientError { + /// Network or connection error. + Network(String), + /// HTTP status error. + Status(u16), + /// Other error. + Other(String), +} + +impl std::fmt::Display for HttpClientError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HttpClientError::Network(msg) => write!(f, "Network error: {}", msg), + HttpClientError::Status(code) => write!(f, "HTTP error: {}", code), + HttpClientError::Other(msg) => write!(f, "HTTP client error: {}", msg), + } + } +} + +impl std::error::Error for HttpClientError {} + struct LSPS1Client { lsp_node_id: PublicKey, lsp_address: SocketAddress, @@ -145,6 +174,29 @@ pub struct LSPS2ServiceConfig { pub client_trusts_lsp: bool, } +struct LSPS5Client { + lsp_node_id: PublicKey, + lsp_address: SocketAddress, + ldk_client_config: LdkLSPS5ClientConfig, + pending_set_webhook_requests: + Mutex>>>, + pending_list_webhooks_requests: + Mutex>>>, + pending_remove_webhook_requests: + Mutex>>>, +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS5ClientConfig { + pub node_id: PublicKey, + pub address: SocketAddress, +} + +struct LSPS5Service { + ldk_service_config: LdkLSPS5ServiceConfig, + // http_client: reqwest::Client, +} + pub(crate) struct LiquiditySourceBuilder where L::Target: LdkLogger, @@ -152,6 +204,8 @@ where lsps1_client: Option, lsps2_client: Option, lsps2_service: Option, + lsps5_client: Option, + lsps5_service: Option, wallet: Arc, channel_manager: Arc, keys_manager: Arc, @@ -174,10 +228,14 @@ where let lsps1_client = None; let lsps2_client = None; let lsps2_service = None; + let lsps5_client = None; + let lsps5_service = None; Self { lsps1_client, lsps2_client, lsps2_service, + lsps5_client, + lsps5_service, wallet, channel_manager, keys_manager, @@ -234,17 +292,56 @@ where self } - pub(crate) async fn build(self) -> Result, BuildError> { - let liquidity_service_config = self.lsps2_service.as_ref().map(|s| { - let lsps2_service_config = Some(s.ldk_service_config.clone()); - let lsps5_service_config = None; - let advertise_service = s.service_config.advertise_service; - LiquidityServiceConfig { lsps2_service_config, lsps5_service_config, advertise_service } + pub(crate) fn lsps5_client( + &mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, + ) -> &mut Self { + let ldk_client_config = LdkLSPS5ClientConfig {}; + + let pending_set_webhook_requests = Mutex::new(HashMap::new()); + let pending_list_webhooks_requests = Mutex::new(HashMap::new()); + let pending_remove_webhook_requests = Mutex::new(HashMap::new()); + + self.lsps5_client = Some(LSPS5Client { + ldk_client_config, + lsp_node_id, + lsp_address, + pending_set_webhook_requests, + pending_list_webhooks_requests, + pending_remove_webhook_requests, }); + self + } + + pub(crate) fn lsps5_service(&mut self, service_config: LdkLSPS5ServiceConfig) -> &mut Self { + self.lsps5_service = Some(LSPS5Service { ldk_service_config: service_config }); + self + } + + pub(crate) async fn build(self) -> Result, BuildError> { + let lsps2_service_config = + self.lsps2_service.as_ref().map(|s| s.ldk_service_config.clone()); + let lsps5_service_config = + self.lsps5_service.as_ref().map(|s| s.ldk_service_config.clone()); + let advertise_service = self + .lsps2_service + .as_ref() + .map(|s| s.service_config.advertise_service) + .unwrap_or(false); + + let liquidity_service_config = + if lsps2_service_config.is_some() || lsps5_service_config.is_some() { + Some(LiquidityServiceConfig { + lsps2_service_config, + lsps5_service_config, + advertise_service, + }) + } else { + None + }; let lsps1_client_config = self.lsps1_client.as_ref().map(|s| s.ldk_client_config.clone()); let lsps2_client_config = self.lsps2_client.as_ref().map(|s| s.ldk_client_config.clone()); - let lsps5_client_config = None; + let lsps5_client_config = self.lsps5_client.as_ref().map(|s| s.ldk_client_config.clone()); let liquidity_client_config = Some(LiquidityClientConfig { lsps1_client_config, lsps2_client_config, @@ -271,6 +368,7 @@ where lsps1_client: self.lsps1_client, lsps2_client: self.lsps2_client, lsps2_service: self.lsps2_service, + lsps5_client: self.lsps5_client, wallet: self.wallet, channel_manager: self.channel_manager, peer_manager: RwLock::new(None), @@ -289,6 +387,7 @@ where lsps1_client: Option, lsps2_client: Option, lsps2_service: Option, + lsps5_client: Option, wallet: Arc, channel_manager: Arc, peer_manager: RwLock>>, @@ -318,6 +417,10 @@ where self.lsps2_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) } + pub(crate) fn get_lsps5_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> { + self.lsps5_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) + } + pub(crate) fn lsps2_channel_needs_manual_broadcast( &self, counterparty_node_id: PublicKey, user_channel_id: u128, ) -> bool { @@ -910,6 +1013,340 @@ where ); } }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRegistered { + request_id, + counterparty_node_id, + num_webhooks, + max_webhooks, + no_change, + .. + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + let response = Ok(SetWebhookResponse { num_webhooks, max_webhooks, no_change }); + + match lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(response).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRegistered event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRegistrationFailed { + request_id, + counterparty_node_id, + error, + app_name, + url, + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + log_error!( + self.logger, + "Webhook registration failed for app '{}' with url '{}': {:?}", + app_name, + url, + error + ); + if sender.send(Err(error)).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRegistrationFailed event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhooksListed { + request_id, + counterparty_node_id, + app_names, + max_webhooks, + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + let response = + Ok(ListWebhooksResponse { app_names: app_names.clone(), max_webhooks }); + + match lsps5_client + .pending_list_webhooks_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(response).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhooksListed event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRemoved { + request_id, + counterparty_node_id, + .. + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(Ok(RemoveWebhookResponse {})).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRemoved event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRemovalFailed { + request_id, + counterparty_node_id, + error, + app_name, + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + log_error!( + self.logger, + "Webhook removal failed for app '{}': {:?}", + app_name, + error + ); + if sender.send(Err(error)).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRemovalFailed event!" + ); + } + }, + LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { + counterparty_node_id: _, + app_name, + url, + notification, + headers, + }) => { + if self.liquidity_manager.lsps5_service_handler().is_some() { + log_info!( + self.logger, + "Sending webhook notification for {} to {}: {:?}", + app_name, + url, + notification + ); + + let notification_str = serde_json::to_string(¬ification) + .unwrap_or_else(|_| format!("{:?}", notification)); + + let result = bitreq::post(url.as_str()) + .with_headers(headers) + .with_body(notification_str) + .send_async() + .await; + + match result { + Ok(response) => { + if response.status_code != 200 { + log_error!( + self.logger, + "Webhook call failed with status {} for {} to {}", + response.status_code, + app_name, + url + ); + } + }, + Err(e) => { + log_error!( + self.logger, + "Failed to send webhook notification for {} to {}: {}", + app_name, + url, + e + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5ServiceEvent::SendWebhookNotification event!" + ); + } + }, e => { log_error!(self.logger, "Received unexpected liquidity event: {:?}", e); }, @@ -1336,6 +1773,184 @@ where }) } + pub(crate) async fn lsps5_set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = match client_handler.set_webhook( + lsps5_client.lsp_node_id, + app_name.clone(), + webhook_url.clone(), + ) { + Ok(request_id) => request_id, + Err(e) => { + log_error!( + self.logger, + "Failed to send set webhook request to liquidity service: {:?}", + e + ); + return Err(Error::LiquiditySetWebhookFailed); + }, + }; + + lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|e| { + log_error!(self.logger, "Failed to set webhook: {:?}", e); + Error::LiquiditySetWebhookFailed + }), + Ok(Err(e)) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_set_webhook_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) async fn lsps5_list_webhooks(&self) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = client_handler.list_webhooks(lsps5_client.lsp_node_id); + lsps5_client + .pending_list_webhooks_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|e| { + log_error!(self.logger, "Failed to list webhooks: {:?}", e); + Error::LiquidityListWebhooksFailed + }), + Ok(Err(e)) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_list_webhooks_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) async fn lsps5_remove_webhook( + &self, app_name: String, + ) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = + match client_handler.remove_webhook(lsps5_client.lsp_node_id, app_name.clone()) { + Ok(request_id) => request_id, + Err(_) => return Err(Error::LiquidityRemoveWebhookFailed), + }; + + lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|e| { + log_error!(self.logger, "Failed to remove webhook: {:?}", e); + Error::LiquidityRemoveWebhookFailed + }), + Ok(Err(e)) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_remove_webhook_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) fn lsps5_notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_payment_incoming(client_id).map_err(|_| Error::LiquidityRequestFailed) + } + + pub(crate) fn lsps5_notify_expiry_soon( + &self, client_id: PublicKey, timeout: u32, + ) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_expiry_soon(client_id, timeout).map_err(|_| Error::LiquidityRequestFailed) + } + + pub(crate) fn lsps5_notify_liquidity_management_request( + &self, client_id: PublicKey, + ) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler + .notify_liquidity_management_request(client_id) + .map_err(|_| Error::LiquidityRequestFailed) + } + + pub(crate) fn lsps5_notify_onion_message_incoming( + &self, client_id: PublicKey, + ) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_onion_message_incoming(client_id).map_err(|_| Error::LiquidityRequestFailed) + } + pub(crate) async fn handle_channel_ready( &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, ) { @@ -1540,3 +2155,213 @@ impl LSPS1Liquidity { Ok(response) } } + +/// A liquidity handler for managing LSPS5 webhook notifications. +/// +/// Should be retrieved by calling [`Node::lsps5_liquidity`]. +/// +/// This handler allows clients to register webhook endpoints with their LSP to receive +/// push notifications for Lightning events when the client is offline. +/// +/// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md +/// [`Node::lsps5_liquidity`]: crate::Node::lsps5_liquidity +#[derive(Clone)] +pub struct LSPS5Liquidity { + runtime: Arc, + connection_manager: Arc>>, + liquidity_source: Option>>>, + logger: Arc, +} + +impl LSPS5Liquidity { + pub(crate) fn new( + runtime: Arc, connection_manager: Arc>>, + liquidity_source: Option>>>, logger: Arc, + ) -> Self { + Self { runtime, connection_manager, liquidity_source, logger } + } + + fn set_webhook_impl( + &self, app_name: String, webhook_url: String, + ) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self.runtime.block_on(async move { + liquidity_source.lsps5_set_webhook(app_name, webhook_url).await + })?; + + Ok(response) + } + + /// Connects to the configured LSP and registers a webhook URL for receiving LSPS5 notifications. + /// + /// The webhook will receive signed push notifications for Lightning events such as incoming + /// payments when the client is offline. + #[cfg(not(feature = "uniffi"))] + pub fn set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + self.set_webhook_impl(app_name, webhook_url) + } + + /// Connects to the configured LSP and registers a webhook URL for receiving LSPS5 notifications. + /// + /// The webhook will receive signed push notifications for Lightning events such as incoming + /// payments when the client is offline. + #[cfg(feature = "uniffi")] + pub fn set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + self.set_webhook_impl(app_name, webhook_url) + } + + fn list_webhooks_impl(&self) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + let liquidity_source = Arc::clone(&liquidity_source); + let response = + self.runtime.block_on(async move { liquidity_source.lsps5_list_webhooks().await })?; + Ok(response) + } + + /// Lists all currently configured webhooks at the configured LSP. + #[cfg(not(feature = "uniffi"))] + pub fn list_webhooks(&self) -> Result { + self.list_webhooks_impl() + } + + /// Lists all currently configured webhooks at the configured LSP. + #[cfg(feature = "uniffi")] + pub fn list_webhooks(&self) -> Result { + self.list_webhooks_impl().map(|response| response.into()) + } + + fn remove_webhook_impl(&self, app_name: String) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self + .runtime + .block_on(async move { liquidity_source.lsps5_remove_webhook(app_name).await })?; + + Ok(response) + } + + /// Removes a previously-configured webhook at the configured LSP. + #[cfg(not(feature = "uniffi"))] + pub fn remove_webhook(&self, app_name: String) -> Result { + self.remove_webhook_impl(app_name) + } + + /// Removes a previously-configured webhook at the configured LSP. + #[cfg(feature = "uniffi")] + pub fn remove_webhook(&self, app_name: String) -> Result { + self.remove_webhook_impl(app_name) + } + + /// Notifies the configured LSP about an incoming payment. + /// + /// This is called by the LSP service to trigger webhook notifications to the specified client. + pub fn notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_payment_incoming(client_id) + } + + /// Notifies the configured LSP about an invoice expiring soon. + /// + /// This is called by the LSP service to trigger webhook notifications to the specified client. + pub fn notify_expiry_soon(&self, client_id: PublicKey, timeout: u32) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_expiry_soon(client_id, timeout) + } + + /// Notifies the configured LSP about a liquidity management request. + /// + /// This is called by the LSP service to trigger webhook notifications to the specified client. + pub fn notify_liquidity_management_request(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_liquidity_management_request(client_id) + } + + /// Notifies the configured LSP about an incoming onion message. + /// + /// This is called by the LSP service to trigger webhook notifications to the specified client. + pub fn notify_onion_message_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_onion_message_incoming(client_id) + } +} + +#[cfg(feature = "uniffi")] +// Re-export LSPS5 response types for uniffi +pub use lightning_liquidity::lsps5::msgs::{ + RemoveWebhookResponse as LSPS5RemoveWebhookResponse, + SetWebhookResponse as LSPS5SetWebhookResponse, +}; + +#[cfg(feature = "uniffi")] +/// Wrapper for ListWebhooksResponse that converts LSPS5AppName to String for uniffi +#[derive(Clone, Debug)] +pub struct LSPS5ListWebhooksResponse { + pub app_names: Vec, + pub max_webhooks: u32, +} + +#[cfg(feature = "uniffi")] +impl From for LSPS5ListWebhooksResponse { + fn from(response: ListWebhooksResponse) -> Self { + Self { + app_names: response.app_names.into_iter().map(|name| name.to_string()).collect(), + max_webhooks: response.max_webhooks, + } + } +} diff --git a/src/types.rs b/src/types.rs index b5b1ffed7..991c85700 100644 --- a/src/types.rs +++ b/src/types.rs @@ -321,6 +321,8 @@ pub(crate) type BumpTransactionEventHandler = pub(crate) type PaymentStore = DataStore>; +pub type LSPS5ServiceConfig = lightning_liquidity::lsps5::service::LSPS5ServiceConfig; + /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 605dd0613..24a30f262 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -8,6 +8,7 @@ mod common; use std::collections::HashSet; +use std::ops::Add; use std::str::FromStr; use std::sync::Arc; @@ -38,6 +39,7 @@ use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; use lightning_invoice::{Bolt11InvoiceDescription, Description}; +use lightning_liquidity::lsps5::service::{LSPS5ServiceConfig, NOTIFICATION_COOLDOWN_TIME}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -2506,3 +2508,136 @@ async fn persistence_backwards_compatibility() { node_new.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn lsps5_webhook_registration() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let sync_config = EsploraSyncConfig::default(); + + // Setup LSPS5 service provider node + let service_config = random_config(true); + setup_builder!(service_builder, service_config.node_config); + service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); + let lsps5_service_config = LSPS5ServiceConfig { max_webhooks_per_client: 2 }; + service_builder.set_liquidity_provider_lsps5(lsps5_service_config); + let service_node = service_builder.build(service_config.node_entropy.into()).unwrap(); + service_node.start().unwrap(); + let service_node_id = service_node.node_id(); + let service_addr = service_node.onchain_payment().new_address().unwrap(); + let service_socket_addr = service_node.listening_addresses().unwrap().first().unwrap().clone(); + + // Setup LSPS5 client node + let client_config = random_config(true); + setup_builder!(client_builder, client_config.node_config); + client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); + client_builder.set_liquidity_source_lsps5(service_node_id, service_socket_addr.clone()); + let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); + client_node.start().unwrap(); + let client_node_id = client_node.node_id(); + let client_addr = client_node.onchain_payment().new_address().unwrap(); + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![service_addr, client_addr], + Amount::from_sat(10_000_000), + ) + .await; + service_node.sync_wallets().unwrap(); + client_node.sync_wallets().unwrap(); + + open_channel(&client_node, &service_node, 5_000_000, false, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + service_node.sync_wallets().unwrap(); + client_node.sync_wallets().unwrap(); + expect_channel_ready_event!(client_node, service_node.node_id()); + expect_channel_ready_event!(service_node, client_node.node_id()); + + // Test webhook registration + let lsps5_client = client_node.lsps5_liquidity(); + let app_name_1 = "test-app".to_string(); + let webhook_url_1 = "https://example.com/webhook".to_string(); + + // Register first webhook + let response = lsps5_client + .set_webhook(app_name_1.clone(), webhook_url_1.clone()) + .expect("Failed to register webhook"); + assert_eq!(response.num_webhooks, 1, "Expected 1 webhook after first registration"); + assert_eq!(response.max_webhooks, 2, "Expected max_webhooks to be 2"); + assert!(!response.no_change, "Expected no_change to be false for new registration"); + + // Register second webhook with different app name + let app_name_2 = "test-app-2".to_string(); + let webhook_url_2 = "https://example.com/webhook-2".to_string(); + let response = lsps5_client + .set_webhook(app_name_2.clone(), webhook_url_2.clone()) + .expect("Failed to register second webhook"); + assert_eq!(response.num_webhooks, 2, "Expected 2 webhooks after second registration"); + assert_eq!(response.max_webhooks, 2, "Expected max_webhooks to be 2"); + assert!(!response.no_change, "Expected no_change to be false for new registration"); + + // Register the same webhook again - should return no_change=true + let response = lsps5_client + .set_webhook(app_name_2.clone(), webhook_url_2.clone()) + .expect("Failed to re-register webhook"); + assert_eq!(response.num_webhooks, 2, "Expected 2 webhooks after re-registering same webhook"); + assert_eq!(response.max_webhooks, 2, "Expected max_webhooks to be 2"); + assert!(response.no_change, "Expected no_change to be true for duplicate registration"); + + // Attempt to register a third webhook - should fail due to max_webhooks_per_client=2 + let app_name_3 = "test-app-3".to_string(); + let webhook_url_3 = "https://example.com/webhook-3".to_string(); + let result = lsps5_client.set_webhook(app_name_3.clone(), webhook_url_3.clone()); + assert_eq!( + result, + Err(NodeError::LiquiditySetWebhookFailed), + "Expected error when exceeding max webhooks" + ); + + // List registered webhooks + let registered_webhooks = lsps5_client.list_webhooks().expect("Failed to list webhooks"); + assert_eq!(registered_webhooks.app_names.len(), 2, "Expected 2 registered webhooks"); + assert!( + registered_webhooks.app_names.iter().any(|name| name.as_str() == app_name_1), + "Expected app_name_1 in registered webhooks" + ); + assert!( + registered_webhooks.app_names.iter().any(|name| name.as_str() == app_name_2), + "Expected app_name_2 in registered webhooks" + ); + assert_eq!(registered_webhooks.max_webhooks, 2, "Expected max_webhooks to be 2"); + + // Attempt to delete non-existing webhook - should fail + let non_existing_app_name = "non-existing-app".to_string(); + let result = lsps5_client.remove_webhook(non_existing_app_name.clone()); + assert_eq!( + result, + Err(NodeError::LiquidityRemoveWebhookFailed), + "Expected error when removing non-existing webhook" + ); + + // Delete a registered webhook + lsps5_client.remove_webhook(app_name_1.clone()).expect("Failed to delete first webhook"); + + // Verify webhook was deleted + let registered_webhooks = + lsps5_client.list_webhooks().expect("Failed to list webhooks after deletion"); + assert_eq!(registered_webhooks.app_names.len(), 1, "Expected 1 webhook after deletion"); + assert!( + registered_webhooks.app_names.iter().any(|name| name.as_str() == app_name_2), + "Expected app_name_2 to remain after deletion" + ); + assert!( + !registered_webhooks.app_names.iter().any(|name| name.as_str() == app_name_1), + "Expected app_name_1 to be removed" + ); + + // Test service-side notification methods + let lsps5_service = service_node.lsps5_liquidity(); + + lsps5_service.notify_payment_incoming(client_node_id).expect("notify_payment_incoming failed"); + + service_node.stop().unwrap(); + client_node.stop().unwrap(); +}