aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormattpetersepic <[email protected]>2022-01-26 18:45:04 -0700
committerGitHub <[email protected]>2022-01-26 18:45:04 -0700
commit09f2ac4d9aaea0107af8fbd6a41c2d3fe3e450ba (patch)
tree41bc55af44f16fafa674c7e8c21d56fec7c79d53
parentImplement SkipData,QueryLocal,StoreLocal for GET-verb CacheGet requests (#39) (diff)
downloadzen-09f2ac4d9aaea0107af8fbd6a41c2d3fe3e450ba.tar.xz
zen-09f2ac4d9aaea0107af8fbd6a41c2d3fe3e450ba.zip
Implement SkipData,QueryLocal,StoreLocal for HandleRpcGetCacheRecords (#41)
* Implement SkipData,QueryLocal,StoreLocal for HandleRpcGetCacheRecords.
-rw-r--r--zenserver/cache/structuredcache.cpp154
-rw-r--r--zenserver/upstream/upstreamcache.cpp27
-rw-r--r--zenutil/cache/cachepolicy.cpp20
-rw-r--r--zenutil/include/zenutil/cache/cachepolicy.h4
4 files changed, 142 insertions, 63 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 228d33202..0f385116b 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -822,19 +822,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
CbPackage RpcResponse;
- CacheRecordPolicy Policy;
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView());
std::vector<CacheKey> CacheKeys;
std::vector<IoBuffer> CacheValues;
std::vector<size_t> UpstreamRequests;
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
- Policy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView());
-
- const bool PartialRecord = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::PartialRecord);
- const bool QueryRemote = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
-
for (CbFieldView KeyView : Params["CacheKeys"sv])
{
CbObjectView KeyObject = KeyView.AsObjectView();
@@ -851,44 +846,84 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys)
{
ZenCacheValue CacheValue;
- uint32_t MissingCount = 0;
+ uint32_t MissingCount = 0;
+ uint32_t MissingReadFromUpstreamCount = 0;
- if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
{
CbObjectView CacheRecord(CacheValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- ZEN_ASSERT(Chunk.GetSize() > 0);
- RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- }
- else
- {
- MissingCount++;
- }
- });
+ CacheRecord.IterateAttachments(
+ [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) {
+ CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
+ {
+ // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we
+ // didn't ask for it and thus the record is complete in its absence.
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ MissingCount++;
+ }
+ }
+ else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ }
+ MissingCount++;
+ }
+ }
+ else
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ ZEN_ASSERT(Chunk.GetSize() > 0);
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ }
+ else
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ }
+ MissingCount++;
+ }
+ }
+ });
}
- if (CacheValue.Value && (MissingCount == 0 || PartialRecord))
+ if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) ||
+ MissingReadFromUpstreamCount != 0)
+ {
+ UpstreamRequests.push_back(KeyIndex);
+ }
+ else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
{
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}",
Key.Bucket,
Key.Hash,
NiceBytes(CacheValue.Value.Size()),
ToString(CacheValue.Value.GetContentType()),
- MissingCount ? "(PARTIAl)" : ""sv);
+ MissingCount ? "(PARTIAL)" : ""sv);
CacheValues[KeyIndex] = std::move(CacheValue.Value);
m_CacheStats.HitCount++;
}
- else if (QueryRemote)
- {
- UpstreamRequests.push_back(KeyIndex);
- }
else
{
- ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAl)"sv : ""sv);
- m_CacheStats.MissCount++;
+ if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv);
+ m_CacheStats.MissCount++;
+ }
}
++KeyIndex;
@@ -896,7 +931,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (!UpstreamRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, PartialRecord](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) {
ZEN_ASSERT(Params.KeyIndex < CacheValues.size());
IoBuffer CacheValue;
@@ -904,37 +939,52 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (Params.Record)
{
- Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count](CbFieldView HashView) {
- if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
+ Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) {
+ CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
+ bool FoundInUpstream = false;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
{
- auto InsertResult = m_CidStore.AddChunk(Compressed);
- if (InsertResult.New)
+ FoundInUpstream = true;
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
- Count.New++;
- }
- Count.Valid++;
+ FoundInUpstream = true;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ {
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ }
+ Count.Valid++;
- RpcResponse.AddAttachment(CbAttachment(Compressed));
- }
- else
- {
- ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'",
- HashView.AsHash(),
- Params.Key.Bucket,
- Params.Key.Hash);
- Count.Invalid++;
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ RpcResponse.AddAttachment(CbAttachment(Compressed));
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'",
+ HashView.AsHash(),
+ Params.Key.Bucket,
+ Params.Key.Hash);
+ Count.Invalid++;
+ }
}
}
- else if (m_CidStore.ContainsChunk(HashView.AsHash()))
+ if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) &&
+ m_CidStore.ContainsChunk(HashView.AsHash()))
{
+ // We added the attachment for this Value in the local loop before calling m_UpstreamCache
Count.Valid++;
}
Count.Total++;
});
- if ((Count.Valid == Count.Total) || PartialRecord)
+ if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))
{
CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer();
}
@@ -952,9 +1002,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
Count.Total);
CacheValue.SetContentType(ZenContentType::kCbObject);
-
CacheValues[Params.KeyIndex] = CacheValue;
- m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue});
+ if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
+ {
+ m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue});
+ }
m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
@@ -967,7 +1019,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
};
- m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
+ m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete));
}
CbObjectWriter ResponseObject;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 8b02a437a..091406db3 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -1046,7 +1046,7 @@ public:
virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
std::span<size_t> KeyIndex,
- const CacheRecordPolicy& Policy,
+ const CacheRecordPolicy& DownstreamPolicy,
OnCacheRecordGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::GetCacheRecords");
@@ -1057,6 +1057,8 @@ public:
if (m_Options.ReadUpstream)
{
+ CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream();
+
for (auto& Endpoint : m_Endpoints)
{
if (RemainingKeys.empty())
@@ -1075,18 +1077,19 @@ public:
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
- if (Params.Record)
- {
- OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+ Result =
+ Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
+ {
+ OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
- Stats.CacheHitCount.Increment(1);
- }
- else
- {
- Missing.push_back(Params.KeyIndex);
- }
- });
+ Stats.CacheHitCount.Increment(1);
+ }
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ });
}
Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size()));
diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp
index ba345485a..3bf7a0c67 100644
--- a/zenutil/cache/cachepolicy.cpp
+++ b/zenutil/cache/cachepolicy.cpp
@@ -4,6 +4,7 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/enumflags.h>
#include <zencore/string.h>
#include <algorithm>
@@ -206,6 +207,25 @@ CacheRecordPolicy::Load(CbObjectView Object, CachePolicy DefaultPolicy)
return Builder.Build();
}
+CacheRecordPolicy
+CacheRecordPolicy::ConvertToUpstream() const
+{
+ auto DownstreamToUpstream = [](CachePolicy P) {
+ // Remote|Local -> Set Remote
+ // Delete Skip Flags
+ // Maintain Remaining Flags
+ return (EnumHasAllFlags(P, CachePolicy::QueryRemote) ? CachePolicy::QueryLocal : CachePolicy::None) |
+ (EnumHasAllFlags(P, CachePolicy::StoreRemote) ? CachePolicy::StoreLocal : CachePolicy::None) |
+ (P & ~(CachePolicy::SkipData | CachePolicy::SkipMeta));
+ };
+ CacheRecordPolicyBuilder Builder(DownstreamToUpstream(GetDefaultValuePolicy()));
+ for (const CacheValuePolicy& ValuePolicy : GetValuePolicies())
+ {
+ Builder.AddValuePolicy(ValuePolicy.Id, DownstreamToUpstream(ValuePolicy.Policy));
+ }
+ return Builder.Build();
+}
+
void
CacheRecordPolicyBuilder::AddValuePolicy(const CacheValuePolicy& Policy)
{
diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h
index f967f707b..b3602edbd 100644
--- a/zenutil/include/zenutil/cache/cachepolicy.h
+++ b/zenutil/include/zenutil/cache/cachepolicy.h
@@ -144,6 +144,10 @@ public:
*/
static CacheRecordPolicy Load(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default);
+ /** Return *this converted into the equivalent policy that the upstream should use when forwarding a put or get to an upstream server.
+ */
+ CacheRecordPolicy ConvertToUpstream() const;
+
private:
friend class CacheRecordPolicyBuilder;