diff options
| author | Dan Engelbrecht <[email protected]> | 2022-09-06 12:18:22 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-09-07 12:19:08 +0200 |
| commit | 99faa73f5185e9dbe662d1739276ef6d2ea54b0d (patch) | |
| tree | 9bf207e8e9c9dcc3080722a5064f848b19e474a0 | |
| parent | Make policy data separate in PutCacheRecordsRequest (diff) | |
| download | zen-99faa73f5185e9dbe662d1739276ef6d2ea54b0d.tar.xz zen-99faa73f5185e9dbe662d1739276ef6d2ea54b0d.zip | |
GetCacheRecords uses proper separate Policy struct
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 59 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 321 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 18 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 18 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 3 | ||||
| -rw-r--r-- | zenutil/cache/cacherequests.cpp | 171 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cacherequests.h | 81 |
7 files changed, 424 insertions, 247 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 0d8070ef8..94cb80bc6 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1312,11 +1312,11 @@ TEST_CASE("zcache.rpc") { using namespace std::literals; - auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, - cacherequests::PutCacheRecordsRequestPolicy& Policy, - const zen::CacheKey& CacheKey, - size_t PayloadSize, - CachePolicy RecordPolicy) { + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + cacherequests::RecordsRequestPolicy& Policy, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy RecordPolicy) { std::vector<uint8_t> Data; Data.resize(PayloadSize); for (size_t Idx = 0; Idx < PayloadSize; ++Idx) @@ -1342,8 +1342,8 @@ TEST_CASE("zcache.rpc") ((uint32_t*)(KeyHash.Hash))[0] = Key; const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); - cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; - cacherequests::PutCacheRecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default}; + cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default}; AppendCacheRecord(Request, Policy, CacheKey, PayloadSize, CachePolicy::Default); OutKeys.push_back(CacheKey); @@ -1372,14 +1372,16 @@ TEST_CASE("zcache.rpc") std::string_view Namespace, std::span<zen::CacheKey> Keys, zen::CachePolicy Policy) -> GetCacheRecordResult { - cacherequests::GetCacheRecordsRequest Request = {.DefaultPolicy = Policy, .Namespace = std::string(Namespace)}; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; + cacherequests::RecordsRequestPolicy RequestPolicy = {.DefaultPolicy = Policy}; for (const CacheKey& Key : Keys) { - Request.Requests.push_back({.Key = Key}); + Request.Requests.push_back(Key); + RequestPolicy.RecordPolicies.push_back({}); } CbObjectWriter RequestWriter; - CHECK(Request.Format(RequestWriter)); + CHECK(Request.Format(RequestWriter, RequestPolicy)); BinaryWriter Body; RequestWriter.Save(Body); @@ -1572,11 +1574,11 @@ TEST_CASE("zcache.failing.upstream") using namespace std::literals; - auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, - cacherequests::PutCacheRecordsRequestPolicy& Policy, - const zen::CacheKey& CacheKey, - size_t PayloadSize, - CachePolicy RecordPolicy) { + auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request, + cacherequests::RecordsRequestPolicy& Policy, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy RecordPolicy) { std::vector<uint32_t> Data; Data.resize(PayloadSize / 4); for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx) @@ -1597,8 +1599,8 @@ TEST_CASE("zcache.failing.upstream") size_t PayloadSize = 8192) -> std::vector<CacheKey> { std::vector<zen::CacheKey> OutKeys; - cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; - cacherequests::PutCacheRecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default}; + cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default}; for (size_t Key = 1; Key <= Num; ++Key) { zen::IoHash KeyHash; @@ -1637,14 +1639,16 @@ TEST_CASE("zcache.failing.upstream") std::string_view Namespace, std::span<zen::CacheKey> Keys, zen::CachePolicy Policy) -> GetCacheRecordResult { - cacherequests::GetCacheRecordsRequest Request = {.DefaultPolicy = Policy, .Namespace = std::string(Namespace)}; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = std::string(Namespace)}; + cacherequests::RecordsRequestPolicy RequestPolicy = {.DefaultPolicy = Policy}; for (const CacheKey& Key : Keys) { - Request.Requests.push_back({.Key = Key}); + Request.Requests.push_back(Key); + RequestPolicy.RecordPolicies.push_back({}); } CbObjectWriter RequestWriter; - CHECK(Request.Format(RequestWriter)); + CHECK(Request.Format(RequestWriter, RequestPolicy)); BinaryWriter Body; RequestWriter.Save(Body); @@ -2017,9 +2021,9 @@ TEST_CASE("zcache.rpc.allpolicies") // PutCacheRecords { - CachePolicy BatchDefaultPolicy = CachePolicy::Default; - cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(TestNamespace)}; - cacherequests::PutCacheRecordsRequestPolicy Policy = {.DefaultPolicy = BatchDefaultPolicy}; + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + cacherequests::PutCacheRecordsRequest Request = {.Namespace = std::string(TestNamespace)}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = BatchDefaultPolicy}; Request.Requests.reserve(PutRequests.size()); Policy.RecordPolicies.reserve(PutRequests.size()); for (CachePutRequest& PutRequest : PutRequests) @@ -2086,15 +2090,18 @@ TEST_CASE("zcache.rpc.allpolicies") // GetCacheRecords { CachePolicy BatchDefaultPolicy = CachePolicy::Default; - cacherequests::GetCacheRecordsRequest Request = {.DefaultPolicy = BatchDefaultPolicy, .Namespace = std::string(TestNamespace)}; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = std::string(TestNamespace)}; + cacherequests::RecordsRequestPolicy RequestPolicy = {.DefaultPolicy = BatchDefaultPolicy}; Request.Requests.reserve(GetRequests.size()); + RequestPolicy.RecordPolicies.reserve(GetRequests.size()); for (CacheGetRequest& GetRequest : GetRequests) { - Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy}); + Request.Requests.push_back(GetRequest.Key); + RequestPolicy.RecordPolicies.push_back(GetRequest.Policy); } CbPackage Package; - CHECK(Request.Format(Package)); + CHECK(Request.Format(Package, RequestPolicy)); IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 428d690f8..6803b1cff 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1269,62 +1269,52 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack namespace impl { - HttpResponseCode GetCacheRecords(ZenCacheStoreBase& CacheStore, - CidStoreBase& CidStore, - UpstreamCache& UpstreamCache, - std::atomic_uint64_t& HitCount, - std::atomic_uint64_t& UpstreamHitCount, - std::atomic_uint64_t& MissCount, - cacherequests::GetCacheRecordsRequest&& Request, - cacherequests::GetCacheRecordsResult& OutResult) + HttpResponseCode GetCacheRecords(ZenCacheStoreBase& CacheStore, + CidStoreBase& CidStore, + UpstreamCache& UpstreamCache, + std::atomic_uint64_t& HitCount, + std::atomic_uint64_t& UpstreamHitCount, + std::atomic_uint64_t& MissCount, + const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, + cacherequests::GetCacheRecordsResult& OutResult) { size_t RequestCount = Request.Requests.size(); OutResult.Results.resize(RequestCount); std::vector<size_t> UpstreamRecordIndexes; UpstreamRecordIndexes.reserve(RequestCount); - CachePolicy DefaultDownstreamPolicy = Request.DefaultPolicy; - struct RecordRequestData { - std::optional<CacheRecordPolicy> DownstreamPolicy; - bool UsedUpstream = false; - std::unordered_set<IoHash> MissingValues; + bool MissingRecord = false; + bool UsedUpstream = false; + size_t MissingValueCount = 0; }; std::vector<RecordRequestData> RequestData(RequestCount); - std::unordered_set<size_t> RecordsToStoreLocally; - for (size_t RequestIndex = 0; RequestIndex < RequestCount; RequestIndex++) { - const cacherequests::GetCacheRecordRequest& RecordRequest = Request.Requests[RequestIndex]; + const CacheKey& Key = Request.Requests[RequestIndex]; RecordRequestData& RecordRequestData = RequestData[RequestIndex]; std::optional<cacherequests::GetCacheRecordResult>& RecordResult = OutResult.Results[RequestIndex]; - RecordRequestData.DownstreamPolicy = RecordRequest.Policy; - const CacheRecordPolicy DownstreamPolicy = RecordRequestData.DownstreamPolicy.value_or(DefaultDownstreamPolicy); + const CachePolicy RecordPolicy = cacherequests::GetEffectiveRecordPolicy(Policy, RequestIndex); - bool QueryLocalRecord = EnumHasAllFlags(DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal); + bool QueryLocalRecord = EnumHasAllFlags(RecordPolicy, CachePolicy::QueryLocal); ZenCacheValue RecordCacheValue; - if (QueryLocalRecord && CacheStore.Get(Request.Namespace, RecordRequest.Key.Bucket, RecordRequest.Key.Hash, RecordCacheValue)) + if (QueryLocalRecord && CacheStore.Get(Request.Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) { if (RecordCacheValue.Value.GetContentType() != ZenContentType::kCbObject) { - ZEN_WARN("local record {}/{}/{} is not a structured object, skipping.", - Request.Namespace, - RecordRequest.Key.Bucket, - RecordRequest.Key.Hash); + ZEN_WARN("local record {}/{}/{} is not a structured object, skipping.", Request.Namespace, Key.Bucket, Key.Hash); continue; } CbObject Record = LoadCompactBinaryObject(RecordCacheValue.Value); cacherequests::CacheRecord CacheRecord; if (!CacheRecord.Parse(Record)) { - ZEN_WARN("local record {}/{}/{} is corrupt, skipping.", - Request.Namespace, - RecordRequest.Key.Bucket, - RecordRequest.Key.Hash); + ZEN_WARN("local record {}/{}/{} is corrupt, skipping.", Request.Namespace, Key.Bucket, Key.Hash); continue; } @@ -1333,17 +1323,17 @@ namespace impl { RecordResult->Values.resize(CacheRecord.Values.size()); for (size_t ValueIndex = 0; ValueIndex < CacheRecord.Values.size(); ++ValueIndex) { - const cacherequests::CacheRecordValue& Value = CacheRecord.Values[ValueIndex]; - CachePolicy ValueDownstreamPolicy = DownstreamPolicy.GetValuePolicy(Value.Id); - RecordResult->Values[ValueIndex] = {.Id = Value.Id, .RawHash = Value.RawHash, .RawSize = Value.RawSize}; - bool ValueQueryAny = EnumHasAnyFlags(ValueDownstreamPolicy, CachePolicy::Query); + const cacherequests::CacheRecordValue& Value = CacheRecord.Values[ValueIndex]; + CachePolicy ValuePolicy = cacherequests::GetEffectiveValuePolicy(Policy, RequestIndex, Value.Id); + RecordResult->Values[ValueIndex] = {.Id = Value.Id, .RawHash = Value.RawHash, .RawSize = Value.RawSize}; + bool ValueQueryAny = EnumHasAnyFlags(ValuePolicy, CachePolicy::Query); if (!ValueQueryAny) { // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we // didn't ask for it and thus the record is complete in its absence. continue; } - bool ValueSkipData = EnumHasAllFlags(ValueDownstreamPolicy, CachePolicy::SkipData); + bool ValueSkipData = EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData); if (ValueSkipData) { // Verify that chunk exist even if we skip the actual data @@ -1351,10 +1341,10 @@ namespace impl { { continue; } - bool ValueQueryRemote = EnumHasAllFlags(ValueDownstreamPolicy, CachePolicy::QueryRemote); + bool ValueQueryRemote = EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote); if (ValueQueryRemote) { - RecordRequestData.MissingValues.insert(Value.RawHash); + RecordRequestData.MissingValueCount++; } continue; } @@ -1364,57 +1354,68 @@ namespace impl { RecordResult->Values[ValueIndex].Body = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); continue; } - if (EnumHasAllFlags(ValueDownstreamPolicy, CachePolicy::QueryRemote)) + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { - RecordRequestData.MissingValues.insert(Value.RawHash); + RecordRequestData.MissingValueCount++; } } } - if (!RecordResult || !RecordRequestData.MissingValues.empty()) + ZEN_ASSERT(RecordRequestData.MissingValueCount <= RecordResult->Values.size()); + if (!RecordResult || RecordRequestData.MissingValueCount > 0) { - bool RecordQueryRemote = EnumHasAllFlags(DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryRemote); + bool RecordQueryRemote = EnumHasAllFlags(RecordPolicy, CachePolicy::QueryRemote); if (!RecordQueryRemote) { continue; } UpstreamRecordIndexes.push_back(RequestIndex); + RecordRequestData.MissingRecord = !RecordResult; } } if (!UpstreamRecordIndexes.empty()) { - Request.DefaultPolicy = ConvertToUpstream(DefaultDownstreamPolicy); - std::vector<size_t> MissingRecordIndexes; + cacherequests::RecordsRequestPolicy UpstreamPolicy = {.DefaultPolicy = + ConvertToUpstream(Policy.DefaultPolicy) | CachePolicy::SkipMeta}; + UpstreamPolicy.RecordPolicies.resize(Request.Requests.size()); + for (size_t RequestIndex : UpstreamRecordIndexes) { - // Request.DefaultPolicy | CachePolicy::SkipMeta; - cacherequests::GetCacheRecordRequest& RecordRequest = Request.Requests[RequestIndex]; - RecordRequestData& RecordRequestData = RequestData[RequestIndex]; - const CacheRecordPolicy DownstreamPolicy = RecordRequestData.DownstreamPolicy.value_or(DefaultDownstreamPolicy); std::optional<cacherequests::GetCacheRecordResult>& RecordResult = OutResult.Results[RequestIndex]; if (!RecordResult) { - MissingRecordIndexes.push_back(RequestIndex); - RecordRequest.Policy = DownstreamPolicy.ConvertToUpstream(); + if (Policy.RecordPolicies[RequestIndex].has_value()) + { + UpstreamPolicy.RecordPolicies[RequestIndex] = Policy.RecordPolicies[RequestIndex]->ConvertToUpstream(); + } continue; } - CachePolicy UpstreamBasePolicy = ConvertToUpstream(DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta; - + const CachePolicy UpstreamBasePolicy = + ConvertToUpstream(cacherequests::GetEffectiveBasePolicy(Policy, RequestIndex)) | CachePolicy::SkipMeta; CacheRecordPolicyBuilder Builder(UpstreamBasePolicy); - for (const cacherequests::GetCacheRecordResultValue& ResultValue : RecordResult->Values) + for (size_t ValueIndex = 0; ValueIndex < RecordResult->Values.size(); ++ValueIndex) { - CachePolicy ValuePolicy = ConvertToUpstream(DownstreamPolicy.GetValuePolicy(ResultValue.Id)); - if (!RecordRequestData.MissingValues.contains(ResultValue.RawHash)) + cacherequests::GetCacheRecordResultValue& Value = RecordResult->Values[ValueIndex]; + const CachePolicy ValuePolicy = cacherequests::GetEffectiveValuePolicy(Policy, RequestIndex, Value.Id); + CachePolicy ValueUpstreamPolicy = ConvertToUpstream(ValuePolicy); + bool QueryRemote = EnumHasAllFlags(ValueUpstreamPolicy, CachePolicy::QueryLocal); + bool GetData = !EnumHasAnyFlags(ValueUpstreamPolicy, CachePolicy::SkipData) && + !EnumHasAnyFlags(ValuePolicy, CachePolicy::SkipData); + bool GetFromUpstream = QueryRemote && GetData; + if (RecordResult->Values[ValueIndex].Body || !GetFromUpstream) + { + ValueUpstreamPolicy |= CachePolicy::SkipData; + } + if ((ValueUpstreamPolicy | CachePolicy::SkipMeta) != UpstreamBasePolicy) { - ValuePolicy |= CachePolicy::SkipData; + Builder.AddValuePolicy(Value.Id, ValueUpstreamPolicy); } - Builder.AddValuePolicy(ResultValue.Id, ValuePolicy); } - RecordRequest.Policy = Builder.Build(); + UpstreamPolicy.RecordPolicies[RequestIndex] = Builder.Build(); } - UpstreamCache.GetCacheRecords(Request, OutResult, UpstreamRecordIndexes); + UpstreamCache.GetCacheRecords(Request, UpstreamPolicy, OutResult, UpstreamRecordIndexes); for (size_t RequestIndex : UpstreamRecordIndexes) { @@ -1425,23 +1426,20 @@ namespace impl { continue; } - // Did we have the record locally? - if (std::find(MissingRecordIndexes.begin(), MissingRecordIndexes.end(), RequestIndex) != MissingRecordIndexes.end()) + if (RecordRequestData.MissingRecord) { RecordRequestData.UsedUpstream = true; - cacherequests::CacheRecord Record = {.Key = RecordResult->Key}; - Record.Values.reserve(RecordResult->Values.size()); - for (const cacherequests::GetCacheRecordResultValue& ResultValue : RecordResult->Values) + const CachePolicy RecordPolicy = cacherequests::GetEffectiveRecordPolicy(Policy, RequestIndex); + if (EnumHasAllFlags(RecordPolicy, CachePolicy::StoreLocal)) { - Record.Values.push_back({.Id = ResultValue.Id, .RawHash = ResultValue.RawHash, .RawSize = ResultValue.RawSize}); - RecordRequestData.MissingValues.insert(ResultValue.RawHash); - } - - const CacheRecordPolicy DownstreamPolicy = RecordRequestData.DownstreamPolicy.value_or(DefaultDownstreamPolicy); + cacherequests::CacheRecord Record = {.Key = RecordResult->Key}; + Record.Values.reserve(RecordResult->Values.size()); + for (const cacherequests::GetCacheRecordResultValue& ResultValue : RecordResult->Values) + { + Record.Values.push_back({.Id = ResultValue.Id, .RawHash = ResultValue.RawHash, .RawSize = ResultValue.RawSize}); + } - if (EnumHasAllFlags(DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) - { CbObjectWriter Writer; if (!Record.Format(Writer)) { @@ -1456,34 +1454,33 @@ namespace impl { for (cacherequests::GetCacheRecordResultValue& ValueResult : RecordResult->Values) { - if (!RecordRequestData.MissingValues.contains(ValueResult.RawHash)) + const CachePolicy UpstreamValuePolicy = GetEffectiveValuePolicy(UpstreamPolicy, RequestIndex, ValueResult.Id); + if (!EnumHasAnyFlags(UpstreamValuePolicy, CachePolicy::Query)) { continue; } - const CacheRecordPolicy DownstreamPolicy = RecordRequestData.DownstreamPolicy.value_or(DefaultDownstreamPolicy); - CachePolicy ValuePolicy = DownstreamPolicy.GetValuePolicy(ValueResult.Id); - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + if (EnumHasAnyFlags(UpstreamValuePolicy, CachePolicy::SkipData)) { - ZEN_ASSERT(ValueResult.Body.IsNull()); continue; } - - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + if (RecordRequestData.MissingRecord) { - if (ValueResult.Body) - { - RecordRequestData.UsedUpstream = true; - RecordRequestData.MissingValues.erase(ValueResult.RawHash); - if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) - { - CidStore.AddChunk(ValueResult.Body); - } - } + RecordRequestData.MissingValueCount++; + ZEN_ASSERT(RecordRequestData.MissingValueCount <= RecordResult->Values.size()); + } + if (!ValueResult.Body) + { + continue; } - if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + + ZEN_ASSERT(RecordRequestData.MissingValueCount > 0); + RecordRequestData.MissingValueCount--; + + RecordRequestData.UsedUpstream = true; + const CachePolicy ValuePolicy = cacherequests::GetEffectiveValuePolicy(Policy, RequestIndex, ValueResult.Id); + if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { - RecordRequestData.MissingValues.erase(ValueResult.RawHash); - ValueResult.Body = CompressedBuffer::Null; + CidStore.AddChunk(ValueResult.Body); } } } @@ -1491,24 +1488,30 @@ namespace impl { for (size_t RequestIndex = 0; RequestIndex < RequestCount; RequestIndex++) { - const cacherequests::GetCacheRecordRequest& RecordRequest = Request.Requests[RequestIndex]; - std::optional<cacherequests::GetCacheRecordResult>& RecordResult = OutResult.Results[RequestIndex]; + const CacheKey& Key = Request.Requests[RequestIndex]; + std::optional<cacherequests::GetCacheRecordResult>& RecordResult = OutResult.Results[RequestIndex]; if (!RecordResult) { MissCount++; continue; } - RecordRequestData& RecordRequestData = RequestData[RequestIndex]; - const CacheRecordPolicy DownstreamPolicy = RecordRequestData.DownstreamPolicy.value_or(DefaultDownstreamPolicy); - bool IsPartial = !RecordRequestData.MissingValues.empty(); - if (IsPartial && !EnumHasAllFlags(DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)) + RecordRequestData& RecordRequestData = RequestData[RequestIndex]; + const CachePolicy RecordPolicy = cacherequests::GetEffectiveRecordPolicy(Policy, RequestIndex); + bool IsPartial = RecordRequestData.MissingValueCount > 0; + //! RecordRequestData.MissingValues.empty(); + if (IsPartial && !EnumHasAllFlags(RecordPolicy, CachePolicy::PartialRecord)) { // Clear the result record here, caller did not want any partial records + ZEN_DEBUG("MISS - '{}/{}/{}' {}{}", + Request.Namespace, + Key.Bucket, + Key.Hash, + IsPartial ? " (PARTIAL)"sv : ""sv, + RecordRequestData.UsedUpstream ? " (UPSTREAM)"sv : ""sv); RecordResult.reset(); MissCount++; continue; } - const CacheKey& Key = RecordRequest.Key; ZEN_DEBUG("HIT - '{}/{}/{}' {}{}", Request.Namespace, Key.Bucket, @@ -1529,7 +1532,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt ZEN_TRACE_CPU("Z$::RpcGetCacheRecordsNew"); cacherequests::GetCacheRecordsRequest Request; - if (!Request.Parse(RpcRequest)) + cacherequests::RecordsRequestPolicy Policy; + if (!Request.Parse(RpcRequest, Policy)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } @@ -1541,7 +1545,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt m_CacheStats.HitCount, m_CacheStats.UpstreamHitCount, m_CacheStats.MissCount, - std::move(Request), + Request, + Policy, Result); if (Response != HttpResponseCode::OK) { @@ -2542,6 +2547,7 @@ namespace testutils { { public: typedef std::function<void(const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes)> GetCacheRecordsFunc; @@ -2554,11 +2560,12 @@ namespace testutils { void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&&) override final{}; void GetCacheRecords(const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) override final { CHECK(GetCacheRecordsCallbacks.size() > 0); - GetCacheRecordsCallbacks[0](Request, OutResult, IncludeIndexes); + GetCacheRecordsCallbacks[0](Request, Policy, OutResult, IncludeIndexes); GetCacheRecordsCallbacks.erase(GetCacheRecordsCallbacks.begin()); } @@ -2709,20 +2716,24 @@ TEST_CASE("zcache.getcacherecords") CreateRecord("bucket", std::vector<size_t>({46, 828, 976}), Attachments[2])}; SUBCASE("defaultpolicy.notincache") { - cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {{.Key = Records[0].Key}}}; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([](const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) { - CHECK(Request.DefaultPolicy == CachePolicy::Default); + CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); + CHECK(Policy.RecordPolicies.size() == 1); + CHECK(cacherequests::GetEffectiveRecordPolicy(Policy, 0) == Policy.DefaultPolicy); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); }); - CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, std::move(Request), Result) == + CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); @@ -2736,18 +2747,24 @@ TEST_CASE("zcache.getcacherecords") { Put(CacheStore, "namespace", Records[0]); - cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {{.Key = Records[0].Key}}}; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([](const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) { - CHECK(Request.DefaultPolicy == CachePolicy::Default); + CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); + CHECK(Policy.RecordPolicies.size() == 1); + CHECK(Policy.RecordPolicies[0].has_value()); + CHECK(Policy.RecordPolicies[0]->GetRecordPolicy() == Policy.DefaultPolicy); + CHECK(Policy.RecordPolicies[0]->IsUniform()); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); }); - CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, std::move(Request), Result) == + CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); // We did not request any partial results, so we should not get anything back @@ -2762,20 +2779,26 @@ TEST_CASE("zcache.getcacherecords") { Put(CacheStore, "namespace", Records[0]); - cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {{.Key = Records[0].Key}}}; - Request.DefaultPolicy |= CachePolicy::PartialRecord; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; + Policy.DefaultPolicy |= CachePolicy::PartialRecord; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([](const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) { - CHECK(Request.DefaultPolicy == (CachePolicy::Default | CachePolicy::PartialRecord)); + CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::PartialRecord | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); + CHECK(Policy.RecordPolicies.size() == 1); + CHECK(Policy.RecordPolicies[0].has_value()); + CHECK(Policy.RecordPolicies[0]->GetRecordPolicy() == Policy.DefaultPolicy); + CHECK(Policy.RecordPolicies[0]->IsUniform()); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); }); - CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, std::move(Request), Result) == + CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); @@ -2794,10 +2817,11 @@ TEST_CASE("zcache.getcacherecords") Put(CacheStore, "namespace", Records[0]); Put(CidStore, Attachments[0]); - cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {{.Key = Records[0].Key}}}; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; - CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, std::move(Request), Result) == + CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); @@ -2813,14 +2837,18 @@ TEST_CASE("zcache.getcacherecords") SUBCASE("defaultpolicy.allfromupstream") { - cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {{.Key = Records[0].Key}}}; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([&](const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) { - CHECK(Request.DefaultPolicy == CachePolicy::Default); + CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); + CHECK(Policy.RecordPolicies.size() == 1); + CHECK(!Policy.RecordPolicies[0].has_value()); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); @@ -2840,7 +2868,7 @@ TEST_CASE("zcache.getcacherecords") .Body = Attachments[0][2]}}}; }); - CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, std::move(Request), Result) == + CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); @@ -2862,14 +2890,20 @@ TEST_CASE("zcache.getcacherecords") { Put(CacheStore, "namespace", Records[0]); - cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {{.Key = Records[0].Key}}}; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([&](const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) { - CHECK(Request.DefaultPolicy == CachePolicy::Default); + CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); + CHECK(Policy.RecordPolicies.size() == 1); + CHECK(Policy.RecordPolicies[0].has_value()); + CHECK(Policy.RecordPolicies[0]->GetRecordPolicy() == Policy.DefaultPolicy); + CHECK(Policy.RecordPolicies[0]->IsUniform()); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); @@ -2889,7 +2923,7 @@ TEST_CASE("zcache.getcacherecords") .Body = Attachments[0][2]}}}; }); - CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, std::move(Request), Result) == + CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); @@ -2908,16 +2942,65 @@ TEST_CASE("zcache.getcacherecords") CHECK(MissCount == 0); } + SUBCASE("defaultpolicy.partialattachmentsfromupstream") + { + Put(CacheStore, "namespace", Records[0]); + Put(CidStore, std::vector<CompressedBuffer>{Attachments[0][0], Attachments[0][2]}); + + cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; + cacherequests::GetCacheRecordsResult Result; + + Upstream.GetCacheRecordsCallbacks.emplace_back([&](const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, + cacherequests::GetCacheRecordsResult& OutResult, + const std::span<const size_t> IncludeIndexes) { + CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); + CHECK(Request.Requests.size() == 1); + CHECK(Policy.RecordPolicies.size() == 1); + CHECK(Policy.RecordPolicies[0].has_value()); + CHECK(Policy.RecordPolicies[0]->GetRecordPolicy() == Policy.DefaultPolicy); + CHECK(!Policy.RecordPolicies[0]->IsUniform()); + CHECK(EnumHasAnyFlags(Policy.RecordPolicies[0]->GetValuePolicy(Records[0].Values[0].Id), CachePolicy::SkipData)); + CHECK(!EnumHasAnyFlags(Policy.RecordPolicies[0]->GetValuePolicy(Records[0].Values[1].Id), CachePolicy::SkipData)); + CHECK(EnumHasAnyFlags(Policy.RecordPolicies[0]->GetValuePolicy(Records[0].Values[2].Id), CachePolicy::SkipData)); + CHECK(IncludeIndexes.size() == 1); + CHECK(IncludeIndexes[0] == 0); + OutResult.Results.resize(1); + CHECK(OutResult.Results[0].has_value()); + OutResult.Results[0]->Values[1] = cacherequests::GetCacheRecordResultValue{.Id = Records[0].Values[1].Id, + .RawHash = Records[0].Values[1].RawHash, + .RawSize = Records[0].Values[1].RawSize, + .Body = Attachments[0][1]}; + }); + + CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == + HttpResponseCode::OK); + + CHECK(Result.Results.size() == 1); + CHECK(Result.Results[0].has_value()); + CHECK(Result.Results[0]->Values.size() == 3); + CHECK(Result.Results[0]->Values[0].Body); + CHECK(Result.Results[0]->Values[1].Body); + CHECK(Result.Results[0]->Values[2].Body); + + CHECK(Count(CacheStore, "namespace", Records[0].Key) == 1); + CHECK(Count(CidStore, Attachments[0]) == 3); + + CHECK(HitCount == 1); + CHECK(UpstreamHitCount == 1); + CHECK(MissCount == 0); + } + SUBCASE("defaultpolicy.attachmentsinupstreamnoupstreampolicy") { Put(CacheStore, "namespace", Records[1]); - cacherequests::GetCacheRecordsRequest Request = {.DefaultPolicy = CachePolicy::Local, - .Namespace = "namespace", - .Requests = {{.Key = Records[0].Key}}}; + cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; + cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Local, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; - CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, std::move(Request), Result) == + CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 821086a9d..228b0f69f 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -163,18 +163,20 @@ class CidStoreBase; namespace cacherequests { struct GetCacheRecordsRequest; + struct RecordsRequestPolicy; struct GetCacheRecordsResult; } // namespace cacherequests namespace impl { - HttpResponseCode GetCacheRecords(ZenCacheStoreBase& CacheStore, - CidStoreBase& CidStore, - UpstreamCache& UpstreamCache, - std::atomic_uint64_t& HitCount, - std::atomic_uint64_t& UpstreamHitCount, - std::atomic_uint64_t& MissCount, - cacherequests::GetCacheRecordsRequest&& Request, - cacherequests::GetCacheRecordsResult& OutResult); + HttpResponseCode GetCacheRecords(ZenCacheStoreBase& CacheStore, + CidStoreBase& CidStore, + UpstreamCache& UpstreamCache, + std::atomic_uint64_t& HitCount, + std::atomic_uint64_t& UpstreamHitCount, + std::atomic_uint64_t& MissCount, + const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, + cacherequests::GetCacheRecordsResult& OutResult); } /** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index e8b453cf8..9a661b82b 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -182,8 +182,9 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } virtual GetUpstreamCacheResult GetCacheRecords(const cacherequests::GetCacheRecordsRequest& Request, - cacherequests::GetCacheRecordsResult& OutResult, - const std::span<const size_t> IncludeIndexes) override final + const cacherequests::RecordsRequestPolicy&, + cacherequests::GetCacheRecordsResult& OutResult, + const std::span<const size_t> IncludeIndexes) override final { ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords"); ZEN_ASSERT(!IncludeIndexes.empty()); @@ -193,8 +194,7 @@ namespace detail { for (size_t RequestIndex : IncludeIndexes) { - const cacherequests::GetCacheRecordRequest& RecordRequest = Request.Requests[RequestIndex]; - const CacheKey& CacheKey = RecordRequest.Key; + const CacheKey& CacheKey = Request.Requests[RequestIndex]; if (!Result.Error) { @@ -851,13 +851,14 @@ namespace detail { } virtual GetUpstreamCacheResult GetCacheRecords(const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); ZEN_ASSERT(IncludeIndexes.size() > 0); CbObjectWriter BatchRequest; - if (!Request.Format(BatchRequest, IncludeIndexes)) + if (!Request.Format(BatchRequest, Policy, IncludeIndexes)) { // TODO return {.Success = false}; @@ -1421,6 +1422,7 @@ public: } virtual void GetCacheRecords(const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) override final { @@ -1462,7 +1464,7 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(Request, OutResult, RemainingRequestIndexes); + Result = Endpoint->GetCacheRecords(Request, Policy, OutResult, RemainingRequestIndexes); if (!Result.Success) { Stats.CacheErrorCount.Increment(1); @@ -1478,11 +1480,11 @@ public: { if (OutResult.Results[RequestIndex].has_value()) { - HitRecordRequests.push_back(Request.Requests[RequestIndex].Key); + HitRecordRequests.push_back(Request.Requests[RequestIndex]); Stats.CacheHitCount.Increment(1); continue; } - MissRecordRequests.push_back(Request.Requests[RequestIndex].Key); + MissRecordRequests.push_back(Request.Requests[RequestIndex]); MissingRequestIndexes.push_back(RequestIndex); } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 958215952..0c703863a 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -152,6 +152,7 @@ struct UpstreamEndpointInfo namespace cacherequests { struct GetCacheRecordsRequest; struct GetCacheRecordsResult; + struct RecordsRequestPolicy; } // namespace cacherequests /** @@ -170,6 +171,7 @@ public: virtual UpstreamEndpointStatus GetStatus() = 0; virtual GetUpstreamCacheResult GetCacheRecords(const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) = 0; @@ -210,6 +212,7 @@ public: virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0; virtual void GetCacheRecords(const cacherequests::GetCacheRecordsRequest& Request, + const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span<const size_t> IncludeIndexes) = 0; diff --git a/zenutil/cache/cacherequests.cpp b/zenutil/cache/cacherequests.cpp index 7ced55cf9..21e7bb3b7 100644 --- a/zenutil/cache/cacherequests.cpp +++ b/zenutil/cache/cacherequests.cpp @@ -181,7 +181,36 @@ namespace cacherequests { } } - bool PutCacheRecordsRequest::Parse(const CbPackage& Package, PutCacheRecordsRequestPolicy& Policy) + // RecordsRequestPolicy RecordsRequestPolicy::ConvertToUpstream(const std::vector<IoHash>& IncludeDataValueHashes) const + // { + // const std::optional<CacheRecordPolicy>& RecordPolicy = RecordPolicies[RequestIndex]; + // + // RecordRequestData& RecordRequestData = RequestData[RequestIndex]; + // + // std::optional<cacherequests::GetCacheRecordResult>& RecordResult = OutResult.Results[RequestIndex]; + // if (!RecordResult) + // { + // MissingRecordIndexes.push_back(RequestIndex); + // continue; + // } + // + // CachePolicy UpstreamBasePolicy = RecordPolicy.has_value() ? ConvertToUpstream(RecordPolicy->GetBasePolicy()) | + // CachePolicy::SkipMeta : UpstreamPolicy.DefaultPolicy; + // + // CacheRecordPolicyBuilder Builder(UpstreamBasePolicy); + // for (const cacherequests::GetCacheRecordResultValue& ResultValue : RecordResult->Values) + // { + // CachePolicy ValuePolicy = ConvertToUpstream(RecordPolicy.value_or(UpstreamBasePolicy).GetValuePolicy(ResultValue.Id)); + // if (!RecordRequestData.MissingValues.contains(ResultValue.RawHash)) + // { + // ValuePolicy |= CachePolicy::SkipData; + // } + // Builder.AddValuePolicy(ResultValue.Id, ValuePolicy); + // } + // UpstreamPolicy.RecordPolicies.emplace_back(Builder.Build()); + // } + + bool PutCacheRecordsRequest::Parse(const CbPackage& Package, RecordsRequestPolicy& OutPolicy) { CbObjectView BatchObject = Package.GetObject(); ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); @@ -192,12 +221,12 @@ namespace cacherequests { { return false; } - Namespace = *RequestNamespace; - Policy.DefaultPolicy = GetCachePolicy(Params, "DefaultPolicy"sv).value_or(CachePolicy::Default); + Namespace = *RequestNamespace; + OutPolicy.DefaultPolicy = GetCachePolicy(Params, "DefaultPolicy"sv).value_or(CachePolicy::Default); CbArrayView RequestFieldArray = Params["Requests"sv].AsArrayView(); Requests.resize(RequestFieldArray.Num()); - Policy.RecordPolicies.resize(RequestFieldArray.Num()); + OutPolicy.RecordPolicies.resize(RequestFieldArray.Num()); for (size_t RequestIndex = 0; CbFieldView RequestField : RequestFieldArray) { CbObjectView RequestObject = RequestField.AsObjectView(); @@ -205,7 +234,7 @@ namespace cacherequests { CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); PutCacheRecordRequest& Request = Requests[RequestIndex]; - std::optional<CacheRecordPolicy>& RequestPolicy = Policy.RecordPolicies[RequestIndex]; + std::optional<CacheRecordPolicy>& RequestPolicy = OutPolicy.RecordPolicies[RequestIndex]; if (!GetRequestCacheKey(KeyView, Request.Key)) { @@ -248,12 +277,9 @@ namespace cacherequests { return true; } - bool PutCacheRecordsRequest::Format(CbPackage& OutPackage, const PutCacheRecordsRequestPolicy& Policy) const + bool PutCacheRecordsRequest::Format(CbPackage& OutPackage, const RecordsRequestPolicy& Policy) const { - if (Policy.RecordPolicies.size() != 0 && (Requests.size() != Policy.RecordPolicies.size())) - { - return false; - } + ZEN_ASSERT(Requests.size() == Policy.RecordPolicies.size()); CbObjectWriter Writer; Writer << "Method"sv << "PutCacheRecords"sv; @@ -297,11 +323,8 @@ namespace cacherequests { Writer.EndArray(); } Writer.EndObject(); - if (Policy.RecordPolicies.size() != 0) - { - const std::optional<CacheRecordPolicy>& RecordPolicy = Policy.RecordPolicies[RequestIndex]; - WriteOptionalCacheRequestPolicy(Writer, "Policy"sv, Policy.DefaultPolicy, RecordPolicy); - } + const std::optional<CacheRecordPolicy>& RecordPolicy = Policy.RecordPolicies[RequestIndex]; + WriteOptionalCacheRequestPolicy(Writer, "Policy"sv, Policy.DefaultPolicy, RecordPolicy); } Writer.EndObject(); RequestIndex++; @@ -344,7 +367,7 @@ namespace cacherequests { return true; } - bool GetCacheRecordsRequest::Parse(const CbObjectView& RpcRequest) + bool GetCacheRecordsRequest::Parse(const CbObjectView& RpcRequest, RecordsRequestPolicy& OutPolicy) { ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); @@ -355,60 +378,74 @@ namespace cacherequests { return false; } - Namespace = *RequestNamespace; - DefaultPolicy = GetCachePolicy(Params, "DefaultPolicy"sv).value_or(CachePolicy::Default); + Namespace = *RequestNamespace; + OutPolicy.DefaultPolicy = GetCachePolicy(Params, "DefaultPolicy"sv).value_or(CachePolicy::Default); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); Requests.reserve(RequestsArray.Num()); + OutPolicy.RecordPolicies.reserve(RequestsArray.Num()); for (CbFieldView RequestField : RequestsArray) { CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - GetCacheRecordRequest& Request = Requests.emplace_back(); + CacheKey& Key = Requests.emplace_back(); + std::optional<CacheRecordPolicy>& RecordPolicy = OutPolicy.RecordPolicies.emplace_back(); - if (!GetRequestCacheKey(KeyObject, Request.Key)) + if (!GetRequestCacheKey(KeyObject, Key)) { return false; } - Request.Policy = Convert(CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView())); + RecordPolicy = Convert(CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView())); } return true; } - bool GetCacheRecordsRequest::Parse(const CbPackage& RpcRequest) { return Parse(RpcRequest.GetObject()); } + bool GetCacheRecordsRequest::Parse(const CbPackage& RpcRequest, RecordsRequestPolicy& Policy) + { + return Parse(RpcRequest.GetObject(), Policy); + } - bool GetCacheRecordsRequest::Format(CbObjectWriter& Writer, const std::span<const size_t> OptionalRecordFilter) const + bool GetCacheRecordsRequest::Format(CbObjectWriter& Writer, + const RecordsRequestPolicy& Policy, + const std::span<const size_t> OptionalRecordFilter) const { + ZEN_ASSERT(Requests.size() == Policy.RecordPolicies.size()); Writer << "Method"sv << "GetCacheRecords"sv; Writer.BeginObject("Params"sv); { - Writer << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); + Writer << "DefaultPolicy"sv << WriteToString<128>(Policy.DefaultPolicy); Writer << "Namespace"sv << Namespace; Writer.BeginArray("Requests"sv); if (OptionalRecordFilter.empty()) { - for (const GetCacheRecordRequest& RecordRequest : Requests) + for (size_t RequestIndex = 0; const CacheKey& Key : Requests) { Writer.BeginObject(); { - WriteCacheRequestKey(Writer, RecordRequest.Key); - WriteOptionalCacheRequestPolicy(Writer, "Policy"sv, DefaultPolicy, RecordRequest.Policy); + WriteCacheRequestKey(Writer, Key); + const std::optional<CacheRecordPolicy>& RecordPolicy = Policy.RecordPolicies[RequestIndex]; + WriteOptionalCacheRequestPolicy(Writer, "Policy"sv, Policy.DefaultPolicy, RecordPolicy); } Writer.EndObject(); + RequestIndex++; } } else { for (size_t Index : OptionalRecordFilter) { - const GetCacheRecordRequest& RecordRequest = Requests[Index]; + const CacheKey& Key = Requests[Index]; Writer.BeginObject(); { - WriteCacheRequestKey(Writer, RecordRequest.Key); - WriteOptionalCacheRequestPolicy(Writer, "Policy"sv, DefaultPolicy, RecordRequest.Policy); + WriteCacheRequestKey(Writer, Key); + if (Policy.RecordPolicies.size() != 0) + { + const std::optional<CacheRecordPolicy>& RecordPolicy = Policy.RecordPolicies[Index]; + WriteOptionalCacheRequestPolicy(Writer, "Policy"sv, Policy.DefaultPolicy, RecordPolicy); + } } Writer.EndObject(); } @@ -420,10 +457,12 @@ namespace cacherequests { return true; } - bool GetCacheRecordsRequest::Format(CbPackage& OutPackage, const std::span<const size_t> OptionalRecordFilter) const + bool GetCacheRecordsRequest::Format(CbPackage& OutPackage, + const RecordsRequestPolicy& Policy, + const std::span<const size_t> OptionalRecordFilter) const { CbObjectWriter Writer; - if (!Format(Writer, OptionalRecordFilter)) + if (!Format(Writer, Policy, OptionalRecordFilter)) { return false; } @@ -1081,7 +1120,7 @@ namespace cacherequests { return (Lhs.Namespace == Rhs.Namespace) && (Lhs.Requests == Rhs.Requests); } - static bool operator==(const PutCacheRecordsRequestPolicy& Lhs, const PutCacheRecordsRequestPolicy& Rhs) + static bool operator==(const RecordsRequestPolicy& Lhs, const RecordsRequestPolicy& Rhs) { if (Lhs.DefaultPolicy != Rhs.DefaultPolicy) { @@ -1103,14 +1142,9 @@ namespace cacherequests { static bool operator==(const PutCacheRecordsResult& Lhs, const PutCacheRecordsResult& Rhs) { return (Lhs.Success == Rhs.Success); } - static bool operator==(const GetCacheRecordRequest& Lhs, const GetCacheRecordRequest& Rhs) - { - return (Lhs.Key == Rhs.Key) && (Lhs.Policy == Rhs.Policy); - } - static bool operator==(const GetCacheRecordsRequest& Lhs, const GetCacheRecordsRequest& Rhs) { - return (Lhs.DefaultPolicy == Rhs.DefaultPolicy) && (Lhs.Namespace == Rhs.Namespace) && (Lhs.Requests == Rhs.Requests); + return (Lhs.Namespace == Rhs.Namespace) && (Lhs.Requests == Rhs.Requests); } static bool operator==(const GetCacheRecordResultValue& Lhs, const GetCacheRecordResultValue& Rhs) @@ -1217,12 +1251,12 @@ namespace cacherequests { { PutCacheRecordsRequest EmptyRequest; CbPackage EmptyRequestPackage; - CHECK(EmptyRequest.Format(EmptyRequestPackage, PutCacheRecordsRequestPolicy{})); - PutCacheRecordsRequest EmptyRequestCopy; - PutCacheRecordsRequestPolicy EmptyRequestPolicyCopy; + CHECK(EmptyRequest.Format(EmptyRequestPackage, RecordsRequestPolicy{})); + PutCacheRecordsRequest EmptyRequestCopy; + RecordsRequestPolicy EmptyRequestPolicyCopy; CHECK(!EmptyRequestCopy.Parse(EmptyRequestPackage, EmptyRequestPolicyCopy)); // Namespace is required CHECK(EmptyRequest == EmptyRequestCopy); - CHECK(EmptyRequestPolicyCopy == PutCacheRecordsRequestPolicy{}); + CHECK(EmptyRequestPolicyCopy == RecordsRequestPolicy{}); PutCacheRecordsRequest FullRequest = { .Namespace = "the_namespace", @@ -1238,13 +1272,13 @@ namespace cacherequests { .Values = {{.Id = Oid::NewOid(), .Body = MakeCompressedBuffer(19)}, {.Id = Oid::NewOid(), .Body = MakeCompressedBuffer(1248)}, {.Id = Oid::NewOid(), .Body = MakeCompressedBuffer(823)}}}}}; - PutCacheRecordsRequestPolicy FullRequestePolicy = {.DefaultPolicy = CachePolicy::Remote, - .RecordPolicies = {CachePolicy::StoreLocal, CachePolicy::Store, {}}}; + RecordsRequestPolicy FullRequestePolicy = {.DefaultPolicy = CachePolicy::Remote, + .RecordPolicies = {CachePolicy::StoreLocal, CachePolicy::Store, {}}}; CbPackage FullRequestPackage; CHECK(FullRequest.Format(FullRequestPackage, FullRequestePolicy)); - PutCacheRecordsRequest FullRequestCopy; - PutCacheRecordsRequestPolicy FullRequestePolicyCopy; + PutCacheRecordsRequest FullRequestCopy; + RecordsRequestPolicy FullRequestePolicyCopy; CHECK(FullRequestCopy.Parse(FullRequestPackage, FullRequestePolicyCopy)); CHECK(FullRequest == FullRequestCopy); CHECK(FullRequestePolicy == FullRequestePolicyCopy); @@ -1267,33 +1301,42 @@ namespace cacherequests { TEST_CASE("cacherequests.get.cache.records") { GetCacheRecordsRequest EmptyRequest; + RecordsRequestPolicy EmptyRequestPolicy; CbPackage EmptyRequestPackage; - CHECK(EmptyRequest.Format(EmptyRequestPackage)); + CHECK(EmptyRequest.Format(EmptyRequestPackage, EmptyRequestPolicy)); GetCacheRecordsRequest EmptyRequestCopy; - CHECK(!EmptyRequestCopy.Parse(EmptyRequestPackage)); // Namespace is required + RecordsRequestPolicy EmptyRequestPolicyCopy; + CHECK(!EmptyRequestCopy.Parse(EmptyRequestPackage, EmptyRequestPolicyCopy)); // Namespace is required + CHECK(EmptyRequest == EmptyRequestCopy); + CHECK(EmptyRequestPolicyCopy == EmptyRequestPolicyCopy); GetCacheRecordsRequest FullRequest = { - .DefaultPolicy = CachePolicy::StoreLocal, - .Namespace = "other_namespace", - .Requests = {{.Key = {.Bucket = "finebucket", .Hash = IoHash::FromHexString("d1df59fcab06793a5f2c372d795bb907a15cab15")}, - .Policy = CachePolicy::Local}, - {.Key = {.Bucket = "badbucket", .Hash = IoHash::FromHexString("177030568fdd461bf4fe5ddbf4d463e514e8178e")}, - .Policy = CachePolicy::Remote}, - {.Key = {.Bucket = "badbucket", .Hash = IoHash::FromHexString("e1ce9e1ac8a6f5953dc14c1fa9512b804ed689df")}}}}; + .Namespace = "other_namespace", + .Requests = {{.Bucket = "finebucket", .Hash = IoHash::FromHexString("d1df59fcab06793a5f2c372d795bb907a15cab15")}, + {.Bucket = "badbucket", .Hash = IoHash::FromHexString("177030568fdd461bf4fe5ddbf4d463e514e8178e")}, + {.Bucket = "badbucket", .Hash = IoHash::FromHexString("e1ce9e1ac8a6f5953dc14c1fa9512b804ed689df")}}}; + RecordsRequestPolicy FullRequestPolicy = {.DefaultPolicy = CachePolicy::StoreLocal, + .RecordPolicies = {CachePolicy::Local, CachePolicy::Remote, {}}}; CbPackage FullRequestPackage; - CHECK(FullRequest.Format(FullRequestPackage)); + CHECK(FullRequest.Format(FullRequestPackage, FullRequestPolicy)); GetCacheRecordsRequest FullRequestCopy; - CHECK(FullRequestCopy.Parse(FullRequestPackage)); + RecordsRequestPolicy FullRequestPolicyCopy; + CHECK(FullRequestCopy.Parse(FullRequestPackage, FullRequestPolicyCopy)); CHECK(FullRequest == FullRequestCopy); + CHECK(FullRequestPolicy == FullRequestPolicyCopy); CbPackage PartialRequestPackage; - CHECK(FullRequest.Format(PartialRequestPackage, std::initializer_list<size_t>{1, 2})); + CHECK(FullRequest.Format(PartialRequestPackage, FullRequestPolicy, std::initializer_list<size_t>{1, 2})); GetCacheRecordsRequest PartialRequest = FullRequest; + RecordsRequestPolicy PartialPolicy = FullRequestPolicy; PartialRequest.Requests.erase(PartialRequest.Requests.begin()); + PartialPolicy.RecordPolicies.erase(PartialPolicy.RecordPolicies.begin()); GetCacheRecordsRequest PartialRequestCopy; - CHECK(PartialRequestCopy.Parse(PartialRequestPackage)); + RecordsRequestPolicy PartialPolicyCopy; + CHECK(PartialRequestCopy.Parse(PartialRequestPackage, PartialPolicyCopy)); CHECK(PartialRequest == PartialRequestCopy); + CHECK(PartialPolicy == PartialPolicyCopy); GetCacheRecordsResult EmptyResult; CbPackage EmptyResponsePackage; @@ -1317,13 +1360,13 @@ namespace cacherequests { {.Id = Oid::NewOid(), .Body = MakeCompressedBuffer(1248)}, {.Id = Oid::NewOid(), .Body = MakeCompressedBuffer(823)}}}}}; - PutCacheRecordsRequestPolicy FullPutRequestPolicy = {.DefaultPolicy = CachePolicy::Remote, - .RecordPolicies = {CachePolicy::StoreLocal, CachePolicy::Store, {}}}; + RecordsRequestPolicy FullPutRequestPolicy = {.DefaultPolicy = CachePolicy::Remote, + .RecordPolicies = {CachePolicy::StoreLocal, CachePolicy::Store, {}}}; CbPackage FullPutRequestPackage; CHECK(FullPutRequest.Format(FullPutRequestPackage, FullPutRequestPolicy)); - PutCacheRecordsRequest FullPutRequestCopy; - PutCacheRecordsRequestPolicy FullPutRequestPolicyCopy; + PutCacheRecordsRequest FullPutRequestCopy; + RecordsRequestPolicy FullPutRequestPolicyCopy; CHECK(FullPutRequestCopy.Parse(FullPutRequestPackage, FullPutRequestPolicyCopy)); GetCacheRecordsResult FullResult = { diff --git a/zenutil/include/zenutil/cache/cacherequests.h b/zenutil/include/zenutil/cache/cacherequests.h index a14fd311a..e11a0478b 100644 --- a/zenutil/include/zenutil/cache/cacherequests.h +++ b/zenutil/include/zenutil/cache/cacherequests.h @@ -55,6 +55,52 @@ namespace cacherequests { // all its values without knowing all the Ids, right? // + struct RecordsRequestPolicy + { + CachePolicy DefaultPolicy = CachePolicy::Default; + std::vector<std::optional<CacheRecordPolicy>> RecordPolicies; + // RecordsRequestPolicy ConvertToUpstream(const std::vector<IoHash>& IncludeDataValueHashes) const; + }; + + inline CachePolicy GetEffectiveRecordPolicy(const RecordsRequestPolicy& Policy, size_t RecordIndex) + { + if (Policy.RecordPolicies.size() == 0) + { + return Policy.DefaultPolicy; + } + if (!Policy.RecordPolicies[RecordIndex].has_value()) + { + return Policy.DefaultPolicy; + } + return Policy.RecordPolicies[RecordIndex]->GetRecordPolicy(); + } + + inline CachePolicy GetEffectiveBasePolicy(const RecordsRequestPolicy& Policy, size_t RecordIndex) + { + if (Policy.RecordPolicies.size() == 0) + { + return Policy.DefaultPolicy; + } + if (!Policy.RecordPolicies[RecordIndex].has_value()) + { + return Policy.DefaultPolicy; + } + return Policy.RecordPolicies[RecordIndex]->GetBasePolicy(); + } + + inline CachePolicy GetEffectiveValuePolicy(const RecordsRequestPolicy& Policy, size_t RecordIndex, Oid ValueId) + { + if (Policy.RecordPolicies.size() == 0) + { + return Policy.DefaultPolicy; + } + if (!Policy.RecordPolicies[RecordIndex].has_value()) + { + return Policy.DefaultPolicy; + } + return Policy.RecordPolicies[RecordIndex]->GetValuePolicy(ValueId); + } + ////////////////////////////////////////////////////////////////////////// // Put 1..n structured cache records with optional attachments @@ -71,19 +117,13 @@ namespace cacherequests { std::vector<PutCacheRecordRequestValue> Values; }; - struct PutCacheRecordsRequestPolicy - { - CachePolicy DefaultPolicy = CachePolicy::Default; - std::vector<std::optional<CacheRecordPolicy>> RecordPolicies; - }; - struct PutCacheRecordsRequest { std::string Namespace; std::vector<PutCacheRecordRequest> Requests; - bool Parse(const CbPackage& Package, PutCacheRecordsRequestPolicy& Policy); - bool Format(CbPackage& OutPackage, const PutCacheRecordsRequestPolicy& Policy) const; + bool Parse(const CbPackage& Package, RecordsRequestPolicy& OutPolicy); + bool Format(CbPackage& OutPackage, const RecordsRequestPolicy& Policy) const; }; struct PutCacheRecordsResult @@ -101,22 +141,19 @@ namespace cacherequests { // we still want them. // Not sure if in that case we want different policies for the different attachemnts? - struct GetCacheRecordRequest - { - CacheKey Key = CacheKey::Empty; - std::optional<CacheRecordPolicy> Policy; - }; - struct GetCacheRecordsRequest { - CachePolicy DefaultPolicy = CachePolicy::Default; - std::string Namespace; - std::vector<GetCacheRecordRequest> Requests; - - bool Parse(const CbPackage& RpcRequest); - bool Parse(const CbObjectView& RpcRequest); - bool Format(CbPackage& OutPackage, const std::span<const size_t> OptionalRecordFilter = {}) const; - bool Format(CbObjectWriter& Writer, const std::span<const size_t> OptionalRecordFilter = {}) const; + std::string Namespace; + std::vector<CacheKey> Requests; + + bool Parse(const CbPackage& RpcRequest, RecordsRequestPolicy& Policy); + bool Parse(const CbObjectView& RpcRequest, RecordsRequestPolicy& Policy); + bool Format(CbPackage& OutPackage, + const RecordsRequestPolicy& Policy, + const std::span<const size_t> OptionalRecordFilter = {}) const; + bool Format(CbObjectWriter& Writer, + const RecordsRequestPolicy& Policy, + const std::span<const size_t> OptionalRecordFilter = {}) const; }; struct GetCacheRecordResultValue |