區塊鏈
挖礦,比特幣,EOS,以太坊

區塊鏈開發教程 區塊鏈教程以太坊源碼分析chain-indexer區塊鏈索引二

兄弟連區塊鏈教程以太坊源碼分析chain-indexer區塊鏈索引二。
Start方法。 這個方法在eth協議啟動的時候被調用,這個方法接收兩個參數,一個是當前的區塊頭,一個是事件訂閱器,通過這個訂閱器可以獲取區塊鏈的改變信息。

eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent)

// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.

// 子鏈不需要被啟動。 以為他們的父節點會通知他們。
func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
go c.eventLoop(currentHeader, chainEventer)
}

// eventLoop is a secondary – optional – event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.

// eventLoop 循環只會在最外面的索引節點被調用。 所有的Child indexer不會被啟動這個方法。

func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)

events := make(chan ChainEvent, 10)
sub := chainEventer(events)
defer sub.Unsubscribe()

// Fire the initial new head event to start any outstanding processing
// 設置我們的其實的區塊高度,用來觸發之前未完成的操作。
c.newHead(currentHeader.Number.Uint64(), false)

var (
prevHeader = currentHeader
prevHash = currentHeader.Hash()
)
for {
select {
case errc := <-c.quit:
// Chain indexer terminating, report no failure and abort
errc <- nil
return

case ev, ok := <-events:
// Received a new event, ensure it\’s not nil (closing) and update
if !ok {
errc := <-c.quit
errc <- nil
return
}
header := ev.Block.Header()
if header.ParentHash != prevHash { //如果出現了分叉,那么我們首先
//找到公共祖先, 從公共祖先之后的索引需要重建。
c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
}
// 設置新的head
c.newHead(header.Number.Uint64(), false)

prevHeader, prevHash = header, header.Hash()
}
}
}

newHead方法,通知indexer新的區塊鏈頭,或者是需要重建索引,newHead方法會觸發

// newHead notifies the indexer about new chain heads and/or reorgs.
func (c *ChainIndexer) newHead(head uint64, reorg bool) {
c.lock.Lock()
defer c.lock.Unlock()

// If a reorg happened, invalidate all sections until that point
if reorg { // 需要重建索引 從head開始的所有section都需要重建。
// Revert the known section number to the reorg point
changed := head / c.sectionSize
if changed < c.knownSections {
c.knownSections = changed
}
// Revert the stored sections from the database to the reorg point
// 將存儲的部分從數據庫恢復到索引重建點
if changed < c.storedSections {
c.setValidSections(changed)
}
// Update the new head number to te finalized section end and notify children
// 生成新的head 并通知所有的子索引
head = changed * c.sectionSize

if head = c.confirmsReq {
sections = (head + 1 – c.confirmsReq) / c.sectionSize
if sections > c.knownSections {
c.knownSections = sections

select {
case c.update sections {
c.storedSections–
c.removeSectionHead(c.storedSections)
}
c.storedSections = sections // needed if new > old
}

processSection

// processSection processes an entire section by calling backend functions while
// ensuring the continuity of the passed headers. Since the chain mutex is not
// held while processing, the continuity can be broken by a long reorg, in which
// case the function returns with an error.

//processSection通過調用后端函數來處理整個部分,同時確保傳遞的頭文件的連續性。 由于鏈接互斥鎖在處理過程中沒有保持,連續性可能會被重新打斷,在這種情況下,函數返回一個錯誤。
func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
c.log.Trace(\”Processing new chain section\”, \”section\”, section)

// Reset and partial processing
c.backend.Reset(section)

for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
if hash == (common.Hash{}) {
return common.Hash{}, fmt.Errorf(\”canonical block #%d unknown\”, number)
}
header := GetHeader(c.chainDb, hash, number)
if header == nil {
return common.Hash{}, fmt.Errorf(\”block #%d [%x…] not found\”, number, hash[:4])
} else if header.ParentHash != lastHead {
return common.Hash{}, fmt.Errorf(\”chain reorged during section processing\”)
}
c.backend.Process(header)
lastHead = header.Hash()
}
if err := c.backend.Commit(); err != nil {
c.log.Error(\”Section commit failed\”, \”error\”, err)
return common.Hash{}, err
}
return lastHead, nil
}

贊(0)

評論 搶沙發

  • 昵稱 (必填)
  • 郵箱 (必填)
  • 網址
p3试机号99