區塊鏈
挖礦,比特幣,EOS,以太坊

比特幣開發教程,從零開始學習比特幣:P2P 網絡的建立之消息處理(上篇)

P2P 網絡的建立是在系統啟動的第 12 步,最后時刻調用?CConnman::Start?方法開始的。

本部分內容在?net.cpp、net_processing.cpp?等文件中。

前面幾個章節學完了?ThreadSocketHandler線程來處理套接字的讀取和發送ThreadDNSAddressSeed線程查詢DNS節點ThreadOpenAddedConnections線程生成地址對并連接到指定地址

終于,來到了我們非常非常關心的比特幣消息處理線程。通過比特幣消息處理,我們會理解比特幣的協義,理解比特幣是如何同步區塊,如何發送交易,從而建立起理解比特幣的至關重要一步。

本部分內容是如此的重要,也是相當的長,所以我們分上下兩部分來介紹具體的消息處理。

上篇主要以消息處理線程的分析為主,下篇以具體的比特幣消息即比特幣協義分析為主。

下面我們來看消息處理線程相關的代碼。

ThreadMessageHandler

處理所有消息的線程。主體是一個?while?循環,退出條件為?flagInterruptMsgProc?為假,循環體如下:

  1. 生成一個對等節點容器對象?vNodesCopy,類型為?CNode*。然后,設置其值為?vNodes?。后者,代表當前節點保存的所有對等節點信息。
  2. 遍歷所有的?vNodesCopy?節點,調用當前節點的?AddRef?方法,把對等節點的?nRefCount?屬性加1。
  3. 遍歷所有的?vNodesCopy?節點,進行如下處理:
    • 如果當前節點已斷開連接,則處理下一個。
    • 調用?PeerLogicValidation::ProcessMessages?方法,處理當前遠程對等節點發送給本對等節點的消息。
    • 調用?PeerLogicValidation::SendMessages?方法,處理本對等節點發送給當前遠程對等節點的消息。
  4. 遍歷所有的?vNodesCopy?節點,調用當前節點的?Release?方法,把對等節點的?nRefCount?屬性減1。

1、ProcessMessages

