数据结构功能汇总 - Agzs/geth-pbft-study GitHub Wiki

数据结构功能汇总

snapshot

结构体

// Snapshot is the state of the authorization voting at a given point in time.
type Snapshot struct {
	config   *params.CliqueConfig // Consensus engine parameters to fine tune behavior
	sigcache *lru.ARCCache        // Cache of recent block signatures to speed up ecrecover

	Number  uint64                      `json:"number"`  // Block number where the snapshot was created
	Hash    common.Hash                 `json:"hash"`    // Block hash where the snapshot was created
	Signers map[common.Address]struct{} `json:"signers"` // Set of authorized signers at this moment
	Recents map[uint64]common.Address   `json:"recents"` // Set of recent signers for spam protections
	Votes   []*Vote                     `json:"votes"`   // List of votes cast in chronological order
	Tally   map[common.Address]Tally    `json:"tally"`   // Current vote tally to avoid recalculating
}

注意到:

1)sigcache*lru.ARCCache类型,为缓存算法LRU(Least recently used,最近最少使用),该变量和Clique中的signatures变量相关联,主要用于从signature中恢复signer

2) Number在创建snapshot时保存了当时的区块号,实际上Number保存了从gensis开始计数到目前为止,snapshot共处理的header个数,注意,这里的header不包括重复的header,并且该headerNumber是连续的。

3)Hash保存与Number相对的headerhash

  1. Signer只保存当前状态下授权者,为字典类型,keySigner的地址,value为空结构体。

  2. Recent保存当前已签过名的Signer,数目始终维持在header.number-(len(snap.Signers)/2 + 1)

  3. Votes保存最近未签过名的Signervote

  4. Tally保存投票数过半的Tally,票数保存在Tally.Votes中。

主要函数

1)func newSnapshot(config *params.CliqueConfig, sigcache *lru.ARCCache, number uint64, hash common.Hash, signers []common.Address) *Snapshot

初始化函数,根据传递的参数将Snapshot初始化,值得注意的是Signers成员变量在此函数中以(signer_address, struct{})形式完全初始化,在Cliquesnapshot()方法中被调用,且仅此一处。

  1. func loadSnapshot(config *params.CliqueConfig, sigcache *lru.ARCCache, db ethdb.Database, hash common.Hash) (*Snapshot, error)

从数据库中加载已存在的snapshot,由于每1024个块保存一次snapshot,所以调用该函数,可直接使用前期已保存的snapshot,与该函数相对应的是func (s *Snapshot) store(db ethdb.Database) error,该函数用于存储,每1024个块保存一次snapshot(第0块也需保存一次),供loadSnapshot()加载。

  1. cast() 和 uncast()

cast(address common.Address, authorize bool)snapshot.Tally中添加新的投票,若address存在,则票数加1; 若address不存在,创建新的投票。主要用于授权新的signer

uncast(address common.Address, authorize bool)cast()执行相反操作

这两个函数执行后,都需要更改snap.Votes,或添加,或删除。

  1. apply(headers []*types.Header)

该函数为核心函数,根据不同情况处理参数header,最终返回一个新的snap,该函数在Cliquesnapshot()方法中被调用,且仅此一处。

该函数通过for循环依照每个header处理snap,先后修改snap.Recentssnap.Signerssnap.Votessnap.Tally,for循环结束后,修改snap.Number += uint64(len(headers))snap.Hash = headers[len(headers)-1].Hash()

  1. signers() []common.Address

该函数实际为选择排序算法,用于对当前状态下的Signers进行排序,每Clique.config.Epoch+1个块时被调用,将当前状态下所有的Signers保存到第Clique.config.Epoch号块的header.Extra中。Clique中默认Epoch为30000。

  1. func (c *Clique) snapshot(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, error)

Clique的方法,主要在Prepare()、verifyCascadingFields()、verifySeal()、Seal()中被调用。通过传递的参数,主要根据numberheaders的多种情况返回snap。值得注意的是,该函数的被调用时:snapshot(chain, number-1, header.ParentHash, parents),所传递的number为当前header.Number-1,处理的headers也是当前header的祖先。

