diff options
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 2134 |
1 files changed, 0 insertions, 2134 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp deleted file mode 100644 index 8558e2a10..000000000 --- a/src/zenserver/upstream/upstreamcache.cpp +++ /dev/null @@ -1,2134 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "upstreamcache.h" -#include "zen.h" - -#include <zencore/blockingqueue.h> -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/fmtutils.h> -#include <zencore/stats.h> -#include <zencore/stream.h> -#include <zencore/timer.h> -#include <zencore/trace.h> - -#include <zenhttp/httpclientauth.h> -#include <zenhttp/packageformat.h> - -#include <zenstore/cache/structuredcachestore.h> -#include <zenstore/cidstore.h> - -#include <zenremotestore/jupiter/jupiterclient.h> -#include <zenremotestore/jupiter/jupitersession.h> - -#include "cache/httpstructuredcache.h" -#include "diag/logging.h" - -#include <fmt/format.h> - -#include <algorithm> -#include <atomic> -#include <shared_mutex> -#include <thread> - -namespace zen { - -using namespace std::literals; - -namespace detail { - - class UpstreamStatus - { - public: - UpstreamEndpointState EndpointState() const { return static_cast<UpstreamEndpointState>(m_State.load(std::memory_order_relaxed)); } - - UpstreamEndpointStatus EndpointStatus() const - { - const UpstreamEndpointState State = EndpointState(); - { - std::unique_lock _(m_Mutex); - return {.Reason = m_ErrorText, .State = State}; - } - } - - void Set(UpstreamEndpointState NewState) - { - m_State.store(static_cast<uint32_t>(NewState), std::memory_order_relaxed); - { - std::unique_lock _(m_Mutex); - m_ErrorText.clear(); - } - } - - void Set(UpstreamEndpointState NewState, std::string ErrorText) - { - m_State.store(static_cast<uint32_t>(NewState), std::memory_order_relaxed); - { - std::unique_lock _(m_Mutex); - m_ErrorText = std::move(ErrorText); - } - } - - void SetFromErrorCode(int32_t ErrorCode, std::string_view ErrorText) - { - if (ErrorCode != 0) - { - Set(ErrorCode == 401 ? UpstreamEndpointState::kUnauthorized : UpstreamEndpointState::kError, std::string(ErrorText)); - } - } - - private: - mutable std::mutex m_Mutex; - std::string m_ErrorText; - std::atomic_uint32_t m_State; - }; - - class JupiterUpstreamEndpoint final : public UpstreamEndpoint - { - public: - JupiterUpstreamEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) - : m_AuthMgr(Mgr) - , m_Log(zen::logging::Get("upstream")) - { - ZEN_ASSERT(!Options.Name.empty()); - m_Info.Name = Options.Name; - m_Info.Url = Options.ServiceUrl; - - std::function<HttpClientAccessToken()> TokenProvider; - - if (AuthConfig.OAuthUrl.empty() == false) - { - TokenProvider = httpclientauth::CreateFromOAuthClientCredentials( - {.Url = AuthConfig.OAuthUrl, .ClientId = AuthConfig.OAuthClientId, .ClientSecret = AuthConfig.OAuthClientSecret}); - } - else if (!AuthConfig.OpenIdProvider.empty()) - { - TokenProvider = httpclientauth::CreateFromOpenIdProvider(m_AuthMgr, AuthConfig.OpenIdProvider); - } - else if (!AuthConfig.AccessToken.empty()) - { - TokenProvider = httpclientauth::CreateFromStaticToken(AuthConfig.AccessToken); - } - else - { - TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(m_AuthMgr); - } - - m_Client = new JupiterClient(Options, std::move(TokenProvider)); - } - - virtual ~JupiterUpstreamEndpoint() {} - - virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; } - - virtual UpstreamEndpointStatus Initialize() override - { - ZEN_TRACE_CPU("Upstream::Jupiter::Initialize"); - - try - { - if (m_Status.EndpointState() == UpstreamEndpointState::kOk) - { - return {.State = UpstreamEndpointState::kOk}; - } - - JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); - const JupiterResult Result = Session.Authenticate(); - - if (Result.Success) - { - m_Status.Set(UpstreamEndpointState::kOk); - } - else if (Result.ErrorCode != 0) - { - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - } - else - { - m_Status.Set(UpstreamEndpointState::kUnauthorized); - } - - return m_Status.EndpointStatus(); - } - catch (const std::exception& Err) - { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); - - return {.Reason = Err.what(), .State = GetState()}; - } - } - - std::string_view GetActualBlobStoreNamespace(std::string_view Namespace) - { - if (Namespace == ZenCacheStore::DefaultNamespace) - { - return m_Client->DefaultBlobStoreNamespace(); - } - return Namespace; - } - - virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); } - - virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - - virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, - const CacheKey& CacheKey, - ZenContentType Type) override - { - ZEN_TRACE_CPU("Upstream::Jupiter::GetSingleCacheRecord"); - - try - { - JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); - JupiterResult Result; - - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); - - if (Type == ZenContentType::kCompressedBinary) - { - Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); - - if (Result.Success) - { - const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); - if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) - { - CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); - IoBuffer ContentBuffer; - int NumAttachments = 0; - - CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { - JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); - Result.ReceivedBytes += AttachmentResult.ReceivedBytes; - Result.SentBytes += AttachmentResult.SentBytes; - Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; - Result.ErrorCode = AttachmentResult.ErrorCode; - - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response, RawHash, RawSize)) - { - Result.Response = AttachmentResult.Response; - ++NumAttachments; - } - else - { - Result.Success = false; - } - }); - if (NumAttachments != 1) - { - Result.Success = false; - } - } - } - } - else - { - const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; - Result = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, AcceptType); - - if (Result.Success && Type == ZenContentType::kCbPackage) - { - CbPackage Package; - - const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); - if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) - { - CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); - - CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { - JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); - Result.ReceivedBytes += AttachmentResult.ReceivedBytes; - Result.SentBytes += AttachmentResult.SentBytes; - Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; - Result.ErrorCode = AttachmentResult.ErrorCode; - - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer Chunk = - CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response), RawHash, RawSize)) - { - Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash())); - } - else - { - Result.Success = false; - } - }); - - Package.SetObject(CacheRecord); - } - - if (Result.Success) - { - BinaryWriter MemStream; - Package.Save(MemStream); - - Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - } - } - } - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - if (Result.ErrorCode == 0) - { - return {.Status = {.Bytes = gsl::narrow<int64_t>(Result.ReceivedBytes), - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}, - .Value = Result.Response, - .Source = &m_Info}; - } - else - { - return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; - } - } - catch (const std::exception& Err) - { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); - - return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; - } - } - - virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, - std::span<CacheKeyRequest*> Requests, - OnCacheRecordGetComplete&& OnComplete) override - { - ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheRecords"); - - JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); - GetUpstreamCacheResult Result; - - for (CacheKeyRequest* Request : Requests) - { - const CacheKey& CacheKey = Request->Key; - CbPackage Package; - CbObject Record; - - double ElapsedSeconds = 0.0; - if (!Result.Error) - { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); - JupiterResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); - AppendResult(RefResult, Result); - ElapsedSeconds = RefResult.ElapsedSeconds; - - m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); - - if (RefResult.ErrorCode == 0) - { - const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); - if (ValidationResult == CbValidateError::None) - { - Record = LoadCompactBinaryObject(RefResult.Response); - Record.IterateAttachments([&](CbFieldView AttachmentHash) { - JupiterResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); - AppendResult(BlobResult, Result); - - m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); - - if (BlobResult.ErrorCode == 0) - { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer Chunk = - CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response), RawHash, RawSize)) - { - if (RawHash == AttachmentHash.AsHash()) - { - Package.AddAttachment(CbAttachment(Chunk, RawHash)); - } - } - } - }); - } - } - } - - OnComplete( - {.Request = *Request, .Record = Record, .Package = Package, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info}); - } - - return Result; - } - - virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, - const CacheKey&, - const IoHash& ValueContentId) override - { - ZEN_TRACE_CPU("Upstream::Jupiter::GetSingleCacheChunk"); - - try - { - JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); - const JupiterResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - if (Result.ErrorCode == 0) - { - return {.Status = {.Bytes = gsl::narrow<int64_t>(Result.ReceivedBytes), - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}, - .Value = Result.Response, - .Source = &m_Info}; - } - else - { - return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; - } - } - catch (const std::exception& Err) - { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); - - return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; - } - } - - virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, - std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheChunksGetComplete&& OnComplete) override final - { - ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheChunks"); - - JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); - GetUpstreamCacheResult Result; - - for (CacheChunkRequest* RequestPtr : CacheChunkRequests) - { - CacheChunkRequest& Request = *RequestPtr; - IoBuffer Payload; - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0; - - double ElapsedSeconds = 0.0; - bool IsCompressed = false; - if (!Result.Error) - { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); - const JupiterResult BlobResult = - Request.ChunkId == IoHash::Zero - ? Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, Request.ChunkId) - : Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); - ElapsedSeconds = BlobResult.ElapsedSeconds; - Payload = BlobResult.Response; - - AppendResult(BlobResult, Result); - - m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); - if (Payload && IsCompressedBinary(Payload.GetContentType())) - { - IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize); - } - } - - if (IsCompressed) - { - OnComplete({.Request = Request, - .RawHash = RawHash, - .RawSize = RawSize, - .Value = Payload, - .ElapsedSeconds = ElapsedSeconds, - .Source = &m_Info}); - } - else - { - OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); - } - } - - return Result; - } - - virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, - std::span<CacheValueRequest*> CacheValueRequests, - OnCacheValueGetComplete&& OnComplete) override final - { - ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheValues"); - - JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); - GetUpstreamCacheResult Result; - - for (CacheValueRequest* RequestPtr : CacheValueRequests) - { - CacheValueRequest& Request = *RequestPtr; - IoBuffer Payload; - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0; - - double ElapsedSeconds = 0.0; - bool IsCompressed = false; - if (!Result.Error) - { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); - IoHash PayloadHash; - const JupiterResult BlobResult = - Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash); - ElapsedSeconds = BlobResult.ElapsedSeconds; - Payload = BlobResult.Response; - - AppendResult(BlobResult, Result); - - m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); - if (Payload) - { - if (IsCompressedBinary(Payload.GetContentType())) - { - IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize) && RawHash != PayloadHash; - } - else - { - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(Payload)); - RawHash = Compressed.DecodeRawHash(); - if (RawHash == PayloadHash) - { - IsCompressed = true; - } - else - { - ZEN_WARN("Jupiter request for inline payload of {}/{}/{} has hash {}, expected hash {} from header", - Namespace, - Request.Key.Bucket, - Request.Key.Hash.ToHexString(), - RawHash.ToHexString(), - PayloadHash.ToHexString()); - } - } - } - } - - if (IsCompressed) - { - OnComplete({.Request = Request, - .RawHash = RawHash, - .RawSize = RawSize, - .Value = Payload, - .ElapsedSeconds = ElapsedSeconds, - .Source = &m_Info}); - } - else - { - OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); - } - } - - return Result; - } - - virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, - IoBuffer RecordValue, - std::span<IoBuffer const> Values) override - { - ZEN_TRACE_CPU("Upstream::Jupiter::PutCacheRecord"); - - ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); - const int32_t MaxAttempts = 3; - - try - { - JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); - - if (CacheRecord.Type == ZenContentType::kBinary) - { - JupiterResult Result; - for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(CacheRecord.Namespace); - Result = Session.PutRef(BlobStoreNamespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - RecordValue, - ZenContentType::kBinary); - } - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - return {.Reason = std::move(Result.Reason), - .Bytes = gsl::narrow<int64_t>(Result.ReceivedBytes), - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; - } - else if (CacheRecord.Type == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(RecordValue, RawHash, RawSize)) - { - return {.Reason = std::string("Invalid compressed value buffer"), .Success = false}; - } - - CbObjectWriter ReferencingObject; - ReferencingObject.AddBinaryAttachment("RawHash", RawHash); - ReferencingObject.AddInteger("RawSize", RawSize); - - return PerformStructuredPut( - Session, - CacheRecord.Namespace, - CacheRecord.Key, - ReferencingObject.Save().GetBuffer().AsIoBuffer(), - MaxAttempts, - [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { - if (ValueContentId != RawHash) - { - OutReason = - fmt::format("Value '{}' MISMATCHED from compressed buffer raw hash {}", ValueContentId, RawHash); - return false; - } - - OutBuffer = RecordValue; - return true; - }); - } - else - { - return PerformStructuredPut( - Session, - CacheRecord.Namespace, - CacheRecord.Key, - RecordValue, - MaxAttempts, - [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { - const auto It = - std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId); - - if (It == std::end(CacheRecord.ValueContentIds)) - { - OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId); - return false; - } - - const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It); - - OutBuffer = Values[Idx]; - return true; - }); - } - } - catch (const std::exception& Err) - { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); - - return {.Reason = std::string(Err.what()), .Success = false}; - } - } - - virtual UpstreamEndpointStats& Stats() override { return m_Stats; } - - private: - static void AppendResult(const JupiterResult& Result, GetUpstreamCacheResult& Out) - { - Out.Success &= Result.Success; - Out.Bytes += gsl::narrow<int64_t>(Result.ReceivedBytes); - Out.ElapsedSeconds += Result.ElapsedSeconds; - - if (Result.ErrorCode) - { - Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; - } - }; - - PutUpstreamCacheResult PerformStructuredPut( - JupiterSession& Session, - std::string_view Namespace, - const CacheKey& Key, - IoBuffer ObjectBuffer, - const int32_t MaxAttempts, - std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn) - { - int64_t TotalBytes = 0ull; - double TotalElapsedSeconds = 0.0; - - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); - const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { - for (const IoHash& ValueContentId : ValueContentIds) - { - IoBuffer BlobBuffer; - if (!BlobFetchFn(ValueContentId, BlobBuffer, OutReason)) - { - return false; - } - - JupiterResult BlobResult; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) - { - BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer); - } - - m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); - - if (!BlobResult.Success) - { - OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); - return false; - } - - TotalBytes += gsl::narrow<int64_t>(BlobResult.ReceivedBytes); - TotalElapsedSeconds += BlobResult.ElapsedSeconds; - } - - return true; - }; - - PutRefResult RefResult; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) - { - RefResult = Session.PutRef(BlobStoreNamespace, Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); - } - - m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); - - if (!RefResult.Success) - { - return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, RefResult.Reason), - .Success = false}; - } - - TotalBytes += gsl::narrow<int64_t>(RefResult.ReceivedBytes); - TotalElapsedSeconds += RefResult.ElapsedSeconds; - - std::string Reason; - if (!PutBlobs(RefResult.Needs, Reason)) - { - return {.Reason = std::move(Reason), .Success = false}; - } - - const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); - FinalizeRefResult FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); - - m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); - - if (!FinalizeResult.Success) - { - return { - .Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), - .Success = false}; - } - - if (!FinalizeResult.Needs.empty()) - { - if (!PutBlobs(FinalizeResult.Needs, Reason)) - { - return {.Reason = std::move(Reason), .Success = false}; - } - - FinalizeResult = Session.FinalizeRef(BlobStoreNamespace, Key.Bucket, Key.Hash, RefHash); - - m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); - - if (!FinalizeResult.Success) - { - return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), - .Success = false}; - } - - if (!FinalizeResult.Needs.empty()) - { - ExtendableStringBuilder<256> Sb; - for (const IoHash& MissingHash : FinalizeResult.Needs) - { - Sb << MissingHash.ToHexString() << ","; - } - - return { - .Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", Key.Bucket, Key.Hash, Sb.ToString()), - .Success = false}; - } - } - - TotalBytes += gsl::narrow<int64_t>(FinalizeResult.ReceivedBytes); - TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; - - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; - } - - LoggerRef Log() { return m_Log; } - - AuthMgr& m_AuthMgr; - LoggerRef m_Log; - UpstreamEndpointInfo m_Info; - UpstreamStatus m_Status; - UpstreamEndpointStats m_Stats; - RefPtr<JupiterClient> m_Client; - const bool m_AllowRedirect = false; - }; - - class ZenUpstreamEndpoint final : public UpstreamEndpoint - { - struct ZenEndpoint - { - std::string Url; - std::string Reason; - double Latency{}; - bool Ok = false; - - bool operator<(const ZenEndpoint& RHS) const { return Ok && RHS.Ok ? Latency < RHS.Latency : Ok; } - }; - - public: - ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) - : m_Log(zen::logging::Get("upstream")) - , m_ConnectTimeout(Options.ConnectTimeout) - , m_Timeout(Options.Timeout) - { - ZEN_ASSERT(!Options.Name.empty()); - m_Info.Name = Options.Name; - - for (const auto& Url : Options.Urls) - { - m_Endpoints.push_back({.Url = Url}); - } - } - - ~ZenUpstreamEndpoint() {} - - virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; } - - virtual UpstreamEndpointStatus Initialize() override - { - ZEN_TRACE_CPU("Upstream::Zen::Initialize"); - - try - { - if (m_Status.EndpointState() == UpstreamEndpointState::kOk) - { - return {.State = UpstreamEndpointState::kOk}; - } - - const ZenEndpoint& Ep = GetEndpoint(); - - if (m_Info.Url != Ep.Url) - { - ZEN_INFO("Setting Zen upstream URL to '{}'", Ep.Url); - m_Info.Url = Ep.Url; - } - - if (Ep.Ok) - { - RwLock::ExclusiveLockScope _(m_ClientLock); - m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); - m_Status.Set(UpstreamEndpointState::kOk); - } - else - { - m_Status.Set(UpstreamEndpointState::kError, Ep.Reason); - } - - return m_Status.EndpointStatus(); - } - catch (const std::exception& Err) - { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); - - return {.Reason = Err.what(), .State = GetState()}; - } - } - - virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); } - - virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - - virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, - const CacheKey& CacheKey, - ZenContentType Type) override - { - ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); - - try - { - ZenStructuredCacheSession Session(GetClientRef()); - const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - if (Result.ErrorCode == 0) - { - return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, - .Value = Result.Response, - .Source = &m_Info}; - } - else - { - return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; - } - } - catch (const std::exception& Err) - { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); - - return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; - } - } - - virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, - std::span<CacheKeyRequest*> Requests, - OnCacheRecordGetComplete&& OnComplete) override - { - ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); - ZEN_ASSERT(Requests.size() > 0); - - CbObjectWriter BatchRequest; - BatchRequest << "Method"sv - << "GetCacheRecords"sv; - BatchRequest << "Accept"sv << kCbPkgMagic; - - BatchRequest.BeginObject("Params"sv); - { - CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy(); - BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); - - BatchRequest << "Namespace"sv << Namespace; - - BatchRequest.BeginArray("Requests"sv); - for (CacheKeyRequest* Request : Requests) - { - BatchRequest.BeginObject(); - { - const CacheKey& Key = Request->Key; - BatchRequest.BeginObject("Key"sv); - { - BatchRequest << "Bucket"sv << Key.Bucket; - BatchRequest << "Hash"sv << Key.Hash; - } - BatchRequest.EndObject(); - if (!Request->Policy.IsUniform() || Request->Policy.GetRecordPolicy() != DefaultPolicy) - { - BatchRequest.SetName("Policy"sv); - Request->Policy.Save(BatchRequest); - } - } - BatchRequest.EndObject(); - } - BatchRequest.EndArray(); - } - BatchRequest.EndObject(); - - ZenCacheResult Result; - - { - ZenStructuredCacheSession Session(GetClientRef()); - Result = Session.InvokeRpc(BatchRequest.Save()); - } - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - if (Result.Success) - { - CbPackage BatchResponse; - if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) - { - CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); - if (Results.Num() != Requests.size()) - { - ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Response results from Upstream."); - } - else - { - for (size_t Index = 0; CbFieldView Record : Results) - { - CacheKeyRequest* Request = Requests[Index++]; - OnComplete({.Request = *Request, - .Record = Record.AsObjectView(), - .Package = BatchResponse, - .ElapsedSeconds = Result.ElapsedSeconds, - .Source = &m_Info}); - } - - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; - } - } - else - { - ZEN_WARN("Upstream::Zen::GetCacheRecords invalid Response from Upstream."); - } - } - - for (CacheKeyRequest* Request : Requests) - { - OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); - } - - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; - } - - virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, - const CacheKey& CacheKey, - const IoHash& ValueContentId) override - { - ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunk"); - - try - { - ZenStructuredCacheSession Session(GetClientRef()); - const ZenCacheResult Result = Session.GetCacheChunk(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - if (Result.ErrorCode == 0) - { - return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, - .Value = Result.Response, - .Source = &m_Info}; - } - else - { - return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; - } - } - catch (const std::exception& Err) - { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); - - return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; - } - } - - virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, - std::span<CacheValueRequest*> CacheValueRequests, - OnCacheValueGetComplete&& OnComplete) override final - { - ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); - ZEN_ASSERT(!CacheValueRequests.empty()); - - CbObjectWriter BatchRequest; - BatchRequest << "Method"sv - << "GetCacheValues"sv; - BatchRequest << "Accept"sv << kCbPkgMagic; - - BatchRequest.BeginObject("Params"sv); - { - CachePolicy DefaultPolicy = CacheValueRequests[0]->Policy; - BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); - BatchRequest << "Namespace"sv << Namespace; - - BatchRequest.BeginArray("Requests"sv); - { - for (CacheValueRequest* RequestPtr : CacheValueRequests) - { - const CacheValueRequest& Request = *RequestPtr; - - BatchRequest.BeginObject(); - { - BatchRequest.BeginObject("Key"sv); - BatchRequest << "Bucket"sv << Request.Key.Bucket; - BatchRequest << "Hash"sv << Request.Key.Hash; - BatchRequest.EndObject(); - if (Request.Policy != DefaultPolicy) - { - BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); - } - } - BatchRequest.EndObject(); - } - } - BatchRequest.EndArray(); - } - BatchRequest.EndObject(); - - ZenCacheResult Result; - - { - ZenStructuredCacheSession Session(GetClientRef()); - Result = Session.InvokeRpc(BatchRequest.Save()); - } - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - if (Result.Success) - { - CbPackage BatchResponse; - if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) - { - CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); - if (CacheValueRequests.size() != Results.Num()) - { - ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Response results from Upstream."); - } - else - { - for (size_t RequestIndex = 0; CbFieldView ChunkField : Results) - { - CacheValueRequest& Request = *CacheValueRequests[RequestIndex++]; - CbObjectView ChunkObject = ChunkField.AsObjectView(); - IoHash RawHash = ChunkObject["RawHash"sv].AsHash(); - IoBuffer Payload; - uint64_t RawSize = 0; - if (RawHash != IoHash::Zero) - { - bool Success = false; - const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash); - if (Attachment) - { - if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) - { - Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCompressedBinary); - RawSize = Compressed.DecodeRawSize(); - Success = true; - } - } - if (!Success) - { - CbFieldView RawSizeField = ChunkObject["RawSize"sv]; - RawSize = RawSizeField.AsUInt64(); - Success = !RawSizeField.HasError(); - } - if (!Success) - { - RawHash = IoHash::Zero; - } - } - OnComplete({.Request = Request, - .RawHash = RawHash, - .RawSize = RawSize, - .Value = std::move(Payload), - .ElapsedSeconds = Result.ElapsedSeconds, - .Source = &m_Info}); - } - - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; - } - } - else - { - ZEN_WARN("Upstream::Zen::GetCacheValues invalid Response from Upstream."); - } - } - - for (CacheValueRequest* RequestPtr : CacheValueRequests) - { - OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); - } - - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; - } - - virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, - std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheChunksGetComplete&& OnComplete) override final - { - ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunks"); - ZEN_ASSERT(!CacheChunkRequests.empty()); - - CbObjectWriter BatchRequest; - BatchRequest << "Method"sv - << "GetCacheChunks"sv; - BatchRequest << "Accept"sv << kCbPkgMagic; - - BatchRequest.BeginObject("Params"sv); - { - CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; - BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); - BatchRequest << "Namespace"sv << Namespace; - - BatchRequest.BeginArray("ChunkRequests"sv); - { - for (CacheChunkRequest* RequestPtr : CacheChunkRequests) - { - const CacheChunkRequest& Request = *RequestPtr; - - BatchRequest.BeginObject(); - { - BatchRequest.BeginObject("Key"sv); - BatchRequest << "Bucket"sv << Request.Key.Bucket; - BatchRequest << "Hash"sv << Request.Key.Hash; - BatchRequest.EndObject(); - if (Request.ValueId) - { - BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); - } - if (Request.ChunkId != Request.ChunkId.Zero) - { - BatchRequest << "ChunkId"sv << Request.ChunkId; - } - if (Request.RawOffset != 0) - { - BatchRequest << "RawOffset"sv << Request.RawOffset; - } - if (Request.RawSize != UINT64_MAX) - { - BatchRequest << "RawSize"sv << Request.RawSize; - } - if (Request.Policy != DefaultPolicy) - { - BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); - } - } - BatchRequest.EndObject(); - } - } - BatchRequest.EndArray(); - } - BatchRequest.EndObject(); - - ZenCacheResult Result; - - { - ZenStructuredCacheSession Session(GetClientRef()); - Result = Session.InvokeRpc(BatchRequest.Save()); - } - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - if (Result.Success) - { - CbPackage BatchResponse; - if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) - { - CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); - if (CacheChunkRequests.size() != Results.Num()) - { - ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Response results from Upstream."); - } - else - { - for (size_t RequestIndex = 0; CbFieldView ChunkField : Results) - { - CacheChunkRequest& Request = *CacheChunkRequests[RequestIndex++]; - CbObjectView ChunkObject = ChunkField.AsObjectView(); - IoHash RawHash = ChunkObject["RawHash"sv].AsHash(); - IoBuffer Payload; - uint64_t RawSize = 0; - if (RawHash != IoHash::Zero) - { - bool Success = false; - const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash); - if (Attachment) - { - if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) - { - Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCompressedBinary); - RawSize = Compressed.DecodeRawSize(); - Success = true; - } - } - if (!Success) - { - CbFieldView RawSizeField = ChunkObject["RawSize"sv]; - RawSize = RawSizeField.AsUInt64(); - Success = !RawSizeField.HasError(); - } - if (!Success) - { - RawHash = IoHash::Zero; - } - } - OnComplete({.Request = Request, - .RawHash = RawHash, - .RawSize = RawSize, - .Value = std::move(Payload), - .ElapsedSeconds = Result.ElapsedSeconds, - .Source = &m_Info}); - } - - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; - } - } - else - { - ZEN_WARN("Upstream::Zen::GetCacheChunks invalid Response from Upstream."); - } - } - - for (CacheChunkRequest* RequestPtr : CacheChunkRequests) - { - OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); - } - - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; - } - - virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, - IoBuffer RecordValue, - std::span<IoBuffer const> Values) override - { - ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord"); - - ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); - const int32_t MaxAttempts = 3; - - try - { - ZenStructuredCacheSession Session(GetClientRef()); - ZenCacheResult Result; - int64_t TotalBytes = 0ull; - double TotalElapsedSeconds = 0.0; - - if (CacheRecord.Type == ZenContentType::kCbPackage) - { - CbPackage Package; - Package.SetObject(CbObject(SharedBuffer(RecordValue))); - - for (const IoBuffer& Value : Values) - { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value), RawHash, RawSize)) - { - Package.AddAttachment(CbAttachment(AttachmentBuffer, RawHash)); - } - else - { - return {.Reason = std::string("Invalid value buffer"), .Success = false}; - } - } - - BinaryWriter MemStream; - Package.Save(MemStream); - IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size()); - - for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCacheRecord(CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - PackagePayload, - CacheRecord.Type); - } - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - TotalBytes = Result.Bytes; - TotalElapsedSeconds = Result.ElapsedSeconds; - } - else if (CacheRecord.Type == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue), RawHash, RawSize); - if (!Compressed) - { - return {.Reason = std::string("Invalid value compressed buffer"), .Success = false}; - } - - CbPackage BatchPackage; - CbObjectWriter BatchWriter; - BatchWriter << "Method"sv - << "PutCacheValues"sv; - BatchWriter << "Accept"sv << kCbPkgMagic; - - BatchWriter.BeginObject("Params"sv); - { - // DefaultPolicy unspecified and expected to be Default - - BatchWriter << "Namespace"sv << CacheRecord.Namespace; - - BatchWriter.BeginArray("Requests"sv); - { - BatchWriter.BeginObject(); - { - const CacheKey& Key = CacheRecord.Key; - BatchWriter.BeginObject("Key"sv); - { - BatchWriter << "Bucket"sv << Key.Bucket; - BatchWriter << "Hash"sv << Key.Hash; - } - BatchWriter.EndObject(); - // Policy unspecified and expected to be Default - BatchWriter.AddBinaryAttachment("RawHash"sv, RawHash); - BatchPackage.AddAttachment(CbAttachment(Compressed, RawHash)); - } - BatchWriter.EndObject(); - } - BatchWriter.EndArray(); - } - BatchWriter.EndObject(); - BatchPackage.SetObject(BatchWriter.Save()); - - Result.Success = false; - for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.InvokeRpc(BatchPackage); - } - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - } - else - { - for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++) - { - Result.Success = false; - for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCacheValue(CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - CacheRecord.ValueContentIds[Idx], - Values[Idx]); - } - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - - if (!Result.Success) - { - return {.Reason = "Failed to upload value", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; - } - } - - Result.Success = false; - for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCacheRecord(CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - RecordValue, - CacheRecord.Type); - } - - m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); - - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - } - - return {.Reason = std::move(Result.Reason), - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = Result.Success}; - } - catch (const std::exception& Err) - { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); - - return {.Reason = std::string(Err.what()), .Success = false}; - } - } - - virtual UpstreamEndpointStats& Stats() override { return m_Stats; } - - private: - Ref<ZenStructuredCacheClient> GetClientRef() - { - // m_Client can be modified at any time by a different thread. - // Make sure we safely bump the refcount inside a scope lock - RwLock::SharedLockScope _(m_ClientLock); - ZEN_ASSERT(m_Client); - Ref<ZenStructuredCacheClient> ClientRef(m_Client); - _.ReleaseNow(); - return ClientRef; - } - - const ZenEndpoint& GetEndpoint() - { - for (ZenEndpoint& Ep : m_Endpoints) - { - Ref<ZenStructuredCacheClient> Client( - new ZenStructuredCacheClient({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)})); - ZenStructuredCacheSession Session(std::move(Client)); - const int32_t SampleCount = 2; - - Ep.Ok = false; - Ep.Latency = {}; - - for (int32_t Sample = 0; Sample < SampleCount; ++Sample) - { - ZenCacheResult Result = Session.CheckHealth(); - Ep.Ok = Result.Success; - Ep.Reason = std::move(Result.Reason); - Ep.Latency += Result.ElapsedSeconds; - } - Ep.Latency /= double(SampleCount); - } - - std::sort(std::begin(m_Endpoints), std::end(m_Endpoints)); - - for (const auto& Ep : m_Endpoints) - { - ZEN_INFO("ping 'Zen' endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); - } - - return m_Endpoints.front(); - } - - LoggerRef Log() { return m_Log; } - - LoggerRef m_Log; - UpstreamEndpointInfo m_Info; - UpstreamStatus m_Status; - UpstreamEndpointStats m_Stats; - std::vector<ZenEndpoint> m_Endpoints; - std::chrono::milliseconds m_ConnectTimeout; - std::chrono::milliseconds m_Timeout; - RwLock m_ClientLock; - RefPtr<ZenStructuredCacheClient> m_Client; - }; - -} // namespace detail - -////////////////////////////////////////////////////////////////////////// - -class UpstreamCacheImpl final : public UpstreamCache -{ -public: - UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) - : m_Log(logging::Get("upstream")) - , m_Options(Options) - , m_CacheStore(CacheStore) - , m_CidStore(CidStore) - { - } - - virtual ~UpstreamCacheImpl() { Shutdown(); } - - virtual void Initialize() override - { - ZEN_TRACE_CPU("Upstream::Initialize"); - - m_RunState.IsRunning = true; - } - - virtual bool IsActive() override - { - std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - return !m_Endpoints.empty(); - } - - virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override - { - ZEN_TRACE_CPU("Upstream::RegisterEndpoint"); - - const UpstreamEndpointStatus Status = Endpoint->Initialize(); - const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); - - if (Status.State == UpstreamEndpointState::kOk) - { - ZEN_INFO("register endpoint '{} - {}' {}", Info.Name, Info.Url, ToString(Status.State)); - } - else - { - ZEN_WARN("register endpoint '{} - {}' {}", Info.Name, Info.Url, ToString(Status.State)); - } - - // Register endpoint even if it fails, the health monitor thread will probe failing endpoint(s) - std::unique_lock<std::shared_mutex> _(m_EndpointsMutex); - if (m_Endpoints.empty()) - { - for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) - { - m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this, Idx + 1); - } - - m_EndpointMonitorThread = std::thread(&UpstreamCacheImpl::MonitorEndpoints, this); - } - m_Endpoints.emplace_back(std::move(Endpoint)); - } - - virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) override - { - std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - - for (auto& Ep : m_Endpoints) - { - if (!Fn(*Ep)) - { - break; - } - } - } - - virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override - { - ZEN_TRACE_CPU("Upstream::GetCacheRecord"); - - std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - - if (m_Options.ReadUpstream) - { - for (auto& Endpoint : m_Endpoints) - { - if (Endpoint->GetState() != UpstreamEndpointState::kOk) - { - continue; - } - - UpstreamEndpointStats& Stats = Endpoint->Stats(); - metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - GetUpstreamCacheSingleResult Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); - Scope.Stop(); - - Stats.CacheGetCount.Increment(1); - Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes); - - if (Result.Status.Success) - { - Stats.CacheHitCount.Increment(1); - - return Result; - } - - if (Result.Status.Error) - { - Stats.CacheErrorCount.Increment(1); - - ZEN_WARN("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Status.Error.Reason, - Result.Status.Error.ErrorCode); - } - } - } - - return {}; - } - - virtual void GetCacheRecords(std::string_view Namespace, - std::span<CacheKeyRequest*> Requests, - OnCacheRecordGetComplete&& OnComplete) override final - { - ZEN_TRACE_CPU("Upstream::GetCacheRecords"); - - std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - - std::vector<CacheKeyRequest*> RemainingKeys(Requests.begin(), Requests.end()); - - if (m_Options.ReadUpstream) - { - for (auto& Endpoint : m_Endpoints) - { - if (RemainingKeys.empty()) - { - break; - } - - if (Endpoint->GetState() != UpstreamEndpointState::kOk) - { - continue; - } - - UpstreamEndpointStats& Stats = Endpoint->Stats(); - std::vector<CacheKeyRequest*> Missing; - GetUpstreamCacheResult Result; - { - metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - - Result = Endpoint->GetCacheRecords(Namespace, RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { - if (Params.Record) - { - OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); - - Stats.CacheHitCount.Increment(1); - } - else - { - Missing.push_back(&Params.Request); - } - }); - } - - Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); - Stats.CacheGetTotalBytes.Increment(Result.Bytes); - - if (Result.Error) - { - Stats.CacheErrorCount.Increment(1); - - ZEN_WARN("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); - } - - RemainingKeys = std::move(Missing); - } - } - - const UpstreamEndpointInfo Info; - for (CacheKeyRequest* Request : RemainingKeys) - { - OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); - } - } - - virtual void GetCacheChunks(std::string_view Namespace, - std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheChunksGetComplete&& OnComplete) override final - { - ZEN_TRACE_CPU("Upstream::GetCacheChunks"); - - std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - - std::vector<CacheChunkRequest*> RemainingKeys(CacheChunkRequests.begin(), CacheChunkRequests.end()); - - if (m_Options.ReadUpstream) - { - for (auto& Endpoint : m_Endpoints) - { - if (RemainingKeys.empty()) - { - break; - } - - if (Endpoint->GetState() != UpstreamEndpointState::kOk) - { - continue; - } - - UpstreamEndpointStats& Stats = Endpoint->Stats(); - std::vector<CacheChunkRequest*> Missing; - GetUpstreamCacheResult Result; - { - metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - - Result = Endpoint->GetCacheChunks(Namespace, RemainingKeys, [&](CacheChunkGetCompleteParams&& Params) { - if (Params.RawHash != Params.RawHash.Zero) - { - OnComplete(std::forward<CacheChunkGetCompleteParams>(Params)); - - Stats.CacheHitCount.Increment(1); - } - else - { - Missing.push_back(&Params.Request); - } - }); - } - - Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); - Stats.CacheGetTotalBytes.Increment(Result.Bytes); - - if (Result.Error) - { - Stats.CacheErrorCount.Increment(1); - - ZEN_WARN("get cache chunks(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); - } - - RemainingKeys = std::move(Missing); - } - } - - const UpstreamEndpointInfo Info; - for (CacheChunkRequest* RequestPtr : RemainingKeys) - { - OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); - } - } - - virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, - const CacheKey& CacheKey, - const IoHash& ValueContentId) override - { - ZEN_TRACE_CPU("Upstream::GetCacheChunk"); - - if (m_Options.ReadUpstream) - { - for (auto& Endpoint : m_Endpoints) - { - if (Endpoint->GetState() != UpstreamEndpointState::kOk) - { - continue; - } - - UpstreamEndpointStats& Stats = Endpoint->Stats(); - metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - GetUpstreamCacheSingleResult Result = Endpoint->GetCacheChunk(Namespace, CacheKey, ValueContentId); - Scope.Stop(); - - Stats.CacheGetCount.Increment(1); - Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes); - - if (Result.Status.Success) - { - Stats.CacheHitCount.Increment(1); - - return Result; - } - - if (Result.Status.Error) - { - Stats.CacheErrorCount.Increment(1); - - ZEN_WARN("get cache chunk FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Status.Error.Reason, - Result.Status.Error.ErrorCode); - } - } - } - - return {}; - } - - virtual void GetCacheValues(std::string_view Namespace, - std::span<CacheValueRequest*> CacheValueRequests, - OnCacheValueGetComplete&& OnComplete) override final - { - ZEN_TRACE_CPU("Upstream::GetCacheValues"); - - std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - - std::vector<CacheValueRequest*> RemainingKeys(CacheValueRequests.begin(), CacheValueRequests.end()); - - if (m_Options.ReadUpstream) - { - for (auto& Endpoint : m_Endpoints) - { - if (RemainingKeys.empty()) - { - break; - } - - if (Endpoint->GetState() != UpstreamEndpointState::kOk) - { - continue; - } - - UpstreamEndpointStats& Stats = Endpoint->Stats(); - std::vector<CacheValueRequest*> Missing; - GetUpstreamCacheResult Result; - { - metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - - Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { - if (Params.RawHash != Params.RawHash.Zero) - { - OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); - - Stats.CacheHitCount.Increment(1); - } - else - { - Missing.push_back(&Params.Request); - } - }); - } - - Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); - Stats.CacheGetTotalBytes.Increment(Result.Bytes); - - if (Result.Error) - { - Stats.CacheErrorCount.Increment(1); - - ZEN_WARN("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); - } - - RemainingKeys = std::move(Missing); - } - } - - const UpstreamEndpointInfo Info; - for (CacheValueRequest* RequestPtr : RemainingKeys) - { - OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); - } - } - - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override - { - if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) - { - if (!m_UpstreamThreads.empty()) - { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); - } - else - { - ProcessCacheRecord(std::move(CacheRecord)); - } - } - } - - virtual void GetStatus(CbObjectWriter& Status) override - { - ZEN_TRACE_CPU("Upstream::GetStatus"); - - Status << "active" << IsActive(); - Status << "reading" << m_Options.ReadUpstream; - Status << "writing" << m_Options.WriteUpstream; - Status << "worker_threads" << m_UpstreamThreads.size(); - Status << "queue_count" << m_UpstreamQueue.Size(); - - Status.BeginArray("endpoints"); - for (const auto& Ep : m_Endpoints) - { - const UpstreamEndpointInfo& EpInfo = Ep->GetEndpointInfo(); - const UpstreamEndpointStatus EpStatus = Ep->GetStatus(); - UpstreamEndpointStats& EpStats = Ep->Stats(); - - Status.BeginObject(); - Status << "name" << EpInfo.Name; - Status << "url" << EpInfo.Url; - Status << "state" << ToString(EpStatus.State); - Status << "reason" << EpStatus.Reason; - - Status.BeginObject("cache"sv); - { - const int64_t GetCount = EpStats.CacheGetCount.Value(); - const int64_t HitCount = EpStats.CacheHitCount.Value(); - const int64_t ErrorCount = EpStats.CacheErrorCount.Value(); - const double HitRatio = GetCount > 0 ? double(HitCount) / double(GetCount) : 0.0; - const double ErrorRatio = GetCount > 0 ? double(ErrorCount) / double(GetCount) : 0.0; - - metrics::EmitSnapshot("get_requests"sv, EpStats.CacheGetRequestTiming, Status); - Status << "get_bytes" << EpStats.CacheGetTotalBytes.Value(); - Status << "get_count" << GetCount; - Status << "hit_count" << HitCount; - Status << "hit_ratio" << HitRatio; - Status << "error_count" << ErrorCount; - Status << "error_ratio" << ErrorRatio; - metrics::EmitSnapshot("put_requests"sv, EpStats.CachePutRequestTiming, Status); - Status << "put_bytes" << EpStats.CachePutTotalBytes.Value(); - } - Status.EndObject(); - - Status.EndObject(); - } - Status.EndArray(); - } - -private: - void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) - { - ZEN_TRACE_CPU("Upstream::ProcessCacheRecord"); - - ZenCacheValue CacheValue; - std::vector<IoBuffer> Payloads; - - if (!m_CacheStore.Get(CacheRecord.Context, CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) - { - ZEN_WARN("process upstream FAILED, '{}/{}/{}', cache record doesn't exist", - CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash); - return; - } - - for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) - { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId)) - { - Payloads.push_back(Payload); - } - else - { - ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", - CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - ValueContentId); - return; - } - } - - std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - - for (auto& Endpoint : m_Endpoints) - { - if (Endpoint->GetState() != UpstreamEndpointState::kOk) - { - continue; - } - - UpstreamEndpointStats& Stats = Endpoint->Stats(); - PutUpstreamCacheResult Result; - { - metrics::OperationTiming::Scope Scope(Stats.CachePutRequestTiming); - Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - } - - Stats.CachePutTotalBytes.Increment(Result.Bytes); - - if (!Result.Success) - { - ZEN_WARN("upload cache record '{}/{}/{}' FAILED, endpoint '{}', reason '{}'", - CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - Endpoint->GetEndpointInfo().Url, - Result.Reason); - } - } - } - - void ProcessUpstreamQueue(int ThreadIndex) - { - std::string ThreadName = fmt::format("upstream_{}", ThreadIndex); - SetCurrentThreadName(ThreadName); - - for (;;) - { - UpstreamCacheRecord CacheRecord; - if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) - { - try - { - ProcessCacheRecord(std::move(CacheRecord)); - } - catch (const std::exception& Err) - { - ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", - CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - Err.what()); - } - } - - if (!m_RunState.IsRunning) - { - break; - } - } - } - - void MonitorEndpoints() - { - SetCurrentThreadName("upstream_monitor"); - - for (;;) - { - { - std::unique_lock lk(m_RunState.Mutex); - if (m_RunState.ExitSignal.wait_for(lk, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) - { - break; - } - } - - try - { - std::vector<UpstreamEndpoint*> Endpoints; - - { - std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - - for (auto& Endpoint : m_Endpoints) - { - UpstreamEndpointState State = Endpoint->GetState(); - if (State == UpstreamEndpointState::kError) - { - Endpoints.push_back(Endpoint.get()); - ZEN_WARN("HEALTH - endpoint '{} - {}' is in error state '{}'", - Endpoint->GetEndpointInfo().Name, - Endpoint->GetEndpointInfo().Url, - Endpoint->GetStatus().Reason); - } - if (State == UpstreamEndpointState::kUnauthorized) - { - Endpoints.push_back(Endpoint.get()); - } - } - } - - for (auto& Endpoint : Endpoints) - { - const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); - const UpstreamEndpointStatus Status = Endpoint->Initialize(); - - if (Status.State == UpstreamEndpointState::kOk) - { - ZEN_INFO("HEALTH - endpoint '{} - {}' Ok", Info.Name, Info.Url); - } - else - { - const std::string Reason = Status.Reason.empty() ? "" : fmt::format(", reason '{}'", Status.Reason); - ZEN_WARN("HEALTH - endpoint '{} - {}' {} {}", Info.Name, Info.Url, ToString(Status.State), Reason); - } - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what()); - } - } - } - - void Shutdown() - { - if (m_RunState.Stop()) - { - m_UpstreamQueue.CompleteAdding(); - for (std::thread& Thread : m_UpstreamThreads) - { - Thread.join(); - } - if (m_EndpointMonitorThread.joinable()) - { - m_EndpointMonitorThread.join(); - } - m_UpstreamThreads.clear(); - m_Endpoints.clear(); - } - } - - LoggerRef Log() { return m_Log; } - - using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; - - struct RunState - { - std::mutex Mutex; - std::condition_variable ExitSignal; - std::atomic_bool IsRunning{false}; - - bool Stop() - { - bool Stopped = false; - { - std::lock_guard _(Mutex); - Stopped = IsRunning.exchange(false); - } - if (Stopped) - { - ExitSignal.notify_all(); - } - return Stopped; - } - }; - - LoggerRef m_Log; - UpstreamCacheOptions m_Options; - ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; - UpstreamQueue m_UpstreamQueue; - std::shared_mutex m_EndpointsMutex; - std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; - std::vector<std::thread> m_UpstreamThreads; - std::thread m_EndpointMonitorThread; - RunState m_RunState; -}; - -////////////////////////////////////////////////////////////////////////// - -std::unique_ptr<UpstreamEndpoint> -UpstreamEndpoint::CreateZenEndpoint(const ZenStructuredCacheClientOptions& Options) -{ - return std::make_unique<detail::ZenUpstreamEndpoint>(Options); -} - -std::unique_ptr<UpstreamEndpoint> -UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) -{ - return std::make_unique<detail::JupiterUpstreamEndpoint>(Options, AuthConfig, Mgr); -} - -std::unique_ptr<UpstreamCache> -CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) -{ - return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore); -} - -} // namespace zen |