diff options
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 27 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 1 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 16 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 2 |
4 files changed, 46 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index ef312d800..6078f892e 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -786,6 +786,10 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) { HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); } + else if (Method == "WaitForQuiescence"sv) + { + HandleRpcWaitForQuiescence(AsyncRequest, RpcRequest); + } else { AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); @@ -1197,6 +1201,29 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } void +HttpStructuredCacheService::HandleRpcWaitForQuiescence(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +{ + ZEN_TRACE_CPU("Z$::RpcWaitForQuiescence"); + + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "WaitForQuiescence"sv); + + if (m_UpstreamCache) + { + m_UpstreamCache->WaitForIdle(); + } + + CbPackage RpcResponse; + CbObjectWriter ResponseObject; + RpcResponse.SetObject(ResponseObject.Save()); + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +void HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 9ee7da99b..6b3478e68 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -93,6 +93,7 @@ private: void HandleRpcRequest(zen::HttpServerRequest& Request); void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcWaitForQuiescence(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c9ba59780..1a35f7283 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -1243,6 +1243,22 @@ public: } Status.EndArray(); } + + virtual void WaitForIdle() override + { + UpstreamCacheRecord CacheRecord; + while (m_RunState.IsRunning && m_UpstreamQueue.TryDequeue(CacheRecord)) + { + try + { + ProcessCacheRecord(std::move(CacheRecord)); + } + catch (std::exception& Err) + { + ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what()); + } + } + } private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index c463c4996..c53374a6e 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -175,6 +175,8 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; + + virtual void WaitForIdle() = 0; }; std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); |