Setup Listening

Setup Listening

setupListening sets up the TCP listener for incoming connections. It configures the local node to accept inbound connections on a specified address and port, and it handles updating the local node record with the listening port information.
  1. Launching the Listener:
      • starts by calling srv.listenFunc (default net.Listen), which sets up a TCP listener on the specified address.
      • On success, it assigns the listener to srv.listener and updates srv.ListenAddr with the actual address of the listener. This is because if there is no port assigned in the original ListenAddr, net.Listen automatically chooses a port number, so we need to update the actuall IP and port in srv.ListenAddr.
  1. Updating the Local Node Record:
      • checks if the listener address is a TCP address (net.TCPAddr).
      • If it is, it updates the local node record with the TCP port using srv.localnode.Set(enr.TCP(tcp.Port)).
      • If the IP address of the listener is not a loopback address (127.0.0.1) or a private address, it sends a portMapping struct to srv.portMappingRegister, port mapping process will update port mapping on NAT gateway.
 
Why Register for NAT Port Mapping Only for Public Addresses?
When the TCP address of the node is a loopback (localhost) or private address, it means that the node is configured to be accessible only within the local network or from the local machine itself, and not from the outside (public internet). Detail
/// ---p2p/server.go--- func (srv *Server) setupListening() error { // Launch the listener. listener, err := srv.listenFunc("tcp", srv.ListenAddr) if err != nil { return err } srv.listener = listener srv.ListenAddr = listener.Addr().String() // Update the local node record and map the TCP listening port if NAT is configured. tcp, isTCP := listener.Addr().(*net.TCPAddr) if isTCP { srv.localnode.Set(enr.TCP(tcp.Port)) if !tcp.IP.IsLoopback() && !tcp.IP.IsPrivate() { srv.portMappingRegister <- &portMapping{ protocol: "TCP", name: "ethereum p2p", port: tcp.Port, } } } srv.loopWG.Add(1) go srv.listenLoop() return nil }
 
In the Server.Start, the default srv.listenFunc is net.Listen, which starts listening on the given IP and Port, and returns a handler manages inbound messages.
/// ---p2p/server.go--- // Start starts running the server. // Servers can not be re-used after stopping. func (srv *Server) Start() (err error) { // ... if srv.listenFunc == nil { srv.listenFunc = net.Listen } // ... }
/// ---go/src/net/dial.go--- // Listen announces on the local network address. // // The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket". // // For TCP networks, if the host in the address parameter is empty or // a literal unspecified IP address, Listen listens on all available // unicast and anycast IP addresses of the local system. // To only use IPv4, use network "tcp4". // The address can use a host name, but this is not recommended, // because it will create a listener for at most one of the host's IP // addresses. // If the port in the address parameter is empty or "0", as in // "127.0.0.1:" or "[::1]:0", a port number is automatically chosen. // The Addr method of Listener can be used to discover the chosen // port. // // See func Dial for a description of the network and address // parameters. // // Listen uses context.Background internally; to specify the context, use // ListenConfig.Listen. func Listen(network, address string) (Listener, error) { var lc ListenConfig return lc.Listen(context.Background(), network, address) } /// ---go/src/net/net.go--- // A Listener is a generic network listener for stream-oriented protocols. // // Multiple goroutines may invoke methods on a Listener simultaneously. type Listener interface { // Accept waits for and returns the next connection to the listener. Accept() (Conn, error) // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. Close() error // Addr returns the listener's network address. Addr() Addr }
 

Listen Loop

listenLoop continuously accepts inbound connections, performing various checks and setting up the connections as peers. The semaphore mechanism ensures that the number of pending peer connections does not exceed a specified limit.
 
Preparation Steps:
  1. Log Node is Listenning
  1. Semaphore Initialization
    1. tokens determines the maximum number of concurrent pending connections (defaultMaxPendingPeers or srv.MaxPendingPeers).
      slots is a buffered channel with capacity tokens. Each struct represents an available slot for a pending connection. During the start of main loop of accepting and setting up connection, it fetches a channel from the slots. Only when the connection process has been finished, slot will be inserted back into channel, this combination limits that there is only max tokens pending connection.
  1. Deferred Cleanup
      • Ensures that the function decrements the srv.loopWG wait group counter when it completes.
      • Ensures all slots are returned to the slots channel before the function exits. This guarantees that all connection goroutines have finished before listenLoop returns.
Main Loop:
  1. Semaphore Wait:
    1. The loop waits for a slot to become available by receiving from the slots channel. This limits the number of concurrent connections being processed.
  1. Connection Accept Loop:
      • Attempts to accept a new connection using srv.listener.Accept().
      • If a temporary error occurs (checked using netutil.IsTemporaryError), it logs the error and retries after a short delay.
      • If a non-temporary error occurs, it logs the error, returns a slot to the slots channel, and exits the listen loop. It exits the listen loop because it is an error related to the listener, needed to be hanlded by the caller.
      • If it successfully accepted connection, then breaks the connection accept loop, handles connection.
  1. Connection Validation
      • Extracts the remote IP address from the connection.
      • Validates the inbound connection using srv.checkInboundConn(remoteIP) where it basically checks the area the connection is from is valid and the connection request is not too frequent.
      • If the check fails, it logs the rejection, closes the connection, returns a slot to the slots channel, and continues to the next iteration.
      • If the check succeeds, it wraps the connection in a newMeteredConn, logs the acceptance, and updates metrics using serveMeter.Mark(1). newMeteredConn is a wrapper of connection which performs some statistic calculation.
  1. Connection Setup
      • Starts a new goroutine to set up the connection as a peer using srv.SetupConn. This allows the listenLoop to continue accepting new connections without waiting for the setup to complete.
      • After the connection setup completes, the slot is returned to the slots channel, allowing another connection to be accepted.
