aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-19 22:25:58 +0200
committerGitHub Enterprise <[email protected]>2025-05-19 22:25:58 +0200
commit49701314f570da3622f11eb37cc889c7d39d9a93 (patch)
tree6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenserver
parentparallel work handle dispatch exception (#400) (diff)
downloadzen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz
zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip
handle exception with batch work (#401)
* use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp17
-rw-r--r--src/zenserver/projectstore/projectstore.cpp40
2 files changed, 26 insertions, 31 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index b9a9ca380..f7e63433b 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -25,6 +25,7 @@
#include <zenutil/cache/cacherequests.h>
#include <zenutil/cache/rpcrecording.h>
#include <zenutil/jupiter/jupiterclient.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/workerpools.h>
#include "upstream/upstreamcache.h"
@@ -1585,12 +1586,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
WorkerThreadPool WorkerPool(ThreadCount);
uint64_t RequestCount = Replayer.GetRequestCount();
Stopwatch Timer;
- auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
- Latch JobLatch(RequestCount);
+ auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
ZEN_INFO("Replaying {} requests", RequestCount);
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
- WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() {
+ Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>&) {
IoBuffer Body;
zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body);
@@ -1634,16 +1636,15 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
}
}
}
- JobLatch.CountDown();
});
}
- while (!JobLatch.Wait(10000))
- {
+ Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted);
ZEN_INFO("Replayed {} of {} requests, elapsed {}",
- RequestCount - JobLatch.Remaining(),
+ RequestCount - PendingWork,
RequestCount,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- }
+ });
}
void
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 6a55efdb7..7d22da717 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -22,6 +22,7 @@
#include <zenstore/scrubcontext.h>
#include <zenutil/cache/rpcrecording.h>
#include <zenutil/openprocesscache.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
@@ -1588,17 +1589,14 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
}
};
- Latch WorkLatch(1);
-
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
{
if (OptionalWorkerPool)
{
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() {
+ Work.ScheduleWork(*OptionalWorkerPool, [&, Index = OpIndex](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetProjectstoreTag());
-
- auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); });
ValidateOne(Index);
});
}
@@ -1607,9 +1605,7 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
ValidateOne(OpIndex);
}
}
-
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
{
// Check if we were deleted while we were checking the references without a lock...
@@ -2106,8 +2102,9 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
if (OptionalWorkerPool)
{
- std::atomic_bool Result = true;
- Latch WorkLatch(1);
+ std::atomic_bool Result = true;
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
{
@@ -2115,10 +2112,10 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
{
break;
}
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork(
- [this, &WorkLatch, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result](
+ std::atomic<bool>&) {
if (Result.load() == false)
{
return;
@@ -2162,8 +2159,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
LargeSizeLimit);
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
return Result.load();
}
@@ -3886,13 +3882,12 @@ ProjectStore::Flush()
}
WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
for (const Ref<Project>& Project : Projects)
{
- WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([this, &WorkLatch, Project]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) {
try
{
Project->Flush();
@@ -3904,8 +3899,7 @@ ProjectStore::Flush()
});
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
void