aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
authorMatt Peters <[email protected]>2022-01-07 07:11:45 -0700
committerMatt Peters <[email protected]>2022-01-07 07:11:45 -0700
commit6bffc131a4514629dd2192882ecd1a3c763cf2fb (patch)
tree93294018cbe3708ad62eace0c0dbce7137dbd191 /zenserver
parentImplemented IsProcessRunning() on a Macintosh (diff)
downloadzen-6bffc131a4514629dd2192882ecd1a3c763cf2fb.tar.xz
zen-6bffc131a4514629dd2192882ecd1a3c763cf2fb.zip
Add WaitForQuiescence RPC
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/structuredcache.cpp27
-rw-r--r--zenserver/cache/structuredcache.h1
-rw-r--r--zenserver/upstream/upstreamcache.cpp16
-rw-r--r--zenserver/upstream/upstreamcache.h2
4 files changed, 46 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 1d43d372b..8639ed1c8 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -795,6 +795,10 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
{
HandleRpcGetCachePayloads(AsyncRequest, RpcRequest);
}
+ else if (Method == "WaitForQuiescence"sv)
+ {
+ HandleRpcWaitForQuiescence(AsyncRequest, RpcRequest);
+ }
else
{
AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
@@ -1207,6 +1211,29 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
}
void
+HttpStructuredCacheService::HandleRpcWaitForQuiescence(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcWaitForQuiescence");
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "WaitForQuiescence"sv);
+
+ if (m_UpstreamCache)
+ {
+ m_UpstreamCache->WaitForIdle();
+ }
+
+ CbPackage RpcResponse;
+ CbObjectWriter ResponseObject;
+ RpcResponse.SetObject(ResponseObject.Save());
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+void
HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
{
CbObjectWriter Cbo;
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 9ee7da99b..6b3478e68 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -93,6 +93,7 @@ private:
void HandleRpcRequest(zen::HttpServerRequest& Request);
void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcWaitForQuiescence(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 38179f490..2ed74cdd4 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -1249,6 +1249,22 @@ public:
}
Status.EndArray();
}
+
+ virtual void WaitForIdle() override
+ {
+ UpstreamCacheRecord CacheRecord;
+ while (m_RunState.IsRunning && m_UpstreamQueue.TryDequeue(CacheRecord))
+ {
+ try
+ {
+ ProcessCacheRecord(std::move(CacheRecord));
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what());
+ }
+ }
+ }
private:
void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index c463c4996..c53374a6e 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -175,6 +175,8 @@ public:
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
+
+ virtual void WaitForIdle() = 0;
};
std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);