aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/structuredcache.cpp27
-rw-r--r--zenserver/cache/structuredcache.h1
-rw-r--r--zenserver/upstream/upstreamcache.cpp16
-rw-r--r--zenserver/upstream/upstreamcache.h2
4 files changed, 46 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index ef312d800..6078f892e 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -786,6 +786,10 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
{
HandleRpcGetCachePayloads(AsyncRequest, RpcRequest);
}
+ else if (Method == "WaitForQuiescence"sv)
+ {
+ HandleRpcWaitForQuiescence(AsyncRequest, RpcRequest);
+ }
else
{
AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
@@ -1197,6 +1201,29 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
}
void
+HttpStructuredCacheService::HandleRpcWaitForQuiescence(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcWaitForQuiescence");
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "WaitForQuiescence"sv);
+
+ if (m_UpstreamCache)
+ {
+ m_UpstreamCache->WaitForIdle();
+ }
+
+ CbPackage RpcResponse;
+ CbObjectWriter ResponseObject;
+ RpcResponse.SetObject(ResponseObject.Save());
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+void
HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
{
CbObjectWriter Cbo;
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 9ee7da99b..6b3478e68 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -93,6 +93,7 @@ private:
void HandleRpcRequest(zen::HttpServerRequest& Request);
void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcWaitForQuiescence(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index c9ba59780..1a35f7283 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -1243,6 +1243,22 @@ public:
}
Status.EndArray();
}
+
+ virtual void WaitForIdle() override
+ {
+ UpstreamCacheRecord CacheRecord;
+ while (m_RunState.IsRunning && m_UpstreamQueue.TryDequeue(CacheRecord))
+ {
+ try
+ {
+ ProcessCacheRecord(std::move(CacheRecord));
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what());
+ }
+ }
+ }
private:
void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index c463c4996..c53374a6e 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -175,6 +175,8 @@ public:
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
+
+ virtual void WaitForIdle() = 0;
};
std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);