區塊鏈開發教程 兄弟連區塊鏈教程以太坊源碼分析chain-indexer區塊鏈索引一

chain_indexer 區塊鏈索引

chain_indexer.go 源碼解析

chain_indexer 顧名思義, 就是用來給區塊鏈創建索引的功能。 之前在eth協議的時候,介紹過BloomIndexer的功能,其實BloomIndexer是chain_indexer的一個特殊的實現, 可以理解為派生類, 主要的功能其實是在chain_indexer這裡面實現的。雖說是派生類,但是chain_indexer其實就只被BloomIndexer使用。也就是給區塊鏈的布隆過濾器創建了索引,以便快速地響應用戶的日志搜索功能。 下面就來分析這部分的代碼。

### 數據結構

// ChainIndexerBackend defines the methods needed to process chain segments in
// the background and write the segment results into the database. These can be
// used to create filter blooms or CHTs.
// ChainIndexerBackend定義了處理區塊鏈片段的方法,并把處理結果寫入數據庫。 這些可以用來創建布隆過濾器或者CHTs.
// BloomIndexer 其實就是實現了這個接口 ChainIndexerBackend 這里的CHTs不知道是什么東西。
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
// Reset 方法用來初始化一個新的區塊鏈片段,可能會終止任何沒有完成的操作。
Reset(section uint64)

// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
// 對區塊鏈片段中的下一個區塊頭進行處理。 調用者將確保區塊頭的連續順序。
Process(header *types.Header)

// Commit finalizes the section metadata and stores it into the database.
Commit() error

// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
// ChainEventLoop in a goroutine.
// ChainIndexer 對區塊鏈進行 大小相等的片段 進行處。 ChainIndexer在ChainEventLoop方法中通過事件系統與區塊鏈通信,
// Further child ChainIndexers can be added which use the output of the parent
// section indexer. These child indexers receive new head notifications only
// after an entire section has been finished or in case of rollbacks that might
// affect already finished sections.
//更遠可以添加使用父section索引器的輸出的更多子鏈式索引器。 這些子鏈式索引器只有在整個部分完成后或在可能影響已完成部分的回滾的情況下才接收新的頭部通知。

type ChainIndexer struct {
chainDb ethdb.Database // Chain database to index the data from 區塊鏈所在的數據庫
indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into 索引存儲的數據庫
backend ChainIndexerBackend // Background processor generating the index data content 索引生成的后端。
children []*ChainIndexer // Child indexers to cascade chain updates to??子索引

active uint32 // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed 接收到的headers
quit chan chan error // Quit channel to tear down running goroutines

sectionSize uint64 // Number of blocks in a single chain segment to process?section的大小。 默認是4096個區塊為一個section
confirmsReq uint64 // Number of confirmations before processing a completed segment 處理完成的段之前的確認次數

storedSections uint64 // Number of sections successfully indexed into the database 成功索引到數據庫的部分數量
knownSections uint64 // Number of sections known to be complete (block wise) 已知完成的部分數量
cascadedHead uint64 // Block number of the last completed section cascaded to subindexers 級聯到子索引的最后一個完成部分的塊號

throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources 磁盤限制,以防止大量資源的大量升級

log log.Logger
lock sync.RWMutex


const (
// bloomConfirms is the number of confirmation blocks before a bloom section is
// considered probably final and its rotated bits are calculated.
// bloomConfirms 用來表示確認區塊數量, 表示經過這么多區塊之后, bloom section被認為是已經不會更改了。
bloomConfirms = 256

// bloomThrottling is the time to wait between processing two consecutive index
// sections. It\’s useful during chain upgrades to prevent disk overload.
// bloomThrottling是處理兩個連續索引段之間的等待時間。 在區塊鏈升級過程中防止磁盤過載是很有用的。
bloomThrottling = 100 * time.Millisecond

func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
backend := &BloomIndexer{
db: db,
size: size,
// 可以看到indexDb和chainDb實際是同一個數據庫, 但是indexDb的每個key前面附加了一個BloomBitsIndexPrefix的前綴。
table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix))

return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, \”bloombits\”)

// NewChainIndexer creates a new chain indexer to do background processing on
// chain segments of a given size after certain number of confirmations passed.
// The throttling parameter might be used to prevent database thrashing.

func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
c := &ChainIndexer{
chainDb: chainDb,
indexDb: indexDb,
backend: backend,
update: make(chan struct{}, 1),
quit: make(chan chan error),
sectionSize: section,
confirmsReq: confirm,
throttling: throttling,
log: log.New(\”type\”, kind),
// Initialize database dependent fields and start the updater
go c.updateLoop()

return c

loadValidSections,用來從數據庫里面加載我們之前的處理信息, storedSections表示我們已經處理到哪里了。

// loadValidSections reads the number of valid sections from the index database
// and caches is into the local state.
func (c *ChainIndexer) loadValidSections() {
data, _ := c.indexDb.Get([]byte(\”count\”))
if len(data) == 8 {
c.storedSections = binary.BigEndian.Uint64(data[:])

updateLoop,是主要的事件循環,用于調用backend來處理區塊鏈section,這個需要注意的是,所有的主索引節點和所有的 child indexer 都會啟動這個goroutine 方法。

func (c *ChainIndexer) updateLoop() {
var (
updating bool
updated time.Time
for {
select {
case errc := <-c.quit:
// Chain indexer terminating, report no failure and abort
errc <- nil

case c.storedSections { // 如果當前以知的Section 大于已經存儲的Section
// Periodically print an upgrade log message to the user
// 每隔8秒打印一次日志信息。
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
updating = true
c.log.Info(\”Upgrading chain index\”, \”percentage\”, c.storedSections*100/c.knownSections)
updated = time.Now()
// Cache the current section count and head to allow unlocking the mutex
section := c.storedSections
var oldHead common.Hash
if section > 0 { // section – 1 代表section的下標是從0開始的。
// sectionHead用來獲取section的最后一個區塊的hash值。
oldHead = c.sectionHead(section – 1)
// Process the newly defined section in the background
// 處理 返回新的section的最后一個區塊的hash值
newHead, err := c.processSection(section, oldHead)
if err != nil {
c.log.Error(\”Section processing failed\”, \”error\”, err)

// If processing succeeded and no reorgs occcurred, mark the section completed
if err == nil && oldHead == c.sectionHead(section-1) {
c.setSectionHead(section, newHead) // 更新數據庫的狀態
c.setValidSections(section + 1) // 更新數據庫狀態
if c.storedSections == c.knownSections && updating {
updating = false
c.log.Info(\”Finished upgrading chain index\”)
// cascadedHead 是更新后的section的最后一個區塊的高度
// 用法是什么 ?
c.cascadedHead = c.storedSections*c.sectionSize – 1
for _, child := range c.children {
c.log.Trace(\”Cascading chain index update\”, \”head\”, c.cascadedHead)
child.newHead(c.cascadedHead, false)
} else { //如果處理失敗,那么在有新的通知之前不會重試。
// If processing failed, don\’t retry until further notification
c.log.Debug(\”Chain index processing failed\”, \”section\”, section, \”err\”, err)
c.knownSections = c.storedSections
// If there are still further sections to process, reschedule
// 如果還有section等待處理,那么等待throttling時間再處理。避免磁盤過載。
if c.knownSections > c.storedSections {
time.AfterFunc(c.throttling, func() {
select {
case c.update <- struct{}{}:


評論 1

  • 昵稱 (必填)
  • 郵箱 (必填)
  • 網址
  1. #1


    阿飛8個月前 (10-22)回復