aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-08-20 12:33:03 +0200
committerGitHub Enterprise <[email protected]>2025-08-20 12:33:03 +0200
commit4c05d1041461b630cd5770dae5e8d03147d5674b (patch)
tree3f5d6b1b4b2b3f167f94e98f902a5f60c2e3d753 /src/zenstore
parentzen print fixes/improvements (#469) (diff)
downloadzen-4c05d1041461b630cd5770dae5e8d03147d5674b.tar.xz
zen-4c05d1041461b630cd5770dae5e8d03147d5674b.zip
per namespace/project cas prep refactor (#470)
- Refactor so we can have more than one cas store for project store and cache. - Refactor `UpstreamCacheClient` so it is not tied to a specific CidStore - Refactor scrub to keep the GC interface ScrubStorage function separate from scrub accessor functions (renamed to Scrub). - Refactor storage size to keep GC interface StorageSize function separate from size accessor functions (renamed to TotalSize) - Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements GcStorage interface rather than `ZenCacheNamespace`
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp28
-rw-r--r--src/zenstore/cache/cacherpc.cpp54
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp68
-rw-r--r--src/zenstore/cas.cpp18
-rw-r--r--src/zenstore/cas.h5
-rw-r--r--src/zenstore/cidstore.cpp20
-rw-r--r--src/zenstore/compactcas.h2
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h73
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h8
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h18
-rw-r--r--src/zenstore/include/zenstore/cache/upstreamcacheclient.h2
-rw-r--r--src/zenstore/include/zenstore/cidstore.h5
12 files changed, 165 insertions, 136 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 219caca01..cacbbd966 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -798,6 +798,7 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize);
}
m_Gc.AddGcReferencer(*this);
+ m_Gc.AddGcStorage(this);
}
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
@@ -812,6 +813,7 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
ZEN_ERROR("~CacheBucket() failed with: ", Ex.what());
}
+ m_Gc.RemoveGcStorage(this);
m_Gc.RemoveGcReferencer(*this);
}
@@ -2587,7 +2589,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
ZenCacheDiskLayer::BucketStats
ZenCacheDiskLayer::CacheBucket::Stats()
{
- GcStorageSize Size = StorageSize();
+ CacheStoreSize Size = TotalSize();
return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize,
.MemorySize = Size.MemorySize,
.DiskHitCount = m_DiskHitCount,
@@ -4417,8 +4419,9 @@ ZenCacheDiskLayer::Flush()
}
}
+#if ZEN_WITH_TESTS
void
-ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
+ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
{
ZEN_TRACE_CPU("Z$::ScrubStorage");
@@ -4429,13 +4432,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
for (auto& Kv : m_Buckets)
{
-#if 1
+# if 1
Results.push_back(Ctx.ThreadPool().EnqueueTask(
std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
-#else
+# else
CacheBucket& Bucket = *Kv.second;
Bucket.ScrubStorage(Ctx);
-#endif
+# endif
}
for (auto& Result : Results)
@@ -4451,16 +4454,17 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
}
}
}
+#endif // ZEN_WITH_TESTS
-GcStorageSize
-ZenCacheDiskLayer::StorageSize() const
+CacheStoreSize
+ZenCacheDiskLayer::TotalSize() const
{
- GcStorageSize StorageSize{};
+ CacheStoreSize StorageSize{};
RwLock::SharedLockScope _(m_Lock);
for (auto& Kv : m_Buckets)
{
- GcStorageSize BucketSize = Kv.second->StorageSize();
+ CacheStoreSize BucketSize = Kv.second->TotalSize();
StorageSize.DiskSize += BucketSize.DiskSize;
StorageSize.MemorySize += BucketSize.MemorySize;
}
@@ -4471,7 +4475,7 @@ ZenCacheDiskLayer::StorageSize() const
ZenCacheDiskLayer::DiskStats
ZenCacheDiskLayer::Stats() const
{
- GcStorageSize Size = StorageSize();
+ CacheStoreSize Size = TotalSize();
ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
{
RwLock::SharedLockScope _(m_Lock);
@@ -4495,7 +4499,7 @@ ZenCacheDiskLayer::GetInfo() const
{
Info.BucketNames.push_back(Kv.first);
Info.EntryCount += Kv.second->EntryCount();
- GcStorageSize BucketSize = Kv.second->StorageSize();
+ CacheStoreSize BucketSize = Kv.second->TotalSize();
Info.StorageSize.DiskSize += BucketSize.DiskSize;
Info.StorageSize.MemorySize += BucketSize.MemorySize;
}
@@ -4510,7 +4514,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
{
- return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()};
+ return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->TotalSize()};
}
return {};
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 5d9a68919..ff21d1ede 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -153,13 +153,13 @@ CacheRpcHandler::CacheRpcHandler(LoggerRef InLog,
CacheStats& InCacheStats,
UpstreamCacheClient& InUpstreamCache,
ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
+ GetCidStoreFunc&& InGetCidStore,
const DiskWriteBlocker* InDiskWriteBlocker)
: m_Log(InLog)
, m_CacheStats(InCacheStats)
, m_UpstreamCache(InUpstreamCache)
, m_CacheStore(InCacheStore)
-, m_CidStore(InCidStore)
+, m_GetCidStore(std::move(InGetCidStore))
, m_DiskWriteBlocker(InDiskWriteBlocker)
{
}
@@ -174,6 +174,12 @@ CacheRpcHandler::AreDiskWritesAllowed() const
return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
}
+CidStore&
+CacheRpcHandler::GetCidStore(std::string_view Namespace)
+{
+ return m_GetCidStore(Namespace);
+}
+
CacheRpcHandler::RpcResponseCode
CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
std::string_view UriNamespace,
@@ -381,9 +387,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Stopwatch Timer;
+ CidStore& ChunkStore = m_GetCidStore(Request.Namespace);
+
Request.RecordObject.IterateAttachments([this,
&Request,
Package,
+ &ChunkStore,
&WriteAttachmentBuffers,
&WriteRawHashes,
&ValidAttachments,
@@ -412,7 +421,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Count.Invalid++;
}
}
- else if (m_CidStore.ContainsChunk(ValueHash))
+ else if (ChunkStore.ContainsChunk(ValueHash))
{
ValidAttachments.emplace_back(ValueHash);
Count.Valid++;
@@ -448,7 +457,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (!WriteAttachmentBuffers.empty())
{
- std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
for (size_t Index = 0; Index < InsertResults.size(); Index++)
{
if (InsertResults[Index].New)
@@ -475,10 +484,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Request.Namespace,
- .Key = Request.Key,
- .ValueContentIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbPackage,
+ .Namespace = Request.Namespace,
+ .Key = Request.Key,
+ .ValueContentIds = std::move(ValidAttachments)},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
return PutStatus::Success;
}
@@ -521,6 +532,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
return CbPackage{};
}
+ CidStore& ChunkStore = m_GetCidStore(Namespace.value());
+
const bool HasUpstream = m_UpstreamCache.IsActive();
eastl::fixed_vector<RecordRequestData, 16> Requests;
@@ -620,7 +633,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
- if (m_CidStore.ContainsChunk(Value.ContentId))
+ if (ChunkStore.ContainsChunk(Value.ContentId))
{
Value.Exists = true;
}
@@ -641,7 +654,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
else
{
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
+ if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Value.ContentId))
{
if (Chunk.GetSize() > 0)
{
@@ -665,7 +678,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
if (!RequestValueIndexes.empty())
{
- m_CidStore.IterateChunks(
+ ChunkStore.IterateChunks(
CidHashes,
[this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
try
@@ -758,7 +771,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
}
- const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, Namespace, &ChunkStore, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -827,7 +840,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Value.Exists = true;
if (StoreLocal)
{
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
+ ChunkStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
@@ -944,6 +957,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ CidStore& ChunkStore = m_GetCidStore(Namespace.value());
+
std::vector<ZenCacheStore::PutResult> BatchResults;
eastl::fixed_vector<size_t, 32> BatchResultIndexes;
eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
@@ -1094,7 +1109,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
+ {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
}
{
@@ -1546,6 +1562,8 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
+ CidStore& ChunkStore = m_GetCidStore(Namespace);
+
// TODO: BatchGet records?
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
@@ -1684,12 +1702,12 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
{
- if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
+ if (ChunkStore.ContainsChunk(Request->Key->ChunkId))
{
Request->Exists = true;
}
}
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
+ else if (IoBuffer Payload = ChunkStore.FindChunkByCid(Request->Key->ChunkId))
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
@@ -1822,6 +1840,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
return;
}
+ CidStore& ChunkStore = m_GetCidStore(Namespace);
+
CacheChunkRequest& Key = Params.Request;
size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
ChunkRequest& Request = Requests[RequestIndex];
@@ -1841,7 +1861,7 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal);
if (Request.IsRecordRequest)
{
- m_CidStore.AddChunk(Params.Value, Params.RawHash);
+ ChunkStore.AddChunk(Params.Value, Params.RawHash);
}
else
{
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 973af52b2..1f2d6c37f 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -139,13 +139,10 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st
CreateDirectories(m_RootDir);
m_DiskLayer.DiscoverBuckets();
-
- m_Gc.AddGcStorage(this);
}
ZenCacheNamespace::~ZenCacheNamespace()
{
- m_Gc.RemoveGcStorage(this);
}
struct ZenCacheNamespace::PutBatchHandle
@@ -302,7 +299,6 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view
std::function<void()>
ZenCacheNamespace::Drop()
{
- m_Gc.RemoveGcStorage(this);
return m_DiskLayer.Drop();
}
@@ -312,25 +308,19 @@ ZenCacheNamespace::Flush()
m_DiskLayer.Flush();
}
+#if ZEN_WITH_TESTS
void
-ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
+ZenCacheNamespace::Scrub(ScrubContext& Ctx)
{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
ZEN_INFO("scrubbing '{}'", m_RootDir);
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_DiskLayer.ScrubStorage(Ctx);
+ m_DiskLayer.Scrub(Ctx);
}
+#endif // ZEN_WITH_TESTS
-GcStorageSize
-ZenCacheNamespace::StorageSize() const
+CacheStoreSize
+ZenCacheNamespace::TotalSize() const
{
- return m_DiskLayer.StorageSize();
+ return m_DiskLayer.TotalSize();
}
ZenCacheNamespace::Info
@@ -836,11 +826,13 @@ ZenCacheStore::Flush()
IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
}
+#if ZEN_WITH_TESTS
void
-ZenCacheStore::ScrubStorage(ScrubContext& Ctx)
+ZenCacheStore::Scrub(ScrubContext& Ctx)
{
- IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); });
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); });
}
+#endif // ZEN_WITH_TESTS
CacheValueDetails
ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter,
@@ -965,12 +957,12 @@ ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Names
}
}
-GcStorageSize
-ZenCacheStore::StorageSize() const
+CacheStoreSize
+ZenCacheStore::TotalSize() const
{
- GcStorageSize Size;
+ CacheStoreSize Size;
IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
- GcStorageSize StoreSize = Store.StorageSize();
+ CacheStoreSize StoreSize = Store.TotalSize();
Size.MemorySize += StoreSize.MemorySize;
Size.DiskSize += StoreSize.DiskSize;
});
@@ -1040,7 +1032,7 @@ ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig)
ZenCacheStore::Info
ZenCacheStore::GetInfo() const
{
- ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()};
+ ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = TotalSize()};
IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) {
Info.NamespaceNames.push_back(std::string(NamespaceName));
@@ -1428,7 +1420,7 @@ TEST_CASE("cachestore.size")
const size_t Count = 16;
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
+ CacheStoreSize CacheSize;
{
GcManager Gc;
@@ -1449,7 +1441,7 @@ TEST_CASE("cachestore.size")
Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false);
Keys.push_back({BucketName, Hash});
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_EQ(0, CacheSize.MemorySize);
@@ -1459,7 +1451,7 @@ TEST_CASE("cachestore.size")
Zcs.Get(Key.first, Key.second, _);
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
}
@@ -1468,7 +1460,7 @@ TEST_CASE("cachestore.size")
GcManager Gc;
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- const GcStorageSize SerializedSize = Zcs.StorageSize();
+ const CacheStoreSize SerializedSize = Zcs.TotalSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
@@ -1476,8 +1468,8 @@ TEST_CASE("cachestore.size")
{
Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
}
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- CHECK_EQ(0, Zcs.StorageSize().MemorySize);
+ CHECK_EQ(0, Zcs.TotalSize().DiskSize);
+ CHECK_EQ(0, Zcs.TotalSize().MemorySize);
}
}
@@ -1486,7 +1478,7 @@ TEST_CASE("cachestore.size")
const size_t Count = 16;
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
+ CacheStoreSize CacheSize;
{
GcManager Gc;
@@ -1503,7 +1495,7 @@ TEST_CASE("cachestore.size")
Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false);
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_EQ(0, CacheSize.MemorySize);
}
@@ -1512,7 +1504,7 @@ TEST_CASE("cachestore.size")
GcManager Gc;
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- const GcStorageSize SerializedSize = Zcs.StorageSize();
+ const CacheStoreSize SerializedSize = Zcs.TotalSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
@@ -1520,7 +1512,7 @@ TEST_CASE("cachestore.size")
{
Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
}
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ CHECK_EQ(0, Zcs.TotalSize().DiskSize);
}
}
}
@@ -1613,7 +1605,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
GcChunkHashes.swap(RemainingChunkHashes);
};
- const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ const uint64_t TotalSize = Zcs.TotalSize().DiskSize;
CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
{
@@ -1971,8 +1963,6 @@ TEST_CASE("cachestore.blocked.disklayer.put")
{
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
-
const auto CreateCacheValue = [](size_t Size) -> CbObject {
std::vector<uint8_t> Buf;
Buf.resize(Size, Size & 0xff);
@@ -2108,8 +2098,8 @@ TEST_CASE("cachestore.scrub")
WorkerThreadPool ThreadPool{1};
ScrubContext ScrubCtx{ThreadPool};
- Zcs.ScrubStorage(ScrubCtx);
- CidStore.ScrubStorage(ScrubCtx);
+ Zcs.Scrub(ScrubCtx);
+ CidStore.Scrub(ScrubCtx);
CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
CHECK(ScrubCtx.BadCids().GetSize() == 0);
}
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 5879a6742..6b89beb3d 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -73,9 +73,12 @@ public:
WorkerThreadPool* OptionalWorkerPool,
uint64_t LargeSizeLimit) override;
virtual void Flush() override;
- virtual void ScrubStorage(ScrubContext& Ctx) override;
virtual CidStoreSize TotalSize() const override;
+#if ZEN_WITH_TESTS
+ virtual void Scrub(ScrubContext& Ctx) override;
+#endif // ZEN_WITH_TESTS
+
private:
CasContainerStrategy m_TinyStrategy;
CasContainerStrategy m_SmallStrategy;
@@ -463,24 +466,19 @@ CasImpl::Flush()
m_LargeStrategy.Flush();
}
+#if ZEN_WITH_TESTS
void
-CasImpl::ScrubStorage(ScrubContext& Ctx)
+CasImpl::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetCasTag());
ZEN_TRACE_CPU("Cas::ScrubStorage");
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
m_SmallStrategy.ScrubStorage(Ctx);
m_TinyStrategy.ScrubStorage(Ctx);
m_LargeStrategy.ScrubStorage(Ctx);
}
+#endif // ZEN_WITH_TESTS
CidStoreSize
CasImpl::TotalSize() const
@@ -523,7 +521,7 @@ TEST_CASE("CasStore")
WorkerThreadPool ThreadPool{1};
ScrubContext Ctx{ThreadPool};
- Store->ScrubStorage(Ctx);
+ Store->Scrub(Ctx);
IoBuffer Value1{16};
memcpy(Value1.MutableData(), "1234567890123456", 16);
diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h
index e279dd2cc..0f6e2ba9d 100644
--- a/src/zenstore/cas.h
+++ b/src/zenstore/cas.h
@@ -50,12 +50,13 @@ public:
WorkerThreadPool* OptionalWorkerPool,
uint64_t LargeSizeLimit) = 0;
virtual void Flush() = 0;
- virtual void ScrubStorage(ScrubContext& Ctx) = 0;
virtual CidStoreSize TotalSize() const = 0;
+#if ZEN_WITH_TESTS
+ virtual void Scrub(ScrubContext& Ctx) = 0;
+#endif // ZEN_WITH_TESTS
protected:
CidStoreConfiguration m_Config;
- uint64_t m_LastScrubTime = 0;
};
ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc);
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index 2ab769d04..ae1b59dc0 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -127,17 +127,9 @@ struct CidStore::Impl
void Flush() { m_CasStore.Flush(); }
- void ScrubStorage(ScrubContext& Ctx)
- {
- if (Ctx.ScrubTimestamp() == m_LastScrubTime)
- {
- return;
- }
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_CasStore.ScrubStorage(Ctx);
- }
+#if ZEN_WITH_TESTS
+ void Scrub(ScrubContext& Ctx) { m_CasStore.Scrub(Ctx); }
+#endif // ZEN_WITH_TESTS
CidStoreStats Stats()
{
@@ -236,11 +228,13 @@ CidStore::Flush()
m_Impl->Flush();
}
+#if ZEN_WITH_TESTS
void
-CidStore::ScrubStorage(ScrubContext& Ctx)
+CidStore::Scrub(ScrubContext& Ctx)
{
- m_Impl->ScrubStorage(Ctx);
+ m_Impl->Scrub(Ctx);
}
+#endif // ZEN_WITH_TESTS
CidStoreSize
CidStore::TotalSize() const
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
index 15e4cbf81..32c256a42 100644
--- a/src/zenstore/compactcas.h
+++ b/src/zenstore/compactcas.h
@@ -68,10 +68,10 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore
void Flush();
// GcStorage
-
virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
virtual GcStorageSize StorageSize() const override;
+ // GcReferenceStore
virtual std::string GetGcName(GcCtx& Ctx) override;
virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override;
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index ae5b2014d..49c52f847 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -106,6 +106,12 @@ static_assert(sizeof(DiskIndexEntry) == 32);
//////////////////////////////////////////////////////////////////////////
+struct CacheStoreSize
+{
+ uint64_t DiskSize = 0;
+ uint64_t MemorySize = 0;
+};
+
class ZenCacheDiskLayer
{
public:
@@ -131,8 +137,8 @@ public:
struct BucketInfo
{
- uint64_t EntryCount = 0;
- GcStorageSize StorageSize;
+ uint64_t EntryCount = 0;
+ CacheStoreSize StorageSize;
};
struct Info
@@ -141,7 +147,7 @@ public:
Configuration Config;
std::vector<std::string> BucketNames;
uint64_t EntryCount = 0;
- GcStorageSize StorageSize;
+ CacheStoreSize StorageSize;
};
struct BucketStats
@@ -199,11 +205,10 @@ public:
std::function<void()> Drop();
std::function<void()> DropBucket(std::string_view Bucket);
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
- void DiscoverBuckets();
- GcStorageSize StorageSize() const;
- DiskStats Stats() const;
+ void DiscoverBuckets();
+ CacheStoreSize TotalSize() const;
+ DiskStats Stats() const;
Info GetInfo() const;
std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
@@ -220,6 +225,7 @@ public:
#if ZEN_WITH_TESTS
void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
+ void Scrub(ScrubContext& Ctx);
#endif // ZEN_WITH_TESTS
bool GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const;
@@ -227,7 +233,7 @@ public:
/** A cache bucket manages a single directory containing
metadata and data for that bucket
*/
- struct CacheBucket : public GcReferencer
+ struct CacheBucket : public GcReferencer, public GcStorage
{
CacheBucket(GcManager& Gc,
std::atomic_uint64_t& OuterCacheMemoryUsage,
@@ -244,17 +250,22 @@ public:
void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult);
- void EndPutBatch(PutBatchHandle* Batch) noexcept;
- PutResult Put(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- bool Overwrite,
- PutBatchHandle* OptionalBatchHandle);
- uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
- std::function<void()> Drop();
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
+ PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult);
+ void EndPutBatch(PutBatchHandle* Batch) noexcept;
+ PutResult Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle);
+ uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
+ std::function<void()> Drop();
+ void Flush();
+ inline CacheStoreSize TotalSize() const
+ {
+ return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(),
+ .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)};
+ }
+
RwLock::SharedLockScope GetGcReferencerLock();
struct ReferencesStats
@@ -277,11 +288,6 @@ public:
std::span<const std::size_t> ChunkIndexes,
std::vector<IoHash>& OutReferences) const;
- inline GcStorageSize StorageSize() const
- {
- return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(),
- .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)};
- }
uint64_t EntryCount() const;
BucketStats Stats();
@@ -293,6 +299,20 @@ public:
void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time);
#endif // ZEN_WITH_TESTS
+ // GcReferencer
+ virtual std::string GetGcName(GcCtx& Ctx) override;
+ virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
+ virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
+ virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override;
+
+ // GcStorage
+ virtual void ScrubStorage(ScrubContext& Ctx) override;
+ virtual GcStorageSize StorageSize() const override
+ {
+ CacheStoreSize Size = TotalSize();
+ return {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
+ }
+
private:
#pragma pack(push)
#pragma pack(1)
@@ -391,11 +411,6 @@ public:
std::atomic_uint64_t m_StandaloneSize{};
std::atomic_uint64_t m_MemCachedSize{};
- virtual std::string GetGcName(GcCtx& Ctx) override;
- virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
- virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
- virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override;
-
void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
bool ShouldRejectPut(const IoHash& HashKey, ZenCacheValue& InOutValue, bool Overwrite, ZenCacheDiskLayer::PutResult& OutPutResult);
void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
index 104746aba..80340d72c 100644
--- a/src/zenstore/include/zenstore/cache/cacherpc.h
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -70,11 +70,13 @@ IsCompressedBinary(ZenContentType Type)
struct CacheRpcHandler
{
+ typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc;
+
CacheRpcHandler(LoggerRef InLog,
CacheStats& InCacheStats,
UpstreamCacheClient& InUpstreamCache,
ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
+ GetCidStoreFunc&& InGetCidStore,
const DiskWriteBlocker* InDiskWriteBlocker);
~CacheRpcHandler();
@@ -94,6 +96,8 @@ struct CacheRpcHandler
int& OutTargetProcessId,
CbPackage& OutPackage);
+ CidStore& GetCidStore(std::string_view Namespace);
+
private:
CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
@@ -142,7 +146,7 @@ private:
CacheStats& m_CacheStats;
UpstreamCacheClient& m_UpstreamCache;
ZenCacheStore& m_CacheStore;
- CidStore& m_CidStore;
+ GetCidStoreFunc m_GetCidStore;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
bool AreDiskWritesAllowed() const;
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index b6e8e7565..c51d7312c 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -49,7 +49,7 @@ class JobQueue;
*/
-class ZenCacheNamespace final : public GcStorage
+class ZenCacheNamespace final
{
public:
struct Configuration
@@ -107,10 +107,7 @@ public:
std::function<void()> Drop();
void Flush();
- // GcStorage
- virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
- virtual GcStorageSize StorageSize() const override;
-
+ CacheStoreSize TotalSize() const;
Configuration GetConfig() const { return m_Configuration; }
Info GetInfo() const;
std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
@@ -127,6 +124,7 @@ public:
#if ZEN_WITH_TESTS
void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
+ void Scrub(ScrubContext& ScrubCtx);
#endif // ZEN_WITH_TESTS
private:
@@ -140,7 +138,6 @@ private:
std::atomic<uint64_t> m_WriteCount{};
metrics::RequestStats m_PutOps;
metrics::RequestStats m_GetOps;
- uint64_t m_LastScrubTime = 0;
ZenCacheNamespace(const ZenCacheNamespace&) = delete;
ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete;
@@ -178,7 +175,7 @@ public:
Configuration Config;
std::vector<std::string> NamespaceNames;
uint64_t DiskEntryCount = 0;
- GcStorageSize StorageSize;
+ CacheStoreSize StorageSize;
};
struct NamedNamespaceStats
@@ -260,13 +257,12 @@ public:
bool DropBucket(std::string_view Namespace, std::string_view Bucket);
bool DropNamespace(std::string_view Namespace);
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter,
const std::string_view BucketFilter,
const std::string_view ValueFilter) const;
- GcStorageSize StorageSize() const;
+ CacheStoreSize TotalSize() const;
CacheStoreStats Stats(bool IncludeNamespaceStats = true);
Configuration GetConfiguration() const { return m_Configuration; }
@@ -296,6 +292,10 @@ public:
bool GetContentStats(std::string_view Namespace, std::string_view BucketName, CacheContentStats& OutContentStats) const;
+#if ZEN_WITH_TESTS
+ void Scrub(ScrubContext& Ctx);
+#endif // ZEN_WITH_TESTS
+
private:
const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const;
ZenCacheNamespace* GetNamespace(std::string_view Namespace);
diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
index 152031c3a..c3993c028 100644
--- a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
+++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
@@ -113,7 +113,7 @@ public:
std::span<CacheChunkRequest*> CacheChunkRequests,
OnCacheChunksGetComplete&& OnComplete) = 0;
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) = 0;
};
} // namespace zen
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index b3d00fec0..8918b119f 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -87,12 +87,15 @@ public:
bool ContainsChunk(const IoHash& DecompressedId);
void FilterChunks(HashKeySet& InOutChunks);
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
CidStoreSize TotalSize() const;
CidStoreStats Stats() const;
virtual void ReportMetrics(StatsMetrics& Statsd) override;
+#if ZEN_WITH_TESTS
+ void Scrub(ScrubContext& Ctx);
+#endif // ZEN_WITH_TESTS
+
private:
struct Impl;
std::unique_ptr<CasStore> m_CasStore;