From 49701314f570da3622f11eb37cc889c7d39d9a93 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 19 May 2025 22:25:58 +0200 Subject: handle exception with batch work (#401) * use ParallelWork in rpc playback * use ParallelWork in projectstore * use ParallelWork in buildstore * use ParallelWork in cachedisklayer * use ParallelWork in compactcas * use ParallelWork in filecas * don't set abort flag in ParallelWork destructor * add PrepareFileForScatteredWrite for temp files in httpclient * Use PrepareFileForScatteredWrite when stream-decompressing files * be more relaxed when deleting temp files * allow explicit zen-cache when using direct host url without resolving * fix lambda capture when writing loose chunks * no delay when attempting to remove temp files --- src/zenserver/cache/httpstructuredcache.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index b9a9ca380..f7e63433b 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "upstream/upstreamcache.h" @@ -1585,12 +1586,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co WorkerThreadPool WorkerPool(ThreadCount); uint64_t RequestCount = Replayer.GetRequestCount(); Stopwatch Timer; - auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); - Latch JobLatch(RequestCount); + auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); + std::atomic AbortFlag; + ParallelWork Work(AbortFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic&) { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); @@ -1634,16 +1636,15 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } } } - JobLatch.CountDown(); }); } - while (!JobLatch.Wait(10000)) - { + Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted); ZEN_INFO("Replayed {} of {} requests, elapsed {}", - RequestCount - JobLatch.Remaining(), + RequestCount - PendingWork, RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } + }); } void -- cgit v1.2.3 From 40b9386054de3c23f77da74eefaa743240d164fd Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 5 Jun 2025 14:40:02 +0200 Subject: pause, resume and abort running builds cmd (#421) - Feature: `zen builds pause`, `zen builds resume` and `zen builds abort` commands to control a running `zen builds` command - `--process-id` the process id to control, if omitted it tries to find a running process using the same executable as itself - Improvement: Process report now indicates if it is pausing or aborting --- src/zenserver/cache/httpstructuredcache.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index f7e63433b..9f2e826d6 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1588,7 +1588,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co Stopwatch Timer; auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); std::atomic AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { @@ -1638,8 +1639,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } }); } - Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); + Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); ZEN_INFO("Replayed {} of {} requests, elapsed {}", RequestCount - PendingWork, RequestCount, -- cgit v1.2.3 From d000167e12c6dde651ef86be9f67552291ff1b7d Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 16 Jun 2025 13:17:54 +0200 Subject: graceful wait in parallelwork destructor (#438) * exception safety when issuing ParallelWork * add asserts to Latch usage to catch usage errors * extended error messaging and recovery handling in ParallelWork destructor to help find issues --- src/zenserver/cache/httpstructuredcache.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 9f2e826d6..bb0c55618 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1593,10 +1593,19 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic&) { + if (AbortFlag) + { + break; + } + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic& AbortFlag) { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); + if (AbortFlag) + { + return; + } + if (Body) { uint32_t AcceptMagic = 0; -- cgit v1.2.3