aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cacherpc.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
-rw-r--r--src/zenstore/cache/cacherpc.cpp125
1 files changed, 116 insertions, 9 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 6cdfeabd0..0a1ed0e09 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -16,11 +16,30 @@
#include <zenstore/cidstore.h>
#include <zenutil/cache/cacherequests.h>
#include <zenutil/packageformat.h>
+#include <zenutil/workerpools.h>
+
+#include <zencore/memory/llm.h>
//////////////////////////////////////////////////////////////////////////
namespace zen {
+const FLLMTag&
+GetCacheTag()
+{
+ static FLLMTag CacheTag("cache");
+
+ return CacheTag;
+}
+
+const FLLMTag&
+GetCacheRpcTag()
+{
+ static FLLMTag CacheRpcTag("rpc", GetCacheTag());
+
+ return CacheRpcTag;
+}
+
using namespace std::literals;
std::optional<std::string>
@@ -155,6 +174,7 @@ CacheRpcHandler::AreDiskWritesAllowed() const
CacheRpcHandler::RpcResponseCode
CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
+ std::string_view UriNamespace,
const ZenContentType ContentType,
IoBuffer&& Body,
uint32_t& OutAcceptMagic,
@@ -164,6 +184,8 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
{
ZEN_TRACE_CPU("Z$::HandleRpcRequest");
+ ZEN_MEMSCOPE(GetCacheRpcTag());
+
m_CacheStats.RpcRequests.fetch_add(1);
CbPackage Package;
@@ -173,11 +195,17 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
{
if (ContentType == ZenContentType::kCbObject)
{
+ if (CbValidateError Error = ValidateCompactBinary(Body.GetView(), CbValidateMode::Default); Error != CbValidateError::None)
+ {
+ ZEN_WARN("Content format is corrupt, compact binary format validation failed. Reason: '{}'", ToString(Error));
+ return RpcResponseCode::BadRequest;
+ }
+
ObjectBuffer = LoadCompactBinaryObject(std::move(Body));
Object = ObjectBuffer;
if (!Object)
{
- ZEN_WARN("Content format not supported, expected compact binary format")
+ ZEN_WARN("Content format not supported, expected compact binary format");
return RpcResponseCode::BadRequest;
}
}
@@ -191,10 +219,27 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
return RpcResponseCode::BadRequest;
}
}
+
+ if (!UriNamespace.empty())
+ {
+ CbObjectView Params = Object["Params"sv].AsObjectView();
+ std::optional<std::string> ParamsNamespace = GetRpcRequestNamespace(Params);
+
+ if (ParamsNamespace)
+ {
+ if (UriNamespace != ParamsNamespace)
+ {
+ ZEN_WARN("Rpc message namespace mismatch, request rejected. Expected '{}', received '{}'",
+ UriNamespace,
+ ParamsNamespace.value());
+ return RpcResponseCode::BadRequest;
+ }
+ }
+ }
}
catch (const std::invalid_argument& ex)
{
- ZEN_WARN("Invalid rpc message package recevied, reason: '{}'", ex.what());
+ ZEN_WARN("Invalid rpc message package received, reason: '{}'", ex.what());
return RpcResponseCode::BadRequest;
}
OutAcceptMagic = Object["Accept"sv].AsUInt32();
@@ -295,7 +340,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
return CbPackage{};
}
- CbObjectWriter ResponseObject;
+ CbObjectWriter ResponseObject{256};
ResponseObject.BeginArray("Result"sv);
for (bool Value : Results)
{
@@ -517,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
ParseValues(Request);
- Request.Complete = true;
+ Request.Complete = true;
+ size_t ValueCount = Request.Values.size();
+ std::vector<IoHash> CidHashes;
+ std::vector<size_t> RequestValueIndexes;
+ const bool DoBatch = ValueCount > 7;
+ if (DoBatch)
+ {
+ CidHashes.reserve(ValueCount);
+ RequestValueIndexes.reserve(ValueCount);
+ }
+ size_t ValueIndex = 0;
for (ValueRequestData& Value : Request.Values)
{
CachePolicy ValuePolicy = Value.DownstreamPolicy;
@@ -552,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.Complete = false;
}
}
+ else if (DoBatch)
+ {
+ CidHashes.push_back(Value.ContentId);
+ RequestValueIndexes.push_back(ValueIndex);
+ }
else
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
@@ -567,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
}
}
-
if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
NeedUpstreamAttachment = true;
@@ -575,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
Request.Complete = false;
}
+ ValueIndex++;
+ }
+ if (!RequestValueIndexes.empty())
+ {
+ m_CidStore.IterateChunks(
+ CidHashes,
+ [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
+ try
+ {
+ const size_t ValueIndex = RequestValueIndexes[Index];
+ ValueRequestData& Value = Request.Values[ValueIndex];
+ if (Payload)
+ {
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned();
+ if (Value.Payload)
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what());
+ }
+ return true;
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 64u * 1024u);
+
+ for (size_t Index : RequestValueIndexes)
+ {
+ ValueRequestData& Value = Request.Values[Index];
+ if (!Value.Exists)
+ {
+ const CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
+ }
}
}
}
@@ -724,7 +830,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
{
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Response");
CbPackage ResponsePackage;
- CbObjectWriter ResponseObject;
+ CbObjectWriter ResponseObject{2048};
ResponseObject.BeginArray("Result"sv);
for (RecordRequestData& Request : Requests)
@@ -953,7 +1059,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
}
{
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
- CbObjectWriter ResponseObject;
+ CbObjectWriter ResponseObject{1024};
ResponseObject.BeginArray("Result"sv);
for (bool Value : Results)
{
@@ -1169,7 +1275,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
{
ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response");
CbPackage RpcResponse;
- CbObjectWriter ResponseObject;
+ CbObjectWriter ResponseObject{1024};
ResponseObject.BeginArray("Result"sv);
for (const RequestData& Request : Requests)
{
@@ -1371,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
+ // TODO: BatchGet records?
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
{
@@ -1687,7 +1794,7 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest
const bool AcceptsPartialChunks = EnumHasAnyFlags(AcceptOptions, RpcAcceptOptions::kAllowPartialCacheChunks);
CbPackage RpcResponse;
- CbObjectWriter Writer;
+ CbObjectWriter Writer{1024};
Writer.BeginArray("Result"sv);
for (ChunkRequest& Request : Requests)