From be51aa641aff461b2e76c760ad0fd1afadb5cd1a Mon Sep 17 00:00:00 2001 From: chahat-101 Date: Fri, 15 May 2026 10:15:35 +0530 Subject: [PATCH 1/3] refactor(storage): extract store auxiliary types --- crates/storage/src/config.rs | 47 ++ crates/storage/src/lib.rs | 6 +- crates/storage/src/store.rs | 1263 ++-------------------------------- crates/storage/src/types.rs | 952 +++++++++++++++++++++++++ crates/storage/src/utils.rs | 60 ++ 5 files changed, 1119 insertions(+), 1209 deletions(-) create mode 100644 crates/storage/src/config.rs create mode 100644 crates/storage/src/types.rs create mode 100644 crates/storage/src/utils.rs diff --git a/crates/storage/src/config.rs b/crates/storage/src/config.rs new file mode 100644 index 00000000..7f51ff91 --- /dev/null +++ b/crates/storage/src/config.rs @@ -0,0 +1,47 @@ +use ethlambda_types::block::BlockBody; +use ethlambda_types::primitives::{H256, HashTreeRoot as _}; +use std::sync::LazyLock; + +/// The tree hash root of an empty block body. +/// +/// Used to detect genesis/anchor blocks that have no attestations, +/// allowing us to skip storing empty bodies and reconstruct them on read. +pub static EMPTY_BODY_ROOT: LazyLock = + LazyLock::new(|| BlockBody::default().hash_tree_root()); + +/// Metadata key for SSZ-encoded time (u64). +pub const KEY_TIME: &[u8] = b"time"; +/// Metadata key for SSZ-encoded ChainConfig. +pub const KEY_CONFIG: &[u8] = b"config"; +/// Metadata key for SSZ-encoded head block root (H256). +pub const KEY_HEAD: &[u8] = b"head"; +/// Metadata key for SSZ-encoded safe target block root (H256). +pub const KEY_SAFE_TARGET: &[u8] = b"safe_target"; +/// Metadata key for SSZ-encoded latest justified checkpoint. +pub const KEY_LATEST_JUSTIFIED: &[u8] = b"latest_justified"; +/// Metadata key for SSZ-encoded latest finalized checkpoint. +pub const KEY_LATEST_FINALIZED: &[u8] = b"latest_finalized"; + +/// ~1 day of block history at 4-second slots (86400 / 4 = 21600). 
+pub const BLOCKS_TO_KEEP: usize = 21_600; + +/// ~3.3 hours of state history at 4-second slots (12000 / 4 = 3000). +pub const STATES_TO_KEEP: usize = 3_000; + +const _: () = assert!( + BLOCKS_TO_KEEP >= STATES_TO_KEEP, + "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" +); + +/// Hard cap for the known aggregated payload buffer (number of distinct attestation messages). +/// With 1 attestation/slot, this holds ~500 messages (~33 min at 4s/slot). +pub const AGGREGATED_PAYLOAD_CAP: usize = 512; + +/// Hard cap for the new (pending) aggregated payload buffer. +/// Smaller than known since new payloads are drained every interval (~4s). +pub const NEW_PAYLOAD_CAP: usize = 64; + +/// Hard cap for the gossip signature buffer (individual signatures, not distinct data_roots). +/// With 4 validators and 4-second slots, 2048 signatures covers ~512 slots (~34 min). +/// Each XMSS signature is ~3KB, so worst-case memory is ~6 MB. +pub const GOSSIP_SIGNATURE_CAP: usize = 2048; diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 9662a36c..f78b2cb3 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,6 +1,10 @@ mod api; pub mod backend; +mod config; mod store; +mod types; +mod utils; pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; -pub use store::{ForkCheckpoints, GetForkchoiceStoreError, Store}; +pub use store::{GetForkchoiceStoreError, Store}; +pub use types::{ForkCheckpoints, GossipSignatureSnapshot}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 86a258ff..544d7cc6 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,16 +1,19 @@ -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; -use std::sync::{Arc, LazyLock, Mutex}; - -/// The tree hash root of an empty block body. -/// -/// Used to detect genesis/anchor blocks that have no attestations, -/// allowing us to skip storing empty bodies and reconstruct them on read. 
-static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().hash_tree_root()); - -use crate::api::{StorageBackend, StorageWriteBatch, Table}; +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; + +use crate::api::{StorageBackend, Table}; +use crate::config::{ + AGGREGATED_PAYLOAD_CAP, BLOCKS_TO_KEEP, EMPTY_BODY_ROOT, GOSSIP_SIGNATURE_CAP, KEY_CONFIG, + KEY_HEAD, KEY_LATEST_FINALIZED, KEY_LATEST_JUSTIFIED, KEY_SAFE_TARGET, KEY_TIME, + NEW_PAYLOAD_CAP, STATES_TO_KEEP, +}; +use crate::types::{ + ForkCheckpoints, GossipSignatureBuffer, GossipSignatureSnapshot, PayloadBuffer, +}; +use crate::utils::{decode_live_chain_key, encode_live_chain_key, write_signed_block}; use ethlambda_types::{ - attestation::{AttestationData, HashedAttestationData, bits_is_subset}, + attestation::{AttestationData, HashedAttestationData}, block::{ AggregatedSignatureProof, Block, BlockBody, BlockHeader, BlockSignatures, SignedBlock, }, @@ -36,431 +39,10 @@ pub enum GetForkchoiceStoreError { }, } -/// Checkpoints to update in the forkchoice store. -/// -/// Used with `Store::update_checkpoints` to update head and optionally -/// update justified/finalized checkpoints (only if higher slot). -pub struct ForkCheckpoints { - head: H256, - justified: Option, - finalized: Option, -} - -impl ForkCheckpoints { - /// Create checkpoints update with only the head. - pub fn head_only(head: H256) -> Self { - Self { - head, - justified: None, - finalized: None, - } - } - - /// Create checkpoints update with optional justified and finalized. - /// - /// The head is passed through unchanged. - pub fn new(head: H256, justified: Option, finalized: Option) -> Self { - Self { - head, - justified, - finalized, - } - } -} - -// ============ Metadata Keys ============ - -/// Key for "time" field of the Store. Its value has type [`u64`] and it's SSZ-encoded. -const KEY_TIME: &[u8] = b"time"; -/// Key for "config" field of the Store. 
Its value has type [`ChainConfig`] and it's SSZ-encoded. -const KEY_CONFIG: &[u8] = b"config"; -/// Key for "head" field of the Store. Its value has type [`H256`] and it's SSZ-encoded. -const KEY_HEAD: &[u8] = b"head"; -/// Key for "safe_target" field of the Store. Its value has type [`H256`] and it's SSZ-encoded. -const KEY_SAFE_TARGET: &[u8] = b"safe_target"; -/// Key for "latest_justified" field of the Store. Its value has type [`Checkpoint`] and it's SSZ-encoded. -const KEY_LATEST_JUSTIFIED: &[u8] = b"latest_justified"; -/// Key for "latest_finalized" field of the Store. Its value has type [`Checkpoint`] and it's SSZ-encoded. -const KEY_LATEST_FINALIZED: &[u8] = b"latest_finalized"; - -/// ~1 day of block history at 4-second slots (86400 / 4 = 21600). -const BLOCKS_TO_KEEP: usize = 21_600; - -/// ~3.3 hours of state history at 4-second slots (12000 / 4 = 3000). -const STATES_TO_KEEP: usize = 3_000; - -const _: () = assert!( - BLOCKS_TO_KEEP >= STATES_TO_KEEP, - "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" -); - -/// Hard cap for the known aggregated payload buffer (number of distinct attestation messages). -/// With 1 attestation/slot, this holds ~500 messages (~33 min at 4s/slot). -const AGGREGATED_PAYLOAD_CAP: usize = 512; - -/// Hard cap for the new (pending) aggregated payload buffer. -/// Smaller than known since new payloads are drained every interval (~4s). -const NEW_PAYLOAD_CAP: usize = 64; - -/// Hard cap for the gossip signature buffer (individual signatures, not distinct data_roots). -/// With 4 validators and 4-second slots, 2048 signatures covers ~512 slots (~34 min). -/// Each XMSS signature is ~3KB, so worst-case memory is ~6 MB. -const GOSSIP_SIGNATURE_CAP: usize = 2048; - -/// An entry in the payload buffer: attestation data + set of proofs. -#[derive(Clone)] -struct PayloadEntry { - data: AttestationData, - proofs: Vec, -} - -/// Fixed-size circular buffer for aggregated payloads. -/// -/// Groups proofs by attestation data (via data_root). 
Each distinct -/// attestation message stores the full `AttestationData` plus all -/// `AggregatedSignatureProof`s covering that message. -/// -/// Entries are evicted FIFO (by insertion order of the data_root) -/// when the buffer reaches capacity. -#[derive(Clone)] -struct PayloadBuffer { - data: HashMap, - order: VecDeque, - capacity: usize, - total_proofs: usize, -} - -impl PayloadBuffer { - fn new(capacity: usize) -> Self { - Self { - data: HashMap::with_capacity(capacity), - order: VecDeque::with_capacity(capacity), - capacity, - total_proofs: 0, - } - } - - /// Insert a proof for an attestation, FIFO-evicting oldest data_roots - /// when total proofs reach capacity. Also ensures the buffer doesn't - /// include proofs which are a subset of other proofs for the same - /// attestation data: - /// - /// - If the incoming proof's participants are a subset (incl. equal) of - /// any existing proof, the incoming proof is redundant and skipped. - /// - Otherwise, any existing proof whose participants are a strict subset - /// of the incoming proof's is removed before inserting. - fn push(&mut self, hashed: HashedAttestationData, proof: AggregatedSignatureProof) { - let (data_root, att_data) = hashed.into_parts(); - - if let Some(entry) = self.data.get_mut(&data_root) { - let mut to_remove: Vec = Vec::new(); - for (i, p) in entry.proofs.iter().enumerate() { - // Incoming is subsumed by an existing proof (incl. equal). Skip. - if bits_is_subset(&proof.participants, &p.participants) { - return; - } - // Existing is a strict subset of incoming. Mark for removal. - // (Non-strict equality was ruled out by the check above.) - if bits_is_subset(&p.participants, &proof.participants) { - to_remove.push(i); - } - } - - // Remove subsumed proofs (reverse order so earlier indices stay valid). 
- for i in to_remove.into_iter().rev() { - entry.proofs.swap_remove(i); - self.total_proofs -= 1; - } - - entry.proofs.push(proof); - self.total_proofs += 1; - } else { - self.data.insert( - data_root, - PayloadEntry { - data: att_data, - proofs: vec![proof], - }, - ); - self.order.push_back(data_root); - self.total_proofs += 1; - } - // Evict oldest data_roots until under capacity - while self.total_proofs > self.capacity { - if let Some(evicted) = self.order.pop_front() { - if let Some(removed) = self.data.remove(&evicted) { - self.total_proofs -= removed.proofs.len(); - } - } else { - break; - } - } - } - - /// Insert a batch of (hashed_attestation_data, proof) entries. - fn push_batch(&mut self, entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>) { - for (hashed, proof) in entries { - self.push(hashed, proof); - } - } - - /// Take all entries, leaving the buffer empty. - /// - /// Drains in insertion order (via `self.order`) so downstream consumers - /// like `promote_new_aggregated_payloads` re-insert into known_payloads - /// deterministically. HashMap iteration would be RandomState-seeded and - /// produce non-deterministic vote ordering for same-slot equivocation. - fn drain(&mut self) -> Vec<(HashedAttestationData, AggregatedSignatureProof)> { - self.total_proofs = 0; - let mut result = Vec::with_capacity(self.data.values().map(|e| e.proofs.len()).sum()); - while let Some(data_root) = self.order.pop_front() { - if let Some(entry) = self.data.remove(&data_root) { - for proof in entry.proofs { - result.push((HashedAttestationData::new(entry.data.clone()), proof)); - } - } - } - result - } - - /// Return the number of distinct attestation messages in the buffer. - fn len(&self) -> usize { - self.data.len() - } - - /// Return the number of proofs for a given data_root without cloning. 
- fn proof_count_for_root(&self, data_root: &H256) -> usize { - self.data.get(data_root).map_or(0, |e| e.proofs.len()) - } - - /// Return cloned proofs for a given data_root, or empty vec if none. - fn proofs_for_root(&self, data_root: &H256) -> Vec { - self.data - .get(data_root) - .map_or_else(Vec::new, |e| e.proofs.clone()) - } - - /// Return attestation data entries keyed by data_root. - fn attestation_data_keys(&self) -> Vec<(H256, AttestationData)> { - self.data - .iter() - .map(|(&root, entry)| (root, entry.data.clone())) - .collect() - } - - /// Extract per-validator latest attestations from proofs' participation bits. - /// - /// Iterates entries in insertion order (via `self.order`) so that, when two - /// aggregations carry the same `slot` but disagree on the target (an - /// equivocation by the shared validators), the first-observed aggregation - /// wins. The ethrex spec relies on Python dict insertion-order semantics - /// here; iterating `self.data.values()` would be RandomState-seeded and - /// fail the equivocation fork-choice tests non-deterministically. - fn extract_latest_attestations(&self) -> HashMap { - let mut result: HashMap = HashMap::new(); - for data_root in &self.order { - let Some(entry) = self.data.get(data_root) else { - continue; - }; - for proof in &entry.proofs { - for vid in proof.participant_indices() { - let should_update = result - .get(&vid) - .is_none_or(|existing| existing.slot < entry.data.slot); - if should_update { - result.insert(vid, entry.data.clone()); - } - } - } - } - result - } -} - -/// Gossip signatures grouped by attestation data. -/// -/// Signatures are stored in a `BTreeMap` keyed by validator_id to guarantee -/// ascending iteration order. XMSS aggregate proofs are order-dependent: -/// verification reconstructs pubkeys from the participation bitfield (low-to-high), -/// so aggregation must produce them in the same ascending order. 
-struct GossipDataEntry { - data: AttestationData, - signatures: BTreeMap, -} - -/// Gossip signatures snapshot: (hashed_attestation_data, Vec<(validator_id, signature)>). -pub type GossipSignatureSnapshot = Vec<(HashedAttestationData, Vec<(u64, ValidatorSignature)>)>; - -/// Bounded buffer for gossip signatures with FIFO eviction. -/// -/// Groups signatures by attestation data (via data_root). Each distinct -/// attestation message stores the full `AttestationData` plus individual -/// validator signatures in ascending order (required for XMSS aggregation). -/// -/// Entries are evicted FIFO (by insertion order of the data_root) when -/// total_signatures exceeds capacity, matching the `PayloadBuffer` pattern. -struct GossipSignatureBuffer { - data: HashMap, - order: VecDeque, - capacity: usize, - total_signatures: usize, -} - -impl GossipSignatureBuffer { - fn new(capacity: usize) -> Self { - Self { - data: HashMap::new(), - order: VecDeque::new(), - capacity, - total_signatures: 0, - } - } - - /// Insert a gossip signature, FIFO-evicting oldest data_roots when over capacity. - /// - /// Last-write-wins: if (validator_id, data_root) already exists, the signature is overwritten. 
- fn insert( - &mut self, - hashed: HashedAttestationData, - validator_id: u64, - signature: ValidatorSignature, - ) { - let (data_root, att_data) = hashed.into_parts(); - - if let Some(entry) = self.data.get_mut(&data_root) { - let is_new = entry.signatures.insert(validator_id, signature).is_none(); - if is_new { - self.total_signatures += 1; - } - } else { - let mut signatures = BTreeMap::new(); - signatures.insert(validator_id, signature); - self.data.insert( - data_root, - GossipDataEntry { - data: att_data, - signatures, - }, - ); - self.order.push_back(data_root); - self.total_signatures += 1; - } - - // Evict oldest data_roots until under capacity - while self.total_signatures > self.capacity { - if let Some(evicted) = self.order.pop_front() { - if let Some(removed) = self.data.remove(&evicted) { - self.total_signatures -= removed.signatures.len(); - } - } else { - break; - } - } - } - - /// Delete gossip entries for the given (validator_id, data_root) pairs. - /// - /// When all signatures for a data_root are removed, the entry is cleaned up. - /// Collects emptied roots and batch-cleans the VecDeque in one pass. - fn delete(&mut self, keys: &[(u64, H256)]) { - if keys.is_empty() { - return; - } - let mut emptied_roots: HashSet = HashSet::new(); - for &(vid, data_root) in keys { - if let Some(entry) = self.data.get_mut(&data_root) { - if entry.signatures.remove(&vid).is_some() { - self.total_signatures -= 1; - } - if entry.signatures.is_empty() { - self.data.remove(&data_root); - emptied_roots.insert(data_root); - } - } - } - if !emptied_roots.is_empty() { - self.order.retain(|r| !emptied_roots.contains(r)); - } - } - - /// Prune gossip signatures for slots <= finalized_slot. - /// - /// Returns the number of data_root entries pruned. 
- fn prune(&mut self, finalized_slot: u64) -> usize { - let mut pruned_roots: HashSet = HashSet::new(); - self.data.retain(|root, entry| { - if entry.data.slot > finalized_slot { - true - } else { - self.total_signatures -= entry.signatures.len(); - pruned_roots.insert(*root); - false - } - }); - if !pruned_roots.is_empty() { - self.order.retain(|r| !pruned_roots.contains(r)); - } - pruned_roots.len() - } - - /// Returns a snapshot of all gossip signatures grouped by attestation data. - fn snapshot(&self) -> GossipSignatureSnapshot { - self.data - .values() - .map(|entry| { - let sigs: Vec<_> = entry - .signatures - .iter() - .map(|(&vid, sig)| (vid, sig.clone())) - .collect(); - (HashedAttestationData::new(entry.data.clone()), sigs) - }) - .collect() - } - - /// Returns the total number of individual signatures stored. - fn total_signatures(&self) -> usize { - self.total_signatures - } - - /// Returns the number of distinct data_roots. - #[cfg(test)] - fn len(&self) -> usize { - self.data.len() - } -} - -/// Encode a LiveChain key (slot, root) to bytes. -/// Layout: slot (8 bytes big-endian) || root (32 bytes) -/// Big-endian ensures lexicographic ordering matches numeric ordering. -fn encode_live_chain_key(slot: u64, root: &H256) -> Vec { - let mut result = slot.to_be_bytes().to_vec(); - result.extend_from_slice(&root.0); - result -} - -/// Decode a LiveChain key from bytes. -fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { - let slot = u64::from_be_bytes(bytes[..8].try_into().expect("valid slot bytes")); - let root = H256::from_slice(&bytes[8..]); - (slot, root) -} - /// Fork choice store backed by a pluggable storage backend. 
/// -/// The Store maintains all state required for fork choice and block processing: -/// -/// - **Metadata**: time, config, head, safe_target, justified/finalized checkpoints -/// - **Blocks**: headers and bodies stored separately for efficient header-only queries -/// - **States**: beacon states indexed by block root -/// - **Attestations**: latest known and pending ("new") attestations per validator -/// - **Signatures**: gossip signatures and aggregated proofs for signature verification -/// - **LiveChain**: slot index for efficient fork choice traversal (pruned on finalization) -/// -/// # Constructors -/// -/// - [`from_anchor_state`](Self::from_anchor_state): Initialize from a checkpoint state (no block body) -/// - [`get_forkchoice_store`](Self::get_forkchoice_store): Initialize from state + block (stores body) +/// Maintains metadata, blocks, states, and attestation/signature buffers +/// required for fork choice and block processing. #[derive(Clone)] pub struct Store { backend: Arc, @@ -471,18 +53,15 @@ pub struct Store { } impl Store { - /// Initialize a Store from an anchor state only. - /// - /// Uses the state's `latest_block_header` as the anchor block header. - /// No block body is stored since it's not available. + /// Initialize from a checkpoint state. Anchor header is taken from + /// `state.latest_block_header`. No body is stored. pub fn from_anchor_state(backend: Arc, anchor_state: State) -> Self { Self::init_store(backend, anchor_state, None) } - /// Initialize a Store from an anchor state and block. + /// Initialize from an anchor state and block. /// /// The block must match the state's `latest_block_header`. - /// Named to mirror the spec's `get_forkchoice_store` function. /// /// # Errors /// @@ -624,68 +203,44 @@ impl Store { batch.commit().expect("commit"); } - // ============ Time ============ - - /// Returns the current store time in interval counts since genesis. + /// Current store time in intervals since genesis. 
/// - /// Each increment represents one 800ms interval. Derive slot/interval as: - /// slot = time() / INTERVALS_PER_SLOT - /// interval = time() % INTERVALS_PER_SLOT + /// slot = time() / INTERVALS_PER_SLOT + /// interval = time() % INTERVALS_PER_SLOT pub fn time(&self) -> u64 { self.get_metadata(KEY_TIME) } - /// Sets the current store time. pub fn set_time(&mut self, time: u64) { self.set_metadata(KEY_TIME, &time); } - // ============ Config ============ - - /// Returns the chain configuration. pub fn config(&self) -> ChainConfig { self.get_metadata(KEY_CONFIG) } - // ============ Head ============ - - /// Returns the current head block root. pub fn head(&self) -> H256 { self.get_metadata(KEY_HEAD) } - // ============ Safe Target ============ - - /// Returns the safe target block root for attestations. pub fn safe_target(&self) -> H256 { self.get_metadata(KEY_SAFE_TARGET) } - /// Sets the safe target block root. pub fn set_safe_target(&mut self, safe_target: H256) { self.set_metadata(KEY_SAFE_TARGET, &safe_target); } - // ============ Checkpoints ============ - - /// Returns the latest justified checkpoint. pub fn latest_justified(&self) -> Checkpoint { self.get_metadata(KEY_LATEST_JUSTIFIED) } - /// Returns the latest finalized checkpoint. pub fn latest_finalized(&self) -> Checkpoint { self.get_metadata(KEY_LATEST_FINALIZED) } - // ============ Checkpoint Updates ============ - /// Updates head, justified, and finalized checkpoints. /// - /// - Head is always updated to the new value. - /// - Justified is updated if provided. - /// - Finalized is updated if provided. - /// /// When finalization advances, prunes the LiveChain index. 
pub fn update_checkpoints(&mut self, checkpoints: ForkCheckpoints) { // Read old finalized slot before updating metadata @@ -705,9 +260,7 @@ impl Store { batch.put_batch(Table::Metadata, entries).expect("put"); batch.commit().expect("commit"); - // Lightweight pruning that should happen immediately on finalization advance: - // live chain index, signatures, and attestation data. These are cheap and - // affect fork choice correctness (live chain) or attestation processing. + // Immediately prune cheap finalized data (index, signatures, attestations). // Heavy state/block pruning is deferred to prune_old_data(). if let Some(finalized) = checkpoints.finalized && finalized.slot > old_finalized_slot @@ -725,11 +278,6 @@ impl Store { } /// Prune old states and blocks to keep storage bounded. - /// - /// This is separated from `update_checkpoints` so callers can defer heavy - /// pruning until after a batch of blocks has been fully processed. Running - /// this mid-cascade would delete states that pending children still need, - /// causing infinite re-processing loops when fallback pruning is active. pub fn prune_old_data(&mut self) { let protected_roots = [self.latest_finalized().root, self.latest_justified().root]; let pruned_states = self.prune_old_states(&protected_roots); @@ -739,12 +287,7 @@ impl Store { } } - // ============ Blocks ============ - - /// Get block data for fork choice: root -> (slot, parent_root). - /// - /// Iterates only the LiveChain table, avoiding Block deserialization. - /// Returns only non-finalized blocks, automatically pruned on finalization. + /// Root -> (slot, parent_root) for non-finalized blocks. pub fn get_live_chain(&self) -> HashMap { let view = self.backend.begin_read().expect("read view"); view.prefix_iterator(Table::LiveChain, &[]) @@ -758,9 +301,6 @@ impl Store { .collect() } - /// Get all known block roots as HashSet. - /// - /// Useful for checking block existence without deserializing. 
pub fn get_block_roots(&self) -> HashSet { let view = self.backend.begin_read().expect("read view"); view.prefix_iterator(Table::LiveChain, &[]) @@ -773,12 +313,7 @@ impl Store { .collect() } - /// Prune slot index entries with slot < finalized_slot. - /// - /// Blocks/states are retained for historical queries, only the - /// LiveChain index is pruned. - /// - /// Returns the number of entries pruned. + /// Prune LiveChain index entries with slot < finalized_slot. pub fn prune_live_chain(&mut self, finalized_slot: u64) -> usize { let view = self.backend.begin_read().expect("read view"); @@ -809,20 +344,12 @@ impl Store { count } - /// Prune gossip signatures for slots <= finalized_slot. - /// - /// Returns the number of entries pruned. pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { let mut gossip = self.gossip_signatures.lock().unwrap(); gossip.prune(finalized_slot) } - /// Prune old states beyond the retention window. - /// - /// Keeps the most recent `STATES_TO_KEEP` states (by slot), plus any - /// states whose roots appear in `protected_roots` (finalized, justified). - /// - /// Returns the number of states pruned. + /// Keeps the most recent `STATES_TO_KEEP` states (by slot) plus protected roots. pub fn prune_old_states(&mut self, protected_roots: &[H256]) -> usize { let view = self.backend.begin_read().expect("read view"); @@ -842,12 +369,10 @@ impl Store { return 0; } - // Sort by slot descending (newest first) entries.sort_unstable_by(|a, b| b.1.cmp(&a.1)); let protected: HashSet> = protected_roots.iter().map(|r| r.to_ssz()).collect(); - // Skip the retention window, collect remaining keys for deletion let keys_to_delete: Vec> = entries .into_iter() .skip(STATES_TO_KEEP) @@ -866,13 +391,7 @@ impl Store { count } - /// Prune old blocks beyond the retention window. - /// - /// Keeps the most recent `BLOCKS_TO_KEEP` blocks (by slot), plus any - /// blocks whose roots appear in `protected_roots` (finalized, justified). 
- /// Deletes from `BlockHeaders`, `BlockBodies`, and `BlockSignatures`. - /// - /// Returns the number of blocks pruned. + /// Keeps the most recent `BLOCKS_TO_KEEP` blocks (by slot) plus protected roots. pub fn prune_old_blocks(&mut self, protected_roots: &[H256]) -> usize { let view = self.backend.begin_read().expect("read view"); @@ -891,7 +410,6 @@ impl Store { return 0; } - // Sort by slot descending (newest first) entries.sort_unstable_by(|a, b| b.1.cmp(&a.1)); let protected: HashSet> = protected_roots.iter().map(|r| r.to_ssz()).collect(); @@ -920,7 +438,6 @@ impl Store { count } - /// Get the block header by root. pub fn get_block_header(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); view.get(Table::BlockHeaders, &root.to_ssz()) @@ -928,30 +445,16 @@ impl Store { .map(|bytes| BlockHeader::from_ssz_bytes(&bytes).expect("valid header")) } - // ============ Signed Blocks ============ - /// Insert a block as pending (parent state not yet available). /// - /// Stores block data in `BlockHeaders`/`BlockBodies`/`BlockSignatures` - /// **without** writing to `LiveChain`. This persists the heavy signature - /// data (~3KB+ per block) to disk while keeping the block invisible to - /// fork choice. - /// - /// When the block is later processed via [`insert_signed_block`](Self::insert_signed_block), - /// the same keys are overwritten (idempotent) and a `LiveChain` entry is added. + /// Persists heavy signature data to disk while keeping the block invisible to fork choice. pub fn insert_pending_block(&mut self, root: H256, signed_block: SignedBlock) { let mut batch = self.backend.begin_write().expect("write batch"); write_signed_block(batch.as_mut(), &root, signed_block); batch.commit().expect("commit"); } - /// Insert a signed block, storing the block and signatures separately. - /// - /// Blocks and signatures are stored in separate tables because the genesis - /// block has no signatures. 
This allows uniform storage of all blocks while - /// only storing signatures for non-genesis blocks. - /// - /// Takes ownership to avoid cloning large signature data. + /// Insert a signed block and its signatures. pub fn insert_signed_block(&mut self, root: H256, signed_block: SignedBlock) { let mut batch = self.backend.begin_write().expect("write batch"); let block = write_signed_block(batch.as_mut(), &root, signed_block); @@ -990,7 +493,6 @@ impl Store { /// Get a signed block by combining header, body, and signatures. /// - /// Returns None if any of the components are not found. /// Note: Genesis block has no entry in BlockSignatures table. pub fn get_signed_block(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); @@ -1018,9 +520,6 @@ impl Store { }) } - // ============ States ============ - - /// Returns the state for the given block root. pub fn get_state(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); view.get(Table::States, &root.to_ssz()) @@ -1028,7 +527,6 @@ impl Store { .map(|bytes| State::from_ssz_bytes(&bytes).expect("valid state")) } - /// Returns whether a state exists for the given block root. pub fn has_state(&self, root: &H256) -> bool { let view = self.backend.begin_read().expect("read view"); view.get(Table::States, &root.to_ssz()) @@ -1036,7 +534,6 @@ impl Store { .is_some() } - /// Stores a state indexed by block root. pub fn insert_state(&mut self, root: H256, state: State) { let mut batch = self.backend.begin_write().expect("write batch"); let entries = vec![(root.to_ssz(), state.to_ssz())]; @@ -1044,9 +541,6 @@ impl Store { batch.commit().expect("commit"); } - // ============ Attestation Extraction ============ - - /// Extract per-validator latest attestations from known (fork-choice-active) payloads. 
pub fn extract_latest_known_attestations(&self) -> HashMap { self.known_payloads .lock() @@ -1054,7 +548,6 @@ impl Store { .extract_latest_attestations() } - /// Extract per-validator latest attestations from new (pending) payloads. pub fn extract_latest_new_attestations(&self) -> HashMap { self.new_payloads .lock() @@ -1062,12 +555,29 @@ impl Store { .extract_latest_attestations() } - // ============ Known Aggregated Payloads ============ - // - // "Known" aggregated payloads are active in fork choice weight calculations. - // Promoted from "new" payloads at specific intervals (0 with proposal, 4). + pub fn extract_latest_all_attestations(&self) -> HashMap { + let mut result = self + .known_payloads + .lock() + .unwrap() + .extract_latest_attestations(); + for (vid, data) in self + .new_payloads + .lock() + .unwrap() + .extract_latest_attestations() + { + let should_update = result + .get(&vid) + .is_none_or(|existing| existing.slot < data.slot); + if should_update { + result.insert(vid, data); + } + } + result + } - /// Returns a snapshot of known payloads as (AttestationData, Vec) pairs. + /// Snapshot of known payloads active in fork choice. pub fn known_aggregated_payloads( &self, ) -> HashMap)> { @@ -1079,9 +589,6 @@ impl Store { } /// Combined proof count for a data_root across new and known buffers. - /// - /// Cheap check (no cloning) to short-circuit before calling the more - /// expensive `existing_proofs_for_data` which clones all proof bytes. pub fn proof_count_for_data(&self, data_root: &H256) -> usize { let new = self .new_payloads @@ -1096,12 +603,9 @@ impl Store { new + known } - /// Look up existing proofs for a given data_root from both new and known buffers. + /// Existing proofs for a data_root from both buffers. /// - /// Returns `(new_proofs, known_proofs)` in priority order: new payloads first - /// (uncommitted work from the current round), then known payloads (already active - /// in fork choice). 
This ordering is used by greedy proof selection to prefer - /// reusing recent work. + /// Returns `(new_proofs, known_proofs)` in priority order (new first). pub fn existing_proofs_for_data( &self, data_root: &H256, @@ -1115,15 +619,10 @@ impl Store { (new, known) } - /// Return attestation data entries from the new (pending) payload buffer. - /// - /// Used to iterate over data that has pending proofs but may lack gossip - /// signatures, matching the spec's `new.keys() | gossip_sigs.keys()` union. pub fn new_payload_keys(&self) -> Vec<(H256, AttestationData)> { self.new_payloads.lock().unwrap().attestation_data_keys() } - /// Insert a single proof into the known (fork-choice-active) buffer. pub fn insert_known_aggregated_payload( &mut self, hashed: HashedAttestationData, @@ -1132,7 +631,6 @@ impl Store { self.known_payloads.lock().unwrap().push(hashed, proof); } - /// Batch-insert proofs into the known buffer. pub fn insert_known_aggregated_payloads_batch( &mut self, entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>, @@ -1140,12 +638,6 @@ impl Store { self.known_payloads.lock().unwrap().push_batch(entries); } - // ============ New Aggregated Payloads ============ - // - // "New" aggregated payloads are pending — not yet counted in fork choice. - // Promoted to "known" via `promote_new_aggregated_payloads`. - - /// Insert a single proof into the new (pending) buffer. pub fn insert_new_aggregated_payload( &mut self, hashed: HashedAttestationData, @@ -1154,7 +646,6 @@ impl Store { self.new_payloads.lock().unwrap().push(hashed, proof); } - /// Batch-insert proofs into the new buffer. pub fn insert_new_aggregated_payloads_batch( &mut self, entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>, @@ -1162,57 +653,39 @@ impl Store { self.new_payloads.lock().unwrap().push_batch(entries); } - // ============ Pruning Helpers ============ - - /// Promotes all new aggregated payloads to known, making them active in fork choice. 
- /// - /// Drains the new buffer and pushes all entries into the known buffer. + /// Promotes all new aggregated payloads to known (active in fork choice). pub fn promote_new_aggregated_payloads(&mut self) { let drained = self.new_payloads.lock().unwrap().drain(); self.known_payloads.lock().unwrap().push_batch(drained); } - /// Returns the number of entries in the new (pending) aggregated payloads buffer. pub fn new_aggregated_payloads_count(&self) -> usize { self.new_payloads.lock().unwrap().len() } - /// Returns the number of entries in the known (fork-choice-active) aggregated payloads buffer. pub fn known_aggregated_payloads_count(&self) -> usize { self.known_payloads.lock().unwrap().len() } - /// Returns the number of gossip signature entries stored. pub fn gossip_signatures_count(&self) -> usize { let gossip = self.gossip_signatures.lock().unwrap(); gossip.total_signatures() } - /// Estimated live data size in bytes for a table, as reported by the backend. pub fn estimate_table_bytes(&self, table: Table) -> u64 { self.backend.estimate_table_bytes(table) } - // ============ Gossip Signatures ============ - // - // Gossip signatures are individual validator signatures received via P2P. - // They're transient (consumed at interval 2 aggregation) so stored in-memory. - // Keyed by AttestationData (via data_root) matching the leanSpec structure: - // gossip_signatures: dict[AttestationData, set[GossipSignature]] - - /// Delete gossip entries for the given (validator_id, data_root) pairs. pub fn delete_gossip_signatures(&mut self, keys: &[(u64, H256)]) { let mut gossip = self.gossip_signatures.lock().unwrap(); gossip.delete(keys); } - /// Returns a snapshot of gossip signatures grouped by attestation data. pub fn iter_gossip_signatures(&self) -> GossipSignatureSnapshot { let gossip = self.gossip_signatures.lock().unwrap(); gossip.snapshot() } - /// Stores a gossip signature for later aggregation. 
pub fn insert_gossip_signature( &mut self, hashed: HashedAttestationData, @@ -1223,67 +696,24 @@ impl Store { gossip.insert(hashed, validator_id, signature); } - // ============ Derived Accessors ============ - - /// Returns the slot of the current head block. pub fn head_slot(&self) -> u64 { self.get_block_header(&self.head()) .expect("head block exists") .slot } - /// Returns the slot of the current safe target block. pub fn safe_target_slot(&self) -> u64 { self.get_block_header(&self.safe_target()) .expect("safe target exists") .slot } - /// Returns a clone of the head state. pub fn head_state(&self) -> State { self.get_state(&self.head()) .expect("head state is always available") } } -/// Write block header, body, and signatures onto an existing batch. -/// -/// Returns the deserialized [`Block`] so callers can access fields like -/// `slot` and `parent_root` without re-deserializing. -fn write_signed_block( - batch: &mut dyn StorageWriteBatch, - root: &H256, - signed_block: SignedBlock, -) -> Block { - let SignedBlock { - message: block, - signature, - } = signed_block; - - let header = block.header(); - let root_bytes = root.to_ssz(); - - let header_entries = vec![(root_bytes.clone(), header.to_ssz())]; - batch - .put_batch(Table::BlockHeaders, header_entries) - .expect("put block header"); - - // Skip storing empty bodies - they can be reconstructed from the header's body_root - if header.body_root != *EMPTY_BODY_ROOT { - let body_entries = vec![(root_bytes.clone(), block.body.to_ssz())]; - batch - .put_batch(Table::BlockBodies, body_entries) - .expect("put block body"); - } - - let sig_entries = vec![(root_bytes, signature.to_ssz())]; - batch - .put_batch(Table::BlockSignatures, sig_entries) - .expect("put block signatures"); - - block -} - #[cfg(test)] mod tests { use super::*; @@ -1660,14 +1090,13 @@ mod tests { ); } - // ============ PayloadBuffer Tests ============ + // ============ Store Buffer Tests ============ fn make_proof() -> 
AggregatedSignatureProof { use ethlambda_types::attestation::AggregationBits; AggregatedSignatureProof::empty(AggregationBits::new()) } - /// Create a proof with a specific validator bit set (distinct participants). fn make_proof_for_validator(vid: usize) -> AggregatedSignatureProof { use ethlambda_types::attestation::AggregationBits; let mut bits = AggregationBits::with_length(vid + 1).unwrap(); @@ -1675,17 +1104,6 @@ mod tests { AggregatedSignatureProof::empty(bits) } - /// Create a proof with bits set for every validator in `vids`. - fn make_proof_for_validators(vids: &[u64]) -> AggregatedSignatureProof { - use ethlambda_types::attestation::AggregationBits; - let max = vids.iter().copied().max().unwrap_or(0) as usize; - let mut bits = AggregationBits::with_length(max + 1).unwrap(); - for &v in vids { - bits.set(v as usize, true).unwrap(); - } - AggregatedSignatureProof::empty(bits) - } - fn make_att_data(slot: u64) -> AttestationData { AttestationData { slot, @@ -1695,72 +1113,6 @@ mod tests { } } - #[test] - fn payload_buffer_fifo_eviction() { - let mut buf = PayloadBuffer::new(3); - - // Insert 3 distinct attestation data entries (different slots → different roots) - for slot in 1..=3u64 { - let data = make_att_data(slot); - buf.push(HashedAttestationData::new(data), make_proof()); - } - assert_eq!(buf.len(), 3); - - // Pushing a 4th should evict the oldest (slot 1) - let data = make_att_data(4); - buf.push(HashedAttestationData::new(data), make_proof()); - assert_eq!(buf.len(), 3); - - // The oldest (slot 1) should be gone - let att_data_1 = make_att_data(1); - assert!(!buf.data.contains_key(&att_data_1.hash_tree_root())); - } - - #[test] - fn payload_buffer_multiple_proofs_per_data() { - let mut buf = PayloadBuffer::new(10); - let data = make_att_data(1); - let data_root = data.hash_tree_root(); - - // Insert 3 proofs with distinct participants for the same attestation data - buf.push( - HashedAttestationData::new(data.clone()), - 
make_proof_for_validator(0), - ); - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validator(1), - ); - buf.push( - HashedAttestationData::new(data), - make_proof_for_validator(2), - ); - - // Should be 1 distinct data entry with 3 proofs - assert_eq!(buf.len(), 1); - assert_eq!(buf.data[&data_root].proofs.len(), 3); - } - - #[test] - fn payload_buffer_drain_empties_buffer() { - let mut buf = PayloadBuffer::new(10); - let data = make_att_data(1); - - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validator(0), - ); - buf.push( - HashedAttestationData::new(data), - make_proof_for_validator(1), - ); - - let drained = buf.drain(); - assert_eq!(drained.len(), 2); // 2 proofs flattened - assert!(buf.data.is_empty()); - assert!(buf.order.is_empty()); - } - #[test] fn promote_moves_new_to_known() { let mut store = Store::test_store(); @@ -1808,509 +1160,4 @@ mod tests { assert_eq!(cloned.new_payloads.lock().unwrap().len(), 0); assert_eq!(cloned.known_payloads.lock().unwrap().len(), 1); } - - #[test] - fn payload_buffer_push_superset_removes_strict_subset() { - let mut buf = PayloadBuffer::new(10); - let data = make_att_data(1); - let data_root = data.hash_tree_root(); - - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[1, 2]), - ); - buf.push( - HashedAttestationData::new(data), - make_proof_for_validators(&[1, 2, 3]), - ); - - assert_eq!(buf.total_proofs, 1); - assert_eq!(buf.data[&data_root].proofs.len(), 1); - let kept: HashSet = buf.data[&data_root].proofs[0] - .participant_indices() - .collect(); - assert_eq!(kept, HashSet::from([1, 2, 3])); - } - - #[test] - fn payload_buffer_push_subset_is_skipped() { - let mut buf = PayloadBuffer::new(10); - let data = make_att_data(1); - let data_root = data.hash_tree_root(); - - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[1, 2, 3]), - ); - buf.push( - HashedAttestationData::new(data), - 
make_proof_for_validators(&[1, 2]), - ); - - assert_eq!(buf.total_proofs, 1); - assert_eq!(buf.data[&data_root].proofs.len(), 1); - let kept: HashSet = buf.data[&data_root].proofs[0] - .participant_indices() - .collect(); - assert_eq!(kept, HashSet::from([1, 2, 3])); - } - - #[test] - fn payload_buffer_push_equal_participants_is_skipped() { - let mut buf = PayloadBuffer::new(10); - let data = make_att_data(1); - let data_root = data.hash_tree_root(); - - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[1, 2]), - ); - buf.push( - HashedAttestationData::new(data), - make_proof_for_validators(&[1, 2]), - ); - - assert_eq!(buf.total_proofs, 1); - assert_eq!(buf.data[&data_root].proofs.len(), 1); - } - - #[test] - fn payload_buffer_push_incomparable_proofs_coexist() { - let mut buf = PayloadBuffer::new(10); - let data = make_att_data(1); - let data_root = data.hash_tree_root(); - - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[1, 2]), - ); - buf.push( - HashedAttestationData::new(data), - make_proof_for_validators(&[3, 4]), - ); - - assert_eq!(buf.total_proofs, 2); - assert_eq!(buf.data[&data_root].proofs.len(), 2); - } - - #[test] - fn payload_buffer_push_superset_absorbs_multiple_subsets() { - let mut buf = PayloadBuffer::new(10); - let data = make_att_data(1); - let data_root = data.hash_tree_root(); - - // Three pairwise-incomparable singletons: all retained. - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[1]), - ); - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[2]), - ); - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[3]), - ); - assert_eq!(buf.total_proofs, 3); - - // Superset push absorbs all three at once. 
- buf.push( - HashedAttestationData::new(data), - make_proof_for_validators(&[1, 2, 3]), - ); - - assert_eq!(buf.total_proofs, 1); - assert_eq!(buf.data[&data_root].proofs.len(), 1); - // `order` still contains the single entry. - assert_eq!(buf.order.len(), 1); - assert_eq!(buf.order.front().copied(), Some(data_root)); - } - - #[test] - fn payload_buffer_push_mixed_kept_and_removed() { - let mut buf = PayloadBuffer::new(10); - let data = make_att_data(1); - let data_root = data.hash_tree_root(); - - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[1, 2]), - ); - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[5, 6]), - ); - buf.push( - HashedAttestationData::new(data), - make_proof_for_validators(&[1, 2, 3]), - ); - - assert_eq!(buf.total_proofs, 2); - - let sets: HashSet> = buf.data[&data_root] - .proofs - .iter() - .map(|p| { - let mut v: Vec = p.participant_indices().collect(); - v.sort_unstable(); - v - }) - .collect(); - assert!(sets.contains(&vec![5, 6])); - assert!(sets.contains(&vec![1, 2, 3])); - } - - #[test] - fn payload_buffer_push_empty_participants_subsumed_by_anything() { - let mut buf = PayloadBuffer::new(10); - let data = make_att_data(1); - let data_root = data.hash_tree_root(); - - // Empty-participant proof inserted first: anything that follows absorbs it. - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[]), - ); - assert_eq!(buf.total_proofs, 1); - buf.push( - HashedAttestationData::new(data.clone()), - make_proof_for_validators(&[1, 2]), - ); - assert_eq!(buf.total_proofs, 1); - assert_eq!( - buf.data[&data_root].proofs[0] - .participant_indices() - .collect::>(), - vec![1, 2] - ); - - // Empty-participant proof pushed against existing non-empty: incoming is subsumed, skipped. 
- buf.push( - HashedAttestationData::new(data), - make_proof_for_validators(&[]), - ); - assert_eq!(buf.total_proofs, 1); - } - - #[test] - fn payload_buffer_push_cross_data_root_independence() { - let mut buf = PayloadBuffer::new(10); - let data_a = make_att_data(1); - let data_b = make_att_data(2); - let root_a = data_a.hash_tree_root(); - let root_b = data_b.hash_tree_root(); - - buf.push( - HashedAttestationData::new(data_a), - make_proof_for_validators(&[1, 2, 3]), - ); - buf.push( - HashedAttestationData::new(data_b), - make_proof_for_validators(&[1, 2]), - ); - - // Different data_roots → no cross-entry subsumption. - assert_eq!(buf.total_proofs, 2); - assert_eq!(buf.data[&root_a].proofs.len(), 1); - assert_eq!(buf.data[&root_b].proofs.len(), 1); - } - - #[test] - fn payload_buffer_push_fifo_eviction_uses_total_proofs() { - let mut buf = PayloadBuffer::new(2); - let data_a = make_att_data(1); - let data_b = make_att_data(2); - let data_c = make_att_data(3); - let root_a = data_a.hash_tree_root(); - let root_c = data_c.hash_tree_root(); - - buf.push( - HashedAttestationData::new(data_a), - make_proof_for_validators(&[1]), - ); - buf.push( - HashedAttestationData::new(data_b), - make_proof_for_validators(&[2, 3]), - ); - // total_proofs == 3, over capacity → evict oldest (root_a). - // Pushing a third distinct data_root triggers eviction via capacity. - buf.push( - HashedAttestationData::new(data_c), - make_proof_for_validators(&[4]), - ); - - assert!(!buf.data.contains_key(&root_a)); - assert!(buf.data.contains_key(&root_c)); - assert_eq!(buf.total_proofs, 2); - } - - /// Build an attestation message at `slot` whose target points at `target_root`, - /// distinct from the default zero target so two such datas have different roots. 
- fn make_att_data_for_target(slot: u64, target_root: H256) -> AttestationData { - AttestationData { - slot, - head: Checkpoint::default(), - target: Checkpoint { - root: target_root, - slot, - }, - source: Checkpoint::default(), - } - } - - /// When two aggregations share `slot` but disagree on the target - /// (same-slot equivocation), the *first inserted* aggregation must win for - /// the validators that participate in both. The fork-choice spec test - /// `test_same_slot_equivocating_attesters_count_once` depends on this. - /// HashMap iteration would make this RandomState-seeded and flaky. - #[test] - fn extract_latest_attestations_first_inserted_wins_on_slot_tie() { - let target_a = H256([0xaa; 32]); - let target_b = H256([0xbb; 32]); - let data_a = make_att_data_for_target(3, target_a); - let data_b = make_att_data_for_target(3, target_b); - assert_ne!(data_a.hash_tree_root(), data_b.hash_tree_root()); - - // Order 1: A then B → validators 0,1 (in both) must see A. - let mut buf = PayloadBuffer::new(10); - buf.push( - HashedAttestationData::new(data_a.clone()), - make_proof_for_validators(&[0, 1, 2]), - ); - buf.push( - HashedAttestationData::new(data_b.clone()), - make_proof_for_validators(&[0, 1, 3, 4]), - ); - let extracted = buf.extract_latest_attestations(); - assert_eq!(extracted[&0].target.root, target_a); - assert_eq!(extracted[&1].target.root, target_a); - assert_eq!(extracted[&2].target.root, target_a); - assert_eq!(extracted[&3].target.root, target_b); - assert_eq!(extracted[&4].target.root, target_b); - - // Order 2: B then A → validators 0,1 must now see B. 
- let mut buf = PayloadBuffer::new(10); - buf.push( - HashedAttestationData::new(data_b), - make_proof_for_validators(&[0, 1, 3, 4]), - ); - buf.push( - HashedAttestationData::new(data_a), - make_proof_for_validators(&[0, 1, 2]), - ); - let extracted = buf.extract_latest_attestations(); - assert_eq!(extracted[&0].target.root, target_b); - assert_eq!(extracted[&1].target.root, target_b); - assert_eq!(extracted[&2].target.root, target_a); - assert_eq!(extracted[&3].target.root, target_b); - assert_eq!(extracted[&4].target.root, target_b); - } - - /// `drain` must hand back entries in insertion order so that - /// `promote_new_aggregated_payloads` lands them in known_payloads in the - /// same order, preserving same-slot equivocation semantics through the - /// new → known migration. - #[test] - fn drain_preserves_insertion_order() { - let target_a = H256([0xaa; 32]); - let target_b = H256([0xbb; 32]); - let target_c = H256([0xcc; 32]); - let data_a = make_att_data_for_target(1, target_a); - let data_b = make_att_data_for_target(2, target_b); - let data_c = make_att_data_for_target(3, target_c); - - let mut buf = PayloadBuffer::new(10); - buf.push(HashedAttestationData::new(data_a), make_proof()); - buf.push(HashedAttestationData::new(data_b), make_proof()); - buf.push(HashedAttestationData::new(data_c), make_proof()); - - let drained = buf.drain(); - let slots: Vec = drained.iter().map(|(h, _)| h.data().slot).collect(); - assert_eq!(slots, vec![1, 2, 3]); - assert!(buf.data.is_empty()); - assert!(buf.order.is_empty()); - assert_eq!(buf.total_proofs, 0); - } - - // ============ GossipSignatureBuffer Tests ============ - - fn make_dummy_sig() -> ValidatorSignature { - use ethlambda_types::signature::LeanSignatureScheme; - use leansig::{serialization::Serializable, signature::SignatureScheme}; - use rand::{SeedableRng, rngs::StdRng}; - - static CACHED_SIG: std::sync::LazyLock> = std::sync::LazyLock::new(|| { - let mut rng = StdRng::seed_from_u64(42); - let lifetime = 1 
<< 5; // small for speed - let (_pk, sk) = LeanSignatureScheme::key_gen(&mut rng, 0, lifetime); - let sig = LeanSignatureScheme::sign(&sk, 0, &[0u8; 32]).unwrap(); - sig.to_bytes() - }); - - ValidatorSignature::from_bytes(&CACHED_SIG).expect("cached test signature") - } - - #[test] - fn gossip_buffer_fifo_eviction() { - // Capacity of 3 signatures total - let mut buf = GossipSignatureBuffer::new(3); - - // Insert 3 sigs across 3 data_roots (1 sig each) - for slot in 1..=3u64 { - let data = make_att_data(slot); - buf.insert(HashedAttestationData::new(data), 0, make_dummy_sig()); - } - assert_eq!(buf.total_signatures(), 3); - assert_eq!(buf.len(), 3); - - // Insert a 4th — should evict the oldest (slot 1) - let data4 = make_att_data(4); - buf.insert(HashedAttestationData::new(data4), 0, make_dummy_sig()); - assert_eq!(buf.total_signatures(), 3); - assert_eq!(buf.len(), 3); - - // Slot 1 should be gone - let slot1_root = HashedAttestationData::new(make_att_data(1)).root(); - assert!(!buf.data.contains_key(&slot1_root)); - - // Slots 2, 3, 4 should remain - let slot2_root = HashedAttestationData::new(make_att_data(2)).root(); - let slot4_root = HashedAttestationData::new(make_att_data(4)).root(); - assert!(buf.data.contains_key(&slot2_root)); - assert!(buf.data.contains_key(&slot4_root)); - } - - #[test] - fn gossip_buffer_dedup_last_write_wins() { - let mut buf = GossipSignatureBuffer::new(100); - let data = make_att_data(1); - let hashed = HashedAttestationData::new(data); - - buf.insert(hashed.clone(), 0, make_dummy_sig()); - buf.insert(hashed.clone(), 0, make_dummy_sig()); - - // Last-write-wins: overwrites the signature but count stays at 1 - assert_eq!(buf.total_signatures(), 1); - assert_eq!(buf.len(), 1); - } - - #[test] - fn gossip_buffer_multiple_validators_per_root() { - let mut buf = GossipSignatureBuffer::new(100); - let data = make_att_data(1); - - buf.insert( - HashedAttestationData::new(data.clone()), - 0, - make_dummy_sig(), - ); - buf.insert( - 
HashedAttestationData::new(data.clone()), - 1, - make_dummy_sig(), - ); - buf.insert( - HashedAttestationData::new(data.clone()), - 2, - make_dummy_sig(), - ); - - assert_eq!(buf.total_signatures(), 3); - assert_eq!(buf.len(), 1); // One data_root - } - - #[test] - fn gossip_buffer_delete_cleans_up() { - let mut buf = GossipSignatureBuffer::new(100); - let data = make_att_data(1); - let root = HashedAttestationData::new(data.clone()).root(); - - buf.insert( - HashedAttestationData::new(data.clone()), - 0, - make_dummy_sig(), - ); - buf.insert( - HashedAttestationData::new(data.clone()), - 1, - make_dummy_sig(), - ); - assert_eq!(buf.total_signatures(), 2); - - // Delete one sig — root should remain - buf.delete(&[(0, root)]); - assert_eq!(buf.total_signatures(), 1); - assert_eq!(buf.len(), 1); - - // Delete last sig — root should be fully removed - buf.delete(&[(1, root)]); - assert_eq!(buf.total_signatures(), 0); - assert_eq!(buf.len(), 0); - assert!(buf.order.is_empty()); - } - - #[test] - fn gossip_buffer_prune_by_slot() { - let mut buf = GossipSignatureBuffer::new(100); - - // Insert sigs at slots 1, 2, 3, 4, 5 - for slot in 1..=5u64 { - buf.insert( - HashedAttestationData::new(make_att_data(slot)), - 0, - make_dummy_sig(), - ); - } - assert_eq!(buf.total_signatures(), 5); - - // Prune slots <= 3 - let pruned = buf.prune(3); - assert_eq!(pruned, 3); - assert_eq!(buf.total_signatures(), 2); - assert_eq!(buf.len(), 2); - assert_eq!(buf.order.len(), 2); - } - - #[test] - fn gossip_buffer_eviction_removes_whole_root() { - // Capacity of 4 signatures - let mut buf = GossipSignatureBuffer::new(4); - - // Slot 1: 3 validators - let data1 = make_att_data(1); - buf.insert( - HashedAttestationData::new(data1.clone()), - 0, - make_dummy_sig(), - ); - buf.insert( - HashedAttestationData::new(data1.clone()), - 1, - make_dummy_sig(), - ); - buf.insert( - HashedAttestationData::new(data1.clone()), - 2, - make_dummy_sig(), - ); - - // Slot 2: 1 validator - let data2 = 
make_att_data(2); - buf.insert( - HashedAttestationData::new(data2.clone()), - 0, - make_dummy_sig(), - ); - assert_eq!(buf.total_signatures(), 4); - - // Insert slot 3 — should evict slot 1 (3 sigs), now total = 2 - let data3 = make_att_data(3); - buf.insert(HashedAttestationData::new(data3), 0, make_dummy_sig()); - - let slot1_root = HashedAttestationData::new(data1).root(); - assert!(!buf.data.contains_key(&slot1_root)); - assert_eq!(buf.total_signatures(), 2); // slot 2 (1) + slot 3 (1) - assert_eq!(buf.len(), 2); - } } diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs new file mode 100644 index 00000000..51e5b0d2 --- /dev/null +++ b/crates/storage/src/types.rs @@ -0,0 +1,952 @@ +use ethlambda_types::{ + attestation::{AttestationData, HashedAttestationData, bits_is_subset}, + block::AggregatedSignatureProof, + checkpoint::Checkpoint, + primitives::H256, + signature::ValidatorSignature, +}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; + +/// Checkpoints to update in the forkchoice store. +/// +/// Used with `Store::update_checkpoints` to update head and optionally +/// update justified/finalized checkpoints (only if higher slot). +pub struct ForkCheckpoints { + pub head: H256, + pub justified: Option, + pub finalized: Option, +} + +impl ForkCheckpoints { + pub fn head_only(head: H256) -> Self { + Self { + head, + justified: None, + finalized: None, + } + } + + pub fn new(head: H256, justified: Option, finalized: Option) -> Self { + Self { + head, + justified, + finalized, + } + } +} + +/// An entry in the payload buffer: attestation data + set of proofs. +#[derive(Clone)] +pub(crate) struct PayloadEntry { + pub data: AttestationData, + pub proofs: Vec, +} + +/// Fixed-size circular buffer for aggregated payloads, keyed by data_root. +/// Entries are evicted FIFO when the buffer reaches capacity. 
+#[derive(Clone)] +pub(crate) struct PayloadBuffer { + pub data: HashMap, + pub order: VecDeque, + pub capacity: usize, + pub total_proofs: usize, +} + +impl PayloadBuffer { + pub fn new(capacity: usize) -> Self { + Self { + data: HashMap::with_capacity(capacity), + order: VecDeque::with_capacity(capacity), + capacity, + total_proofs: 0, + } + } + + /// Insert a proof for an attestation, FIFO-evicting oldest data_roots + /// when total proofs reach capacity. Also ensures the buffer doesn't + /// include proofs which are a subset of other proofs for the same + /// attestation data: + /// + /// - If the incoming proof's participants are a subset (incl. equal) of + /// any existing proof, the incoming proof is redundant and skipped. + /// - Otherwise, any existing proof whose participants are a strict subset + /// of the incoming proof's is removed before inserting. + pub fn push(&mut self, hashed: HashedAttestationData, proof: AggregatedSignatureProof) { + let (data_root, att_data) = hashed.into_parts(); + + if let Some(entry) = self.data.get_mut(&data_root) { + let mut to_remove: Vec = Vec::new(); + for (i, p) in entry.proofs.iter().enumerate() { + // Incoming is subsumed by an existing proof (incl. equal). Skip. + if bits_is_subset(&proof.participants, &p.participants) { + return; + } + // Existing is a strict subset of incoming. Mark for removal. + // (Non-strict equality was ruled out by the check above.) + if bits_is_subset(&p.participants, &proof.participants) { + to_remove.push(i); + } + } + + // Remove subsumed proofs (reverse order so earlier indices stay valid). 
+ for i in to_remove.into_iter().rev() { + entry.proofs.swap_remove(i); + self.total_proofs -= 1; + } + + entry.proofs.push(proof); + self.total_proofs += 1; + } else { + self.data.insert( + data_root, + PayloadEntry { + data: att_data, + proofs: vec![proof], + }, + ); + self.order.push_back(data_root); + self.total_proofs += 1; + } + // Evict oldest data_roots until under capacity + while self.total_proofs > self.capacity { + if let Some(evicted) = self.order.pop_front() { + if let Some(removed) = self.data.remove(&evicted) { + self.total_proofs -= removed.proofs.len(); + } + } else { + break; + } + } + } + + pub fn push_batch(&mut self, entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>) { + for (hashed, proof) in entries { + self.push(hashed, proof); + } + } + + /// Take all entries, leaving the buffer empty. + /// + /// Drains in insertion order (via `self.order`) so downstream consumers + /// like `promote_new_aggregated_payloads` re-insert into known_payloads + /// deterministically. HashMap iteration would be RandomState-seeded and + /// produce non-deterministic vote ordering for same-slot equivocation. 
+ pub fn drain(&mut self) -> Vec<(HashedAttestationData, AggregatedSignatureProof)> { + self.total_proofs = 0; + let mut result = Vec::with_capacity(self.data.values().map(|e| e.proofs.len()).sum()); + while let Some(data_root) = self.order.pop_front() { + if let Some(entry) = self.data.remove(&data_root) { + for proof in entry.proofs { + result.push((HashedAttestationData::new(entry.data.clone()), proof)); + } + } + } + result + } + + pub fn len(&self) -> usize { + self.data.len() + } + + pub fn proof_count_for_root(&self, data_root: &H256) -> usize { + self.data.get(data_root).map_or(0, |e| e.proofs.len()) + } + + pub fn proofs_for_root(&self, data_root: &H256) -> Vec { + self.data + .get(data_root) + .map_or_else(Vec::new, |e| e.proofs.clone()) + } + + pub fn attestation_data_keys(&self) -> Vec<(H256, AttestationData)> { + self.data + .iter() + .map(|(&root, entry)| (root, entry.data.clone())) + .collect() + } + + /// Extract per-validator latest attestations from proofs' participation bits. + /// + /// Iterates entries in insertion order (via `self.order`) so that, when two + /// aggregations carry the same `slot` but disagree on the target (an + /// equivocation by the shared validators), the first-observed aggregation + /// wins. The ethrex spec relies on Python dict insertion-order semantics + /// here; iterating `self.data.values()` would be RandomState-seeded and + /// fail the equivocation fork-choice tests non-deterministically. + pub fn extract_latest_attestations(&self) -> HashMap { + let mut result: HashMap = HashMap::new(); + for data_root in &self.order { + let Some(entry) = self.data.get(data_root) else { + continue; + }; + for proof in &entry.proofs { + for vid in proof.participant_indices() { + let should_update = result + .get(&vid) + .is_none_or(|existing| existing.slot < entry.data.slot); + if should_update { + result.insert(vid, entry.data.clone()); + } + } + } + } + result + } +} + +/// Gossip signatures grouped by attestation data. 
+/// +/// Signatures are stored in a `BTreeMap` keyed by validator_id to guarantee +/// ascending iteration order. XMSS aggregate proofs are order-dependent: +/// verification reconstructs pubkeys from the participation bitfield (low-to-high), +/// so aggregation must produce them in the same ascending order. +pub(crate) struct GossipDataEntry { + pub data: AttestationData, + pub signatures: BTreeMap, +} + +/// Gossip signatures snapshot: (hashed_attestation_data, Vec<(validator_id, signature)>). +pub type GossipSignatureSnapshot = Vec<(HashedAttestationData, Vec<(u64, ValidatorSignature)>)>; + +/// Bounded buffer for gossip signatures with FIFO eviction. +/// +/// Groups signatures by attestation data (via data_root). Each distinct +/// attestation message stores the full `AttestationData` plus individual +/// validator signatures in ascending order (required for XMSS aggregation). +/// +/// Entries are evicted FIFO (by insertion order of the data_root) when +/// total_signatures exceeds capacity, matching the `PayloadBuffer` pattern. +pub(crate) struct GossipSignatureBuffer { + pub data: HashMap, + pub order: VecDeque, + pub capacity: usize, + pub total_signatures: usize, +} + +impl GossipSignatureBuffer { + pub fn new(capacity: usize) -> Self { + Self { + data: HashMap::new(), + order: VecDeque::new(), + capacity, + total_signatures: 0, + } + } + + /// Insert a gossip signature, FIFO-evicting oldest data_roots when over capacity. + /// + /// Last-write-wins: if (validator_id, data_root) already exists, the signature is overwritten. 
+ pub fn insert( + &mut self, + hashed: HashedAttestationData, + validator_id: u64, + signature: ValidatorSignature, + ) { + let (data_root, att_data) = hashed.into_parts(); + + if let Some(entry) = self.data.get_mut(&data_root) { + let is_new = entry.signatures.insert(validator_id, signature).is_none(); + if is_new { + self.total_signatures += 1; + } + } else { + let mut signatures = BTreeMap::new(); + signatures.insert(validator_id, signature); + self.data.insert( + data_root, + GossipDataEntry { + data: att_data, + signatures, + }, + ); + self.order.push_back(data_root); + self.total_signatures += 1; + } + + // Evict oldest data_roots until under capacity + while self.total_signatures > self.capacity { + if let Some(evicted) = self.order.pop_front() { + if let Some(removed) = self.data.remove(&evicted) { + self.total_signatures -= removed.signatures.len(); + } + } else { + break; + } + } + } + + /// Delete gossip entries for the given (validator_id, data_root) pairs. + /// + /// When all signatures for a data_root are removed, the entry is cleaned up. + /// Collects emptied roots and batch-cleans the VecDeque in one pass. + pub fn delete(&mut self, keys: &[(u64, H256)]) { + if keys.is_empty() { + return; + } + let mut emptied_roots: HashSet = HashSet::new(); + for &(vid, data_root) in keys { + if let Some(entry) = self.data.get_mut(&data_root) { + if entry.signatures.remove(&vid).is_some() { + self.total_signatures -= 1; + } + if entry.signatures.is_empty() { + self.data.remove(&data_root); + emptied_roots.insert(data_root); + } + } + } + if !emptied_roots.is_empty() { + self.order.retain(|r| !emptied_roots.contains(r)); + } + } + + /// Prune gossip signatures for slots <= finalized_slot. + /// + /// Returns the number of data_root entries pruned. 
+ pub fn prune(&mut self, finalized_slot: u64) -> usize { + let mut pruned_roots: HashSet = HashSet::new(); + self.data.retain(|root, entry| { + if entry.data.slot > finalized_slot { + true + } else { + self.total_signatures -= entry.signatures.len(); + pruned_roots.insert(*root); + false + } + }); + if !pruned_roots.is_empty() { + self.order.retain(|r| !pruned_roots.contains(r)); + } + pruned_roots.len() + } + + pub fn snapshot(&self) -> GossipSignatureSnapshot { + self.data + .values() + .map(|entry| { + let sigs: Vec<_> = entry + .signatures + .iter() + .map(|(&vid, sig)| (vid, sig.clone())) + .collect(); + (HashedAttestationData::new(entry.data.clone()), sigs) + }) + .collect() + } + + pub fn total_signatures(&self) -> usize { + self.total_signatures + } + + pub fn len(&self) -> usize { + self.data.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ethlambda_types::primitives::HashTreeRoot as _; + + fn make_proof() -> AggregatedSignatureProof { + use ethlambda_types::attestation::AggregationBits; + AggregatedSignatureProof::empty(AggregationBits::new()) + } + + /// Create a proof with a specific validator bit set (distinct participants). + fn make_proof_for_validator(vid: usize) -> AggregatedSignatureProof { + use ethlambda_types::attestation::AggregationBits; + let mut bits = AggregationBits::with_length(vid + 1).unwrap(); + bits.set(vid, true).unwrap(); + AggregatedSignatureProof::empty(bits) + } + + /// Create a proof with bits set for every validator in `vids`. 
+ fn make_proof_for_validators(vids: &[u64]) -> AggregatedSignatureProof { + use ethlambda_types::attestation::AggregationBits; + let max = vids.iter().copied().max().unwrap_or(0) as usize; + let mut bits = AggregationBits::with_length(max + 1).unwrap(); + for &v in vids { + bits.set(v as usize, true).unwrap(); + } + AggregatedSignatureProof::empty(bits) + } + + fn make_att_data(slot: u64) -> AttestationData { + AttestationData { + slot, + head: Checkpoint::default(), + target: Checkpoint::default(), + source: Checkpoint::default(), + } + } + + #[test] + fn payload_buffer_fifo_eviction() { + let mut buf = PayloadBuffer::new(3); + + // Insert 3 distinct attestation data entries (different slots -> different roots) + for slot in 1..=3u64 { + let data = make_att_data(slot); + buf.push(HashedAttestationData::new(data), make_proof()); + } + assert_eq!(buf.len(), 3); + + // Pushing a 4th should evict the oldest (slot 1) + let data = make_att_data(4); + buf.push(HashedAttestationData::new(data), make_proof()); + assert_eq!(buf.len(), 3); + + // The oldest (slot 1) should be gone + let att_data_1 = make_att_data(1); + assert!(!buf.data.contains_key(&att_data_1.hash_tree_root())); + } + + #[test] + fn payload_buffer_multiple_proofs_per_data() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + // Insert 3 proofs with distinct participants for the same attestation data + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(0), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(1), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validator(2), + ); + + // Should be 1 distinct data entry with 3 proofs + assert_eq!(buf.len(), 1); + assert_eq!(buf.data[&data_root].proofs.len(), 3); + } + + #[test] + fn payload_buffer_drain_empties_buffer() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + + buf.push( + 
            HashedAttestationData::new(data.clone()),
+            make_proof_for_validator(0),
+        );
+        buf.push(
+            HashedAttestationData::new(data),
+            make_proof_for_validator(1),
+        );
+
+        let drained = buf.drain();
+        assert_eq!(drained.len(), 2); // 2 proofs flattened
+        assert!(buf.data.is_empty());
+        assert!(buf.order.is_empty());
+    }
+
+    #[test]
+    fn payload_buffer_push_superset_removes_strict_subset() {
+        let mut buf = PayloadBuffer::new(10);
+        let data = make_att_data(1);
+        let data_root = data.hash_tree_root();
+
+        buf.push(
+            HashedAttestationData::new(data.clone()),
+            make_proof_for_validators(&[1, 2]),
+        );
+        buf.push(
+            HashedAttestationData::new(data),
+            make_proof_for_validators(&[1, 2, 3]),
+        );
+
+        assert_eq!(buf.total_proofs, 1);
+        assert_eq!(buf.data[&data_root].proofs.len(), 1);
+        let kept: HashSet<u64> = buf.data[&data_root].proofs[0]
+            .participant_indices()
+            .collect();
+        assert_eq!(kept, HashSet::from([1, 2, 3]));
+    }
+
+    #[test]
+    fn payload_buffer_push_subset_is_skipped() {
+        let mut buf = PayloadBuffer::new(10);
+        let data = make_att_data(1);
+        let data_root = data.hash_tree_root();
+
+        buf.push(
+            HashedAttestationData::new(data.clone()),
+            make_proof_for_validators(&[1, 2, 3]),
+        );
+        buf.push(
+            HashedAttestationData::new(data),
+            make_proof_for_validators(&[1, 2]),
+        );
+
+        assert_eq!(buf.total_proofs, 1);
+        assert_eq!(buf.data[&data_root].proofs.len(), 1);
+        let kept: HashSet<u64> = buf.data[&data_root].proofs[0]
+            .participant_indices()
+            .collect();
+        assert_eq!(kept, HashSet::from([1, 2, 3]));
+    }
+
+    #[test]
+    fn payload_buffer_push_equal_participants_is_skipped() {
+        let mut buf = PayloadBuffer::new(10);
+        let data = make_att_data(1);
+        let data_root = data.hash_tree_root();
+
+        buf.push(
+            HashedAttestationData::new(data.clone()),
+            make_proof_for_validators(&[1, 2]),
+        );
+        buf.push(
+            HashedAttestationData::new(data),
+            make_proof_for_validators(&[1, 2]),
+        );
+
+        assert_eq!(buf.total_proofs, 1);
+        assert_eq!(buf.data[&data_root].proofs.len(), 1);
+    }
+
+
#[test] + fn payload_buffer_push_incomparable_proofs_coexist() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[3, 4]), + ); + + assert_eq!(buf.total_proofs, 2); + assert_eq!(buf.data[&data_root].proofs.len(), 2); + } + + #[test] + fn payload_buffer_push_superset_absorbs_multiple_subsets() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + // Three pairwise-incomparable singletons: all retained. + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1]), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[2]), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[3]), + ); + assert_eq!(buf.total_proofs, 3); + + // Superset push absorbs all three at once. + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2, 3]), + ); + + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.data[&data_root].proofs.len(), 1); + // `order` still contains the single entry. 
+        assert_eq!(buf.order.len(), 1);
+        assert_eq!(buf.order.front().copied(), Some(data_root));
+    }
+
+    #[test]
+    fn payload_buffer_push_mixed_kept_and_removed() {
+        let mut buf = PayloadBuffer::new(10);
+        let data = make_att_data(1);
+        let data_root = data.hash_tree_root();
+
+        buf.push(
+            HashedAttestationData::new(data.clone()),
+            make_proof_for_validators(&[1, 2]),
+        );
+        buf.push(
+            HashedAttestationData::new(data.clone()),
+            make_proof_for_validators(&[5, 6]),
+        );
+        buf.push(
+            HashedAttestationData::new(data),
+            make_proof_for_validators(&[1, 2, 3]),
+        );
+
+        assert_eq!(buf.total_proofs, 2);
+
+        let sets: HashSet<Vec<u64>> = buf.data[&data_root]
+            .proofs
+            .iter()
+            .map(|p| {
+                let mut v: Vec<u64> = p.participant_indices().collect();
+                v.sort_unstable();
+                v
+            })
+            .collect();
+        assert!(sets.contains(&vec![5, 6]));
+        assert!(sets.contains(&vec![1, 2, 3]));
+    }
+
+    #[test]
+    fn payload_buffer_push_empty_participants_subsumed_by_anything() {
+        let mut buf = PayloadBuffer::new(10);
+        let data = make_att_data(1);
+        let data_root = data.hash_tree_root();
+
+        // Empty-participant proof inserted first: anything that follows absorbs it.
+        buf.push(
+            HashedAttestationData::new(data.clone()),
+            make_proof_for_validators(&[]),
+        );
+        assert_eq!(buf.total_proofs, 1);
+        buf.push(
+            HashedAttestationData::new(data.clone()),
+            make_proof_for_validators(&[1, 2]),
+        );
+        assert_eq!(buf.total_proofs, 1);
+        assert_eq!(
+            buf.data[&data_root].proofs[0]
+                .participant_indices()
+                .collect::<Vec<_>>(),
+            vec![1, 2]
+        );
+
+        // Empty-participant proof pushed against existing non-empty: incoming is subsumed, skipped.
+ buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[]), + ); + assert_eq!(buf.total_proofs, 1); + } + + #[test] + fn payload_buffer_push_cross_data_root_independence() { + let mut buf = PayloadBuffer::new(10); + let data_a = make_att_data(1); + let data_b = make_att_data(2); + let root_a = data_a.hash_tree_root(); + let root_b = data_b.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data_a), + make_proof_for_validators(&[1, 2, 3]), + ); + buf.push( + HashedAttestationData::new(data_b), + make_proof_for_validators(&[1, 2]), + ); + + // Different data_roots -> no cross-entry subsumption. + assert_eq!(buf.total_proofs, 2); + assert_eq!(buf.data[&root_a].proofs.len(), 1); + assert_eq!(buf.data[&root_b].proofs.len(), 1); + } + + #[test] + fn payload_buffer_push_fifo_eviction_uses_total_proofs() { + let mut buf = PayloadBuffer::new(2); + let data_a = make_att_data(1); + let data_b = make_att_data(2); + let data_c = make_att_data(3); + let root_a = data_a.hash_tree_root(); + let root_c = data_c.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data_a), + make_proof_for_validators(&[1]), + ); + buf.push( + HashedAttestationData::new(data_b), + make_proof_for_validators(&[2, 3]), + ); + // total_proofs == 3, over capacity -> evict oldest (root_a). + // Pushing a third distinct data_root triggers eviction via capacity. + buf.push( + HashedAttestationData::new(data_c), + make_proof_for_validators(&[4]), + ); + + assert!(!buf.data.contains_key(&root_a)); + assert!(buf.data.contains_key(&root_c)); + assert_eq!(buf.total_proofs, 2); + } + + /// Build an attestation message at `slot` whose target points at `target_root`, + /// distinct from the default zero target so two such datas have different roots. 
+ fn make_att_data_for_target(slot: u64, target_root: H256) -> AttestationData { + AttestationData { + slot, + head: Checkpoint::default(), + target: Checkpoint { + root: target_root, + slot, + }, + source: Checkpoint::default(), + } + } + + /// When two aggregations share `slot` but disagree on the target + /// (same-slot equivocation), the *first inserted* aggregation must win for + /// the validators that participate in both. The fork-choice spec test + /// `test_same_slot_equivocating_attesters_count_once` depends on this. + /// HashMap iteration would make this RandomState-seeded and flaky. + #[test] + fn extract_latest_attestations_first_inserted_wins_on_slot_tie() { + let target_a = H256([0xaa; 32]); + let target_b = H256([0xbb; 32]); + let data_a = make_att_data_for_target(3, target_a); + let data_b = make_att_data_for_target(3, target_b); + assert_ne!(data_a.hash_tree_root(), data_b.hash_tree_root()); + + // Order 1: A then B -> validators 0,1 (in both) must see A. + let mut buf = PayloadBuffer::new(10); + buf.push( + HashedAttestationData::new(data_a.clone()), + make_proof_for_validators(&[0, 1, 2]), + ); + buf.push( + HashedAttestationData::new(data_b.clone()), + make_proof_for_validators(&[0, 1, 3, 4]), + ); + let extracted = buf.extract_latest_attestations(); + assert_eq!(extracted[&0].target.root, target_a); + assert_eq!(extracted[&1].target.root, target_a); + assert_eq!(extracted[&2].target.root, target_a); + assert_eq!(extracted[&3].target.root, target_b); + assert_eq!(extracted[&4].target.root, target_b); + + // Order 2: B then A -> validators 0,1 must now see B. 
+        let mut buf = PayloadBuffer::new(10);
+        buf.push(
+            HashedAttestationData::new(data_b),
+            make_proof_for_validators(&[0, 1, 3, 4]),
+        );
+        buf.push(
+            HashedAttestationData::new(data_a),
+            make_proof_for_validators(&[0, 1, 2]),
+        );
+        let extracted = buf.extract_latest_attestations();
+        assert_eq!(extracted[&0].target.root, target_b);
+        assert_eq!(extracted[&1].target.root, target_b);
+        assert_eq!(extracted[&2].target.root, target_a);
+        assert_eq!(extracted[&3].target.root, target_b);
+        assert_eq!(extracted[&4].target.root, target_b);
+    }
+
+    /// `drain` must hand back entries in insertion order so that
+    /// `promote_new_aggregated_payloads` lands them in known_payloads in the
+    /// same order, preserving same-slot equivocation semantics through the
+    /// new -> known migration.
+    #[test]
+    fn drain_preserves_insertion_order() {
+        let target_a = H256([0xaa; 32]);
+        let target_b = H256([0xbb; 32]);
+        let target_c = H256([0xcc; 32]);
+        let data_a = make_att_data_for_target(1, target_a);
+        let data_b = make_att_data_for_target(2, target_b);
+        let data_c = make_att_data_for_target(3, target_c);
+
+        let mut buf = PayloadBuffer::new(10);
+        buf.push(HashedAttestationData::new(data_a), make_proof());
+        buf.push(HashedAttestationData::new(data_b), make_proof());
+        buf.push(HashedAttestationData::new(data_c), make_proof());
+
+        let drained = buf.drain();
+        let slots: Vec<u64> = drained.iter().map(|(h, _)| h.data().slot).collect();
+        assert_eq!(slots, vec![1, 2, 3]);
+        assert!(buf.data.is_empty());
+        assert!(buf.order.is_empty());
+        assert_eq!(buf.total_proofs, 0);
+    }
+
+    fn make_dummy_sig() -> ValidatorSignature {
+        use ethlambda_types::signature::LeanSignatureScheme;
+        use leansig::{serialization::Serializable, signature::SignatureScheme};
+        use rand::{SeedableRng, rngs::StdRng};
+
+        static CACHED_SIG: std::sync::LazyLock<Vec<u8>> = std::sync::LazyLock::new(|| {
+            let mut rng = StdRng::seed_from_u64(42);
+            let lifetime = 1 << 5; // small for speed
+            let (_pk, sk) =
LeanSignatureScheme::key_gen(&mut rng, 0, lifetime); + let sig = LeanSignatureScheme::sign(&sk, 0, &[0u8; 32]).unwrap(); + sig.to_bytes() + }); + + ValidatorSignature::from_bytes(&CACHED_SIG).expect("cached test signature") + } + + #[test] + fn gossip_buffer_fifo_eviction() { + // Capacity of 3 signatures total + let mut buf = GossipSignatureBuffer::new(3); + + // Insert 3 sigs across 3 data_roots (1 sig each) + for slot in 1..=3u64 { + let data = make_att_data(slot); + buf.insert(HashedAttestationData::new(data), 0, make_dummy_sig()); + } + assert_eq!(buf.total_signatures(), 3); + assert_eq!(buf.len(), 3); + + // Insert a 4th - should evict the oldest (slot 1) + let data4 = make_att_data(4); + buf.insert(HashedAttestationData::new(data4), 0, make_dummy_sig()); + assert_eq!(buf.total_signatures(), 3); + assert_eq!(buf.len(), 3); + + // Slot 1 should be gone + let slot1_root = HashedAttestationData::new(make_att_data(1)).root(); + assert!(!buf.data.contains_key(&slot1_root)); + + // Slots 2, 3, 4 should remain + let slot2_root = HashedAttestationData::new(make_att_data(2)).root(); + let slot4_root = HashedAttestationData::new(make_att_data(4)).root(); + assert!(buf.data.contains_key(&slot2_root)); + assert!(buf.data.contains_key(&slot4_root)); + } + + #[test] + fn gossip_buffer_dedup_last_write_wins() { + let mut buf = GossipSignatureBuffer::new(100); + let data = make_att_data(1); + let hashed = HashedAttestationData::new(data); + + buf.insert(hashed.clone(), 0, make_dummy_sig()); + buf.insert(hashed.clone(), 0, make_dummy_sig()); + + // Last-write-wins: overwrites the signature but count stays at 1 + assert_eq!(buf.total_signatures(), 1); + assert_eq!(buf.len(), 1); + } + + #[test] + fn gossip_buffer_multiple_validators_per_root() { + let mut buf = GossipSignatureBuffer::new(100); + let data = make_att_data(1); + + buf.insert( + HashedAttestationData::new(data.clone()), + 0, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data.clone()), + 1, + 
make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data.clone()), + 2, + make_dummy_sig(), + ); + + assert_eq!(buf.total_signatures(), 3); + assert_eq!(buf.len(), 1); // One data_root + } + + #[test] + fn gossip_buffer_delete_cleans_up() { + let mut buf = GossipSignatureBuffer::new(100); + let data = make_att_data(1); + let root = HashedAttestationData::new(data.clone()).root(); + + buf.insert( + HashedAttestationData::new(data.clone()), + 0, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data.clone()), + 1, + make_dummy_sig(), + ); + assert_eq!(buf.total_signatures(), 2); + + // Delete one sig - root should remain + buf.delete(&[(0, root)]); + assert_eq!(buf.total_signatures(), 1); + assert_eq!(buf.len(), 1); + + // Delete last sig - root should be fully removed + buf.delete(&[(1, root)]); + assert_eq!(buf.total_signatures(), 0); + assert_eq!(buf.len(), 0); + assert!(buf.order.is_empty()); + } + + #[test] + fn gossip_buffer_prune_by_slot() { + let mut buf = GossipSignatureBuffer::new(100); + + // Insert sigs at slots 1, 2, 3, 4, 5 + for slot in 1..=5u64 { + buf.insert( + HashedAttestationData::new(make_att_data(slot)), + 0, + make_dummy_sig(), + ); + } + assert_eq!(buf.total_signatures(), 5); + + // Prune slots <= 3 + let pruned = buf.prune(3); + assert_eq!(pruned, 3); + assert_eq!(buf.total_signatures(), 2); + assert_eq!(buf.len(), 2); + assert_eq!(buf.order.len(), 2); + } + + #[test] + fn gossip_buffer_eviction_removes_whole_root() { + // Capacity of 4 signatures + let mut buf = GossipSignatureBuffer::new(4); + + // Slot 1: 3 validators + let data1 = make_att_data(1); + buf.insert( + HashedAttestationData::new(data1.clone()), + 0, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data1.clone()), + 1, + make_dummy_sig(), + ); + buf.insert( + HashedAttestationData::new(data1.clone()), + 2, + make_dummy_sig(), + ); + + // Slot 2: 1 validator + let data2 = make_att_data(2); + buf.insert( + 
            HashedAttestationData::new(data2.clone()),