个人理解

Snapshot始终伴随着BlockchainSnapshot(snap.hash, snap)的形式保存在Clique.recents中。举个例子,比如,当前块号为16,需要获取snap中保存的数据(比如Signers、Recents等),但是Clique.recents只保存了snap1(0-8),然后snapshot()就会从15号块向前查找snap,直到找到第8号块,然后再顺序处理9-15号块,形成新的snap2(0-15),采用此种机制,0-8号块之前已处理过了,并且已保存起来,后期直接使用,无需重复此过程,以此类推,每1024个块保存一次snapshot

各种Store

xxxStore

pbftCore的成员变量xxxStore:blockStore、certStore、viewChangeStore、newViewStore、checkpointStore。这些store主要用于recvXXX()函数保存接收到的消息。

blockStore

define

blockStore map[string]*types.Block // track request batches

blockStore为字典类型,key是block的hash值,为string类型,一般由string(preprep.BlockHash)hash(preprep.GetBlockMsg())获得key值;valueblock类型,采用ethereum中已定义的block,如下:

// Block represents an entire block in the Ethereum blockchain.
type Block struct {
	header       *Header
	uncles       []*Header
	transactions Transactions

	// caches
	hash atomic.Value
	size atomic.Value

	// Td is used by package core to store the total difficulty of the chain up to and including the block.
	td *big.Int

	// These fields are used by package eth to track inter-peer block relay.
	ReceivedAt   time.Time
	ReceivedFrom interface{}
}

Usage

blockfabric中的requestBatch替换,并且用blockStore替换reqBatchStore

reqBatchStore主要用于request请求的打包,现在简化为Block,不需要做任何处理,走一遍共识过程就ok了。

certStore

define

certStore map[msgID]*msgCert // track quorum certificates for requests

certStore为字典类型,keymsgID{v,n}一般通过{view,SequenceNumber}赋值;valuemsgCertmsgCert用于三阶段协议(prePrepare、prepare、commit)消息数目的统计

type msgID struct { // our index through certStore
	v uint64
	n uint64
}
type msgCert struct {
	digest      string            // 保存block的hash的string形式
	prePrepare  *types.PrePrepare // 保存prePrepare消息,注意为指针类型,可在其他函数中修改prepare
	sentPrepare bool              // 标记是否已发送prepare消息
	prepare     []*types.Prepare  // 保存来自各个VP的Prepare消息,用于后期统计prepare消息的数目
	sentCommit  bool              // 标记是否已发送commit消息
	commit      []*types.Commit   // 保存来自各个VP的commit消息,用于后期统计commit消息的数目
}

prePrepare、prepare、commit结构如下:

type PrePrepare struct {
	View           uint64 `protobuf:"varint,1,opt,name=view" json:"view,omitempty"`
	SequenceNumber uint64 `protobuf:"varint,2,opt,name=sequence_number,json=sequenceNumber" json:"sequence_number,omitempty"`
	BlockHash      []byte `protobuf:"bytes,3,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"`
	Block          *Block `protobuf:"bytes,4,opt,name=block_msg,json=blockMsg" json:"block_msg,omitempty"`
	ReplicaId      uint64 `protobuf:"varint,5,opt,name=replica_id,json=replicaId" json:"replica_id,omitempty"`
}

type Prepare struct {
	View           uint64 `protobuf:"varint,1,opt,name=view" json:"view,omitempty"`
	SequenceNumber uint64 `protobuf:"varint,2,opt,name=sequence_number,json=sequenceNumber" json:"sequence_number,omitempty"`
	BlockHash      []byte `protobuf:"bytes,3,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"`
	ReplicaId      uint64 `protobuf:"varint,4,opt,name=replica_id,json=replicaId" json:"replica_id,omitempty"`
}

type Commit struct {
	View           uint64 `protobuf:"varint,1,opt,name=view" json:"view,omitempty"`
	SequenceNumber uint64 `protobuf:"varint,2,opt,name=sequence_number,json=sequenceNumber" json:"sequence_number,omitempty"`
	BlockHash      []byte `protobuf:"bytes,3,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"`
	ReplicaId      uint64 `protobuf:"varint,4,opt,name=replica_id,json=replicaId" json:"replica_id,omitempty"`
}

