makeFullNode - t10471/geth-memo GitHub Wiki

makeFullNode

makeFullNode > makeConfigNode

func makeFullNode(ctx *cli.Context) *node.Node {
	stack, cfg := makeConfigNode(ctx)

	utils.RegisterEthService(stack, &cfg.Eth)

	if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
		utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
	}
	// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
	shhEnabled := enableWhisper(ctx)
	shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
	if shhEnabled || shhAutoEnabled {
		if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
			cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
		}
		if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
			cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
		}
		utils.RegisterShhService(stack, &cfg.Shh)
	}

	// Add the Ethereum Stats daemon if requested.
	if cfg.Ethstats.URL != "" {
		utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
	}
	return stack
}
  • configを作成
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
	...
	cfg := gethConfig{
		Eth:       eth.DefaultConfig,
		Shh:       whisper.DefaultConfig,
		Node:      defaultNodeConfig(),
		Dashboard: dashboard.DefaultConfig,
	}
	...
}
type gethConfig struct {
	Eth       eth.Config
	Shh       whisper.Config
	Node      node.Config
	Ethstats  ethstatsConfig
	Dashboard dashboard.Config
}
func defaultNodeConfig() node.Config {
	cfg := node.DefaultConfig
	cfg.Name = clientIdentifier
	cfg.Version = params.VersionWithCommit(gitCommit)
	cfg.HTTPModules = append(cfg.HTTPModules, "eth", "shh")
	cfg.WSModules = append(cfg.WSModules, "eth", "shh")
	cfg.IPCPath = "geth.ipc"
	return cfg
}

var DefaultConfig = Config{
	DataDir:          DefaultDataDir(),
	HTTPPort:         DefaultHTTPPort,
	HTTPModules:      []string{"net", "web3"},
	HTTPVirtualHosts: []string{"localhost"},
	WSPort:           DefaultWSPort,
	WSModules:        []string{"net", "web3"},
	P2P: p2p.Config{
		ListenAddr: ":30303",
		MaxPeers:   25,
		NAT:        nat.Any(),
	},
}

func Any() Interface {
	return startautodisc("UPnP or NAT-PMP", func() Interface {
		found := make(chan Interface, 2)
		go func() { found <- discoverUPnP() }()
		go func() { found <- discoverPMP() }()
		for i := 0; i < cap(found); i++ {
			if c := <-found; c != nil {
				return c
			}
		}
		return nil
	})
}

UPnPとは

機器を通信ネットワークに接続すると、複雑な設定作業などを行わなくても即座に他の機器と通信したり、その機能を利用できるようにする通信規約(プロトコル)。
ネットワークに参加する機器同士はHTTPを使って情報を交換する。さらに話す内容(やりとりする情報)はXMLによって定義される。ある

Nat-PMPとは

NAT-PMP は、NAT デバイスと LAN 側ホストとの間でアドレス/ポートマッピングリクエストのやりとりを行うためのプロトコル。RFC 6886 では次のように記載されている。
「NAT ゲートウェイは、WAN 側 IP アドレス宛てに送られてきたマッピングリクエストや、ゲートウェイの WAN 側ネットワークインターフェースから受信したマッピングリクエストを受け入れてはならない。」
また、作成されるマッピングにおける LAN 側アドレスには、受信したマッピングリクエストパケットのソースアドレスを使わなければ「ならない」とされています。

makeFullNode > makeConfigNode > SetNodeConfig

func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
	SetP2PConfig(ctx, &cfg.P2P)
	setIPC(ctx, cfg)
	setHTTP(ctx, cfg)
	setWS(ctx, cfg)
	setNodeUserIdent(ctx, cfg)
	...
	// cfgの設定が続く
}

makeFullNode > makeConfigNode > SetNodeConfig > SetP2PConfig

func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
	// PrivateKeyの設定
	setNodeKey(ctx, cfg)
	setNAT(ctx, cfg)
	setListenAddress(ctx, cfg)
	// Ethereum Discovery Protocol用のbootstrapノードの設定
	setBootstrapNodes(ctx, cfg)
	setBootstrapNodesV5(ctx, cfg)
	...
	// light mode (client or server) の判定
	// DiscoveryV5を使うか判定
	// developer mode (p2pしない) 判定
}

makeFullNode > makeConfigNode > SetNodeConfig > SetP2PConfig > setIPC

  • geth console等で使用するIPCPathを設定する

makeFullNode > makeConfigNode > node.New

  • AccountManagerを作る
