// Copyright Epic Games, Inc. All Rights Reserved. #include "upstreamcache.h" #include "zen.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "cache/httpstructuredcache.h" #include "diag/logging.h" #include #include #include #include #include namespace zen { using namespace std::literals; namespace detail { class UpstreamStatus { public: UpstreamEndpointState EndpointState() const { return static_cast(m_State.load(std::memory_order_relaxed)); } UpstreamEndpointStatus EndpointStatus() const { const UpstreamEndpointState State = EndpointState(); { std::unique_lock _(m_Mutex); return {.Reason = m_ErrorText, .State = State}; } } void Set(UpstreamEndpointState NewState) { m_State.store(static_cast(NewState), std::memory_order_relaxed); { std::unique_lock _(m_Mutex); m_ErrorText.clear(); } } void Set(UpstreamEndpointState NewState, std::string ErrorText) { m_State.store(static_cast(NewState), std::memory_order_relaxed); { std::unique_lock _(m_Mutex); m_ErrorText = std::move(ErrorText); } } void SetFromErrorCode(int32_t ErrorCode, std::string_view ErrorText) { if (ErrorCode != 0) { Set(ErrorCode == 401 ? 
UpstreamEndpointState::kUnauthorized : UpstreamEndpointState::kError, std::string(ErrorText)); } } private: mutable std::mutex m_Mutex; std::string m_ErrorText; std::atomic_uint32_t m_State; }; class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: JupiterUpstreamEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) : m_AuthMgr(Mgr) , m_Log(zen::logging::Get("upstream")) { ZEN_ASSERT(!Options.Name.empty()); m_Info.Name = Options.Name; m_Info.Url = Options.ServiceUrl; std::function TokenProvider; if (AuthConfig.OAuthUrl.empty() == false) { TokenProvider = httpclientauth::CreateFromOAuthClientCredentials( {.Url = AuthConfig.OAuthUrl, .ClientId = AuthConfig.OAuthClientId, .ClientSecret = AuthConfig.OAuthClientSecret}); } else if (!AuthConfig.OpenIdProvider.empty()) { TokenProvider = httpclientauth::CreateFromOpenIdProvider(m_AuthMgr, AuthConfig.OpenIdProvider); } else if (!AuthConfig.AccessToken.empty()) { TokenProvider = httpclientauth::CreateFromStaticToken(AuthConfig.AccessToken); } else { TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(m_AuthMgr); } m_Client = new JupiterClient(Options, std::move(TokenProvider)); } virtual ~JupiterUpstreamEndpoint() {} virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; } virtual UpstreamEndpointStatus Initialize() override { ZEN_TRACE_CPU("Upstream::Jupiter::Initialize"); try { if (m_Status.EndpointState() == UpstreamEndpointState::kOk) { return {.State = UpstreamEndpointState::kOk}; } JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); const JupiterResult Result = Session.Authenticate(); if (Result.Success) { m_Status.Set(UpstreamEndpointState::kOk); } else if (Result.ErrorCode != 0) { m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); } else { m_Status.Set(UpstreamEndpointState::kUnauthorized); } return m_Status.EndpointStatus(); } catch (const std::exception& Err) { 
m_Status.Set(UpstreamEndpointState::kError, Err.what()); return {.Reason = Err.what(), .State = GetState()}; } } std::string_view GetActualBlobStoreNamespace(std::string_view Namespace) { if (Namespace == ZenCacheStore::DefaultNamespace) { return m_Client->DefaultBlobStoreNamespace(); } return Namespace; } virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); } virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Jupiter::GetSingleCacheRecord"); try { JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); JupiterResult Result; std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); if (Type == ZenContentType::kCompressedBinary) { Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); if (Result.Success) { const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); IoBuffer ContentBuffer; int NumAttachments = 0; CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); Result.ReceivedBytes += AttachmentResult.ReceivedBytes; Result.SentBytes += AttachmentResult.SentBytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response, RawHash, RawSize)) { Result.Response = AttachmentResult.Response; ++NumAttachments; } else { Result.Success = false; } }); if (NumAttachments != 1) { Result.Success = 
false; } } } } else { const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, AcceptType); if (Result.Success && Type == ZenContentType::kCbPackage) { CbPackage Package; const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); Result.ReceivedBytes += AttachmentResult.ReceivedBytes; Result.SentBytes += AttachmentResult.SentBytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; IoHash RawHash; uint64_t RawSize; if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response), RawHash, RawSize)) { Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash())); } else { Result.Success = false; } }); Package.SetObject(CacheRecord); } if (Result.Success) { BinaryWriter MemStream; Package.Save(MemStream); Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); } } } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); if (Result.ErrorCode == 0) { return {.Status = {.Bytes = gsl::narrow(Result.ReceivedBytes), .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, .Value = Result.Response, .Source = &m_Info}; } else { return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, 
std::span Requests, OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheRecords"); JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); GetUpstreamCacheResult Result; for (CacheKeyRequest* Request : Requests) { const CacheKey& CacheKey = Request->Key; CbPackage Package; CbObject Record; double ElapsedSeconds = 0.0; if (!Result.Error) { std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); JupiterResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); ElapsedSeconds = RefResult.ElapsedSeconds; m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); if (RefResult.ErrorCode == 0) { const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); if (ValidationResult == CbValidateError::None) { Record = LoadCompactBinaryObject(RefResult.Response); Record.IterateAttachments([&](CbFieldView AttachmentHash) { JupiterResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (BlobResult.ErrorCode == 0) { IoHash RawHash; uint64_t RawSize; if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response), RawHash, RawSize)) { if (RawHash == AttachmentHash.AsHash()) { Package.AddAttachment(CbAttachment(Chunk, RawHash)); } } } }); } } } OnComplete( {.Request = *Request, .Record = Record, .Package = Package, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info}); } return Result; } virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Jupiter::GetSingleCacheChunk"); try { JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); std::string_view 
BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); const JupiterResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); if (Result.ErrorCode == 0) { return {.Status = {.Bytes = gsl::narrow(Result.ReceivedBytes), .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, .Value = Result.Response, .Source = &m_Info}; } else { return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, std::span CacheChunkRequests, OnCacheChunksGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheChunks"); JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); GetUpstreamCacheResult Result; for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { CacheChunkRequest& Request = *RequestPtr; IoBuffer Payload; IoHash RawHash = IoHash::Zero; uint64_t RawSize = 0; double ElapsedSeconds = 0.0; bool IsCompressed = false; if (!Result.Error) { std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); const JupiterResult BlobResult = Request.ChunkId == IoHash::Zero ? 
Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, Request.ChunkId) : Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); ElapsedSeconds = BlobResult.ElapsedSeconds; Payload = BlobResult.Response; AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (Payload && IsCompressedBinary(Payload.GetContentType())) { IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize); } } if (IsCompressed) { OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = Payload, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info}); } else { OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } } return Result; } virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, std::span CacheValueRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheValues"); JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); GetUpstreamCacheResult Result; for (CacheValueRequest* RequestPtr : CacheValueRequests) { CacheValueRequest& Request = *RequestPtr; IoBuffer Payload; IoHash RawHash = IoHash::Zero; uint64_t RawSize = 0; double ElapsedSeconds = 0.0; bool IsCompressed = false; if (!Result.Error) { std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); IoHash PayloadHash; const JupiterResult BlobResult = Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash); ElapsedSeconds = BlobResult.ElapsedSeconds; Payload = BlobResult.Response; AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (Payload) { if (IsCompressedBinary(Payload.GetContentType())) { IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize) && RawHash != PayloadHash; } else { CompressedBuffer Compressed = 
CompressedBuffer::Compress(SharedBuffer(Payload)); RawHash = Compressed.DecodeRawHash(); if (RawHash == PayloadHash) { IsCompressed = true; } else { ZEN_WARN("Jupiter request for inline payload of {}/{}/{} has hash {}, expected hash {} from header", Namespace, Request.Key.Bucket, Request.Key.Hash.ToHexString(), RawHash.ToHexString(), PayloadHash.ToHexString()); } } } } if (IsCompressed) { OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = Payload, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info}); } else { OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } } return Result; } virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span Values) override { ZEN_TRACE_CPU("Upstream::Jupiter::PutCacheRecord"); ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); const int32_t MaxAttempts = 3; try { JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); if (CacheRecord.Type == ZenContentType::kBinary) { JupiterResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(CacheRecord.Namespace); Result = Session.PutRef(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kBinary); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); return {.Reason = std::move(Result.Reason), .Bytes = gsl::narrow(Result.ReceivedBytes), .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else if (CacheRecord.Type == ZenContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; if (!CompressedBuffer::ValidateCompressedHeader(RecordValue, RawHash, RawSize)) { return {.Reason = std::string("Invalid compressed value buffer"), .Success = false}; } CbObjectWriter ReferencingObject; ReferencingObject.AddBinaryAttachment("RawHash", RawHash); 
ReferencingObject.AddInteger("RawSize", RawSize); return PerformStructuredPut( Session, CacheRecord.Namespace, CacheRecord.Key, ReferencingObject.Save().GetBuffer().AsIoBuffer(), MaxAttempts, [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { if (ValueContentId != RawHash) { OutReason = fmt::format("Value '{}' MISMATCHED from compressed buffer raw hash {}", ValueContentId, RawHash); return false; } OutBuffer = RecordValue; return true; }); } else { return PerformStructuredPut( Session, CacheRecord.Namespace, CacheRecord.Key, RecordValue, MaxAttempts, [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { const auto It = std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId); if (It == std::end(CacheRecord.ValueContentIds)) { OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId); return false; } const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It); OutBuffer = Values[Idx]; return true; }); } } catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); return {.Reason = std::string(Err.what()), .Success = false}; } } virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: static void AppendResult(const JupiterResult& Result, GetUpstreamCacheResult& Out) { Out.Success &= Result.Success; Out.Bytes += gsl::narrow(Result.ReceivedBytes); Out.ElapsedSeconds += Result.ElapsedSeconds; if (Result.ErrorCode) { Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; } }; PutUpstreamCacheResult PerformStructuredPut( JupiterSession& Session, std::string_view Namespace, const CacheKey& Key, IoBuffer ObjectBuffer, const int32_t MaxAttempts, std::function&& BlobFetchFn) { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); const auto PutBlobs = [&](std::span 
ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { IoBuffer BlobBuffer; if (!BlobFetchFn(ValueContentId, BlobBuffer, OutReason)) { return false; } JupiterResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (!BlobResult.Success) { OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); return false; } TotalBytes += gsl::narrow(BlobResult.ReceivedBytes); TotalElapsedSeconds += BlobResult.ElapsedSeconds; } return true; }; PutRefResult RefResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) { RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); } m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); if (!RefResult.Success) { return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, RefResult.Reason), .Success = false}; } TotalBytes += gsl::narrow(RefResult.ReceivedBytes); TotalElapsedSeconds += RefResult.ElapsedSeconds; std::string Reason; if (!PutBlobs(RefResult.Needs, Reason)) { return {.Reason = std::move(Reason), .Success = false}; } const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); FinalizeRefResult FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); if (!FinalizeResult.Success) { return { .Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), .Success = false}; } if (!FinalizeResult.Needs.empty()) { if (!PutBlobs(FinalizeResult.Needs, Reason)) { return {.Reason = std::move(Reason), .Success = false}; } FinalizeResult = 
Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); if (!FinalizeResult.Success) { return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), .Success = false}; } if (!FinalizeResult.Needs.empty()) { ExtendableStringBuilder<256> Sb; for (const IoHash& MissingHash : FinalizeResult.Needs) { Sb << MissingHash.ToHexString() << ","; } return { .Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", Key.Bucket, Key.Hash, Sb.ToString()), .Success = false}; } } TotalBytes += gsl::narrow(FinalizeResult.ReceivedBytes); TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; } LoggerRef Log() { return m_Log; } AuthMgr& m_AuthMgr; LoggerRef m_Log; UpstreamEndpointInfo m_Info; UpstreamStatus m_Status; UpstreamEndpointStats m_Stats; RefPtr m_Client; const bool m_AllowRedirect = false; }; class ZenUpstreamEndpoint final : public UpstreamEndpoint { struct ZenEndpoint { std::string Url; std::string Reason; double Latency{}; bool Ok = false; bool operator<(const ZenEndpoint& RHS) const { return Ok && RHS.Ok ? 
Latency < RHS.Latency : Ok; } }; public: ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) : m_Log(zen::logging::Get("upstream")) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { ZEN_ASSERT(!Options.Name.empty()); m_Info.Name = Options.Name; for (const auto& Url : Options.Urls) { m_Endpoints.push_back({.Url = Url}); } } ~ZenUpstreamEndpoint() {} virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; } virtual UpstreamEndpointStatus Initialize() override { ZEN_TRACE_CPU("Upstream::Zen::Initialize"); try { if (m_Status.EndpointState() == UpstreamEndpointState::kOk) { return {.State = UpstreamEndpointState::kOk}; } const ZenEndpoint& Ep = GetEndpoint(); if (m_Info.Url != Ep.Url) { ZEN_INFO("Setting Zen upstream URL to '{}'", Ep.Url); m_Info.Url = Ep.Url; } if (Ep.Ok) { RwLock::ExclusiveLockScope _(m_ClientLock); m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_Status.Set(UpstreamEndpointState::kOk); } else { m_Status.Set(UpstreamEndpointState::kError, Ep.Reason); } return m_Status.EndpointStatus(); } catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); return {.Reason = Err.what(), .State = GetState()}; } } virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); } virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); try { ZenStructuredCacheSession Session(GetClientRef()); const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); if (Result.ErrorCode == 0) { return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = 
Result.ElapsedSeconds, .Success = Result.Success}, .Value = Result.Response, .Source = &m_Info}; } else { return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, std::span Requests, OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); ZEN_ASSERT(Requests.size() > 0); CbObjectWriter BatchRequest; BatchRequest << "Method"sv << "GetCacheRecords"sv; BatchRequest << "Accept"sv << kCbPkgMagic; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy(); BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginArray("Requests"sv); for (CacheKeyRequest* Request : Requests) { BatchRequest.BeginObject(); { const CacheKey& Key = Request->Key; BatchRequest.BeginObject("Key"sv); { BatchRequest << "Bucket"sv << Key.Bucket; BatchRequest << "Hash"sv << Key.Hash; } BatchRequest.EndObject(); if (!Request->Policy.IsUniform() || Request->Policy.GetRecordPolicy() != DefaultPolicy) { BatchRequest.SetName("Policy"sv); Request->Policy.Save(BatchRequest); } } BatchRequest.EndObject(); } BatchRequest.EndArray(); } BatchRequest.EndObject(); ZenCacheResult Result; { ZenStructuredCacheSession Session(GetClientRef()); Result = Session.InvokeRpc(BatchRequest.Save()); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); if (Result.Success) { CbPackage BatchResponse; if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (Results.Num() != Requests.size()) { ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Response results from 
Upstream."); } else { for (size_t Index = 0; CbFieldView Record : Results) { CacheKeyRequest* Request = Requests[Index++]; OnComplete({.Request = *Request, .Record = Record.AsObjectView(), .Package = BatchResponse, .ElapsedSeconds = Result.ElapsedSeconds, .Source = &m_Info}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } else { ZEN_WARN("Upstream::Zen::GetCacheRecords invalid Response from Upstream."); } } for (CacheKeyRequest* Request : Requests) { OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunk"); try { ZenStructuredCacheSession Session(GetClientRef()); const ZenCacheResult Result = Session.GetCacheChunk(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); if (Result.ErrorCode == 0) { return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, .Value = Result.Response, .Source = &m_Info}; } else { return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, std::span CacheValueRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); ZEN_ASSERT(!CacheValueRequests.empty()); CbObjectWriter BatchRequest; BatchRequest << "Method"sv << "GetCacheValues"sv; BatchRequest << "Accept"sv << kCbPkgMagic; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = 
CacheValueRequests[0]->Policy; BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginArray("Requests"sv); { for (CacheValueRequest* RequestPtr : CacheValueRequests) { const CacheValueRequest& Request = *RequestPtr; BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); if (Request.Policy != DefaultPolicy) { BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); } } BatchRequest.EndObject(); } } BatchRequest.EndArray(); } BatchRequest.EndObject(); ZenCacheResult Result; { ZenStructuredCacheSession Session(GetClientRef()); Result = Session.InvokeRpc(BatchRequest.Save()); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); if (Result.Success) { CbPackage BatchResponse; if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (CacheValueRequests.size() != Results.Num()) { ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Response results from Upstream."); } else { for (size_t RequestIndex = 0; CbFieldView ChunkField : Results) { CacheValueRequest& Request = *CacheValueRequests[RequestIndex++]; CbObjectView ChunkObject = ChunkField.AsObjectView(); IoHash RawHash = ChunkObject["RawHash"sv].AsHash(); IoBuffer Payload; uint64_t RawSize = 0; if (RawHash != IoHash::Zero) { bool Success = false; const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash); if (Attachment) { if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) { Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); RawSize = Compressed.DecodeRawSize(); Success = true; } } if (!Success) { CbFieldView RawSizeField = ChunkObject["RawSize"sv]; RawSize = 
RawSizeField.AsUInt64(); Success = !RawSizeField.HasError(); } if (!Success) { RawHash = IoHash::Zero; } } OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload), .ElapsedSeconds = Result.ElapsedSeconds, .Source = &m_Info}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } else { ZEN_WARN("Upstream::Zen::GetCacheValues invalid Response from Upstream."); } } for (CacheValueRequest* RequestPtr : CacheValueRequests) { OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, std::span CacheChunkRequests, OnCacheChunksGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunks"); ZEN_ASSERT(!CacheChunkRequests.empty()); CbObjectWriter BatchRequest; BatchRequest << "Method"sv << "GetCacheChunks"sv; BatchRequest << "Accept"sv << kCbPkgMagic; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginArray("ChunkRequests"sv); { for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { const CacheChunkRequest& Request = *RequestPtr; BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); if (Request.ValueId) { BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); } if (Request.ChunkId != Request.ChunkId.Zero) { BatchRequest << "ChunkId"sv << Request.ChunkId; } if (Request.RawOffset != 0) { BatchRequest << "RawOffset"sv << Request.RawOffset; } if (Request.RawSize != UINT64_MAX) { BatchRequest << "RawSize"sv << Request.RawSize; } if (Request.Policy 
!= DefaultPolicy) { BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); } } BatchRequest.EndObject(); } } BatchRequest.EndArray(); } BatchRequest.EndObject(); ZenCacheResult Result; { ZenStructuredCacheSession Session(GetClientRef()); Result = Session.InvokeRpc(BatchRequest.Save()); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); if (Result.Success) { CbPackage BatchResponse; if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (CacheChunkRequests.size() != Results.Num()) { ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Response results from Upstream."); } else { for (size_t RequestIndex = 0; CbFieldView ChunkField : Results) { CacheChunkRequest& Request = *CacheChunkRequests[RequestIndex++]; CbObjectView ChunkObject = ChunkField.AsObjectView(); IoHash RawHash = ChunkObject["RawHash"sv].AsHash(); IoBuffer Payload; uint64_t RawSize = 0; if (RawHash != IoHash::Zero) { bool Success = false; const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash); if (Attachment) { if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) { Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); RawSize = Compressed.DecodeRawSize(); Success = true; } } if (!Success) { CbFieldView RawSizeField = ChunkObject["RawSize"sv]; RawSize = RawSizeField.AsUInt64(); Success = !RawSizeField.HasError(); } if (!Success) { RawHash = IoHash::Zero; } } OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload), .ElapsedSeconds = Result.ElapsedSeconds, .Source = &m_Info}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } else { ZEN_WARN("Upstream::Zen::GetCacheChunks invalid Response from Upstream."); } } for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { 
// Fallback for the GetCacheChunks response handling above: reached when the RPC failed, the
// response could not be parsed, or the number of results did not match the number of requests.
// Every request is then completed as a miss (zero RawHash, zero RawSize, empty value).
            OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
        }

        return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
    }

    /// Uploads one cache record (and its values) to this Zen upstream.
    ///
    /// The upload strategy depends on CacheRecord.Type:
    ///  - kCbPackage: the record object plus every value (each must decode as a compressed
    ///    buffer) are bundled into one CbPackage and sent with a single PutCacheRecord call.
    ///  - kCompressedBinary: RecordValue itself must be a compressed buffer; it is sent as a
    ///    single-request "PutCacheValues" RPC with the buffer attached by its raw hash.
    ///  - any other type: each Values[i] is PUT under ValueContentIds[i], then the record is PUT.
    /// Every network call is retried up to MaxAttempts times until it reports success, and the
    /// endpoint status is updated from the final attempt's error code (non-zero codes only).
    ///
    /// @param CacheRecord  namespace/key/type of the record and the content ids of its values;
    ///                     ValueContentIds.size() must equal Values.size() (asserted).
    /// @param RecordValue  the serialized record payload.
    /// @param Values       value payloads, parallel to CacheRecord.ValueContentIds.
    /// @return Success plus Bytes/ElapsedSeconds; only the final attempt of each retry loop is
    ///         counted. An undecodable buffer or a failed value upload returns Success = false.
    ///         A std::exception is caught, sets the endpoint state to kError and is returned as
    ///         the Reason.
    virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span Values) override
    {
        ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord");

        ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size());
        const int32_t MaxAttempts = 3;

        try
        {
            ZenStructuredCacheSession Session(GetClientRef());
            ZenCacheResult            Result;
            int64_t                   TotalBytes          = 0ull;
            double                    TotalElapsedSeconds = 0.0;

            if (CacheRecord.Type == ZenContentType::kCbPackage)
            {
                // Bundle the record object and all value attachments into one package.
                CbPackage Package;
                Package.SetObject(CbObject(SharedBuffer(RecordValue)));

                for (const IoBuffer& Value : Values)
                {
                    IoHash   RawHash;
                    uint64_t RawSize;
                    if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value), RawHash, RawSize))
                    {
                        Package.AddAttachment(CbAttachment(AttachmentBuffer, RawHash));
                    }
                    else
                    {
                        return {.Reason = std::string("Invalid value buffer"), .Success = false};
                    }
                }

                BinaryWriter MemStream;
                Package.Save(MemStream);
                // Wraps MemStream's memory; MemStream outlives every use of PackagePayload below.
                IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size());

                // NOTE(review): this loop relies on a default-constructed ZenCacheResult having
                // Success == false (not visible here); the other loops reset it explicitly.
                for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
                {
                    Result = Session.PutCacheRecord(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type);
                }

                m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);

                TotalBytes          = Result.Bytes;
                TotalElapsedSeconds = Result.ElapsedSeconds;
            }
            else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
            {
                IoHash           RawHash;
                uint64_t         RawSize;
                CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue), RawHash, RawSize);
                if (!Compressed)
                {
                    return {.Reason = std::string("Invalid value compressed buffer"), .Success = false};
                }

                // Build a single-request PutCacheValues RPC; the compressed value travels as a
                // package attachment referenced by RawHash.
                CbPackage      BatchPackage;
                CbObjectWriter BatchWriter;
                BatchWriter << "Method"sv << "PutCacheValues"sv;
                BatchWriter << "Accept"sv << kCbPkgMagic;

                BatchWriter.BeginObject("Params"sv);
                {
                    // DefaultPolicy unspecified and expected to be Default

                    BatchWriter << "Namespace"sv << CacheRecord.Namespace;

                    BatchWriter.BeginArray("Requests"sv);
                    {
                        BatchWriter.BeginObject();
                        {
                            const CacheKey& Key = CacheRecord.Key;
                            BatchWriter.BeginObject("Key"sv);
                            {
                                BatchWriter << "Bucket"sv << Key.Bucket;
                                BatchWriter << "Hash"sv << Key.Hash;
                            }
                            BatchWriter.EndObject();
                            // Policy unspecified and expected to be Default
                            BatchWriter.AddBinaryAttachment("RawHash"sv, RawHash);
                            BatchPackage.AddAttachment(CbAttachment(Compressed, RawHash));
                        }
                        BatchWriter.EndObject();
                    }
                    BatchWriter.EndArray();
                }
                BatchWriter.EndObject();
                BatchPackage.SetObject(BatchWriter.Save());

                Result.Success = false;
                for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
                {
                    Result = Session.InvokeRpc(BatchPackage);
                }

                m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);

                TotalBytes += Result.Bytes;
                TotalElapsedSeconds += Result.ElapsedSeconds;
            }
            else
            {
                // Upload every value individually first; abort on the first value that still
                // fails after all retries, so the record is never published without its values.
                for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++)
                {
                    Result.Success = false;
                    for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
                    {
                        Result = Session.PutCacheValue(CacheRecord.Namespace,
                                                       CacheRecord.Key.Bucket,
                                                       CacheRecord.Key.Hash,
                                                       CacheRecord.ValueContentIds[Idx],
                                                       Values[Idx]);
                    }

                    m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);

                    TotalBytes += Result.Bytes;
                    TotalElapsedSeconds += Result.ElapsedSeconds;

                    if (!Result.Success)
                    {
                        return {.Reason = "Failed to upload value", .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = false};
                    }
                }

                // Then upload the record itself.
                Result.Success = false;
                for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
                {
                    Result = Session.PutCacheRecord(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type);
                }

                m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);

                TotalBytes += Result.Bytes;
                TotalElapsedSeconds += Result.ElapsedSeconds;
            }

            return {.Reason = std::move(Result.Reason), .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
        }
        catch (const std::exception& Err)
        {
            m_Status.Set(UpstreamEndpointState::kError, Err.what());
            return {.Reason = std::string(Err.what()), .Success = false};
        }
    }

    /// Per-endpoint statistics counters (returned by reference, shared with the caller).
    virtual UpstreamEndpointStats& Stats() override { return m_Stats; }

