// Copyright Epic Games, Inc. All Rights Reserved. #include "upstreamcache.h" #include "jupiter.h" #include "zen.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "cache/structuredcachestore.h" #include "diag/logging.h" #include #include #include #include #include namespace zen { using namespace std::literals; namespace detail { class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_Log(zen::logging::Get("upstream")) , m_UseLegacyDdc(Options.UseLegacyDdc) { m_Info.Name = "Horde"sv; m_Info.Url = Options.ServiceUrl; m_Client = new CloudCacheClient(Options); } virtual ~JupiterUpstreamEndpoint() = default; virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; } virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } virtual bool IsHealthy() const override { return m_HealthOk.load(); } virtual UpstreamEndpointHealth CheckHealth() override { try { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.Authenticate(); m_HealthOk = Result.Success && Result.ErrorCode == 0; return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; } catch (std::exception& Err) { return {.Reason = Err.what(), .Ok = false}; } } virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord"); try { CloudCacheSession Session(m_Client); CloudCacheResult Result; if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); } else { const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); if (Result.Success && Type == ZenContentType::kCbPackage) { CbPackage Package; const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) { Package.AddAttachment(CbAttachment(Chunk)); } else { Result.Success = false; } }); Package.SetObject(CacheRecord); } if (Result.Success) { BinaryWriter MemStream; Package.Save(MemStream); Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); } } } if (Result.ErrorCode == 0) { return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else { m_HealthOk = false; return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } } catch (std::exception& Err) { m_HealthOk = false; return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } virtual GetUpstreamCacheResult GetCacheRecords(std::span CacheKeys, std::span KeyIndex, const CacheRecordPolicy& Policy, OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords"); ZEN_UNUSED(Policy); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; for (size_t Index : KeyIndex) { const CacheKey& CacheKey = CacheKeys[Index]; CbPackage Package; CbObject Record; if (!Result.Error) { CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); if (RefResult.ErrorCode == 0) { const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); if (ValidationResult == CbValidateError::None) { Record = LoadCompactBinaryObject(RefResult.Response); Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); AppendResult(BlobResult, Result); if (BlobResult.ErrorCode == 0) { if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response))) { Package.AddAttachment(CbAttachment(Chunk)); } } else { m_HealthOk = false; } }); } } else { m_HealthOk = false; } } OnComplete({.Key = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); } return Result; } virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCachePayload"); try { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); if (Result.ErrorCode == 0) { return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else { m_HealthOk = false; return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } } catch (std::exception& Err) { m_HealthOk = false; return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } virtual GetUpstreamCacheResult GetCachePayloads(std::span CacheChunkRequests, std::span RequestIndex, OnCachePayloadGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Horde::GetCachePayloads"); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; for (size_t Index : RequestIndex) { const CacheChunkRequest& Request = CacheChunkRequests[Index]; IoBuffer Payload; if (!Result.Error) { const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); Payload = BlobResult.Response; AppendResult(BlobResult, Result); m_HealthOk = BlobResult.ErrorCode == 0; } OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); } return Result; } virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span Payloads) override { ZEN_TRACE_CPU("Upstream::Horde::PutCacheRecord"); ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); const int32_t MaxAttempts = 3; try { CloudCacheSession Session(m_Client); if (CacheRecord.Type == ZenContentType::kBinary) { CloudCacheResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { if (m_UseLegacyDdc) { Result = Session.PutDerivedData(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); } else { Result = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kBinary); } } m_HealthOk = Result.ErrorCode == 0; return {.Reason = std::move(Result.Reason), .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; const auto PutBlobs = [&](std::span PayloadIds, std::string& OutReason) -> bool { for (const IoHash& PayloadId : PayloadIds) { const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); if (It == std::end(CacheRecord.PayloadIds)) { OutReason = fmt::format("payload '{}' MISSING from local cache", PayloadId); return false; } const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); } m_HealthOk = BlobResult.ErrorCode == 0; if (!BlobResult.Success) { OutReason = fmt::format("upload payload '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason); return false; } TotalBytes += BlobResult.Bytes; TotalElapsedSeconds += BlobResult.ElapsedSeconds; } return true; }; PutRefResult RefResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) { RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject); } m_HealthOk = RefResult.ErrorCode == 0; if (!RefResult.Success) { return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefResult.Reason), .Success = false}; } TotalBytes += RefResult.Bytes; TotalElapsedSeconds += RefResult.ElapsedSeconds; std::string Reason; if (!PutBlobs(RefResult.Needs, Reason)) { return {.Reason = std::move(Reason), .Success = false}; } const IoHash RefHash = IoHash::HashBuffer(RecordValue); FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); m_HealthOk = FinalizeResult.ErrorCode == 0; if (!FinalizeResult.Success) { return {.Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, FinalizeResult.Reason), .Success = false}; } if (!FinalizeResult.Needs.empty()) { if (!PutBlobs(FinalizeResult.Needs, Reason)) { return {.Reason = std::move(Reason), .Success = false}; } FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); m_HealthOk = FinalizeResult.ErrorCode == 0; if (!FinalizeResult.Success) { return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, FinalizeResult.Reason), .Success = false}; } if (!FinalizeResult.Needs.empty()) { ExtendableStringBuilder<256> Sb; for (const IoHash& MissingHash : FinalizeResult.Needs) { Sb << MissingHash.ToHexString() << ","; } return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs payload(s) '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Sb.ToString()), .Success = false}; } } TotalBytes += FinalizeResult.Bytes; TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; } } catch (std::exception& Err) { m_HealthOk = false; return {.Reason = std::string(Err.what()), .Success = false}; } } virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) { Out.Success &= Result.Success; Out.Bytes += Result.Bytes; Out.ElapsedSeconds += Result.ElapsedSeconds; if (Result.ErrorCode) { Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; } }; spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; UpstreamEndpointInfo m_Info; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr m_Client; UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; }; class ZenUpstreamEndpoint final : public UpstreamEndpoint { struct ZenEndpoint { std::string Url; std::string Reason; double Latency{}; bool Ok = false; bool operator<(const ZenEndpoint& RHS) const { return Ok && RHS.Ok ? Latency < RHS.Latency : Ok; } }; public: ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) : m_Log(zen::logging::Get("upstream")) , m_Info({.Name = std::string("Zen")}) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { for (const auto& Url : Options.Urls) { m_Endpoints.push_back({.Url = Url}); } } ~ZenUpstreamEndpoint() = default; virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; } virtual UpstreamEndpointHealth Initialize() override { const ZenEndpoint& Ep = GetEndpoint(); if (Ep.Ok) { m_Info.Url = Ep.Url; m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; } m_HealthOk = false; return {.Reason = Ep.Reason}; } virtual bool IsHealthy() const override { return m_HealthOk; } virtual UpstreamEndpointHealth CheckHealth() override { try { if (m_Client.IsNull()) { const ZenEndpoint& Ep = GetEndpoint(); if (Ep.Ok) { m_Info.Url = Ep.Url; m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; } return {.Reason = Ep.Reason}; } ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; for (int32_t Attempt = 0, MaxAttempts = 2; Attempt < MaxAttempts && !Result.Success; ++Attempt) { Result = Session.CheckHealth(); } m_HealthOk = Result.ErrorCode == 0; return {.Reason = std::move(Result.Reason), .Ok = m_HealthOk}; } catch (std::exception& Err) { return {.Reason = Err.what(), .Ok = false}; } } virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); try { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); if (Result.ErrorCode == 0) { return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else { m_HealthOk = false; return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } } catch (std::exception& Err) { m_HealthOk = false; return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } virtual GetUpstreamCacheResult GetCacheRecords(std::span CacheKeys, std::span KeyIndex, const CacheRecordPolicy& Policy, OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); std::vector IndexMap; IndexMap.reserve(KeyIndex.size()); CbObjectWriter BatchRequest; BatchRequest << "Method"sv << "GetCacheRecords"; BatchRequest.BeginObject("Params"sv); { BatchRequest.BeginArray("CacheKeys"sv); for (size_t Index : KeyIndex) { const CacheKey& Key = CacheKeys[Index]; IndexMap.push_back(Index); BatchRequest.BeginObject(); BatchRequest << "Bucket"sv << Key.Bucket; BatchRequest << "Hash"sv << Key.Hash; BatchRequest.EndObject(); } BatchRequest.EndArray(); BatchRequest.BeginObject("Policy"sv); CacheRecordPolicy::Save(Policy, BatchRequest); BatchRequest.EndObject(); } BatchRequest.EndObject(); CbPackage BatchResponse; ZenCacheResult Result; { ZenStructuredCacheSession Session(*m_Client); Result = Session.InvokeRpc(BatchRequest.Save()); } if (Result.Success) { if (BatchResponse.TryLoad(Result.Response)) { for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) { const size_t Index = IndexMap[LocalIndex++]; OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } else if (Result.ErrorCode) { m_HealthOk = false; } for (size_t Index : KeyIndex) { OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCachePayload"); try { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); if (Result.ErrorCode == 0) { return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else { m_HealthOk = false; return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } } catch (std::exception& Err) { m_HealthOk = false; return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } virtual GetUpstreamCacheResult GetCachePayloads(std::span CacheChunkRequests, std::span RequestIndex, OnCachePayloadGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCachePayloads"); std::vector IndexMap; IndexMap.reserve(RequestIndex.size()); CbObjectWriter BatchRequest; BatchRequest << "Method"sv << "GetCachePayloads"; BatchRequest.BeginObject("Params"sv); { BatchRequest.BeginArray("ChunkRequests"sv); { for (size_t Index : RequestIndex) { const CacheChunkRequest& Request = CacheChunkRequests[Index]; IndexMap.push_back(Index); BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId); BatchRequest << "ChunkId"sv << Request.ChunkId; BatchRequest << "RawOffset"sv << Request.RawOffset; BatchRequest << "RawSize"sv << Request.RawSize; BatchRequest << "Policy"sv << static_cast(Request.Policy); } BatchRequest.EndObject(); } } BatchRequest.EndArray(); } BatchRequest.EndObject(); CbPackage BatchResponse; ZenCacheResult Result; { ZenStructuredCacheSession Session(*m_Client); Result = Session.InvokeRpc(BatchRequest.Save()); } if (Result.Success) { if (BatchResponse.TryLoad(Result.Response)) { for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) { const size_t Index = IndexMap[LocalIndex++]; IoBuffer Payload; if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) { if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) { Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); } } OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } else if (Result.ErrorCode) { m_HealthOk = false; } for (size_t Index : RequestIndex) { OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span Payloads) override { ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord"); ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); const int32_t MaxAttempts = 3; try { ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; if (CacheRecord.Type == ZenContentType::kCbPackage) { CbPackage Package; Package.SetObject(CbObject(SharedBuffer(RecordValue))); for (const IoBuffer& Payload : Payloads) { if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) { Package.AddAttachment(CbAttachment(AttachmentBuffer)); } else { return {.Reason = std::string("invalid payload buffer"), .Success = false}; } } BinaryWriter MemStream; Package.Save(MemStream); IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size()); for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type); m_HealthOk = Result.ErrorCode == 0; } TotalBytes = Result.Bytes; TotalElapsedSeconds = Result.ElapsedSeconds; } else { for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCachePayload(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheRecord.PayloadIds[Idx], Payloads[Idx]); m_HealthOk = Result.ErrorCode == 0; } TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { return {.Reason = "Failed to upload payload", .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = false}; } } Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type); m_HealthOk = Result.ErrorCode == 0; } TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; } return {.Reason = std::move(Result.Reason), .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } catch (std::exception& Err) { m_HealthOk = false; return {.Reason = std::string(Err.what()), .Success = false}; } } virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: const ZenEndpoint& GetEndpoint() { for (ZenEndpoint& Ep : m_Endpoints) { ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}); ZenStructuredCacheSession Session(Client); const int32_t SampleCount = 2; Ep.Ok = false; Ep.Latency = {}; for (int32_t Sample = 0; Sample < SampleCount; ++Sample) { ZenCacheResult Result = Session.CheckHealth(); Ep.Ok = Result.Success; Ep.Reason = std::move(Result.Reason); Ep.Latency += Result.ElapsedSeconds; } Ep.Latency /= double(SampleCount); } std::sort(std::begin(m_Endpoints), std::end(m_Endpoints)); for (const auto& Ep : m_Endpoints) { ZEN_INFO("ping 'Zen' endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); } return m_Endpoints.front(); } spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; UpstreamEndpointInfo m_Info; std::vector m_Endpoints; std::chrono::milliseconds m_ConnectTimeout; std::chrono::milliseconds m_Timeout; RefPtr m_Client; UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; }; } // namespace detail ////////////////////////////////////////////////////////////////////////// struct UpstreamStats { static constexpr uint64_t MaxSampleCount = 1000ull; UpstreamStats(bool Enabled) : m_Enabled(Enabled) {} void Add(spdlog::logger& Logger, UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result, const std::vector>& Endpoints) { UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) { Stats.ErrorCount++; } else if (Result.Success) { Stats.HitCount++; Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsDown.fetch_add(Result.ElapsedSeconds); } else { Stats.MissCount++; } if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } } void Add(spdlog::logger& Logger, UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result, const std::vector>& Endpoints) { UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Success) { Stats.UpCount++; Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); } else { Stats.ErrorCount++; } if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } } void Dump(spdlog::logger& Logger, const std::vector>& Endpoints) { for (auto& Ep : Endpoints) { // These stats will not be totally correct as the numbers are not captured atomically UpstreamEndpointStats& Stats = Ep->Stats(); const uint64_t HitCount = Stats.HitCount; const uint64_t MissCount = Stats.MissCount; const double DownBytes = Stats.DownBytes; const double SecondsDown = Stats.SecondsDown; const double UpBytes = Stats.UpBytes; const double SecondsUp = Stats.SecondsUp; const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0; const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0; const uint64_t TotalCount = HitCount + MissCount; const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0; Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", Ep->GetEndpointInfo().Name, HitRate, DownBytes, DownSpeed, UpBytes, UpSpeed); } } bool m_Enabled; std::atomic_uint64_t m_SampleCount = {}; }; ////////////////////////////////////////////////////////////////////////// class DefaultUpstreamCache final : public UpstreamCache { public: DefaultUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) , m_CidStore(CidStore) , m_Stats(Options.StatsEnabled) { } virtual ~DefaultUpstreamCache() { Shutdown(); } virtual bool Initialize() override { for (auto& Endpoint : m_Endpoints) { const UpstreamEndpointHealth Health = Endpoint->Initialize(); const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); if (Health.Ok) { ZEN_INFO("'{}' endpoint '{}' OK", Info.Name, Info.Url); } else { ZEN_WARN("'{}' endpoint '{}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); } } m_RunState.IsRunning = !m_Endpoints.empty(); if (m_RunState.IsRunning) { for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); } m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this); } return m_RunState.IsRunning; } virtual void RegisterEndpoint(std::unique_ptr Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy()) { const GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); if (Result.Success) { return Result; } if (Result.Error) { ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } } } } return {}; } virtual void GetCacheRecords(std::span CacheKeys, std::span KeyIndex, const CacheRecordPolicy& Policy, OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); std::vector MissingKeys(KeyIndex.begin(), KeyIndex.end()); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy() && !MissingKeys.empty()) { std::vector Missing; auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { OnComplete(std::forward(Params)); } else { Missing.push_back(Params.KeyIndex); } }); if (Result.Error) { ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); MissingKeys = std::move(Missing); } } } for (size_t Index : MissingKeys) { OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); } } virtual void GetCachePayloads(std::span CacheChunkRequests, std::span RequestIndex, OnCachePayloadGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCachePayloads"); std::vector MissingPayloads(RequestIndex.begin(), RequestIndex.end()); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy() && !MissingPayloads.empty()) { std::vector Missing; auto Result = Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) { if (Params.Payload) { OnComplete(std::forward(Params)); } else { Missing.push_back(Params.RequestIndex); } }); if (Result.Error) { ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); MissingPayloads = std::move(Missing); } } } for (size_t Index : MissingPayloads) { OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); } } virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { ZEN_TRACE_CPU("Upstream::GetCachePayload"); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy()) { const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); if (Result.Success) { return Result; } if (Result.Error) { ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } } } } return {}; } virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { if (m_RunState.IsRunning && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { m_UpstreamQueue.Enqueue(std::move(CacheRecord)); } else { ProcessCacheRecord(std::move(CacheRecord)); } return {.Success = true}; } return {}; } virtual void GetStatus(CbObjectWriter& Status) override { Status << "reading" << m_Options.ReadUpstream; Status << "writing" << m_Options.WriteUpstream; Status << "worker_threads" << m_Options.ThreadCount; Status << "queue_count" << m_UpstreamQueue.Size(); Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) { const UpstreamEndpointInfo& Info = Ep->GetEndpointInfo(); Status.BeginObject(); Status << "name" << Info.Name; Status << "url" << Info.Url; Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); UpstreamEndpointStats& Stats = Ep->Stats(); const uint64_t HitCount = Stats.HitCount; const uint64_t MissCount = Stats.MissCount; const uint64_t TotalCount = HitCount + MissCount; const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0; Status << "hit_ratio" << HitRate; Status << "downloaded_mb" << Stats.DownBytes; Status << "uploaded_mb" << Stats.UpBytes; Status << "error_count" << Stats.ErrorCount; Status.EndObject(); } Status.EndArray(); } virtual void WaitForIdle() override { UpstreamCacheRecord CacheRecord; while (m_RunState.IsRunning && m_UpstreamQueue.TryDequeue(CacheRecord)) { try { ProcessCacheRecord(std::move(CacheRecord)); } catch (std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what()); } } } private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) { ZEN_TRACE_CPU("Upstream::ProcessCacheRecord"); ZenCacheValue CacheValue; std::vector Payloads; if (!m_CacheStore.Get(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; } for (const IoHash& PayloadId : CacheRecord.PayloadIds) { if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) { Payloads.push_back(Payload); } else { ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PayloadId); return; } } for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy()) { const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); if (!Result.Success) { ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Endpoint->GetEndpointInfo().Url, Result.Reason); } } } } void ProcessUpstreamQueue() { for (;;) { UpstreamCacheRecord CacheRecord; if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) { try { ProcessCacheRecord(std::move(CacheRecord)); } catch (std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what()); } } if (!m_RunState.IsRunning) { break; } } } void MonitorEndpoints() { for (;;) { { std::unique_lock lk(m_RunState.Mutex); if (m_RunState.ExitSignal.wait_for(lk, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) { break; } } try { for (auto& Endpoint : m_Endpoints) { if (!Endpoint->IsHealthy()) { const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) { ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url, Health.Reason); } else { ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); } } } } catch (std::exception& Err) { ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what()); } } } void Shutdown() { if (m_RunState.Stop()) { m_UpstreamQueue.CompleteAdding(); for (std::thread& Thread : m_UpstreamThreads) { Thread.join(); } m_EndpointMonitorThread.join(); m_UpstreamThreads.clear(); m_Endpoints.clear(); } } spdlog::logger& Log() { return m_Log; } using UpstreamQueue = BlockingQueue; struct RunState { std::mutex Mutex; std::condition_variable ExitSignal; std::atomic_bool IsRunning{false}; bool Stop() { bool Stopped = false; { std::lock_guard _(Mutex); Stopped = IsRunning.exchange(false); } if (Stopped) { ExitSignal.notify_all(); } return Stopped; } }; spdlog::logger& m_Log; UpstreamCacheOptions m_Options; ZenCacheStore& m_CacheStore; CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; UpstreamStats m_Stats; std::vector> m_Endpoints; std::vector m_UpstreamThreads; std::thread m_EndpointMonitorThread; RunState m_RunState; }; ////////////////////////////////////////////////////////////////////////// std::unique_ptr MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) { return std::make_unique(Options, CacheStore, CidStore); } std::unique_ptr MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) { return std::make_unique(Options); } std::unique_ptr MakeZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) { return std::make_unique(Options); } } // namespace zen