aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2025-08-21 23:58:51 +0000
committerLiam Mitchell <[email protected]>2025-08-21 23:58:51 +0000
commit33209bd6931f49362dfc2d62c6cb6b87a42c99e1 (patch)
treecfc7914634088b3f4feac2d4cec0b5650dfdcc3c /src/zenstore/cache/structuredcachestore.cpp
parentFix changelog merge issues (diff)
parentavoid new in static IoBuffer (#472) (diff)
downloadzen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.tar.xz
zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.zip
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenstore/cache/structuredcachestore.cpp')
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp134
1 files changed, 70 insertions, 64 deletions
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index d956384ca..1f2d6c37f 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -139,13 +139,10 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st
CreateDirectories(m_RootDir);
m_DiskLayer.DiscoverBuckets();
-
- m_Gc.AddGcStorage(this);
}
ZenCacheNamespace::~ZenCacheNamespace()
{
- m_Gc.RemoveGcStorage(this);
}
struct ZenCacheNamespace::PutBatchHandle
@@ -154,7 +151,7 @@ struct ZenCacheNamespace::PutBatchHandle
};
ZenCacheNamespace::PutBatchHandle*
-ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult)
+ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult)
{
ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle;
Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
@@ -252,11 +249,12 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
return;
}
-void
+ZenCacheNamespace::PutResult
ZenCacheNamespace::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU(OptionalBatchHandle ? "Z$::Namespace::Put(Batched)" : "Z$::Namespace::Put");
@@ -268,8 +266,12 @@ ZenCacheNamespace::Put(std::string_view InBucket,
ZEN_ASSERT(Value.Value.Size());
ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr;
- m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle);
- m_WriteCount++;
+ PutResult RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
+ {
+ m_WriteCount++;
+ }
+ return RetVal;
}
bool
@@ -297,7 +299,6 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view
std::function<void()>
ZenCacheNamespace::Drop()
{
- m_Gc.RemoveGcStorage(this);
return m_DiskLayer.Drop();
}
@@ -307,25 +308,19 @@ ZenCacheNamespace::Flush()
m_DiskLayer.Flush();
}
+#if ZEN_WITH_TESTS
void
-ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
+ZenCacheNamespace::Scrub(ScrubContext& Ctx)
{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
ZEN_INFO("scrubbing '{}'", m_RootDir);
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_DiskLayer.ScrubStorage(Ctx);
+ m_DiskLayer.Scrub(Ctx);
}
+#endif // ZEN_WITH_TESTS
-GcStorageSize
-ZenCacheNamespace::StorageSize() const
+CacheStoreSize
+ZenCacheNamespace::TotalSize() const
{
- return m_DiskLayer.StorageSize();
+ return m_DiskLayer.TotalSize();
}
ZenCacheNamespace::Info
@@ -557,7 +552,7 @@ ZenCacheStore::LogWorker()
}
}
-ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult)
+ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult)
: m_CacheStore(CacheStore)
{
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -720,13 +715,14 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
m_MissCount++;
}
-void
+ZenCacheStore::PutResult
ZenCacheStore::Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatch* OptionalBatchHandle)
{
// Ad hoc rejection of known bad usage patterns for DDC bucket names
@@ -734,7 +730,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (IsKnownBadBucketName(Bucket))
{
m_RejectedWriteCount++;
- return;
+ return PutResult{zen::PutStatus::Invalid, "Bad bucket name"};
}
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -764,9 +760,16 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
- Store->Put(Bucket, HashKey, Value, References, BatchHandle);
- m_WriteCount++;
- return;
+ PutResult RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
+ {
+ m_WriteCount++;
+ }
+ else
+ {
+ m_RejectedWriteCount++;
+ }
+ return RetVal;
}
ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'",
@@ -774,6 +777,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
Namespace,
Bucket,
HashKey.ToHexString());
+ return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)};
}
bool
@@ -822,11 +826,13 @@ ZenCacheStore::Flush()
IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
}
+#if ZEN_WITH_TESTS
void
-ZenCacheStore::ScrubStorage(ScrubContext& Ctx)
+ZenCacheStore::Scrub(ScrubContext& Ctx)
{
- IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); });
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); });
}
+#endif // ZEN_WITH_TESTS
CacheValueDetails
ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter,
@@ -951,12 +957,12 @@ ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Names
}
}
-GcStorageSize
-ZenCacheStore::StorageSize() const
+CacheStoreSize
+ZenCacheStore::TotalSize() const
{
- GcStorageSize Size;
+ CacheStoreSize Size;
IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
- GcStorageSize StoreSize = Store.StorageSize();
+ CacheStoreSize StoreSize = Store.TotalSize();
Size.MemorySize += StoreSize.MemorySize;
Size.DiskSize += StoreSize.DiskSize;
});
@@ -1026,7 +1032,7 @@ ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig)
ZenCacheStore::Info
ZenCacheStore::GetInfo() const
{
- ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()};
+ ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = TotalSize()};
IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) {
Info.NamespaceNames.push_back(std::string(NamespaceName));
@@ -1378,7 +1384,7 @@ TEST_CASE("cachestore.store")
Value.Value = Obj.GetBuffer().AsIoBuffer();
Value.Value.SetContentType(ZenContentType::kCbObject);
- Zcs.Put("test_bucket"sv, Key, Value, {});
+ Zcs.Put("test_bucket"sv, Key, Value, {}, false);
}
for (int i = 0; i < kIterationCount; ++i)
@@ -1414,7 +1420,7 @@ TEST_CASE("cachestore.size")
const size_t Count = 16;
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
+ CacheStoreSize CacheSize;
{
GcManager Gc;
@@ -1432,10 +1438,10 @@ TEST_CASE("cachestore.size")
const size_t Bucket = Key % 4;
std::string BucketName = fmt::format("test_bucket-{}", Bucket);
IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {});
+ Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false);
Keys.push_back({BucketName, Hash});
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_EQ(0, CacheSize.MemorySize);
@@ -1445,7 +1451,7 @@ TEST_CASE("cachestore.size")
Zcs.Get(Key.first, Key.second, _);
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
}
@@ -1454,7 +1460,7 @@ TEST_CASE("cachestore.size")
GcManager Gc;
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- const GcStorageSize SerializedSize = Zcs.StorageSize();
+ const CacheStoreSize SerializedSize = Zcs.TotalSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
@@ -1462,8 +1468,8 @@ TEST_CASE("cachestore.size")
{
Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
}
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- CHECK_EQ(0, Zcs.StorageSize().MemorySize);
+ CHECK_EQ(0, Zcs.TotalSize().DiskSize);
+ CHECK_EQ(0, Zcs.TotalSize().MemorySize);
}
}
@@ -1472,7 +1478,7 @@ TEST_CASE("cachestore.size")
const size_t Count = 16;
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
+ CacheStoreSize CacheSize;
{
GcManager Gc;
@@ -1486,10 +1492,10 @@ TEST_CASE("cachestore.size")
for (size_t Key = 0; Key < Count; ++Key)
{
const size_t Bucket = Key % 4;
- Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {});
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false);
}
- CacheSize = Zcs.StorageSize();
+ CacheSize = Zcs.TotalSize();
CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_EQ(0, CacheSize.MemorySize);
}
@@ -1498,7 +1504,7 @@ TEST_CASE("cachestore.size")
GcManager Gc;
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- const GcStorageSize SerializedSize = Zcs.StorageSize();
+ const CacheStoreSize SerializedSize = Zcs.TotalSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
@@ -1506,7 +1512,7 @@ TEST_CASE("cachestore.size")
{
Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
}
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ CHECK_EQ(0, Zcs.TotalSize().DiskSize);
}
}
}
@@ -1569,7 +1575,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : Chunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
WorkCompleted.fetch_add(1);
});
}
@@ -1599,7 +1605,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
GcChunkHashes.swap(RemainingChunkHashes);
};
- const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ const uint64_t TotalSize = Zcs.TotalSize().DiskSize;
CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
{
@@ -1650,7 +1656,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : NewChunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
AddedChunkCount.fetch_add(1);
WorkCompleted.fetch_add(1);
});
@@ -1755,14 +1761,14 @@ TEST_CASE("cachestore.namespaces")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}, false);
ZenCacheValue GetValue;
CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
// This should just be dropped as we don't allow creating of namespaces on the fly
- Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}, false);
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
}
@@ -1778,7 +1784,7 @@ TEST_CASE("cachestore.namespaces")
IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
Buffer2.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue2 = {.Value = Buffer2};
- Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}, false);
ZenCacheValue GetValue;
CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
@@ -1820,7 +1826,7 @@ TEST_CASE("cachestore.drop.bucket")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1893,7 +1899,7 @@ TEST_CASE("cachestore.drop.namespace")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1957,8 +1963,6 @@ TEST_CASE("cachestore.blocked.disklayer.put")
{
ScopedTemporaryDirectory TempDir;
- GcStorageSize CacheSize;
-
const auto CreateCacheValue = [](size_t Size) -> CbObject {
std::vector<uint8_t> Buf;
Buf.resize(Size, Size & 0xff);
@@ -1979,7 +1983,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
size_t Key = Buffer.Size();
IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}, false);
ZenCacheValue BufferGet;
CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
@@ -1989,7 +1993,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
Buffer2.SetContentType(ZenContentType::kCbObject);
// We should be able to overwrite even if the file is open for read
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}, false);
MemoryView OldView = BufferGet.Value.GetView();
@@ -2080,7 +2084,7 @@ TEST_CASE("cachestore.scrub")
AttachmentHashes.push_back(Attachment.DecodeRawHash());
CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back());
}
- Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes);
+ Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes, false);
}
};
@@ -2094,8 +2098,8 @@ TEST_CASE("cachestore.scrub")
WorkerThreadPool ThreadPool{1};
ScrubContext ScrubCtx{ThreadPool};
- Zcs.ScrubStorage(ScrubCtx);
- CidStore.ScrubStorage(ScrubCtx);
+ Zcs.Scrub(ScrubCtx);
+ CidStore.Scrub(ScrubCtx);
CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
CHECK(ScrubCtx.BadCids().GetSize() == 0);
}
@@ -2129,7 +2133,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = Record.second,
.RawSize = Record.second.GetSize(),
.RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())},
- AttachmentKeys);
+ AttachmentKeys,
+ false);
for (const auto& Attachment : Attachments)
{
CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash());
@@ -2145,7 +2150,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = CacheValue.second,
.RawSize = CacheValue.second.GetSize(),
.RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())},
- {});
+ {},
+ false);
CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}});
return Key;
};