/// ---p2p/server.go--- // listenLoop runs in its own goroutine and accepts // inbound connections. func (srv *Server) listenLoop() { srv.log.Debug("TCP listener up", "addr", srv.listener.Addr()) // The slots channel limits accepts of new connections. tokens := defaultMaxPendingPeers if srv.MaxPendingPeers > 0 { tokens = srv.MaxPendingPeers } slots := make(chan struct{}, tokens) for i := 0; i < tokens; i++ { slots <- struct{}{} } // Wait for slots to be returned on exit. This ensures all connection goroutines // are down before listenLoop returns. defer srv.loopWG.Done() defer func() { for i := 0; i < cap(slots); i++ { <-slots } }() for { // Wait for a free slot before accepting. <-slots var ( fd net.Conn err error lastLog time.Time ) for { fd, err = srv.listener.Accept() if netutil.IsTemporaryError(err) { if time.Since(lastLog) > 1*time.Second { srv.log.Debug("Temporary read error", "err", err) lastLog = time.Now() } time.Sleep(time.Millisecond * 200) continue } else if err != nil { srv.log.Debug("Read error", "err", err) slots <- struct{}{} return } break } remoteIP := netutil.AddrIP(fd.RemoteAddr()) if err := srv.checkInboundConn(remoteIP); err != nil { srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err) fd.Close() slots <- struct{}{} continue } if remoteIP != nil { fd = newMeteredConn(fd) serveMeter.Mark(1) srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) } go func() { srv.SetupConn(fd, inboundConn, nil) slots <- struct{}{} }() } } type connFlag int32 const ( dynDialedConn connFlag = 1 << iota staticDialedConn inboundConn trustedConn )
 

Inbound Connection Check

checkInboundConn validates inbound connections based on their remote IP addresses. It ensures that the connections adhere to the server's network restrictions and throttles the rate at which connections from internet peers are accepted.
srv.inboundHistory records inbound connection history with IP and throttle time. srv.inboundHistory.expire cleans connection history based on each history connection’s throttle time.
// ---p2p/server.go--- func (srv *Server) checkInboundConn(remoteIP net.IP) error { if remoteIP == nil { return nil } // Reject connections that do not match NetRestrict. if srv.NetRestrict != nil && !srv.NetRestrict.Contains(remoteIP) { return errors.New("not in netrestrict list") } // Reject Internet peers that try too often. now := srv.clock.Now() srv.inboundHistory.expire(now, nil) if !netutil.IsLAN(remoteIP) && srv.inboundHistory.contains(remoteIP.String()) { return errors.New("too many attempts") } srv.inboundHistory.add(remoteIP.String(), now.Add(inboundThrottleTime)) return nil }
 

Set Up Connection

SetupConn handles setting up a network connection, running necessary handshakes, and attempting to add the connection as a peer. If the connection fails at any stage, it is closed and an error is returned.
 
Steps:
  1. Connection Wrapper:
    1. wraps the network connection fd into a conn struct, setting up initial flags and a channel for error signaling.
  1. Transport Setup:
    1. It initializes the transport field of the conn struct using srv.newTransport, depending on whether dialDest is provided. transport field represents an abstraction layer that handles the underlying network communication, responsible for managing the details of data transmission, encryption, and protocol-specific operations.
      Default srv.newTransport is p2p/transport:newRLPX which is set in Server.Start.
      • If dialDest is nil, it creates a transport without a specific destination.
      • If dialDest is provided, it uses the public key of dialDest to create the transport.
  • Connection Setup: It calls srv.setupConn to perform further setup tasks.
  • Error Handling: If srv.setupConn returns an error and the connection is not inbound, it marks the error and closes the connection.
/// ---p2p/server.go--- type connFlag int32 const ( dynDialedConn connFlag = 1 << iota staticDialedConn inboundConn trustedConn ) // conn wraps a network connection with information gathered // during the two handshakes. type conn struct { fd net.Conn transport node *enode.Node flags connFlag cont chan error // The run loop uses cont to signal errors to SetupConn. caps []Cap // valid after the protocol handshake name string // valid after the protocol handshake } // SetupConn runs the handshakes and attempts to add the connection // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error { // Connection Wrapper: c := &conn{fd: fd, flags: flags, cont: make(chan error)} // Transport Setup: if dialDest == nil { c.transport = srv.newTransport(fd, nil) } else { c.transport = srv.newTransport(fd, dialDest.Pubkey()) } // Connection Setup: err := srv.setupConn(c, dialDest) // Error Handling: if err != nil { if !c.is(inboundConn) { markDialError(err) } c.close(err) } return err } /// ---p2p/server.go--- // default transport is RLPX func (srv *Server) Start() (err error) { // ... if srv.newTransport == nil { srv.newTransport = newRLPX } // ... } /// ---p2p/transport.go--- // rlpxTransport is the transport used by actual (non-test) connections. // It wraps an RLPx connection with locks and read/write deadlines. type rlpxTransport struct { rmu, wmu sync.Mutex wbuf bytes.Buffer conn *rlpx.Conn } func newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport { return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)} } /// ---p2p/rlpx/rlpx.go--- // Conn is an RLPx network connection. It wraps a low-level network connection. The // underlying connection should not be used for other activity when it is wrapped by Conn. // // Before sending messages, a handshake must be performed by calling the Handshake method. // This type is not generally safe for concurrent use, but reading and writing of messages // may happen concurrently after the handshake. type Conn struct { dialDest *ecdsa.PublicKey conn net.Conn session *sessionState // These are the buffers for snappy compression. // Compression is enabled if they are non-nil. snappyReadBuffer []byte snappyWriteBuffer []byte } // NewConn wraps the given network connection. If dialDest is non-nil, the connection // behaves as the initiator during the handshake. func NewConn(conn net.Conn, dialDest *ecdsa.PublicKey) *Conn { return &Conn{ dialDest: dialDest, conn: conn, } }
 
transport interface definition
/// ---p2p/server.go--- type transport interface { // The two handshakes. doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) // The MsgReadWriter can only be used after the encryption // handshake has completed. The code uses conn.id to track this // by setting it to a non-nil value after the encryption handshake. MsgReadWriter // transports must provide Close because we use MsgPipe in some of // the tests. Closing the actual network connection doesn't do // anything in those tests because MsgPipe doesn't use it. close(err error) } // Msg defines the structure of a p2p message. // // Note that a Msg can only be sent once since the Payload reader is // consumed during sending. It is not possible to create a Msg and // send it any number of times. If you want to reuse an encoded // structure, encode the payload into a byte array and create a // separate Msg with a bytes.Reader as Payload for each send. type Msg struct { Code uint64 Size uint32 // Size of the raw payload Payload io.Reader ReceivedAt time.Time meterCap Cap // Protocol name and version for egress metering meterCode uint64 // Message within protocol for egress metering meterSize uint32 // Compressed message size for ingress metering } /// ---p2p/message.go--- type MsgReader interface { ReadMsg() (Msg, error) } type MsgWriter interface { // WriteMsg sends a message. It will block until the message's // Payload has been consumed by the other end. // // Note that messages can be sent only once because their // payload reader is drained. WriteMsg(Msg) error } // MsgReadWriter provides reading and writing of encoded messages. // Implementations should ensure that ReadMsg and WriteMsg can be // called simultaneously from multiple goroutines. type MsgReadWriter interface { MsgReader MsgWriter }
 
