From 98535fab45b33d5b098ab6c582edd08f45f4ac72 Mon Sep 17 00:00:00 2001 From: Apostlex0 Date: Tue, 27 Jan 2026 13:40:14 +0530 Subject: [PATCH 1/2] changes: switch to bitreq from chunked_transfer we changed the http layer from the manual tcpstream client implementation to bitreq, this change lets us rely on bitreq for http request formatting, sockets, HTTP parsing, chunked bodies, pooling, and async support instead of having to manually implement them. cargo.toml: we added bitreq 0.3 and updates the rest-client and rpc-client feature wiring to depend on bitreq. A tokio feature is also enabled to allow bitreq async support and pipelining. http.rs: The old HttpEndpoint builder and all manual TCP/socket timeout logic are dropped along with the manual GET/POST construction and response parsing. The client API now uses base_url and get/post return Result with a typed HttpClientError instead of std::io::Result. HttpClientError splits transport failures (bitreq::Error), non-2xx HTTP responses (HttpError), and response decoding issues (std::io::Error). rest.rs and rpc.rs: HttpEndpoint and the Mutex> caching pattern are removed and both clients now own an HttpClient directly using base_url. rpc.rs also adds RpcClientError so we can represent HTTP failures, JSON-RPC errors from the server, and malformed responses instead of just giving out std::io::Error. convert.rs: it maps HttpClientError and RpcClientError into BlockSourceError with this retry classification: transport errors and HTTP 5xx are transient, HTTP 4xx and invalid data are persistent, and RPC errors are treated as transient. --- lightning-block-sync/Cargo.toml | 7 +- lightning-block-sync/src/convert.rs | 62 +- lightning-block-sync/src/http.rs | 870 ++++++---------------------- lightning-block-sync/src/rest.rs | 33 +- lightning-block-sync/src/rpc.rs | 135 +++-- 5 files changed, 345 insertions(+), 762 deletions(-) diff --git a/lightning-block-sync/Cargo.toml b/lightning-block-sync/Cargo.toml index 97f199963ac..d8d71da3fae 100644 --- a/lightning-block-sync/Cargo.toml +++ b/lightning-block-sync/Cargo.toml @@ -16,15 +16,16 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [features] -rest-client = [ "serde_json", "chunked_transfer" ] -rpc-client = [ "serde_json", "chunked_transfer" ] +rest-client = [ "serde_json", "dep:bitreq" ] +rpc-client = [ "serde_json", "dep:bitreq" ] +tokio = [ "dep:tokio", "bitreq?/async" ] [dependencies] bitcoin = "0.32.2" lightning = { version = "0.3.0", path = "../lightning" } tokio = { version = "1.35", features = [ "io-util", "net", "time", "rt" ], optional = true } serde_json = { version = "1.0", optional = true } -chunked_transfer = { version = "1.4", optional = true } +bitreq = { version = "0.3", default-features = false, features = ["std"], optional = true } [dev-dependencies] lightning = { version = "0.3.0", path = "../lightning", features = ["_test_utils"] } diff --git a/lightning-block-sync/src/convert.rs b/lightning-block-sync/src/convert.rs index a31b329a5af..47c7586a2c4 100644 --- a/lightning-block-sync/src/convert.rs +++ b/lightning-block-sync/src/convert.rs @@ -1,4 +1,6 @@ -use crate::http::{BinaryResponse, JsonResponse}; +use crate::http::{BinaryResponse, HttpClientError, JsonResponse}; +#[cfg(feature = "rpc-client")] +use crate::rpc::RpcClientError; use crate::utils::hex_to_work; use crate::{BlockHeaderData, BlockSourceError}; @@ -35,6 +37,64 @@ impl From for BlockSourceError { } } +/// Conversion from `HttpClientError` into `BlockSourceError`. +impl From for BlockSourceError { + fn from(e: HttpClientError) -> BlockSourceError { + match e { + // Transport errors (connection, timeout, etc.) are transient + HttpClientError::Transport(err) => { + BlockSourceError::transient(HttpClientError::Transport(err)) + }, + // 5xx errors are transient (server issues), others are persistent (client errors) + HttpClientError::Http(http_err) => { + if (500..600).contains(&http_err.status_code) { + BlockSourceError::transient(HttpClientError::Http(http_err)) + } else { + BlockSourceError::persistent(HttpClientError::Http(http_err)) + } + }, + // Delegate to existing From implementation + HttpClientError::Io(io_err) => BlockSourceError::from(io_err), + } + } +} + +/// Conversion from `RpcClientError` into `BlockSourceError`. +#[cfg(feature = "rpc-client")] +impl From for BlockSourceError { + fn from(e: RpcClientError) -> BlockSourceError { + match e { + RpcClientError::Http(http_err) => match http_err { + // Transport errors (connection, timeout, etc.) are transient + HttpClientError::Transport(err) => BlockSourceError::transient( + RpcClientError::Http(HttpClientError::Transport(err)), + ), + // 5xx errors are transient (server issues), others are persistent (client errors) + HttpClientError::Http(http) => { + if (500..600).contains(&http.status_code) { + BlockSourceError::transient(RpcClientError::Http(HttpClientError::Http( + http, + ))) + } else { + BlockSourceError::persistent(RpcClientError::Http(HttpClientError::Http( + http, + ))) + } + }, + HttpClientError::Io(io_err) => BlockSourceError::from(io_err), + }, + // RPC errors (e.g. "block not found") are transient + RpcClientError::Rpc(rpc_err) => { + BlockSourceError::transient(RpcClientError::Rpc(rpc_err)) + }, + // Malformed response data is persistent + RpcClientError::InvalidData(msg) => { + BlockSourceError::persistent(RpcClientError::InvalidData(msg)) + }, + } + } +} + /// Parses binary data as a block. impl TryInto for BinaryResponse { type Error = io::Error; diff --git a/lightning-block-sync/src/http.rs b/lightning-block-sync/src/http.rs index 0fb82b4acde..29cc4256437 100644 --- a/lightning-block-sync/src/http.rs +++ b/lightning-block-sync/src/http.rs @@ -1,399 +1,164 @@ //! Simple HTTP implementation which supports both async and traditional execution environments //! with minimal dependencies. This is used as the basis for REST and RPC clients. -use chunked_transfer; use serde_json; -use std::convert::TryFrom; -use std::fmt; -#[cfg(not(feature = "tokio"))] -use std::io::Write; -use std::net::{SocketAddr, ToSocketAddrs}; -use std::time::Duration; - -#[cfg(feature = "tokio")] -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; #[cfg(feature = "tokio")] -use tokio::net::TcpStream; +use bitreq::RequestExt; -#[cfg(not(feature = "tokio"))] -use std::io::BufRead; -use std::io::Read; -#[cfg(not(feature = "tokio"))] -use std::net::TcpStream; - -/// Timeout for operations on TCP streams. -const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5); - -/// Timeout for reading the first byte of a response. This is separate from the general read -/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for -/// upwards of 10 minutes on slow devices (e.g. RPis with SSDs over USB). Note that we always retry -/// once when we time out, so the maximum time we allow Bitcoin Core to block for is twice this -/// value. -const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(300); +use std::convert::TryFrom; +use std::fmt; -/// Maximum HTTP message header size in bytes. -const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192; +/// Timeout for requests in seconds. This is set to a high value as it is not uncommon for Bitcoin +/// Core to be blocked waiting on UTXO cache flushes for upwards of 10 minutes on slow devices +/// (e.g. RPis with SSDs over USB). +const TCP_STREAM_RESPONSE_TIMEOUT: u64 = 300; /// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any /// overhead for HTTP chunked transfer encoding. const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000; -/// Endpoint for interacting with an HTTP-based API. +/// Error type for HTTP client operations. #[derive(Debug)] -pub struct HttpEndpoint { - host: String, - port: Option, - path: String, +pub enum HttpClientError { + /// transport-level error (connection, timeout, protocol parsing, etc.) + Transport(bitreq::Error), + /// HTTP error response (non-2xx status code) + Http(HttpError), + /// Response parsing/conversion error + Io(std::io::Error), } -impl HttpEndpoint { - /// Creates an endpoint for the given host and default HTTP port. - pub fn for_host(host: String) -> Self { - Self { host, port: None, path: String::from("/") } - } - - /// Specifies a port to use with the endpoint. - pub fn with_port(mut self, port: u16) -> Self { - self.port = Some(port); - self - } - - /// Specifies a path to use with the endpoint. - pub fn with_path(mut self, path: String) -> Self { - self.path = path; - self - } - - /// Returns the endpoint host. - pub fn host(&self) -> &str { - &self.host +impl std::error::Error for HttpClientError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + HttpClientError::Transport(e) => Some(e), + HttpClientError::Http(e) => Some(e), + HttpClientError::Io(e) => Some(e), + } } +} - /// Returns the endpoint port. - pub fn port(&self) -> u16 { - match self.port { - None => 80, - Some(port) => port, +impl fmt::Display for HttpClientError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + HttpClientError::Transport(e) => write!(f, "transport error: {}", e), + HttpClientError::Http(e) => write!(f, "HTTP error: {}", e), + HttpClientError::Io(e) => write!(f, "Response parsing/conversion error: {}", e), } } +} - /// Returns the endpoint path. - pub fn path(&self) -> &str { - &self.path +impl From for HttpClientError { + fn from(e: std::io::Error) -> Self { + HttpClientError::Io(e) } } -impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint { - type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter; +impl From for HttpClientError { + fn from(e: bitreq::Error) -> Self { + HttpClientError::Transport(e) + } +} - fn to_socket_addrs(&self) -> std::io::Result { - (self.host(), self.port()).to_socket_addrs() +impl From for HttpClientError { + fn from(e: HttpError) -> Self { + HttpClientError::Http(e) } } +/// Maximum number of cached connections in the connection pool. +#[cfg(feature = "tokio")] +const MAX_CONNECTIONS: usize = 10; + /// Client for making HTTP requests. pub(crate) struct HttpClient { - address: SocketAddr, - stream: TcpStream, + base_url: String, + #[cfg(feature = "tokio")] + client: bitreq::Client, } impl HttpClient { - /// Opens a connection to an HTTP endpoint. - pub fn connect(endpoint: E) -> std::io::Result { - let address = match endpoint.to_socket_addrs()?.next() { - None => { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "could not resolve to any addresses", - )); - }, - Some(address) => address, - }; - let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?; - stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?; - stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?; - - #[cfg(feature = "tokio")] - let stream = { - stream.set_nonblocking(true)?; - TcpStream::from_std(stream)? - }; - - Ok(Self { address, stream }) + /// Creates a new HTTP client for the given base URL. + /// + /// The base URL should include the scheme, host, and port (e.g., "http://127.0.0.1:8332"). + /// DNS resolution is deferred until the first request is made. + pub fn new(base_url: String) -> Self { + Self { + base_url, + #[cfg(feature = "tokio")] + client: bitreq::Client::new(MAX_CONNECTIONS), + } } - /// Sends a `GET` request for a resource identified by `uri` at the `host`. + /// Sends a `GET` request for a resource identified by `uri`. /// /// Returns the response body in `F` format. #[allow(dead_code)] - pub async fn get(&mut self, uri: &str, host: &str) -> std::io::Result + pub async fn get(&self, uri: &str) -> Result where F: TryFrom, Error = std::io::Error>, { - let request = format!( - "GET {} HTTP/1.1\r\n\ - Host: {}\r\n\ - Connection: keep-alive\r\n\ - \r\n", - uri, host - ); - let response_body = self.send_request_with_retry(&request).await?; - F::try_from(response_body) + let url = format!("{}{}", self.base_url, uri); + let request = bitreq::get(url) + .with_timeout(TCP_STREAM_RESPONSE_TIMEOUT) + .with_max_body_size(Some(MAX_HTTP_MESSAGE_BODY_SIZE)); + #[cfg(feature = "tokio")] + let request = request.with_pipelining(); + let response_body = self.send_request(request).await?; + F::try_from(response_body).map_err(HttpClientError::Io) } - /// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP + /// Sends a `POST` request for a resource identified by `uri` using the given HTTP /// authentication credentials. /// /// The request body consists of the provided JSON `content`. Returns the response body in `F` /// format. #[allow(dead_code)] pub async fn post( - &mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value, - ) -> std::io::Result + &self, uri: &str, auth: &str, content: serde_json::Value, + ) -> Result where F: TryFrom, Error = std::io::Error>, { - let content = content.to_string(); - let request = format!( - "POST {} HTTP/1.1\r\n\ - Host: {}\r\n\ - Authorization: {}\r\n\ - Connection: keep-alive\r\n\ - Content-Type: application/json\r\n\ - Content-Length: {}\r\n\ - \r\n\ - {}", - uri, - host, - auth, - content.len(), - content - ); - let response_body = self.send_request_with_retry(&request).await?; - F::try_from(response_body) - } - - /// Sends an HTTP request message and reads the response, returning its body. Attempts to - /// reconnect and retry if the connection has been closed. - async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result> { - match self.send_request(request).await { - Ok(bytes) => Ok(bytes), - Err(_) => { - // Reconnect and retry on fail. This can happen if the connection was closed after - // the keep-alive limits are reached, or generally if the request timed out due to - // Bitcoin Core being stuck on a long-running operation or its RPC queue being - // full. - // Block 100ms before retrying the request as in many cases the source of the error - // may be persistent for some time. - #[cfg(feature = "tokio")] - tokio::time::sleep(Duration::from_millis(100)).await; - #[cfg(not(feature = "tokio"))] - std::thread::sleep(Duration::from_millis(100)); - *self = Self::connect(self.address)?; - self.send_request(request).await - }, - } - } - - /// Sends an HTTP request message and reads the response, returning its body. - async fn send_request(&mut self, request: &str) -> std::io::Result> { - self.write_request(request).await?; - self.read_response().await - } - - /// Writes an HTTP request message. - async fn write_request(&mut self, request: &str) -> std::io::Result<()> { + let url = format!("{}{}", self.base_url, uri); + let request = bitreq::post(url) + .with_header("Authorization", auth) + .with_header("Content-Type", "application/json") + .with_timeout(TCP_STREAM_RESPONSE_TIMEOUT) + .with_max_body_size(Some(MAX_HTTP_MESSAGE_BODY_SIZE)) + .with_body(content.to_string()); #[cfg(feature = "tokio")] - { - self.stream.write_all(request.as_bytes()).await?; - self.stream.flush().await - } - #[cfg(not(feature = "tokio"))] - { - self.stream.write_all(request.as_bytes())?; - self.stream.flush() - } + let request = request.with_pipelining(); + let response_body = self.send_request(request).await?; + F::try_from(response_body).map_err(HttpClientError::Io) } - /// Reads an HTTP response message. - async fn read_response(&mut self) -> std::io::Result> { - #[cfg(feature = "tokio")] - let stream = self.stream.split().0; - #[cfg(not(feature = "tokio"))] - let stream = std::io::Read::by_ref(&mut self.stream); - - let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64); - + /// Sends an HTTP request message and reads the response, returning its body. + async fn send_request(&self, request: bitreq::Request) -> Result, HttpClientError> { #[cfg(feature = "tokio")] - let mut reader = tokio::io::BufReader::new(limited_stream); + let response = request.send_async_with_client(&self.client).await?; #[cfg(not(feature = "tokio"))] - let mut reader = std::io::BufReader::new(limited_stream); - - macro_rules! read_line { - () => { - read_line!(0) - }; - ($retry_count: expr) => {{ - let mut line = String::new(); - let mut timeout_count: u64 = 0; - let bytes_read = loop { - #[cfg(feature = "tokio")] - let read_res = reader.read_line(&mut line).await; - #[cfg(not(feature = "tokio"))] - let read_res = reader.read_line(&mut line); - match read_res { - Ok(bytes_read) => break bytes_read, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - timeout_count += 1; - if timeout_count > $retry_count { - return Err(e); - } else { - continue; - } - }, - Err(e) => return Err(e), - } - }; - - match bytes_read { - 0 => None, - _ => { - // Remove trailing CRLF - if line.ends_with('\n') { - line.pop(); - if line.ends_with('\r') { - line.pop(); - } - } - Some(line) - }, - } - }}; - } + let response = request.send()?; - // Read and parse status line - // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT. - let status_line = - read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs()) - .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?; - let status = HttpStatus::parse(&status_line)?; - - // Read and parse relevant headers - let mut message_length = HttpMessageLength::Empty; - loop { - let line = read_line!() - .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?; - if line.is_empty() { - break; - } + let status_code = response.status_code; + let body = response.into_bytes(); - let header = HttpHeader::parse(&line)?; - if header.has_name("Content-Length") { - let length = header - .value - .parse() - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; - if let HttpMessageLength::Empty = message_length { - message_length = HttpMessageLength::ContentLength(length); - } - continue; - } - - if header.has_name("Transfer-Encoding") { - message_length = HttpMessageLength::TransferEncoding(header.value.into()); - continue; - } + if !(200..300).contains(&status_code) { + return Err(HttpError { status_code, contents: body }.into()); } - // Read message body - let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len(); - reader.get_mut().set_limit(read_limit as u64); - let contents = match message_length { - HttpMessageLength::Empty => Vec::new(), - HttpMessageLength::ContentLength(length) => { - if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("invalid response length: {} bytes", length), - )); - } else { - let mut content = vec![0; length]; - #[cfg(feature = "tokio")] - reader.read_exact(&mut content[..]).await?; - #[cfg(not(feature = "tokio"))] - reader.read_exact(&mut content[..])?; - content - } - }, - HttpMessageLength::TransferEncoding(coding) => { - if !coding.eq_ignore_ascii_case("chunked") { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "unsupported transfer coding", - )); - } else { - let mut content = Vec::new(); - #[cfg(feature = "tokio")] - { - // Since chunked_transfer doesn't have an async interface, only use it to - // determine the size of each chunk to read. - // - // TODO: Replace with an async interface when available. - // https://github.com/frewsxcv/rust-chunked-transfer/issues/7 - loop { - // Read the chunk header which contains the chunk size. - let mut chunk_header = String::new(); - reader.read_line(&mut chunk_header).await?; - if chunk_header == "0\r\n" { - // Read the terminator chunk since the decoder consumes the CRLF - // immediately when this chunk is encountered. - reader.read_line(&mut chunk_header).await?; - } - - // Decode the chunk header to obtain the chunk size. - let mut buffer = Vec::new(); - let mut decoder = - chunked_transfer::Decoder::new(chunk_header.as_bytes()); - decoder.read_to_end(&mut buffer)?; - - // Read the chunk body. - let chunk_size = match decoder.remaining_chunks_size() { - None => break, - Some(chunk_size) => chunk_size, - }; - let chunk_offset = content.len(); - content.resize(chunk_offset + chunk_size + "\r\n".len(), 0); - reader.read_exact(&mut content[chunk_offset..]).await?; - content.resize(chunk_offset + chunk_size, 0); - } - content - } - #[cfg(not(feature = "tokio"))] - { - let mut decoder = chunked_transfer::Decoder::new(reader); - decoder.read_to_end(&mut content)?; - content - } - } - }, - }; - - if !status.is_ok() { - // TODO: Handle 3xx redirection responses. - let error = HttpError { status_code: status.code.to_string(), contents }; - return Err(std::io::Error::new(std::io::ErrorKind::Other, error)); - } - - Ok(contents) + Ok(body) } } /// HTTP error consisting of a status code and body contents. #[derive(Debug)] -pub(crate) struct HttpError { - pub(crate) status_code: String, - pub(crate) contents: Vec, +pub struct HttpError { + /// The HTTP status code. + pub status_code: i32, + /// The response body contents. + pub contents: Vec, } impl std::error::Error for HttpError {} @@ -405,94 +170,6 @@ impl fmt::Display for HttpError { } } -/// HTTP response status code as defined by [RFC 7231]. -/// -/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6 -struct HttpStatus<'a> { - code: &'a str, -} - -impl<'a> HttpStatus<'a> { - /// Parses an HTTP status line as defined by [RFC 7230]. - /// - /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2 - fn parse(line: &'a String) -> std::io::Result> { - let mut tokens = line.splitn(3, ' '); - - let http_version = tokens - .next() - .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?; - if !http_version.eq_ignore_ascii_case("HTTP/1.1") - && !http_version.eq_ignore_ascii_case("HTTP/1.0") - { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "invalid HTTP-Version", - )); - } - - let code = tokens - .next() - .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?; - if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "invalid Status-Code", - )); - } - - let _reason = tokens - .next() - .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?; - - Ok(Self { code }) - } - - /// Returns whether the status is successful (i.e., 2xx status class). - fn is_ok(&self) -> bool { - self.code.starts_with('2') - } -} - -/// HTTP response header as defined by [RFC 7231]. -/// -/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7 -struct HttpHeader<'a> { - name: &'a str, - value: &'a str, -} - -impl<'a> HttpHeader<'a> { - /// Parses an HTTP header field as defined by [RFC 7230]. - /// - /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2 - fn parse(line: &'a String) -> std::io::Result> { - let mut tokens = line.splitn(2, ':'); - let name = tokens - .next() - .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?; - let value = tokens - .next() - .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))? - .trim_start(); - Ok(Self { name, value }) - } - - /// Returns whether the header field has the given name. - fn has_name(&self, name: &str) -> bool { - self.name.eq_ignore_ascii_case(name) - } -} - -/// HTTP message body length as defined by [RFC 7230]. -/// -/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3 -enum HttpMessageLength { - Empty, - ContentLength(usize), - TransferEncoding(String), -} - /// An HTTP response body in binary format. pub struct BinaryResponse(pub Vec); @@ -517,82 +194,45 @@ impl TryFrom> for JsonResponse { } } -#[cfg(test)] -mod endpoint_tests { - use super::HttpEndpoint; - - #[test] - fn with_default_port() { - let endpoint = HttpEndpoint::for_host("foo.com".into()); - assert_eq!(endpoint.host(), "foo.com"); - assert_eq!(endpoint.port(), 80); - } - - #[test] - fn with_custom_port() { - let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080); - assert_eq!(endpoint.host(), "foo.com"); - assert_eq!(endpoint.port(), 8080); - } - - #[test] - fn with_uri_path() { - let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into()); - assert_eq!(endpoint.host(), "foo.com"); - assert_eq!(endpoint.path(), "/path"); - } - - #[test] - fn without_uri_path() { - let endpoint = HttpEndpoint::for_host("foo.com".into()); - assert_eq!(endpoint.host(), "foo.com"); - assert_eq!(endpoint.path(), "/"); - } - - #[test] - fn convert_to_socket_addrs() { - let endpoint = HttpEndpoint::for_host("localhost".into()); - let host = endpoint.host(); - let port = endpoint.port(); - - use std::net::ToSocketAddrs; - match (&endpoint).to_socket_addrs() { - Err(e) => panic!("Unexpected error: {:?}", e), - Ok(socket_addrs) => { - let mut std_addrs = (host, port).to_socket_addrs().unwrap(); - for addr in socket_addrs { - assert_eq!(addr, std_addrs.next().unwrap()); - } - assert!(std_addrs.next().is_none()); - }, - } - } -} - #[cfg(test)] pub(crate) mod client_tests { use super::*; - use std::io::BufRead; - use std::io::Write; + use std::io::{BufRead, Read, Write}; + use std::time::Duration; /// Server for handling HTTP client requests with a stock response. pub struct HttpServer { address: std::net::SocketAddr, - handler: std::thread::JoinHandle<()>, + handler: Option>, shutdown: std::sync::Arc, } + impl Drop for HttpServer { + fn drop(&mut self) { + self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst); + // Make a connection to unblock the listener's accept() call + let _ = std::net::TcpStream::connect(self.address); + if let Some(handler) = self.handler.take() { + let _ = handler.join(); + } + } + } + /// Body of HTTP response messages. pub enum MessageBody { Empty, Content(T), - ChunkedContent(T), } impl HttpServer { fn responding_with_body(status: &str, body: MessageBody) -> Self { let response = match body { - MessageBody::Empty => format!("{}\r\n\r\n", status), + MessageBody::Empty => format!( + "{}\r\n\ + Content-Length: 0\r\n\ + \r\n", + status + ), MessageBody::Content(body) => { let body = body.to_string(); format!( @@ -605,22 +245,6 @@ pub(crate) mod client_tests { body ) }, - MessageBody::ChunkedContent(body) => { - let mut chuncked_body = Vec::new(); - { - use chunked_transfer::Encoder; - let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8); - encoder.write_all(body.to_string().as_bytes()).unwrap(); - } - format!( - "{}\r\n\ - Transfer-Encoding: chunked\r\n\ - \r\n\ - {}", - status, - String::from_utf8(chuncked_body).unwrap() - ) - }, }; HttpServer::responding_with(response) } @@ -645,179 +269,90 @@ pub(crate) mod client_tests { let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let shutdown_signaled = std::sync::Arc::clone(&shutdown); let handler = std::thread::spawn(move || { + let timeout = Duration::from_secs(5); for stream in listener.incoming() { - let mut stream = stream.unwrap(); - stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap(); - - let lines_read = std::io::BufReader::new(&stream) - .lines() - .take_while(|line| !line.as_ref().unwrap().is_empty()) - .count(); - if lines_read == 0 { - continue; + if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) { + return; } - for chunk in response.as_bytes().chunks(16) { + let stream = stream.unwrap(); + stream.set_write_timeout(Some(timeout)).unwrap(); + stream.set_read_timeout(Some(timeout)).unwrap(); + + let mut reader = std::io::BufReader::new(stream); + + // Handle multiple requests on the same connection (keep-alive) + loop { if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) { return; - } else { - if let Err(_) = stream.write(chunk) { + } + + // Read request headers + let mut lines_read = 0; + let mut content_length: usize = 0; + loop { + let mut line = String::new(); + match reader.read_line(&mut line) { + Ok(0) => break, // eof + Ok(_) => { + if line == "\r\n" || line == "\n" { + break; // end of headers + } + // Parse content_length for POST body handling + if let Some(value) = line.strip_prefix("Content-Length:") { + content_length = value.trim().parse().unwrap_or(0); + } + lines_read += 1; + }, + Err(_) => break, // Read error or timeout + } + } + + if lines_read == 0 { + break; // No request received, connection closed + } + + // Consume request body if present (needed for POST keep-alive) + if content_length > 0 { + let mut body = vec![0u8; content_length]; + if reader.read_exact(&mut body).is_err() { break; } - if let Err(_) = stream.flush() { + } + + // Send response + let stream = reader.get_mut(); + let mut write_error = false; + for chunk in response.as_bytes().chunks(16) { + if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) { + return; + } + if stream.write(chunk).is_err() || stream.flush().is_err() { + write_error = true; break; } } + if write_error { + break; + } } } }); - Self { address, handler, shutdown } - } - - fn shutdown(self) { - self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst); - self.handler.join().unwrap(); - } - - pub fn endpoint(&self) -> HttpEndpoint { - HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port()) - } - } - - #[test] - fn connect_to_unresolvable_host() { - match HttpClient::connect(("example.invalid", 80)) { - Err(e) => { - assert!( - e.to_string().contains("failed to lookup address information") - || e.to_string().contains("No such host"), - "{:?}", - e - ); - }, - Ok(_) => panic!("Expected error"), - } - } - - #[test] - fn connect_with_no_socket_address() { - match HttpClient::connect(&vec![][..]) { - Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput), - Ok(_) => panic!("Expected error"), - } - } - - #[test] - fn connect_with_unknown_server() { - // get an unused port by binding to port 0 - let port = { - let t = std::net::TcpListener::bind(("127.0.0.1", 0)).unwrap(); - t.local_addr().unwrap().port() - }; - - match HttpClient::connect(("::", port)) { - #[cfg(target_os = "windows")] - Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable), - #[cfg(not(target_os = "windows"))] - Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused), - Ok(_) => panic!("Expected error"), + Self { address, handler: Some(handler), shutdown } } - } - - #[tokio::test] - async fn connect_with_valid_endpoint() { - let server = HttpServer::responding_with_ok::(MessageBody::Empty); - match HttpClient::connect(&server.endpoint()) { - Err(e) => panic!("Unexpected error: {:?}", e), - Ok(_) => {}, + pub fn endpoint(&self) -> String { + format!("http://{}:{}", self.address.ip(), self.address.port()) } } #[tokio::test] - async fn read_empty_message() { - let server = HttpServer::responding_with("".to_string()); - - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - match client.get::("/foo", "foo.com").await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof); - assert_eq!(e.get_ref().unwrap().to_string(), "no status line"); - }, - Ok(_) => panic!("Expected error"), - } - } - - #[tokio::test] - async fn read_incomplete_message() { - let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string()); - - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - match client.get::("/foo", "foo.com").await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof); - assert_eq!(e.get_ref().unwrap().to_string(), "no headers"); - }, - Ok(_) => panic!("Expected error"), - } - } - - #[tokio::test] - async fn read_too_large_message_headers() { - let response = format!( - "HTTP/1.1 302 Found\r\n\ - Location: {}\r\n\ - \r\n", - "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE) - ); - let server = HttpServer::responding_with(response); - - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - match client.get::("/foo", "foo.com").await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof); - assert_eq!(e.get_ref().unwrap().to_string(), "no headers"); - }, - Ok(_) => panic!("Expected error"), - } - } - - #[tokio::test] - async fn read_too_large_message_body() { - let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1); - let server = HttpServer::responding_with_ok::(MessageBody::Content(body)); - - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - match client.get::("/foo", "foo.com").await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - assert_eq!( - e.get_ref().unwrap().to_string(), - "invalid response length: 8032001 bytes" - ); - }, - Ok(_) => panic!("Expected error"), - } - server.shutdown(); - } - - #[tokio::test] - async fn read_message_with_unsupported_transfer_coding() { - let response = String::from( - "HTTP/1.1 200 OK\r\n\ - Transfer-Encoding: gzip\r\n\ - \r\n\ - foobar", - ); - let server = HttpServer::responding_with(response); - - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - match client.get::("/foo", "foo.com").await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding"); - }, + async fn connect_with_invalid_host() { + let client = HttpClient::new("http://invalid.host.example:80".to_string()); + match client.get::("/foo").await { + Err(HttpClientError::Transport(_)) => {}, + Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } } @@ -826,50 +361,25 @@ pub(crate) mod client_tests { async fn read_error() { let server = HttpServer::responding_with_server_error("foo"); - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - match client.get::("/foo", "foo.com").await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::Other); - let http_error = e.into_inner().unwrap().downcast::().unwrap(); - assert_eq!(http_error.status_code, "500"); + let client = HttpClient::new(server.endpoint()); + match client.get::("/foo").await { + Err(HttpClientError::Http(http_error)) => { + assert_eq!(http_error.status_code, 500); assert_eq!(http_error.contents, "foo".as_bytes()); }, + Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } } #[tokio::test] - async fn read_empty_message_body() { - let server = HttpServer::responding_with_ok::(MessageBody::Empty); - - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - match client.get::("/foo", "foo.com").await { - Err(e) => panic!("Unexpected error: {:?}", e), - Ok(bytes) => assert_eq!(bytes.0, Vec::::new()), - } - } - - #[tokio::test] - async fn read_message_body_with_length() { + async fn read_message_body() { let body = "foo bar baz qux".repeat(32); let content = MessageBody::Content(body.clone()); let server = HttpServer::responding_with_ok::(content); - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - match client.get::("/foo", "foo.com").await { - Err(e) => panic!("Unexpected error: {:?}", e), - Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()), - } - } - - #[tokio::test] - async fn read_chunked_message_body() { - let body = "foo bar baz qux".repeat(32); - let chunked_content = MessageBody::ChunkedContent(body.clone()); - let server = HttpServer::responding_with_ok::(chunked_content); - - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - match client.get::("/foo", "foo.com").await { + let client = HttpClient::new(server.endpoint()); + match client.get::("/foo").await { Err(e) => panic!("Unexpected error: {:?}", e), Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()), } @@ -879,9 +389,9 @@ pub(crate) mod client_tests { async fn reconnect_closed_connection() { let server = HttpServer::responding_with_ok::(MessageBody::Empty); - let mut client = HttpClient::connect(&server.endpoint()).unwrap(); - assert!(client.get::("/foo", "foo.com").await.is_ok()); - match client.get::("/foo", "foo.com").await { + let client = HttpClient::new(server.endpoint()); + assert!(client.get::("/foo").await.is_ok()); + match client.get::("/foo").await { Err(e) => panic!("Unexpected error: {:?}", e), Ok(bytes) => assert_eq!(bytes.0, Vec::::new()), } diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs index 619981bb4d0..0ea93895bcd 100644 --- a/lightning-block-sync/src/rest.rs +++ b/lightning-block-sync/src/rest.rs @@ -3,7 +3,7 @@ use crate::convert::GetUtxosResponse; use crate::gossip::UtxoSource; -use crate::http::{BinaryResponse, HttpClient, HttpEndpoint, JsonResponse}; +use crate::http::{BinaryResponse, HttpClient, HttpClientError, JsonResponse}; use crate::{BlockData, BlockHeaderData, BlockSource, BlockSourceResult}; use bitcoin::hash_types::BlockHash; @@ -12,38 +12,27 @@ use bitcoin::OutPoint; use std::convert::TryFrom; use std::convert::TryInto; use std::future::Future; -use std::sync::Mutex; /// A simple REST client for requesting resources using HTTP `GET`. pub struct RestClient { - endpoint: HttpEndpoint, - client: Mutex>, + client: HttpClient, } impl RestClient { /// Creates a new REST client connected to the given endpoint. /// - /// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest). - pub fn new(endpoint: HttpEndpoint) -> Self { - Self { endpoint, client: Mutex::new(None) } + /// The base URL should include the REST path component (e.g., "http://127.0.0.1:8332/rest"). + pub fn new(base_url: String) -> Self { + Self { client: HttpClient::new(base_url) } } /// Requests a resource encoded in `F` format and interpreted as type `T`. - pub async fn request_resource(&self, resource_path: &str) -> std::io::Result + pub async fn request_resource(&self, resource_path: &str) -> Result where F: TryFrom, Error = std::io::Error> + TryInto, { - let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); - let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path); - let reserved_client = self.client.lock().unwrap().take(); - let mut client = if let Some(client) = reserved_client { - client - } else { - HttpClient::connect(&self.endpoint)? - }; - let res = client.get::(&uri, &host).await?.try_into(); - *self.client.lock().unwrap() = Some(client); - res + let uri = format!("/{}", resource_path); + self.client.get::(&uri).await?.try_into().map_err(HttpClientError::Io) } } @@ -126,7 +115,8 @@ mod tests { let client = RestClient::new(server.endpoint()); match client.request_resource::("/").await { - Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other), + Err(HttpClientError::Http(e)) => assert_eq!(e.status_code, 404), + Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } } @@ -137,7 +127,8 @@ mod tests { let client = RestClient::new(server.endpoint()); match client.request_resource::("/").await { - Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData), + Err(HttpClientError::Io(_)) => {}, + Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } } diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index d851ba2ccf0..c81d7f23da9 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -2,14 +2,12 @@ //! endpoint. use crate::gossip::UtxoSource; -use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; +use crate::http::{HttpClient, HttpClientError, JsonResponse}; use crate::{BlockData, BlockHeaderData, BlockSource, BlockSourceResult}; use bitcoin::hash_types::BlockHash; use bitcoin::OutPoint; -use std::sync::Mutex; - use serde_json; use std::convert::TryFrom; @@ -36,14 +34,56 @@ impl fmt::Display for RpcError { impl Error for RpcError {} +/// Error type for RPC client operations. +#[derive(Debug)] +pub enum RpcClientError { + /// An HTTP client error (transport or HTTP error). + Http(HttpClientError), + /// An RPC error returned by the server. + Rpc(RpcError), + /// Invalid data in the response. + InvalidData(String), +} + +impl std::error::Error for RpcClientError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + RpcClientError::Http(e) => Some(e), + RpcClientError::Rpc(e) => Some(e), + RpcClientError::InvalidData(_) => None, + } + } +} + +impl fmt::Display for RpcClientError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + RpcClientError::Http(e) => write!(f, "HTTP error: {}", e), + RpcClientError::Rpc(e) => write!(f, "{}", e), + RpcClientError::InvalidData(msg) => write!(f, "invalid data: {}", msg), + } + } +} + +impl From for RpcClientError { + fn from(e: HttpClientError) -> Self { + RpcClientError::Http(e) + } +} + +impl From for RpcClientError { + fn from(e: RpcError) -> Self { + RpcClientError::Rpc(e) + } +} + /// A simple RPC client for calling methods using HTTP `POST`. /// /// Implements [`BlockSource`] and may return an `Err` containing [`RpcError`]. See /// [`RpcClient::call_method`] for details. pub struct RpcClient { basic_auth: String, - endpoint: HttpEndpoint, - client: Mutex>, + client: HttpClient, id: AtomicUsize, } @@ -51,85 +91,64 @@ impl RpcClient { /// Creates a new RPC client connected to the given endpoint with the provided credentials. The /// credentials should be a base64 encoding of a user name and password joined by a colon, as is /// required for HTTP basic access authentication. - pub fn new(credentials: &str, endpoint: HttpEndpoint) -> Self { + /// + /// The base URL should include the scheme, host, and port (e.g., "http://127.0.0.1:8332"). + pub fn new(credentials: &str, base_url: String) -> Self { Self { basic_auth: "Basic ".to_string() + credentials, - endpoint, - client: Mutex::new(None), + client: HttpClient::new(base_url), id: AtomicUsize::new(0), } } /// Calls a method with the response encoded in JSON format and interpreted as type `T`. - /// - /// When an `Err` is returned, [`std::io::Error::into_inner`] may contain an [`RpcError`] if - /// [`std::io::Error::kind`] is [`std::io::ErrorKind::Other`]. pub async fn call_method( &self, method: &str, params: &[serde_json::Value], - ) -> std::io::Result + ) -> Result where JsonResponse: TryFrom, Error = std::io::Error> + TryInto, { - let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); - let uri = self.endpoint.path(); let content = serde_json::json!({ "method": method, "params": params, "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string() }); - let reserved_client = self.client.lock().unwrap().take(); - let mut client = if let Some(client) = reserved_client { - client - } else { - HttpClient::connect(&self.endpoint)? - }; - let http_response = - client.post::(&uri, &host, &self.basic_auth, content).await; - *self.client.lock().unwrap() = Some(client); + let http_response = self.client.post::("/", &self.basic_auth, content).await; let mut response = match http_response { Ok(JsonResponse(response)) => response, - Err(e) if e.kind() == std::io::ErrorKind::Other => { - match e.get_ref().unwrap().downcast_ref::() { - Some(http_error) => match JsonResponse::try_from(http_error.contents.clone()) { - Ok(JsonResponse(response)) => response, - Err(_) => Err(e)?, - }, - None => Err(e)?, + Err(HttpClientError::Http(http_error)) => { + // Try to parse the error body as JSON-RPC response + match JsonResponse::try_from(http_error.contents.clone()) { + Ok(JsonResponse(response)) => response, + Err(_) => return Err(HttpClientError::Http(http_error).into()), } }, - Err(e) => Err(e)?, + Err(e) => return Err(e.into()), }; if !response.is_object() { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "expected JSON object", - )); + return Err(RpcClientError::InvalidData("expected JSON object".to_string())); } let error = &response["error"]; if !error.is_null() { - // TODO: Examine error code for a more precise std::io::ErrorKind. let rpc_error = RpcError { code: error["code"].as_i64().unwrap_or(-1), message: error["message"].as_str().unwrap_or("unknown error").to_string(), }; - return Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error)); + return Err(rpc_error.into()); } let result = match response.get_mut("result") { Some(result) => result.take(), - None => { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "expected JSON result", - )) - }, + None => return Err(RpcClientError::InvalidData("expected JSON result".to_string())), }; - JsonResponse(result).try_into() + JsonResponse(result) + .try_into() + .map_err(|e: std::io::Error| RpcClientError::InvalidData(e.to_string())) } } @@ -212,7 +231,10 @@ mod tests { let client = RpcClient::new(CREDENTIALS, server.endpoint()); match client.call_method::("getblockcount", &[]).await { - Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other), + Err(RpcClientError::Http(HttpClientError::Http(e))) => { + assert_eq!(e.status_code, 404); + }, + Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } } @@ -224,10 +246,10 @@ mod tests { let client = RpcClient::new(CREDENTIALS, server.endpoint()); match client.call_method::("getblockcount", &[]).await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object"); + Err(RpcClientError::InvalidData(msg)) => { + assert_eq!(msg, "expected JSON object"); }, + Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } } @@ -242,12 +264,11 @@ mod tests { let invalid_block_hash = serde_json::json!("foo"); match client.call_method::("getblock", &[invalid_block_hash]).await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::Other); - let rpc_error: Box = e.into_inner().unwrap().downcast().unwrap(); + Err(RpcClientError::Rpc(rpc_error)) => { assert_eq!(rpc_error.code, -8); assert_eq!(rpc_error.message, "invalid parameter"); }, + Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } } @@ -259,10 +280,10 @@ mod tests { let client = RpcClient::new(CREDENTIALS, server.endpoint()); match client.call_method::("getblockcount", &[]).await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON result"); + Err(RpcClientError::InvalidData(msg)) => { + assert_eq!(msg, "expected JSON result"); }, + Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } } @@ -274,10 +295,10 @@ mod tests { let client = RpcClient::new(CREDENTIALS, server.endpoint()); match client.call_method::("getblockcount", &[]).await { - Err(e) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "not a number"); + Err(RpcClientError::InvalidData(msg)) => { + assert!(msg.contains("not a number")); }, + Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } } From 16ea4c7e2e05573bcffb3d6aa92592c24d5132ae Mon Sep 17 00:00:00 2001 From: Apostlex0 Date: Thu, 19 Feb 2026 19:35:28 +0530 Subject: [PATCH 2/2] Error api change- replaced io::error with string based format --- lightning-block-sync/src/convert.rs | 199 +++++++++++----------------- lightning-block-sync/src/http.rs | 63 ++++++--- lightning-block-sync/src/rest.rs | 27 ++-- lightning-block-sync/src/rpc.rs | 13 +- 4 files changed, 141 insertions(+), 161 deletions(-) diff --git a/lightning-block-sync/src/convert.rs b/lightning-block-sync/src/convert.rs index 47c7586a2c4..48a80c8cbf1 100644 --- a/lightning-block-sync/src/convert.rs +++ b/lightning-block-sync/src/convert.rs @@ -13,15 +13,15 @@ use bitcoin::Transaction; use serde_json; use bitcoin::hashes::Hash; -use std::convert::From; +use std::convert::Infallible; use std::convert::TryFrom; use std::convert::TryInto; use std::io; use std::str::FromStr; impl TryInto for JsonResponse { - type Error = io::Error; - fn try_into(self) -> Result { + type Error = Infallible; + fn try_into(self) -> Result { Ok(self.0) } } @@ -53,8 +53,10 @@ impl From for BlockSourceError { BlockSourceError::persistent(HttpClientError::Http(http_err)) } }, - // Delegate to existing From implementation - HttpClientError::Io(io_err) => BlockSourceError::from(io_err), + // Parse errors are persistent (invalid data) + HttpClientError::Parse(msg) => { + BlockSourceError::persistent(HttpClientError::Parse(msg)) + }, } } } @@ -81,7 +83,9 @@ impl From for BlockSourceError { ))) } }, - HttpClientError::Io(io_err) => BlockSourceError::from(io_err), + HttpClientError::Parse(msg) => { + BlockSourceError::persistent(RpcClientError::Http(HttpClientError::Parse(msg))) + }, }, // RPC errors (e.g. "block not found") are transient RpcClientError::Rpc(rpc_err) => { @@ -97,49 +101,42 @@ impl From for BlockSourceError { /// Parses binary data as a block. impl TryInto for BinaryResponse { - type Error = io::Error; + type Error = (); - fn try_into(self) -> io::Result { - match encode::deserialize(&self.0) { - Err(_) => return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid block data")), - Ok(block) => Ok(block), - } + fn try_into(self) -> Result { + encode::deserialize(&self.0).map_err(|_| ()) } } /// Parses binary data as a block hash. impl TryInto for BinaryResponse { - type Error = io::Error; + type Error = (); - fn try_into(self) -> io::Result { - BlockHash::from_slice(&self.0) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "bad block hash length")) + fn try_into(self) -> Result { + BlockHash::from_slice(&self.0).map_err(|_| ()) } } /// Converts a JSON value into block header data. The JSON value may be an object representing a /// block header or an array of such objects. In the latter case, the first object is converted. impl TryInto for JsonResponse { - type Error = io::Error; + type Error = &'static str; - fn try_into(self) -> io::Result { + fn try_into(self) -> Result { let header = match self.0 { serde_json::Value::Array(mut array) if !array.is_empty() => { array.drain(..).next().unwrap() }, serde_json::Value::Object(_) => self.0, - _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected JSON type")), + _ => return Err("unexpected JSON type"), }; if !header.is_object() { - return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON object")); + return Err("expected JSON object"); } // Add an empty previousblockhash for the genesis block. - match header.try_into() { - Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid header data")), - Ok(header) => Ok(header), - } + header.try_into().map_err(|_| "invalid header data") } } @@ -179,15 +176,15 @@ impl TryFrom for BlockHeaderData { /// Converts a JSON value into a block. Assumes the block is hex-encoded in a JSON string. impl TryInto for JsonResponse { - type Error = io::Error; + type Error = &'static str; - fn try_into(self) -> io::Result { + fn try_into(self) -> Result { match self.0.as_str() { - None => Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")), + None => Err("expected JSON string"), Some(hex_data) => match Vec::::from_hex(hex_data) { - Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid hex data")), + Err(_) => Err("invalid hex data"), Ok(block_data) => match encode::deserialize(&block_data) { - Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid block data")), + Err(_) => Err("invalid block data"), Ok(block) => Ok(block), }, }, @@ -197,35 +194,31 @@ impl TryInto for JsonResponse { /// Converts a JSON value into the best block hash and optional height. impl TryInto<(BlockHash, Option)> for JsonResponse { - type Error = io::Error; + type Error = &'static str; - fn try_into(self) -> io::Result<(BlockHash, Option)> { + fn try_into(self) -> Result<(BlockHash, Option), &'static str> { if !self.0.is_object() { - return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON object")); + return Err("expected JSON object"); } let hash = match &self.0["bestblockhash"] { serde_json::Value::String(hex_data) => match BlockHash::from_str(&hex_data) { - Err(_) => { - return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid hex data")) - }, + Err(_) => return Err("invalid hex data"), Ok(block_hash) => block_hash, }, - _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")), + _ => return Err("expected JSON string"), }; let height = match &self.0["blocks"] { serde_json::Value::Null => None, serde_json::Value::Number(height) => match height.as_u64() { - None => return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid height")), + None => return Err("invalid height"), Some(height) => match height.try_into() { - Err(_) => { - return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid height")) - }, + Err(_) => return Err("invalid height"), Ok(height) => Some(height), }, }, - _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON number")), + _ => return Err("expected JSON number"), }; Ok((hash, height)) @@ -233,22 +226,18 @@ impl TryInto<(BlockHash, Option)> for JsonResponse { } impl TryInto for JsonResponse { - type Error = io::Error; - fn try_into(self) -> io::Result { - let hex_data = self - .0 - .as_str() - .ok_or(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string"))?; - Txid::from_str(hex_data) - .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string())) + type Error = String; + fn try_into(self) -> Result { + let hex_data = self.0.as_str().ok_or_else(|| "expected JSON string".to_string())?; + Txid::from_str(hex_data).map_err(|err| err.to_string()) } } /// Converts a JSON value into a transaction. WATCH OUT! this cannot be used for zero-input transactions /// (e.g. createrawtransaction). See impl TryInto for JsonResponse { - type Error = io::Error; - fn try_into(self) -> io::Result { + type Error = String; + fn try_into(self) -> Result { let hex_tx = if self.0.is_object() { // result is json encoded match &self.0["hex"] { @@ -262,10 +251,7 @@ impl TryInto for JsonResponse { _ => "Unknown error", }; - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("transaction couldn't be signed. {}", reason), - )); + return Err(format!("transaction couldn't be signed. {}", reason)); } else { hex_data } @@ -274,7 +260,7 @@ impl TryInto for JsonResponse { _ => hex_data, }, _ => { - return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")); + return Err("expected JSON string".to_string()); }, } } else { @@ -282,15 +268,15 @@ impl TryInto for JsonResponse { match self.0.as_str() { Some(hex_tx) => hex_tx, None => { - return Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")); + return Err("expected JSON string".to_string()); }, } }; match Vec::::from_hex(hex_tx) { - Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid hex data")), + Err(_) => Err("invalid hex data".to_string()), Ok(tx_data) => match encode::deserialize(&tx_data) { - Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid transaction")), + Err(_) => Err("invalid transaction".to_string()), Ok(tx) => Ok(tx), }, } @@ -298,16 +284,13 @@ impl TryInto for JsonResponse { } impl TryInto for JsonResponse { - type Error = io::Error; + type Error = &'static str; - fn try_into(self) -> io::Result { + fn try_into(self) -> Result { match self.0.as_str() { - None => Err(io::Error::new(io::ErrorKind::InvalidData, "expected JSON string")), - Some(hex_data) if hex_data.len() != 64 => { - Err(io::Error::new(io::ErrorKind::InvalidData, "invalid hash length")) - }, - Some(hex_data) => BlockHash::from_str(hex_data) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid hex data")), + None => Err("expected JSON string"), + Some(hex_data) if hex_data.len() != 64 => Err("invalid hash length"), + Some(hex_data) => BlockHash::from_str(hex_data).map_err(|_| "invalid hex data"), } } } @@ -322,24 +305,21 @@ pub(crate) struct GetUtxosResponse { #[cfg(feature = "rest-client")] impl TryInto for JsonResponse { - type Error = io::Error; + type Error = &'static str; - fn try_into(self) -> io::Result { - let obj_err = || io::Error::new(io::ErrorKind::InvalidData, "expected an object"); - let bitmap_err = || io::Error::new(io::ErrorKind::InvalidData, "missing bitmap field"); - let bitstr_err = || io::Error::new(io::ErrorKind::InvalidData, "bitmap should be an str"); + fn try_into(self) -> Result { let bitmap_str = self .0 .as_object() - .ok_or_else(obj_err)? + .ok_or("expected an object")? .get("bitmap") - .ok_or_else(bitmap_err)? + .ok_or("missing bitmap field")? .as_str() - .ok_or_else(bitstr_err)?; + .ok_or("bitmap should be an str")?; let mut hit_bitmap_nonempty = false; for c in bitmap_str.chars() { if c < '0' || c > '9' { - return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid byte")); + return Err("invalid byte"); } if c > '0' { hit_bitmap_nonempty = true; @@ -381,8 +361,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!(42)); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "unexpected JSON type"); + assert_eq!(e, "unexpected JSON type"); }, Ok(_) => panic!("Expected error"), } @@ -393,8 +372,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!([42])); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object"); + assert_eq!(e, "expected JSON object"); }, Ok(_) => panic!("Expected error"), } @@ -411,8 +389,7 @@ pub(crate) mod tests { match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid header data"); + assert_eq!(e, "invalid header data"); }, Ok(_) => panic!("Expected error"), } @@ -429,8 +406,7 @@ pub(crate) mod tests { match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid header data"); + assert_eq!(e, "invalid header data"); }, Ok(_) => panic!("Expected error"), } @@ -524,8 +500,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "result": "foo" })); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); + assert_eq!(e, "expected JSON string"); }, Ok(_) => panic!("Expected error"), } @@ -536,8 +511,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("foobar")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data"); + assert_eq!(e, "invalid hex data"); }, Ok(_) => panic!("Expected error"), } @@ -548,8 +522,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("abcd")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid block data"); + assert_eq!(e, "invalid block data"); }, Ok(_) => panic!("Expected error"), } @@ -570,8 +543,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("foo")); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object"); + assert_eq!(e, "expected JSON object"); }, Ok(_) => panic!("Expected error"), } @@ -582,8 +554,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "bestblockhash": 42 })); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); + assert_eq!(e, "expected JSON string"); }, Ok(_) => panic!("Expected error"), } @@ -594,8 +565,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "bestblockhash": "foobar"} )); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data"); + assert_eq!(e, "invalid hex data"); }, Ok(_) => panic!("Expected error"), } @@ -625,8 +595,7 @@ pub(crate) mod tests { })); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON number"); + assert_eq!(e, "expected JSON number"); }, Ok(_) => panic!("Expected error"), } @@ -641,8 +610,7 @@ pub(crate) mod tests { })); match TryInto::<(BlockHash, Option)>::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid height"); + assert_eq!(e, "invalid height"); }, Ok(_) => panic!("Expected error"), } @@ -669,8 +637,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "result": "foo" })); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); + assert_eq!(e, "expected JSON string"); }, Ok(_) => panic!("Expected error"), } @@ -681,8 +648,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("foobar")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "failed to parse hex"); + assert_eq!(e, "failed to parse hex"); }, Ok(_) => panic!("Expected error"), } @@ -693,8 +659,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("abcd")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "failed to parse hex"); + assert_eq!(e, "failed to parse hex"); }, Ok(_) => panic!("Expected error"), } @@ -714,9 +679,8 @@ pub(crate) mod tests { fn into_txid_from_bitcoind_rpc_json_response() { let mut rpc_response = serde_json::json!( {"error": "", "id": "770", "result": "7934f775149929a8b742487129a7c3a535dfb612f0b726cc67bc10bc2628f906"} - ); - let r: io::Result = + let r: Result = JsonResponse(rpc_response.get_mut("result").unwrap().take()).try_into(); assert_eq!( r.unwrap().to_string(), @@ -736,8 +700,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("foobar")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data"); + assert_eq!(e, "invalid hex data"); }, Ok(_) => panic!("Expected error"), } @@ -748,8 +711,7 @@ pub(crate) mod tests { let response = JsonResponse(Value::Number(Number::from_f64(1.0).unwrap())); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); + assert_eq!(e, "expected JSON string"); }, Ok(_) => panic!("Expected error"), } @@ -760,8 +722,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!("abcd")); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid transaction"); + assert_eq!(e, "invalid transaction"); }, Ok(_) => panic!("Expected error"), } @@ -797,8 +758,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "error": "foo" })); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); + assert_eq!(e, "expected JSON string"); }, Ok(_) => panic!("Expected error"), } @@ -809,12 +769,7 @@ pub(crate) mod tests { let response = JsonResponse(serde_json::json!({ "hex": "foo", "complete": false })); match TryInto::::try_into(response) { Err(e) => { - assert_eq!(e.kind(), io::ErrorKind::InvalidData); - assert!(e - .get_ref() - .unwrap() - .to_string() - .contains("transaction couldn't be signed")); + assert!(e.contains("transaction couldn't be signed")); }, Ok(_) => panic!("Expected error"), } diff --git a/lightning-block-sync/src/http.rs b/lightning-block-sync/src/http.rs index 29cc4256437..f473849226f 100644 --- a/lightning-block-sync/src/http.rs +++ b/lightning-block-sync/src/http.rs @@ -6,9 +6,40 @@ use serde_json; #[cfg(feature = "tokio")] use bitreq::RequestExt; +use std::convert::Infallible; use std::convert::TryFrom; use std::fmt; +/// Trait for converting parse errors into a String message. +pub trait ToParseErrorMessage { + /// Converts a parse error into a human-readable message. + fn to_parse_error_message(self) -> String; +} + +impl ToParseErrorMessage for Infallible { + fn to_parse_error_message(self) -> String { + match self {} + } +} + +impl ToParseErrorMessage for () { + fn to_parse_error_message(self) -> String { + "invalid data".to_string() + } +} + +impl ToParseErrorMessage for &'static str { + fn to_parse_error_message(self) -> String { + self.to_string() + } +} + +impl ToParseErrorMessage for String { + fn to_parse_error_message(self) -> String { + self + } +} + /// Timeout for requests in seconds. This is set to a high value as it is not uncommon for Bitcoin /// Core to be blocked waiting on UTXO cache flushes for upwards of 10 minutes on slow devices /// (e.g. RPis with SSDs over USB). @@ -26,7 +57,7 @@ pub enum HttpClientError { /// HTTP error response (non-2xx status code) Http(HttpError), /// Response parsing/conversion error - Io(std::io::Error), + Parse(String), } impl std::error::Error for HttpClientError { @@ -34,7 +65,7 @@ impl std::error::Error for HttpClientError { match self { HttpClientError::Transport(e) => Some(e), HttpClientError::Http(e) => Some(e), - HttpClientError::Io(e) => Some(e), + HttpClientError::Parse(_) => None, } } } @@ -44,17 +75,11 @@ impl fmt::Display for HttpClientError { match self { HttpClientError::Transport(e) => write!(f, "transport error: {}", e), HttpClientError::Http(e) => write!(f, "HTTP error: {}", e), - HttpClientError::Io(e) => write!(f, "Response parsing/conversion error: {}", e), + HttpClientError::Parse(e) => write!(f, "response parsing error: {}", e), } } } -impl From for HttpClientError { - fn from(e: std::io::Error) -> Self { - HttpClientError::Io(e) - } -} - impl From for HttpClientError { fn from(e: bitreq::Error) -> Self { HttpClientError::Transport(e) @@ -97,7 +122,8 @@ impl HttpClient { #[allow(dead_code)] pub async fn get(&self, uri: &str) -> Result where - F: TryFrom, Error = std::io::Error>, + F: TryFrom>, + >>::Error: ToParseErrorMessage, { let url = format!("{}{}", self.base_url, uri); let request = bitreq::get(url) @@ -106,7 +132,7 @@ impl HttpClient { #[cfg(feature = "tokio")] let request = request.with_pipelining(); let response_body = self.send_request(request).await?; - F::try_from(response_body).map_err(HttpClientError::Io) + F::try_from(response_body).map_err(|e| HttpClientError::Parse(e.to_parse_error_message())) } /// Sends a `POST` request for a resource identified by `uri` using the given HTTP @@ -119,7 +145,8 @@ impl HttpClient { &self, uri: &str, auth: &str, content: serde_json::Value, ) -> Result where - F: TryFrom, Error = std::io::Error>, + F: TryFrom>, + >>::Error: ToParseErrorMessage, { let url = format!("{}{}", self.base_url, uri); let request = bitreq::post(url) @@ -131,7 +158,7 @@ impl HttpClient { #[cfg(feature = "tokio")] let request = request.with_pipelining(); let response_body = self.send_request(request).await?; - F::try_from(response_body).map_err(HttpClientError::Io) + F::try_from(response_body).map_err(|e| HttpClientError::Parse(e.to_parse_error_message())) } /// Sends an HTTP request message and reads the response, returning its body. @@ -178,19 +205,19 @@ pub struct JsonResponse(pub serde_json::Value); /// Interprets bytes from an HTTP response body as binary data. impl TryFrom> for BinaryResponse { - type Error = std::io::Error; + type Error = Infallible; - fn try_from(bytes: Vec) -> std::io::Result { + fn try_from(bytes: Vec) -> Result { Ok(BinaryResponse(bytes)) } } /// Interprets bytes from an HTTP response body as a JSON value. impl TryFrom> for JsonResponse { - type Error = std::io::Error; + type Error = String; - fn try_from(bytes: Vec) -> std::io::Result { - Ok(JsonResponse(serde_json::from_slice(&bytes)?)) + fn try_from(bytes: Vec) -> Result { + serde_json::from_slice(&bytes).map(JsonResponse).map_err(|e| e.to_string()) } } diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs index 0ea93895bcd..cdcf8424d2a 100644 --- a/lightning-block-sync/src/rest.rs +++ b/lightning-block-sync/src/rest.rs @@ -3,7 +3,7 @@ use crate::convert::GetUtxosResponse; use crate::gossip::UtxoSource; -use crate::http::{BinaryResponse, HttpClient, HttpClientError, JsonResponse}; +use crate::http::{BinaryResponse, HttpClient, HttpClientError, JsonResponse, ToParseErrorMessage}; use crate::{BlockData, BlockHeaderData, BlockSource, BlockSourceResult}; use bitcoin::hash_types::BlockHash; @@ -29,10 +29,13 @@ impl RestClient { /// Requests a resource encoded in `F` format and interpreted as type `T`. pub async fn request_resource(&self, resource_path: &str) -> Result where - F: TryFrom, Error = std::io::Error> + TryInto, + F: TryFrom> + TryInto, + >>::Error: ToParseErrorMessage, + >::Error: ToParseErrorMessage, { let uri = format!("/{}", resource_path); - self.client.get::(&uri).await?.try_into().map_err(HttpClientError::Io) + let response = self.client.get::(&uri).await?; + response.try_into().map_err(|e| HttpClientError::Parse(e.to_parse_error_message())) } } @@ -91,21 +94,15 @@ impl UtxoSource for RestClient { mod tests { use super::*; use crate::http::client_tests::{HttpServer, MessageBody}; - use crate::http::BinaryResponse; use bitcoin::hashes::Hash; /// Parses binary data as a string-encoded `u32`. impl TryInto for BinaryResponse { - type Error = std::io::Error; - - fn try_into(self) -> std::io::Result { - match std::str::from_utf8(&self.0) { - Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), - Ok(s) => match u32::from_str_radix(s, 10) { - Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), - Ok(n) => Ok(n), - }, - } + type Error = String; + + fn try_into(self) -> Result { + let s = std::str::from_utf8(&self.0).map_err(|e| e.to_string())?; + u32::from_str_radix(s, 10).map_err(|e| e.to_string()) } } @@ -127,7 +124,7 @@ mod tests { let client = RestClient::new(server.endpoint()); match client.request_resource::("/").await { - Err(HttpClientError::Io(_)) => {}, + Err(HttpClientError::Parse(_)) => {}, Err(e) => panic!("Unexpected error type: {:?}", e), Ok(_) => panic!("Expected error"), } diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index c81d7f23da9..bfa1b31c84a 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -2,7 +2,7 @@ //! endpoint. use crate::gossip::UtxoSource; -use crate::http::{HttpClient, HttpClientError, JsonResponse}; +use crate::http::{HttpClient, HttpClientError, JsonResponse, ToParseErrorMessage}; use crate::{BlockData, BlockHeaderData, BlockSource, BlockSourceResult}; use bitcoin::hash_types::BlockHash; @@ -106,7 +106,8 @@ impl RpcClient { &self, method: &str, params: &[serde_json::Value], ) -> Result where - JsonResponse: TryFrom, Error = std::io::Error> + TryInto, + JsonResponse: TryInto, + >::Error: ToParseErrorMessage, { let content = serde_json::json!({ "method": method, @@ -148,7 +149,7 @@ impl RpcClient { JsonResponse(result) .try_into() - .map_err(|e: std::io::Error| RpcClientError::InvalidData(e.to_string())) + .map_err(|e| RpcClientError::InvalidData(e.to_parse_error_message())) } } @@ -215,11 +216,11 @@ mod tests { /// Converts a JSON value into `u64`. impl TryInto for JsonResponse { - type Error = std::io::Error; + type Error = &'static str; - fn try_into(self) -> std::io::Result { + fn try_into(self) -> Result { match self.0.as_u64() { - None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "not a number")), + None => Err("not a number"), Some(n) => Ok(n), } }