// Copyright Epic Games, Inc. All Rights Reserved. #include "upstreamcache.h" #include "jupiter.h" #include "zen.h" #include #include #include #include #include #include #include #include #include #include "cache/structuredcachestore.h" #include "diag/logging.h" #include #include #include #include #include #include #include namespace zen { using namespace std::literals; namespace detail { template class BlockingQueue { public: BlockingQueue() = default; ~BlockingQueue() { CompleteAdding(); } void Enqueue(T&& Item) { { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); } m_NewItemSignal.notify_one(); } bool WaitAndDequeue(T& Item) { if (m_CompleteAdding.load()) { return false; } std::unique_lock Lock(m_Lock); m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); if (!m_Queue.empty()) { Item = std::move(m_Queue.front()); m_Queue.pop_front(); return true; } return false; } void CompleteAdding() { if (!m_CompleteAdding.load()) { m_CompleteAdding.store(true); m_NewItemSignal.notify_all(); } } std::size_t Num() const { std::unique_lock Lock(m_Lock); return m_Queue.size(); } private: mutable std::mutex m_Lock; std::condition_variable m_NewItemSignal; std::deque m_Queue; std::atomic_bool m_CompleteAdding{false}; }; class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint { public: JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); m_Client = new CloudCacheClient(Options); } virtual ~JupiterUpstreamEndpoint() = default; virtual bool Initialize() override { zen::CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.Authenticate(); return Result.Success; } virtual std::string_view DisplayName() const override { return m_DisplayName; } virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { try { zen::CloudCacheSession Session(m_Client); CloudCacheResult Result; if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); } else { const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); if (Result.Success && Type == ZenContentType::kCbPackage) { CbPackage Package; const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All); if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) { Package.AddAttachment(CbAttachment(Chunk)); } else { Result.Success = false; } }); Package.SetObject(CacheRecord); } if (Result.Success) { MemoryOutStream MemStream; BinaryWriter Writer(MemStream); Package.Save(Writer); Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); Result.Bytes = MemStream.Size(); } } } return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } catch (std::exception& e) { return {.Reason = std::string(e.what()), .Success = false}; } } virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { try { zen::CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } catch (std::exception& e) { return {.Reason = std::string(e.what()), .Success = false}; } } virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span Payloads) override { ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); const uint32_t MaxAttempts = 3; try { CloudCacheSession Session(m_Client); if (CacheRecord.Type == ZenContentType::kBinary) { CloudCacheResult Result; for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { if (m_UseLegacyDdc) { Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue); } else { Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kBinary); } } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { CloudCacheResult Result; for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); } TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Reason = "Failed to upload payload", .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = false}; } } CloudCacheResult Result; for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } } catch (std::exception& e) { return {.Reason = std::string(e.what()), .Success = false}; } } private: bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr m_Client; }; class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint { public: ZenUpstreamEndpoint(std::string_view ServiceUrl) { using namespace fmt::literals; m_DisplayName = "Zen - '{}'"_format(ServiceUrl); m_Client = new ZenStructuredCacheClient(ServiceUrl); } ~ZenUpstreamEndpoint() = default; virtual bool Initialize() override { try { ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt) { Result = Session.SayHello(); } return Result.Success; } catch (std::exception&) { return false; } } virtual std::string_view DisplayName() const override { return m_DisplayName; } virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { try { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } catch (std::exception& e) { return {.Reason = std::string(e.what()), .Success = false}; } } virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { try { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } catch (std::exception& e) { return {.Reason = std::string(e.what()), .Success = false}; } } virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span Payloads) override { ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); const uint32_t MaxAttempts = 3; try { zen::ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; if (CacheRecord.Type == ZenContentType::kCbPackage) { zen::CbPackage Package; Package.SetObject(CbObject(SharedBuffer(RecordValue))); for (const IoBuffer& Payload : Payloads) { if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) { Package.AddAttachment(CbAttachment(AttachmentBuffer)); } else { return {.Reason = std::string("invalid payload buffer"), .Success = false}; } } MemoryOutStream MemStream; BinaryWriter Writer(MemStream); Package.Save(Writer); IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size()); for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, PackagePayload, CacheRecord.Type); } TotalBytes = Result.Bytes; TotalElapsedSeconds = Result.ElapsedSeconds; } else { for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { Result.Success = false; for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheRecord.PayloadIds[Idx], Payloads[Idx]); } TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Reason = "Failed to upload payload", .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = false}; } } Result.Success = false; for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); } TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; } return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } catch (std::exception& e) { return {.Reason = std::string(e.what()), .Success = false}; } } private: std::string m_DisplayName; RefPtr m_Client; }; } // namespace detail ////////////////////////////////////////////////////////////////////////// class UpstreamStats final { static constexpr uint64_t MaxSampleCount = 100ull; struct StatCounters { int64_t Bytes = {}; int64_t Count = {}; double Seconds = {}; }; using StatsMap = std::unordered_map; struct EndpointStats { mutable std::mutex Lock; StatsMap Counters; }; public: UpstreamStats() : m_Log(zen::logging::Get("upstream")) {} void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) { std::unique_lock Lock(m_DownStats.Lock); auto& Counters = m_DownStats.Counters[&Endpoint]; Counters.Bytes += Result.Bytes; Counters.Seconds += Result.ElapsedSeconds; Counters.Count++; if (Counters.Count >= MaxSampleCount) { LogStats("STATS - (downstream):"sv, m_DownStats.Counters); Counters = StatCounters{}; } } void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result) { std::unique_lock Lock(m_UpStats.Lock); auto& Counters = m_UpStats.Counters[&Endpoint]; Counters.Bytes += Result.Bytes; Counters.Seconds += Result.ElapsedSeconds; Counters.Count++; if (Counters.Count >= MaxSampleCount) { LogStats("STATS - (upstream):"sv, m_UpStats.Counters); Counters = StatCounters{}; } } private: void LogStats(std::string_view What, const std::unordered_map& EndpointStats) { for (const auto& Kv : EndpointStats) { const UpstreamEndpoint& Endpoint = *Kv.first; const StatCounters& Counters = Kv.second; const double TotalMb = double(Counters.Bytes) / 1024.0 / 1024.0; ZEN_UNUSED(Endpoint); ZEN_INFO("{} Endpoint: {}, Bytes: {:.2f} MB, Time: {:.2f} s, Speed: {:.2f} MB/s, Avg: {:.2f} ms/request, Samples: {}", What, Kv.first->DisplayName(), TotalMb, Counters.Seconds, TotalMb / Counters.Seconds, (Counters.Seconds * 1000.0) / double(Counters.Count), Counters.Count); } } spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; EndpointStats m_UpStats; EndpointStats m_DownStats; }; ////////////////////////////////////////////////////////////////////////// class DefaultUpstreamCache final : public UpstreamCache { public: DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) : m_Log(zen::logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) , m_CidStore(CidStore) { } virtual ~DefaultUpstreamCache() { Shutdown(); } virtual bool Initialize() override { auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) { const bool Ok = Endpoint->Initialize(); ZEN_INFO("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED"); return !Ok; }); m_Endpoints.erase(NewEnd, std::end(m_Endpoints)); m_IsRunning = !m_Endpoints.empty(); if (m_IsRunning) { for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); } } return m_IsRunning; } virtual void AddEndpoint(std::unique_ptr Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) { m_Stats.Add(*Endpoint, Result); return Result; } } } return {}; } virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) { m_Stats.Add(*Endpoint, Result); return Result; } } } return {}; } virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { if (m_IsRunning.load() && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { m_UpstreamQueue.Enqueue(std::move(CacheRecord)); } else { ProcessCacheRecord(std::move(CacheRecord)); } return {.Success = true}; } return {}; } private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) { ZenCacheValue CacheValue; std::vector Payloads; if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); return; } for (const IoHash& PayloadId : CacheRecord.PayloadIds) { if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) { Payloads.push_back(Payload); } else { ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, PayloadId); return; } } for (auto& Endpoint : m_Endpoints) { const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); if (Result.Success) { m_Stats.Add(*Endpoint, Result); } else { ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, Endpoint->DisplayName(), Result.Reason); } } } void ProcessUpstreamQueue() { for (;;) { UpstreamCacheRecord CacheRecord; if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) { try { ProcessCacheRecord(std::move(CacheRecord)); } catch (std::exception& e) { ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); } } if (!m_IsRunning.load()) { break; } } } void Shutdown() { if (m_IsRunning.load()) { m_IsRunning.store(false); m_UpstreamQueue.CompleteAdding(); for (std::thread& Thread : m_UpstreamThreads) { Thread.join(); } m_UpstreamThreads.clear(); m_Endpoints.clear(); } } using UpstreamQueue = detail::BlockingQueue; spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; UpstreamCacheOptions m_Options; ::ZenCacheStore& m_CacheStore; CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; UpstreamStats m_Stats; std::vector> m_Endpoints; std::vector m_UpstreamThreads; std::atomic_bool m_IsRunning{false}; }; ////////////////////////////////////////////////////////////////////////// std::unique_ptr MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) { return std::make_unique(Options, CacheStore, CidStore); } std::unique_ptr MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) { return std::make_unique(Options); } std::unique_ptr MakeZenUpstreamEndpoint(std::string_view Url) { return std::make_unique(Url); } } // namespace zen