aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp416
1 files changed, 280 insertions, 136 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index ecd51a706..40d7ebd26 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -12,6 +12,9 @@
#include "cache/structuredcachestore.h"
#include "diag/logging.h"
+#include <fmt/format.h>
+
+#include <algorithm>
#include <atomic>
#include <deque>
#include <thread>
@@ -83,70 +86,32 @@ namespace detail {
std::atomic_bool m_CompleteAdding{false};
};
-} // namespace detail
-
-//////////////////////////////////////////////////////////////////////////
-
-class DefaultUpstreamCache final : public UpstreamCache
-{
-public:
- DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
- : m_Log(zen::logging::Get("upstream"))
- , m_Options(Options)
- , m_CacheStore(CacheStore)
- , m_CidStore(CidStore)
+ class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint
{
- if (m_Options.JupiterEnabled)
+ public:
+ JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
{
- m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint,
- m_Options.JupiterDdcNamespace,
- m_Options.JupiterBlobStoreNamespace,
- m_Options.JupiterOAuthProvider,
- m_Options.JupiterOAuthClientId,
- m_Options.JupiterOAuthSecret);
-
- std::string TmpAuthHeader;
- if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid())
- {
- m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'",
- m_Options.JupiterEndpoint,
- m_Options.JupiterDdcNamespace,
- m_Options.JupiterBlobStoreNamespace);
- }
- else
- {
- m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint);
- }
+ using namespace fmt::literals;
+ m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
+ m_Client = new CloudCacheClient(Options);
}
- m_IsRunning = m_CloudClient && m_CloudClient->IsValid();
+ virtual ~JupiterUpstreamEndpoint() = default;
- if (m_IsRunning)
+ virtual bool Initialize() override
{
- m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount);
- for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
- {
- m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
- }
+ //TODO: Test and authenticate Jupiter client connection
+ return !m_Client->ServiceUrl().empty();
}
- else
- {
- m_Log.warn("upstream disabled, no valid endpoints");
- }
- }
- virtual ~DefaultUpstreamCache() { Shutdown(); }
+ virtual std::string_view DisplayName() const override { return m_DisplayName; }
- virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
- {
- if (m_CloudClient && m_CloudClient->IsValid())
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
- zen::Stopwatch Timer;
-
try
{
- CloudCacheSession Session(m_CloudClient);
- CloudCacheResult Result;
+ zen::CloudCacheSession Session(m_Client);
+ CloudCacheResult Result;
if (Type == ZenContentType::kBinary)
{
@@ -161,126 +126,293 @@ public:
}
catch (std::exception& e)
{
- m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'",
- CacheKey.Bucket,
- CacheKey.Hash,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ return {.Reason = std::string(e.what()), .Success = false};
}
}
- return {};
- }
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ try
+ {
+ zen::CloudCacheSession Session(m_Client);
+ const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ return {.Value = Result.Value, .Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
- virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
- {
- if (m_CloudClient && m_CloudClient->IsValid())
+ virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
+ IoBuffer RecordValue,
+ std::span<IoBuffer const> Payloads) override
{
- zen::Stopwatch Timer;
+ ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ const uint32_t MaxAttempts = 3;
try
{
- CloudCacheSession Session(m_CloudClient);
+ CloudCacheSession Session(m_Client);
+
+ if (CacheRecord.Type == ZenContentType::kBinary)
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue);
+ }
- CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- return {.Payload = Result.Value, .Success = Result.Success};
+ return {.Success = Result.Success};
+ }
+ else
+ {
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ }
+
+ if (!Result.Success)
+ {
+ return {.Reason = "Failed to upload payload", .Success = false};
+ }
+ }
+
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue);
+ }
+
+ return {.Success = Result.Success};
+ }
+ }
}
catch (std::exception& e)
{
- m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'",
- PayloadKey.CacheKey.Bucket,
- PayloadKey.CacheKey.Hash,
- PayloadKey.PayloadId,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ return {.Reason = std::string(e.what()), .Success = false};
}
}
- return {};
- }
+ private:
+ std::string m_DisplayName;
+ RefPtr<CloudCacheClient> m_Client;
+ };
- virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint
{
- if (m_IsRunning.load())
+ public:
+ ZenUpstreamEndpoint(std::string_view ServiceUrl)
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
- return {.Success = true};
+ using namespace fmt::literals;
+ m_DisplayName = "Zen - '{}'"_format(ServiceUrl);
+ m_Client = new ZenStructuredCacheClient(ServiceUrl);
}
- return {};
- }
+ ~ZenUpstreamEndpoint() = default;
-private:
- void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
- {
- const uint32_t MaxAttempts = 3;
+ virtual bool Initialize() override
+ {
+ //TODO: Test and authenticate Zen client connection
+ return !m_Client->ServiceUrl().empty();
+ }
- if (m_CloudClient && m_CloudClient->IsValid())
+ virtual std::string_view DisplayName() const override { return m_DisplayName; }
+
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
- CloudCacheSession Session(m_CloudClient);
- ZenCacheValue CacheValue;
- if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue))
+ try
{
- m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash);
- return;
+ ZenStructuredCacheSession Session(*m_Client);
+ const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
+ return {.Value = Result.Response, .Success = Result.Success};
}
-
- if (CacheRecord.Type == ZenContentType::kBinary)
+ catch (std::exception& e)
{
- CloudCacheResult Result;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
- }
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
- if (!Result.Success)
- {
- m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- MaxAttempts);
- }
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ try
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ const ZenCacheResult Result =
+ Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
+ return {.Value = Result.Response, .Success = Result.Success};
}
- else
+ catch (std::exception& e)
{
- ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject);
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
+
+ virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
+ IoBuffer RecordValue,
+ std::span<IoBuffer const> Payloads) override
+ {
+ ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ const uint32_t MaxAttempts = 3;
- CloudCacheResult Result;
- for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ try
+ {
+ zen::ZenStructuredCacheSession Session(*m_Client);
+ ZenCacheResult Result;
+
+ for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
{
Result.Success = false;
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutCompressedBlob(PayloadId, Payload);
- }
+ Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ CacheRecord.PayloadIds[Idx],
+ Payloads[Idx]);
}
if (!Result.Success)
{
- m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts);
- break;
+ return {.Reason = "Failed to upload payload", .Success = false};
}
}
- if (Result.Success)
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- Result.Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
- }
+ Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
}
- if (!Result.Success)
- {
- m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
- }
+ return {.Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ return {.Reason = std::string(e.what()), .Success = false};
+ }
+ }
+
+ private:
+ std::string m_DisplayName;
+ RefPtr<zen::ZenStructuredCacheClient> m_Client;
+ };
+
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+class DefaultUpstreamCache final : public UpstreamCache
+{
+public:
+ DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_Options(Options)
+ , m_CacheStore(CacheStore)
+ , m_CidStore(CidStore)
+ {
+ ZEN_ASSERT(m_Options.ThreadCount > 0);
+ }
+
+ virtual ~DefaultUpstreamCache() { Shutdown(); }
+
+ virtual bool Initialize() override
+ {
+ auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) {
+ const bool Ok = Endpoint->Initialize();
+ m_Log.info("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED");
+ return !Ok;
+ });
+
+ m_Endpoints.erase(NewEnd, std::end(m_Endpoints));
+ m_IsRunning = !m_Endpoints.empty();
+
+ if (m_IsRunning)
+ {
+ for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
+ {
+ m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
+ }
+ }
+
+ return m_IsRunning;
+ }
+
+ virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
+
+ virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ {
+ return Result;
+ }
+ }
+
+ return {};
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ {
+ return Result;
+ }
+ }
+
+ return {};
+ }
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ {
+ if (m_IsRunning.load())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ return {.Success = true};
+ }
+
+ return {};
+ }
+
+private:
+ void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
+ {
+ ZenCacheValue CacheValue;
+ std::vector<IoBuffer> Payloads;
+
+ if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue))
+ {
+ m_Log.warn("process upstream FAILED, '{}/{}', cache record doesn't exist",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash);
+ return;
+ }
+
+ for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ {
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ {
+ Payloads.push_back(Payload);
+ }
+ else
+ {
+ m_Log.warn("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ PayloadId);
+ return;
}
}
+
+ for (auto& Endpoint : m_Endpoints)
+ {
+ Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ }
}
void ProcessUpstreamQueue()
@@ -320,20 +452,20 @@ private:
}
m_UpstreamThreads.clear();
+ m_Endpoints.clear();
}
}
using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
- spdlog::logger& m_Log;
- UpstreamCacheOptions m_Options;
- ::ZenCacheStore& m_CacheStore;
- CidStore& m_CidStore;
- UpstreamQueue m_UpstreamQueue;
- RefPtr<CloudCacheClient> m_CloudClient;
- RefPtr<ZenStructuredCacheClient> m_ZenClient;
- std::vector<std::thread> m_UpstreamThreads;
- std::atomic_bool m_IsRunning{false};
+ spdlog::logger& m_Log;
+ UpstreamCacheOptions m_Options;
+ ::ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ UpstreamQueue m_UpstreamQueue;
+ std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints;
+ std::vector<std::thread> m_UpstreamThreads;
+ std::atomic_bool m_IsRunning{false};
};
//////////////////////////////////////////////////////////////////////////
@@ -344,4 +476,16 @@ MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheSto
return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore);
}
+std::unique_ptr<UpstreamEndpoint>
+MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
+{
+ return std::make_unique<detail::JupiterUpstreamEndpoint>(Options);
+}
+
+std::unique_ptr<UpstreamEndpoint>
+MakeZenUpstreamEndpoint(std::string_view Url)
+{
+ return std::make_unique<detail::ZenUpstreamEndpoint>(Url);
+}
+
} // namespace zen