From 34d68bf3a3db2b78c07180416949bbc58bd0b682 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 8 Dec 2017 10:19:57 -0800 Subject: [index] Create new TxIndex class. The TxIndex will be responsible for building the transaction index concurrently with the main validation thread by implementing ValidationInterface. This does not process blocks concurrently yet. --- src/index/txindex.cpp | 157 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 src/index/txindex.cpp (limited to 'src/index/txindex.cpp') diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp new file mode 100644 index 000000000..27cf844ce --- /dev/null +++ b/src/index/txindex.cpp @@ -0,0 +1,157 @@ +// Copyright (c) 2017-2018 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include +#include +#include +#include +#include + +template +static void FatalError(const char* fmt, const Args&... args) +{ + std::string strMessage = tfm::format(fmt, args...); + SetMiscWarning(strMessage); + LogPrintf("*** %s\n", strMessage); + uiInterface.ThreadSafeMessageBox( + "Error: A fatal internal error occurred, see debug.log for details", + "", CClientUIInterface::MSG_ERROR); + StartShutdown(); +} + +TxIndex::TxIndex(std::unique_ptr db) : + m_db(std::move(db)), m_synced(false), m_best_block_index(nullptr) +{} + +bool TxIndex::Init() +{ + LOCK(cs_main); + + // Attempt to migrate txindex from the old database to the new one. Even if + // chain_tip is null, the node could be reindexing and we still want to + // delete txindex records in the old database. + if (!m_db->MigrateData(*pblocktree, chainActive.GetLocator())) { + return false; + } + + CBlockLocator locator; + if (!m_db->ReadBestBlock(locator)) { + locator.SetNull(); + } + + m_best_block_index = FindForkInGlobalIndex(chainActive, locator); + m_synced = m_best_block_index.load() == chainActive.Tip(); + return true; +} + +bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex) +{ + CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size())); + std::vector> vPos; + vPos.reserve(block.vtx.size()); + for (const auto& tx : block.vtx) { + vPos.emplace_back(tx->GetHash(), pos); + pos.nTxOffset += ::GetSerializeSize(*tx, SER_DISK, CLIENT_VERSION); + } + return m_db->WriteTxs(vPos); +} + +void TxIndex::BlockConnected(const std::shared_ptr& block, const CBlockIndex* pindex, + const std::vector& txn_conflicted) +{ + if (!m_synced) { + return; + } + + const CBlockIndex* best_block_index = m_best_block_index.load(); + if (!best_block_index) { + if (pindex->nHeight != 0) { + FatalError("%s: First block connected is not the genesis block (height=%d)", + __func__, pindex->nHeight); + return; + } + } else { + // Ensure block connects to an ancestor of the current best block. This should be the case + // most of the time, but may not be immediately after the the sync thread catches up and sets + // m_synced. Consider the case where there is a reorg and the blocks on the stale branch are + // in the ValidationInterface queue backlog even after the sync thread has caught up to the + // new chain tip. In this unlikely event, log a warning and let the queue clear. + if (best_block_index->GetAncestor(pindex->nHeight - 1) != pindex->pprev) { + LogPrintf("%s: WARNING: Block %s does not connect to an ancestor of " /* Continued */ + "known best chain (tip=%s); not updating txindex\n", + __func__, pindex->GetBlockHash().ToString(), + best_block_index->GetBlockHash().ToString()); + return; + } + } + + if (WriteBlock(*block, pindex)) { + m_best_block_index = pindex; + } else { + FatalError("%s: Failed to write block %s to txindex", + __func__, pindex->GetBlockHash().ToString()); + return; + } +} + +void TxIndex::SetBestChain(const CBlockLocator& locator) +{ + if (!m_synced) { + return; + } + + const uint256& locator_tip_hash = locator.vHave.front(); + const CBlockIndex* locator_tip_index; + { + LOCK(cs_main); + locator_tip_index = LookupBlockIndex(locator_tip_hash); + } + + if (!locator_tip_index) { + FatalError("%s: First block (hash=%s) in locator was not found", + __func__, locator_tip_hash.ToString()); + return; + } + + // This checks that SetBestChain callbacks are received after BlockConnected. The check may fail + // immediately after the the sync thread catches up and sets m_synced. Consider the case where + // there is a reorg and the blocks on the stale branch are in the ValidationInterface queue + // backlog even after the sync thread has caught up to the new chain tip. In this unlikely + // event, log a warning and let the queue clear. + const CBlockIndex* best_block_index = m_best_block_index.load(); + if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) { + LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known best " /* Continued */ + "chain (tip=%s); not writing txindex locator\n", + __func__, locator_tip_hash.ToString(), + best_block_index->GetBlockHash().ToString()); + return; + } + + if (!m_db->WriteBestBlock(locator)) { + error("%s: Failed to write locator to disk", __func__); + } +} + +bool TxIndex::FindTx(const uint256& txid, CDiskTxPos& pos) const +{ + return m_db->ReadTxPos(txid, pos); +} + +void TxIndex::Start() +{ + // Need to register this ValidationInterface before running Init(), so that + // callbacks are not missed if Init sets m_synced to true. + RegisterValidationInterface(this); + if (!Init()) { + FatalError("%s: txindex failed to initialize", __func__); + return; + } +} + +void TxIndex::Stop() +{ + UnregisterValidationInterface(this); +} -- cgit v1.2.3 From 94b4f8bbb9e7e37f3057b47bf13a74de12b8e0cc Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 8 Dec 2017 10:42:31 -0800 Subject: [index] TxIndex initial sync thread. TxIndex starts up a background thread to get in sync with the block index before blocks are processed through the ValidationInterface. --- src/index/txindex.cpp | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) (limited to 'src/index/txindex.cpp') diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp index 27cf844ce..56966021a 100644 --- a/src/index/txindex.cpp +++ b/src/index/txindex.cpp @@ -2,6 +2,7 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. +#include #include #include #include @@ -10,6 +11,9 @@ #include #include +constexpr int64_t SYNC_LOG_INTERVAL = 30; // seconds +constexpr int64_t SYNC_LOCATOR_WRITE_INTERVAL = 30; // seconds + template static void FatalError(const char* fmt, const Args&... args) { @@ -47,6 +51,75 @@ bool TxIndex::Init() return true; } +static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev) +{ + AssertLockHeld(cs_main); + + if (!pindex_prev) { + return chainActive.Genesis(); + } + + const CBlockIndex* pindex = chainActive.Next(pindex_prev); + if (pindex) { + return pindex; + } + + return chainActive.Next(chainActive.FindFork(pindex_prev)); +} + +void TxIndex::ThreadSync() +{ + const CBlockIndex* pindex = m_best_block_index.load(); + if (!m_synced) { + auto& consensus_params = Params().GetConsensus(); + + int64_t last_log_time = 0; + int64_t last_locator_write_time = 0; + while (true) { + { + LOCK(cs_main); + const CBlockIndex* pindex_next = NextSyncBlock(pindex); + if (!pindex_next) { + WriteBestBlock(pindex); + m_best_block_index = pindex; + m_synced = true; + break; + } + pindex = pindex_next; + } + + int64_t current_time = GetTime(); + if (last_log_time + SYNC_LOG_INTERVAL < current_time) { + LogPrintf("Syncing txindex with block chain from height %d\n", pindex->nHeight); + last_log_time = current_time; + } + + if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) { + WriteBestBlock(pindex); + last_locator_write_time = current_time; + } + + CBlock block; + if (!ReadBlockFromDisk(block, pindex, consensus_params)) { + FatalError("%s: Failed to read block %s from disk", + __func__, pindex->GetBlockHash().ToString()); + return; + } + if (!WriteBlock(block, pindex)) { + FatalError("%s: Failed to write block %s to tx index database", + __func__, pindex->GetBlockHash().ToString()); + return; + } + } + } + + if (pindex) { + LogPrintf("txindex is enabled at height %d\n", pindex->nHeight); + } else { + LogPrintf("txindex is enabled\n"); + } +} + bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex) { CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size())); @@ -59,6 +132,15 @@ bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex) return m_db->WriteTxs(vPos); } +bool TxIndex::WriteBestBlock(const CBlockIndex* block_index) +{ + LOCK(cs_main); + if (!m_db->WriteBestBlock(chainActive.GetLocator(block_index))) { + return error("%s: Failed to write locator to disk", __func__); + } + return true; +} + void TxIndex::BlockConnected(const std::shared_ptr& block, const CBlockIndex* pindex, const std::vector& txn_conflicted) { @@ -149,9 +231,16 @@ void TxIndex::Start() FatalError("%s: txindex failed to initialize", __func__); return; } + + m_thread_sync = std::thread(&TraceThread>, "txindex", + std::bind(&TxIndex::ThreadSync, this)); } void TxIndex::Stop() { UnregisterValidationInterface(this); + + if (m_thread_sync.joinable()) { + m_thread_sync.join(); + } } -- cgit v1.2.3 From 70d510d93c08a168407f55c932ab09c644dea3b8 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 8 Dec 2017 10:52:42 -0800 Subject: [index] Allow TxIndex sync thread to be interrupted. --- src/index/txindex.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'src/index/txindex.cpp') diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp index 56966021a..82798fbcc 100644 --- a/src/index/txindex.cpp +++ b/src/index/txindex.cpp @@ -30,6 +30,12 @@ TxIndex::TxIndex(std::unique_ptr db) : m_db(std::move(db)), m_synced(false), m_best_block_index(nullptr) {} +TxIndex::~TxIndex() +{ + Interrupt(); + Stop(); +} + bool TxIndex::Init() { LOCK(cs_main); @@ -76,6 +82,11 @@ void TxIndex::ThreadSync() int64_t last_log_time = 0; int64_t last_locator_write_time = 0; while (true) { + if (m_interrupt) { + WriteBestBlock(pindex); + return; + } + { LOCK(cs_main); const CBlockIndex* pindex_next = NextSyncBlock(pindex); @@ -222,6 +233,11 @@ bool TxIndex::FindTx(const uint256& txid, CDiskTxPos& pos) const return m_db->ReadTxPos(txid, pos); } +void TxIndex::Interrupt() +{ + m_interrupt(); +} + void TxIndex::Start() { // Need to register this ValidationInterface before running Init(), so that -- cgit v1.2.3 From f90c3a62f506d1bc0fe26972b312f07152c79b2e Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 8 Dec 2017 11:20:10 -0800 Subject: [index] TxIndex method to wait until caught up. In order to preserve getrawtransaction RPC behavior, there needs to be a way for a thread to ensure the transaction index is in sync with the current state of the blockchain. --- src/index/txindex.cpp | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) (limited to 'src/index/txindex.cpp') diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp index 82798fbcc..484526a6d 100644 --- a/src/index/txindex.cpp +++ b/src/index/txindex.cpp @@ -228,6 +228,30 @@ void TxIndex::SetBestChain(const CBlockLocator& locator) } } +bool TxIndex::BlockUntilSyncedToCurrentChain() +{ + AssertLockNotHeld(cs_main); + + if (!m_synced) { + return false; + } + + { + // Skip the queue-draining stuff if we know we're caught up with + // chainActive.Tip(). + LOCK(cs_main); + const CBlockIndex* chain_tip = chainActive.Tip(); + const CBlockIndex* best_block_index = m_best_block_index.load(); + if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) { + return true; + } + } + + LogPrintf("%s: txindex is catching up on block notifications\n", __func__); + SyncWithValidationInterfaceQueue(); + return true; +} + bool TxIndex::FindTx(const uint256& txid, CDiskTxPos& pos) const { return m_db->ReadTxPos(txid, pos); -- cgit v1.2.3 From 8181db88f6e0ed96654951e18b1558cd8f78765b Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 8 Dec 2017 11:29:59 -0800 Subject: [init] Initialize and start TxIndex in init code. --- src/index/txindex.cpp | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/index/txindex.cpp') diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp index 484526a6d..7992d8533 100644 --- a/src/index/txindex.cpp +++ b/src/index/txindex.cpp @@ -14,6 +14,8 @@ constexpr int64_t SYNC_LOG_INTERVAL = 30; // seconds constexpr int64_t SYNC_LOCATOR_WRITE_INTERVAL = 30; // seconds +std::unique_ptr g_txindex; + template static void FatalError(const char* fmt, const Args&... args) { -- cgit v1.2.3 From a03f804f2aa0261ed3a47103dfe989ebd9302480 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 30 Mar 2018 00:39:08 -0700 Subject: [index] Move disk IO logic from GetTransaction to TxIndex::FindTx. --- src/index/txindex.cpp | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) (limited to 'src/index/txindex.cpp') diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp index 7992d8533..2a661f033 100644 --- a/src/index/txindex.cpp +++ b/src/index/txindex.cpp @@ -254,9 +254,30 @@ bool TxIndex::BlockUntilSyncedToCurrentChain() return true; } -bool TxIndex::FindTx(const uint256& txid, CDiskTxPos& pos) const +bool TxIndex::FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRef& tx) const { - return m_db->ReadTxPos(txid, pos); + CDiskTxPos postx; + if (!m_db->ReadTxPos(tx_hash, postx)) { + return false; + } + + CAutoFile file(OpenBlockFile(postx, true), SER_DISK, CLIENT_VERSION); + if (file.IsNull()) { + return error("%s: OpenBlockFile failed", __func__); + } + CBlockHeader header; + try { + file >> header; + fseek(file.Get(), postx.nTxOffset, SEEK_CUR); + file >> tx; + } catch (const std::exception& e) { + return error("%s: Deserialize or I/O error - %s", __func__, e.what()); + } + if (tx->GetHash() != tx_hash) { + return error("%s: txid mismatch", __func__); + } + block_hash = header.GetHash(); + return true; } void TxIndex::Interrupt() -- cgit v1.2.3