Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
},
req_resp::{
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status,
MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer,
},
swarm_adapter::SwarmHandle,
Expand All @@ -59,6 +59,7 @@ const MAX_FETCH_RETRIES: u32 = 10;
const INITIAL_BACKOFF_MS: u64 = 5;
const BACKOFF_MULTIPLIER: u64 = 2;
const PEER_REDIAL_INTERVAL_SECS: u64 = 12;
const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days)

pub(crate) struct PendingRequest {
pub(crate) attempts: u32,
Expand Down Expand Up @@ -302,6 +303,8 @@ impl P2P {
connected_peers: HashSet::new(),
pending_requests: HashMap::new(),
request_id_map: HashMap::new(),
range_request_ids: HashSet::new(),
pending_sync_ranges: HashSet::new(),
bootnode_addrs: built.bootnode_addrs,
node_names,
};
Expand Down Expand Up @@ -338,6 +341,8 @@ pub struct P2PServer {
pub(crate) connected_peers: HashSet<PeerId>,
pub(crate) pending_requests: HashMap<H256, PendingRequest>,
pub(crate) request_id_map: HashMap<OutboundRequestId, H256>,
pub(crate) range_request_ids: HashSet<OutboundRequestId>,
pub(crate) pending_sync_ranges: HashSet<(u64, u64)>,
bootnode_addrs: HashMap<PeerId, Multiaddr>,
node_names: HashMap<PeerId, String>,
}
Expand Down
125 changes: 118 additions & 7 deletions crates/net/p2p/src/req_resp/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ use ethlambda_types::primitives::HashTreeRoot as _;
use ethlambda_types::{block::SignedBlock, primitives::H256};

use super::{
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS,
Request, Response, ResponsePayload, Status,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest,
BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, Response, ResponsePayload, Status,
messages::{ResponseCode, error_message},
};
use crate::{
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest,
p2p_protocol, req_resp::RequestedBlockRoots,
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, MAX_SYNC_RANGE, P2PServer,
PendingRequest, p2p_protocol, req_resp::RequestedBlockRoots,
};

pub async fn handle_req_resp_message(
Expand Down Expand Up @@ -62,12 +62,18 @@ pub async fn handle_req_resp_message(
Response::Success { payload } => match payload {
ResponsePayload::Status(status) => {
info!(kind = "status_response", peer_count, "P2P message received");
handle_status_response(status, peer).await;
handle_status_response(server, status, peer).await;
}
ResponsePayload::Blocks(blocks) => {
info!(kind = "blocks_response", peer_count, "P2P message received");
handle_blocks_by_root_response(server, blocks, peer, request_id, ctx)
if server.range_request_ids.remove(&request_id) {
handle_blocks_by_range_response(server, blocks, peer).await;
} else {
handle_blocks_by_root_response(
server, blocks, peer, request_id, ctx,
)
.await;
}
}
},
Response::Error { code, message } => {
Expand All @@ -88,6 +94,8 @@ pub async fn handle_req_resp_message(
// Check if this was a block fetch request
if let Some(root) = server.request_id_map.remove(&request_id) {
handle_fetch_failure(server, root, peer, ctx).await;
} else if server.range_request_ids.remove(&request_id) {
warn!(%peer, ?request_id, "BlocksByRange request failed");
}
}
request_response::Event::InboundFailure {
Expand Down Expand Up @@ -118,8 +126,18 @@ async fn handle_status_request(
server.swarm_handle.send_response(channel, response);
}

async fn handle_status_response(status: Status, peer: PeerId) {
/// Handle a peer's Status response.
///
/// If the peer's reported head is ahead of our own, start a long-range sync
/// by requesting the missing slot range via BlocksByRange (capped at
/// `MAX_SYNC_RANGE` slots so a single peer cannot trigger an unbounded
/// request burst).
async fn handle_status_response(server: &mut P2PServer, status: Status, peer: PeerId) {
    info!(finalized_slot=%status.finalized.slot, head_slot=%status.head.slot, "Received status response from peer {peer}");

    let our_head_slot = server.store.head_slot();
    // Peer is at or behind our head — nothing to sync from it.
    if status.head.slot <= our_head_slot {
        return;
    }
    let gap = status.head.slot - our_head_slot;
    // Cap how far we sync in one go; the remainder is picked up on a later
    // status exchange.
    let count = gap.min(MAX_SYNC_RANGE);
    let start_slot = our_head_slot.saturating_add(1);
    // Only log the sync as started if the request was actually dispatched;
    // the original ignored the bool and logged success unconditionally.
    if request_blocks_by_range_from_peer(server, peer, start_slot, count).await {
        info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange");
    }
}

async fn handle_blocks_by_root_request(
Expand Down Expand Up @@ -282,6 +300,36 @@ async fn handle_blocks_by_root_response(
}
}

/// Handle a BlocksByRange response: forward every received block to the
/// blockchain for validation and import.
///
/// Import errors for individual blocks are logged and do not abort the loop,
/// so one bad block does not discard the rest of the batch.
async fn handle_blocks_by_range_response(
    server: &mut P2PServer,
    blocks: Vec<SignedBlock>,
    peer: PeerId,
) {
    info!(%peer, count = blocks.len(), "Received BlocksByRange response");

    if blocks.is_empty() {
        // An empty response is protocol-legal (peer may have pruned the
        // range) but worth surfacing, since it stalls this sync attempt.
        warn!(%peer, "Received empty BlocksByRange response");
        return;
    }

    if let Some(ref blockchain) = server.blockchain {
        for block in blocks {
            let block_root = block.message.hash_tree_root();
            let slot = block.message.slot;
            // TODO: validate block.message.slot is within the originally requested range.
            // NOTE(review): the (start_slot, end_slot) entry inserted into
            // `pending_sync_ranges` when the request was sent is not removed
            // here (or anywhere visible) — confirm where that cleanup happens,
            // otherwise the dedup set leaks and blocks re-syncing the range.
            let _ = blockchain.new_block(block).inspect_err(|err| {
                error!(
                    %peer,
                    %slot,
                    block_root = %ethlambda_types::ShortRoot(&block_root.0),
                    %err,
                    "Failed to forward range-fetched block to blockchain"
                )
            });
        }
    }
}

/// Build a Status message from the current Store state.
pub fn build_status(store: &Store) -> Status {
let finalized = store.latest_finalized();
Expand Down Expand Up @@ -380,6 +428,69 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool {
true
}

/// Request `count` blocks starting at `start_slot` from `peer`, splitting the
/// range into BlocksByRange requests of at most `MAX_REQUEST_BLOCKS` each.
///
/// Returns `true` when every batch was dispatched (or `count == 0`, or the
/// range is already in flight), `false` when the swarm adapter closed before
/// all batches could be sent.
async fn request_blocks_by_range_from_peer(
    server: &mut P2PServer,
    peer: PeerId,
    start_slot: u64,
    count: u64,
) -> bool {
    if count == 0 {
        return true;
    }
    let end_slot = start_slot.saturating_add(count).saturating_sub(1);

    // Deduplicate: skip if we already have this range in-flight.
    if server.pending_sync_ranges.contains(&(start_slot, end_slot)) {
        info!(%peer, start_slot, end_slot, "BlocksByRange already in-flight, skipping duplicate");
        return true;
    }
    server.pending_sync_ranges.insert((start_slot, end_slot));

    let mut remaining = count;
    let mut next_slot = start_slot;

    while remaining > 0 {
        // Never exceed the per-request block limit of the protocol.
        let batch_count = remaining.min(MAX_REQUEST_BLOCKS);
        let request = BlocksByRangeRequest {
            start_slot: next_slot,
            count: batch_count,
            step: 1,
        };

        info!(
            %peer,
            start_slot = next_slot,
            count = batch_count,
            "Sending BlocksByRange request"
        );

        let Some(request_id) = server
            .swarm_handle
            .send_request(
                peer,
                Request::BlocksByRange(request),
                libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1),
            )
            .await
        else {
            warn!(
                %peer,
                start_slot = next_slot,
                count = batch_count,
                "Failed to send BlocksByRange request (swarm adapter closed)"
            );
            // Bug fix: un-mark the range on failure. The original left the
            // entry in `pending_sync_ranges`, so the dedup check above would
            // permanently suppress any retry of this range.
            server.pending_sync_ranges.remove(&(start_slot, end_slot));
            return false;
        };

        // Track the id so responses are routed to the range handler instead
        // of the by-root handler.
        server.range_request_ids.insert(request_id);

        remaining -= batch_count;
        next_slot = next_slot.saturating_add(batch_count);
    }

    true
}

async fn handle_fetch_failure(
server: &mut P2PServer,
root: H256,
Expand Down