setupConn handles establishing and validating a new P2P connection within a server.
Steps:
  1. Check Server Running State.
  1. Public Key Retrieval for Dialing:
    1. If the connection is being dialed (i.e., dialDest is not nil), it retrieves the remote public key from dialDest.
  1. RLPx Handshake:
      • performs the RLPx handshake using the server's private key.
      • If the handshake fails, it logs the error and returns an errEncHandshakeError.
      • If the handshake is successful, it sets the node information (c.node) based on whether dialDest was provided or not.
  1. Post-Handshake Checkpoint:
      • It logs the connection details and performs a checkpoint validation after the handshake.
      • If the checkpoint validation fails, it logs the rejection and returns the error.
  1. Capability Negotiation Handshake:
      • The function performs a second handshake to negotiate the supported capabilities (doProtoHandshake).
      • If this handshake fails, it logs the error and returns an errProtoHandshakeError.
      • It then verifies the peer's identity by comparing the handshake ID with the node's ID.
      • If the IDs do not match, it logs the discrepancy and returns a DiscUnexpectedIdentity error.
      • If the IDs match, it updates the connection's capabilities and name.
  1. Final Peer Addition Checkpoint:
      • It performs a final checkpoint validation to add the peer.
      • If the validation fails, it logs the rejection and returns error.
/// ---p2p/server.go--- func (srv *Server) setupConn(c *conn, dialDest *enode.Node) error { // Check Server Running State // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running srv.lock.Unlock() if !running { return errServerStopped } // Public Key Retrieval for Dialing: // If dialing, figure out the remote public key. if dialDest != nil { dialPubkey := new(ecdsa.PublicKey) if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil { err = fmt.Errorf("%w: dial destination doesn't have a secp256k1 public key", errEncHandshakeError) srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return err } } // RLPx Handshake: // Run the RLPx handshake. remotePubkey, err := c.doEncHandshake(srv.PrivateKey) if err != nil { srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return fmt.Errorf("%w: %v", errEncHandshakeError, err) } if dialDest != nil { c.node = dialDest } else { c.node = nodeFromConn(remotePubkey, c.fd) } // Post-Handshake Checkpoint: clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags) err = srv.checkpoint(c, srv.checkpointPostHandshake) if err != nil { clog.Trace("Rejected peer", "err", err) return err } // Capability Negotiation Handshake: // Run the capability negotiation handshake. phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { clog.Trace("Failed p2p handshake", "err", err) return fmt.Errorf("%w: %v", errProtoHandshakeError, err) } if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) { clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID)) return DiscUnexpectedIdentity } c.caps, c.name = phs.Caps, phs.Name // Final Peer Addition Checkpoint: err = srv.checkpoint(c, srv.checkpointAddPeer) if err != nil { clog.Trace("Rejected peer", "err", err) return err } return nil }

RLPx Handshake

Handshake handles establishing a secure session between peers by performing an encrypted handshake. It initiates or responds to a handshake, depending on whether the connection is an initiator (dialing) or a recipient (listening). During the handshake, a shared secret is established between two nodes using ECDH, from which aes, and mac is calculated, which is used to encrypt and decrypt messages in this session between two nodes.
As we know that each node has a long-term private key and corresponding public key which is the identity of the node, but geth doesn’t use those long-term keys to derive the shared secret of that session, but uses ephemeral keys generated in each session instead. If the peer disconnects, the ephemeral keys will be dropped, new keys will be generated in the next handshake progress between those two nodes, thus new shared secret and new aes and mac will be generated for that session.
Using ephemeral secret for each session ensures forward secrecy which means that even if an attacker gains access to the long-term private keys of the communicating parties, they cannot decrypt past communications.
 
Inside the Conn.Handshake:
  1. Variable Initialization:
    1. Initialize handshakeState and Secret.
      handshakeState stores intermediate states during handshake process, which will be used to calculate Secret.
      Secret stores AES and MACs which is used to encrypt and decrypt messages exchanged in the session.
  1. Dialing or Listening:
    1. checks if c.dialDest is non-nil to determine if the connection is initiating (dialing) or receiving (listening).Depending on the role, it calls h.runInitiator or h.runRecipient.
      runInitiator dials the peer, waits and handles response. runRecipient read dialing message from the peer and respond. Both will prepare the handshakeState.
  1. Calculate Secret:
    1. calculate Secret consiss of AES and MACs based on shared secret key derived from handshakeState.
/// ---p2p/transport.go--- func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) { t.conn.SetDeadline(time.Now().Add(handshakeTimeout)) return t.conn.Handshake(prv) } /// ---p2p/rlpx/rlpx.go--- // Secrets represents the connection secrets which are negotiated during the handshake. type Secrets struct { AES, MAC []byte EgressMAC, IngressMAC hash.Hash remote *ecdsa.PublicKey } // handshakeState contains the state of the encryption handshake. type handshakeState struct { initiator bool remote *ecies.PublicKey // remote-pubk initNonce, respNonce []byte // nonce randomPrivKey *ecies.PrivateKey // ecdhe-random remoteRandomPub *ecies.PublicKey // ecdhe-random-pubk rbuf readBuffer wbuf writeBuffer } // Handshake performs the handshake. This must be called before any data is written // or read from the connection. func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) { // Variable Initialization: var ( sec Secrets err error h handshakeState ) // Dialing or Listening: if c.dialDest != nil { sec, err = h.runInitiator(c.conn, prv, c.dialDest) } else { sec, err = h.runRecipient(c.conn, prv) } if err != nil { return nil, err } // Calculate Secret: c.InitWithSecrets(sec) c.session.rbuf = h.rbuf c.session.wbuf = h.wbuf return sec.remote, err }

Run Initiator

runInitiator handles initiating the handshake process on the dialing side of the connection.
Parameters:
  • conn io.ReadWriter: The connection interface for reading and writing data.
  • prv *ecdsa.PrivateKey: The private key of the local client (initiator).
  • remote *ecdsa.PublicKey: The public key of the remote client (recipient).
