10.16 moveStore() and doByzantine. - Agzs/geth-pbft-study GitHub Wiki

moveStore()

说明

由于certStoreblockStore都只有添加操作,没有删除操作,这很容易造成资源浪费,系统性能降低,所以自己设计了moveStore()函数,用于定量清理certStoreblockStore

之前,在fabric中,certStoreblockStore是利用lastexec等参数通过moveWaterMark()实现的。

现在,初步设计了moveStore(),实现了xxxStore的清理,但是考虑的情况(即调用关系)可能需要完善。//TODO

设计思路

初步设想,在blockcommited后,清理certStoreblockStore,所以该函数的调用发生在pbftCore.recvCommit()中,利用commit信息中保存的blockhash获取当前header.number,通过比较number的值的大小,结合其他因素,进行定量删除。具体调用形式:instance.moveStore(commit.BlockHash).

在pbft.go中添加const certStorePeriod = 100,初步设定certStoreblockStore至少保存100条数据,然后每累计到200条数据,就删除前期的100条数据,注意,最多能删除100条。

在函数体中,当数据累计到200条时,通过比较header.number,删除certStore中保存的header.number小于当前committedheader.number的数据,进一步通过cert.digest删除blockStore中的block.

代码

func (instance *pbftCore) moveStore(blockHash []byte) {

	if len(instance.certStore) <= certStorePeriod {
		return
	}
	count := 0
	number := instance.blockStore[string(blockHash)].Header().Number.Uint64()
	for idx, cert := range instance.certStore {
		header := cert.prePrepare.Block.Header()
		if header.Number.Uint64() < number {
			logger.Debugf("Replica %d cleaning quorum certificate for view=%d/seqNo=%d",
				instance.id, idx.v, idx.n)
			delete(instance.blockStore, cert.digest)
			delete(instance.certStore, idx)
			count++
		}
		if 2*count > certStorePeriod {
			break
		}
	}

}

doByzantine

说明

参照fabric中的概率模拟byzantine部分的代码,改写了innerBroadcast()函数,在fabric中,通过调用instance.consumer.unicast(msgRaw, uint64(i))模拟恶意节点发送消息,通过调用instance.consumer.broadcast(msgRaw)模拟正常节点广播消息。

修改

现在,将instance.consumer.unicast(msgRaw, uint64(i))改为continue,即恶意节点什么都不做; 将instance.consumer.broadcast(msgRaw)改为instance.commChan <- msg,利用channel机制,其中msg是函数的参数。

代码

func (instance *pbftCore) innerBroadcast(msg *types.PbftMessage) error {

	doByzantine := false
	if instance.byzantine {
		rand1 := rand.New(rand.NewSource(time.Now().UnixNano()))
		doIt := rand1.Intn(3) // go byzantine about 1/3 of the time
		if doIt == 1 {
			doByzantine = true
		}
	}

	// testing byzantine fault.
	if doByzantine {
		rand2 := rand.New(rand.NewSource(time.Now().UnixNano()))
		ignoreidx := rand2.Intn(instance.N)
		for i := 0; i < instance.N; i++ {
			if i != ignoreidx && uint64(i) != instance.id { //Pick a random replica and do not send message
				continue
			} else {
				logger.Debugf("PBFT byzantine: not broadcasting to replica %v", i)
			}
		}
	} else {
		instance.commChan <- msg
	}
	return nil
}

调用

innerBroadcast()的调用主要在viewchange.go中,将之间的channel机制改装到innnerBroadcast()中:

  • sendViewChange()
func (instance *pbftCore) sendViewChange() events.Event {
        ...
	///instance.innerBroadcast(&Message{Payload: &Message_ViewChange{ViewChange: vc}})
	//=>instance.commChan <- &types.PbftMessage{Payload: &types.PbftMessage_ViewChange{ViewChange: vc}} --Agzs
	instance.innerBroadcast(&types.PbftMessage{Payload: &types.PbftMessage_ViewChange{ViewChange: vc}}) //=> --Agzs

	instance.vcResendTimer.Reset(instance.vcResendTimeout, viewChangeResendTimerEvent{})

	return instance.recvViewChange(vc)
}
  • sendNewView()
func (instance *pbftCore) sendNewView() events.Event {
        ...
	instance.innerBroadcast(&types.PbftMessage{Payload: &types.PbftMessage_NewView{NewView: nv}}) //=> --Agzs
	instance.newViewStore[instance.view] = nv
	return instance.processNewView()
}
  • processNewView2()
func (instance *pbftCore) processNewView2(nv *types.NewView) events.Event {
	...
	if instance.primary(instance.view) != instance.id {
		for n, d := range nv.Xset {
			prep := &types.Prepare{ //=> add types --Agzs
				View:           instance.view,
				SequenceNumber: n,
				BlockHash:      instance.blockStore[d].Hash().Bytes(), //=> BatchDigest -> BlockHash    --Agzs
				ReplicaId:      instance.id,
			}
			if n > instance.h {
				cert := instance.getCert(instance.view, n)
				cert.sentPrepare = true
				instance.recvPrepare(prep)
			}
			instance.innerBroadcast(&types.PbftMessage{Payload: &types.PbftMessage_Prepare{Prepare: prep}}) //=> --Agzs
		}
	} else {
		logger.Debugf("Replica %d is now primary, attempting to resubmit requests", instance.id)
		instance.resubmitRequestBatches()
	}

	...
	return viewChangedEvent{}
}