aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-09-03 15:37:19 +0200
committerGitHub <[email protected]>2021-09-03 15:37:19 +0200
commitc04fa527593da17c719c4d899ec19caeb6480a94 (patch)
treefa65fd7585d55712bad546d76a0fc3518023a905
parentoops: Fixed AssertException implementation namespace (diff)
downloadzen-c04fa527593da17c719c4d899ec19caeb6480a94.tar.xz
zen-c04fa527593da17c719c4d899ec19caeb6480a94.zip
Zen upstream support (#7)
-rw-r--r--zenserver/cache/kvcache.cpp15
-rw-r--r--zenserver/cache/structuredcache.cpp36
-rw-r--r--zenserver/config.cpp130
-rw-r--r--zenserver/config.h36
-rw-r--r--zenserver/diag/logging.cpp4
-rw-r--r--zenserver/upstream/jupiter.cpp38
-rw-r--r--zenserver/upstream/jupiter.h19
-rw-r--r--zenserver/upstream/upstreamcache.cpp416
-rw-r--r--zenserver/upstream/upstreamcache.h64
-rw-r--r--zenserver/upstream/zen.cpp130
-rw-r--r--zenserver/upstream/zen.h25
-rw-r--r--zenserver/zenserver.cpp64
12 files changed, 712 insertions, 265 deletions
diff --git a/zenserver/cache/kvcache.cpp b/zenserver/cache/kvcache.cpp
index a044b9dd0..d78329ee9 100644
--- a/zenserver/cache/kvcache.cpp
+++ b/zenserver/cache/kvcache.cpp
@@ -65,13 +65,14 @@ HttpKvCacheService::AccessTracker::TrackAccess(std::string_view Key)
HttpKvCacheService::HttpKvCacheService()
{
- m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv,
- "ue4.ddc"sv /* namespace */,
- "test.ddc"sv /* blob store namespace */,
- "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */,
- "0oao91lrhqPiAlaGD0x7"sv /* client id */,
- "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */);
-
+ m_Cloud = new CloudCacheClient(CloudCacheClientOptions{
+ .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
+ .DdcNamespace = "ue4.ddc"sv,
+ .BlobStoreNamespace = "test.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ });
m_AccessTracker = std::make_unique<AccessTracker>();
}
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 00b3b545b..0e235a9be 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -149,7 +149,8 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
case kGet:
{
ZenCacheValue Value;
- bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
+ bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
+ bool InUpstreamCache = false;
if (!Success && m_UpstreamCache)
{
@@ -159,8 +160,9 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
UpstreamResult.Success)
{
- Value.Value = UpstreamResult.Value;
- Success = true;
+ Value.Value = UpstreamResult.Value;
+ Success = true;
+ InUpstreamCache = true;
if (CacheRecordType == ZenContentType::kCbObject)
{
@@ -215,11 +217,12 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Request.SetSuppressResponseBody();
}
- m_Log.debug("HIT - '{}/{}' ({} bytes, {})",
+ m_Log.debug("HIT - '{}/{}' ({} bytes {}) ({})",
Ref.BucketSegment,
Ref.HashKey,
Value.Value.Size(),
- Value.Value.GetContentType());
+ Value.Value.GetContentType(),
+ InUpstreamCache ? "upstream" : "local");
return Request.WriteResponse(zen::HttpResponse::OK, MapToHttpContentType(Value.Value.GetContentType()), Value.Value);
}
@@ -267,7 +270,6 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
{
auto Result = m_UpstreamCache->EnqueueUpstream(
{.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}});
- ZEN_ASSERT(Result.Success);
}
return Request.WriteResponse(zen::HttpResponse::Created);
@@ -332,7 +334,6 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
.CacheKey = {Ref.BucketSegment, Ref.HashKey},
.PayloadIds = std::move(References)});
- ZEN_ASSERT(Result.Success);
}
return Request.WriteResponse(zen::HttpResponse::Created);
@@ -379,18 +380,20 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
case kHead:
case kGet:
{
- zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
+ zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
+ bool InUpstreamCache = false;
if (!Payload && m_UpstreamCache)
{
if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
UpstreamResult.Success)
{
- if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Payload)))
+ if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
- Payload = UpstreamResult.Payload;
+ Payload = UpstreamResult.Value;
zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Payload);
zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash);
+ InUpstreamCache = true;
m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
}
@@ -403,22 +406,17 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
if (!Payload)
{
- m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})",
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.PayloadId,
- Payload.Size(),
- Payload.GetContentType());
-
+ m_Log.debug("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId);
return Request.WriteResponse(zen::HttpResponse::NotFound);
}
- m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {})",
+ m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {}) ({})",
Ref.BucketSegment,
Ref.HashKey,
Ref.PayloadId,
Payload.Size(),
- Payload.GetContentType());
+ Payload.GetContentType(),
+ InUpstreamCache ? "upstream" : "local");
if (Verb == kHead)
{
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 6e8b48703..8fe9f72f2 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -97,10 +97,74 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_option("cache",
"",
- "enable-upstream-cache",
- "Whether upstream caching is enabled",
- cxxopts::value<bool>(ServiceConfig.UpstreamCacheEnabled)->default_value("false"),
+ "upstream-jupiter-url",
+ "URL to a Jupiter instance",
+ cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.Url)->default_value(""),
"");
+
+ options.add_option("cache",
+ "",
+ "upstream-jupiter-oauth-url",
+ "URL to the OAuth provier",
+ cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthProvider)->default_value(""),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-jupiter-oauth-clientid",
+ "The OAuth client ID",
+ cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthClientId)->default_value(""),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-jupiter-oauth-clientsecret",
+ "The OAuth client secret",
+ cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthClientSecret)->default_value(""),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-jupiter-namespace",
+ "The Common Blob Store API namespace",
+ cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.Namespace)->default_value(""),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-jupiter-namespace-ddc",
+ "The lecacy DDC namespace",
+ cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.DdcNamespace)->default_value(""),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-jupiter-dev",
+ "Enable Jupiter upstream caching using development settings",
+ cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseDevelopmentSettings)->default_value("false"),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-zen-url",
+ "URL to a remote Zen server instance",
+ cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Url)->default_value(""),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-enabled",
+ "Whether upstream caching is disabled",
+ cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.Enabled)->default_value("true"),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-thread-count",
+ "Number of threads used for upstream procsssing",
+ cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"),
+ "");
+
try
{
auto result = options.parse(argc, argv);
@@ -171,10 +235,60 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv
throw std::exception("fatal zen global config script ({}) failure: {}"_format(ConfigScript, e.what()).c_str());
}
- ServiceConfig.LegacyCacheEnabled = lua["legacycache"]["enable"].get_or(ServiceConfig.LegacyCacheEnabled);
- const std::string path = lua["legacycache"]["readpath"].get_or(std::string());
- ServiceConfig.StructuredCacheEnabled = lua["structuredcache"]["enable"].get_or(ServiceConfig.StructuredCacheEnabled);
- ServiceConfig.MeshEnabled = lua["mesh"]["enable"].get_or(ServiceConfig.MeshEnabled);
- ServiceConfig.UpstreamCacheEnabled = lua["structuredcache"]["upstream"]["enable"].get_or(ServiceConfig.UpstreamCacheEnabled);
+ ServiceConfig.LegacyCacheEnabled = lua["legacycache"]["enable"].get_or(ServiceConfig.LegacyCacheEnabled);
+ const std::string path = lua["legacycache"]["readpath"].get_or(std::string());
+ ServiceConfig.MeshEnabled = lua["mesh"]["enable"].get_or(ServiceConfig.MeshEnabled);
+
+ auto UpdateStringValueFromConfig = [](const sol::table& Table, std::string_view Key, std::string& OutValue) {
+ // Update the specified config value unless it has been set, i.e. from command line
+ if (auto MaybeValue = Table.get<sol::optional<std::string>>(Key); MaybeValue.has_value() && OutValue.empty())
+ {
+ OutValue = MaybeValue.value();
+ }
+ };
+
+ if (sol::optional<sol::table> StructuredCacheConfig = lua["structuredcache"])
+ {
+ ServiceConfig.StructuredCacheEnabled = StructuredCacheConfig->get_or("enable", ServiceConfig.StructuredCacheEnabled);
+
+ if (auto UpstreamConfig = StructuredCacheConfig->get<sol::optional<sol::table>>("upstream"))
+ {
+ ServiceConfig.UpstreamCacheConfig.Enabled = UpstreamConfig->get_or("enable", ServiceConfig.UpstreamCacheConfig.Enabled);
+ ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount = UpstreamConfig->get_or("upstreamthreadcount", 4);
+
+ if (auto JupiterConfig = UpstreamConfig->get<sol::optional<sol::table>>("jupiter"))
+ {
+ UpdateStringValueFromConfig(JupiterConfig.value(),
+ std::string_view("url"),
+ ServiceConfig.UpstreamCacheConfig.JupiterConfig.Url);
+ UpdateStringValueFromConfig(JupiterConfig.value(),
+ std::string_view("oauthprovider"),
+ ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthProvider);
+ UpdateStringValueFromConfig(JupiterConfig.value(),
+ std::string_view("oauthclientid"),
+ ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthClientId);
+ UpdateStringValueFromConfig(JupiterConfig.value(),
+ std::string_view("oauthclientsecret"),
+ ServiceConfig.UpstreamCacheConfig.JupiterConfig.OAuthClientSecret);
+ UpdateStringValueFromConfig(JupiterConfig.value(),
+ std::string_view("namespace"),
+ ServiceConfig.UpstreamCacheConfig.JupiterConfig.Namespace);
+ UpdateStringValueFromConfig(JupiterConfig.value(),
+ std::string_view("ddcnamespace"),
+ ServiceConfig.UpstreamCacheConfig.JupiterConfig.DdcNamespace);
+
+ ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseDevelopmentSettings =
+ JupiterConfig->get_or("usedevelopmentsettings",
+ ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseDevelopmentSettings);
+ };
+
+ if (auto ZenConfig = UpstreamConfig->get<sol::optional<sol::table>>("zen"))
+ {
+ UpdateStringValueFromConfig(ZenConfig.value(),
+ std::string_view("url"),
+ ServiceConfig.UpstreamCacheConfig.ZenConfig.Url);
+ }
+ }
+ }
}
}
diff --git a/zenserver/config.h b/zenserver/config.h
index 04498a520..d0f5f7280 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -16,14 +16,38 @@ struct ZenServerOptions
std::filesystem::path DataDir; // Root directory for state (used for testing)
};
+struct ZenUpstreamJupiterConfig
+{
+ std::string Url;
+ std::string OAuthProvider;
+ std::string OAuthClientId;
+ std::string OAuthClientSecret;
+ std::string Namespace;
+ std::string DdcNamespace;
+ bool UseDevelopmentSettings;
+};
+
+struct ZenUpstreamZenConfig
+{
+ std::string Url;
+};
+
+struct ZenUpstreamCacheConfig
+{
+ ZenUpstreamJupiterConfig JupiterConfig;
+ ZenUpstreamZenConfig ZenConfig;
+ int UpstreamThreadCount = 4;
+ bool Enabled = false;
+};
+
struct ZenServiceConfig
{
- bool LegacyCacheEnabled = false;
- bool StructuredCacheEnabled = true;
- bool UpstreamCacheEnabled = false;
- bool ShouldCrash = false; // Option for testing crash handling
- bool MeshEnabled = false; // Experimental p2p mesh discovery
- std::string FlockId; // Id for grouping test instances into sets
+ bool LegacyCacheEnabled = false;
+ bool StructuredCacheEnabled = true;
+ bool ShouldCrash = false; // Option for testing crash handling
+ bool MeshEnabled = false; // Experimental p2p mesh discovery
+ std::string FlockId; // Id for grouping test instances into sets
+ ZenUpstreamCacheConfig UpstreamCacheConfig;
};
void ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig);
diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp
index 23933f12e..89ed4ac6a 100644
--- a/zenserver/diag/logging.cpp
+++ b/zenserver/diag/logging.cpp
@@ -210,6 +210,10 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
spdlog::register_logger(JupiterLogger);
JupiterLogger->set_level(LogLevel);
+ auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink);
+ spdlog::register_logger(ZenClientLogger);
+ ZenClientLogger->set_level(LogLevel);
+
spdlog::set_level(LogLevel);
spdlog::set_formatter(std::make_unique<logging::full_formatter>(GlobalOptions.LogId, std::chrono::system_clock::now()));
}
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index a59b9d317..7e22b9af9 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -52,7 +52,7 @@ namespace detail {
cpr::Session Session;
};
- void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response)
+ static void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response)
{
std::string_view ContentType = "unknown"sv;
if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
@@ -299,24 +299,18 @@ CloudCacheAccessToken::SetToken(std::string_view Token)
// OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d
//
-CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
- std::string_view DdcNamespace,
- std::string_view BlobStoreNamespace,
- std::string_view OAuthProvider,
- std::string_view OAuthClientId,
- std::string_view OAuthSecret)
+CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options)
: m_Log(zen::logging::Get("jupiter"))
-, m_ServiceUrl(ServiceUrl)
-, m_OAuthFullUri(OAuthProvider)
-, m_DdcNamespace(DdcNamespace)
-, m_BlobStoreNamespace(BlobStoreNamespace)
-, m_DefaultBucket("default")
-, m_OAuthClientId(OAuthClientId)
-, m_OAuthSecret(OAuthSecret)
+, m_ServiceUrl(Options.ServiceUrl)
+, m_OAuthFullUri(Options.OAuthProvider)
+, m_DdcNamespace(Options.DdcNamespace)
+, m_BlobStoreNamespace(Options.BlobStoreNamespace)
+, m_OAuthClientId(Options.OAuthClientId)
+, m_OAuthSecret(Options.OAuthSecret)
{
- if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv))
+ if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv))
{
- m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str());
+ m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(Options.OAuthProvider).c_str());
m_IsValid = false;
return;
@@ -324,28 +318,28 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
// Split into host and Uri substrings
- auto SchemePos = OAuthProvider.find("://"sv);
+ auto SchemePos = Options.OAuthProvider.find("://"sv);
if (SchemePos == std::string::npos)
{
- m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl);
+ m_Log.warn("Bad service URL passed to cloud cache client: '{}'", Options.ServiceUrl);
m_IsValid = false;
return;
}
- auto DomainEnd = OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3);
+ auto DomainEnd = Options.OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3);
if (DomainEnd == std::string::npos)
{
- m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl);
+ m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", Options.ServiceUrl);
m_IsValid = false;
return;
}
- m_OAuthDomain = OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com
- m_OAuthUriPath = OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token
+ m_OAuthDomain = Options.OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com
+ m_OAuthUriPath = Options.OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token
}
CloudCacheClient::~CloudCacheClient()
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 61d1bd99c..ba5e0a65a 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -78,24 +78,28 @@ private:
detail::CloudCacheSessionState* m_SessionState;
};
+struct CloudCacheClientOptions
+{
+ std::string_view ServiceUrl;
+ std::string_view DdcNamespace;
+ std::string_view BlobStoreNamespace;
+ std::string_view OAuthProvider;
+ std::string_view OAuthClientId;
+ std::string_view OAuthSecret;
+};
+
/**
* Jupiter upstream cache client
*/
class CloudCacheClient : public RefCounted
{
public:
- CloudCacheClient(std::string_view ServiceUrl,
- std::string_view DdcNamespace,
- std::string_view BlobStoreNamespace,
- std::string_view OAuthProvider,
- std::string_view OAuthClientId,
- std::string_view OAuthSecret);
+ CloudCacheClient(const CloudCacheClientOptions& Options);
~CloudCacheClient();
bool AcquireAccessToken(std::string& AuthorizationHeaderValue);
std::string_view DdcNamespace() const { return m_DdcNamespace; }
std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
- std::string_view DefaultBucket() const { return m_DefaultBucket; }
std::string_view ServiceUrl() const { return m_ServiceUrl; }
bool IsValid() const { return m_IsValid; }
@@ -109,7 +113,6 @@ private:
std::string m_OAuthFullUri;
std::string m_DdcNamespace;
std::string m_BlobStoreNamespace;
- std::string m_DefaultBucket;
std::string m_OAuthClientId;
std::string m_OAuthSecret;
CloudCacheAccessToken m_AccessToken;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index ecd51a706..40d7ebd26 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -12,6 +12,9 @@
#include "cache/structuredcachestore.h"
#include "diag/logging.h"
+#include <fmt/format.h>
+
+#include <algorithm>
#include <atomic>
#include <deque>
#include <thread>
@@ -83,70 +86,32 @@ namespace detail {
std::atomic_bool m_CompleteAdding{false};
};
-} // namespace detail
-
-//////////////////////////////////////////////////////////////////////////
-
-class DefaultUpstreamCache final : public UpstreamCache
-{
-public:
- DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
- : m_Log(zen::logging::Get("upstream"))
- , m_Options(Options)
- , m_CacheStore(CacheStore)
- , m_CidStore(CidStore)
+ class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint
{
- if (m_Options.JupiterEnabled)
+ public:
+ JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
{
- m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint,
- m_Options.JupiterDdcNamespace,
- m_Options.JupiterBlobStoreNamespace,
- m_Options.JupiterOAuthProvider,
- m_Options.JupiterOAuthClientId,
- m_Options.JupiterOAuthSecret);
-
- std::string TmpAuthHeader;
- if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid())
- {
- m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'",
- m_Options.JupiterEndpoint,
- m_Options.JupiterDdcNamespace,
- m_Options.JupiterBlobStoreNamespace);
- }
- else
- {
- m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint);
- }
+ using namespace fmt::literals;
+ m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
+ m_Client = new CloudCacheClient(Options);
}
- m_IsRunning = m_CloudClient && m_CloudClient->IsValid();
+ virtual ~JupiterUpstreamEndpoint() = default;
- if (m_IsRunning)
+ virtual bool Initialize() override
{
- m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount);
- for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
- {
- m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
- }
+ //TODO: Test and authenticate Jupiter client connection
+ return !m_Client->ServiceUrl().empty();
}
- else
- {
- m_Log.warn("upstream disabled, no valid endpoints");
- }
- }
- virtual ~DefaultUpstreamCache() { Shutdown(); }
+ virtual std::string_view DisplayName() const override { return m_DisplayName; }
- virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
- {
- if (m_CloudClient && m_CloudClient->IsValid())
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
- zen::Stopwatch Timer;
-
try
{
- CloudCacheSession Session(m_CloudClient);
- CloudCacheResult Result;
+ zen::CloudCacheSession Session(m_Client);
+ CloudCacheResult Result;
if (Type == ZenContentType::kBinary)
{
@@ -161,126 +126,293 @@ public:
}
catch (std::exception& e)
{
- m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'",
- CacheKey.Bucket,
- CacheKey.Hash,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ return {.Reason = std::string(e.what()), .Success = false};
}
}
- return {};
- }
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ try
+ {
+ zen::CloudCacheSession Session(m_Client);
+ const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ return {.Value = Result.Value, .Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
- virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
- {
- if (m_CloudClient && m_CloudClient->IsValid())
+ virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
+ IoBuffer RecordValue,
+ std::span<IoBuffer const> Payloads) override
{
- zen::Stopwatch Timer;
+ ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ const uint32_t MaxAttempts = 3;
try
{
- CloudCacheSession Session(m_CloudClient);
+ CloudCacheSession Session(m_Client);
+
+ if (CacheRecord.Type == ZenContentType::kBinary)
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue);
+ }
- CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- return {.Payload = Result.Value, .Success = Result.Success};
+ return {.Success = Result.Success};
+ }
+ else
+ {
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ }
+
+ if (!Result.Success)
+ {
+ return {.Reason = "Failed to upload payload", .Success = false};
+ }
+ }
+
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue);
+ }
+
+ return {.Success = Result.Success};
+ }
+ }
}
catch (std::exception& e)
{
- m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'",
- PayloadKey.CacheKey.Bucket,
- PayloadKey.CacheKey.Hash,
- PayloadKey.PayloadId,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ return {.Reason = std::string(e.what()), .Success = false};
}
}
- return {};
- }
+ private:
+ std::string m_DisplayName;
+ RefPtr<CloudCacheClient> m_Client;
+ };
- virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint
{
- if (m_IsRunning.load())
+ public:
+ ZenUpstreamEndpoint(std::string_view ServiceUrl)
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
- return {.Success = true};
+ using namespace fmt::literals;
+ m_DisplayName = "Zen - '{}'"_format(ServiceUrl);
+ m_Client = new ZenStructuredCacheClient(ServiceUrl);
}
- return {};
- }
+ ~ZenUpstreamEndpoint() = default;
-private:
- void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
- {
- const uint32_t MaxAttempts = 3;
+ virtual bool Initialize() override
+ {
+ //TODO: Test and authenticate Zen client connection
+ return !m_Client->ServiceUrl().empty();
+ }
- if (m_CloudClient && m_CloudClient->IsValid())
+ virtual std::string_view DisplayName() const override { return m_DisplayName; }
+
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
- CloudCacheSession Session(m_CloudClient);
- ZenCacheValue CacheValue;
- if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue))
+ try
{
- m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash);
- return;
+ ZenStructuredCacheSession Session(*m_Client);
+ const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
+ return {.Value = Result.Response, .Success = Result.Success};
}
-
- if (CacheRecord.Type == ZenContentType::kBinary)
+ catch (std::exception& e)
{
- CloudCacheResult Result;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
- }
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
- if (!Result.Success)
- {
- m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- MaxAttempts);
- }
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ try
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ const ZenCacheResult Result =
+ Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
+ return {.Value = Result.Response, .Success = Result.Success};
}
- else
+ catch (std::exception& e)
{
- ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject);
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
+
+ virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
+ IoBuffer RecordValue,
+ std::span<IoBuffer const> Payloads) override
+ {
+ ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ const uint32_t MaxAttempts = 3;
- CloudCacheResult Result;
- for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ try
+ {
+ zen::ZenStructuredCacheSession Session(*m_Client);
+ ZenCacheResult Result;
+
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
{
Result.Success = false;
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutCompressedBlob(PayloadId, Payload);
- }
+ Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ CacheRecord.PayloadIds[Idx],
+ Payloads[Idx]);
}
if (!Result.Success)
{
- m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts);
- break;
+ return {.Reason = "Failed to upload payload", .Success = false};
}
}
- if (Result.Success)
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
- }
+ Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
}
- if (!Result.Success)
- {
- m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
- }
+ return {.Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
+
+ private:
+ std::string m_DisplayName;
+ RefPtr<zen::ZenStructuredCacheClient> m_Client;
+ };
+
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+class DefaultUpstreamCache final : public UpstreamCache
+{
+public:
+ DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_Options(Options)
+ , m_CacheStore(CacheStore)
+ , m_CidStore(CidStore)
+ {
+ ZEN_ASSERT(m_Options.ThreadCount > 0);
+ }
+
+ virtual ~DefaultUpstreamCache() { Shutdown(); }
+
+ virtual bool Initialize() override
+ {
+ auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) {
+ const bool Ok = Endpoint->Initialize();
+ m_Log.info("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED");
+ return !Ok;
+ });
+
+ m_Endpoints.erase(NewEnd, std::end(m_Endpoints));
+ m_IsRunning = !m_Endpoints.empty();
+
+ if (m_IsRunning)
+ {
+ for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
+ {
+ m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
+ }
+ }
+
+ return m_IsRunning;
+ }
+
+ virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
+
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ {
+ return Result;
+ }
+ }
+
+ return {};
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ {
+ return Result;
+ }
+ }
+
+ return {};
+ }
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ {
+ if (m_IsRunning.load())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ return {.Success = true};
+ }
+
+ return {};
+ }
+
+private:
+ void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
+ {
+ ZenCacheValue CacheValue;
+ std::vector<IoBuffer> Payloads;
+
+ if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue))
+ {
+ m_Log.warn("process upstream FAILED, '{}/{}', cache record doesn't exist",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash);
+ return;
+ }
+
+ for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ {
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ {
+ Payloads.push_back(Payload);
+ }
+ else
+ {
+ m_Log.warn("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ PayloadId);
+ return;
}
}
+
+ for (auto& Endpoint : m_Endpoints)
+ {
+ Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ }
}
void ProcessUpstreamQueue()
@@ -320,20 +452,20 @@ private:
}
m_UpstreamThreads.clear();
+ m_Endpoints.clear();
}
}
using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
- spdlog::logger& m_Log;
- UpstreamCacheOptions m_Options;
- ::ZenCacheStore& m_CacheStore;
- CidStore& m_CidStore;
- UpstreamQueue m_UpstreamQueue;
- RefPtr<CloudCacheClient> m_CloudClient;
- RefPtr<ZenStructuredCacheClient> m_ZenClient;
- std::vector<std::thread> m_UpstreamThreads;
- std::atomic_bool m_IsRunning{false};
+ spdlog::logger& m_Log;
+ UpstreamCacheOptions m_Options;
+ ::ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ UpstreamQueue m_UpstreamQueue;
+ std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints;
+ std::vector<std::thread> m_UpstreamThreads;
+ std::atomic_bool m_IsRunning{false};
};
//////////////////////////////////////////////////////////////////////////
@@ -344,4 +476,16 @@ MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheSto
return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore);
}
+std::unique_ptr<UpstreamEndpoint>
+MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
+{
+ return std::make_unique<detail::JupiterUpstreamEndpoint>(Options);
+}
+
+std::unique_ptr<UpstreamEndpoint>
+MakeZenUpstreamEndpoint(std::string_view Url)
+{
+ return std::make_unique<detail::ZenUpstreamEndpoint>(Url);
+}
+
} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 23a542151..d8359bc2c 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -13,6 +13,7 @@ class ZenCacheStore;
namespace zen {
class CidStore;
+struct CloudCacheClientOptions;
struct UpstreamCacheKey
{
@@ -35,14 +36,41 @@ struct UpstreamCacheRecord
struct UpstreamCacheOptions
{
- uint32_t ThreadCount = 4;
- bool JupiterEnabled = true;
- std::string_view JupiterEndpoint;
- std::string_view JupiterDdcNamespace;
- std::string_view JupiterBlobStoreNamespace;
- std::string_view JupiterOAuthProvider;
- std::string_view JupiterOAuthClientId;
- std::string_view JupiterOAuthSecret;
+ uint32_t ThreadCount = 4;
+};
+
+struct GetUpstreamCacheResult
+{
+ IoBuffer Value;
+ std::string Reason;
+ bool Success = false;
+};
+
+struct PutUpstreamCacheResult
+{
+ std::string Reason;
+ bool Success;
+};
+
+/**
+ * The upstream endpont is responsible for handling upload/downloading of cache records.
+ */
+class UpstreamEndpoint
+{
+public:
+ virtual ~UpstreamEndpoint() = default;
+
+ virtual bool Initialize() = 0;
+
+ virtual std::string_view DisplayName() const = 0;
+
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
+
+ virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
+ IoBuffer RecordValue,
+ std::span<IoBuffer const> Payloads) = 0;
};
/**
@@ -53,21 +81,13 @@ class UpstreamCache
public:
virtual ~UpstreamCache() = default;
- struct GetCacheRecordResult
- {
- IoBuffer Value;
- bool Success = false;
- };
+ virtual bool Initialize() = 0;
- virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+ virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
- struct GetCachePayloadResult
- {
- IoBuffer Payload;
- bool Success = false;
- };
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
- virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
struct EnqueueResult
{
@@ -79,4 +99,8 @@ public:
std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore);
+std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options);
+
+std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::string_view Url);
+
} // namespace zen
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index e9102ad45..3d4999e5d 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -8,6 +8,7 @@
#include <zencore/stream.h>
#include "cache/structuredcachestore.h"
+#include "diag/logging.h"
// cpr ////////////////////////////////////////////////////////////////////
//
@@ -20,7 +21,6 @@
#include <cpr/cpr.h>
#pragma warning(pop)
-#include <spdlog/spdlog.h>
#include <xxhash.h>
#include <gsl/gsl-lite.hpp>
@@ -322,6 +322,45 @@ namespace detail {
ZenStructuredCacheClient& OwnerClient;
cpr::Session Session;
};
+
+ static void LogResponse(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response)
+ {
+ using namespace std::literals;
+
+ std::string_view ContentType = "unknown"sv;
+ if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
+ {
+ ContentType = It->second;
+ }
+
+ const uint64_t Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes;
+
+ const bool IsBinary =
+ ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream";
+
+ if (IsBinary)
+ {
+ Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'",
+ Verb,
+ Response.url.str(),
+ Response.status_code,
+ Response.elapsed,
+ ContentType,
+ Bytes,
+ Response.reason);
+ }
+ else
+ {
+ Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'",
+ Verb,
+ Response.url.str(),
+ Response.status_code,
+ Response.elapsed,
+ ContentType,
+ Response.text,
+ Response.reason);
+ }
+ }
} // namespace detail
//////////////////////////////////////////////////////////////////////////
@@ -364,50 +403,99 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
//////////////////////////////////////////////////////////////////////////
-ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) : m_Client(OuterClient)
+using namespace std::literals;
+
+ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient)
+: m_Log(zen::logging::Get("zenclient"sv))
+, m_Client(OuterClient)
{
+ m_SessionState = m_Client.AllocSessionState();
}
ZenStructuredCacheSession::~ZenStructuredCacheSession()
{
+ m_Client.FreeSessionState(m_SessionState);
}
-IoBuffer
-ZenStructuredCacheSession::Get(std::string_view BucketId, std::string_view Key)
+ZenCacheResult
+ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type)
{
- ZEN_UNUSED(BucketId, Key);
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+
+ cpr::Response Response = Session.Get();
+ detail::LogResponse(m_Log, "GET"sv, Response);
+
+ if (Response.status_code == 200)
+ {
+ return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true};
+ }
return {};
}
-void
-ZenStructuredCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data)
+ZenCacheResult
+ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId)
{
- ZEN_UNUSED(BucketId, Key, Data);
-}
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
-// Structured cache operations
+ cpr::Session& Session = m_SessionState->Session;
-IoBuffer
-ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key)
-{
- ZEN_UNUSED(BucketId, Key);
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
+
+ cpr::Response Response = Session.Get();
+ detail::LogResponse(m_Log, "GET"sv, Response);
+
+ if (Response.status_code == 200)
+ {
+ return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true};
+ }
return {};
}
-IoBuffer
-ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key, const IoHash& ContentId)
+ZenCacheResult
+ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type)
{
- ZEN_UNUSED(BucketId, Key, ContentId);
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
- return {};
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(
+ cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()});
+
+ cpr::Response Response = Session.Put();
+ detail::LogResponse(m_Log, "PUT"sv, Response);
+
+ return {.Success = Response.status_code == 200};
}
-void
-ZenStructuredCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data)
+ZenCacheResult
+ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload)
{
- ZEN_UNUSED(BucketId, Key, Data);
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}});
+ Session.SetBody(cpr::Body{static_cast<const char*>(Payload.Data()), Payload.Size()});
+
+ cpr::Response Response = Session.Put();
+ detail::LogResponse(m_Log, "PUT"sv, Response);
+
+ return {.Success = Response.status_code == 200};
}
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 1d7d5752e..5df6da4a3 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -20,6 +20,10 @@
struct ZenCacheValue;
+namespace spdlog {
+class logger;
+}
+
namespace zen {
class CbObjectWriter;
@@ -81,6 +85,12 @@ namespace detail {
struct ZenCacheSessionState;
}
+struct ZenCacheResult
+{
+ IoBuffer Response;
+ bool Success = false;
+};
+
/** Zen Structured Cache session
*
* This provides a context in which cache queries can be performed
@@ -93,16 +103,13 @@ public:
ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient);
~ZenStructuredCacheSession();
- // Key-value cache operations
- IoBuffer Get(std::string_view BucketId, std::string_view Key);
- void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data);
-
- // Structured cache operations
- IoBuffer Get(std::string_view BucketId, const IoHash& Key);
- IoBuffer Get(std::string_view BucketId, const IoHash& Key, const IoHash& ContentId);
- void Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data);
+ ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type);
+ ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId);
+ ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
+ ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload);
private:
+ spdlog::logger& m_Log;
ZenStructuredCacheClient& m_Client;
detail::ZenCacheSessionState* m_SessionState;
};
@@ -118,6 +125,8 @@ public:
ZenStructuredCacheClient(std::string_view ServiceUrl);
~ZenStructuredCacheClient();
+ std::string_view ServiceUrl() const { return m_ServiceUrl; }
+
private:
std::string m_ServiceUrl;
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 395ae5fc2..5f8b23efa 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -140,25 +140,69 @@ public:
if (ServiceConfig.StructuredCacheEnabled)
{
+ using namespace std::literals;
+ auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
+
spdlog::info("instantiating structured cache service");
m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
std::unique_ptr<zen::UpstreamCache> UpstreamCache;
- if (ServiceConfig.UpstreamCacheEnabled)
+ if (ServiceConfig.UpstreamCacheConfig.Enabled)
{
- using namespace std::literals;
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
zen::UpstreamCacheOptions UpstreamOptions;
- UpstreamOptions.ThreadCount = 4;
- UpstreamOptions.JupiterEnabled = true;
- UpstreamOptions.JupiterEndpoint = "https://jupiter.devtools-dev.epicgames.com"sv;
- UpstreamOptions.JupiterDdcNamespace = "ue4.ddc"sv;
- UpstreamOptions.JupiterBlobStoreNamespace = "test.ddc"sv;
- UpstreamOptions.JupiterOAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv;
- UpstreamOptions.JupiterOAuthClientId = "0oao91lrhqPiAlaGD0x7"sv;
- UpstreamOptions.JupiterOAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv;
+
+ if (UpstreamConfig.UpstreamThreadCount > 0 && UpstreamConfig.UpstreamThreadCount < 32)
+ {
+ UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
+ }
UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+
+ if (!UpstreamConfig.ZenConfig.Url.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Url);
+ UpstreamCache->AddEndpoint(std::move(ZenEndpoint));
+ }
+
+ {
+ zen::CloudCacheClientOptions Options;
+ if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
+ {
+ Options = zen::CloudCacheClientOptions{
+ .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
+ .DdcNamespace = "ue4.ddc"sv,
+ .BlobStoreNamespace = "test.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ };
+ }
+
+ Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
+ Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
+ Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
+ Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
+ Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
+ Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
+
+ if (!Options.ServiceUrl.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
+ UpstreamCache->AddEndpoint(std::move(JupiterEndpoint));
+ }
+ }
+
+ if (UpstreamCache->Initialize())
+ {
+ spdlog::info("upstream cache active");
+ }
+ else
+ {
+ UpstreamCache.reset();
+ spdlog::info("upstream cache NOT active");
+ }
}
m_StructuredCacheService.reset(