Returns:
  • s Secrets: The negotiated secrets for the connection.
  • err error: An error if the handshake fails.
Steps:
  1. Initiator Flag:
    1. Sets the initiator flag in the handshake state to true indicating that this side is initiating the handshake.
  1. Convert Remote Peer Public Key to ECIES Version:
    1. Converts the remote client's ECDSA public key into an ECIES public key for encryption purposes.
  1. Auth Message Creation:
    1. Creates an authentication message which is used for the other peer to recover ephemeral public key of this node.
  1. Auth Message Sealing:
    1. Seals the authentication message using EIP-8.
  1. Auth Packet Transmission:
    1. Sends the sealed authentication packet over the connection.
  1. Auth Response Reading:
    1. Reads the authentication response message from the remote client.
  1. Auth Response Handling:
    1. Handles the authentication response message, extract the remote peer’s ephemeral public key.
  1. Secret Derivation:
    1. Derives the session connection secrets from the authentication packet and the authentication response packet.
/// ---p2p/rlpx/rlpx.go--- // runInitiator negotiates a session token on conn. // it should be called on the dialing side of the connection. // // prv is the local client's private key. func (h *handshakeState) runInitiator(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) { // Initiator Flag: h.initiator = true // Convert Remote Peer Public Key to ECIES Version: h.remote = ecies.ImportECDSAPublic(remote) // Auth Message Creation: authMsg, err := h.makeAuthMsg(prv) if err != nil { return s, err } // Auth Message Sealing: authPacket, err := h.sealEIP8(authMsg) if err != nil { return s, err } // Auth Packet Transmission: if _, err = conn.Write(authPacket); err != nil { return s, err } // Auth Response Reading: authRespMsg := new(authRespV4) authRespPacket, err := h.readMsg(authRespMsg, prv, conn) if err != nil { return s, err } // Auth Response Handling: if err := h.handleAuthResp(authRespMsg); err != nil { return s, err } // Secret Derivation: return h.secrets(authPacket, authRespPacket) }
 
makeAuthMsg function handles creating the initiator's authentication message during the handshake process.
 
  1. Generate Random Nonce:
    1. A random nonce (initNonce) is generated for the initiator. Geth uses it and node’s long-term private key to calculate message to be signed by session’s ephemeral private key. This nonce is a random value used to ensure that the message to be signed in each handshake is unique.
  1. Generate Ephemeral Key Pair:
    1. A random ephemeral key pair is generated. The ephemeral private key is stored in h.randomPrivKey only used in this session.The ephemeral key pair will be used to calculate shared secret, based on which the AES and MACs of the session are calcualted.
  1. Generate Static Shared Secret:
    1. The static shared secret(token) is generated using the local private key (prv) and the remote public key (h.remote). This secret is derived using ECDH. Static Shared Secret is only used to calculate message to be signed by ephemeral private key, which is to ensure that only the target peer can calculate the static shared secret, and thus the message to be signed, so that it can recover the ephemeral public key of this node.
  1. Sign the Secret Message:
    1. A secret message is created by XORing the static shared secret (token) with the initiator's nonce (initNonce). We can it “secret message” because only the target peer with the private key can calcualte the message.
  1. Construct the Authentication Message:
    1. An authMsgV4 struct is created to hold the authentication message. We can see it doesn’t include the secret message.
 
staticSharedSecret uses ECDH to calcualte shared secret.
/// ---p2p/rlpx/rlpx.go--- // makeAuthMsg creates the initiator handshake message. func (h *handshakeState) makeAuthMsg(prv *ecdsa.PrivateKey) (*authMsgV4, error) { // Generate Random Nonce: // Generate random initiator nonce. h.initNonce = make([]byte, shaLen) _, err := rand.Read(h.initNonce) if err != nil { return nil, err } // Generate Ephemeral Key Pair for ECDH: // Generate random keypair to for ECDH. h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil) if err != nil { return nil, err } // Generate Static Shared Secret: // Sign known message: static-shared-secret ^ nonce token, err := h.staticSharedSecret(prv) if err != nil { return nil, err } // Sign the Secret Message: signed := xor(token, h.initNonce) signature, err := crypto.Sign(signed, h.randomPrivKey.ExportECDSA()) if err != nil { return nil, err } // Construct the Authentication Message: msg := new(authMsgV4) copy(msg.Signature[:], signature) copy(msg.InitiatorPubkey[:], crypto.FromECDSAPub(&prv.PublicKey)[1:]) copy(msg.Nonce[:], h.initNonce) msg.Version = 4 return msg, nil } // staticSharedSecret returns the static shared secret, the result // of key agreement between the local and remote static node key. func (h *handshakeState) staticSharedSecret(prv *ecdsa.PrivateKey) ([]byte, error) { return ecies.ImportECDSA(prv).GenerateShared(h.remote, sskLen, sskLen) }
 
sealEIP8 handles encrypting authentication msg using the target peer’s Public key. The prefix records the length of the encrypted data. Geth uses prefix as associated data in the encrypt process to ensure the prefix won’t be tampered.
/// ---p2p/rlpx/rlpx.go--- // sealEIP8 encrypts a handshake message. func (h *handshakeState) sealEIP8(msg interface{}) ([]byte, error) { h.wbuf.reset() // Write the message plaintext. if err := rlp.Encode(&h.wbuf, msg); err != nil { return nil, err } // Pad with random amount of data. the amount needs to be at least 100 bytes to make // the message distinguishable from pre-EIP-8 handshakes. h.wbuf.appendZero(mrand.Intn(100) + 100) prefix := make([]byte, 2) binary.BigEndian.PutUint16(prefix, uint16(len(h.wbuf.data)+eciesOverhead)) enc, err := ecies.Encrypt(rand.Reader, h.remote, h.wbuf.data, nil, prefix) return append(prefix, enc...), err }
 
After dialing node has sent authentication message, it calls readMsg to wait and hanlde respond.
Step:
  1. Reads the Prefix:
    1. It reads first two bytes, which is the prefix records the size of the encrypted message.
  1. Read Messsage Body:
    1. Read the body of message.
  1. Decrypt Message Using Private Key:
    1. decrypts the encrypted message using its private key. If prefix has been tampered, decryption will fail.
  1. Decode Message Into Struct:
    1. Use RLP to decode the message into struct authRespV4.
  1. Return Message Byte Array.