// New creates a new P2P node, ready for protocol registration.
func New(conf *Config) (*Node, error) {
	...
	// AccountManagerを作成する
	// amはaccountManager  
	// ephemeralKeystoreはkeystoreのディレクトリパス
	am, ephemeralKeystore, err := makeAccountManager(conf)
	...
	return &Node{
		accman:            am,
		ephemeralKeystore: ephemeralKeystore,
		config:            conf,
		serviceFuncs:      []ServiceConstructor{},
		ipcEndpoint:       conf.IPCEndpoint(),
		httpEndpoint:      conf.HTTPEndpoint(),
		wsEndpoint:        conf.WSEndpoint(),
		eventmux:          new(event.TypeMux),
		log:               conf.Logger,
	}, nil
}

makeFullNode > makeConfigNode > node.New > makeAccountManager

func makeAccountManager(conf *Config) (*accounts.Manager, string, error) {
	// 設定の呼び出し
	scryptN, scryptP, keydir, err := conf.AccountConfig()
	var ephemeral string
	...
	// BackendとKeystoreを作成
	backends := []accounts.Backend{
		keystore.NewKeyStore(keydir, scryptN, scryptP),
	}
	...
	return accounts.NewManager(backends...), ephemeral, nil
}

// BackendとKeyStoreの型
type Backend interface {
	Wallets() []Wallet
	Subscribe(sink chan<- WalletEvent) event.Subscription
}
type KeyStore struct {
	storage  keyStore                     // keyStorePassphrase(現在はこれだけっぽい) keyの保存場所と保存方法の定義をもっている
	cache    *accountCache                // Accountのメモリ上に持っているもの
	changes  chan struct{}                // accountCacheの変更を検知するチャンネル
	unlocked map[common.Address]*unlocked // アンロックされているAccountのmap

	wallets     []accounts.Wallet       // ImpleはKeystoreWalletなど
	updateFeed  event.Feed              // walletの追加・削除を検知する Feedは1対多のサブスクリプションを提供する
	updateScope event.SubscriptionScope // サブスクリプションの購読を一度に購読解除する機能を提供
	updating    bool                    // notification loopが実行中かどうか

	mu sync.RWMutex
}
type accountCache struct {
	keydir   string
	watcher  *watcher
	mu       sync.Mutex
	all      accountsByURL
	byAddr   map[common.Address][]accounts.Account
	throttle *time.Timer
	notify   chan struct{}
	fileC    fileCache
}

type Wallet interface {
	...
}
// WalletのImplがkeystoreWallet
type keystoreWallet struct {
	account  accounts.Account
	keystore *KeyStore
}
type Account struct {
	Address common.Address `json:"address"` // keyに紐付いたEthereumのaccount address
	URL     URL            `json:"url"`     // オプション backendのリソースの場所を示すもの
}

makeFullNode > makeConfigNode > node.New > makeAccountManager > NewKeyStore

func NewKeyStore(keydir string, scryptN, scryptP int) *KeyStore {
	keydir, _ = filepath.Abs(keydir)
	ks := &KeyStore{storage: &keyStorePassphrase{keydir, scryptN, scryptP}}
	// accountCache,keystoreの生きているaccountの一覧,変更channelの登録
	// keystoreWalletをksに登録
	ks.init(keydir)
	return ks
}

makeFullNode > makeConfigNode > node.New > makeAccountManager > NewManager

  • BackendとKeyStoreを使ってAccountManagerを作成する
  • AccountManagerはWalletのイベント監視をしている
  • Subscription・Feed・channelの関係が複雑...
    • event自体はchannelで受け取る
    • そのeventをFeedでpublishする
      • FeedはSubscriptionを複数登録している
func NewManager(backends ...Backend) *Manager {
	// バックエンドからwalletを取得してURLでsortする
	// Backendは*keystore.KeyStoreと*usbwallet.Hub
	var wallets []Wallet
	...

	// バックエンドからのwallet notificationsをサブスクライブする
	updates := make(chan WalletEvent, 4*len(backends))
	subs := make([]event.Subscription, len(backends))
	for i, backend := range backends {
		// keystore walletの追加削除イベントをサブスクライブする
		subs[i] = backend.Subscribe(updates)
	}
	am := &Manager{
		backends: make(map[reflect.Type][]Backend),
		updaters: subs,
		updates:  updates,
		wallets:  wallets,
		quit:     make(chan chan error),
	}
	for _, backend := range backends {
		kind := reflect.TypeOf(backend)
		// keystore walletやusb wallet
		am.backends[kind] = append(am.backends[kind], backend)
	}
	// WalletEventを監視
	go am.update()
	return am
}

