diff options
| author | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
| commit | bb298631ba35a323827dda0b8cd6158e276b5f61 (patch) | |
| tree | 7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenstore/cache/cacherpc.cpp | |
| parent | Change to PutResult structure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip | |
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 116 |
1 files changed, 72 insertions, 44 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 20c244250..436e8a083 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -20,6 +20,8 @@ #include <zencore/memory/llm.h> +#include <EASTL/fixed_vector.h> + ////////////////////////////////////////////////////////////////////////// namespace zen { @@ -89,7 +91,7 @@ GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) return false; } IoHash Hash = HashField.AsHash(); - Key = CacheKey::Create(*Bucket, Hash); + Key = CacheKey::CreateValidated(std::move(*Bucket), Hash); return true; } @@ -218,6 +220,11 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, ZEN_WARN("Content format not supported, expected package message format"); return RpcResponseCode::BadRequest; } + if (CbValidateError Error = ValidateCompactBinary(Object.GetView(), CbValidateMode::Default); Error != CbValidateError::None) + { + ZEN_WARN("Content format is corrupt, compact binary format validation failed. Reason: '{}'", ToString(Error)); + return RpcResponseCode::BadRequest; + } } if (!UriNamespace.empty()) @@ -305,7 +312,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co } DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::vector<bool> Results; + eastl::fixed_vector<bool, 32> Results; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); for (CbFieldView RequestField : RequestsArray) @@ -495,16 +502,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb bool Exists = false; bool ReadFromUpstream = false; }; - struct RecordRequestData + struct RecordRequestData : public CacheKeyRequest { - CacheKeyRequest Upstream; - CbObjectView RecordObject; - IoBuffer RecordCacheValue; - CacheRecordPolicy DownstreamPolicy; - std::vector<ValueRequestData> Values; - bool Complete = false; - const UpstreamEndpointInfo* Source = nullptr; - uint64_t ElapsedTimeUs; + CbObjectView RecordObject; + IoBuffer RecordCacheValue; + CacheRecordPolicy DownstreamPolicy; + eastl::fixed_vector<ValueRequestData, 4> Values; + bool Complete = false; + const UpstreamEndpointInfo* Source = nullptr; + uint64_t ElapsedTimeUs; }; std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); @@ -517,8 +523,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector<RecordRequestData> Requests; - std::vector<size_t> UpstreamIndexes; + eastl::fixed_vector<RecordRequestData, 16> Requests; + eastl::fixed_vector<size_t, 16> UpstreamIndexes; auto ParseValues = [](RecordRequestData& Request) { CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); @@ -549,7 +555,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CacheKey& Key = Request.Upstream.Key; + CacheKey& Key = Request.Key; if (!GetRpcRequestCacheKey(KeyObject, Key)) { return CbPackage{}; @@ -571,6 +577,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb { FoundLocalInvalid = true; } + else if (CbValidateError Error = ValidateCompactBinary(Request.RecordCacheValue.GetView(), CbValidateMode::Default); + Error != CbValidateError::None) + { + ZEN_WARN("HandleRpcGetCacheRecords stored record is corrupt, compact binary format validation failed. Reason: '{}'", + ToString(Error)); + FoundLocalInvalid = true; + } else { Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); @@ -654,7 +667,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb { m_CidStore.IterateChunks( CidHashes, - [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { + [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { try { const size_t ValueIndex = RequestValueIndexes[Index]; @@ -721,7 +734,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb for (size_t Index : UpstreamIndexes) { RecordRequestData& Request = Requests[Index]; - UpstreamRequests.push_back(&Request.Upstream); + UpstreamRequests.push_back(&Request); if (Request.Values.size()) { @@ -735,13 +748,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); } - Request.Upstream.Policy = Builder.Build(); + Request.Policy = Builder.Build(); } else { // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, // and convert the CacheRecordPolicy to an upstream policy - Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); + Request.Policy = Request.DownstreamPolicy.ConvertToUpstream(); } } @@ -751,10 +764,9 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return; } - RecordRequestData& Request = - *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream)); + RecordRequestData& Request = *static_cast<RecordRequestData*>(&Params.Request); Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; Stopwatch Timer; auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); }); if (!Request.RecordObject) @@ -852,10 +864,12 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbPackage ResponsePackage; CbObjectWriter ResponseObject{2048}; + ResponsePackage.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (RecordRequestData& Request : Requests) { - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; if (Request.Complete || (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { @@ -930,11 +944,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<ZenCacheStore::PutResult> BatchResults; - std::vector<size_t> BatchResultIndexes; - std::vector<ZenCacheStore::PutResult> Results; - std::vector<CacheKey> UpstreamCacheKeys; - uint64_t RequestCount = RequestsArray.Num(); + std::vector<ZenCacheStore::PutResult> BatchResults; + eastl::fixed_vector<size_t, 32> BatchResultIndexes; + eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results; + eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; + + uint64_t RequestCount = RequestsArray.Num(); { Results.reserve(RequestCount); std::unique_ptr<ZenCacheStore::PutBatch> Batch; @@ -1145,15 +1160,15 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO uint64_t RawSize = 0; CompressedBuffer Result; }; - std::vector<RequestData> Requests; + eastl::fixed_vector<RequestData, 16> Requests; - std::vector<size_t> RemoteRequestIndexes; + eastl::fixed_vector<size_t, 16> RemoteRequestIndexes; const bool HasUpstream = m_UpstreamCache.IsActive(); - CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<ZenCacheValue> CacheValues; - const uint64_t RequestCount = RequestsArray.Num(); + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + ZenCacheValueVec_t CacheValues; + const uint64_t RequestCount = RequestsArray.Num(); CacheValues.reserve(RequestCount); { std::unique_ptr<ZenCacheStore::GetBatch> Batch; @@ -1182,7 +1197,6 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO CacheKey& Key = Request.Key; CachePolicy Policy = Request.Policy; - ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (Batch) @@ -1328,6 +1342,9 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response"); CbPackage RpcResponse; CbObjectWriter ResponseObject{1024}; + + RpcResponse.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (const RequestData& Request : Requests) { @@ -1622,18 +1639,27 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, Record.ValuesRead = true; if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) { - CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); - CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); - Record.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) + if (CbValidateError Error = ValidateCompactBinary(Record.CacheValue.GetView(), CbValidateMode::Default); + Error != CbValidateError::None) { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) + ZEN_WARN("GetLocalCacheRecords stored record for is corrupt, compact binary format validation failed. Reason: '{}'", + ToString(Error)); + } + else + { + CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); + CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); + Record.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) { - Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) + { + Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); + } } } } @@ -1706,7 +1732,7 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector<ZenCacheValue> Chunks; + ZenCacheValueVec_t Chunks; Chunks.reserve(ValueRequests.size()); { std::unique_ptr<ZenCacheStore::GetBatch> Batch; @@ -1866,6 +1892,8 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest CbPackage RpcResponse; CbObjectWriter Writer{1024}; + RpcResponse.ReserveAttachments(Requests.size()); + Writer.BeginArray("Result"sv); for (ChunkRequest& Request : Requests) { |