diff options
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 58 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 25 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 128 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 70 |
4 files changed, 147 insertions, 134 deletions
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index b4891a742..6eb01dfc4 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -9,6 +9,7 @@ #include <zencore/scopeguard.h> #include <zencore/trace.h> #include <zencore/workthreadpool.h> +#include <zenutil/parallelwork.h> #include <zencore/uid.h> #include <zencore/xxhash.h> @@ -22,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/compress.h> # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zenutil/workerpools.h> #endif // ZEN_WITH_TESTS namespace zen { @@ -480,11 +482,12 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O if (!MetaLocations.empty()) { - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); m_MetadataBlockStore.IterateChunks( MetaLocations, - [this, OptionalWorkerPool, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock, &WorkLatch]( + [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock]( uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) -> bool { ZEN_UNUSED(BlockIndex); @@ -496,40 +499,31 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O { ZEN_ASSERT(OptionalWorkerPool != nullptr); std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - WorkLatch.AddCount(1); - try - { - OptionalWorkerPool->ScheduleWork([this, - &Result, - &MetaLocations, - &MetaLocationResultIndexes, - DoOneBlock, - &WorkLatch, - ChunkIndexes = std::move(TmpChunkIndexes)]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &Result, &MetaLocations, &MetaLocationResultIndexes, DoOneBlock, ChunkIndexes = std::move(TmpChunkIndexes)]( + std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } try { - DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result); + if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result)) + { + AbortFlag.store(true); + } } catch (const std::exception& Ex) { ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); } }); - } - catch (const std::exception& Ex) - { - WorkLatch.CountDown(); - ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}", - ChunkIndexes.size(), - Ex.what()); - } - return true; + return !Work.IsAborted(); } }); - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } for (size_t Index = 0; Index < Result.size(); Index++) { @@ -1661,6 +1655,8 @@ TEST_CASE("BuildStore.Metadata") ScopedTemporaryDirectory _; + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); + BuildStoreConfig Config; Config.RootDirectory = _.Path() / "build_store"; @@ -1678,7 +1674,7 @@ TEST_CASE("BuildStore.Metadata") } Store.PutMetadatas(BlobHashes, MetaPayloads); - std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) { @@ -1689,7 +1685,7 @@ TEST_CASE("BuildStore.Metadata") { GcManager Gc; BuildStore Store(Config, Gc); - std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) { @@ -1715,7 +1711,7 @@ TEST_CASE("BuildStore.Metadata") Store.PutBlob(CompressedBlobsHashes.back(), Payload); } - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); for (const auto& MetadataIt : MetadataPayloads) { CHECK(!MetadataIt); @@ -1741,7 +1737,7 @@ TEST_CASE("BuildStore.Metadata") } Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { @@ -1754,7 +1750,7 @@ TEST_CASE("BuildStore.Metadata") GcManager Gc; BuildStore Store(Config, Gc); - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { @@ -1783,7 +1779,7 @@ TEST_CASE("BuildStore.Metadata") GcManager Gc; BuildStore Store(Config, Gc); - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 91bd9cba8..d80da6ea6 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zencore/xxhash.h> +#include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -3936,14 +3937,13 @@ ZenCacheDiskLayer::DiscoverBuckets() RwLock SyncLock; WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (auto& BucketPath : FoundBucketDirectories) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() { + Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) { ZEN_MEMSCOPE(GetCacheDiskTag()); - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); const std::string BucketName = PathToUtf8(BucketPath.stem()); try { @@ -3984,8 +3984,7 @@ ZenCacheDiskLayer::DiscoverBuckets() } }); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } bool @@ -4062,16 +4061,15 @@ ZenCacheDiskLayer::Flush() } { WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); try { for (auto& Bucket : Buckets) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&WorkLatch, Bucket]() { + Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) { ZEN_MEMSCOPE(GetCacheDiskTag()); - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); try { Bucket->Flush(); @@ -4087,11 +4085,8 @@ ZenCacheDiskLayer::Flush() { ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } - WorkLatch.CountDown(); - while (!WorkLatch.Wait(1000)) - { - ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); - } + Work.Wait(1000, + [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); }); } } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 8cf241e34..15bea272b 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -15,6 +15,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenstore/scrubcontext.h> +#include <zenutil/parallelwork.h> #include <gsl/gsl-lite.hpp> @@ -393,64 +394,75 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas LargeSizeLimit); }; - Latch WorkLatch(1); - std::atomic_bool AsyncContinue = true; - bool Continue = m_BlockStore.IterateChunks( - FoundChunkLocations, - [this, - &AsyncContinue, - &WorkLatch, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - &FoundChunkIndexes, - &FoundChunkLocations, - OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { - if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) - { - std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([this, - &AsyncContinue, - &WorkLatch, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - BlockIndex, - &FoundChunkIndexes, - &FoundChunkLocations, - ChunkIndexes = std::move(TmpChunkIndexes)]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (!AsyncContinue) - { - return; - } - try - { - bool Continue = DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - if (!Continue) - { - AsyncContinue.store(false); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", - m_RootDirectory, - BlockIndex, - Ex.what()); - } - }); - return AsyncContinue.load(); - } - else - { - return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - } - }); - WorkLatch.CountDown(); - WorkLatch.Wait(); - return AsyncContinue.load() && Continue; + std::atomic<bool> AsyncContinue = true; + { + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + const bool Continue = m_BlockStore.IterateChunks( + FoundChunkLocations, + [this, + &Work, + &AsyncContinue, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + &FoundChunkIndexes, + &FoundChunkLocations, + OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) + { + std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, + &AsyncContinue, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + BlockIndex, + &FoundChunkIndexes, + &FoundChunkLocations, + ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + AsyncContinue.store(false); + } + if (!AsyncContinue) + { + return; + } + try + { + bool Continue = + DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + if (!Continue) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + AsyncContinue.store(false); + } + }); + return AsyncContinue.load(); + } + else + { + return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + } + }); + if (!Continue) + { + AsyncContinue.store(false); + } + Work.Wait(); + } + return AsyncContinue.load(); } void diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 4911bffb9..6354edf70 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -20,6 +20,7 @@ #include <zencore/workthreadpool.h> #include <zenstore/gc.h> #include <zenstore/scrubcontext.h> +#include <zenutil/parallelwork.h> #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> @@ -632,10 +633,11 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } } } - std::atomic_bool Continue = true; + std::atomic<bool> AsyncContinue = true; if (!FoundChunkIndexes.empty()) { - auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) { + auto ProcessOne = [this, &ChunkHashes, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) { + ZEN_ASSERT(ChunkIndex < ChunkHashes.size()); const IoHash& ChunkHash = ChunkHashes[ChunkIndex]; IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize); if (!AsyncCallback(ChunkIndex, std::move(Payload))) @@ -645,49 +647,57 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, return true; }; - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { - size_t ChunkIndex = FoundChunkIndexes[Index]; - uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; - if (!Continue) + if (!AsyncContinue) { break; } + size_t ChunkIndex = FoundChunkIndexes[Index]; + uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; if (OptionalWorkerPool) { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (!Continue) - { - return; - } - try - { - if (!ProcessOne(ChunkIndex, ExpectedSize)) + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) { + if (AbortFlag) { - Continue = false; + AsyncContinue.store(false); } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", - m_RootDirectory, - ChunkHashes[ChunkIndex], - Ex.what()); - } - }); + if (!AsyncContinue) + { + return; + } + try + { + if (!ProcessOne(ChunkIndex, ExpectedSize)) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", + m_RootDirectory, + ChunkHashes[ChunkIndex], + Ex.what()); + AsyncContinue.store(false); + } + }); } else { - Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize); + if (!ProcessOne(ChunkIndex, ExpectedSize)) + { + AsyncContinue.store(false); + } } } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } - return Continue; + return AsyncContinue.load(); } void |