aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/buildstore/buildstore.cpp58
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp25
-rw-r--r--src/zenstore/compactcas.cpp128
-rw-r--r--src/zenstore/filecas.cpp70
4 files changed, 147 insertions, 134 deletions
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index b4891a742..6eb01dfc4 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -9,6 +9,7 @@
#include <zencore/scopeguard.h>
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
+#include <zenutil/parallelwork.h>
#include <zencore/uid.h>
#include <zencore/xxhash.h>
@@ -22,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/compress.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zenutil/workerpools.h>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -480,11 +482,12 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
if (!MetaLocations.empty())
{
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
m_MetadataBlockStore.IterateChunks(
MetaLocations,
- [this, OptionalWorkerPool, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock, &WorkLatch](
+ [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock](
uint32_t BlockIndex,
std::span<const size_t> ChunkIndexes) -> bool {
ZEN_UNUSED(BlockIndex);
@@ -496,40 +499,31 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
{
ZEN_ASSERT(OptionalWorkerPool != nullptr);
std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- WorkLatch.AddCount(1);
- try
- {
- OptionalWorkerPool->ScheduleWork([this,
- &Result,
- &MetaLocations,
- &MetaLocationResultIndexes,
- DoOneBlock,
- &WorkLatch,
- ChunkIndexes = std::move(TmpChunkIndexes)]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &Result, &MetaLocations, &MetaLocationResultIndexes, DoOneBlock, ChunkIndexes = std::move(TmpChunkIndexes)](
+ std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
try
{
- DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
+ if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result))
+ {
+ AbortFlag.store(true);
+ }
}
catch (const std::exception& Ex)
{
ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
}
});
- }
- catch (const std::exception& Ex)
- {
- WorkLatch.CountDown();
- ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}",
- ChunkIndexes.size(),
- Ex.what());
- }
- return true;
+ return !Work.IsAborted();
}
});
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
for (size_t Index = 0; Index < Result.size(); Index++)
{
@@ -1661,6 +1655,8 @@ TEST_CASE("BuildStore.Metadata")
ScopedTemporaryDirectory _;
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
+
BuildStoreConfig Config;
Config.RootDirectory = _.Path() / "build_store";
@@ -1678,7 +1674,7 @@ TEST_CASE("BuildStore.Metadata")
}
Store.PutMetadatas(BlobHashes, MetaPayloads);
- std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
+ std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
{
@@ -1689,7 +1685,7 @@ TEST_CASE("BuildStore.Metadata")
{
GcManager Gc;
BuildStore Store(Config, Gc);
- std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
+ std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
{
@@ -1715,7 +1711,7 @@ TEST_CASE("BuildStore.Metadata")
Store.PutBlob(CompressedBlobsHashes.back(), Payload);
}
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
for (const auto& MetadataIt : MetadataPayloads)
{
CHECK(!MetadataIt);
@@ -1741,7 +1737,7 @@ TEST_CASE("BuildStore.Metadata")
}
Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
for (size_t I = 0; I < MetadataPayloads.size(); I++)
{
@@ -1754,7 +1750,7 @@ TEST_CASE("BuildStore.Metadata")
GcManager Gc;
BuildStore Store(Config, Gc);
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
for (size_t I = 0; I < MetadataPayloads.size(); I++)
{
@@ -1783,7 +1779,7 @@ TEST_CASE("BuildStore.Metadata")
GcManager Gc;
BuildStore Store(Config, Gc);
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
for (size_t I = 0; I < MetadataPayloads.size(); I++)
{
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 91bd9cba8..d80da6ea6 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zencore/xxhash.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
@@ -3936,14 +3937,13 @@ ZenCacheDiskLayer::DiscoverBuckets()
RwLock SyncLock;
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (auto& BucketPath : FoundBucketDirectories)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() {
+ Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetCacheDiskTag());
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
const std::string BucketName = PathToUtf8(BucketPath.stem());
try
{
@@ -3984,8 +3984,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
}
});
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
bool
@@ -4062,16 +4061,15 @@ ZenCacheDiskLayer::Flush()
}
{
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
try
{
for (auto& Bucket : Buckets)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([&WorkLatch, Bucket]() {
+ Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetCacheDiskTag());
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
try
{
Bucket->Flush();
@@ -4087,11 +4085,8 @@ ZenCacheDiskLayer::Flush()
{
ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
- WorkLatch.CountDown();
- while (!WorkLatch.Wait(1000))
- {
- ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
- }
+ Work.Wait(1000,
+ [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); });
}
}
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 8cf241e34..15bea272b 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -15,6 +15,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/parallelwork.h>
#include <gsl/gsl-lite.hpp>
@@ -393,64 +394,75 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
LargeSizeLimit);
};
- Latch WorkLatch(1);
- std::atomic_bool AsyncContinue = true;
- bool Continue = m_BlockStore.IterateChunks(
- FoundChunkLocations,
- [this,
- &AsyncContinue,
- &WorkLatch,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
- {
- std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([this,
- &AsyncContinue,
- &WorkLatch,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- BlockIndex,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- ChunkIndexes = std::move(TmpChunkIndexes)]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (!AsyncContinue)
- {
- return;
- }
- try
- {
- bool Continue = DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- if (!Continue)
- {
- AsyncContinue.store(false);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
- m_RootDirectory,
- BlockIndex,
- Ex.what());
- }
- });
- return AsyncContinue.load();
- }
- else
- {
- return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- }
- });
- WorkLatch.CountDown();
- WorkLatch.Wait();
- return AsyncContinue.load() && Continue;
+ std::atomic<bool> AsyncContinue = true;
+ {
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ const bool Continue = m_BlockStore.IterateChunks(
+ FoundChunkLocations,
+ [this,
+ &Work,
+ &AsyncContinue,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
+ {
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this,
+ &AsyncContinue,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ BlockIndex,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ AsyncContinue.store(false);
+ }
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ bool Continue =
+ DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
+ m_RootDirectory,
+ BlockIndex,
+ Ex.what());
+ AsyncContinue.store(false);
+ }
+ });
+ return AsyncContinue.load();
+ }
+ else
+ {
+ return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ }
+ });
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ Work.Wait();
+ }
+ return AsyncContinue.load();
}
void
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 4911bffb9..6354edf70 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -20,6 +20,7 @@
#include <zencore/workthreadpool.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/parallelwork.h>
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
@@ -632,10 +633,11 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
}
}
}
- std::atomic_bool Continue = true;
+ std::atomic<bool> AsyncContinue = true;
if (!FoundChunkIndexes.empty())
{
- auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+ auto ProcessOne = [this, &ChunkHashes, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+ ZEN_ASSERT(ChunkIndex < ChunkHashes.size());
const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize);
if (!AsyncCallback(ChunkIndex, std::move(Payload)))
@@ -645,49 +647,57 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
return true;
};
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
- size_t ChunkIndex = FoundChunkIndexes[Index];
- uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
- if (!Continue)
+ if (!AsyncContinue)
{
break;
}
+ size_t ChunkIndex = FoundChunkIndexes[Index];
+ uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
if (OptionalWorkerPool)
{
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (!Continue)
- {
- return;
- }
- try
- {
- if (!ProcessOne(ChunkIndex, ExpectedSize))
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
- Continue = false;
+ AsyncContinue.store(false);
}
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
- m_RootDirectory,
- ChunkHashes[ChunkIndex],
- Ex.what());
- }
- });
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
+ m_RootDirectory,
+ ChunkHashes[ChunkIndex],
+ Ex.what());
+ AsyncContinue.store(false);
+ }
+ });
}
else
{
- Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize);
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
}
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
- return Continue;
+ return AsyncContinue.load();
}
void