Lotus同构集群挖矿 - okblockchainlab/lotus GitHub Wiki

一、同构集群的可行性分析

根据我们对lotus代码的分析,我们发现,miner旷工的代言人其实是位于链上的actor合约,每个旷工都有一个actor合约来代表其在链上的利益,包括抵押、出块奖励和存储奖励。actor的state状态就代表了miner的在链上的所有状态,该状态的更改都是靠miner发往message pool的msg来驱动更新的。因此,如果多个miner可以共用一个actor合约的话,那就意味着这几个miner组成了一个同构挖矿集群。

从行业内横向来看,filecoin的挖矿集群的形态主流形式有如下几种:单miner节点挖矿,miner + work异构集群挖矿。同构集群挖矿比较少见,国内主要有瑶池区块链团队在做同构集群挖矿。那就意味着,同构集群挖矿也是可行的,只是需要对lotus源码做一定的修改。经过一周多的调研,我们发现这些修改其实不是很多,可以以较低的代价去做到同构集群挖矿。

二、同构集群修改方案

1、Miner Init的修改

同构集群方案,多个miner共享一个actor,也就是说,在创建miner时,不能更改actor合约状态的信息。Miner actor状态的数据结构如下:

type State struct {
   // Information not related to sectors.
   // TODO: this should be a cid of the miner Info struct so it's not re-written when other fields change.
   // https://github.com/filecoin-project/specs-actors/issues/422
   Info MinerInfo

   PreCommitDeposits abi.TokenAmount // Total funds locked as PreCommitDeposits
   LockedFunds       abi.TokenAmount // Total unvested funds locked as pledge collateral
   VestingFunds      cid.Cid         // Array, AMT[ChainEpoch]TokenAmount

   PreCommittedSectors cid.Cid // Map, HAMT[SectorNumber]SectorPreCommitOnChainInfo

   Sectors cid.Cid // Array, AMT[SectorNumber]SectorOnChainInfo (sparse)

   ProvingPeriodStart abi.ChainEpoch

   NewSectors *abi.BitField

   SectorExpirations cid.Cid // Array, AMT[ChainEpoch]Bitfield

   Deadlines cid.Cid  // deadlines具体信息存在数据库中,这里只存了其在数据库的句柄

   Faults *abi.BitField

   FaultEpochs cid.Cid // AMT[ChainEpoch]Bitfield

   Recoveries *abi.BitField

   PostSubmissions *abi.BitField

   NextDeadlineToProcessFaults uint64
}

type MinerInfo struct {
	// Account that owns this miner.
	// - Income and returned collateral are paid to this address.
	// - This address is also allowed to change the worker address for the miner.
	Owner addr.Address // Must be an ID-address.

	// Worker account for this miner.
	// The associated pubkey-type address is used to sign blocks and messages on behalf of this miner.
	Worker addr.Address // Must be an ID-address.

	PendingWorkerKey *WorkerKeyChange

	// Byte array representing a Libp2p identity that should be used when connecting to this miner.
	PeerId abi.PeerID

	// Slice of byte arrays representing Libp2p multi-addresses used for establishing a connection with this miner.
	Multiaddrs []abi.Multiaddrs

	// The proof type used by this miner for sealing sectors.
	SealProofType abi.RegisteredSealProof

	// Amount of space in each sector committed by this miner.
	// This is computed from the proof type and represented here redundantly.
	SectorSize abi.SectorSize

	// The number of sectors in each Window PoSt partition (proof).
	// This is computed from the proof type and represented here redundantly.
	WindowPoStPartitionSectors uint64
}

// deadlines的存储
func (st *State) SaveDeadlines(store adt.Store, deadlines *Deadlines) error {
	c, err := store.Put(store.Context(), deadlines)
	if err != nil {
		return err
	}
	st.Deadlines = c
	return nil
}

