aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-24 13:53:54 +0200
committerGitHub Enterprise <[email protected]>2024-04-24 13:53:54 +0200
commit1c0ddc112a6c18d411f1f3ae6d236ecc2bedcfaa (patch)
treecfafeecb830a44a6d0870a217edabcc62d37669c /src
parentremove obsolete code (diff)
downloadzen-1c0ddc112a6c18d411f1f3ae6d236ecc2bedcfaa.tar.xz
zen-1c0ddc112a6c18d411f1f3ae6d236ecc2bedcfaa.zip
iterate cas chunks (#59)
- Improvement: Reworked GetChunkInfos in oplog store to reduce disk thrashing and improve performance
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp55
-rw-r--r--src/zenserver/projectstore/projectstore.h3
-rw-r--r--src/zenstore/blockstore.cpp205
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp27
-rw-r--r--src/zenstore/cas.cpp24
-rw-r--r--src/zenstore/cas.h3
-rw-r--r--src/zenstore/cidstore.cpp15
-rw-r--r--src/zenstore/compactcas.cpp53
-rw-r--r--src/zenstore/compactcas.h3
-rw-r--r--src/zenstore/filecas.cpp31
-rw-r--r--src/zenstore/filecas.h9
-rw-r--r--src/zenstore/include/zenstore/blockstore.h6
-rw-r--r--src/zenstore/include/zenstore/cidstore.h4
13 files changed, 298 insertions, 140 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 874715818..d177b0b2b 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -885,6 +885,14 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash)
return Chunk;
}
+bool
+ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool)
+{
+ return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool);
+}
+
IoBuffer
ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
{
@@ -2769,36 +2777,27 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
}
WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); // GetSyncWorkerPool();
- Latch WorkLatch(1);
-
- for (size_t Index = 0; Index < Hashes.size(); Index++)
- {
- WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&WorkLatch, FoundLog, WantsSizeField, WantsRawSizeField, &Sizes, &RawSizes, Index, RawHash = Hashes[Index]]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (IoBuffer Chunk = FoundLog->GetChunkByRawHash(RawHash))
+ (void)FoundLog->IterateChunks(
+ Hashes,
+ [&](size_t Index, const IoBuffer& Chunk) -> bool {
+ uint64_t Size = Chunk.GetSize();
+ if (WantsRawSizeField)
+ {
+ uint64_t RawSize = Size;
+ if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
{
- uint64_t Size = Chunk.GetSize();
- if (WantsRawSizeField)
- {
- uint64_t RawSize = Size;
- if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
- {
- IoHash __;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize);
- }
- RawSizes[Index] = RawSize;
- }
- if (WantsSizeField)
- {
- Sizes[Index] = Size;
- }
+ IoHash __;
+ (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize);
}
- });
- }
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ RawSizes[Index] = RawSize;
+ }
+ if (WantsSizeField)
+ {
+ Sizes[Index] = Size;
+ }
+ return true;
+ },
+ &WorkerPool);
}
CbObjectWriter Response;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 269fe7336..9bae0382b 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -104,6 +104,9 @@ public:
IoBuffer FindChunk(const Oid& ChunkId);
IoBuffer GetChunkByRawHash(const IoHash& RawHash);
+ bool IterateChunks(std::span<IoHash> RawHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool);
inline static const uint32_t kInvalidOp = ~0u;
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 8ef9e842f..ee58e6c5d 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -8,6 +8,7 @@
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <algorithm>
#include <unordered_map>
@@ -22,7 +23,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/compactbinarybuilder.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
-# include <zencore/workthreadpool.h>
# include <random>
#endif
@@ -156,7 +156,7 @@ BlockStoreFile::IsOpen() const
return !!m_IoBuffer;
}
-constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024;
+constexpr uint64_t IterateSmallChunkWindowSize = 2 * 1024 * 1024;
BlockStore::BlockStore()
{
@@ -1039,131 +1039,167 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
void
BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
- const IterateChunksSmallSizeCallback& SmallSizeCallback,
- const IterateChunksLargeSizeCallback& LargeSizeCallback)
+ const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback,
+ const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback,
+ WorkerThreadPool* OptionalWorkerPool)
{
ZEN_TRACE_CPU("BlockStore::IterateChunks");
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() {
+ ZEN_INFO("Iterated {} chunks from '{}' in {}", ChunkLocations.size(), m_BlocksBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath);
- std::vector<size_t> LocationIndexes;
- LocationIndexes.reserve(ChunkLocations.size());
+ std::unordered_map<uint32_t, size_t> BlockIndexToBlockChunks;
+ std::vector<std::pair<uint32_t, std::vector<size_t>>> BlocksChunks;
+
for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
{
- LocationIndexes.push_back(ChunkIndex);
- }
- std::sort(LocationIndexes.begin(), LocationIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
- const BlockStoreLocation& LocationA = ChunkLocations[IndexA];
- const BlockStoreLocation& LocationB = ChunkLocations[IndexB];
- if (LocationA.BlockIndex < LocationB.BlockIndex)
+ const BlockStoreLocation& Location = ChunkLocations[ChunkIndex];
+ if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end())
{
- return true;
+ BlocksChunks[It->second].second.push_back(ChunkIndex);
}
- else if (LocationA.BlockIndex > LocationB.BlockIndex)
+ else
{
- return false;
+ BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size()));
+ BlocksChunks.push_back(std::make_pair(Location.BlockIndex, std::vector<size_t>({ChunkIndex})));
}
- return LocationA.Offset < LocationB.Offset;
- });
-
- IoBuffer ReadBuffer{ScrubSmallChunkWindowSize};
- void* BufferBase = ReadBuffer.MutableData();
-
- RwLock::SharedLockScope _(m_InsertLock);
+ }
- auto GetNextRange = [&](size_t StartIndexOffset) {
+ auto GetNextRange = [&ChunkLocations](const std::vector<size_t>& ChunkIndexes, size_t StartIndexOffset) -> size_t {
size_t ChunkCount = 0;
- size_t StartIndex = LocationIndexes[StartIndexOffset];
+ size_t StartIndex = ChunkIndexes[StartIndexOffset];
const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex];
uint64_t StartOffset = StartLocation.Offset;
- while (StartIndexOffset + ChunkCount < LocationIndexes.size())
+ uint64_t LastEnd = StartOffset + StartLocation.Size;
+ while (StartIndexOffset + ChunkCount < ChunkIndexes.size())
{
- size_t NextIndex = LocationIndexes[StartIndexOffset + ChunkCount];
+ size_t NextIndex = ChunkIndexes[StartIndexOffset + ChunkCount];
const BlockStoreLocation& Location = ChunkLocations[NextIndex];
- if (Location.BlockIndex != StartLocation.BlockIndex)
+ ZEN_ASSERT(Location.BlockIndex == StartLocation.BlockIndex);
+ if (Location.Offset >= (LastEnd + (4u * 1024u)))
{
break;
}
- if ((Location.Offset + Location.Size) - StartOffset > ScrubSmallChunkWindowSize)
+ if ((Location.Offset + Location.Size) - StartOffset > IterateSmallChunkWindowSize)
{
break;
}
+ LastEnd = Location.Offset + Location.Size;
++ChunkCount;
}
return ChunkCount;
};
- size_t LocationIndexOffset = 0;
- while (LocationIndexOffset < LocationIndexes.size())
- {
- size_t ChunkIndex = LocationIndexes[LocationIndexOffset];
- const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
-
- const uint32_t BlockIndex = FirstLocation.BlockIndex;
- auto FindBlockIt = m_ChunkBlocks.find(BlockIndex);
+ auto ScanBlock = [&](const uint32_t BlockIndex, std::vector<size_t>& ChunkIndexes) {
+ std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
+ const BlockStoreLocation& LocationA = ChunkLocations[IndexA];
+ const BlockStoreLocation& LocationB = ChunkLocations[IndexB];
+ if (LocationA.BlockIndex < LocationB.BlockIndex)
+ {
+ return true;
+ }
+ else if (LocationA.BlockIndex > LocationB.BlockIndex)
+ {
+ return false;
+ }
+ return LocationA.Offset < LocationB.Offset;
+ });
+ RwLock::SharedLockScope InsertLock(m_InsertLock);
+ auto FindBlockIt = m_ChunkBlocks.find(BlockIndex);
if (FindBlockIt == m_ChunkBlocks.end())
{
+ InsertLock.ReleaseNow();
+
ZEN_LOG_SCOPE("block #{} not available", BlockIndex);
- while (ChunkLocations[ChunkIndex].BlockIndex == BlockIndex)
+ for (size_t ChunkIndex : ChunkIndexes)
{
- SmallSizeCallback(ChunkIndex, nullptr, 0);
- LocationIndexOffset++;
- if (LocationIndexOffset == LocationIndexes.size())
- {
- break;
- }
- ChunkIndex = LocationIndexes[LocationIndexOffset];
+ AsyncSmallSizeCallback(ChunkIndex, nullptr, 0);
}
- continue;
}
- const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second;
- ZEN_ASSERT(BlockFile);
-
- const size_t BlockSize = BlockFile->FileSize();
- const size_t RangeCount = GetNextRange(LocationIndexOffset);
- if (RangeCount > 0)
+ else
{
- size_t LastChunkIndex = LocationIndexes[LocationIndexOffset + RangeCount - 1];
- const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
- uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
- BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
- for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
+ const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second;
+ ZEN_ASSERT(BlockFile);
+
+ IoBuffer ReadBuffer{IterateSmallChunkWindowSize};
+ void* BufferBase = ReadBuffer.MutableData();
+
+ size_t LocationIndexOffset = 0;
+ while (LocationIndexOffset < ChunkIndexes.size())
{
- size_t NextChunkIndex = LocationIndexes[LocationIndexOffset + RangeIndex];
- const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex];
- if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize))
+ size_t ChunkIndex = ChunkIndexes[LocationIndexOffset];
+ const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
+
+ const size_t BlockSize = BlockFile->FileSize();
+ const size_t RangeCount = GetNextRange(ChunkIndexes, LocationIndexOffset);
+ if (RangeCount > 0)
+ {
+ size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
+ const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
+ uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
+ BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
+ for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
+ {
+ size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex];
+ const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex];
+ if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize))
+ {
+ ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
+ ChunkLocation.Offset,
+ ChunkLocation.Size,
+ BlockIndex,
+ BlockSize);
+
+ AsyncSmallSizeCallback(NextChunkIndex, nullptr, 0);
+ continue;
+ }
+ void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
+ AsyncSmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size);
+ }
+ LocationIndexOffset += RangeCount;
+ continue;
+ }
+ if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize))
{
ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
- ChunkLocation.Offset,
- ChunkLocation.Size,
+ FirstLocation.Offset,
+ FirstLocation.Size,
BlockIndex,
BlockSize);
- SmallSizeCallback(NextChunkIndex, nullptr, 0);
+ AsyncSmallSizeCallback(ChunkIndex, nullptr, 0);
+ LocationIndexOffset++;
continue;
}
- void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset];
- SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size);
+ AsyncLargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size);
+ LocationIndexOffset++;
}
- LocationIndexOffset += RangeCount;
- continue;
}
- if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize))
- {
- ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
- FirstLocation.Offset,
- FirstLocation.Size,
- BlockIndex,
- BlockSize);
+ };
- SmallSizeCallback(ChunkIndex, nullptr, 0);
- LocationIndexOffset++;
- continue;
+ Latch WorkLatch(1);
+ for (auto& BlockChunks : BlocksChunks)
+ {
+ if (OptionalWorkerPool)
+ {
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([&ChunkLocations, &BlockChunks, &ScanBlock, &WorkLatch]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ ScanBlock(BlockChunks.first, BlockChunks.second);
+ });
+ }
+ else
+ {
+ ScanBlock(BlockChunks.first, BlockChunks.second);
}
- LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size);
- LocationIndexOffset++;
}
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
void
@@ -1814,7 +1850,7 @@ TEST_CASE("blockstore.iterate.chunks")
auto RootDirectory = TempDir.Path();
BlockStore Store;
- Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024);
+ Store.Initialize(RootDirectory / "store", IterateSmallChunkWindowSize * 2, 1024);
IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
CHECK(!BadChunk);
@@ -1825,15 +1861,17 @@ TEST_CASE("blockstore.iterate.chunks")
BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
Store.Flush(/*ForceNewBlock*/ false);
- std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L');
+ std::string VeryLargeChunk(IterateSmallChunkWindowSize * 2, 'L');
BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);
BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0};
BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0,
- .Offset = ScrubSmallChunkWindowSize,
- .Size = ScrubSmallChunkWindowSize * 2};
+ .Offset = IterateSmallChunkWindowSize,
+ .Size = IterateSmallChunkWindowSize * 2};
BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024};
+ WorkerThreadPool WorkerPool(4);
+
Store.IterateChunks(
{FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex},
[&](size_t ChunkIndex, const void* Data, uint64_t Size) {
@@ -1899,7 +1937,8 @@ TEST_CASE("blockstore.iterate.chunks")
CHECK(false);
break;
}
- });
+ },
+ &WorkerPool);
}
TEST_CASE("blockstore.reclaim.space")
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 4911ff4f8..51d547b3d 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1553,23 +1553,24 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
ZEN_INFO("scrubbing '{}'", m_BucketDir);
- Stopwatch Timer;
- uint64_t ChunkCount = 0;
- uint64_t VerifiedChunkBytes = 0;
+ Stopwatch Timer;
+ std::atomic_uint64_t ChunkCount = 0;
+ std::atomic_uint64_t VerifiedChunkBytes = 0;
auto LogStats = MakeGuard([&] {
const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
m_BucketName,
- NiceBytes(VerifiedChunkBytes),
+ NiceBytes(VerifiedChunkBytes.load()),
NiceTimeSpanMs(DurationMs),
- ChunkCount,
+ ChunkCount.load(),
NiceRate(VerifiedChunkBytes, DurationMs));
});
+ RwLock BadKeysLock;
std::vector<IoHash> BadKeys;
- auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); };
+ auto ReportBadKey = [&](const IoHash& Key) { BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Key); }); };
try
{
@@ -1596,8 +1597,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
Ctx.ThrowIfDeadlineExpired();
- ++ChunkCount;
- VerifiedChunkBytes += Loc.Size();
+ ChunkCount.fetch_add(1);
+ VerifiedChunkBytes.fetch_add(Loc.Size());
if (Loc.GetContentType() == ZenContentType::kBinary)
{
@@ -1645,8 +1646,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
}
const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void {
- ++ChunkCount;
- VerifiedChunkBytes += Size;
+ ChunkCount.fetch_add(1);
+ VerifiedChunkBytes.fetch_add(Size);
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
if (!Data)
{
@@ -1678,8 +1679,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void {
Ctx.ThrowIfDeadlineExpired();
- ++ChunkCount;
- VerifiedChunkBytes += Size;
+ ChunkCount.fetch_add(1);
+ VerifiedChunkBytes.fetch_add(Size);
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
if (!Buffer)
@@ -1697,7 +1698,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
}
};
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr);
}
catch (ScrubDeadlineExpiredException&)
{
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 2fe466028..26ec2b10d 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -57,6 +57,9 @@ public:
virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
virtual bool ContainsChunk(const IoHash& ChunkHash) override;
virtual void FilterChunks(HashKeySet& InOutChunks) override;
+ virtual bool IterateChunks(std::span<IoHash> DecompressedIds,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool) override;
virtual void Flush() override;
virtual void ScrubStorage(ScrubContext& Ctx) override;
virtual void GarbageCollect(GcContext& GcCtx) override;
@@ -396,6 +399,27 @@ CasImpl::FilterChunks(HashKeySet& InOutChunks)
m_LargeStrategy.FilterChunks(InOutChunks);
}
+bool
+CasImpl::IterateChunks(std::span<IoHash> DecompressedIds,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool)
+{
+ ZEN_TRACE_CPU("CAS::IterateChunks");
+ if (!m_SmallStrategy.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool))
+ {
+ return false;
+ }
+ if (!m_TinyStrategy.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool))
+ {
+ return false;
+ }
+ if (!m_LargeStrategy.IterateChunks(DecompressedIds, AsyncCallback))
+ {
+ return false;
+ }
+ return true;
+}
+
void
CasImpl::Flush()
{
diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h
index f93724905..169f4d58c 100644
--- a/src/zenstore/cas.h
+++ b/src/zenstore/cas.h
@@ -46,6 +46,9 @@ public:
virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
+ virtual bool IterateChunks(std::span<IoHash> DecompressedIds,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool) = 0;
virtual void Flush() = 0;
virtual void ScrubStorage(ScrubContext& Ctx) = 0;
virtual void GarbageCollect(GcContext& GcCtx) = 0;
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index 68bccd06b..71fd596f4 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -117,6 +117,13 @@ struct CidStore::Impl
InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); });
}
+ bool IterateChunks(std::span<IoHash> DecompressedIds,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool)
+ {
+ return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool);
+ }
+
void Flush() { m_CasStore.Flush(); }
void ScrubStorage(ScrubContext& Ctx)
@@ -207,6 +214,14 @@ CidStore::ContainsChunk(const IoHash& DecompressedId)
return m_Impl->ContainsChunk(DecompressedId);
}
+bool
+CidStore::IterateChunks(std::span<IoHash> DecompressedIds,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool)
+{
+ return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool);
+}
+
void
CidStore::FilterChunks(HashKeySet& InOutChunks)
{
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index f3c67eb9a..7b11200a5 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -299,6 +299,38 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}
+bool
+CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool)
+{
+ std::vector<size_t> FoundChunkIndexes;
+ std::vector<BlockStoreLocation> FoundChunkLocations;
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
+ {
+ if (auto KeyIt = m_LocationMap.find(ChunkHashes[ChunkIndex]); KeyIt != m_LocationMap.end())
+ {
+ FoundChunkIndexes.push_back(ChunkIndex);
+ FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment));
+ }
+ }
+ bool Continue = true;
+ m_BlockStore.IterateChunks(
+ FoundChunkLocations,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ if (Data != nullptr)
+ {
+ Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
+ }
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ Continue = Continue && AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
+ },
+ OptionalWorkerPool);
+ return Continue;
+}
+
void
CasContainerStrategy::Flush()
{
@@ -321,8 +353,9 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
ZEN_INFO("scrubbing '{}'", m_BlocksBasePath);
+ RwLock BadKeysLock;
std::vector<IoHash> BadKeys;
- uint64_t ChunkCount{0}, ChunkBytes{0};
+ std::atomic_uint64_t ChunkCount{0}, ChunkBytes{0};
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
@@ -346,14 +379,14 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
}
const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
+ ChunkCount.fetch_add(1);
+ ChunkBytes.fetch_add(Size);
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
if (!Data)
{
// ChunkLocation out of range of stored blocks
- BadKeys.push_back(Hash);
+ BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
return;
}
@@ -368,14 +401,14 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
return;
}
}
- BadKeys.push_back(Hash);
+ BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
Ctx.ThrowIfDeadlineExpired();
- ++ChunkCount;
- ChunkBytes += Size;
+ ChunkCount.fetch_add(1);
+ ChunkBytes.fetch_add(Size);
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
@@ -391,10 +424,10 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
return;
}
}
- BadKeys.push_back(Hash);
+ BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
};
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk, nullptr);
}
catch (const ScrubDeadlineExpiredException&)
{
@@ -443,7 +476,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
Ctx.ReportBadCidChunks(BadKeys);
}
- ZEN_INFO("scrubbed {} chunks ({}) in '{}'", ChunkCount, NiceBytes(ChunkBytes), m_RootDirectory / m_ContainerBaseName);
+ ZEN_INFO("scrubbed {} chunks ({}) in '{}'", ChunkCount.load(), NiceBytes(ChunkBytes.load()), m_RootDirectory / m_ContainerBaseName);
}
void
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
index eb1d36b31..db6b4c914 100644
--- a/src/zenstore/compactcas.h
+++ b/src/zenstore/compactcas.h
@@ -56,6 +56,9 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(HashKeySet& InOutChunks);
+ bool IterateChunks(std::span<IoHash> ChunkHashes,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool);
void Initialize(const std::filesystem::path& RootDirectory,
const std::string_view ContainerBaseName,
uint32_t MaxBlockSize,
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 7fbc2b9ad..88d64eb45 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -809,6 +809,37 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}
+bool
+FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback)
+{
+ std::vector<size_t> FoundChunkIndexes;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
+ {
+ if (auto KeyIt = m_Index.find(ChunkHashes[ChunkIndex]); KeyIt != m_Index.end())
+ {
+ FoundChunkIndexes.push_back(ChunkIndex);
+ }
+ }
+ }
+ bool Continue = true;
+ for (size_t ChunkIndex : FoundChunkIndexes)
+ {
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]);
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
+ if (Payload)
+ {
+ Continue = Callback(ChunkIndex, std::move(Payload));
+ if (!Continue)
+ {
+ break;
+ }
+ }
+ }
+ return Continue;
+}
+
void
FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback)
{
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
index 70cd4ef5a..06e35de23 100644
--- a/src/zenstore/filecas.h
+++ b/src/zenstore/filecas.h
@@ -39,10 +39,11 @@ struct FileCasStrategy final : public GcStorage, public GcReferenceStore
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(HashKeySet& InOutChunks);
- void Flush();
- virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
- virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override;
+ bool IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& Callback);
+ void Flush();
+ virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
+ virtual void CollectGarbage(GcContext& GcCtx) override;
+ virtual GcStorageSize StorageSize() const override;
virtual std::string GetGcName(GcCtx& Ctx) override;
virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override;
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index c28aa8102..7ef2f7baa 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -110,6 +110,7 @@ private:
};
class BlockStoreCompactState;
+class WorkerThreadPool;
class BlockStore
{
@@ -178,8 +179,9 @@ public:
const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; });
void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
- const IterateChunksSmallSizeCallback& SmallSizeCallback,
- const IterateChunksLargeSizeCallback& LargeSizeCallback);
+ const IterateChunksSmallSizeCallback& AsyncSmallSizeCallback,
+ const IterateChunksLargeSizeCallback& AsyncLargeSizeCallback,
+ WorkerThreadPool* OptionalWorkerPool);
void CompactBlocks(
const BlockStoreCompactState& CompactState,
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index 54f562767..d95fa7cd4 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -18,6 +18,7 @@ class CasStore;
class CompressedBuffer;
class IoBuffer;
class ScrubContext;
+class WorkerThreadPool;
struct CidStoreSize
{
@@ -79,6 +80,9 @@ public:
std::span<IoHash> RawHashes,
InsertMode Mode = InsertMode::kMayBeMovedInPlace);
virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+ bool IterateChunks(std::span<IoHash> DecompressedIds,
+ const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool);
bool ContainsChunk(const IoHash& DecompressedId);
void FilterChunks(HashKeySet& InOutChunks);
void Flush();