Setup Dial Scheduler


Geth sets up a dial scheduler to initiate connections to remote peers. Recall that in the listening setup, Geth receives messages from a remote node and tries to establish a connection (RLPx handshake and protocol handshake). The dial scheduler handles the other direction: initiating the connection.
 
Basic process:
  1. Geth fetches nodes from node storage, which is populated through the discovery mechanism.
  1. Geth dials those nodes to establish TCP connections according to the dial scheduler's strategy.
  1. Geth establishes a connection with them through the RLPx handshake and protocol handshake, which have been analyzed before.
 
setupDialScheduler sets up and initializes the dialScheduler, which is responsible for managing outbound connections to peers.
Steps:
  1. Configuration Initialization:
      • Initializes a dialConfig structure with various parameters:
        • self: The local node ID.
        • maxDialPeers: The maximum number of dialed peers, calculated using srv.maxDialedConns().
        • maxActiveDials: The maximum number of active dials, set to srv.MaxPendingPeers.
        • log: The logger for logging dial events.
        • netRestrict: Network restrictions.
        • dialer: The NodeDialer used to dial outbound connections, defaulting to tcpDialer if not provided. It handles establishing TCP connection with remote nodes. The connection will be used to perform RLPx and Protocol handshake with remote nodes later.
        • clock: The clock used for timing and scheduling, defaulting to the system clock if not provided.
      • Sets the resolver to srv.ntab if it exists, which is the node table for peer discovery.
  1. DialScheduler Initialization:
    1. Calls newDialScheduler with the initialized configuration, the discovery mix (srv.discmix), and the connection setup function (srv.SetupConn). This creates a new dialScheduler instance with the provided configuration and parameters, and starts goroutines to dial nodes.
  1. Adding Static Nodes:
    1. Iterates over the srv.StaticNodes slice, which contains pre-configured nodes that should always be connected. For each static node, calls srv.dialsched.addStatic(n) to add it to the dialScheduler. The static nodes are then dialed in separate goroutines.
/// ---p2p/server.go---
type dialConfig struct {
	self           enode.ID         // our own ID
	maxDialPeers   int              // maximum number of dialed peers
	maxActiveDials int              // maximum number of active dials
	netRestrict    *netutil.Netlist // IP netrestrict list, disabled if nil
	resolver       nodeResolver
	dialer         NodeDialer
	log            log.Logger
	clock          mclock.Clock
	rand           *mrand.Rand
}

func (srv *Server) setupDialScheduler() {
	// Configuration Initialization:
	config := dialConfig{
		self:           srv.localnode.ID(),
		maxDialPeers:   srv.maxDialedConns(),
		maxActiveDials: srv.MaxPendingPeers,
		log:            srv.Logger,
		netRestrict:    srv.NetRestrict,
		dialer:         srv.Dialer,
		clock:          srv.clock,
	}
	if srv.ntab != nil {
		config.resolver = srv.ntab
	}
	if config.dialer == nil {
		config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
	}
	// DialScheduler Initialization:
	srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
	// Adding Static Nodes:
	for _, n := range srv.StaticNodes {
		srv.dialsched.addStatic(n)
	}
}

func (srv *Server) maxDialedConns() (limit int) {
	if srv.NoDial || srv.MaxPeers == 0 {
		return 0
	}
	if srv.DialRatio == 0 {
		limit = srv.MaxPeers / defaultDialRatio
	} else {
		limit = srv.MaxPeers / srv.DialRatio
	}
	if limit == 0 {
		limit = 1
	}
	return limit
}

/// ---p2p/dial.go---
// addStatic adds a static dial candidate.
func (d *dialScheduler) addStatic(n *enode.Node) {
	select {
	case d.addStaticCh <- n:
	case <-d.ctx.Done():
	}
}
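To make the maxDialPeers calculation concrete, here is a minimal sketch that mirrors the arithmetic of maxDialedConns using plain arguments instead of Server fields. The standalone function signature is hypothetical, and the value 3 for defaultDialRatio is an assumption for illustration; check p2p/server.go for the value in your Geth version.

```go
package main

import "fmt"

// defaultDialRatio is ASSUMED to be 3 for this illustration.
const defaultDialRatio = 3

// maxDialedConns is a hypothetical standalone version of
// Server.maxDialedConns that takes the relevant fields as arguments.
func maxDialedConns(noDial bool, maxPeers, dialRatio int) int {
	if noDial || maxPeers == 0 {
		return 0
	}
	if dialRatio == 0 {
		dialRatio = defaultDialRatio
	}
	limit := maxPeers / dialRatio // integer division
	if limit == 0 {
		limit = 1 // always allow at least one dialed peer
	}
	return limit
}

func main() {
	fmt.Println(maxDialedConns(false, 50, 0)) // 50/3 -> 16
	fmt.Println(maxDialedConns(false, 50, 2)) // 50/2 -> 25
	fmt.Println(maxDialedConns(false, 2, 0))  // 2/3 rounds to 0, raised to 1
	fmt.Println(maxDialedConns(true, 50, 0))  // dialing disabled -> 0
}
```

Under these assumptions, only a fraction of the peer slots is reserved for outbound connections; the rest remain available for inbound peers.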
 
The newDialScheduler function initializes a new dialScheduler instance and starts goroutines to dial remote nodes.
Steps:
  1. Configuration Initialization:
    1. Calls the withDefaults method on the provided config to ensure all default values are set. This method initializes fields like maxActiveDials, log, clock, and rand if they are not already set.
  1. DialScheduler Initialization:
    1. Initializes a new dialScheduler instance d with the following fields:
      • dialConfig: The configuration passed to the function, with defaults set.
      • historyTimer: An alarm based on the clock from the configuration.
      • setupFunc: The function that sets up the protocol-level connection with a node. Currently this is srv.SetupConn.
      • dialing: A map to track active dialing tasks.
      • static: A map to track static dial tasks.
      • peers: A map to track all connected peers.
      • doneCh: A channel to signal when a dial task is done.
      • nodesIn: A channel that receives the nodes to be dialed from the iterator.
      • addStaticCh: A channel to add static nodes.
      • remStaticCh: A channel to remove static nodes.
      • addPeerCh: A channel to signal when a peer is added.
      • remPeerCh: A channel to signal when a peer is removed.
  1. Initial Logging and Context Setup.
  1. Starting Goroutines.
/// ---p2p/dial.go---
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
	// Configuration Initialization:
	cfg := config.withDefaults()
	// DialScheduler Initialization:
	d := &dialScheduler{
		dialConfig:   cfg,
		historyTimer: mclock.NewAlarm(cfg.clock),
		setupFunc:    setupFunc,
		dialing:      make(map[enode.ID]*dialTask),
		static:       make(map[enode.ID]*dialTask),
		peers:        make(map[enode.ID]struct{}),
		doneCh:       make(chan *dialTask),
		nodesIn:      make(chan *enode.Node),
		addStaticCh:  make(chan *enode.Node),
		remStaticCh:  make(chan *enode.Node),
		addPeerCh:    make(chan *conn),
		remPeerCh:    make(chan *conn),
	}
	// Initial Logging and Context Setup.
	d.lastStatsLog = d.clock.Now()
	d.ctx, d.cancel = context.WithCancel(context.Background())
	// Starting Goroutines.
	d.wg.Add(2)
	go d.readNodes(it)
	go d.loop(it)
	return d
}

func (cfg dialConfig) withDefaults() dialConfig {
	if cfg.maxActiveDials == 0 {
		cfg.maxActiveDials = defaultMaxPendingPeers
	}
	if cfg.log == nil {
		cfg.log = log.Root()
	}
	if cfg.clock == nil {
		cfg.clock = mclock.System{}
	}
	if cfg.rand == nil {
		seedb := make([]byte, 8)
		crand.Read(seedb)
		seed := int64(binary.BigEndian.Uint64(seedb))
		cfg.rand = mrand.New(mrand.NewSource(seed))
	}
	return cfg
}

/// ---p2p/dial.go---
// Connectivity defaults.
defaultMaxPendingPeers = 50
 
dialScheduler.setupFunc is srv.SetupConn, which sets up the protocol-level connection (RLPx and protocol handshake) with a remote node, as analyzed before. Here it is used on the initiating side of the connection rather than in response to a message from a remote node.
/// ---p2p/server.go---
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
	c := &conn{fd: fd, flags: flags, cont: make(chan error)}
	if dialDest == nil {
		c.transport = srv.newTransport(fd, nil)
	} else {
		c.transport = srv.newTransport(fd, dialDest.Pubkey())
	}
	err := srv.setupConn(c, dialDest)
	if err != nil {
		if !c.is(inboundConn) {
			markDialError(err)
		}
		c.close(err)
	}
	return err
}

Read nodes

readNodes calls it.Next() to fetch the next node and passes it to dialScheduler.nodesIn to be dialed later.
Here, it is a FairMix, which picks nodes from two node discovery mechanisms.
/// ---p2p/dial.go---
// readNodes runs in its own goroutine and delivers nodes from
// the input iterator to the nodesIn channel.
func (d *dialScheduler) readNodes(it enode.Iterator) {
	defer d.wg.Done()
	for it.Next() {
		select {
		case d.nodesIn <- it.Node():
		case <-d.ctx.Done():
		}
	}
}

Dialer Loop

The dialScheduler.loop function is the main event loop of the dialScheduler and is responsible for managing dialing tasks.
 
Steps:
  1. Variable Initialization:
    1. Declares nodesCh which will be used to receive nodes from the iterator when slots are available.
  1. Dial Slot Management:
      • Calculates the number of free dial slots available.
      • Starts static dials up to the number of available slots.
      • Sets nodesCh to d.nodesIn if there are available slots, otherwise sets it to nil.
      • Rearms the history timer to fire when the next item in d.history expires.
      • Logs dialer statistics if necessary.
  1. Dialing New node:
      • If there are available dial slots, receives a node from nodesCh.
      • Checks whether the node can be dialed using checkDial. If the node is valid, starts a new dial task.
  1. Handling Dial Task Completion:
      • When a dial task completes, removes it from the dialing map and updates the static pool.
      • Increments the counter for tasks completed since the last log.
  1. Handling Peer Addition:
      • When a peer is added, increments d.dialPeers if it is a dynamic or static dial.
      • Adds the peer to the peers map.
      • Removes the peer from the static pool if it exists there.
  1. Handling Peer Removal:
      • When a peer is removed, decrements d.dialPeers if it is a dynamic or static dial.
      • Removes the peer from the peers map and updates the static pool.
  1. Handling Static Node Addition:
      • When a static node is added, checks if it already exists in the static map.
      • If not, creates a new dial task and adds it to the static map.
      • If the node can be dialed, adds the task to the static pool.
  1. Handling Static Node Removal:
    1. When a static node is removed, deletes it from the static map and removes it from the static pool if it exists.
  1. Handling History Expiry:
    1. When the history timer fires, expires old items in the history. The history records recently dialed nodes so that they are not dialed again until their entry expires.
  1. Dial Process Cancellation:
    1. If the context is done, closes the iterator and breaks out of the loop.
  1. Cleanup
      • Stops the history timer.
      • Waits for all remaining dial tasks to complete.
      • Marks the wait group as done.
// loop is the main loop of the dialer.
func (d *dialScheduler) loop(it enode.Iterator) {
	// Variable Initialization:
	var (
		nodesCh chan *enode.Node
	)

loop:
	for {
		// Dial Slot Management:
		// Launch new dials if slots are available.
		slots := d.freeDialSlots()
		slots -= d.startStaticDials(slots)
		if slots > 0 {
			nodesCh = d.nodesIn
		} else {
			nodesCh = nil
		}
		d.rearmHistoryTimer()
		d.logStats()

		select {
		// Dialing New node:
		case node := <-nodesCh:
			if err := d.checkDial(node); err != nil {
				d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
			} else {
				d.startDial(newDialTask(node, dynDialedConn))
			}

		// Handling Dial Task Completion:
		case task := <-d.doneCh:
			id := task.dest().ID()
			delete(d.dialing, id)
			d.updateStaticPool(id)
			d.doneSinceLastLog++

		// Handling Peer Addition:
		case c := <-d.addPeerCh:
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers++
			}
			id := c.node.ID()
			d.peers[id] = struct{}{}
			// Remove from static pool because the node is now connected.
			task := d.static[id]
			if task != nil && task.staticPoolIndex >= 0 {
				d.removeFromStaticPool(task.staticPoolIndex)
			}
			// TODO: cancel dials to connected peers

		// Handling Peer Removal:
		case c := <-d.remPeerCh:
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers--
			}
			delete(d.peers, c.node.ID())
			d.updateStaticPool(c.node.ID())

		// Handling Static Node Addition:
		case node := <-d.addStaticCh:
			id := node.ID()
			_, exists := d.static[id]
			d.log.Trace("Adding static node", "id", id, "ip", node.IP(), "added", !exists)
			if exists {
				continue loop
			}
			task := newDialTask(node, staticDialedConn)
			d.static[id] = task
			if d.checkDial(node) == nil {
				d.addToStaticPool(task)
			}

		// Handling Static Node Removal:
		case node := <-d.remStaticCh:
			id := node.ID()
			task := d.static[id]
			d.log.Trace("Removing static node", "id", id, "ok", task != nil)
			if task != nil {
				delete(d.static, id)
				if task.staticPoolIndex >= 0 {
					d.removeFromStaticPool(task.staticPoolIndex)
				}
			}

		// Handling History Expiry:
		case <-d.historyTimer.C():
			d.expireHistory()

		// Dial Process Cancellation:
		case <-d.ctx.Done():
			it.Close()
			break loop
		}
	}

	// Cleanup
	d.historyTimer.Stop()
	for range d.dialing {
		<-d.doneCh
	}
	d.wg.Done()
}
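The loop relies on a Go idiom worth spelling out: a receive from a nil channel blocks forever, so setting nodesCh to nil effectively disables that select case while no dial slots are free. The following is a minimal, self-contained sketch of that idiom. The function pickSource and the string channels are hypothetical stand-ins, not part of Geth.

```go
package main

import "fmt"

// pickSource performs one select round, the way the dialer loop does.
// When slots <= 0, nodesCh stays nil and its case can never be chosen.
func pickSource(slots int, nodesIn, doneCh chan string) string {
	var nodesCh chan string // nil by default: receiving from it blocks forever
	if slots > 0 {
		nodesCh = nodesIn
	}
	select {
	case n := <-nodesCh:
		return "node:" + n
	case t := <-doneCh:
		return "done:" + t
	}
}

func main() {
	nodesIn := make(chan string, 1)
	doneCh := make(chan string, 1)
	nodesIn <- "A"
	doneCh <- "task1"

	// No free slots: the candidate stays queued, only doneCh is served.
	fmt.Println(pickSource(0, nodesIn, doneCh)) // done:task1

	// A slot is free and doneCh is now empty: the candidate is read.
	fmt.Println(pickSource(1, nodesIn, doneCh)) // node:A
}
```

The design choice here is back-pressure: instead of reading candidates and dropping them when the scheduler is busy, the loop simply stops reading, which in turn blocks readNodes on its send to nodesIn.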
 
freeDialSlots calculates how many free slots can currently be used to dial new nodes, based on the number of dialed peers, the number of in-progress dials, and the dial configuration.
/// ---p2p/dial.go---
// freeDialSlots returns the number of free dial slots. The result can be negative
// when peers are connected while their task is still running.
func (d *dialScheduler) freeDialSlots() int {
	slots := (d.maxDialPeers - d.dialPeers) * 2
	if slots > d.maxActiveDials {
		slots = d.maxActiveDials
	}
	free := slots - len(d.dialing)
	return free
}
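A worked example may help with the slot arithmetic. The sketch below restates freeDialSlots as a hypothetical standalone function over plain integers; the input values are made up for illustration.

```go
package main

import "fmt"

// freeDialSlots is a hypothetical standalone version of
// dialScheduler.freeDialSlots with the scheduler state passed in.
func freeDialSlots(maxDialPeers, dialPeers, maxActiveDials, dialing int) int {
	// Allow two dial attempts per missing dialed peer.
	slots := (maxDialPeers - dialPeers) * 2
	// Never exceed the cap on concurrent dials.
	if slots > maxActiveDials {
		slots = maxActiveDials
	}
	// Subtract the dials that are already in progress.
	return slots - dialing
}

func main() {
	fmt.Println(freeDialSlots(16, 10, 50, 5)) // (16-10)*2 = 12, minus 5 -> 7
	fmt.Println(freeDialSlots(16, 0, 20, 5))  // 32 capped at 20, minus 5 -> 15
	fmt.Println(freeDialSlots(16, 15, 50, 4)) // 2 minus 4 -> -2 (no slots)
}
```

The last case shows the negative result mentioned in the source comment: a peer connected while other dial tasks were still running, so more dials are in flight than the current budget allows.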
 
startStaticDials picks static nodes at random from staticPool and dials them, up to the given number of slots. Each dialed node is removed from staticPool so that it is not dialed again.
/// ---p2p/dial.go---
// startStaticDials starts n static dial tasks.
func (d *dialScheduler) startStaticDials(n int) (started int) {
	for started = 0; started < n && len(d.staticPool) > 0; started++ {
		idx := d.rand.Intn(len(d.staticPool))
		task := d.staticPool[idx]
		d.startDial(task)
		d.removeFromStaticPool(idx)
	}
	return started
}
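The listings here do not show how removeFromStaticPool works, but the staticPoolIndex field (set to -1 when a task is not in the pool) suggests an index-tracked slice. One common way to implement such a pool with constant-time removal is the swap-with-last technique sketched below. This is an illustrative sketch under that assumption, with hypothetical type and method names, not Geth's code.

```go
package main

import "fmt"

// task is a hypothetical stand-in for dialTask.
type task struct {
	name  string
	index int // position in the pool, -1 when not in the pool
}

// pool is a hypothetical stand-in for the scheduler's staticPool.
type pool struct{ items []*task }

// add appends the task unless it is already in the pool.
func (p *pool) add(t *task) {
	if t.index >= 0 {
		return
	}
	p.items = append(p.items, t)
	t.index = len(p.items) - 1
}

// remove deletes the task at idx in O(1) by moving the last
// element into its place and fixing up the moved element's index.
func (p *pool) remove(idx int) {
	t := p.items[idx]
	end := len(p.items) - 1
	p.items[idx] = p.items[end]
	p.items[idx].index = idx
	p.items[end] = nil // drop the reference so it can be collected
	p.items = p.items[:end]
	t.index = -1
}

func main() {
	a, b, c := &task{"a", -1}, &task{"b", -1}, &task{"c", -1}
	p := &pool{}
	p.add(a)
	p.add(b)
	p.add(c)
	p.remove(a.index) // c moves into slot 0
	fmt.Println(len(p.items), p.items[0].name, p.items[1].name) // 2 c b
	fmt.Println(a.index, b.index, c.index)                      // -1 1 0
}
```

Because removal does not preserve order, this layout fits a pool that is sampled at random, as startStaticDials does with d.rand.Intn.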
 
checkDial checks whether the node is qualified to be dialed.
/// ---p2p/dial.go---
// checkDial returns an error if node n should not be dialed.
func (d *dialScheduler) checkDial(n *enode.Node) error {
	if n.ID() == d.self {
		return errSelf
	}
	if n.IP() != nil && n.TCP() == 0 {
		// This check can trigger if a non-TCP node is found
		// by discovery. If there is no IP, the node is a static
		// node and the actual endpoint will be resolved later in dialTask.
		return errNoPort
	}
	if _, ok := d.dialing[n.ID()]; ok {
		return errAlreadyDialing
	}
	if _, ok := d.peers[n.ID()]; ok {
		return errAlreadyConnected
	}
	if d.netRestrict != nil && !d.netRestrict.Contains(n.IP()) {
		return errNetRestrict
	}
	if d.history.contains(string(n.ID().Bytes())) {
		return errRecentlyDialed
	}
	return nil
}

Dial

After the loop receives a node from the node discovery mechanism, it calls startDial to start a new dial task for that node and try to establish a protocol-level connection.
Steps:
  1. Retrieve Node Information:
    1. Retrieves the destination node of the dial task using the dest method of the dialTask.
  1. Logging.
  1. Add Node to Dial History:
    1. Adds the node's ID to the dial history to prevent redialing the same node within a short period.
  1. Track Active Dial Task:
    1. Adds the dial task to the dialing map, which tracks currently active dial tasks.
  1. Run Dial Task in Separate Goroutine:
      • Starts a new goroutine to run the dial task. The run method of the dialTask is called, passing the dialScheduler as an argument.
      • Once the dial task completes, it sends the task to the doneCh channel.
/// ---p2p/dial.go---
func newDialTask(dest *enode.Node, flags connFlag) *dialTask {
	t := &dialTask{flags: flags, staticPoolIndex: -1}
	t.destPtr.Store(dest)
	return t
}

// This is the amount of time spent waiting in between redialing a certain node. The
// limit is a bit higher than inboundThrottleTime to prevent failing dials in small
// private networks.
dialHistoryExpiration = inboundThrottleTime + 5*time.Second

// startDial runs the given dial task in a separate goroutine.
func (d *dialScheduler) startDial(task *dialTask) {
	/// Retrieve Node Information:
	node := task.dest()
	/// Logging.
	d.log.Trace("Starting p2p dial", "id", node.ID(), "ip", node.IP(), "flag", task.flags)
	/// Add Node to Dial History:
	hkey := string(node.ID().Bytes())
	d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration))
	/// Track Active Dial Task:
	d.dialing[node.ID()] = task
	/// Run Dial Task in Separate Goroutine:
	go func() {
		task.run(d)
		d.doneCh <- task
	}()
}

/// ---p2p/server.go---
// This time limits inbound connection attempts per source IP.
inboundThrottleTime = 30 * time.Second
 
dialTask.run handles executing the dialing process for a node, including resolving the node's address if necessary and handling any potential errors.
Steps:
  1. Resolve Node Address (if necessary):
      • The function first checks if the node needs to be resolved (i.e., if it's a static node without an IP address).
      • If resolution is needed, the resolve method is called. If the resolution fails (resolve returns false), the function exits early.
      • Nodes fetched from the node discovery mechanism do not need to be resolved, because they have an IP. Only static nodes may need to be resolved.
  1. Attempt to Dial the Node:
      • The dial method is called to attempt a connection to the node.
      • If dialing fails with a dialError (the TCP connection could not be established) and the task is for a static node (t.flags&staticDialedConn != 0), the function tries to resolve the node's address again.
      • If the resolution succeeds after the initial failure, it attempts to dial the node once more.
/// ---p2p/dial.go---
func (t *dialTask) run(d *dialScheduler) {
	if t.needResolve() && !t.resolve(d) {
		return
	}

	err := t.dial(d, t.dest())
	if err != nil {
		// For static nodes, resolve one more time if dialing fails.
		if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
			if t.resolve(d) {
				t.dial(d, t.dest())
			}
		}
	}
}

/// ---p2p/server.go---
const (
	dynDialedConn connFlag = 1 << iota
	staticDialedConn
	inboundConn
	trustedConn
)

/// ---p2p/dial.go---
func (t *dialTask) needResolve() bool {
	return t.flags&staticDialedConn != 0 && t.dest().IP() == nil
}

// resolve attempts to find the current endpoint for the destination
// using discovery.
//
// Resolve operations are throttled with backoff to avoid flooding the
// discovery network with useless queries for nodes that don't exist.
// The backoff delay resets when the node is found.
func (t *dialTask) resolve(d *dialScheduler) bool {
	if d.resolver == nil {
		return false
	}
	if t.resolveDelay == 0 {
		t.resolveDelay = initialResolveDelay
	}
	if t.lastResolved > 0 && time.Duration(d.clock.Now()-t.lastResolved) < t.resolveDelay {
		return false
	}
	node := t.dest()
	resolved := d.resolver.Resolve(node)
	t.lastResolved = d.clock.Now()
	if resolved == nil {
		t.resolveDelay *= 2
		if t.resolveDelay > maxResolveDelay {
			t.resolveDelay = maxResolveDelay
		}
		d.log.Debug("Resolving node failed", "id", node.ID(), "newdelay", t.resolveDelay)
		return false
	}
	// The node was found.
	t.resolveDelay = initialResolveDelay
	t.destPtr.Store(resolved)
	d.log.Debug("Resolved node", "id", resolved.ID(), "addr", &net.TCPAddr{IP: resolved.IP(), Port: resolved.TCP()})
	return true
}
 
dial first calls d.dialer.Dial to establish a TCP connection with the remote node (d.dialer is a TCP dialer by default). This returns a net.Conn (named fd in the code) used to communicate with the remote node.
After the TCP connection has been established, it calls d.setupFunc, which is Server.SetupConn, to perform the RLPx and protocol handshakes with the remote node and establish the protocol-level connection.
/// ---p2p/dial.go---
// dial performs the actual connection attempt.
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
	dialMeter.Mark(1)
	fd, err := d.dialer.Dial(d.ctx, dest)
	if err != nil {
		d.log.Trace("Dial error", "id", dest.ID(), "addr", nodeAddr(dest), "conn", t.flags, "err", cleanupDialErr(err))
		dialConnectionError.Mark(1)
		return &dialError{err}
	}
	return d.setupFunc(newMeteredConn(fd), t.flags, dest)
}