本方法主要處理對等節點接收到的相關消息,具體代碼在?net_processing.cpp?文件中。

  1. 如果對等節點的已接收請求數據集合不為空,也就是保存別的對等節點請求數據的集合不為空,則調用?ProcessGetData?方法,處理別的對等獲取數據的請求。
    if (!pfrom->vRecvGetData.empty()) ? ? ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);

    vRecvGetData?屬性是一個 inv 消息(CInv)的雙端隊列。下面,我們來看?ProcessGetData?方法是怎樣處理收到的消息。

    • 從對等節點的?vRecvGetData?集合中,取得其迭代器。std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();std::vector<CInv> vNotFound;
    • 生成一個?CNetMsgMaker?類型的對象?msgMaker。其?nVersionIn?屬性是對等節點已經發送的版本消息。
    • 遍歷請求數據集合,如果請求數據的類型是交易或者見證隔離交易,進行如下的處理:如果已經終止處理消息信號為真,則直接返回。如果當前節點處于暫停狀態(緩沖區太慢而不能響應),則推出循環。取得當前的?inv?消息。從?mapRelay?集合中取得當前?inv?消息。這個?mapRelay?是一個 Map 集合,Key 是交易的哈希,Value 是指向交易的智能指針。

      如果查找的交易存在于集合中,則調用位于?net.cpp?文件中的?CConnman::PushMessage?方法,發送交易;否則,如果節點最后一次?MEMPOOL?請求存在,則從內存池中取得對應的交易信息,然后調用?CConnman::PushMessage?方法,發送交易。

      while (it != pfrom->vRecvGetData.end() && (it->type == MSG_TX || it->type == MSG_WITNESS_TX)) { ? if (interruptMsgProc) ? ? ? return; ? // Don't bother if send buffer is too full to respond anyway ? if (pfrom->fPauseSend) ? ? ? break; ? const CInv &inv = *it; ? it++; ? bool push = false; ? auto mi = mapRelay.find(inv.hash); ? int nSendFlags = (inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0); ? if (mi != mapRelay.end()) { ? ? ? connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second)); ? ? ? push = true; ? } else if (pfrom->timeLastMempoolReq) { ? ? ? auto txinfo = mempool.info(inv.hash); ? ? ? if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) { ? ? ? ? ? connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx)); ? ? ? ? ? push = true; ? ? ? } ? } ? if (!push) { ? ? ? vNotFound.push_back(inv); ? } }
    • 如果沒有達到請求數據集合尾部,且節點對象不是暫停狀態,并且當前請求的?inv?消息類型是區塊、過濾型區塊、緊湊型區塊、隔離見證型區塊之一,則調用?ProcessGetBlockData?方法處理區塊數據。 遍歷收到的所有數據,如果數據類型是交易或者見證隔離交易,進行如下的處理:ProcessGetBlockData?方法,因為處理過程比較長,所以其處理過程放在下面進行詳細說明。
    • 清空已收到數據集合從起始部分到當前位置之間的?inv?數據。
  2. 如果對等節點已斷開,則直接返回。
  3. 如果對等節點要處理的數據不為空,則直接返回。這樣維護了響應的順序。
  4. 如果對等節點已暫停,則直接返回。緩沖區太滿而不能進行繼續處理。
  5. 如果節點對象待處理的消息列表?vProcessMsg?為空,則返回假。否則,取出第一個消息對象,放入消息列表?msgs?中,并待處理的消息列表中刪除。將節點對象處理隊列大小減去已刪除的消息對象收到的數據長度與消息頭部大小之和。根據節點對象處理隊列大小與允許的接收上限比較,如果大于接收上限,則設置節點暫停接收消息。
    if (pfrom->vProcessMsg.empty()) ? ? return false; msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman->GetReceiveFloodSize(); fMoreWork = !pfrom->vProcessMsg.empty();
  6. 生成一個消息對象,設置其版本為對等節點已接收到的版本。
    CNetMessage& msg(msgs.front()); msg.SetVersion(pfrom->GetRecvVersion());
  7. 驗證消息的?MESSAGESTART?是否有效。如果無效,則設置對等節點為斷開,然后返回。
    if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { ? ? LogPrint(BCLog::NET, "PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->GetId()); ? ? pfrom->fDisconnect = true; ? ? return false; }
  8. 從消息對象中取得消息頭部,并進行驗證。如果無效,則返回。
    CMessageHeader& hdr = msg.hdr; if (!hdr.IsValid(chainparams.MessageStart())) { ? ? LogPrint(BCLog::NET, "PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->GetId()); ? ? return fMoreWork; }
  9. 從消息對象中取得具體的命令和消息大小。
    std::string strCommand = hdr.GetCommand(); unsigned int nMessageSize = hdr.nMessageSize;
  10. 調用消息對象自身的哈希,并與消息頭部的校驗和進行驗證。如果驗證失敗,則返回。
    const uint256& hash = msg.GetMessageHash(); if (memcmp(hash.begin(), hdr.pchChecksum, CMessageHeader::CHECKSUM_SIZE) != 0) { ? ? LogPrint(BCLog::NET, "%s(%s, %u bytes): CHECKSUM ERROR expected %s was %s\n", __func__, ? ? ? SanitizeString(strCommand), nMessageSize, ? ? ? HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), ? ? ? HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); ? ? return fMoreWork; }
  11. 調用?ProcessMessage?方法,進行消息處理。對于比特幣網絡來說,最最重要的方法來了。這個方法處理比特幣的所有具體,比如:版本消息、獲取區塊消息等。因為這個方法是如此的重要,所以我們把留在下一篇文章中進行說明。
  12. 調用?SendRejectsAndCheckIfBanned?方法,進行可能的?reject?處理。具體如下:
    • 取得節點的狀態對象。
    • 如果啟用 BIP61,則遍歷狀態對象中保存的?reject?,調用?PushMessage?方法,發送?reject?消息。
    • 清空狀態對象中保存的?reject.
    • 如果狀態對象的禁止屬性?fShouldBan?為真,則:
      • 設置禁止屬性為假。
      • 如果節點在白名單中,或者是手動連接的,則進行警告。否則,進行下面的處理:
      • 設置對等節點的斷開連接屬性為真;如果節點的地址是本地地址,則進行警告,否則,調用?Ban?方法,禁止對等節點連接。

ProcessGetBlockData

  1. 生成一些內部變量,并設置為已緩存的變量值。
  2. 調用?LookupBlockIndex?方法,查詢消息對應的區塊索引。如果區塊索引存在,并且索引對應的區塊在鏈上的交易也存在,但區塊還沒有驗證過,那么設置變量?need_activate_chain?為真。const CBlockIndex* pindex = LookupBlockIndex(inv.hash);if (pindex) {? if (pindex->nChainTx && !pindex->IsValid(BLOCK_VALID_SCRIPTS) && pindex->IsValid(BLOCK_VALID_TREE)) {? need_activate_chain = true;? }}
  3. 如果需要激活區塊鏈,那么調用?ActivateBestChain?方法來激活。
  4. 如果區塊索引存在,調用?BlockRequestAllowed?方法檢查是否允許發送數據。
  5. 如果允許發送,且達到歷史區塊服務限額的情況下,斷開節點連接,并設置發送標志為假。
    if (send && connman->OutboundTargetReached(true) && ( ((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - pindex->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted) { ? ? LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId()); ? ? pfrom->fDisconnect = true; ? ? send = false; }
  6. 如果允許發送,且節點不在白名單中,且節點支持的服務是?NODE_NETWORK_LIMITED(只支持 288個區塊,即2天內生成的區塊),且不支持?NODE_NETWORK?服務,且區塊鏈棧頂的高度與當前區塊索引的高度之差大于網絡限制允許的最小區塊數(NODE_NETWORK_LIMITED_MIN_BLOCKS,288個區塊)加上額外的兩個區塊(為了防止競爭,指的是分叉?所以增加兩個區塊緩沖),則斷開節點連接,并設置發送標志為假。
    if (send && !pfrom->fWhitelisted && ( ? ? ? ? (((pfrom->GetLocalServices() & NODE_NETWORK_LIMITED) == NODE_NETWORK_LIMITED) && ? ? ? ? ((pfrom->GetLocalServices() & NODE_NETWORK) != NODE_NETWORK) && ? ? ? ? (chainActive.Tip()->nHeight - pindex->nHeight > (int)NODE_NETWORK_LIMITED_MIN_BLOCKS + 2) ) ? )) { ? ? pfrom->fDisconnect = true; ? ? send = false; }
  7. 如果允許發送,并且這個區塊索引的狀態等于?BLOCK_HAVE_DATA?(全部數據都?blk*.dat?文件中可用),則進行下面的處理:
    • 生成一個指向區塊對象的智能指針對象?pblock。
    • 如果最近區塊對象存在,且其哈希與區塊索引對象的哈希一樣,那么設置?pblock?為最近區塊對象;
    • 否則,如果消息對象類型是隔離見證區塊,那么:調用?ReadRawBlockFromDisk?方法,從磁盤中讀取原始的區塊數據。如果可以讀到,則調用?PushMessage?方法,發送區塊數據。這種情況下,直接發送了區塊數據,所以不設置?pblock?變量。
    • 否則,調用?ReadBlockFromDisk?方法,從磁盤中讀取原始的區塊數據。如果可以讀到,則設置?pblock?為讀取到的數據。
    • 如果?pblock?為真,則發送消息。如果消息對象類型是區塊,則調用?PushMessage?方法,發送標志為?SERIALIZE_TRANSACTION_NO_WITNESS?的區塊消息;如果消息類型是隔離見證區塊,則調用?PushMessage?方法,發送區塊消息;

      如果消息類型是過濾區塊,則:如果對節對象的 Bloom 過濾器存在,那么生成默克爾區塊,并設置發送過濾區塊標志為真;如果發送過濾區塊標志為真,則調用?PushMessage?方法,發送默克爾區塊,然后調用?PushMessage?方法,發送默克爾區塊的每個交易數據,標志為?SERIALIZE_TRANSACTION_NO_WITNESS;

      如果消息類型是緊湊區塊,同樣調用?PushMessage?方法,發送區塊消息。

    • 如果消息的哈希等于節點的繼續發送屬性(hashContinue?屬性,代表繼續發送的哈希),則:生成一個?CInv?向量;然后,構造一個?CInv?對象,類型為區塊,哈希為區塊鏈棧頂元素的哈希;然后,調用?PushMessage?方法,發送?inv?消息;最后,設置節點的繼續發送屬性為空。

    代碼如下:

    if (send && (pindex->nStatus & BLOCK_HAVE_DATA)){ ? std::shared_ptr<const CBlock> pblock; ? if (a_recent_block && a_recent_block->GetHash() == pindex->GetBlockHash()) { ? ? ? pblock = a_recent_block; ? } else if (inv.type == MSG_WITNESS_BLOCK) { ? ? ? std::vector<uint8_t> block_data; ? ? ? if (!ReadRawBlockFromDisk(block_data, pindex, chainparams.MessageStart())) { ? ? ? ? ? assert(!"cannot load block from disk"); ? ? ? } ? ? ? connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, MakeSpan(block_data))); ? } else { ? ? ? std::shared_ptr<CBlock> pblockRead = std::make_shared<CBlock>(); ? ? ? if (!ReadBlockFromDisk(*pblockRead, pindex, consensusParams)) ? ? ? ? ? assert(!"cannot load block from disk"); ? ? ? pblock = pblockRead; ? } ? if (pblock) { ? ? ? if (inv.type == MSG_BLOCK) ? ? ? ? ? connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock)); ? ? ? else if (inv.type == MSG_WITNESS_BLOCK) ? ? ? ? ? connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); ? ? ? else if (inv.type == MSG_FILTERED_BLOCK) ? ? ? { ? ? ? ? ? bool sendMerkleBlock = false; ? ? ? ? ? CMerkleBlock merkleBlock; ? ? ? ? ? { ? ? ? ? ? ? ? LOCK(pfrom->cs_filter); ? ? ? ? ? ? ? if (pfrom->pfilter) { ? ? ? ? ? ? ? ? ? sendMerkleBlock = true; ? ? ? ? ? ? ? ? ? merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter); ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? ? ? if (sendMerkleBlock) { ? ? ? ? ? ? ? connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock)); ? ? ? ? ? ? ? typedef std::pair<unsigned int, uint256> PairType; ? ? ? ? ? ? ? for (PairType& pair : merkleBlock.vMatchedTxn) ? ? ? ? ? ? ? ? ? connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *pblock->vtx[pair.first])); ? ? ? ? ? } ? ? ? } ? ? ? else if (inv.type == MSG_CMPCT_BLOCK) ? ? ? { ? ? ? ? ? bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness; ? ? ? ? ? int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS; ? ? ? ? ? if (CanDirectFetch(consensusParams) && pindex->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) { ? ? ? ? ? ? ? if ((fPeerWantsWitness || !fWitnessesPresentInARecentCompactBlock) && a_recent_compact_block && a_recent_compact_block->header.GetHash() == pindex->GetBlockHash()) { ? ? ? ? ? ? ? ? ? connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *a_recent_compact_block)); ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? CBlockHeaderAndShortTxIDs cmpctblock(*pblock, fPeerWantsWitness); ? ? ? ? ? ? ? ? ? connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); ? ? ? ? ? ? ? } ? ? ? ? ? } else { ? ? ? ? ? ? ? connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, *pblock)); ? ? ? ? ? } ? ? ? } ? } ? if (inv.hash == pfrom->hashContinue) ? { ? ? ? std::vector<CInv> vInv; ? ? ? vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash())); ? ? ? connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv)); ? ? ? pfrom->hashContinue.SetNull(); ? } }

