aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/cache')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp4
-rw-r--r--src/zenstore/cache/cacherpc.cpp69
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp2
3 files changed, 70 insertions, 5 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 9f09713ee..25f68330a 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1267,6 +1267,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
size_t IndexOffset = 0;
m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
std::vector<DiskIndexEntry> DiskEntries;
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
@@ -2679,6 +2680,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
Value.Value.Size(),
m_Configuration.PayloadAlignment,
[&](const BlockStoreLocation& BlockStoreLocation) {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation");
DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
m_SlogFile.Append({.Key = HashKey, .Location = Location});
@@ -3856,7 +3858,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
ZEN_TRACE_CPU("Z$::DiscoverBuckets");
DirectoryContent DirContent;
- GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
+ GetDirectoryContent(m_RootDir, DirectoryContentFlags::IncludeDirs, DirContent);
// Initialize buckets
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index e6b6be525..cca51e63e 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -10,12 +10,13 @@
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zenhttp/packageformat.h>
#include <zenstore/cache/cacheshared.h>
#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/cache/upstreamcacheclient.h>
#include <zenstore/cidstore.h>
#include <zenutil/cache/cacherequests.h>
-#include <zenutil/packageformat.h>
+#include <zenutil/workerpools.h>
#include <zencore/memory/llm.h>
@@ -561,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
ParseValues(Request);
- Request.Complete = true;
+ Request.Complete = true;
+ size_t ValueCount = Request.Values.size();
+ std::vector<IoHash> CidHashes;
+ std::vector<size_t> RequestValueIndexes;
+ const bool DoBatch = ValueCount > 7;
+ if (DoBatch)
+ {
+ CidHashes.reserve(ValueCount);
+ RequestValueIndexes.reserve(ValueCount);
+ }
+ size_t ValueIndex = 0;
for (ValueRequestData& Value : Request.Values)
{
CachePolicy ValuePolicy = Value.DownstreamPolicy;
@@ -596,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.Complete = false;
}
}
+ else if (DoBatch)
+ {
+ CidHashes.push_back(Value.ContentId);
+ RequestValueIndexes.push_back(ValueIndex);
+ }
else
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
@@ -611,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
}
}
-
if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
NeedUpstreamAttachment = true;
@@ -619,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
Request.Complete = false;
}
+ ValueIndex++;
+ }
+ if (!RequestValueIndexes.empty())
+ {
+ m_CidStore.IterateChunks(
+ CidHashes,
+ [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
+ try
+ {
+ const size_t ValueIndex = RequestValueIndexes[Index];
+ ValueRequestData& Value = Request.Values[ValueIndex];
+ if (Payload)
+ {
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned();
+ if (Value.Payload)
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what());
+ }
+ return true;
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 64u * 1024u);
+
+ for (size_t Index : RequestValueIndexes)
+ {
+ ValueRequestData& Value = Request.Values[Index];
+ if (!Value.Exists)
+ {
+ const CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
+ }
}
}
}
@@ -1415,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
+ // TODO: BatchGet records?
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
{
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index c14ea73a8..133cb42d7 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -424,7 +424,7 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc,
ZEN_INFO("initializing cache store at '{}'", m_BasePath);
DirectoryContent DirContent;
- GetDirectoryContent(m_BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+ GetDirectoryContent(m_BasePath, DirectoryContentFlags::IncludeDirs, DirContent);
std::vector<std::string> Namespaces;
for (const std::filesystem::path& DirPath : DirContent.Directories)