diff options
| -rw-r--r-- | zencore/include/zencore/blockingqueue.h | 20 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 27 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 1 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 16 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 2 |
5 files changed, 66 insertions, 0 deletions
diff --git a/zencore/include/zencore/blockingqueue.h b/zencore/include/zencore/blockingqueue.h index f92df5a54..a80cd52f4 100644 --- a/zencore/include/zencore/blockingqueue.h +++ b/zencore/include/zencore/blockingqueue.h @@ -50,6 +50,26 @@ public: return false; } + bool TryDequeue(T& Item) + { + if (m_CompleteAdding.load()) + { + return false; + } + + std::unique_lock Lock(m_Lock); + if (!m_Queue.empty()) + { + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + m_Size--; + + return true; + } + + return false; + } + void CompleteAdding() { if (!m_CompleteAdding.load()) diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 1d43d372b..8639ed1c8 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -795,6 +795,10 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) { HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); } + else if (Method == "WaitForQuiescence"sv) + { + HandleRpcWaitForQuiescence(AsyncRequest, RpcRequest); + } else { AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); @@ -1207,6 +1211,29 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } void +HttpStructuredCacheService::HandleRpcWaitForQuiescence(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +{ + ZEN_TRACE_CPU("Z$::RpcWaitForQuiescence"); + + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "WaitForQuiescence"sv); + + if (m_UpstreamCache) + { + m_UpstreamCache->WaitForIdle(); + } + + CbPackage RpcResponse; + CbObjectWriter ResponseObject; + RpcResponse.SetObject(ResponseObject.Save()); + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +void HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 9ee7da99b..6b3478e68 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -93,6 +93,7 @@ private: void HandleRpcRequest(zen::HttpServerRequest& Request); void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcWaitForQuiescence(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 38179f490..2ed74cdd4 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -1249,6 +1249,22 @@ public: } Status.EndArray(); } + + virtual void WaitForIdle() override + { + UpstreamCacheRecord CacheRecord; + while (m_RunState.IsRunning && m_UpstreamQueue.TryDequeue(CacheRecord)) + { + try + { + ProcessCacheRecord(std::move(CacheRecord)); + } + catch (std::exception& Err) + { + ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what()); + } + } + } private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index c463c4996..c53374a6e 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -175,6 +175,8 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; + + virtual void WaitForIdle() = 0; }; std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); |