From 77b474b4d60e123c806a138754cd4061a7830369 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 3 Mar 2026 21:47:17 -0500 Subject: [PATCH] feat(sensor): broadcast blocks, txs, and hashes - Add transaction and block broadcasting to connected peers - Implement LRU cache and BloomSet data structures for efficient tracking - Add per-peer known tx/block tracking to avoid duplicate sends - Support async transaction hash announcements - Add protocol version negotiation - Fix goroutine leak in broadcast handlers - Add pprof lock profiling support Co-Authored-By: Claude Opus 4.5 --- cmd/p2p/sensor/api.go | 2 + cmd/p2p/sensor/sensor.go | 158 ++++++---- doc/polycli_p2p_sensor.md | 10 + go.mod | 4 +- p2p/cache.go | 239 --------------- p2p/conns.go | 295 ++++++++++++++++-- p2p/datastructures/bloomset.go | 173 +++++++++++ p2p/datastructures/bloomset_test.go | 203 ++++++++++++ p2p/datastructures/boundedset.go | 43 +++ p2p/{ => datastructures}/locked.go | 2 +- p2p/datastructures/lru.go | 307 +++++++++++++++++++ p2p/protocol.go | 459 +++++++++++++++++++++++++--- p2p/types.go | 67 ++-- 13 files changed, 1558 insertions(+), 404 deletions(-) delete mode 100644 p2p/cache.go create mode 100644 p2p/datastructures/bloomset.go create mode 100644 p2p/datastructures/bloomset_test.go create mode 100644 p2p/datastructures/boundedset.go rename p2p/{ => datastructures}/locked.go (97%) create mode 100644 p2p/datastructures/lru.go diff --git a/cmd/p2p/sensor/api.go b/cmd/p2p/sensor/api.go index efba4b8c8..fb68eb208 100644 --- a/cmd/p2p/sensor/api.go +++ b/cmd/p2p/sensor/api.go @@ -17,6 +17,7 @@ import ( // (number of p2p messages), along with connection timing information. 
type peerData struct { Name string `json:"name"` + ProtocolVersion uint `json:"protocol_version"` Received p2p.MessageCount `json:"received"` Sent p2p.MessageCount `json:"sent"` PacketsReceived p2p.MessageCount `json:"packets_received"` @@ -85,6 +86,7 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) { peers[url] = peerData{ Name: conns.GetPeerName(peerID), + ProtocolVersion: conns.GetPeerVersion(peerID), Received: messages.Received, Sent: messages.Sent, PacketsReceived: messages.PacketsReceived, diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 110f271f7..5daa5f397 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -6,8 +6,8 @@ import ( _ "embed" "errors" "fmt" - "os" "os/signal" + "runtime" "syscall" "time" @@ -29,6 +29,7 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" + ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" "github.com/0xPolygon/polygon-cli/flag" "github.com/0xPolygon/polygon-cli/p2p" "github.com/0xPolygon/polygon-cli/p2p/database" @@ -52,6 +53,10 @@ type ( ShouldWriteTransactions bool ShouldWriteTransactionEvents bool ShouldWritePeers bool + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool ShouldRunPprof bool PprofPort uint ShouldRunPrometheus bool @@ -71,9 +76,12 @@ type ( DiscoveryDNS string Database string NoDiscovery bool - RequestsCache p2p.CacheOptions - ParentsCache p2p.CacheOptions - BlocksCache p2p.CacheOptions + RequestsCache ds.LRUOptions + ParentsCache ds.LRUOptions + BlocksCache ds.LRUOptions + TxsCache ds.LRUOptions + KnownTxsBloom ds.BloomSetOptions + KnownBlocksMax int bootnodes []*enode.Node staticNodes []*enode.Node @@ -166,7 +174,10 @@ var SensorCmd = &cobra.Command{ return nil }, RunE: func(cmd *cobra.Command, args []string) error { - db, err := newDatabase(cmd.Context()) + ctx, stop := signal.NotifyContext(cmd.Context(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + db, err := 
newDatabase(ctx) if err != nil { return err } @@ -195,21 +206,32 @@ var SensorCmd = &cobra.Command{ // Create peer connection manager for broadcasting transactions // and managing the global blocks cache conns := p2p.NewConns(p2p.ConnsOptions{ - BlocksCache: inputSensorParams.BlocksCache, - Head: head, + BlocksCache: inputSensorParams.BlocksCache, + TxsCache: inputSensorParams.TxsCache, + KnownTxsBloom: inputSensorParams.KnownTxsBloom, + KnownBlocksMax: inputSensorParams.KnownBlocksMax, + Head: head, + ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx, + ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes, + ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks, + ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes, }) opts := p2p.EthProtocolOptions{ - Context: cmd.Context(), - Database: db, - GenesisHash: common.HexToHash(inputSensorParams.GenesisHash), - RPC: inputSensorParams.RPC, - SensorID: inputSensorParams.SensorID, - NetworkID: inputSensorParams.NetworkID, - Conns: conns, - ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)}, - RequestsCache: inputSensorParams.RequestsCache, - ParentsCache: inputSensorParams.ParentsCache, + Context: ctx, + Database: db, + GenesisHash: common.HexToHash(inputSensorParams.GenesisHash), + RPC: inputSensorParams.RPC, + SensorID: inputSensorParams.SensorID, + NetworkID: inputSensorParams.NetworkID, + Conns: conns, + ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)}, + RequestsCache: inputSensorParams.RequestsCache, + ParentsCache: inputSensorParams.ParentsCache, + ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx, + ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes, + ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks, + ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes, } config := ethp2p.Config{ @@ -242,20 +264,14 @@ var SensorCmd = &cobra.Command{ if err = server.Start(); err != nil { return err } - defer 
server.Stop() + defer stopServer(&server) events := make(chan *ethp2p.PeerEvent) sub := server.SubscribeEvents(events) defer sub.Unsubscribe() - ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds. - ticker1h := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour. + ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() - defer ticker1h.Stop() - - dnsLock := make(chan struct{}, 1) - signals := make(chan os.Signal, 1) - signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) if inputSensorParams.ShouldRunPprof { go handlePprof() @@ -266,34 +282,17 @@ var SensorCmd = &cobra.Command{ } go handleAPI(&server, conns) - - // Start the RPC server for receiving transactions go handleRPC(conns, inputSensorParams.NetworkID) - - // Run DNS discovery immediately at startup. - go handleDNSDiscovery(&server, dnsLock) + go handleDNSDiscovery(&server) for { select { case <-ticker.C: peersGauge.Set(float64(server.PeerCount())) - db.WritePeers(cmd.Context(), server.Peers(), time.Now()) - + db.WritePeers(ctx, server.Peers(), time.Now()) metrics.Update(conns.HeadBlock().Block, conns.OldestBlock()) - - urls := []string{} - for _, peer := range server.Peers() { - urls = append(urls, peer.Node().URLv4()) - } - - if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil { - log.Error().Err(err).Msg("Failed to write nodes to file") - } - case <-ticker1h.C: - go handleDNSDiscovery(&server, dnsLock) - case <-signals: - // This gracefully stops the sensor so that the peers can be written to - // the nodes file. + writePeers(server.Peers()) + case <-ctx.Done(): log.Info().Msg("Stopping sensor...") return nil case event := <-events: @@ -305,11 +304,43 @@ var SensorCmd = &cobra.Command{ }, } +// writePeers writes the enode URLs of connected peers to the nodes file. 
+func writePeers(peers []*ethp2p.Peer) { + urls := make([]string, 0, len(peers)) + for _, peer := range peers { + urls = append(urls, peer.Node().URLv4()) + } + + if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil { + log.Error().Err(err).Msg("Failed to write nodes to file") + } +} + +// stopServer stops the p2p server with a timeout to avoid hanging on shutdown. +// This is necessary because go-ethereum's discovery shutdown can deadlock. +func stopServer(server *ethp2p.Server) { + done := make(chan struct{}) + + go func() { + server.Stop() + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + } +} + // handlePprof starts a server for performance profiling using pprof on the // specified port. This allows for real-time monitoring and analysis of the // sensor's performance. The port number is configured through // inputSensorParams.PprofPort. An error is logged if the server fails to start. func handlePprof() { + // Enable mutex and block profiling to detect lock contention. + runtime.SetMutexProfileFraction(1) + runtime.SetBlockProfileRate(1) + addr := fmt.Sprintf(":%d", inputSensorParams.PprofPort) if err := http.ListenAndServe(addr, nil); err != nil { log.Error().Err(err).Msg("Failed to start pprof") @@ -331,20 +362,24 @@ func handlePrometheus() { // handleDNSDiscovery performs DNS-based peer discovery and adds new peers to // the p2p server. It uses an iterator to discover peers incrementally rather -// than loading all nodes at once. The lock channel prevents concurrent runs. -func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) { +// than loading all nodes at once. Runs immediately and then hourly. 
+func handleDNSDiscovery(server *ethp2p.Server) { if len(inputSensorParams.DiscoveryDNS) == 0 { return } - select { - case lock <- struct{}{}: - defer func() { <-lock }() - default: - log.Warn().Msg("DNS discovery already running, skipping") - return + discoverPeers(server) + + ticker := time.NewTicker(time.Hour) + defer ticker.Stop() + + for range ticker.C { + discoverPeers(server) } +} +// discoverPeers performs a single DNS discovery round. +func discoverPeers(server *ethp2p.Server) { log.Info(). Str("discovery-dns", inputSensorParams.DiscoveryDNS). Msg("Starting DNS discovery") @@ -357,7 +392,6 @@ func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) { } defer iter.Close() - // Add DNS-discovered peers using the iterator. count := 0 for iter.Next() { node := iter.Node() @@ -365,9 +399,6 @@ func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) { Str("enode", node.URLv4()). Msg("Discovered peer through DNS") - // Add the peer to the static node set. The server itself handles whether to - // connect to the peer if it's already connected. If a node is part of the - // static peer set, the server will handle reconnecting after disconnects. 
server.AddPeer(node) count++ } @@ -450,6 +481,10 @@ will result in less chance of missing data but can significantly increase memory f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, `write transaction events to database (this option can significantly increase CPU and memory usage)`) f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database") + f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers") + f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers") + f.BoolVar(&inputSensorParams.ShouldBroadcastBlocks, "broadcast-blocks", false, "broadcast full blocks to peers") + f.BoolVar(&inputSensorParams.ShouldBroadcastBlockHashes, "broadcast-block-hashes", false, "broadcast block hashes to peers") f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server") f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on") f.BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "run Prometheus server") @@ -483,4 +518,11 @@ will result in less chance of missing data but can significantly increase memory f.DurationVar(&inputSensorParams.ParentsCache.TTL, "parents-cache-ttl", 5*time.Minute, "time to live for parent hash cache entries (0 for no expiration)") f.IntVar(&inputSensorParams.BlocksCache.MaxSize, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)") f.DurationVar(&inputSensorParams.BlocksCache.TTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)") + f.IntVar(&inputSensorParams.TxsCache.MaxSize, "max-txs", 32768, "maximum transactions to cache for serving to peers (0 for no limit)") + f.DurationVar(&inputSensorParams.TxsCache.TTL, "txs-cache-ttl", 10*time.Minute, "time to live for transaction cache entries (0 for no expiration)") + 
f.UintVar(&inputSensorParams.KnownTxsBloom.Size, "known-txs-bloom-size", 327680, + `bloom filter size in bits for tracking known transactions per peer (default ~40KB per filter, +optimized for ~32K elements with ~1% false positive rate)`) + f.UintVar(&inputSensorParams.KnownTxsBloom.HashCount, "known-txs-bloom-hashes", 7, "number of hash functions for known txs bloom filter") + f.IntVar(&inputSensorParams.KnownBlocksMax, "max-known-blocks", 1024, "maximum block hashes to track per peer (0 for no limit)") } diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 3c7ce877a..0561950e4 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -94,6 +94,10 @@ polycli p2p sensor amoy-nodes.json \ --api-port uint port API server will listen on (default 8080) --blocks-cache-ttl duration time to live for block cache entries (0 for no expiration) (default 10m0s) -b, --bootnodes string comma separated nodes used for bootstrapping + --broadcast-block-hashes broadcast block hashes to peers + --broadcast-blocks broadcast full blocks to peers + --broadcast-tx-hashes broadcast transaction hashes to peers + --broadcast-txs broadcast full transactions to peers --database string which database to persist data to, options are: - datastore (GCP Datastore) - json (output to stdout) @@ -107,12 +111,17 @@ polycli p2p sensor amoy-nodes.json \ -h, --help help for sensor --key string hex-encoded private key (cannot be set with --key-file) -k, --key-file string private key file (cannot be set with --key) + --known-txs-bloom-hashes uint number of hash functions for known txs bloom filter (default 7) + --known-txs-bloom-size uint bloom filter size in bits for tracking known transactions per peer (default ~40KB per filter, + optimized for ~32K elements with ~1% false positive rate) (default 327680) --max-blocks int maximum blocks to track across all peers (0 for no limit) (default 1024) -D, --max-db-concurrency int maximum number of concurrent database operations 
to perform (increasing this will result in less chance of missing data but can significantly increase memory usage) (default 10000) + --max-known-blocks int maximum block hashes to track per peer (0 for no limit) (default 1024) --max-parents int maximum parent block hashes to track per peer (0 for no limit) (default 1024) -m, --max-peers int maximum number of peers to connect to (default 2000) --max-requests int maximum request IDs to track per peer (0 for no limit) (default 2048) + --max-txs int maximum transactions to cache for serving to peers (0 for no limit) (default 32768) --nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:|extip:) (default "any") -n, --network-id uint filter discovered nodes by this network ID --no-discovery disable P2P peer discovery @@ -130,6 +139,7 @@ polycli p2p sensor amoy-nodes.json \ --static-nodes string static nodes file --trusted-nodes string trusted nodes file --ttl duration time to live (default 336h0m0s) + --txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s) --write-block-events write block events to database (default true) -B, --write-blocks write blocks to database (default true) --write-peers write peers to database (default true) diff --git a/go.mod b/go.mod index 75a3e37f2..a8beedf75 100644 --- a/go.mod +++ b/go.mod @@ -100,7 +100,7 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/consensys/gnark-crypto v0.19.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/deckarep/golang-set/v2 v2.6.0 // indirect + github.com/deckarep/golang-set/v2 v2.6.0 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/getsentry/sentry-go v0.29.1 // indirect @@ -114,7 +114,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.12 // indirect github.com/googleapis/gax-go/v2 v2.17.0 // indirect github.com/gorilla/websocket v1.5.3 
// indirect - github.com/holiman/bloomfilter/v2 v2.0.3 // indirect + github.com/holiman/bloomfilter/v2 v2.0.3 github.com/holiman/uint256 v1.3.2 github.com/huin/goupnp v1.3.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/p2p/cache.go b/p2p/cache.go deleted file mode 100644 index def29458b..000000000 --- a/p2p/cache.go +++ /dev/null @@ -1,239 +0,0 @@ -package p2p - -import ( - "container/list" - "sync" - "time" -) - -// CacheOptions contains configuration for LRU caches with TTL. -type CacheOptions struct { - MaxSize int - TTL time.Duration -} - -// Cache is a thread-safe LRU cache with optional TTL-based expiration. -type Cache[K comparable, V any] struct { - mu sync.RWMutex - maxSize int - ttl time.Duration - items map[K]*list.Element - list *list.List -} - -type entry[K comparable, V any] struct { - key K - value V - expiresAt time.Time -} - -// NewCache creates a new cache with the given options. -// If opts.MaxSize <= 0, the cache has no size limit. -// If opts.TTL is 0, entries never expire based on time. -func NewCache[K comparable, V any](opts CacheOptions) *Cache[K, V] { - return &Cache[K, V]{ - maxSize: opts.MaxSize, - ttl: opts.TTL, - items: make(map[K]*list.Element), - list: list.New(), - } -} - -// Add adds or updates a value in the cache. 
-func (c *Cache[K, V]) Add(key K, value V) { - c.mu.Lock() - defer c.mu.Unlock() - - now := time.Now() - expiresAt := time.Time{} - if c.ttl > 0 { - expiresAt = now.Add(c.ttl) - } - - if elem, ok := c.items[key]; ok { - c.list.MoveToFront(elem) - e := elem.Value.(*entry[K, V]) - e.value = value - e.expiresAt = expiresAt - return - } - - e := &entry[K, V]{ - key: key, - value: value, - expiresAt: expiresAt, - } - elem := c.list.PushFront(e) - c.items[key] = elem - - if c.maxSize > 0 && c.list.Len() > c.maxSize { - back := c.list.Back() - if back != nil { - c.list.Remove(back) - e := back.Value.(*entry[K, V]) - delete(c.items, e.key) - } - } -} - -// Get retrieves a value from the cache and updates LRU ordering. -func (c *Cache[K, V]) Get(key K) (V, bool) { - c.mu.Lock() - defer c.mu.Unlock() - - elem, ok := c.items[key] - if !ok { - var zero V - return zero, false - } - - e := elem.Value.(*entry[K, V]) - - if c.ttl > 0 && time.Now().After(e.expiresAt) { - c.list.Remove(elem) - delete(c.items, key) - var zero V - return zero, false - } - - c.list.MoveToFront(elem) - return e.value, true -} - -// Peek retrieves a value from the cache without updating LRU ordering. -// Uses a read lock for better concurrency. -func (c *Cache[K, V]) Peek(key K) (V, bool) { - c.mu.RLock() - defer c.mu.RUnlock() - - elem, ok := c.items[key] - if !ok { - var zero V - return zero, false - } - - e := elem.Value.(*entry[K, V]) - - if c.ttl > 0 && time.Now().After(e.expiresAt) { - var zero V - return zero, false - } - - return e.value, true -} - -// Update atomically updates a value in the cache using the provided update function. -// The update function receives the current value (or zero value if not found) and -// returns the new value to store. This is thread-safe and prevents race conditions -// in get-modify-add patterns. 
-func (c *Cache[K, V]) Update(key K, updateFn func(V) V) { - c.mu.Lock() - defer c.mu.Unlock() - - now := time.Now() - expiresAt := time.Time{} - if c.ttl > 0 { - expiresAt = now.Add(c.ttl) - } - - var currentVal V - if elem, ok := c.items[key]; ok { - e := elem.Value.(*entry[K, V]) - if c.ttl == 0 || !now.After(e.expiresAt) { - currentVal = e.value - // Update existing entry - c.list.MoveToFront(elem) - e.value = updateFn(currentVal) - e.expiresAt = expiresAt - return - } - // Entry expired, remove it - c.list.Remove(elem) - delete(c.items, key) - } - - // Create new entry - newVal := updateFn(currentVal) - e := &entry[K, V]{ - key: key, - value: newVal, - expiresAt: expiresAt, - } - elem := c.list.PushFront(e) - c.items[key] = elem - - // Enforce size limit - if c.maxSize > 0 && c.list.Len() > c.maxSize { - back := c.list.Back() - if back != nil { - c.list.Remove(back) - e := back.Value.(*entry[K, V]) - delete(c.items, e.key) - } - } -} - -// Contains checks if a key exists in the cache and is not expired. -// Uses a read lock and doesn't update LRU ordering. -func (c *Cache[K, V]) Contains(key K) bool { - c.mu.RLock() - defer c.mu.RUnlock() - - elem, ok := c.items[key] - if !ok { - return false - } - - e := elem.Value.(*entry[K, V]) - - if c.ttl > 0 && time.Now().After(e.expiresAt) { - return false - } - - return true -} - -// Remove removes a key from the cache and returns the value if it existed. -func (c *Cache[K, V]) Remove(key K) (V, bool) { - c.mu.Lock() - defer c.mu.Unlock() - - if elem, ok := c.items[key]; ok { - e := elem.Value.(*entry[K, V]) - c.list.Remove(elem) - delete(c.items, key) - return e.value, true - } - - var zero V - return zero, false -} - -// Len returns the number of items in the cache. -func (c *Cache[K, V]) Len() int { - c.mu.RLock() - defer c.mu.RUnlock() - return c.list.Len() -} - -// Purge clears all items from the cache. 
-func (c *Cache[K, V]) Purge() { - c.mu.Lock() - defer c.mu.Unlock() - - c.items = make(map[K]*list.Element) - c.list.Init() -} - -// Keys returns all keys in the cache. -func (c *Cache[K, V]) Keys() []K { - c.mu.RLock() - defer c.mu.RUnlock() - - keys := make([]K, 0, c.list.Len()) - for elem := c.list.Front(); elem != nil; elem = elem.Next() { - e := elem.Value.(*entry[K, V]) - keys = append(keys, e.key) - } - return keys -} diff --git a/p2p/conns.go b/p2p/conns.go index a44960483..7690cd629 100644 --- a/p2p/conns.go +++ b/p2p/conns.go @@ -10,6 +10,9 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/eth" ethp2p "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/rs/zerolog/log" + + ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" ) // BlockCache stores the actual block data to avoid duplicate fetches and database queries. @@ -19,6 +22,19 @@ type BlockCache struct { TD *big.Int } +// ConnsOptions contains configuration options for creating a new Conns manager. +type ConnsOptions struct { + BlocksCache ds.LRUOptions + TxsCache ds.LRUOptions + KnownTxsBloom ds.BloomSetOptions + KnownBlocksMax int + Head eth.NewBlockPacket + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool +} + // Conns manages a collection of active peer connections for transaction broadcasting. // It also maintains a global cache of blocks written to the database. type Conns struct { @@ -27,35 +43,50 @@ type Conns struct { // blocks tracks blocks written to the database across all peers // to avoid duplicate writes and requests. 
- blocks *Cache[common.Hash, BlockCache] + blocks *ds.LRU[common.Hash, BlockCache] + + // txs caches transactions for serving to peers and duplicate detection + txs *ds.LRU[common.Hash, *types.Transaction] + + // knownTxsOpts stores bloom filter options for per-peer known tx tracking + knownTxsOpts ds.BloomSetOptions + // knownBlocksMax stores the maximum size for per-peer known block caches + knownBlocksMax int // oldest stores the first block the sensor has seen so when fetching // parent blocks, it does not request blocks older than this. - oldest *Locked[*types.Header] + oldest *ds.Locked[*types.Header] // head keeps track of the current head block of the chain. - head *Locked[eth.NewBlockPacket] -} + head *ds.Locked[eth.NewBlockPacket] -// ConnsOptions contains configuration options for creating a new Conns manager. -type ConnsOptions struct { - BlocksCache CacheOptions - Head eth.NewBlockPacket + // Broadcast flags control what gets cached and rebroadcasted + shouldBroadcastTx bool + shouldBroadcastTxHashes bool + shouldBroadcastBlocks bool + shouldBroadcastBlockHashes bool } // NewConns creates a new connection manager with a blocks cache. 
func NewConns(opts ConnsOptions) *Conns { - head := &Locked[eth.NewBlockPacket]{} + head := &ds.Locked[eth.NewBlockPacket]{} head.Set(opts.Head) - oldest := &Locked[*types.Header]{} + oldest := &ds.Locked[*types.Header]{} oldest.Set(opts.Head.Block.Header()) return &Conns{ - conns: make(map[string]*conn), - blocks: NewCache[common.Hash, BlockCache](opts.BlocksCache), - oldest: oldest, - head: head, + conns: make(map[string]*conn), + blocks: ds.NewLRU[common.Hash, BlockCache](opts.BlocksCache), + txs: ds.NewLRU[common.Hash, *types.Transaction](opts.TxsCache), + knownTxsOpts: opts.KnownTxsBloom, + knownBlocksMax: opts.KnownBlocksMax, + oldest: oldest, + head: head, + shouldBroadcastTx: opts.ShouldBroadcastTx, + shouldBroadcastTxHashes: opts.ShouldBroadcastTxHashes, + shouldBroadcastBlocks: opts.ShouldBroadcastBlocks, + shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes, } } @@ -81,9 +112,15 @@ func (c *Conns) BroadcastTx(tx *types.Transaction) int { return c.BroadcastTxs(types.Transactions{tx}) } -// BroadcastTxs broadcasts multiple transactions to all connected peers. +// BroadcastTxs broadcasts multiple transactions to all connected peers, +// filtering out transactions that each peer already knows about. // Returns the number of peers the transactions were successfully sent to. +// If broadcast flags are disabled, this is a no-op. 
func (c *Conns) BroadcastTxs(txs types.Transactions) int { + if !c.shouldBroadcastTx { + return 0 + } + c.mu.RLock() defer c.mu.RUnlock() @@ -93,12 +130,167 @@ func (c *Conns) BroadcastTxs(txs types.Transactions) int { count := 0 for _, cn := range c.conns { - if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, txs); err != nil { + // Filter transactions this peer doesn't know about + unknownTxs := make(types.Transactions, 0, len(txs)) + for _, tx := range txs { + if !cn.hasKnownTx(tx.Hash()) { + unknownTxs = append(unknownTxs, tx) + } + } + + if len(unknownTxs) == 0 { continue } + + // Send as TransactionsPacket + packet := eth.TransactionsPacket(unknownTxs) + cn.countMsgSent(packet.Name(), float64(len(unknownTxs))) + if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, packet); err != nil { + cn.logger.Debug(). + Err(err). + Msg("Failed to send transactions") + continue + } + + // Mark transactions as known for this peer + for _, tx := range unknownTxs { + cn.addKnownTx(tx.Hash()) + } + count++ } + if count > 0 { + log.Debug(). + Int("peers", count). + Int("txs", len(txs)). + Msg("Broadcasted transactions") + } + + return count +} + +// BroadcastTxHashes enqueues transaction hashes to per-peer broadcast queues. +// Each peer has a dedicated goroutine that drains the queue and batches sends. +// Returns the number of peers the hashes were enqueued to. +// If broadcast flags are disabled, this is a no-op. 
+func (c *Conns) BroadcastTxHashes(hashes []common.Hash) int { + if !c.shouldBroadcastTxHashes || len(hashes) == 0 { + return 0 + } + + // Copy peers to avoid holding lock during sends + c.mu.RLock() + peers := make([]*conn, 0, len(c.conns)) + for _, cn := range c.conns { + if cn.txAnnounce != nil { + peers = append(peers, cn) + } + } + c.mu.RUnlock() + + count := 0 + for _, cn := range peers { + // Non-blocking send, drop if queue full (matches Bor behavior) + select { + case cn.txAnnounce <- hashes: + count++ + case <-cn.closeCh: + // Peer closing, skip + default: + // Channel full, skip to avoid goroutine leak + } + } + + return count +} + +// BroadcastBlock broadcasts a full block to peers that don't already know +// about it and returns the number of peers the block was successfully sent to. +// If broadcast flags are disabled, this is a no-op. +func (c *Conns) BroadcastBlock(block *types.Block, td *big.Int) int { + if !c.shouldBroadcastBlocks { + return 0 + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if block == nil { + return 0 + } + + hash := block.Hash() + count := 0 + + for _, cn := range c.conns { + // Skip if peer already knows about this block + if cn.hasKnownBlock(hash) { + continue + } + + // Send NewBlockPacket + packet := eth.NewBlockPacket{ + Block: block, + TD: td, + } + + cn.countMsgSent(packet.Name(), 1) + if err := ethp2p.Send(cn.rw, eth.NewBlockMsg, &packet); err != nil { + cn.logger.Debug(). + Err(err). + Uint64("number", block.Number().Uint64()). + Msg("Failed to send block") + continue + } + + // Mark block as known for this peer + cn.addKnownBlock(hash) + count++ + } + + if count > 0 { + log.Debug(). + Int("peers", count). + Uint64("number", block.NumberU64()). + Msg("Broadcasted block") + } + + return count +} + +// BroadcastBlockHashes enqueues block hashes to per-peer broadcast queues. +// Each peer has a dedicated goroutine that drains the queue and sends. +// Returns the number of peers the hashes were enqueued to. 
+// If broadcast flags are disabled, this is a no-op. +func (c *Conns) BroadcastBlockHashes(hashes []common.Hash, numbers []uint64) int { + if !c.shouldBroadcastBlockHashes || len(hashes) == 0 || len(hashes) != len(numbers) { + return 0 + } + + // Build packet once, share across all peers + packet := make(eth.NewBlockHashesPacket, len(hashes)) + for i := range hashes { + packet[i].Hash = hashes[i] + packet[i].Number = numbers[i] + } + + c.mu.RLock() + defer c.mu.RUnlock() + + count := 0 + for _, cn := range c.conns { + if cn.blockAnnounce == nil { + continue + } + // Non-blocking send, drop if queue full (matches Bor) + select { + case cn.blockAnnounce <- packet: + count++ + default: + } + } + return count } @@ -128,8 +320,34 @@ func (c *Conns) PeerConnectedAt(peerID string) time.Time { return time.Time{} } +// AddTxs adds multiple transactions to the shared cache in a single lock operation. +// Returns the computed hashes for reuse by the caller. +func (c *Conns) AddTxs(txs []*types.Transaction) []common.Hash { + if len(txs) == 0 { + return nil + } + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + c.txs.AddBatch(hashes, txs) + return hashes +} + +// PeekTxs retrieves multiple transactions from the shared cache without updating LRU ordering. +// Uses a single read lock for better concurrency when LRU ordering is not needed. +func (c *Conns) PeekTxs(hashes []common.Hash) []*types.Transaction { + return c.txs.PeekMany(hashes) +} + +// PeekTxsWithHashes retrieves multiple transactions with their hashes from the cache. +// Returns parallel slices of found hashes and transactions. Uses a single read lock. +func (c *Conns) PeekTxsWithHashes(hashes []common.Hash) ([]common.Hash, []*types.Transaction) { + return c.txs.PeekManyWithKeys(hashes) +} + // Blocks returns the global blocks cache. 
-func (c *Conns) Blocks() *Cache[common.Hash, BlockCache] { +func (c *Conns) Blocks() *ds.LRU[common.Hash, BlockCache] { return c.blocks } @@ -156,6 +374,36 @@ func (c *Conns) UpdateHeadBlock(packet eth.NewBlockPacket) bool { }) } +// KnownTxsOpts returns the bloom filter options for per-peer known tx tracking. +func (c *Conns) KnownTxsOpts() ds.BloomSetOptions { + return c.knownTxsOpts +} + +// KnownBlocksMax returns the maximum size for per-peer known block caches. +func (c *Conns) KnownBlocksMax() int { + return c.knownBlocksMax +} + +// ShouldBroadcastTx returns whether full transaction broadcasting is enabled. +func (c *Conns) ShouldBroadcastTx() bool { + return c.shouldBroadcastTx +} + +// ShouldBroadcastTxHashes returns whether transaction hash broadcasting is enabled. +func (c *Conns) ShouldBroadcastTxHashes() bool { + return c.shouldBroadcastTxHashes +} + +// ShouldBroadcastBlocks returns whether full block broadcasting is enabled. +func (c *Conns) ShouldBroadcastBlocks() bool { + return c.shouldBroadcastBlocks +} + +// ShouldBroadcastBlockHashes returns whether block hash broadcasting is enabled. +func (c *Conns) ShouldBroadcastBlockHashes() bool { + return c.shouldBroadcastBlockHashes +} + // GetPeerMessages returns a snapshot of message counts for a specific peer. // Returns nil if the peer is not found. func (c *Conns) GetPeerMessages(peerID string) *PeerMessages { @@ -182,3 +430,16 @@ func (c *Conns) GetPeerName(peerID string) string { return "" } + +// GetPeerVersion returns the negotiated eth protocol version for a specific peer. +// Returns 0 if the peer is not found. 
+func (c *Conns) GetPeerVersion(peerID string) uint { + c.mu.RLock() + defer c.mu.RUnlock() + + if cn, ok := c.conns[peerID]; ok { + return cn.version + } + + return 0 +} diff --git a/p2p/datastructures/bloomset.go b/p2p/datastructures/bloomset.go new file mode 100644 index 000000000..500d65d56 --- /dev/null +++ b/p2p/datastructures/bloomset.go @@ -0,0 +1,173 @@ +package datastructures + +import ( + "encoding/binary" + "sync" + + "github.com/ethereum/go-ethereum/common" + bloomfilter "github.com/holiman/bloomfilter/v2" +) + +// BloomSetOptions contains configuration for creating a BloomSet. +type BloomSetOptions struct { + // Size is the number of bits in the bloom filter. + // Larger size = lower false positive rate but more memory. + // Recommended: 10 * expected_elements for ~1% false positive rate. + Size uint + + // HashCount is the number of hash functions to use. + // Recommended: 7 for ~1% false positive rate. + HashCount uint +} + +// DefaultBloomSetOptions returns sensible defaults for tracking ~32K elements +// with approximately 1% false positive rate. +// Memory usage: ~80KB per BloomSet (2 filters of ~40KB each). +func DefaultBloomSetOptions() BloomSetOptions { + return BloomSetOptions{ + Size: 327680, // 32768 * 10 bits ≈ 40KB per filter + HashCount: 7, + } +} + +// BloomSet is a memory-efficient probabilistic set for tracking seen hashes. 
+// It uses a rotating dual-bloom-filter design: +// - "current" filter receives all new additions +// - "previous" filter is checked during lookups for recency +// - Rotate() moves current to previous and creates a fresh current +// +// Trade-offs vs LRU cache: +// - Pro: ~10x less memory, minimal GC pressure (fixed-size arrays) +// - Pro: O(1) add/lookup with very low constant factor +// - Con: False positives possible (~1% with default settings) +// - Con: No exact eviction control (use Rotate for approximate TTL) +// +// For knownTxs, false positives mean occasionally not broadcasting a tx +// to a peer that doesn't have it - acceptable since they'll get it elsewhere. +// +// This implementation wraps holiman/bloomfilter/v2, the same battle-tested +// bloom filter library used by geth for state pruning. +type BloomSet struct { + mu sync.RWMutex + current *bloomfilter.Filter + previous *bloomfilter.Filter + m uint64 // bits per filter + k uint64 // hash functions +} + +// NewBloomSet creates a new BloomSet with the given options. +// If options are zero-valued, defaults are applied. +func NewBloomSet(opts BloomSetOptions) *BloomSet { + defaults := DefaultBloomSetOptions() + if opts.Size == 0 { + opts.Size = defaults.Size + } + if opts.HashCount == 0 { + opts.HashCount = defaults.HashCount + } + + m := uint64(opts.Size) + k := uint64(opts.HashCount) + + current, _ := bloomfilter.New(m, k) + previous, _ := bloomfilter.New(m, k) + + return &BloomSet{ + current: current, + previous: previous, + m: m, + k: k, + } +} + +// bloomHash converts common.Hash to uint64 for the bloom filter. +// Uses first 8 bytes - sufficient since keccak256 hashes are already +// cryptographically distributed (same approach as geth). +func bloomHash(hash common.Hash) uint64 { + return binary.BigEndian.Uint64(hash[:8]) +} + +// Add adds a hash to the set. 
+func (b *BloomSet) Add(hash common.Hash) { + b.mu.Lock() + defer b.mu.Unlock() + + b.current.AddHash(bloomHash(hash)) +} + +// AddMany adds multiple hashes to the set efficiently. +func (b *BloomSet) AddMany(hashes []common.Hash) { + b.mu.Lock() + defer b.mu.Unlock() + + for _, hash := range hashes { + b.current.AddHash(bloomHash(hash)) + } +} + +// Contains checks if a hash might be in the set. +// Returns true if the hash is probably in the set (may have false positives). +// Returns false if the hash is definitely not in the set. +func (b *BloomSet) Contains(hash common.Hash) bool { + b.mu.RLock() + defer b.mu.RUnlock() + + h := bloomHash(hash) + return b.current.ContainsHash(h) || b.previous.ContainsHash(h) +} + +// FilterNotContained returns hashes that are definitely not in the set. +// Hashes that might be in the set (including false positives) are excluded. +func (b *BloomSet) FilterNotContained(hashes []common.Hash) []common.Hash { + b.mu.RLock() + defer b.mu.RUnlock() + + result := make([]common.Hash, 0, len(hashes)) + for _, hash := range hashes { + h := bloomHash(hash) + if !b.current.ContainsHash(h) && !b.previous.ContainsHash(h) { + result = append(result, hash) + } + } + return result +} + +// Rotate moves the current filter to previous and creates a fresh current. +// Call this periodically to maintain approximate recency (e.g., every N minutes). +// After rotation, lookups still check the previous filter, so recently-added +// items remain "known" for one more rotation period. +func (b *BloomSet) Rotate() { + b.mu.Lock() + defer b.mu.Unlock() + + b.previous = b.current + b.current, _ = bloomfilter.New(b.m, b.k) +} + +// Count returns the approximate number of elements added since last rotation. +// This uses the bloom filter's internal count of added elements. +func (b *BloomSet) Count() uint { + b.mu.RLock() + defer b.mu.RUnlock() + return uint(b.current.N()) +} + +// Reset clears both filters. 
+func (b *BloomSet) Reset() { + b.mu.Lock() + defer b.mu.Unlock() + + b.current, _ = bloomfilter.New(b.m, b.k) + b.previous, _ = bloomfilter.New(b.m, b.k) +} + +// MemoryUsage returns the approximate memory usage in bytes. +func (b *BloomSet) MemoryUsage() uint { + b.mu.RLock() + defer b.mu.RUnlock() + + // Two filters, each with m bits = m/8 bytes + // Round up to account for uint64 alignment + bytesPerFilter := (b.m + 63) / 64 * 8 + return uint(bytesPerFilter * 2) +} diff --git a/p2p/datastructures/bloomset_test.go b/p2p/datastructures/bloomset_test.go new file mode 100644 index 000000000..246b29add --- /dev/null +++ b/p2p/datastructures/bloomset_test.go @@ -0,0 +1,203 @@ +package datastructures + +import ( + "encoding/binary" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" +) + +func TestBloomSetAddAndContains(t *testing.T) { + b := NewBloomSet(DefaultBloomSetOptions()) + + hash1 := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") + hash2 := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890") + + // Initially should not contain anything + if b.Contains(hash1) { + t.Error("expected hash1 not to be contained initially") + } + + // Add hash1 + b.Add(hash1) + + // Should contain hash1 + if !b.Contains(hash1) { + t.Error("expected hash1 to be contained after add") + } + + // Should not contain hash2 + if b.Contains(hash2) { + t.Error("expected hash2 not to be contained") + } +} + +func TestBloomSetAddMany(t *testing.T) { + b := NewBloomSet(DefaultBloomSetOptions()) + + hashes := make([]common.Hash, 100) + for i := range hashes { + hashes[i] = common.BytesToHash([]byte{byte(i), byte(i + 1), byte(i + 2)}) + } + + b.AddMany(hashes) + + // All added hashes should be contained + for i, hash := range hashes { + if !b.Contains(hash) { + t.Errorf("expected hash %d to be contained", i) + } + } +} + +func TestBloomSetFilterNotContained(t *testing.T) { + 
b := NewBloomSet(DefaultBloomSetOptions()) + + // Add some hashes + known := []common.Hash{ + common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111"), + common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222"), + } + b.AddMany(known) + + // Create a mixed list + unknown := []common.Hash{ + common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333"), + common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444"), + } + mixed := append(known, unknown...) + + // Filter should return only unknown hashes + result := b.FilterNotContained(mixed) + + if len(result) != 2 { + t.Errorf("expected 2 unknown hashes, got %d", len(result)) + } + + for _, h := range result { + if h == known[0] || h == known[1] { + t.Errorf("known hash %s should not be in result", h.Hex()) + } + } +} + +func TestBloomSetRotate(t *testing.T) { + b := NewBloomSet(DefaultBloomSetOptions()) + + hash1 := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + hash2 := common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") + + // Add hash1 to current + b.Add(hash1) + + // Rotate - hash1 moves to previous + b.Rotate() + + // hash1 should still be found (in previous) + if !b.Contains(hash1) { + t.Error("expected hash1 to be found after first rotation") + } + + // Add hash2 to current + b.Add(hash2) + + // Rotate again - hash1's filter is now cleared, hash2 moves to previous + b.Rotate() + + // hash1 should no longer be found + if b.Contains(hash1) { + t.Error("expected hash1 not to be found after second rotation") + } + + // hash2 should still be found + if !b.Contains(hash2) { + t.Error("expected hash2 to be found after second rotation") + } +} + +func TestBloomSetMemoryUsage(t *testing.T) { + opts := DefaultBloomSetOptions() + b := NewBloomSet(opts) + + usage := b.MemoryUsage() + + // With default 327680 bits = 5120 words per 
filter * 8 bytes * 2 filters = 81920 bytes + expectedBytes := uint((327680+63)/64) * 8 * 2 + if usage != expectedBytes { + t.Errorf("expected memory usage %d bytes, got %d", expectedBytes, usage) + } +} + +// generateTestHash creates a deterministic hash from a seed using keccak256. +func generateTestHash(seed uint64) common.Hash { + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], seed) + return crypto.Keccak256Hash(buf[:]) +} + +func TestBloomSetFalsePositiveRate(t *testing.T) { + // Test that false positive rate is approximately as expected + b := NewBloomSet(DefaultBloomSetOptions()) + + // Add 32768 unique hashes (the design capacity) + // Use keccak256 to generate properly distributed hashes + for i := range uint64(32768) { + b.Add(generateTestHash(i)) + } + + // Test 10000 hashes that were NOT added (different seed range) + falsePositives := 0 + for i := uint64(100000); i < 110000; i++ { + if b.Contains(generateTestHash(i)) { + falsePositives++ + } + } + + // Expected ~1% false positive rate, allow up to 3% for statistical variance + rate := float64(falsePositives) / 10000.0 + if rate > 0.03 { + t.Errorf("false positive rate too high: %.2f%% (expected ~1%%)", rate*100) + } + t.Logf("False positive rate: %.2f%%", rate*100) +} + +func BenchmarkBloomSetAdd(b *testing.B) { + bloom := NewBloomSet(DefaultBloomSetOptions()) + hash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") + + for b.Loop() { + bloom.Add(hash) + } +} + +func BenchmarkBloomSetContains(b *testing.B) { + bloom := NewBloomSet(DefaultBloomSetOptions()) + hash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") + bloom.Add(hash) + + for b.Loop() { + bloom.Contains(hash) + } +} + +func BenchmarkBloomSetFilterNotContained(b *testing.B) { + bloom := NewBloomSet(DefaultBloomSetOptions()) + + // Add 1000 hashes + for i := range 1000 { + hash := common.BytesToHash([]byte{byte(i >> 8), byte(i)}) + bloom.Add(hash) + } + + // 
Create a batch of 100 hashes (mix of known and unknown) + batch := make([]common.Hash, 100) + for i := range batch { + batch[i] = common.BytesToHash([]byte{byte(i >> 8), byte(i)}) + } + + for b.Loop() { + bloom.FilterNotContained(batch) + } +} + diff --git a/p2p/datastructures/boundedset.go b/p2p/datastructures/boundedset.go new file mode 100644 index 000000000..e63128010 --- /dev/null +++ b/p2p/datastructures/boundedset.go @@ -0,0 +1,43 @@ +package datastructures + +import mapset "github.com/deckarep/golang-set/v2" + +// BoundedSet is a simple set-based collection with a maximum size. +// When the set reaches capacity, an arbitrary element is evicted via Pop(). +// This provides lower memory overhead compared to a full LRU cache when +// only membership tracking is needed without value storage. +type BoundedSet[T comparable] struct { + set mapset.Set[T] + max int +} + +// NewBoundedSet creates a new BoundedSet with the specified maximum size. +func NewBoundedSet[T comparable](max int) *BoundedSet[T] { + return &BoundedSet[T]{ + max: max, + set: mapset.NewSet[T](), + } +} + +// Add adds an element to the set, evicting an arbitrary element if at capacity. +func (b *BoundedSet[T]) Add(elem T) { + for b.set.Cardinality() >= b.max { + b.set.Pop() + } + b.set.Add(elem) +} + +// Contains returns true if the element exists in the set. +func (b *BoundedSet[T]) Contains(elem T) bool { + return b.set.Contains(elem) +} + +// Len returns the number of elements in the set. +func (b *BoundedSet[T]) Len() int { + return b.set.Cardinality() +} + +// Clear removes all elements from the set. 
+func (b *BoundedSet[T]) Clear() { + b.set.Clear() +} diff --git a/p2p/locked.go b/p2p/datastructures/locked.go similarity index 97% rename from p2p/locked.go rename to p2p/datastructures/locked.go index d2b8593e6..1489bd062 100644 --- a/p2p/locked.go +++ b/p2p/datastructures/locked.go @@ -1,4 +1,4 @@ -package p2p +package datastructures import "sync" diff --git a/p2p/datastructures/lru.go b/p2p/datastructures/lru.go new file mode 100644 index 000000000..f39edd1ca --- /dev/null +++ b/p2p/datastructures/lru.go @@ -0,0 +1,307 @@ +package datastructures + +import ( + "container/list" + "sync" + "time" +) + +// LRUOptions contains configuration for LRU caches with TTL. +type LRUOptions struct { + MaxSize int + TTL time.Duration +} + +// LRU is a thread-safe LRU cache with optional TTL-based expiration. +type LRU[K comparable, V any] struct { + mu sync.RWMutex + maxSize int + ttl time.Duration + items map[K]*list.Element + list *list.List +} + +type entry[K comparable, V any] struct { + key K + value V + expiresAt *time.Time +} + +// NewLRU creates a new LRU cache with the given options. +// If opts.MaxSize <= 0, the cache has no size limit. +// If opts.TTL is 0, entries never expire based on time. +func NewLRU[K comparable, V any](opts LRUOptions) *LRU[K, V] { + return &LRU[K, V]{ + maxSize: opts.MaxSize, + ttl: opts.TTL, + items: make(map[K]*list.Element), + list: list.New(), + } +} + +// Add adds or updates a value in the cache. 
+func (c *LRU[K, V]) Add(key K, value V) { + c.mu.Lock() + defer c.mu.Unlock() + + var expiresAt *time.Time + if c.ttl > 0 { + t := time.Now().Add(c.ttl) + expiresAt = &t + } + + if elem, ok := c.items[key]; ok { + c.list.MoveToFront(elem) + e := elem.Value.(*entry[K, V]) + e.value = value + e.expiresAt = expiresAt + return + } + + e := &entry[K, V]{ + key: key, + value: value, + expiresAt: expiresAt, + } + elem := c.list.PushFront(e) + c.items[key] = elem + + if c.maxSize > 0 && c.list.Len() > c.maxSize { + back := c.list.Back() + if back != nil { + c.list.Remove(back) + e := back.Value.(*entry[K, V]) + delete(c.items, e.key) + } + } +} + +// Get retrieves a value from the cache and updates LRU ordering. +func (c *LRU[K, V]) Get(key K) (V, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + elem, ok := c.items[key] + if !ok { + var zero V + return zero, false + } + + e := elem.Value.(*entry[K, V]) + + if e.expiresAt != nil && time.Now().After(*e.expiresAt) { + c.list.Remove(elem) + delete(c.items, key) + var zero V + return zero, false + } + + c.list.MoveToFront(elem) + return e.value, true +} + +// Peek retrieves a value from the cache without updating LRU ordering. +// Uses a read lock for better concurrency. +func (c *LRU[K, V]) Peek(key K) (V, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + elem, ok := c.items[key] + if !ok { + var zero V + return zero, false + } + + e := elem.Value.(*entry[K, V]) + + if e.expiresAt != nil && time.Now().After(*e.expiresAt) { + var zero V + return zero, false + } + + return e.value, true +} + +// PeekMany retrieves multiple values from the cache without updating LRU ordering. +// Uses a single read lock for all lookups, providing better concurrency than GetMany +// when LRU ordering updates are not needed. Returns only found values (indices don't +// correspond to input keys). Use PeekManyWithKeys if you need key-value pairs. 
+func (c *LRU[K, V]) PeekMany(keys []K) []V { + if len(keys) == 0 { + return nil + } + + c.mu.RLock() + defer c.mu.RUnlock() + + now := time.Now() + result := make([]V, 0, len(keys)) + + for _, key := range keys { + elem, ok := c.items[key] + if !ok { + continue + } + + e := elem.Value.(*entry[K, V]) + + if e.expiresAt != nil && now.After(*e.expiresAt) { + continue + } + + result = append(result, e.value) + } + + return result +} + +// PeekManyWithKeys retrieves multiple key-value pairs from the cache without updating +// LRU ordering. Returns parallel slices of found keys and values. Uses a single read +// lock for all lookups. +func (c *LRU[K, V]) PeekManyWithKeys(keys []K) ([]K, []V) { + if len(keys) == 0 { + return nil, nil + } + + c.mu.RLock() + defer c.mu.RUnlock() + + now := time.Now() + foundKeys := make([]K, 0, len(keys)) + foundValues := make([]V, 0, len(keys)) + + for _, key := range keys { + elem, ok := c.items[key] + if !ok { + continue + } + + e := elem.Value.(*entry[K, V]) + + if e.expiresAt != nil && now.After(*e.expiresAt) { + continue + } + + foundKeys = append(foundKeys, key) + foundValues = append(foundValues, e.value) + } + + return foundKeys, foundValues +} + +// Update atomically updates a value in the cache using the provided update function. +// The update function receives the current value (or zero value if not found) and +// returns the new value to store. This is thread-safe and prevents race conditions +// in get-modify-add patterns. 
+func (c *LRU[K, V]) Update(key K, updateFn func(V) V) { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + var expiresAt *time.Time + if c.ttl > 0 { + t := now.Add(c.ttl) + expiresAt = &t + } + + var currentVal V + if elem, ok := c.items[key]; ok { + e := elem.Value.(*entry[K, V]) + if e.expiresAt == nil || !now.After(*e.expiresAt) { + currentVal = e.value + // Update existing entry + c.list.MoveToFront(elem) + e.value = updateFn(currentVal) + e.expiresAt = expiresAt + return + } + // Entry expired, remove it + c.list.Remove(elem) + delete(c.items, key) + } + + // Create new entry + newVal := updateFn(currentVal) + e := &entry[K, V]{ + key: key, + value: newVal, + expiresAt: expiresAt, + } + elem := c.list.PushFront(e) + c.items[key] = elem + + // Enforce size limit + if c.maxSize > 0 && c.list.Len() > c.maxSize { + back := c.list.Back() + if back != nil { + c.list.Remove(back) + e := back.Value.(*entry[K, V]) + delete(c.items, e.key) + } + } +} + +// Remove removes a key from the cache and returns the value if it existed. +func (c *LRU[K, V]) Remove(key K) (V, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + if elem, ok := c.items[key]; ok { + e := elem.Value.(*entry[K, V]) + c.list.Remove(elem) + delete(c.items, key) + return e.value, true + } + + var zero V + return zero, false +} + +// AddBatch adds multiple key-value pairs to the cache. +// Uses a single write lock for all additions, reducing lock contention +// compared to calling Add in a loop. Keys and values must have the same length. 
+func (c *LRU[K, V]) AddBatch(keys []K, values []V) { + if len(keys) == 0 || len(keys) != len(values) { + return + } + + c.mu.Lock() + defer c.mu.Unlock() + + var expiresAt *time.Time + if c.ttl > 0 { + t := time.Now().Add(c.ttl) + expiresAt = &t + } + + for i, key := range keys { + value := values[i] + + if elem, ok := c.items[key]; ok { + c.list.MoveToFront(elem) + e := elem.Value.(*entry[K, V]) + e.value = value + e.expiresAt = expiresAt + continue + } + + e := &entry[K, V]{ + key: key, + value: value, + expiresAt: expiresAt, + } + elem := c.list.PushFront(e) + c.items[key] = elem + } + + // Enforce size limit after all additions + for c.maxSize > 0 && c.list.Len() > c.maxSize { + back := c.list.Back() + if back == nil { + break + } + c.list.Remove(back) + e := back.Value.(*entry[K, V]) + delete(c.items, e.key) + } +} diff --git a/p2p/protocol.go b/p2p/protocol.go index c6a31e6bd..7721d0373 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/protocols/eth" ethp2p "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -19,9 +20,32 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" + ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" "github.com/0xPolygon/polygon-cli/p2p/database" ) +const ( + // maxTxPacketSize is the target size for transaction announcement packets. + // Matches Bor's limit of 100KB. + maxTxPacketSize = 100 * 1024 + + // maxQueuedTxAnns is the maximum number of transaction announcements to + // queue before dropping oldest. Matches Bor. + maxQueuedTxAnns = 4096 + + // maxQueuedBlockAnns is the maximum number of block announcements to queue + // before dropping. Matches Bor. 
+ maxQueuedBlockAnns = 4 +) + +// protocolLengths maps protocol versions to their message counts. +var protocolLengths = map[uint]uint64{ + 66: 17, + 67: 17, + 68: 17, + 69: 18, +} + // conn represents an individual connection with a peer. type conn struct { sensorID string @@ -34,12 +58,12 @@ type conn struct { // requests is used to store the request ID and the block hash. This is used // when fetching block bodies because the eth protocol block bodies do not // contain information about the block hash. - requests *Cache[uint64, common.Hash] + requests *ds.LRU[uint64, common.Hash] requestNum uint64 // parents tracks hashes of blocks requested as parents to mark them // with IsParent=true when writing to the database. - parents *Cache[common.Hash, struct{}] + parents *ds.LRU[common.Hash, struct{}] // conns provides access to the global connection manager, which includes // the blocks cache shared across all peers. @@ -51,8 +75,30 @@ type conn struct { // peerURL is cached to avoid repeated URLv4() calls. peerURL string + // Broadcast flags control what gets rebroadcasted to other peers + shouldBroadcastTx bool + shouldBroadcastTxHashes bool + shouldBroadcastBlocks bool + shouldBroadcastBlockHashes bool + + // Known caches track what this peer has seen to avoid redundant sends. + // knownTxs uses a bloom filter for memory efficiency (~40KB vs ~4MB per peer). + // knownBlocks uses a simple bounded set for lower memory overhead than the generic LRU. + knownTxs *ds.BloomSet + knownBlocks *ds.BoundedSet[common.Hash] + // messages tracks per-peer message counts for API visibility. messages *PeerMessages + + // Broadcast queues for per-peer rate limiting. These decouple message + // reception from broadcasting to prevent flooding peers with immediate + // broadcasts. + txAnnounce chan []common.Hash + blockAnnounce chan eth.NewBlockHashesPacket + closeCh chan struct{} + + // version stores the negotiated eth protocol version (e.g., 68 or 69). 
+ version uint } // EthProtocolOptions is the options used when creating a new eth protocol. @@ -67,8 +113,14 @@ type EthProtocolOptions struct { ForkID forkid.ID // Cache configurations - RequestsCache CacheOptions - ParentsCache CacheOptions + RequestsCache ds.LRUOptions + ParentsCache ds.LRUOptions + + // Broadcast flags control what gets rebroadcasted to other peers + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool } // NewEthProtocol creates the new eth protocol. This will handle writing the @@ -77,32 +129,58 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol { return ethp2p.Protocol{ Name: "eth", Version: version, - Length: 17, + Length: protocolLengths[version], Run: func(p *ethp2p.Peer, rw ethp2p.MsgReadWriter) error { peerURL := p.Node().URLv4() c := &conn{ - sensorID: opts.SensorID, - node: p.Node(), - logger: log.With().Str("peer", peerURL).Logger(), - rw: rw, - db: opts.Database, - requests: NewCache[uint64, common.Hash](opts.RequestsCache), - requestNum: 0, - parents: NewCache[common.Hash, struct{}](opts.ParentsCache), - peer: p, - conns: opts.Conns, - connectedAt: time.Now(), - peerURL: peerURL, - messages: NewPeerMessages(), + sensorID: opts.SensorID, + node: p.Node(), + logger: log.With().Str("peer", peerURL).Logger(), + rw: rw, + db: opts.Database, + requests: ds.NewLRU[uint64, common.Hash](opts.RequestsCache), + requestNum: 0, + parents: ds.NewLRU[common.Hash, struct{}](opts.ParentsCache), + peer: p, + conns: opts.Conns, + connectedAt: time.Now(), + peerURL: peerURL, + shouldBroadcastTx: opts.ShouldBroadcastTx, + shouldBroadcastTxHashes: opts.ShouldBroadcastTxHashes, + shouldBroadcastBlocks: opts.ShouldBroadcastBlocks, + shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes, + knownTxs: ds.NewBloomSet(opts.Conns.KnownTxsOpts()), + knownBlocks: ds.NewBoundedSet[common.Hash](opts.Conns.KnownBlocksMax()), + messages: NewPeerMessages(), + txAnnounce: 
make(chan []common.Hash), + blockAnnounce: make(chan eth.NewBlockHashesPacket, maxQueuedBlockAnns), + closeCh: make(chan struct{}), + version: version, + } + + // Ensure cleanup happens on any exit path (including statusExchange failure) + defer func() { + close(c.closeCh) + opts.Conns.Remove(c) + }() + + // Start broadcast loops for per-peer queued broadcasting + if opts.ShouldBroadcastTxHashes { + go c.txAnnouncementLoop() + } + if opts.ShouldBroadcastBlockHashes { + go c.blockAnnouncementLoop() } if err := c.statusExchange(version, opts); err != nil { return err } + // Update logger with peer name now that status exchange is complete + c.logger = log.With().Str("peer", peerURL).Str("peer_name", c.peer.Fullname()).Logger() + // Send the connection object to the conns manager for RPC broadcasting opts.Conns.Add(c) - defer opts.Conns.Remove(c) ctx := opts.Context @@ -164,9 +242,10 @@ func (c *conn) statusExchange(version uint, opts EthProtocolOptions) error { head := c.conns.HeadBlock() if version >= eth.ETH69 { - status := eth.StatusPacket69{ + status := BorStatusPacket69{ ProtocolVersion: uint32(version), NetworkID: opts.NetworkID, + TD: head.TD, Genesis: opts.GenesisHash, ForkID: opts.ForkID, EarliestBlock: head.Block.NumberU64(), @@ -220,11 +299,11 @@ func (c *conn) statusExchange68(packet *eth.StatusPacket68) error { } // statusExchange69 will exchange status message for ETH69. 
-func (c *conn) statusExchange69(packet *eth.StatusPacket69) error { +func (c *conn) statusExchange69(packet *BorStatusPacket69) error { errc := make(chan error, 2) go func() { - c.countMsgSent((ð.StatusPacket69{}).Name(), 1) + c.countMsgSent(packet.Name(), 1) errc <- ethp2p.Send(c.rw, eth.StatusMsg, packet) }() @@ -302,7 +381,7 @@ func (c *conn) readStatus68(packet *eth.StatusPacket68) error { return nil } -func (c *conn) readStatus69(packet *eth.StatusPacket69) error { +func (c *conn) readStatus69(packet *BorStatusPacket69) error { msg, err := c.rw.ReadMsg() if err != nil { return err @@ -312,7 +391,7 @@ func (c *conn) readStatus69(packet *eth.StatusPacket69) error { return errors.New("expected status message code") } - var status eth.StatusPacket69 + var status BorStatusPacket69 if err := msg.Decode(&status); err != nil { return err } @@ -443,12 +522,16 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { c.countMsgReceived(packet.Name(), float64(len(packet))) - // Collect unique hashes for database write. + // Collect unique hashes and numbers for database write and broadcasting. uniqueHashes := make([]common.Hash, 0, len(packet)) + uniqueNumbers := make([]uint64, 0, len(packet)) for _, entry := range packet { hash := entry.Hash + // Mark as known from this peer + c.addKnownBlock(hash) + // Check what parts of the block we already have cache, ok := c.conns.Blocks().Get(hash) if ok { @@ -465,16 +548,258 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { c.conns.Blocks().Add(hash, BlockCache{}) uniqueHashes = append(uniqueHashes, hash) + uniqueNumbers = append(uniqueNumbers, entry.Number) } // Write only unique hashes to the database. 
- if len(uniqueHashes) > 0 { - c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs) + if len(uniqueHashes) == 0 { + return nil + } + + c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs) + + // Broadcast block hashes to other peers asynchronously + go c.conns.BroadcastBlockHashes(uniqueHashes, uniqueNumbers) + + return nil +} + +// addKnownTx adds a transaction hash to the known tx cache. +func (c *conn) addKnownTx(hash common.Hash) { + if !c.shouldBroadcastTx && !c.shouldBroadcastTxHashes { + return + } + + c.knownTxs.Add(hash) +} + +// addKnownBlock adds a block hash to the known block cache. +func (c *conn) addKnownBlock(hash common.Hash) { + if !c.shouldBroadcastBlocks && !c.shouldBroadcastBlockHashes { + return + } + + c.knownBlocks.Add(hash) +} + +// hasKnownTx checks if a transaction hash is in the known tx cache. +func (c *conn) hasKnownTx(hash common.Hash) bool { + if !c.shouldBroadcastTx && !c.shouldBroadcastTxHashes { + return false + } + + return c.knownTxs.Contains(hash) +} + +// hasKnownBlock checks if a block hash is in the known block cache. +func (c *conn) hasKnownBlock(hash common.Hash) bool { + if !c.shouldBroadcastBlocks && !c.shouldBroadcastBlockHashes { + return false + } + + return c.knownBlocks.Contains(hash) +} + +// txAnnouncementLoop schedules transaction hash announcements to the peer. +// Matches Bor's announceTransactions pattern: async sends with internal queue. 
+func (c *conn) txAnnouncementLoop() { + var ( + queue []common.Hash // Queue of hashes to announce + done chan struct{} // Non-nil if background announcer is running + fail = make(chan error, 1) // Channel used to receive network error + failed bool // Flag whether a send failed + ) + + for { + // If there's no in-flight announce running, check if a new one is needed + if done == nil && len(queue) > 0 { + var pending []common.Hash + pending, queue = c.prepareTxAnnouncements(queue) + + // If there's anything available to transfer, fire up an async writer + if len(pending) > 0 { + done = make(chan struct{}) + go func() { + if err := c.sendTxAnnouncements(pending); err != nil { + fail <- err + return + } + close(done) + }() + } + } + + // Transfer goroutine may or may not have been started, listen for events + select { + case hashes := <-c.txAnnounce: + if !failed { + queue = c.enqueueTxHashes(queue, hashes) + } + + case <-done: + done = nil + + case <-fail: + failed = true + + case <-c.closeCh: + return + } + } +} + +// prepareTxAnnouncements extracts a batch of unknown tx hashes from the queue +// up to maxTxPacketSize bytes. Returns the pending hashes and remaining queue. +func (c *conn) prepareTxAnnouncements(queue []common.Hash) (pending, remaining []common.Hash) { + // Calculate max hashes we can send based on packet size limit + maxHashes := min(maxTxPacketSize/common.HashLength, len(queue)) + + // Filter out known hashes in a single lock operation + pending = c.knownTxs.FilterNotContained(queue[:maxHashes]) + remaining = queue[:copy(queue, queue[maxHashes:])] + return pending, remaining +} + +// enqueueTxHashes adds hashes to the queue, dropping oldest if over capacity. +func (c *conn) enqueueTxHashes(queue, hashes []common.Hash) []common.Hash { + queue = append(queue, hashes...) 
+ if len(queue) > maxQueuedTxAnns { + queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])] + } + return queue +} + +// sendTxAnnouncements sends a batch of transaction hashes to the peer. +// It looks up each transaction from the cache to populate Types and Sizes +// as required by the ETH68 protocol. +func (c *conn) sendTxAnnouncements(hashes []common.Hash) error { + // Batch lookup all transactions in a single lock operation. + // Skip hashes where the transaction is no longer in cache. + pending, txs := c.conns.PeekTxsWithHashes(hashes) + if len(pending) == 0 { + return nil + } + + // Build Types and Sizes from the found transactions. + pendingTypes := make([]byte, len(txs)) + pendingSizes := make([]uint32, len(txs)) + for i, tx := range txs { + pendingTypes[i] = tx.Type() + pendingSizes[i] = uint32(tx.Size()) + } + + packet := eth.NewPooledTransactionHashesPacket{ + Types: pendingTypes, + Sizes: pendingSizes, + Hashes: pending, + } + c.countMsgSent(packet.Name(), float64(len(pending))) + if err := ethp2p.Send(c.rw, eth.NewPooledTransactionHashesMsg, packet); err != nil { + c.logger.Debug().Err(err).Msg("Failed to send tx announcements") + return err + } + + // Mark all hashes as known in a single lock operation + c.knownTxs.AddMany(pending) + return nil +} + +// blockAnnouncementLoop drains the blockAnnounce queue and sends block +// announcements. Matches Bor's broadcastBlocks pattern. +func (c *conn) blockAnnouncementLoop() { + for { + select { + case packet := <-c.blockAnnounce: + if c.sendBlockAnnouncements(packet) != nil { + return + } + case <-c.closeCh: + return + } + } +} + +// sendBlockAnnouncements sends a batch of block hashes to the peer, +// filtering out blocks the peer already knows about. 
+func (c *conn) sendBlockAnnouncements(packet eth.NewBlockHashesPacket) error { + // Filter to only unknown blocks + var filtered eth.NewBlockHashesPacket + for _, entry := range packet { + if !c.hasKnownBlock(entry.Hash) { + filtered = append(filtered, entry) + } + } + + if len(filtered) == 0 { + return nil + } + + c.countMsgSent(filtered.Name(), float64(len(filtered))) + if err := ethp2p.Send(c.rw, eth.NewBlockHashesMsg, filtered); err != nil { + c.logger.Debug().Err(err).Msg("Failed to send block announcements") + return err + } + for _, entry := range filtered { + c.addKnownBlock(entry.Hash) + } + return nil +} + +// decodeTx attempts to decode a transaction from an RLP-encoded raw value. +func (c *conn) decodeTx(raw []byte) *types.Transaction { + if len(raw) == 0 { + return nil + } + + // Try decoding as RLP-wrapped bytes first (legacy format) + var bytes []byte + if rlp.DecodeBytes(raw, &bytes) == nil { + tx := new(types.Transaction) + err := tx.UnmarshalBinary(bytes) + if err == nil { + return tx + } + + c.logger.Warn(). + Err(err). + Uint8("type", bytes[0]). + Int("size", len(bytes)). + Str("hash", crypto.Keccak256Hash(bytes).Hex()). + Msg("Failed to decode transaction") + + return nil + } + + // Try decoding as raw binary (typed transaction format) + tx := new(types.Transaction) + err := tx.UnmarshalBinary(raw) + if err == nil { + return tx } + c.logger.Warn(). + Err(err). + Uint8("prefix", raw[0]). + Int("size", len(raw)). + Str("hash", crypto.Keccak256Hash(raw).Hex()). + Msg("Failed to decode transaction") + return nil } +// decodeTxs decodes a list of transactions, returning only successfully decoded ones. 
+func (c *conn) decodeTxs(rawTxs []rlp.RawValue) []*types.Transaction { + var txs []*types.Transaction + + for _, raw := range rawTxs { + if tx := c.decodeTx(raw); tx != nil { + txs = append(txs, tx) + } + } + + return txs +} + func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { payload, err := io.ReadAll(msg.Payload) if err != nil { @@ -487,15 +812,27 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { return nil } - txs := decodeTxs(rawTxs) + txs := c.decodeTxs(rawTxs) tfs := time.Now() c.countMsgReceived((ð.TransactionsPacket{}).Name(), float64(len(txs))) + // Mark transactions as known from this peer + for _, tx := range txs { + c.addKnownTx(tx.Hash()) + } + if len(txs) > 0 { c.db.WriteTransactions(ctx, c.node, txs, tfs) } + // Cache transactions for duplicate detection and serving to peers (single lock) + hashes := c.conns.AddTxs(txs) + + // Broadcast transactions or hashes to other peers asynchronously + go c.conns.BroadcastTxs(types.Transactions(txs)) + go c.conns.BroadcastTxHashes(hashes) + return nil } @@ -507,8 +844,17 @@ func (c *conn) handleGetBlockHeaders(msg ethp2p.Msg) error { c.countMsgReceived(request.Name(), 1) - response := ð.BlockHeadersPacket{RequestId: request.RequestId} - c.countMsgSent(response.Name(), 0) + // Try to serve from cache if we have the block + var headers []*types.Header + if cache, ok := c.conns.Blocks().Peek(request.Origin.Hash); ok && cache.Header != nil { + headers = []*types.Header{cache.Header} + } + + response := ð.BlockHeadersPacket{ + RequestId: request.RequestId, + BlockHeadersRequest: headers, + } + c.countMsgSent(response.Name(), float64(len(headers))) return ethp2p.Send(c.rw, eth.BlockHeadersMsg, response) } @@ -558,8 +904,19 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error { c.countMsgReceived(request.Name(), float64(len(request.GetBlockBodiesRequest))) - response := ð.BlockBodiesPacket{RequestId: request.RequestId} - 
c.countMsgSent(response.Name(), 0) + // Try to serve from cache + var bodies []*eth.BlockBody + for _, hash := range request.GetBlockBodiesRequest { + if cache, ok := c.conns.Blocks().Peek(hash); ok && cache.Body != nil { + bodies = append(bodies, cache.Body) + } + } + + response := ð.BlockBodiesPacket{ + RequestId: request.RequestId, + BlockBodiesResponse: bodies, + } + c.countMsgSent(response.Name(), float64(len(bodies))) return ethp2p.Send(c.rw, eth.BlockBodiesMsg, response) } @@ -596,7 +953,7 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { } body := ð.BlockBody{ - Transactions: decodeTxs(decoded.Transactions), + Transactions: c.decodeTxs(decoded.Transactions), Uncles: decoded.Uncles, Withdrawals: decoded.Withdrawals, } @@ -625,7 +982,7 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { } block := types.NewBlockWithHeader(raw.Block.Header).WithBody(types.Body{ - Transactions: decodeTxs(raw.Block.Txs), + Transactions: c.decodeTxs(raw.Block.Txs), Uncles: raw.Block.Uncles, Withdrawals: raw.Block.Withdrawals, }) @@ -636,6 +993,9 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { c.countMsgReceived(packet.Name(), 1) + // Mark block as known from this peer + c.addKnownBlock(hash) + // Set the head block if newer. if c.conns.UpdateHeadBlock(*packet) { c.logger.Info(). 
@@ -667,6 +1027,13 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { TD: packet.TD, }) + // Broadcast block or block hash to other peers asynchronously + go c.conns.BroadcastBlock(packet.Block, packet.TD) + go c.conns.BroadcastBlockHashes( + []common.Hash{hash}, + []uint64{packet.Block.Number().Uint64()}, + ) + return nil } @@ -678,8 +1045,14 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error { c.countMsgReceived(request.Name(), float64(len(request.GetPooledTransactionsRequest))) - response := ð.PooledTransactionsPacket{RequestId: request.RequestId} - c.countMsgSent(response.Name(), 0) + // Try to serve from cache using batch lookup (single read lock operation) + txs := c.conns.PeekTxs(request.GetPooledTransactionsRequest) + + response := ð.PooledTransactionsPacket{ + RequestId: request.RequestId, + PooledTransactionsResponse: txs, + } + c.countMsgSent(response.Name(), float64(len(txs))) return ethp2p.Send(c.rw, eth.PooledTransactionsMsg, response) } @@ -723,17 +1096,29 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err } packet := ð.PooledTransactionsPacket{ - PooledTransactionsResponse: decodeTxs(raw.Txs), + PooledTransactionsResponse: c.decodeTxs(raw.Txs), } tfs := time.Now() c.countMsgReceived(packet.Name(), float64(len(packet.PooledTransactionsResponse))) + // Mark transactions as known from this peer + for _, tx := range packet.PooledTransactionsResponse { + c.addKnownTx(tx.Hash()) + } + if len(packet.PooledTransactionsResponse) > 0 { c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs) } + // Cache transactions for duplicate detection and serving to peers (single lock) + hashes := c.conns.AddTxs(packet.PooledTransactionsResponse) + + // Broadcast transactions or hashes to other peers asynchronously + go c.conns.BroadcastTxs(types.Transactions(packet.PooledTransactionsResponse)) + go c.conns.BroadcastTxHashes(hashes) + return nil } diff --git a/p2p/types.go 
b/p2p/types.go index f89cec2e0..56b25f16e 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -8,9 +8,9 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/core/stateless" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/p2p" @@ -18,7 +18,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/rlpx" "github.com/ethereum/go-ethereum/rlp" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" ) type Message interface { @@ -86,6 +85,22 @@ type Status eth.StatusPacket68 func (msg Status) Code() int { return 16 } func (msg Status) ReqID() uint64 { return 0 } +// BorStatusPacket69 is the Bor-compatible status packet for ETH69. +// Bor's implementation includes the TD field which upstream go-ethereum removed. +type BorStatusPacket69 struct { + ProtocolVersion uint32 + NetworkID uint64 + TD *big.Int + Genesis common.Hash + ForkID forkid.ID + EarliestBlock uint64 + LatestBlock uint64 + LatestBlockHash common.Hash +} + +func (*BorStatusPacket69) Name() string { return "Status" } +func (*BorStatusPacket69) Kind() byte { return eth.StatusMsg } + // NewBlockHashes is the network packet for the block announcements. type NewBlockHashes eth.NewBlockHashesPacket @@ -434,51 +449,3 @@ type rawPooledTransactionsPacket struct { Txs []rlp.RawValue } -// decodeTx attempts to decode a transaction from an RLP-encoded raw value. -func decodeTx(raw []byte) *types.Transaction { - if len(raw) == 0 { - return nil - } - - var bytes []byte - if rlp.DecodeBytes(raw, &bytes) == nil { - tx := new(types.Transaction) - if tx.UnmarshalBinary(bytes) == nil { - return tx - } - - log.Warn(). - Uint8("type", bytes[0]). - Int("size", len(bytes)). - Str("hash", crypto.Keccak256Hash(bytes).Hex()). 
- Msg("Failed to decode transaction") - - return nil - } - - tx := new(types.Transaction) - if tx.UnmarshalBinary(raw) == nil { - return tx - } - - log.Warn(). - Uint8("prefix", raw[0]). - Int("size", len(raw)). - Str("hash", crypto.Keccak256Hash(raw).Hex()). - Msg("Failed to decode transaction") - - return nil -} - -// decodeTxs decodes a list of transactions, returning only successfully decoded ones. -func decodeTxs(rawTxs []rlp.RawValue) []*types.Transaction { - var txs []*types.Transaction - - for _, raw := range rawTxs { - if tx := decodeTx(raw); tx != nil { - txs = append(txs, tx) - } - } - - return txs -}