值得注意的是,同一个区块的prePrepare、prepare、commit中相同变量保存的数值都是相同的,比如SequenceNumber在三个结构体中保存的都是同一个数值。

Usage

根据{view,seqNo}组成的msgID获取cert,注意到只在func (instance *pbftCore) getCert(v uint64, n uint64) (cert *msgCert)中添加新的cert.其他处的调用只是获取cert,然后使用cert的成员变量判断条件,进行其他操作。

viewChangeStore

define

viewChangeStore map[vcidx]*types.ViewChange // track view-change messages

type vcidx struct {
	v  uint64
	id uint64
}
type ViewChange struct {
	View      uint64           `protobuf:"varint,1,opt,name=view" json:"view,omitempty"`
	H         uint64           `protobuf:"varint,2,opt,name=h" json:"h,omitempty"`
	Cset      []*ViewChange_C  `protobuf:"bytes,3,rep,name=cset" json:"cset,omitempty"`
	Pset      []*ViewChange_PQ `protobuf:"bytes,4,rep,name=pset" json:"pset,omitempty"`
	Qset      []*ViewChange_PQ `protobuf:"bytes,5,rep,name=qset" json:"qset,omitempty"`
	ReplicaId uint64           `protobuf:"varint,6,opt,name=replica_id,json=replicaId" json:"replica_id,omitempty"`
	Signature []byte           `protobuf:"bytes,7,opt,name=signature,proto3" json:"signature,omitempty"`
	Signer    common.Address   `protobuf:"bytes,8,opt,name=signer,proto3" json:"signature,omitempty"` //=> add Signer used for verifying signature --Agzs
}

Usage

viewchangeStore用于保存viewchange的信息,且仅在recvViewChange(vc *ViewChange)中被赋值,语句为instance.viewChangeStore[vcidx{vc.View, vc.ReplicaId}] = vc,其它的调用都只是获取其中的viewChange信息

newViewStore

define

newViewStore map[uint64]*types.NewView // track last new-view we received or sent

