aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-02 15:33:58 +0200
committerGitHub Enterprise <[email protected]>2025-05-02 15:33:58 +0200
commita9c83b3c1299692923e2c16b696f5b9e211f5737 (patch)
tree21e9ea21386672d2b870baf3103ebde2b5fbbb76 /src
parentcbobject validation (#377) (diff)
downloadzen-a9c83b3c1299692923e2c16b696f5b9e211f5737.tar.xz
zen-a9c83b3c1299692923e2c16b696f5b9e211f5737.zip
iterate chunks crash fix (#376)
* Bugfix: Add explicit lambda capture in CasContainer::IterateChunks to avoid accessing state data references
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp2
-rw-r--r--src/zenstore/blockstore.cpp31
-rw-r--r--src/zenstore/buildstore/buildstore.cpp76
-rw-r--r--src/zenstore/cache/cacherpc.cpp2
-rw-r--r--src/zenstore/compactcas.cpp281
5 files changed, 315 insertions, 77 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 9aa800434..e91e6ac51 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -2185,7 +2185,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
if (!Payload)
{
- ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[ChunkIndex], FilePath);
+ ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath);
}
if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0))
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 7cc09be15..c58080e6a 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -735,6 +735,8 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
return true;
}
+ ZEN_ASSERT(ChunkLocations.size() >= InChunkIndexes.size());
+
if (LargeSizeLimit == 0)
{
LargeSizeLimit = DefaultIterateSmallChunkWindowSize;
@@ -746,7 +748,10 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
IterateSmallChunkWindowSize = Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize);
- uint32_t BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex;
+ const size_t FirstLocationIndex = InChunkIndexes[0];
+ ZEN_ASSERT(FirstLocationIndex < ChunkLocations.size());
+
+ const uint32_t BlockIndex = ChunkLocations[FirstLocationIndex].BlockIndex;
std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end());
std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset;
@@ -756,8 +761,9 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
IterateSmallChunkWindowSize,
IterateSmallChunkMaxGapSize,
&ChunkLocations](uint64_t BlockFileSize, std::span<const size_t> ChunkIndexes, size_t StartIndexOffset) -> size_t {
- size_t ChunkCount = 0;
- size_t StartIndex = ChunkIndexes[StartIndexOffset];
+ size_t ChunkCount = 0;
+ size_t StartIndex = ChunkIndexes[StartIndexOffset];
+ ZEN_ASSERT(StartIndex < ChunkLocations.size());
const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex];
uint64_t StartOffset = StartLocation.Offset;
uint64_t LastEnd = StartOffset + StartLocation.Size;
@@ -810,22 +816,26 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
ZEN_ASSERT(BlockFile);
InsertLock.ReleaseNow();
+ const size_t BlockSize = BlockFile->FileSize();
+
IoBuffer ReadBuffer;
void* BufferBase = nullptr;
size_t LocationIndexOffset = 0;
while (LocationIndexOffset < ChunkIndexes.size())
{
- size_t ChunkIndex = ChunkIndexes[LocationIndexOffset];
+ size_t ChunkIndex = ChunkIndexes[LocationIndexOffset];
+ ZEN_ASSERT(ChunkIndex < ChunkLocations.size());
const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
+ ZEN_ASSERT(FirstLocation.BlockIndex == BlockIndex);
- const size_t BlockSize = BlockFile->FileSize();
const size_t RangeCount = GetNextRange(BlockSize, ChunkIndexes, LocationIndexOffset);
if (RangeCount > 1)
{
- size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
- const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
- uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
+ size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
+ ZEN_ASSERT(LastChunkIndex < ChunkLocations.size());
+ const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
+ uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
if (ReadBuffer.GetSize() < Size)
{
ReadBuffer = IoBuffer(Min(Size * 2, IterateSmallChunkWindowSize));
@@ -834,8 +844,9 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
{
- size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex];
- const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex];
+ size_t NextChunkIndex = ChunkIndexes[LocationIndexOffset + RangeIndex];
+ ZEN_ASSERT(NextChunkIndex < ChunkLocations.size());
+ const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex];
if (ChunkLocation.Size == 0 || ((ChunkLocation.Offset + ChunkLocation.Size) > BlockSize))
{
ZEN_LOG_SCOPE("chunk [{},{}] out of bounds (block #{} file size = {})",
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index cf518c06f..b4891a742 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -442,7 +442,10 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
}
}
- auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) {
+ auto DoOneBlock = [this](std::span<const BlockStoreLocation> MetaLocations,
+ std::span<const size_t> MetaLocationResultIndexes,
+ std::span<const size_t> ChunkIndexes,
+ std::vector<IoBuffer>& Result) {
if (ChunkIndexes.size() < 4)
{
for (size_t ChunkIndex : ChunkIndexes)
@@ -459,7 +462,7 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
return m_MetadataBlockStore.IterateBlock(
MetaLocations,
ChunkIndexes,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ [&MetaLocationResultIndexes, &Result](size_t ChunkIndex, const void* Data, uint64_t Size) {
if (Data != nullptr)
{
size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
@@ -479,38 +482,51 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
{
Latch WorkLatch(1);
- m_MetadataBlockStore.IterateChunks(MetaLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) -> bool {
- ZEN_UNUSED(BlockIndex);
- if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1)
- {
- return DoOneBlock(ChunkIndexes);
- }
- else
- {
- ZEN_ASSERT(OptionalWorkerPool != nullptr);
- WorkLatch.AddCount(1);
- try
+ m_MetadataBlockStore.IterateChunks(
+ MetaLocations,
+ [this, OptionalWorkerPool, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock, &WorkLatch](
+ uint32_t BlockIndex,
+ std::span<const size_t> ChunkIndexes) -> bool {
+ ZEN_UNUSED(BlockIndex);
+ if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1)
{
- OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- try
- {
- DoOneBlock(ChunkIndexes);
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
- }
- });
+ return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
}
- catch (const std::exception& Ex)
+ else
{
- WorkLatch.CountDown();
- ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
+ ZEN_ASSERT(OptionalWorkerPool != nullptr);
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ WorkLatch.AddCount(1);
+ try
+ {
+ OptionalWorkerPool->ScheduleWork([this,
+ &Result,
+ &MetaLocations,
+ &MetaLocationResultIndexes,
+ DoOneBlock,
+ &WorkLatch,
+ ChunkIndexes = std::move(TmpChunkIndexes)]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ try
+ {
+ DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ WorkLatch.CountDown();
+ ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}",
+ ChunkIndexes.size(),
+ Ex.what());
+ }
+ return true;
}
- return true;
- }
- });
+ });
WorkLatch.CountDown();
WorkLatch.Wait();
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index bf78dae86..de4b0a37c 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -653,7 +653,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
{
m_CidStore.IterateChunks(
CidHashes,
- [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
+ [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
try
{
const size_t ValueIndex = RequestValueIndexes[Index];
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 184251da7..90e77e48a 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -360,7 +360,11 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
return true;
}
- auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) {
+ auto DoOneBlock = [this](const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ uint64_t LargeSizeLimit,
+ std::span<const size_t> FoundChunkIndexes,
+ std::span<const BlockStoreLocation> FoundChunkLocations,
+ std::span<const size_t> ChunkIndexes) {
if (ChunkIndexes.size() < 4)
{
for (size_t ChunkIndex : ChunkIndexes)
@@ -376,7 +380,7 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
return m_BlockStore.IterateBlock(
FoundChunkLocations,
ChunkIndexes,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ [AsyncCallback, FoundChunkIndexes, LargeSizeLimit](size_t ChunkIndex, const void* Data, uint64_t Size) {
if (Data == nullptr)
{
return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer());
@@ -391,39 +395,59 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
Latch WorkLatch(1);
std::atomic_bool AsyncContinue = true;
- bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
- {
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (!AsyncContinue)
- {
- return;
- }
- try
- {
- bool Continue = DoOneBlock(ChunkIndexes);
- if (!Continue)
- {
- AsyncContinue.store(false);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
- m_RootDirectory,
- BlockIndex,
- Ex.what());
- }
- });
- return AsyncContinue.load();
- }
- else
- {
- return DoOneBlock(ChunkIndexes);
- }
- });
+ bool Continue = m_BlockStore.IterateChunks(
+ FoundChunkLocations,
+ [this,
+ &AsyncContinue,
+ &WorkLatch,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
+ {
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([this,
+ &AsyncContinue,
+ &WorkLatch,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ BlockIndex,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ ChunkIndexes = std::move(TmpChunkIndexes)]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ bool Continue = DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
+ m_RootDirectory,
+ BlockIndex,
+ Ex.what());
+ }
+ });
+ return AsyncContinue.load();
+ }
+ else
+ {
+ return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ }
+ });
WorkLatch.CountDown();
WorkLatch.Wait();
return AsyncContinue.load() && Continue;
@@ -1573,6 +1597,193 @@ TEST_CASE("compactcas.threadedinsert")
}
}
+TEST_CASE("compactcas.iteratechunks")
+{
+ std::atomic<size_t> WorkCompleted = 0;
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+
+ const uint64_t kChunkSize = 1048 + 395;
+ const size_t kChunkCount = 63840;
+
+ for (uint32_t N = 0; N < 4; N++)
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ ScopedTemporaryDirectory TempDir;
+ Cas.Initialize(TempDir.Path(), "test", 65536 * 128, 8, true);
+
+ CHECK(Cas.IterateChunks(
+ {},
+ [](size_t Index, const IoBuffer& Payload) {
+ ZEN_UNUSED(Index, Payload);
+ return true;
+ },
+ &ThreadPool,
+ 2048u));
+
+ uint64_t ExpectedSize = 0;
+
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(kChunkCount);
+
+ {
+ Latch WorkLatch(1);
+ tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup;
+ ChunkHashesLookup.reserve(kChunkCount);
+ RwLock InsertLock;
+ for (size_t Offset = 0; Offset < kChunkCount;)
+ {
+ size_t BatchCount = Min<size_t>(kChunkCount - Offset, 512u);
+ WorkLatch.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [N, &WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ std::vector<IoBuffer> BatchBlobs;
+ std::vector<IoHash> BatchHashes;
+ BatchBlobs.reserve(BatchCount);
+ BatchHashes.reserve(BatchCount);
+
+ while (BatchBlobs.size() < BatchCount)
+ {
+ IoBuffer Chunk = CreateRandomBlob(
+ N + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ if (ChunkHashesLookup.contains(Hash))
+ {
+ continue;
+ }
+ ChunkHashesLookup.insert(Hash);
+ ExpectedSize += Chunk.Size();
+ }
+
+ BatchBlobs.emplace_back(std::move(Chunk));
+ BatchHashes.push_back(Hash);
+ }
+
+ Cas.InsertChunks(BatchBlobs, BatchHashes);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
+ }
+ });
+ Offset += BatchCount;
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ }
+
+ WorkerThreadPool BatchWorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "fetch");
+ {
+ std::vector<std::atomic<bool>> FetchedFlags(Hashes.size());
+ std::atomic<uint64_t> FetchedSize = 0;
+ CHECK(Cas.IterateChunks(
+ Hashes,
+ [&Hashes, &FetchedFlags, &FetchedSize](size_t Index, const IoBuffer& Payload) {
+ CHECK(FetchedFlags[Index].load() == false);
+ FetchedFlags[Index].store(true);
+ const IoHash& Hash = Hashes[Index];
+ CHECK(Hash == IoHash::HashBuffer(Payload));
+ FetchedSize += Payload.GetSize();
+ return true;
+ },
+ &BatchWorkerPool,
+ 2048u));
+ for (const auto& Flag : FetchedFlags)
+ {
+ CHECK(Flag.load());
+ }
+ CHECK(FetchedSize == ExpectedSize);
+ }
+
+ Latch WorkLatch(1);
+ for (size_t I = 0; I < 2; I++)
+ {
+ WorkLatch.AddCount(1);
+ ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ std::vector<IoHash> PartialHashes;
+ PartialHashes.reserve(Hashes.size() / 4);
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
+ {
+ size_t TestIndex = Index + I;
+ if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1))
+ {
+ PartialHashes.push_back(Hashes[Index]);
+ }
+ }
+ std::reverse(PartialHashes.begin(), PartialHashes.end());
+
+ std::vector<IoHash> NoFoundHashes;
+ std::vector<size_t> NoFindIndexes;
+
+ NoFoundHashes.reserve(9);
+ for (size_t J = 0; J < 9; J++)
+ {
+ std::string Data = fmt::format("oh no, we don't exist {}", J + 1);
+ NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length()));
+ }
+
+ NoFindIndexes.reserve(9);
+
+ // Sprinkle in chunks that are not found!
+ auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+
+ std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size());
+ std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size());
+
+ CHECK(Cas.IterateChunks(
+ PartialHashes,
+ [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) {
+ CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index));
+ uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1);
+ CHECK(PreviousCount == 0);
+ FoundFlags[Index] = !!Payload;
+ const IoHash& Hash = PartialHashes[Index];
+ CHECK(Hash == IoHash::HashBuffer(Payload));
+ return true;
+ },
+ &BatchWorkerPool,
+ 2048u));
+
+ for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++)
+ {
+ CHECK(FetchedCounts[FoundIndex].load() <= 1);
+ if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end())
+ {
+ CHECK(FoundFlags[FoundIndex]);
+ }
+ else
+ {
+ CHECK(!FoundFlags[FoundIndex]);
+ }
+ }
+ });
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ }
+}
+
#endif
void