aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-09-06 16:42:59 +0200
committerGitHub <[email protected]>2022-09-06 07:42:59 -0700
commitf9ddb13500f41549fa472f6aaffa096d64554145 (patch)
tree24de689afc0cdb6cd7dd99ce85d66b7223630c64 /zenserver/upstream/upstreamcache.cpp
parentupdated codeowners (#157) (diff)
downloadzen-f9ddb13500f41549fa472f6aaffa096d64554145.tar.xz
zen-f9ddb13500f41549fa472f6aaffa096d64554145.zip
Implement proper GetCacheValues upstream (#155)
* Implement proper GetCacheValues upstream * changelog
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp249
1 files changed, 235 insertions, 14 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index a897c21fd..7f5759e47 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -378,11 +378,11 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
+ virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace,
std::span<CacheChunkRequest*> CacheChunkRequests,
- OnCacheValueGetComplete&& OnComplete) override final
+ OnCacheChunksGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues");
+ ZEN_TRACE_CPU("Upstream::Horde::GetCacheChunks");
CloudCacheSession Session(m_Client);
GetUpstreamCacheResult Result;
@@ -424,6 +424,52 @@ namespace detail {
return Result;
}
+ virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
+ std::span<CacheValueRequest*> CacheValueRequests,
+ OnCacheValueGetComplete&& OnComplete) override final
+ {
+ ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues");
+
+ CloudCacheSession Session(m_Client);
+ GetUpstreamCacheResult Result;
+
+ for (CacheValueRequest* RequestPtr : CacheValueRequests)
+ {
+ CacheValueRequest& Request = *RequestPtr;
+ IoBuffer Payload;
+
+ CompressedBuffer Compressed;
+ if (!Result.Error)
+ {
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
+ const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, IoHash::Zero);
+ Payload = BlobResult.Response;
+
+ AppendResult(BlobResult, Result);
+
+ m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
+ if (Payload && IsCompressedBinary(Payload.GetContentType()))
+ {
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ }
+ }
+
+ if (Compressed)
+ {
+ OnComplete({.Request = Request,
+ .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
+ .RawSize = Compressed.GetRawSize(),
+ .Value = Payload});
+ }
+ else
+ {
+ OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
+ }
+ }
+
+ return Result;
+ }
+
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Values) override
@@ -851,7 +897,7 @@ namespace detail {
const CacheKey& CacheKey,
const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue");
+ ZEN_TRACE_CPU("Upstream::Zen::GetCacheValue");
try
{
@@ -881,22 +927,130 @@ namespace detail {
}
virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
- std::span<CacheChunkRequest*> CacheChunkRequests,
+ std::span<CacheValueRequest*> CacheValueRequests,
OnCacheValueGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues");
+ ZEN_ASSERT(!CacheValueRequests.empty());
+
+ CbObjectWriter BatchRequest;
+ BatchRequest << "Method"sv
+ << "GetCacheValues"sv;
+
+ BatchRequest.BeginObject("Params"sv);
+ {
+ CachePolicy DefaultPolicy = CacheValueRequests[0]->Policy;
+ BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView();
+ BatchRequest << "Namespace"sv << Namespace;
+
+ BatchRequest.BeginArray("Requests"sv);
+ {
+ for (CacheValueRequest* RequestPtr : CacheValueRequests)
+ {
+ const CacheValueRequest& Request = *RequestPtr;
+
+ BatchRequest.BeginObject();
+ {
+ BatchRequest.BeginObject("Key"sv);
+ BatchRequest << "Bucket"sv << Request.Key.Bucket;
+ BatchRequest << "Hash"sv << Request.Key.Hash;
+ BatchRequest.EndObject();
+ if (Request.Policy != DefaultPolicy)
+ {
+ BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView();
+ }
+ }
+ BatchRequest.EndObject();
+ }
+ }
+ BatchRequest.EndArray();
+ }
+ BatchRequest.EndObject();
+
+ CbPackage BatchResponse;
+ ZenCacheResult Result;
+
+ {
+ ZenStructuredCacheSession Session(GetClientRef());
+ Result = Session.InvokeRpc(BatchRequest.Save());
+ }
+
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
+ if (Result.Success)
+ {
+ if (BatchResponse.TryLoad(Result.Response))
+ {
+ CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
+ if (CacheValueRequests.size() != Results.Num())
+ {
+ ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream.");
+ }
+ else
+ {
+ for (size_t RequestIndex = 0; CbFieldView ChunkField : Results)
+ {
+ CacheValueRequest& Request = *CacheValueRequests[RequestIndex++];
+ CbObjectView ChunkObject = ChunkField.AsObjectView();
+ IoHash RawHash = ChunkObject["RawHash"sv].AsHash();
+ IoBuffer Payload;
+ uint64_t RawSize = 0;
+ if (RawHash != IoHash::Zero)
+ {
+ bool Success = false;
+ const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash);
+ if (Attachment)
+ {
+ if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary())
+ {
+ Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+ RawSize = Compressed.GetRawSize();
+ Success = true;
+ }
+ }
+ if (!Success)
+ {
+ CbFieldView RawSizeField = ChunkObject["RawSize"sv];
+ RawSize = RawSizeField.AsUInt64();
+ Success = !RawSizeField.HasError();
+ }
+ if (!Success)
+ {
+ RawHash = IoHash::Zero;
+ }
+ }
+ OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)});
+ }
+
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
+ }
+ }
+ }
+
+ for (CacheValueRequest* RequestPtr : CacheValueRequests)
+ {
+ OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
+ }
+
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
+
+ virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) override final
+ {
+ ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunks");
ZEN_ASSERT(!CacheChunkRequests.empty());
CbObjectWriter BatchRequest;
BatchRequest << "Method"sv
<< "GetCacheChunks"sv;
- BatchRequest << "Namespace"sv << Namespace;
BatchRequest.BeginObject("Params"sv);
{
CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy;
BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView();
-
BatchRequest << "Namespace"sv << Namespace;
BatchRequest.BeginArray("ChunkRequests"sv);
@@ -956,7 +1110,7 @@ namespace detail {
CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
if (CacheChunkRequests.size() != Results.Num())
{
- ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream.");
+ ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Requests from Upstream.");
}
else
{
@@ -1401,11 +1555,11 @@ public:
}
}
- virtual void GetCacheValues(std::string_view Namespace,
+ virtual void GetCacheChunks(std::string_view Namespace,
std::span<CacheChunkRequest*> CacheChunkRequests,
- OnCacheValueGetComplete&& OnComplete) override final
+ OnCacheChunksGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::GetCacheValues");
+ ZEN_TRACE_CPU("Upstream::GetCacheChunks");
std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
@@ -1431,10 +1585,10 @@ public:
{
metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
- Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
+ Result = Endpoint->GetCacheChunks(Namespace, RemainingKeys, [&](CacheChunkGetCompleteParams&& Params) {
if (Params.RawHash != Params.RawHash.Zero)
{
- OnComplete(std::forward<CacheValueGetCompleteParams>(Params));
+ OnComplete(std::forward<CacheChunkGetCompleteParams>(Params));
Stats.CacheHitCount.Increment(1);
}
@@ -1462,7 +1616,7 @@ public:
}
}
- for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
+ for (CacheChunkRequest* RequestPtr : RemainingKeys)
{
OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
}
@@ -1516,6 +1670,73 @@ public:
return {};
}
+ virtual void GetCacheValues(std::string_view Namespace,
+ std::span<CacheValueRequest*> CacheValueRequests,
+ OnCacheValueGetComplete&& OnComplete) override final
+ {
+ ZEN_TRACE_CPU("Upstream::GetCacheValues");
+
+ std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
+
+ std::vector<CacheValueRequest*> RemainingKeys(CacheValueRequests.begin(), CacheValueRequests.end());
+
+ if (m_Options.ReadUpstream)
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (RemainingKeys.empty())
+ {
+ break;
+ }
+
+ if (Endpoint->GetState() != UpstreamEndpointState::kOk)
+ {
+ continue;
+ }
+
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ std::vector<CacheValueRequest*> Missing;
+ GetUpstreamCacheResult Result;
+ {
+ metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
+
+ Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
+ if (Params.RawHash != Params.RawHash.Zero)
+ {
+ OnComplete(std::forward<CacheValueGetCompleteParams>(Params));
+
+ Stats.CacheHitCount.Increment(1);
+ }
+ else
+ {
+ Missing.push_back(&Params.Request);
+ }
+ });
+ }
+
+ Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size()));
+ Stats.CacheGetTotalBytes.Increment(Result.Bytes);
+
+ if (Result.Error)
+ {
+ Stats.CacheErrorCount.Increment(1);
+
+ ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
+
+ RemainingKeys = std::move(Missing);
+ }
+ }
+
+ for (CacheValueRequest* RequestPtr : RemainingKeys)
+ {
+ OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
+ }
+ }
+
virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
{
if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0)