diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-19 22:25:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-19 22:25:58 +0200 |
| commit | 49701314f570da3622f11eb37cc889c7d39d9a93 (patch) | |
| tree | 6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenstore/buildstore/buildstore.cpp | |
| parent | parallel work handle dispatch exception (#400) (diff) | |
| download | zen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip | |
handle exception with batch work (#401)
* use ParallelWork in rpc playback
* use ParallelWork in projectstore
* use ParallelWork in buildstore
* use ParallelWork in cachedisklayer
* use ParallelWork in compactcas
* use ParallelWork in filecas
* don't set abort flag in ParallelWork destructor
* add PrepareFileForScatteredWrite for temp files in httpclient
* Use PrepareFileForScatteredWrite when stream-decompressing files
* be more relaxed when deleting temp files
* allow explicit zen-cache when using direct host url without resolving
* fix lambda capture when writing loose chunks
* no delay when attempting to remove temp files
Diffstat (limited to 'src/zenstore/buildstore/buildstore.cpp')
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 58 |
1 files changed, 27 insertions, 31 deletions
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index b4891a742..6eb01dfc4 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -9,6 +9,7 @@ #include <zencore/scopeguard.h> #include <zencore/trace.h> #include <zencore/workthreadpool.h> +#include <zenutil/parallelwork.h> #include <zencore/uid.h> #include <zencore/xxhash.h> @@ -22,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/compress.h> # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zenutil/workerpools.h> #endif // ZEN_WITH_TESTS namespace zen { @@ -480,11 +482,12 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O if (!MetaLocations.empty()) { - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); m_MetadataBlockStore.IterateChunks( MetaLocations, - [this, OptionalWorkerPool, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock, &WorkLatch]( + [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock]( uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) -> bool { ZEN_UNUSED(BlockIndex); @@ -496,40 +499,31 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O { ZEN_ASSERT(OptionalWorkerPool != nullptr); std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - WorkLatch.AddCount(1); - try - { - OptionalWorkerPool->ScheduleWork([this, - &Result, - &MetaLocations, - &MetaLocationResultIndexes, - DoOneBlock, - &WorkLatch, - ChunkIndexes = std::move(TmpChunkIndexes)]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &Result, &MetaLocations, &MetaLocationResultIndexes, DoOneBlock, ChunkIndexes = std::move(TmpChunkIndexes)]( + std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } try { - DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result); + if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result)) + { + AbortFlag.store(true); + } } catch (const std::exception& Ex) { ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); } }); - } - catch (const std::exception& Ex) - { - WorkLatch.CountDown(); - ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}", - ChunkIndexes.size(), - Ex.what()); - } - return true; + return !Work.IsAborted(); } }); - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } for (size_t Index = 0; Index < Result.size(); Index++) { @@ -1661,6 +1655,8 @@ TEST_CASE("BuildStore.Metadata") ScopedTemporaryDirectory _; + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); + BuildStoreConfig Config; Config.RootDirectory = _.Path() / "build_store"; @@ -1678,7 +1674,7 @@ TEST_CASE("BuildStore.Metadata") } Store.PutMetadatas(BlobHashes, MetaPayloads); - std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) { @@ -1689,7 +1685,7 @@ TEST_CASE("BuildStore.Metadata") { GcManager Gc; BuildStore Store(Config, Gc); - std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) { @@ -1715,7 +1711,7 @@ TEST_CASE("BuildStore.Metadata") Store.PutBlob(CompressedBlobsHashes.back(), Payload); } - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); for (const auto& MetadataIt : MetadataPayloads) { CHECK(!MetadataIt); @@ -1741,7 +1737,7 @@ TEST_CASE("BuildStore.Metadata") } Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { @@ -1754,7 +1750,7 @@ TEST_CASE("BuildStore.Metadata") GcManager Gc; BuildStore Store(Config, Gc); - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { @@ -1783,7 +1779,7 @@ TEST_CASE("BuildStore.Metadata") GcManager Gc; BuildStore Store(Config, Gc); - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { |