diff options
| author | Per Larsson <[email protected]> | 2021-09-03 15:37:19 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-09-03 15:37:19 +0200 |
| commit | c04fa527593da17c719c4d899ec19caeb6480a94 (patch) | |
| tree | fa65fd7585d55712bad546d76a0fc3518023a905 | |
| parent | oops: Fixed AssertException implementation namespace (diff) | |
| download | zen-c04fa527593da17c719c4d899ec19caeb6480a94.tar.xz zen-c04fa527593da17c719c4d899ec19caeb6480a94.zip | |
Zen upstream support (#7)
| -rw-r--r-- | zenserver/cache/kvcache.cpp | 15 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 36 | ||||
| -rw-r--r-- | zenserver/config.cpp | 130 | ||||
| -rw-r--r-- | zenserver/config.h | 36 | ||||
| -rw-r--r-- | zenserver/diag/logging.cpp | 4 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 38 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 19 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 416 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 64 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 130 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 25 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 64 |
12 files changed, 712 insertions, 265 deletions
diff --git a/zenserver/cache/kvcache.cpp b/zenserver/cache/kvcache.cpp index a044b9dd0..d78329ee9 100644 --- a/zenserver/cache/kvcache.cpp +++ b/zenserver/cache/kvcache.cpp @@ -65,13 +65,14 @@ HttpKvCacheService::AccessTracker::TrackAccess(std::string_view Key) HttpKvCacheService::HttpKvCacheService() { - m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, - "ue4.ddc"sv /* namespace */, - "test.ddc"sv /* blob store namespace */, - "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, - "0oao91lrhqPiAlaGD0x7"sv /* client id */, - "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); - + m_Cloud = new CloudCacheClient(CloudCacheClientOptions{ + .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, + .DdcNamespace = "ue4.ddc"sv, + .BlobStoreNamespace = "test.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + }); m_AccessTracker = std::make_unique<AccessTracker>(); } diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 00b3b545b..0e235a9be 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -149,7 +149,8 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req case kGet: { ZenCacheValue Value; - bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); + bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); + bool InUpstreamCache = false; if (!Success && m_UpstreamCache) { @@ -159,8 +160,9 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); UpstreamResult.Success) { - Value.Value = UpstreamResult.Value; - Success = true; + Value.Value = UpstreamResult.Value; + Success = true; + InUpstreamCache = true; if (CacheRecordType == ZenContentType::kCbObject) { @@ -215,11 +217,12 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Request.SetSuppressResponseBody(); } - m_Log.debug("HIT - '{}/{}' ({} bytes, {})", + m_Log.debug("HIT - '{}/{}' ({} bytes {}) ({})", Ref.BucketSegment, Ref.HashKey, Value.Value.Size(), - Value.Value.GetContentType()); + Value.Value.GetContentType(), + InUpstreamCache ? "upstream" : "local"); return Request.WriteResponse(zen::HttpResponse::OK, MapToHttpContentType(Value.Value.GetContentType()), Value.Value); } @@ -267,7 +270,6 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { auto Result = m_UpstreamCache->EnqueueUpstream( {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); - ZEN_ASSERT(Result.Success); } return Request.WriteResponse(zen::HttpResponse::Created); @@ -332,7 +334,6 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(References)}); - ZEN_ASSERT(Result.Success); } return Request.WriteResponse(zen::HttpResponse::Created); @@ -379,18 +380,20 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re case kHead: case kGet: { - zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + bool InUpstreamCache = false; if (!Payload && m_UpstreamCache) { if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); UpstreamResult.Success) { - if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Payload))) + if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - Payload = UpstreamResult.Payload; + Payload = UpstreamResult.Value; zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Payload); zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + InUpstreamCache = true; m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); } @@ -403,22 +406,17 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re if (!Payload) { - m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})", - Ref.BucketSegment, - Ref.HashKey, - Ref.PayloadId, - Payload.Size(), - Payload.GetContentType()); - + m_Log.debug("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); return Request.WriteResponse(zen::HttpResponse::NotFound); } - m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {})", + m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {}) ({})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, Payload.Size(), - Payload.GetContentType()); + Payload.GetContentType(), + InUpstreamCache ? "upstream" : "local"); if (Verb == kHead) { diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 6e8b48703..8fe9f72f2 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -97,10 +97,74 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_option("cache", "", - "enable-upstream-cache", - "Whether upstream caching is enabled", - cxxopts::value<bool>(ServiceConfig.UpstreamCacheEnabled)->default_value("false"), + "upstream-jupiter-url", + "URL to a Jupiter instance", + cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.Url)->default_value(""), ""); + + options.add_option("cache", + "", + "upstream-jupiter-oauth-url", + "URL to the OAuth provier", + cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthProvider)->default_value(""), + ""); + + options.add_option("cache", + "", + "upstream-jupiter-oauth-clientid", + "The OAuth client ID", + cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthClientId)->default_value(""), + ""); + + options.add_option("cache", + "", + "upstream-jupiter-oauth-clientsecret", + "The OAuth client secret", + cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthClientSecret)->default_value(""), + ""); + + options.add_option("cache", + "", + "upstream-jupiter-namespace", + "The Common Blob Store API namespace", + cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.Namespace)->default_value(""), + ""); + + options.add_option("cache", + "", + "upstream-jupiter-namespace-ddc", + "The lecacy DDC namespace", + cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.DdcNamespace)->default_value(""), + ""); + + options.add_option("cache", + "", + "upstream-jupiter-dev", + "Enable Jupiter upstream caching using development settings", + cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseDevelopmentSettings)->default_value("false"), + ""); + + options.add_option("cache", + "", + "upstream-zen-url", + "URL to a remote Zen server instance", + cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Url)->default_value(""), + ""); + + options.add_option("cache", + "", + "upstream-enabled", + "Whether upstream caching is disabled", + cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.Enabled)->default_value("true"), + ""); + + options.add_option("cache", + "", + "upstream-thread-count", + "Number of threads used for upstream procsssing", + cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), + ""); + try { auto result = options.parse(argc, argv); @@ -171,10 +235,60 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv throw std::exception("fatal zen global config script ({}) failure: {}"_format(ConfigScript, e.what()).c_str()); } - ServiceConfig.LegacyCacheEnabled = lua["legacycache"]["enable"].get_or(ServiceConfig.LegacyCacheEnabled); - const std::string path = lua["legacycache"]["readpath"].get_or(std::string()); - ServiceConfig.StructuredCacheEnabled = lua["structuredcache"]["enable"].get_or(ServiceConfig.StructuredCacheEnabled); - ServiceConfig.MeshEnabled = lua["mesh"]["enable"].get_or(ServiceConfig.MeshEnabled); - ServiceConfig.UpstreamCacheEnabled = lua["structuredcache"]["upstream"]["enable"].get_or(ServiceConfig.UpstreamCacheEnabled); + ServiceConfig.LegacyCacheEnabled = lua["legacycache"]["enable"].get_or(ServiceConfig.LegacyCacheEnabled); + const std::string path = lua["legacycache"]["readpath"].get_or(std::string()); + ServiceConfig.MeshEnabled = lua["mesh"]["enable"].get_or(ServiceConfig.MeshEnabled); + + auto UpdateStringValueFromConfig = [](const sol::table& Table, std::string_view Key, std::string& OutValue) { + // Update the specified config value unless it has been set, i.e. from command line + if (auto MaybeValue = Table.get<sol::optional<std::string>>(Key); MaybeValue.has_value() && OutValue.empty()) + { + OutValue = MaybeValue.value(); + } + }; + + if (sol::optional<sol::table> StructuredCacheConfig = lua["structuredcache"]) + { + ServiceConfig.StructuredCacheEnabled = StructuredCacheConfig->get_or("enable", ServiceConfig.StructuredCacheEnabled); + + if (auto UpstreamConfig = StructuredCacheConfig->get<sol::optional<sol::table>>("upstream")) + { + ServiceConfig.UpstreamCacheConfig.Enabled = UpstreamConfig->get_or("enable", ServiceConfig.UpstreamCacheConfig.Enabled); + ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount = UpstreamConfig->get_or("upstreamthreadcount", 4); + + if (auto JupiterConfig = UpstreamConfig->get<sol::optional<sol::table>>("jupiter")) + { + UpdateStringValueFromConfig(JupiterConfig.value(), + std::string_view("url"), + ServiceConfig.UpstreamCacheConfig.JupiterConfig.Url); + UpdateStringValueFromConfig(JupiterConfig.value(), + std::string_view("oauthprovider"), + ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthProvider); + UpdateStringValueFromConfig(JupiterConfig.value(), + std::string_view("oauthclientid"), + ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthClientId); + UpdateStringValueFromConfig(JupiterConfig.value(), + std::string_view("oauthclientsecret"), + ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthClientSecret); + UpdateStringValueFromConfig(JupiterConfig.value(), + std::string_view("namespace"), + ServiceConfig.UpstreamCacheConfig.JupiterConfig.Namespace); + UpdateStringValueFromConfig(JupiterConfig.value(), + std::string_view("ddcnamespace"), + ServiceConfig.UpstreamCacheConfig.JupiterConfig.DdcNamespace); + + ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseDevelopmentSettings = + JupiterConfig->get_or("usedevelopmentsettings", + ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseDevelopmentSettings); + }; + + if (auto ZenConfig = UpstreamConfig->get<sol::optional<sol::table>>("zen")) + { + UpdateStringValueFromConfig(ZenConfig.value(), + std::string_view("url"), + ServiceConfig.UpstreamCacheConfig.ZenConfig.Url); + } + } + } } } diff --git a/zenserver/config.h b/zenserver/config.h index 04498a520..d0f5f7280 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -16,14 +16,38 @@ struct ZenServerOptions std::filesystem::path DataDir; // Root directory for state (used for testing) }; +struct ZenUpstreamJupiterConfig +{ + std::string Url; + std::string OAuthProvider; + std::string OAuthClientId; + std::string OAuthClientSecret; + std::string Namespace; + std::string DdcNamespace; + bool UseDevelopmentSettings; +}; + +struct ZenUpstreamZenConfig +{ + std::string Url; +}; + +struct ZenUpstreamCacheConfig +{ + ZenUpstreamJupiterConfig JupiterConfig; + ZenUpstreamZenConfig ZenConfig; + int UpstreamThreadCount = 4; + bool Enabled = false; +}; + struct ZenServiceConfig { - bool LegacyCacheEnabled = false; - bool StructuredCacheEnabled = true; - bool UpstreamCacheEnabled = false; - bool ShouldCrash = false; // Option for testing crash handling - bool MeshEnabled = false; // Experimental p2p mesh discovery - std::string FlockId; // Id for grouping test instances into sets + bool LegacyCacheEnabled = false; + bool StructuredCacheEnabled = true; + bool ShouldCrash = false; // Option for testing crash handling + bool MeshEnabled = false; // Experimental p2p mesh discovery + std::string FlockId; // Id for grouping test instances into sets + ZenUpstreamCacheConfig UpstreamCacheConfig; }; void ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig); diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp index 23933f12e..89ed4ac6a 100644 --- a/zenserver/diag/logging.cpp +++ b/zenserver/diag/logging.cpp @@ -210,6 +210,10 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) spdlog::register_logger(JupiterLogger); JupiterLogger->set_level(LogLevel); + auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink); + spdlog::register_logger(ZenClientLogger); + ZenClientLogger->set_level(LogLevel); + spdlog::set_level(LogLevel); spdlog::set_formatter(std::make_unique<logging::full_formatter>(GlobalOptions.LogId, std::chrono::system_clock::now())); } diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index a59b9d317..7e22b9af9 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -52,7 +52,7 @@ namespace detail { cpr::Session Session; }; - void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) + static void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) { std::string_view ContentType = "unknown"sv; if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) @@ -299,24 +299,18 @@ CloudCacheAccessToken::SetToken(std::string_view Token) // OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d // -CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, - std::string_view DdcNamespace, - std::string_view BlobStoreNamespace, - std::string_view OAuthProvider, - std::string_view OAuthClientId, - std::string_view OAuthSecret) +CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options) : m_Log(zen::logging::Get("jupiter")) -, m_ServiceUrl(ServiceUrl) -, m_OAuthFullUri(OAuthProvider) -, m_DdcNamespace(DdcNamespace) -, m_BlobStoreNamespace(BlobStoreNamespace) -, m_DefaultBucket("default") -, m_OAuthClientId(OAuthClientId) -, m_OAuthSecret(OAuthSecret) +, m_ServiceUrl(Options.ServiceUrl) +, m_OAuthFullUri(Options.OAuthProvider) +, m_DdcNamespace(Options.DdcNamespace) +, m_BlobStoreNamespace(Options.BlobStoreNamespace) +, m_OAuthClientId(Options.OAuthClientId) +, m_OAuthSecret(Options.OAuthSecret) { - if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv)) + if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv)) { - m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); + m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(Options.OAuthProvider).c_str()); m_IsValid = false; return; @@ -324,28 +318,28 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, // Split into host and Uri substrings - auto SchemePos = OAuthProvider.find("://"sv); + auto SchemePos = Options.OAuthProvider.find("://"sv); if (SchemePos == std::string::npos) { - m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}'", Options.ServiceUrl); m_IsValid = false; return; } - auto DomainEnd = OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3); + auto DomainEnd = Options.OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3); if (DomainEnd == std::string::npos) { - m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", Options.ServiceUrl); m_IsValid = false; return; } - m_OAuthDomain = OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com - m_OAuthUriPath = OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token + m_OAuthDomain = Options.OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com + m_OAuthUriPath = Options.OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token } CloudCacheClient::~CloudCacheClient() diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 61d1bd99c..ba5e0a65a 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -78,24 +78,28 @@ private: detail::CloudCacheSessionState* m_SessionState; }; +struct CloudCacheClientOptions +{ + std::string_view ServiceUrl; + std::string_view DdcNamespace; + std::string_view BlobStoreNamespace; + std::string_view OAuthProvider; + std::string_view OAuthClientId; + std::string_view OAuthSecret; +}; + /** * Jupiter upstream cache client */ class CloudCacheClient : public RefCounted { public: - CloudCacheClient(std::string_view ServiceUrl, - std::string_view DdcNamespace, - std::string_view BlobStoreNamespace, - std::string_view OAuthProvider, - std::string_view OAuthClientId, - std::string_view OAuthSecret); + CloudCacheClient(const CloudCacheClientOptions& Options); ~CloudCacheClient(); bool AcquireAccessToken(std::string& AuthorizationHeaderValue); std::string_view DdcNamespace() const { return m_DdcNamespace; } std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } - std::string_view DefaultBucket() const { return m_DefaultBucket; } std::string_view ServiceUrl() const { return m_ServiceUrl; } bool IsValid() const { return m_IsValid; } @@ -109,7 +113,6 @@ private: std::string m_OAuthFullUri; std::string m_DdcNamespace; std::string m_BlobStoreNamespace; - std::string m_DefaultBucket; std::string m_OAuthClientId; std::string m_OAuthSecret; CloudCacheAccessToken m_AccessToken; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index ecd51a706..40d7ebd26 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -12,6 +12,9 @@ #include "cache/structuredcachestore.h" #include "diag/logging.h" +#include <fmt/format.h> + +#include <algorithm> #include <atomic> #include <deque> #include <thread> @@ -83,70 +86,32 @@ namespace detail { std::atomic_bool m_CompleteAdding{false}; }; -} // namespace detail - -////////////////////////////////////////////////////////////////////////// - -class DefaultUpstreamCache final : public UpstreamCache -{ -public: - DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) - : m_Log(zen::logging::Get("upstream")) - , m_Options(Options) - , m_CacheStore(CacheStore) - , m_CidStore(CidStore) + class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint { - if (m_Options.JupiterEnabled) + public: + JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) { - m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint, - m_Options.JupiterDdcNamespace, - m_Options.JupiterBlobStoreNamespace, - m_Options.JupiterOAuthProvider, - m_Options.JupiterOAuthClientId, - m_Options.JupiterOAuthSecret); - - std::string TmpAuthHeader; - if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid()) - { - m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'", - m_Options.JupiterEndpoint, - m_Options.JupiterDdcNamespace, - m_Options.JupiterBlobStoreNamespace); - } - else - { - m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint); - } + using namespace fmt::literals; + m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); + m_Client = new CloudCacheClient(Options); } - m_IsRunning = m_CloudClient && m_CloudClient->IsValid(); + virtual ~JupiterUpstreamEndpoint() = default; - if (m_IsRunning) + virtual bool Initialize() override { - m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount); - for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) - { - m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); - } + //TODO: Test and authenticate Jupiter client connection + return !m_Client->ServiceUrl().empty(); } - else - { - m_Log.warn("upstream disabled, no valid endpoints"); - } - } - virtual ~DefaultUpstreamCache() { Shutdown(); } + virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override - { - if (m_CloudClient && m_CloudClient->IsValid()) + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - zen::Stopwatch Timer; - try { - CloudCacheSession Session(m_CloudClient); - CloudCacheResult Result; + zen::CloudCacheSession Session(m_Client); + CloudCacheResult Result; if (Type == ZenContentType::kBinary) { @@ -161,126 +126,293 @@ public: } catch (std::exception& e) { - m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'", - CacheKey.Bucket, - CacheKey.Hash, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + return {.Reason = std::string(e.what()), .Success = false}; } } - return {}; - } + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + try + { + zen::CloudCacheSession Session(m_Client); + const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + return {.Value = Result.Value, .Success = Result.Success}; + } + catch (std::exception& e) + { + return {.Reason = std::string(e.what()), .Success = false}; + } + } - virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override - { - if (m_CloudClient && m_CloudClient->IsValid()) + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) override { - zen::Stopwatch Timer; + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + const uint32_t MaxAttempts = 3; try { - CloudCacheSession Session(m_CloudClient); + CloudCacheSession Session(m_Client); + + if (CacheRecord.Type == ZenContentType::kBinary) + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue); + } - CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - return {.Payload = Result.Value, .Success = Result.Success}; + return {.Success = Result.Success}; + } + else + { + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + } + + if (!Result.Success) + { + return {.Reason = "Failed to upload payload", .Success = false}; + } + } + + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue); + } + + return {.Success = Result.Success}; + } + } } catch (std::exception& e) { - m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'", - PayloadKey.CacheKey.Bucket, - PayloadKey.CacheKey.Hash, - PayloadKey.PayloadId, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + return {.Reason = std::string(e.what()), .Success = false}; } } - return {}; - } + private: + std::string m_DisplayName; + RefPtr<CloudCacheClient> m_Client; + }; - virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint { - if (m_IsRunning.load()) + public: + ZenUpstreamEndpoint(std::string_view ServiceUrl) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); - return {.Success = true}; + using namespace fmt::literals; + m_DisplayName = "Zen - '{}'"_format(ServiceUrl); + m_Client = new ZenStructuredCacheClient(ServiceUrl); } - return {}; - } + ~ZenUpstreamEndpoint() = default; -private: - void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) - { - const uint32_t MaxAttempts = 3; + virtual bool Initialize() override + { + //TODO: Test and authenticate Zen client connection + return !m_Client->ServiceUrl().empty(); + } - if (m_CloudClient && m_CloudClient->IsValid()) + virtual std::string_view DisplayName() const override { return m_DisplayName; } + + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - CloudCacheSession Session(m_CloudClient); - ZenCacheValue CacheValue; - if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + try { - m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash); - return; + ZenStructuredCacheSession Session(*m_Client); + const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + return {.Value = Result.Response, .Success = Result.Success}; } - - if (CacheRecord.Type == ZenContentType::kBinary) + catch (std::exception& e) { - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); - } + return {.Reason = std::string(e.what()), .Success = false}; + } + } - if (!Result.Success) - { - m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - MaxAttempts); - } + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + try + { + ZenStructuredCacheSession Session(*m_Client); + const ZenCacheResult Result = + Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + return {.Value = Result.Response, .Success = Result.Success}; } - else + catch (std::exception& e) { - ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject); + return {.Reason = std::string(e.what()), .Success = false}; + } + } + + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) override + { + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + const uint32_t MaxAttempts = 3; - CloudCacheResult Result; - for (const IoHash& PayloadId : CacheRecord.PayloadIds) + try + { + zen::ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result; + + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { Result.Success = false; - if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCompressedBlob(PayloadId, Payload); - } + Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + CacheRecord.PayloadIds[Idx], + Payloads[Idx]); } if (!Result.Success) { - m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts); - break; + return {.Reason = "Failed to upload payload", .Success = false}; } } - if (Result.Success) + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); - } + Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); } - if (!Result.Success) - { - m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); - } + return {.Success = Result.Success}; + } + catch (std::exception& e) + { + return {.Reason = std::string(e.what()), .Success = false}; + } + } + + private: + std::string m_DisplayName; + RefPtr<zen::ZenStructuredCacheClient> m_Client; + }; + +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +class DefaultUpstreamCache final : public UpstreamCache +{ +public: + DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) + : m_Log(zen::logging::Get("upstream")) + , m_Options(Options) + , m_CacheStore(CacheStore) + , m_CidStore(CidStore) + { + ZEN_ASSERT(m_Options.ThreadCount > 0); + } + + virtual ~DefaultUpstreamCache() { Shutdown(); } + + virtual bool Initialize() override + { + auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) { + const bool Ok = Endpoint->Initialize(); + m_Log.info("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED"); + return !Ok; + }); + + m_Endpoints.erase(NewEnd, std::end(m_Endpoints)); + m_IsRunning = !m_Endpoints.empty(); + + if (m_IsRunning) + { + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + { + m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); + } + } + + return m_IsRunning; + } + + virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } + + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + { + for (auto& Endpoint : m_Endpoints) + { + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + return Result; + } + } + + return {}; + } + + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + for (auto& Endpoint : m_Endpoints) + { + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + return Result; + } + } + + return {}; + } + + virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + { + if (m_IsRunning.load()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + return {.Success = true}; + } + + return {}; + } + +private: + void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + { + ZenCacheValue CacheValue; + std::vector<IoBuffer> Payloads; + + if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + { + m_Log.warn("process upstream FAILED, '{}/{}', cache record doesn't exist", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash); + return; + } + + for (const IoHash& PayloadId : CacheRecord.PayloadIds) + { + if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + { + Payloads.push_back(Payload); + } + else + { + m_Log.warn("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + PayloadId); + return; } } + + for (auto& Endpoint : m_Endpoints) + { + Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + } } void ProcessUpstreamQueue() @@ -320,20 +452,20 @@ private: } m_UpstreamThreads.clear(); + m_Endpoints.clear(); } } using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; - spdlog::logger& m_Log; - UpstreamCacheOptions m_Options; - ::ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; - UpstreamQueue m_UpstreamQueue; - RefPtr<CloudCacheClient> m_CloudClient; - RefPtr<ZenStructuredCacheClient> m_ZenClient; - std::vector<std::thread> m_UpstreamThreads; - std::atomic_bool m_IsRunning{false}; + spdlog::logger& m_Log; + UpstreamCacheOptions m_Options; + ::ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + UpstreamQueue m_UpstreamQueue; + std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints; + std::vector<std::thread> m_UpstreamThreads; + std::atomic_bool m_IsRunning{false}; }; ////////////////////////////////////////////////////////////////////////// @@ -344,4 +476,16 @@ MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheSto return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore); } +std::unique_ptr<UpstreamEndpoint> +MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) +{ + return std::make_unique<detail::JupiterUpstreamEndpoint>(Options); +} + +std::unique_ptr<UpstreamEndpoint> +MakeZenUpstreamEndpoint(std::string_view Url) +{ + return std::make_unique<detail::ZenUpstreamEndpoint>(Url); +} + } // namespace zen diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 23a542151..d8359bc2c 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -13,6 +13,7 @@ class ZenCacheStore; namespace zen { class CidStore; +struct CloudCacheClientOptions; struct UpstreamCacheKey { @@ -35,14 +36,41 @@ struct UpstreamCacheRecord struct UpstreamCacheOptions { - uint32_t ThreadCount = 4; - bool JupiterEnabled = true; - std::string_view JupiterEndpoint; - std::string_view JupiterDdcNamespace; - std::string_view JupiterBlobStoreNamespace; - std::string_view JupiterOAuthProvider; - std::string_view JupiterOAuthClientId; - std::string_view JupiterOAuthSecret; + uint32_t ThreadCount = 4; +}; + +struct GetUpstreamCacheResult +{ + IoBuffer Value; + std::string Reason; + bool Success = false; +}; + +struct PutUpstreamCacheResult +{ + std::string Reason; + bool Success; +}; + +/** + * The upstream endpont is responsible for handling upload/downloading of cache records. + */ +class UpstreamEndpoint +{ +public: + virtual ~UpstreamEndpoint() = default; + + virtual bool Initialize() = 0; + + virtual std::string_view DisplayName() const = 0; + + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) = 0; }; /** @@ -53,21 +81,13 @@ class UpstreamCache public: virtual ~UpstreamCache() = default; - struct GetCacheRecordResult - { - IoBuffer Value; - bool Success = false; - }; + virtual bool Initialize() = 0; - virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; - struct GetCachePayloadResult - { - IoBuffer Payload; - bool Success = false; - }; + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; - virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; struct EnqueueResult { @@ -79,4 +99,8 @@ public: std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore); +std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options); + +std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::string_view Url); + } // namespace zen diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index e9102ad45..3d4999e5d 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -8,6 +8,7 @@ #include <zencore/stream.h> #include "cache/structuredcachestore.h" +#include "diag/logging.h" // cpr //////////////////////////////////////////////////////////////////// // @@ -20,7 +21,6 @@ #include <cpr/cpr.h> #pragma warning(pop) -#include <spdlog/spdlog.h> #include <xxhash.h> #include <gsl/gsl-lite.hpp> @@ -322,6 +322,45 @@ namespace detail { ZenStructuredCacheClient& OwnerClient; cpr::Session Session; }; + + static void LogResponse(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) + { + using namespace std::literals; + + std::string_view ContentType = "unknown"sv; + if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) + { + ContentType = It->second; + } + + const uint64_t Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes; + + const bool IsBinary = + ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream"; + + if (IsBinary) + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Bytes, + Response.reason); + } + else + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Response.text, + Response.reason); + } + } } // namespace detail ////////////////////////////////////////////////////////////////////////// @@ -364,50 +403,99 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) ////////////////////////////////////////////////////////////////////////// -ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) : m_Client(OuterClient) +using namespace std::literals; + +ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) +: m_Log(zen::logging::Get("zenclient"sv)) +, m_Client(OuterClient) { + m_SessionState = m_Client.AllocSessionState(); } ZenStructuredCacheSession::~ZenStructuredCacheSession() { + m_Client.FreeSessionState(m_SessionState); } -IoBuffer -ZenStructuredCacheSession::Get(std::string_view BucketId, std::string_view Key) +ZenCacheResult +ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type) { - ZEN_UNUSED(BucketId, Key); + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + + cpr::Response Response = Session.Get(); + detail::LogResponse(m_Log, "GET"sv, Response); + + if (Response.status_code == 200) + { + return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true}; + } return {}; } -void -ZenStructuredCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data) +ZenCacheResult +ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId) { - ZEN_UNUSED(BucketId, Key, Data); -} + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); -// Structured cache operations + cpr::Session& Session = m_SessionState->Session; -IoBuffer -ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key) -{ - ZEN_UNUSED(BucketId, Key); + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); + + cpr::Response Response = Session.Get(); + detail::LogResponse(m_Log, "GET"sv, Response); + + if (Response.status_code == 200) + { + return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true}; + } return {}; } -IoBuffer -ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key, const IoHash& ContentId) +ZenCacheResult +ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type) { - ZEN_UNUSED(BucketId, Key, ContentId); + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); - return {}; + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader( + cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()}); + + cpr::Response Response = Session.Put(); + detail::LogResponse(m_Log, "PUT"sv, Response); + + return {.Success = Response.status_code == 200}; } -void -ZenStructuredCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data) +ZenCacheResult +ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload) { - ZEN_UNUSED(BucketId, Key, Data); + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}}); + Session.SetBody(cpr::Body{static_cast<const char*>(Payload.Data()), Payload.Size()}); + + cpr::Response Response = Session.Put(); + detail::LogResponse(m_Log, "PUT"sv, Response); + + return {.Success = Response.status_code == 200}; } } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 1d7d5752e..5df6da4a3 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -20,6 +20,10 @@ struct ZenCacheValue; +namespace spdlog { +class logger; +} + namespace zen { class CbObjectWriter; @@ -81,6 +85,12 @@ namespace detail { struct ZenCacheSessionState; } +struct ZenCacheResult +{ + IoBuffer Response; + bool Success = false; +}; + /** Zen Structured Cache session * * This provides a context in which cache queries can be performed @@ -93,16 +103,13 @@ public: ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient); ~ZenStructuredCacheSession(); - // Key-value cache operations - IoBuffer Get(std::string_view BucketId, std::string_view Key); - void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data); - - // Structured cache operations - IoBuffer Get(std::string_view BucketId, const IoHash& Key); - IoBuffer Get(std::string_view BucketId, const IoHash& Key, const IoHash& ContentId); - void Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data); + ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); + ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); + ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); + ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload); private: + spdlog::logger& m_Log; ZenStructuredCacheClient& m_Client; detail::ZenCacheSessionState* m_SessionState; }; @@ -118,6 +125,8 @@ public: ZenStructuredCacheClient(std::string_view ServiceUrl); ~ZenStructuredCacheClient(); + std::string_view ServiceUrl() const { return m_ServiceUrl; } + private: std::string m_ServiceUrl; diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 395ae5fc2..5f8b23efa 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -140,25 +140,69 @@ public: if (ServiceConfig.StructuredCacheEnabled) { + using namespace std::literals; + auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; + spdlog::info("instantiating structured cache service"); m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); std::unique_ptr<zen::UpstreamCache> UpstreamCache; - if (ServiceConfig.UpstreamCacheEnabled) + if (ServiceConfig.UpstreamCacheConfig.Enabled) { - using namespace std::literals; + const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; zen::UpstreamCacheOptions UpstreamOptions; - UpstreamOptions.ThreadCount = 4; - UpstreamOptions.JupiterEnabled = true; - UpstreamOptions.JupiterEndpoint = "https://jupiter.devtools-dev.epicgames.com"sv; - UpstreamOptions.JupiterDdcNamespace = "ue4.ddc"sv; - UpstreamOptions.JupiterBlobStoreNamespace = "test.ddc"sv; - UpstreamOptions.JupiterOAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv; - UpstreamOptions.JupiterOAuthClientId = "0oao91lrhqPiAlaGD0x7"sv; - UpstreamOptions.JupiterOAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv; + + if (UpstreamConfig.UpstreamThreadCount > 0 && UpstreamConfig.UpstreamThreadCount < 32) + { + UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); + } UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); + + if (!UpstreamConfig.ZenConfig.Url.empty()) + { + std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Url); + UpstreamCache->AddEndpoint(std::move(ZenEndpoint)); + } + + { + zen::CloudCacheClientOptions Options; + if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) + { + Options = zen::CloudCacheClientOptions{ + .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, + .DdcNamespace = "ue4.ddc"sv, + .BlobStoreNamespace = "test.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + }; + } + + Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); + Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace); + Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace); + Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider); + Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId); + Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret); + + if (!Options.ServiceUrl.empty()) + { + std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); + UpstreamCache->AddEndpoint(std::move(JupiterEndpoint)); + } + } + + if (UpstreamCache->Initialize()) + { + spdlog::info("upstream cache active"); + } + else + { + UpstreamCache.reset(); + spdlog::info("upstream cache NOT active"); + } } m_StructuredCacheService.reset( |