diff options
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 125 |
1 files changed, 116 insertions, 9 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 6cdfeabd0..0a1ed0e09 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -16,11 +16,30 @@ #include <zenstore/cidstore.h> #include <zenutil/cache/cacherequests.h> #include <zenutil/packageformat.h> +#include <zenutil/workerpools.h> + +#include <zencore/memory/llm.h> ////////////////////////////////////////////////////////////////////////// namespace zen { +const FLLMTag& +GetCacheTag() +{ + static FLLMTag CacheTag("cache"); + + return CacheTag; +} + +const FLLMTag& +GetCacheRpcTag() +{ + static FLLMTag CacheRpcTag("rpc", GetCacheTag()); + + return CacheRpcTag; +} + using namespace std::literals; std::optional<std::string> @@ -155,6 +174,7 @@ CacheRpcHandler::AreDiskWritesAllowed() const CacheRpcHandler::RpcResponseCode CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, + std::string_view UriNamespace, const ZenContentType ContentType, IoBuffer&& Body, uint32_t& OutAcceptMagic, @@ -164,6 +184,8 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, { ZEN_TRACE_CPU("Z$::HandleRpcRequest"); + ZEN_MEMSCOPE(GetCacheRpcTag()); + m_CacheStats.RpcRequests.fetch_add(1); CbPackage Package; @@ -173,11 +195,17 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, { if (ContentType == ZenContentType::kCbObject) { + if (CbValidateError Error = ValidateCompactBinary(Body.GetView(), CbValidateMode::Default); Error != CbValidateError::None) + { + ZEN_WARN("Content format is corrupt, compact binary format validation failed. Reason: '{}'", ToString(Error)); + return RpcResponseCode::BadRequest; + } + ObjectBuffer = LoadCompactBinaryObject(std::move(Body)); Object = ObjectBuffer; if (!Object) { - ZEN_WARN("Content format not supported, expected compact binary format") + ZEN_WARN("Content format not supported, expected compact binary format"); return RpcResponseCode::BadRequest; } } @@ -191,10 +219,27 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, return RpcResponseCode::BadRequest; } } + + if (!UriNamespace.empty()) + { + CbObjectView Params = Object["Params"sv].AsObjectView(); + std::optional<std::string> ParamsNamespace = GetRpcRequestNamespace(Params); + + if (ParamsNamespace) + { + if (UriNamespace != ParamsNamespace) + { + ZEN_WARN("Rpc message namespace mismatch, request rejected. Expected '{}', received '{}'", + UriNamespace, + ParamsNamespace.value()); + return RpcResponseCode::BadRequest; + } + } + } } catch (const std::invalid_argument& ex) { - ZEN_WARN("Invalid rpc message package recevied, reason: '{}'", ex.what()); + ZEN_WARN("Invalid rpc message package received, reason: '{}'", ex.what()); return RpcResponseCode::BadRequest; } OutAcceptMagic = Object["Accept"sv].AsUInt32(); @@ -295,7 +340,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co return CbPackage{}; } - CbObjectWriter ResponseObject; + CbObjectWriter ResponseObject{256}; ResponseObject.BeginArray("Result"sv); for (bool Value : Results) { @@ -517,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); ParseValues(Request); - Request.Complete = true; + Request.Complete = true; + size_t ValueCount = Request.Values.size(); + std::vector<IoHash> CidHashes; + std::vector<size_t> RequestValueIndexes; + const bool DoBatch = ValueCount > 7; + if (DoBatch) + { + CidHashes.reserve(ValueCount); + RequestValueIndexes.reserve(ValueCount); + } + size_t ValueIndex = 0; for (ValueRequestData& Value : Request.Values) { CachePolicy ValuePolicy = Value.DownstreamPolicy; @@ -552,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.Complete = false; } } + else if (DoBatch) + { + CidHashes.push_back(Value.ContentId); + RequestValueIndexes.push_back(ValueIndex); + } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) @@ -567,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); } } - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { NeedUpstreamAttachment = true; @@ -575,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } Request.Complete = false; } + ValueIndex++; + } + if (!RequestValueIndexes.empty()) + { + m_CidStore.IterateChunks( + CidHashes, + [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { + try + { + const size_t ValueIndex = RequestValueIndexes[Index]; + ValueRequestData& Value = Request.Values[ValueIndex]; + if (Payload) + { + Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned(); + if (Value.Payload) + { + Value.Exists = true; + } + else + { + ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 64u * 1024u); + + for (size_t Index : RequestValueIndexes) + { + ValueRequestData& Value = Request.Values[Index]; + if (!Value.Exists) + { + const CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + } + Request.Complete = false; + } + } } } } @@ -724,7 +830,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Response"); CbPackage ResponsePackage; - CbObjectWriter ResponseObject; + CbObjectWriter ResponseObject{2048}; ResponseObject.BeginArray("Result"sv); for (RecordRequestData& Request : Requests) @@ -953,7 +1059,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con } { ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response"); - CbObjectWriter ResponseObject; + CbObjectWriter ResponseObject{1024}; ResponseObject.BeginArray("Result"sv); for (bool Value : Results) { @@ -1169,7 +1275,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO { ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response"); CbPackage RpcResponse; - CbObjectWriter ResponseObject; + CbObjectWriter ResponseObject{1024}; ResponseObject.BeginArray("Result"sv); for (const RequestData& Request : Requests) { @@ -1371,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + // TODO: BatchGet records? std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { @@ -1687,7 +1794,7 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest const bool AcceptsPartialChunks = EnumHasAnyFlags(AcceptOptions, RpcAcceptOptions::kAllowPartialCacheChunks); CbPackage RpcResponse; - CbObjectWriter Writer; + CbObjectWriter Writer{1024}; Writer.BeginArray("Result"sv); for (ChunkRequest& Request : Requests) |