从上面的信息可以看到,一个miner actor的可识别信息主要包括:Owner、Worker和PeerId,其中Owner是和"钱"相关的,包括抵押、奖励都是给这个地址或者来自这个地址,这个地址还可以用来更换Worker地址。Worker代表这个actor打工的人,主要用来代表owner签发区块。PeerID则是更底层的概念,用来在p2p层代表某个机器。

为了构造同构集群,一起组团挖矿,必须确保共用一个actor,因此在初始化init时,除了第一个miner(主 miner)需要在创建时给集群创建actor外,其它miner都不能再创建actor。因此,需要修改下面这个函数:

func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, r repo.Repo, ssize abi.SectorSize, gasPrice types.BigInt) error {
   lr, err := r.Lock(repo.StorageMiner)
   if err != nil {
      return err
   }
   defer lr.Close() //nolint:errcheck

   log.Info("Initializing libp2p identity")

   p2pSk, err := makeHostKey(lr)
   if err != nil {
      return xerrors.Errorf("make host key: %w", err)
   }

   peerid, err := peer.IDFromPrivateKey(p2pSk)
   if err != nil {
      return xerrors.Errorf("peer ID from private key: %w", err)
   }

   mds, err := lr.Datastore("/metadata")
   if err != nil {
      return err
   }

   var addr address.Address
   if act := cctx.String("actor"); act != "" {
      a, err := address.NewFromString(act)
      if err != nil {
         return xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err)
      }

      // 创世miner,非主miner应该也是不需要去执行
      if cctx.Bool("genesis-miner") {
         if err := mds.Put(datastore.NewKey("miner-address"), a.Bytes()); err != nil {
            return err
         }

         spt, err := ffiwrapper.SealProofTypeFromSectorSize(ssize)
         if err != nil {
            return err
         }

         mid, err := address.IDFromAddress(a)
         if err != nil {
            return xerrors.Errorf("getting id address: %w", err)
         }

         sa, err := modules.StorageAuth(ctx, api)
         if err != nil {
            return err
         }

         smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &ffiwrapper.Config{
            SealProofType: spt,
         }, sectorstorage.SealerConfig{true, true, true, true}, nil, sa)
         if err != nil {
            return err
         }
         epp, err := storage.NewWinningPoStProver(api, smgr, ffiwrapper.ProofVerifier, dtypes.MinerID(mid))
         if err != nil {
            return err
         }

         m := miner.NewMiner(api, epp, a)
         {
            if err := m.Start(ctx); err != nil {
               return xerrors.Errorf("failed to start up genesis miner: %w", err)
            }

            cerr := configureStorageMiner(ctx, api, a, peerid, gasPrice)

            if err := m.Stop(ctx); err != nil {
               log.Error("failed to shut down storage miner: ", err)
            }

            if cerr != nil {
               return xerrors.Errorf("failed to configure storage miner: %w", cerr)
            }
         }

         if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
            pssb, err := homedir.Expand(pssb)
            if err != nil {
               return err
            }

            log.Infof("Importing pre-sealed sector metadata for %s", a)

            if err := migratePreSealMeta(ctx, api, pssb, a, mds); err != nil {
               return xerrors.Errorf("migrating presealed sector metadata: %w", err)
            }
         }

         return nil
      }

      if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
         pssb, err := homedir.Expand(pssb)
         if err != nil {
            return err
         }

         log.Infof("Importing pre-sealed sector metadata for %s", a)

         if err := migratePreSealMeta(ctx, api, pssb, a, mds); err != nil {
            return xerrors.Errorf("migrating presealed sector metadata: %w", err)
         }
      }

      // 更新actor的peerID,理论上应该也是不需要执行
      if err := configureStorageMiner(ctx, api, a, peerid, gasPrice); err != nil {
         return xerrors.Errorf("failed to configure storage miner: %w", err)
      }

      addr = a
   } else {
      // 下面的代码不能执行,因此在init时,对非主miner,需要指定”actor字段“
      // 创建新的actor
      a, err := createStorageMiner(ctx, api, peerid, gasPrice, cctx)
      if err != nil {
         return xerrors.Errorf("creating miner failed: %w", err)
      }

      addr = a
   }

   log.Infof("Created new storage miner: %s", addr)
   if err := mds.Put(datastore.NewKey("miner-address"), addr.Bytes()); err != nil {
      return err
   }

   return nil
}

