diff options
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.cpp')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 71 |
1 files changed, 56 insertions, 15 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 8db96f914..f61fbd8bc 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -338,7 +338,11 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } m_StatsService.UnregisterHandler("z$", *this); m_StatusService.UnregisterHandler("z$", *this); @@ -615,24 +619,44 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) if (Key == HttpZCacheUtilStartRecording) { - m_RequestRecorder.reset(); HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); - m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + + m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + m_RequestRecordingEnabled.store(true); + } + ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath); Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key == HttpZCacheUtilStopRecording) { - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + ZEN_INFO("cache RPC recording STOPPED"); Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key == HttpZCacheUtilReplayRecording) { CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); uint32_t ThreadCount = std::thread::hardware_concurrency(); @@ -643,11 +667,18 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) ThreadCount = gsl::narrow<uint32_t>(Value.value()); } } + + ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath); + std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount); + + ZEN_INFO("cache RPC replay STARTED"); + Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key.starts_with(HttpZCacheDetailsPrefix)) { HandleDetailsRequest(Request); @@ -1776,11 +1807,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { uint64_t RequestIndex = ~0ull; - if (m_RequestRecorder) + if (m_RequestRecordingEnabled) { - RequestIndex = m_RequestRecorder->RecordRequest( - {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, - Body); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + RequestIndex = m_RequestRecorder->RecordRequest( + {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, + Body); + } } uint32_t AcceptMagic = 0; @@ -1816,8 +1851,11 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); if (RequestIndex != ~0ull) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + } } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } @@ -1828,10 +1866,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) if (RequestIndex != ~0ull) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + m_RequestRecorder->RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, |