diff options
| author | Per Larsson <[email protected]> | 2021-08-31 15:01:46 +0200 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-08-31 15:16:22 +0200 |
| commit | fd3946f2b2b013af01fdf60f67afb655c38c1901 (patch) | |
| tree | eca4abed5d71a157e185699f4e9668a92b756ca8 | |
| parent | Removed unused packages from vcpkg.json (diff) | |
| download | zen-fd3946f2b2b013af01fdf60f67afb655c38c1901.tar.xz zen-fd3946f2b2b013af01fdf60f67afb655c38c1901.zip | |
Asynchronous upstream caching to Jupiter
Co-authored-by: Stefan Boberg <[email protected]>
| -rw-r--r-- | zencore/include/zencore/string.h | 15 | ||||
| -rw-r--r-- | zenserver/cache/kvcache.cpp | 12 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 343 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 27 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 1 | ||||
| -rw-r--r-- | zenserver/config.cpp | 16 | ||||
| -rw-r--r-- | zenserver/config.h | 1 | ||||
| -rw-r--r-- | zenserver/diag/logging.cpp | 64 | ||||
| -rw-r--r-- | zenserver/diag/logging.h | 10 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 235 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 38 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 347 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 82 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 28 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj.filters | 6 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 25 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cidstore.h | 1 |
18 files changed, 965 insertions, 288 deletions
diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h index 684e27827..291b18043 100644 --- a/zencore/include/zencore/string.h +++ b/zencore/include/zencore/string.h @@ -605,6 +605,21 @@ HashStringDjb2(const std::string_view& InString) ////////////////////////////////////////////////////////////////////////// +inline std::string +ToLower(const std::string_view& InString) +{ + std::string Out(InString); + + for (char& C : Out) + { + C = static_cast<char>(std::tolower(C)); + } + + return Out; +} + +////////////////////////////////////////////////////////////////////////// + void string_forcelink(); // internal } // namespace zen diff --git a/zenserver/cache/kvcache.cpp b/zenserver/cache/kvcache.cpp index 096edcef7..fbbb932ea 100644 --- a/zenserver/cache/kvcache.cpp +++ b/zenserver/cache/kvcache.cpp @@ -69,6 +69,7 @@ HttpKvCacheService::HttpKvCacheService() { m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, "ue4.ddc"sv /* namespace */, + "test.ddc"sv /* blob store namespace */, "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, "0oao91lrhqPiAlaGD0x7"sv /* client id */, "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); @@ -115,16 +116,16 @@ HttpKvCacheService::HandleRequest(zen::HttpServerRequest& Request) zen::Stopwatch Timer; - if (IoBuffer CloudValue = Session.Get("default", Key)) + if (CloudCacheResult Result = Session.GetDerivedData("default", Key); Result.Success) { Success = true; spdlog::debug("upstream HIT after {:5} {:6}! {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - NiceBytes(CloudValue.Size()), + NiceBytes(Result.Value.Size()), Key); - Value.Value = CloudValue; + Value.Value = Result.Value; } else { @@ -175,9 +176,10 @@ HttpKvCacheService::HandleRequest(zen::HttpServerRequest& Request) zen::Stopwatch Timer; - Session.Put("default", Key, Value.Value); + CloudCacheResult Result = Session.PutDerivedData("default", Key, Value.Value); - spdlog::debug("upstream PUT took {:5} {:6}! {}", + spdlog::debug("upstream PUT '{}', took {:5} {:6}! {}", + Result.Success ? "OK" : "FAILED", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), NiceBytes(Value.Value.Size()), Key); diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index def1adb90..1e3eb5dcd 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,12 +12,16 @@ #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" +#include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include <spdlog/spdlog.h> #include <algorithm> +#include <atomic> #include <filesystem> +#include <queue> +#include <thread> namespace zen { @@ -44,31 +48,19 @@ MapToHttpContentType(zen::ZenContentType Type) } }; -HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore) +////////////////////////////////////////////////////////////////////////// + +HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore, + zen::CasStore& InStore, + zen::CidStore& InCidStore, + std::unique_ptr<UpstreamCache> UpstreamCache) : m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) +, m_CacheStore(InCacheStore) , m_CasStore(InStore) -, m_CacheStore(InStore, RootPath) , m_CidStore(InCidStore) +, m_UpstreamCache(std::move(UpstreamCache)) { m_Log.set_level(spdlog::level::debug); - - m_Log.info("initializing structured cache at '{}'", RootPath); - -#if 0 - m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, - "ue4.ddc"sv /* namespace */, - "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, - "0oao91lrhqPiAlaGD0x7"sv /* client id */, - "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); -#endif - -#if 0 - std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv; - - m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec); - - m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec); -#endif } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -161,66 +153,55 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); - if (!Success && m_ZenClient) + if (!Success && m_UpstreamCache) { - ZenStructuredCacheSession Session(*m_ZenClient); - - zen::Stopwatch Timer; + const ZenContentType CacheRecordType = + Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject; - try + if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); + UpstreamResult.Success) { - Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); + Value.Value = UpstreamResult.Value; + Success = true; - m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); - - // TODO: this is incomplete and needs to propagate any referenced content - - Success = true; - } - catch (std::exception& e) - { - m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); - - throw; - } - } - - if (m_Cloud) - { - // Note that this is not fully functional, pending implementation work on - // the Jupiter end - - CloudCacheSession Session(m_Cloud); - - zen::Stopwatch Timer; - - try - { - Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); - - m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + if (CacheRecordType == ZenContentType::kCbObject) + { + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()), + zen::CbValidateMode::All); - // TODO: this is incomplete and needs to propagate any referenced content - } - catch (std::exception& e) - { - m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'", - Ref.BucketSegment, - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + if (ValidationResult == CbValidateError::None) + { + zen::CbObjectView Cbo(UpstreamResult.Value.Data()); + + std::vector<IoHash> References; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + + if (!References.empty()) + { + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); + for (const IoHash& Hash : References) + { + Idx.AddHash(Hash); + } + Idx.EndArray(); + + Value.IndexData = Idx.Save(); + } + } + else + { + Value.Value = IoBuffer(); + Success = false; + m_Log.warn("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey); + } + } - throw; + if (Success) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + } } } @@ -248,140 +229,130 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req case kPut: { - if (zen::IoBuffer Body = Request.ReadPayload()) - { - if (Body.Size() == 0) - { - return Request.WriteResponse(zen::HttpResponse::BadRequest); - } + zen::IoBuffer Body = Request.ReadPayload(); - ZenCacheValue Value; - Value.Value = Body; + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } - HttpContentType ContentType = Request.RequestContentType(); + const HttpContentType ContentType = Request.RequestContentType(); - bool IsCompactBinary; + bool IsCompactBinary = false; - switch (ContentType) - { - case HttpContentType::kUnknownContentType: - case HttpContentType::kBinary: - IsCompactBinary = false; - break; + switch (ContentType) + { + case HttpContentType::kUnknownContentType: + case HttpContentType::kBinary: + IsCompactBinary = false; + break; - case HttpContentType::kCbObject: - IsCompactBinary = true; - break; + case HttpContentType::kCbObject: + IsCompactBinary = true; + break; - default: - return Request.WriteResponse(zen::HttpResponse::BadRequest); - } + default: + return Request.WriteResponse(zen::HttpResponse::BadRequest); + } - // Compute index data + if (!IsCompactBinary) + { + // TODO: create a cache record and put value in CAS? + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + m_Log.debug("PUT (binary) - '{}/{}' ({} bytes, {})", + Ref.BucketSegment, + Ref.HashKey, + Body.Size(), + Body.GetContentType()); - if (IsCompactBinary) + if (m_UpstreamCache) { - // Validate payload before accessing it - zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) - { - m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); - - // TODO: add details in response - return Request.WriteResponse(HttpResponse::BadRequest); - } - - // Extract data for index - zen::CbObjectView Cbo(Body.Data()); + auto Result = m_UpstreamCache->EnqueueUpstream( + {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); + ZEN_ASSERT(Result.Success); + } - std::vector<IoHash> References; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + return Request.WriteResponse(zen::HttpResponse::Created); + } - if (!References.empty()) - { - zen::CbObjectWriter Idx; - Idx.BeginArray("r"); + // Validate payload before accessing it + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - for (const IoHash& Hash : References) - { - Idx.AddHash(Hash); - } + if (ValidationResult != CbValidateError::None) + { + m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); - Idx.EndArray(); + // TODO: add details in response, kText || kCbObject? + return Request.WriteResponse(HttpResponse::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); + } - // TODO: store references in index - } - } + // Extract referenced payload hashes + zen::CbObjectView Cbo(Body.Data()); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + std::vector<IoHash> References; + std::vector<IoHash> MissingRefs; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); - m_Log.debug("PUT - '{}/{}' ({} bytes, {})", - Ref.BucketSegment, - Ref.HashKey, - Value.Value.Size(), - Value.Value.GetContentType()); + ZenCacheValue CacheValue; + CacheValue.Value = Body; - // absolutely be made asynchronous. By default these should be deferred - // because the client should not care if the data has propagated upstream or - // not + if (!References.empty()) + { + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); - if (m_ZenClient) + for (const IoHash& Hash : References) { - ZenStructuredCacheSession Session(*m_ZenClient); - - zen::Stopwatch Timer; - - try - { - Session.Put(Ref.BucketSegment, Ref.HashKey, Value); - - m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); - } - catch (std::exception& e) + Idx.AddHash(Hash); + if (!m_CidStore.ContainsChunk(Hash)) { - m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); - - throw; + MissingRefs.push_back(Hash); } } - if (m_Cloud) - { - CloudCacheSession Session(m_Cloud); + Idx.EndArray(); - zen::Stopwatch Timer; + CacheValue.IndexData = Idx.Save(); + } - try - { - Session.Put(Ref.BucketSegment, Ref.HashKey, Value); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - m_Log.debug("upstream PUT ({}) succeeded after {:5}!", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); - } - catch (std::exception& e) - { - m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + m_Log.debug("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))", + Ref.BucketSegment, + Ref.HashKey, + CacheValue.Value.Size(), + CacheValue.Value.GetContentType(), + References.size(), + MissingRefs.size()); - throw; - } + if (MissingRefs.empty()) + { + // Only enqueue valid cache records, i.e. all referenced payloads exists + if (m_UpstreamCache) + { + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); + ZEN_ASSERT(Result.Success); } return Request.WriteResponse(zen::HttpResponse::Created); } else { - return; + // TODO: Binary attachments? + zen::CbObjectWriter Response; + Response.BeginArray("needs"); + for (const IoHash& MissingRef : MissingRefs) + { + Response.AddHash(MissingRef); + m_Log.debug("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); + } + Response.EndArray(); + + // Return Created | BadRequest? + return Request.WriteResponse(zen::HttpResponse::Created, Response.Save()); } } break; @@ -412,6 +383,26 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re { zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + if (!Payload && m_UpstreamCache) + { + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); + UpstreamResult.Success) + { + if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Payload))) + { + Payload = UpstreamResult.Payload; + zen::IoHash ChunkHash = zen::IoHash::HashMemory(Payload); + zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + } + else + { + m_Log.warn("got uncompressed upstream cache payload"); + } + } + } + if (!Payload) { m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})", @@ -512,7 +503,7 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach return false; } - OutRef.BucketSegment = Key.substr(0, BucketSplitOffset); + OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset)); if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) { diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 73b0825dc..b90301d84 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -3,18 +3,17 @@ #pragma once #include <zencore/httpserver.h> -#include <zencore/refcount.h> - -#include "structuredcachestore.h" -#include "upstream/jupiter.h" #include <spdlog/spdlog.h> +#include <memory> + +class ZenCacheStore; namespace zen { -class CloudCacheClient; +class CasStore; class CidStore; -class ZenStructuredCacheClient; +class UpstreamCache; /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -47,7 +46,10 @@ class ZenStructuredCacheClient; class HttpStructuredCacheService : public zen::HttpService { public: - HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore); + HttpStructuredCacheService(ZenCacheStore& InCacheStore, + zen::CasStore& InCasStore, + zen::CidStore& InCidStore, + std::unique_ptr<UpstreamCache> UpstreamCache); ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; @@ -69,12 +71,11 @@ private: void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); - spdlog::logger m_Log; - zen::CasStore& m_CasStore; - zen::CidStore& m_CidStore; - ZenCacheStore m_CacheStore; - RefPtr<CloudCacheClient> m_Cloud; - RefPtr<ZenStructuredCacheClient> m_ZenClient; + spdlog::logger m_Log; + ZenCacheStore& m_CacheStore; + zen::CasStore& m_CasStore; + zen::CidStore& m_CidStore; + std::unique_ptr<UpstreamCache> m_UpstreamCache; }; } // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 79bcf4838..140ff1853 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -31,6 +31,7 @@ using namespace fmt::literals; ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} { + spdlog::info("initializing structured cache at '{}'", RootDir); zen::CreateDirectories(RootDir); } diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 6d725e55b..6e8b48703 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -95,6 +95,12 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z cxxopts::value<bool>(ServiceConfig.ShouldCrash)->default_value("false"), ""); + options.add_option("cache", + "", + "enable-upstream-cache", + "Whether upstream caching is enabled", + cxxopts::value<bool>(ServiceConfig.UpstreamCacheEnabled)->default_value("false"), + ""); try { auto result = options.parse(argc, argv); @@ -164,9 +170,11 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv throw std::exception("fatal zen global config script ({}) failure: {}"_format(ConfigScript, e.what()).c_str()); } - ServiceConfig.LegacyCacheEnabled = lua["legacycache"]["enable"]; - const std::string path = lua["legacycache"]["readpath"]; - ServiceConfig.StructuredCacheEnabled = lua["structuredcache"]["enable"]; - ServiceConfig.MeshEnabled = lua["mesh"]["enable"]; + + ServiceConfig.LegacyCacheEnabled = lua["legacycache"]["enable"].get_or(ServiceConfig.LegacyCacheEnabled); + const std::string path = lua["legacycache"]["readpath"].get_or(std::string()); + ServiceConfig.StructuredCacheEnabled = lua["structuredcache"]["enable"].get_or(ServiceConfig.StructuredCacheEnabled); + ServiceConfig.MeshEnabled = lua["mesh"]["enable"].get_or(ServiceConfig.MeshEnabled); + ServiceConfig.UpstreamCacheEnabled = lua["structuredcache"]["upstream"]["enable"].get_or(ServiceConfig.UpstreamCacheEnabled); } } diff --git a/zenserver/config.h b/zenserver/config.h index 53538314d..04498a520 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -20,6 +20,7 @@ struct ZenServiceConfig { bool LegacyCacheEnabled = false; bool StructuredCacheEnabled = true; + bool UpstreamCacheEnabled = false; bool ShouldCrash = false; // Option for testing crash handling bool MeshEnabled = false; // Experimental p2p mesh discovery std::string FlockId; // Id for grouping test instances into sets diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp index 2583784c3..23933f12e 100644 --- a/zenserver/diag/logging.cpp +++ b/zenserver/diag/logging.cpp @@ -4,13 +4,14 @@ #include "config.h" -#include <zencore/string.h> #include <spdlog/pattern_formatter.h> #include <spdlog/sinks/ansicolor_sink.h> #include <spdlog/sinks/basic_file_sink.h> #include <spdlog/sinks/stdout_color_sinks.h> #include <spdlog/spdlog.h> +#include <zencore/string.h> #include <memory> +#include "spdlog/sinks/basic_file_sink.h" // Custom logging -- test code, this should be tweaked @@ -188,20 +189,38 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) { EnableVTMode(); - std::filesystem::path LogPath = GlobalOptions.DataDir / "logs/zenserver.txt"; + std::filesystem::path LogPath = GlobalOptions.DataDir / "logs/zenserver.log"; + + spdlog::level::level_enum LogLevel = spdlog::level::debug; - auto file_sink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), /* truncate */ true); - file_sink->set_level(spdlog::level::trace); + // Sinks + auto ConsoleSink = std::make_shared<spdlog::sinks::ansicolor_stdout_sink_mt>(); + auto FileSink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), /* truncate */ true); - auto& sinks = spdlog::default_logger()->sinks(); - sinks.clear(); - sinks.push_back(std::make_shared<spdlog::sinks::ansicolor_stdout_sink_mt>()); - sinks.push_back(file_sink); + // Default + auto DefaultLogger = spdlog::default_logger(); + auto& Sinks = spdlog::default_logger()->sinks(); + Sinks.clear(); + Sinks.push_back(ConsoleSink); + Sinks.push_back(FileSink); + DefaultLogger->set_level(LogLevel); - spdlog::set_level(spdlog::level::debug); + // Jupiter - only log HTTP traffic to file + auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink); + spdlog::register_logger(JupiterLogger); + JupiterLogger->set_level(LogLevel); + + spdlog::set_level(LogLevel); spdlog::set_formatter(std::make_unique<logging::full_formatter>(GlobalOptions.LogId, std::chrono::system_clock::now())); } +void +ShutdownLogging() +{ + spdlog::drop_all(); + spdlog::shutdown(); +} + spdlog::logger& ConsoleLog() { @@ -211,3 +230,30 @@ ConsoleLog() return *ConLogger; } + +namespace zen::logging { + +spdlog::logger& +Default() +{ + return *spdlog::default_logger(); +} + +spdlog::logger& +Get(std::string_view Name) +{ + std::shared_ptr<spdlog::logger> Logger = spdlog::get(std::string(Name)); + if (!Logger) + { + Logger = std::make_shared<spdlog::logger>(std::string(Name), + begin(spdlog::default_logger()->sinks()), + end(spdlog::default_logger()->sinks())); + + Logger->set_level(spdlog::default_logger()->level()); + spdlog::register_logger(Logger); + } + + return *Logger; +} + +} // namespace zen::logging diff --git a/zenserver/diag/logging.h b/zenserver/diag/logging.h index 52ec5d1bd..bc93898ad 100644 --- a/zenserver/diag/logging.h +++ b/zenserver/diag/logging.h @@ -7,4 +7,14 @@ struct ZenServerOptions; void InitializeLogging(const ZenServerOptions& GlobalOptions); +void ShutdownLogging(); + spdlog::logger& ConsoleLog(); + +namespace zen::logging { + +spdlog::logger& Default(); + +spdlog::logger& Get(std::string_view Name); + +} // namespace zen::logging diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 977bcc712..09be2c776 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -3,6 +3,7 @@ #include "jupiter.h" #include "cache/structuredcachestore.h" +#include "diag/logging.h" #include <fmt/format.h> #include <zencore/compactbinary.h> @@ -25,7 +26,6 @@ # pragma comment(lib, "Wldap32.lib") #endif -#include <spdlog/spdlog.h> #include <json11.hpp> using namespace std::literals; @@ -51,9 +51,47 @@ namespace detail { CloudCacheClient& OwnerClient; cpr::Session Session; }; + + void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) + { + std::string_view ContentType = "unknown"sv; + if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) + { + ContentType = It->second; + } + + const double Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes; + + const bool IsBinary = + ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream"; + + if (IsBinary) + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Bytes, + Response.reason); + } + else + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Response.text, + Response.reason); + } + } + } // namespace detail -CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_CacheClient(OuterClient) +CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient) { m_SessionState = m_CacheClient->AllocSessionState(); } @@ -63,94 +101,163 @@ CloudCacheSession::~CloudCacheSession() m_CacheClient->FreeSessionState(m_SessionState); } -#define TESTING_PREFIX "aaaaa" - -IoBuffer -CloudCacheSession::Get(std::string_view BucketId, std::string_view Key) +CloudCacheResult +CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) { + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key << ".raw"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw"; - auto& Session = m_SessionState->Session; - Session.SetUrl(cpr::Url{Uri.c_str()}); + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}}); cpr::Response Response = Session.Get(); - if (!Response.error) + detail::Log(m_Log, "GET"sv, Response); + + if (Response.status_code == 200) { - return IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; } - return {}; + return {.Success = false}; } -void -CloudCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data) +CloudCacheResult +CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) { + return GetDerivedData(BucketId, Key.ToHexString()); +} + +CloudCacheResult +CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key) +{ + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString(); - auto& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-cb"}}); - IoHash Hash = IoHash::HashMemory(Data.Data(), Data.Size()); + cpr::Response Response = Session.Get(); + detail::Log(m_Log, "GET"sv, Response); + if (Response.status_code == 200) + { + return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; + } + + return {.Success = false}; +} + +CloudCacheResult +CloudCacheSession::GetCompressedBlob(const IoHash& Key) +{ std::string Auth; m_CacheClient->AcquireAccessToken(Auth); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); - Session.SetOption(cpr::Body{(const char*)Data.Data(), Data.Size()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Get(); + detail::Log(m_Log, "GET"sv, Response); - if (Response.error) + if (Response.status_code == 200) { - spdlog::warn("PUT failed: '{}'", Response.error.message); + return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; } + + return {.Success = false}; } -void -CloudCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data) +CloudCacheResult +CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { + IoHash Hash = IoHash::HashMemory(DerivedData.Data(), DerivedData.Size()); + + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; auto& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption( + cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); + Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()}); + + cpr::Response Response = Session.Put(); + detail::Log(m_Log, "PUT"sv, Response); + + return {.Success = Response.status_code == 200}; +} + +CloudCacheResult +CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) +{ + return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); +} + +CloudCacheResult +CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref) +{ + IoHash Hash = IoHash::HashMemory(Ref.Data(), Ref.Size()); + std::string Auth; m_CacheClient->AcquireAccessToken(Auth); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption( + cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); - if (Data.Value.GetContentType() == ZenContentType::kCbObject) - { - CbObjectView Cbo(Data.Value.Data()); - const IoHash Hash = Cbo.GetHash(); - const MemoryView DataView = Cbo.GetView(); + cpr::Response Response = Session.Put(); + detail::Log(m_Log, "PUT"sv, Response); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + return {.Success = Response.status_code == 200}; +} - Session.SetOption(cpr::Body{reinterpret_cast<const char*>(DataView.GetData()), DataView.GetSize()}); - } - else - { - const IoHash Hash = IoHash::HashMemory(Data.Value.Data(), Data.Value.Size()); +CloudCacheResult +CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) +{ + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - Session.SetOption(cpr::Body{reinterpret_cast<const char*>(Data.Value.Data()), Data.Value.Size()}); - } + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}}); + Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); cpr::Response Response = Session.Put(); + detail::Log(m_Log, "PUT"sv, Response); - if (Response.error) - { - spdlog::warn("PUT failed: '{}'", Response.error.message); - } + return {.Success = Response.status_code == 200}; } std::vector<IoHash> @@ -158,7 +265,7 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << m_CacheClient->Namespace(); + Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace(); ZEN_UNUSED(BucketId, ChunkHashes); @@ -167,17 +274,6 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ////////////////////////////////////////////////////////////////////////// -IoBuffer -CloudCacheSession::Get(std::string_view BucketId, const IoHash& Key) -{ - StringBuilder<64> KeyString; - Key.ToHexString(KeyString); - - return Get(BucketId, KeyString); -} - -////////////////////////////////////////////////////////////////////////// - std::string CloudCacheAccessToken::GetAuthorizationHeaderValue() { @@ -197,27 +293,30 @@ CloudCacheAccessToken::SetToken(std::string_view Token) ////////////////////////////////////////////////////////////////////////// // // ServiceUrl: https://jupiter.devtools.epicgames.com -// Namespace: ue4.ddc +// DdcNamespace: ue4.ddc // OAuthClientId: 0oao91lrhqPiAlaGD0x7 // OAuthProvider: https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token // OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d // CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, - std::string_view Namespace, + std::string_view DdcNamespace, + std::string_view BlobStoreNamespace, std::string_view OAuthProvider, std::string_view OAuthClientId, std::string_view OAuthSecret) -: m_ServiceUrl(ServiceUrl) +: m_Log(zen::logging::Get("jupiter")) +, m_ServiceUrl(ServiceUrl) , m_OAuthFullUri(OAuthProvider) -, m_Namespace(Namespace) +, m_DdcNamespace(DdcNamespace) +, m_BlobStoreNamespace(BlobStoreNamespace) , m_DefaultBucket("default") , m_OAuthClientId(OAuthClientId) , m_OAuthSecret(OAuthSecret) { if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv)) { - spdlog::warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); + m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); m_IsValid = false; return; @@ -229,7 +328,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, if (SchemePos == std::string::npos) { - spdlog::warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); m_IsValid = false; return; @@ -239,7 +338,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, if (DomainEnd == std::string::npos) { - spdlog::warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); m_IsValid = false; return; diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 2ed458142..61d1bd99c 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -6,6 +6,8 @@ #include <zencore/refcount.h> #include <zencore/thread.h> +#include <spdlog/spdlog.h> + #include <atomic> #include <list> #include <memory> @@ -39,6 +41,12 @@ private: std::atomic<uint32_t> m_Serial; }; +struct CloudCacheResult +{ + IoBuffer Value; + bool Success = false; +}; + /** * Context for performing Jupiter operations * @@ -52,17 +60,20 @@ public: CloudCacheSession(CloudCacheClient* OuterClient); ~CloudCacheSession(); - // Key-value cache operations - IoBuffer Get(std::string_view BucketId, std::string_view Key); - void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data); + CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key); + CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key); + CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key); + CloudCacheResult GetCompressedBlob(const IoHash& Key); - // Structured cache operations - IoBuffer Get(std::string_view BucketId, const IoHash& Key); - void Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data); + CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); + CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); + CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref); + CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); private: + spdlog::logger& m_Log; RefPtr<CloudCacheClient> m_CacheClient; detail::CloudCacheSessionState* m_SessionState; }; @@ -74,28 +85,35 @@ class CloudCacheClient : public RefCounted { public: CloudCacheClient(std::string_view ServiceUrl, - std::string_view Namespace, + std::string_view DdcNamespace, + std::string_view BlobStoreNamespace, std::string_view OAuthProvider, std::string_view OAuthClientId, std::string_view OAuthSecret); ~CloudCacheClient(); bool AcquireAccessToken(std::string& AuthorizationHeaderValue); - std::string_view Namespace() const { return m_Namespace; } + std::string_view DdcNamespace() const { return m_DdcNamespace; } + std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } std::string_view DefaultBucket() const { return m_DefaultBucket; } std::string_view ServiceUrl() const { return m_ServiceUrl; } + bool IsValid() const { return m_IsValid; } + + spdlog::logger& Logger() { return m_Log; } private: - bool m_IsValid = false; + spdlog::logger& m_Log; std::string m_ServiceUrl; std::string m_OAuthDomain; std::string m_OAuthUriPath; std::string m_OAuthFullUri; - std::string m_Namespace; + std::string m_DdcNamespace; + std::string m_BlobStoreNamespace; std::string m_DefaultBucket; std::string m_OAuthClientId; std::string m_OAuthSecret; CloudCacheAccessToken m_AccessToken; + bool m_IsValid = false; RwLock m_SessionStateLock; std::list<detail::CloudCacheSessionState*> m_SessionStateCache; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp new file mode 100644 index 000000000..ecd51a706 --- /dev/null +++ b/zenserver/upstream/upstreamcache.cpp @@ -0,0 +1,347 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "upstreamcache.h" +#include "jupiter.h" +#include "zen.h" + +#include <zencore/fmtutils.h> +#include <zencore/timer.h> +#include <zenstore/cas.h> +#include <zenstore/cidstore.h> + +#include "cache/structuredcachestore.h" +#include "diag/logging.h" + +#include <atomic> +#include <deque> +#include <thread> + +namespace zen { + +using namespace std::literals; + +namespace detail { + + template<typename T> + class BlockingQueue + { + public: + BlockingQueue() = default; + + ~BlockingQueue() { CompleteAdding(); } + + void Enqueue(T&& Item) + { + { + std::lock_guard Lock(m_Lock); + m_Queue.emplace_back(std::move(Item)); + } + + m_NewItemSignal.notify_one(); + } + + bool WaitAndDequeue(T& Item) + { + if (m_CompleteAdding.load()) + { + return false; + } + + std::unique_lock Lock(m_Lock); + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); + + if (!m_Queue.empty()) + { + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + + return true; + } + + return false; + } + + void CompleteAdding() + { + if (!m_CompleteAdding.load()) + { + m_CompleteAdding.store(true); + m_NewItemSignal.notify_all(); + } + } + + std::size_t Num() const + { + std::unique_lock Lock(m_Lock); + return m_Queue.size(); + } + + private: + mutable std::mutex m_Lock; + std::condition_variable m_NewItemSignal; + std::deque<T> m_Queue; + std::atomic_bool m_CompleteAdding{false}; + }; + +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +class DefaultUpstreamCache final : public UpstreamCache +{ +public: + DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) + : m_Log(zen::logging::Get("upstream")) + , m_Options(Options) + , m_CacheStore(CacheStore) + , m_CidStore(CidStore) + { + if (m_Options.JupiterEnabled) + { + m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint, + m_Options.JupiterDdcNamespace, + m_Options.JupiterBlobStoreNamespace, + m_Options.JupiterOAuthProvider, + m_Options.JupiterOAuthClientId, + m_Options.JupiterOAuthSecret); + + std::string TmpAuthHeader; + if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid()) + { + m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'", + m_Options.JupiterEndpoint, + m_Options.JupiterDdcNamespace, + m_Options.JupiterBlobStoreNamespace); + } + else + { + m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint); + } + } + + m_IsRunning = m_CloudClient && m_CloudClient->IsValid(); + + if (m_IsRunning) + { + m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount); + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + { + m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); + } + } + else + { + m_Log.warn("upstream disabled, no valid endpoints"); + } + } + + virtual ~DefaultUpstreamCache() { Shutdown(); } + + virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + { + if (m_CloudClient && m_CloudClient->IsValid()) + { + zen::Stopwatch Timer; + + try + { + CloudCacheSession Session(m_CloudClient); + CloudCacheResult Result; + + if (Type == ZenContentType::kBinary) + { + Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); + } + else + { + Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash); + } + + return {.Value = Result.Value, .Success = Result.Success}; + } + catch (std::exception& e) + { + m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'", + CacheKey.Bucket, + CacheKey.Hash, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + } + } + + return {}; + } + + virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + if (m_CloudClient && m_CloudClient->IsValid()) + { + zen::Stopwatch Timer; + + try + { + CloudCacheSession Session(m_CloudClient); + + CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + return {.Payload = Result.Value, .Success = Result.Success}; + } + catch (std::exception& e) + { + m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'", + PayloadKey.CacheKey.Bucket, + PayloadKey.CacheKey.Hash, + PayloadKey.PayloadId, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + } + } + + return {}; + } + + virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + { + if (m_IsRunning.load()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + return {.Success = true}; + } + + return {}; + } + +private: + void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + { + const uint32_t MaxAttempts = 3; + + if (m_CloudClient && m_CloudClient->IsValid()) + { + CloudCacheSession Session(m_CloudClient); + ZenCacheValue CacheValue; + if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + { + m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash); + return; + } + + if (CacheRecord.Type == ZenContentType::kBinary) + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); + } + + if (!Result.Success) + { + m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + MaxAttempts); + } + } + else + { + ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject); + + CloudCacheResult Result; + for (const IoHash& PayloadId : CacheRecord.PayloadIds) + { + Result.Success = false; + if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + { + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCompressedBlob(PayloadId, Payload); + } + } + + if (!Result.Success) + { + m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts); + break; + } + } + + if (Result.Success) + { + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); + } + } + + if (!Result.Success) + { + m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); + } + } + } + } + + void ProcessUpstreamQueue() + { + for (;;) + { + UpstreamCacheRecord CacheRecord; + if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) + { + try + { + ProcessCacheRecord(std::move(CacheRecord)); + } + catch (std::exception& e) + { + m_Log.warn("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); + } + } + + if (!m_IsRunning.load()) + { + break; + } + } + } + + void Shutdown() + { + if (m_IsRunning.load()) + { + m_IsRunning.store(false); + m_UpstreamQueue.CompleteAdding(); + + for (std::thread& Thread : m_UpstreamThreads) + { + Thread.join(); + } + + m_UpstreamThreads.clear(); + } + } + + using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; + + spdlog::logger& m_Log; + UpstreamCacheOptions m_Options; + ::ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + UpstreamQueue m_UpstreamQueue; + RefPtr<CloudCacheClient> m_CloudClient; + RefPtr<ZenStructuredCacheClient> m_ZenClient; + std::vector<std::thread> m_UpstreamThreads; + std::atomic_bool m_IsRunning{false}; +}; + +////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<UpstreamCache> +MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) +{ + return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore); +} + +} // namespace zen diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h new file mode 100644 index 000000000..23a542151 --- /dev/null +++ b/zenserver/upstream/upstreamcache.h @@ -0,0 +1,82 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/zencore.h> + +#include <memory> + +class ZenCacheStore; + +namespace zen { + +class CidStore; + +struct UpstreamCacheKey +{ + std::string Bucket; + IoHash Hash; +}; + +struct UpstreamPayloadKey +{ + UpstreamCacheKey CacheKey; + IoHash PayloadId; +}; + +struct UpstreamCacheRecord +{ + ZenContentType Type = ZenContentType::kBinary; + UpstreamCacheKey CacheKey; + std::vector<IoHash> PayloadIds; +}; + +struct UpstreamCacheOptions +{ + uint32_t ThreadCount = 4; + bool JupiterEnabled = true; + std::string_view JupiterEndpoint; + std::string_view JupiterDdcNamespace; + std::string_view JupiterBlobStoreNamespace; + std::string_view JupiterOAuthProvider; + std::string_view JupiterOAuthClientId; + std::string_view JupiterOAuthSecret; +}; + +/** + * Manages one or more upstream cache endpoints. + */ +class UpstreamCache +{ +public: + virtual ~UpstreamCache() = default; + + struct GetCacheRecordResult + { + IoBuffer Value; + bool Success = false; + }; + + virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + + struct GetCachePayloadResult + { + IoBuffer Payload; + bool Success = false; + }; + + virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + + struct EnqueueResult + { + bool Success = false; + }; + + virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; +}; + +std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore); + +} // namespace zen diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 32a468452..395ae5fc2 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -54,12 +54,14 @@ #include "admin/admin.h" #include "cache/kvcache.h" #include "cache/structuredcache.h" +#include "cache/structuredcachestore.h" #include "compute/apply.h" #include "diag/diagsvcs.h" #include "experimental/usnjournal.h" #include "projectstore.h" #include "testing/launch.h" #include "upstream/jupiter.h" +#include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/gc.h" #include "zenstore/scrub.h" @@ -139,7 +141,28 @@ public: if (ServiceConfig.StructuredCacheEnabled) { spdlog::info("instantiating structured cache service"); - m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(m_DataRoot / "cache", *m_CasStore, *m_CidStore)); + m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); + + std::unique_ptr<zen::UpstreamCache> UpstreamCache; + if (ServiceConfig.UpstreamCacheEnabled) + { + using namespace std::literals; + + zen::UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ThreadCount = 4; + UpstreamOptions.JupiterEnabled = true; + UpstreamOptions.JupiterEndpoint = "https://jupiter.devtools-dev.epicgames.com"sv; + UpstreamOptions.JupiterDdcNamespace = "ue4.ddc"sv; + UpstreamOptions.JupiterBlobStoreNamespace = "test.ddc"sv; + UpstreamOptions.JupiterOAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv; + UpstreamOptions.JupiterOAuthClientId = "0oao91lrhqPiAlaGD0x7"sv; + UpstreamOptions.JupiterOAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv; + + UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); + } + + m_StructuredCacheService.reset( + new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache))); } else { @@ -297,6 +320,7 @@ private: zen::HttpServer m_Http; std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()}; std::unique_ptr<zen::CidStore> m_CidStore; + std::unique_ptr<ZenCacheStore> m_CacheStore; zen::CasGc m_Gc{*m_CasStore}; zen::CasScrubber m_Scrubber{*m_CasStore}; HttpTestService m_TestService; @@ -397,5 +421,7 @@ main(int argc, char* argv[]) SPDLOG_CRITICAL("Caught exception in main: {}", e.what()); } + ShutdownLogging(); + return 0; } diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index 6c87b4a68..9b2fc891e 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -120,6 +120,7 @@ <ClInclude Include="diag\diagsvcs.h" /> <ClInclude Include="experimental\usnjournal.h" /> <ClInclude Include="targetver.h" /> + <ClInclude Include="upstream\upstreamcache.h" /> <ClInclude Include="upstream\zen.h" /> <ClInclude Include="vfs.h" /> </ItemGroup> @@ -138,6 +139,7 @@ <ClCompile Include="cache\cachestore.cpp" /> <ClCompile Include="casstore.cpp" /> <ClCompile Include="experimental\usnjournal.cpp" /> + <ClCompile Include="upstream\upstreamcache.cpp" /> <ClCompile Include="upstream\zen.cpp" /> <ClCompile Include="vfs.cpp" /> <ClCompile Include="zenserver.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 79fcfb803..1fe902731 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -38,6 +38,9 @@ <ClInclude Include="cache\structuredcachestore.h" /> <ClInclude Include="compute\apply.h" /> <ClInclude Include="sos\sos.h" /> + <ClInclude Include="upstream\upstreamcache.h"> + <Filter>upstream</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -71,6 +74,9 @@ <ClCompile Include="cache\structuredcachestore.cpp" /> <ClCompile Include="compute\apply.cpp" /> <ClCompile Include="sos\sos.cpp" /> + <ClCompile Include="upstream\upstreamcache.cpp"> + <Filter>upstream</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <Filter Include="cache"> diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 3460e1df2..4e5188f1c 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -36,14 +36,29 @@ struct CidStore::CidState IoBuffer FindChunkByCid(const IoHash& DecompressedId) { - if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) + IoHash CompressedHash; { - return m_CasStore.FindChunk(It->second); + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) + { + CompressedHash = It->second; + } + } + + if (CompressedHash != IoHash::Zero) + { + return m_CasStore.FindChunk(CompressedHash); } return IoBuffer(); } + bool ContainsChunk(const IoHash& DecompressedId) + { + RwLock::SharedLockScope _(m_Lock); + return m_CidMap.find(DecompressedId) != m_CidMap.end(); + } + void InitializeIndex(const std::filesystem::path& RootDir) { zen::CreateDirectories(RootDir); @@ -84,6 +99,12 @@ CidStore::FindChunkByCid(const IoHash& DecompressedId) return m_Impl->FindChunkByCid(DecompressedId); } +bool +CidStore::ContainsChunk(const IoHash& DecompressedId) +{ + return m_Impl->ContainsChunk(DecompressedId); +} + void CidStore::Flush() { diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index edc3f5582..2c2b395a5 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -31,6 +31,7 @@ public: void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed); IoBuffer FindChunkByCid(const IoHash& DecompressedId); + bool ContainsChunk(const IoHash& DecompressedId); void Flush(); private: |