aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/cache')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp28
-rw-r--r--src/zenstore/cache/cacherpc.cpp54
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp68
3 files changed, 82 insertions, 68 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 219caca01..cacbbd966 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -798,6 +798,7 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize);
}
m_Gc.AddGcReferencer(*this);
+ m_Gc.AddGcStorage(this);
}
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
@@ -812,6 +813,7 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
ZEN_ERROR("~CacheBucket() failed with: ", Ex.what());
}
+ m_Gc.RemoveGcStorage(this);
m_Gc.RemoveGcReferencer(*this);
}
@@ -2587,7 +2589,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
ZenCacheDiskLayer::BucketStats
ZenCacheDiskLayer::CacheBucket::Stats()
{
- GcStorageSize Size = StorageSize();
+ CacheStoreSize Size = TotalSize();
return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize,
.MemorySize = Size.MemorySize,
.DiskHitCount = m_DiskHitCount,
@@ -4417,8 +4419,9 @@ ZenCacheDiskLayer::Flush()
}
}
+#if ZEN_WITH_TESTS
void
-ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
+ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
{
ZEN_TRACE_CPU("Z$::ScrubStorage");
@@ -4429,13 +4432,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
for (auto& Kv : m_Buckets)
{
-#if 1
+# if 1
Results.push_back(Ctx.ThreadPool().EnqueueTask(
std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
-#else
+# else
CacheBucket& Bucket = *Kv.second;
Bucket.ScrubStorage(Ctx);
-#endif
+# endif
}
for (auto& Result : Results)
@@ -4451,16 +4454,17 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
}
}
}
+#endif // ZEN_WITH_TESTS
-GcStorageSize
-ZenCacheDiskLayer::StorageSize() const
+CacheStoreSize
+ZenCacheDiskLayer::TotalSize() const
{
- GcStorageSize StorageSize{};
+ CacheStoreSize StorageSize{};
RwLock::SharedLockScope _(m_Lock);
for (auto& Kv : m_Buckets)
{
- GcStorageSize BucketSize = Kv.second->StorageSize();
+ CacheStoreSize BucketSize = Kv.second->TotalSize();
StorageSize.DiskSize += BucketSize.DiskSize;
StorageSize.MemorySize += BucketSize.MemorySize;
}
@@ -4471,7 +4475,7 @@ ZenCacheDiskLayer::StorageSize() const
ZenCacheDiskLayer::DiskStats
ZenCacheDiskLayer::Stats() const
{
- GcStorageSize Size = StorageSize();
+ CacheStoreSize Size = TotalSize();
ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
{
RwLock::SharedLockScope _(m_Lock);
@@ -4495,7 +4499,7 @@ ZenCacheDiskLayer::GetInfo() const
{
Info.BucketNames.push_back(Kv.first);
Info.EntryCount += Kv.second->EntryCount();
- GcStorageSize BucketSize = Kv.second->StorageSize();
+ CacheStoreSize BucketSize = Kv.second->TotalSize();
Info.StorageSize.DiskSize += BucketSize.DiskSize;
Info.StorageSize.MemorySize += BucketSize.MemorySize;
}
@@ -4510,7 +4514,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
{
- return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()};
+ return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->TotalSize()};
}
return {};
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 5d9a68919..ff21d1ede 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -153,13 +153,13 @@ CacheRpcHandler::CacheRpcHandler(LoggerRef InLog,
CacheStats& InCacheStats,
UpstreamCacheClient& InUpstreamCache,
ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
+ GetCidStoreFunc&& InGetCidStore,
const DiskWriteBlocker* InDiskWriteBlocker)
: m_Log(InLog)
, m_CacheStats(InCacheStats)
, m_UpstreamCache(InUpstreamCache)
, m_CacheStore(InCacheStore)
-, m_CidStore(InCidStore)
+, m_GetCidStore(std::move(InGetCidStore))
, m_DiskWriteBlocker(InDiskWriteBlocker)
{
}
@@ -174,6 +174,12 @@ CacheRpcHandler::AreDiskWritesAllowed() const
return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
}
+CidStore&
+CacheRpcHandler::GetCidStore(std::string_view Namespace)
+{
+ return m_GetCidStore(Namespace);
+}
+
CacheRpcHandler::RpcResponseCode
CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
std::string_view UriNamespace,
@@ -381,9 +387,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Stopwatch Timer;
+ CidStore& ChunkStore = m_GetCidStore(Request.Namespace);
+
Request.RecordObject.IterateAttachments([this,
&Request,
Package,
+ &ChunkStore,
&WriteAttachmentBuffers,
&WriteRawHashes,
&ValidAttachments,
@@ -412,7 +421,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
Count.Invalid++;
}
}
- else if (m_CidStore.ContainsChunk(ValueHash))
+ else if (ChunkStore.ContainsChunk(ValueHash))
{
ValidAttachments.emplace_back(ValueHash);
Count.Valid++;
@@ -448,7 +457,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (!WriteAttachmentBuffers.empty())
{
- std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
for (size_t Index = 0; Index < InsertResults.size(); Index++)
{
if (InsertResults[Index].New)
@@ -475,10 +484,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Request.Namespace,
- .Key = Request.Key,
- .ValueContentIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbPackage,
+ .Namespace = Request.Namespace,
+ .Key = Request.Key,
+ .ValueContentIds = std::move(ValidAttachments)},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
return PutStatus::Success;
}
@@ -521,6 +532,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
return CbPackage{};
}
+ CidStore& ChunkStore = m_GetCidStore(Namespace.value());
+
const bool HasUpstream = m_UpstreamCache.IsActive();
eastl::fixed_vector<RecordRequestData, 16> Requests;
@@ -620,7 +633,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
- if (m_CidStore.ContainsChunk(Value.ContentId))
+ if (ChunkStore.ContainsChunk(Value.ContentId))
{
Value.Exists = true;
}
@@ -641,7 +654,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
else
{
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
+ if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Value.ContentId))
{
if (Chunk.GetSize() > 0)
{
@@ -665,7 +678,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
if (!RequestValueIndexes.empty())
{
- m_CidStore.IterateChunks(
+ ChunkStore.IterateChunks(
CidHashes,
[this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
try
@@ -758,7 +771,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
}
- const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, Namespace, &ChunkStore, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -827,7 +840,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Value.Exists = true;
if (StoreLocal)
{
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
+ ChunkStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
@@ -944,6 +957,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ CidStore& ChunkStore = m_GetCidStore(Namespace.value());
+
std::vector<ZenCacheStore::PutResult> BatchResults;
eastl::fixed_vector<size_t, 32> BatchResultIndexes;
eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
@@ -1094,7 +1109,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
+ {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
}
{
@@ -1546,6 +1562,8 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
+ CidStore& ChunkStore = m_GetCidStore(Namespace);
+
// TODO: BatchGet records?
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
@@ -1684,12 +1702,12 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
{
- if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
+ if (ChunkStore.ContainsChunk(Request->Key->ChunkId))
{
Request->Exists = true;
}
}
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
+ else if (IoBuffer Payload = ChunkStore.FindChunkByCid(Request->Key->ChunkId))
{
if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
@@ -1822,6 +1840,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
return;
}
+ CidStore& ChunkStore = m_GetCidStore(Namespace);
+
CacheChunkRequest& Key = Params.Request;
size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
ChunkRequest& Request = Requests[RequestIndex];
@@ -1841,7 +1861,7 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal);
if (Request.IsRecordRequest)
{
- m_CidStore.AddChunk(Params.Value, Params.RawHash);
+ ChunkStore.AddChunk(Params.Value, Params.RawHash);
}
else
{
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 973af52b2..1f2d6c37f 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -139,13 +139,10 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st
CreateDirectories(m_RootDir);
m_DiskLayer.DiscoverBuckets();
-
- m_Gc.AddGcStorage(this);
}
ZenCacheNamespace::~ZenCacheNamespace()
{
- m_Gc.RemoveGcStorage(this);
}
struct ZenCacheNamespace::PutBatchHandle
@@ -302,7 +299,6 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view
std::function<void()>
ZenCacheNamespace::Drop()
{
- m_Gc.RemoveGcStorage(this);
return m_DiskLayer.Drop();
}
@@ -312,25 +308,19 @@ ZenCacheNamespace::Flush()
m_DiskLayer.Flush();
}
+#if ZEN_WITH_TESTS
void
-ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
+ZenCacheNamespace::Scrub(ScrubContext& Ctx)
{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
ZEN_INFO("scrubbing '{}'", m_RootDir);
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_DiskLayer.ScrubStorage(Ctx);
+ m_DiskLayer.Scrub(Ctx);
}
+#endif // ZEN_WITH_TESTS
-GcStorageSize
-ZenCacheNamespace::StorageSize() const
+CacheStoreSize
+ZenCacheNamespace::TotalSize() const
{
- return m_DiskLayer.StorageSize();
+ return m_DiskLayer.TotalSize();
}
ZenCacheNamespace::Info
@@ -836,11 +826,13 @@ ZenCacheStore::Flush()
IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
}
+#if ZEN_WITH_TESTS
void
-ZenCacheStore::ScrubStorage(ScrubContext& Ctx)
+ZenCacheStore::Scrub(ScrubContext& Ctx)
{
- IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); });
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); });
}
+#endif // ZEN_WITH_TESTS
CacheValueDetails
ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter,
@@ -965,12 +957,12 @@ ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Names
}
}
-GcStorageSize
-ZenCacheStore::StorageSize() const
+CacheStoreSize
+ZenCacheStore::TotalSize() const
{
- GcStorageSize Size;
+ CacheStoreSize Size;
IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
- GcStorageSize StoreSize = Store.StorageSize();
+ CacheStoreSize StoreSize = Store.TotalSize();
Size.MemorySize += StoreSize.MemorySize;
Size.DiskSize += StoreSize.DiskSize;
});
@@ -1040,7 +1032,7 @@ ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig)
ZenCacheStore::Info
ZenCacheStore::GetInfo() const
{
- ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()};
+ ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = TotalSize()};
IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) {
Info.NamespaceNames.push_back(std::string(NamespaceName));
@@ -1428,7 +1420,7 @@ TEST_CASE("cachestore.size")
const size_t Count = 16;
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
+ CacheStoreSize CacheSize;
{
GcManager Gc;
@@ -1449,7 +1441,7 @@ TEST_CASE("cachestore.size")
Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false);
Keys.push_back({BucketName, Hash});
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_EQ(0, CacheSize.MemorySize);
@@ -1459,7 +1451,7 @@ TEST_CASE("cachestore.size")
Zcs.Get(Key.first, Key.second, _);
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
}
@@ -1468,7 +1460,7 @@ TEST_CASE("cachestore.size")
GcManager Gc;
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- const GcStorageSize SerializedSize = Zcs.StorageSize();
+ const CacheStoreSize SerializedSize = Zcs.TotalSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
@@ -1476,8 +1468,8 @@ TEST_CASE("cachestore.size")
{
Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
}
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- CHECK_EQ(0, Zcs.StorageSize().MemorySize);
+ CHECK_EQ(0, Zcs.TotalSize().DiskSize);
+ CHECK_EQ(0, Zcs.TotalSize().MemorySize);
}
}
@@ -1486,7 +1478,7 @@ TEST_CASE("cachestore.size")
const size_t Count = 16;
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
+ CacheStoreSize CacheSize;
{
GcManager Gc;
@@ -1503,7 +1495,7 @@ TEST_CASE("cachestore.size")
Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false);
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_EQ(0, CacheSize.MemorySize);
}
@@ -1512,7 +1504,7 @@ TEST_CASE("cachestore.size")
GcManager Gc;
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- const GcStorageSize SerializedSize = Zcs.StorageSize();
+ const CacheStoreSize SerializedSize = Zcs.TotalSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
@@ -1520,7 +1512,7 @@ TEST_CASE("cachestore.size")
{
Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
}
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ CHECK_EQ(0, Zcs.TotalSize().DiskSize);
}
}
}
@@ -1613,7 +1605,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
GcChunkHashes.swap(RemainingChunkHashes);
};
- const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ const uint64_t TotalSize = Zcs.TotalSize().DiskSize;
CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
{
@@ -1971,8 +1963,6 @@ TEST_CASE("cachestore.blocked.disklayer.put")
{
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
-
const auto CreateCacheValue = [](size_t Size) -> CbObject {
std::vector<uint8_t> Buf;
Buf.resize(Size, Size & 0xff);
@@ -2108,8 +2098,8 @@ TEST_CASE("cachestore.scrub")
WorkerThreadPool ThreadPool{1};
ScrubContext ScrubCtx{ThreadPool};
- Zcs.ScrubStorage(ScrubCtx);
- CidStore.ScrubStorage(ScrubCtx);
+ Zcs.Scrub(ScrubCtx);
+ CidStore.Scrub(ScrubCtx);
CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
CHECK(ScrubCtx.BadCids().GetSize() == 0);
}