diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-26 11:43:37 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-26 11:43:37 +0200 |
| commit | fb137cf9c8b7a9d1659b03472c9591c4863e9173 (patch) | |
| tree | bbd0df09d4b425ef668d22f3b12ea2cb3482bf66 /src/zenstore/cache/cacherpc.cpp | |
| parent | Merge pull request #139 from ue-foundation/de/zen-service-command (diff) | |
| download | zen-fb137cf9c8b7a9d1659b03472c9591c4863e9173.tar.xz zen-fb137cf9c8b7a9d1659b03472c9591c4863e9173.zip | |
revert multi-cid store (#475)
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 54 |
1 files changed, 17 insertions, 37 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index ff21d1ede..5d9a68919 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -153,13 +153,13 @@ CacheRpcHandler::CacheRpcHandler(LoggerRef InLog, CacheStats& InCacheStats, UpstreamCacheClient& InUpstreamCache, ZenCacheStore& InCacheStore, - GetCidStoreFunc&& InGetCidStore, + CidStore& InCidStore, const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(InLog) , m_CacheStats(InCacheStats) , m_UpstreamCache(InUpstreamCache) , m_CacheStore(InCacheStore) -, m_GetCidStore(std::move(InGetCidStore)) +, m_CidStore(InCidStore) , m_DiskWriteBlocker(InDiskWriteBlocker) { } @@ -174,12 +174,6 @@ CacheRpcHandler::AreDiskWritesAllowed() const return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } -CidStore& -CacheRpcHandler::GetCidStore(std::string_view Namespace) -{ - return m_GetCidStore(Namespace); -} - CacheRpcHandler::RpcResponseCode CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, std::string_view UriNamespace, @@ -387,12 +381,9 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Stopwatch Timer; - CidStore& ChunkStore = m_GetCidStore(Request.Namespace); - Request.RecordObject.IterateAttachments([this, &Request, Package, - &ChunkStore, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, @@ -421,7 +412,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Count.Invalid++; } } - else if (ChunkStore.ContainsChunk(ValueHash)) + else if (m_CidStore.ContainsChunk(ValueHash)) { ValidAttachments.emplace_back(ValueHash); Count.Valid++; @@ -457,7 +448,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (!WriteAttachmentBuffers.empty()) { - std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < InsertResults.size(); Index++) { if (InsertResults[Index].New) @@ -484,12 +475,10 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, - .Namespace = Request.Namespace, - .Key = Request.Key, - .ValueContentIds = std::move(ValidAttachments)}, - [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Namespace = Request.Namespace, + .Key = Request.Key, + .ValueContentIds = std::move(ValidAttachments)}); } return PutStatus::Success; } @@ -532,8 +521,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return CbPackage{}; } - CidStore& ChunkStore = m_GetCidStore(Namespace.value()); - const bool HasUpstream = m_UpstreamCache.IsActive(); eastl::fixed_vector<RecordRequestData, 16> Requests; @@ -633,7 +620,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { - if (ChunkStore.ContainsChunk(Value.ContentId)) + if (m_CidStore.ContainsChunk(Value.ContentId)) { Value.Exists = true; } @@ -654,7 +641,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else { - if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Value.ContentId)) + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) { if (Chunk.GetSize() > 0) { @@ -678,7 +665,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } if (!RequestValueIndexes.empty()) { - ChunkStore.IterateChunks( + m_CidStore.IterateChunks( CidHashes, [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { try @@ -771,7 +758,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } } - const auto OnCacheRecordGetComplete = [this, Namespace, &ChunkStore, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -840,7 +827,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Value.Exists = true; if (StoreLocal) { - ChunkStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { @@ -957,8 +944,6 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - CidStore& ChunkStore = m_GetCidStore(Namespace.value()); - std::vector<ZenCacheStore::PutResult> BatchResults; eastl::fixed_vector<size_t, 32> BatchResultIndexes; eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results; @@ -1109,8 +1094,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}, - [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); + {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); } } { @@ -1562,8 +1546,6 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); - CidStore& ChunkStore = m_GetCidStore(Namespace); - // TODO: BatchGet records? std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) @@ -1702,12 +1684,12 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) { - if (ChunkStore.ContainsChunk(Request->Key->ChunkId)) + if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { Request->Exists = true; } } - else if (IoBuffer Payload = ChunkStore.FindChunkByCid(Request->Key->ChunkId)) + else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { @@ -1840,8 +1822,6 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, return; } - CidStore& ChunkStore = m_GetCidStore(Namespace); - CacheChunkRequest& Key = Params.Request; size_t RequestIndex = std::distance(RequestKeys.data(), &Key); ChunkRequest& Request = Requests[RequestIndex]; @@ -1861,7 +1841,7 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { - ChunkStore.AddChunk(Params.Value, Params.RawHash); + m_CidStore.AddChunk(Params.Value, Params.RawHash); } else { |