diff options
| author | Per Larsson <[email protected]> | 2021-08-31 15:01:46 +0200 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-08-31 15:16:22 +0200 |
| commit | fd3946f2b2b013af01fdf60f67afb655c38c1901 (patch) | |
| tree | eca4abed5d71a157e185699f4e9668a92b756ca8 /zenserver/cache | |
| parent | Removed unused packages from vcpkg.json (diff) | |
| download | zen-fd3946f2b2b013af01fdf60f67afb655c38c1901.tar.xz zen-fd3946f2b2b013af01fdf60f67afb655c38c1901.zip | |
Asynchronous upstream caching to Jupiter
Co-authored-by: Stefan Boberg <[email protected]>
Diffstat (limited to 'zenserver/cache')
| -rw-r--r-- | zenserver/cache/kvcache.cpp | 12 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 343 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 27 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 1 |
4 files changed, 189 insertions, 194 deletions
diff --git a/zenserver/cache/kvcache.cpp b/zenserver/cache/kvcache.cpp index 096edcef7..fbbb932ea 100644 --- a/zenserver/cache/kvcache.cpp +++ b/zenserver/cache/kvcache.cpp @@ -69,6 +69,7 @@ HttpKvCacheService::HttpKvCacheService() { m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, "ue4.ddc"sv /* namespace */, + "test.ddc"sv /* blob store namespace */, "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, "0oao91lrhqPiAlaGD0x7"sv /* client id */, "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); @@ -115,16 +116,16 @@ HttpKvCacheService::HandleRequest(zen::HttpServerRequest& Request) zen::Stopwatch Timer; - if (IoBuffer CloudValue = Session.Get("default", Key)) + if (CloudCacheResult Result = Session.GetDerivedData("default", Key); Result.Success) { Success = true; spdlog::debug("upstream HIT after {:5} {:6}! {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - NiceBytes(CloudValue.Size()), + NiceBytes(Result.Value.Size()), Key); - Value.Value = CloudValue; + Value.Value = Result.Value; } else { @@ -175,9 +176,10 @@ HttpKvCacheService::HandleRequest(zen::HttpServerRequest& Request) zen::Stopwatch Timer; - Session.Put("default", Key, Value.Value); + CloudCacheResult Result = Session.PutDerivedData("default", Key, Value.Value); - spdlog::debug("upstream PUT took {:5} {:6}! {}", + spdlog::debug("upstream PUT '{}', took {:5} {:6}! {}", + Result.Success ? "OK" : "FAILED", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), NiceBytes(Value.Value.Size()), Key); diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index def1adb90..1e3eb5dcd 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,12 +12,16 @@ #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" +#include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include <spdlog/spdlog.h> #include <algorithm> +#include <atomic> #include <filesystem> +#include <queue> +#include <thread> namespace zen { @@ -44,31 +48,19 @@ MapToHttpContentType(zen::ZenContentType Type) } }; -HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore) +////////////////////////////////////////////////////////////////////////// + +HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore, + zen::CasStore& InStore, + zen::CidStore& InCidStore, + std::unique_ptr<UpstreamCache> UpstreamCache) : m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) +, m_CacheStore(InCacheStore) , m_CasStore(InStore) -, m_CacheStore(InStore, RootPath) , m_CidStore(InCidStore) +, m_UpstreamCache(std::move(UpstreamCache)) { m_Log.set_level(spdlog::level::debug); - - m_Log.info("initializing structured cache at '{}'", RootPath); - -#if 0 - m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, - "ue4.ddc"sv /* namespace */, - "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, - "0oao91lrhqPiAlaGD0x7"sv /* client id */, - "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); -#endif - -#if 0 - std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv; - - m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec); - - m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec); -#endif } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -161,66 +153,55 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); - if (!Success && m_ZenClient) + if (!Success && m_UpstreamCache) { - ZenStructuredCacheSession Session(*m_ZenClient); - - zen::Stopwatch Timer; + const ZenContentType CacheRecordType = + Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject; - try + if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); + UpstreamResult.Success) { - Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); + Value.Value = UpstreamResult.Value; + Success = true; - m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); - - // TODO: this is incomplete and needs to propagate any referenced content - - Success = true; - } - catch (std::exception& e) - { - m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); - - throw; - } - } - - if (m_Cloud) - { - // Note that this is not fully functional, pending implementation work on - // the Jupiter end - - CloudCacheSession Session(m_Cloud); - - zen::Stopwatch Timer; - - try - { - Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); - - m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + if (CacheRecordType == ZenContentType::kCbObject) + { + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()), + zen::CbValidateMode::All); - // TODO: this is incomplete and needs to propagate any referenced content - } - catch (std::exception& e) - { - m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + if (ValidationResult == CbValidateError::None) + { + zen::CbObjectView Cbo(UpstreamResult.Value.Data()); + + std::vector<IoHash> References; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + + if (!References.empty()) + { + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); + for (const IoHash& Hash : References) + { + Idx.AddHash(Hash); + } + Idx.EndArray(); + + Value.IndexData = Idx.Save(); + } + } + else + { + Value.Value = IoBuffer(); + Success = false; + m_Log.warn("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey); + } + } - throw; + if (Success) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + } } } @@ -248,140 +229,130 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req case kPut: { - if (zen::IoBuffer Body = Request.ReadPayload()) - { - if (Body.Size() == 0) - { - return Request.WriteResponse(zen::HttpResponse::BadRequest); - } + zen::IoBuffer Body = Request.ReadPayload(); - ZenCacheValue Value; - Value.Value = Body; + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } - HttpContentType ContentType = Request.RequestContentType(); + const HttpContentType ContentType = Request.RequestContentType(); - bool IsCompactBinary; + bool IsCompactBinary = false; - switch (ContentType) - { - case HttpContentType::kUnknownContentType: - case HttpContentType::kBinary: - IsCompactBinary = false; - break; + switch (ContentType) + { + case HttpContentType::kUnknownContentType: + case HttpContentType::kBinary: + IsCompactBinary = false; + break; - case HttpContentType::kCbObject: - IsCompactBinary = true; - break; + case HttpContentType::kCbObject: + IsCompactBinary = true; + break; - default: - return Request.WriteResponse(zen::HttpResponse::BadRequest); - } + default: + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } - // Compute index data + if (!IsCompactBinary) + { + // TODO: create a cache record and put value in CAS? + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + m_Log.debug("PUT (binary) - '{}/{}' ({} bytes, {})", + Ref.BucketSegment, + Ref.HashKey, + Body.Size(), + Body.GetContentType()); - if (IsCompactBinary) + if (m_UpstreamCache) { - // Validate payload before accessing it - zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) - { - m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); - - // TODO: add details in response - return Request.WriteResponse(HttpResponse::BadRequest); - } - - // Extract data for index - zen::CbObjectView Cbo(Body.Data()); + auto Result = m_UpstreamCache->EnqueueUpstream( + {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); + ZEN_ASSERT(Result.Success); + } - std::vector<IoHash> References; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + return Request.WriteResponse(zen::HttpResponse::Created); + } - if (!References.empty()) - { - zen::CbObjectWriter Idx; - Idx.BeginArray("r"); + // Validate payload before accessing it + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - for (const IoHash& Hash : References) - { - Idx.AddHash(Hash); - } + if (ValidationResult != CbValidateError::None) + { + m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); - Idx.EndArray(); + // TODO: add details in response, kText || kCbObject? + return Request.WriteResponse(HttpResponse::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); + } - // TODO: store references in index - } - } + // Extract referenced payload hashes + zen::CbObjectView Cbo(Body.Data()); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + std::vector<IoHash> References; + std::vector<IoHash> MissingRefs; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); - m_Log.debug("PUT - '{}/{}' ({} bytes, {})", - Ref.BucketSegment, - Ref.HashKey, - Value.Value.Size(), - Value.Value.GetContentType()); + ZenCacheValue CacheValue; + CacheValue.Value = Body; - // absolutely be made asynchronous. By default these should be deferred - // because the client should not care if the data has propagated upstream or - // not + if (!References.empty()) + { + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); - if (m_ZenClient) + for (const IoHash& Hash : References) { - ZenStructuredCacheSession Session(*m_ZenClient); - - zen::Stopwatch Timer; - - try - { - Session.Put(Ref.BucketSegment, Ref.HashKey, Value); - - m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); - } - catch (std::exception& e) + Idx.AddHash(Hash); + if (!m_CidStore.ContainsChunk(Hash)) { - m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); - - throw; + MissingRefs.push_back(Hash); } } - if (m_Cloud) - { - CloudCacheSession Session(m_Cloud); + Idx.EndArray(); - zen::Stopwatch Timer; + CacheValue.IndexData = Idx.Save(); + } - try - { - Session.Put(Ref.BucketSegment, Ref.HashKey, Value); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - m_Log.debug("upstream PUT ({}) succeeded after {:5}!", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); - } - catch (std::exception& e) - { - m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + m_Log.debug("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))", + Ref.BucketSegment, + Ref.HashKey, + CacheValue.Value.Size(), + CacheValue.Value.GetContentType(), + References.size(), + MissingRefs.size()); - throw; - } + if (MissingRefs.empty()) + { + // Only enqueue valid cache records, i.e. all referenced payloads exists + if (m_UpstreamCache) + { + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); + ZEN_ASSERT(Result.Success); } return Request.WriteResponse(zen::HttpResponse::Created); } else { - return; + // TODO: Binary attachments? + zen::CbObjectWriter Response; + Response.BeginArray("needs"); + for (const IoHash& MissingRef : MissingRefs) + { + Response.AddHash(MissingRef); + m_Log.debug("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); + } + Response.EndArray(); + + // Return Created | BadRequest? + return Request.WriteResponse(zen::HttpResponse::Created, Response.Save()); } } break; @@ -412,6 +383,26 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re { zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + if (!Payload && m_UpstreamCache) + { + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); + UpstreamResult.Success) + { + if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Payload))) + { + Payload = UpstreamResult.Payload; + zen::IoHash ChunkHash = zen::IoHash::HashMemory(Payload); + zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + } + else + { + m_Log.warn("got uncompressed upstream cache payload"); + } + } + } + if (!Payload) { m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})", @@ -512,7 +503,7 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach return false; } - OutRef.BucketSegment = Key.substr(0, BucketSplitOffset); + OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset)); if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) { diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 73b0825dc..b90301d84 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -3,18 +3,17 @@ #pragma once #include <zencore/httpserver.h> -#include <zencore/refcount.h> - -#include "structuredcachestore.h" -#include "upstream/jupiter.h" #include <spdlog/spdlog.h> +#include <memory> + +class ZenCacheStore; namespace zen { -class CloudCacheClient; +class CasStore; class CidStore; -class ZenStructuredCacheClient; +class UpstreamCache; /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -47,7 +46,10 @@ class ZenStructuredCacheClient; class HttpStructuredCacheService : public zen::HttpService { public: - HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore); + HttpStructuredCacheService(ZenCacheStore& InCacheStore, + zen::CasStore& InCasStore, + zen::CidStore& InCidStore, + std::unique_ptr<UpstreamCache> UpstreamCache); ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; @@ -69,12 +71,11 @@ private: void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); - spdlog::logger m_Log; - zen::CasStore& m_CasStore; - zen::CidStore& m_CidStore; - ZenCacheStore m_CacheStore; - RefPtr<CloudCacheClient> m_Cloud; - RefPtr<ZenStructuredCacheClient> m_ZenClient; + spdlog::logger m_Log; + ZenCacheStore& m_CacheStore; + zen::CasStore& m_CasStore; + zen::CidStore& m_CidStore; + std::unique_ptr<UpstreamCache> m_UpstreamCache; }; } // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 79bcf4838..140ff1853 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -31,6 +31,7 @@ using namespace fmt::literals; ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} { + spdlog::info("initializing structured cache at '{}'", RootDir); zen::CreateDirectories(RootDir); } |