/// ---p2p/rlpx/rlpx.go--- // readMsg reads an encrypted handshake message, decoding it into msg. func (h *handshakeState) readMsg(msg interface{}, prv *ecdsa.PrivateKey, r io.Reader) ([]byte, error) { // Reads the Prefix: h.rbuf.reset() h.rbuf.grow(512) // Read the size prefix. prefix, err := h.rbuf.read(r, 2) if err != nil { return nil, err } // Read Messsage Body: size := binary.BigEndian.Uint16(prefix) // Read the handshake packet. packet, err := h.rbuf.read(r, int(size)) if err != nil { return nil, err } // Decrypt Message Using Private Key: dec, err := ecies.ImportECDSA(prv).Decrypt(packet, nil, prefix) if err != nil { return nil, err } // Decode Message Into Struct: // Can't use rlp.DecodeBytes here because it rejects // trailing data (forward-compatibility). s := rlp.NewStream(bytes.NewReader(dec), 0) err = s.Decode(msg) // Return Message Byte Array: return h.rbuf.data[:len(prefix)+len(packet)], err }
 
handleAuthResp records the remote peer’s response nonce and ephemeral public key into handshakeState, used later to calculate AES and MACs.
// ---p2p/rlpx/rlpx.go--- func (h *handshakeState) handleAuthResp(msg *authRespV4) (err error) { h.respNonce = msg.Nonce[:] h.remoteRandomPub, err = importPublicKey(msg.RandomPubkey[:]) return err }
 
handshakeState.secret calculates shares secret, and corresponding AES and MACs which are used to encrypt and decrypt messages between peers in this session.
Steps:
  1. Calculate shared secrets using ECDH:
    1. Based on node’s ephemeral private key and peer’s ephemeral public key.
  1. Calculate AES:
    1. Based on shared secret, request nonce and respond nonce.
  1. Calculate base MAC:
    1. Base MAC is used to calculate egressMAC and ingressMAC
  1. Calculate egressMAC and ingressMAC:
    1. egressMAC is used by initiator peer during message encryption.
      ingressMAC is used by the other peer during message encryption.
      Use two different macs in different message directions helps improve communication security.
  1. Store AES and MACs.
    1. Store AES and MACs in Secret, used to encrypt and decrypt message later in peer communication.
/// ---p2p/server.go--- // secrets is called after the handshake is completed. // It extracts the connection secrets from the handshake values. func (h *handshakeState) secrets(auth, authResp []byte) (Secrets, error) { // Calculate shared secrets using ECDH: ecdheSecret, err := h.randomPrivKey.GenerateShared(h.remoteRandomPub, sskLen, sskLen) if err != nil { return Secrets{}, err } // derive base secrets from ephemeral key agreement sharedSecret := crypto.Keccak256(ecdheSecret, crypto.Keccak256(h.respNonce, h.initNonce)) // Calculate AES: aesSecret := crypto.Keccak256(ecdheSecret, sharedSecret) // Calculate base MAC: s := Secrets{ remote: h.remote.ExportECDSA(), AES: aesSecret, MAC: crypto.Keccak256(ecdheSecret, aesSecret), } // Calculate egressMAC and ingressMAC: // setup sha3 instances for the MACs mac1 := sha3.NewLegacyKeccak256() mac1.Write(xor(s.MAC, h.respNonce)) mac1.Write(auth) mac2 := sha3.NewLegacyKeccak256() mac2.Write(xor(s.MAC, h.initNonce)) mac2.Write(authResp) // Store AES and MACs. if h.initiator { s.EgressMAC, s.IngressMAC = mac1, mac2 } else { s.EgressMAC, s.IngressMAC = mac2, mac1 } return s, nil }

Run Recipient

runRecipient reads authentication message from another peer, respond authentication message to establish session with the peer.
Steps:
  1. Read and Decrypt Authentication Msg:
    1. Read authentication message from connection, use its long-term private key to decrypt.
  1. Handle Authentication Message:
    1. Construct signed message. Recover remote peer’s ephemeral public key based on the message and signature.
  1. Construct Authentication Response Message.
  1. Seal Authentication Reponse Message.
  1. Respond to peer.
  1. Initialize Secret.
/// ---p2p/rlpx/rlpx.go--- // runRecipient negotiates a session token on conn. // it should be called on the listening side of the connection. // // prv is the local client's private key. func (h *handshakeState) runRecipient(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) { // Read and Decrypt Authentication Msg: authMsg := new(authMsgV4) authPacket, err := h.readMsg(authMsg, prv, conn) if err != nil { return s, err } // Handle Authentication Message: if err := h.handleAuthMsg(authMsg, prv); err != nil { return s, err } // Construct Authentication Response Message. authRespMsg, err := h.makeAuthResp() if err != nil { return s, err } // Seal Authentication Reponse Message. authRespPacket, err := h.sealEIP8(authRespMsg) if err != nil { return s, err } // Respond to peer. if _, err = conn.Write(authRespPacket); err != nil { return s, err } // Initialize Secret. return h.secrets(authPacket, authRespPacket) }
 
