diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 343 |
1 files changed, 167 insertions, 176 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index def1adb90..1e3eb5dcd 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,12 +12,16 @@ #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" +#include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include <spdlog/spdlog.h> #include <algorithm> +#include <atomic> #include <filesystem> +#include <queue> +#include <thread> namespace zen { @@ -44,31 +48,19 @@ MapToHttpContentType(zen::ZenContentType Type) } }; -HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore) +////////////////////////////////////////////////////////////////////////// + +HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore, + zen::CasStore& InStore, + zen::CidStore& InCidStore, + std::unique_ptr<UpstreamCache> UpstreamCache) : m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) +, m_CacheStore(InCacheStore) , m_CasStore(InStore) -, m_CacheStore(InStore, RootPath) , m_CidStore(InCidStore) +, m_UpstreamCache(std::move(UpstreamCache)) { m_Log.set_level(spdlog::level::debug); - - m_Log.info("initializing structured cache at '{}'", RootPath); - -#if 0 - m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, - "ue4.ddc"sv /* namespace */, - "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, - "0oao91lrhqPiAlaGD0x7"sv /* client id */, - "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); -#endif - -#if 0 - std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv; - - m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec); - - m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec); -#endif } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -161,66 +153,55 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); - if (!Success && m_ZenClient) + if (!Success && m_UpstreamCache) { - ZenStructuredCacheSession Session(*m_ZenClient); - - zen::Stopwatch Timer; + const ZenContentType CacheRecordType = + Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject; - try + if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); + UpstreamResult.Success) { - Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); + Value.Value = UpstreamResult.Value; + Success = true; - m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); - - // TODO: this is incomplete and needs to propagate any referenced content - - Success = true; - } - catch (std::exception& e) - { - m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); - - throw; - } - } - - if (m_Cloud) - { - // Note that this is not fully functional, pending implementation work on - // the Jupiter end - - CloudCacheSession Session(m_Cloud); - - zen::Stopwatch Timer; - - try - { - Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); - - m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + if (CacheRecordType == ZenContentType::kCbObject) + { + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()), + zen::CbValidateMode::All); - // TODO: this is incomplete and needs to propagate any referenced content - } - catch (std::exception& e) - { - m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + if (ValidationResult == CbValidateError::None) + { + zen::CbObjectView Cbo(UpstreamResult.Value.Data()); + + std::vector<IoHash> References; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + + if (!References.empty()) + { + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); + for (const IoHash& Hash : References) + { + Idx.AddHash(Hash); + } + Idx.EndArray(); + + Value.IndexData = Idx.Save(); + } + } + else + { + Value.Value = IoBuffer(); + Success = false; + m_Log.warn("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey); + } + } - throw; + if (Success) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + } } } @@ -248,140 +229,130 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req case kPut: { - if (zen::IoBuffer Body = Request.ReadPayload()) - { - if (Body.Size() == 0) - { - return Request.WriteResponse(zen::HttpResponse::BadRequest); - } + zen::IoBuffer Body = Request.ReadPayload(); - ZenCacheValue Value; - Value.Value = Body; + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } - HttpContentType ContentType = Request.RequestContentType(); + const HttpContentType ContentType = Request.RequestContentType(); - bool IsCompactBinary; + bool IsCompactBinary = false; - switch (ContentType) - { - case HttpContentType::kUnknownContentType: - case HttpContentType::kBinary: - IsCompactBinary = false; - break; + switch (ContentType) + { + case HttpContentType::kUnknownContentType: + case HttpContentType::kBinary: + IsCompactBinary = false; + break; - case HttpContentType::kCbObject: - IsCompactBinary = true; - break; + case HttpContentType::kCbObject: + IsCompactBinary = true; + break; - default: - return Request.WriteResponse(zen::HttpResponse::BadRequest); - } + default: + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } - // Compute index data + if (!IsCompactBinary) + { + // TODO: create a cache record and put value in CAS? + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + m_Log.debug("PUT (binary) - '{}/{}' ({} bytes, {})", + Ref.BucketSegment, + Ref.HashKey, + Body.Size(), + Body.GetContentType()); - if (IsCompactBinary) + if (m_UpstreamCache) { - // Validate payload before accessing it - zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) - { - m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); - - // TODO: add details in response - return Request.WriteResponse(HttpResponse::BadRequest); - } - - // Extract data for index - zen::CbObjectView Cbo(Body.Data()); + auto Result = m_UpstreamCache->EnqueueUpstream( + {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); + ZEN_ASSERT(Result.Success); + } - std::vector<IoHash> References; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + return Request.WriteResponse(zen::HttpResponse::Created); + } - if (!References.empty()) - { - zen::CbObjectWriter Idx; - Idx.BeginArray("r"); + // Validate payload before accessing it + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - for (const IoHash& Hash : References) - { - Idx.AddHash(Hash); - } + if (ValidationResult != CbValidateError::None) + { + m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); - Idx.EndArray(); + // TODO: add details in response, kText || kCbObject? + return Request.WriteResponse(HttpResponse::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); + } - // TODO: store references in index - } - } + // Extract referenced payload hashes + zen::CbObjectView Cbo(Body.Data()); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + std::vector<IoHash> References; + std::vector<IoHash> MissingRefs; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); - m_Log.debug("PUT - '{}/{}' ({} bytes, {})", - Ref.BucketSegment, - Ref.HashKey, - Value.Value.Size(), - Value.Value.GetContentType()); + ZenCacheValue CacheValue; + CacheValue.Value = Body; - // absolutely be made asynchronous. By default these should be deferred - // because the client should not care if the data has propagated upstream or - // not + if (!References.empty()) + { + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); - if (m_ZenClient) + for (const IoHash& Hash : References) { - ZenStructuredCacheSession Session(*m_ZenClient); - - zen::Stopwatch Timer; - - try - { - Session.Put(Ref.BucketSegment, Ref.HashKey, Value); - - m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); - } - catch (std::exception& e) + Idx.AddHash(Hash); + if (!m_CidStore.ContainsChunk(Hash)) { - m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); - - throw; + MissingRefs.push_back(Hash); } } - if (m_Cloud) - { - CloudCacheSession Session(m_Cloud); + Idx.EndArray(); - zen::Stopwatch Timer; + CacheValue.IndexData = Idx.Save(); + } - try - { - Session.Put(Ref.BucketSegment, Ref.HashKey, Value); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - m_Log.debug("upstream PUT ({}) succeeded after {:5}!", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); - } - catch (std::exception& e) - { - m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + m_Log.debug("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))", + Ref.BucketSegment, + Ref.HashKey, + CacheValue.Value.Size(), + CacheValue.Value.GetContentType(), + References.size(), + MissingRefs.size()); - throw; - } + if (MissingRefs.empty()) + { + // Only enqueue valid cache records, i.e. all referenced payloads exists + if (m_UpstreamCache) + { + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); + ZEN_ASSERT(Result.Success); } return Request.WriteResponse(zen::HttpResponse::Created); } else { - return; + // TODO: Binary attachments? + zen::CbObjectWriter Response; + Response.BeginArray("needs"); + for (const IoHash& MissingRef : MissingRefs) + { + Response.AddHash(MissingRef); + m_Log.debug("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); + } + Response.EndArray(); + + // Return Created | BadRequest? + return Request.WriteResponse(zen::HttpResponse::Created, Response.Save()); } } break; @@ -412,6 +383,26 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re { zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + if (!Payload && m_UpstreamCache) + { + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); + UpstreamResult.Success) + { + if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Payload))) + { + Payload = UpstreamResult.Payload; + zen::IoHash ChunkHash = zen::IoHash::HashMemory(Payload); + zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + } + else + { + m_Log.warn("got uncompressed upstream cache payload"); + } + } + } + if (!Payload) { m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})", @@ -512,7 +503,7 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach return false; } - OutRef.BucketSegment = Key.substr(0, BucketSplitOffset); + OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset)); if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) { |