node discover - Agzs/geth-pbft-study GitHub Wiki

node discover

This article builds on the earlier analysis of the addPeer operation.

server.run() >> for, scheduleTasks() >> startTasks() >> go, t.Do(srv); >> dialTask.resolve(srv) >> srv.ntab.Resolve(t.dest.ID) >> Lookup(targetID) >> lookup(targetID, true)

Following this call chain, lookup() eventually calls the findnode() function.

findnode sends a findnode request to the given node and waits until the node has sent up to k neighbors.

func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
	nodes := make([]*Node, 0, bucketSize)
	nreceived := 0
	errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
		reply := r.(*neighbors)
		for _, rn := range reply.Nodes {
			nreceived++
			n, err := t.nodeFromRPC(toaddr, rn)
			if err != nil {
				log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
				continue
			}
			nodes = append(nodes, n)
		}
		return nreceived >= bucketSize
	})
	t.send(toaddr, findnodePacket, &findnode{
		Target:     target,
		Expiration: uint64(time.Now().Add(expiration).Unix()),
	})
	err := <-errc
	return nodes, err
}

This function sends a findnode message, tagged with findnodePacket, to the given toaddr. The findnode struct is defined as follows:

// findnode is a query for nodes close to the given target.
type findnode struct {
	Target     NodeID // doesn't need to be an actual public key
	Expiration uint64
	// Ignore additional fields (for forward compatibility).
	Rest []rlp.RawValue `rlp:"tail"`
}

findnode() is called only from lookup(targetID NodeID, refreshIfEmpty bool) in p2p/discover/table.go, and lookup() in turn is called from two functions:

1) func (tab *Table) Lookup(targetID NodeID) []*Node

Lookup performs a network search for nodes close to the given target. It approaches the target by querying nodes that are closer to it on each iteration. The given target does not need to be an actual node identifier.

2) func (tab *Table) doRefresh(done chan struct{})

doRefresh performs a lookup for a random target to keep buckets full. seed nodes are inserted if the table is empty (initial bootstrap or discarded faulty peers).

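The two calls differ. (1) calls tab.lookup(targetID, true), where targetID is the ID of a node that is known to exist; (2) calls tab.lookup(target, false), where target is randomly generated and may not correspond to any real node. In addition, the bool argument (refreshIfEmpty) marks whether the call comes from the refresh operation: doRefresh() passes false, while Lookup() passes true.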
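Both call paths search for nodes that are "close" to a target. The sketch below illustrates one way such closeness can be computed, assuming a Kademlia-style XOR metric compared byte by byte. The 4-byte ID type and the function names distcmp and closest are hypothetical stand-ins chosen for this example, and the IDs are used directly rather than being hashed first (the findnode handler quoted later on this page hashes req.Target with crypto.Keccak256Hash before searching).

```go
package main

import (
	"fmt"
	"sort"
)

// ID is a shortened, hypothetical stand-in for a hashed node ID.
type ID [4]byte

// distcmp compares the XOR distances a^target and b^target byte by byte.
// It returns -1 if a is closer to target, 1 if b is closer, and 0 if equal.
func distcmp(target, a, b ID) int {
	for i := range target {
		da, db := a[i]^target[i], b[i]^target[i]
		if da < db {
			return -1
		}
		if da > db {
			return 1
		}
	}
	return 0
}

// closest returns at most n IDs from ids, ordered from nearest to farthest.
func closest(target ID, ids []ID, n int) []ID {
	out := append([]ID(nil), ids...) // copy so the caller's slice is not reordered
	sort.Slice(out, func(i, j int) bool { return distcmp(target, out[i], out[j]) < 0 })
	if len(out) > n {
		out = out[:n]
	}
	return out
}

func main() {
	target := ID{0x10, 0, 0, 0}
	ids := []ID{{0xF0, 0, 0, 0}, {0x11, 0, 0, 0}, {0x30, 0, 0, 0}}
	fmt.Println(closest(target, ids, 2))
}
```

Because the metric is XOR rather than numeric difference, 0x11 is nearer to 0x10 than 0x30 is, even though all three values look like plain numbers.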

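findnode messages tagged with findnodePacket are handled much like pbftMessage. When the server starts, it launches a series of services; after several layers of calls, a goroutine ends up running readLoop(), which processes newly received messages in an infinite for loop: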

ListenUDP() >> newUDP() >> go udp.readLoop() >> for, t.handlePacket(from, buf[:nbytes])
func (t *udp) readLoop() {
	defer t.conn.Close()
	// Discovery packets are defined to be no larger than 1280 bytes.
	// Packets larger than this size will be cut at the end and treated as invalid because their hash won't match.
	buf := make([]byte, 1280)
	for {
		nbytes, from, err := t.conn.ReadFromUDP(buf)
		if netutil.IsTemporaryError(err) {
			// Ignore temporary read errors.
			log.Debug("Temporary UDP read error", "err", err)
			continue
		} else if err != nil {
			// Shut down the loop for permanent errors.
			log.Debug("UDP read error", "err", err)
			return
		}
		t.handlePacket(from, buf[:nbytes])
	}
}

As the function shows, readLoop() reads a message and then passes it to handlePacket() for processing:

func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
	packet, fromID, hash, err := decodePacket(buf)
	if err != nil {
		log.Debug("Bad discv4 packet", "addr", from, "err", err)
		return err
	}
	err = packet.handle(t, from, fromID, hash)
	log.Trace("<< "+packet.name(), "addr", from, "err", err)
	return err
}

handlePacket() calls decodePacket() to decode the received message. Depending on the packet type, the result is one of four types: ping, pong, findnode, or neighbors.

func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
	...
	var req packet
	switch ptype := sigdata[0]; ptype {
	case pingPacket:
		req = new(ping)
	case pongPacket:
		req = new(pong)
	case findnodePacket:
		req = new(findnode)
	case neighborsPacket:
		req = new(neighbors)
	default:
		return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
	}
	s := rlp.NewStream(bytes.NewReader(sigdata[1:]), 0)
	err = s.Decode(req)
	return req, fromID, hash, err
}
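The dispatch step in decodePacket() can be illustrated in isolation. The following is a minimal sketch, not the original code: it omits signature checking and RLP decoding, the numeric values of the packet constants are an assumption, and newPacket and the strings returned by name() are hypothetical.

```go
package main

import "fmt"

// Packet type tags. The numeric values are an assumption made for this sketch.
const (
	pingPacket = iota + 1
	pongPacket
	findnodePacket
	neighborsPacket
)

// packet is a reduced version of the interface used on this page.
type packet interface {
	name() string
}

type (
	ping      struct{}
	pong      struct{}
	findnode  struct{}
	neighbors struct{}
)

func (*ping) name() string      { return "ping" }
func (*pong) name() string      { return "pong" }
func (*findnode) name() string  { return "findnode" }
func (*neighbors) name() string { return "neighbors" }

// newPacket (hypothetical) maps the type byte to an empty value of the
// matching message type, or returns an error for an unknown type.
func newPacket(ptype byte) (packet, error) {
	switch ptype {
	case pingPacket:
		return new(ping), nil
	case pongPacket:
		return new(pong), nil
	case findnodePacket:
		return new(findnode), nil
	case neighborsPacket:
		return new(neighbors), nil
	default:
		return nil, fmt.Errorf("unknown type: %d", ptype)
	}
}

func main() {
	for _, b := range []byte{pingPacket, findnodePacket, 9} {
		p, err := newPacket(b)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(p.name())
	}
}
```

Returning an interface value lets the caller invoke the type-specific method without another switch, which is the same idea handlePacket() relies on when it calls packet.handle().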

Next, handlePacket() executes packet.handle(t, from, fromID, hash). packet is an interface implemented by ping, pong, findnode, and neighbors, so if the packet is a findnode, the handle() method of findnode is called:

func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
	if expired(req.Expiration) {
		return errExpired
	}
	if t.db.node(fromID) == nil {
		// No bond exists, we don't process the packet. 
		return errUnknownNode
	}
	target := crypto.Keccak256Hash(req.Target[:])
	t.mutex.Lock()
	closest := t.closest(target, bucketSize).entries
	t.mutex.Unlock()

	p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
	// Send neighbors in chunks with at most maxNeighbors per packet
	// to stay below the 1280 byte limit.
	for i, n := range closest {
		if netutil.CheckRelayIP(from.IP, n.IP) != nil {
			continue
		}
		p.Nodes = append(p.Nodes, nodeToRPC(n))
		if len(p.Nodes) == maxNeighbors || i == len(closest)-1 {
			t.send(from, neighborsPacket, &p)
			p.Nodes = p.Nodes[:0]
		}
	}
	return nil
}

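As the function shows, closest(target, bucketSize) returns closest, the array of Nodes nearest to the given target (near in logical distance). The function then iterates over that array, converts each valid node with nodeToRPC(), and appends the result to the Nodes field of the neighbors message. Whenever the batch reaches maxNeighbors entries, or the last element of closest has just been appended, it calls t.send(from, neighborsPacket, &p) to send a neighbors message tagged with neighborsPacket.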
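The batching logic can be sketched on its own. In this hedged example, chunk and its parameters are hypothetical, nodes are plain strings, and the keep callback stands in for the netutil.CheckRelayIP filter. One difference from the quoted loop is deliberate: there, the flush condition i == len(closest)-1 is evaluated only for entries that pass the filter, so as far as can be read from the quoted code, a partially filled packet would not be sent if the final entry is skipped. The sketch flushes once after the loop instead.

```go
package main

import "fmt"

// chunk splits the nodes accepted by keep into batches of at most max entries.
// It is a hypothetical helper; each returned batch stands for one neighbors packet.
func chunk(nodes []string, max int, keep func(string) bool) [][]string {
	var packets [][]string
	var cur []string
	for _, n := range nodes {
		if !keep(n) {
			continue // filtered out, like a node failing the relay IP check
		}
		cur = append(cur, n)
		if len(cur) == max {
			packets = append(packets, cur)
			cur = nil // start a fresh batch rather than reusing the slice
		}
	}
	// Flush whatever is left, regardless of which entry was processed last.
	if len(cur) > 0 {
		packets = append(packets, cur)
	}
	return packets
}

func main() {
	nodes := []string{"a", "b", "c", "d", "e"}
	notE := func(n string) bool { return n != "e" }
	for i, p := range chunk(nodes, 2, notE) {
		fmt.Println("packet", i, p)
	}
}
```

Starting a new slice for each batch avoids aliasing between batches, which matters here because the sketch returns all batches instead of sending each one immediately.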

neighbors messages are received and dispatched the same way as findnode messages; only the handle() method differs:

func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
	if expired(req.Expiration) {
		return errExpired
	}
	if !t.handleReply(fromID, neighborsPacket, req) {
		return errUnsolicitedReply
	}
	return nil
}

func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
	matched := make(chan bool, 1)
	select {
	case t.gotreply <- reply{from, ptype, req, matched}:
		// loop will handle it
		return <-matched
	case <-t.closing:
		return false
	}
}

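The handle() method of neighbors delegates to handleReply(), which is where the message is actually processed. As shown above, handleReply() sends reply{from, ptype, req, matched} into the t.gotreply channel. That channel is read in loop(). Like readLoop(), loop() is started as a goroutine when the server starts and UDP is initialized: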

func (t *udp) loop() {
	...
	for {
		resetTimeout()

		select {
		...
		case r := <-t.gotreply:
			var matched bool
			for el := plist.Front(); el != nil; el = el.Next() {
				p := el.Value.(*pending)
				if p.from == r.from && p.ptype == r.ptype {
					matched = true
					// Remove the matcher if its callback indicates that all replies have been received.
					// This is required for packet types that expect multiple reply packets.
					if p.callback(r.data) {
						p.errc <- nil
						plist.Remove(el)
					}
					// Reset the continuous timeout counter (time drift detection)
					contTimeouts = 0
				}
			}
			r.matched <- matched
		...
		}
		...
	}
	...
}

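In this case branch, plist acts as a queue that is traversed from front to back, and each element is a pending. These elements originate in findnode(): the call t.pending(toid, neighborsPacket, func(r interface{}) bool {}) sends &pending{from: id, ptype: ptype, callback: callback, errc: ch} into the udp.addpending channel, and the case p := <-t.addpending branch of loop() then adds it to plist. (Note: pending() is also used for pingPacket and pongPacket; neighborsPacket is used as the example here because its callback is the most complex.)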

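So p.callback(r.data) actually runs the function literal passed to t.pending() in findnode(), shown below. r.data is the req argument of handleReply(), which is the neighbors message; in the end, the Nodes in that message are appended to nodes, the slice returned by findnode().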

func(r interface{}) bool {
	reply := r.(*neighbors)
	for _, rn := range reply.Nodes {
		nreceived++
		n, err := t.nodeFromRPC(toaddr, rn)
		if err != nil {
			log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
			continue
		}
		nodes = append(nodes, n)
	}
	return nreceived >= bucketSize
}
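The interplay between pending(), handleReply(), and loop() can be reproduced in a small standalone program. This is a simplified sketch under stated assumptions, not the original implementation: timeouts and shutdown are left out, peers are identified by plain strings, and the matcher type, newMatcher, and the value of neighborsPacket are hypothetical. The sketch also saves the next list element before calling Remove, so the traversal does not depend on the links of a removed element.

```go
package main

import (
	"container/list"
	"fmt"
)

const neighborsPacket = 4 // assumed value, used only as a tag here

// pending is one outstanding request waiting for replies from a peer.
type pending struct {
	from     string
	ptype    byte
	callback func(data interface{}) bool // returns true once all replies have arrived
	errc     chan error
}

// reply is an incoming packet offered to the matcher loop.
type reply struct {
	from    string
	ptype   byte
	data    interface{}
	matched chan bool
}

type matcher struct {
	addpending chan *pending
	gotreply   chan reply
}

func newMatcher() *matcher {
	m := &matcher{addpending: make(chan *pending), gotreply: make(chan reply)}
	go m.loop()
	return m
}

// loop owns plist, so no locking is needed: all access goes through channels.
func (m *matcher) loop() {
	plist := list.New()
	for {
		select {
		case p := <-m.addpending:
			plist.PushBack(p)
		case r := <-m.gotreply:
			matched := false
			for el := plist.Front(); el != nil; {
				next := el.Next() // saved first, because Remove detaches el
				p := el.Value.(*pending)
				if p.from == r.from && p.ptype == r.ptype {
					matched = true
					if p.callback(r.data) {
						p.errc <- nil
						plist.Remove(el)
					}
				}
				el = next
			}
			r.matched <- matched
		}
	}
}

func (m *matcher) pending(from string, ptype byte, cb func(interface{}) bool) <-chan error {
	ch := make(chan error, 1) // buffered so loop never blocks on the send
	m.addpending <- &pending{from, ptype, cb, ch}
	return ch
}

func (m *matcher) handleReply(from string, ptype byte, data interface{}) bool {
	matched := make(chan bool, 1)
	m.gotreply <- reply{from, ptype, data, matched}
	return <-matched
}

func main() {
	m := newMatcher()
	var nodes []string
	errc := m.pending("peerA", neighborsPacket, func(r interface{}) bool {
		nodes = append(nodes, r.([]string)...)
		return len(nodes) >= 3 // done after three nodes, like nreceived >= bucketSize
	})
	fmt.Println(m.handleReply("peerA", neighborsPacket, []string{"n1", "n2"})) // expected, still waiting
	fmt.Println(m.handleReply("peerB", neighborsPacket, []string{"x"}))        // unsolicited
	fmt.Println(m.handleReply("peerA", neighborsPacket, []string{"n3"}))       // completes the request
	fmt.Println(<-errc, nodes)
}
```

Keeping plist private to a single goroutine is the design choice that makes the closure in findnode() safe: the callback appends to nodes only from loop(), and the caller reads nodes only after receiving from errc.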

Now back to where lookup() calls findnode(). The relevant part of lookup() is:

func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
	...
	for {
		// ask the alpha closest nodes that we haven't asked yet
		for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
			n := result.entries[i]
			if !asked[n.ID] {
				asked[n.ID] = true
				pendingQueries++
				go func() {
					// Find potential neighbors to bond with
					r, err := tab.net.findnode(n.ID, n.addr(), targetID)
					if err != nil {
						// Bump the failure counter to detect and evacuate non-bonded entries
						fails := tab.db.findFails(n.ID) + 1
						tab.db.updateFindFails(n.ID, fails)
						log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)

						if fails >= maxFindnodeFailures {
							log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
							tab.delete(n)
						}
					}
					reply <- tab.bondall(r)
				}()
			}
		}
		if pendingQueries == 0 {
			// we have asked all closest nodes, stop the search
			break
		}
		// wait for the next reply
		for _, n := range <-reply {
			if n != nil && !seen[n.ID] {
				seen[n.ID] = true
				result.push(n, bucketSize)
			}
		}
		pendingQueries--
	}
	return result.entries
}
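The loop above can be tried out against a simulated network. The sketch below is an illustration under simplifying assumptions, not the original algorithm in full: node IDs are single bytes, distance is the XOR of an ID with the target, the network type is a hypothetical in-memory map of which nodes each node reports, bonding and failure counting are left out, and the values of alpha and bucketSize are chosen only to keep the example small.

```go
package main

import (
	"fmt"
	"sort"
)

const (
	alpha      = 3 // maximum number of concurrent queries (assumed value)
	bucketSize = 4 // maximum size of the result set (small value for the demo)
)

// network is a hypothetical stand-in for the UDP layer: it maps a node ID
// to the IDs that node returns when it receives a findnode query.
type network map[uint8][]uint8

func (nw network) findnode(n uint8) []uint8 { return nw[n] }

// lookup iteratively queries the closest known nodes until every node in
// the result set has been asked and no query is outstanding.
func lookup(nw network, seeds []uint8, target uint8) []uint8 {
	asked := make(map[uint8]bool)
	seen := make(map[uint8]bool)
	reply := make(chan []uint8, alpha)
	pendingQueries := 0
	var result []uint8

	// push inserts n and keeps only the bucketSize nodes closest to target.
	push := func(n uint8) {
		result = append(result, n)
		sort.Slice(result, func(i, j int) bool { return result[i]^target < result[j]^target })
		if len(result) > bucketSize {
			result = result[:bucketSize]
		}
	}
	for _, s := range seeds {
		if !seen[s] {
			seen[s] = true
			push(s)
		}
	}
	for {
		// Ask the closest nodes that have not been asked yet, at most alpha at a time.
		for i := 0; i < len(result) && pendingQueries < alpha; i++ {
			n := result[i]
			if !asked[n] {
				asked[n] = true
				pendingQueries++
				go func() { reply <- nw.findnode(n) }()
			}
		}
		if pendingQueries == 0 {
			break // every node in the result set has been asked
		}
		// Wait for the next reply and merge unseen nodes into the result set.
		for _, n := range <-reply {
			if !seen[n] {
				seen[n] = true
				push(n)
			}
		}
		pendingQueries--
	}
	return result
}

func main() {
	nw := network{64: {32, 48}, 32: {8, 16}, 48: {16, 24}, 8: {1, 2}, 16: {2, 4}, 24: {4}}
	fmt.Println(lookup(nw, []uint8{64}, 0))
}
```

Starting from the single seed 64, each round of replies reveals nodes nearer to the target, so the result set converges on the nearest nodes in the simulated network even though the order in which replies arrive is not fixed.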

Inside its for loop, lookup() starts a goroutine running a function literal that first calls findnode() and then tab.bondall(). The subsequent call chain is:

table.bondall() >> table.bond() >> table.pingpong() and table.add() >> table.ping() >> tab.net.ping(id, addr) <==> udp.ping()

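Finally, udp.ping() sends a ping message tagged with pingPacket. Before sending, it again builds a pending via t.pending(toid, pongPacket, func(interface{}) bool { return true }) and sends it into the t.addpending channel; the rest works the same way as the pending for neighborsPacket.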

func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
	// TODO: maybe check for ReplyTo field in callback to measure RTT
	errc := t.pending(toid, pongPacket, func(interface{}) bool { return true })
	t.send(toaddr, pingPacket, &ping{
		Version:    Version,
		From:       t.ourEndpoint,
		To:         makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
		Expiration: uint64(time.Now().Add(expiration).Unix()),
	})
	return <-errc
}

The ping message is then read and dispatched the same way as findnode; its handle method is:

func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
	if expired(req.Expiration) {
		return errExpired
	}
	t.send(from, pongPacket, &pong{
		To:         makeEndpoint(from, req.From.TCP),
		ReplyTok:   mac,
		Expiration: uint64(time.Now().Add(expiration).Unix()),
	})
	if !t.handleReply(fromID, pingPacket, req) {
		// Note: we're ignoring the provided IP address right now
		go t.bond(true, fromID, from, req.From.TCP)
	}
	return nil
}

While handling a ping message, the node replies with a pong message tagged with pongPacket. Sending and receiving pong messages is not covered further here.
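Every handle() method quoted on this page starts by rejecting packets whose Expiration has passed, and every sender sets Expiration to the current time plus expiration. A minimal sketch of that pair is shown here. The body of expired() and the 20-second value of expiration are assumptions made for illustration, and deadline() is a hypothetical helper that wraps the expression used by the senders.

```go
package main

import (
	"fmt"
	"time"
)

// expiration is how long a packet stays valid after it is sent (assumed value).
const expiration = 20 * time.Second

// deadline (hypothetical) returns the Unix timestamp to place in a packet's
// Expiration field, mirroring uint64(time.Now().Add(expiration).Unix()).
func deadline() uint64 {
	return uint64(time.Now().Add(expiration).Unix())
}

// expired reports whether the given Unix timestamp lies in the past.
func expired(ts uint64) bool {
	return time.Unix(int64(ts), 0).Before(time.Now())
}

func main() {
	fresh := deadline()
	stale := uint64(time.Now().Add(-time.Minute).Unix())
	fmt.Println("fresh packet expired:", expired(fresh))
	fmt.Println("stale packet expired:", expired(stale))
}
```

Because the timestamp is an absolute time chosen by the sender, this check only works well when both nodes have roughly synchronized clocks.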

summary

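The protocol has four message types: ping, pong, findnode, and neighbors, tagged with pingPacket, pongPacket, findnodePacket, and neighborsPacket respectively.

The current node calls findnode() to send a findnode message to a known node, asking for the nodes near the given target;

On receiving the findnode message, the remote node processes it and responds with neighbors messages;

On receiving a neighbors message, the current node processes the nodes it contains and sends a ping message to each of them;

Those nodes respond with pong messages.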

Other related resources

RLPx Node Discovery Protocol

以太坊P2P模块节点发现算法剖析 (An analysis of the node discovery algorithm in Ethereum's P2P module)

Rlpx