// Copyright Epic Games, Inc. All Rights Reserved. #include "upstreamcache.h" #include "jupiter.h" #include "zen.h" #include #include #include #include #include "cache/structuredcachestore.h" #include "diag/logging.h" #include #include #include namespace zen { using namespace std::literals; namespace detail { template class BlockingQueue { public: BlockingQueue() = default; ~BlockingQueue() { CompleteAdding(); } void Enqueue(T&& Item) { { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); } m_NewItemSignal.notify_one(); } bool WaitAndDequeue(T& Item) { if (m_CompleteAdding.load()) { return false; } std::unique_lock Lock(m_Lock); m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); if (!m_Queue.empty()) { Item = std::move(m_Queue.front()); m_Queue.pop_front(); return true; } return false; } void CompleteAdding() { if (!m_CompleteAdding.load()) { m_CompleteAdding.store(true); m_NewItemSignal.notify_all(); } } std::size_t Num() const { std::unique_lock Lock(m_Lock); return m_Queue.size(); } private: mutable std::mutex m_Lock; std::condition_variable m_NewItemSignal; std::deque m_Queue; std::atomic_bool m_CompleteAdding{false}; }; } // namespace detail ////////////////////////////////////////////////////////////////////////// class DefaultUpstreamCache final : public UpstreamCache { public: DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) : m_Log(zen::logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) , m_CidStore(CidStore) { if (m_Options.JupiterEnabled) { m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint, m_Options.JupiterDdcNamespace, m_Options.JupiterBlobStoreNamespace, m_Options.JupiterOAuthProvider, m_Options.JupiterOAuthClientId, m_Options.JupiterOAuthSecret); std::string TmpAuthHeader; if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid()) { m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'", m_Options.JupiterEndpoint, m_Options.JupiterDdcNamespace, m_Options.JupiterBlobStoreNamespace); } else { m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint); } } m_IsRunning = m_CloudClient && m_CloudClient->IsValid(); if (m_IsRunning) { m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount); for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); } } else { m_Log.warn("upstream disabled, no valid endpoints"); } } virtual ~DefaultUpstreamCache() { Shutdown(); } virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { if (m_CloudClient && m_CloudClient->IsValid()) { zen::Stopwatch Timer; try { CloudCacheSession Session(m_CloudClient); CloudCacheResult Result; if (Type == ZenContentType::kBinary) { Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); } else { Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash); } return {.Value = Result.Value, .Success = Result.Success}; } catch (std::exception& e) { m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'", CacheKey.Bucket, CacheKey.Hash, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), e.what()); } } return {}; } virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { if (m_CloudClient && m_CloudClient->IsValid()) { zen::Stopwatch Timer; try { CloudCacheSession Session(m_CloudClient); CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); return {.Payload = Result.Value, .Success = Result.Success}; } catch (std::exception& e) { m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'", PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), e.what()); } } return {}; } virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { if (m_IsRunning.load()) { m_UpstreamQueue.Enqueue(std::move(CacheRecord)); return {.Success = true}; } return {}; } private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) { const uint32_t MaxAttempts = 3; if (m_CloudClient && m_CloudClient->IsValid()) { CloudCacheSession Session(m_CloudClient); ZenCacheValue CacheValue; if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) { m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); return; } if (CacheRecord.Type == ZenContentType::kBinary) { CloudCacheResult Result; for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); } if (!Result.Success) { m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, MaxAttempts); } } else { ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject); CloudCacheResult Result; for (const IoHash& PayloadId : CacheRecord.PayloadIds) { Result.Success = false; if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) { for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCompressedBlob(PayloadId, Payload); } } if (!Result.Success) { m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts); break; } } if (Result.Success) { Result.Success = false; for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); } } if (!Result.Success) { m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); } } } } void ProcessUpstreamQueue() { for (;;) { UpstreamCacheRecord CacheRecord; if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) { try { ProcessCacheRecord(std::move(CacheRecord)); } catch (std::exception& e) { m_Log.warn("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); } } if (!m_IsRunning.load()) { break; } } } void Shutdown() { if (m_IsRunning.load()) { m_IsRunning.store(false); m_UpstreamQueue.CompleteAdding(); for (std::thread& Thread : m_UpstreamThreads) { Thread.join(); } m_UpstreamThreads.clear(); } } using UpstreamQueue = detail::BlockingQueue; spdlog::logger& m_Log; UpstreamCacheOptions m_Options; ::ZenCacheStore& m_CacheStore; CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; RefPtr m_CloudClient; RefPtr m_ZenClient; std::vector m_UpstreamThreads; std::atomic_bool m_IsRunning{false}; }; ////////////////////////////////////////////////////////////////////////// std::unique_ptr MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) { return std::make_unique(Options, CacheStore, CidStore); } } // namespace zen