diff --git a/Cargo.lock b/Cargo.lock index e7f76760a..0d9303cd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,6 +426,27 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +[[package]] +name = "bitreq" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c84f27ed293cb5218ab015faad9fbb95cf7905865ce71df075c8805a0b33b71" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "bitreq" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6df90cd78f0510165fd370574676aeb57dbec0ee3bfff68645bb7b0e9a65dbd5" +dependencies = [ + "rustls 0.23.40", + "rustls-webpki 0.103.13", + "webpki-roots 1.0.7", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -684,13 +705,13 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "corepc-client" -version = "0.10.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7755b8b9219b23d166a5897b5e2d8266cbdd0de5861d351b96f6db26bcf415f3" +checksum = "dc0d0096927a820ea80d4a43a2355209d9a69ef5b420861bf413c7e667cbff0b" dependencies = [ "bitcoin 0.32.8", "corepc-types", - "jsonrpc 0.18.0", + "jsonrpc 0.19.0", "log", "serde", "serde_json", @@ -698,16 +719,16 @@ dependencies = [ [[package]] name = "corepc-node" -version = "0.10.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "768391062ec3812e223bb3031c5b2fcdd6e0e60b816157f21df82fd3e6617dc0" +checksum = "2f8749105873c391b77fc943b7fdf8c05b6c1d06f6e71c6d2746ee7f8bda0072" dependencies = [ "anyhow", "bitcoin_hashes 0.14.1", + "bitreq 0.3.5", "corepc-client", "flate2", "log", - "minreq", "serde_json", "tar", "tempfile", @@ -717,9 +738,9 @@ dependencies = [ [[package]] name = "corepc-types" -version = "0.10.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c22db78b0223b66f82f92b14345f06307078f76d94b18280431ea9bc6cd9cbb6" +checksum = "1583872320eb2ac629c36753023fd072f1ca1b3b74b20cc62bab055b54278789" dependencies = [ "bitcoin 0.32.8", "serde", @@ -1748,6 +1769,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jsonrpc" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "629d2b4ae586d04b6bae3c75879d7ddd39325c2b67a9c87634f4ec88a488dc65" +dependencies = [ + "base64 0.22.1", + "bitreq 0.2.0", + "serde", + "serde_json", +] + [[package]] name = "jwalk" version = "0.8.1" @@ -1939,7 +1972,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05015102dad0f7d61691ca347e9d9d9006685a64aefb3d79eecf62665de2153d" dependencies = [ "rustls 0.21.12", - "rustls-webpki", + "rustls-webpki 0.101.7", "serde", "serde_json", "webpki-roots 0.25.4", @@ -2676,10 +2709,33 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring 0.17.14", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct 0.7.1", ] +[[package]] +name = "rustls" +version = "0.23.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" +dependencies = [ + "once_cell", + "ring 0.17.14", + "rustls-pki-types", + "rustls-webpki 0.103.13", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9" +dependencies = [ + "zeroize", +] + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2690,6 +2746,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.103.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" +dependencies = [ + "ring 0.17.14", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -3073,6 +3140,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "1.0.109" @@ -3957,6 +4030,15 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "3.1.1" @@ -4424,6 +4506,12 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zeroize" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" + [[package]] name = "zeromq-src" version = "0.2.6+4.3.4" diff --git a/Cargo.toml b/Cargo.toml index 29f355be1..90c92719b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ tracing-subscriber = { version = "0.3.17", default-features = false, features = opentelemetry-semantic-conventions = { version = "0.12.0", optional = true } tracing = { version = "0.1.40", default-features = false, features = ["attributes"], optional = true } rand = "0.9.1" +ureq = { version = "3.1", default-features = false, features = ["json"] } # optional dependencies for electrum-discovery electrum-client = { version = "0.8", optional = true } @@ -77,10 +78,9 @@ zmq = "0.10.0" electrs_macros = { path = "electrs_macros", default-features = false } [dev-dependencies] -corepc-node = { version = "0.10", features = ["download", "29_0"] } +corepc-node = { version = "0.12", features = ["download", "30_2"] } elementsd = { version = "0.11", features = ["22_1_1"] } electrumd = { version = "0.1.0", features = ["4_6_2"] } -ureq = { version = "3.1", default-features = false, features = ["json"] } tempfile = "3.10" criterion = { version = "0.8", features = ["html_reports"] } bitcoin-test-data = { version = "*" } diff --git a/README.md b/README.md index 8fc410b97..e0052ebc8 100644 --- a/README.md +++ b/README.md @@ -30,15 +30,6 @@ Creating the indexes should take a few hours on a beefy machine with SSD. To deploy with Docker, follow the [instructions here](https://github.com/Blockstream/esplora#how-to-build-the-docker-image). -### Light mode - -For personal or low-volume use, you may set `--lightmode` to reduce disk storage requirements -by roughly 50% at the cost of slower and more expensive lookups. - -With this option set, raw transactions and metadata associated with blocks will not be kept in rocksdb -(the `T`, `X` and `M` indexes), -but instead queried from bitcoind on demand. - ### Notable changes from Electrs: - HTTP REST API in addition to the Electrum JSON-RPC protocol, with extended transaction information @@ -63,7 +54,6 @@ but instead queried from bitcoind on demand. In addition to electrs's original configuration options, a few new options are also available: - `--http-addr ` - HTTP server address/port to listen on (default: `127.0.0.1:3000`). -- `--lightmode` - enable light mode (see above) - `--cors ` - origins allowed to make cross-site request (optional, defaults to none). - `--address-search` - enables the by-prefix address search index. - `--index-unspendables` - enables indexing of provably unspendable outputs. diff --git a/doc/indexer-consistency.md b/doc/indexer-consistency.md new file mode 100644 index 000000000..eff18b8df --- /dev/null +++ b/doc/indexer-consistency.md @@ -0,0 +1,338 @@ +# Indexer Consistency And Crash Recovery + +## Purpose + +This document describes indexer consistency, live public API visibility, crash +recovery, and publication behavior. It records the failure modes and visibility +scenarios considered for the indexer and the reason the current algorithm +should expose and recover a consistent state. + +Use this when reviewing future changes to `Store::open()`, `Indexer::update()`, +`HeaderList` visibility, completion markers, stale cleanup, or persisted tip +publication. + +## Core Invariants + +The indexer is consistent and crash-recoverable if these invariants hold. Treat +this section as the review checklist when changing indexer publication or +recovery code. + +### Visibility And Staging + +- The indexer has two publication cutoffs: persisted `t` for startup visibility + and in-memory `indexed_headers` (`HeaderList`) for live public API visibility. + Both treat rows beyond the visible tip as staging data. +- Public best-chain APIs derive block visibility, confirmations, and + height-based history visibility from the current `indexed_headers` chain. +- During forward indexing, new block rows are written before `indexed_headers` + is appended, so partial new-block best-chain state is hidden from public APIs + until the update completes. +- During reorg cleanup, `indexed_headers` is first rolled back to the common + ancestor. This immediately hides stale headers and height-based stale history + from public APIs; stale deletion and replacement indexing then run in the + staging area. +- On startup, `Store::open()` reconstructs visibility from persisted `t` and + defensively trims it to the contiguous prefix where both completion markers + exist. Under current publication rules this trim should not be needed in + normal operation; it is a fail-closed guard if `t` and completion markers + disagree. + +### Completion State + +- Txstore and history each have a per-block `D` marker, written atomically with + that side's rows, recording that the corresponding side of the block is + complete. +- A block is fully indexed only when both txstore and history are complete. +- A block's txstore-side rows required for recovery are written before, or in + the same atomic write as, that block's history-side rows. + +### Startup Recovery + +- The first daemon-aware `Indexer::update()` keeps `new_headers` and the current + `indexed_headers` chain. History-complete blocks outside that set are + stale and are undone. +- Startup recovery runs before live reorg rollback, so live-reorg stale headers + are still in `indexed_headers` and are left for the normal reorg path. + +### Stale Cleanup And Reorg Publication + +- Live reorg cleanup and startup stale cleanup both reconstruct stale blocks + and spent prevouts from local RocksDB data, then delete stale history rows + through `process_stale()` and `undo_index()`. +- On reorg, the live `HeaderList` tip and persisted `t` are rolled back to the + common ancestor before stale history rows are undone and replacement rows are + indexed in the staging area. +- Stale history deletion is written and flushed before replacement history rows. + Write order protects same-height keys; the flush is the crash barrier before + replacement indexing. +- Stale cleanup deletes history rows only. Txstore rows for stale blocks are + cheap to keep around, stay outside the history-index lookup hot path, can be + reused if the block becomes best chain again, and may be useful for future + archival access. These rows are not public best-chain state unless + `indexed_headers` contains the block. +- The live `HeaderList` tip and persisted `t` advance only after stale cleanup, + block processing, and required flushes have completed. + +### Storage Durability + +- Txstore and history live in column families in one RocksDB database with + atomic flush enabled. +- RocksDB may flush memtables before `Store::flush_block_writes()` runs. This is + safe: atomic flushes publish a coherent sequence-number cutoff across column + families and cannot split a write batch. +- Therefore, a durable history `D` implies the required txstore-side recovery + rows are durable from the same or an earlier flush. Early durable rows remain + unpublished until `t` and `indexed_headers` advance; restart uses completion + markers to skip, finish, or undo them. +- When bulk writes are disabled, `Store::flush_block_writes()` still atomically + flushes txstore and history before the final `t` advance is published. +- Persisted tip `t` is written synchronously. + +## Failure And Visibility Matrix + +### Startup And Initial Sync + +1. **No `t` exists and initial sync crashed** + + `Store::open()` starts with an empty visible chain. The first `update()` + downloads the current best chain from genesis. Blocks that are already + complete are skipped by `headers_to_process()`, incomplete blocks are + finished, and history-complete blocks outside the new best chain are undone. + + Result: correct. + +2. **`t` points behind durable completed best-chain work** + + Completed blocks beyond `t` are initially hidden because `Store::open()` only + rebuilds from the stored `t` chain. The first `update()` receives those blocks + as `new_headers`, skips any sides that are already complete, finishes missing + work, and republishes `t`. + + Result: correct. + +3. **The stored `t` chain has an incomplete suffix** + + Current publication rules write `t` only after block processing and required + flushes complete, so normal crash recovery should not produce this state. If + the durable markers and stored `t` nevertheless disagree, `Store::open()` + rebuilds the stored `t` chain, then walks forward from genesis and trims at + the first block missing either completion marker. The incomplete suffix is + hidden until the first `update()` finishes it or undoes it if it became + stale. + + Result: correct. + +4. **Crash during `Store::open()`** + + `Store::open()` does not mutate the database. A crash while opening leaves + the database unchanged, so the next startup repeats the same derivation. + + Result: correct. + +### Forward Processing And Publication + +5. **Crash after txstore completion but before history completion** + + Txstore `D` exists and history `D` does not. `Store::open()` hides the block + unless all prior blocks are complete and this block is later completed. The + next `update()` sees `need_history = true` and writes the missing history + side if the block remains best chain. + + Result: correct. + +6. **History completion exists without txstore recovery data** + + Txstore-side recovery data is written before, or in the same atomic write as, + history `D`, and atomic flush makes a durable history `D` imply the required + txstore rows are durable. This state is not expected from normal crash + recovery. If it exists anyway, `Store::open()` hides the block because both + markers are required. The next `update()` writes the missing txstore side if + the block remains best chain. + + Result: correct. + +7. **Crash after both sides complete, before final `t` publication** + + Both completion markers exist, but public visibility may still be behind. + The first `update()` gets the completed blocks as `new_headers`, skips their + completed sides, and publishes `t` after confirming the update is complete. + + Result: correct. + +8. **Public API query while new blocks are staged** + + New block rows may already exist in RocksDB, but `indexed_headers` has not + advanced yet. Best-chain APIs derive block visibility, confirmations, and + height-based history from `indexed_headers`, so staged rows are not exposed + as confirmed best-chain state. + + Result: correct. + +9. **Crash after final `t` publication, before in-memory append** + + The final daemon-tip `t` advance is written with `put_sync()` only after + block processing and required flushes. On restart, `Store::open()` + reconstructs visibility from that persisted tip and the completion markers. + + Result: correct. + +10. **Spenttxouts workers complete blocks out of order** + + Later blocks may have both `D` markers before earlier blocks do. + Persisted `t` is not published to the target tip until all blocks in the + update range and required flushes complete. On restart, completed blocks + beyond `t` remain staged; the next `update()` fills gaps and skips + already-complete blocks. + + Result: correct. + +11. **Legacy batch processing crashes mid-batch** + + Legacy forward mode uses the same txstore/history completion markers per + side. Restart uses the shared marker logic: txstore-complete and + history-incomplete blocks are indexed, fully complete blocks are skipped, + and incomplete blocks remain hidden until finished. + + Result: correct. + +### Best-Chain Blocks That Become Stale While Electrs Is Down + +12. **Crash during forward processing, then the partially processed suffix + becomes stale** + + Persisted `t` keeps the partially processed suffix hidden on startup. + The first daemon-aware `update()` keeps `new_headers` and the current + `indexed_headers` chain. Because startup recovery runs before live reorg + rollback, any live-reorg stale suffix is still in `indexed_headers` and is + left for the normal reorg path. Any other history-complete block is + stale and is reconstructed from local txstore rows for undo. + + Result: correct. + +13. **Crash during initial sync with no published `t`, then indexed blocks + become stale** + + With no `t`, the visible chain starts empty. The first `update()` downloads + the current best chain from genesis. History-complete blocks not in that + chain are found by startup stale recovery and undone. + + Result: correct. + +14. **Startup recovery finds no new headers but stale history exists** + + This can happen if the daemon tip appears unchanged relative to the retained + prefix but hidden history-complete stale blocks exist. The pending startup + sweep still runs once and undoes history-complete blocks outside + `indexed_headers`. + + Result: correct. + +### Live Reorg And Stale Cleanup + +15. **Crash before persisting rollback to the common ancestor** + + The in-memory pop is lost with the process. On restart, `Store::open()` uses + the old persisted `t` plus completion markers, and the next `update()` + redetects the reorg against bitcoind. + + Result: correct. + +16. **Crash after persisting rollback, before stale rows are deleted** + + Persisted `t` points to the common ancestor. Stale rows are hidden from + public visibility. The next startup's first `update()` sees stale + history-complete blocks outside the target chain and undoes them. + + Result: correct. + +17. **Public API query during stale cleanup** + + `indexed_headers` has already been rolled back to the common ancestor. Stale + headers and height-based stale history are hidden from public APIs even if + the stale history rows have not yet been deleted. + + Result: correct. + +18. **Crash during stale history deletion** + + Deleting stale history rows is idempotent. If the delete batch was lost, the + stale history `D` remains and startup recovery undoes it again. If the delete + batch survived, the stale `D` is gone and there is nothing left to undo. + + Result: correct. + +19. **Crash after stale deletion, before the history flush** + + RocksDB will expose either the old state or the deleted state after restart. + The startup sweep handles both: stale `D` present means undo again; stale `D` + absent means no action. + + Result: correct. + +20. **Crash after stale deletion flush, before replacement blocks are indexed** + + Stale rows are durably gone and persisted `t` is at the common ancestor. The + next `update()` indexes the replacement `new_headers`. + + Result: correct. + +21. **Crash after replacement rows are written, before final `t` publication** + + Replacement rows may be durable, but public visibility remains at the common + ancestor. On restart, replacement blocks are in `new_headers`; completed + sides are skipped, incomplete sides are finished, and `t` is published. + + Result: correct. + +22. **Stale and replacement blocks share a height** + + Stale history deletion is written before replacement history rows and + flushed before replacement indexing. Write order protects same-height keys; + the flush keeps that order durable across a crash while WAL-disabled writes + may be in use. + + Result: correct. + +23. **Manual chain shortening with no new headers** + + `HeaderList::preprocess()` can return `reorged_since` even when + `new_headers` is empty. The normal stale path pops the suffix, persists the + common ancestor tip, undoes stale rows, and republishes. + + Result: correct. + +### Daemon And Retry Failures + +24. **Daemon reorgs again while `update()` is running** + + Electrs finishes or fails the update against the tip observed at the start. + If it finishes, the next `update()` detects the newer reorg. If fetches fail, + `update()` returns an error before final publication, leaving recovery to + retry from durable markers. + + Result: eventually correct; transient lag or retry is expected. + +25. **Daemon/RPC/REST fetch fails during forward processing** + + `process_blocks()` returns an error before final `t` publication. Any + completed sides are represented by `D` markers and will be skipped next time; + incomplete sides are retried. + + Result: correct after retry. + +26. **Current daemon cannot serve stale block during stale undo** + + Stale cleanup reconstructs stale blocks and spent prevouts from local + RocksDB data. It does not need the current daemon to serve stale block + contents, so a multiple-backend setup can still undo stale history even if + the daemon that originally served the stale block is no longer reachable. + + Result: correct. + +27. **In-process retry after startup sweep succeeds but later processing fails** + + The startup flag may already be cleared in memory, but stale deletion has + already been completed and flushed before that happens. Later retries only + need normal forward/reorg processing. + + Result: correct. diff --git a/doc/schema.md b/doc/schema.md index d9dabf089..ba45cbedc 100644 --- a/doc/schema.md +++ b/doc/schema.md @@ -1,25 +1,44 @@ # Index Schema -The index is stored as three RocksDB databases: +The index is stored as a RocksDB database with four column families: -- `txstore` -- `history` -- `cache` +- `default` - global metadata +- `txstore` - block, transaction and output rows +- `history` - scripthash history and spend rows +- `cache` - aggregated script and asset caches ### Indexing process -The indexing is done in the two phase, where each can be done concurrently within itself. -The first phase populates the `txstore` database, the second phase populates the `history` database. +Electrs indexes blocks using the bitcoind binary REST `block` endpoint, +storing blocks, transactions, outputs and scripthash funding/spending history. -NOTE: in order to construct the history rows for spending inputs in phase #2, we rely on having the transactions being processed at phase #1, so they can be looked up efficiently (using parallel point lookups). +There are two indexing modes: -After the indexing is completed, both funding and spending are indexed as independent rows under `H{scripthash}`, so that they can be queried in-order in one go. +- Default (non-Elements) mode: fetches previous outputs from bitcoind's binary + REST `spenttxouts` endpoint. Because indexing does not depend on local RocksDB + TXO lookups, blocks can be fully processed in parallel and in any order. + +- Legacy mode: used for Elements and Bitcoin Core <v30 (with `--no-spenttxouts`), + which do not support `spenttxouts`. Indexing is done in two phases, where each can be + done concurrently within itself: first populate `txstore` with block/transaction/output + data, then populate `history` by looking up spent TXOs in `txstore`. + +After the indexing is completed, both funding and spending are indexed as independent +rows under `H{scripthash}`, so that they can be queried in-order in one go. + +### `default` + +Global metadata rows: + + * `"t" → "{blockhash}"` (the synced tip) + + * `"V" → "{db-version}"` (compatibility marker) ### `txstore` Each block results in the following new rows: - * `"B{blockhash}" → "{header}"` + * `"B{blockhash}" → "{height, header}"` * `"X{blockhash}" → "{txids}"` (list of txids included in the block) @@ -36,10 +55,6 @@ Each output results in the following new rows: * `"O{txid}{vout}" → "{scriptpubkey}{value}"` * `"a{funding-address-str}" → ""` (for prefix address search, only saved when `--address-search` is enabled) -When the indexer is synced up to the tip of the chain, the hash of the tip is saved as following: - - * `"t" → "{blockhash}"` - ### `history` Each transaction results in the following new row: diff --git a/src/bin/db-migrate-v1-to-v2.rs b/src/bin/db-migrate-v1-to-v2.rs deleted file mode 100644 index 606583c13..000000000 --- a/src/bin/db-migrate-v1-to-v2.rs +++ /dev/null @@ -1,335 +0,0 @@ -use std::collections::BTreeSet; -use std::convert::TryInto; -use std::str; - -use itertools::Itertools; -use log::{debug, info, trace}; -use rocksdb::WriteBatch; - -use bitcoin::hashes::Hash; - -use electrs::chain::{BlockHash, Txid}; -use electrs::new_index::db::DBFlush; -use electrs::new_index::schema::{ - lookup_confirmations, FullHash, Store, TxConfRow as V2TxConfRow, TxHistoryKey, -}; -use electrs::util::bincode::{deserialize_big, deserialize_little, serialize_little}; -use electrs::{config::Config, metrics::Metrics}; - -const FROM_DB_VERSION: u32 = 1; -const TO_DB_VERSION: u32 = 2; - -const BATCH_SIZE: usize = 15000; -const PROGRESS_EVERY: usize = BATCH_SIZE * 50; - -// For Elements-based chains the 'I' asset history index is migrated too -#[cfg(not(feature = "liquid"))] -const HISTORY_PREFIXES: [u8; 1] = [b'H']; -#[cfg(feature = "liquid")] -const HISTORY_PREFIXES: [u8; 2] = [b'H', b'I']; - -fn main() { - let config = Config::from_args(); - let metrics = Metrics::new(config.monitoring_addr); - let store = Store::open(&config, &metrics, false); - - let txstore_db = store.txstore_db(); - let history_db = store.history_db(); - let cache_db = store.cache_db(); - let headers = store.headers(); - let tip_height = headers.best_height() as u32; - - // Check the DB version under `V` matches the expected version - for db in [txstore_db, history_db, cache_db] { - let ver_bytes = db.get(b"V").expect("missing DB version"); - let ver: u32 = deserialize_little(&ver_bytes[0..4]).unwrap(); - assert_eq!(ver, FROM_DB_VERSION, "unexpected DB version {}", ver); - } - - // Utility to log progress once every PROGRESS_EVERY ticks - let mut tick = 0usize; - macro_rules! progress { - ($($arg:tt)+) => {{ - tick = tick.wrapping_add(1); - if tick % PROGRESS_EVERY == 0 { - debug!($($arg)+); - } - }}; - } - - // 1. Migrate the address prefix search index - // Moved as-is from the history db to the txstore db - info!("[1/4] migrating address prefix search index..."); - let address_iter = history_db.iter_scan(b"a"); - for chunk in &address_iter.chunks(BATCH_SIZE) { - let mut batch = WriteBatch::default(); - for row in chunk { - progress!("[1/4] at {}", str::from_utf8(&row.key[1..]).unwrap()); - batch.put(row.key, row.value); - } - // Write batches without flushing (sync and WAL disabled) - trace!("[1/4] writing batch of {} ops", batch.len()); - txstore_db.write_batch(batch, DBFlush::Disable); - } - // Flush the txstore db, only then delete the original rows from the history db - info!("[1/4] flushing V2 address index to txstore db"); - txstore_db.flush(); - info!("[1/4] deleting V1 address index from history db"); - history_db.delete_range(b"a", b"b", DBFlush::Enable); - - // 2. Migrate the TxConf transaction confirmation index - // - Moved from the txstore db to the history db - // - Changed from a set of blocks seen to include the tx to a single block (that is part of the best chain) - // - Changed from the block hash to the block height - // - Entries originating from stale blocks are removed - // Steps 3/4 depend on this index getting migrated first - info!("[2/4] migrating TxConf index..."); - let txconf_iter = txstore_db.iter_scan(b"C"); - for chunk in &txconf_iter.chunks(BATCH_SIZE) { - let mut batch = WriteBatch::default(); - for v1_row in chunk { - let v1_txconf: V1TxConfKey = - deserialize_little(&v1_row.key).expect("invalid TxConfKey"); - let blockhash = BlockHash::from_byte_array(v1_txconf.blockhash); - if let Some(header) = headers.header_by_blockhash(&blockhash) { - // The blockhash is still part of the best chain, use its height to construct the V2 row - let v2_row = V2TxConfRow::new(v1_txconf.txid, header.height() as u32).into_row(); - batch.put(v2_row.key, v2_row.value); - } else { - // The transaction was reorged, don't write the V2 entry - // trace!("[2/4] skipping reorged TxConf for {}", Txid::from_byte_array(txconf.txid)); - } - progress!( - "[2/4] migrating TxConf index ~{:.2}%", - est_hash_progress(&v1_txconf.txid) - ); - } - // Write batches without flushing (sync and WAL disabled) - trace!("[2/4] writing batch of {} ops", batch.len()); - history_db.write_batch(batch, DBFlush::Disable); - } - // Flush the history db, only then delete the original rows from the txstore db - info!("[2/4] flushing V2 TxConf to history db"); - history_db.flush(); - info!("[2/4] deleting V1 TxConf from txstore db"); - txstore_db.delete_range(b"C", b"D", DBFlush::Enable); - - // 3. Migrate the TxEdge spending index - // - Changed from a set of inputs seen to spend the outpoint to a single spending input (that is part of the best chain) - // - Keep the height of the spending tx - // - Entries originating from stale blocks are removed - info!("[3/4] migrating TxEdge index..."); - let txedge_iter = history_db.iter_scan(b"S"); - for chunk in &txedge_iter.chunks(BATCH_SIZE) { - let mut v1_edges = Vec::with_capacity(BATCH_SIZE); - let mut spending_txids = BTreeSet::new(); - for v1_row in chunk { - if let Ok(v1_edge) = deserialize_little::(&v1_row.key) { - spending_txids.insert(Txid::from_byte_array(v1_edge.spending_txid)); - v1_edges.push((v1_edge, v1_row.key)); - } - // Rows with keys that cannot be deserialized into V1TxEdgeKey are assumed to already be upgraded, and skipped - // This is necessary to properly recover if the migration stops halfway through. - } - - // Lookup the confirmation status for the entire chunk using a MultiGet operation - let confirmations = lookup_confirmations(history_db, tip_height, spending_txids); - - let mut batch = WriteBatch::default(); - for (v1_edge, v1_db_key) in v1_edges { - let spending_txid = Txid::from_byte_array(v1_edge.spending_txid); - - // Remove the old V1 entry. V2 entries use a different key. - batch.delete(v1_db_key); - - if let Some(spending_height) = confirmations.get(&spending_txid) { - // Re-add the V2 entry if it is still part of the best chain - let v2_row = V2TxEdgeRow::new( - v1_edge.funding_txid, - v1_edge.funding_vout, - v1_edge.spending_txid, - v1_edge.spending_vin, - *spending_height, // now with the height included - ) - .into_row(); - batch.put(v2_row.key, v2_row.value); - } else { - // The spending transaction was reorged, don't write the V2 entry - //trace!("[3/4] skipping reorged TxEdge for {}", spending_txid); - } - - progress!( - "[3/4] migrating TxEdge index ~{:.2}%", - est_hash_progress(&v1_edge.funding_txid) - ); - } - // Write batches without flushing (sync and WAL disabled) - trace!("[3/4] writing batch of {} ops", batch.len()); - history_db.write_batch(batch, DBFlush::Disable); - } - info!("[3/4] flushing V2 TxEdge index to history db"); - history_db.flush(); - - // 4. Migrate the TxHistory index - // Entries originating from stale blocks are removed, with no other changes - info!("[4/4] migrating TxHistory index..."); - for prefix in HISTORY_PREFIXES { - let txhistory_iter = history_db.iter_scan(&[prefix]); - info!("[4/4] migrating TxHistory index {}", prefix as char); - for chunk in &txhistory_iter.chunks(BATCH_SIZE) { - let mut history_entries = Vec::with_capacity(BATCH_SIZE); - let mut history_txids = BTreeSet::new(); - for row in chunk { - let hist: TxHistoryKey = deserialize_big(&row.key).expect("invalid TxHistoryKey"); - history_txids.insert(hist.txinfo.get_txid()); - history_entries.push((hist, row.key)); - } - - // Lookup the confirmation status for the entire chunk using a MultiGet operation - let confirmations = lookup_confirmations(history_db, tip_height, history_txids); - - let mut batch = WriteBatch::default(); - for (hist, db_key) in history_entries { - let hist_txid = hist.txinfo.get_txid(); - if confirmations.get(&hist_txid) != Some(&hist.confirmed_height) { - // The history entry originated from a stale block, remove it - batch.delete(db_key); - // trace!("[4/4] removing reorged TxHistory for {}", hist.txinfo.get_txid()); - } - progress!( - "[4/4] migrating TxHistory index {} ~{:.2}%", - prefix as char, - est_hash_progress(&hist.hash) - ); - } - // Write batches without flushing (sync and WAL disabled) - trace!("[4/4] writing batch of {} deletions", batch.len()); - if !batch.is_empty() { - history_db.write_batch(batch, DBFlush::Disable); - } - } - } - info!("[4/4] flushing TxHistory deletions to history db"); - history_db.flush(); - - // Update the DB version under `V` - let ver_bytes = serialize_little(&(TO_DB_VERSION, config.light_mode)).unwrap(); - for db in [txstore_db, history_db, cache_db] { - db.put_sync(b"V", &ver_bytes); - } - - // Compact everything once at the end - txstore_db.full_compaction(); - history_db.full_compaction(); -} - -// Estimates progress using the first 4 bytes, relying on RocksDB's lexicographic key ordering and uniform hash distribution -fn est_hash_progress(hash: &FullHash) -> f32 { - u32::from_be_bytes(hash[0..4].try_into().unwrap()) as f32 / u32::MAX as f32 * 100f32 -} - -#[derive(Debug, serde::Deserialize)] -struct V1TxConfKey { - #[allow(dead_code)] - code: u8, - txid: FullHash, - blockhash: FullHash, -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -struct V1TxEdgeKey { - code: u8, - funding_txid: FullHash, - funding_vout: u16, - spending_txid: FullHash, - spending_vin: u16, -} - -#[derive(Debug, serde::Serialize)] -struct V2TxEdgeKey { - code: u8, - funding_txid: FullHash, - funding_vout: u16, -} - -#[derive(Debug, serde::Serialize)] -struct V2TxEdgeValue { - spending_txid: FullHash, - spending_vin: u16, - spending_height: u32, -} - -struct V2TxEdgeRow { - key: V2TxEdgeKey, - value: V2TxEdgeValue, -} - -impl V2TxEdgeRow { - fn new( - funding_txid: FullHash, - funding_vout: u16, - spending_txid: FullHash, - spending_vin: u16, - spending_height: u32, - ) -> Self { - Self { - key: V2TxEdgeKey { - code: b'S', - funding_txid, - funding_vout, - }, - value: V2TxEdgeValue { - spending_txid, - spending_vin, - spending_height, - }, - } - } - - fn into_row(self) -> electrs::new_index::DBRow { - electrs::new_index::DBRow { - key: serialize_little(&self.key).unwrap(), - value: serialize_little(&self.value).unwrap(), - } - } -} - -/* -use bitcoin::hex::DisplayHex; - -fn dump_db(db: &DB, label: &str, prefix: &[u8]) { - debug!("dumping {}", label); - for item in db.iter_scan(prefix) { - trace!( - "[{}] {} => {}", - label, - fmt_key(&item.key), - &item.value.to_lower_hex_string() - ); - } -} - -fn debug_batch(batch: &WriteBatch, label: &'static str) { - debug!("batch {} with {} ops", label, batch.len()); - batch.iterate(&mut WriteBatchLogIterator(label)); -} - -struct WriteBatchLogIterator(&'static str); -impl rocksdb::WriteBatchIterator for WriteBatchLogIterator { - fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>) { - trace!( - "[batch {}] PUT {} => {}", - self.0, - fmt_key(&key), - value.to_lower_hex_string() - ); - } - fn delete(&mut self, key: Box<[u8]>) { - trace!("[batch {}] DELETE {}", self.0, fmt_key(&key)); - } -} - -fn fmt_key(key: &[u8]) -> String { - format!("{}-{}", key[0] as char, &key[1..].to_lower_hex_string()) -} -*/ diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 6a417d824..e18f010cc 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -7,23 +7,23 @@ extern crate log; extern crate electrs; -use crossbeam_channel::{self as channel}; -use error_chain::ChainedError; -use std::{env, process, thread}; -use std::sync::{Arc, RwLock}; -use std::time::Duration; use bitcoin::hex::DisplayHex; -use rand::{rng, RngCore}; +use crossbeam_channel::{self as channel}; use electrs::{ config::Config, daemon::Daemon, electrum::RPC as ElectrumRPC, errors::*, metrics::Metrics, - new_index::{precache, zmq, ChainQuery, FetchFrom, Indexer, Mempool, Query, Store}, + new_index::{precache, zmq, ChainQuery, Indexer, Mempool, Query, Store}, rest, signal::Waiter, }; +use error_chain::ChainedError; +use rand::{rng, RngCore}; +use std::sync::{Arc, RwLock}; +use std::time::Duration; +use std::{env, process, thread}; #[cfg(feature = "otlp-tracing")] use electrs::otlp_trace; @@ -35,22 +35,6 @@ use electrs::metrics::MetricOpts; /// Default salt rotation interval in seconds (24 hours) const DEFAULT_SALT_ROTATION_INTERVAL_SECS: u64 = 24 * 3600; -fn fetch_from(config: &Config, store: &Store) -> FetchFrom { - let mut jsonrpc_import = config.jsonrpc_import; - if !jsonrpc_import { - // switch over to jsonrpc after the initial sync is done - jsonrpc_import = store.done_initial_sync(); - } - - if jsonrpc_import { - // slower, uses JSONRPC (good for incremental updates) - FetchFrom::Bitcoind - } else { - // faster, uses blk*.dat files (good for initial indexing) - FetchFrom::BlkFiles - } -} - fn run_server(config: Arc, salt_rwlock: Arc>) -> Result<()> { let (block_hash_notify, block_hash_receive) = channel::bounded(1); let signal = Waiter::start(block_hash_receive); @@ -66,7 +50,6 @@ fn run_server(config: Arc, salt_rwlock: Arc>) -> Result<( info!("connecting to daemon at {}", config.daemon_rpc_addr); let daemon = Arc::new(Daemon::new( &config.daemon_dir, - &config.blocks_dir, config.daemon_rpc_addr, config.daemon_parallelism, config.cookie_getter(), @@ -75,23 +58,13 @@ fn run_server(config: Arc, salt_rwlock: Arc>) -> Result<( &metrics, )?); info!("opening database at {}", config.db_path.display()); - let store = Arc::new(Store::open(&config, &metrics, true)); - let mut indexer = Indexer::open( - Arc::clone(&store), - fetch_from(&config, &store), - &config, - &metrics, - ); + let store = Arc::new(Store::open(&config, &metrics)); + let mut indexer = Indexer::open(Arc::clone(&store), &config, &metrics); info!("starting initial sync"); let mut tip = indexer.update(&daemon)?; info!("initial sync complete, tip at {}", tip); - let chain = Arc::new(ChainQuery::new( - Arc::clone(&store), - Arc::clone(&daemon), - &config, - &metrics, - )); + let chain = Arc::new(ChainQuery::new(Arc::clone(&store), &config, &metrics)); if let Some(ref precache_file) = config.precache_scripts { let precache_scripthashes = precache::scripthashes_from_file(precache_file.to_string()) diff --git a/src/bin/popular-scripts.rs b/src/bin/popular-scripts.rs index 6ad39f667..3ed3cf22c 100644 --- a/src/bin/popular-scripts.rs +++ b/src/bin/popular-scripts.rs @@ -8,7 +8,7 @@ use electrs::{ fn main() { let config = Config::from_args(); let metrics = Metrics::new(config.monitoring_addr); - let store = Store::open(&config, &metrics, true); + let store = Store::open(&config, &metrics); let mut iter = store.history_db().raw_iterator(); iter.seek(b"H"); diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index f96c7e7e4..c567117c2 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -16,7 +16,7 @@ fn main() { config::Config, daemon::Daemon, metrics::Metrics, - new_index::{ChainQuery, FetchFrom, Indexer, Store}, + new_index::{ChainQuery, Indexer, Store}, signal::Waiter, util::has_prevout, }; @@ -24,7 +24,7 @@ fn main() { let signal = Waiter::start(crossbeam_channel::never()); let config = Config::from_args(); let metrics = Metrics::new(config.monitoring_addr); - let store = Arc::new(Store::open(&config, &metrics, true)); + let store = Arc::new(Store::open(&config, &metrics)); let metrics = Metrics::new(config.monitoring_addr); metrics.start(); @@ -32,7 +32,6 @@ fn main() { let daemon = Arc::new( Daemon::new( &config.daemon_dir, - &config.blocks_dir, config.daemon_rpc_addr, config.daemon_parallelism, config.cookie_getter(), @@ -43,9 +42,9 @@ fn main() { .unwrap(), ); - let chain = ChainQuery::new(Arc::clone(&store), Arc::clone(&daemon), &config, &metrics); + let chain = ChainQuery::new(Arc::clone(&store), &config, &metrics); - let mut indexer = Indexer::open(Arc::clone(&store), FetchFrom::Bitcoind, &config, &metrics); + let mut indexer = Indexer::open(Arc::clone(&store), &config, &metrics); indexer.update(&daemon).unwrap(); let mut iter = store.txstore_db().raw_iterator(); diff --git a/src/chain.rs b/src/chain.rs index 0475223e6..f5dd5210b 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -44,19 +44,6 @@ pub enum Network { } impl Network { - #[cfg(not(feature = "liquid"))] - pub fn magic(self) -> u32 { - u32::from_le_bytes(BNetwork::from(self).magic().to_bytes()) - } - - #[cfg(feature = "liquid")] - pub fn magic(self) -> u32 { - match self { - Network::Liquid | Network::LiquidRegtest => 0xDAB5_BFFA, - Network::LiquidTestnet => 0x62DD_0E41, - } - } - pub fn is_regtest(self) -> bool { match self { #[cfg(not(feature = "liquid"))] diff --git a/src/config.rs b/src/config.rs index dd1304b4e..a8d850b49 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,7 +23,6 @@ pub struct Config { pub network_type: Network, pub db_path: PathBuf, pub daemon_dir: PathBuf, - pub blocks_dir: PathBuf, pub daemon_rpc_addr: SocketAddr, pub daemon_parallelism: usize, pub cookie: Option, @@ -31,8 +30,6 @@ pub struct Config { pub http_addr: SocketAddr, pub http_socket_file: Option, pub monitoring_addr: SocketAddr, - pub jsonrpc_import: bool, - pub light_mode: bool, pub address_search: bool, pub index_unspendables: bool, pub cors: Option, @@ -43,9 +40,8 @@ pub struct Config { pub rpc_logging: RpcLogging, pub zmq_addr: Option, - /// RocksDB block cache size in MB (per database) + /// RocksDB block cache size in MB (shared across all column families) /// Caches decompressed data blocks, plus index and filter blocks (via cache_index_and_filter_blocks). - /// Total memory usage = cache_size * 3_databases (txstore, history, cache) /// Recommendation: 1024 MB for steady-state; 4096 MB+ for initial sync (L0 SST /// files accumulate up to the compaction trigger — their index, filter (Bloom), /// and data blocks must fit in this cache). With 10 bits/key bloom filters and @@ -58,13 +54,13 @@ pub struct Config { /// This configures max_background_jobs and thread pools automatically pub db_parallelism: usize, - /// RocksDB write buffer size in MB (per database) - /// Each database uses this much RAM for in-memory writes before flushing to disk - /// Total RAM usage = write_buffer_size * max_write_buffer_number * 3_databases + /// RocksDB write buffer size in MB (per column family) + /// Each column family uses this much RAM for in-memory writes before flushing to disk + /// Total RAM usage = write_buffer_size * max_write_buffer_number * 3 CFs /// Larger buffers = fewer flushes (less CPU) but more RAM usage pub db_write_buffer_size_mb: usize, - /// Number of blocks per batch during initial sync (bitcoind fetch mode). + /// Number of blocks per batch during initial sync (for the legacy --no-spenttxouts and Liquid modes). /// Larger batches keep more O rows in the write buffer when index() runs lookup_txos(), /// improving cache hit rate for outputs spent within the same batch window. /// Must stay within db_write_buffer_size_mb to avoid mid-batch flushes. @@ -77,6 +73,11 @@ pub struct Config { /// per SST file of unbounded memory. pub db_cache_index_filter_blocks: bool, + #[cfg(not(feature = "liquid"))] + /// Use Bitcoin Core's REST spenttxouts endpoint to resolve spent outputs during indexing, + /// instead of looking them up in RocksDB. Requires Bitcoin Core 30+ with -rest=1. + pub use_spenttxouts: bool, + #[cfg(feature = "liquid")] pub parent_network: BNetwork, #[cfg(feature = "liquid")] @@ -128,12 +129,6 @@ impl Config { .help("Data directory of Bitcoind (default: ~/.bitcoin/)") .takes_value(true), ) - .arg( - Arg::with_name("blocks_dir") - .long("blocks-dir") - .help("Analogous to bitcoind's -blocksdir option, this specifies the directory containing the raw blocks files (blk*.dat) (default: ~/.bitcoin/blocks/)") - .takes_value(true), - ) .arg( Arg::with_name("cookie") .long("cookie") @@ -176,16 +171,6 @@ impl Config { .help("Prometheus monitoring 'addr:port' to listen on (default: 127.0.0.1:4224 for mainnet, 127.0.0.1:14224 for testnet3 and 127.0.0.1:44224 for testnet4 and 127.0.0.1:24224 for regtest)") .takes_value(true), ) - .arg( - Arg::with_name("jsonrpc_import") - .long("jsonrpc-import") - .help("Use JSONRPC instead of directly importing blk*.dat files. Useful for remote full node or low memory system"), - ) - .arg( - Arg::with_name("light_mode") - .long("lightmode") - .help("Enable light mode for reduced storage") - ) .arg( Arg::with_name("address_search") .long("address-search") @@ -242,7 +227,7 @@ impl Config { ).arg( Arg::with_name("db_block_cache_mb") .long("db-block-cache-mb") - .help("RocksDB block cache size in MB (shared across all databases). Bounds index/filter block memory; use 4096+ for initial sync to avoid table-reader heap growth.") + .help("RocksDB block cache size in MB (shared across all column families). Bounds index/filter block memory; use 4096+ for initial sync to avoid table-reader heap growth.") .takes_value(true) .default_value("24") ).arg( @@ -254,13 +239,13 @@ impl Config { ).arg( Arg::with_name("db_write_buffer_size_mb") .long("db-write-buffer-size-mb") - .help("RocksDB write buffer size in MB per database. RAM usage = size * max_write_buffers(2) * 3_databases") + .help("RocksDB write buffer size in MB per column family. RAM usage = size * max_write_buffers(2) * 3 CFs") .takes_value(true) - .default_value("256") + .default_value("128") ).arg( Arg::with_name("initial_sync_batch_size") .long("initial-sync-batch-size") - .help("Number of blocks per batch during initial sync. Larger values keep more txo rows in the write buffer during indexing, improving lookup_txos cache hit rate for recently-created outputs.") + .help("Number of blocks per batch during initial sync (for the legacy --no-spenttxouts and Liquid modes). Larger values keep more txo rows in the write buffer during indexing, improving lookup_txos cache hit rate for recently-created outputs.") .takes_value(true) .default_value("250") ).arg( @@ -274,6 +259,13 @@ impl Config { .takes_value(true), ); + #[cfg(not(feature = "liquid"))] + let args = args.arg( + Arg::with_name("no_spenttxouts") + .long("no-spenttxouts") + .help("Use the legacy indexer that does not rely on the spenttxouts REST endpoint. Required for Bitcoin Core before v30."), + ); + #[cfg(unix)] let args = args.arg( Arg::with_name("http_socket_file") @@ -447,10 +439,6 @@ impl Config { if let Some(network_subdir) = get_network_subdir(network_type) { daemon_dir.push(network_subdir); } - let blocks_dir = m - .value_of("blocks_dir") - .map(PathBuf::from) - .unwrap_or_else(|| daemon_dir.join("blocks")); let cookie = m.value_of("cookie").map(|s| s.to_owned()); let electrum_banner = m.value_of("electrum_banner").map_or_else( @@ -479,7 +467,6 @@ impl Config { network_type, db_path, daemon_dir, - blocks_dir, daemon_rpc_addr, daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize), cookie, @@ -499,8 +486,6 @@ impl Config { http_addr, http_socket_file, monitoring_addr, - jsonrpc_import: m.is_present("jsonrpc_import"), - light_mode: m.is_present("light_mode"), address_search: m.is_present("address_search"), index_unspendables: m.is_present("index_unspendables"), cors: m.value_of("cors").map(|s| s.to_string()), @@ -510,6 +495,8 @@ impl Config { db_write_buffer_size_mb: value_t_or_exit!(m, "db_write_buffer_size_mb", usize), initial_sync_batch_size: value_t_or_exit!(m, "initial_sync_batch_size", usize), db_cache_index_filter_blocks: m.is_present("cache_index_filter_blocks"), + #[cfg(not(feature = "liquid"))] + use_spenttxouts: !m.is_present("no_spenttxouts"), zmq_addr, #[cfg(feature = "liquid")] diff --git a/src/daemon.rs b/src/daemon.rs index 4cb38306f..c9b00d0ef 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,13 +1,12 @@ use std::cell::OnceCell; use std::collections::{HashMap, HashSet}; -use std::convert::TryFrom; +use std::env; use std::io::{BufRead, BufReader, Lines, Write}; use std::net::{SocketAddr, TcpStream}; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; -use std::{env, fs, io}; use base64::prelude::{Engine, BASE64_STANDARD}; #[cfg(feature = "liquid")] @@ -17,9 +16,13 @@ use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterato use serde_json::{from_str, from_value, Value}; #[cfg(not(feature = "liquid"))] -use bitcoin::consensus::encode::{deserialize_hex, serialize_hex}; +use bitcoin::consensus::{ + encode::{deserialize_hex, serialize_hex, VarInt}, + Decodable, +}; #[cfg(feature = "liquid")] -use elements::encode::{deserialize, serialize_hex}; +use elements::encode::{deserialize, serialize_hex, Decodable}; +use rayon::iter::IntoParallelRefIterator; use electrs_macros::trace; @@ -67,11 +70,6 @@ fn header_from_value(value: Value) -> Result { deserialize_value(header_hex) } -fn block_from_value(value: Value) -> Result { - let block_hex = value.as_str().chain_err(|| "non-string block")?; - deserialize_value(block_hex) -} - fn tx_from_value(value: Value) -> Result { let tx_hex = value.as_str().chain_err(|| "non-string tx")?; deserialize_value(tx_hex) @@ -335,13 +333,14 @@ impl Counter { pub struct Daemon { daemon_dir: PathBuf, - blocks_dir: PathBuf, network: Network, conn: Mutex, message_id: Counter, // for monotonic JSONRPC 'id' signal: Waiter, + addr: SocketAddr, - rpc_threads: Arc, + request_pool: Arc, + rest_agent: ureq::Agent, // connection-pooling HTTP agent for REST endpoints // monitoring latency: HistogramVec, @@ -351,7 +350,6 @@ pub struct Daemon { impl Daemon { pub fn new( daemon_dir: &PathBuf, - blocks_dir: &PathBuf, daemon_rpc_addr: SocketAddr, daemon_parallelism: usize, cookie_getter: Arc, @@ -361,7 +359,6 @@ impl Daemon { ) -> Result { let daemon = Daemon { daemon_dir: daemon_dir.clone(), - blocks_dir: blocks_dir.clone(), network, conn: Mutex::new(Connection::new( daemon_rpc_addr, @@ -370,13 +367,20 @@ impl Daemon { )?), message_id: Counter::new(), signal: signal.clone(), - rpc_threads: Arc::new( + addr: daemon_rpc_addr, + request_pool: Arc::new( rayon::ThreadPoolBuilder::new() .num_threads(daemon_parallelism) - .thread_name(|i| format!("rpc-requests-{}", i)) + .thread_name(|i| format!("daemon-requests-{}", i)) .build() .unwrap(), ), + rest_agent: ureq::Agent::new_with_config( + ureq::Agent::config_builder() + .max_idle_connections(daemon_parallelism * 2) + .max_idle_connections_per_host(daemon_parallelism * 2) + .build(), + ), latency: metrics.histogram_vec( HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"), &["method"], @@ -421,51 +425,24 @@ impl Daemon { pub fn reconnect(&self) -> Result { Ok(Daemon { daemon_dir: self.daemon_dir.clone(), - blocks_dir: self.blocks_dir.clone(), network: self.network, conn: Mutex::new(self.conn.lock().unwrap().reconnect()?), message_id: Counter::new(), signal: self.signal.clone(), - rpc_threads: self.rpc_threads.clone(), + addr: self.addr, + request_pool: self.request_pool.clone(), + rest_agent: self.rest_agent.clone(), latency: self.latency.clone(), size: self.size.clone(), }) } - #[trace] - pub fn list_blk_files(&self) -> Result> { - let path = self.blocks_dir.join("blk*.dat"); - debug!("listing block files at {:?}", path); - let mut paths: Vec = glob::glob(path.to_str().unwrap()) - .chain_err(|| "failed to list blk*.dat files")? - .map(|res| res.unwrap()) - .collect(); - paths.sort(); - Ok(paths) - } - - /// bitcoind v28.0+ defaults to xor-ing all blk*.dat files with this key, - /// stored in the blocks dir. - /// See: - pub fn read_blk_file_xor_key(&self) -> Result> { - // From: - let path = self.blocks_dir.join("xor.dat"); - let bytes = match fs::read(path) { - Ok(bytes) => bytes, - Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None), - Err(err) => return Err(err).chain_err(|| "failed to read daemon xor.dat file"), - }; - let xor_key: [u8; 8] = <[u8; 8]>::try_from(bytes.as_slice()).chain_err(|| { - format!( - "xor.dat unexpected length: actual: {}, expected: 8", - bytes.len() - ) - })?; - Ok(Some(xor_key)) - } - - pub fn magic(&self) -> u32 { - self.network.magic() + pub fn with_request_pool(&self, f: F) -> T + where + T: Send, + F: FnOnce() -> T + Send, + { + self.request_pool.install(f) } #[trace] @@ -531,14 +508,14 @@ impl Daemon { // buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned. #[trace] fn requests(&self, method: &str, params_list: Vec) -> Result> { - self.rpc_threads + self.request_pool .install(|| self.requests_iter(method, params_list).collect()) } // Send requests in parallel over multiple RPC connections, iterating over the results without buffering them. // Errors are included in the iterator and do not terminate other pending requests. // - // IMPORTANT: The returned parallel iterator must be collected inside self.rpc_threads.install() + // IMPORTANT: The returned parallel iterator must be collected inside self.request_pool.install() // to ensure it runs on the daemon's own thread pool, not the global rayon pool. This is necessary // because the per-thread DAEMON_INSTANCE thread-locals would otherwise be shared across different // daemon instances in the same process (e.g. during parallel tests). @@ -550,7 +527,7 @@ impl Daemon { ) -> impl ParallelIterator> + IndexedParallelIterator + 'a { params_list.into_par_iter().map(move |params| { // Store a local per-thread Daemon, each with its own TCP connection. These will - // get initialized as necessary for the `rpc_threads` pool thread managed by rayon. + // get initialized as necessary for the request_pool thread managed by rayon. thread_local!(static DAEMON_INSTANCE: OnceCell = OnceCell::new()); DAEMON_INSTANCE.with(|daemon| { @@ -602,52 +579,83 @@ impl Daemon { #[trace] pub fn getblock(&self, blockhash: &BlockHash) -> Result { - let block = - block_from_value(self.request("getblock", json!([blockhash, /*verbose=*/ false]))?)?; - assert_eq!(block.block_hash(), *blockhash); - Ok(block) - } - - #[trace] - pub fn getblock_raw(&self, blockhash: &BlockHash, verbose: u32) -> Result { - self.request("getblock", json!([blockhash, verbose])) - } - - #[trace] - pub fn getblocks(&self, blockhashes: &[BlockHash]) -> Result> { - let params_list: Vec = blockhashes - .iter() - .map(|hash| json!([hash, /*verbose=*/ false])) - .collect(); + let url = format!("http://{}/rest/block/{}.bin", self.addr, blockhash); let mut attempts = MAX_ATTEMPTS; - let values = loop { + loop { attempts -= 1; - match self.requests("getblock", params_list.clone()) { - Ok(blocks) => break blocks, - Err(e) => { - let err_msg = format!("{e:?}"); - if err_msg.contains("Block not found on disk") - || err_msg.contains("Block not available") - { + let request = self + .rest_agent + .get(&url) + .config() + .http_status_as_error(false) + .build(); + + match request.call() { + Ok(mut response) if response.status() == 200 => { + let mut reader = BufReader::new(response.body_mut().as_reader()); + let block = Block::consensus_decode(&mut reader) + .chain_err(|| "failed to parse block")?; + ensure!( + reader + .fill_buf() + .chain_err(|| "failed to end stream")? + .is_empty(), + "block response has trailing data" + ); + ensure!( + block.block_hash() == *blockhash, + "REST block hash mismatch: expected {}, got {}", + blockhash, + block.block_hash() + ); + return Ok(block); + } + Ok(mut response) => { + let status = response.status(); + let body = response.body_mut().read_to_string().map_err(|e| { + ErrorKind::Connection(format!( + "REST block fetch failed with status {}: {}", + status, e + )) + })?; + match status { // There is a small chance the node returns the header but didn't finish to index the block - log::warn!("getblocks failing with: {e:?} trying {attempts} more time") - } else { - panic!("failed to get blocks from bitcoind: {}", err_msg); + hyper::StatusCode::NOT_FOUND if body.contains("not available (not fully downloaded)") => warn!( + "REST block fetch failed with status 404: {body}, trying {attempts} more time", + ), + // Treat other errors as fatal + status => bail!(ErrorKind::Connection(format!( + "REST block fetch failed with status {status}: {body}, giving up", + ))), } } + Err(e) => { + warn!("REST block fetch failed: {e:?}, trying {attempts} more time"); + } } + if attempts == 0 { - panic!("failed to get blocks from bitcoind") + bail!("failed to get block from bitcoind") } std::thread::sleep(RETRY_WAIT_DURATION); - }; - let mut blocks = vec![]; - for value in values { - blocks.push(block_from_value(value)?); } - Ok(blocks) + } + + #[trace] + pub fn getblock_raw(&self, blockhash: &BlockHash, verbose: u32) -> Result { + self.request("getblock", json!([blockhash, verbose])) + } + + #[trace] + pub fn getblocks(&self, blockhashes: &[BlockHash]) -> Result> { + self.with_request_pool(|| { + blockhashes + .par_iter() + .map(|hash| self.getblock(hash)) + .collect() + }) } /// Fetch the given transactions in parallel over multiple threads and RPC connections, @@ -661,7 +669,7 @@ impl Daemon { .map(|txhash| json!([txhash, /*verbose=*/ false])) .collect(); - self.rpc_threads.install(|| { + self.request_pool.install(|| { self.requests_iter("getrawtransaction", params_list) .zip(txids) .filter_map(|(res, txid)| match res { @@ -847,4 +855,70 @@ impl Daemon { // from BTC/kB to sat/b Ok(relayfee * 100_000f64) } + + /// Fetch spent transaction outputs for the given block via Bitcoin Core's + /// REST /rest/spenttxouts/.bin endpoint. + /// + /// Returns one Vec per non-coinbase transaction, with TxOuts ordered + /// by input index. + /// + /// Requires bitcoind started with -rest=1. + #[cfg(not(feature = "liquid"))] + pub fn get_spent_txouts(&self, blockhash: &BlockHash) -> Result>> { + let url = format!("http://{}/rest/spenttxouts/{}.bin", self.addr, blockhash); + let mut response = self.rest_agent.get(&url).call().map_err(|e| { + ErrorKind::Connection(format!( + "REST spenttxouts failed for {} (is bitcoind running with -rest=1?): {}", + blockhash, e + )) + })?; + let mut reader = BufReader::new(response.body_mut().as_reader()); + parse_spent_txouts(&mut reader) + } +} + +#[cfg(not(feature = "liquid"))] +fn parse_spent_txouts( + reader: &mut R, +) -> Result>> { + // The binary spenttxouts format is: the CompactSize tx count, then for each tx the CompactSize TxOut count followed by the TxOuts. + // Entry 0 is an empty coinbase placeholder, which we drop from the final returned structure. + let tx_count = VarInt::consensus_decode(reader) + .chain_err(|| "failed to parse spenttxouts tx count")? + .0 as usize; + + ensure!(tx_count > 0, "spenttxouts response is empty"); + + let coinbase_input_count = VarInt::consensus_decode(reader) + .chain_err(|| "failed to parse spenttxouts coinbase placeholder")? + .0; + ensure!( + coinbase_input_count == 0, + "spenttxouts coinbase placeholder is not empty" + ); + + let mut spenttxouts = Vec::with_capacity(tx_count - 1); + for _ in 1..tx_count { + let input_count = VarInt::consensus_decode(reader) + .chain_err(|| "failed to parse spenttxouts input count")? + .0 as usize; + let mut prevouts = Vec::with_capacity(input_count); + for _ in 0..input_count { + prevouts.push( + bitcoin::TxOut::consensus_decode(reader) + .chain_err(|| "failed to parse spenttxout")?, + ); + } + spenttxouts.push(prevouts); + } + + ensure!( + reader + .fill_buf() + .chain_err(|| "failed to end stream")? + .is_empty(), + "spenttxouts response has trailing data" + ); + + Ok(spenttxouts) } diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 9c46c175a..a65eb0863 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -1,6 +1,6 @@ use prometheus::GaugeVec; use rayon::prelude::*; -use rocksdb; +use rocksdb::ColumnFamilyDescriptor; use std::convert::TryInto; use std::path::Path; @@ -12,10 +12,15 @@ use crate::config::Config; use crate::new_index::db_metrics::RocksDbMetrics; use crate::util::{bincode, spawn_thread, Bytes}; -static DB_VERSION: u32 = 3; +const DB_VERSION: u32 = 4; const ROCKSDB_NUM_LEVELS: u32 = 7; +const DEFAULT_CF: &str = rocksdb::DEFAULT_COLUMN_FAMILY_NAME; +const TXSTORE_CF: &str = "txstore"; +const HISTORY_CF: &str = "history"; +const CACHE_CF: &str = "cache"; + #[derive(Debug, Eq, PartialEq)] pub struct DBRow { pub key: Vec, @@ -81,6 +86,7 @@ impl<'a> Iterator for ReverseScanIterator<'a> { #[derive(Debug)] pub struct DB { db: Arc, + cf_name: &'static str, } #[derive(Copy, Clone, Debug)] @@ -90,126 +96,49 @@ pub enum DBFlush { } impl DB { - pub fn open(path: &Path, config: &Config, verify_compat: bool, shared_cache: &rocksdb::Cache) -> DB { - info!("opening DB at {:?}", path); - let mut db_opts = rocksdb::Options::default(); - db_opts.create_if_missing(true); - db_opts.set_max_open_files(100_000); // TODO: make sure to `ulimit -n` this process correctly - db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - db_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); - db_opts.set_target_file_size_base(1_073_741_824); - // Bulk-load compaction: allow L0 files to accumulate to a bounded limit - // before compacting. This reduces write amplification compared to the - // default trigger of 4, while keeping the file count — and therefore - // bloom-filter memory and lookup cost — bounded. - // - // With bloom filters at 10 bits/key and a 512 MB write buffer, each L0 - // file has ~7.8 M keys, so its filter block is ~9.75 MB. At 64 files - // that is ~625 MB of pinned filter blocks — well within an 8 GB cache. - // Each lookup checks 64 bloom filters (fast, in-memory) and reads from - // only ~0.64 files on average (1 % false-positive rate × 64 files). - // - // Set slowdown/stop triggers well above the compaction trigger so writes - // are never stalled while background compaction catches up. - // Disable the pending-compaction-bytes stall so the large backlog that - // builds up during the bulk load does not block writes. - const L0_BULK_TRIGGER: i32 = 64; - db_opts.set_level_zero_file_num_compaction_trigger(L0_BULK_TRIGGER); - db_opts.set_level_zero_slowdown_writes_trigger(L0_BULK_TRIGGER * 4); - db_opts.set_level_zero_stop_writes_trigger(L0_BULK_TRIGGER * 8); - db_opts.set_hard_pending_compaction_bytes_limit(0); - db_opts.set_soft_pending_compaction_bytes_limit(0); - - - let parallelism: i32 = config.db_parallelism.try_into() - .expect("db_parallelism value too large for i32"); - - // Configure parallelism (background jobs and thread pools) - db_opts.increase_parallelism(parallelism); - - // Configure write buffer size (not set by increase_parallelism) - db_opts.set_write_buffer_size(config.db_write_buffer_size_mb * 1024 * 1024); - - // 4 MiB readahead for compaction I/O. Larger than the previous 1 MiB to better - // amortise syscall overhead when reading the many L0 files accumulated during - // initial sync. - db_opts.set_compaction_readahead_size(4 << 20); - - // Background-sync SST files to the OS incrementally as they are written, - // rather than doing a large fsync on close. Smooths out I/O latency spikes. - db_opts.set_bytes_per_sync(1 << 20); - - // Parallelize sub-ranges within a single compaction job (including the one-time - // full_compaction at the end of initial sync). Without this, compact_range() is - // single-threaded regardless of increase_parallelism(). Setting it equal to the - // parallelism level keeps all background threads busy during the final compaction. - db_opts.set_max_subcompactions(parallelism as u32); - - // Configure block cache and table options - let mut block_opts = rocksdb::BlockBasedOptions::default(); - block_opts.set_block_cache(shared_cache); - // When --cache-index-filter-blocks is passed, store index and filter blocks - // inside the block cache so their memory is bounded by --db-block-cache-mb. - // Without this (the default), RocksDB keeps them on the heap where they may - // never be evicted — possibly better for read performance compared to needing - // to go to disk, but uses ~18 MB per SST file. - if config.db_cache_index_filter_blocks { - block_opts.set_cache_index_and_filter_blocks(true); - // Pin L0 index and filter blocks in the cache so they are never evicted. - // Without this, data block churn evicts L0 index/filter blocks, causing - // repeated disk reads for every SST lookup. - block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); - } - // Bloom filters allow multi_get() to skip SST files that don't contain a key - // without touching the index or data blocks. Without this, every point lookup - // must binary-search the index of every L0 file whose key range overlaps the - // query (all of them for random txids) — extremely expensive with 1000+ L0 - // files accumulated during initial sync. At 10 bits/key the false-positive - // rate is ~1%, so only ~10 out of 1000 L0 files need actual I/O per key. - // Combined with the prefix extractor below, these become prefix Bloom filters - // keyed on `code || hash` (33 bytes), which also allow prefix range scans - // (e.g. history lookups) to skip L0 files entirely. The filter blocks are - // cached and pinned alongside the index blocks via the settings above. - block_opts.set_bloom_filter(10.0, false); + fn new(db: Arc, cf_name: &'static str) -> DB { + DB { db, cf_name } + } - // All electrs keys share the structure `code (1 byte) || hash (32 bytes) || ...`. - // A 33-byte fixed prefix extractor enables prefix Bloom filters: range scans - // like iter_scan("H" + scripthash) can skip SST files whose Bloom filter - // doesn't match the prefix, rather than checking every L0 file. - // - // INVARIANT: All iter_scan* and raw_iterator methods must use total_order_seek - // when the seek key may be shorter than 33 bytes. Without it, RocksDB silently - // skips SST files that contain matching keys. See the conditional in iter_scan(). - db_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(33)); + pub fn default_cf(db: Arc) -> DB { + Self::new(db, DEFAULT_CF) + } - db_opts.set_block_based_table_factory(&block_opts); + pub fn txstore_cf(db: Arc) -> DB { + Self::new(db, TXSTORE_CF) + } - let db = DB { - db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB")) - }; - if verify_compat { - db.verify_compatibility(config); - } - db + pub fn history_cf(db: Arc) -> DB { + Self::new(db, HISTORY_CF) + } + + pub fn cache_cf(db: Arc) -> DB { + Self::new(db, CACHE_CF) } pub fn full_compaction(&self) { - info!("starting full compaction on {:?}", self.db); + info!( + "starting full compaction on {} ({:?})", + self.cf_name, self.db + ); let start = std::time::Instant::now(); let mut opts = rocksdb::CompactOptions::default(); opts.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::Force); - self.db.compact_range_opt(None::<&[u8]>, None::<&[u8]>, &opts); + self.db + .compact_range_cf_opt(self.cf(), None::<&[u8]>, None::<&[u8]>, &opts); let elapsed = start.elapsed(); - info!("finished full compaction on {:?} in elapsed='{:.1?}'", self.db, elapsed); + info!( + "finished full compaction on {} ({:?}) in elapsed='{:.1?}'", + self.cf_name, self.db, elapsed + ); } pub fn enable_auto_compaction(&self) { - // Reset L0 triggers and pending-compaction stall thresholds to RocksDB - // defaults, so that steady-state operation compacts promptly and avoids - // unbounded compaction backlogs that cause read latency spikes. - // RocksDB defaults (stable since v5.x through v10.4.2). Hardcoded because - // set_options() doesn't return previous values and the Rust bindings lack getters. + // Reset L0 triggers and pending-compaction stall thresholds to RocksDB + // defaults, so that steady-state operation compacts promptly and avoids + // unbounded compaction backlogs that cause read latency spikes. + // RocksDB defaults (stable since v5.x through v10.4.2). Hardcoded because + // set_options() doesn't return previous values and the Rust bindings lack getters. let soft_limit = (64u64 << 30).to_string(); // 64 GiB let hard_limit = (256u64 << 30).to_string(); // 256 GiB @@ -222,27 +151,28 @@ impl DB { ("soft_pending_compaction_bytes_limit", &soft_limit), ("hard_pending_compaction_bytes_limit", &hard_limit), ]; - self.db.set_options(&opts).unwrap(); + self.db.set_options_cf(self.cf(), &opts).unwrap(); } pub fn raw_iterator(&self) -> rocksdb::DBRawIterator<'_> { let mut opts = rocksdb::ReadOptions::default(); opts.set_total_order_seek(true); - self.db.raw_iterator_opt(opts) + self.db.raw_iterator_cf_opt(self.cf(), opts) } pub fn iter_scan(&self, prefix: &[u8]) -> ScanIterator<'_> { let iter = if prefix.len() >= 33 { - self.db.prefix_iterator(prefix) + self.db.prefix_iterator_cf(self.cf(), prefix) } else { // Short prefixes (e.g. b"B", b"D") are below the 33-byte prefix extractor // length. prefix_iterator would silently skip SST files. Use total_order_seek // to fall back to a full scan; ScanIterator enforces the prefix boundary. let mut opts = rocksdb::ReadOptions::default(); opts.set_total_order_seek(true); - self.db.iterator_opt( - rocksdb::IteratorMode::From(prefix, rocksdb::Direction::Forward), + self.db.iterator_cf_opt( + self.cf(), opts, + rocksdb::IteratorMode::From(prefix, rocksdb::Direction::Forward), ) }; ScanIterator { @@ -257,16 +187,17 @@ impl DB { // uses the prefix extractor for bloom filtering automatically. When < 33 // bytes, fall back to total_order_seek to avoid silent misses. let iter = if start_at.len() >= 33 { - self.db.iterator(rocksdb::IteratorMode::From( - start_at, - rocksdb::Direction::Forward, - )) + self.db.iterator_cf( + self.cf(), + rocksdb::IteratorMode::From(start_at, rocksdb::Direction::Forward), + ) } else { let mut opts = rocksdb::ReadOptions::default(); opts.set_total_order_seek(true); - self.db.iterator_opt( - rocksdb::IteratorMode::From(start_at, rocksdb::Direction::Forward), + self.db.iterator_cf_opt( + self.cf(), opts, + rocksdb::IteratorMode::From(start_at, rocksdb::Direction::Forward), ) }; ScanIterator { @@ -292,25 +223,33 @@ impl DB { pub fn write_rows(&self, mut rows: Vec, flush: DBFlush) { log::trace!( - "writing {} rows to {:?}, flush={:?}", + "writing {} rows to {} ({:?}), flush={:?}", rows.len(), + self.cf_name, self.db, flush ); rows.par_sort_unstable_by(|a, b| a.key.cmp(&b.key)); let mut batch = rocksdb::WriteBatch::default(); + let cf = self.cf(); for row in rows { - batch.put(&row.key, &row.value); + batch.put_cf(cf, &row.key, &row.value); } self.write_batch(batch, flush) } pub fn delete_rows(&self, mut rows: Vec, flush: DBFlush) { - log::trace!("deleting {} rows from {:?}", rows.len(), self.db,); + log::trace!( + "deleting {} rows from {} ({:?})", + rows.len(), + self.cf_name, + self.db, + ); rows.par_sort_unstable_by(|a, b| a.key.cmp(&b.key)); let mut batch = rocksdb::WriteBatch::default(); + let cf = self.cf(); for row in rows { - batch.delete(&row.key); + batch.delete_cf(cf, &row.key); } self.write_batch(batch, flush) } @@ -327,21 +266,21 @@ impl DB { } pub fn flush(&self) { - self.db.flush().unwrap(); + self.db.flush_cf(self.cf()).unwrap(); } pub fn put(&self, key: &[u8], value: &[u8]) { - self.db.put(key, value).unwrap(); + self.db.put_cf(self.cf(), key, value).unwrap(); } pub fn put_sync(&self, key: &[u8], value: &[u8]) { let mut opts = rocksdb::WriteOptions::new(); opts.set_sync(true); - self.db.put_opt(key, value, &opts).unwrap(); + self.db.put_cf_opt(self.cf(), key, value, &opts).unwrap(); } pub fn get(&self, key: &[u8]) -> Option { - self.db.get(key).unwrap().map(|v| v.to_vec()) + self.db.get_cf(self.cf(), key).unwrap().map(|v| v.to_vec()) } pub fn multi_get(&self, keys: I) -> Vec>, rocksdb::Error>> @@ -349,51 +288,57 @@ impl DB { K: AsRef<[u8]>, I: IntoIterator, { - self.db.multi_get(keys) + let cf = self.cf(); + self.db.multi_get_cf(keys.into_iter().map(|key| (cf, key))) } /// Remove database entries in the range [from, to) pub fn delete_range>(&self, from: K, to: K, flush: DBFlush) { let mut batch = rocksdb::WriteBatch::default(); - batch.delete_range(from, to); + batch.delete_range_cf(self.cf(), from, to); self.write_batch(batch, flush); } - fn verify_compatibility(&self, config: &Config) { - let compatibility_bytes = bincode::serialize_little(&(DB_VERSION, config.light_mode)).unwrap(); - - match self.get(b"V") { - None => self.put(b"V", &compatibility_bytes), - Some(x) if x != compatibility_bytes => { - panic!("Incompatible database found. Please reindex or migrate.") - } - Some(_) => (), - } + pub(super) fn cf(&self) -> rocksdb::ColumnFamilyRef<'_> { + self.db + .cf_handle(self.cf_name) + .unwrap_or_else(|| panic!("missing RocksDB column family {}", self.cf_name)) } #[cfg(test)] fn open_test(path: &Path) -> DB { + const TEST_CF: &str = "test"; + let mut db_opts = rocksdb::Options::default(); db_opts.create_if_missing(true); - db_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(33)); + db_opts.create_missing_column_families(true); + db_opts.set_atomic_flush(true); + let mut cf_opts = rocksdb::Options::default(); + cf_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(33)); let mut block_opts = rocksdb::BlockBasedOptions::default(); block_opts.set_bloom_filter(10.0, false); - db_opts.set_block_based_table_factory(&block_opts); + cf_opts.set_block_based_table_factory(&block_opts); - DB { - db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open test RocksDB")), - } + let cf_descriptors = [ColumnFamilyDescriptor::new(TEST_CF, cf_opts)]; + + let db = rocksdb::DB::open_cf_descriptors(&db_opts, path, cf_descriptors) + .expect("failed to open test RocksDB"); + DB::new(Arc::new(db), TEST_CF) } pub fn start_stats_exporter(&self, db_metrics: Arc, db_name: &str) { let db_arc = Arc::clone(&self.db); let db_arc2 = Arc::clone(&self.db); + let cf_name = self.cf_name; let label = db_name.to_string(); let label2 = label.clone(); let update_gauge = move |gauge: &GaugeVec, property: &str| { - if let Ok(Some(value)) = db_arc.property_value(property) { + let cf = db_arc + .cf_handle(cf_name) + .unwrap_or_else(|| panic!("missing RocksDB column family {}", cf_name)); + if let Ok(Some(value)) = db_arc.property_value_cf(cf, property) { if let Ok(v) = value.parse::() { gauge.with_label_values(&[&label]).set(v); } @@ -434,12 +379,18 @@ impl DB { update_gauge(&db_metrics.block_cache_capacity, "rocksdb.block-cache-capacity"); update_gauge(&db_metrics.block_cache_usage, "rocksdb.block-cache-usage"); update_gauge(&db_metrics.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage"); + let cf = db_arc2 + .cf_handle(cf_name) + .unwrap_or_else(|| panic!("missing RocksDB column family {}", cf_name)); for level in 0..ROCKSDB_NUM_LEVELS { let prop = format!("rocksdb.num-files-at-level{}", level); - if let Ok(Some(value)) = db_arc2.property_value(&prop) { + if let Ok(Some(value)) = db_arc2.property_value_cf(cf, &prop) { if let Ok(v) = value.parse::() { let level_str = level.to_string(); - db_metrics.num_files_at_level.with_label_values(&[&label2, &level_str]).set(v); + db_metrics + .num_files_at_level + .with_label_values(&[&label2, &level_str]) + .set(v); } } } @@ -448,6 +399,160 @@ impl DB { } } +pub fn open_rocksdb(path: &Path, config: &Config) -> rocksdb::DB { + assert!( + !path.join("newindex").exists(), + "Found obsolete index database layout. Please reindex." + ); + + debug!("opening DB at {:?}", path); + + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + db_opts.set_atomic_flush(true); + db_opts.set_max_open_files(100_000); // TODO: make sure to `ulimit -n` this process correctly + + let parallelism: i32 = config + .db_parallelism + .try_into() + .expect("db_parallelism value too large for i32"); + + // Configure parallelism (background jobs and thread pools) + db_opts.increase_parallelism(parallelism); + + // 4 MiB readahead for compaction I/O. Larger than the previous 1 MiB to better + // amortise syscall overhead when reading the many L0 files accumulated during + // initial sync. + db_opts.set_compaction_readahead_size(4 << 20); + + // Background-sync SST files to the OS incrementally as they are written, + // rather than doing a large fsync on close. Smooths out I/O latency spikes. + db_opts.set_bytes_per_sync(1 << 20); + + // Parallelize sub-ranges within a single compaction job (including the one-time + // full_compaction at the end of initial sync). Without this, compact_range() is + // single-threaded regardless of increase_parallelism(). Setting it equal to the + // parallelism level keeps all background threads busy during the final compaction. + db_opts.set_max_subcompactions(parallelism as u32); + + // Create a single shared LRU cache for all CFs. The total size is + // --db-block-cache-mb (not multiplied by 3). RocksDB's LRU cache is + // thread-safe, so all CFs share one eviction pool. This lets the + // txstore (which holds the bulk of the data) claim as much cache as it + // needs without being artificially capped at 1/3 of the total. + let cache_size_bytes = config.db_block_cache_mb * 1024 * 1024; + let shared_cache = rocksdb::Cache::new_lru_cache(cache_size_bytes); + debug!( + "shared LRU block cache: db_block_cache_mb='{}'", + config.db_block_cache_mb + ); + + let cf_descriptors = [ + ColumnFamilyDescriptor::new(DEFAULT_CF, rocksdb::Options::default()), + ColumnFamilyDescriptor::new(TXSTORE_CF, data_cf_options(config, &shared_cache)), + ColumnFamilyDescriptor::new(HISTORY_CF, data_cf_options(config, &shared_cache)), + ColumnFamilyDescriptor::new(CACHE_CF, data_cf_options(config, &shared_cache)), + ]; + + let db = rocksdb::DB::open_cf_descriptors(&db_opts, path, cf_descriptors) + .expect("failed to open RocksDB"); + + verify_compatibility(&db); + db +} + +fn verify_compatibility(db: &rocksdb::DB) { + let compatibility_bytes = bincode::serialize_little(&DB_VERSION).unwrap(); + let cf = db + .cf_handle(DEFAULT_CF) + .unwrap_or_else(|| panic!("missing RocksDB column family {}", DEFAULT_CF)); + + match db.get_cf(cf, b"V").unwrap().map(|v| v.to_vec()) { + None => db.put_cf(cf, b"V", &compatibility_bytes).unwrap(), + Some(x) if x != compatibility_bytes => { + panic!("Incompatible database found. Please reindex.") + } + Some(_) => (), + } +} + +fn data_cf_options(config: &Config, shared_cache: &rocksdb::Cache) -> rocksdb::Options { + let mut cf_opts = rocksdb::Options::default(); + cf_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); + cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + cf_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); + cf_opts.set_target_file_size_base(1_073_741_824); + // Bulk-load compaction: allow L0 files to accumulate to a bounded limit + // before compacting. This reduces write amplification compared to the + // default trigger of 4, while keeping the file count — and therefore + // bloom-filter memory and lookup cost — bounded. + // + // With bloom filters at 10 bits/key and a 128 MB write buffer, each L0 + // file has ~3.9 M keys, so its filter block is ~4.9 MB. At the slowdown + // threshold (96 files) that is ~470 MB of pinned filter blocks per CF, + // ~1.41 GB across 3 data CFs — within a 2 GB cache. At the stop threshold + // (128 files) it is ~628 MB per CF / ~1.88 GB total, still within bounds. + // Previously trigger=64 with 256 MB buffers caused pinned metadata to + // overflow the 2 GB cache at L0=128 (~3.7 GB), spilling to uncontrolled + // heap and triggering OOM. Trigger=32 + slowdown=96 keeps the peak safe + // while allowing enough L0 accumulation for good bulk-load throughput. + // + // Set slowdown/stop triggers well above the compaction trigger so writes + // are never stalled while background compaction catches up. + // Disable the pending-compaction-bytes stall so the large backlog that + // builds up during the bulk load does not block writes. + const L0_BULK_TRIGGER: i32 = 32; + cf_opts.set_level_zero_file_num_compaction_trigger(L0_BULK_TRIGGER); + cf_opts.set_level_zero_slowdown_writes_trigger(L0_BULK_TRIGGER * 3); + cf_opts.set_level_zero_stop_writes_trigger(L0_BULK_TRIGGER * 4); + cf_opts.set_hard_pending_compaction_bytes_limit(0); + cf_opts.set_soft_pending_compaction_bytes_limit(0); + + // Configure write buffer size (not set by increase_parallelism) + cf_opts.set_write_buffer_size(config.db_write_buffer_size_mb * 1024 * 1024); + + // Configure block cache and table options + let mut block_opts = rocksdb::BlockBasedOptions::default(); + block_opts.set_block_cache(shared_cache); + // When --cache-index-filter-blocks is passed, store index and filter blocks + // inside the block cache so their memory is bounded by --db-block-cache-mb. + // Without this (the default), RocksDB keeps them on the heap where they may + // never be evicted — possibly better for read performance compared to needing + // to go to disk, but uses ~18 MB per SST file. + if config.db_cache_index_filter_blocks { + block_opts.set_cache_index_and_filter_blocks(true); + // Pin L0 index and filter blocks in the cache so they are never evicted. + // Without this, data block churn evicts L0 index/filter blocks, causing + // repeated disk reads for every SST lookup. + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + } + // Bloom filters allow multi_get() to skip SST files that don't contain a key + // without touching the index or data blocks. Without this, every point lookup + // must binary-search the index of every L0 file whose key range overlaps the + // query (all of them for random txids) — extremely expensive with 1000+ L0 + // files accumulated during initial sync. At 10 bits/key the false-positive + // rate is ~1%, so only ~10 out of 1000 L0 files need actual I/O per key. + // Combined with the prefix extractor below, these become prefix Bloom filters + // keyed on `code || hash` (33 bytes), which also allow prefix range scans + // (e.g. history lookups) to skip L0 files entirely. The filter blocks are + // cached and pinned alongside the index blocks via the settings above. + block_opts.set_bloom_filter(10.0, false); + + // All electrs keys share the structure `code (1 byte) || hash (32 bytes) || ...`. + // A 33-byte fixed prefix extractor enables prefix Bloom filters: range scans + // like iter_scan("H" + scripthash) can skip SST files whose Bloom filter + // doesn't match the prefix, rather than checking every L0 file. + // + // INVARIANT: All iter_scan* and raw_iterator methods must use total_order_seek + // when the seek key may be shorter than 33 bytes. Without it, RocksDB silently + // skips SST files that contain matching keys. See the conditional in iter_scan(). + cf_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(33)); + + cf_opts.set_block_based_table_factory(&block_opts); + cf_opts +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index 0dc92aeaa..b9e16dbe4 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -1,45 +1,16 @@ -use rayon::prelude::*; - -#[cfg(feature = "liquid")] -use crate::elements::ebcompact::*; -#[cfg(not(feature = "liquid"))] -use bitcoin::consensus::encode::{deserialize, Decodable}; -#[cfg(feature = "liquid")] -use elements::encode::{deserialize, Decodable}; - -use std::collections::HashMap; -use std::fs; -use std::io::Cursor; -use std::path::PathBuf; use std::sync::mpsc::Receiver; use std::thread; use electrs_macros::trace; +use itertools::Itertools; use crate::chain::{Block, BlockHash, Txid}; use crate::daemon::Daemon; use crate::errors::*; use crate::util::{spawn_thread, HeaderEntry, SyncChannel}; -#[derive(Clone, Copy, Debug)] -pub enum FetchFrom { - Bitcoind, - BlkFiles, -} - -#[trace] -pub fn start_fetcher( - from: FetchFrom, - daemon: &Daemon, - new_headers: Vec, - batch_size: usize, - chain_tip_height: usize, -) -> Result>> { - match from { - FetchFrom::Bitcoind => bitcoind_fetcher(daemon, new_headers, batch_size, chain_tip_height), - FetchFrom::BlkFiles => blkfiles_fetcher(daemon, new_headers), - } -} +#[cfg(feature = "liquid")] +use crate::elements::ebcompact::{SizeMethod, TxidCompat}; #[derive(Clone)] pub struct BlockEntry { @@ -50,8 +21,6 @@ pub struct BlockEntry { pub txids: Vec, } -type SizedBlock = (Block, u32); - pub struct Fetcher { receiver: Receiver, thread: thread::JoinHandle<()>, @@ -74,7 +43,7 @@ impl Fetcher { } #[trace] -fn bitcoind_fetcher( +pub fn start_fetcher( daemon: &Daemon, new_headers: Vec, batch_size: usize, @@ -91,10 +60,12 @@ fn bitcoind_fetcher( spawn_thread("bitcoind_fetcher", move || { let mut fetcher_count = 0; let total_blocks_fetched = new_headers.len(); - for entries in new_headers.chunks(batch_size) { + for entries in &new_headers.into_iter().chunks(batch_size) { + let entries: Vec<_> = entries.collect(); if fetcher_count % 50 == 0 && total_blocks_fetched >= 50 { let batch_height = entries.last().map(|e| e.height()).unwrap_or(0); - info!("fetching blocks {}/{} ({:.1}%)", + info!( + "fetching blocks {}/{} ({:.1}%)", batch_height, chain_tip_height, batch_height as f32 / chain_tip_height.max(1) as f32 * 100.0 @@ -106,198 +77,24 @@ fn bitcoind_fetcher( let blocks = daemon .getblocks(&blockhashes) .expect("failed to get blocks from bitcoind"); - assert_eq!(blocks.len(), entries.len()); let block_entries: Vec = blocks .into_iter() - .zip(entries) + .zip_eq(entries) .map(|(block, entry)| { let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect(); BlockEntry { - entry: entry.clone(), // TODO: remove this clone() + entry, size: block.total_size() as u32, txids, block, } }) .collect(); - assert_eq!(block_entries.len(), entries.len()); + log::debug!("last fetch {:?}", block_entries.last().map(|b| &b.entry)); sender .send(block_entries) .expect("failed to send fetched blocks"); - log::debug!("last fetch {:?}", entries.last()); - } - }), - )) -} - -#[trace] -fn blkfiles_fetcher( - daemon: &Daemon, - new_headers: Vec, -) -> Result>> { - let magic = daemon.magic(); - let blk_files = daemon.list_blk_files()?; - let xor_key = daemon.read_blk_file_xor_key()?; - - // Buffer of 2 lets the parser produce one batch ahead of the consumer, - // overlapping block-entry construction with the indexer. - let chan = SyncChannel::new(2); - let sender = chan.sender(); - - let mut entry_map: HashMap = - new_headers.into_iter().map(|h| (*h.hash(), h)).collect(); - - let parser = blkfiles_parser(blkfiles_reader(blk_files, xor_key), magic); - Ok(Fetcher::from( - chan.into_receiver(), - spawn_thread("blkfiles_fetcher", move || { - parser.map(|sizedblocks| { - let block_count = sizedblocks.len(); - let mut index = 0; - let block_entries: Vec = sizedblocks - .into_iter() - .filter_map(|(block, size)| { - index += 1; - debug!("fetch block {:}/{:} {:.2}%", - index, - block_count, - (index/block_count) as f32/100.0 - ); - let blockhash = block.block_hash(); - entry_map - .remove(&blockhash) - .map(|entry| { - let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect(); - BlockEntry { block, entry, size, txids } - }) - .or_else(|| { - trace!("skipping block {}", blockhash); - None - }) - }) - .collect(); - trace!("fetched {} blocks", block_entries.len()); - sender - .send(block_entries) - .expect("failed to send blocks entries from blk*.dat files"); - }); - if !entry_map.is_empty() { - panic!( - "failed to index {} blocks from blk*.dat files", - entry_map.len() - ) } }), )) } - -#[trace] -fn blkfiles_reader(blk_files: Vec, xor_key: Option<[u8; 8]>) -> Fetcher> { - // Buffer of 2 lets the reader read ahead by one blk file while the parser - // is working, overlapping sequential disk I/O with CPU deserialization. - let chan = SyncChannel::new(2); - let sender = chan.sender(); - - Fetcher::from( - chan.into_receiver(), - spawn_thread("blkfiles_reader", move || { - let blk_files_len = blk_files.len(); - for (count, path) in blk_files.iter().enumerate() { - info!("block file reading {:}/{:} {:.2}%", - count, - blk_files_len, - count / blk_files_len - ); - - trace!("reading {:?}", path); - let mut blob = fs::read(&path) - .unwrap_or_else(|e| panic!("failed to read {:?}: {:?}", path, e)); - if let Some(xor_key) = xor_key { - blkfile_apply_xor_key(xor_key, &mut blob); - } - sender - .send(blob) - .unwrap_or_else(|_| panic!("failed to send {:?} contents", path)); - } - }), - ) -} - -/// By default, bitcoind v28.0+ applies an 8-byte "xor key" over each "blk*.dat" -/// file. We have xor again to undo this transformation. -fn blkfile_apply_xor_key(xor_key: [u8; 8], blob: &mut [u8]) { - for (i, blob_i) in blob.iter_mut().enumerate() { - *blob_i ^= xor_key[i & 0x7]; - } -} - -#[trace] -fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher> { - // Buffer of 2 lets the parser stay one batch ahead of the fetcher stage. - let chan = SyncChannel::new(2); - let sender = chan.sender(); - - Fetcher::from( - chan.into_receiver(), - spawn_thread("blkfiles_parser", move || { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(0) // CPU-bound - .thread_name(|i| format!("parse-blocks-{}", i)) - .build() - .unwrap(); - blobs.map(|blob| { - trace!("parsing {} bytes", blob.len()); - let blocks = parse_blocks(&pool, blob, magic).expect("failed to parse blk*.dat file"); - sender - .send(blocks) - .expect("failed to send blocks from blk*.dat file"); - }); - }), - ) -} - -#[trace] -fn parse_blocks(pool: &rayon::ThreadPool, blob: Vec, magic: u32) -> Result> { - let mut cursor = Cursor::new(&blob); - let mut slices = vec![]; - let max_pos = blob.len() as u64; - - while cursor.position() < max_pos { - let offset = cursor.position(); - match u32::consensus_decode(&mut cursor) { - Ok(value) => { - if magic != value { - cursor.set_position(offset + 1); - continue; - } - } - Err(_) => break, // EOF - }; - let block_size = u32::consensus_decode(&mut cursor).chain_err(|| "no block size")?; - let start = cursor.position(); - let end = start + block_size as u64; - - // If Core's WriteBlockToDisk ftell fails, only the magic bytes and size will be written - // and the block body won't be written to the blk*.dat file. - // Since the first 4 bytes should contain the block's version, we can skip such blocks - // by peeking the cursor (and skipping previous `magic` and `block_size`). - match u32::consensus_decode(&mut cursor) { - Ok(value) => { - if magic == value { - cursor.set_position(start); - continue; - } - } - Err(_) => break, // EOF - } - slices.push((&blob[start as usize..end as usize], block_size)); - cursor.set_position(end as u64); - } - - Ok(pool.install(|| { - slices - .into_par_iter() - .map(|(slice, size)| (deserialize(slice).expect("failed to parse Block"), size)) - .collect() - })) -} diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index b82fbe2aa..aa57028df 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -7,7 +7,7 @@ use electrs_macros::trace; #[cfg(feature = "liquid")] use elements::{encode::serialize, AssetId}; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::iter::FromIterator; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -449,7 +449,7 @@ impl Mempool { } #[trace] - pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { + pub fn lookup_txos(&self, outpoints: Vec) -> Result> { let _timer = self .latency .with_label_values(&["lookup_txos"]) diff --git a/src/new_index/mod.rs b/src/new_index/mod.rs index f82291e55..047852563 100644 --- a/src/new_index/mod.rs +++ b/src/new_index/mod.rs @@ -8,7 +8,7 @@ pub mod schema; pub mod zmq; pub use self::db::{DBRow, DB}; -pub use self::fetch::{BlockEntry, FetchFrom}; +pub use self::fetch::BlockEntry; pub use self::mempool::Mempool; pub use self::query::Query; pub use self::schema::{ diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 8cae86be5..69024fbc7 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -124,19 +124,19 @@ impl Query { #[trace] pub fn lookup_txn(&self, txid: &Txid) -> Option { self.chain - .lookup_txn(txid, None) + .lookup_txn(txid) .or_else(|| self.mempool().lookup_txn(txid)) } #[trace] pub fn lookup_raw_txn(&self, txid: &Txid) -> Option { self.chain - .lookup_raw_txn(txid, None) + .lookup_raw_txn(txid) .or_else(|| self.mempool().lookup_raw_txn(txid)) } #[trace] - pub fn lookup_txos(&self, outpoints: BTreeSet) -> HashMap { + pub fn lookup_txos(&self, outpoints: Vec) -> HashMap { // the mempool lookup_txos() internally looks up confirmed txos as well self.mempool() .lookup_txos(outpoints) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 23193ba2d..75949e2c3 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -1,5 +1,4 @@ use bitcoin::hashes::{sha256, sha256d::Hash as Sha256dHash, Hash}; -use bitcoin::hex::FromHex; #[cfg(not(feature = "liquid"))] use bitcoin::merkle_tree::MerkleBlock; @@ -17,11 +16,10 @@ use elements::{ use std::collections::{BTreeSet, HashMap, HashSet}; use std::convert::TryInto; +#[cfg(not(feature = "liquid"))] +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock, RwLockReadGuard}; -use crate::{chain::{ - BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value, -}, new_index::db_metrics::RocksDbMetrics}; use crate::config::Config; use crate::daemon::Daemon; use crate::errors::*; @@ -30,12 +28,21 @@ use crate::util::{ bincode, full_hash, has_prevout, is_spendable, BlockHeaderMeta, BlockId, BlockMeta, BlockStatus, Bytes, HeaderEntry, HeaderList, ScriptToAddr, }; +use crate::{ + chain::{ + Block, BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value, + }, + new_index::db_metrics::RocksDbMetrics, +}; -use crate::new_index::db::{DBFlush, DBRow, ReverseScanIterator, ScanIterator, DB}; -use crate::new_index::fetch::{start_fetcher, BlockEntry, FetchFrom}; +use crate::new_index::db::{self, DBFlush, DBRow, ReverseScanIterator, ScanIterator, DB}; +use crate::new_index::fetch::{start_fetcher, BlockEntry}; + +#[cfg(feature = "liquid")] +use crate::elements::{asset, peg}; #[cfg(feature = "liquid")] -use crate::elements::{asset, ebcompact::TxidCompat, peg}; +use crate::elements::ebcompact::SizeMethod; #[cfg(feature = "liquid")] use elements::encode::VarInt; @@ -46,7 +53,8 @@ use bitcoin::VarInt; const MIN_HISTORY_ITEMS_TO_CACHE: usize = 100; pub struct Store { - // TODO: should be column families + db: Arc, + default_db: DB, txstore_db: DB, history_db: DB, cache_db: DB, @@ -56,60 +64,61 @@ pub struct Store { } impl Store { - pub fn open(config: &Config, metrics: &Metrics, verify_compat: bool) -> Self { - let path = config.db_path.join("newindex"); - - // Create a single shared LRU cache for all three DBs. The total size is - // --db-block-cache-mb (not multiplied by 3). RocksDB's LRU cache is - // thread-safe, so all DBs share one eviction pool. This lets the - // txstore (which holds the bulk of the data) claim as much cache as it - // needs without being artificially capped at 1/3 of the total. - let cache_size_bytes = config.db_block_cache_mb * 1024 * 1024; - let shared_cache = rocksdb::Cache::new_lru_cache(cache_size_bytes); - debug!("shared LRU block cache: db_block_cache_mb='{}'", config.db_block_cache_mb); - - let txstore_db = DB::open(&path.join("txstore"), config, verify_compat, &shared_cache); + pub fn open(config: &Config, metrics: &Metrics) -> Self { + let db = Arc::new(db::open_rocksdb(&config.db_path, config)); + let default_db = DB::default_cf(Arc::clone(&db)); + let txstore_db = DB::txstore_cf(Arc::clone(&db)); + let history_db = DB::history_cf(Arc::clone(&db)); + let cache_db = DB::cache_cf(Arc::clone(&db)); + let added_blockhashes = load_blockhashes(&txstore_db, &BlockRow::done_filter()); info!("{} blocks were added", added_blockhashes.len()); - let history_db = DB::open(&path.join("history"), config, verify_compat, &shared_cache); let indexed_blockhashes = load_blockhashes(&history_db, &BlockRow::done_filter()); info!("{} blocks were indexed", indexed_blockhashes.len()); - let cache_db = DB::open(&path.join("cache"), config, verify_compat, &shared_cache); - let db_metrics = Arc::new(RocksDbMetrics::new(&metrics)); txstore_db.start_stats_exporter(Arc::clone(&db_metrics), "txstore_db"); history_db.start_stats_exporter(Arc::clone(&db_metrics), "history_db"); cache_db.start_stats_exporter(Arc::clone(&db_metrics), "cache_db"); - let headers = if let Some(tip_hash) = txstore_db.get(b"t") { - let mut tip_hash = deserialize(&tip_hash).expect("invalid chain tip in `t`"); + // Construct the `HeaderList` from persisted `t` and `D` markers + let headers = if let Some(tip_hash) = default_db.get(b"t") { + let tip_hash = deserialize(&tip_hash).expect("invalid chain tip in `t`"); let headers_map = load_blockheaders(&txstore_db); - // Move the tip back until we reach a block that is indexed in the history db. - // It is possible for the tip recorded under the db "t" key to be un-indexed if electrs - // shuts down during reorg handling. Normally this wouldn't matter because the non-indexed - // block would be stale, but it could matter if the chain later re-orged back to - // include the previously stale block because more blocks were built on top of it. - // Without this, the stale-then-not-stale block(s) would not get re-indexed correctly. - while !indexed_blockhashes.contains(&tip_hash) { - tip_hash = headers_map - .get(&tip_hash) - .expect("invalid header chain") - .prev_blockhash; - } info!( "{} headers were loaded, tip at {:?}", headers_map.len(), tip_hash ); - HeaderList::new(headers_map, tip_hash) + // First build and validate the header chain by walking backwards from `t`. + let mut headers = HeaderList::new(headers_map, tip_hash); + + // Then, walk forward from genesis and roll back to the last block + // with both completion markers. Under normal publication rules this + // should keep the whole `t` chain. The trim is a fail-closed guard + // if `t` and completion markers disagree. + let fully_indexed_blockhashes: HashSet<_> = added_blockhashes + .intersection(&indexed_blockhashes) + .copied() + .collect(); + let fully_indexed_len = headers + .iter() + .take_while(|entry| fully_indexed_blockhashes.contains(entry.hash())) + .count(); + let _ = headers.pop(fully_indexed_len); + // The popped headers are only hidden from the Store::open() midstate. + // Their rows remain in RocksDB. The first daemon-aware update() can + // finish still-best work or undo hidden stale history. + headers } else { HeaderList::empty() }; Store { + db, + default_db, txstore_db, history_db, cache_db, @@ -135,8 +144,13 @@ impl Store { self.indexed_headers.read().unwrap() } - pub fn done_initial_sync(&self) -> bool { - self.txstore_db.get(b"t").is_some() + fn flush_block_writes(&self) { + let cfs = [self.txstore_db.cf(), self.history_db.cf()]; + self.db.flush_cfs_opt(&cfs, &Default::default()).unwrap(); + } + + fn update_tip(&self, tip: &BlockHash) { + self.default_db.put_sync(b"t", &serialize(tip)); } } @@ -201,20 +215,22 @@ impl ScriptStats { pub struct Indexer { store: Arc, flush: DBFlush, - from: FetchFrom, iconfig: IndexerConfig, duration: HistogramVec, tip_metric: Gauge, sync_height: Gauge, sync_progress: prometheus::Gauge, + // Always true initially, cleared after the first update() completes the startup stale sweep + pending_startup_recovery: bool, } struct IndexerConfig { - light_mode: bool, address_search: bool, index_unspendables: bool, network: Network, block_batch_size: usize, + #[cfg(not(feature = "liquid"))] + use_spenttxouts: bool, #[cfg(feature = "liquid")] parent_network: crate::chain::BNetwork, } @@ -222,11 +238,12 @@ struct IndexerConfig { impl From<&Config> for IndexerConfig { fn from(config: &Config) -> Self { IndexerConfig { - light_mode: config.light_mode, address_search: config.address_search, index_unspendables: config.index_unspendables, network: config.network_type, block_batch_size: config.initial_sync_batch_size, + #[cfg(not(feature = "liquid"))] + use_spenttxouts: config.use_spenttxouts, #[cfg(feature = "liquid")] parent_network: config.parent_network, } @@ -235,19 +252,16 @@ impl From<&Config> for IndexerConfig { pub struct ChainQuery { store: Arc, // TODO: should be used as read-only - daemon: Arc, - light_mode: bool, duration: HistogramVec, network: Network, } // TODO: &[Block] should be an iterator / a queue. impl Indexer { - pub fn open(store: Arc, from: FetchFrom, config: &Config, metrics: &Metrics) -> Self { + pub fn open(store: Arc, config: &Config, metrics: &Metrics) -> Self { Indexer { store, flush: DBFlush::Disable, - from, iconfig: IndexerConfig::from(config), duration: metrics.histogram_vec( HistogramOpts::new("index_duration", "Index update duration (in seconds)"), @@ -262,6 +276,7 @@ impl Indexer { "initial_sync_progress_pct", "Initial sync progress as a percentage of the best known chain height", )), + pending_startup_recovery: true, } } @@ -270,13 +285,21 @@ impl Indexer { } // Headers that need any work: either not yet added to txstore or not yet indexed to history. - fn headers_to_process(&self, new_headers: &[HeaderEntry]) -> Vec { + fn headers_to_process(&self, new_headers: &[HeaderEntry]) -> Vec { let added = self.store.added_blockhashes.read().unwrap(); let indexed = self.store.indexed_blockhashes.read().unwrap(); new_headers .iter() - .filter(|e| !added.contains(e.hash()) || !indexed.contains(e.hash())) - .cloned() + .filter_map(|entry| { + let hash = entry.hash(); + let need_txstore = !added.contains(hash); + let need_history = !indexed.contains(hash); + (need_txstore || need_history).then(|| HeaderWork { + entry: entry.clone(), + need_txstore, + need_history, + }) + }) .collect() } @@ -312,6 +335,15 @@ impl Indexer { let (new_headers, reorged_since) = self.get_new_headers(&daemon, &tip)?; let chain_tip_height = new_headers.last().map(|h| h.height()).unwrap_or(0); + // Run a one-time recovery on the first startup to sweep stale history db entries + // from blocks not part of the `indexed_headers` chain loaded from DB or the `new_headers`. + // This can be necessary if electrs crashes while processing a reorg, or if it partially + // processed new blocks that became stale while electrs was down. + if self.pending_startup_recovery { + self.stale_history_startup_recovery(&new_headers)?; + self.pending_startup_recovery = false; + } + // Handle reorgs by undoing the reorged (stale) blocks first if let Some(reorged_since) = reorged_since { // Remove reorged headers from the in-memory HeaderList. @@ -319,7 +351,7 @@ impl Indexer { // (even before the rows are deleted below), since they reference block heights that will no longer exist. // This ensures consistency - it is not possible for blocks to be available (e.g. in GET /blocks/tip or /block/:hash) // without the corresponding history entries for these blocks (e.g. in GET /address/:address/txs), or vice-versa. - let mut reorged_headers = self + let (reorged_headers, common_ancestor) = self .store .indexed_headers .write() @@ -334,18 +366,120 @@ impl Indexer { reorged_since, ); - // Reorged blocks are undone in chunks of 100, processed in serial, each as an atomic batch. - // Reverse them so that chunks closest to the chain tip are processed first, - // which is necessary to properly recover from crashes during reorg handling. - // Also see the comment under `Store::open()`. - reorged_headers.reverse(); + // Persist the common ancestor tip before deleting reorged history rows. If electrs crashes before + // reorg cleanup completes, the next startup will recover from this tip and complete the cleanup. + self.store.update_tip(&common_ancestor); - // Fetch the reorged blocks, then undo their history index db rows. + // Reconstruct the reorged blocks locally, then undo their history index db rows. // The txstore db rows are kept for reorged blocks/transactions. - start_fetcher(self.from, &daemon, reorged_headers, self.iconfig.block_batch_size, chain_tip_height)? - .map(|blocks| self.undo_index(&blocks)); + self.process_stale(reorged_headers)?; + + // Flush deletions prior to processing new blocks + self.store.history_db.flush(); + } + + self.process_blocks(&daemon, &new_headers, chain_tip_height)?; + + // Compact after all add+index work is done, not between passes. + self.start_auto_compactions(&self.store.txstore_db); + self.start_auto_compactions(&self.store.history_db); + self.start_auto_compactions(&self.store.cache_db); + + if let DBFlush::Disable = self.flush { + let t = std::time::Instant::now(); + info!("flushing block writes to disk"); + self.store.flush_block_writes(); + info!("flushing block writes complete in {:.1?}", t.elapsed()); + + self.flush = DBFlush::Enable; + } + + // Update the synced tip after all db writes are flushed + debug!("updating synced tip to {:?}", tip); + self.store.update_tip(&tip); + + // Finally, append the new headers to the in-memory HeaderList. + // This will make both the headers and the history entries visible in the public APIs, consistently with each-other. + let mut headers = self.store.indexed_headers.write().unwrap(); + headers.append(new_headers); + assert_eq!(tip, *headers.tip()); + + self.tip_metric.set(headers.best_height() as i64); + + Ok(tip) + } + + fn stale_history_startup_recovery(&self, new_headers: &[HeaderEntry]) -> Result<()> { + // Store::open() has no daemon view, so it only finds a safe chain tip that's fully + // indexed but does not attempt to clean up stale history db entries. On the first + // update(), new_headers identify bitcoind's best chain so stale blocks can be + // reconstructed locally and cleaned up without asking bitcoind for stale data. + let stale_blockhashes: Vec<_> = { + let to_keep_blockhashes = new_headers + .iter() + .chain(self.store.indexed_headers.read().unwrap().iter()) + .map(|entry| *entry.hash()) + .collect::>(); + let indexed_blockhashes = self.store.indexed_blockhashes.read().unwrap(); + + indexed_blockhashes + .difference(&to_keep_blockhashes) + .copied() + .collect() + }; + + if !stale_blockhashes.is_empty() { + let stale_headers: Vec<_> = stale_blockhashes + .iter() + .map(|hash| load_header_entry(&self.store.txstore_db, hash)) + .collect(); + info!("cleaning up {} stale blocks", stale_headers.len()); + self.process_stale(stale_headers)?; + self.store.history_db.flush(); + } + + Ok(()) + } + + fn process_stale(&self, headers: Vec) -> Result<()> { + for headers in &headers.into_iter().chunks(self.iconfig.block_batch_size) { + let blocks = headers + .map(|entry| load_block_entry(&self.store.txstore_db, entry)) + .collect::>>()?; + let block_refs = blocks.iter().collect::>(); + self.undo_index(&block_refs); + } + + Ok(()) + } + + fn process_blocks( + &self, + daemon: &Daemon, + new_headers: &[HeaderEntry], + chain_tip_height: usize, + ) -> Result<()> { + let to_process = self.headers_to_process(new_headers); + + if to_process.is_empty() { + return Ok(()); + } + + #[cfg(not(feature = "liquid"))] + if self.iconfig.use_spenttxouts { + return self.process_blocks_spenttxouts(daemon, to_process, chain_tip_height); } + self.process_blocks_legacy(daemon, to_process, chain_tip_height) + } + + fn process_blocks_legacy( + &self, + daemon: &Daemon, + to_process: Vec, + chain_tip_height: usize, + ) -> Result<()> { + let _timer = self.start_timer("process_blocks"); // Single-pass: add to txstore and index to history in the same per-batch loop. // // In the old two-pass approach, txstore_db was fully compacted between the add @@ -360,17 +494,20 @@ impl Indexer { // Crash safety: added_blockhashes / indexed_blockhashes are persisted via the // "D" done-marker rows. On restart, headers_to_process() re-derives which // blocks still need work, so partially-processed batches are re-processed safely. - let to_process = self.headers_to_process(&new_headers); - debug!( - "processing {} blocks (add + index) using {:?}", - to_process.len(), - self.from - ); + debug!("processing {} blocks (add + index)", to_process.len()); let mut fetcher_count = 0; let to_process_total = to_process.len(); + let headers_to_fetch = to_process.iter().map(|work| work.entry.clone()).collect(); + let mut to_process_batches = to_process.chunks(self.iconfig.block_batch_size); - start_fetcher(self.from, &daemon, to_process, self.iconfig.block_batch_size, chain_tip_height)?.map(|blocks| { + start_fetcher( + daemon, + headers_to_fetch, + self.iconfig.block_batch_size, + chain_tip_height, + )? + .map(|blocks| { if fetcher_count % 25 == 0 && to_process_total > 20 { let batch_height = blocks.last().map(|b| b.entry.height()).unwrap_or(0); info!( @@ -382,25 +519,23 @@ impl Indexer { } fetcher_count += 1; + let to_process_batch = to_process_batches.next().unwrap(); + // Add blocks not yet in txstore (idempotent: crash recovery skips already-added blocks) - let to_add: Vec<_> = { - let added = self.store.added_blockhashes.read().unwrap(); - blocks - .iter() - .filter(|b| !added.contains(b.entry.hash())) - .cloned() - .collect() - }; + let to_add: Vec<_> = blocks + .iter() + .zip_eq(to_process_batch.iter()) + .filter(|(_, work)| work.need_txstore) + .map(|(block, _)| block) + .collect(); // Index blocks not yet in history (O rows for to_add are now in the write buffer) - let to_index: Vec<_> = { - let indexed = self.store.indexed_blockhashes.read().unwrap(); - blocks - .iter() - .filter(|b| !indexed.contains(b.entry.hash())) - .cloned() - .collect() - }; + let to_index: Vec<_> = blocks + .iter() + .zip_eq(to_process_batch.iter()) + .filter(|(_, work)| work.need_history) + .map(|(block, _)| block) + .collect(); if !to_add.is_empty() || !to_index.is_empty() { let _batch_timer = self.start_timer("batch_total"); @@ -415,61 +550,119 @@ impl Indexer { let h = last.entry.height(); self.sync_height.set(h as i64); if chain_tip_height > 0 { - self.sync_progress.set(h as f64 / chain_tip_height as f64 * 100.0); + self.sync_progress + .set(h as f64 / chain_tip_height as f64 * 100.0); } } }); + assert!(to_process_batches.next().is_none()); - // Compact after all add+index work is done, not between passes. - self.start_auto_compactions(&self.store.txstore_db); - self.start_auto_compactions(&self.store.history_db); - self.start_auto_compactions(&self.store.cache_db); - - if let DBFlush::Disable = self.flush { - let t = std::time::Instant::now(); - info!("flushing txstore_db to disk"); - self.store.txstore_db.flush(); - info!("flushing txstore_db complete in {:.1?}", t.elapsed()); + Ok(()) + } - let t = std::time::Instant::now(); - info!("flushing history_db to disk"); - self.store.history_db.flush(); - info!("flushing history_db complete in {:.1?}", t.elapsed()); + #[cfg(not(feature = "liquid"))] + fn process_blocks_spenttxouts( + &self, + daemon: &Daemon, + to_process: Vec, + chain_tip_height: usize, + ) -> Result<()> { + let _timer = self.start_timer("process_blocks"); + debug!("processing {} blocks (spenttxouts)", to_process.len()); + + let total_blocks = to_process.len(); + let processed_blocks = AtomicUsize::new(0); + let highest_completed = AtomicUsize::new(0); + + daemon.with_request_pool(|| { + to_process.into_par_iter().try_for_each(|work| { + self.process_block_spenttxouts( + daemon, + work, + total_blocks, + chain_tip_height, + &processed_blocks, + &highest_completed, + ) + }) + })?; - // cache_db receives WAL-disabled writes when --address-search is enabled, - // so it needs the same explicit flush to ensure durability. - let t = std::time::Instant::now(); - info!("flushing cache_db to disk"); - self.store.cache_db.flush(); - info!("flushing cache_db complete in {:.1?}", t.elapsed()); + Ok(()) + } - self.flush = DBFlush::Enable; + #[cfg(not(feature = "liquid"))] + fn process_block_spenttxouts( + &self, + daemon: &Daemon, + work: HeaderWork, + total_blocks: usize, + chain_tip_height: usize, + processed_blocks: &AtomicUsize, + highest_completed: &AtomicUsize, + ) -> Result<()> { + let block_entry = fetch_block_entry(daemon, work.entry)?; + + if work.need_txstore { + self.store + .txstore_db + .write_rows(add_block(&block_entry, &self.iconfig), self.flush); + self.store + .added_blockhashes + .write() + .unwrap() + .insert(*block_entry.entry.hash()); } - // Update the synced tip after all db writes are flushed - debug!("updating synced tip to {:?}", tip); - self.store.txstore_db.put_sync(b"t", &serialize(&tip)); - - // Finally, append the new headers to the in-memory HeaderList. - // This will make both the headers and the history entries visible in the public APIs, consistently with each-other. - let mut headers = self.store.indexed_headers.write().unwrap(); - headers.append(new_headers); - assert_eq!(tip, *headers.tip()); - - if let FetchFrom::BlkFiles = self.from { - self.from = FetchFrom::Bitcoind; + if work.need_history { + // Need to flatten() because Bitcoin Core returns spenttxouts as a per-tx vector of + // prevouts, while the indexer expects a per-block prevouts vector. + let previous_txos: Vec = daemon + .get_spent_txouts(block_entry.entry.hash())? + .into_iter() + .flatten() + .collect(); + self.store.history_db.write_rows( + index_block(&block_entry, &previous_txos, &self.iconfig), + self.flush, + ); + self.store + .indexed_blockhashes + .write() + .unwrap() + .insert(*block_entry.entry.hash()); } - self.tip_metric.set(headers.best_height() as i64); + let processed = processed_blocks.fetch_add(1, Ordering::Relaxed) + 1; + self.sync_progress + .set(processed as f64 / total_blocks.max(1) as f64 * 100.0); + let height = block_entry.entry.height(); + // Workers complete out of order, so this is only a best-effort highest completed height, + // not the last contiguous fully-processed best-chain height. + let highest = highest_completed + .fetch_max(height, Ordering::Relaxed) + .max(height); + self.sync_height.set(highest as i64); + if processed % 6000 == 0 && total_blocks > 20 { + info!( + "processing blocks {}/{} ({:.1}%)", + highest, + chain_tip_height, + processed as f32 / total_blocks.max(1) as f32 * 100.0 + ); + } - Ok(tip) + Ok(()) } - fn add(&self, blocks: &[BlockEntry]) { + fn add(&self, blocks: &[&BlockEntry]) { // TODO: skip orphaned blocks? let rows = { let _timer = self.start_timer("add_process"); - add_blocks(blocks, &self.iconfig) + blocks + .par_iter() // serialization is CPU-intensive + .map(|b| add_block(b, &self.iconfig)) + .flatten() + .collect() }; { let _timer = self.start_timer("add_write"); @@ -483,7 +676,7 @@ impl Indexer { .extend(blocks.iter().map(|b| b.entry.hash())); } - fn index(&self, blocks: &[BlockEntry]) { + fn index(&self, blocks: &[&BlockEntry]) { self.store .history_db .write_rows(self._index(blocks), self.flush); @@ -498,7 +691,7 @@ impl Indexer { // // This does *not* remove any txstore db entries, which are intentionally kept // even for reorged blocks. - fn undo_index(&self, blocks: &[BlockEntry]) { + fn undo_index(&self, blocks: &[&BlockEntry]) { self.store .history_db .delete_rows(self._index(blocks), self.flush); @@ -514,37 +707,45 @@ impl Indexer { } } - fn _index(&self, blocks: &[BlockEntry]) -> Vec { - let previous_txos_map = { + fn _index(&self, blocks: &[&BlockEntry]) -> Vec { + let previous_txos_by_block: Vec> = { let _timer = self.start_timer("index_lookup"); - lookup_txos(&self.store.txstore_db, get_previous_txos(blocks)).unwrap() + let previous_outpoints_by_block = get_prev_outpoints_by_block(blocks); + // Use a multi_get to fetch previous txos for the whole block batch as a flat vector, + // then split it back into the per-block vectors expected by the indexer. + let mut previous_txos = lookup_txos( + &self.store.txstore_db, + previous_outpoints_by_block.iter().flatten(), + ) + .unwrap() + .into_iter(); + previous_outpoints_by_block + .iter() + .map(|outpoints| previous_txos.by_ref().take(outpoints.len()).collect()) + .collect() }; - let rows = { - let _timer = self.start_timer("index_process"); - let added_blockhashes = self.store.added_blockhashes.read().unwrap(); - for b in blocks { - let blockhash = b.entry.hash(); - // TODO: replace by lookup into txstore_db? - if !added_blockhashes.contains(blockhash) { - panic!("cannot index block {} (missing from store)", blockhash); - } + let _timer = self.start_timer("index_process"); + let added_blockhashes = self.store.added_blockhashes.read().unwrap(); + for b in blocks { + let blockhash = b.entry.hash(); + // TODO: replace by lookup into txstore_db? + if !added_blockhashes.contains(blockhash) { + panic!("cannot index block {} (missing from store)", blockhash); } - index_blocks(blocks, &previous_txos_map, &self.iconfig) - }; - rows - } - - pub fn fetch_from(&mut self, from: FetchFrom) { - self.from = from; + } + blocks + .par_iter() // serialization is CPU-intensive + .zip_eq(&previous_txos_by_block) + .map(|(b, previous_txos)| index_block(b, previous_txos, &self.iconfig)) + .flatten() + .collect() } } impl ChainQuery { - pub fn new(store: Arc, daemon: Arc, config: &Config, metrics: &Metrics) -> Self { + pub fn new(store: Arc, config: &Config, metrics: &Metrics) -> Self { ChainQuery { store, - daemon, - light_mode: config.light_mode, network: config.network_type, duration: metrics.histogram_vec( HistogramOpts::new("query_duration", "Index query duration (in seconds)"), @@ -567,16 +768,7 @@ impl ChainQuery { pub fn get_block_txids(&self, hash: &BlockHash) -> Option> { let _timer = self.start_timer("get_block_txids"); - if self.light_mode { - // TODO fetch block as binary from REST API instead of as hex - let mut blockinfo = self.daemon.getblock_raw(hash, 1).ok()?; - Some(serde_json::from_value(blockinfo["tx"].take()).unwrap()) - } else { - self.store - .txstore_db - .get(&BlockRow::txids_key(full_hash(&hash[..]))) - .map(|val| bincode::deserialize_little(&val).expect("failed to parse block txids")) - } + load_block_txids(&self.store.txstore_db, hash) } pub fn get_block_txs( @@ -587,62 +779,44 @@ impl ChainQuery { ) -> Result> { let txids = self.get_block_txids(hash).chain_err(|| "block not found")?; ensure!(start_index < txids.len(), "start index out of range"); - - let txids_with_blockhash = txids - .into_iter() - .skip(start_index) - .take(limit) - .map(|txid| (txid, *hash)) - .collect::>(); - - self.lookup_txns(&txids_with_blockhash) - - // XXX use getblock in lightmode? a single RPC call, but would fetch all txs to get one page - // self.daemon.getblock(hash)?.txdata.into_iter().skip(start_index).take(limit).collect() + lookup_txns( + &self.store.txstore_db, + &txids + .into_iter() + .skip(start_index) + .take(limit) + .collect::>(), + ) } pub fn get_block_meta(&self, hash: &BlockHash) -> Option { let _timer = self.start_timer("get_block_meta"); - - if self.light_mode { - let blockinfo = self.daemon.getblock_raw(hash, 1).ok()?; - Some(serde_json::from_value(blockinfo).unwrap()) - } else { - self.store - .txstore_db - .get(&BlockRow::meta_key(full_hash(&hash[..]))) - .map(|val| bincode::deserialize_little(&val).expect("failed to parse BlockMeta")) - } + self.store + .txstore_db + .get(&BlockRow::meta_key(full_hash(&hash[..]))) + .map(|val| bincode::deserialize_little(&val).expect("failed to parse BlockMeta")) } pub fn get_block_raw(&self, hash: &BlockHash) -> Option> { let _timer = self.start_timer("get_block_raw"); - if self.light_mode { - let blockval = self.daemon.getblock_raw(hash, 0).ok()?; - let blockhex = blockval.as_str().expect("valid block from bitcoind"); - Some(Vec::from_hex(blockhex).expect("valid block from bitcoind")) - } else { - let entry = self.header_by_hash(hash)?; - let meta = self.get_block_meta(hash)?; - let txids = self.get_block_txids(hash)?; - let txids_with_blockhash: Vec<_> = - txids.into_iter().map(|txid| (txid, *hash)).collect(); - let raw_txs = self.lookup_raw_txns(&txids_with_blockhash).ok()?; // TODO avoid hiding all errors as None, return a Result - - // Reconstruct the raw block using the header and txids, - // as - let mut raw = Vec::with_capacity(meta.size as usize); - - raw.append(&mut serialize(entry.header())); - raw.append(&mut serialize(&VarInt(raw_txs.len() as u64))); - - for mut raw_tx in raw_txs { - raw.append(&mut raw_tx); - } + let entry = self.header_by_hash(hash)?; + let meta = self.get_block_meta(hash)?; + let txids = self.get_block_txids(hash)?; + let raw_txs = lookup_raw_txns(&self.store.txstore_db, &txids).ok()?; // TODO avoid hiding all errors as None, return a Result + + // Reconstruct the raw block using the header and txids, + // as + let mut raw = Vec::with_capacity(meta.size as usize); + + raw.append(&mut serialize(entry.header())); + raw.append(&mut serialize(&VarInt(raw_txs.len() as u64))); - Some(raw) + for mut raw_tx in raw_txs { + raw.append(&mut raw_tx); } + + Some(raw) } pub fn get_block_header(&self, hash: &BlockHash) -> Option { @@ -722,15 +896,15 @@ impl ChainQuery { .filter_map(|(txid, height)| Some((txid, headers.header_by_height(height)?))) .take(limit); - let mut txids_with_blockhash = Vec::with_capacity(limit); + let mut txids = Vec::with_capacity(limit); let mut blockids = Vec::with_capacity(limit); for (txid, header) in history_iter { - txids_with_blockhash.push((txid, *header.hash())); + txids.push(txid); blockids.push(BlockId::from(header)); } drop(headers); - self.lookup_txns(&txids_with_blockhash) + self.lookup_txns(&txids) .expect("failed looking up txs in history index") .into_iter() .zip(blockids) @@ -1047,59 +1221,25 @@ impl ChainQuery { .clone() } - pub fn lookup_txns(&self, txids: &[(Txid, BlockHash)]) -> Result> { + pub fn lookup_txns(&self, txids: &[Txid]) -> Result> { let _timer = self.start_timer("lookup_txns"); - Ok(self - .lookup_raw_txns(txids)? - .into_iter() - .map(|rawtx| deserialize(&rawtx).expect("failed to parse Transaction")) - .collect()) + lookup_txns(&self.store.txstore_db, txids) } - pub fn lookup_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { + pub fn lookup_txn(&self, txid: &Txid) -> Option { let _timer = self.start_timer("lookup_txn"); - let rawtx = self.lookup_raw_txn(txid, blockhash)?; + let rawtx = self.lookup_raw_txn(txid)?; Some(deserialize(&rawtx).expect("failed to parse Transaction")) } - pub fn lookup_raw_txns(&self, txids: &[(Txid, BlockHash)]) -> Result> { + pub fn lookup_raw_txns(&self, txids: &[Txid]) -> Result> { let _timer = self.start_timer("lookup_raw_txns"); - if self.light_mode { - txids - .par_iter() - .map(|(txid, blockhash)| { - self.lookup_raw_txn(txid, Some(blockhash)) - .chain_err(|| "missing tx") - }) - .collect() - } else { - let keys = txids.iter().map(|(txid, _)| TxRow::key(&txid[..])); - self.store - .txstore_db - .multi_get(keys) - .into_iter() - .map(|val| val.unwrap().chain_err(|| "missing tx")) - .collect() - } + lookup_raw_txns(&self.store.txstore_db, txids) } - pub fn lookup_raw_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { + pub fn lookup_raw_txn(&self, txid: &Txid) -> Option { let _timer = self.start_timer("lookup_raw_txn"); - - if self.light_mode { - let queried_blockhash = - blockhash.map_or_else(|| self.tx_confirming_block(txid).map(|b| b.hash), |_| None); - let blockhash = blockhash.or_else(|| queried_blockhash.as_ref())?; - // TODO fetch transaction as binary from REST API instead of as hex - let txval = self - .daemon - .gettransaction_raw(txid, blockhash, false) - .ok()?; - let txhex = txval.as_str().expect("valid tx from bitcoind"); - Some(Bytes::from_hex(txhex).expect("valid tx from bitcoind")) - } else { - self.store.txstore_db.get(&TxRow::key(&txid[..])) - } + self.store.txstore_db.get(&TxRow::key(&txid[..])) } pub fn lookup_txo(&self, outpoint: &OutPoint) -> Option { @@ -1107,9 +1247,10 @@ impl ChainQuery { lookup_txo(&self.store.txstore_db, outpoint) } - pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { + pub fn lookup_txos(&self, outpoints: Vec) -> Result> { let _timer = self.start_timer("lookup_txos"); - lookup_txos(&self.store.txstore_db, outpoints) + let txos = lookup_txos(&self.store.txstore_db, &outpoints)?; + Ok(outpoints.into_iter().zip_eq(txos).collect()) } pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { @@ -1226,47 +1367,105 @@ fn load_blockheaders(db: &DB) -> HashMap { .map(BlockRow::from_row) .map(|r| { let key: BlockHash = deserialize(&r.key.hash).expect("failed to parse BlockHash"); - let value: BlockHeader = deserialize(&r.value).expect("failed to parse BlockHeader"); - (key, value) + let (_height, header_bytes): (u32, Bytes) = + bincode::deserialize_little(&r.value).expect("failed to parse stored BlockHeader"); + let header = deserialize(&header_bytes).expect("failed to parse stored BlockHeader"); + (key, header) }) .collect() } -fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec { +fn load_header_entry(db: &DB, hash: &BlockHash) -> HeaderEntry { + let row = db + .get(&BlockRow::header_key(full_hash(&hash[..]))) + .unwrap_or_else(|| panic!("missing block header row for {}", hash)); + BlockRow::header_entry_from_value(hash, &row) +} + +fn load_block_txids(db: &DB, hash: &BlockHash) -> Option> { + db.get(&BlockRow::txids_key(full_hash(&hash[..]))) + .map(|val| bincode::deserialize_little(&val).expect("failed to parse block txids")) +} + +fn lookup_raw_txns(txstore_db: &DB, txids: &[Txid]) -> Result> { + let keys = txids.iter().map(|txid| TxRow::key(&txid[..])); + txstore_db + .multi_get(keys) + .into_iter() + .map(|val| val.unwrap().chain_err(|| "missing tx")) + .collect() +} + +fn lookup_txns(txstore_db: &DB, txids: &[Txid]) -> Result> { + Ok(lookup_raw_txns(txstore_db, txids)? + .into_iter() + .map(|rawtx| deserialize(&rawtx).expect("failed to parse Transaction")) + .collect()) +} + +fn load_block_entry(txstore_db: &DB, header_entry: HeaderEntry) -> Result { + let hash = *header_entry.hash(); + let txids = load_block_txids(txstore_db, &hash).chain_err(|| "missing block txids")?; + let txdata = lookup_txns(txstore_db, &txids)?; + let block = Block { + header: header_entry.header().clone(), + txdata, + }; + Ok(BlockEntry { + entry: header_entry, + size: block.total_size() as u32, + txids, + block, + }) +} + +struct HeaderWork { + entry: HeaderEntry, + need_txstore: bool, + need_history: bool, +} + +#[cfg(not(feature = "liquid"))] +fn fetch_block_entry(daemon: &Daemon, entry: HeaderEntry) -> Result { + let block = daemon.getblock(entry.hash())?; + let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect(); + Ok(BlockEntry { + size: block.total_size() as u32, + txids, + block, + entry, + }) +} + +fn add_block(block_entry: &BlockEntry, iconfig: &IndexerConfig) -> Vec { + assert_eq!(block_entry.txids.len(), block_entry.block.txdata.len()); + let mut rows = vec![]; + let blockhash = full_hash(&block_entry.entry.hash()[..]); // persist individual transactions: // T{txid} → {rawtx} // O{txid}{index} → {txout} + for (tx, txid) in block_entry + .block + .txdata + .iter() + .zip(block_entry.txids.iter()) + { + add_transaction(*txid, tx, &mut rows, iconfig); + } + // persist block headers', block txids' and metadata rows: - // B{blockhash} → {header} + // B{blockhash} → {height, header} // X{blockhash} → {txid1}...{txidN} // M{blockhash} → {tx_count}{size}{weight} - block_entries - .par_iter() // serialization is CPU-intensive - .map(|b| { - assert_eq!(b.txids.len(), b.block.txdata.len()); - let mut rows = vec![]; - let blockhash = full_hash(&b.entry.hash()[..]); - for (tx, txid) in b.block.txdata.iter().zip(b.txids.iter()) { - add_transaction(*txid, tx, &mut rows, iconfig); - } - - if !iconfig.light_mode { - rows.push(BlockRow::new_txids(blockhash, &b.txids).into_row()); - rows.push(BlockRow::new_meta(blockhash, &BlockMeta::from(b)).into_row()); - } - - rows.push(BlockRow::new_header(&b).into_row()); - rows.push(BlockRow::new_done(blockhash).into_row()); // mark block as "added" - rows - }) - .flatten() - .collect() + rows.push(BlockRow::new_txids(blockhash, &block_entry.txids).into_row()); + rows.push(BlockRow::new_meta(blockhash, &BlockMeta::from(block_entry)).into_row()); + rows.push(BlockRow::new_header(block_entry).into_row()); + rows.push(BlockRow::new_done(blockhash).into_row()); // mark block as "added" + rows } fn add_transaction(txid: Txid, tx: &Transaction, rows: &mut Vec, iconfig: &IndexerConfig) { - if !iconfig.light_mode { - rows.push(TxRow::new(txid, tx).into_row()); - } + rows.push(TxRow::new(txid, tx).into_row()); let txid = full_hash(&txid[..]); for (txo_index, txo) in tx.output.iter().enumerate() { @@ -1282,30 +1481,35 @@ fn add_transaction(txid: Txid, tx: &Transaction, rows: &mut Vec, iconfig: } } -fn get_previous_txos(block_entries: &[BlockEntry]) -> BTreeSet { +fn get_prev_outpoints_by_block(block_entries: &[&BlockEntry]) -> Vec> { block_entries .iter() - .flat_map(|b| b.block.txdata.iter()) - .flat_map(|tx| { - tx.input + .map(|b| { + b.block + .txdata .iter() - .filter(|txin| has_prevout(txin)) - .map(|txin| txin.previous_output) + .flat_map(|tx| { + tx.input + .iter() + .filter(|txin| has_prevout(txin)) + .map(|txin| txin.previous_output) + }) + .collect() }) .collect() } -fn lookup_txos(txstore_db: &DB, outpoints: BTreeSet) -> Result> { - let keys = outpoints.iter().map(TxOutRow::key).collect::>(); +fn lookup_txos<'a, I>(txstore_db: &DB, outpoints: I) -> Result> +where + I: IntoIterator, +{ + let keys = outpoints.into_iter().map(TxOutRow::key).collect::>(); txstore_db .multi_get(keys) .into_iter() - .zip(outpoints) - .map(|(res, outpoint)| { - let txo = res - .unwrap() - .ok_or_else(|| format!("missing txo {}", outpoint))?; - Ok((outpoint, deserialize(&txo).expect("failed to parse TxOut"))) + .map(|res| { + let txo = res.unwrap().chain_err(|| "missing txo")?; + Ok(deserialize(&txo).expect("failed to parse TxOut")) }) .collect() } @@ -1333,26 +1537,38 @@ pub fn lookup_confirmations( .collect() } -fn index_blocks( - block_entries: &[BlockEntry], - previous_txos_map: &HashMap, +fn index_block( + block_entry: &BlockEntry, + previous_txos: &[TxOut], iconfig: &IndexerConfig, ) -> Vec { - block_entries - .par_iter() // serialization is CPU-intensive - .map(|b| { - assert_eq!(b.txids.len(), b.block.txdata.len()); - let mut rows = vec![]; - let height = b.entry.height() as u32; - for (tx, txid) in b.block.txdata.iter().zip(b.txids.iter()) { - let txid_hash = full_hash(&txid[..]); - index_transaction(tx, txid_hash, height, previous_txos_map, &mut rows, iconfig); - } - rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed" - rows - }) - .flatten() - .collect() + assert_eq!(block_entry.txids.len(), block_entry.block.txdata.len()); + let mut rows = vec![]; + let height = block_entry.entry.height() as u32; + let mut previous_txos = previous_txos.iter(); + for (tx, txid) in block_entry + .block + .txdata + .iter() + .zip(block_entry.txids.iter()) + { + let txid_hash = full_hash(&txid[..]); + index_transaction( + tx, + txid_hash, + height, + &mut previous_txos, + &mut rows, + iconfig, + ); + } + assert!( + previous_txos.next().is_none(), + "unexpected trailing previous txos for block {}", + block_entry.entry.hash() + ); + rows.push(BlockRow::new_done(full_hash(&block_entry.entry.hash()[..])).into_row()); // mark block as "indexed" + rows } // TODO: return an iterator? @@ -1360,11 +1576,10 @@ fn index_transaction( tx: &Transaction, txid: FullHash, confirmed_height: u32, - previous_txos_map: &HashMap, + previous_txos: &mut std::slice::Iter<'_, TxOut>, rows: &mut Vec, iconfig: &IndexerConfig, ) { - // persist tx confirmation row: // C{txid} → "{block_height}" rows.push(TxConfRow::new(txid, confirmed_height).into_row()); @@ -1392,8 +1607,8 @@ fn index_transaction( if !has_prevout(txi) { continue; } - let prev_txo = previous_txos_map - .get(&txi.previous_output) + let prev_txo = previous_txos + .next() .unwrap_or_else(|| panic!("missing previous txo {}", txi.previous_output)); let history = TxHistoryRow::new( @@ -1577,12 +1792,14 @@ struct BlockRow { impl BlockRow { fn new_header(block_entry: &BlockEntry) -> BlockRow { + let height = block_entry.entry.height() as u32; + let header_bytes = serialize(&block_entry.block.header); BlockRow { key: BlockKey { code: b'B', hash: full_hash(&block_entry.entry.hash()[..]), }, - value: serialize(&block_entry.block.header), + value: bincode::serialize_little(&(height, header_bytes)).unwrap(), } } @@ -1611,6 +1828,10 @@ impl BlockRow { b"B".to_vec() } + fn header_key(hash: FullHash) -> Bytes { + [b"B", &hash[..]].concat() + } + fn txids_key(hash: FullHash) -> Bytes { [b"X", &hash[..]].concat() } @@ -1636,6 +1857,13 @@ impl BlockRow { value: row.value, } } + + fn header_entry_from_value(hash: &BlockHash, value: &[u8]) -> HeaderEntry { + let (height, header_bytes): (u32, Bytes) = + bincode::deserialize_little(value).expect("failed to parse stored BlockHeader"); + let header = deserialize(&header_bytes).expect("failed to parse stored BlockHeader"); + HeaderEntry::new(height as usize, *hash, header) + } } #[derive(Serialize, Deserialize, Debug)] @@ -1803,8 +2031,7 @@ impl TxEdgeRow { } fn key(outpoint: &OutPoint) -> Bytes { - bincode::serialize_little(&(b'S', full_hash(&outpoint.txid[..]), outpoint.vout)) - .unwrap() + bincode::serialize_little(&(b'S', full_hash(&outpoint.txid[..]), outpoint.vout)).unwrap() } pub fn into_row(self) -> DBRow { @@ -1953,7 +2180,6 @@ pub mod bench { impl Data { pub fn new(block: Block) -> Data { let iconfig = IndexerConfig { - light_mode: false, address_search: false, index_unspendables: false, network: crate::chain::Network::Regtest, @@ -1978,12 +2204,14 @@ pub mod bench { } pub fn add_blocks(data: &Data) -> Vec { - super::add_blocks(&[data.block_entry.clone()], &data.iconfig) + super::add_block(&data.block_entry, &data.iconfig) } } #[cfg(test)] mod tests { + use bitcoin::hex::FromHex; + use super::*; #[test] diff --git a/src/rest.rs b/src/rest.rs index c0ab6954a..7a5948ec7 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -783,7 +783,7 @@ fn handle_request( } (&Method::GET, Some(&"block"), Some(hash), Some(&"txs"), start_index, None) => { let hash = BlockHash::from_str(hash)?; - + // Add lightweight validation that block exists before fetching transactions, // to avoid expensive lookups in case of invalid block hash query.chain().get_block_header(&hash) diff --git a/src/util/block.rs b/src/util/block.rs index ca2da60ea..4914901df 100644 --- a/src/util/block.rs +++ b/src/util/block.rs @@ -45,7 +45,6 @@ pub struct HeaderEntry { } impl HeaderEntry { - #[cfg(feature = "bench")] pub fn new(height: usize, hash: BlockHash, header: BlockHeader) -> Self { Self { height, @@ -186,9 +185,10 @@ impl HeaderList { (header_entries, reorged_since) } - /// Pop off reorged blocks since (including) the given height and return them. + /// Pop off reorged blocks since (including) the given height and return them + /// alongside the new chain tip (the common ancestor) #[trace] - pub fn pop(&mut self, since_height: usize) -> Vec { + pub fn pop(&mut self, since_height: usize) -> (Vec, BlockHash) { let reorged_headers = self.headers.split_off(since_height); for header in &reorged_headers { @@ -200,7 +200,7 @@ impl HeaderList { .map(|h| *h.hash()) .unwrap_or_else(|| *DEFAULT_BLOCKHASH); - reorged_headers + (reorged_headers, self.tip) } /// Append new headers. Expected to always extend the tip (stale blocks must be removed first) diff --git a/tests/common.rs b/tests/common.rs index 849608877..336bceec1 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -1,6 +1,6 @@ +use std::net; use std::str::FromStr; use std::sync::{Arc, Once, RwLock}; -use std::{env, net}; use log::LevelFilter; use stderrlog::StdErrLog; @@ -24,7 +24,7 @@ use electrs::{ daemon::Daemon, electrum::RPC as ElectrumRPC, metrics::Metrics, - new_index::{ChainQuery, FetchFrom, Indexer, Mempool, Query, Store}, + new_index::{ChainQuery, Indexer, Mempool, Query, Store}, rest, signal::Waiter, }; @@ -57,6 +57,8 @@ impl TestRunner { #[cfg(feature = "liquid")] node_conf.args.push("-anyonecanspendaremine=1"); + node_conf.args.push("-rest=1"); + node_conf.view_stdout = std::env::var_os("RUST_LOG").is_some(); } @@ -94,15 +96,12 @@ impl TestRunner { db_path: electrsdb.path().to_path_buf(), daemon_dir: daemon_subdir.clone(), daemon_parallelism: 3, - blocks_dir: daemon_subdir.join("blocks"), daemon_rpc_addr: params.rpc_socket.into(), cookie: None, electrum_rpc_addr: rand_available_addr(), http_addr: rand_available_addr(), http_socket_file: None, // XXX test with socket file or tcp? monitoring_addr: rand_available_addr(), - jsonrpc_import: false, - light_mode: false, address_search: true, index_unspendables: false, cors: None, @@ -122,6 +121,8 @@ impl TestRunner { db_write_buffer_size_mb: 256, initial_sync_batch_size: 250, db_cache_index_filter_blocks: false, + #[cfg(not(feature = "liquid"))] + use_spenttxouts: true, //#[cfg(feature = "electrum-discovery")] //electrum_public_hosts: Option, //#[cfg(feature = "electrum-discovery")] @@ -136,7 +137,6 @@ impl TestRunner { let daemon = Arc::new(Daemon::new( &config.daemon_dir, - &config.blocks_dir, config.daemon_rpc_addr, config.daemon_parallelism, config.cookie_getter(), @@ -145,29 +145,11 @@ impl TestRunner { &metrics, )?); - let store = Arc::new(Store::open(&config, &metrics, true)); - - let fetch_from = if !env::var("JSONRPC_IMPORT").is_ok() && !cfg!(feature = "liquid") { - // run the initial indexing from the blk files then switch to using the jsonrpc, - // similarly to how electrs is typically used. - FetchFrom::BlkFiles - } else { - // when JSONRPC_IMPORT is set, use the jsonrpc for the initial indexing too. - // this runs faster on small regtest chains and can be useful for quicker local development iteration. - // this is also used on liquid regtest, which currently fails to parse the BlkFiles due to the magic bytes - FetchFrom::Bitcoind - }; - - let mut indexer = Indexer::open(Arc::clone(&store), fetch_from, &config, &metrics); + let store = Arc::new(Store::open(&config, &metrics)); + let mut indexer = Indexer::open(Arc::clone(&store), &config, &metrics); let tip = indexer.update(&daemon)?; - indexer.fetch_from(FetchFrom::Bitcoind); - let chain = Arc::new(ChainQuery::new( - Arc::clone(&store), - Arc::clone(&daemon), - &config, - &metrics, - )); + let chain = Arc::new(ChainQuery::new(Arc::clone(&store), &config, &metrics)); let mempool = Arc::new(RwLock::new(Mempool::new( Arc::clone(&chain),