type NewView struct {
	View      uint64            `protobuf:"varint,1,opt,name=view" json:"view,omitempty"`
	Vset      []*ViewChange     `protobuf:"bytes,2,rep,name=vset" json:"vset,omitempty"`
	Xset      map[uint64]string `protobuf:"bytes,3,rep,name=xset" json:"xset,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
	ReplicaId uint64            `protobuf:"varint,4,opt,name=replica_id,json=replicaId" json:"replica_id,omitempty"`
}

Usage

newViewStore用于保存NewView,当同一个viewviewchange到达一定的的数量(instance.N - instance.f)后,就会产生viewChangeQuorumEvent,在收到 viewChangeQuorumEvent事件后, 会产生NewViewsend操作

viewChangeQuorumEvent is returned to the event loop when a new ViewChange message is received which is part of a quorum cert

newViewStore的赋值操作只在下面两个函数中进行:

func (instance *pbftCore) sendNewView() events.Event {
        ...
	nv := &NewView{
		View:      instance.view,
		Vset:      vset,
		Xset:      msgList,
		ReplicaId: instance.id,
	}
	...
	instance.newViewStore[instance.view] = nv
	return instance.processNewView()
}

func (instance *pbftCore) recvNewView(nv *NewView) events.Event {
	logger.Infof("Replica %d received new-view %d",instance.id, nv.View)
	...
	instance.newViewStore[nv.View] = nv
	return instance.processNewView()
}

checkpointStore

define

checkpointStore map[Checkpoint]bool // track checkpoints as set

type Checkpoint struct {
	SequenceNumber uint64 `protobuf:"varint,1,opt,name=sequence_number,json=sequenceNumber" json:"sequence_number,omitempty"`
	ReplicaId      uint64 `protobuf:"varint,2,opt,name=replica_id,json=replicaId" json:"replica_id,omitempty"`
	Id             string `protobuf:"bytes,3,opt,name=id" json:"id,omitempty"`   //=>checkpoint's name
}

Usage

checkpointStore用于保存checkpoint, checkpoint主要用于垃圾回收和viewchange中:

我们一连执行了K条请求,在第K条请求执行完时,向全网发起广播,告诉大家它已经将这K条执行完毕,要是大家反馈说这K条我们也执行完毕了,那就可以删除这K条的信息了,接下来再执行K条,完成后再发起一次广播,即每隔K条发起一次全网共识,这个概念叫checkpoint,每隔K条去征求一下大家的意见,要是获得了大多数的认同(a quorum certificate with 2 f + 1 CHECKPOINT messages (including its own)),就形成了一个 stable checkpoint(记录在第K条的编号)

We refer to the states produced by the execution of these requests as checkpoints and we say that a checkpoint with a proof is a stable checkpoint.

checkpointStore的赋值操作仅在recvCheckpoint()函数中:

func (instance *pbftCore) recvCheckpoint(chkpt *Checkpoint) events.Event {
	logger.Debugf("Replica %d received checkpoint from replica %d, seqNo %d, digest %s", instance.id, chkpt.ReplicaId, chkpt.SequenceNumber, chkpt.Id)

	...
	instance.checkpointStore[*chkpt] = true

	...
	logger.Debugf("Replica %d found checkpoint quorum for seqNo %d, digest %s", instance.id, chkpt.SequenceNumber, chkpt.Id)

	instance.moveWatermarks(chkpt.SequenceNumber)

	return instance.processNewView()
}

viewChange

待完善。。。

当主节点挂掉后就触发了viewchange协议。新的view要延续上一个view的最终状态,比如给此时收到的新请求编号,还有处理上一个view还没来得及完全处理号的请求。

The basic idea behind the protocol is for the new primary to read information about stable and prepared certificates from a quorum and to propagate this information to the new view.

viewChange

viewchange

type ViewChange struct {
	View      uint64           `protobuf:"varint,1,opt,name=view" json:"view,omitempty"`
	H         uint64           `protobuf:"varint,2,opt,name=h" json:"h,omitempty"`
	Cset      []*ViewChange_C  `protobuf:"bytes,3,rep,name=cset" json:"cset,omitempty"`
	Pset      []*ViewChange_PQ `protobuf:"bytes,4,rep,name=pset" json:"pset,omitempty"`
	Qset      []*ViewChange_PQ `protobuf:"bytes,5,rep,name=qset" json:"qset,omitempty"`
	ReplicaId uint64           `protobuf:"varint,6,opt,name=replica_id,json=replicaId" json:"replica_id,omitempty"`
	Signature []byte           `protobuf:"bytes,7,opt,name=signature,proto3" json:"signature,omitempty"`
	Signer    common.Address   `protobuf:"bytes,8,opt,name=signer,proto3" json:"signature,omitempty"` //=> add Signer used for verifying signature --Agzs
}

type ViewChange_C struct {
	SequenceNumber uint64 `protobuf:"varint,1,opt,name=sequence_number,json=sequenceNumber" json:"sequence_number,omitempty"`
	Id             string `protobuf:"bytes,3,opt,name=id" json:"id,omitempty"`
}

type ViewChange_PQ struct {
	SequenceNumber uint64 `protobuf:"varint,1,opt,name=sequence_number,json=sequenceNumber" json:"sequence_number,omitempty"`
	BlockHash      string `protobuf:"bytes,2,opt,name=block_hash,json=blockHash" json:"block_hash,omitempty"`
	View           uint64 `protobuf:"varint,3,opt,name=view" json:"view,omitempty"`
}
  • Pset 保存上一个view中达到prepared状态的请求的一些信息。该集合只在发生viewchange时生效。 Pset中的赋值发生在calcPSet(),借助certStore进行赋值

a quorum certificate with the PRE-PREPARE and 2 f matching PREPARE messages for sequence number n, view v, and request m,如果一个replica达到了英文所说的条件,那么我们就说该请求在这个replica上的状态是prepared

  • Qset 记录在上一个view里到达pre-prepared状态的请求的一些信息。该集合只在发生viewchange时生效。 Qset中的赋值发生在calcQSet(),借助certStore进行赋值

如果一个replica对请求m发出了PRE-PREPARE和PREPARE信息,那么我们就说该请求m在这个replica节点上处于pre-prepared状态

  • Cset 保存(sequence_number, checkpoint_digest)
  • h 是指 replica_i所保存的最近稳定的checkpoint的sequence number,即代码中的low water mark。
  • H 即代码中的high water mark,且H = h + L,L为log size。
  • n 分配给消息的编号,h < n < H

对某个replica来说,它的低水位h等于它上一个stable checkpoint的编号,高水位H=h+L,L是指定的log数值,它一般是checkpoint周期K的常数倍(这个常数是比较小的, 比如2倍),这样即使该replica步伐很快,它处理的请求编号达到高水位H后也得停一停自己的脚步,等待其他的replica, 直到它的stable checkpoint发生变化,它才能继续向前。

viewchange中PSet和QSet, 与pbftCore中的pset、qset关系密切:

type pbftCore struct {
	...
	pset  map[uint64]*ViewChange_PQ //=> (ViewChange_PQ.SequenceNumber, ViewChange_PQ)
	qset  map[qidx]*ViewChange_PQ   //=> (qidx{ViewChange_PQ.BlockHash, ViewChange_PQ.SequenceNumber}, ViewChange_PQ)
        ...
}
type qidx struct {
	d string
	n uint64
}

summary

当其他节点发现主节点为恶意节点时,他们让自己的view ++(实际上指定了primary_Node=view % R,R为replica的总数),然后sendViewChange()向其他replica广播

其他replica收到viewChange事件后,启动recvViewChange()函数,当同一个replica收集到同一个view的viewchange数量达到(pbftCore.N - pbftCore.f)时,会发送一个viewChangeQuorumEvent事件

processEvent()函数中对应viewChangeQuorumEvent事件的case分支下,如果当前的replica为新的view下的primary,则会调用sendNewView(),然后广播NewView,其他replica收到NewView后进行处理调用processNewView();若为其他replica,则直接调用processNewView()处理;

processNewView()中会对请求(也就是block)进行合法分配编号,然后在processNewView2()中为这些请求初始化prePrepare,最终返回viewChangedEvent事件。

公钥和PeerID的对应

EnrollPrivKey and PeeID

type Impl struct {
	handlerFactory HandlerFactory
	handlerMap     *handlerMap
	ledgerWrapper  *ledgerWrapper
	secHelper      crypto.Peer
	engine         Engine
	isValidator    bool
	reconnectOnce  sync.Once
	discHelper     discovery.Discovery
	discPersist    bool
}
  • peer.handlerMap = &handlerMap{m: make(map[pb.PeerID]MessageHandler)}

  • peer.engine, err = engFactory(peer) = helper.GetEngine(peer) 返回engine

    • engine.helper = NewHelper(peer)
    • engine.consenter = controller.NewConsenter(engine.helper)>>pbft.GetPlugin(helper)>>New(helper)>> newObcBatch(id, config, stack) >> newPbftCore(id, config, op, etf)
  • peer.handlerFactory = peer.engine.GetHandlerFactory()

  • peer.isValidator = ValidatorEnabled()

  • peer.secHelper = secHelperFunc() 返回secHelper

peer.engine.consenter.pbft,peer为Impl类型

peer.handlerFactory实际上等于NewConsensusHandler()用于获得handler ConsensusHandler.MessageHandler.Coordinator由Impl实现

peer.secHelper.nodeImpl.enrollPrivKey, secHelper为crypto.Peer类型,实际为crypto.peerImpl被强制转换

type EngineImpl struct {
	consenter    consensus.Consenter
	helper       *Helper
	peerEndpoint *pb.PeerEndpoint
	consensusFan *util.MessageFan
}

EngineImpl.peerEndpoint.peerID peer.engine.peerEndpoint.peerID

summary

  • peer.engine.consenter.pbft
  • peer.engine.peerEndpoint.peerID
  • peer.secHelper.nodeImpl.enrollPrivKey
  • 都保存在peer中,peer为Impl类型