diff options
| author | Dan Engelbrecht <[email protected]> | 2024-05-17 10:55:56 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-05-17 10:55:56 +0200 |
| commit | baf9891624b758b686e6b1c284013bea08794d69 (patch) | |
| tree | 732f00270fa5264565a8535cea911d034fc12ba9 /src | |
| parent | safer partial requests (#82) (diff) | |
| download | zen-baf9891624b758b686e6b1c284013bea08794d69.tar.xz zen-baf9891624b758b686e6b1c284013bea08794d69.zip | |
refactor BlockStore IterateChunks (#77)
Improvement: Refactored IterateChunks to allow reuse in diskcachelayer and hide public GetBlockFile() function in BlockStore
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 385 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 106 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 73 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 23 |
4 files changed, 308 insertions, 279 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index ee58e6c5d..aef7b348b 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -8,7 +8,6 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> -#include <zencore/workthreadpool.h> #include <algorithm> #include <unordered_map> @@ -23,6 +22,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/compactbinarybuilder.h> # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zencore/workthreadpool.h> # include <random> #endif @@ -1037,39 +1037,24 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, } } -void -BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, - const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback, - const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback, - WorkerThreadPool* OptionalWorkerPool) +bool +BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, + std::span<const size_t> InChunkIndexes, + const IterateChunksSmallSizeCallback& SmallSizeCallback, + const IterateChunksLargeSizeCallback& LargeSizeCallback) { - ZEN_TRACE_CPU("BlockStore::IterateChunks"); - - Stopwatch Timer; - auto _ = MakeGuard([&]() { - ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); - - std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks; - std::vector<std::pair<uint32_t, std::vector<size_t>>> BlocksChunks; - - for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) + if (InChunkIndexes.empty()) { - const BlockStoreLocation& Location = ChunkLocations[ChunkIndex]; - if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end()) - { - BlocksChunks[It->second].second.push_back(ChunkIndex); - } - else - { - BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size())); - BlocksChunks.push_back(std::make_pair(Location.BlockIndex, std::vector<size_t>({ChunkIndex}))); - } + return true; } - auto GetNextRange = [&ChunkLocations](const std::vector<size_t>& ChunkIndexes, size_t StartIndexOffset) -> size_t { + uint32_t BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex; + std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end()); + std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { + return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset; + }); + + auto GetNextRange = [&ChunkLocations](std::span<const size_t> ChunkIndexes, size_t StartIndexOffset) -> size_t { size_t ChunkCount = 0; size_t StartIndex = ChunkIndexes[StartIndexOffset]; const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; @@ -1094,112 +1079,126 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, return ChunkCount; }; - auto ScanBlock = [&](const uint32_t BlockIndex, std::vector<size_t>& ChunkIndexes) { - std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { - const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; - const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; - if (LocationA.BlockIndex < LocationB.BlockIndex) - { - return true; - } - else if (LocationA.BlockIndex > LocationB.BlockIndex) - { - return false; - } - return LocationA.Offset < LocationB.Offset; - }); - RwLock::SharedLockScope InsertLock(m_InsertLock); - auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); - if (FindBlockIt == m_ChunkBlocks.end()) - { - InsertLock.ReleaseNow(); + RwLock::SharedLockScope InsertLock(m_InsertLock); + auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); + if (FindBlockIt == m_ChunkBlocks.end()) + { + InsertLock.ReleaseNow(); - ZEN_LOG_SCOPE("block #{} not available", BlockIndex); + ZEN_LOG_SCOPE("block #{} not available", BlockIndex); - for (size_t ChunkIndex : ChunkIndexes) + for (size_t ChunkIndex : ChunkIndexes) + { + if (!SmallSizeCallback(ChunkIndex, nullptr, 0)) { - AsyncSmallSizeCallback(ChunkIndex, nullptr, 0); + return false; } } - else - { - const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; - ZEN_ASSERT(BlockFile); + } + else + { + const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; + ZEN_ASSERT(BlockFile); - IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; - void* BufferBase = ReadBuffer.MutableData(); + IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; + void* BufferBase = ReadBuffer.MutableData(); - size_t LocationIndexOffset = 0; - while (LocationIndexOffset < ChunkIndexes.size()) - { - size_t ChunkIndex = ChunkIndexes[LocationIndexOffset]; - const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; + size_t LocationIndexOffset = 0; + while (LocationIndexOffset < ChunkIndexes.size()) + { + size_t ChunkIndex = ChunkIndexes[LocationIndexOffset]; + const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; - const size_t BlockSize = BlockFile->FileSize(); - const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset); - if (RangeCount > 0) + const size_t BlockSize = BlockFile->FileSize(); + const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset); + if (RangeCount > 0) + { + size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; + const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; + uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; + BlockFile->Read(BufferBase, Size, FirstLocation.Offset); + for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) { - size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; - const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; - uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; - BlockFile->Read(BufferBase, Size, FirstLocation.Offset); - for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) + size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex]; + const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; + if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) { - size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex]; - const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; - if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) - { - ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", - ChunkLocation.Offset, - ChunkLocation.Size, - BlockIndex, - BlockSize); - - AsyncSmallSizeCallback(NextChunkIndex, nullptr, 0); - continue; - } - void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; - AsyncSmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); + ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", + ChunkLocation.Offset, + ChunkLocation.Size, + BlockIndex, + BlockSize); + + SmallSizeCallback(NextChunkIndex, nullptr, 0); + continue; } - LocationIndexOffset += RangeCount; - continue; + void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; + SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); } - if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) - { - ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", - FirstLocation.Offset, - FirstLocation.Size, - BlockIndex, - BlockSize); - - AsyncSmallSizeCallback(ChunkIndex, nullptr, 0); - LocationIndexOffset++; - continue; - } - AsyncLargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); + LocationIndexOffset += RangeCount; + continue; + } + if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) + { + ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", + FirstLocation.Offset, + FirstLocation.Size, + BlockIndex, + BlockSize); + + SmallSizeCallback(ChunkIndex, nullptr, 0); LocationIndexOffset++; + continue; + } + if (!LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size)) + { + return false; } + LocationIndexOffset++; } - }; + } + return true; +} - Latch WorkLatch(1); - for (auto& BlockChunks : BlocksChunks) +bool +BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocations, const IterateChunksCallback& Callback) +{ + ZEN_TRACE_CPU("BlockStore::IterateChunks"); + + Stopwatch Timer; + auto _ = MakeGuard([&]() { + ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); + + std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks; + std::vector<std::vector<size_t>> BlocksChunks; + + for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) { - if (OptionalWorkerPool) + const BlockStoreLocation& Location = ChunkLocations[ChunkIndex]; + if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end()) { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&ChunkLocations, &BlockChunks, &ScanBlock, &WorkLatch]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - ScanBlock(BlockChunks.first, BlockChunks.second); - }); + BlocksChunks[It->second].push_back(ChunkIndex); } else { - ScanBlock(BlockChunks.first, BlockChunks.second); + BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size())); + BlocksChunks.push_back(std::vector<size_t>({ChunkIndex})); } } - WorkLatch.CountDown(); - WorkLatch.Wait(); + + for (auto& BlockChunks : BlocksChunks) + { + ZEN_ASSERT(!BlockChunks.empty()); + uint32_t BlockIndex = ChunkLocations[BlockChunks[0]].BlockIndex; + if (!Callback(BlockIndex, BlockChunks)) + { + return false; + } + } + return true; } void @@ -1513,17 +1512,6 @@ BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint return Path.ToPath(); } -Ref<BlockStoreFile> -BlockStore::GetBlockFile(uint32_t BlockIndex) -{ - RwLock::SharedLockScope _(m_InsertLock); - if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end()) - { - return It->second; - } - return {}; -} - #if ZEN_WITH_TESTS TEST_CASE("blockstore.blockstoredisklocation") @@ -1872,73 +1860,92 @@ TEST_CASE("blockstore.iterate.chunks") WorkerThreadPool WorkerPool(4); - Store.IterateChunks( - {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex}, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - switch (ChunkIndex) - { - case 0: - CHECK(Data); - CHECK(Size == FirstChunkData.size()); - CHECK(std::string((const char*)Data, Size) == FirstChunkData); - break; - case 1: - CHECK(Data); - CHECK(Size == SecondChunkData.size()); - CHECK(std::string((const char*)Data, Size) == SecondChunkData); - break; - case 2: - CHECK(false); - break; - case 3: - CHECK(!Data); - break; - case 4: - CHECK(!Data); - break; - case 5: - CHECK(!Data); - break; - default: - CHECK(false); - break; - } - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - switch (ChunkIndex) - { - case 0: - case 1: - CHECK(false); - break; - case 2: + std::vector<BlockStoreLocation> Locations{FirstChunkLocation, + SecondChunkLocation, + VeryLargeChunkLocation, + BadLocationZeroSize, + BadLocationOutOfRange, + BadBlockIndex}; + Latch WorkLatch(1); + Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { + WorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + bool Continue = Store.IterateBlock( + Locations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { + switch (ChunkIndex) { - CHECK(Size == VeryLargeChunk.size()); - char* Buffer = new char[Size]; - size_t HashOffset = 0; - File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { - memcpy(&Buffer[HashOffset], Data, Size); - HashOffset += Size; - }); - CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); - delete[] Buffer; + case 0: + CHECK(Data); + CHECK(Size == FirstChunkData.size()); + CHECK(std::string((const char*)Data, Size) == FirstChunkData); + break; + case 1: + CHECK(Data); + CHECK(Size == SecondChunkData.size()); + CHECK(std::string((const char*)Data, Size) == SecondChunkData); + break; + case 2: + CHECK(false); + break; + case 3: + CHECK(!Data); + break; + case 4: + CHECK(!Data); + break; + case 5: + CHECK(!Data); + break; + default: + CHECK(false); + break; } - break; - case 3: - CHECK(false); - break; - case 4: - CHECK(false); - break; - case 5: - CHECK(false); - break; - default: - CHECK(false); - break; - } - }, - &WorkerPool); + return true; + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { + switch (ChunkIndex) + { + case 0: + case 1: + CHECK(false); + break; + case 2: + { + CHECK(Size == VeryLargeChunk.size()); + char* Buffer = new char[Size]; + size_t HashOffset = 0; + File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { + memcpy(&Buffer[HashOffset], Data, Size); + HashOffset += Size; + }); + CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); + delete[] Buffer; + } + break; + case 3: + CHECK(false); + break; + case 4: + CHECK(false); + break; + case 5: + CHECK(false); + break; + default: + CHECK(false); + break; + } + return true; + }); + CHECK(Continue); + }); + return true; + }); + WorkLatch.CountDown(); + WorkLatch.Wait(); } TEST_CASE("blockstore.reclaim.space") diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 6cb749b5a..2e307118b 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1754,7 +1754,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) } } - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { + const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { ChunkCount.fetch_add(1); VerifiedChunkBytes.fetch_add(Size); const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; @@ -1762,18 +1762,18 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { // ChunkLocation out of range of stored blocks ReportBadKey(Hash); - return; + return true; } if (!Size) { ReportBadKey(Hash); - return; + return true; } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); if (!Buffer) { ReportBadKey(Hash); - return; + return true; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); @@ -1781,11 +1781,12 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) if (!ValidateIoBuffer(ContentType, Buffer)) { ReportBadKey(Hash); - return; + return true; } + return true; }; - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { Ctx.ThrowIfDeadlineExpired(); ChunkCount.fetch_add(1); @@ -1795,7 +1796,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) if (!Buffer) { ReportBadKey(Hash); - return; + return true; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); @@ -1803,11 +1804,14 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) if (!ValidateIoBuffer(ContentType, Buffer)) { ReportBadKey(Hash); - return; + return true; } + return true; }; - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr); + m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { + return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk); + }); } catch (ScrubDeadlineExpiredException&) { @@ -3195,16 +3199,13 @@ public: std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; { - std::vector<IoHash> InlineKeys; - std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex; - struct InlineEntry - { - uint32_t InlineKeyIndex; - uint32_t Offset; - uint32_t Size; - }; - std::vector<std::vector<InlineEntry>> EntriesPerBlock; + std::vector<IoHash> InlineKeys; + std::vector<BlockStoreLocation> InlineLocations; + std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes; + { + std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; + RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); for (const auto& Entry : m_CacheBucket.m_Index) { @@ -3231,56 +3232,43 @@ public: } BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment); - InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()), - .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset), - .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)}; + size_t ChunkIndex = InlineLocations.size(); + InlineLocations.push_back(ChunkLocation); InlineKeys.push_back(Key); - - if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex); - It != BlockIndexToEntriesPerBlockIndex.end()) + if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) { - EntriesPerBlock[It->second].emplace_back(UpdateEntry); + InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); } else { - BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size()); - EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry}); + BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); + InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex}); } } } - for (auto It : BlockIndexToEntriesPerBlockIndex) + for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes) { - uint32_t BlockIndex = It.first; - - Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex); - if (BlockFile) - { - size_t EntriesPerBlockIndex = It.second; - std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex]; - - std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool { - return Lhs.Offset < Rhs.Offset; + ZEN_ASSERT(!ChunkIndexes.empty()); + + bool Continue = m_CacheBucket.m_BlockStore.IterateBlock( + InlineLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ZEN_UNUSED(ChunkIndex, Size); + GetAttachments(Data); + return !Ctx.IsCancelledFlag.load(); + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ZEN_UNUSED(ChunkIndex); + GetAttachments(File.GetChunk(Offset, Size).GetData()); + return !Ctx.IsCancelledFlag.load(); }); - uint64_t BlockFileSize = BlockFile->FileSize(); - BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768); - for (const InlineEntry& InlineEntry : InlineEntries) - { - if ((InlineEntry.Offset + InlineEntry.Size) <= BlockFileSize) - { - MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset); - if (ChunkView.GetSize() == InlineEntry.Size) - { - GetAttachments(ChunkView.GetData()); - } - else - { - IoBuffer Buffer = BlockFile->GetChunk(InlineEntry.Offset, InlineEntry.Size); - GetAttachments(Buffer.GetData()); - } - } - } + if (!Continue && Ctx.IsCancelledFlag.load()) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); + return; } } } @@ -3293,12 +3281,10 @@ public: } IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first); - if (!Buffer) + if (Buffer) { - continue; + GetAttachments(Buffer.GetData()); } - - GetAttachments(Buffer.GetData()); } } } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index d6e5efdaa..dd92483b6 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -8,9 +8,11 @@ #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <zenstore/scrubcontext.h> #include <gsl/gsl-lite.hpp> @@ -315,20 +317,51 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment)); } } - bool Continue = true; - m_BlockStore.IterateChunks( - FoundChunkLocations, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - if (Data != nullptr) - { - Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); - } - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); - }, - OptionalWorkerPool); - return Continue; + + auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { + return m_BlockStore.IterateBlock( + FoundChunkLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + if (Data == nullptr) + { + return true; + } + return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); + }); + }; + + Latch WorkLatch(1); + std::atomic_bool AsyncContinue = true; + bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (!AsyncContinue) + { + return; + } + bool Continue = DoOneBlock(ChunkIndexes); + if (!Continue) + { + AsyncContinue.store(false); + } + }); + return AsyncContinue.load(); + } + else + { + return DoOneBlock(ChunkIndexes); + } + }); + WorkLatch.CountDown(); + WorkLatch.Wait(); + return AsyncContinue.load() && Continue; } void @@ -387,7 +420,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) { // ChunkLocation out of range of stored blocks BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); - return; + return true; } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); @@ -398,10 +431,11 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) if (RawHash == Hash) { // TODO: this should also hash the (decompressed) contents - return; + return true; } } BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); + return true; }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { @@ -421,13 +455,16 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) if (RawHash == Hash) { // TODO: this should also hash the (decompressed) contents - return; + return true; } } BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); + return true; }; - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr); + m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { + return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk); + }); } catch (const ScrubDeadlineExpiredException&) { diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 7ef2f7baa..a1e497533 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -110,7 +110,6 @@ private: }; class BlockStoreCompactState; -class WorkerThreadPool; class BlockStore { @@ -130,9 +129,10 @@ public: typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback; typedef std::function<bool(const MovedChunksArray& MovedChunks, uint64_t FreedDiskSpace)> CompactCallback; typedef std::function<uint64_t()> ClaimDiskReserveCallback; - typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback; - typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; + typedef std::function<bool(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback; + typedef std::function<bool(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback; + typedef std::function<bool(uint32_t BlockIndex, std::span<const size_t> ChunkIndexes)> IterateChunksCallback; struct BlockUsageInfo { @@ -178,10 +178,12 @@ public: const ReclaimCallback& ChangeCallback = [](const MovedChunksArray&, const ChunkIndexArray&) {}, const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; }); - void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, - const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback, - const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback, - WorkerThreadPool* OptionalWorkerPool); + bool IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocations, const IterateChunksCallback& Callback); + + bool IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, + std::span<const size_t> ChunkIndexes, + const IterateChunksSmallSizeCallback& SmallSizeCallback, + const IterateChunksLargeSizeCallback& LargeSizeCallback); void CompactBlocks( const BlockStoreCompactState& CompactState, @@ -190,14 +192,11 @@ public: const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; }, std::string_view LogPrefix = {}); - static const char* GetBlockFileExtension(); - static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex); - inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); } - Ref<BlockStoreFile> GetBlockFile(uint32_t BlockIndex); - private: + static const char* GetBlockFileExtension(); + static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex); uint32_t GetFreeBlockIndex(uint32_t StartProbeIndex, RwLock::ExclusiveLockScope&, std::filesystem::path& OutBlockPath) const; std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks; |