上面的方案,需要经过测试去验证,不排除还有地方需要修改。

2、PoRep的修改

PoRep算法主要是在做新生成sector的编码与零知识证明,sector在哪个机器上就归哪个机器进行PoRep证明,结果直接提交到message pool中,由actor合约中的定时方法进行处理。因此,PoRep模块不用修改。

3、WiningPost的修改

同构集群情况下, 集群中的所有miner都可以参加出块,而且这种出块是一种良性的竞争,集群内的miner越多,其出块的可能性也越大(虽然lotus网络出块与miner的算力有关,但是当算力超出一定值,同构集群节点越多,就越可以趋近100%出块)。

当前情况下,同构集群中的多个miner都出块的话,如果多个miner在同一个epoch都出块的话,可能会被网络判为该集群作恶,需要做一定的改进,改进的地方如下:

func (m *Miner) mine(ctx context.Context) {
   ctx, span := trace.StartSpan(ctx, "/mine")
   defer span.End()

   var lastBase MiningBase

   for {
     ...
     b, err := m.mineOne(ctx, base)
		 ...
     
		 if b != nil {
       // ==================== 修改地方 ===================
       // TODO: should do better 'anti slash' protection here
			 blkKey := fmt.Sprintf("%d", b.Header.Height)
       // 将blkKey高度出块的信息广播到同构集群中其它节点
       
			 if _, ok := m.minedBlockHeights.Get(blkKey); ok {
				 log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents)
				 continue
			 }
			 m.minedBlockHeights.Add(blkKey, true)
       // ==================== 修改地方 ===================
       
		   btime := time.Unix(int64(b.Header.Timestamp), 0)
			 if time.Now().Before(btime) {
				 if !m.niceSleep(time.Until(btime)) {
					 log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out")
					 time.Sleep(time.Until(btime))
				 }
			 } else {
				 log.Warnw("mined block in the past", "block-time", btime,
				 	 "time", time.Now(), "duration", time.Since(btime))
			 }

       /*
       注释下面代码
			 // TODO: should do better 'anti slash' protection here
			 blkKey := fmt.Sprintf("%d", b.Header.Height)
			 if _, ok := m.minedBlockHeights.Get(blkKey); ok {
				 log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents)
				 continue
			 }

			 m.minedBlockHeights.Add(blkKey, true)
			 */
       
			 if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
			 	 log.Errorf("failed to submit newly mined block: %s", err)
			 }
		 }
     ...
   }
}

其它地方不需要做修改。

可能会担心,在确认有出块资格,要做winingPost时空证明的时候,如果被选中的sector不在自己节点,在别的节点会怎么办。经过代码确认,如果在本机找不到这个被选中的sector,那本机的本次winingPost就失败了,本机坐等下一个出块epoch。这里就会出现一个问题,如果同构集群中所有节点在做winingPost时,都发现选中的sector不在自己机器上,那就意味着整个集群错过了一次出块的机会,少了不少奖励。这个问题暂时没有啥快速的解决方案,最好的解决方案就是同构集群中各个miner间相互备份sector,尽量减少这种错失出块机会。

4、WindowPost的修改

windowPost在做时空证明时,也存在一个问题:同一个deadline中的sectors是来自集群中所有miner节点的,当前的windowPost做时空证明时,是需要对deadline中的所有sectors都进行一次Post计算,所有的计算结果proof都需要提交到链上,缺少任何一个secot的proof都将会被惩罚,并且还将导致算力减少,从而进一步降低本同构集群爆块的几率。为了解决这个问题,必须要挑选出一个代表来收集同构集群中的所有节点的post计算结果,然后再代表整个集群将结果提交到链上去。

