Main Loop

Server.run is the main loop responsible for managing peer connections and orchestrating the server's lifecycle.
Steps:
  1. Logging and Setup:
    • Logs startup and sets up deferred calls that clean up resources when the function exits.
  2. Peer Management:
    • Initializes a map to keep track of peers, a counter for inbound connections, and a map for trusted nodes. Trusted nodes are those always allowed to connect, even if the server is at its peer limit.
  3. Trusted Nodes Setup:
    • Populates the trusted map with the initial set of trusted nodes.
(Main Loop)
The main loop handles events and operations related to peer connections and server management. It continuously listens for and processes events until the server is stopped.
  1. Handling Trusted Node Events:
    • Handles adding and removing nodes from the trusted list (see the AddTrustedPeer reference after this list), updating the connection flags of existing peers if necessary.
  2. Handling Peer Operations:
    • Processes peer operations (such as getting the list of peers) by executing a function passed through the peerOp channel.
  3. Post-Handshake Checks:
    • Handles connections that have completed the encryption handshake but haven't yet been verified or added as peers. It performs initial checks such as ensuring the server isn't over its peer limit.
  4. Adding Peers:
    • Finalizes adding a connection as a peer after it passes the protocol handshake. It updates the peer map, logging, and relevant metrics.
  5. Dropping Peers:
    • Handles peer disconnections, removing them from the peer map and updating the connection counts and metrics.
  6. Cleanup:
    • When the server is stopped, it performs cleanup by closing the discovery mechanisms and disconnecting all peers. It waits for all peers to shut down before fully exiting.
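The addtrusted and removetrusted channels are fed by the RPC-facing methods. For reference, AddTrustedPeer as it appears in p2p/server.go (the comment text and exact shape may vary between versions):
/// ---p2p/server.go---
// AddTrustedPeer adds the given node to a reserved trusted list which
// allows the node to always connect, even if the peer slots are full.
func (srv *Server) AddTrustedPeer(node *enode.Node) {
	select {
	case srv.addtrusted <- node:
	case <-srv.quit:
	}
}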
/// ---p2p/server.go---
// run is the main loop of the server.
func (srv *Server) run() {
	srv.log.Info("Started P2P networking", "self", srv.localnode.Node().URLv4())
	defer srv.loopWG.Done()
	defer srv.nodedb.Close()
	defer srv.discmix.Close()
	defer srv.dialsched.stop()

	var (
		peers        = make(map[enode.ID]*Peer)
		inboundCount = 0
		trusted      = make(map[enode.ID]bool, len(srv.TrustedNodes))
	)
	// Put trusted nodes into a map to speed up checks.
	// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
	for _, n := range srv.TrustedNodes {
		trusted[n.ID()] = true
	}

running:
	for {
		select {
		case <-srv.quit:
			// The server was stopped. Run the cleanup logic.
			break running

		case n := <-srv.addtrusted:
			// This channel is used by AddTrustedPeer to add a node
			// to the trusted node set.
			srv.log.Trace("Adding trusted node", "node", n)
			trusted[n.ID()] = true
			if p, ok := peers[n.ID()]; ok {
				p.rw.set(trustedConn, true)
			}

		case n := <-srv.removetrusted:
			// This channel is used by RemoveTrustedPeer to remove a node
			// from the trusted node set.
			srv.log.Trace("Removing trusted node", "node", n)
			delete(trusted, n.ID())
			if p, ok := peers[n.ID()]; ok {
				p.rw.set(trustedConn, false)
			}

		case op := <-srv.peerOp:
			// This channel is used by Peers and PeerCount.
			op(peers)
			srv.peerOpDone <- struct{}{}

		case c := <-srv.checkpointPostHandshake:
			// A connection has passed the encryption handshake so
			// the remote identity is known (but hasn't been verified yet).
			if trusted[c.node.ID()] {
				// Ensure that the trusted flag is set before checking against MaxPeers.
				c.flags |= trustedConn
			}
			// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
			c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)

		case c := <-srv.checkpointAddPeer:
			// At this point the connection is past the protocol handshake.
			// Its capabilities are known and the remote identity is verified.
			err := srv.addPeerChecks(peers, inboundCount, c)
			if err == nil {
				// The handshakes are done and it passed all checks.
				p := srv.launchPeer(c)
				peers[c.node.ID()] = p
				srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
				srv.dialsched.peerAdded(c)
				if p.Inbound() {
					inboundCount++
					serveSuccessMeter.Mark(1)
					activeInboundPeerGauge.Inc(1)
				} else {
					dialSuccessMeter.Mark(1)
					activeOutboundPeerGauge.Inc(1)
				}
				activePeerGauge.Inc(1)
			}
			c.cont <- err

		case pd := <-srv.delpeer:
			// A peer disconnected.
			d := common.PrettyDuration(mclock.Now() - pd.created)
			delete(peers, pd.ID())
			srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
			srv.dialsched.peerRemoved(pd.rw)
			if pd.Inbound() {
				inboundCount--
				activeInboundPeerGauge.Dec(1)
			} else {
				activeOutboundPeerGauge.Dec(1)
			}
			activePeerGauge.Dec(1)
		}
	}

	srv.log.Trace("P2P networking is spinning down")

	// Terminate discovery. If there is a running lookup it will terminate soon.
	if srv.ntab != nil {
		srv.ntab.Close()
	}
	if srv.DiscV5 != nil {
		srv.DiscV5.Close()
	}

	// Disconnect all peers.
	for _, p := range peers {
		p.Disconnect(DiscQuitting)
	}
	// Wait for peers to shut down. Pending connections and tasks are
	// not handled here and will terminate soon-ish because srv.quit
	// is closed.
	for len(peers) > 0 {
		p := <-srv.delpeer
		p.log.Trace("<-delpeer (spindown)")
		delete(peers, p.ID())
	}
}

Post Handshake Checks

postHandshakeChecks performs a series of checks after the initial encryption handshake has been completed between two nodes but before the protocol handshake has occurred. This function ensures that the connection meets certain criteria before proceeding further.
Steps:
  1. Checking the Maximum Number of Peers.
  2. Checking the Maximum Number of Inbound Connections.
  3. Checking if the Node is Already Connected.
  4. Checking for Self-Connection.
/// ---p2p/server.go---
func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
	switch {
	/// Checking the Maximum Number of Peers.
	case !c.is(trustedConn) && len(peers) >= srv.MaxPeers:
		return DiscTooManyPeers
	/// Checking the Maximum Number of Inbound Connections.
	case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
		return DiscTooManyPeers
	/// Checking if the Node is Already Connected.
	case peers[c.node.ID()] != nil:
		return DiscAlreadyConnected
	/// Checking for Self-Connection.
	case c.node.ID() == srv.localnode.ID():
		return DiscSelf
	default:
		return nil
	}
}
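The inbound limit referenced above comes from maxInboundConns, which reserves part of MaxPeers for dialed (outbound) connections. A sketch of that logic, paraphrased from p2p/server.go; the details may differ between versions:
/// ---p2p/server.go (sketch; may vary by version)---
func (srv *Server) maxInboundConns() int {
	return srv.MaxPeers - srv.maxDialedConns()
}

func (srv *Server) maxDialedConns() (limit int) {
	if srv.NoDial || srv.MaxPeers == 0 {
		return 0
	}
	if srv.DialRatio == 0 {
		limit = srv.MaxPeers / defaultDialRatio // defaultDialRatio is 3
	} else {
		limit = srv.MaxPeers / srv.DialRatio
	}
	return limit
}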

Add Peer

When a peer-addition request arrives on the srv.checkpointAddPeer channel, the server runs several checks and, if they pass, launches a goroutine for the peer.
Steps:
  1. Check whether the peer is qualified to be added.
  2. Run the peer in its own goroutine.
  3. Record the peer.
  4. Notify the dial scheduler that the peer has been added.
  5. Update relevant counters and metrics.
    1. If the remote peer initiated the connection (inbound), update inboundCount and activeInboundPeerGauge.
    2. If the local node initiated the connection (outbound), update activeOutboundPeerGauge.
    3. Update activePeerGauge.
/// ---p2p/server.go#Server.run---
case c := <-srv.checkpointAddPeer:
	/// Check whether the peer is qualified to be added.
	// At this point the connection is past the protocol handshake.
	// Its capabilities are known and the remote identity is verified.
	err := srv.addPeerChecks(peers, inboundCount, c)
	if err == nil {
		/// Run the peer in its own goroutine.
		// The handshakes are done and it passed all checks.
		p := srv.launchPeer(c)
		/// Record the peer.
		peers[c.node.ID()] = p
		srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
		/// Notify the dial scheduler that the peer has been added.
		srv.dialsched.peerAdded(c)
		/// Update relevant counters and metrics.
		if p.Inbound() {
			inboundCount++
			serveSuccessMeter.Mark(1)
			activeInboundPeerGauge.Inc(1)
		} else {
			dialSuccessMeter.Mark(1)
			activeOutboundPeerGauge.Inc(1)
		}
		activePeerGauge.Inc(1)
	}
	c.cont <- err
 
addPeerChecks verifies that the remote node shares at least one supported protocol with the local node; if no advertised capability matches a local protocol's name and version, the connection is dropped with DiscUselessPeer. It also calls srv.postHandshakeChecks again to re-check constraints such as the peer-count limits, since the peer set may have changed since those checks first ran.
/// ---p2p/server.go---
func (srv *Server) addPeerChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
	// Drop connections with no matching protocols.
	if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, c.caps) == 0 {
		return DiscUselessPeer
	}
	// Repeat the post-handshake checks because the
	// peer set might have changed since those checks were performed.
	return srv.postHandshakeChecks(peers, inboundCount, c)
}

/// ---p2p/peer.go---
func countMatchingProtocols(protocols []Protocol, caps []Cap) int {
	n := 0
	for _, cap := range caps {
		for _, proto := range protocols {
			if proto.Name == cap.Name && proto.Version == cap.Version {
				n++
			}
		}
	}
	return n
}
 
peerAdded passes the connection to the d.addPeerCh channel to notify the dial scheduler that the peer has been added.
/// ---p2p/dial.go---
// peerAdded updates the peer set.
func (d *dialScheduler) peerAdded(c *conn) {
	select {
	case d.addPeerCh <- c:
	case <-d.ctx.Done():
	}
}
 
launchPeer initializes a new Peer instance and starts a goroutine to run the peer.
Steps:
  1. Initializing a new peer:
    • Calls newPeer to initialize the peer, setting up its protocols, logging, and communication channels. Inside newPeer, matchProtocols constructs protomap, the map of protocols shared with the peer.
  2. Starting a goroutine to run the peer.
 
Protocol Message Codes
In the matchProtocols function, the protocol.Length refers to the number of message codes used by that specific protocol. Each protocol in the Ethereum P2P network uses a set of message codes to identify different types of messages that can be sent and received.
The Length attribute defines how many distinct message codes are associated with that protocol. The offset variable starts at baseProtocolLength, which is a constant value representing the starting point for protocol-specific message codes beyond the base protocol messages.
As each protocol is matched with its corresponding capability (cap), the protocol's length is added to the offset to ensure that each protocol's message codes are assigned a unique and non-overlapping range.
This assignment is stored in the protoRW structure, which keeps track of the protocol's message codes and their offset.
Because both nodes follow the same message-code calculation, they derive identical code ranges for every shared subprotocol.
Example:
Suppose we have the following protocols defined:
protocols := []Protocol{
	{Name: "eth", Version: 63, Length: 17}, // Ethereum protocol with 17 message codes
	{Name: "shh", Version: 5, Length: 7},   // Whisper protocol with 7 message codes
}
When these protocols are matched with capabilities (caps), the matchProtocols function assigns them as follows:
  1. Ethereum Protocol ("eth/63"):
      • Initial offset = baseProtocolLength, which is 16.
      • The eth protocol is assigned message codes 16 to 32 (16 + 17 - 1).
  2. Whisper Protocol ("shh/5"):
      • The next available offset is now 33 (16 + 17).
      • The shh protocol is assigned message codes 33 to 39 (33 + 7 - 1).
Visualization:
  • Base Protocol Messages: 0 to 15
  • Ethereum Protocol Messages: 16 to 32
  • Whisper Protocol Messages: 33 to 39
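A minimal, self-contained sketch of this offset arithmetic (the protocol names and lengths are the hypothetical example values above, not taken from the source):
/// ---example (hypothetical sketch of the offset arithmetic)---
package main

import "fmt"

func main() {
	const baseProtocolLength = uint64(16)
	// Matched protocols in the canonical (sorted-by-name) cap order.
	matched := []struct {
		name   string
		length uint64
	}{
		{"eth", 17},
		{"shh", 7},
	}
	offset := baseProtocolLength
	for _, p := range matched {
		fmt.Printf("%s: message codes %d..%d\n", p.name, offset, offset+p.length-1)
		offset += p.length
	}
	// Output:
	// eth: message codes 16..32
	// shh: message codes 33..39
}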
/// ---p2p/server.go---
func (srv *Server) launchPeer(c *conn) *Peer {
	/// Initializing a new peer:
	p := newPeer(srv.log, c, srv.Protocols)
	if srv.EnableMsgEvents {
		// If message events are enabled, pass the peerFeed
		// to the peer.
		p.events = &srv.peerFeed
	}
	/// Starting goroutine to run the peer.
	go srv.runPeer(p)
	return p
}

/// ---p2p/peer.go---
const baseProtocolLength = uint64(16)

func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
	protomap := matchProtocols(protocols, conn.caps, conn)
	p := &Peer{
		rw:       conn,
		running:  protomap,
		created:  mclock.Now(),
		disc:     make(chan DiscReason),
		protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
		closed:   make(chan struct{}),
		pingRecv: make(chan struct{}, 16),
		log:      log.New("id", conn.node.ID(), "conn", conn.flags),
	}
	return p
}

// matchProtocols creates structures for matching named subprotocols.
func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
	slices.SortFunc(caps, Cap.Cmp)
	offset := baseProtocolLength
	result := make(map[string]*protoRW)

outer:
	for _, cap := range caps {
		for _, proto := range protocols {
			if proto.Name == cap.Name && proto.Version == cap.Version {
				// If an old protocol version matched, revert it
				if old := result[cap.Name]; old != nil {
					offset -= old.Length
				}
				// Assign the new match
				result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
				offset += proto.Length
				continue outer
			}
		}
	}
	return result
}

/// ---p2p/protocol.go---
// Protocol represents a P2P subprotocol implementation.
type Protocol struct {
	// Name should contain the official protocol name,
	// often a three-letter word.
	Name string

	// Version should contain the version number of the protocol.
	Version uint

	// Length should contain the number of message codes used
	// by the protocol.
	Length uint64

	/// ...
}

// Cmp defines the canonical sorting order of capabilities.
func (cap Cap) Cmp(other Cap) int {
	if cap.Name == other.Name {
		if cap.Version < other.Version {
			return -1
		}
		if cap.Version > other.Version {
			return 1
		}
		return 0
	}
	return strings.Compare(cap.Name, other.Name)
}

Run Peer

The runPeer function is responsible for managing the lifecycle of a peer connection. It starts the peer's main loop, announces the peer to subscribers, and cleans up when the peer disconnects.
Steps:
  1. Check for newPeerHook:
    • If srv.newPeerHook is not nil, it is called with the peer p. This hook can be used for custom initialization or logging when a new peer is created.
  2. Send PeerEventTypeAdd Event:
    • Sends an event indicating that a new peer has been added, including the peer's ID, remote address, and local address. This event goes to all subscribers of srv.peerFeed.
  3. Run the Peer Main Loop:
      • Calls the run method of the peer p. The run method handles the peer's main loop, including reading and writing messages, handling pings, and managing protocol handlers.
      • run returns two values: remoteRequested, which indicates whether the remote peer requested the disconnection, and err, any error that occurred during the peer's operation.
  4. Announce Peer Disconnection:
      • Sends a peerDrop struct (containing the peer p, the error err, and the remoteRequested flag) to the srv.delpeer channel.
      • The main server loop processes this channel to update the peer set and handle the disconnection.
  5. Send PeerEventTypeDrop Event:
      • Sends an event indicating that the peer has been dropped, including the peer's ID, error message, remote address, and local address.
      • This event is sent only after the peer has been announced on the srv.delpeer channel, so subscribers have a consistent view of the peer set.
/// ---p2p/server.go---
// runPeer runs in its own goroutine for each peer.
func (srv *Server) runPeer(p *Peer) {
	// Check for newPeerHook:
	if srv.newPeerHook != nil {
		srv.newPeerHook(p)
	}
	// Broadcast peer addition to external subscribers
	srv.peerFeed.Send(&PeerEvent{
		Type:          PeerEventTypeAdd,
		Peer:          p.ID(),
		RemoteAddress: p.RemoteAddr().String(),
		LocalAddress:  p.LocalAddr().String(),
	})

	// Run the per-peer main loop.
	remoteRequested, err := p.run()

	// Announce disconnect on the main loop to update the peer set.
	// The main loop waits for existing peers to be sent on srv.delpeer
	// before returning, so this send should not select on srv.quit.
	srv.delpeer <- peerDrop{p, err, remoteRequested}

	// Broadcast peer drop to external subscribers. This needs to be
	// after the send to delpeer so subscribers have a consistent view of
	// the peer set (i.e. Server.Peers() doesn't include the peer when the
	// event is received).
	srv.peerFeed.Send(&PeerEvent{
		Type:          PeerEventTypeDrop,
		Peer:          p.ID(),
		Error:         err.Error(),
		RemoteAddress: p.RemoteAddr().String(),
		LocalAddress:  p.LocalAddr().String(),
	})
}
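External consumers receive these events through Server.SubscribeEvents, which subscribes a channel to srv.peerFeed. A hypothetical consumer, assuming srv is a running *p2p.Server (the loop body is illustrative):
/// ---example (hypothetical usage)---
ch := make(chan *p2p.PeerEvent, 16)
sub := srv.SubscribeEvents(ch)
defer sub.Unsubscribe()

for {
	select {
	case ev := <-ch:
		fmt.Println("peer event:", ev.Type, ev.Peer)
	case err := <-sub.Err():
		fmt.Println("subscription ended:", err)
		return
	}
}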

Send Peer Info to Subscribed Channels

Feed.Send delivers a value to all subscribed channels simultaneously. It ensures that every subscriber receives the value, managing concurrency and synchronization so that subscriptions and unsubscriptions are handled efficiently while a send is in progress.
/// ---event/feed.go---
// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
func (f *Feed) Send(value interface{}) (nsent int) {
	rvalue := reflect.ValueOf(value)
	f.once.Do(func() { f.init(rvalue.Type()) })
	if f.etype != rvalue.Type() {
		panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
	}

	<-f.sendLock

	// Add new cases from the inbox after taking the send lock.
	f.mu.Lock()
	f.sendCases = append(f.sendCases, f.inbox...)
	f.inbox = nil
	f.mu.Unlock()

	// Set the sent value on all channels.
	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = rvalue
	}

	// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
	// of sendCases. When a send succeeds, the corresponding case moves to the end of
	// 'cases' and it shrinks by one element.
	cases := f.sendCases
	for {
		// Fast path: try sending without blocking before adding to the select set.
		// This should usually succeed if subscribers are fast enough and have free
		// buffer space.
		for i := firstSubSendCase; i < len(cases); i++ {
			if cases[i].Chan.TrySend(rvalue) {
				nsent++
				cases = cases.deactivate(i)
				i--
			}
		}
		if len(cases) == firstSubSendCase {
			break
		}
		// Select on all the receivers, waiting for them to unblock.
		chosen, recv, _ := reflect.Select(cases)
		if chosen == 0 /* <-f.removeSub */ {
			index := f.sendCases.find(recv.Interface())
			f.sendCases = f.sendCases.delete(index)
			if index >= 0 && index < len(cases) {
				// Shrink 'cases' too because the removed case was still active.
				cases = f.sendCases[:len(cases)-1]
			}
		} else {
			cases = cases.deactivate(chosen)
			nsent++
		}
	}

	// Forget about the sent value and hand off the send lock.
	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = reflect.Value{}
	}
	f.sendLock <- struct{}{}
	return nsent
}
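A minimal standalone usage example of event.Feed (hypothetical; the int payload and buffer size are arbitrary):
/// ---example (hypothetical usage of event.Feed)---
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/event"
)

func main() {
	var feed event.Feed
	ch := make(chan int, 1)
	sub := feed.Subscribe(ch)
	defer sub.Unsubscribe()

	nsent := feed.Send(42) // blocks until every subscriber has received the value
	fmt.Println(nsent, <-ch) // prints: 1 42
}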

Peer Run Loop

peer.run manages the lifecycle of a peer connection, including reading, writing, and protocol-specific operations.
Steps:
  1. Channel Initialization:
      • writeStart: signals when the next write operation may start.
      • writeErr: receives errors from write operations.
      • readErr: receives errors from read operations.
      • reason: records the reason for disconnection.
  2. Starting the Read Goroutine:
    • go p.readLoop(readErr) starts the readLoop goroutine, which handles incoming messages.
  3. Starting the Ping Goroutine:
    • go p.pingLoop() starts the pingLoop goroutine, which manages sending and answering ping messages to keep the connection alive.
  4. Starting Protocol Handlers:
      • writeStart <- struct{}{} allows the first write operation to proceed.
      • p.startProtocols(writeStart, writeErr) starts protocol-specific handlers in separate goroutines. Each protocol handles the messages specific to it.
  5. Main Loop:
      • The main loop waits for various events: write completions, read errors, protocol errors, and disconnection requests.
      • case err = <-writeErr: handles write completions. On error it sets the reason to DiscNetworkError and breaks the loop; otherwise it unblocks the next write.
      • case err = <-readErr: handles read errors. If the error is of type DiscReason, the remote peer requested the disconnection. Otherwise, the reason is set to DiscNetworkError.
      • case err = <-p.protoErr: handles protocol errors by mapping them to a disconnection reason via discReasonForError.
      • case err = <-p.disc: handles local disconnection requests, setting the reason accordingly.
  6. Cleanup:
      • close(p.closed) signals all goroutines to stop.
      • p.rw.close(reason) closes the connection with the given reason.
      • p.wg.Wait() waits for all goroutines (readLoop, pingLoop, and the protocol handlers) to finish.
/// ---p2p/peer.go---
func (p *Peer) run() (remoteRequested bool, err error) {
	/// Channel Initialization:
	var (
		writeStart = make(chan struct{}, 1)
		writeErr   = make(chan error, 1)
		readErr    = make(chan error, 1)
		reason     DiscReason // sent to the peer
	)
	p.wg.Add(2)
	// Starting Read Goroutine:
	go p.readLoop(readErr)
	// Starting Ping Goroutine:
	go p.pingLoop()

	// Start all protocol handlers.
	writeStart <- struct{}{}
	p.startProtocols(writeStart, writeErr)

	/// Main Loop:
	// Wait for an error or disconnect.
loop:
	for {
		select {
		case err = <-writeErr:
			// A write finished. Allow the next write to start if
			// there was no error.
			if err != nil {
				reason = DiscNetworkError
				break loop
			}
			writeStart <- struct{}{}
		case err = <-readErr:
			if r, ok := err.(DiscReason); ok {
				remoteRequested = true
				reason = r
			} else {
				reason = DiscNetworkError
			}
			break loop
		case err = <-p.protoErr:
			reason = discReasonForError(err)
			break loop
		case err = <-p.disc:
			reason = discReasonForError(err)
			break loop
		}
	}

	/// Cleanup:
	close(p.closed)
	p.rw.close(reason)
	p.wg.Wait()
	return remoteRequested, err
}

/// ---p2p/peer_error.go---
func discReasonForError(err error) DiscReason {
	if reason, ok := err.(DiscReason); ok {
		return reason
	}
	if errors.Is(err, errProtocolReturned) {
		return DiscQuitting
	}
	peerError, ok := err.(*peerError)
	if ok {
		switch peerError.code {
		case errInvalidMsgCode, errInvalidMsg:
			return DiscProtocolError
		default:
			return DiscSubprotocolError
		}
	}
	return DiscSubprotocolError
}
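The p.disc channel above is fed by Peer.Disconnect, which other components (including the server's shutdown path) call to request a disconnect. For reference, paraphrased from p2p/peer.go; the exact shape may vary between versions:
/// ---p2p/peer.go (for reference; may vary by version)---
// Disconnect terminates the peer connection with the given reason.
// It returns immediately and does not wait until the connection is closed.
func (p *Peer) Disconnect(reason DiscReason) {
	if p.testPipe != nil {
		p.testPipe.Close()
	}
	select {
	case p.disc <- reason:
	case <-p.closed:
	}
}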

Loop: Read Loop

Peer.readLoop continuously reads from the rw connection. When it receives a message, it calls Peer.handle to process it.
The Peer.rw is handed over from the listening loop, after the local node and the remote node have completed the RLPx and protocol handshakes, exchanged shared secrets, and established the protocol-level connection. p.rw.ReadMsg reads a message from the remote node, using the shared secrets to decrypt and authenticate it, and returns the decoded Msg (code, size, and payload).
/// ---p2p/peer.go---
func (p *Peer) readLoop(errc chan<- error) {
	defer p.wg.Done()
	for {
		msg, err := p.rw.ReadMsg()
		if err != nil {
			errc <- err
			return
		}
		msg.ReceivedAt = time.Now()
		if err = p.handle(msg); err != nil {
			errc <- err
			return
		}
	}
}
 
Peer.handle processes protocol messages, routing them to the appropriate protocol handler and managing special message types like pings and disconnects.
Steps:
  1. Ping Message Handling:
      • If the message code indicates a ping (pingMsg), the message is discarded.
      • A signal is sent to the pingRecv channel to indicate that a ping has been received.
      • If the closed channel is closed, the select terminates without sending the signal.
  2. Disconnect Message Handling:
      • If the message code indicates a disconnect (discMsg), the message payload is decoded to extract the disconnection reason (DiscReason).
      • The function returns the disconnection reason, signaling that the peer should be disconnected.
  3. Base Protocol Message Handling:
      • If the message code is less than baseProtocolLength, it is considered a base protocol message.
      • These messages are ignored, and the payload is discarded.
  4. Subprotocol Message Handling:
      • If the message code does not match any of the special cases, it is considered a subprotocol message.
      • The getProto function is called to find the protocol handler responsible for this message code.
      • If the protocol handler is found and metrics are enabled, metrics for the message are recorded.
      • The message is sent to the protocol handler's input channel (proto.in). If the peer is closed, io.EOF is returned.
/// ---p2p/peer.go---
const (
	// devp2p message codes
	handshakeMsg = 0x00
	discMsg      = 0x01
	pingMsg      = 0x02
	pongMsg      = 0x03
)

func (p *Peer) handle(msg Msg) error {
	switch {
	/// Ping Message Handling:
	case msg.Code == pingMsg:
		msg.Discard()
		select {
		case p.pingRecv <- struct{}{}:
		case <-p.closed:
		}
	// Disconnect Message Handling:
	case msg.Code == discMsg:
		// This is the last message. We don't need to discard or
		// check errors because the connection will be closed after it.
		var m struct{ R DiscReason }
		rlp.Decode(msg.Payload, &m)
		return m.R
	// Base Protocol Messages Handling:
	case msg.Code < baseProtocolLength:
		// ignore other base protocol messages
		return msg.Discard()
	// Subprotocol Messages Handling:
	default:
		// it's a subprotocol message
		proto, err := p.getProto(msg.Code)
		if err != nil {
			return fmt.Errorf("msg code out of range: %v", msg.Code)
		}
		if metrics.Enabled {
			m := fmt.Sprintf("%s/%s/%d/%#02x", ingressMeterName, proto.Name, proto.Version, msg.Code-proto.offset)
			metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize))
			metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1)
		}
		select {
		case proto.in <- msg:
			return nil
		case <-p.closed:
			return io.EOF
		}
	}
	return nil
}

// getProto finds the protocol responsible for handling
// the given message code.
func (p *Peer) getProto(code uint64) (*protoRW, error) {
	for _, proto := range p.running {
		if code >= proto.offset && code < proto.offset+proto.Length {
			return proto, nil
		}
	}
	return nil, newPeerError(errInvalidMsgCode, "%d", code)
}

/// ---p2p/message.go---
// Discard reads any remaining payload data into a black hole.
func (msg Msg) Discard() error {
	_, err := io.Copy(io.Discard, msg.Payload)
	return err
}

Loop: Ping

Peer.pingLoop handles the periodic ping messages that keep the connection between peers active. This function maintains the liveness of peer connections by regularly sending ping messages and responding to incoming pings with pongs.
Steps:
  1. Deferring the WaitGroup Done:
    • defer p.wg.Done() ensures that the wait group's counter is decremented when pingLoop exits. This is part of the synchronization that ensures the peer's goroutines are properly cleaned up.
  2. Creating and Stopping the Timer:
      • A time.NewTimer is created with the interval set to pingInterval (15 seconds).
      • defer ping.Stop() ensures that the timer is stopped when the function exits, releasing its resources.
(Main Loop)
The function enters an infinite loop where it waits for events on multiple channels using a select statement.
  1. Handling Timer Expiration:
      • When the timer fires (ping.C), SendItems sends a pingMsg to the peer.
      • If sending the ping fails, the error is sent to the protoErr channel and the function returns, terminating the ping loop.
      • If the ping is sent successfully, the timer is reset for the next interval.
  2. Handling Incoming Ping Messages:
    • When a ping message is received (p.pingRecv, signaled by the peer's read loop), the function responds by sending a pongMsg back to the peer using SendItems.
  3. Handling Peer Closure:
    • When the closed channel is closed, indicating that the peer connection is shutting down, the function returns, exiting the loop and stopping further processing.
/// ---p2p/peer.go---
const pingInterval = 15 * time.Second

func (p *Peer) pingLoop() {
	// Deferring the WaitGroup Done:
	defer p.wg.Done()
	// Creating and Stopping the Timer:
	ping := time.NewTimer(pingInterval)
	defer ping.Stop()

	for {
		select {
		// Handling Timer Expiration:
		case <-ping.C:
			if err := SendItems(p.rw, pingMsg); err != nil {
				p.protoErr <- err
				return
			}
			ping.Reset(pingInterval)
		// Handling Incoming Ping Messages:
		case <-p.pingRecv:
			SendItems(p.rw, pongMsg)
		// Handling Peer Closure:
		case <-p.closed:
			return
		}
	}
}

/// ---p2p/message.go---
// SendItems writes an RLP with the given code and data elements.
// For a call such as:
//
//	SendItems(w, code, e1, e2, e3)
//
// the message payload will be an RLP list containing the items:
//
//	[e1, e2, e3]
func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error {
	return Send(w, msgcode, elems)
}

// Send writes an RLP-encoded message with the given code.
// data should encode as an RLP list.
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
	size, r, err := rlp.EncodeToReader(data)
	if err != nil {
		return err
	}
	return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}
 

Start Sub Protocols

startProtocols is responsible for initializing and starting the protocol handlers for a peer.
Steps:
  1. Adding to WaitGroup:
    • p.wg.Add(len(p.running)) increments the wait group's counter by the number of running protocols, so the wait group knows how many goroutines it needs to wait for.
  2. Iterating Over Protocols:
    • Iterates over all running protocols associated with the peer. For each protocol, it initializes and starts a new goroutine to handle that protocol's operations.
  3. Setting Up Protocol Properties:
    • The closed, wstart, and werr channels are set on the protocol. These channels are used, respectively, to signal that the peer is shutting down, to signal that a write operation may start, and to report write errors.
  4. Wrapping the Message Read/Writer with Event Handling:
    • If event handling is enabled (p.events is not nil), the protocol's message read/writer is wrapped with an event handler using newMsgEventer. This allows for logging and metrics collection of message events.
  5. Starting the Protocol Goroutine:
    • A new goroutine is started to run the protocol's Run method (a hypothetical subprotocol registering Run is sketched after the code below).
      • defer p.wg.Done() ensures that the wait group's counter is decremented when the goroutine exits.
      • proto.Run(p, rw) starts the protocol's operations. Run is registered by each subprotocol and contains that subprotocol's core logic; rw manages reading and writing messages from and to the peer.
      • The error (if any) is sent to the p.protoErr channel.
 
To write a message, a subprotocol must first acquire the writeStart token, and all subprotocols share the single writeStart channel. This keeps writes thread-safe, because all subprotocols send messages over the same underlying connection.
/// ---p2p/peer.go---
// Peer represents a connected remote node.
type Peer struct {
	rw      *conn
	running map[string]*protoRW
	log     log.Logger
	created mclock.AbsTime

	wg       sync.WaitGroup
	protoErr chan error
	closed   chan struct{}
	pingRecv chan struct{}
	disc     chan DiscReason

	// events receives message send / receive events if set
	events   *event.Feed
	testPipe *MsgPipeRW // for testing
}

type protoRW struct {
	Protocol
	in     chan Msg        // receives read messages
	closed <-chan struct{} // receives when peer is shutting down
	wstart <-chan struct{} // receives when write may start
	werr   chan<- error    // for write results
	offset uint64
	w      MsgWriter
}

func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
	/// Adding to WaitGroup:
	p.wg.Add(len(p.running))
	/// Iterating Over Protocols:
	for _, proto := range p.running {
		/// Setting Up Protocol Properties:
		proto := proto
		proto.closed = p.closed
		proto.wstart = writeStart
		proto.werr = writeErr
		var rw MsgReadWriter = proto
		/// Wrapping the Message Read/Writer with Event Handling:
		if p.events != nil {
			rw = newMsgEventer(rw, p.events, p.ID(), proto.Name, p.Info().Network.RemoteAddress, p.Info().Network.LocalAddress)
		}
		p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
		/// Starting Protocol Goroutine:
		go func() {
			defer p.wg.Done()
			err := proto.Run(p, rw)
			if err == nil {
				p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
				err = errProtocolReturned
			} else if !errors.Is(err, io.EOF) {
				p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
			}
			p.protoErr <- err
		}()
	}
}

/// ---p2p/protocol.go---
// Protocol represents a P2P subprotocol implementation.
type Protocol struct {
	// ...

	// Run is called in a new goroutine when the protocol has been
	// negotiated with a peer. It should read and write messages from
	// rw. The Payload for each message must be fully consumed.
	//
	// The peer connection is closed when Start returns. It should return
	// any protocol-level error (such as an I/O error) that is
	// encountered.
	Run func(peer *Peer, rw MsgReadWriter) error

	// ...
}
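A hypothetical minimal subprotocol showing how Run uses rw; the name "toy", its version, and message code 0 are illustrative, not from the source:
/// ---example (hypothetical subprotocol)---
// toyProto exchanges one kind of message (code 0) carrying a string.
var toyProto = p2p.Protocol{
	Name:    "toy",
	Version: 1,
	Length:  1, // one message code: 0
	Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
		// Send a greeting. WriteMsg blocks until writeStart grants the turn,
		// then adds the protocol's code offset before hitting the wire.
		if err := p2p.Send(rw, 0, "hello"); err != nil {
			return err
		}
		for {
			// ReadMsg receives from proto.in; the code arrives already de-offset.
			msg, err := rw.ReadMsg()
			if err != nil {
				return err
			}
			var text string
			if err := msg.Decode(&text); err != nil {
				return err
			}
			_ = text // ... handle the message ...
		}
	},
}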
 
rw (protoRW) implements WriteMsg and ReadMsg.
  • ReadMsg: reads a message from the protoRW.in channel, which is fed by the read loop.
  • WriteMsg: calls protoRW.w.WriteMsg to send a message. protoRW.w is the protocol-level connection with the remote node; it encrypts the message with the shared secrets and sends it. WriteMsg adds the protocol's code offset to the message code before sending it to the peer.
/// ---p2p/message.go---
// Msg defines the structure of a p2p message.
//
// Note that a Msg can only be sent once since the Payload reader is
// consumed during sending. It is not possible to create a Msg and
// send it any number of times. If you want to reuse an encoded
// structure, encode the payload into a byte array and create a
// separate Msg with a bytes.Reader as Payload for each send.
type Msg struct {
	Code       uint64
	Size       uint32 // Size of the raw payload
	Payload    io.Reader
	ReceivedAt time.Time

	meterCap  Cap    // Protocol name and version for egress metering
	meterCode uint64 // Message within protocol for egress metering
	meterSize uint32 // Compressed message size for ingress metering
}

/// ---p2p/peer.go---
func (rw *protoRW) WriteMsg(msg Msg) (err error) {
	if msg.Code >= rw.Length {
		return newPeerError(errInvalidMsgCode, "not handled")
	}
	msg.meterCap = rw.cap()
	msg.meterCode = msg.Code

	msg.Code += rw.offset

	select {
	case <-rw.wstart:
		err = rw.w.WriteMsg(msg)
		// Report write status back to Peer.run. It will initiate
		// shutdown if the error is non-nil and unblock the next write
		// otherwise. The calling protocol code should exit for errors
		// as well but we don't want to rely on that.
		rw.werr <- err
	case <-rw.closed:
		err = ErrShuttingDown
	}
	return err
}

func (rw *protoRW) ReadMsg() (Msg, error) {
	select {
	case msg := <-rw.in:
		msg.Code -= rw.offset
		return msg, nil
	case <-rw.closed:
		return Msg{}, io.EOF
	}
}