2、SendMessages

本方法主要處理對等節點發送的相關邏輯,具體代碼在?net_processing.cpp?文件中。

  1. 如果對等節點間還沒有完成握手,或者已經斷開連接,則返回。
    if (!pto->fSuccessfullyConnected || pto->fDisconnect) ? ? return true;
  2. 如果對等節點的 Ping 已經被請求,則設置?pingSend?變量為真。
  3. 如果對等節點沒有期望 Pong 回復(即對等節點的?nPingNonceSent?等于0),且用戶開始 ping 的時間(nPingUsecStart?)加上規定的節點 ping 間隔小于當前時間,則設置?pingSend?變量為真。
  4. 如果?pingSend?為真,處理如下:
    • 設置對等節點的 Ping 已經被請求假。
    • 設置開始 ping 的時間為當前時間。
    • 如果對等節點的版本大于 BIP 0031 規定的版本(60000),則設置對等節點的?nPingNonceSent?為隨機變量?nonce,調用?PushMessage?方法,發送?ping?消息,消息中包括?nonce;否則,即對等節點不支持帶隨機數的 Ping 命令,則設置設置對等節點的?nPingNonceSent?為0,調用?PushMessage?方法,發送?ping消息,消息中不包括?nonce。
    if (pingSend) { ? uint64_t nonce = 0; ? while (nonce == 0) { ? ? ? GetRandBytes((unsigned char*)&nonce, sizeof(nonce)); ? } ? pto->fPingQueued = false; ? pto->nPingUsecStart = GetTimeMicros(); ? if (pto->nVersion > BIP0031_VERSION) { ? ? ? pto->nPingNonceSent = nonce; ? ? ? connman->PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce)); ? } else { ? ? ? pto->nPingNonceSent = 0; ? ? ? connman->PushMessage(pto, msgMaker.Make(NetMsgType::PING)); ? } }
  5. 調用?SendRejectsAndCheckIfBanned?方法,進行可能的?reject?處理。如果該函數返回為真,則返回。
  6. 獲取節點的狀態對象。
  7. 如果當前沒有在 IBD 下載中(IsInitialBlockDownload?函數為假),且節點下次發送本地地址的時間(nNextLocalAddrSend)小于當前時間,那么進行如下處理:
    • 調用?AdvertiseLocal?方法,發送我們自己本身的地址給對等節點。方法的主要邏輯是調用節點對象的?PushAddress?方法,把要發送的地址保存在?vAddrToSend?集合中。
    • 調用?PoissonNextSend?方法,計算下次發送地址的時間,并設置節點的下次發送本地地址的時間為該值。
  8. 如果節點的下次發送地址時間小于當前時間,則:
    • 調用?PoissonNextSend?方法,計算下次發送地址的時間,并保存為節點的下次發送地址時間?nNextAddrSend?屬性。
    • 生成一個地址向量集合?vAddr。
    • 遍歷節點待發送的地址向量?vAddrToSend,如果當前地址不在節點的概率“跟蹤最近插入的”集合addrKnown?中,則:
      • 保存當前地址到節點的概率“跟蹤最近插入的”集合中。
      • 保存地址到?vAddr?向量中。
      • 如果當前的地址向量集合數量大于等于 1000,則調用?PushMessage?方法,發送地址向量;然后清空地址向量集合。
    • 清空節點的發送地址集合?vAddrToSend。
    • 如果地址向量集合不空,即地址向量集合的數量不超過 1000個,則調用?PushMessage?方法,發送地址消息。
    • 如果節點的待發送的地址向量集合的預分配的內存空間(capacity())大于40,則調用其?shrink_to_fit方法來縮減空間,即只允許發送一次大的地址包。
    if (pto->nNextAddrSend < nNow) { ? pto->nNextAddrSend = PoissonNextSend(nNow, AVG_ADDRESS_BROADCAST_INTERVAL); ? std::vector<CAddress> vAddr; ? vAddr.reserve(pto->vAddrToSend.size()); ? for (const CAddress& addr : pto->vAddrToSend) ? { ? ? ? if (!pto->addrKnown.contains(addr.GetKey())) ? ? ? { ? ? ? ? ? pto->addrKnown.insert(addr.GetKey()); ? ? ? ? ? vAddr.push_back(addr); ? ? ? ? ? // receiver rejects addr messages larger than 1000 ? ? ? ? ? if (vAddr.size() >= 1000) ? ? ? ? ? { ? ? ? ? ? ? ? connman->PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); ? ? ? ? ? ? ? vAddr.clear(); ? ? ? ? ? } ? ? ? } ? } ? pto->vAddrToSend.clear(); ? if (!vAddr.empty()) ? ? ? connman->PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); ? if (pto->vAddrToSend.capacity() > 40) ? ? ? pto->vAddrToSend.shrink_to_fit(); }
  9. 接下來,開始區塊同步。
    • 如果指向最佳區塊鏈頭部的指針(pindexBestHeader)為空指針,則設置其為當前活躍區塊鏈的棧頂元素指針。
      if (pindexBestHeader == nullptr) ? pindexBestHeader = chainActive.Tip();
    • 如果還沒有從這個節點同步區塊頭部,并且節點的?fClient、fImporting、fReindex?等屬性為假,進一步,如果已經同步的節點數量為 0 且需要獲取區塊數據,或者最佳區塊頭部的區塊時間距離現在已超過 24 小時,那么調用?PushMessage?方法,發出請求?GETHEADERS?命令,開始同步區塊頭部。
      bool fFetch = state.fPreferredDownload || (nPreferredDownload == 0 && !pto->fClient && !pto->fOneShot); // Download if this is a nice peer, or we have no nice peers and this one might do. if (!state.fSyncStarted && !pto->fClient && !fImporting && !fReindex) { ? if ((nSyncStarted == 0 && fFetch) || pindexBestHeader->GetBlockTime() > GetAdjustedTime() - 24 * 60 * 60) { ? ? ? state.fSyncStarted = true; ? ? ? state.nHeadersSyncTimeout = GetTimeMicros() + HEADERS_DOWNLOAD_TIMEOUT_BASE + HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER * (GetAdjustedTime() - pindexBestHeader->GetBlockTime())/(consensusParams.nPowTargetSpacing); ? ? ? nSyncStarted++; ? ? ? const CBlockIndex *pindexStart = pindexBestHeader; ? ? ? if (pindexStart->pprev) ? ? ? ? ? pindexStart = pindexStart->pprev; ? ? ? LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), pto->nStartingHeight); ? ? ? connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256())); ? } }
  10. 如果當前不是重建索引、重新導入和 IBD 下載期間,則重新發送尚未進入區塊的錢包交易。if (!fReindex && !fImporting && !IsInitialBlockDownload()){
    GetMainSignals().Broadcast(nTimeBestReceived, connman);

    }

  11. 如果不需要轉化為?Inv?消息,那么進行如下處理:遍歷區塊頭部進行區塊公告的集合(vBlockHashesToAnnounce),按如下處理:
    • 調用?LookupBlockIndex?方法,查找當前對應的區塊索引。
    • 如果當前活躍區塊鏈上沒有這個索引,那么設置需要轉化為?Inv?消息為真,然后退出當前循環。
      const CBlockIndex* pindex = LookupBlockIndex(hash); if (chainActive[pindex->nHeight] != pindex) { ? fRevertToInv = true; ? break; }
    • 如果最佳索引不是空指針,并且當前區塊索引不等于最佳指針,那么設置需要轉化為?Inv?消息為真,然后退出當前循環。
      if (pBestIndex != nullptr && pindex->pprev != pBestIndex) { ? fRevertToInv = true; ? break; }
    • 接下來,設置當前區塊索引為最佳索引,處理哪些區塊索引可以放入頭部集合。
      pBestIndex = pindex; if (fFoundStartingHeader) { ? // add this to the headers message ? vHeaders.push_back(pindex->GetBlockHeader()); } else if (PeerHasHeader(&state, pindex)) { ? continue; // keep looking for the first new block } else if (pindex->pprev == nullptr || PeerHasHeader(&state, pindex->pprev)) { ? // Peer doesn't have this header but they do have the prior one. ? // Start sending headers. ? fFoundStartingHeader = true; ? vHeaders.push_back(pindex->GetBlockHeader()); } else { ? // Peer doesn't have this header or the prior one -- nothing will ? // connect, so bail out. ? fRevertToInv = true; ? break; }
  12. 如果不需要轉化成?Inv?消息,并且區塊頭部集合(vHeaders)不空,進行下面的處理:
    • 如果區塊頭部長度為1,并且需要下載頭部和ID,那么:生成發送標志,如果對等節點想要緊湊的隔離見證類型,則設置發送標志為 0,否則設置為?SERIALIZE_TRANSACTION_NO_WITNESS。如果最近的區塊哈希與最佳區塊索引的哈希相等,則調用?PushMessage?方法,發送消息,類型為?CMPCTBLOCK,然后設置區塊是從緩存區中加載的標志為真。

      如果區塊不是從緩存區中加載的,那么就需要調用?ReadBlockFromDisk?方法,從硬盤中加載區塊,然后再調用?PushMessage?方法,發送消息,類型為?CMPCTBLOCK。

    • 否則,如果不是優先下載頭部(即區塊狀態對象?fPreferHeaders)為假,那么就調用?PushMessage?方法,發送消息,類型為?HEADERS,然后設置區塊狀態對象的?pindexBestHeaderSent?屬性為當前的最佳索引區塊。

    具體代碼如下:

    if (!fRevertToInv && !vHeaders.empty()) { ? if (vHeaders.size() == 1 && state.fPreferHeaderAndIDs) { ? ? ? int nSendFlags = state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS; ? ? ? bool fGotBlockFromCache = false; ? ? ? { ? ? ? ? ? LOCK(cs_most_recent_block); ? ? ? ? ? if (most_recent_block_hash == pBestIndex->GetBlockHash()) { ? ? ? ? ? ? ? if (state.fWantsCmpctWitness || !fWitnessesPresentInMostRecentCompactBlock) ? ? ? ? ? ? ? ? ? connman->PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *most_recent_compact_block)); ? ? ? ? ? ? ? else { ? ? ? ? ? ? ? ? ? CBlockHeaderAndShortTxIDs cmpctblock(*most_recent_block, state.fWantsCmpctWitness); ? ? ? ? ? ? ? ? ? connman->PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? fGotBlockFromCache = true; ? ? ? ? ? } ? ? ? } ? ? ? if (!fGotBlockFromCache) { ? ? ? ? ? CBlock block; ? ? ? ? ? bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams); ? ? ? ? ? assert(ret); ? ? ? ? ? CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness); ? ? ? ? ? connman->PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); ? ? ? } ? ? ? state.pindexBestHeaderSent = pBestIndex; ? } else if (state.fPreferHeaders) { ? ? ? connman->PushMessage(pto, msgMaker.Make(NetMsgType::HEADERS, vHeaders)); ? ? ? state.pindexBestHeaderSent = pBestIndex; ? } else ? ? ? fRevertToInv = true; }
  1. 如果需要轉化成?Inv?消息,進一步,如果使用區塊頭部進行區塊公告的集合(vBlockHashesToAnnounce),則進行如下處理:
    • 返回集合最后一個元素,調用?LookupBlockIndex?方法,查找這個元素對應的區塊索引。
    • 如果活躍區塊鏈在區塊索引指定的高度上對應的索引不是我們找到的索引,即要公告的區塊不在主鏈上,則打印一個警告。
    • 如果這個區塊不在節點的區塊鏈上,那么就把這個區塊放在區塊庫存清單中。生成一個?Inv?消息,類型是區塊,然后調用節點對象的?PushInventory?方法,放入節點對象的庫存清單集合中。
  2. 清空節點對象的區塊頭部進行區塊公告的集合。
  3. 生成?vInv?集合,并設置其長度。
    std::vector<CInv> vInv; vInv.reserve(std::max<size_t>(pto->vInventoryBlockToSend.size(), INVENTORY_BROADCAST_MAX));
  4. 遍歷已經公告的區塊 ID 列表,如果沒有達到?vInv?集合的最大長度,則加入集合尾部,如果已經達到則調用?PushMessage?方法,發送?INV?消息。然后,清空已經公告的區塊 ID 列表。
    for (const uint256& hash : pto->vInventoryBlockToSend) { ? ? vInv.push_back(CInv(MSG_BLOCK, hash)); ? ? if (vInv.size() == MAX_INV_SZ) { ? ? ? ? connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); ? ? ? ? vInv.clear(); ? ? } } pto->vInventoryBlockToSend.clear();
  5. 如果節點對象下次發送?Inv?消息的時間已經小于當前時間,那么設置?fSendTrickle?變量為真,根據是否為入門節點,設置不同的節點對象下次發送?Inv?消息。
    bool fSendTrickle = pto->fWhitelisted; if (pto->nNextInvSend < nNow) { ? ? fSendTrickle = true; ? ? if (pto->fInbound) { ? ? ? ? pto->nNextInvSend = connman->PoissonNextSendInbound(nNow, INVENTORY_BROADCAST_INTERVAL); ? ? } else { ? ? ? ? pto->nNextInvSend = PoissonNextSend(nNow, INVENTORY_BROADCAST_INTERVAL >> 1); ? ? } }
  6. 如果發送時間已到,但是節點請求我們不要發送中繼交易,那么清空節點對象的發送?Inv?消息的集合集合?setInventoryTxToSend。
    if (fSendTrickle) { ? ? LOCK(pto->cs_filter); ? ? if (!pto->fRelayTxes) pto->setInventoryTxToSend.clear(); }
  7. 如果發送時間已到,并且節點請求過 BIP35 規定的?mempool?,那么:
    • 調用內存池對象的?infoAll?方法,返回內存池交易信息集合。
    • 設置區塊對象的?mempool?為假。
    • 獲取節點設置的最小交易費用過濾器。默認為 0。
      CAmount filterrate = 0; { ? LOCK(pto->cs_feeFilter); ? filterrate = pto->minFeeFilter; }
    • 遍歷內存池交易信息集合并進行處理。用當前交易信息生成一個 inv 對象,然后從區塊對象的?setInventoryTxToSend?集合中刪除對應的交易信息。如果設置了最小交易費用,并且當前交易的費用小于設置的最小費用,那么處理下一個。
      const uint256& hash = txinfo.tx->GetHash(); CInv inv(MSG_TX, hash); pto->setInventoryTxToSend.erase(hash); if (filterrate) { ? if (txinfo.feeRate.GetFeePerK() < filterrate) ? ? ? continue; }

      如果區塊對象設置了布隆過濾器,并且當前交易不符合要求,那么處理下一個。

      if (pto->pfilter) { ? if (!pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; }

      把當前交易的哈希加入區塊對象的?filterInventoryKnown?集合;把 inv 對象加入?vInv?集合。如果集合已經達到規定的最大數量,那么調用?PushMessage?方法,發送?INV?消息給遠程對等節點,然后清空集合。

      pto->filterInventoryKnown.insert(hash); vInv.push_back(inv); if (vInv.size() == MAX_INV_SZ) { ? connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); ? vInv.clear(); }
    • 設置區塊對象的?timeLastMempoolReq?屬性。
  8. 如果需要發送,即發送時間已到,那么:
    • 生成交易的向量集合,并設置其大小。然后從區塊對象的?setInventoryTxToSend?集合中取得其迭代器放進新生成的向量集合。
      std::vector<std::set<uint256>::iterator> vInvTx; vInvTx.reserve(pto->setInventoryTxToSend.size()); for (std::set<uint256>::iterator it = pto->setInventoryTxToSend.begin(); it != pto->setInventoryTxToSend.end(); it++) { ? vInvTx.push_back(it); }
    • 設置區塊對象的最小交易費用,默認為0。
      CAmount filterrate = 0; { LOCK(pto->cs_feeFilter); filterrate = pto->minFeeFilter; }
    • 如果?vInvTx?集合不空,并且需要中繼的交易數量小于規定的最大 INV 廣播數量,那就進行?while?循環。下面是循環體:
      while (!vInvTx.empty() && nRelayedTransactions < INVENTORY_BROADCAST_MAX) { std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder); std::set<uint256>::iterator it = vInvTx.back(); vInvTx.pop_back(); uint256 hash = *it; pto->setInventoryTxToSend.erase(it); if (pto->filterInventoryKnown.contains(hash)) { continue; } auto txinfo = mempool.info(hash); if (!txinfo.tx) { continue; } if (filterrate && txinfo.feeRate.GetFeePerK() < filterrate) { continue; } if (pto->pfilter && !pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; vInv.push_back(CInv(MSG_TX, hash)); nRelayedTransactions++; { while (!vRelayExpiration.empty() && vRelayExpiration.front().first < nNow) { mapRelay.erase(vRelayExpiration.front().second); vRelayExpiration.pop_front(); } auto ret = mapRelay.insert(std::make_pair(hash, std::move(txinfo.tx))); if (ret.second) { vRelayExpiration.push_back(std::make_pair(nNow + 15 * 60 * 1000000, ret.first)); } } if (vInv.size() == MAX_INV_SZ) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); vInv.clear(); } pto->filterInventoryKnown.insert(hash); }
  9. 如果?vInv?集合不空,那么就調用?PushMessage?方法,發送?INV?消息。
  10. 如果區塊狀態對象的停止下載區塊的時間不等于0,并且小于當前時間減去規定的時間,那么設置區塊對象為斷開,然后返回真。
    nNow = GetTimeMicros(); if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { pto->fDisconnect = true; return true; }
  11. 如果區塊下載超時,那么設置區塊對象為斷開,然后返回真。
    if (state.vBlocksInFlight.size() > 0) { QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); int nOtherPeersWithValidatedDownloads = nPeersWithValidatedDownloads - (state.nBlocksInFlightValidHeaders > 0); if (nNow > state.nDownloadingSince + consensusParams.nPowTargetSpacing * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { pto->fDisconnect = true; return true; } }
  12. 接下來,檢查區塊頭部下載是否超時。
    if (state.fSyncStarted && state.nHeadersSyncTimeout < std::numeric_limits<int64_t>::max()) { if (pindexBestHeader->GetBlockTime() <= GetAdjustedTime() - 24*60*60) { if (nNow > state.nHeadersSyncTimeout && nSyncStarted == 1 && (nPreferredDownload - state.fPreferredDownload >= 1)) { if (!pto->fWhitelisted) { LogPrintf("Timeout downloading headers from peer=%d, disconnecting\n", pto->GetId()); pto->fDisconnect = true; return true; } else { state.fSyncStarted = false; nSyncStarted--; state.nHeadersSyncTimeout = 0; } } } else { state.nHeadersSyncTimeout = std::numeric_limits<int64_t>::max(); } }
  13. 獲取所有節點請求的數據,包括區塊與非區塊,并放在?vGetData?集合中,然后調用?PushMessage?方法,發送給遠程節點。
    std::vector<CInv> vGetData; if (!pto->fClient && ((fFetch && !pto->m_limited_node) || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { std::vector<const CBlockIndex*> vToDownload; NodeId staller = -1; FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, consensusParams); for (const CBlockIndex *pindex : vToDownload) { uint32_t nFetchFlags = GetFetchFlags(pto); vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash())); MarkBlockAsInFlight(pto->GetId(), pindex->GetBlockHash(), pindex); } if (state.nBlocksInFlight == 0 && staller != -1) { if (State(staller)->nStallingSince == 0) { State(staller)->nStallingSince = nNow; } } } while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) { const CInv& inv = (*pto->mapAskFor.begin()).second; if (!AlreadyHave(inv)) { LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); vGetData.push_back(inv); if (vGetData.size() >= 1000) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); vGetData.clear(); } } else { pto->setAskFor.erase(inv.hash); } pto->mapAskFor.erase(pto->mapAskFor.begin()); } if (!vGetData.empty()) connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));

我是區小白,Ulord全球社區聯盟(優得社區)核心區塊鏈技術開發者,深入研究比特幣,以太坊,EOS Dash,Rsk,Java, Nodejs,PHP,Python,C++ 我希望能聚集更多區塊鏈開發者,一起學習共同進步。為了更高效的交流探討區塊鏈開發過程中遇到的問題,歡迎將以上問題的答案在帖子下面留言。

贊(0)

評論 搶沙發

  • 昵稱 (必填)
  • 郵箱 (必填)
  • 網址
p3试机号99