diff options
| author | mattpetersepic <[email protected]> | 2022-01-26 18:45:04 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-01-26 18:45:04 -0700 |
| commit | 09f2ac4d9aaea0107af8fbd6a41c2d3fe3e450ba (patch) | |
| tree | 41bc55af44f16fafa674c7e8c21d56fec7c79d53 | |
| parent | Implement SkipData,QueryLocal,StoreLocal for GET-verb CacheGet requests (#39) (diff) | |
| download | zen-09f2ac4d9aaea0107af8fbd6a41c2d3fe3e450ba.tar.xz zen-09f2ac4d9aaea0107af8fbd6a41c2d3fe3e450ba.zip | |
Implement SkipData,QueryLocal,StoreLocal for HandleRpcGetCacheRecords (#41)
* Implement SkipData,QueryLocal,StoreLocal for HandleRpcGetCacheRecords.
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 154 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 27 | ||||
| -rw-r--r-- | zenutil/cache/cachepolicy.cpp | 20 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachepolicy.h | 4 |
4 files changed, 142 insertions, 63 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 228d33202..0f385116b 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -822,19 +822,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); CbPackage RpcResponse; - CacheRecordPolicy Policy; - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView()); std::vector<CacheKey> CacheKeys; std::vector<IoBuffer> CacheValues; std::vector<size_t> UpstreamRequests; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); - Policy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView()); - - const bool PartialRecord = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::PartialRecord); - const bool QueryRemote = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote); - for (CbFieldView KeyView : Params["CacheKeys"sv]) { CbObjectView KeyObject = KeyView.AsObjectView(); @@ -851,44 +846,84 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) { ZenCacheValue CacheValue; - uint32_t MissingCount = 0; + uint32_t MissingCount = 0; + uint32_t MissingReadFromUpstreamCount = 0; - if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) { CbObjectView CacheRecord(CacheValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - ZEN_ASSERT(Chunk.GetSize() > 0); - RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - } - else - { - MissingCount++; - } - }); + CacheRecord.IterateAttachments( + [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) { + CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) + { + // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we + // didn't ask for it and thus the record is complete in its absence. + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + MissingCount++; + } + } + else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + } + MissingCount++; + } + } + else + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + ZEN_ASSERT(Chunk.GetSize() > 0); + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + } + else + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + } + MissingCount++; + } + } + }); } - if (CacheValue.Value && (MissingCount == 0 || PartialRecord)) + if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) || + MissingReadFromUpstreamCount != 0) + { + UpstreamRequests.push_back(KeyIndex); + } + else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}", Key.Bucket, Key.Hash, NiceBytes(CacheValue.Value.Size()), ToString(CacheValue.Value.GetContentType()), - MissingCount ? "(PARTIAl)" : ""sv); + MissingCount ? "(PARTIAL)" : ""sv); CacheValues[KeyIndex] = std::move(CacheValue.Value); m_CacheStats.HitCount++; } - else if (QueryRemote) - { - UpstreamRequests.push_back(KeyIndex); - } else { - ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAl)"sv : ""sv); - m_CacheStats.MissCount++; + if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query)) + { + // If they requested no query, do not record this as a miss + ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + } + else + { + ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv); + m_CacheStats.MissCount++; + } } ++KeyIndex; @@ -896,7 +931,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req if (!UpstreamRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, PartialRecord](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) { ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); IoBuffer CacheValue; @@ -904,37 +939,52 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req if (Params.Record) { - Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count](CbFieldView HashView) { - if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) + Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) { + CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); + bool FoundInUpstream = false; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { - if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) + if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) { - auto InsertResult = m_CidStore.AddChunk(Compressed); - if (InsertResult.New) + FoundInUpstream = true; + if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { - Count.New++; - } - Count.Valid++; + FoundInUpstream = true; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + { + auto InsertResult = m_CidStore.AddChunk(Compressed); + if (InsertResult.New) + { + Count.New++; + } + } + Count.Valid++; - RpcResponse.AddAttachment(CbAttachment(Compressed)); - } - else - { - ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", - HashView.AsHash(), - Params.Key.Bucket, - Params.Key.Hash); - Count.Invalid++; + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + RpcResponse.AddAttachment(CbAttachment(Compressed)); + } + } + else + { + ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", + HashView.AsHash(), + Params.Key.Bucket, + Params.Key.Hash); + Count.Invalid++; + } } } - else if (m_CidStore.ContainsChunk(HashView.AsHash())) + if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) && + m_CidStore.ContainsChunk(HashView.AsHash())) { + // We added the attachment for this Value in the local loop before calling m_UpstreamCache Count.Valid++; } Count.Total++; }); - if ((Count.Valid == Count.Total) || PartialRecord) + if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)) { CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); } @@ -952,9 +1002,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req Count.Total); CacheValue.SetContentType(ZenContentType::kCbObject); - CacheValues[Params.KeyIndex] = CacheValue; - m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); + if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) + { + m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); + } m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; @@ -967,7 +1019,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } }; - m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete)); } CbObjectWriter ResponseObject; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 8b02a437a..091406db3 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -1046,7 +1046,7 @@ public: virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, std::span<size_t> KeyIndex, - const CacheRecordPolicy& Policy, + const CacheRecordPolicy& DownstreamPolicy, OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); @@ -1057,6 +1057,8 @@ public: if (m_Options.ReadUpstream) { + CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream(); + for (auto& Endpoint : m_Endpoints) { if (RemainingKeys.empty()) @@ -1075,18 +1077,19 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { - if (Params.Record) - { - OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + Result = + Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); - Stats.CacheHitCount.Increment(1); - } - else - { - Missing.push_back(Params.KeyIndex); - } - }); + Stats.CacheHitCount.Increment(1); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); } Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp index ba345485a..3bf7a0c67 100644 --- a/zenutil/cache/cachepolicy.cpp +++ b/zenutil/cache/cachepolicy.cpp @@ -4,6 +4,7 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/enumflags.h> #include <zencore/string.h> #include <algorithm> @@ -206,6 +207,25 @@ CacheRecordPolicy::Load(CbObjectView Object, CachePolicy DefaultPolicy) return Builder.Build(); } +CacheRecordPolicy +CacheRecordPolicy::ConvertToUpstream() const +{ + auto DownstreamToUpstream = [](CachePolicy P) { + // Remote|Local -> Set Remote + // Delete Skip Flags + // Maintain Remaining Flags + return (EnumHasAllFlags(P, CachePolicy::QueryRemote) ? CachePolicy::QueryLocal : CachePolicy::None) | + (EnumHasAllFlags(P, CachePolicy::StoreRemote) ? CachePolicy::StoreLocal : CachePolicy::None) | + (P & ~(CachePolicy::SkipData | CachePolicy::SkipMeta)); + }; + CacheRecordPolicyBuilder Builder(DownstreamToUpstream(GetDefaultValuePolicy())); + for (const CacheValuePolicy& ValuePolicy : GetValuePolicies()) + { + Builder.AddValuePolicy(ValuePolicy.Id, DownstreamToUpstream(ValuePolicy.Policy)); + } + return Builder.Build(); +} + void CacheRecordPolicyBuilder::AddValuePolicy(const CacheValuePolicy& Policy) { diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h index f967f707b..b3602edbd 100644 --- a/zenutil/include/zenutil/cache/cachepolicy.h +++ b/zenutil/include/zenutil/cache/cachepolicy.h @@ -144,6 +144,10 @@ public: */ static CacheRecordPolicy Load(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default); + /** Return *this converted into the equivalent policy that the upstream should use when forwarding a put or get to an upstream server. + */ + CacheRecordPolicy ConvertToUpstream() const; + private: friend class CacheRecordPolicyBuilder; |