aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/buildstore/buildstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-19 22:25:58 +0200
committerGitHub Enterprise <[email protected]>2025-05-19 22:25:58 +0200
commit49701314f570da3622f11eb37cc889c7d39d9a93 (patch)
tree6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenstore/buildstore/buildstore.cpp
parentparallel work handle dispatch exception (#400) (diff)
downloadzen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz
zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip
handle exception with batch work (#401)
* use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files
Diffstat (limited to 'src/zenstore/buildstore/buildstore.cpp')
-rw-r--r--src/zenstore/buildstore/buildstore.cpp58
1 files changed, 27 insertions, 31 deletions
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index b4891a742..6eb01dfc4 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -9,6 +9,7 @@
#include <zencore/scopeguard.h>
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
+#include <zenutil/parallelwork.h>
#include <zencore/uid.h>
#include <zencore/xxhash.h>
@@ -22,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/compress.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zenutil/workerpools.h>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -480,11 +482,12 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
if (!MetaLocations.empty())
{
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
m_MetadataBlockStore.IterateChunks(
MetaLocations,
- [this, OptionalWorkerPool, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock, &WorkLatch](
+ [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock](
uint32_t BlockIndex,
std::span<const size_t> ChunkIndexes) -> bool {
ZEN_UNUSED(BlockIndex);
@@ -496,40 +499,31 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
{
ZEN_ASSERT(OptionalWorkerPool != nullptr);
std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- WorkLatch.AddCount(1);
- try
- {
- OptionalWorkerPool->ScheduleWork([this,
- &Result,
- &MetaLocations,
- &MetaLocationResultIndexes,
- DoOneBlock,
- &WorkLatch,
- ChunkIndexes = std::move(TmpChunkIndexes)]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &Result, &MetaLocations, &MetaLocationResultIndexes, DoOneBlock, ChunkIndexes = std::move(TmpChunkIndexes)](
+ std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
try
{
- DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
+ if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result))
+ {
+ AbortFlag.store(true);
+ }
}
catch (const std::exception& Ex)
{
ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
}
});
- }
- catch (const std::exception& Ex)
- {
- WorkLatch.CountDown();
- ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}",
- ChunkIndexes.size(),
- Ex.what());
- }
- return true;
+ return !Work.IsAborted();
}
});
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
for (size_t Index = 0; Index < Result.size(); Index++)
{
@@ -1661,6 +1655,8 @@ TEST_CASE("BuildStore.Metadata")
ScopedTemporaryDirectory _;
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
+
BuildStoreConfig Config;
Config.RootDirectory = _.Path() / "build_store";
@@ -1678,7 +1674,7 @@ TEST_CASE("BuildStore.Metadata")
}
Store.PutMetadatas(BlobHashes, MetaPayloads);
- std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
+ std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
{
@@ -1689,7 +1685,7 @@ TEST_CASE("BuildStore.Metadata")
{
GcManager Gc;
BuildStore Store(Config, Gc);
- std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr);
+ std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
{
@@ -1715,7 +1711,7 @@ TEST_CASE("BuildStore.Metadata")
Store.PutBlob(CompressedBlobsHashes.back(), Payload);
}
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
for (const auto& MetadataIt : MetadataPayloads)
{
CHECK(!MetadataIt);
@@ -1741,7 +1737,7 @@ TEST_CASE("BuildStore.Metadata")
}
Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
for (size_t I = 0; I < MetadataPayloads.size(); I++)
{
@@ -1754,7 +1750,7 @@ TEST_CASE("BuildStore.Metadata")
GcManager Gc;
BuildStore Store(Config, Gc);
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
for (size_t I = 0; I < MetadataPayloads.size(); I++)
{
@@ -1783,7 +1779,7 @@ TEST_CASE("BuildStore.Metadata")
GcManager Gc;
BuildStore Store(Config, Gc);
- std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
+ std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
for (size_t I = 0; I < MetadataPayloads.size(); I++)
{