// updaters・updates・feedの関係がややこしい
// updaters(Subscription)はSubscriptionの集合を管理したいため
// updatesはWalletEventをうけとるため
// feedはsubscribeしてるものにpublishするため
type Manager struct {
	backends map[reflect.Type][]Backend // *keystore.KeyStoreと*usbwallet.Hub
	updaters []event.Subscription       // backend.Subscribe (まとめてUnsubscribeするための持ってる?)
	updates  chan WalletEvent           // WalletEvent (WalletArrived | WalletOpened | WalletDropped)
	wallets  []Wallet                   // KeyStoreの場合、keydirにある分

	feed event.Feed // walletのEventをpublish(一斉配信)する

	quit chan chan error
	lock sync.RWMutex
}

func (am *Manager) update() {
	defer func() {
		am.lock.Lock()
		for _, sub := range am.updaters {
			sub.Unsubscribe()
		}
		am.updaters = nil
		am.lock.Unlock()
	}()
	for {
		select {
		case event := <-am.updates:
			am.lock.Lock()
			switch event.Kind {
			case WalletArrived:
				am.wallets = merge(am.wallets, event.Wallet)
			case WalletDropped:
				am.wallets = drop(am.wallets, event.Wallet)
			}
			am.lock.Unlock()
			am.feed.Send(event)
		case errc := <-am.quit:
			errc <- nil
			return
		}
	}
}
  • BackendのSubscribeはKeyStoreの。
  • WalletEventをFeedにしてSubscribeしてSubscriptionにして返す  
  • 定期的にWalletを更新する

FeedSubscriptionを使って1対多の通知を実現している

// keystore walletの追加削除イベントをサブスクライブする
func (ks *KeyStore) Subscribe(sink chan<- accounts.WalletEvent) event.Subscription {
	...
	// WalletEventをFeedに登録しSubscriptionとして返ってきたのをSubscriptionScopeとしてさらに登録
	sub := ks.updateScope.Track(ks.updateFeed.Subscribe(sink))
	...
	return sub
}

func (ks *KeyStore) updater() {
	for {
		select {
		case <-ks.changes:
		case <-time.After(walletRefreshCycle):
		}
		// Walletの状態を調べて、walletsの状態を更新して
		// WalletDropped WalletArrivedのイベントを送信する(ks.updateFeed.Send(event))
		ks.refreshWallets()

		ks.mu.Lock()
		if ks.updateScope.Count() == 0 {
			ks.updating = false
			ks.mu.Unlock()
			return
		}
		ks.mu.Unlock()
	}
}
func (ks *KeyStore) refreshWallets() {
	...
	accs := ks.cache.accounts()
	wallets := make([]accounts.Wallet, 0, len(accs))
	events := []accounts.WalletEvent{}

	for _, account := range accs {
		// 古いwalletを消す(WalletDropped EVENTの追加)
		// 知らないwalletなら追加する(WalletArrived EVENTの追加)
		// 一致したら、ks.walletsをwalletsに追加
		...
	}
	// 残りをWalletDropped EVENTの追加にする 
	...
	// EVENTの発火
	for _, event := range events {
		ks.updateFeed.Send(event)
	}
}

makeFullNode > makeConfigNode > SetEthConfig

func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
	...
	// 1つ目はかならずKeyStore(他はUSB)
	ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
	// マイニングの報酬を受け取るアドレスを設定する
	setEtherbase(ctx, ks, cfg)
	// GPO は Gas Price Oracle (Gas Priceを決める) gasprice.Configの設定をする
	setGPO(ctx, &cfg.GPO)
	setTxPool(ctx, &cfg.TxPool)
	setEthash(ctx, cfg)
	// flagによる設定のcfgに登録する
	// devフラグ指定時の設定はここにある
}

// eth/gasprice/gasprice.go
// Configという名前の構造体が多い...
type Config struct {
		Blocks     int
		Percentile int
		Default    *big.Int `toml:",omitempty"`
}