handshakeState.makeAuthResp constructs authentication response message. Unlike authentication initialial message, it adds node’s ephemeral public directly into the response.
func (h *handshakeState) makeAuthResp() (msg *authRespV4, err error) { // Generate random nonce. h.respNonce = make([]byte, shaLen) if _, err = rand.Read(h.respNonce); err != nil { return nil, err } msg = new(authRespV4) copy(msg.Nonce[:], h.respNonce) copy(msg.RandomPubkey[:], exportPubkey(&h.randomPrivKey.PublicKey)) msg.Version = 4 return msg, nil }

Init with secrets

After shared secrets has been established between two peers. Geth construct sessionState stores instance used during encryption and decryption.
sessionState Fields:
  1. enc
    1. used to encrypt message
  1. dec
    1. used to decrypt message
  1. egressMAC
    1. used by initiator peer node to calculate encrypted message’s mac to ensure message integrity when sending message.
  1. ingressMAC
    1. used by non-initiator peer node to calculate encrypted message’s mac to ensure message integrity when sending message.
/// ---p2p/rlpx/rlpx.go--- // InitWithSecrets injects connection secrets as if a handshake had // been performed. This cannot be called after the handshake. func (c *Conn) InitWithSecrets(sec Secrets) { if c.session != nil { panic("can't handshake twice") } macc, err := aes.NewCipher(sec.MAC) if err != nil { panic("invalid MAC secret: " + err.Error()) } encc, err := aes.NewCipher(sec.AES) if err != nil { panic("invalid AES secret: " + err.Error()) } // we use an all-zeroes IV for AES because the key used // for encryption is ephemeral. iv := make([]byte, encc.BlockSize()) c.session = &sessionState{ enc: cipher.NewCTR(encc, iv), dec: cipher.NewCTR(encc, iv), egressMAC: newHashMAC(macc, sec.EgressMAC), ingressMAC: newHashMAC(macc, sec.IngressMAC), } }

Use Shared Secret to Exchange Message

After the handshake, secret AES and MACs has been shared between two nodes, they can use them to commnunicate with each other safely.
Geth maintains a conn for each peer which has performed handshake successfully. conn.transport has been initialized to be rlpxTransport instance. Other modules can call conn.ReadMsg and conn.WriteMsg to exchange messages with target peer.
 
Send Message
WriteMsg handles sending message to target peer.
Step:
  1. Lock wmu to ensure thread safety.
  1. Set write operation deadline.
  1. Call rlpxTransport.conn.Write to encrypt and send message.
  1. Set metrics.
/// ---p2p/transport.go--- func (t *rlpxTransport) WriteMsg(msg Msg) error { // Lock wmu to ensure thread safety: t.wmu.Lock() defer t.wmu.Unlock() // Copy message data to write buffer: t.wbuf.Reset() if _, err := io.CopyN(&t.wbuf, msg.Payload, int64(msg.Size)); err != nil { return err } // Set write operation deadline: t.conn.SetWriteDeadline(time.Now().Add(frameWriteTimeout)) // Call rlpxTransport.conn.Write to encrypt and send message: size, err := t.conn.Write(msg.Code, t.wbuf.Bytes()) if err != nil { return err } // Set metrics. msg.meterSize = size if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages m := fmt.Sprintf("%s/%s/%d/%#02x", egressMeterName, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode) metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize)) metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1) } return nil }
 
Steps in Write:
  1. Check handshake has been performed(Conn.session has been created).
  1. Check message size satisifies limitation.
  1. Check whether supports snappy, if supports, then use snappy to compress message.
  1. call c.session.writeFrame to encrypt and send message.
/// ---p2p/rlpx/rlpx.go--- // Conn is an RLPx network connection. It wraps a low-level network connection. The // underlying connection should not be used for other activity when it is wrapped by Conn. // // Before sending messages, a handshake must be performed by calling the Handshake method. // This type is not generally safe for concurrent use, but reading and writing of messages // may happen concurrently after the handshake. type Conn struct { dialDest *ecdsa.PublicKey conn net.Conn session *sessionState // These are the buffers for snappy compression. // Compression is enabled if they are non-nil. snappyReadBuffer []byte snappyWriteBuffer []byte } // sessionState contains the session keys. type sessionState struct { enc cipher.Stream dec cipher.Stream egressMAC hashMAC ingressMAC hashMAC rbuf readBuffer wbuf writeBuffer } // Write writes a message to the connection. // // Write returns the written size of the message data. This may be less than or equal to // len(data) depending on whether snappy compression is enabled. func (c *Conn) Write(code uint64, data []byte) (uint32, error) { // Check handshake has been performed(Conn.session has been created). if c.session == nil { panic("can't WriteMsg before handshake") } // Check message size satisifies limitation. if len(data) > maxUint24 { return 0, errPlainMessageTooLarge } // Check whether supports snappy, if supports, then use snappy to compress message. if c.snappyWriteBuffer != nil { // Ensure the buffer has sufficient size. // Package snappy will allocate its own buffer if the provided // one is smaller than MaxEncodedLen. c.snappyWriteBuffer = growslice(c.snappyWriteBuffer, snappy.MaxEncodedLen(len(data))) data = snappy.Encode(c.snappyWriteBuffer, data) } // call c.session.writeFrame to encrypt and send message. wireSize := uint32(len(data)) err := c.session.writeFrame(c.conn, code, data) return wireSize, err }
 
writeFrame uses sessionState.enc and sessionState.egressMAC to encrypt message, and send to the remote peer.
/// --p2p/rlpx/rlpx.go--- func (h *sessionState) writeFrame(conn io.Writer, code uint64, data []byte) error { h.wbuf.reset() // Write header. fsize := rlp.IntSize(code) + len(data) if fsize > maxUint24 { return errPlainMessageTooLarge } header := h.wbuf.appendZero(16) putUint24(uint32(fsize), header) copy(header[3:], zeroHeader) h.enc.XORKeyStream(header, header) // Write header MAC. h.wbuf.Write(h.egressMAC.computeHeader(header)) // Encode and encrypt the frame data. offset := len(h.wbuf.data) h.wbuf.data = rlp.AppendUint64(h.wbuf.data, code) h.wbuf.Write(data) if padding := fsize % 16; padding > 0 { h.wbuf.appendZero(16 - padding) } framedata := h.wbuf.data[offset:] h.enc.XORKeyStream(framedata, framedata) // Write frame MAC. h.wbuf.Write(h.egressMAC.computeFrame(framedata)) _, err := conn.Write(h.wbuf.data) return err }
 
Receive Message
ReadMsg handles receiving message from the remote peer.
Step:
  1. Lock rmu to ensure read operation thread safety.
  1. Set read operation deadline.
  1. Call rlpxTransport.conn.Read to receive and decrypt message.
  1. Construct Msg and return.
/// ---p2p/server.go--- // conn wraps a network connection with information gathered // during the two handshakes. type conn struct { fd net.Conn transport node *enode.Node flags connFlag cont chan error // The run loop uses cont to signal errors to SetupConn. caps []Cap // valid after the protocol handshake name string // valid after the protocol handshake } /// ---p2p/transport.go--- func (t *rlpxTransport) ReadMsg() (Msg, error) { // Lock rmu to ensure read operation thread safety. t.rmu.Lock() defer t.rmu.Unlock() var msg Msg // Set read operation deadline. t.conn.SetReadDeadline(time.Now().Add(frameReadTimeout)) // Call rlpxTransport.conn.Read to receive and decrypt message. code, data, wireSize, err := t.conn.Read() // Construct Msg and return. if err == nil { // Protocol messages are dispatched to subprotocol handlers asynchronously, // but package rlpx may reuse the returned 'data' buffer on the next call // to Read. Copy the message data to avoid this being an issue. data = common.CopyBytes(data) msg = Msg{ ReceivedAt: time.Now(), Code: code, Size: uint32(len(data)), meterSize: uint32(wireSize), Payload: bytes.NewReader(data), } } return msg, err }
 
Steps in Read:
  1. Check handshake has been performed(Conn.session has been created).
  1. Call session.readFrame to read, verify and decrypt message.
  1. If snappy is enabled, verify and decompress message.
/// ---p2p/rlpx/rlpx.go--- // Read reads a message from the connection. // The returned data buffer is valid until the next call to Read. func (c *Conn) Read() (code uint64, data []byte, wireSize int, err error) { // Check handshake has been performed(Conn.session has been created). if c.session == nil { panic("can't ReadMsg before handshake") } // Call session.readFrame to read, verify and decrypt message. frame, err := c.session.readFrame(c.conn) if err != nil { return 0, nil, 0, err } code, data, err = rlp.SplitUint64(frame) if err != nil { return 0, nil, 0, fmt.Errorf("invalid message code: %v", err) } wireSize = len(data) // If snappy is enabled, verify and decompress message. if c.snappyReadBuffer != nil { var actualSize int actualSize, err = snappy.DecodedLen(data) if err != nil { return code, nil, 0, err } if actualSize > maxUint24 { return code, nil, 0, errPlainMessageTooLarge } c.snappyReadBuffer = growslice(c.snappyReadBuffer, actualSize) data, err = snappy.Decode(c.snappyReadBuffer, data) } return code, data, wireSize, err }
 
readFrame uses sessionState.ingressMAC and sessionState.dec to read, verify and decrypt message.
/// ---p2p/rlpx/rlpx.go--- func (h *sessionState) readFrame(conn io.Reader) ([]byte, error) { h.rbuf.reset() // Read the frame header. header, err := h.rbuf.read(conn, 32) if err != nil { return nil, err } // Verify header MAC. wantHeaderMAC := h.ingressMAC.computeHeader(header[:16]) if !hmac.Equal(wantHeaderMAC, header[16:]) { return nil, errors.New("bad header MAC") } // Decrypt the frame header to get the frame size. h.dec.XORKeyStream(header[:16], header[:16]) fsize := readUint24(header[:16]) // Frame size rounded up to 16 byte boundary for padding. rsize := fsize if padding := fsize % 16; padding > 0 { rsize += 16 - padding } // Read the frame content. frame, err := h.rbuf.read(conn, int(rsize)) if err != nil { return nil, err } // Validate frame MAC. frameMAC, err := h.rbuf.read(conn, 16) if err != nil { return nil, err } wantFrameMAC := h.ingressMAC.computeFrame(frame) if !hmac.Equal(wantFrameMAC, frameMAC) { return nil, errors.New("bad frame MAC") } // Decrypt the frame data. h.dec.XORKeyStream(frame, frame) return frame[:fsize], nil }

Record Peer Node

After the handshake succeeds, the peer node’s ENR is recorded in the conn.
If its not the dialing node, then calls nodeFromConn to create corresponding ENR.
/// ---p2p/server.go--- // conn wraps a network connection with information gathered // during the two handshakes. type conn struct { fd net.Conn transport node *enode.Node flags connFlag cont chan error // The run loop uses cont to signal errors to SetupConn. caps []Cap // valid after the protocol handshake name string // valid after the protocol handshake } func (srv *Server) setupConn(c *conn, dialDest *enode.Node) error { // ... // Run the RLPx handshake. remotePubkey, err := c.doEncHandshake(srv.PrivateKey) if err != nil { srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return fmt.Errorf("%w: %v", errEncHandshakeError, err) } // record peer node's ENR in conn if dialDest != nil { c.node = dialDest } else { c.node = nodeFromConn(remotePubkey, c.fd) } // ... } func nodeFromConn(pubkey *ecdsa.PublicKey, conn net.Conn) *enode.Node { var ip net.IP var port int if tcp, ok := conn.RemoteAddr().(*net.TCPAddr); ok { ip = tcp.IP port = tcp.Port } return enode.NewV4(pubkey, ip, port, port) }

Capability Negotiation Handshake

doProtoHandshake performs the protocol handshake between two nodes. This handshake exchanges information about protocols supported by two peers, establishes communication parameters and capabilities between peers.
Function Signature
  • Parameters:
    • our *protoHandshake: The handshake data to be sent to the remote peer, which includes information of protocols supported by the node.
  • Returns:
    • their *protoHandshake: The handshake data received from the remote peer.
    • err error: Any error encountered during the handshake process.
Steps:
  1. Concurrent Write Operation:
      • A goroutine is spawned to send the local handshake message (our) to the remote peer. The Send function is used to encode message using RLP and sends to the target peer.
      • werr is a buffered channel used to capture any errors from the write operation
  1. Read Remote Handshake:
      • waits to read the handshake message from the remote peer.
      • If an error occurs while reading the handshake, the function waits for the write operation to complete (<-werr) and then returns the error.
  1. Check Write Error:
      • After successfully reading the handshake from the remote peer, the function checks for any errors that may have occurred during the write operation.
  1. Enable Snappy Compression (if supported):
    1. If the protocol version supports Snappy compression, it is enabled by calling SetSnappy on the connection, which initialize the corresponding data in conn.
      Snappy is a fast compression and decompression library. It is often used in networking to reduce the size of the data being transmitted, thereby improving transmission speed and reducing bandwidth usage.
  1. Return the Remote Peer’s Supported Protocol Information:
/// ---p2p/transport.go--- func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) { // Writing our handshake happens concurrently, we prefer // returning the handshake read error. If the remote side // disconnects us early with a valid reason, we should return it // as the error so it can be tracked elsewhere. werr := make(chan error, 1) go func() { werr <- Send(t, handshakeMsg, our) }() if their, err = readProtocolHandshake(t); err != nil { <-werr // make sure the write terminates too return nil, err } if err := <-werr; err != nil { return nil, fmt.Errorf("write error: %v", err) } // If the protocol version supports Snappy encoding, upgrade immediately t.conn.SetSnappy(their.Version >= snappyProtocolVersion) return their, nil } /// ---p2p/peer.go--- const ( // devp2p message codes handshakeMsg = 0x00 discMsg = 0x01 pingMsg = 0x02 pongMsg = 0x03 ) // protoHandshake is the RLP structure of the protocol handshake. type protoHandshake struct { Version uint64 Name string Caps []Cap ListenPort uint64 ID []byte // secp256k1 public key // Ignore additional fields (for forward compatibility). Rest []rlp.RawValue `rlp:"tail"` } // Cap is the structure of a peer capability. type Cap struct { Name string Version uint }
 
Send encodes message using RLP, sends the encoded message to the remote peer, and performs some metrics.
/// ---p2p/message.go--- // Send writes an RLP-encoded message with the given code. // data should encode as an RLP list. func Send(w MsgWriter, msgcode uint64, data interface{}) error { size, r, err := rlp.EncodeToReader(data) if err != nil { return err } return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r}) }
 
readProtocolHandshake reads handshake message from the remote peer, validates message, and decodes the payload into protoHandshake which includes protocols information supported by the remote peer.
/// ---p2p/transport.go--- func readProtocolHandshake(rw MsgReader) (*protoHandshake, error) { msg, err := rw.ReadMsg() if err != nil { return nil, err } if msg.Size > baseProtocolMaxMsgSize { return nil, errors.New("message too big") } if msg.Code == discMsg { // Disconnect before protocol handshake is valid according to the // spec and we send it ourself if the post-handshake checks fail. // We can't return the reason directly, though, because it is echoed // back otherwise. Wrap it in a string instead. var reason [1]DiscReason rlp.Decode(msg.Payload, &reason) return nil, reason[0] } if msg.Code != handshakeMsg { return nil, fmt.Errorf("expected handshake, got %x", msg.Code) } var hs protoHandshake if err := msg.Decode(&hs); err != nil { return nil, err } if len(hs.ID) != 64 || !bitutil.TestBytes(hs.ID) { return nil, DiscInvalidIdentity } return &hs, nil } /// ---p2p/transport.go--- func (t *rlpxTransport) ReadMsg() (Msg, error) { t.rmu.Lock() defer t.rmu.Unlock() var msg Msg t.conn.SetReadDeadline(time.Now().Add(frameReadTimeout)) code, data, wireSize, err := t.conn.Read() if err == nil { // Protocol messages are dispatched to subprotocol handlers asynchronously, // but package rlpx may reuse the returned 'data' buffer on the next call // to Read. Copy the message data to avoid this being an issue. data = common.CopyBytes(data) msg = Msg{ ReceivedAt: time.Now(), Code: code, Size: uint32(len(data)), meterSize: uint32(wireSize), Payload: bytes.NewReader(data), } } return msg, err }
 
SetSnappy intialize Conn's fields to support Snappy communication.
/// ---p2p/transport.go--- type Conn struct { dialDest *ecdsa.PublicKey conn net.Conn session *sessionState // These are the buffers for snappy compression. // Compression is enabled if they are non-nil. snappyReadBuffer []byte snappyWriteBuffer []byte } // SetSnappy enables or disables snappy compression of messages. This is usually called // after the devp2p Hello message exchange when the negotiated version indicates that // compression is available on both ends of the connection. func (c *Conn) SetSnappy(snappy bool) { if snappy { c.snappyReadBuffer = []byte{} c.snappyWriteBuffer = []byte{} } else { c.snappyReadBuffer = nil c.snappyWriteBuffer = nil } }

Record Remote Node Capabilities

After protocol handshake succeeds, it checks the remote node’s ID matches the ID sent during protocol handshake. Then record the remote node’s protocol capabilities into conn.caps.
/// ---p2p/server.go--- // conn wraps a network connection with information gathered // during the two handshakes. type conn struct { fd net.Conn transport node *enode.Node flags connFlag cont chan error // The run loop uses cont to signal errors to SetupConn. caps []Cap // valid after the protocol handshake name string // valid after the protocol handshake } func (srv *Server) setupConn(c *conn, dialDest *enode.Node) error { // ... // Run the capability negotiation handshake. phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { clog.Trace("Failed p2p handshake", "err", err) return fmt.Errorf("%w: %v", errProtoHandshakeError, err) } if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) { clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID)) return DiscUnexpectedIdentity } c.caps, c.name = phs.Caps, phs.Name // ... }
 

Post-Handshake/Final-Peer-Addition Checkpoint

checkpoint is used to coordinate the progression of connections through different stages of the connection setup process.
How It Works
The checkpoint function is designed to:
  1. Send the Connection to the Specified Stage Channel:
      • It sends the connection c to the specified stage channel. This allows the server's main loop (Server.run function) to process the connection at the appropriate stage.
      • This stage channel could be one of several stages, such as checkpointPostHandshake or checkpointAddPeer.
  1. Handle Server Stopping:
      • The function includes a select statement with a case for the server's quit channel. If the server is stopping (i.e., srv.quit is closed), the function will return an errServerStopped error.
  1. Wait for Continuation Signal:
      • After sending the connection to the stage channel, the function waits for a signal on the connection's cont channel sent by the main loop(Server.run function). This is used to indicate whether the connection has successfully passed the current stage checks.
      • The continuation signal is an error value, which will be nil if the connection has passed the checks, or contain an error if it failed.
/// ---p2p/server.go--- // checkpoint sends the conn to run, which performs the // post-handshake checks for the stage (posthandshake, addpeer). func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error { select { case stage <- c: case <-srv.quit: return errServerStopped } return <-c.cont }
 
During the peer connection set up phase, it needs two checks(checkpoints) from the main Server.run loop, one is after the handshake, the other is after the protocol handshake.
/// ---p2p/server.go--- func (srv *Server) setupConn(c *conn, dialDest *enode.Node) error { // ... // Run the RLPx handshake. remotePubkey, err := c.doEncHandshake(srv.PrivateKey) if err != nil { srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return fmt.Errorf("%w: %v", errEncHandshakeError, err) } if dialDest != nil { c.node = dialDest } else { c.node = nodeFromConn(remotePubkey, c.fd) } clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags) err = srv.checkpoint(c, srv.checkpointPostHandshake) if err != nil { clog.Trace("Rejected peer", "err", err) return err } // Run the capability negotiation handshake. phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { clog.Trace("Failed p2p handshake", "err", err) return fmt.Errorf("%w: %v", errProtoHandshakeError, err) } if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) { clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID)) return DiscUnexpectedIdentity } c.caps, c.name = phs.Caps, phs.Name err = srv.checkpoint(c, srv.checkpointAddPeer) if err != nil { clog.Trace("Rejected peer", "err", err) return err } return nil }