aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp347
1 files changed, 347 insertions, 0 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
new file mode 100644
index 000000000..ecd51a706
--- /dev/null
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -0,0 +1,347 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "upstreamcache.h"
+#include "jupiter.h"
+#include "zen.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/timer.h>
+#include <zenstore/cas.h>
+#include <zenstore/cidstore.h>
+
+#include "cache/structuredcachestore.h"
+#include "diag/logging.h"
+
+#include <atomic>
+#include <deque>
+#include <thread>
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace detail {
+
+ template<typename T>
+ class BlockingQueue
+ {
+ public:
+ BlockingQueue() = default;
+
+ ~BlockingQueue() { CompleteAdding(); }
+
+ void Enqueue(T&& Item)
+ {
+ {
+ std::lock_guard Lock(m_Lock);
+ m_Queue.emplace_back(std::move(Item));
+ }
+
+ m_NewItemSignal.notify_one();
+ }
+
+ bool WaitAndDequeue(T& Item)
+ {
+ if (m_CompleteAdding.load())
+ {
+ return false;
+ }
+
+ std::unique_lock Lock(m_Lock);
+ m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
+
+ if (!m_Queue.empty())
+ {
+ Item = std::move(m_Queue.front());
+ m_Queue.pop_front();
+
+ return true;
+ }
+
+ return false;
+ }
+
+ void CompleteAdding()
+ {
+ if (!m_CompleteAdding.load())
+ {
+ m_CompleteAdding.store(true);
+ m_NewItemSignal.notify_all();
+ }
+ }
+
+ std::size_t Num() const
+ {
+ std::unique_lock Lock(m_Lock);
+ return m_Queue.size();
+ }
+
+ private:
+ mutable std::mutex m_Lock;
+ std::condition_variable m_NewItemSignal;
+ std::deque<T> m_Queue;
+ std::atomic_bool m_CompleteAdding{false};
+ };
+
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+class DefaultUpstreamCache final : public UpstreamCache
+{
+public:
+ DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_Options(Options)
+ , m_CacheStore(CacheStore)
+ , m_CidStore(CidStore)
+ {
+ if (m_Options.JupiterEnabled)
+ {
+ m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint,
+ m_Options.JupiterDdcNamespace,
+ m_Options.JupiterBlobStoreNamespace,
+ m_Options.JupiterOAuthProvider,
+ m_Options.JupiterOAuthClientId,
+ m_Options.JupiterOAuthSecret);
+
+ std::string TmpAuthHeader;
+ if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid())
+ {
+ m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'",
+ m_Options.JupiterEndpoint,
+ m_Options.JupiterDdcNamespace,
+ m_Options.JupiterBlobStoreNamespace);
+ }
+ else
+ {
+ m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint);
+ }
+ }
+
+ m_IsRunning = m_CloudClient && m_CloudClient->IsValid();
+
+ if (m_IsRunning)
+ {
+ m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount);
+ for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
+ {
+ m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
+ }
+ }
+ else
+ {
+ m_Log.warn("upstream disabled, no valid endpoints");
+ }
+ }
+
+ virtual ~DefaultUpstreamCache() { Shutdown(); }
+
+ virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ {
+ if (m_CloudClient && m_CloudClient->IsValid())
+ {
+ zen::Stopwatch Timer;
+
+ try
+ {
+ CloudCacheSession Session(m_CloudClient);
+ CloudCacheResult Result;
+
+ if (Type == ZenContentType::kBinary)
+ {
+ Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash);
+ }
+ else
+ {
+ Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash);
+ }
+
+ return {.Value = Result.Value, .Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'",
+ CacheKey.Bucket,
+ CacheKey.Hash,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+ }
+ }
+
+ return {};
+ }
+
+ virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ {
+ if (m_CloudClient && m_CloudClient->IsValid())
+ {
+ zen::Stopwatch Timer;
+
+ try
+ {
+ CloudCacheSession Session(m_CloudClient);
+
+ CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ return {.Payload = Result.Value, .Success = Result.Success};
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'",
+ PayloadKey.CacheKey.Bucket,
+ PayloadKey.CacheKey.Hash,
+ PayloadKey.PayloadId,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+ }
+ }
+
+ return {};
+ }
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ {
+ if (m_IsRunning.load())
+ {
+ m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ return {.Success = true};
+ }
+
+ return {};
+ }
+
+private:
+ void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
+ {
+ const uint32_t MaxAttempts = 3;
+
+ if (m_CloudClient && m_CloudClient->IsValid())
+ {
+ CloudCacheSession Session(m_CloudClient);
+ ZenCacheValue CacheValue;
+ if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue))
+ {
+ m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash);
+ return;
+ }
+
+ if (CacheRecord.Type == ZenContentType::kBinary)
+ {
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
+ }
+
+ if (!Result.Success)
+ {
+ m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ MaxAttempts);
+ }
+ }
+ else
+ {
+ ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject);
+
+ CloudCacheResult Result;
+ for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ {
+ Result.Success = false;
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ {
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCompressedBlob(PayloadId, Payload);
+ }
+ }
+
+ if (!Result.Success)
+ {
+ m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts);
+ break;
+ }
+ }
+
+ if (Result.Success)
+ {
+ Result.Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value);
+ }
+ }
+
+ if (!Result.Success)
+ {
+ m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
+ }
+ }
+ }
+ }
+
+ void ProcessUpstreamQueue()
+ {
+ for (;;)
+ {
+ UpstreamCacheRecord CacheRecord;
+ if (m_UpstreamQueue.WaitAndDequeue(CacheRecord))
+ {
+ try
+ {
+ ProcessCacheRecord(std::move(CacheRecord));
+ }
+ catch (std::exception& e)
+ {
+ m_Log.warn("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what());
+ }
+ }
+
+ if (!m_IsRunning.load())
+ {
+ break;
+ }
+ }
+ }
+
+ void Shutdown()
+ {
+ if (m_IsRunning.load())
+ {
+ m_IsRunning.store(false);
+ m_UpstreamQueue.CompleteAdding();
+
+ for (std::thread& Thread : m_UpstreamThreads)
+ {
+ Thread.join();
+ }
+
+ m_UpstreamThreads.clear();
+ }
+ }
+
+ using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
+
+ spdlog::logger& m_Log;
+ UpstreamCacheOptions m_Options;
+ ::ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ UpstreamQueue m_UpstreamQueue;
+ RefPtr<CloudCacheClient> m_CloudClient;
+ RefPtr<ZenStructuredCacheClient> m_ZenClient;
+ std::vector<std::thread> m_UpstreamThreads;
+ std::atomic_bool m_IsRunning{false};
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<UpstreamCache>
+MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
+{
+ return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore);
+}
+
+} // namespace zen