diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-24 13:53:54 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-24 13:53:54 +0200 |
| commit | 1c0ddc112a6c18d411f1f3ae6d236ecc2bedcfaa (patch) | |
| tree | cfafeecb830a44a6d0870a217edabcc62d37669c /src | |
| parent | remove obsolete code (diff) | |
| download | zen-1c0ddc112a6c18d411f1f3ae6d236ecc2bedcfaa.tar.xz zen-1c0ddc112a6c18d411f1f3ae6d236ecc2bedcfaa.zip | |
iterate cas chunks (#59)
- Improvement: Reworked GetChunkInfos in oplog store to reduce disk thrashing and improve performance
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 55 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 3 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 205 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 27 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 24 | ||||
| -rw-r--r-- | src/zenstore/cas.h | 3 | ||||
| -rw-r--r-- | src/zenstore/cidstore.cpp | 15 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 53 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 3 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 31 | ||||
| -rw-r--r-- | src/zenstore/filecas.h | 9 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 6 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 4 |
13 files changed, 298 insertions, 140 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 874715818..d177b0b2b 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -885,6 +885,14 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) return Chunk; } +bool +ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) +{ + return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool); +} + IoBuffer ProjectStore::Oplog::FindChunk(const Oid& ChunkId) { @@ -2769,36 +2777,27 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, } WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // GetSyncWorkerPool(); - Latch WorkLatch(1); - - for (size_t Index = 0; Index < Hashes.size(); Index++) - { - WorkLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&WorkLatch, FoundLog, WantsSizeField, WantsRawSizeField, &Sizes, &RawSizes, Index, RawHash = Hashes[Index]]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (IoBuffer Chunk = FoundLog->GetChunkByRawHash(RawHash)) + (void)FoundLog->IterateChunks( + Hashes, + [&](size_t Index, const IoBuffer& Chunk) -> bool { + uint64_t Size = Chunk.GetSize(); + if (WantsRawSizeField) + { + uint64_t RawSize = Size; + if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) { - uint64_t Size = Chunk.GetSize(); - if (WantsRawSizeField) - { - uint64_t RawSize = Size; - if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) - { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); - } - RawSizes[Index] = RawSize; - } - if (WantsSizeField) - { - Sizes[Index] = Size; - } + IoHash __; + (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); } - }); - } - WorkLatch.CountDown(); - WorkLatch.Wait(); + RawSizes[Index] = RawSize; + } + if (WantsSizeField) + { + Sizes[Index] = Size; + } + return true; + }, + &WorkerPool); } CbObjectWriter Response; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 269fe7336..9bae0382b 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -104,6 +104,9 @@ public: IoBuffer FindChunk(const Oid& ChunkId); IoBuffer GetChunkByRawHash(const IoHash& RawHash); + bool IterateChunks(std::span<IoHash> RawHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool); inline static const uint32_t kInvalidOp = ~0u; diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 8ef9e842f..ee58e6c5d 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -8,6 +8,7 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <algorithm> #include <unordered_map> @@ -22,7 +23,6 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/compactbinarybuilder.h> # include <zencore/testing.h> # include <zencore/testutils.h> -# include <zencore/workthreadpool.h> # include <random> #endif @@ -156,7 +156,7 @@ BlockStoreFile::IsOpen() const return !!m_IoBuffer; } -constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; +constexpr uint64_t IterateSmallChunkWindowSize = 2 * 1024 * 1024; BlockStore::BlockStore() { @@ -1039,131 +1039,167 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, void BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, - const IterateChunksSmallSizeCallback& SmallSizeCallback, - const IterateChunksLargeSizeCallback& LargeSizeCallback) + const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback, + const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback, + WorkerThreadPool* OptionalWorkerPool) { ZEN_TRACE_CPU("BlockStore::IterateChunks"); + Stopwatch Timer; + auto _ = MakeGuard([&]() { + ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath); - std::vector<size_t> LocationIndexes; - LocationIndexes.reserve(ChunkLocations.size()); + std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks; + std::vector<std::pair<uint32_t, std::vector<size_t>>> BlocksChunks; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) { - LocationIndexes.push_back(ChunkIndex); - } - std::sort(LocationIndexes.begin(), LocationIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { - const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; - const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; - if (LocationA.BlockIndex < LocationB.BlockIndex) + const BlockStoreLocation& Location = ChunkLocations[ChunkIndex]; + if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end()) { - return true; + BlocksChunks[It->second].second.push_back(ChunkIndex); } - else if (LocationA.BlockIndex > LocationB.BlockIndex) + else { - return false; + BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size())); + BlocksChunks.push_back(std::make_pair(Location.BlockIndex, std::vector<size_t>({ChunkIndex}))); } - return LocationA.Offset < LocationB.Offset; - }); - - IoBuffer ReadBuffer{ScrubSmallChunkWindowSize}; - void* BufferBase = ReadBuffer.MutableData(); - - RwLock::SharedLockScope _(m_InsertLock); + } - auto GetNextRange = [&](size_t StartIndexOffset) { + auto GetNextRange = [&ChunkLocations](const std::vector<size_t>& ChunkIndexes, size_t StartIndexOffset) -> size_t { size_t ChunkCount = 0; - size_t StartIndex = LocationIndexes[StartIndexOffset]; + size_t StartIndex = ChunkIndexes[StartIndexOffset]; const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; uint64_t StartOffset = StartLocation.Offset; - while (StartIndexOffset + ChunkCount < LocationIndexes.size()) + uint64_t LastEnd = StartOffset + StartLocation.Size; + while (StartIndexOffset + ChunkCount < ChunkIndexes.size()) { - size_t NextIndex = LocationIndexes[StartIndexOffset + ChunkCount]; + size_t NextIndex = ChunkIndexes[StartIndexOffset + ChunkCount]; const BlockStoreLocation& Location = ChunkLocations[NextIndex]; - if (Location.BlockIndex != StartLocation.BlockIndex) + ZEN_ASSERT(Location.BlockIndex == StartLocation.BlockIndex); + if (Location.Offset >= (LastEnd + (4u * 1024u))) { break; } - if ((Location.Offset + Location.Size) - StartOffset > ScrubSmallChunkWindowSize) + if ((Location.Offset + Location.Size) - StartOffset > IterateSmallChunkWindowSize) { break; } + LastEnd = Location.Offset + Location.Size; ++ChunkCount; } return ChunkCount; }; - size_t LocationIndexOffset = 0; - while (LocationIndexOffset < LocationIndexes.size()) - { - size_t ChunkIndex = LocationIndexes[LocationIndexOffset]; - const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; - - const uint32_t BlockIndex = FirstLocation.BlockIndex; - auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); + auto ScanBlock = [&](const uint32_t BlockIndex, std::vector<size_t>& ChunkIndexes) { + std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { + const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; + const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; + if (LocationA.BlockIndex < LocationB.BlockIndex) + { + return true; + } + else if (LocationA.BlockIndex > LocationB.BlockIndex) + { + return false; + } + return LocationA.Offset < LocationB.Offset; + }); + RwLock::SharedLockScope InsertLock(m_InsertLock); + auto FindBlockIt = m_ChunkBlocks.find(BlockIndex); if (FindBlockIt == m_ChunkBlocks.end()) { + InsertLock.ReleaseNow(); + ZEN_LOG_SCOPE("block #{} not available", BlockIndex); - while (ChunkLocations[ChunkIndex].BlockIndex == BlockIndex) + for (size_t ChunkIndex : ChunkIndexes) { - SmallSizeCallback(ChunkIndex, nullptr, 0); - LocationIndexOffset++; - if (LocationIndexOffset == LocationIndexes.size()) - { - break; - } - ChunkIndex = LocationIndexes[LocationIndexOffset]; + AsyncSmallSizeCallback(ChunkIndex, nullptr, 0); } - continue; } - const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; - ZEN_ASSERT(BlockFile); - - const size_t BlockSize = BlockFile->FileSize(); - const size_t RangeCount = GetNextRange(LocationIndexOffset); - if (RangeCount > 0) + else { - size_t LastChunkIndex = LocationIndexes[LocationIndexOffset + RangeCount - 1]; - const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; - uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; - BlockFile->Read(BufferBase, Size, FirstLocation.Offset); - for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) + const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; + ZEN_ASSERT(BlockFile); + + IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; + void* BufferBase = ReadBuffer.MutableData(); + + size_t LocationIndexOffset = 0; + while (LocationIndexOffset < ChunkIndexes.size()) { - size_t NextChunkIndex = LocationIndexes[LocationIndexOffset + RangeIndex]; - const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; - if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) + size_t ChunkIndex = ChunkIndexes[LocationIndexOffset]; + const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; + + const size_t BlockSize = BlockFile->FileSize(); + const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset); + if (RangeCount > 0) + { + size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; + const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; + uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; + BlockFile->Read(BufferBase, Size, FirstLocation.Offset); + for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) + { + size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex]; + const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; + if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize)) + { + ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", + ChunkLocation.Offset, + ChunkLocation.Size, + BlockIndex, + BlockSize); + + AsyncSmallSizeCallback(NextChunkIndex, nullptr, 0); + continue; + } + void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; + AsyncSmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); + } + LocationIndexOffset += RangeCount; + continue; + } + if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) { ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", - ChunkLocation.Offset, - ChunkLocation.Size, + FirstLocation.Offset, + FirstLocation.Size, BlockIndex, BlockSize); - SmallSizeCallback(NextChunkIndex, nullptr, 0); + AsyncSmallSizeCallback(ChunkIndex, nullptr, 0); + LocationIndexOffset++; continue; } - void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; - SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); + AsyncLargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); + LocationIndexOffset++; } - LocationIndexOffset += RangeCount; - continue; } - if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) - { - ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})", - FirstLocation.Offset, - FirstLocation.Size, - BlockIndex, - BlockSize); + }; - SmallSizeCallback(ChunkIndex, nullptr, 0); - LocationIndexOffset++; - continue; + Latch WorkLatch(1); + for (auto& BlockChunks : BlocksChunks) + { + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&ChunkLocations, &BlockChunks, &ScanBlock, &WorkLatch]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + ScanBlock(BlockChunks.first, BlockChunks.second); + }); + } + else + { + ScanBlock(BlockChunks.first, BlockChunks.second); } - LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); - LocationIndexOffset++; } + WorkLatch.CountDown(); + WorkLatch.Wait(); } void @@ -1814,7 +1850,7 @@ TEST_CASE("blockstore.iterate.chunks") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024); + Store.Initialize(RootDirectory / "store", IterateSmallChunkWindowSize * 2, 1024); IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); CHECK(!BadChunk); @@ -1825,15 +1861,17 @@ TEST_CASE("blockstore.iterate.chunks") BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); Store.Flush(/*ForceNewBlock*/ false); - std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); + std::string VeryLargeChunk(IterateSmallChunkWindowSize * 2, 'L'); BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0}; BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0, - .Offset = ScrubSmallChunkWindowSize, - .Size = ScrubSmallChunkWindowSize * 2}; + .Offset = IterateSmallChunkWindowSize, + .Size = IterateSmallChunkWindowSize * 2}; BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; + WorkerThreadPool WorkerPool(4); + Store.IterateChunks( {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex}, [&](size_t ChunkIndex, const void* Data, uint64_t Size) { @@ -1899,7 +1937,8 @@ TEST_CASE("blockstore.iterate.chunks") CHECK(false); break; } - }); + }, + &WorkerPool); } TEST_CASE("blockstore.reclaim.space") diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 4911ff4f8..51d547b3d 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1553,23 +1553,24 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ZEN_INFO("scrubbing '{}'", m_BucketDir); - Stopwatch Timer; - uint64_t ChunkCount = 0; - uint64_t VerifiedChunkBytes = 0; + Stopwatch Timer; + std::atomic_uint64_t ChunkCount = 0; + std::atomic_uint64_t VerifiedChunkBytes = 0; auto LogStats = MakeGuard([&] { const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", m_BucketName, - NiceBytes(VerifiedChunkBytes), + NiceBytes(VerifiedChunkBytes.load()), NiceTimeSpanMs(DurationMs), - ChunkCount, + ChunkCount.load(), NiceRate(VerifiedChunkBytes, DurationMs)); }); + RwLock BadKeysLock; std::vector<IoHash> BadKeys; - auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; + auto ReportBadKey = [&](const IoHash& Key) { BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Key); }); }; try { @@ -1596,8 +1597,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { Ctx.ThrowIfDeadlineExpired(); - ++ChunkCount; - VerifiedChunkBytes += Loc.Size(); + ChunkCount.fetch_add(1); + VerifiedChunkBytes.fetch_add(Loc.Size()); if (Loc.GetContentType() == ZenContentType::kBinary) { @@ -1645,8 +1646,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { - ++ChunkCount; - VerifiedChunkBytes += Size; + ChunkCount.fetch_add(1); + VerifiedChunkBytes.fetch_add(Size); const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { @@ -1678,8 +1679,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { Ctx.ThrowIfDeadlineExpired(); - ++ChunkCount; - VerifiedChunkBytes += Size; + ChunkCount.fetch_add(1); + VerifiedChunkBytes.fetch_add(Size); const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); if (!Buffer) @@ -1697,7 +1698,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) } }; - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr); } catch (ScrubDeadlineExpiredException&) { diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 2fe466028..26ec2b10d 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -57,6 +57,9 @@ public: virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; virtual bool ContainsChunk(const IoHash& ChunkHash) override; virtual void FilterChunks(HashKeySet& InOutChunks) override; + virtual bool IterateChunks(std::span<IoHash> DecompressedIds, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) override; virtual void Flush() override; virtual void ScrubStorage(ScrubContext& Ctx) override; virtual void GarbageCollect(GcContext& GcCtx) override; @@ -396,6 +399,27 @@ CasImpl::FilterChunks(HashKeySet& InOutChunks) m_LargeStrategy.FilterChunks(InOutChunks); } +bool +CasImpl::IterateChunks(std::span<IoHash> DecompressedIds, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) +{ + ZEN_TRACE_CPU("CAS::IterateChunks"); + if (!m_SmallStrategy.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool)) + { + return false; + } + if (!m_TinyStrategy.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool)) + { + return false; + } + if (!m_LargeStrategy.IterateChunks(DecompressedIds, AsyncCallback)) + { + return false; + } + return true; +} + void CasImpl::Flush() { diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h index f93724905..169f4d58c 100644 --- a/src/zenstore/cas.h +++ b/src/zenstore/cas.h @@ -46,6 +46,9 @@ public: virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; virtual void FilterChunks(HashKeySet& InOutChunks) = 0; + virtual bool IterateChunks(std::span<IoHash> DecompressedIds, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) = 0; virtual void Flush() = 0; virtual void ScrubStorage(ScrubContext& Ctx) = 0; virtual void GarbageCollect(GcContext& GcCtx) = 0; diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index 68bccd06b..71fd596f4 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -117,6 +117,13 @@ struct CidStore::Impl InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); }); } + bool IterateChunks(std::span<IoHash> DecompressedIds, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) + { + return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool); + } + void Flush() { m_CasStore.Flush(); } void ScrubStorage(ScrubContext& Ctx) @@ -207,6 +214,14 @@ CidStore::ContainsChunk(const IoHash& DecompressedId) return m_Impl->ContainsChunk(DecompressedId); } +bool +CidStore::IterateChunks(std::span<IoHash> DecompressedIds, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) +{ + return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool); +} + void CidStore::FilterChunks(HashKeySet& InOutChunks) { diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index f3c67eb9a..7b11200a5 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -299,6 +299,38 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } +bool +CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool) +{ + std::vector<size_t> FoundChunkIndexes; + std::vector<BlockStoreLocation> FoundChunkLocations; + RwLock::SharedLockScope _(m_LocationMapLock); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) + { + if (auto KeyIt = m_LocationMap.find(ChunkHashes[ChunkIndex]); KeyIt != m_LocationMap.end()) + { + FoundChunkIndexes.push_back(ChunkIndex); + FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment)); + } + } + bool Continue = true; + m_BlockStore.IterateChunks( + FoundChunkLocations, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + if (Data != nullptr) + { + Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); + } + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); + }, + OptionalWorkerPool); + return Continue; +} + void CasContainerStrategy::Flush() { @@ -321,8 +353,9 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) ZEN_INFO("scrubbing '{}'", m_BlocksBasePath); + RwLock BadKeysLock; std::vector<IoHash> BadKeys; - uint64_t ChunkCount{0}, ChunkBytes{0}; + std::atomic_uint64_t ChunkCount{0}, ChunkBytes{0}; std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; @@ -346,14 +379,14 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; + ChunkCount.fetch_add(1); + ChunkBytes.fetch_add(Size); const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { // ChunkLocation out of range of stored blocks - BadKeys.push_back(Hash); + BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); return; } @@ -368,14 +401,14 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) return; } } - BadKeys.push_back(Hash); + BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { Ctx.ThrowIfDeadlineExpired(); - ++ChunkCount; - ChunkBytes += Size; + ChunkCount.fetch_add(1); + ChunkBytes.fetch_add(Size); const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); @@ -391,10 +424,10 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) return; } } - BadKeys.push_back(Hash); + BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); }; - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr); } catch (const ScrubDeadlineExpiredException&) { @@ -443,7 +476,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) Ctx.ReportBadCidChunks(BadKeys); } - ZEN_INFO("scrubbed {} chunks ({}) in '{}'", ChunkCount, NiceBytes(ChunkBytes), m_RootDirectory / m_ContainerBaseName); + ZEN_INFO("scrubbed {} chunks ({}) in '{}'", ChunkCount.load(), NiceBytes(ChunkBytes.load()), m_RootDirectory / m_ContainerBaseName); } void diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index eb1d36b31..db6b4c914 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -56,6 +56,9 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(HashKeySet& InOutChunks); + bool IterateChunks(std::span<IoHash> ChunkHashes, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool); void Initialize(const std::filesystem::path& RootDirectory, const std::string_view ContainerBaseName, uint32_t MaxBlockSize, diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 7fbc2b9ad..88d64eb45 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -809,6 +809,37 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } +bool +FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback) +{ + std::vector<size_t> FoundChunkIndexes; + { + RwLock::SharedLockScope _(m_Lock); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) + { + if (auto KeyIt = m_Index.find(ChunkHashes[ChunkIndex]); KeyIt != m_Index.end()) + { + FoundChunkIndexes.push_back(ChunkIndex); + } + } + } + bool Continue = true; + for (size_t ChunkIndex : FoundChunkIndexes) + { + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]); + IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + if (Payload) + { + Continue = Callback(ChunkIndex, std::move(Payload)); + if (!Continue) + { + break; + } + } + } + return Continue; +} + void FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback) { diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h index 70cd4ef5a..06e35de23 100644 --- a/src/zenstore/filecas.h +++ b/src/zenstore/filecas.h @@ -39,10 +39,11 @@ struct FileCasStrategy final : public GcStorage, public GcReferenceStore IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(HashKeySet& InOutChunks); - void Flush(); - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; - virtual void CollectGarbage(GcContext& GcCtx) override; - virtual GcStorageSize StorageSize() const override; + bool IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback); + void Flush(); + virtual void ScrubStorage(ScrubContext& ScrubCtx) override; + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override; virtual std::string GetGcName(GcCtx& Ctx) override; virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override; diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index c28aa8102..7ef2f7baa 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -110,6 +110,7 @@ private: }; class BlockStoreCompactState; +class WorkerThreadPool; class BlockStore { @@ -178,8 +179,9 @@ public: const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; }); void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, - const IterateChunksSmallSizeCallback& SmallSizeCallback, - const IterateChunksLargeSizeCallback& LargeSizeCallback); + const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback, + const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback, + WorkerThreadPool* OptionalWorkerPool); void CompactBlocks( const BlockStoreCompactState& CompactState, diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index 54f562767..d95fa7cd4 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -18,6 +18,7 @@ class CasStore; class CompressedBuffer; class IoBuffer; class ScrubContext; +class WorkerThreadPool; struct CidStoreSize { @@ -79,6 +80,9 @@ public: std::span<IoHash> RawHashes, InsertMode Mode = InsertMode::kMayBeMovedInPlace); virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + bool IterateChunks(std::span<IoHash> DecompressedIds, + const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool); bool ContainsChunk(const IoHash& DecompressedId); void FilterChunks(HashKeySet& InOutChunks); void Flush(); |