aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-09-03 15:37:19 +0200
committerGitHub <[email protected]>2021-09-03 15:37:19 +0200
commitc04fa527593da17c719c4d899ec19caeb6480a94 (patch)
treefa65fd7585d55712bad546d76a0fc3518023a905 /zenserver/upstream
parentoops: Fixed AssertException implementation namespace (diff)
downloadzen-c04fa527593da17c719c4d899ec19caeb6480a94.tar.xz
zen-c04fa527593da17c719c4d899ec19caeb6480a94.zip
Zen upstream support (#7)
Diffstat (limited to 'zenserver/upstream')
-rw-r--r--zenserver/upstream/jupiter.cpp38
-rw-r--r--zenserver/upstream/jupiter.h19
-rw-r--r--zenserver/upstream/upstreamcache.cpp416
-rw-r--r--zenserver/upstream/upstreamcache.h64
-rw-r--r--zenserver/upstream/zen.cpp130
-rw-r--r--zenserver/upstream/zen.h25
6 files changed, 477 insertions, 215 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index a59b9d317..7e22b9af9 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -52,7 +52,7 @@ namespace detail {
cpr::Session Session;
};
- void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response)
+ static void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response)
{
std::string_view ContentType = "unknown"sv;
if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
@@ -299,24 +299,18 @@ CloudCacheAccessToken::SetToken(std::string_view Token)
// OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d
//
-CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
- std::string_view DdcNamespace,
- std::string_view BlobStoreNamespace,
- std::string_view OAuthProvider,
- std::string_view OAuthClientId,
- std::string_view OAuthSecret)
+CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options)
: m_Log(zen::logging::Get("jupiter"))
-, m_ServiceUrl(ServiceUrl)
-, m_OAuthFullUri(OAuthProvider)
-, m_DdcNamespace(DdcNamespace)
-, m_BlobStoreNamespace(BlobStoreNamespace)
-, m_DefaultBucket("default")
-, m_OAuthClientId(OAuthClientId)
-, m_OAuthSecret(OAuthSecret)
+, m_ServiceUrl(Options.ServiceUrl)
+, m_OAuthFullUri(Options.OAuthProvider)
+, m_DdcNamespace(Options.DdcNamespace)
+, m_BlobStoreNamespace(Options.BlobStoreNamespace)
+, m_OAuthClientId(Options.OAuthClientId)
+, m_OAuthSecret(Options.OAuthSecret)
{
- if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv))
+ if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv))
{
- m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str());
+ m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(Options.OAuthProvider).c_str());
m_IsValid = false;
return;
@@ -324,28 +318,28 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
// Split into host and Uri substrings
- auto SchemePos = OAuthProvider.find("://"sv);
+ auto SchemePos = Options.OAuthProvider.find("://"sv);
if (SchemePos == std::string::npos)
{
- m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl);
+ m_Log.warn("Bad service URL passed to cloud cache client: '{}'", Options.ServiceUrl);
m_IsValid = false;
return;
}
- auto DomainEnd = OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3);
+ auto DomainEnd = Options.OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3);
if (DomainEnd == std::string::npos)
{
- m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl);
+ m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", Options.ServiceUrl);
m_IsValid = false;
return;
}
- m_OAuthDomain = OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com
- m_OAuthUriPath = OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token
+ m_OAuthDomain = Options.OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com
+ m_OAuthUriPath = Options.OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token
}
CloudCacheClient::~CloudCacheClient()
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 61d1bd99c..ba5e0a65a 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -78,24 +78,28 @@ private:
detail::CloudCacheSessionState* m_SessionState;
};
+struct CloudCacheClientOptions
+{
+ std::string_view ServiceUrl;
+ std::string_view DdcNamespace;
+ std::string_view BlobStoreNamespace;
+ std::string_view OAuthProvider;
+ std::string_view OAuthClientId;
+ std::string_view OAuthSecret;
+};
+
/**
* Jupiter upstream cache client
*/
class CloudCacheClient : public RefCounted
{
public:
- CloudCacheClient(std::string_view ServiceUrl,
- std::string_view DdcNamespace,
- std::string_view BlobStoreNamespace,
- std::string_view OAuthProvider,
- std::string_view OAuthClientId,
- std::string_view OAuthSecret);
+ CloudCacheClient(const CloudCacheClientOptions& Options);
~CloudCacheClient();
bool AcquireAccessToken(std::string& AuthorizationHeaderValue);
std::string_view DdcNamespace() const { return m_DdcNamespace; }
std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
- std::string_view DefaultBucket() const { return m_DefaultBucket; }
std::string_view ServiceUrl() const { return m_ServiceUrl; }
bool IsValid() const { return m_IsValid; }
@@ -109,7 +113,6 @@ private:
std::string m_OAuthFullUri;
std::string m_DdcNamespace;
std::string m_BlobStoreNamespace;
- std::string m_DefaultBucket;
std::string m_OAuthClientId;
std::string m_OAuthSecret;
CloudCacheAccessToken m_AccessToken;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index ecd51a706..40d7ebd26 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -12,6 +12,9 @@
#include "cache/structuredcachestore.h"
#include "diag/logging.h"
+#include <fmt/format.h>
+
+#include <algorithm>
#include <atomic>
#include <deque>
#include <thread>
@@ -83,70 +86,32 @@ namespace detail {
std::atomic_bool m_CompleteAdding{false};
};
-} // namespace detail
-
-//////////////////////////////////////////////////////////////////////////
-
-class DefaultUpstreamCache final : public UpstreamCache
-{
-public:
- DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
- : m_Log(zen::logging::Get("upstream"))
- , m_Options(Options)
- , m_CacheStore(CacheStore)
- , m_CidStore(CidStore)
+ class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint
{
- if (m_Options.JupiterEnabled)
+ public:
+ JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
{
- m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint,
- m_Options.JupiterDdcNamespace,
- m_Options.JupiterBlobStoreNamespace,
- m_Options.JupiterOAuthProvider,
- m_Options.JupiterOAuthClientId,
- m_Options.JupiterOAuthSecret);
-
- std::string TmpAuthHeader;
- if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid())
- {
- m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'",
- m_Options.JupiterEndpoint,
- m_Options.JupiterDdcNamespace,
- m_Options.JupiterBlobStoreNamespace);
- }
- else
- {
- m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint);
- }
+ using namespace fmt::literals;
+ m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
+ m_Client = new CloudCacheClient(Options);
}
- m_IsRunning = m_CloudClient && m_CloudClient->IsValid();
+ virtual ~JupiterUpstreamEndpoint() = default;
- if (m_IsRunning)
+ virtual bool Initialize() override
{
- m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount);
- for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
- {
- m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
- }
+ //TODO: Test and authenticate Jupiter client connection
+ return !m_Client->ServiceUrl().empty();
}
- else
- {
- m_Log.warn("upstream disabled, no valid endpoints");
- }
- }
- virtual ~DefaultUpstreamCache() { Shutdown(); }
+ virtual std::string_view DisplayName() const override { return m_DisplayName; }
- virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
- {
- if (m_CloudClient && m_CloudClient->IsValid())
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
- zen::Stopwatch Timer;
-
try
{
- CloudCacheSession Session(m_CloudClient);
- CloudCacheResult Result;
+ zen::CloudCacheSession Session(m_Client);
+ CloudCacheResult Result;
if (Type == ZenContentType::kBinary)
{
@@ -161,126 +126,293 @@ public:
}
catch (std::exception& e)
{
- m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'",
- CacheKey.Bucket,
- CacheKey.Hash,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ return {.Reason = std::string(e.what()), .Success = false};
}
}
- return {};
- }
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ try
+ {
+ zen::CloudCacheSession Session(m_Client);
+ const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ return {.Value = Result.Value, .Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
- virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
- {
- if (m_CloudClient && m_CloudClient->IsValid())
+ virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
+ IoBuffer RecordValue,
+ std::span<IoBuffer const> Payloads) override
{
- zen::Stopwatch Timer;
+ ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ const uint32_t MaxAttempts = 3;
try
{
- CloudCacheSession Session(m_CloudClient);
+ CloudCacheSession Session(m_Client);
+
+ if (CacheRecord.Type == ZenContentType::kBinary)
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue);
+ }
- CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- return {.Payload = Result.Value, .Success = Result.Success};
+ return {.Success = Result.Success};
+ }
+ else
+ {
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ }
+
+ if (!Result.Success)
+ {
+ return {.Reason = "Failed to upload payload", .Success = false};
+ }
+ }
+
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue);
+ }
+
+ return {.Success = Result.Success};
+ }
+ }
}
catch (std::exception& e)
{
- m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'",
- PayloadKey.CacheKey.Bucket,
- PayloadKey.CacheKey.Hash,
- PayloadKey.PayloadId,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ return {.Reason = std::string(e.what()), .Success = false};
}
}
- return {};
- }
+ private:
+ std::string m_DisplayName;
+ RefPtr<CloudCacheClient> m_Client;
+ };
- virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint
{
- if (m_IsRunning.load())
+ public:
+ ZenUpstreamEndpoint(std::string_view ServiceUrl)
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
- return {.Success = true};
+ using namespace fmt::literals;
+ m_DisplayName = "Zen - '{}'"_format(ServiceUrl);
+ m_Client = new ZenStructuredCacheClient(ServiceUrl);
}
- return {};
- }
+ ~ZenUpstreamEndpoint() = default;
-private:
- void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
- {
- const uint32_t MaxAttempts = 3;
+ virtual bool Initialize() override
+ {
+ //TODO: Test and authenticate Zen client connection
+ return !m_Client->ServiceUrl().empty();
+ }
- if (m_CloudClient && m_CloudClient->IsValid())
+ virtual std::string_view DisplayName() const override { return m_DisplayName; }
+
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
- CloudCacheSession Session(m_CloudClient);
- ZenCacheValue CacheValue;
- if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue))
+ try
{
- m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash);
- return;
+ ZenStructuredCacheSession Session(*m_Client);
+ const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
+ return {.Value = Result.Response, .Success = Result.Success};
}
-
- if (CacheRecord.Type == ZenContentType::kBinary)
+ catch (std::exception& e)
{
- CloudCacheResult Result;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
- }
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
- if (!Result.Success)
- {
- m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- MaxAttempts);
- }
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ try
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ const ZenCacheResult Result =
+ Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
+ return {.Value = Result.Response, .Success = Result.Success};
}
- else
+ catch (std::exception& e)
{
- ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject);
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
+
+ virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
+ IoBuffer RecordValue,
+ std::span<IoBuffer const> Payloads) override
+ {
+ ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ const uint32_t MaxAttempts = 3;
- CloudCacheResult Result;
- for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ try
+ {
+ zen::ZenStructuredCacheSession Session(*m_Client);
+ ZenCacheResult Result;
+
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
{
Result.Success = false;
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutCompressedBlob(PayloadId, Payload);
- }
+ Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ CacheRecord.PayloadIds[Idx],
+ Payloads[Idx]);
}
if (!Result.Success)
{
- m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts);
- break;
+ return {.Reason = "Failed to upload payload", .Success = false};
}
}
- if (Result.Success)
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
- }
+ Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
}
- if (!Result.Success)
- {
- m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
- }
+ return {.Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
+
+ private:
+ std::string m_DisplayName;
+ RefPtr<zen::ZenStructuredCacheClient> m_Client;
+ };
+
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+class DefaultUpstreamCache final : public UpstreamCache
+{
+public:
+ DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_Options(Options)
+ , m_CacheStore(CacheStore)
+ , m_CidStore(CidStore)
+ {
+ ZEN_ASSERT(m_Options.ThreadCount > 0);
+ }
+
+ virtual ~DefaultUpstreamCache() { Shutdown(); }
+
+ virtual bool Initialize() override
+ {
+ auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) {
+ const bool Ok = Endpoint->Initialize();
+ m_Log.info("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED");
+ return !Ok;
+ });
+
+ m_Endpoints.erase(NewEnd, std::end(m_Endpoints));
+ m_IsRunning = !m_Endpoints.empty();
+
+ if (m_IsRunning)
+ {
+ for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
+ {
+ m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
+ }
+ }
+
+ return m_IsRunning;
+ }
+
+ virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
+
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ {
+ return Result;
+ }
+ }
+
+ return {};
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ {
+ return Result;
+ }
+ }
+
+ return {};
+ }
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ {
+ if (m_IsRunning.load())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ return {.Success = true};
+ }
+
+ return {};
+ }
+
+private:
+ void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
+ {
+ ZenCacheValue CacheValue;
+ std::vector<IoBuffer> Payloads;
+
+ if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue))
+ {
+ m_Log.warn("process upstream FAILED, '{}/{}', cache record doesn't exist",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash);
+ return;
+ }
+
+ for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ {
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ {
+ Payloads.push_back(Payload);
+ }
+ else
+ {
+ m_Log.warn("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ PayloadId);
+ return;
}
}
+
+ for (auto& Endpoint : m_Endpoints)
+ {
+ Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ }
}
void ProcessUpstreamQueue()
@@ -320,20 +452,20 @@ private:
}
m_UpstreamThreads.clear();
+ m_Endpoints.clear();
}
}
using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
- spdlog::logger& m_Log;
- UpstreamCacheOptions m_Options;
- ::ZenCacheStore& m_CacheStore;
- CidStore& m_CidStore;
- UpstreamQueue m_UpstreamQueue;
- RefPtr<CloudCacheClient> m_CloudClient;
- RefPtr<ZenStructuredCacheClient> m_ZenClient;
- std::vector<std::thread> m_UpstreamThreads;
- std::atomic_bool m_IsRunning{false};
+ spdlog::logger& m_Log;
+ UpstreamCacheOptions m_Options;
+ ::ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ UpstreamQueue m_UpstreamQueue;
+ std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints;
+ std::vector<std::thread> m_UpstreamThreads;
+ std::atomic_bool m_IsRunning{false};
};
//////////////////////////////////////////////////////////////////////////
@@ -344,4 +476,16 @@ MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheSto
return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore);
}
+std::unique_ptr<UpstreamEndpoint>
+MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
+{
+ return std::make_unique<detail::JupiterUpstreamEndpoint>(Options);
+}
+
+std::unique_ptr<UpstreamEndpoint>
+MakeZenUpstreamEndpoint(std::string_view Url)
+{
+ return std::make_unique<detail::ZenUpstreamEndpoint>(Url);
+}
+
} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 23a542151..d8359bc2c 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -13,6 +13,7 @@ class ZenCacheStore;
namespace zen {
class CidStore;
+struct CloudCacheClientOptions;
struct UpstreamCacheKey
{
@@ -35,14 +36,41 @@ struct UpstreamCacheRecord
struct UpstreamCacheOptions
{
- uint32_t ThreadCount = 4;
- bool JupiterEnabled = true;
- std::string_view JupiterEndpoint;
- std::string_view JupiterDdcNamespace;
- std::string_view JupiterBlobStoreNamespace;
- std::string_view JupiterOAuthProvider;
- std::string_view JupiterOAuthClientId;
- std::string_view JupiterOAuthSecret;
+ uint32_t ThreadCount = 4;
+};
+
+struct GetUpstreamCacheResult
+{
+ IoBuffer Value;
+ std::string Reason;
+ bool Success = false;
+};
+
+struct PutUpstreamCacheResult
+{
+ std::string Reason;
+ bool Success;
+};
+
+/**
+ * The upstream endpont is responsible for handling upload/downloading of cache records.
+ */
+class UpstreamEndpoint
+{
+public:
+ virtual ~UpstreamEndpoint() = default;
+
+ virtual bool Initialize() = 0;
+
+ virtual std::string_view DisplayName() const = 0;
+
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
+
+ virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
+ IoBuffer RecordValue,
+ std::span<IoBuffer const> Payloads) = 0;
};
/**
@@ -53,21 +81,13 @@ class UpstreamCache
public:
virtual ~UpstreamCache() = default;
- struct GetCacheRecordResult
- {
- IoBuffer Value;
- bool Success = false;
- };
+ virtual bool Initialize() = 0;
- virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+ virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
- struct GetCachePayloadResult
- {
- IoBuffer Payload;
- bool Success = false;
- };
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
- virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
struct EnqueueResult
{
@@ -79,4 +99,8 @@ public:
std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore);
+std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options);
+
+std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::string_view Url);
+
} // namespace zen
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index e9102ad45..3d4999e5d 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -8,6 +8,7 @@
#include <zencore/stream.h>
#include "cache/structuredcachestore.h"
+#include "diag/logging.h"
// cpr ////////////////////////////////////////////////////////////////////
//
@@ -20,7 +21,6 @@
#include <cpr/cpr.h>
#pragma warning(pop)
-#include <spdlog/spdlog.h>
#include <xxhash.h>
#include <gsl/gsl-lite.hpp>
@@ -322,6 +322,45 @@ namespace detail {
ZenStructuredCacheClient& OwnerClient;
cpr::Session Session;
};
+
+ static void LogResponse(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response)
+ {
+ using namespace std::literals;
+
+ std::string_view ContentType = "unknown"sv;
+ if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
+ {
+ ContentType = It->second;
+ }
+
+ const uint64_t Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes;
+
+ const bool IsBinary =
+ ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream";
+
+ if (IsBinary)
+ {
+ Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'",
+ Verb,
+ Response.url.str(),
+ Response.status_code,
+ Response.elapsed,
+ ContentType,
+ Bytes,
+ Response.reason);
+ }
+ else
+ {
+ Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'",
+ Verb,
+ Response.url.str(),
+ Response.status_code,
+ Response.elapsed,
+ ContentType,
+ Response.text,
+ Response.reason);
+ }
+ }
} // namespace detail
//////////////////////////////////////////////////////////////////////////
@@ -364,50 +403,99 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
//////////////////////////////////////////////////////////////////////////
-ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) : m_Client(OuterClient)
+using namespace std::literals;
+
+ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient)
+: m_Log(zen::logging::Get("zenclient"sv))
+, m_Client(OuterClient)
{
+ m_SessionState = m_Client.AllocSessionState();
}
ZenStructuredCacheSession::~ZenStructuredCacheSession()
{
+ m_Client.FreeSessionState(m_SessionState);
}
-IoBuffer
-ZenStructuredCacheSession::Get(std::string_view BucketId, std::string_view Key)
+ZenCacheResult
+ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type)
{
- ZEN_UNUSED(BucketId, Key);
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+
+ cpr::Response Response = Session.Get();
+ detail::LogResponse(m_Log, "GET"sv, Response);
+
+ if (Response.status_code == 200)
+ {
+ return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true};
+ }
return {};
}
-void
-ZenStructuredCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data)
+ZenCacheResult
+ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId)
{
- ZEN_UNUSED(BucketId, Key, Data);
-}
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
-// Structured cache operations
+ cpr::Session& Session = m_SessionState->Session;
-IoBuffer
-ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key)
-{
- ZEN_UNUSED(BucketId, Key);
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
+
+ cpr::Response Response = Session.Get();
+ detail::LogResponse(m_Log, "GET"sv, Response);
+
+ if (Response.status_code == 200)
+ {
+ return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true};
+ }
return {};
}
-IoBuffer
-ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key, const IoHash& ContentId)
+ZenCacheResult
+ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type)
{
- ZEN_UNUSED(BucketId, Key, ContentId);
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
- return {};
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(
+ cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}});
+ Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()});
+
+ cpr::Response Response = Session.Put();
+ detail::LogResponse(m_Log, "PUT"sv, Response);
+
+ return {.Success = Response.status_code == 200};
}
-void
-ZenStructuredCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data)
+ZenCacheResult
+ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload)
{
- ZEN_UNUSED(BucketId, Key, Data);
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}});
+ Session.SetBody(cpr::Body{static_cast<const char*>(Payload.Data()), Payload.Size()});
+
+ cpr::Response Response = Session.Put();
+ detail::LogResponse(m_Log, "PUT"sv, Response);
+
+ return {.Success = Response.status_code == 200};
}
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 1d7d5752e..5df6da4a3 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -20,6 +20,10 @@
struct ZenCacheValue;
+namespace spdlog {
+class logger;
+}
+
namespace zen {
class CbObjectWriter;
@@ -81,6 +85,12 @@ namespace detail {
struct ZenCacheSessionState;
}
+struct ZenCacheResult
+{
+ IoBuffer Response;
+ bool Success = false;
+};
+
/** Zen Structured Cache session
*
* This provides a context in which cache queries can be performed
@@ -93,16 +103,13 @@ public:
ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient);
~ZenStructuredCacheSession();
- // Key-value cache operations
- IoBuffer Get(std::string_view BucketId, std::string_view Key);
- void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data);
-
- // Structured cache operations
- IoBuffer Get(std::string_view BucketId, const IoHash& Key);
- IoBuffer Get(std::string_view BucketId, const IoHash& Key, const IoHash& ContentId);
- void Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data);
+ ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type);
+ ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId);
+ ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
+ ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload);
private:
+ spdlog::logger& m_Log;
ZenStructuredCacheClient& m_Client;
detail::ZenCacheSessionState* m_SessionState;
};
@@ -118,6 +125,8 @@ public:
ZenStructuredCacheClient(std::string_view ServiceUrl);
~ZenStructuredCacheClient();
+ std::string_view ServiceUrl() const { return m_ServiceUrl; }
+
private:
std::string m_ServiceUrl;