aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2022-02-25 14:40:15 +0100
committerPer Larsson <[email protected]>2022-02-25 14:40:15 +0100
commit4c00df1f37b3401321c5de6309613cfddc0166db (patch)
tree35b831871faa6d349513c11644902dbc701dc467 /zenserver/cache/structuredcache.cpp
parentUse AppImage when bundling for Linux to avoid unmet GCC-11 dependencies (diff)
downloadzen-4c00df1f37b3401321c5de6309613cfddc0166db.tar.xz
zen-4c00df1f37b3401321c5de6309613cfddc0166db.zip
Initial attempt of a streaming websocket API.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp156
1 files changed, 156 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 8ae531720..499329e94 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -162,6 +162,162 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
}
void
+HttpStructuredCacheService::RegisterHandlers(WebSocketServer& Server)
+{
+ Server.RegisterRequestHandler("GetBinaryCacheValue"sv, *this);
+ Server.RegisterRequestHandler("GetCacheValues"sv, *this);
+}
+
+bool
+HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage)
+{
+ CbObjectView Request = RequestMessage.Body().GetObject();
+
+ const auto Method = Request["Method"].AsString();
+ CbObjectView Params = Request["Params"sv].AsObjectView();
+
+ if (Method == "GetBinaryCacheValue"sv)
+ {
+ ZEN_TRACE_CPU("Z$::WS_GetBinaryCacheValue");
+
+ // CachePolicy Policy;
+ CbObjectView KeyObject = Params["Key"sv].AsObjectView();
+ CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
+
+ ZenCacheValue CacheValue;
+ const bool InLocalCache = m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue);
+
+ CbPackage Response;
+
+ if (InLocalCache)
+ {
+ m_CacheStats.HitCount++;
+
+ CbAttachment Attachment(SharedBuffer(CacheValue.Value));
+
+ CbObjectWriter ResponseObject;
+ ResponseObject.AddAttachment("Result", Attachment);
+ Response.AddAttachment(std::move(Attachment));
+ Response.SetObject(ResponseObject.Save());
+
+ ZenContentType ContentType = CacheValue.Value.GetContentType();
+
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", Key.Bucket, Key.Hash, NiceBytes(CacheValue.Value.Size()), ToString(ContentType));
+ }
+ else
+ {
+ m_CacheStats.MissCount++;
+
+ CbObjectWriter ResponseObject;
+ ResponseObject << "Error"sv
+ << "Not Found"sv;
+ Response.SetObject(ResponseObject.Save());
+
+ ZEN_DEBUG("MISS - '{}/{}' '{}'", Key.Bucket, Key.Hash, ToString(ZenContentType::kBinary));
+ }
+
+ WebSocketMessage ResponseMessage;
+ ResponseMessage.SetMessageType(WebSocketMessageType::kResponse);
+ ResponseMessage.SetCorrelationId(RequestMessage.CorrelationId());
+ ResponseMessage.SetSocketId(RequestMessage.SocketId());
+ ResponseMessage.SetBody(std::move(Response));
+
+ SocketServer().SendResponse(std::move(ResponseMessage));
+
+ return true;
+ }
+
+ if (Method == "GetCacheValues"sv)
+ {
+ ZEN_TRACE_CPU("Z$::WS_GetCacheValues");
+
+ const std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = DefaultPolicyText.empty() ? CachePolicy::Default : ParseCachePolicy(DefaultPolicyText);
+
+ for (uint32_t RequestIdx = 0; CbFieldView RequestField : Params["Requests"sv])
+ {
+ CbObjectView RequestObject = RequestField.AsObjectView();
+
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
+ std::string_view PolicyText = RequestObject["Policy"sv].AsString();
+ CachePolicy Policy = PolicyText.empty() ? DefaultPolicy : ParseCachePolicy(PolicyText);
+
+ CompressedBuffer Compressed;
+ bool InLocalCache = false;
+
+ if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ {
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ InLocalCache = true;
+ }
+ }
+
+ if (Compressed.IsNull() && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ {
+ if (auto UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary);
+ UpstreamResult.Success)
+ {
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value));
+
+ if (Compressed)
+ {
+ UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary);
+ m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value});
+ }
+ }
+ }
+
+ CbPackage Response;
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginObject("Result"sv);
+ ResponseObject.AddInteger("RequestIndex"sv, RequestIdx++);
+
+ const IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ const uint64_t RawSize = Compressed.GetRawSize();
+
+ if (Compressed)
+ {
+ ResponseObject.AddHash("RawHash"sv, RawHash);
+
+ if (EnumHasAllFlags(Policy, CachePolicy::SkipData))
+ {
+ ResponseObject.AddInteger("RawSize"sv, RawSize);
+ }
+ else
+ {
+ Response.AddAttachment(CbAttachment(std::move(Compressed)));
+ }
+ }
+
+ ResponseObject.EndObject();
+ Response.SetObject(ResponseObject.Save());
+
+ SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(Response));
+
+ if (RawSize > 0)
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}'", Key.Bucket, Key.Hash, NiceBytes(RawSize), ToString(ZenContentType::kCompressedBinary));
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash);
+ }
+ }
+
+ SendStreamCompleteResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId());
+
+ return true;
+ }
+
+ return false;
+}
+
+void
HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket)
{
switch (Request.RequestVerb())