aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cacherpc.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-08-26 11:43:37 +0200
committerGitHub Enterprise <[email protected]>2025-08-26 11:43:37 +0200
commitfb137cf9c8b7a9d1659b03472c9591c4863e9173 (patch)
treebbd0df09d4b425ef668d22f3b12ea2cb3482bf66 /src/zenstore/cache/cacherpc.cpp
parentMerge pull request #139 from ue-foundation/de/zen-service-command (diff)
downloadzen-fb137cf9c8b7a9d1659b03472c9591c4863e9173.tar.xz
zen-fb137cf9c8b7a9d1659b03472c9591c4863e9173.zip
revert multi-cid store (#475)
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
-rw-r--r--src/zenstore/cache/cacherpc.cpp54
1 files changed, 17 insertions, 37 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index ff21d1ede..5d9a68919 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -153,13 +153,13 @@ CacheRpcHandler::CacheRpcHandler(LoggerRef InLog,
CacheStats& InCacheStats,
UpstreamCacheClient& InUpstreamCache,
ZenCacheStore& InCacheStore,
- GetCidStoreFunc&& InGetCidStore,
+ CidStore& InCidStore,
const DiskWriteBlocker* InDiskWriteBlocker)
: m_Log(InLog)
, m_CacheStats(InCacheStats)
, m_UpstreamCache(InUpstreamCache)
, m_CacheStore(InCacheStore)
-, m_GetCidStore(std::move(InGetCidStore))
+, m_CidStore(InCidStore)
, m_DiskWriteBlocker(InDiskWriteBlocker)
{
}
@@ -174,12 +174,6 @@ CacheRpcHandler::AreDiskWritesAllowed() const
return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
}
-CidStore&
-CacheRpcHandler::GetCidStore(std::string_view Namespace)
-{
- return m_GetCidStore(Namespace);
-}
-
CacheRpcHandler::RpcResponseCode
CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
std::string_view UriNamespace,
@@ -387,12 +381,9 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Stopwatch Timer;
- CidStore& ChunkStore = m_GetCidStore(Request.Namespace);
-
Request.RecordObject.IterateAttachments([this,
&Request,
Package,
- &ChunkStore,
&WriteAttachmentBuffers,
&WriteRawHashes,
&ValidAttachments,
@@ -421,7 +412,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Count.Invalid++;
}
}
- else if (ChunkStore.ContainsChunk(ValueHash))
+ else if (m_CidStore.ContainsChunk(ValueHash))
{
ValidAttachments.emplace_back(ValueHash);
Count.Valid++;
@@ -457,7 +448,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (!WriteAttachmentBuffers.empty())
{
- std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
for (size_t Index = 0; Index < InsertResults.size(); Index++)
{
if (InsertResults[Index].New)
@@ -484,12 +475,10 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbPackage,
- .Namespace = Request.Namespace,
- .Key = Request.Key,
- .ValueContentIds = std::move(ValidAttachments)},
- [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Namespace = Request.Namespace,
+ .Key = Request.Key,
+ .ValueContentIds = std::move(ValidAttachments)});
}
return PutStatus::Success;
}
@@ -532,8 +521,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
return CbPackage{};
}
- CidStore& ChunkStore = m_GetCidStore(Namespace.value());
-
const bool HasUpstream = m_UpstreamCache.IsActive();
eastl::fixed_vector<RecordRequestData, 16> Requests;
@@ -633,7 +620,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
- if (ChunkStore.ContainsChunk(Value.ContentId))
+ if (m_CidStore.ContainsChunk(Value.ContentId))
{
Value.Exists = true;
}
@@ -654,7 +641,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
else
{
- if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Value.ContentId))
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
{
if (Chunk.GetSize() > 0)
{
@@ -678,7 +665,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
if (!RequestValueIndexes.empty())
{
- ChunkStore.IterateChunks(
+ m_CidStore.IterateChunks(
CidHashes,
[this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
try
@@ -771,7 +758,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
}
- const auto OnCacheRecordGetComplete = [this, Namespace, &ChunkStore, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -840,7 +827,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Value.Exists = true;
if (StoreLocal)
{
- ChunkStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
@@ -957,8 +944,6 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- CidStore& ChunkStore = m_GetCidStore(Namespace.value());
-
std::vector<ZenCacheStore::PutResult> BatchResults;
eastl::fixed_vector<size_t, 32> BatchResultIndexes;
eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
@@ -1109,8 +1094,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]},
- [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
+ {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
}
}
{
@@ -1562,8 +1546,6 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
- CidStore& ChunkStore = m_GetCidStore(Namespace);
-
// TODO: BatchGet records?
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
@@ -1702,12 +1684,12 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
{
- if (ChunkStore.ContainsChunk(Request->Key->ChunkId))
+ if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
{
Request->Exists = true;
}
}
- else if (IoBuffer Payload = ChunkStore.FindChunkByCid(Request->Key->ChunkId))
+ else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
@@ -1840,8 +1822,6 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
return;
}
- CidStore& ChunkStore = m_GetCidStore(Namespace);
-
CacheChunkRequest& Key = Params.Request;
size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
ChunkRequest& Request = Requests[RequestIndex];
@@ -1861,7 +1841,7 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal);
if (Request.IsRecordRequest)
{
- ChunkStore.AddChunk(Params.Value, Params.RawHash);
+ m_CidStore.AddChunk(Params.Value, Params.RawHash);
}
else
{