Setup Discovery

setupDiscovery configures and initializes the discovery mechanisms for the Ethereum node, starting the v4 and/or v5 discovery protocols depending on which are enabled. The discovery mechanisms help the node find and connect to peers in the Ethereum network.
/// ---p2p/server.go--- func (srv *Server) setupDiscovery() error { srv.discmix = enode.NewFairMix(discmixTimeout) // Don't listen on UDP endpoint if DHT is disabled. if srv.NoDiscovery { return nil } conn, err := srv.setupUDPListening() if err != nil { return err } var ( sconn discover.UDPConn = conn unhandled chan discover.ReadPacket ) // If both versions of discovery are running, setup a shared // connection, so v5 can read unhandled messages from v4. if srv.DiscoveryV4 && srv.DiscoveryV5 { unhandled = make(chan discover.ReadPacket, 100) sconn = &sharedUDPConn{conn, unhandled} } // Start discovery services. if srv.DiscoveryV4 { cfg := discover.Config{ PrivateKey: srv.PrivateKey, NetRestrict: srv.NetRestrict, Bootnodes: srv.BootstrapNodes, Unhandled: unhandled, Log: srv.log, } ntab, err := discover.ListenV4(conn, srv.localnode, cfg) if err != nil { return err } srv.ntab = ntab srv.discmix.AddSource(ntab.RandomNodes()) } if srv.DiscoveryV5 { cfg := discover.Config{ PrivateKey: srv.PrivateKey, NetRestrict: srv.NetRestrict, Bootnodes: srv.BootstrapNodesV5, Log: srv.log, } srv.DiscV5, err = discover.ListenV5(sconn, srv.localnode, cfg) if err != nil { return err } } // Add protocol-specific discovery sources. added := make(map[string]bool) for _, proto := range srv.Protocols { if proto.DialCandidates != nil && !added[proto.Name] { srv.discmix.AddSource(proto.DialCandidates) added[proto.Name] = true } } return nil }

Fair Mix

Geth has two node discovery mechanisms, V4 and V5. FairMix is used to alternate fairly between the two of them.
Each node discovery mechanism provides a node iterator (source) to the FairMix by calling srv.discmix.AddSource. srv.discmix implements methods that fetch nodes from its sources according to the mixing mechanism.
/// ---p2p/enode/iter.go--- // NewFairMix creates a mixer. // // The timeout specifies how long the mixer will wait for the next fairly-chosen source // before giving up and taking a node from any other source. A good way to set the timeout // is deciding how long you'd want to wait for a node on average. Passing a negative // timeout makes the mixer completely fair. func NewFairMix(timeout time.Duration) *FairMix { m := &FairMix{ fromAny: make(chan *Node), closed: make(chan struct{}), timeout: timeout, } return m } type mixSource struct { it Iterator next chan *Node timeout time.Duration } // AddSource adds a source of nodes. func (m *FairMix) AddSource(it Iterator) { m.mu.Lock() defer m.mu.Unlock() if m.closed == nil { return } m.wg.Add(1) source := &mixSource{it, make(chan *Node), m.timeout} m.sources = append(m.sources, source) go m.runSource(m.closed, source) }
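As a quick usage sketch (assuming go-ethereum's enode.IterNodes, enode.NewV4 and crypto.GenerateKey helpers, with throwaway keys and placeholder IPs), the snippet below builds a FairMix from two static sources and drains a few nodes through the Next/Node interface, which is roughly how the dial scheduler consumes srv.discmix:

package main

import (
	"fmt"
	"net"
	"time"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p/enode"
)

// newNode creates a throwaway node record for the example.
func newNode(ip string) *enode.Node {
	key, _ := crypto.GenerateKey()
	return enode.NewV4(&key.PublicKey, net.ParseIP(ip), 30303, 30303)
}

func main() {
	mix := enode.NewFairMix(100 * time.Millisecond)
	defer mix.Close()

	// Two static sources stand in for the v4 and v5 discovery iterators.
	mix.AddSource(enode.IterNodes([]*enode.Node{newNode("10.0.0.1"), newNode("10.0.0.2")}))
	mix.AddSource(enode.IterNodes([]*enode.Node{newNode("10.0.0.3")}))

	// Consumer side: pull nodes from the mixer one at a time.
	for i := 0; i < 3 && mix.Next(); i++ {
		fmt.Println("picked", mix.Node().ID())
	}
}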
 
FairMix.runSource iterates its source in a loop and delivers each fetched node to either mixSource.next or FairMix.fromAny, whichever receiver is ready first.
/// ---p2p/enode/iter.go--- // runSource reads a single source in a loop. func (m *FairMix) runSource(closed chan struct{}, s *mixSource) { defer m.wg.Done() defer close(s.next) for s.it.Next() { n := s.it.Node() select { case s.next <- n: case m.fromAny <- n: case <-closed: return } } }
 
Below is discovery V4's iterator implementation. In Server.setupDiscovery, srv.discmix.AddSource is called with ntab.RandomNodes(), a lookupIterator instance that implements Next() and Node().
  • Next() runs v4 discovery lookups to discover nodes and stores them in it.buffer.
  • Node() returns the current node (the first entry in it.buffer).
/// ---p2p/discover/v4_udp.go--- // RandomNodes is an iterator yielding nodes from a random walk of the DHT. func (t *UDPv4) RandomNodes() enode.Iterator { return newLookupIterator(t.closeCtx, t.newRandomLookup) } /// ---p2p/discover/lookup.go--- // lookupIterator performs lookup operations and iterates over all seen nodes. // When a lookup finishes, a new one is created through nextLookup. type lookupIterator struct { buffer []*node nextLookup lookupFunc ctx context.Context cancel func() lookup *lookup } func newLookupIterator(ctx context.Context, next lookupFunc) *lookupIterator { ctx, cancel := context.WithCancel(ctx) return &lookupIterator{ctx: ctx, cancel: cancel, nextLookup: next} } /// ---p2p/discover/lookup.go--- // Node returns the current node. func (it *lookupIterator) Node() *enode.Node { if len(it.buffer) == 0 { return nil } return unwrapNode(it.buffer[0]) } // Next moves to the next node. func (it *lookupIterator) Next() bool { // Consume next node in buffer. if len(it.buffer) > 0 { it.buffer = it.buffer[1:] } // Advance the lookup to refill the buffer. for len(it.buffer) == 0 { if it.ctx.Err() != nil { it.lookup = nil it.buffer = nil return false } if it.lookup == nil { it.lookup = it.nextLookup(it.ctx) continue } if !it.lookup.advance() { it.lookup = nil continue } it.buffer = it.lookup.replyBuffer } return true }
 
The Next method of the FairMix struct in the enode package returns nodes from multiple sources in a way that balances fairness and responsiveness.
Steps:
  1. Initialization:
      • Sets m.cur to nil, which indicates that no current node is selected yet.
  2. Pick a Source:
      • It calls m.pickSource() to select the next source to try to get a node from. If pickSource returns nil, indicating that there are no sources left, it falls back to m.nextFromAny(), which attempts to get a node from any remaining source.
      • m.pickSource() cycles through the sources in round-robin order.
  3. Set up Timeout:
      • If the source has a non-negative timeout value, a timer is set up to enforce this timeout. The timer channel ensures that the method doesn't wait indefinitely for a node from the selected source.
  4. Receive Node from Source:
      • If a node is received from the selected source's next channel:
        • Resets the source's timeout to the configured value (m.timeout).
        • Sets m.cur to the received node.
        • Returns true, indicating a node was successfully retrieved.
      • If the channel is closed, the source has ended and is deleted via m.deleteSource.
  5. Timeout:
      • If the timeout fires before a node is received from the source, the method:
        • Halves the source's timeout value to improve responsiveness for subsequent attempts.
        • Falls back to trying to get a node from any source using m.nextFromAny().
/// ---p2p/enode/iter.go--- // Node returns the current node. func (m *FairMix) Node() *Node { return m.cur } // Next returns a node from a random source. func (m *FairMix) Next() bool { // Initialization: m.cur = nil for { // Pick a Source: source := m.pickSource() if source == nil { return m.nextFromAny() } // Setup Timeout to Souce's Configuration: var timeout <-chan time.Time if source.timeout >= 0 { timer := time.NewTimer(source.timeout) timeout = timer.C defer timer.Stop() } select { // Receives Node from Source: case n, ok := <-source.next: if ok { // Here, the timeout is reset to the configured value // because the source delivered a node. source.timeout = m.timeout m.cur = n return true } // This source has ended. m.deleteSource(source) // Timeout: case <-timeout: // The selected source did not deliver a node within the timeout, so the // timeout duration is halved for next time. This is supposed to improve // latency with stuck sources. source.timeout /= 2 return m.nextFromAny() } } } // pickSource chooses the next source to read from, cycling through them in order. func (m *FairMix) pickSource() *mixSource { m.mu.Lock() defer m.mu.Unlock() if len(m.sources) == 0 { return nil } m.last = (m.last + 1) % len(m.sources) return m.sources[m.last] } // nextFromAny is used when there are no sources or when the 'fair' choice // doesn't turn up a node quickly enough. func (m *FairMix) nextFromAny() bool { n, ok := <-m.fromAny if ok { m.cur = n } return ok }

Set up UDP Listening

setupUDPListening sets up the UDP socket for the discovery protocol: it resolves the configured listen address (DiscAddr if set, otherwise ListenAddr), creates and binds the UDP socket, records the resulting port as the local node's fallback UDP port, and registers a port mapping for non-private addresses.
/// ---p2p/server.go--- func (srv *Server) setupUDPListening() (*net.UDPConn, error) { // get configured Listen address listenAddr := srv.ListenAddr // Use an alternate listening address for UDP if // a custom discovery address is configured. if srv.DiscAddr != "" { listenAddr = srv.DiscAddr } // resolve UDP address addr, err := net.ResolveUDPAddr("udp", listenAddr) if err != nil { return nil, err } // listen on UDP address conn, err := net.ListenUDP("udp", addr) if err != nil { return nil, err } // get listening address(like automatically choosen Port by operation system) laddr := conn.LocalAddr().(*net.UDPAddr) // set fallback UDP in local node ENR srv.localnode.SetFallbackUDP(laddr.Port) srv.log.Debug("UDP listener up", "addr", laddr) // set up port mapping. Send UDP register message to chan to notify port mapping routine. if !laddr.IP.IsLoopback() && !laddr.IP.IsPrivate() { srv.portMappingRegister <- &portMapping{ protocol: "UDP", name: "ethereum peer discovery", port: laddr.Port, } } return conn, nil }
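For reference, here is a minimal standalone sketch of the same listening setup using only the standard library; port 0 asks the operating system to choose a free port, mirroring the "automatically chosen port" case handled above:

package main

import (
	"fmt"
	"net"
)

func main() {
	// Resolve and bind the UDP socket.
	addr, err := net.ResolveUDPAddr("udp", ":0")
	if err != nil {
		panic(err)
	}
	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// LocalAddr reports the actual bound address, including the OS-chosen port,
	// which geth stores in the local node record as the fallback UDP port.
	laddr := conn.LocalAddr().(*net.UDPAddr)
	fmt.Println("UDP listener up on port", laddr.Port)
}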
 

Start V4 Discovery Service

ListenV4 handles setting up a new UDPv4 object and starting the discovery protocol over UDP for Ethereum nodes.
Steps:
  1. Configuration and Context Initialization:
      • Sets up default values for the configuration and creates a context that can be canceled to signal closure.
  2. UDPv4 Structure Initialization:
      • A new UDPv4 struct is initialized with the provided connection, configuration, local node, and other required fields.
  3. Table Initialization:
      • newMeteredTable is called to create a new node table. The node table keeps track of known nodes and performs periodic refreshes and revalidations.
  4. Background Goroutines:
      • tab.loop() schedules the table maintenance tasks (refresh, revalidation, copying live nodes), which send messages to remote nodes to keep the table up to date.
      • t.loop() runs the main loop of the UDPv4 object, including reply matching and timeout handling.
      • t.readLoop(cfg.Unhandled) continuously reads incoming UDP packets and processes them.
  5. Returning the UDPv4 Object.
/// ---p2p/discover/v4_udp.go--- func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { // Configuration and Context Initialization: cfg = cfg.withDefaults() // UDPv4 Structure Initialization: closeCtx, cancel := context.WithCancel(context.Background()) t := &UDPv4{ conn: newMeteredConn(c), priv: cfg.PrivateKey, netrestrict: cfg.NetRestrict, localNode: ln, db: ln.Database(), gotreply: make(chan reply), addReplyMatcher: make(chan *replyMatcher), closeCtx: closeCtx, cancelCloseCtx: cancel, log: cfg.Log, } // Table Initialization: tab, err := newMeteredTable(t, ln.Database(), cfg) if err != nil { return nil, err } t.tab = tab go tab.loop() t.wg.Add(2) // Background Goroutines: go t.loop() go t.readLoop(cfg.Unhandled) // Returning the UDPv4 Object. return t, nil }

Kademlia Distributed Hash Table

newMeteredTable creates a new instance of the Table struct, which represents a Kademlia-like distributed hash table (DHT) used in the Ethereum node discovery protocol. Ethereum uses a modified version of the Kademlia DHT; its purpose is to maintain a decentralized network in which nodes can efficiently discover and communicate with each other without relying on a central server.
Steps:
  1. Calls newTable to create a new instance of the Table struct using the provided transport, database, and configuration.
  2. If metrics are enabled, sets two hooks on the table:
      • nodeAddedHook: called when a node is added to a bucket; it increments the counter (bucketsCounter[b.index]) associated with the bucket's index.
      • nodeRemovedHook: called when a node is removed from a bucket; it decrements the same counter.
/// ---p2p/discover/table.go--- func newMeteredTable(t transport, db *enode.DB, cfg Config) (*Table, error) { tab, err := newTable(t, db, cfg) if err != nil { return nil, err } if metrics.Enabled { tab.nodeAddedHook = func(b *bucket, n *node) { bucketsCounter[b.index].Inc(1) } tab.nodeRemovedHook = func(b *bucket, n *node) { bucketsCounter[b.index].Dec(1) } } return tab, nil }
 
newTable initializes the Table struct with the provided parameters and sets up its internal state.
Steps:
  1. Apply default values to config:
      • Applies default values to the configuration where they are not already set.
  2. Initialize Table:
      • Initializes the Table struct with the provided transport, database, configuration, and other necessary fields.
  3. Sets up fallback nodes for bootstrapping the network.
  4. Initializes the buckets used for storing nodes.
  5. Seeds the random number generator used for various table operations.
  6. Loads seed nodes from the database.
/// ---p2p/discover/table.go--- // Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps // itself up-to-date by verifying the liveness of neighbors and requesting their node // records when announcements of a new record version are received. type Table struct { mutex sync.Mutex // protects buckets, bucket content, nursery, rand buckets [nBuckets]*bucket // index of known nodes by distance nursery []*node // bootstrap nodes rand *mrand.Rand // source of randomness, periodically reseeded ips netutil.DistinctNetSet db *enode.DB // database of known nodes net transport cfg Config log log.Logger // loop channels refreshReq chan chan struct{} initDone chan struct{} closeReq chan struct{} closed chan struct{} nodeAddedHook func(*bucket, *node) nodeRemovedHook func(*bucket, *node) } func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) { // Applies default values to the config cfg = cfg.withDefaults() // Intialize table tab := &Table{ net: t, db: db, cfg: cfg, log: cfg.Log, refreshReq: make(chan chan struct{}), initDone: make(chan struct{}), closeReq: make(chan struct{}), closed: make(chan struct{}), rand: mrand.New(mrand.NewSource(0)), ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, } // Sets up fallback nodes for bootstrapping the network. if err := tab.setFallbackNodes(cfg.Bootnodes); err != nil { return nil, err } // Initializes the buckets used for storing nodes. for i := range tab.buckets { tab.buckets[i] = &bucket{ index: i, ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, } } // Seeds the random number generator used for various table operations. tab.seedRand() // Loads seed nodes from the database. tab.loadSeedNodes() return tab, nil } // apply default values to config func (cfg Config) withDefaults() Config { // Node table configuration: if cfg.PingInterval == 0 { cfg.PingInterval = 10 * time.Second } if cfg.RefreshInterval == 0 { cfg.RefreshInterval = 30 * time.Minute } // Debug/test settings: if cfg.Log == nil { cfg.Log = log.Root() } if cfg.ValidSchemes == nil { cfg.ValidSchemes = enode.ValidSchemes } if cfg.Clock == nil { cfg.Clock = mclock.System{} } return cfg } // setFallbackNodes sets the initial points of contact. These nodes // are used to connect to the network if the table is empty and there // are no known nodes in the database. func (tab *Table) setFallbackNodes(nodes []*enode.Node) error { nursery := make([]*node, 0, len(nodes)) for _, n := range nodes { // validate the node's ENR is complete if err := n.ValidateComplete(); err != nil { return fmt.Errorf("bad bootstrap node %q: %v", n, err) } // validate the node's area is not banned if tab.cfg.NetRestrict != nil && !tab.cfg.NetRestrict.Contains(n.IP()) { tab.log.Error("Bootstrap node filtered by netrestrict", "id", n.ID(), "ip", n.IP()) continue } // add node nursery = append(nursery, wrapNode(n)) } // update bootrap nodes tab.nursery = nursery return nil }
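The ips field (a netutil.DistinctNetSet) caps how many table entries may come from the same subnet, and each bucket carries its own smaller cap. Below is a small standalone sketch of that idea, grouping IPv4 addresses by a fixed prefix and limiting the count per group; the /24 prefix and limit of 2 are illustrative values, not geth's actual tableSubnet/tableIPLimit constants:

package main

import (
	"fmt"
	"net"
)

// netSet is a simplified DistinctNetSet: at most `limit` IPs per prefix group.
type netSet struct {
	prefixBits int
	limit      int
	members    map[string]int
}

// key masks the IP down to its subnet prefix, e.g. 10.0.0.7 -> "10.0.0.0".
func (s *netSet) key(ip net.IP) string {
	mask := net.CIDRMask(s.prefixBits, 32)
	return ip.To4().Mask(mask).String()
}

// add accepts the IP only if its subnet group is not yet full.
func (s *netSet) add(ip net.IP) bool {
	k := s.key(ip)
	if s.members[k] >= s.limit {
		return false // subnet full
	}
	s.members[k]++
	return true
}

func main() {
	s := &netSet{prefixBits: 24, limit: 2, members: map[string]int{}}
	fmt.Println(s.add(net.ParseIP("10.0.0.1"))) // true
	fmt.Println(s.add(net.ParseIP("10.0.0.2"))) // true
	fmt.Println(s.add(net.ParseIP("10.0.0.3"))) // false: /24 group is full
	fmt.Println(s.add(net.ParseIP("10.0.1.1"))) // true: different /24 group
}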
 
seedRand updates the seed of Table.rand, the table's source of randomness.
/// ---p2p/discover/table.go--- // Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps // itself up-to-date by verifying the liveness of neighbors and requesting their node // records when announcements of a new record version are received. type Table struct { // ... rand *mrand.Rand // source of randomness, periodically reseeded // ... } func (tab *Table) seedRand() { var b [8]byte crand.Read(b[:]) tab.mutex.Lock() tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:]))) tab.mutex.Unlock() }
 
loadSeedNodes randomly loads nodes from the database into the Table; together with the bootstrap nodes they are used to bootstrap the P2P discovery process.
Steps:
  1. Randomly pick seed nodes from the database.
  2. Add the seed nodes (plus the bootstrap nodes in the nursery) to the Table's buckets.
/// ---p2p/discover/table.go--- func (tab *Table) loadSeedNodes() { // Randomly pick seed nodes seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge)) // Add nodes to Table’s buckets. seeds = append(seeds, tab.nursery...) for i := range seeds { seed := seeds[i] if tab.log.Enabled(context.Background(), log.LevelTrace) { age := time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) } tab.addSeenNode(seed) } }
 
QuerySeeds initializes a 32-byte id; on each iteration it randomizes the id while only advancing the first byte by a random amount in [0, 16), then seeks to the first node in the database whose key is greater than or equal to the modified id.
Advancing the first byte by at most 15 per seek keeps the walk randomized but controlled, avoiding drastic jumps that could skip over large portions of the database.
It also validates each fetched node: the node must have been active within maxAge, and duplicates are skipped.
/// ---p2p/enode/nodedb.go--- // QuerySeeds retrieves random nodes to be used as potential seed nodes // for bootstrapping. func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node { var ( now = time.Now() nodes = make([]*Node, 0, n) it = db.lvl.NewIterator(nil, nil) id ID ) defer it.Release() seek: for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ { // Seek to a random entry. The first byte is incremented by a // random amount each time in order to increase the likelihood // of hitting all existing nodes in very small databases. ctr := id[0] rand.Read(id[:]) id[0] = ctr + id[0]%16 it.Seek(nodeKey(id)) n := nextNode(it) if n == nil { id[0] = 0 continue seek // iterator exhausted } if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge { continue seek } for i := range nodes { if nodes[i].ID() == n.ID() { continue seek // duplicate } } nodes = append(nodes, n) } return nodes } /// ---p2p/enode/node.go--- // ID is a unique identifier for each node. type ID [32]byte
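To make the seek-key arithmetic concrete, the small sketch below reproduces the first-byte update from QuerySeeds: the whole key is re-randomized on every seek, but the first byte only ever moves forward by a value in [0, 16), so successive seeks sweep the keyspace gradually (and eventually wrap around) instead of jumping to arbitrary positions:

package main

import (
	"crypto/rand"
	"fmt"
)

func main() {
	var id [32]byte
	for i := 0; i < 8; i++ {
		ctr := id[0]
		rand.Read(id[:])       // randomize the whole key...
		id[0] = ctr + id[0]%16 // ...but only nudge the first byte forward
		fmt.Printf("seek %d: first byte = %d\n", i, id[0])
	}
}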
 
addSeenNode inserts a node into the table, specifically into the bucket chosen by the logarithmic XOR distance between the node's ID and the local node's ID.
Steps:
  1. Check for Self Node:
      • Checks whether the node being added is the local node itself. If it is, the function returns immediately without making any changes, so a node never adds itself to its own routing table.
  2. Lock the Table:
      • Acquires a lock on the table to ensure thread-safe access.
  3. Get the Appropriate Bucket:
      • Determines the appropriate bucket for the node based on its ID. The bucket is identified using the XOR distance between the node's ID and the local node's ID (see the distance sketch after the code below).
  4. Check for Existing Entry:
      • Checks whether the node is already in the bucket. If it is, the function returns without adding it again.
  5. Handle Full Bucket:
      • If the bucket is full (i.e. it contains the maximum number of entries), the node is added to the replacement list instead.
  6. Update Subnet IP Count:
      • Computes the subnet key of the remote node's IP, verifies that the subnet is not full at both the table level and the bucket level, and then increments the IP count for that subnet. The table and each bucket limit how many IPs may come from a single subnet.
  7. Add Node to Bucket:
      • Appends the node to the end of the bucket's entries and removes it from the replacement list if it was present.
  8. Node Added Hook:
      • If a hook is defined for node additions, it is called with the bucket and the node, allowing additional actions (e.g. metrics updates) to be taken.
/// ---p2p/discover/table.go--- // addSeenNode adds a node which may or may not be live to the end of a bucket. If the // bucket has space available, adding the node succeeds immediately. Otherwise, the node is // added to the replacements list. // // The caller must not hold tab.mutex. func (tab *Table) addSeenNode(n *node) { // Check for Self Node: if n.ID() == tab.self().ID() { return } // Lock the Table: tab.mutex.Lock() defer tab.mutex.Unlock() // Get the Appropriate Bucket: b := tab.bucket(n.ID()) // Check for Existing Entry: if contains(b.entries, n.ID()) { // Already in bucket, don't add. return } // Handle Full Bucket: if len(b.entries) >= bucketSize { // Bucket full, maybe add as replacement. tab.addReplacement(b, n) return } // Update Subnet IP Count: if !tab.addIP(b, n.IP()) { // Can't add: IP limit reached. return } // Add Node to Bucket: // Add to end of bucket: b.entries = append(b.entries, n) b.replacements = deleteNode(b.replacements, n) n.addedAt = time.Now() // Node Added Hook: if tab.nodeAddedHook != nil { tab.nodeAddedHook(b, n) } } // update amount of IP in subnet func (tab *Table) addIP(b *bucket, ip net.IP) bool { if len(ip) == 0 { return false // Nodes without IP cannot be added. } if netutil.IsLAN(ip) { return true } // update amount of IP in subnet in table aspect if !tab.ips.Add(ip) { tab.log.Debug("IP exceeds table limit", "ip", ip) return false } // update amount of IP in subnet in bucket aspect if !b.ips.Add(ip) { tab.log.Debug("IP exceeds bucket limit", "ip", ip) tab.ips.Remove(ip) return false } return true } /// ---p2p/netutil/net.go--- // DistinctNetSet tracks IPs, ensuring that at most N of them // fall into the same network range. type DistinctNetSet struct { Subnet uint // number of common prefix bits Limit uint // maximum number of IPs in each subnet members map[string]uint buf net.IP } // Add adds an IP address to the set. It returns false (and doesn't add the IP) if the // number of existing IPs in the defined range exceeds the limit. func (s *DistinctNetSet) Add(ip net.IP) bool { // calculates subnet key of the IP key := s.key(ip) // check this subnet is not full n := s.members[string(key)] if n < s.Limit { // update IP amount in this subnet s.members[string(key)] = n + 1 return true } return false } // Remove removes an IP from the set. func (s *DistinctNetSet) Remove(ip net.IP) { key := s.key(ip) if n, ok := s.members[string(key)]; ok { if n == 1 { delete(s.members, string(key)) } else { s.members[string(key)] = n - 1 } } }
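As referenced in step 3 above, the bucket is chosen from the logarithmic XOR distance between the two IDs. The sketch below re-implements that distance for illustration (geth exposes the equivalent as enode.LogDist): it is the bit length of a XOR b, so IDs differing in a high-order bit are "far" (distance near 256) and IDs sharing a long prefix are "close". The table then maps large distances to the later buckets and lumps all very close distances into the first bucket.

package main

import (
	"fmt"
	"math/bits"
)

type id [32]byte

// logdist returns the logarithmic XOR distance between a and b: the bit length
// of a XOR b, i.e. 256 minus the number of leading zero bits of the XOR.
func logdist(a, b id) int {
	lz := 0
	for i := range a {
		x := a[i] ^ b[i]
		if x == 0 {
			lz += 8
			continue
		}
		lz += bits.LeadingZeros8(x)
		break
	}
	return len(a)*8 - lz
}

func main() {
	var self, other id
	other[0] = 0x80                   // differs from self in the very first bit
	fmt.Println(logdist(self, other)) // 256 (maximum distance)

	other[0] = 0
	other[31] = 0x01                  // differs only in the very last bit
	fmt.Println(logdist(self, other)) // 1 (minimum non-zero distance)
}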

Loop: Table

The loop function of the Table struct handles the periodic tasks and requests that maintain the state of the node discovery table.
  • doRefresh refreshes the nodes in the table's routing buckets.
  • doRevalidate randomly selects a node in the table, re-pings it, and checks that it is still live.
  • copyLiveNodes copies qualifying nodes from the table into the database.
 
tab.seedRand updates the random seed, which in turn changes which targets are queried during doRefresh.
When a timer fires, the corresponding task is started in a goroutine; when the task signals completion, the timer is reset for the next run.
/// ---p2p/discover/table.go--- // loop schedules runs of doRefresh, doRevalidate and copyLiveNodes. func (tab *Table) loop() { var ( revalidate = time.NewTimer(tab.nextRevalidateTime()) refresh = time.NewTimer(tab.nextRefreshTime()) copyNodes = time.NewTicker(copyNodesInterval) refreshDone = make(chan struct{}) // where doRefresh reports completion revalidateDone chan struct{} // where doRevalidate reports completion waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs ) defer refresh.Stop() defer revalidate.Stop() defer copyNodes.Stop() // Start initial refresh. go tab.doRefresh(refreshDone) loop: for { select { case <-refresh.C: tab.seedRand() if refreshDone == nil { refreshDone = make(chan struct{}) go tab.doRefresh(refreshDone) } case req := <-tab.refreshReq: waiting = append(waiting, req) if refreshDone == nil { refreshDone = make(chan struct{}) go tab.doRefresh(refreshDone) } case <-refreshDone: for _, ch := range waiting { close(ch) } waiting, refreshDone = nil, nil refresh.Reset(tab.nextRefreshTime()) case <-revalidate.C: revalidateDone = make(chan struct{}) go tab.doRevalidate(revalidateDone) case <-revalidateDone: revalidate.Reset(tab.nextRevalidateTime()) revalidateDone = nil case <-copyNodes.C: go tab.copyLiveNodes() case <-tab.closeReq: break loop } } if refreshDone != nil { <-refreshDone } for _, ch := range waiting { close(ch) } if revalidateDone != nil { <-revalidateDone } close(tab.closed) }
Table.doRefresh
Table.doRefresh handles refreshing the table's nodes. It does this by repeatedly sending Findnode requests to nodes in the table to learn about other nodes.
The local node first sends a Findnode request to a bootnode, which responds with a list of peers the new node can connect to; the local node adds these nodes to the table. It then iterates over nodes in the table and sends Findnode to them to obtain more nodes and fill the routing buckets. This process repeats on a regular schedule.
Inside doRefresh:
  1. Load seed nodes:
      • Randomly loads nodes from the database into the Table; together with the bootstrap nodes they seed the refresh process.
  2. Look up neighbour nodes of self:
      • tab.net.lookupSelf looks up the neighbours of the local node, then looks up the neighbours of the received neighbour nodes.
  3. Look up neighbour nodes of random targets:
      • Filling all buckets would take a long time if lookups only started from the local node's ID, since nodes far away from the local node would rarely be found. To speed up filling all buckets, geth generates three random node IDs and searches for their neighbours via tab.net.lookupRandom().
// doRefresh performs a lookup for a random target to keep buckets full. seed nodes are // inserted if the table is empty (initial bootstrap or discarded faulty peers). func (tab *Table) doRefresh(done chan struct{}) { defer close(done) // Load nodes from the database and insert // them. This should yield a few previously seen nodes that are // (hopefully) still alive. tab.loadSeedNodes() // Run self lookup to discover new neighbor nodes. tab.net.lookupSelf() // The Kademlia paper specifies that the bucket refresh should // perform a lookup in the least recently used bucket. We cannot // adhere to this because the findnode target is a 512bit value // (not hash-sized) and it is not easily possible to generate a // sha3 preimage that falls into a chosen bucket. // We perform a few lookups with a random target instead. for i := 0; i < 3; i++ { tab.net.lookupRandom() } }
 
UDPv4.lookupSelf performs a lookup for the neighbours of the local node on the network. It initializes a lookup instance to manage the related state and runs the lookup process.
UDPv4.lookupRandom only differs in that it generates a random target and searches for the neighbours of that target.
Steps:
  1. Encode the local node's public key as the lookup target.
  2. Create a lookup instance to manage lookup state:
      • Registers a queryfunc inside the lookup, which is used to send findnode requests to remote nodes.
  3. Run the lookup:
      • Calls lookup.run to start the lookup process.
/// ---p2p/discover/v4_udp.go--- // lookupSelf implements transport. func (t *UDPv4) lookupSelf() []*enode.Node { /// Encode local node’s public key to target type. /// Run lookup. return t.newLookup(t.closeCtx, encodePubkey(&t.priv.PublicKey)).run() } func (t *UDPv4) newLookup(ctx context.Context, targetKey encPubkey) *lookup { target := enode.ID(crypto.Keccak256Hash(targetKey[:])) ekey := v4wire.Pubkey(targetKey) /// New lookup instance to manage lookup status: it := newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) { return t.findnode(n.ID(), n.addr(), ekey) }) return it } func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *lookup { it := &lookup{ tab: tab, queryfunc: q, asked: make(map[enode.ID]bool), seen: make(map[enode.ID]bool), result: nodesByDistance{target: target}, replyCh: make(chan []*node, alpha), cancelCh: ctx.Done(), queries: -1, } // Don't query further if we hit ourself. // Unlikely to happen often in practice. it.asked[tab.self().ID()] = true return it } // lookupRandom implements transport. func (t *UDPv4) lookupRandom() []*enode.Node { return t.newRandomLookup(t.closeCtx).run() } func (t *UDPv4) newRandomLookup(ctx context.Context) *lookup { var target encPubkey crand.Read(target[:]) return t.newLookup(ctx, target) }
 
findnode's primary purpose is to send a FINDNODE request to a given node and process the NEIGHBORS responses to discover nodes that are closer to a specific target.
It registers a reply-handling callback with the main loop. When the main loop receives a message from a remote node, it matches the remote node's ID and the message type against the registered reply matchers and uses the matching callback to handle the reply.
Steps:
  1. Ensure Bond:
      • Before sending the FINDNODE request, the method calls t.ensureBond(toid, toaddr) to verify that there is a recent endpoint proof from the target node. This step helps prevent amplification attacks and ensures the target node is reachable. (An "endpoint proof" is evidence that a particular node is reachable at a specific network address (IP and port), typically obtained through the exchange of PING and PONG messages between nodes.)
  2. Initialize Reply Matcher:
      • Registers a reply matcher with the main loop to handle incoming NEIGHBORS packets. When the main loop receives a reply, it matches it against the registered reply matchers. This matcher collects nodes from the responses until the bucket size (bucketSize) is reached or the replies time out.
  3. Send FINDNODE Request:
      • Constructs a FINDNODE request with the target and sends it to the specified address. The request includes an expiration time so it is only valid for a limited period.
  4. Wait for Responses:
      • Waits on the reply matcher. If the remote node sends enough NEIGHBORS responses, the matcher collects the nodes; if the remote node does not have enough nodes, the matcher eventually times out.
  5. Return Discovered Nodes:
      • Finally, the method returns the list of discovered nodes and any error encountered during the process.
/// ---p2p/discover/v4_udp.go--- // findnode sends a findnode request to the given node and waits until // the node has sent up to k neighbors. func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubkey) ([]*node, error) { /// Ensure Bond: t.ensureBond(toid, toaddr) /// Initialize Reply Matcher: // Add a matcher for 'neighbours' replies to the pending reply queue. The matcher is // active until enough nodes have been received. nodes := make([]*node, 0, bucketSize) nreceived := 0 rm := t.pending(toid, toaddr.IP, v4wire.NeighborsPacket, func(r v4wire.Packet) (matched bool, requestDone bool) { reply := r.(*v4wire.Neighbors) for _, rn := range reply.Nodes { nreceived++ n, err := t.nodeFromRPC(toaddr, rn) if err != nil { t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err) continue } nodes = append(nodes, n) } return true, nreceived >= bucketSize }) /// Send FINDNODE Request: t.send(toaddr, toid, &v4wire.Findnode{ Target: target, Expiration: uint64(time.Now().Add(expiration).Unix()), }) /// Wait for Responses: // Ensure that callers don't see a timeout if the node actually responded. Since // findnode can receive more than one neighbors response, the reply matcher will be // active until the remote node sends enough nodes. If the remote end doesn't have // enough nodes the reply matcher will time out waiting for the second reply, but // there's no need for an error in that case. err := <-rm.errc if errors.Is(err, errTimeout) && rm.reply != nil { err = nil } /// Return Discovered Nodes: return nodes, err }
ensureBond's purpose is to ensure that there is a recent proof of the peer's endpoint. This verifies that the peer is reachable and valid before sending potentially expensive or resource-intensive requests like findnode. (Currently no action is taken when an error occurs, e.g. if the remote node does not reply.)
/// ---p2p/discover/v4_udp.go--- // ensureBond solicits a ping from a node if we haven't seen a ping from it for a while. // This ensures there is a valid endpoint proof on the remote end. func (t *UDPv4) ensureBond(toid enode.ID, toaddr *net.UDPAddr) { tooOld := time.Since(t.db.LastPingReceived(toid, toaddr.IP)) > bondExpiration if tooOld || t.db.FindFails(toid, toaddr.IP) > maxFindnodeFailures { rm := t.sendPing(toid, toaddr, nil) <-rm.errc // Wait for them to ping back and process our pong. time.Sleep(respTimeout) } } // sendPing sends a ping message to the given node and invokes the callback // when the reply arrives. func (t *UDPv4) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) *replyMatcher { req := t.makePing(toaddr) packet, hash, err := v4wire.Encode(t.priv, req) if err != nil { errc := make(chan error, 1) errc <- err return &replyMatcher{errc: errc} } // Add a matcher for the reply to the pending reply queue. Pongs are matched if they // reference the ping we're about to send. rm := t.pending(toid, toaddr.IP, v4wire.PongPacket, func(p v4wire.Packet) (matched bool, requestDone bool) { matched = bytes.Equal(p.(*v4wire.Pong).ReplyTok, hash) if matched && callback != nil { callback() } return matched, matched }) // Send the packet. t.localNode.UDPContact(toaddr) t.write(toaddr, toid, req.Name(), packet) return rm } func (t *UDPv4) makePing(toaddr *net.UDPAddr) *v4wire.Ping { return &v4wire.Ping{ Version: 4, From: t.ourEndpoint(), To: v4wire.NewEndpoint(toaddr, 0), Expiration: uint64(time.Now().Add(expiration).Unix()), ENRSeq: t.localNode.Node().Seq(), } }
 
lookup.run repeatedly calls lookup.advance, which iteratively queries nodes to find those closer to the target, until no new nodes can be found. After the loop ends, it unwraps the nodes collected from remote nodes and returns them.
/// ---p2p/discover/lookup.go--- // run runs the lookup to completion and returns the closest nodes found. func (it *lookup) run() []*enode.Node { for it.advance() { } return unwrapNodes(it.result.entries) } /// ---p2p/discover/node.go--- func unwrapNodes(ns []*node) []*enode.Node { result := make([]*enode.Node, len(ns)) for i, n := range ns { result[i] = unwrapNode(n) } return result } func unwrapNode(n *node) *enode.Node { return &n.Node }
 
lookup.advance advances the node lookup by querying nodes and processing their responses.
Steps:
  1. Start Queries:
      • for it.startQueries() {}: this loop calls startQueries() to initiate new queries. It continues as long as startQueries() returns true.
  2. Wait for Responses:
      • The select statement waits for either:
        • Responses from nodes (<-it.replyCh); every query sends the nodes it fetched to the it.replyCh channel.
        • A cancellation signal (<-it.cancelCh).
  3. Process Responses:
      • Response Case (<-it.replyCh):
        • Clears the reply buffer.
        • Iterates over the received nodes:
          • Checks that the node is not nil and hasn't been seen before.
          • Marks the node as seen (it.seen[n.ID()] = true).
          • Adds the node to the result set (it.result.push(n, bucketSize)).
          • Appends the node to the reply buffer (it.replyBuffer = append(it.replyBuffer, n)).
        • Decrements the query count (it.queries--) after processing the batch.
        • If new nodes were found (len(it.replyBuffer) > 0), returns true to indicate that the lookup should continue.
      • Cancellation Case (<-it.cancelCh):
        • Calls it.shutdown() to shut down this lookup process.
  4. Return Condition:
      • If no new nodes are found and the for loop terminates, the function returns false to indicate that the lookup has ended.
/// ---p2p/discover/lookup.go--- // advance advances the lookup until any new nodes have been found. // It returns false when the lookup has ended. func (it *lookup) advance() bool { /// Start Queries: for it.startQueries() { /// Process Responses: select { /// receives reply from remote node case nodes := <-it.replyCh: it.replyBuffer = it.replyBuffer[:0] for _, n := range nodes { if n != nil && !it.seen[n.ID()] { /// update remote node as seen. it.seen[n.ID()] = true // push remote node into result it.result.push(n, bucketSize) // add to buffer to check later whether there is new node found in this query round. it.replyBuffer = append(it.replyBuffer, n) } } it.queries-- /// If there is new node found, then it returns true to continue lookup process. if len(it.replyBuffer) > 0 { return true } /// Cancellation Case: case <-it.cancelCh: /// shutdown this lookup process it.shutdown() } } return false }
 
The startQueries function initiates queries to nodes that are closer to the target and have not been queried yet. It first checks whether the query function is defined. The first query is handled differently by returning nodes from the local table; subsequent queries go to remote nodes.
Steps:
  1. Check Query Function:
      • If the query function (queryfunc) is not defined, the function returns false, indicating that no more queries can be started.
  2. Handle First Query:
      • Returns the bucketSize nodes in the local table that are closest to the target ID.
  3. Query Unasked Nodes:
      • Iterates over the result entries (result.entries), initiating queries to nodes that haven't been asked yet and limiting the number of in-flight queries to alpha (a simplified sketch of this strategy follows the code below).
  4. End Condition:
      • Returns true if there are ongoing queries, false otherwise, indicating whether the lookup should continue or end.
/// ---p2p/discover/lookup.go--- func (it *lookup) startQueries() bool { /// Check Query Function: if it.queryfunc == nil { return false } /// Handle First Query: // The first query returns nodes from the local table. if it.queries == -1 { closest := it.tab.findnodeByID(it.result.target, bucketSize, false) // Avoid finishing the lookup too quickly if table is empty. It'd be better to wait // for the table to fill in this case, but there is no good mechanism for that // yet. if len(closest.entries) == 0 { it.slowdown() } it.queries = 1 it.replyCh <- closest.entries return true } /// Query Unasked Nodes: // Ask the closest nodes that we haven't asked yet. for i := 0; i < len(it.result.entries) && it.queries < alpha; i++ { n := it.result.entries[i] if !it.asked[n.ID()] { it.asked[n.ID()] = true it.queries++ go it.query(n, it.replyCh) } } /// End Condition: // The lookup ends when no more nodes can be asked. return it.queries > 0 }
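As mentioned in step 3 above, here is a simplified, sequential sketch of the lookup strategy: keep a result set ordered by distance to the target, ask up to alpha of the closest not-yet-asked entries each round, merge their answers, and stop once every entry in the result set has been asked. Geth runs the alpha queries concurrently over the network with 256-bit IDs; plain integers and a toy XOR distance are used here to keep the sketch self-contained:

package main

import (
	"fmt"
	"sort"
)

const (
	alpha      = 3  // how many of the closest unasked nodes are queried per round
	bucketSize = 16 // maximum size of the result set
)

// lookup finds the nodes closest to target. The "network" is a static map from
// node ID to the neighbors that node would return for a findnode request.
func lookup(target int, seeds []int, network map[int][]int) []int {
	dist := func(id int) int { return id ^ target } // toy XOR distance
	asked := map[int]bool{}
	seen := map[int]bool{}
	var result []int

	// add merges newly learned nodes into the distance-ordered result set.
	add := func(ids []int) {
		for _, id := range ids {
			if !seen[id] {
				seen[id] = true
				result = append(result, id)
			}
		}
		sort.Slice(result, func(i, j int) bool { return dist(result[i]) < dist(result[j]) })
		if len(result) > bucketSize {
			result = result[:bucketSize]
		}
	}

	add(seeds) // the real lookup seeds itself from the local table
	for {
		// Pick up to alpha of the closest nodes that have not been asked yet.
		var toAsk []int
		for _, id := range result {
			if len(toAsk) == alpha {
				break
			}
			if !asked[id] {
				toAsk = append(toAsk, id)
			}
		}
		if len(toAsk) == 0 {
			return result // every node in the result set has been queried
		}
		for _, id := range toAsk {
			asked[id] = true
			add(network[id]) // stands in for a findnode query
		}
	}
}

func main() {
	network := map[int][]int{
		1: {2, 3}, 2: {4, 5}, 3: {6}, 4: {7}, 5: {}, 6: {}, 7: {},
	}
	// Prints the discovered nodes ordered by closeness to the target 7.
	fmt.Println(lookup(7, []int{1}, network))
}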
 
lookup.query sends a query to a specific node and handles the response.
Steps:
  1. Retrieve Failure Count:
      • Fetches the number of times a query to this node has failed. This count helps determine whether the node should be removed from the local table if it fails repeatedly.
  2. Perform the Query:
      • Calls the queryfunc function to perform the actual query to the node. queryfunc is a function passed to the lookup struct that handles querying a node; from the previous code we know it is findnode.
  3. Handle Query Results:
      • Error Handling:
        • If the error indicates that the process is shutting down (errClosed), it avoids recording this as a failure and simply returns.
      • No Results Handling:
        • If the query returns no results, the failure count for the node is incremented and stored in the database. If the node has failed too many times (maxFindnodeFailures) and the corresponding bucket still has enough other nodes, the node is removed from the table.
      • Successful Results Handling:
        • If the query returns results and there were previous failures, the failure count is reset, because it counts consecutive failures.
  4. Store Fetched Nodes:
      • Adds the returned nodes to the table and sends them to the replyCh channel for further processing (the loop in advance decides whether to continue the lookup based on whether any new nodes were fetched).
/// ---p2p/discover/lookup.go--- func (it *lookup) query(n *node, reply chan<- []*node) { /// retrieves the number of findnode failures since bonding. fails := it.tab.db.FindFails(n.ID(), n.IP()) r, err := it.queryfunc(n) if errors.Is(err, errClosed) { // Avoid recording failures on shutdown. reply <- nil return } else if len(r) == 0 { fails++ it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails) // Remove the node from the local table if it fails to return anything useful too // many times, but only if there are enough other nodes in the bucket. dropped := false if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 { dropped = true it.tab.delete(n) } it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err) } else if fails > 0 { // Reset failure counter because it counts _consecutive_ failures. it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0) } // Grab as many nodes as possible. Some of them might not be alive anymore, but we'll // just remove those again during revalidation. for _, n := range r { it.tab.addSeenNode(n) } reply <- r }
 
Table.doRevalidate
The Table.doRevalidate function is responsible for ensuring that nodes in the Kademlia-like node table are still live. It does this by pinging the last node in a random bucket and replacing it if it does not respond.
Steps:
  1. Defer signaling the done channel:
      • The function accepts a done channel used to signal completion of the revalidation. The defer block ensures the done channel is signaled regardless of how the function exits. When the outer loop receives the done signal, it schedules the next doRevalidate.
  2. Node Selection for Revalidation:
      • nodeToRevalidate selects the last node in a random, non-empty bucket for revalidation. If no such node is found, the function exits early.
  3. Ping the Selected Node:
      • Sends a ping to the selected node and waits for a pong (response). If the node responds, err is nil and remoteSeq contains the sequence number reported by the node.
  4. Fetch Updated Node Record:
      • If the node responded with a higher sequence number, indicating it has a more recent record, the function requests the updated record. If the request succeeds, the node is updated with the new record.
  5. Update or Remove Node:
      • If the node responded (err == nil), it is considered live: its liveness check count is incremented and it is moved to the front of the bucket using bumpInBucket.
      • If the node did not respond, it is considered dead. The function tries to replace it with a node from the replacements list using replace; if no replacement is available, the node is removed from the bucket.
/// ---p2p/discover/table.go--- // doRevalidate checks that the last node in a random bucket is still live and replaces or // deletes the node if it isn't. func (tab *Table) doRevalidate(done chan<- struct{}) { /// Defer signal message to done channel: defer func() { done <- struct{}{} }() /// Node Selection for Revalidation: last, bi := tab.nodeToRevalidate() if last == nil { // No non-empty bucket found. return } /// Ping the Selected Node: // Ping the selected node and wait for a pong. remoteSeq, err := tab.net.ping(unwrapNode(last)) /// Fetch Updated Node Record: // Also fetch record if the node replied and returned a higher sequence number. if last.Seq() < remoteSeq { n, err := tab.net.RequestENR(unwrapNode(last)) if err != nil { tab.log.Debug("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) } else { last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks} } } /// Update or Remove Node: tab.mutex.Lock() defer tab.mutex.Unlock() b := tab.buckets[bi] if err == nil { // The node responded, move it to the front. last.livenessChecks++ tab.log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks) tab.bumpInBucket(b, last) return } // No reply received, pick a replacement or delete the node if there aren't // any replacements. if r := tab.replace(b, last); r != nil { tab.log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP()) } else { tab.log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks) } }
 
nodeToRevalidate fetches the last node in a random, non-empty bucket.
/// ---p2p/discover/table.go--- // nodeToRevalidate returns the last node in a random, non-empty bucket. func (tab *Table) nodeToRevalidate() (n *node, bi int) { tab.mutex.Lock() defer tab.mutex.Unlock() for _, bi = range tab.rand.Perm(len(tab.buckets)) { b := tab.buckets[bi] if len(b.entries) > 0 { last := b.entries[len(b.entries)-1] return last, bi } } return nil, 0 }
 
UDPv4.ping sends a ping message to the given node and waits for a reply. It registers a replyMatcher with the main loop; after the main loop handles the response from the remote node, ping receives the reply and returns the remote node's ENR sequence number.
 
/// ---p2p/discover/v4_udp.go--- // ping sends a ping message to the given node and waits for a reply. func (t *UDPv4) ping(n *enode.Node) (seq uint64, err error) { rm := t.sendPing(n.ID(), &net.UDPAddr{IP: n.IP(), Port: n.UDP()}, nil) if err = <-rm.errc; err == nil { seq = rm.reply.(*v4wire.Pong).ENRSeq } return seq, err } // sendPing sends a ping message to the given node and invokes the callback // when the reply arrives. func (t *UDPv4) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) *replyMatcher { req := t.makePing(toaddr) packet, hash, err := v4wire.Encode(t.priv, req) if err != nil { errc := make(chan error, 1) errc <- err return &replyMatcher{errc: errc} } // Add a matcher for the reply to the pending reply queue. Pongs are matched if they // reference the ping we're about to send. rm := t.pending(toid, toaddr.IP, v4wire.PongPacket, func(p v4wire.Packet) (matched bool, requestDone bool) { matched = bytes.Equal(p.(*v4wire.Pong).ReplyTok, hash) if matched && callback != nil { callback() } return matched, matched }) // Send the packet. t.localNode.UDPContact(toaddr) t.write(toaddr, toid, req.Name(), packet) return rm }
 
RequestENR sends an ENRRequest to the given node and waits for a response. After receiving the ENR from the remote node, it checks the record's validity, constructs a new node from it, and returns that node.
/// ---p2p/discover/v4_udp.go--- // RequestENR sends ENRRequest to the given node and waits for a response. func (t *UDPv4) RequestENR(n *enode.Node) (*enode.Node, error) { addr := &net.UDPAddr{IP: n.IP(), Port: n.UDP()} t.ensureBond(n.ID(), addr) req := &v4wire.ENRRequest{ Expiration: uint64(time.Now().Add(expiration).Unix()), } packet, hash, err := v4wire.Encode(t.priv, req) if err != nil { return nil, err } // Add a matcher for the reply to the pending reply queue. Responses are matched if // they reference the request we're about to send. rm := t.pending(n.ID(), addr.IP, v4wire.ENRResponsePacket, func(r v4wire.Packet) (matched bool, requestDone bool) { matched = bytes.Equal(r.(*v4wire.ENRResponse).ReplyTok, hash) return matched, matched }) // Send the packet and wait for the reply. t.write(addr, n.ID(), req.Name(), packet) if err := <-rm.errc; err != nil { return nil, err } // Verify the response record. respN, err := enode.New(enode.ValidSchemes, &rm.reply.(*v4wire.ENRResponse).Record) if err != nil { return nil, err } if respN.ID() != n.ID() { return nil, errors.New("invalid ID in response record") } if respN.Seq() < n.Seq() { return n, nil // response record is older } if err := netutil.CheckRelayIP(addr.IP, respN.IP()); err != nil { return nil, fmt.Errorf("invalid IP in response record: %v", err) } return respN, nil }
 
bumpInBucket moves the given node to the front of the bucket entry list.
/// ---p2p/discover/table.go--- // bumpInBucket moves the given node to the front of the bucket entry list // if it is contained in that list. func (tab *Table) bumpInBucket(b *bucket, n *node) bool { for i := range b.entries { if b.entries[i].ID() == n.ID() { if !n.IP().Equal(b.entries[i].IP()) { // Endpoint has changed, ensure that the new IP fits into table limits. tab.removeIP(b, b.entries[i].IP()) if !tab.addIP(b, n.IP()) { // It doesn't, put the previous one back. tab.addIP(b, b.entries[i].IP()) return false } } // Move it to the front. copy(b.entries[1:], b.entries[:i]) b.entries[0] = n return true } } return false }
 
Table.replace is responsible for replacing a node (last) in a bucket with a node from the replacement list.
/// ---p2p/discover/table.go--- // replace removes n from the replacement list and replaces 'last' with it if it is the // last entry in the bucket. If 'last' isn't the last entry, it has either been replaced // with someone else or became active. func (tab *Table) replace(b *bucket, last *node) *node { if len(b.entries) == 0 || b.entries[len(b.entries)-1].ID() != last.ID() { // Entry has moved, don't replace it. return nil } // Still the last entry. if len(b.replacements) == 0 { tab.deleteInBucket(b, last) return nil } r := b.replacements[tab.rand.Intn(len(b.replacements))] b.replacements = deleteNode(b.replacements, r) b.entries[len(b.entries)-1] = r tab.removeIP(b, last.IP()) return r }
 
Table.copyLiveNodes
The copyLiveNodes function periodically copies nodes from the node table to the node database if they have been in the table for a sufficient amount of time. This keeps the node database populated with nodes that have been verified to be alive and stable for a minimum duration.
Note that UpdateNode does nothing if the node's Seq (sequence number) is smaller than the one stored in the DB, because a larger Seq means a newer version of the node's ENR.
/// ---p2p/discover/table.go--- // copyLiveNodes adds nodes from the table to the database if they have been in the table // longer than seedMinTableTime. func (tab *Table) copyLiveNodes() { tab.mutex.Lock() defer tab.mutex.Unlock() now := time.Now() for _, b := range &tab.buckets { for _, n := range b.entries { if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime { tab.db.UpdateNode(unwrapNode(n)) } } } } /// ---p2p/enode/nodedb.go--- // UpdateNode inserts - potentially overwriting - a node into the peer database. func (db *DB) UpdateNode(node *Node) error { if node.Seq() < db.NodeSeq(node.ID()) { return nil } blob, err := rlp.EncodeToBytes(&node.r) if err != nil { return err } if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil { return err } return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq()) }

Loop: UDPv4

UDPv4.loop handles the refresh timer, the pending reply queue, and various timeouts. It continuously processes incoming replies, manages request deadlines, and triggers an NTP clock-drift check when too many timeouts accumulate.
 
Steps:
Variables Initialization:
  • plist: A doubly linked list to keep track of pending reply matchers.
  • timeout: A timer to manage reply timeouts.
  • nextTimeout: Pointer to the next reply matcher whose deadline is soonest.
  • contTimeouts: Counter for continuous timeouts.
  • ntpWarnTime: Timestamp of the last NTP warning.
Initial Timeout:
Drains the first timeout channel to ignore the first timeout event.
Construct Reset Timeout Function:
  • Updates the timer to trigger when the next pending reply's deadline is reached.
  • Removes reply matchers if their deadlines are too far in the future, indicating a possible system clock issue.
(Main Loop)
The main loop continues running until the context is canceled, handling various cases using a select statement:
Case t.closeCtx.Done():
Gracefully shuts down by notifying all pending reply matchers of the closure and cleaning up resources.
Case t.addReplyMatcher:
Adds a new reply matcher to the pending list with a timeout deadline.
Case t.gotreply:
  • Handles incoming reply packets by checking each reply matcher for a match.
  • If a match is found, the matcher callback is invoked, which processes the reply and determines if the matcher should be removed from the pending list.
  • Resets the continuous timeout counter if a valid reply is received.
Case timeout.C:
Processes timeouts by:
  • Resetting the timeout variables.
  • Notifying and removing reply matchers whose deadlines have expired.
  • Checking if an NTP sync check is needed based on continuous timeouts.
 
Note that:
  • replyMatcher.errc is the channel used to notify the caller that registered the matcher (for example findnode or sendPing). When UDPv4.loop finishes handling a reply, either because it received the remote node's reply and executed replyMatcher.callback or because the reply timed out, it signals on this channel. The caller can then read the remote node's reply from replyMatcher.reply if everything went well.
  • t.gotreply is the channel fed by UDPv4.readLoop, which continuously reads messages from remote nodes and passes them to UDPv4.loop, which finds the corresponding reply matcher and executes its callback.
/// ---p2p/discover/v4_udp.go--- // loop runs in its own goroutine. it keeps track of // the refresh timer and the pending reply queue. func (t *UDPv4) loop() { // defer to signal current loop is done defer t.wg.Done() // Variables Initialization: var ( plist = list.New() timeout = time.NewTimer(0) nextTimeout *replyMatcher // head of plist when timeout was last reset contTimeouts = 0 // number of continuous timeouts to do NTP checks ntpWarnTime = time.Unix(0, 0) ) // ignore first timeout <-timeout.C // defer to stop timeout defer timeout.Stop() // Construct Reset Timeout Function: resetTimeout := func() { if plist.Front() == nil || nextTimeout == plist.Front().Value { return } // Start the timer so it fires when the next pending reply has expired. now := time.Now() for el := plist.Front(); el != nil; el = el.Next() { nextTimeout = el.Value.(*replyMatcher) if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout { timeout.Reset(dist) return } // Remove pending replies whose deadline is too far in the // future. These can occur if the system clock jumped // backwards after the deadline was assigned. nextTimeout.errc <- errClockWarp plist.Remove(el) } nextTimeout = nil timeout.Stop() } for { // Reset timeout to the next reply matcher whose deadline is soonest. resetTimeout() select { // Gracefully shuts down by notifying all pending reply matchers of the closure and cleaning up resources. case <-t.closeCtx.Done(): for el := plist.Front(); el != nil; el = el.Next() { el.Value.(*replyMatcher).errc <- errClosed } return // Adds a new reply matcher to the pending list with a timeout deadline. case p := <-t.addReplyMatcher: p.deadline = time.Now().Add(respTimeout) plist.PushBack(p) // Handles incoming reply packets case r := <-t.gotreply: var matched bool // whether any replyMatcher considered the reply acceptable. for el := plist.Front(); el != nil; el = el.Next() { p := el.Value.(*replyMatcher) if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) { ok, requestDone := p.callback(r.data) matched = matched || ok p.reply = r.data // Remove the matcher if callback indicates that all replies have been received. if requestDone { p.errc <- nil plist.Remove(el) } // Reset the continuous timeout counter (time drift detection) contTimeouts = 0 } } r.matched <- matched // Processes timeouts case now := <-timeout.C: nextTimeout = nil // Notify and remove callbacks whose deadline is in the past. for el := plist.Front(); el != nil; el = el.Next() { p := el.Value.(*replyMatcher) if now.After(p.deadline) || now.Equal(p.deadline) { p.errc <- errTimeout plist.Remove(el) contTimeouts++ } } // If we've accumulated too many timeouts, do an NTP time sync check if contTimeouts > ntpFailureThreshold { if time.Since(ntpWarnTime) >= ntpWarningCooldown { ntpWarnTime = time.Now() go checkClockDrift() } contTimeouts = 0 } } } }
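To isolate the pattern from the geth specifics, here is a stripped-down sketch of the reply-matcher idea: a caller registers the reply it expects (peer plus packet type) and blocks on an error channel, while a single loop goroutine matches incoming packets against the pending matchers and fails them on timeout. All names, the ticker-based timeout check, and the fixed deadlines are illustrative:

package main

import (
	"errors"
	"fmt"
	"time"
)

type packet struct {
	from  string // peer ID
	ptype string // packet type, e.g. "pong"
}

type matcher struct {
	from, ptype string
	deadline    time.Time
	errc        chan error // the registering caller blocks on this
}

var errTimeout = errors.New("reply timeout")

func loop(add <-chan *matcher, got <-chan packet, quit <-chan struct{}) {
	var pending []*matcher
	tick := time.NewTicker(50 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case m := <-add: // register a new expectation with a deadline
			m.deadline = time.Now().Add(200 * time.Millisecond)
			pending = append(pending, m)
		case p := <-got: // incoming packet: release matching waiters
			kept := pending[:0]
			for _, m := range pending {
				if m.from == p.from && m.ptype == p.ptype {
					m.errc <- nil
					continue
				}
				kept = append(kept, m)
			}
			pending = kept
		case now := <-tick.C: // expire matchers whose deadline has passed
			kept := pending[:0]
			for _, m := range pending {
				if now.After(m.deadline) {
					m.errc <- errTimeout
					continue
				}
				kept = append(kept, m)
			}
			pending = kept
		case <-quit:
			return
		}
	}
}

func main() {
	add := make(chan *matcher)
	got := make(chan packet)
	quit := make(chan struct{})
	go loop(add, got, quit)

	m := &matcher{from: "nodeA", ptype: "pong", errc: make(chan error, 1)}
	add <- m
	got <- packet{from: "nodeA", ptype: "pong"} // simulated incoming reply
	fmt.Println(<-m.errc)                       // <nil>
	close(quit)
}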
 

Loop: Read

UDPv4.readLoop handles the reception and initial processing of incoming UDP packets for the discovery protocol.
 
Steps:
  1. Defers:
      • t.wg.Done(): Indicates that the goroutine running readLoop has completed when the function returns.
      • If unhandled is not nil, it defers the closing of the unhandled channel.
  1. Buffer Allocation:
    1. Allocates a buffer buf of size maxPacketSize to hold the incoming UDP packets.
(Main Loop)
The main loop continues running until a permanent error occurs, handling incoming packets in each iteration:
  1. Read UDP Packets:
      • Reads data from the UDP connection into the buffer buf using t.conn.ReadFromUDP.
      • nbytes is the number of bytes read, and from is the source UDP address of the packet.
      • Handles temporary errors separately from permanent errors:
        • Temporary Errors: Logs a debug message and continues reading.
        • Permanent Errors: Logs the error and exits the loop unless the error is io.EOF.
  2. Handle the Packet:
      • Calls t.handlePacket(from, buf[:nbytes]) to process the packet.
      • If handlePacket returns a non-nil error and unhandled is not nil, sends the packet to the unhandled channel.
        • Uses a select statement with a default case so the read loop never blocks if the channel is full (see the sketch after the code below).
/// ---p2p/discover/v4_udp.go--- // readLoop runs in its own goroutine. it handles incoming UDP packets. func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) { // Defers: defer t.wg.Done() if unhandled != nil { defer close(unhandled) } // Allocate buffer to store incoming packet buf := make([]byte, maxPacketSize) for { // Read UDP Packets: nbytes, from, err := t.conn.ReadFromUDP(buf) // Handle Error if netutil.IsTemporaryError(err) { // Ignore temporary read errors. t.log.Debug("Temporary UDP read error", "err", err) continue } else if err != nil { // Shut down the loop for permanent errors. if !errors.Is(err, io.EOF) { t.log.Debug("UDP read error", "err", err) } return } // Handle Packet: if t.handlePacket(from, buf[:nbytes]) != nil && unhandled != nil { select { case unhandled <- ReadPacket{buf[:nbytes], from}: default: } } } } /// ---go/src/net/udpsock.go--- // ReadFromUDP acts like ReadFrom but returns a UDPAddr. func (c *UDPConn) ReadFromUDP(b []byte) (n int, addr *UDPAddr, err error) { // This function is designed to allow the caller to control the lifetime // of the returned *UDPAddr and thereby prevent an allocation. // See https://blog.filippo.io/efficient-go-apis-with-the-inliner/. // The real work is done by readFromUDP, below. return c.readFromUDP(b, &UDPAddr{}) } /// ---p2p/netutil/error.go--- // IsTemporaryError checks whether the given error should be considered temporary. func IsTemporaryError(err error) bool { tempErr, ok := err.(interface { Temporary() bool }) return ok && tempErr.Temporary() || isPacketTooBig(err) }
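The hand-off to the unhandled channel must never stall the read loop, so it uses a select with a default case and simply drops the packet when the channel is full. A minimal stand-alone sketch of that pattern (not geth code):
package main

import "fmt"

func main() {
	unhandled := make(chan []byte, 2) // bounded, like the channel created in setupDiscovery

	for i := 0; i < 4; i++ {
		pkt := []byte{byte(i)}
		select {
		case unhandled <- pkt:
			fmt.Println("queued packet", i)
		default:
			// Channel full: drop the packet instead of blocking the read loop.
			fmt.Println("dropped packet", i)
		}
	}
}
Dropping is acceptable here because discovery runs over UDP and is loss-tolerant: an unanswered request simply times out and is retried.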
 
UDPv4.handlePacket processes incoming UDP packets: it decodes them, pre-verifies them, and dispatches them to the appropriate handler.
Steps:
  1. Decode the Packet:
      • Decodes the received bytes using v4wire.Decode into a v4wire.Packet, also extracting the sender's public key (fromKey) and the packet hash.
      • If decoding fails (err != nil), it logs the error and returns it.
  2. Packet Verification:
      • If decoding succeeds, the packet is wrapped using t.wrapPacket, which assigns handler functions to the packet based on its type.
      • The preverify function (if defined) is called to check the packet's validity before processing.
  3. Handle the Packet:
      • If there are no errors and a handle function is defined for the packet, it calls the handle function to process the packet.
      • Finally, it returns any error that occurred during decoding or pre-verification.
 
From the previous analysis, we know that the local node sends a Findnode message to the remote node. From UDPv4.wrapPacket we can see that when the remote node receives the Findnode message, it replies with a Neighbors message containing the nodes closest to the requested target.
/// ---p2p/discover/v4_udp.go--- func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error { /// Decode the Packet: rawpacket, fromKey, hash, err := v4wire.Decode(buf) if err != nil { t.log.Debug("Bad discv4 packet", "addr", from, "err", err) return err } packet := t.wrapPacket(rawpacket) fromID := fromKey.ID() /// Packet Verification: if err == nil && packet.preverify != nil { err = packet.preverify(packet, from, fromID, fromKey) } t.log.Trace("<< "+packet.Name(), "id", fromID, "addr", from, "err", err) /// Handle the Packet: if err == nil && packet.handle != nil { packet.handle(packet, from, fromID, hash) } return err } // wrapPacket returns the handler functions applicable to a packet. func (t *UDPv4) wrapPacket(p v4wire.Packet) *packetHandlerV4 { var h packetHandlerV4 h.Packet = p switch p.(type) { case *v4wire.Ping: h.preverify = t.verifyPing h.handle = t.handlePing case *v4wire.Pong: h.preverify = t.verifyPong case *v4wire.Findnode: h.preverify = t.verifyFindnode h.handle = t.handleFindnode case *v4wire.Neighbors: h.preverify = t.verifyNeighbors case *v4wire.ENRRequest: h.preverify = t.verifyENRRequest h.handle = t.handleENRRequest case *v4wire.ENRResponse: h.preverify = t.verifyENRResponse } return &h }
 
Decode in the v4wire package decodes a discovery v4 protocol packet: it checks the MAC hash, recovers the sender's public key from the signature, and RLP-decodes the payload into the concrete packet type.
/// ---p2p/discover/v4wire/v4wire.go--- // Decode reads a discovery v4 packet. func Decode(input []byte) (Packet, Pubkey, []byte, error) { if len(input) < headSize+1 { return nil, Pubkey{}, nil, ErrPacketTooSmall } hash, sig, sigdata := input[:macSize], input[macSize:headSize], input[headSize:] shouldhash := crypto.Keccak256(input[macSize:]) if !bytes.Equal(hash, shouldhash) { return nil, Pubkey{}, nil, ErrBadHash } fromKey, err := recoverNodeKey(crypto.Keccak256(input[headSize:]), sig) if err != nil { return nil, fromKey, hash, err } var req Packet switch ptype := sigdata[0]; ptype { case PingPacket: req = new(Ping) case PongPacket: req = new(Pong) case FindnodePacket: req = new(Findnode) case NeighborsPacket: req = new(Neighbors) case ENRRequestPacket: req = new(ENRRequest) case ENRResponsePacket: req = new(ENRResponse) default: return nil, fromKey, hash, fmt.Errorf("unknown type: %d", ptype) } // Here we use NewStream to allow for additional data after the first // RLP object (forward-compatibility). s := rlp.NewStream(bytes.NewReader(sigdata[1:]), 0) err = s.Decode(req) return req, fromKey, hash, err }
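The check at the top of Decode relies on the discv4 wire layout: a 32-byte keccak256 MAC, a 65-byte recoverable signature, then a one-byte packet type followed by the RLP payload, with the MAC covering everything after itself. A minimal sketch of just the MAC check, using golang.org/x/crypto/sha3 for legacy Keccak-256 (illustrative, not the geth implementation):
package main

import (
	"bytes"
	"errors"
	"fmt"

	"golang.org/x/crypto/sha3"
)

const (
	macSize  = 32                // leading keccak256 MAC
	sigSize  = 65                // recoverable secp256k1 signature
	headSize = macSize + sigSize // everything before the packet-type byte
)

// checkMAC verifies that the leading hash covers signature || type || payload.
func checkMAC(packet []byte) error {
	if len(packet) < headSize+1 {
		return errors.New("packet too small")
	}
	h := sha3.NewLegacyKeccak256()
	h.Write(packet[macSize:])
	if !bytes.Equal(packet[:macSize], h.Sum(nil)) {
		return errors.New("bad hash")
	}
	return nil
}

func main() {
	// Build a fake packet body: 65 zero bytes standing in for the signature,
	// then a packet-type byte and a minimal RLP payload (0xc0 = empty list).
	body := append(make([]byte, sigSize), 0x01, 0xc0)
	h := sha3.NewLegacyKeccak256()
	h.Write(body)
	packet := append(h.Sum(nil), body...)

	fmt.Println(checkMAC(packet)) // <nil>
}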

Message Handling

Handle Ping Message

UDPv4.verifyPing checks the request's Expiration and decodes the sender node's public key.
/// ---p2p/discover/v4_udp.go--- func (t *UDPv4) verifyPing(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { req := h.Packet.(*v4wire.Ping) if v4wire.Expired(req.Expiration) { return errExpired } senderKey, err := v4wire.DecodePubkey(crypto.S256(), fromKey) if err != nil { return err } h.senderKey = senderKey return nil } /// ---p2p/discover/v4wire/v4wire.go--- // Expired checks whether the given UNIX time stamp is in the past. func Expired(ts uint64) bool { return time.Unix(int64(ts), 0).Before(time.Now()) } // DecodePubkey reads an encoded secp256k1 public key. func DecodePubkey(curve elliptic.Curve, e Pubkey) (*ecdsa.PublicKey, error) { p := &ecdsa.PublicKey{Curve: curve, X: new(big.Int), Y: new(big.Int)} half := len(e) / 2 p.X.SetBytes(e[:half]) p.Y.SetBytes(e[half:]) if !p.Curve.IsOnCurve(p.X, p.Y) { return nil, ErrBadPoint } return p, nil }
 
handlePing:
  1. Sends a PONG message back to the remote node.
  2. If too much time has passed since the local node last received a PONG from the remote node (the bond has expired), it sends a PING to the remote node; if the remote node responds, the local node inserts it into the table.
  3. If the local node and the remote node are still bonded, the local node inserts the remote node directly into the table.
  4. Updates the time at which the last PING message was received from the remote node.
  5. Updates the endpoint statement using the PING message's destination address, which helps the local node predict its external IP.
/// ---p2p/discover/v4_udp.go--- func (t *UDPv4) handlePing(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, mac []byte) { req := h.Packet.(*v4wire.Ping) // Reply. t.send(from, fromID, &v4wire.Pong{ To: v4wire.NewEndpoint(from, req.From.TCP), ReplyTok: mac, Expiration: uint64(time.Now().Add(expiration).Unix()), ENRSeq: t.localNode.Node().Seq(), }) // Ping back if our last pong on file is too far in the past. n := wrapNode(enode.NewV4(h.senderKey, from.IP, int(req.From.TCP), from.Port)) if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration { t.sendPing(fromID, from, func() { t.tab.addVerifiedNode(n) }) } else { t.tab.addVerifiedNode(n) } // Update node database and endpoint predictor. t.db.UpdateLastPingReceived(n.ID(), from.IP, time.Now()) t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)}) }

Handle Pong Message

verifyPong passes the message to t.gotreply; the corresponding reply matcher's callback is then executed. As in the PING handling example above, that callback inserts the remote node into the table.
/// ---p2p/discover/v4_udp.go--- func (t *UDPv4) verifyPong(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { req := h.Packet.(*v4wire.Pong) if v4wire.Expired(req.Expiration) { return errExpired } if !t.handleReply(fromID, from.IP, req) { return errUnsolicitedReply } t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)}) t.db.UpdateLastPongReceived(fromID, from.IP, time.Now()) return nil } // handleReply dispatches a reply packet, invoking reply matchers. It returns // whether any matcher considered the packet acceptable. func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, req v4wire.Packet) bool { matched := make(chan bool, 1) select { case t.gotreply <- reply{from, fromIP, req, matched}: // loop will handle it return <-matched case <-t.closeCtx.Done(): return false } }
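handleReply is the other half of the reply-matcher machinery: the reader goroutine hands the reply to the loop over t.gotreply together with a matched channel, then blocks until the loop reports whether any matcher accepted it. A stand-alone sketch of this request/acknowledgement hand-off (names here are made up, not geth code):
package main

import "fmt"

type reply struct {
	payload string
	matched chan bool // loop sends back whether any matcher accepted it
}

func main() {
	gotreply := make(chan reply)
	done := make(chan struct{})

	// The "loop" goroutine: owns the matcher state and answers each reply.
	go func() {
		defer close(done)
		r := <-gotreply
		// Pretend exactly one pending matcher accepted this reply.
		r.matched <- (r.payload == "pong")
	}()

	// The "reader" side: hand the reply over and wait for the verdict.
	r := reply{payload: "pong", matched: make(chan bool, 1)}
	gotreply <- r
	fmt.Println("matched:", <-r.matched) // matched: true
	<-done
}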
 

Handle Findnode Message

verifyFindnode checks whether the local node has an active bond with the remote node. Recall that in the previous code, the sender first calls ensureBond and only then sends the Findnode message. This keeps cost low and prevents amplification: PING and PONG carry small payloads, so exchanging them filters out inactive nodes cheaply. Only after a bond is established, when a node has good confidence that the remote node is active, does it send the larger messages.
/// ---p2p/discover/v4_udp.go--- func (t *UDPv4) verifyFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { req := h.Packet.(*v4wire.Findnode) if v4wire.Expired(req.Expiration) { return errExpired } if !t.checkBond(fromID, from.IP) { // No endpoint proof pong exists, we don't process the packet. This prevents an // attack vector where the discovery protocol could be used to amplify traffic in a // DDOS attack. A malicious actor would send a findnode request with the IP address // and UDP port of the target as the source address. The recipient of the findnode // packet would then send a neighbors packet (which is a much bigger packet than // findnode) to the victim. return errUnknownNode } return nil } // checkBond checks if the given node has a recent enough endpoint proof. func (t *UDPv4) checkBond(id enode.ID, ip net.IP) bool { return time.Since(t.db.LastPongReceived(id, ip)) < bondExpiration }
 
handleFindnode finds the nodes in the local table closest to the requested target and sends them to the remote node in Neighbors messages, split into chunks of at most MaxNeighbors nodes to stay below the packet size limit.
/// ---p2p/discover/v4_udp.go--- func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, mac []byte) { req := h.Packet.(*v4wire.Findnode) // Determine closest nodes. target := enode.ID(crypto.Keccak256Hash(req.Target[:])) closest := t.tab.findnodeByID(target, bucketSize, true).entries // Send neighbors in chunks with at most maxNeighbors per packet // to stay below the packet size limit. p := v4wire.Neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())} var sent bool for _, n := range closest { if netutil.CheckRelayIP(from.IP, n.IP()) == nil { p.Nodes = append(p.Nodes, nodeToRPC(n)) } if len(p.Nodes) == v4wire.MaxNeighbors { t.send(from, fromID, &p) p.Nodes = p.Nodes[:0] sent = true } } if len(p.Nodes) > 0 || !sent { t.send(from, fromID, &p) } } /// ---p2p/discover/table.go--- // findnodeByID returns the n nodes in the table that are closest to the given id. // This is used by the FINDNODE/v4 handler. // // The preferLive parameter says whether the caller wants liveness-checked results. If // preferLive is true and the table contains any verified nodes, the result will not // contain unverified nodes. However, if there are no verified nodes at all, the result // will contain unverified nodes. func (tab *Table) findnodeByID(target enode.ID, nresults int, preferLive bool) *nodesByDistance { tab.mutex.Lock() defer tab.mutex.Unlock() // Scan all buckets. There might be a better way to do this, but there aren't that many // buckets, so this solution should be fine. The worst-case complexity of this loop // is O(tab.len() * nresults). nodes := &nodesByDistance{target: target} liveNodes := &nodesByDistance{target: target} for _, b := range &tab.buckets { for _, n := range b.entries { nodes.push(n, nresults) if preferLive && n.livenessChecks > 0 { liveNodes.push(n, nresults) } } } if preferLive && len(liveNodes.entries) > 0 { return liveNodes } return nodes } // push adds the given node to the list, keeping the total size below maxElems. func (h *nodesByDistance) push(n *node, maxElems int) { ix := sort.Search(len(h.entries), func(i int) bool { return enode.DistCmp(h.target, h.entries[i].ID(), n.ID()) > 0 }) end := len(h.entries) if len(h.entries) < maxElems { h.entries = append(h.entries, n) } if ix < end { // Slide existing entries down to make room. // This will overwrite the entry we just appended. copy(h.entries[ix+1:], h.entries[ix:]) h.entries[ix] = n } }
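nodesByDistance.push keeps at most nresults entries sorted by distance to the target: it finds the insertion point with sort.Search, appends only while below the cap, and otherwise slides the tail down so the farthest entry falls off. The same bounded-insertion pattern with plain integers (illustrative sketch, not geth code):
package main

import (
	"fmt"
	"sort"
)

// pushSorted inserts v into s (kept in ascending order), capping the length at maxElems.
func pushSorted(s []int, v, maxElems int) []int {
	ix := sort.Search(len(s), func(i int) bool { return s[i] > v })
	end := len(s)
	if len(s) < maxElems {
		s = append(s, v) // grow only while below the cap
	}
	if ix < end {
		copy(s[ix+1:], s[ix:]) // slide the tail down; the last element is dropped (or overwritten if just appended)
		s[ix] = v
	}
	return s
}

func main() {
	var closest []int
	for _, d := range []int{42, 7, 19, 3, 25} {
		closest = pushSorted(closest, d, 3)
	}
	fmt.Println(closest) // [3 7 19]
}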

Handle Neighbors Message

verifyNeighbors:
  1. Checks that the message has not expired.
  2. Calls handleReply to pass the reply to t.gotreply. After UDPv4.loop receives the reply, it calls the corresponding replyMatcher.callback to handle it; in that callback, the returned nodes are collected and inserted into the table.
/// ---p2p/discover/v4_udp.go--- func (t *UDPv4) verifyNeighbors(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { req := h.Packet.(*v4wire.Neighbors) if v4wire.Expired(req.Expiration) { return errExpired } if !t.handleReply(fromID, from.IP, h.Packet) { return errUnsolicitedReply } return nil } // handleReply dispatches a reply packet, invoking reply matchers. It returns // whether any matcher considered the packet acceptable. func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, req v4wire.Packet) bool { matched := make(chan bool, 1) select { case t.gotreply <- reply{from, fromIP, req, matched}: // loop will handle it return <-matched case <-t.closeCtx.Done(): return false } } /// ---p2p/discover/v4wire/v4wire.go--- // Expired checks whether the given UNIX time stamp is in the past. func Expired(ts uint64) bool { return time.Unix(int64(ts), 0).Before(time.Now()) }

Handle ENRRequest Message

verifyENRRequest checks the expiration and the endpoint-proof bond; handleENRRequest then sends the local node's ENR record to the remote node.
/// ---p2p/discover/v4_udp.go--- func (t *UDPv4) verifyENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { req := h.Packet.(*v4wire.ENRRequest) if v4wire.Expired(req.Expiration) { return errExpired } if !t.checkBond(fromID, from.IP) { return errUnknownNode } return nil } func (t *UDPv4) handleENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, mac []byte) { t.send(from, fromID, &v4wire.ENRResponse{ ReplyTok: mac, Record: *t.localNode.Node().Record(), }) } /// ---p2p/enode/node.go--- // Node represents a host on the network. type Node struct { r enr.Record id ID } // Record returns the node's record. The return value is a copy and may // be modified by the caller. func (n *Node) Record() *enr.Record { cpy := n.r return &cpy } /// ---p2p/enr/enr.go--- // Record represents a node record. The zero value is an empty record. type Record struct { seq uint64 // sequence number signature []byte // the signature raw []byte // RLP encoded record pairs []pair // sorted list of all key/value pairs }
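handleENRRequest just replies with a copy of the local record. For context, an ENR can be built and signed directly with the enr and enode packages; below is a rough usage sketch, assuming the enode.SignV4 helper and the standard enr.IP/enr.UDP entries (not code from this walkthrough):
package main

import (
	"fmt"
	"net"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/p2p/enr"
)

func main() {
	key, err := crypto.GenerateKey()
	if err != nil {
		panic(err)
	}

	// Fill in a record and sign it with the "v4" identity scheme.
	var r enr.Record
	r.Set(enr.IP(net.ParseIP("127.0.0.1")))
	r.Set(enr.UDP(30303))
	if err := enode.SignV4(&r, key); err != nil {
		panic(err)
	}

	// Wrap the signed record in a Node, which validates the signature.
	n, err := enode.New(enode.ValidSchemes, &r)
	if err != nil {
		panic(err)
	}
	fmt.Println("node id:", n.ID(), "seq:", n.Seq())
}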

Handle ENRResponse Message

In the RequestENR code, after the node receives the ENRResponse, it verifies the ENR of the sender node; the caller of RequestENR then updates the node's ENR in the table.
/// ---p2p/discover/v4_udp.go--- func (t *UDPv4) verifyENRResponse(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { if !t.handleReply(fromID, from.IP, h.Packet) { return errUnsolicitedReply } return nil } // handleReply dispatches a reply packet, invoking reply matchers. It returns // whether any matcher considered the packet acceptable. func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, req v4wire.Packet) bool { matched := make(chan bool, 1) select { case t.gotreply <- reply{from, fromIP, req, matched}: // loop will handle it return <-matched case <-t.closeCtx.Done(): return false } }

Start V5 Discovery Service

/// ---p2p/discover/v5_udp.go--- // ListenV5 listens on the given connection. func ListenV5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { t, err := newUDPv5(conn, ln, cfg) if err != nil { return nil, err } go t.tab.loop() t.wg.Add(2) go t.readLoop() go t.dispatch() return t, nil }
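For comparison with setupDiscovery above, here is a stripped-down sketch of what a caller has to provide to start a v5 listener on its own: a UDP socket, a LocalNode backed by a node database, and a Config. It assumes the enode.OpenDB/NewLocalNode helpers and leaves Config mostly at its defaults (a rough sketch, not a complete setup):
package main

import (
	"net"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/enode"
)

func main() {
	key, err := crypto.GenerateKey()
	if err != nil {
		panic(err)
	}

	// UDP socket the listener will read from (port 0 = pick a free port).
	conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// LocalNode holds our own ENR; an in-memory node DB is enough here.
	db, err := enode.OpenDB("")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	ln := enode.NewLocalNode(db, key)

	// Start the v5 discovery service; bootnodes would normally be set in Config.
	disc, err := discover.ListenV5(conn, ln, discover.Config{PrivateKey: key})
	if err != nil {
		panic(err)
	}
	defer disc.Close()
}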