aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-09-13 12:42:40 +0200
committerDan Engelbrecht <[email protected]>2022-09-15 12:36:45 +0200
commitb2f3b832437590b5c9eedf0423150df3f199f4b7 (patch)
treee1b09dcec8ef2541d13d1f3e5f1084e25ddc6cd9 /zenserver
parentGetCacheChunksRequest with separate policy and OptionalValueFilter (diff)
downloadzen-de/get-cache-chunks-api-with-separate-policy.tar.xz
zen-de/get-cache-chunks-api-with-separate-policy.zip
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/structuredcache.cpp581
-rw-r--r--zenserver/cache/structuredcache.h20
2 files changed, 594 insertions, 7 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 11a23242f..dfed42aa5 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1535,6 +1535,237 @@ namespace impl {
return HttpResponseCode::OK;
}
+ HttpResponseCode GetCacheChunks(ZenCacheStoreBase& CacheStore,
+ CidStoreBase& CidStore,
+ UpstreamCache& UpstreamCache,
+ std::atomic_uint64_t&,
+ std::atomic_uint64_t&,
+ std::atomic_uint64_t&,
+ cacherequests::GetCacheChunksRequest& Request,
+ const cacherequests::ChunksRequestPolicy& Policy,
+ cacherequests::GetCacheChunksResult& OutResult)
+ {
+ auto GetChunkIdsFromCacheRecord = [&Request](size_t RequestIndex, const ZenCacheValue& RecordCacheValue) -> size_t {
+ if (RecordCacheValue.Value.GetContentType() != ZenContentType::kCbObject)
+ {
+ ZEN_WARN("local record {}/{}/{} is not a structured object, skipping.",
+ Request.Namespace,
+ Request.Requests[RequestIndex].Key.Bucket,
+ Request.Requests[RequestIndex].Key.Hash);
+ return 0;
+ }
+ cacherequests::CacheRecord CacheRecord;
+ CbObject Record = LoadCompactBinaryObject(RecordCacheValue.Value);
+ if (!CacheRecord.Parse(Record))
+ {
+ ZEN_WARN("local record {}/{}/{} is corrupt, skipping",
+ Request.Namespace,
+ Request.Requests[RequestIndex].Key.Bucket,
+ Request.Requests[RequestIndex].Key.Hash);
+ return 0;
+ }
+ size_t ChunkCount = 0;
+ while (CacheRecord.Key == Request.Requests[RequestIndex + ChunkCount].Key)
+ {
+ cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex + ChunkCount];
+ auto FindIt = std::find_if(
+ CacheRecord.Values.begin(),
+ CacheRecord.Values.end(),
+ [&ChunkRequest](const cacherequests::CacheRecordValue& Value) { return Value.Id == ChunkRequest.ValueId; });
+ if (FindIt != CacheRecord.Values.end())
+ {
+ ChunkRequest.ChunkId = FindIt->RawHash;
+ ChunkRequest.RawSize = FindIt->RawSize;
+ }
+ ChunkCount++;
+ }
+ return ChunkCount;
+ };
+
+ // Figure out which records we need, locally we just get them, upstream we request them
+ std::vector<size_t> UpstreamRecordRequestIndexes;
+ size_t RequestCount = Request.Requests.size();
+ {
+ size_t RequestIndex = 0;
+ while (RequestIndex < RequestCount)
+ {
+ cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex];
+ // cacherequests::CacheValueResult& ChunkResult = OutResult.Results[RequestIndex];
+ CachePolicy ChunkPolicy = cacherequests::GetEffectiveChunkPolicy(Policy, RequestIndex);
+ const bool HasValueId = ChunkRequest.ValueId != Oid::Zero;
+ const bool HasChunkId = ChunkRequest.ChunkId != IoHash::Zero;
+ const bool QueryLocal = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryLocal);
+ // const bool SkipData = EnumHasAnyFlags(ChunkPolicy, CachePolicy::SkipData);
+ const bool QueryRemote = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryRemote);
+ const bool GetInlineCacheValue = !HasValueId && !HasChunkId;
+
+ if (GetInlineCacheValue)
+ {
+ RequestIndex++;
+ continue;
+ }
+ if (HasChunkId)
+ {
+ RequestIndex++;
+ continue;
+ }
+ ZenCacheValue RecordCacheValue;
+ if (QueryLocal && CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue))
+ {
+ size_t ResolvedChunkCount = GetChunkIdsFromCacheRecord(RequestIndex, RecordCacheValue);
+ if (ResolvedChunkCount != 0)
+ {
+ RequestIndex += ResolvedChunkCount;
+ continue;
+ }
+ }
+ if (QueryRemote)
+ {
+ UpstreamRecordRequestIndexes.push_back(RequestIndex);
+ size_t ChunkCount = 1;
+ while (Request.Requests[RequestIndex].Key == Request.Requests[RequestIndex + ChunkCount].Key)
+ {
+ ChunkCount++;
+ }
+ RequestIndex += ChunkCount;
+ continue;
+ }
+ }
+ }
+
+ if (!UpstreamRecordRequestIndexes.empty())
+ {
+ // We need to do GetCacheValue for the actual record - we don't want to fetch cache records...
+ cacherequests::GetCacheRecordsRequest RecordsRequest = {.Namespace = Request.Namespace};
+ RecordsRequest.Requests.reserve(UpstreamRecordRequestIndexes.size());
+
+ for (size_t RequestIndex : UpstreamRecordRequestIndexes)
+ {
+ cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex];
+ RecordsRequest.Requests.push_back(ChunkRequest.Key);
+ }
+
+ cacherequests::RecordsRequestPolicy RecordsRequestPolicy = {.DefaultPolicy = Policy.DefaultPolicy | CachePolicy::SkipData |
+ CachePolicy::SkipMeta};
+ cacherequests::GetCacheRecordsResult RecordsResult;
+ RecordsRequestPolicy.RecordPolicies.resize(UpstreamRecordRequestIndexes.size());
+ UpstreamCache.GetCacheRecords(RecordsRequest, RecordsRequestPolicy, RecordsResult, {});
+
+ size_t ResultIndex = 0;
+ for (size_t RequestIndex : UpstreamRecordRequestIndexes)
+ {
+ const std::optional<cacherequests::GetCacheRecordResult>& RecordResult = RecordsResult.Results[ResultIndex++];
+ if (!RecordResult)
+ {
+ continue;
+ }
+ ZEN_ASSERT(RecordResult->Key == Request.Requests[RequestIndex].Key);
+
+ while (RecordResult->Key == Request.Requests[RequestIndex].Key)
+ {
+ cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex];
+ auto FindIt = std::find_if(RecordResult->Values.begin(),
+ RecordResult->Values.end(),
+ [&ChunkRequest](const cacherequests::GetCacheRecordResultValue& Value) {
+ return Value.Id == ChunkRequest.ValueId;
+ });
+ if (FindIt != RecordResult->Values.end())
+ {
+ ChunkRequest.ChunkId = FindIt->RawHash;
+ ChunkRequest.RawSize = FindIt->RawSize;
+ }
+ RequestIndex++;
+ }
+ }
+ }
+
+ // Now we should have any ChunkId we can get for all requests
+
+ std::vector<size_t> UpstreamChunkIndexes;
+ {
+ for (size_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
+ {
+ cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex];
+ cacherequests::CacheValueResult& ChunkResult = OutResult.Results[RequestIndex];
+ CachePolicy ChunkPolicy = cacherequests::GetEffectiveChunkPolicy(Policy, RequestIndex);
+ const bool HasValueId = ChunkRequest.ValueId != Oid::Zero;
+ const bool HasChunkId = ChunkRequest.ChunkId != IoHash::Zero;
+ const bool QueryLocal = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryLocal);
+ const bool SkipData = EnumHasAnyFlags(ChunkPolicy, CachePolicy::SkipData);
+ const bool QueryRemote = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryRemote);
+ const bool GetInlineCacheValue = !HasValueId && !HasChunkId;
+
+ if (GetInlineCacheValue)
+ {
+ if (QueryLocal)
+ {
+ ZenCacheValue RecordCacheValue;
+ if (CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value));
+ if (!Compressed)
+ {
+ Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value));
+ }
+ if (!SkipData)
+ {
+ ChunkResult.Body = Compressed;
+ }
+ ChunkResult.RawSize = Compressed.GetRawSize();
+ ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ continue;
+ }
+ }
+ if (QueryRemote)
+ {
+ UpstreamChunkIndexes.push_back(RequestIndex);
+ }
+ continue;
+ }
+ if (!HasChunkId)
+ {
+ // Miss
+ continue;
+ }
+ if (QueryLocal)
+ {
+ if (SkipData && ChunkRequest.RawSize != ~uint64_t(0))
+ {
+ if (CidStore.ContainsChunk(ChunkRequest.ChunkId))
+ {
+ // Hit
+ ChunkResult.RawSize = ChunkRequest.RawSize;
+ ChunkResult.RawHash = ChunkRequest.ChunkId;
+ continue;
+ }
+ }
+ ZenCacheValue RecordCacheValue;
+ if (IoBuffer Chunk = CidStore.FindChunkByCid(ChunkRequest.ChunkId); Chunk)
+ {
+ // Hit
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ if (!Compressed)
+ {
+ Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value));
+ }
+ if (!SkipData)
+ {
+ ChunkResult.Body = Compressed;
+ }
+ ChunkResult.RawSize = Compressed.GetRawSize();
+ ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ continue;
+ }
+ }
+ if (QueryRemote)
+ {
+ UpstreamChunkIndexes.push_back(RequestIndex);
+ }
+ }
+ }
+ return HttpResponseCode::OK;
+ }
+
} // namespace impl
void
@@ -1919,9 +2150,359 @@ namespace cache::detail {
} // namespace cache::detail
+// Feature flag routing RPC GetCacheChunks calls through the new implementation below.
+static bool UseHandleRpcGetCacheChunksNew = true;
+
+// Handle the GetCacheChunks RPC via the shared impl::GetCacheChunks helper.
+// NOTE(review): work in progress — the large #if 0 region below is the previous
+// inline implementation kept for reference, and the live tail of the function
+// runs a separate Parse/GetLocal*/GetUpstream*/Write* flow that actually writes
+// the response; the Result filled by impl::GetCacheChunks is not sent. Confirm
+// which path is intended to survive.
+void
+HttpStructuredCacheService::HandleRpcGetCacheChunksNew(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
+{
+ using namespace cache::detail;
+
+ ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
+
+ cacherequests::GetCacheChunksRequest Request;
+ cacherequests::ChunksRequestPolicy Policy;
+ if (!Request.Parse(RpcRequest, Policy))
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ cacherequests::GetCacheChunksResult Result;
+ Result.Results.resize(Request.Requests.size());
+
+ impl::GetCacheChunks(m_CacheStore,
+ m_CidStore,
+ m_UpstreamCache,
+ m_CacheStats.HitCount,
+ m_CacheStats.UpstreamHitCount,
+ m_CacheStats.MissCount,
+ Request,
+ Policy,
+ Result);
+
+// Dead code below: previous inline implementation retained for reference while
+// the impl::GetCacheChunks path is being brought up.
+#if 0
+
+ std::vector<size_t> UpstreamChunkIndexes;
+ UpstreamChunkIndexes.reserve(Request.Requests.size());
+ std::vector<CacheKey> UpstreamRecords;
+ std::vector<size_t> MissingChunksIndexes;
+ UpstreamRecords.reserve(Request.Requests.size());
+ cacherequests::CacheRecord CurrentCacheRecord;
+
+ // Requests should be sorted by Key.Hash - it currently does not care about bucket, but is that an issue?
+
+ // Should we just start by checking for which records we need and handle that first and then run through the cache values themselves?
+
+ for (size_t RequestIndex = 0; RequestIndex < Request.Requests.size(); ++RequestIndex)
+ {
+ cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex];
+ cacherequests::CacheValueResult& ChunkResult = Result.Results[RequestIndex];
+ CachePolicy ChunkPolicy = cacherequests::GetEffectiveChunkPolicy(Policy, RequestIndex);
+ const bool HasValueId = ChunkRequest.ValueId != Oid::Zero;
+ const bool HasChunkId = ChunkRequest.ChunkId != IoHash::Zero;
+ const bool QueryLocal = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryLocal);
+ const bool SkipData = EnumHasAnyFlags(ChunkPolicy, CachePolicy::SkipData);
+ const bool QueryRemote = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryRemote);
+ const bool GetInlineCacheValue = !HasValueId && !HasChunkId;
+ if (GetInlineCacheValue)
+ {
+ if (QueryLocal)
+ {
+ ZenCacheValue RecordCacheValue;
+ if (m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value));
+ if (!Compressed)
+ {
+ Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value));
+ }
+ if (!SkipData)
+ {
+ ChunkResult.Body = Compressed;
+ }
+ ChunkResult.RawSize = Compressed.GetRawSize();
+ ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ continue;
+ }
+ }
+ if (QueryRemote)
+ {
+ UpstreamChunkIndexes.push_back(RequestIndex);
+ }
+ continue;
+ }
+
+ if (!HasChunkId)
+ {
+ ZEN_ASSERT_SLOW(HasValueId);
+ if (CurrentCacheRecord.Key != ChunkRequest.Key)
+ {
+ CurrentCacheRecord.Values.clear();
+
+ ZenCacheValue RecordCacheValue;
+ if (QueryLocal && m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue))
+ {
+ if (RecordCacheValue.Value.GetContentType() != ZenContentType::kCbObject)
+ {
+ ZEN_WARN("local record {}/{}/{} is not a structured object, skipping.",
+ Request.Namespace,
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash);
+ // continue;
+ }
+ CbObject Record = LoadCompactBinaryObject(RecordCacheValue.Value);
+ if (!CurrentCacheRecord.Parse(Record))
+ {
+ ZEN_WARN("local record {}/{}/{} is corrupt, skipping",
+ Request.Namespace,
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash);
+ // continue;
+ }
+ CurrentCacheRecord.Key = ChunkRequest.Key;
+ }
+ else
+ {
+ if (QueryRemote && (UpstreamRecords.empty() || UpstreamRecords.back() != ChunkRequest.Key))
+ {
+ UpstreamRecords.push_back(ChunkRequest.Key);
+ }
+ }
+ }
+
+ auto FindIt =
+ std::find_if(CurrentCacheRecord.Values.begin(),
+ CurrentCacheRecord.Values.end(),
+ [&ChunkRequest](const cacherequests::CacheRecordValue& Value) { return Value.Id == ChunkRequest.ValueId; });
+ if (FindIt == CurrentCacheRecord.Values.end())
+ {
+ MissingChunksIndexes.push_back(RequestIndex);
+ continue;
+ }
+ ZEN_ASSERT(FindIt->RawHash != IoHash::Zero);
+ ChunkRequest.ChunkId = FindIt->RawHash;
+ ChunkRequest.RawSize = FindIt->RawSize;
+ }
+ if (QueryLocal)
+ {
+ ZenCacheValue RecordCacheValue;
+ if (m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue))
+ {
+ // Parse
+
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value));
+ if (!Compressed)
+ {
+ Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value));
+ }
+ if (!SkipData)
+ {
+ ChunkResult.Body = Compressed;
+ }
+ ChunkResult.RawSize = Compressed.GetRawSize();
+ ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ continue;
+ }
+ }
+ if (QueryRemote)
+ {
+ UpstreamChunkIndexes.push_back(RequestIndex);
+ }
+ }
+
+ if (!UpstreamRecords.empty())
+ {
+ // We need to do GetCacheValue for the actual record - we don't want to fetch cache records...
+ cacherequests::GetCacheRecordsRequest RecordsRequest = {.Namespace = Request.Namespace, .Requests = UpstreamRecords};
+ cacherequests::RecordsRequestPolicy RecordsRequestPolicy = {.DefaultPolicy = Policy.DefaultPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta}; cacherequests::GetCacheRecordsResult RecordsResult;
+ RecordsRequestPolicy.RecordPolicies.resize(UpstreamRecords.size());
+ m_UpstreamCache.GetCacheRecords(RecordsRequest, RecordsRequestPolicy, RecordsResult, {});
+ size_t MissingChunkOffset = 0;
+ for (const auto& RecordResult : RecordsResult.Results)
+ {
+ if (RecordResult)
+ {
+ while (MissingChunkOffset < MissingChunksIndexes.size())
+ {
+ cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[MissingChunksIndexes[MissingChunkOffset]];
+ if (ChunkRequest.Key != RecordResult->Key)
+ {
+ auto FindIt = std::find_if(
+ CurrentCacheRecord.Values.begin(),
+ CurrentCacheRecord.Values.end(),
+ [&ChunkRequest](const cacherequests::CacheRecordValue& Value) { return Value.Id == ChunkRequest.ValueId; });
+ if (FindIt == CurrentCacheRecord.Values.end())
+ {
+ // The ValueId is not part of the Record, ignore it
+ continue;
+ }
+ ZEN_ASSERT(FindIt->RawHash != IoHash::Zero);
+ ChunkRequest.ChunkId = FindIt->RawHash;
+ ChunkRequest.RawSize = FindIt->RawSize;
+ break;
+ }
+ MissingChunkOffset++;
+ }
+ }
+ }
+ // Fill in
+ for (size_t Index = 0; Index < Policy.ChunkPolicies.size(); ++Index)
+ {
+
+ }
+ RecordsRequest.Requests = UpstreamRecords;
+ }
+
+ // Try to fetch records locally
+ // Fill in RawHash for any chunk request matching the found cache record
+ // Add any RecordKeys to UpstreamRecords for any records that does not exist locally
+ // Request all UpstreamRecords from upstream
+ // Fill in RawHash for any chunk request matching the found cache record
+ // Run through all requests and fetch the chunks we need where we have filled in RawHash
+ // If chunk is not found locally, add to UpstreamChunkIndexes and try to fetch from upstream
+
+# if 0
+ {
+ {
+ // Try to get local record, if we can't find it add the chunk request to fetch from upstream - if we do, will we miss
+ // opportunity to find payload locally?
+ if (QueryLocal)
+ {
+ ZenCacheValue RecordCacheValue;
+ if (m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue))
+ {
+ // Parse
+
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value));
+ if (!Compressed)
+ {
+ Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value));
+ }
+ if (!SkipData)
+ {
+ ChunkResult.Body = Compressed;
+ }
+ ChunkResult.RawSize = Compressed.GetRawSize();
+ ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ continue;
+ }
+ }
+
+ if (QueryRemote)
+ {
+ // We need the record to be able to fill this request
+ if (UpstreamRecords.empty() || UpstreamRecords.back() != ChunkRequest.Key)
+ {
+ UpstreamRecords.push_back(ChunkRequest.Key);
+ }
+ }
+ continue;
+ }
+ }
+# endif
+# if 0
+ {
+ if (QueryLocal)
+ {
+ if (SkipData && Result.Results[RequestIndex].RawSize != 0)
+ {
+ if (m_CidStore.ContainsChunk(ChunkRequest.ChunkId))
+ {
+ continue;
+ }
+ else
+ {
+ // Fail! Not found!
+ Result.Results[RequestIndex].RawSize = 0;
+ Result.Results[RequestIndex].RawHash = IoHash::Zero;
+ }
+ }
+ IoBuffer Value = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId);
+ if (Value)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value));
+ if (!Compressed)
+ {
+ Compressed = CompressedBuffer::Compress(SharedBuffer(Value));
+ }
+ if (Result.Results[RequestIndex].RawHash != IoHash::FromBLAKE3(Compressed.GetRawHash()))
+ {
+ // Fail! Not found!
+ Result.Results[RequestIndex].RawSize = 0;
+ Result.Results[RequestIndex].RawHash = IoHash::Zero;
+ continue;
+ }
+ Result.Results[RequestIndex].RawSize = Compressed.GetRawSize();
+ if (!SkipData)
+ {
+ Result.Results[RequestIndex].Body = Compressed;
+ }
+ continue;
+ }
+ }
+ if (QueryRemote)
+ {
+ UpstreamChunkIndexes.push_back(RequestIndex);
+ }
+ }
+# endif
+
+ if (!UpstreamRecords.empty())
+ {
+ // We need to do GetCacheValue for the actual record - we don't want to fetch cache records...
+ // cacherequests::GetCacheRecordsRequest RecordsRequest = {.Namespace = Request.Namespace, .Requests = UpstreamRecords};
+ // cacherequests::RecordsRequestPolicy RecordsRequestPolicy = {.DefaultPolicy = Policy.DefaultPolicy | CachePolicy::SkipData
+ //| CachePolicy::SkipMeta}; cacherequests::GetCacheRecordsResult RecordsResult;
+ // RecordsRequestPolicy.RecordPolicies.resize(UpstreamRecords.size());
+ // m_UpstreamCache.GetCacheRecords(RecordsRequest, RecordsRequestPolicy, RecordsResult, {});
+ // // Fill in
+ // for (size_t Index = 0; Index < Policy.ChunkPolicies.size(); ++Index)
+ // {
+ //
+ // }
+ // RecordsRequest.Requests = UpstreamRecords;
+ }
+
+ if (!UpstreamChunkIndexes.empty())
+ {
+// m_UpstreamCache.GetCacheChunks(Request.Namespace, Request, Policy, UpstreamChunkIndexes);
+ }
+#endif
+ // NOTE(review): live path — this re-parses the request and produces the actual
+ // response, independently of the impl::GetCacheChunks call above.
+ std::string Namespace;
+ std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
+ std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
+ std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
+ std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
+ std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
+ std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
+ std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
+
+ // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
+ if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
+ // have it locally, and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
+
+ // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks);
+
+ // Call GetCacheChunks on the upstream for any payloads we do not have locally
+ GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
+
+ // Send the payload and descriptive data about each chunk to the client
+ WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest);
+}
+
void
HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
{
+ if (UseHandleRpcGetCacheChunksNew)
+ {
+ HandleRpcGetCacheChunksNew(HttpRequest, RpcRequest);
+ return;
+ }
using namespace cache::detail;
ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 228b0f69f..0b8e8ee93 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -4,6 +4,7 @@
#include <zencore/stats.h>
#include <zenhttp/httpserver.h>
+#include <zenutil/cache/cacherequests.h>
#include "monitoring/httpstats.h"
#include "monitoring/httpstatus.h"
@@ -109,6 +110,7 @@ private:
void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcGetCacheChunksNew(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view Namespace);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
@@ -161,12 +163,6 @@ private:
class ZenCacheStoreBase;
class CidStoreBase;
-namespace cacherequests {
- struct GetCacheRecordsRequest;
- struct RecordsRequestPolicy;
- struct GetCacheRecordsResult;
-} // namespace cacherequests
-
namespace impl {
HttpResponseCode GetCacheRecords(ZenCacheStoreBase& CacheStore,
CidStoreBase& CidStore,
@@ -177,7 +173,17 @@ namespace impl {
const cacherequests::GetCacheRecordsRequest& Request,
const cacherequests::RecordsRequestPolicy& Policy,
cacherequests::GetCacheRecordsResult& OutResult);
-}
+
+ HttpResponseCode GetCacheChunks(ZenCacheStoreBase& CacheStore,
+ CidStoreBase& CidStore,
+ UpstreamCache& UpstreamCache,
+ std::atomic_uint64_t& HitCount,
+ std::atomic_uint64_t& UpstreamHitCount,
+ std::atomic_uint64_t& MissCount,
+ cacherequests::GetCacheChunksRequest& Request,
+ const cacherequests::ChunksRequestPolicy& Policy,
+ cacherequests::GetCacheChunksResult& OutResult);
+} // namespace impl
/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
* We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */