Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/p2p/sensor/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// (number of p2p messages), along with connection timing information.
type peerData struct {
Name string `json:"name"`
ProtocolVersion uint `json:"protocol_version"`
Received p2p.MessageCount `json:"received"`
Sent p2p.MessageCount `json:"sent"`
PacketsReceived p2p.MessageCount `json:"packets_received"`
Expand Down Expand Up @@ -85,6 +86,7 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) {

peers[url] = peerData{
Name: conns.GetPeerName(peerID),
ProtocolVersion: conns.GetPeerVersion(peerID),
Received: messages.Received,
Sent: messages.Sent,
PacketsReceived: messages.PacketsReceived,
Expand Down
158 changes: 100 additions & 58 deletions cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
_ "embed"
"errors"
"fmt"
"os"
"os/signal"
"runtime"
"syscall"
"time"

Expand All @@ -29,6 +29,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

ds "github.com/0xPolygon/polygon-cli/p2p/datastructures"
"github.com/0xPolygon/polygon-cli/flag"
"github.com/0xPolygon/polygon-cli/p2p"
"github.com/0xPolygon/polygon-cli/p2p/database"
Expand All @@ -52,6 +53,10 @@ type (
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWritePeers bool
ShouldBroadcastTx bool
ShouldBroadcastTxHashes bool
ShouldBroadcastBlocks bool
ShouldBroadcastBlockHashes bool
ShouldRunPprof bool
PprofPort uint
ShouldRunPrometheus bool
Expand All @@ -71,9 +76,12 @@ type (
DiscoveryDNS string
Database string
NoDiscovery bool
RequestsCache p2p.CacheOptions
ParentsCache p2p.CacheOptions
BlocksCache p2p.CacheOptions
RequestsCache ds.LRUOptions
ParentsCache ds.LRUOptions
BlocksCache ds.LRUOptions
TxsCache ds.LRUOptions
KnownTxsBloom ds.BloomSetOptions
KnownBlocksMax int

bootnodes []*enode.Node
staticNodes []*enode.Node
Expand Down Expand Up @@ -166,7 +174,10 @@ var SensorCmd = &cobra.Command{
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
db, err := newDatabase(cmd.Context())
ctx, stop := signal.NotifyContext(cmd.Context(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

db, err := newDatabase(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -195,21 +206,32 @@ var SensorCmd = &cobra.Command{
// Create peer connection manager for broadcasting transactions
// and managing the global blocks cache
conns := p2p.NewConns(p2p.ConnsOptions{
BlocksCache: inputSensorParams.BlocksCache,
Head: head,
BlocksCache: inputSensorParams.BlocksCache,
TxsCache: inputSensorParams.TxsCache,
KnownTxsBloom: inputSensorParams.KnownTxsBloom,
KnownBlocksMax: inputSensorParams.KnownBlocksMax,
Head: head,
ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx,
ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes,
ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks,
ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
})

opts := p2p.EthProtocolOptions{
Context: cmd.Context(),
Database: db,
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
RPC: inputSensorParams.RPC,
SensorID: inputSensorParams.SensorID,
NetworkID: inputSensorParams.NetworkID,
Conns: conns,
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
RequestsCache: inputSensorParams.RequestsCache,
ParentsCache: inputSensorParams.ParentsCache,
Context: ctx,
Database: db,
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
RPC: inputSensorParams.RPC,
SensorID: inputSensorParams.SensorID,
NetworkID: inputSensorParams.NetworkID,
Conns: conns,
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
RequestsCache: inputSensorParams.RequestsCache,
ParentsCache: inputSensorParams.ParentsCache,
ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx,
ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes,
ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks,
ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
}

config := ethp2p.Config{
Expand Down Expand Up @@ -242,20 +264,14 @@ var SensorCmd = &cobra.Command{
if err = server.Start(); err != nil {
return err
}
defer server.Stop()
defer stopServer(&server)

events := make(chan *ethp2p.PeerEvent)
sub := server.SubscribeEvents(events)
defer sub.Unsubscribe()

ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds.
ticker1h := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour.
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
defer ticker1h.Stop()

dnsLock := make(chan struct{}, 1)
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

if inputSensorParams.ShouldRunPprof {
go handlePprof()
Expand All @@ -266,34 +282,17 @@ var SensorCmd = &cobra.Command{
}

go handleAPI(&server, conns)

// Start the RPC server for receiving transactions
go handleRPC(conns, inputSensorParams.NetworkID)

// Run DNS discovery immediately at startup.
go handleDNSDiscovery(&server, dnsLock)
go handleDNSDiscovery(&server)

for {
select {
case <-ticker.C:
peersGauge.Set(float64(server.PeerCount()))
db.WritePeers(cmd.Context(), server.Peers(), time.Now())

db.WritePeers(ctx, server.Peers(), time.Now())
metrics.Update(conns.HeadBlock().Block, conns.OldestBlock())

urls := []string{}
for _, peer := range server.Peers() {
urls = append(urls, peer.Node().URLv4())
}

if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil {
log.Error().Err(err).Msg("Failed to write nodes to file")
}
case <-ticker1h.C:
go handleDNSDiscovery(&server, dnsLock)
case <-signals:
// This gracefully stops the sensor so that the peers can be written to
// the nodes file.
writePeers(server.Peers())
case <-ctx.Done():
log.Info().Msg("Stopping sensor...")
return nil
case event := <-events:
Expand All @@ -305,11 +304,43 @@ var SensorCmd = &cobra.Command{
},
}

// writePeers writes the enode URLs of connected peers to the nodes file.
func writePeers(peers []*ethp2p.Peer) {
urls := make([]string, 0, len(peers))
for _, peer := range peers {
urls = append(urls, peer.Node().URLv4())
}

if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil {
log.Error().Err(err).Msg("Failed to write nodes to file")
}
}

// stopServer stops the p2p server with a timeout to avoid hanging on shutdown.
// This is necessary because go-ethereum's discovery shutdown can deadlock.
func stopServer(server *ethp2p.Server) {
done := make(chan struct{})

go func() {
server.Stop()
close(done)
}()

select {
case <-done:
case <-time.After(5 * time.Second):
}
}

// handlePprof starts a server for performance profiling using pprof on the
// specified port. This allows for real-time monitoring and analysis of the
// sensor's performance. The port number is configured through
// inputSensorParams.PprofPort. An error is logged if the server fails to start.
func handlePprof() {
// Enable mutex and block profiling to detect lock contention.
runtime.SetMutexProfileFraction(1)
runtime.SetBlockProfileRate(1)

addr := fmt.Sprintf(":%d", inputSensorParams.PprofPort)
if err := http.ListenAndServe(addr, nil); err != nil {
log.Error().Err(err).Msg("Failed to start pprof")
Expand All @@ -331,20 +362,24 @@ func handlePrometheus() {

// handleDNSDiscovery performs DNS-based peer discovery and adds new peers to
// the p2p server. It uses an iterator to discover peers incrementally rather
// than loading all nodes at once. The lock channel prevents concurrent runs.
func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) {
// than loading all nodes at once. Runs immediately and then hourly.
func handleDNSDiscovery(server *ethp2p.Server) {
if len(inputSensorParams.DiscoveryDNS) == 0 {
return
}

select {
case lock <- struct{}{}:
defer func() { <-lock }()
default:
log.Warn().Msg("DNS discovery already running, skipping")
return
discoverPeers(server)

ticker := time.NewTicker(time.Hour)
defer ticker.Stop()

for range ticker.C {
discoverPeers(server)
}
}

// discoverPeers performs a single DNS discovery round.
func discoverPeers(server *ethp2p.Server) {
log.Info().
Str("discovery-dns", inputSensorParams.DiscoveryDNS).
Msg("Starting DNS discovery")
Expand All @@ -357,17 +392,13 @@ func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) {
}
defer iter.Close()

// Add DNS-discovered peers using the iterator.
count := 0
for iter.Next() {
node := iter.Node()
log.Debug().
Str("enode", node.URLv4()).
Msg("Discovered peer through DNS")

// Add the peer to the static node set. The server itself handles whether to
// connect to the peer if it's already connected. If a node is part of the
// static peer set, the server will handle reconnecting after disconnects.
server.AddPeer(node)
count++
}
Expand Down Expand Up @@ -450,6 +481,10 @@ will result in less chance of missing data but can significantly increase memory
f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true,
`write transaction events to database (this option can significantly increase CPU and memory usage)`)
f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database")
f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers")
f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers")
f.BoolVar(&inputSensorParams.ShouldBroadcastBlocks, "broadcast-blocks", false, "broadcast full blocks to peers")
f.BoolVar(&inputSensorParams.ShouldBroadcastBlockHashes, "broadcast-block-hashes", false, "broadcast block hashes to peers")
f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server")
f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on")
f.BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "run Prometheus server")
Expand Down Expand Up @@ -483,4 +518,11 @@ will result in less chance of missing data but can significantly increase memory
f.DurationVar(&inputSensorParams.ParentsCache.TTL, "parents-cache-ttl", 5*time.Minute, "time to live for parent hash cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.BlocksCache.MaxSize, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
f.DurationVar(&inputSensorParams.BlocksCache.TTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.TxsCache.MaxSize, "max-txs", 32768, "maximum transactions to cache for serving to peers (0 for no limit)")
f.DurationVar(&inputSensorParams.TxsCache.TTL, "txs-cache-ttl", 10*time.Minute, "time to live for transaction cache entries (0 for no expiration)")
f.UintVar(&inputSensorParams.KnownTxsBloom.Size, "known-txs-bloom-size", 327680,
`bloom filter size in bits for tracking known transactions per peer (default ~40KB per filter,
optimized for ~32K elements with ~1% false positive rate)`)
f.UintVar(&inputSensorParams.KnownTxsBloom.HashCount, "known-txs-bloom-hashes", 7, "number of hash functions for known txs bloom filter")
f.IntVar(&inputSensorParams.KnownBlocksMax, "max-known-blocks", 1024, "maximum block hashes to track per peer (0 for no limit)")
}
10 changes: 10 additions & 0 deletions doc/polycli_p2p_sensor.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ polycli p2p sensor amoy-nodes.json \
--api-port uint port API server will listen on (default 8080)
--blocks-cache-ttl duration time to live for block cache entries (0 for no expiration) (default 10m0s)
-b, --bootnodes string comma separated nodes used for bootstrapping
--broadcast-block-hashes broadcast block hashes to peers
--broadcast-blocks broadcast full blocks to peers
--broadcast-tx-hashes broadcast transaction hashes to peers
--broadcast-txs broadcast full transactions to peers
--database string which database to persist data to, options are:
- datastore (GCP Datastore)
- json (output to stdout)
Expand All @@ -107,12 +111,17 @@ polycli p2p sensor amoy-nodes.json \
-h, --help help for sensor
--key string hex-encoded private key (cannot be set with --key-file)
-k, --key-file string private key file (cannot be set with --key)
--known-txs-bloom-hashes uint number of hash functions for known txs bloom filter (default 7)
--known-txs-bloom-size uint bloom filter size in bits for tracking known transactions per peer (default ~40KB per filter,
optimized for ~32K elements with ~1% false positive rate) (default 327680)
--max-blocks int maximum blocks to track across all peers (0 for no limit) (default 1024)
-D, --max-db-concurrency int maximum number of concurrent database operations to perform (increasing this
will result in less chance of missing data but can significantly increase memory usage) (default 10000)
--max-known-blocks int maximum block hashes to track per peer (0 for no limit) (default 1024)
--max-parents int maximum parent block hashes to track per peer (0 for no limit) (default 1024)
-m, --max-peers int maximum number of peers to connect to (default 2000)
--max-requests int maximum request IDs to track per peer (0 for no limit) (default 2048)
--max-txs int maximum transactions to cache for serving to peers (0 for no limit) (default 32768)
--nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:<IP>|extip:<IP>) (default "any")
-n, --network-id uint filter discovered nodes by this network ID
--no-discovery disable P2P peer discovery
Expand All @@ -130,6 +139,7 @@ polycli p2p sensor amoy-nodes.json \
--static-nodes string static nodes file
--trusted-nodes string trusted nodes file
--ttl duration time to live (default 336h0m0s)
--txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s)
--write-block-events write block events to database (default true)
-B, --write-blocks write blocks to database (default true)
--write-peers write peers to database (default true)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/consensys/gnark-crypto v0.19.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/deckarep/golang-set/v2 v2.6.0
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/getsentry/sentry-go v0.29.1 // indirect
Expand All @@ -114,7 +114,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.12 // indirect
github.com/googleapis/gax-go/v2 v2.17.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3
github.com/holiman/uint256 v1.3.2
github.com/huin/goupnp v1.3.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down
Loading