From 049f51d56a59299bcbf718bf1a55d7e745f2cf57 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 20 May 2021 14:05:01 +0200 Subject: WIP structured cache endpoints - tactical check-in not fully functional yet --- zenserver/cache/structuredcache.cpp | 47 ++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 0d62f297c..e90f838da 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -4,6 +4,7 @@ #include #include +#include #include "cachestore.h" #include "structuredcache.h" @@ -14,11 +15,19 @@ namespace zen { +using namespace std::literals; + HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore) : m_CasStore(InStore) , m_CacheStore(InStore, RootPath) { spdlog::info("initializing structured cache at '{}'", RootPath); + + m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, + "ue4.ddc"sv /* namespace */, + "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, + "0oao91lrhqPiAlaGD0x7"sv /* client id */, + "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -78,9 +87,45 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) CacheValue Value; Value.Value = Body; + HttpContentType ContentType = Request.RequestContentType(); + + switch (ContentType) + { + case HttpContentType::kUnknownContentType: + case HttpContentType::kBinary: + Value.IsCompactBinary = false; + break; + + case HttpContentType::kCbObject: + Value.IsCompactBinary = true; + break; + + default: + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); - Request.WriteResponse(zen::HttpResponse::Created); + // This is currently synchronous for simplicity and debuggability but should be + // made asynchronous + + if (m_Cloud) + { + CloudCacheSession Session(m_Cloud); + + zen::Stopwatch Timer; + + if (Session.Put(Ref.BucketSegment, Ref.HashKey)) + { + spdlog::debug("upstream PUT succeeded after {:5}! {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), Ref.HashKey); + } + else + { + spdlog::debug("upstream PUT failed after {:5}! {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), Ref.HashKey); + } + } + + return Request.WriteResponse(zen::HttpResponse::Created); } else { -- cgit v1.2.3 From 78dbac2648bbfe2a3687f37e06a3ed32241cb809 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Fri, 21 May 2021 20:41:34 +0200 Subject: Partial refactoring of structured cache implementation - WIP --- zenserver/cache/structuredcache.cpp | 61 +++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 12 deletions(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index e90f838da..9e771a715 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -23,11 +23,13 @@ HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path Roo { spdlog::info("initializing structured cache at '{}'", RootPath); +#if 0 m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, "ue4.ddc"sv /* namespace */, "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, "0oao91lrhqPiAlaGD0x7"sv /* client id */, "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); +#endif } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -58,8 +60,8 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) case kHead: case kGet: { - CacheValue Value; - bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); + ZenCacheValue Value; + bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); if (!Success) { @@ -84,7 +86,7 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) { if (zen::IoBuffer Body = Request.ReadPayload()) { - CacheValue Value; + ZenCacheValue Value; Value.Value = Body; HttpContentType ContentType = Request.RequestContentType(); @@ -115,13 +117,21 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) zen::Stopwatch Timer; - if (Session.Put(Ref.BucketSegment, Ref.HashKey)) + try { - spdlog::debug("upstream PUT succeeded after {:5}! {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), Ref.HashKey); + Session.Put(Ref.BucketSegment, Ref.HashKey, Value); + spdlog::debug("upstream PUT ({}) succeeded after {:5}!", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); } - else + catch (std::exception& e) { - spdlog::debug("upstream PUT failed after {:5}! {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), Ref.HashKey); + spdlog::debug("upstream PUT ({}) failed after {:5}: '{}'", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + + throw; } } @@ -146,22 +156,50 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); - std::string_view::size_type BucketSplitOffset = Key.find_last_of('/'); + std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); if (BucketSplitOffset == std::string_view::npos) { return false; } - OutRef.BucketSegment = Key.substr(0, BucketSplitOffset); - std::string_view HashSegment = Key.substr(BucketSplitOffset + 1); + OutRef.BucketSegment = Key.substr(0, BucketSplitOffset); + + std::string_view HashSegment; + std::string_view PayloadSegment; + + std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); + + // We know there is a slash so no need to check for npos return + + if (PayloadSplitOffset == BucketSplitOffset) + { + // Basic cache record lookup + HashSegment = Key.substr(BucketSplitOffset + 1); + } + else + { + // Cache record + payload lookup + HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); + PayloadSegment = Key.substr(PayloadSplitOffset + 1); + } if (HashSegment.size() != (2 * sizeof OutRef.HashKey.Hash)) { return false; } - bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); + if (!PayloadSegment.empty() && PayloadSegment.size() != 24) + { + OutRef.PayloadId = zen::Oid::FromHexString(PayloadSegment); + + if (!OutRef.PayloadId) + { + return false; + } + } + + const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { @@ -170,5 +208,4 @@ HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRe return true; } - } // namespace zen -- cgit v1.2.3 From f93d04dd9381eac38be82733c34baa2075efa11c Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Sat, 22 May 2021 11:53:09 +0200 Subject: Split out structured cache store code into dedicated cpp/h pair --- zenserver/cache/structuredcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 9e771a715..2704bedc3 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -6,8 +6,8 @@ #include #include -#include "cachestore.h" #include "structuredcache.h" +#include "structuredcachestore.h" #include "upstream/jupiter.h" #include -- cgit v1.2.3 From d6c221e378813e47b29694c99296943b9f2a4fd8 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Sun, 23 May 2021 21:29:21 +0200 Subject: Implemented new URI addressing scheme for the Zen cache endpoints, and prepared for additional indexing capabilities --- zenserver/cache/structuredcache.cpp | 155 +++++++++++++++++++++++++++++++----- 1 file changed, 137 insertions(+), 18 deletions(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 2704bedc3..728cfaded 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -2,6 +2,8 @@ #pragma once +#include +#include #include #include #include @@ -53,6 +55,21 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) return Request.WriteResponse(zen::HttpResponse::BadRequest); // invalid URL } + if (Ref.PayloadId == IoHash::Zero) + { + return HandleCacheRecordRequest(Request, Ref); + } + else + { + return HandleCachePayloadRequest(Request, Ref); + } + + return; +} + +void +HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +{ switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; @@ -65,20 +82,15 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) if (!Success) { - Request.WriteResponse(zen::HttpResponse::NotFound); + return Request.WriteResponse(zen::HttpResponse::NotFound); } - else + + if (Verb == kHead) { - if (Verb == kHead) - { - Request.SetSuppressResponseBody(); - Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value); - } - else - { - Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value); - } + Request.SetSuppressResponseBody(); } + + return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value); } break; @@ -91,21 +103,55 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) HttpContentType ContentType = Request.RequestContentType(); + bool IsCompactBinary; + switch (ContentType) { case HttpContentType::kUnknownContentType: case HttpContentType::kBinary: - Value.IsCompactBinary = false; + IsCompactBinary = false; break; case HttpContentType::kCbObject: - Value.IsCompactBinary = true; + IsCompactBinary = true; break; default: return Request.WriteResponse(zen::HttpResponse::BadRequest); } + // Compute index data + + if (IsCompactBinary) + { + // Validate payload before accessing it + zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); + + if (ValidationResult != CbValidateError::None) + { + // TODO: add details in response + return Request.WriteResponse(HttpResponse::BadRequest); + } + + // Extract data for index + zen::CbObjectView Cbo(Body.Data()); + + int ReferenceCount = 0; + + zen::CbObjectWriter Idx; + Idx.BeginArray(); + + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { + Idx.AddHash(AttachmentView.AsHash()); + ++ReferenceCount; + }); + + Idx.EndArray(); + + // TODO: store references in index + } + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); // This is currently synchronous for simplicity and debuggability but should be @@ -152,7 +198,76 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) } } -[[nodiscard]] bool +void +HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +{ + // Note: the URL references the uncompressed payload hash - so this maintains the mapping + // from uncompressed CAS identity to the stored payload hash + // + // this is a PITA but a consequence of the fact that the client side code is not able to + // address data by compressed hash + + switch (auto Verb = Request.RequestVerb()) + { + using enum zen::HttpVerb; + + case kHead: + case kGet: + { + // TODO: need to map from uncompressed content address into the storage + // (compressed) content address + + zen::IoBuffer Payload = m_CasStore.FindChunk(Ref.PayloadId); + + if (!Payload) + { + return Request.WriteResponse(zen::HttpResponse::NotFound); + } + + if (Verb == kHead) + { + Request.SetSuppressResponseBody(); + } + + return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Payload); + } + break; + + case kPut: + { + if (zen::IoBuffer Body = Request.ReadPayload()) + { + zen::IoHash ChunkHash = zen::IoHash::HashMemory(Body); + + if (ChunkHash != Ref.PayloadId) + { + // the URL and data hashes don't match! + return Request.WriteResponse(HttpResponse::BadRequest); + } + + zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); + + if (Result.New) + { + return Request.WriteResponse(zen::HttpResponse::Created); + } + else + { + return Request.WriteResponse(zen::HttpResponse::OK); + } + } + } + break; + + case kPost: + break; + + default: + break; + } +} + +bool HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); @@ -184,20 +299,24 @@ HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRe PayloadSegment = Key.substr(PayloadSplitOffset + 1); } - if (HashSegment.size() != (2 * sizeof OutRef.HashKey.Hash)) + if (HashSegment.size() != zen::IoHash::StringLength) { return false; } - if (!PayloadSegment.empty() && PayloadSegment.size() != 24) + if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength) { - OutRef.PayloadId = zen::Oid::FromHexString(PayloadSegment); + const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); - if (!OutRef.PayloadId) + if (!IsOk) { return false; } } + else + { + OutRef.PayloadId = zen::IoHash::Zero; + } const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); -- cgit v1.2.3 From 508671de7550d103073c87413c35acfc21734406 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 24 May 2021 16:32:43 +0200 Subject: Fixed attachment index code so it doesn't generate zero-sized compactbinary arrays (which are disallowed) --- zenserver/cache/structuredcache.cpp | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 728cfaded..ecc2367ad 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -137,17 +137,21 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req // Extract data for index zen::CbObjectView Cbo(Body.Data()); - int ReferenceCount = 0; + std::vector References; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); - zen::CbObjectWriter Idx; - Idx.BeginArray(); + if (!References.empty()) + { + zen::CbObjectWriter Idx; + Idx.BeginArray(); - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { - Idx.AddHash(AttachmentView.AsHash()); - ++ReferenceCount; - }); + for (const IoHash& Hash : References) + { + Idx.AddHash(Hash); + } - Idx.EndArray(); + Idx.EndArray(); + } // TODO: store references in index } @@ -214,7 +218,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re case kHead: case kGet: { - // TODO: need to map from uncompressed content address into the storage + // TODO: need to map from uncompressed content address into the storage // (compressed) content address zen::IoBuffer Payload = m_CasStore.FindChunk(Ref.PayloadId); -- cgit v1.2.3 From e08d6da5d64d3c9b287e2ce53f45414628f59909 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 24 May 2021 17:55:19 +0200 Subject: Validate payloads using embedded CompressedBuffer hash --- zenserver/cache/structuredcache.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index ecc2367ad..317c5641a 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -243,9 +244,11 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re { zen::IoHash ChunkHash = zen::IoHash::HashMemory(Body); - if (ChunkHash != Ref.PayloadId) + zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body)); + + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) { - // the URL and data hashes don't match! + // the URL specified content id and content hashes don't match! return Request.WriteResponse(HttpResponse::BadRequest); } -- cgit v1.2.3 From e70e2b4453a27d9d1caf77224ffd6335c50981fb Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 24 May 2021 19:26:40 +0200 Subject: Added CidStore, currently used to track relationships between compressed and uncompressed chunk hashes This first implementation is in-memory only, persistence is next --- zenserver/cache/structuredcache.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 317c5641a..43364af1d 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,6 +12,7 @@ #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" +#include "zenstore/cidstore.h" #include #include @@ -20,9 +21,10 @@ namespace zen { using namespace std::literals; -HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore) +HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore) : m_CasStore(InStore) , m_CacheStore(InStore, RootPath) +, m_CidStore(InCidStore) { spdlog::info("initializing structured cache at '{}'", RootPath); @@ -144,7 +146,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (!References.empty()) { zen::CbObjectWriter Idx; - Idx.BeginArray(); + Idx.BeginArray("r"); for (const IoHash& Hash : References) { @@ -222,7 +224,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re // TODO: need to map from uncompressed content address into the storage // (compressed) content address - zen::IoBuffer Payload = m_CasStore.FindChunk(Ref.PayloadId); + zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); if (!Payload) { @@ -254,6 +256,8 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + if (Result.New) { return Request.WriteResponse(zen::HttpResponse::Created); -- cgit v1.2.3 From aa1d315d88d5f1118a48b3deaab295ef268610d9 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 24 May 2021 22:25:03 +0200 Subject: Added guards against zero-sized PUTs to structured cache --- zenserver/cache/structuredcache.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 43364af1d..792d764cb 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -101,6 +101,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { if (zen::IoBuffer Body = Request.ReadPayload()) { + if (Body.Size() == 0) + { + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } + ZenCacheValue Value; Value.Value = Body; @@ -244,6 +249,11 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re { if (zen::IoBuffer Body = Request.ReadPayload()) { + if (Body.Size() == 0) + { + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } + zen::IoHash ChunkHash = zen::IoHash::HashMemory(Body); zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body)); -- cgit v1.2.3