about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2022-09-06 16:42:59 +0200
committer: GitHub <[email protected]> 2022-09-06 07:42:59 -0700
commit: f9ddb13500f41549fa472f6aaffa096d64554145 (patch)
tree: 24de689afc0cdb6cd7dd99ce85d66b7223630c64
parent: updated codeowners (#157) (diff)
download: zen-f9ddb13500f41549fa472f6aaffa096d64554145.tar.xz
          zen-f9ddb13500f41549fa472f6aaffa096d64554145.zip
Implement proper GetCacheValues upstream (#155)
* Implement proper GetCacheValues upstream
* changelog
-rw-r--r-- CHANGELOG.md | 1
-rw-r--r-- VERSION.txt | 2
-rw-r--r-- zenserver/cache/structuredcache.cpp | 51
-rw-r--r-- zenserver/upstream/upstreamcache.cpp | 249
-rw-r--r-- zenserver/upstream/upstreamcache.h | 24
-rw-r--r-- zenutil/include/zenutil/cache/cachekey.h | 6
6 files changed, 298 insertions, 35 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d5a41975..b3378372e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@
- Improvement: namespace/bucket validation now uses AsciiSet for more efficient validation
- Improvement: Frontend: simplified content-type logic
- Improvement: Improved message indicating no GC is scheduled
+- Improvement: Implement proper GetCacheValues upstream path
- Bugfix: Fixed issue in CbPackage marshaling of local reference
- Bugfix: Fix crash when switching Zen upstream configured via DNS when one endpoint becomes unresposive
- Bugfix: Fixed issue where projects would not be discovered via DiscoverProjects due to use of stem() vs filename()
diff --git a/VERSION.txt b/VERSION.txt
index 650599101..c812ecb80 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1 +1 @@
-0.1.4-pre24
\ No newline at end of file
+0.1.4-pre26
\ No newline at end of file
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 1f89e7362..f8cbfa55c 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1718,6 +1718,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
{
CacheKey Key;
CachePolicy Policy;
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0;
CompressedBuffer Result;
};
std::vector<RequestData> Requests;
@@ -1788,35 +1790,45 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
if (!RemoteRequestIndexes.empty())
{
- std::vector<CacheChunkRequest> RequestedRecordsData;
- std::vector<CacheChunkRequest*> CacheChunkRequests;
+ std::vector<CacheValueRequest> RequestedRecordsData;
+ std::vector<CacheValueRequest*> CacheValueRequests;
RequestedRecordsData.reserve(RemoteRequestIndexes.size());
- CacheChunkRequests.reserve(RemoteRequestIndexes.size());
+ CacheValueRequests.reserve(RemoteRequestIndexes.size());
for (size_t Index : RemoteRequestIndexes)
{
RequestData& Request = Requests[Index];
- RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}});
- CacheChunkRequests.push_back(&RequestedRecordsData.back());
+ RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)});
+ CacheValueRequests.push_back(&RequestedRecordsData.back());
}
Stopwatch Timer;
m_UpstreamCache.GetCacheValues(
*Namespace,
- CacheChunkRequests,
+ CacheValueRequests,
[this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) {
- CacheChunkRequest& ChunkRequest = Params.Request;
- if (Params.Value)
+ CacheValueRequest& ChunkRequest = Params.Request;
+ if (Params.RawHash != IoHash::Zero)
{
size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest);
size_t RequestIndex = RemoteRequestIndexes[RequestOffset];
RequestData& Request = Requests[RequestIndex];
- Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
- if (Request.Result && IsCompressedBinary(Params.Value.GetContentType()))
+ Request.RawHash = Params.RawHash;
+ Request.RawSize = Params.RawSize;
+ const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
+ const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
+ const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal);
+ const bool IsHit = SkipData || HasData;
+ if (IsHit)
{
- // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
- // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
- // for us to keep the data only on the upstream server.
- // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value});
+ if (HasData && !SkipData)
+ {
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
+ }
+
+ if (HasData && StoreData)
+ {
+ m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value});
+ }
+
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
*Namespace,
ChunkRequest.Key.Bucket,
@@ -1864,6 +1876,11 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize());
}
}
+ else if (Request.RawHash != IoHash::Zero)
+ {
+ ResponseObject.AddHash("RawHash"sv, Request.RawHash);
+ ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
+ }
}
ResponseObject.EndObject();
}
@@ -2260,7 +2277,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
if (!UpstreamChunks.empty())
{
- const auto OnCacheValueGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) {
+ const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) {
if (Params.RawHash == Params.RawHash.Zero)
{
return;
@@ -2304,7 +2321,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
m_CacheStats.UpstreamHitCount++;
};
- m_UpstreamCache.GetCacheValues(Namespace, UpstreamChunks, std::move(OnCacheValueGetComplete));
+ m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete));
}
}
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index a897c21fd..7f5759e47 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -378,11 +378,11 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
+ virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace,
std::span<CacheChunkRequest*> CacheChunkRequests,
- OnCacheValueGetComplete&& OnComplete) override final
+ OnCacheChunksGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues");
+ ZEN_TRACE_CPU("Upstream::Horde::GetCacheChunks");
CloudCacheSession Session(m_Client);
GetUpstreamCacheResult Result;
@@ -424,6 +424,52 @@ namespace detail {
return Result;
}
+ virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
+ std::span<CacheValueRequest*> CacheValueRequests,
+ OnCacheValueGetComplete&& OnComplete) override final
+ {
+ ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues");
+
+ CloudCacheSession Session(m_Client);
+ GetUpstreamCacheResult Result;
+
+ for (CacheValueRequest* RequestPtr : CacheValueRequests)
+ {
+ CacheValueRequest& Request = *RequestPtr;
+ IoBuffer Payload;
+
+ CompressedBuffer Compressed;
+ if (!Result.Error)
+ {
+ std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
+ const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, IoHash::Zero);
+ Payload = BlobResult.Response;
+
+ AppendResult(BlobResult, Result);
+
+ m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
+ if (Payload && IsCompressedBinary(Payload.GetContentType()))
+ {
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ }
+ }
+
+ if (Compressed)
+ {
+ OnComplete({.Request = Request,
+ .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()),
+ .RawSize = Compressed.GetRawSize(),
+ .Value = Payload});
+ }
+ else
+ {
+ OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
+ }
+ }
+
+ return Result;
+ }
+
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Values) override
@@ -851,7 +897,7 @@ namespace detail {
const CacheKey& CacheKey,
const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue");
+ ZEN_TRACE_CPU("Upstream::Zen::GetCacheValue");
try
{
@@ -881,22 +927,130 @@ namespace detail {
}
virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
- std::span<CacheChunkRequest*> CacheChunkRequests,
+ std::span<CacheValueRequest*> CacheValueRequests,
OnCacheValueGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues");
+ ZEN_ASSERT(!CacheValueRequests.empty());
+
+ CbObjectWriter BatchRequest;
+ BatchRequest << "Method"sv
+ << "GetCacheValues"sv;
+
+ BatchRequest.BeginObject("Params"sv);
+ {
+ CachePolicy DefaultPolicy = CacheValueRequests[0]->Policy;
+ BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView();
+ BatchRequest << "Namespace"sv << Namespace;
+
+ BatchRequest.BeginArray("Requests"sv);
+ {
+ for (CacheValueRequest* RequestPtr : CacheValueRequests)
+ {
+ const CacheValueRequest& Request = *RequestPtr;
+
+ BatchRequest.BeginObject();
+ {
+ BatchRequest.BeginObject("Key"sv);
+ BatchRequest << "Bucket"sv << Request.Key.Bucket;
+ BatchRequest << "Hash"sv << Request.Key.Hash;
+ BatchRequest.EndObject();
+ if (Request.Policy != DefaultPolicy)
+ {
+ BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView();
+ }
+ }
+ BatchRequest.EndObject();
+ }
+ }
+ BatchRequest.EndArray();
+ }
+ BatchRequest.EndObject();
+
+ CbPackage BatchResponse;
+ ZenCacheResult Result;
+
+ {
+ ZenStructuredCacheSession Session(GetClientRef());
+ Result = Session.InvokeRpc(BatchRequest.Save());
+ }
+
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
+ if (Result.Success)
+ {
+ if (BatchResponse.TryLoad(Result.Response))
+ {
+ CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
+ if (CacheValueRequests.size() != Results.Num())
+ {
+ ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream.");
+ }
+ else
+ {
+ for (size_t RequestIndex = 0; CbFieldView ChunkField : Results)
+ {
+ CacheValueRequest& Request = *CacheValueRequests[RequestIndex++];
+ CbObjectView ChunkObject = ChunkField.AsObjectView();
+ IoHash RawHash = ChunkObject["RawHash"sv].AsHash();
+ IoBuffer Payload;
+ uint64_t RawSize = 0;
+ if (RawHash != IoHash::Zero)
+ {
+ bool Success = false;
+ const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash);
+ if (Attachment)
+ {
+ if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary())
+ {
+ Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+ RawSize = Compressed.GetRawSize();
+ Success = true;
+ }
+ }
+ if (!Success)
+ {
+ CbFieldView RawSizeField = ChunkObject["RawSize"sv];
+ RawSize = RawSizeField.AsUInt64();
+ Success = !RawSizeField.HasError();
+ }
+ if (!Success)
+ {
+ RawHash = IoHash::Zero;
+ }
+ }
+ OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)});
+ }
+
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
+ }
+ }
+ }
+
+ for (CacheValueRequest* RequestPtr : CacheValueRequests)
+ {
+ OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
+ }
+
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
+
+ virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) override final
+ {
+ ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunks");
ZEN_ASSERT(!CacheChunkRequests.empty());
CbObjectWriter BatchRequest;
BatchRequest << "Method"sv
<< "GetCacheChunks"sv;
- BatchRequest << "Namespace"sv << Namespace;
BatchRequest.BeginObject("Params"sv);
{
CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy;
BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView();
-
BatchRequest << "Namespace"sv << Namespace;
BatchRequest.BeginArray("ChunkRequests"sv);
@@ -956,7 +1110,7 @@ namespace detail {
CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView();
if (CacheChunkRequests.size() != Results.Num())
{
- ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream.");
+ ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Requests from Upstream.");
}
else
{
@@ -1401,11 +1555,11 @@ public:
}
}
- virtual void GetCacheValues(std::string_view Namespace,
+ virtual void GetCacheChunks(std::string_view Namespace,
std::span<CacheChunkRequest*> CacheChunkRequests,
- OnCacheValueGetComplete&& OnComplete) override final
+ OnCacheChunksGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::GetCacheValues");
+ ZEN_TRACE_CPU("Upstream::GetCacheChunks");
std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
@@ -1431,10 +1585,10 @@ public:
{
metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
- Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
+ Result = Endpoint->GetCacheChunks(Namespace, RemainingKeys, [&](CacheChunkGetCompleteParams&& Params) {
if (Params.RawHash != Params.RawHash.Zero)
{
- OnComplete(std::forward<CacheValueGetCompleteParams>(Params));
+ OnComplete(std::forward<CacheChunkGetCompleteParams>(Params));
Stats.CacheHitCount.Increment(1);
}
@@ -1462,7 +1616,7 @@ public:
}
}
- for (CacheChunkRequest* RequestPtr : CacheChunkRequests)
+ for (CacheChunkRequest* RequestPtr : RemainingKeys)
{
OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
}
@@ -1516,6 +1670,73 @@ public:
return {};
}
+ virtual void GetCacheValues(std::string_view Namespace,
+ std::span<CacheValueRequest*> CacheValueRequests,
+ OnCacheValueGetComplete&& OnComplete) override final
+ {
+ ZEN_TRACE_CPU("Upstream::GetCacheValues");
+
+ std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
+
+ std::vector<CacheValueRequest*> RemainingKeys(CacheValueRequests.begin(), CacheValueRequests.end());
+
+ if (m_Options.ReadUpstream)
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (RemainingKeys.empty())
+ {
+ break;
+ }
+
+ if (Endpoint->GetState() != UpstreamEndpointState::kOk)
+ {
+ continue;
+ }
+
+ UpstreamEndpointStats& Stats = Endpoint->Stats();
+ std::vector<CacheValueRequest*> Missing;
+ GetUpstreamCacheResult Result;
+ {
+ metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
+
+ Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
+ if (Params.RawHash != Params.RawHash.Zero)
+ {
+ OnComplete(std::forward<CacheValueGetCompleteParams>(Params));
+
+ Stats.CacheHitCount.Increment(1);
+ }
+ else
+ {
+ Missing.push_back(&Params.Request);
+ }
+ });
+ }
+
+ Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size()));
+ Stats.CacheGetTotalBytes.Increment(Result.Bytes);
+
+ if (Result.Error)
+ {
+ Stats.CacheErrorCount.Increment(1);
+
+ ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
+
+ RemainingKeys = std::move(Missing);
+ }
+ }
+
+ for (CacheValueRequest* RequestPtr : RemainingKeys)
+ {
+ OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()});
+ }
+ }
+
virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
{
if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0)
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 13548efc8..5b154a1b5 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -81,7 +81,7 @@ using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams
struct CacheValueGetCompleteParams
{
- CacheChunkRequest& Request;
+ CacheValueRequest& Request;
IoHash RawHash;
uint64_t RawSize;
IoBuffer Value;
@@ -89,6 +89,16 @@ struct CacheValueGetCompleteParams
using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
+struct CacheChunkGetCompleteParams
+{
+ CacheChunkRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
+};
+
+using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>;
+
struct UpstreamEndpointStats
{
metrics::OperationTiming CacheGetRequestTiming;
@@ -171,9 +181,13 @@ public:
virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
- std::span<CacheChunkRequest*> CacheChunkRequests,
+ std::span<CacheValueRequest*> CacheValueRequests,
OnCacheValueGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) = 0;
+
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) = 0;
@@ -207,9 +221,13 @@ public:
virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& ValueContentId) = 0;
virtual void GetCacheValues(std::string_view Namespace,
- std::span<CacheChunkRequest*> CacheChunkRequests,
+ std::span<CacheValueRequest*> CacheValueRequests,
OnCacheValueGetComplete&& OnComplete) = 0;
+ virtual void GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) = 0;
+
virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h
index 9adde8fc7..741375946 100644
--- a/zenutil/include/zenutil/cache/cachekey.h
+++ b/zenutil/include/zenutil/cache/cachekey.h
@@ -47,6 +47,12 @@ struct CacheKeyRequest
CacheRecordPolicy Policy;
};
+struct CacheValueRequest
+{
+ CacheKey Key;
+ CachePolicy Policy = CachePolicy::Default;
+};
+
inline bool
operator<(const CacheChunkRequest& A, const CacheChunkRequest& B)
{