Setup Dial Scheduler
Geth sets up a dial scheduler to initiate connections with remote peers. Recall that in the listening setup, Geth receives a message from a remote node and tries to establish a connection with it (RLPx handshake and protocol handshake). The dial scheduler handles the other direction: initiating the connection.
Basic process:
- Geth fetches nodes from node storage, which is populated through the discovery mechanism.
- Geth dials those nodes to establish TCP connections according to the dial scheduler's strategy.
- Geth establishes connections with them through the RLPx handshake and protocol handshake, which have been analyzed before.
`setupDialScheduler` sets up and initializes the `dialScheduler`, which is responsible for managing outbound connections to peers. Steps:
- Configuration Initialization:
  - Initializes a `dialConfig` structure with various parameters:
    - `self`: The local node ID.
    - `maxDialPeers`: The maximum number of dialed peers, calculated using `srv.maxDialedConns()`.
    - `maxActiveDials`: The maximum number of active dials, set to `srv.MaxPendingPeers`.
    - `log`: The logger for logging dial events.
    - `netRestrict`: Network restrictions.
    - `dialer`: The `NodeDialer` used to dial outbound connections, defaulting to `tcpDialer` if not provided. It handles establishing the TCP connection with remote nodes. The connection is later used to perform the RLPx and protocol handshakes with remote nodes.
    - `clock`: The clock used for timing and scheduling, defaulting to the system clock if not provided.
  - Sets the `resolver` to `srv.ntab` if it exists, which is the node table for peer discovery.
- DialScheduler Initialization:
  Calls `newDialScheduler` with the initialized configuration, the discovery mix (`srv.discmix`), and the connection setup function (`srv.SetupConn`). This creates a new `dialScheduler` instance with the provided configuration and parameters, and starts goroutines to dial nodes.
- Adding Static Nodes:
  Iterates over the `srv.StaticNodes` slice, which contains pre-configured nodes that should always be connected. For each static node, calls `srv.dialsched.addStatic(n)` to add it to the `dialScheduler`. Those static nodes will be dialed in a goroutine.

```go
/// ---p2p/server.go---

type dialConfig struct {
	self           enode.ID         // our own ID
	maxDialPeers   int              // maximum number of dialed peers
	maxActiveDials int              // maximum number of active dials
	netRestrict    *netutil.Netlist // IP netrestrict list, disabled if nil
	resolver       nodeResolver
	dialer         NodeDialer
	log            log.Logger
	clock          mclock.Clock
	rand           *mrand.Rand
}

func (srv *Server) setupDialScheduler() {
	// Configuration Initialization:
	config := dialConfig{
		self:           srv.localnode.ID(),
		maxDialPeers:   srv.maxDialedConns(),
		maxActiveDials: srv.MaxPendingPeers,
		log:            srv.Logger,
		netRestrict:    srv.NetRestrict,
		dialer:         srv.Dialer,
		clock:          srv.clock,
	}
	if srv.ntab != nil {
		config.resolver = srv.ntab
	}
	if config.dialer == nil {
		config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
	}
	// DialScheduler Initialization:
	srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
	// Adding Static Nodes:
	for _, n := range srv.StaticNodes {
		srv.dialsched.addStatic(n)
	}
}

func (srv *Server) maxDialedConns() (limit int) {
	if srv.NoDial || srv.MaxPeers == 0 {
		return 0
	}
	if srv.DialRatio == 0 {
		limit = srv.MaxPeers / defaultDialRatio
	} else {
		limit = srv.MaxPeers / srv.DialRatio
	}
	if limit == 0 {
		limit = 1
	}
	return limit
}

/// ---p2p/dial.go---

// addStatic adds a static dial candidate.
func (d *dialScheduler) addStatic(n *enode.Node) {
	select {
	case d.addStaticCh <- n:
	case <-d.ctx.Done():
	}
}
```
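To make the `maxDialedConns` arithmetic concrete, here is a small standalone sketch that mirrors the function as a free function with explicit parameters instead of `Server` fields. The value of `defaultDialRatio` is not shown in the excerpt above, so the `3` used here is an assumption for illustration.

```go
package main

import "fmt"

// defaultDialRatio is assumed to be 3 for this illustration; the excerpt
// references the constant without showing its value.
const defaultDialRatio = 3

// maxDialedConns mirrors Server.maxDialedConns with explicit parameters.
func maxDialedConns(maxPeers, dialRatio int, noDial bool) int {
	if noDial || maxPeers == 0 {
		return 0
	}
	if dialRatio == 0 {
		dialRatio = defaultDialRatio
	}
	limit := maxPeers / dialRatio
	if limit == 0 {
		limit = 1 // a non-zero MaxPeers always allows at least one dialed peer
	}
	return limit
}

func main() {
	fmt.Println(maxDialedConns(50, 0, false)) // 50 / 3 = 16
	fmt.Println(maxDialedConns(50, 2, false)) // 50 / 2 = 25
	fmt.Println(maxDialedConns(2, 0, false))  // 2 / 3 = 0, raised to 1
	fmt.Println(maxDialedConns(50, 0, true))  // dialing disabled: 0
}
```

The integer division means that only a fraction of `MaxPeers` is filled by outbound dials; the rest is left for inbound connections.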
`newDialScheduler` initializes a new `dialScheduler` instance and starts goroutines to dial remote nodes. Steps:
- Configuration Initialization:
  Calls the `withDefaults` method on the provided `config` to ensure all default values are set. This method initializes fields like `maxActiveDials`, `log`, `clock`, and `rand` if they are not already set.
- DialScheduler Initialization:
  Initializes a new `dialScheduler` instance `d` with the following fields:
  - `dialConfig`: The configuration passed to the function, with defaults set.
  - `historyTimer`: An alarm based on the clock from the configuration.
  - `setupFunc`: The protocol-level connection setup function. Currently it is `srv.SetupConn`.
  - `dialing`: A map to track active dialing tasks.
  - `static`: A map to track static dial tasks.
  - `peers`: A map to track all connected peers.
  - `doneCh`: A channel to signal when a dial task is done.
  - `nodesIn`: A channel to receive nodes from the iterator, which are candidates to be dialed.
  - `addStaticCh`: A channel to add static nodes.
  - `remStaticCh`: A channel to remove static nodes.
  - `addPeerCh`: A channel to signal when a peer is added.
  - `remPeerCh`: A channel to signal when a peer is removed.
- Initial Logging and Context Setup:
  Records the current time in `lastStatsLog` and creates a cancellable context.
- Starting Goroutines:
  Starts `readNodes` and `loop`, each in its own goroutine.
```go
/// ---p2p/dial.go---

func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
	// Configuration Initialization:
	cfg := config.withDefaults()
	// DialScheduler Initialization:
	d := &dialScheduler{
		dialConfig:   cfg,
		historyTimer: mclock.NewAlarm(cfg.clock),
		setupFunc:    setupFunc,
		dialing:      make(map[enode.ID]*dialTask),
		static:       make(map[enode.ID]*dialTask),
		peers:        make(map[enode.ID]struct{}),
		doneCh:       make(chan *dialTask),
		nodesIn:      make(chan *enode.Node),
		addStaticCh:  make(chan *enode.Node),
		remStaticCh:  make(chan *enode.Node),
		addPeerCh:    make(chan *conn),
		remPeerCh:    make(chan *conn),
	}
	// Initial Logging and Context Setup.
	d.lastStatsLog = d.clock.Now()
	d.ctx, d.cancel = context.WithCancel(context.Background())
	// Starting Goroutines.
	d.wg.Add(2)
	go d.readNodes(it)
	go d.loop(it)
	return d
}

func (cfg dialConfig) withDefaults() dialConfig {
	if cfg.maxActiveDials == 0 {
		cfg.maxActiveDials = defaultMaxPendingPeers
	}
	if cfg.log == nil {
		cfg.log = log.Root()
	}
	if cfg.clock == nil {
		cfg.clock = mclock.System{}
	}
	if cfg.rand == nil {
		seedb := make([]byte, 8)
		crand.Read(seedb)
		seed := int64(binary.BigEndian.Uint64(seedb))
		cfg.rand = mrand.New(mrand.NewSource(seed))
	}
	return cfg
}

/// ---p2p/dial.go---

const (
	// Connectivity defaults.
	defaultMaxPendingPeers = 50
)
```
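`withDefaults` has a value receiver, so it fills in defaults on a copy and leaves the caller's `config` untouched. A minimal sketch of that pattern, using a hypothetical two-field `config` type rather than the real `dialConfig` (the `name` field and its default are invented for the example):

```go
package main

import "fmt"

// config is a hypothetical stand-in for dialConfig.
type config struct {
	maxActiveDials int
	name           string
}

// Same value as defaultMaxPendingPeers in the excerpt.
const defaultMaxActiveDials = 50

// withDefaults has a value receiver: cfg is a copy, so the caller's
// struct is not modified.
func (cfg config) withDefaults() config {
	if cfg.maxActiveDials == 0 {
		cfg.maxActiveDials = defaultMaxActiveDials
	}
	if cfg.name == "" {
		cfg.name = "dialer"
	}
	return cfg
}

func main() {
	orig := config{}
	filled := orig.withDefaults()
	fmt.Println(orig.maxActiveDials, filled.maxActiveDials) // 0 50
}
```

One consequence of this design is that a zero value always means "use the default", so a caller cannot explicitly request zero active dials through this field.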
`dialScheduler.setupFunc` is `srv.SetupConn`, which handles setting up the protocol-level connection (RLPx and protocol handshake) with a remote node, as analyzed before. Here, it is used on the initiating side of the connection rather than on the side that receives a connection from a remote node.

```go
/// ---p2p/server.go---

// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
	c := &conn{fd: fd, flags: flags, cont: make(chan error)}
	if dialDest == nil {
		c.transport = srv.newTransport(fd, nil)
	} else {
		c.transport = srv.newTransport(fd, dialDest.Pubkey())
	}

	err := srv.setupConn(c, dialDest)
	if err != nil {
		if !c.is(inboundConn) {
			markDialError(err)
		}
		c.close(err)
	}
	return err
}
```
Read nodes
`readNodes` calls `it.Next()` to fetch the next node, and passes the fetched node to `dialScheduler.nodesIn` to be dialed later.

```go
/// ---p2p/dial.go---

// readNodes runs in its own goroutine and delivers nodes from
// the input iterator to the nodesIn channel.
func (d *dialScheduler) readNodes(it enode.Iterator) {
	defer d.wg.Done()

	for it.Next() {
		select {
		case d.nodesIn <- it.Node():
		case <-d.ctx.Done():
		}
	}
}
```
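The pattern in `readNodes`, forwarding items from an iterator into a channel while staying cancellable, can be sketched in isolation. The `sliceIter` type below is a hypothetical stand-in for `enode.Iterator`, and strings stand in for `*enode.Node`. Unlike the excerpt, this sketch returns as soon as the context is cancelled instead of relying on the iterator being closed elsewhere.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// sliceIter is a hypothetical stand-in for enode.Iterator.
type sliceIter struct {
	items []string
	pos   int
}

func (it *sliceIter) Next() bool   { it.pos++; return it.pos <= len(it.items) }
func (it *sliceIter) Node() string { return it.items[it.pos-1] }

// readNodes forwards items to out until the iterator is exhausted or ctx
// is cancelled.
func readNodes(ctx context.Context, it *sliceIter, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for it.Next() {
		select {
		case out <- it.Node():
		case <-ctx.Done():
			return
		}
	}
}

// collect runs readNodes and receives n items (n <= len(items)) before
// cancelling the reader.
func collect(items []string, n int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	out := make(chan string) // unbuffered, like nodesIn
	var wg sync.WaitGroup
	wg.Add(1)
	go readNodes(ctx, &sliceIter{items: items}, out, &wg)

	var got []string
	for i := 0; i < n; i++ {
		got = append(got, <-out)
	}
	cancel()  // unblocks a pending send, if any
	wg.Wait() // the reader goroutine has exited
	return got
}

func main() {
	fmt.Println(collect([]string{"a", "b", "c"}, 2)) // [a b]
}
```

Because the channel is unbuffered, the reader only advances the iterator as fast as the consumer accepts nodes, which is how the dial loop applies backpressure to discovery.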
Dialer Loop
`dialScheduler.loop` is the main event loop of the `dialScheduler`, which is responsible for managing dialing tasks. Steps:
- Variable Initialization:
  Declares `nodesCh`, which is used to receive nodes from the iterator when slots are available.
- Dial Slot Management:
  - Calculates the number of free dial slots available.
  - Starts static dials up to the number of available slots.
  - Sets `nodesCh` to `d.nodesIn` if there are available slots, otherwise sets it to `nil`.
  - Rearms the history timer to fire when the next item in `d.history` expires.
  - Logs dialer statistics if necessary.
- Dialing New Node:
  - If there are available dial slots, receives a node from `nodesCh`.
  - Checks if the node can be dialed using `checkDial`. If the node is valid, starts a new dial task.
- Handling Dial Task Completion:
  - When a dial task completes, removes it from the `dialing` map and updates the static pool.
  - Increments the counter for tasks completed since the last log.
- Handling Peer Addition:
  - When a peer is added, increments `d.dialPeers` if it is a dynamic or static dial.
  - Adds the peer to the `peers` map.
  - Removes the peer from the static pool if it exists there.
- Handling Peer Removal:
  - When a peer is removed, decrements `d.dialPeers` if it is a dynamic or static dial.
  - Removes the peer from the `peers` map and updates the static pool.
- Handling Static Node Addition:
  - When a static node is added, checks if it already exists in the `static` map.
  - If not, creates a new dial task and adds it to the `static` map.
  - If the node can be dialed, adds the task to the static pool.
- Handling Static Node Removal:
  When a static node is removed, deletes it from the `static` map and removes it from the static pool if it exists.
- Handling History Expiry:
  When the history timer fires, expires old items in the history. The history is used to avoid redialing nodes that were dialed recently.
- Dial Process Cancellation:
  If the context is done, closes the iterator and breaks out of the loop.
- Cleanup:
  - Stops the history timer.
  - Waits for all remaining dial tasks to complete.
  - Marks the wait group as done.
```go
// loop is the main loop of the dialer.
func (d *dialScheduler) loop(it enode.Iterator) {
	// Variable Initialization:
	var (
		nodesCh chan *enode.Node
	)

loop:
	for {
		// Dial Slot Management:
		// Launch new dials if slots are available.
		slots := d.freeDialSlots()
		slots -= d.startStaticDials(slots)
		if slots > 0 {
			nodesCh = d.nodesIn
		} else {
			nodesCh = nil
		}
		d.rearmHistoryTimer()
		d.logStats()

		select {
		// Dialing New node:
		case node := <-nodesCh:
			if err := d.checkDial(node); err != nil {
				d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
			} else {
				d.startDial(newDialTask(node, dynDialedConn))
			}

		// Handling Dial Task Completion:
		case task := <-d.doneCh:
			id := task.dest().ID()
			delete(d.dialing, id)
			d.updateStaticPool(id)
			d.doneSinceLastLog++

		// Handling Peer Addition:
		case c := <-d.addPeerCh:
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers++
			}
			id := c.node.ID()
			d.peers[id] = struct{}{}
			// Remove from static pool because the node is now connected.
			task := d.static[id]
			if task != nil && task.staticPoolIndex >= 0 {
				d.removeFromStaticPool(task.staticPoolIndex)
			}
			// TODO: cancel dials to connected peers

		// Handling Peer Removal:
		case c := <-d.remPeerCh:
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers--
			}
			delete(d.peers, c.node.ID())
			d.updateStaticPool(c.node.ID())

		// Handling Static Node Addition:
		case node := <-d.addStaticCh:
			id := node.ID()
			_, exists := d.static[id]
			d.log.Trace("Adding static node", "id", id, "ip", node.IP(), "added", !exists)
			if exists {
				continue loop
			}
			task := newDialTask(node, staticDialedConn)
			d.static[id] = task
			if d.checkDial(node) == nil {
				d.addToStaticPool(task)
			}

		// Handling Static Node Removal:
		case node := <-d.remStaticCh:
			id := node.ID()
			task := d.static[id]
			d.log.Trace("Removing static node", "id", id, "ok", task != nil)
			if task != nil {
				delete(d.static, id)
				if task.staticPoolIndex >= 0 {
					d.removeFromStaticPool(task.staticPoolIndex)
				}
			}

		// Handling History Expiry:
		case <-d.historyTimer.C():
			d.expireHistory()

		// Dial Process Cancellation:
		case <-d.ctx.Done():
			it.Close()
			break loop
		}
	}

	// Cleanup
	d.historyTimer.Stop()
	for range d.dialing {
		<-d.doneCh
	}
	d.wg.Done()
}
```
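The `nodesCh` assignment in the loop relies on a property of Go channels: receiving from a nil channel blocks forever, so a `select` case on a nil channel is never chosen. Setting `nodesCh` to `nil` therefore pauses the intake of discovered nodes without any extra flag. A minimal sketch of the idea (the `poll` helper and the buffered channel are illustrative and not part of Geth):

```go
package main

import "fmt"

// poll tries to receive one node, but only when slots are available.
// With slots <= 0 the channel variable stays nil, so its case cannot fire.
func poll(nodesIn chan string, slots int) (string, bool) {
	var nodesCh chan string // nil by default
	if slots > 0 {
		nodesCh = nodesIn
	}
	select {
	case n := <-nodesCh:
		return n, true
	default: // the real loop blocks on its other channels instead
		return "", false
	}
}

func main() {
	nodesIn := make(chan string, 1)
	nodesIn <- "node-1"

	_, ok := poll(nodesIn, 0)
	fmt.Println(ok) // false: no slots, the node stays queued

	n, ok := poll(nodesIn, 2)
	fmt.Println(n, ok) // node-1 true
}
```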
`freeDialSlots` calculates how many free slots can currently be used to dial new nodes, based on the number of connected dialed peers, the number of dials in progress, and the dial configuration.

```go
/// ---p2p/dial.go---

// freeDialSlots returns the number of free dial slots. The result can be negative
// when peers are connected while their task is still running.
func (d *dialScheduler) freeDialSlots() int {
	slots := (d.maxDialPeers - d.dialPeers) * 2
	if slots > d.maxActiveDials {
		slots = d.maxActiveDials
	}
	free := slots - len(d.dialing)
	return free
}
```
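A worked example of the slot arithmetic, with the scheduler state passed as plain parameters. This is a sketch that mirrors `freeDialSlots`, not the Geth method itself, and the input numbers are made up for illustration.

```go
package main

import "fmt"

// freeDialSlots mirrors dialScheduler.freeDialSlots with explicit inputs.
func freeDialSlots(maxDialPeers, dialPeers, maxActiveDials, dialing int) int {
	// Two dial attempts are allowed per missing dialed peer.
	slots := (maxDialPeers - dialPeers) * 2
	if slots > maxActiveDials {
		slots = maxActiveDials
	}
	return slots - dialing
}

func main() {
	// 16 allowed, 10 connected, 4 in flight: (16-10)*2 = 12, minus 4 = 8.
	fmt.Println(freeDialSlots(16, 10, 50, 4))
	// The cap applies: (16-0)*2 = 32 is capped to 20, minus 5 = 15.
	fmt.Println(freeDialSlots(16, 0, 20, 5))
	// The result can be negative: (16-16)*2 = 0, minus 3 = -3.
	fmt.Println(freeDialSlots(16, 16, 50, 3))
}
```

A negative result is harmless in the loop: `startStaticDials` starts nothing and `nodesCh` stays `nil` until running tasks finish.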
`startStaticDials` picks static nodes at random from `staticPool` and dials them, starting at most `n` dials. Each dialed node is removed from `staticPool` so that it is not dialed again while its task is running.

```go
/// ---p2p/dial.go---

// startStaticDials starts n static dial tasks.
func (d *dialScheduler) startStaticDials(n int) (started int) {
	for started = 0; started < n && len(d.staticPool) > 0; started++ {
		idx := d.rand.Intn(len(d.staticPool))
		task := d.staticPool[idx]
		d.startDial(task)
		d.removeFromStaticPool(idx)
	}
	return started
}
```
`checkDial` checks whether a node is eligible to be dialed. It rejects the local node itself, nodes that have an IP but no TCP port, nodes that are already being dialed or already connected, nodes outside the `netRestrict` list, and nodes that were dialed recently.

```go
/// ---p2p/dial.go---

// checkDial returns an error if node n should not be dialed.
func (d *dialScheduler) checkDial(n *enode.Node) error {
	if n.ID() == d.self {
		return errSelf
	}
	if n.IP() != nil && n.TCP() == 0 {
		// This check can trigger if a non-TCP node is found
		// by discovery. If there is no IP, the node is a static
		// node and the actual endpoint will be resolved later in dialTask.
		return errNoPort
	}
	if _, ok := d.dialing[n.ID()]; ok {
		return errAlreadyDialing
	}
	if _, ok := d.peers[n.ID()]; ok {
		return errAlreadyConnected
	}
	if d.netRestrict != nil && !d.netRestrict.Contains(n.IP()) {
		return errNetRestrict
	}
	if d.history.contains(string(n.ID().Bytes())) {
		return errRecentlyDialed
	}
	return nil
}
```
Dial
After the loop receives a node from the node discovery mechanism, or picks a static node from the static pool, it calls `startDial` to start a new dial task for that node and try to establish a protocol-level connection. Steps:
- Retrieve Node Information:
  Retrieves the destination node of the dial task using the `dest` method of the `dialTask`.
- Logging.
- Add Node to Dial History:
  Adds the node's ID to the dial history to prevent redialing the same node within a short period (`dialHistoryExpiration`, which is 30s + 5s = 35s).
- Track Active Dial Task:
  Adds the dial task to the `dialing` map, which tracks currently active dial tasks.
- Run Dial Task in Separate Goroutine:
  - Starts a new goroutine to run the dial task. The `run` method of the `dialTask` is called, passing the `dialScheduler` as an argument.
  - Once the dial task completes, it sends the task to the `doneCh` channel.
```go
/// ---p2p/dial.go---

func newDialTask(dest *enode.Node, flags connFlag) *dialTask {
	t := &dialTask{flags: flags, staticPoolIndex: -1}
	t.destPtr.Store(dest)
	return t
}

const (
	// This is the amount of time spent waiting in between redialing a certain node. The
	// limit is a bit higher than inboundThrottleTime to prevent failing dials in small
	// private networks.
	dialHistoryExpiration = inboundThrottleTime + 5*time.Second
)

// startDial runs the given dial task in a separate goroutine.
func (d *dialScheduler) startDial(task *dialTask) {
	/// Retrieve Node Information:
	node := task.dest()
	/// Logging.
	d.log.Trace("Starting p2p dial", "id", node.ID(), "ip", node.IP(), "flag", task.flags)
	/// Add Node to Dial History:
	hkey := string(node.ID().Bytes())
	d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration))
	/// Track Active Dial Task:
	d.dialing[node.ID()] = task
	/// Run Dial Task in Separate Goroutine:
	go func() {
		task.run(d)
		d.doneCh <- task
	}()
}

/// ---p2p/server.go---

const (
	// This time limits inbound connection attempts per source IP.
	inboundThrottleTime = 30 * time.Second
)
```
`dialTask.run` executes the dialing process for a node, including resolving the node's address if necessary and handling dial errors. Steps:
- Resolve Node Address (if necessary):
  - The function first checks if the node needs to be resolved (i.e., if it is a static node without an IP address).
  - If resolution is needed, the `resolve` method is called. If the resolution fails (`resolve` returns `false`), the function exits early.
  - Nodes fetched from the node discovery mechanism do not need to be resolved, because they have an IP. Only static nodes may need to be resolved.
- Attempt to Dial the Node:
  - The `dial` method is called to attempt a connection to the node.
  - If the dialing fails with a `*dialError` and the task is for a static node (`t.flags&staticDialedConn != 0`), the function tries to resolve the node's address again.
  - If the resolution succeeds after the initial failure, it attempts to dial the node once more.
```go
/// ---p2p/dial.go---

func (t *dialTask) run(d *dialScheduler) {
	if t.needResolve() && !t.resolve(d) {
		return
	}

	err := t.dial(d, t.dest())
	if err != nil {
		// For static nodes, resolve one more time if dialing fails.
		if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
			if t.resolve(d) {
				t.dial(d, t.dest())
			}
		}
	}
}

/// ---p2p/server.go---

const (
	dynDialedConn connFlag = 1 << iota
	staticDialedConn
	inboundConn
	trustedConn
)

/// ---p2p/dial.go---

func (t *dialTask) needResolve() bool {
	return t.flags&staticDialedConn != 0 && t.dest().IP() == nil
}

// resolve attempts to find the current endpoint for the destination
// using discovery.
//
// Resolve operations are throttled with backoff to avoid flooding the
// discovery network with useless queries for nodes that don't exist.
// The backoff delay resets when the node is found.
func (t *dialTask) resolve(d *dialScheduler) bool {
	if d.resolver == nil {
		return false
	}
	if t.resolveDelay == 0 {
		t.resolveDelay = initialResolveDelay
	}
	if t.lastResolved > 0 && time.Duration(d.clock.Now()-t.lastResolved) < t.resolveDelay {
		return false
	}

	node := t.dest()
	resolved := d.resolver.Resolve(node)
	t.lastResolved = d.clock.Now()
	if resolved == nil {
		t.resolveDelay *= 2
		if t.resolveDelay > maxResolveDelay {
			t.resolveDelay = maxResolveDelay
		}
		d.log.Debug("Resolving node failed", "id", node.ID(), "newdelay", t.resolveDelay)
		return false
	}
	// The node was found.
	t.resolveDelay = initialResolveDelay
	t.destPtr.Store(resolved)
	d.log.Debug("Resolved node", "id", resolved.ID(), "addr", &net.TCPAddr{IP: resolved.IP(), Port: resolved.TCP()})
	return true
}
```
`dial` first calls `d.dialer.Dial` to establish a TCP connection with the remote node (`d.dialer` is a TCP dialer by default). This returns `fd`, the connection used to communicate with the remote node. After the TCP connection has been established, it calls `d.setupFunc`, which is `Server.SetupConn`, to perform the RLPx and protocol handshakes with the remote node and establish the protocol-level connection.

```go
/// ---p2p/dial.go---

// dial performs the actual connection attempt.
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
	dialMeter.Mark(1)
	fd, err := d.dialer.Dial(d.ctx, dest)
	if err != nil {
		d.log.Trace("Dial error", "id", dest.ID(), "addr", nodeAddr(dest), "conn", t.flags, "err", cleanupDialErr(err))
		dialConnectionError.Mark(1)
		return &dialError{err}
	}
	return d.setupFunc(newMeteredConn(fd), t.flags, dest)
}
```