diff options
| author | Per Larsson <[email protected]> | 2022-02-25 14:40:15 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2022-02-25 14:40:15 +0100 |
| commit | 4c00df1f37b3401321c5de6309613cfddc0166db (patch) | |
| tree | 35b831871faa6d349513c11644902dbc701dc467 /zenserver/cache/structuredcache.cpp | |
| parent | Use AppImage when bundling for Linux to avoid unmet GCC-11 dependencies (diff) | |
| download | zen-4c00df1f37b3401321c5de6309613cfddc0166db.tar.xz zen-4c00df1f37b3401321c5de6309613cfddc0166db.zip | |
Initial attempt of a streaming websocket API.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 8ae531720..499329e94 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -162,6 +162,162 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) } void +HttpStructuredCacheService::RegisterHandlers(WebSocketServer& Server) +{ + Server.RegisterRequestHandler("GetBinaryCacheValue"sv, *this); + Server.RegisterRequestHandler("GetCacheValues"sv, *this); +} + +bool +HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage) +{ + CbObjectView Request = RequestMessage.Body().GetObject(); + + const auto Method = Request["Method"].AsString(); + CbObjectView Params = Request["Params"sv].AsObjectView(); + + if (Method == "GetBinaryCacheValue"sv) + { + ZEN_TRACE_CPU("Z$::WS_GetBinaryCacheValue"); + + // CachePolicy Policy; + CbObjectView KeyObject = Params["Key"sv].AsObjectView(); + CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + + ZenCacheValue CacheValue; + const bool InLocalCache = m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue); + + CbPackage Response; + + if (InLocalCache) + { + m_CacheStats.HitCount++; + + CbAttachment Attachment(SharedBuffer(CacheValue.Value)); + + CbObjectWriter ResponseObject; + ResponseObject.AddAttachment("Result", Attachment); + Response.AddAttachment(std::move(Attachment)); + Response.SetObject(ResponseObject.Save()); + + ZenContentType ContentType = CacheValue.Value.GetContentType(); + + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", Key.Bucket, Key.Hash, NiceBytes(CacheValue.Value.Size()), ToString(ContentType)); + } + else + { + m_CacheStats.MissCount++; + + CbObjectWriter ResponseObject; + ResponseObject << "Error"sv + << "Not Found"sv; + Response.SetObject(ResponseObject.Save()); + + ZEN_DEBUG("MISS - '{}/{}' '{}'", Key.Bucket, Key.Hash, ToString(ZenContentType::kBinary)); + } + + WebSocketMessage ResponseMessage; + ResponseMessage.SetMessageType(WebSocketMessageType::kResponse); + ResponseMessage.SetCorrelationId(RequestMessage.CorrelationId()); + ResponseMessage.SetSocketId(RequestMessage.SocketId()); + ResponseMessage.SetBody(std::move(Response)); + + SocketServer().SendResponse(std::move(ResponseMessage)); + + return true; + } + + if (Method == "GetCacheValues"sv) + { + ZEN_TRACE_CPU("Z$::WS_GetCacheValues"); + + const std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = DefaultPolicyText.empty() ? CachePolicy::Default : ParseCachePolicy(DefaultPolicyText); + + for (uint32_t RequestIdx = 0; CbFieldView RequestField : Params["Requests"sv]) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + std::string_view PolicyText = RequestObject["Policy"sv].AsString(); + CachePolicy Policy = PolicyText.empty() ? DefaultPolicy : ParseCachePolicy(PolicyText); + + CompressedBuffer Compressed; + bool InLocalCache = false; + + if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) + { + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + { + Compressed = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + InLocalCache = true; + } + } + + if (Compressed.IsNull() && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) + { + if (auto UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary); + UpstreamResult.Success) + { + Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); + + if (Compressed) + { + UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary); + m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value}); + } + } + } + + CbPackage Response; + CbObjectWriter ResponseObject; + + ResponseObject.BeginObject("Result"sv); + ResponseObject.AddInteger("RequestIndex"sv, RequestIdx++); + + const IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + const uint64_t RawSize = Compressed.GetRawSize(); + + if (Compressed) + { + ResponseObject.AddHash("RawHash"sv, RawHash); + + if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) + { + ResponseObject.AddInteger("RawSize"sv, RawSize); + } + else + { + Response.AddAttachment(CbAttachment(std::move(Compressed))); + } + } + + ResponseObject.EndObject(); + Response.SetObject(ResponseObject.Save()); + + SendStreamResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId(), std::move(Response)); + + if (RawSize > 0) + { + ZEN_DEBUG("HIT - '{}/{}' {} '{}'", Key.Bucket, Key.Hash, NiceBytes(RawSize), ToString(ZenContentType::kCompressedBinary)); + } + else + { + ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash); + } + } + + SendStreamCompleteResponse(RequestMessage.SocketId(), RequestMessage.CorrelationId()); + + return true; + } + + return false; +} + +void HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket) { switch (Request.RequestVerb()) |