diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 270 |
1 files changed, 246 insertions, 24 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 0d62f297c..792d764cb 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -2,23 +2,39 @@ #pragma once +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/httpserver.h> +#include <zencore/timer.h> -#include "cachestore.h" #include "structuredcache.h" +#include "structuredcachestore.h" #include "upstream/jupiter.h" +#include "zenstore/cidstore.h" #include <spdlog/spdlog.h> #include <filesystem> namespace zen { -HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore) +using namespace std::literals; + +HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore) : m_CasStore(InStore) , m_CacheStore(InStore, RootPath) +, m_CidStore(InCidStore) { spdlog::info("initializing structured cache at '{}'", RootPath); + +#if 0 + m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, + "ue4.ddc"sv /* namespace */, + "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, + "0oao91lrhqPiAlaGD0x7"sv /* client id */, + "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); +#endif } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -42,6 +58,21 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) return Request.WriteResponse(zen::HttpResponse::BadRequest); // invalid URL } + if (Ref.PayloadId == IoHash::Zero) + { + return HandleCacheRecordRequest(Request, Ref); + } + else + { + return HandleCachePayloadRequest(Request, Ref); + } + + return; +} + +void +HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +{ switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; @@ -49,25 +80,20 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) case kHead: case kGet: { - CacheValue Value; - bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); + ZenCacheValue Value; + bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); if (!Success) { - Request.WriteResponse(zen::HttpResponse::NotFound); + return Request.WriteResponse(zen::HttpResponse::NotFound); } - else + + if (Verb == kHead) { - if (Verb == kHead) - { - Request.SetSuppressResponseBody(); - Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value); - } - else - { - Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value); - } + Request.SetSuppressResponseBody(); } + + return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value); } break; @@ -75,12 +101,99 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) { if (zen::IoBuffer Body = Request.ReadPayload()) { - CacheValue Value; + if (Body.Size() == 0) + { + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } + + ZenCacheValue Value; Value.Value = Body; + HttpContentType ContentType = Request.RequestContentType(); + + bool IsCompactBinary; + + switch (ContentType) + { + case HttpContentType::kUnknownContentType: + case HttpContentType::kBinary: + IsCompactBinary = false; + break; + + case HttpContentType::kCbObject: + IsCompactBinary = true; + break; + + default: + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } + + // Compute index data + + if (IsCompactBinary) + { + // Validate payload before accessing it + zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); + + if (ValidationResult != CbValidateError::None) + { + // TODO: add details in response + return Request.WriteResponse(HttpResponse::BadRequest); + } + + // Extract data for index + zen::CbObjectView Cbo(Body.Data()); + + std::vector<IoHash> References; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + + if (!References.empty()) + { + zen::CbObjectWriter Idx; + Idx.BeginArray("r"); + + for (const IoHash& Hash : References) + { + Idx.AddHash(Hash); + } + + Idx.EndArray(); + } + + // TODO: store references in index + } + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); - Request.WriteResponse(zen::HttpResponse::Created); + // This is currently synchronous for simplicity and debuggability but should be + // made asynchronous + + if (m_Cloud) + { + CloudCacheSession Session(m_Cloud); + + zen::Stopwatch Timer; + + try + { + Session.Put(Ref.BucketSegment, Ref.HashKey, Value); + spdlog::debug("upstream PUT ({}) succeeded after {:5}!", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + } + catch (std::exception& e) + { + spdlog::debug("upstream PUT ({}) failed after {:5}: '{}'", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + + throw; + } + } + + return Request.WriteResponse(zen::HttpResponse::Created); } else { @@ -97,26 +210,136 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) } } -[[nodiscard]] bool +void +HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +{ + // Note: the URL references the uncompressed payload hash - so this maintains the mapping + // from uncompressed CAS identity to the stored payload hash + // + // this is a PITA but a consequence of the fact that the client side code is not able to + // address data by compressed hash + + switch (auto Verb = Request.RequestVerb()) + { + using enum zen::HttpVerb; + + case kHead: + case kGet: + { + // TODO: need to map from uncompressed content address into the storage + // (compressed) content address + + zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + + if (!Payload) + { + return Request.WriteResponse(zen::HttpResponse::NotFound); + } + + if (Verb == kHead) + { + Request.SetSuppressResponseBody(); + } + + return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Payload); + } + break; + + case kPut: + { + if (zen::IoBuffer Body = Request.ReadPayload()) + { + if (Body.Size() == 0) + { + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } + + zen::IoHash ChunkHash = zen::IoHash::HashMemory(Body); + + zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body)); + + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) + { + // the URL specified content id and content hashes don't match! + return Request.WriteResponse(HttpResponse::BadRequest); + } + + zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); + + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + + if (Result.New) + { + return Request.WriteResponse(zen::HttpResponse::Created); + } + else + { + return Request.WriteResponse(zen::HttpResponse::OK); + } + } + } + break; + + case kPost: + break; + + default: + break; + } +} + +bool HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); - std::string_view::size_type BucketSplitOffset = Key.find_last_of('/'); + std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); if (BucketSplitOffset == std::string_view::npos) { return false; } - OutRef.BucketSegment = Key.substr(0, BucketSplitOffset); - std::string_view HashSegment = Key.substr(BucketSplitOffset + 1); + OutRef.BucketSegment = Key.substr(0, BucketSplitOffset); + + std::string_view HashSegment; + std::string_view PayloadSegment; - if (HashSegment.size() != (2 * sizeof OutRef.HashKey.Hash)) + std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); + + // We know there is a slash so no need to check for npos return + + if (PayloadSplitOffset == BucketSplitOffset) + { + // Basic cache record lookup + HashSegment = Key.substr(BucketSplitOffset + 1); + } + else + { + // Cache record + payload lookup + HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); + PayloadSegment = Key.substr(PayloadSplitOffset + 1); + } + + if (HashSegment.size() != zen::IoHash::StringLength) { return false; } - bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); + if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength) + { + const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); + + if (!IsOk) + { + return false; + } + } + else + { + OutRef.PayloadId = zen::IoHash::Zero; + } + + const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { @@ -125,5 +348,4 @@ HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRe return true; } - } // namespace zen |