type TxPoolConfig struct {
	// ローカルのトランザクションはGasPriceがTxPool.gasPriceを無視する (locals  *accountSetにトランザクションを入れておく)
	NoLocals  bool          // ローカルでトランザクションを発行するか?(localで発行したものは自ブロック作成時に優先させたいため)
	Journal   string        // ローカルトランザクションを保存しておくパス
	Rejournal time.Duration // pool.journal.rotateを呼ぶ間隔

	PriceLimit uint64 // poolに入れるgasPriceの最小値
	PriceBump  uint64 // 新しいトランザクションを既にあるトランザクションよりも優先順位をあげるために必要なgasPriceの割合

	AccountSlots uint64 // アカウントごとの保存できるトランザクション最少数、これを越したpendingは削除される 
	GlobalSlots  uint64 // 全てのアカウントの保存できるトランザクションの最大数
	AccountQueue uint64 // アカウントごとの実行していないトランザクションのアカウントごとの最大サイズ、超えると削除さいれる
	GlobalQueue  uint64 //  全てのアカウントの実行していないトランザクションの保存サイズaccounts

	Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}
var DefaultTxPoolConfig = TxPoolConfig{
	Journal:   "transactions.rlp",
	Rejournal: time.Hour,

	PriceLimit: 1,
	PriceBump:  10,

	AccountSlots: 16,
	GlobalSlots:  4096,
	AccountQueue: 64,
	GlobalQueue:  1024,

	Lifetime: 3 * time.Hour,
}

// ethashの設定
type Config struct {
	CacheDir       string
	CachesInMem    int
	CachesOnDisk   int
	DatasetDir     string
	DatasetsInMem  int
	DatasetsOnDisk int
	PowMode        Mode
}

// Ethash Ethereum用のPowのアルゴリズム
type Ethash struct {
	config Config

	caches   *lru // マイニングと検証に使うデータ
	datasets *lru // マイニングに使うデータ chachesを元に生成される

	// Mining related fields
	rand     *rand.Rand    // noneを生成するための乱数オブジェクト
	threads  int           // マイニングするのに使うスレッド数
	update   chan struct{} // マイニングのパラメータの更新を受け取るチャンネル
	hashrate metrics.Meter // hashrateを監視するオブジェクト

	// The fields below are hooks for testing
	shared    *Ethash       // Shared PoW verifier to avoid cache regeneration
	fakeFail  uint64        // Block number which fails PoW check even in fake mode
	fakeDelay time.Duration // Time delay to sleep for before returning from verify

	lock sync.Mutex // キャッシュとマイニングの為のフィールドをスレッドセーフにするためのロック
}

type TxPool struct {
	config       TxPoolConfig
	chainconfig  *params.ChainConfig
	chain        blockChain
	gasPrice     *big.Int
	txFeed       event.Feed
	scope        event.SubscriptionScope
	chainHeadCh  chan ChainHeadEvent
	chainHeadSub event.Subscription
	signer       types.Signer
	mu           sync.RWMutex

	currentState  *state.StateDB      // ブロックチェーンのヘッダーの現在の状態
	pendingState  *state.ManagedState // 仮のnonceのペンディング状態
	currentMaxGas uint64              // 現在のトランザクションごとのgas limit

	locals  *accountSet // プールから削除されるルールの適用外のローカルトランザクションの集合
	journal *txJournal  // ディスクに保存するローカルトランザクションのジャーナル

	pending map[common.Address]*txList   // 現在処理可能なすべてのトランザクション
	queue   map[common.Address]*txList   // キューに積まれてるが処理不能なトランザクション
	beats   map[common.Address]time.Time // 各既知のアカウントからの最後の通知
	all     *txLookup                    // 検索可能な全てのトランザクション
	priced  *txPricedList                // 金額順に並んだトランザクション

	wg sync.WaitGroup // シャットダウンの為のsync

	homestead bool
}

makeFullNode > RegisterEthService

eth Serviceを設定する

func RegisterEthService(stack *node.Node, cfg *eth.Config) {
	var err error
	if cfg.SyncMode == downloader.LightSync {
		err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
			return les.New(ctx, cfg)
		})
	} else {
		err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
			fullNode, err := eth.New(ctx, cfg)
			if fullNode != nil && cfg.LightServ > 0 {
				ls, _ := les.NewLesServer(fullNode, cfg)
				fullNode.AddLesServer(ls)
			}
			return fullNode, err
		})
	}
	if err != nil {
		Fatalf("Failed to register the Ethereum service: %v", err)
	}
}

func (n *Node) Register(constructor ServiceConstructor) error {
	n.lock.Lock()
	defer n.lock.Unlock()

	if n.server != nil {
		return ErrNodeRunning
	}
	// serviceFuncs: []ServiceConstructor{}
	n.serviceFuncs = append(n.serviceFuncs, constructor)
	return nil
}

Serviceという抽象化した形で機能を登録する