diff options
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp new file mode 100644 index 000000000..ecd51a706 --- /dev/null +++ b/zenserver/upstream/upstreamcache.cpp @@ -0,0 +1,347 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "upstreamcache.h" +#include "jupiter.h" +#include "zen.h" + +#include <zencore/fmtutils.h> +#include <zencore/timer.h> +#include <zenstore/cas.h> +#include <zenstore/cidstore.h> + +#include "cache/structuredcachestore.h" +#include "diag/logging.h" + +#include <atomic> +#include <deque> +#include <thread> + +namespace zen { + +using namespace std::literals; + +namespace detail { + + template<typename T> + class BlockingQueue + { + public: + BlockingQueue() = default; + + ~BlockingQueue() { CompleteAdding(); } + + void Enqueue(T&& Item) + { + { + std::lock_guard Lock(m_Lock); + m_Queue.emplace_back(std::move(Item)); + } + + m_NewItemSignal.notify_one(); + } + + bool WaitAndDequeue(T& Item) + { + if (m_CompleteAdding.load()) + { + return false; + } + + std::unique_lock Lock(m_Lock); + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); + + if (!m_Queue.empty()) + { + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + + return true; + } + + return false; + } + + void CompleteAdding() + { + if (!m_CompleteAdding.load()) + { + m_CompleteAdding.store(true); + m_NewItemSignal.notify_all(); + } + } + + std::size_t Num() const + { + std::unique_lock Lock(m_Lock); + return m_Queue.size(); + } + + private: + mutable std::mutex m_Lock; + std::condition_variable m_NewItemSignal; + std::deque<T> m_Queue; + std::atomic_bool m_CompleteAdding{false}; + }; + +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +class DefaultUpstreamCache final : public UpstreamCache +{ +public: + DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) + : m_Log(zen::logging::Get("upstream")) + , m_Options(Options) + , m_CacheStore(CacheStore) + , m_CidStore(CidStore) + { + if (m_Options.JupiterEnabled) + { + m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint, + m_Options.JupiterDdcNamespace, + m_Options.JupiterBlobStoreNamespace, + m_Options.JupiterOAuthProvider, + m_Options.JupiterOAuthClientId, + m_Options.JupiterOAuthSecret); + + std::string TmpAuthHeader; + if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid()) + { + m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'", + m_Options.JupiterEndpoint, + m_Options.JupiterDdcNamespace, + m_Options.JupiterBlobStoreNamespace); + } + else + { + m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint); + } + } + + m_IsRunning = m_CloudClient && m_CloudClient->IsValid(); + + if (m_IsRunning) + { + m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount); + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + { + m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); + } + } + else + { + m_Log.warn("upstream disabled, no valid endpoints"); + } + } + + virtual ~DefaultUpstreamCache() { Shutdown(); } + + virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + { + if (m_CloudClient && m_CloudClient->IsValid()) + { + zen::Stopwatch Timer; + + try + { + CloudCacheSession Session(m_CloudClient); + CloudCacheResult Result; + + if (Type == ZenContentType::kBinary) + { + Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); + } + else + { + Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash); + } + + return {.Value = Result.Value, .Success = Result.Success}; + } + catch (std::exception& e) + { + m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'", + CacheKey.Bucket, + CacheKey.Hash, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + } + } + + return {}; + } + + virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + if (m_CloudClient && m_CloudClient->IsValid()) + { + zen::Stopwatch Timer; + + try + { + CloudCacheSession Session(m_CloudClient); + + CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + return {.Payload = Result.Value, .Success = Result.Success}; + } + catch (std::exception& e) + { + m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'", + PayloadKey.CacheKey.Bucket, + PayloadKey.CacheKey.Hash, + PayloadKey.PayloadId, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + } + } + + return {}; + } + + virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + { + if (m_IsRunning.load()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + return {.Success = true}; + } + + return {}; + } + +private: + void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + { + const uint32_t MaxAttempts = 3; + + if (m_CloudClient && m_CloudClient->IsValid()) + { + CloudCacheSession Session(m_CloudClient); + ZenCacheValue CacheValue; + if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + { + m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash); + return; + } + + if (CacheRecord.Type == ZenContentType::kBinary) + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); + } + + if (!Result.Success) + { + m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + MaxAttempts); + } + } + else + { + ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject); + + CloudCacheResult Result; + for (const IoHash& PayloadId : CacheRecord.PayloadIds) + { + Result.Success = false; + if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + { + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCompressedBlob(PayloadId, Payload); + } + } + + if (!Result.Success) + { + m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts); + break; + } + } + + if (Result.Success) + { + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); + } + } + + if (!Result.Success) + { + m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); + } + } + } + } + + void ProcessUpstreamQueue() + { + for (;;) + { + UpstreamCacheRecord CacheRecord; + if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) + { + try + { + ProcessCacheRecord(std::move(CacheRecord)); + } + catch (std::exception& e) + { + m_Log.warn("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); + } + } + + if (!m_IsRunning.load()) + { + break; + } + } + } + + void Shutdown() + { + if (m_IsRunning.load()) + { + m_IsRunning.store(false); + m_UpstreamQueue.CompleteAdding(); + + for (std::thread& Thread : m_UpstreamThreads) + { + Thread.join(); + } + + m_UpstreamThreads.clear(); + } + } + + using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; + + spdlog::logger& m_Log; + UpstreamCacheOptions m_Options; + ::ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + UpstreamQueue m_UpstreamQueue; + RefPtr<CloudCacheClient> m_CloudClient; + RefPtr<ZenStructuredCacheClient> m_ZenClient; + std::vector<std::thread> m_UpstreamThreads; + std::atomic_bool m_IsRunning{false}; +}; + +////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<UpstreamCache> +MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) +{ + return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore); +} + +} // namespace zen |