diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-19 22:25:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-19 22:25:58 +0200 |
| commit | 49701314f570da3622f11eb37cc889c7d39d9a93 (patch) | |
| tree | 6159bfc2ba7974a453ded7a58813134e523e9a62 /src/zenserver | |
| parent | parallel work handle dispatch exception (#400) (diff) | |
| download | zen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip | |
handle exception with batch work (#401)
* use ParallelWork in rpc playback
* use ParallelWork in projectstore
* use ParallelWork in buildstore
* use ParallelWork in cachedisklayer
* use ParallelWork in compactcas
* use ParallelWork in filecas
* don't set abort flag in ParallelWork destructor
* add PrepareFileForScatteredWrite for temp files in httpclient
* Use PrepareFileForScatteredWrite when stream-decompressing files
* be more relaxed when deleting temp files
* allow explicit zen-cache when using direct host url without resolving
* fix lambda capture when writing loose chunks
* no delay when attempting to remove temp files
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 17 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 40 |
2 files changed, 26 insertions, 31 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index b9a9ca380..f7e63433b 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -25,6 +25,7 @@ #include <zenutil/cache/cacherequests.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/jupiter/jupiterclient.h> +#include <zenutil/parallelwork.h> #include <zenutil/workerpools.h> #include "upstream/upstreamcache.h" @@ -1585,12 +1586,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co WorkerThreadPool WorkerPool(ThreadCount); uint64_t RequestCount = Replayer.GetRequestCount(); Stopwatch Timer; - auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); - Latch JobLatch(RequestCount); + auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>&) { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); @@ -1634,16 +1636,15 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } } } - JobLatch.CountDown(); }); } - while (!JobLatch.Wait(10000)) - { + Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted); ZEN_INFO("Replayed {} of {} requests, elapsed {}", - RequestCount - JobLatch.Remaining(), + RequestCount - PendingWork, RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } + }); } void diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 6a55efdb7..7d22da717 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -22,6 +22,7 @@ #include <zenstore/scrubcontext.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/openprocesscache.h> +#include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -1588,17 +1589,14 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo } }; - Latch WorkLatch(1); - + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) { if (OptionalWorkerPool) { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() { + Work.ScheduleWork(*OptionalWorkerPool, [&, Index = OpIndex](std::atomic<bool>&) { ZEN_MEMSCOPE(GetProjectstoreTag()); - - auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); }); ValidateOne(Index); }); } @@ -1607,9 +1605,7 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo ValidateOne(OpIndex); } } - - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); { // Check if we were deleted while we were checking the references without a lock... @@ -2106,8 +2102,9 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } if (OptionalWorkerPool) { - std::atomic_bool Result = true; - Latch WorkLatch(1); + std::atomic_bool Result = true; + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { @@ -2115,10 +2112,10 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, { break; } - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork( - [this, &WorkLatch, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]( + std::atomic<bool>&) { if (Result.load() == false) { return; @@ -2162,8 +2159,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, LargeSizeLimit); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); return Result.load(); } @@ -3886,13 +3882,12 @@ ProjectStore::Flush() } WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (const Ref<Project>& Project : Projects) { - WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([this, &WorkLatch, Project]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { try { Project->Flush(); @@ -3904,8 +3899,7 @@ ProjectStore::Flush() }); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } void |