diff options
Diffstat (limited to 'src/zenstore/cache')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 69 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 2 |
3 files changed, 70 insertions, 5 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 9f09713ee..25f68330a 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1267,6 +1267,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector<DiskIndexEntry> DiskEntries; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); @@ -2679,6 +2680,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, Value.Value.Size(), m_Configuration.PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { + ZEN_MEMSCOPE(GetCacheDiskTag()); ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation"); DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); m_SlogFile.Append({.Key = HashKey, .Location = Location}); @@ -3856,7 +3858,7 @@ ZenCacheDiskLayer::DiscoverBuckets() ZEN_TRACE_CPU("Z$::DiscoverBuckets"); DirectoryContent DirContent; - GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); + GetDirectoryContent(m_RootDir, DirectoryContentFlags::IncludeDirs, DirContent); // Initialize buckets diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index e6b6be525..cca51e63e 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -10,12 +10,13 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zenhttp/packageformat.h> #include <zenstore/cache/cacheshared.h> #include <zenstore/cache/structuredcachestore.h> #include <zenstore/cache/upstreamcacheclient.h> #include <zenstore/cidstore.h> #include <zenutil/cache/cacherequests.h> -#include <zenutil/packageformat.h> +#include <zenutil/workerpools.h> #include <zencore/memory/llm.h> @@ -561,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); ParseValues(Request); - Request.Complete = true; + Request.Complete = true; + size_t ValueCount = Request.Values.size(); + std::vector<IoHash> CidHashes; + std::vector<size_t> RequestValueIndexes; + const bool DoBatch = ValueCount > 7; + if (DoBatch) + { + CidHashes.reserve(ValueCount); + RequestValueIndexes.reserve(ValueCount); + } + size_t ValueIndex = 0; for (ValueRequestData& Value : Request.Values) { CachePolicy ValuePolicy = Value.DownstreamPolicy; @@ -596,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.Complete = false; } } + else if (DoBatch) + { + CidHashes.push_back(Value.ContentId); + RequestValueIndexes.push_back(ValueIndex); + } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) @@ -611,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); } } - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { NeedUpstreamAttachment = true; @@ -619,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } Request.Complete = false; } + ValueIndex++; + } + if (!RequestValueIndexes.empty()) + { + m_CidStore.IterateChunks( + CidHashes, + [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { + try + { + const size_t ValueIndex = RequestValueIndexes[Index]; + ValueRequestData& Value = Request.Values[ValueIndex]; + if (Payload) + { + Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned(); + if (Value.Payload) + { + Value.Exists = true; + } + else + { + ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 64u * 1024u); + + for (size_t Index : RequestValueIndexes) + { + ValueRequestData& Value = Request.Values[Index]; + if (!Value.Exists) + { + const CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + } + Request.Complete = false; + } + } } } } @@ -1415,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + // TODO: BatchGet records? std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index c14ea73a8..133cb42d7 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -424,7 +424,7 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, ZEN_INFO("initializing cache store at '{}'", m_BasePath); DirectoryContent DirContent; - GetDirectoryContent(m_BasePath, DirectoryContent::IncludeDirsFlag, DirContent); + GetDirectoryContent(m_BasePath, DirectoryContentFlags::IncludeDirs, DirContent); std::vector<std::string> Namespaces; for (const std::filesystem::path& DirPath : DirContent.Directories) |