private:
    /// Returns a new reference to the current client, taken while holding m_ClientLock shared.
    Ref GetClientRef()
    {
        // m_Client can be modified at any time by a different thread.
        // Make sure we safely bump the refcount inside a scope lock
        RwLock::SharedLockScope _(m_ClientLock);
        ZEN_ASSERT(m_Client);
        Ref ClientRef(m_Client);
        _.ReleaseNow();
        return ClientRef;
    }

    /// Health-probes every entry in m_Endpoints with SampleCount CheckHealth calls over a fresh
    /// client, stores the averaged latency and the Ok/Reason of the last sample on each entry,
    /// sorts m_Endpoints (ordering comes from ZenEndpoint's operator<, not visible here), logs
    /// each entry and returns the first one.
    /// NOTE(review): Ok/Reason reflect only the final sample, and the connect timeout is
    /// hard-coded to 1000 ms rather than using m_ConnectTimeout — confirm intended.
    const ZenEndpoint& GetEndpoint()
    {
        for (ZenEndpoint& Ep : m_Endpoints)
        {
            Ref Client(
                new ZenStructuredCacheClient({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}));
            ZenStructuredCacheSession Session(std::move(Client));
            const int32_t             SampleCount = 2;

            Ep.Ok      = false;
            Ep.Latency = {};

            for (int32_t Sample = 0; Sample < SampleCount; ++Sample)
            {
                ZenCacheResult Result = Session.CheckHealth();
                Ep.Ok                 = Result.Success;
                Ep.Reason             = std::move(Result.Reason);
                Ep.Latency += Result.ElapsedSeconds;
            }
            Ep.Latency /= double(SampleCount);
        }

        std::sort(std::begin(m_Endpoints), std::end(m_Endpoints));

        for (const auto& Ep : m_Endpoints)
        {
            ZEN_INFO("ping 'Zen' endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ?
"OK" : Ep.Reason); } return m_Endpoints.front(); } LoggerRef Log() { return m_Log; } LoggerRef m_Log; UpstreamEndpointInfo m_Info; UpstreamStatus m_Status; UpstreamEndpointStats m_Stats; std::vector m_Endpoints; std::chrono::milliseconds m_ConnectTimeout; std::chrono::milliseconds m_Timeout; RwLock m_ClientLock; RefPtr m_Client; }; } // namespace detail ////////////////////////////////////////////////////////////////////////// class UpstreamCacheImpl final : public UpstreamCache { struct EnqueuedRequest { UpstreamCacheRecord Record; std::function GetValueFunc; }; public: UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) { } virtual ~UpstreamCacheImpl() { Shutdown(); } virtual void Initialize() override { ZEN_TRACE_CPU("Upstream::Initialize"); m_RunState.IsRunning = true; } virtual bool IsActive() override { std::shared_lock _(m_EndpointsMutex); return !m_Endpoints.empty(); } virtual void RegisterEndpoint(std::unique_ptr Endpoint) override { ZEN_TRACE_CPU("Upstream::RegisterEndpoint"); const UpstreamEndpointStatus Status = Endpoint->Initialize(); const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); if (Status.State == UpstreamEndpointState::kOk) { ZEN_INFO("register endpoint '{} - {}' {}", Info.Name, Info.Url, ToString(Status.State)); } else { ZEN_WARN("register endpoint '{} - {}' {}", Info.Name, Info.Url, ToString(Status.State)); } // Register endpoint even if it fails, the health monitor thread will probe failing endpoint(s) std::unique_lock _(m_EndpointsMutex); if (m_Endpoints.empty()) { for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this, Idx + 1); } m_EndpointMonitorThread = std::thread(&UpstreamCacheImpl::MonitorEndpoints, this); } m_Endpoints.emplace_back(std::move(Endpoint)); } virtual void IterateEndpoints(std::function&& Fn) override { 
std::shared_lock _(m_EndpointsMutex); for (auto& Ep : m_Endpoints) { if (!Fn(*Ep)) { break; } } } virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); std::shared_lock _(m_EndpointsMutex); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (Endpoint->GetState() != UpstreamEndpointState::kOk) { continue; } UpstreamEndpointStats& Stats = Endpoint->Stats(); metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); GetUpstreamCacheSingleResult Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); Scope.Stop(); Stats.CacheGetCount.Increment(1); Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes); if (Result.Status.Success) { Stats.CacheHitCount.Increment(1); return Result; } if (Result.Status.Error) { Stats.CacheErrorCount.Increment(1); ZEN_WARN("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Status.Error.Reason, Result.Status.Error.ErrorCode); } } } return {}; } virtual void GetCacheRecords(std::string_view Namespace, std::span Requests, OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); std::shared_lock _(m_EndpointsMutex); std::vector RemainingKeys(Requests.begin(), Requests.end()); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (RemainingKeys.empty()) { break; } if (Endpoint->GetState() != UpstreamEndpointState::kOk) { continue; } UpstreamEndpointStats& Stats = Endpoint->Stats(); std::vector Missing; GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); Result = Endpoint->GetCacheRecords(Namespace, RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { OnComplete(std::forward(Params)); Stats.CacheHitCount.Increment(1); } else { Missing.push_back(&Params.Request); } }); } 
Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); Stats.CacheGetTotalBytes.Increment(Result.Bytes); if (Result.Error) { Stats.CacheErrorCount.Increment(1); ZEN_WARN("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } RemainingKeys = std::move(Missing); } } const UpstreamEndpointInfo Info; for (CacheKeyRequest* Request : RemainingKeys) { OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); } } virtual void GetCacheChunks(std::string_view Namespace, std::span CacheChunkRequests, OnCacheChunksGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheChunks"); std::shared_lock _(m_EndpointsMutex); std::vector RemainingKeys(CacheChunkRequests.begin(), CacheChunkRequests.end()); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (RemainingKeys.empty()) { break; } if (Endpoint->GetState() != UpstreamEndpointState::kOk) { continue; } UpstreamEndpointStats& Stats = Endpoint->Stats(); std::vector Missing; GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); Result = Endpoint->GetCacheChunks(Namespace, RemainingKeys, [&](CacheChunkGetCompleteParams&& Params) { if (Params.RawHash != Params.RawHash.Zero) { OnComplete(std::forward(Params)); Stats.CacheHitCount.Increment(1); } else { Missing.push_back(&Params.Request); } }); } Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); Stats.CacheGetTotalBytes.Increment(Result.Bytes); if (Result.Error) { Stats.CacheErrorCount.Increment(1); ZEN_WARN("get cache chunks(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } RemainingKeys = std::move(Missing); } } const UpstreamEndpointInfo Info; for (CacheChunkRequest* RequestPtr : RemainingKeys) { OnComplete({.Request = *RequestPtr, .RawHash = 
IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } } virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::GetCacheChunk"); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (Endpoint->GetState() != UpstreamEndpointState::kOk) { continue; } UpstreamEndpointStats& Stats = Endpoint->Stats(); metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); GetUpstreamCacheSingleResult Result = Endpoint->GetCacheChunk(Namespace, CacheKey, ValueContentId); Scope.Stop(); Stats.CacheGetCount.Increment(1); Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes); if (Result.Status.Success) { Stats.CacheHitCount.Increment(1); return Result; } if (Result.Status.Error) { Stats.CacheErrorCount.Increment(1); ZEN_WARN("get cache chunk FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Status.Error.Reason, Result.Status.Error.ErrorCode); } } } return {}; } virtual void GetCacheValues(std::string_view Namespace, std::span CacheValueRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheValues"); std::shared_lock _(m_EndpointsMutex); std::vector RemainingKeys(CacheValueRequests.begin(), CacheValueRequests.end()); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { if (RemainingKeys.empty()) { break; } if (Endpoint->GetState() != UpstreamEndpointState::kOk) { continue; } UpstreamEndpointStats& Stats = Endpoint->Stats(); std::vector Missing; GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { if (Params.RawHash != Params.RawHash.Zero) { OnComplete(std::forward(Params)); Stats.CacheHitCount.Increment(1); } else { Missing.push_back(&Params.Request); } }); } 
Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); Stats.CacheGetTotalBytes.Increment(Result.Bytes); if (Result.Error) { Stats.CacheErrorCount.Increment(1); ZEN_WARN("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } RemainingKeys = std::move(Missing); } } const UpstreamEndpointInfo Info; for (CacheValueRequest* RequestPtr : RemainingKeys) { OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } } virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function&& GetValueFunc) override { if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) { if (!m_UpstreamThreads.empty()) { m_UpstreamQueue.Enqueue(EnqueuedRequest{.Record = std::move(CacheRecord), .GetValueFunc = GetValueFunc}); } else { ProcessCacheRecord(std::move(CacheRecord), std::move(GetValueFunc)); } } } virtual void GetStatus(CbObjectWriter& Status) override { ZEN_TRACE_CPU("Upstream::GetStatus"); Status << "active" << IsActive(); Status << "reading" << m_Options.ReadUpstream; Status << "writing" << m_Options.WriteUpstream; Status << "worker_threads" << m_UpstreamThreads.size(); Status << "queue_count" << m_UpstreamQueue.Size(); Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) { const UpstreamEndpointInfo& EpInfo = Ep->GetEndpointInfo(); const UpstreamEndpointStatus EpStatus = Ep->GetStatus(); UpstreamEndpointStats& EpStats = Ep->Stats(); Status.BeginObject(); Status << "name" << EpInfo.Name; Status << "url" << EpInfo.Url; Status << "state" << ToString(EpStatus.State); Status << "reason" << EpStatus.Reason; Status.BeginObject("cache"sv); { const int64_t GetCount = EpStats.CacheGetCount.Value(); const int64_t HitCount = EpStats.CacheHitCount.Value(); const int64_t ErrorCount = EpStats.CacheErrorCount.Value(); const double HitRatio = GetCount > 0 ? 
double(HitCount) / double(GetCount) : 0.0; const double ErrorRatio = GetCount > 0 ? double(ErrorCount) / double(GetCount) : 0.0; metrics::EmitSnapshot("get_requests"sv, EpStats.CacheGetRequestTiming, Status); Status << "get_bytes" << EpStats.CacheGetTotalBytes.Value(); Status << "get_count" << GetCount; Status << "hit_count" << HitCount; Status << "hit_ratio" << HitRatio; Status << "error_count" << ErrorCount; Status << "error_ratio" << ErrorRatio; metrics::EmitSnapshot("put_requests"sv, EpStats.CachePutRequestTiming, Status); Status << "put_bytes" << EpStats.CachePutTotalBytes.Value(); } Status.EndObject(); Status.EndObject(); } Status.EndArray(); } private: void ProcessCacheRecord(const UpstreamCacheRecord& CacheRecord, std::function&& GetValueFunc) { ZEN_TRACE_CPU("Upstream::ProcessCacheRecord"); ZenCacheValue CacheValue; std::vector Payloads; if (!m_CacheStore.Get(CacheRecord.Context, CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}/{}', cache record doesn't exist", CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; } for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) { if (IoBuffer Payload = GetValueFunc(ValueContentId)) { Payloads.push_back(Payload); } else { ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, ValueContentId); return; } } std::shared_lock _(m_EndpointsMutex); for (auto& Endpoint : m_Endpoints) { if (Endpoint->GetState() != UpstreamEndpointState::kOk) { continue; } UpstreamEndpointStats& Stats = Endpoint->Stats(); PutUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Stats.CachePutRequestTiming); Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); } Stats.CachePutTotalBytes.Increment(Result.Bytes); if (!Result.Success) { ZEN_WARN("upload cache record '{}/{}/{}' FAILED, 
endpoint '{}', reason '{}'", CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Endpoint->GetEndpointInfo().Url, Result.Reason); } } } void ProcessUpstreamQueue(int ThreadIndex) { std::string ThreadName = fmt::format("upstream_{}", ThreadIndex); SetCurrentThreadName(ThreadName); for (;;) { EnqueuedRequest Request; if (m_UpstreamQueue.WaitAndDequeue(Request)) { try { ProcessCacheRecord(Request.Record, std::move(Request.GetValueFunc)); } catch (const std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", Request.Record.Namespace, Request.Record.Key.Bucket, Request.Record.Key.Hash, Err.what()); } } if (!m_RunState.IsRunning) { break; } } } void MonitorEndpoints() { SetCurrentThreadName("upstream_monitor"); for (;;) { { std::unique_lock lk(m_RunState.Mutex); if (m_RunState.ExitSignal.wait_for(lk, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) { break; } } try { std::vector Endpoints; { std::shared_lock _(m_EndpointsMutex); for (auto& Endpoint : m_Endpoints) { UpstreamEndpointState State = Endpoint->GetState(); if (State == UpstreamEndpointState::kError) { Endpoints.push_back(Endpoint.get()); ZEN_WARN("HEALTH - endpoint '{} - {}' is in error state '{}'", Endpoint->GetEndpointInfo().Name, Endpoint->GetEndpointInfo().Url, Endpoint->GetStatus().Reason); } if (State == UpstreamEndpointState::kUnauthorized) { Endpoints.push_back(Endpoint.get()); } } } for (auto& Endpoint : Endpoints) { const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); const UpstreamEndpointStatus Status = Endpoint->Initialize(); if (Status.State == UpstreamEndpointState::kOk) { ZEN_INFO("HEALTH - endpoint '{} - {}' Ok", Info.Name, Info.Url); } else { const std::string Reason = Status.Reason.empty() ? 
"" : fmt::format(", reason '{}'", Status.Reason); ZEN_WARN("HEALTH - endpoint '{} - {}' {} {}", Info.Name, Info.Url, ToString(Status.State), Reason); } } } catch (const std::exception& Err) { ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what()); } } } void Shutdown() { if (m_RunState.Stop()) { m_UpstreamQueue.CompleteAdding(); for (std::thread& Thread : m_UpstreamThreads) { Thread.join(); } if (m_EndpointMonitorThread.joinable()) { m_EndpointMonitorThread.join(); } m_UpstreamThreads.clear(); m_Endpoints.clear(); } } LoggerRef Log() { return m_Log; } using UpstreamQueue = BlockingQueue; struct RunState { std::mutex Mutex; std::condition_variable ExitSignal; std::atomic_bool IsRunning{false}; bool Stop() { bool Stopped = false; { std::lock_guard _(Mutex); Stopped = IsRunning.exchange(false); } if (Stopped) { ExitSignal.notify_all(); } return Stopped; } }; LoggerRef m_Log; UpstreamCacheOptions m_Options; ZenCacheStore& m_CacheStore; UpstreamQueue m_UpstreamQueue; std::shared_mutex m_EndpointsMutex; std::vector> m_Endpoints; std::vector m_UpstreamThreads; std::thread m_EndpointMonitorThread; RunState m_RunState; }; ////////////////////////////////////////////////////////////////////////// std::unique_ptr UpstreamEndpoint::CreateZenEndpoint(const ZenStructuredCacheClientOptions& Options) { return std::make_unique(Options); } std::unique_ptr UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) { return std::make_unique(Options, AuthConfig, Mgr); } std::unique_ptr CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) { return std::make_unique(Options, CacheStore); } } // namespace zen