diff options
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.cpp')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 27 |
1 files changed, 19 insertions, 8 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 224cc6678..acb6053d9 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -25,6 +25,7 @@ #include <zenutil/cache/cacherequests.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/jupiter/jupiterclient.h> +#include <zenutil/parallelwork.h> #include <zenutil/workerpools.h> #include "upstream/upstreamcache.h" @@ -1643,15 +1644,26 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co WorkerThreadPool WorkerPool(ThreadCount); uint64_t RequestCount = Replayer.GetRequestCount(); Stopwatch Timer; - auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); - Latch JobLatch(RequestCount); + auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { + if (AbortFlag) + { + break; + } + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>& AbortFlag) { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); + if (AbortFlag) + { + return; + } + if (Body) { uint32_t AcceptMagic = 0; @@ -1692,16 +1704,15 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } } } - JobLatch.CountDown(); }); } - while (!JobLatch.Wait(10000)) - { + Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); ZEN_INFO("Replayed {} of {} requests, elapsed {}", - RequestCount - JobLatch.Remaining(), + RequestCount - PendingWork, RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } + }); } void |