+            0,
+            make_dummy_sig(),
+        );
+        assert_eq!(buf.total_signatures(), 4);
+
+        // Insert slot 3 - should evict slot 1 (3 sigs), now total = 2
+        let data3 = make_att_data(3);
+        buf.insert(HashedAttestationData::new(data3), 0, make_dummy_sig());
+
+        let slot1_root = HashedAttestationData::new(data1).root();
+        assert!(!buf.data.contains_key(&slot1_root));
+        assert_eq!(buf.total_signatures(), 2); // slot 2 (1) + slot 3 (1)
+        assert_eq!(buf.len(), 2);
+    }
+}
diff --git a/crates/storage/src/utils.rs b/crates/storage/src/utils.rs
new file mode 100644
index 00000000..7878eed6
--- /dev/null
+++ b/crates/storage/src/utils.rs
@@ -0,0 +1,60 @@
+use crate::api::{StorageWriteBatch, Table};
+use crate::config::EMPTY_BODY_ROOT;
+use ethlambda_types::{
+    block::{Block, SignedBlock},
+    primitives::H256,
+};
+use libssz::SszEncode;
+
+/// Encode a LiveChain key (slot, root) to bytes.
+/// Layout: slot (8 bytes BE) || root (32 bytes).
+/// Big-endian ensures lexicographic ordering matches numeric ordering.
+pub fn encode_live_chain_key(slot: u64, root: &H256) -> Vec<u8> {
+    let mut result = slot.to_be_bytes().to_vec();
+    result.extend_from_slice(&root.0);
+    result
+}
+
+pub fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) {
+    let slot = u64::from_be_bytes(bytes[..8].try_into().expect("valid slot bytes"));
+    let root = H256::from_slice(&bytes[8..]);
+    (slot, root)
+}
+
+/// Write block header, body, and signatures onto an existing batch.
+///
+/// Returns the deserialized [`Block`] so callers can access fields
+/// without re-deserializing.
+pub fn write_signed_block( + batch: &mut dyn StorageWriteBatch, + root: &H256, + signed_block: SignedBlock, +) -> Block { + let SignedBlock { + message: block, + signature, + } = signed_block; + + let header = block.header(); + let root_bytes = root.to_ssz(); + + let header_entries = vec![(root_bytes.clone(), header.to_ssz())]; + batch + .put_batch(Table::BlockHeaders, header_entries) + .expect("put block header"); + + // Skip storing empty bodies - they can be reconstructed from the header's body_root + if header.body_root != *EMPTY_BODY_ROOT { + let body_entries = vec![(root_bytes.clone(), block.body.to_ssz())]; + batch + .put_batch(Table::BlockBodies, body_entries) + .expect("put block body"); + } + + let sig_entries = vec![(root_bytes, signature.to_ssz())]; + batch + .put_batch(Table::BlockSignatures, sig_entries) + .expect("put block signatures"); + + block +} From 576b39494ad9e948223f78572cad3bfa9c2afde6 Mon Sep 17 00:00:00 2001 From: Chahat Patel <213402752+chahat-101@users.noreply.github.com> Date: Sat, 16 May 2026 01:22:34 +0530 Subject: [PATCH 2/3] Update crates/storage/src/types.rs Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- crates/storage/src/types.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs index 51e5b0d2..e69953a4 100644 --- a/crates/storage/src/types.rs +++ b/crates/storage/src/types.rs @@ -12,9 +12,9 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; /// Used with `Store::update_checkpoints` to update head and optionally /// update justified/finalized checkpoints (only if higher slot). 
 pub struct ForkCheckpoints {
-    pub head: H256,
-    pub justified: Option<Checkpoint>,
-    pub finalized: Option<Checkpoint>,
+    pub(crate) head: H256,
+    pub(crate) justified: Option<Checkpoint>,
+    pub(crate) finalized: Option<Checkpoint>,
 }
 
 impl ForkCheckpoints {

From 07e3414e29cfbbf908e548ddf80a6ce367e342da Mon Sep 17 00:00:00 2001
From: Chahat Patel <213402752+chahat-101@users.noreply.github.com>
Date: Sat, 16 May 2026 01:22:48 +0530
Subject: [PATCH 3/3] Update crates/storage/src/types.rs

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
---
 crates/storage/src/types.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs
index e69953a4..bbc8ea2b 100644
--- a/crates/storage/src/types.rs
+++ b/crates/storage/src/types.rs
@@ -338,6 +338,7 @@ impl GossipSignatureBuffer {
         self.total_signatures
     }
 
+    #[cfg(test)]
     pub fn len(&self) -> usize {
         self.data.len()
     }