修改方案如下:

func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.DeadlineInfo, ts *types.TipSet) {
   ctx, abort := context.WithCancel(ctx)

   s.abort = abort
   s.activeDeadline = deadline

   go func() {
      defer abort()

      ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.doPost")
      defer span.End()

      proof, err := s.runPost(ctx, *deadline, ts)
      if err != nil {
        log.Errorf("runPost failed: %+v", err)
        return
      }
     
      // 将proof广播出去
   }()
}

func collectProof() {
  // 监听别的节点的proof结果
  // 如果收到的结果确实是本deadline的proof,则 proofs = append(proofs, proof)
  // 校验是否收集完成,如果是则submitPost(ctx, proof)
}

这个方案最好是选择一个节点来完成这个事,比方说同构集群中的主节点(第一个创建该同构集群的miner)。

5、Storage deal的修改

学习了一遍storage deal的相关代码,没有找到有需要修改的地方,因为storage deal时,用的是actor的地址:

// NewProvider returns a new storage provider
func NewProvider(net network.StorageMarketNetwork,
   ds datastore.Batching,
   bs blockstore.Blockstore,
   fs filestore.FileStore,
   pieceStore piecestore.PieceStore,
   dataTransfer datatransfer.Manager,
   spn storagemarket.StorageProviderNode,
   minerAddress address.Address,
   rt abi.RegisteredSealProof,
   storedAsk StoredAsk,
   options ...StorageProviderOption,
) (storagemarket.StorageProvider, error) {
   carIO := cario.NewCarIO()
   pio := pieceio.NewPieceIOWithStore(carIO, fs, bs)

   h := &Provider{
      net:                  net,
      proofType:            rt,
      spn:                  spn,
      fs:                   fs,
      pio:                  pio,
      pieceStore:           pieceStore,
      conns:                connmanager.NewConnManager(),
      storedAsk:            storedAsk,
      actor:                minerAddress,
      dataTransfer:         dataTransfer,
      dealAcceptanceBuffer: DefaultDealAcceptanceBuffer,
      pubSub:               pubsub.New(providerDispatcher),
   }
   ...
}

storage deal的数据传输是用的ProviderDataTransfer类,而这个类在做p2p传输时又主要用到了StagingGraphsync类,这个类保存了p2p的一些peer节点的ip信息,初步判断数据传输是直接走的tcp/ip等常规通信,没有用到actor的具体信息,这里设置的actor地址也主要是用来做扣费使用的,因此,该模块基本不用怎么修改即可使用。

6、Retrieve deal的修改

同Storage deal,retrieve模块当前来看也不用做啥修改:

// NewProvider returns a new retrieval Provider
func NewProvider(minerAddress address.Address, node retrievalmarket.RetrievalProviderNode,
   network rmnet.RetrievalMarketNetwork, pieceStore piecestore.PieceStore,
   bs blockstore.Blockstore, ds datastore.Batching, opts ...RetrievalProviderOption,
) (retrievalmarket.RetrievalProvider, error) {

   p := &Provider{
      bs:                      bs,
      node:                    node,
      network:                 network,
      minerAddress:            minerAddress,
      pieceStore:              pieceStore,
      pricePerByte:            DefaultPricePerByte, // TODO: allow setting
      paymentInterval:         DefaultPaymentInterval,
      paymentIntervalIncrease: DefaultPaymentIntervalIncrease,
      dealStreams:             make(map[retrievalmarket.ProviderDealIdentifier]rmnet.RetrievalDealStream),
      blockReaders:            make(map[retrievalmarket.ProviderDealIdentifier]blockio.BlockReader),
   }
   ...
}

这里的minerAddress其实也是actor的地址,主要也是用来扣费的。因此,该模块也不用怎么修改。