aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/httpstructuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.cpp')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp27
1 files changed, 19 insertions, 8 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 224cc6678..acb6053d9 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -25,6 +25,7 @@
#include <zenutil/cache/cacherequests.h>
#include <zenutil/cache/rpcrecording.h>
#include <zenutil/jupiter/jupiterclient.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/workerpools.h>
#include "upstream/upstreamcache.h"
@@ -1643,15 +1644,26 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
WorkerThreadPool WorkerPool(ThreadCount);
uint64_t RequestCount = Replayer.GetRequestCount();
Stopwatch Timer;
- auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
- Latch JobLatch(RequestCount);
+ auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
ZEN_INFO("Replaying {} requests", RequestCount);
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
- WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() {
+ if (AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>& AbortFlag) {
IoBuffer Body;
zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body);
+ if (AbortFlag)
+ {
+ return;
+ }
+
if (Body)
{
uint32_t AcceptMagic = 0;
@@ -1692,16 +1704,15 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
}
}
}
- JobLatch.CountDown();
});
}
- while (!JobLatch.Wait(10000))
- {
+ Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
ZEN_INFO("Replayed {} of {} requests, elapsed {}",
- RequestCount - JobLatch.Remaining(),
+ RequestCount - PendingWork,
RequestCount,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- }
+ });
}
void