From 920120bbcec9f91df3336f62970b3e010a4fa6c2 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 6 Mar 2025 16:18:32 +0100 Subject: reduced memory churn using fixed_xxx containers (#236) * Added EASTL to help with eliminating memory allocations * Applied EASTL to eliminate memory allocations, primarily by using `fixed_vector` et al to use stack allocations / inline struct allocations Reduces memory events in traces by close to a factor of 10 in test scenario (starting editor for project F) --- src/zenstore/cache/cacherpc.cpp | 73 ++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 33 deletions(-) (limited to 'src/zenstore/cache/cacherpc.cpp') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index cca51e63e..97e26a38d 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -20,6 +20,8 @@ #include +#include + ////////////////////////////////////////////////////////////////////////// namespace zen { @@ -89,7 +91,7 @@ GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) return false; } IoHash Hash = HashField.AsHash(); - Key = CacheKey::Create(*Bucket, Hash); + Key = CacheKey::CreateValidated(std::move(*Bucket), Hash); return true; } @@ -305,7 +307,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co } DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::vector Results; + eastl::fixed_vector Results; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); for (CbFieldView RequestField : RequestsArray) @@ -481,16 +483,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb bool Exists = false; bool ReadFromUpstream = false; }; - struct RecordRequestData + struct RecordRequestData : public CacheKeyRequest { - CacheKeyRequest Upstream; - CbObjectView RecordObject; - IoBuffer RecordCacheValue; - CacheRecordPolicy DownstreamPolicy; - std::vector Values; - bool Complete = false; - const UpstreamEndpointInfo* Source = nullptr; - uint64_t ElapsedTimeUs; + CbObjectView RecordObject; + IoBuffer RecordCacheValue; + CacheRecordPolicy DownstreamPolicy; + eastl::fixed_vector Values; + bool Complete = false; + const UpstreamEndpointInfo* Source = nullptr; + uint64_t ElapsedTimeUs; }; std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); @@ -503,8 +504,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector Requests; - std::vector UpstreamIndexes; + eastl::fixed_vector Requests; + eastl::fixed_vector UpstreamIndexes; auto ParseValues = [](RecordRequestData& Request) { CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); @@ -535,7 +536,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CacheKey& Key = Request.Upstream.Key; + CacheKey& Key = Request.Key; if (!GetRpcRequestCacheKey(KeyObject, Key)) { return CbPackage{}; @@ -707,7 +708,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb for (size_t Index : UpstreamIndexes) { RecordRequestData& Request = Requests[Index]; - UpstreamRequests.push_back(&Request.Upstream); + UpstreamRequests.push_back(&Request); if (Request.Values.size()) { @@ -721,13 +722,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); } - Request.Upstream.Policy = Builder.Build(); + Request.Policy = Builder.Build(); } else { // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, // and convert the CacheRecordPolicy to an upstream policy - Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); + Request.Policy = Request.DownstreamPolicy.ConvertToUpstream(); } } @@ -737,10 +738,9 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return; } - RecordRequestData& Request = - *reinterpret_cast(reinterpret_cast(&Params.Request) - offsetof(RecordRequestData, Upstream)); + RecordRequestData& Request = *static_cast(&Params.Request); Request.ElapsedTimeUs += static_cast(Params.ElapsedSeconds * 1000000.0); - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; Stopwatch Timer; auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); }); if (!Request.RecordObject) @@ -832,10 +832,12 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbPackage ResponsePackage; CbObjectWriter ResponseObject{2048}; + ResponsePackage.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (RecordRequestData& Request : Requests) { - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; if (Request.Complete || (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { @@ -910,11 +912,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector BatchResults; - std::vector BatchResultIndexes; - std::vector Results; - std::vector UpstreamCacheKeys; - uint64_t RequestCount = RequestsArray.Num(); + std::vector BatchResults; + eastl::fixed_vector BatchResultIndexes; + eastl::fixed_vector Results; + eastl::fixed_vector UpstreamCacheKeys; + + uint64_t RequestCount = RequestsArray.Num(); { Results.reserve(RequestCount); std::unique_ptr Batch; @@ -1099,15 +1102,15 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO uint64_t RawSize = 0; CompressedBuffer Result; }; - std::vector Requests; + eastl::fixed_vector Requests; - std::vector RemoteRequestIndexes; + eastl::fixed_vector RemoteRequestIndexes; const bool HasUpstream = m_UpstreamCache.IsActive(); - CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector CacheValues; - const uint64_t RequestCount = RequestsArray.Num(); + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + ZenCacheValueVec_t CacheValues; + const uint64_t RequestCount = RequestsArray.Num(); CacheValues.reserve(RequestCount); { std::unique_ptr Batch; @@ -1136,7 +1139,6 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO CacheKey& Key = Request.Key; CachePolicy Policy = Request.Policy; - ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (Batch) @@ -1276,6 +1278,9 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response"); CbPackage RpcResponse; CbObjectWriter ResponseObject{1024}; + + RpcResponse.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (const RequestData& Request : Requests) { @@ -1642,7 +1647,7 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector Chunks; + ZenCacheValueVec_t Chunks; Chunks.reserve(ValueRequests.size()); { std::unique_ptr Batch; @@ -1796,6 +1801,8 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest CbPackage RpcResponse; CbObjectWriter Writer{1024}; + RpcResponse.ReserveAttachments(Requests.size()); + Writer.BeginArray("Result"sv); for (ChunkRequest& Request : Requests) { -- cgit v1.2.3 From 8d927c40cf10361870c1b42384f4f00d6cc2b10c Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 2 May 2025 14:15:55 +0200 Subject: cbobject validation (#377) * validate incoming CbObject to cache when receiving a package * validate records when fetched from store in cache before parsing them --- src/zenstore/cache/cacherpc.cpp | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) (limited to 'src/zenstore/cache/cacherpc.cpp') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 97e26a38d..bf78dae86 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -220,6 +220,11 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, ZEN_WARN("Content format not supported, expected package message format"); return RpcResponseCode::BadRequest; } + if (CbValidateError Error = ValidateCompactBinary(Object.GetView(), CbValidateMode::Default); Error != CbValidateError::None) + { + ZEN_WARN("Content format is corrupt, compact binary format validation failed. Reason: '{}'", ToString(Error)); + return RpcResponseCode::BadRequest; + } } if (!UriNamespace.empty()) @@ -558,6 +563,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb { FoundLocalInvalid = true; } + else if (CbValidateError Error = ValidateCompactBinary(Request.RecordCacheValue.GetView(), CbValidateMode::Default); + Error != CbValidateError::None) + { + ZEN_WARN("HandleRpcGetCacheRecords stored record is corrupt, compact binary format validation failed. Reason: '{}'", + ToString(Error)); + FoundLocalInvalid = true; + } else { Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); @@ -1563,18 +1575,27 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, Record.ValuesRead = true; if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) { - CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); - CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); - Record.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) + if (CbValidateError Error = ValidateCompactBinary(Record.CacheValue.GetView(), CbValidateMode::Default); + Error != CbValidateError::None) + { + ZEN_WARN("GetLocalCacheRecords stored record for is corrupt, compact binary format validation failed. Reason: '{}'", + ToString(Error)); + } + else { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) + CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); + CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); + Record.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) { - Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) + { + Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); + } } } } -- cgit v1.2.3 From a9c83b3c1299692923e2c16b696f5b9e211f5737 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 2 May 2025 15:33:58 +0200 Subject: iterate chunks crash fix (#376) * Bugfix: Add explicit lambda capture in CasContainer::IterateChunks to avoid accessing state data references --- src/zenstore/cache/cacherpc.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/zenstore/cache/cacherpc.cpp') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index bf78dae86..de4b0a37c 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -653,7 +653,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb { m_CidStore.IterateChunks( CidHashes, - [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { + [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { try { const size_t ValueIndex = RequestValueIndexes[Index]; -- cgit v1.2.3