aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-19 21:49:55 +0100
committerGitHub <[email protected]>2023-12-19 21:49:55 +0100
commit8a90a40b4517d198855c1e52740a7e7fb21ecc20 (patch)
treeffd8524bcebf8841b69725934f31d438a03e7746 /src/zenstore/cache/structuredcachestore.cpp
parent0.2.38 (diff)
downloadzen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.tar.xz
zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.zip
move cachedisklayer and structuredcachestore into zenstore (#624)
Diffstat (limited to 'src/zenstore/cache/structuredcachestore.cpp')
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp2440
1 files changed, 2440 insertions, 0 deletions
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
new file mode 100644
index 000000000..f92baaf0b
--- /dev/null
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -0,0 +1,2440 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenstore/structuredcachestore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compress.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenstore/scrubcontext.h>
+#include <zenutil/cache/cache.h>
+
+#include <future>
+#include <limits>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#endif
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/core.h>
+#include <xxhash.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_WITH_TESTS
+# include <zencore/jobqueue.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <zenstore/cidstore.h>
+# include <random>
+# include <unordered_map>
+#endif
+
+namespace zen {
+
+ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
+: m_Gc(Gc)
+, m_JobQueue(JobQueue)
+, m_RootDir(RootDir)
+, m_Configuration(Config)
+, m_DiskLayer(m_Gc, m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig)
+{
+ ZEN_INFO("initializing structured cache at '{}'", m_RootDir);
+ CreateDirectories(m_RootDir);
+
+ m_DiskLayer.DiscoverBuckets();
+
+ m_Gc.AddGcContributor(this);
+ m_Gc.AddGcStorage(this);
+}
+
+ZenCacheNamespace::~ZenCacheNamespace()
+{
+ m_Gc.RemoveGcStorage(this);
+ m_Gc.RemoveGcContributor(this);
+}
+
+bool
+ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ ZEN_TRACE_CPU("Z$::Namespace::Get");
+
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+
+ bool Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
+
+ if (Ok)
+ {
+ ZEN_ASSERT(OutValue.Value.Size());
+ StatsScope.SetBytes(OutValue.Value.Size());
+
+ m_HitCount++;
+ return true;
+ }
+
+ m_MissCount++;
+ return false;
+}
+
+void
+ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+{
+ ZEN_TRACE_CPU("Z$::Namespace::Put");
+
+ metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+
+ // Store value and index
+
+ ZEN_ASSERT(Value.Value.Size());
+
+ m_DiskLayer.Put(InBucket, HashKey, Value, References);
+ m_WriteCount++;
+}
+
+bool
+ZenCacheNamespace::DropBucket(std::string_view Bucket)
+{
+ ZEN_INFO("dropping bucket '{}'", Bucket);
+
+ const bool Dropped = m_DiskLayer.DropBucket(Bucket);
+
+ ZEN_INFO("bucket '{}' was {}", Bucket, Dropped ? "dropped" : "not found");
+
+ return Dropped;
+}
+
+void
+ZenCacheNamespace::EnumerateBucketContents(std::string_view Bucket,
+ std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
+{
+ m_DiskLayer.EnumerateBucketContents(Bucket, Fn);
+}
+
+bool
+ZenCacheNamespace::Drop()
+{
+ return m_DiskLayer.Drop();
+}
+
+void
+ZenCacheNamespace::Flush()
+{
+ m_DiskLayer.Flush();
+}
+
+void
+ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
+{
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ ZEN_INFO("scrubbing '{}'", m_RootDir);
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
+ m_DiskLayer.ScrubStorage(Ctx);
+}
+
+void
+ZenCacheNamespace::GatherReferences(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::ZenCacheNamespace::GatherReferences");
+
+ Stopwatch Timer;
+ const auto Guard =
+ MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+ m_DiskLayer.GatherReferences(GcCtx);
+}
+
+void
+ZenCacheNamespace::CollectGarbage(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::Namespace::CollectGarbage");
+
+ m_DiskLayer.CollectGarbage(GcCtx);
+}
+
+GcStorageSize
+ZenCacheNamespace::StorageSize() const
+{
+ return m_DiskLayer.StorageSize();
+}
+
+ZenCacheNamespace::Info
+ZenCacheNamespace::GetInfo() const
+{
+ ZenCacheNamespace::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration, .DiskLayerInfo = m_DiskLayer.GetInfo()};
+ std::unordered_set<std::string> BucketNames;
+ for (const std::string& BucketName : Info.DiskLayerInfo.BucketNames)
+ {
+ BucketNames.insert(BucketName);
+ }
+ Info.BucketNames.insert(Info.BucketNames.end(), BucketNames.begin(), BucketNames.end());
+ return Info;
+}
+
+std::optional<ZenCacheNamespace::BucketInfo>
+ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const
+{
+ std::optional<ZenCacheDiskLayer::BucketInfo> DiskBucketInfo = m_DiskLayer.GetBucketInfo(Bucket);
+ if (!DiskBucketInfo.has_value())
+ {
+ return {};
+ }
+ ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo};
+ return Info;
+}
+
+ZenCacheNamespace::NamespaceStats
+ZenCacheNamespace::Stats()
+{
+ return ZenCacheNamespace::NamespaceStats{.HitCount = m_HitCount,
+ .MissCount = m_MissCount,
+ .WriteCount = m_WriteCount,
+ .PutOps = m_PutOps.Snapshot(),
+ .GetOps = m_GetOps.Snapshot(),
+ .DiskStats = m_DiskLayer.Stats()};
+}
+
+CacheValueDetails::NamespaceDetails
+ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
+{
+ return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter);
+}
+
+#if ZEN_WITH_TESTS
+void
+ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time)
+{
+ m_DiskLayer.SetAccessTime(Bucket, HashKey, Time);
+}
+#endif // ZEN_WITH_TESTS
+
+//////////////////////////// ZenCacheStore
+
+ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$");
+
+static constinit std::string_view UE4DDCNamespaceName = "ue4.ddc";
+
+ZenCacheStore::ZenCacheStore(GcManager& Gc,
+ JobQueue& JobQueue,
+ const std::filesystem::path& BasePath,
+ const Configuration& Configuration,
+ const DiskWriteBlocker* InDiskWriteBlocker)
+: m_DiskWriteBlocker(InDiskWriteBlocker)
+, m_Gc(Gc)
+, m_JobQueue(JobQueue)
+, m_BasePath(BasePath)
+, m_Configuration(Configuration)
+, m_ExitLogging(false)
+{
+ SetLoggingConfig(m_Configuration.Logging);
+ CreateDirectories(m_BasePath);
+
+ ZEN_INFO("initializing cache store at '{}'", m_BasePath);
+
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+
+ std::vector<std::string> Namespaces;
+ for (const std::filesystem::path& DirPath : DirContent.Directories)
+ {
+ std::string DirName = PathToUtf8(DirPath.filename());
+ if (DirName.starts_with(NamespaceDiskPrefix))
+ {
+ Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
+ continue;
+ }
+ }
+
+ ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_BasePath);
+
+ if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
+ {
+ // default (unspecified) and ue4-ddc namespace points to the same namespace instance
+
+ std::filesystem::path DefaultNamespaceFolder = m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
+ CreateDirectories(DefaultNamespaceFolder);
+ Namespaces.push_back(std::string(UE4DDCNamespaceName));
+ }
+
+ for (const std::string& NamespaceName : Namespaces)
+ {
+ m_Namespaces[NamespaceName] =
+ std::make_unique<ZenCacheNamespace>(Gc,
+ m_JobQueue,
+ m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName),
+ m_Configuration.NamespaceConfig);
+ }
+}
+
+ZenCacheStore::~ZenCacheStore()
+{
+ ZEN_INFO("closing cache store at '{}'", m_BasePath);
+ SetLoggingConfig({.EnableWriteLog = false, .EnableAccessLog = false});
+ m_Namespaces.clear();
+}
+
+void
+ZenCacheStore::LogWorker()
+{
+ SetCurrentThreadName("ZenCacheStore::LogWorker");
+
+ LoggerRef ZCacheLog(logging::Get("z$"));
+
+ auto Log = [&ZCacheLog]() -> LoggerRef { return ZCacheLog; };
+
+ std::vector<AccessLogItem> Items;
+ while (true)
+ {
+ try
+ {
+ m_LogQueueLock.WithExclusiveLock([this, &Items]() { Items.swap(m_LogQueue); });
+ if (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed())
+ {
+ for (const auto& Item : Items)
+ {
+ if (Item.Value.Value)
+ {
+ const bool IsCbObject = Item.Value.Value.GetContentType() == ZenContentType::kCbObject;
+
+ try
+ {
+ const IoHash ObjectHash = IsCbObject ? IoHash::HashBuffer(Item.Value.Value.GetView()) : Item.Value.RawHash;
+ const size_t ObjectSize = IsCbObject ? Item.Value.Value.GetSize() : Item.Value.RawSize;
+
+ ZEN_LOG_INFO(LogCacheActivity,
+ "{} [{}] {}/{}/{} -> {} {} {}",
+ Item.Op,
+ Item.Context,
+ Item.Namespace,
+ Item.Bucket,
+ Item.HashKey,
+ ObjectHash,
+ ObjectSize,
+ ToString(Item.Value.Value.GetContentType()))
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_LOG_INFO(LogCacheActivity,
+ "{} [{}] {}/{}/{} failed: Reason: '{}'",
+ Item.Op,
+ Item.Context,
+ Item.Namespace,
+ Item.Bucket,
+ Item.HashKey,
+ Ex.what())
+ }
+ }
+ else
+ {
+ ZEN_LOG_INFO(LogCacheActivity,
+ "{} [{}] {}/{}/{}",
+ Item.Op,
+ Item.Context,
+ Item.Namespace,
+ Item.Bucket,
+ Item.HashKey);
+ }
+ }
+ }
+ if (!Items.empty())
+ {
+ Items.resize(0);
+ continue;
+ }
+ if (m_ExitLogging)
+ {
+ break;
+ }
+ m_LogEvent.Wait();
+ m_LogEvent.Reset();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("Log writer failed: '{}'", Ex.what());
+ }
+ }
+}
+
+bool
+ZenCacheStore::Get(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ ZenCacheValue& OutValue)
+{
+ // Ad hoc rejection of known bad usage patterns for DDC bucket names
+
+ if (IsKnownBadBucketName(Bucket))
+ {
+ m_RejectedReadCount++;
+ return false;
+ }
+
+ ZEN_TRACE_CPU("Z$::Get");
+
+ metrics::RequestStats::Scope OpScope(m_GetOps, 0);
+
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ bool Result = Store->Get(Bucket, HashKey, OutValue);
+
+ if (m_AccessLogEnabled)
+ {
+ ZEN_TRACE_CPU("Z$::Get::AccessLog");
+ bool Signal = false;
+ m_LogQueueLock.WithExclusiveLock([&]() {
+ Signal = m_LogQueue.empty();
+ m_LogQueue.emplace_back(AccessLogItem{.Op = Result ? "GET HIT " : "GET MISS",
+ .Context = Context,
+ .Namespace = std::string(Namespace),
+ .Bucket = std::string(Bucket),
+ .HashKey = HashKey,
+ .Value = OutValue /*,
+ .Result = Result*/});
+ });
+ if (Signal)
+ {
+ m_LogEvent.Set();
+ }
+ }
+ if (Result)
+ {
+ m_HitCount++;
+ OpScope.SetBytes(OutValue.Value.GetSize());
+ return true;
+ }
+
+ m_MissCount++;
+ return false;
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get [{}], bucket '{}', key '{}'",
+ Context,
+ Namespace,
+ Bucket,
+ HashKey.ToHexString());
+
+ m_MissCount++;
+ return false;
+}
+
+void
+ZenCacheStore::Put(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References)
+{
+ // Ad hoc rejection of known bad usage patterns for DDC bucket names
+
+ if (IsKnownBadBucketName(Bucket))
+ {
+ m_RejectedWriteCount++;
+ return;
+ }
+
+ ZEN_TRACE_CPU("Z$::Put");
+
+ metrics::RequestStats::Scope $(m_PutOps, Value.Value.GetSize());
+
+ if (m_WriteLogEnabled)
+ {
+ ZEN_TRACE_CPU("Z$::Get::WriteLog");
+ bool Signal = false;
+ m_LogQueueLock.WithExclusiveLock([&]() {
+ Signal = m_LogQueue.empty();
+ m_LogQueue.emplace_back(AccessLogItem{.Op = "PUT ",
+ .Context = Context,
+ .Namespace = std::string(Namespace),
+ .Bucket = std::string(Bucket),
+ .HashKey = HashKey,
+ .Value = Value /*,
+ .Result = true*/});
+ });
+ if (Signal)
+ {
+ m_LogEvent.Set();
+ }
+ }
+
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ Store->Put(Bucket, HashKey, Value, References);
+ m_WriteCount++;
+ return;
+ }
+
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'",
+ Context,
+ Namespace,
+ Bucket,
+ HashKey.ToHexString());
+}
+
+bool
+ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->DropBucket(Bucket);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket);
+ return false;
+}
+
+bool
+ZenCacheStore::DropNamespace(std::string_view InNamespace)
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end())
+ {
+ ZenCacheNamespace& Namespace = *It->second;
+ m_DroppedNamespaces.push_back(std::move(It->second));
+ m_Namespaces.erase(It);
+ return Namespace.Drop();
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace);
+ return false;
+}
+
+void
+ZenCacheStore::Flush()
+{
+ ZEN_INFO("flushing cache store at '{}'", m_BasePath);
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
+}
+
+void
+ZenCacheStore::ScrubStorage(ScrubContext& Ctx)
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); });
+}
+
+CacheValueDetails
+ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter,
+ const std::string_view BucketFilter,
+ const std::string_view ValueFilter) const
+{
+ CacheValueDetails Details;
+ if (NamespaceFilter.empty())
+ {
+ IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace& Store) {
+ Details.Namespaces[std::string(Namespace)] = Store.GetValueDetails(BucketFilter, ValueFilter);
+ });
+ }
+ else if (const ZenCacheNamespace* Store = FindNamespace(NamespaceFilter); Store != nullptr)
+ {
+ Details.Namespaces[std::string(NamespaceFilter)] = Store->GetValueDetails(BucketFilter, ValueFilter);
+ }
+ return Details;
+}
+
+void
+ZenCacheStore::EnumerateBucketContents(std::string_view Namespace,
+ std::string_view Bucket,
+ std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn)
+{
+ if (const ZenCacheNamespace* Ns = FindNamespace(Namespace))
+ {
+ Ns->EnumerateBucketContents(Bucket, Fn);
+ }
+}
+
+ZenCacheNamespace*
+ZenCacheStore::GetNamespace(std::string_view Namespace)
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ if (Namespace == DefaultNamespace)
+ {
+ if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ }
+ _.ReleaseNow();
+
+ if (!m_Configuration.AllowAutomaticCreationOfNamespaces)
+ {
+ return nullptr;
+ }
+
+ RwLock::ExclusiveLockScope __(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+
+ auto NewNamespace =
+ m_Namespaces.insert_or_assign(std::string(Namespace),
+ std::make_unique<ZenCacheNamespace>(m_Gc,
+ m_JobQueue,
+ m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace),
+ m_Configuration.NamespaceConfig));
+ return NewNamespace.first->second.get();
+}
+
+const ZenCacheNamespace*
+ZenCacheStore::FindNamespace(std::string_view Namespace) const
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ if (Namespace == DefaultNamespace)
+ {
+ if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ }
+ return nullptr;
+}
+
+std::vector<std::string>
+ZenCacheStore::GetNamespaces()
+{
+ std::vector<std::string> Namespaces;
+
+ IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace&) { Namespaces.push_back(std::string(Namespace)); });
+
+ return Namespaces;
+}
+
+void
+ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const
+{
+ std::vector<std::pair<std::string, ZenCacheNamespace&>> Namespaces;
+ {
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ Namespaces.reserve(m_Namespaces.size());
+ for (const auto& Entry : m_Namespaces)
+ {
+ if (Entry.first == DefaultNamespace)
+ {
+ continue;
+ }
+ Namespaces.push_back({Entry.first, *Entry.second});
+ }
+ }
+ for (auto& Entry : Namespaces)
+ {
+ Callback(Entry.first, Entry.second);
+ }
+}
+
+GcStorageSize
+ZenCacheStore::StorageSize() const
+{
+ GcStorageSize Size;
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
+ GcStorageSize StoreSize = Store.StorageSize();
+ Size.MemorySize += StoreSize.MemorySize;
+ Size.DiskSize += StoreSize.DiskSize;
+ });
+ return Size;
+}
+
+ZenCacheStore::CacheStoreStats
+ZenCacheStore::Stats(bool IncludeNamespaceStats)
+{
+ ZenCacheStore::CacheStoreStats Result{.HitCount = m_HitCount,
+ .MissCount = m_MissCount,
+ .WriteCount = m_WriteCount,
+ .RejectedWriteCount = m_RejectedWriteCount,
+ .RejectedReadCount = m_RejectedReadCount,
+ .PutOps = m_PutOps.Snapshot(),
+ .GetOps = m_GetOps.Snapshot()};
+
+ if (IncludeNamespaceStats)
+ {
+ IterateNamespaces([&](std::string_view NamespaceName, ZenCacheNamespace& Store) {
+ Result.NamespaceStats.emplace_back(NamedNamespaceStats{.NamespaceName = std::string(NamespaceName), .Stats = Store.Stats()});
+ });
+ }
+
+ return Result;
+}
+
+void
+ZenCacheStore::ReportMetrics(StatsMetrics& Statsd)
+{
+ const bool IncludeNamespaceStats = false;
+ const CacheStoreStats Now = Stats(IncludeNamespaceStats);
+ const CacheStoreStats& Old = m_LastReportedMetrics;
+
+ Statsd.Meter("zen.cache_hits", Now.HitCount - Old.HitCount);
+ Statsd.Meter("zen.cache_misses", Now.MissCount - Old.MissCount);
+ Statsd.Meter("zen.cache_writes", Now.WriteCount - Old.WriteCount);
+
+ m_LastReportedMetrics = Now;
+}
+
+void
+ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig)
+{
+ if (!Loggingconfig.EnableAccessLog && !Loggingconfig.EnableWriteLog)
+ {
+ m_AccessLogEnabled.store(false);
+ m_WriteLogEnabled.store(false);
+ m_ExitLogging.store(true);
+ m_LogEvent.Set();
+ if (m_AsyncLoggingThread.joinable())
+ {
+ m_AsyncLoggingThread.join();
+ }
+ m_Configuration.Logging = Loggingconfig;
+ return;
+ }
+ if (!m_AccessLogEnabled.load() && !m_WriteLogEnabled.load())
+ {
+ m_AsyncLoggingThread = std::thread(&ZenCacheStore::LogWorker, this);
+ }
+ m_WriteLogEnabled.store(Loggingconfig.EnableWriteLog);
+ m_AccessLogEnabled.store(Loggingconfig.EnableAccessLog);
+ m_Configuration.Logging = Loggingconfig;
+}
+
+ZenCacheStore::Info
+ZenCacheStore::GetInfo() const
+{
+ ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()};
+
+ IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) {
+ Info.NamespaceNames.push_back(std::string(NamespaceName));
+ ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo();
+ Info.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount;
+ });
+
+ return Info;
+}
+
+std::optional<ZenCacheNamespace::Info>
+ZenCacheStore::GetNamespaceInfo(std::string_view NamespaceName)
+{
+ if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace)
+ {
+ return Namespace->GetInfo();
+ }
+ return {};
+}
+
+std::optional<ZenCacheNamespace::BucketInfo>
+ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view BucketName)
+{
+ if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace)
+ {
+ return Namespace->GetBucketInfo(BucketName);
+ }
+ return {};
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+using namespace std::literals;
+
+namespace testutils {
+ IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); }
+
+ IoHash ToIoHash(const Oid& Id)
+ {
+ char OIdString[24 + 1];
+ Id.ToString(OIdString);
+ IoHash Key = IoHash::HashBuffer(OIdString, 24);
+ return Key;
+ }
+
+ std::pair<Oid, IoBuffer> CreateBinaryBlob(size_t Size) { return {Oid::NewOid(), CreateRandomBlob(Size)}; }
+
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, const std::span<const size_t>& Sizes)
+ {
+ std::vector<std::pair<Oid, CompressedBuffer>> Result;
+ Result.reserve(Sizes.size());
+ for (size_t Size : Sizes)
+ {
+ auto Blob = CreateBinaryBlob(Size);
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size()));
+ CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash()));
+ Result.emplace_back(std::pair<Oid, CompressedBuffer>(Blob.first, Compressed));
+ }
+ return Result;
+ }
+
+ std::pair<IoHash, IoBuffer> CreateRecord(std::span<std::pair<Oid, CompressedBuffer>> Attachments)
+ {
+ Oid Id = Oid::NewOid();
+ IoHash Key = ToIoHash(Id);
+ CbObjectWriter Record;
+ Record << "Key"sv << Id;
+
+ for (size_t Idx = 0; auto& Cid : Attachments)
+ {
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid.second.DecodeRawHash());
+ }
+
+ IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+ return {Key, Buffer};
+ }
+
+ struct FalseType
+ {
+ static const bool Enabled = false;
+ };
+ struct TrueType
+ {
+ static const bool Enabled = true;
+ };
+
+} // namespace testutils
+
+TEST_CASE_TEMPLATE("z$.store", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ const int kIterationCount = 100;
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
+
+ CbObjectWriter Cbo;
+ Cbo << "hey" << i;
+ CbObject Obj = Cbo.Save();
+
+ ZenCacheValue Value;
+ Value.Value = Obj.GetBuffer().AsIoBuffer();
+ Value.Value.SetContentType(ZenContentType::kCbObject);
+
+ Zcs.Put("test_bucket"sv, Key, Value, {});
+ }
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
+
+ ZenCacheValue Value;
+ Zcs.Get("test_bucket"sv, Key, /* out */ Value);
+
+ REQUIRE(Value.Value);
+ CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject);
+ CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None);
+ CbObject Obj = LoadCompactBinaryObject(Value.Value);
+ CHECK_EQ(Obj["hey"].AsInt32(), i);
+ }
+}
+
+TEST_CASE_TEMPLATE("z$.size", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ SUBCASE("mem/disklayer")
+ {
+ const size_t Count = 16;
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold - 256);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ std::vector<std::pair<std::string, IoHash>> Keys;
+
+ for (size_t Key = 0; Key < Count; ++Key)
+ {
+ const size_t Bucket = Key % 4;
+ std::string BucketName = fmt::format("test_bucket-{}", Bucket);
+ IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t));
+ Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {});
+ Keys.push_back({BucketName, Hash});
+ }
+ CacheSize = Zcs.StorageSize();
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_EQ(0, CacheSize.MemorySize);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue _;
+ Zcs.Get(Key.first, Key.second, _);
+ }
+
+ CacheSize = Zcs.StorageSize();
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
+ }
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ const GcStorageSize SerializedSize = Zcs.StorageSize();
+ CHECK_EQ(SerializedSize.MemorySize, 0);
+ CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
+
+ for (size_t Bucket = 0; Bucket < 4; ++Bucket)
+ {
+ Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
+ }
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ CHECK_EQ(0, Zcs.StorageSize().MemorySize);
+ }
+ }
+
+ SUBCASE("disklayer")
+ {
+ const size_t Count = 16;
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold + 64);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ for (size_t Key = 0; Key < Count; ++Key)
+ {
+ const size_t Bucket = Key % 4;
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {});
+ }
+
+ CacheSize = Zcs.StorageSize();
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_EQ(0, CacheSize.MemorySize);
+ }
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ const GcStorageSize SerializedSize = Zcs.StorageSize();
+ CHECK_EQ(SerializedSize.MemorySize, 0);
+ CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
+
+ for (size_t Bucket = 0; Bucket < 4; ++Bucket)
+ {
+ Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
+ }
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+}
+
+TEST_CASE_TEMPLATE("z$.gc", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ using namespace testutils;
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ SUBCASE("gather references does NOT add references for expired cache entries")
+ {
+ ScopedTemporaryDirectory TempDir;
+ std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ const auto CollectAndFilter = [](GcManager& Gc,
+ GcClock::TimePoint Time,
+ GcClock::Duration MaxDuration,
+ std::span<const IoHash> Cids,
+ std::vector<IoHash>& OutKeep) {
+ GcContext GcCtx(Time - MaxDuration, Time - MaxDuration);
+ Gc.CollectGarbage(GcCtx);
+ OutKeep.clear();
+ GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); });
+ };
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ const auto Bucket = "teardrinker"sv;
+
+ // Create a cache record
+ const IoHash Key = CreateKey(42);
+ CbObjectWriter Record;
+ Record << "Key"sv
+ << "SomeRecord"sv;
+
+ for (size_t Idx = 0; auto& Cid : Cids)
+ {
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid);
+ }
+
+ IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ Zcs.Put(Bucket, Key, {.Value = Buffer}, Cids);
+
+ std::vector<IoHash> Keep;
+
+ // Collect garbage with 1 hour max cache duration
+ {
+ CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(Cids.size(), Keep.size());
+ }
+
+ // Move forward in time
+ {
+ CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(0, Keep.size());
+ }
+ }
+
+ // Expect timestamps to be serialized
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ std::vector<IoHash> Keep;
+
+ // Collect garbage with 1 hour max cache duration
+ {
+ CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(3, Keep.size());
+ }
+
+ // Move forward in time
+ {
+ CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(0, Keep.size());
+ }
+ }
+ }
+
+ SUBCASE("gc removes standalone values")
+ {
+ ScopedTemporaryDirectory TempDir;
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ const auto Bucket = "fortysixandtwo"sv;
+ const GcClock::TimePoint CurrentTime = GcClock::Now();
+
+ std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ for (const auto& Key : Keys)
+ {
+ IoBuffer Value = CreateRandomBlob(128 << 10);
+ Zcs.Put(Bucket, Key, {.Value = Value}, {});
+ }
+
+ {
+ GcContext GcCtx(CurrentTime - std::chrono::hours(46), CurrentTime - std::chrono::hours(46));
+
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(Exists);
+ }
+ }
+
+ // Move forward in time and collect again
+ {
+ GcContext GcCtx(CurrentTime + std::chrono::minutes(2), CurrentTime + std::chrono::minutes(2));
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(!Exists);
+ }
+
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+
+ SUBCASE("gc removes small objects")
+ {
+ ScopedTemporaryDirectory TempDir;
+ GcManager Gc;
+ {
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ const auto Bucket = "rightintwo"sv;
+
+ std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ for (const auto& Key : Keys)
+ {
+ IoBuffer Value = CreateRandomBlob(128);
+ Zcs.Put(Bucket, Key, {.Value = Value}, {});
+ }
+
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2));
+ GcCtx.CollectSmallObjects(true);
+
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(Exists);
+ }
+ }
+
+ // Move forward in time and collect again
+ {
+ GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2));
+ GcCtx.CollectSmallObjects(true);
+
+ Zcs.Flush();
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(!Exists);
+ }
+ // GC could not remove the currently written block so size will not be zero
+ CHECK_NE(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+ {
+ // Unreferenced blocks will be pruned so size should now be zero
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+}
+
+TEST_CASE_TEMPLATE("z$.threadedinsert", ReferenceCaching, testutils::FalseType, testutils::TrueType) // * doctest::skip(true))
+{
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 8192;
+
+ struct Chunk
+ {
+ std::string Bucket;
+ IoBuffer Buffer;
+ };
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks;
+ Chunks.reserve(kChunkCount);
+
+ const std::string Bucket1 = "rightinone";
+ const std::string Bucket2 = "rightintwo";
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ while (true)
+ {
+ IoBuffer Chunk = CreateRandomBlob(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ break;
+ }
+ while (true)
+ {
+ IoBuffer Chunk = CreateRandomBlob(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ break;
+ }
+ }
+
+ CreateDirectories(TempDir.Path());
+
+ WorkerThreadPool ThreadPool(4);
+ GcManager Gc;
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path(),
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+
+ const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
+
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+ std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
+ GcChunkHashes.reserve(Chunks.size());
+ for (const auto& Chunk : Chunks)
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ {
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks;
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ {
+ IoBuffer Chunk = CreateRandomBlob(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ }
+ {
+ IoBuffer Chunk = CreateRandomBlob(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ }
+ }
+
+ std::atomic<size_t> WorkCompleted = 0;
+ std::atomic_uint32_t AddedChunkCount = 0;
+ for (const auto& Chunk : NewChunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ }
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (AddedChunkCount.load() < NewChunks.size())
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.AddRetainedCids(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+
+ while (WorkCompleted < NewChunks.size() + Chunks.size())
+ {
+ Sleep(1);
+ }
+
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.AddRetainedCids(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+ }
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : GcChunkHashes)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < GcChunkHashes.size())
+ {
+ Sleep(1);
+ }
+ }
+ }
+}
+
+TEST_CASE("z$.namespaces")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ const CacheRequestContext Context;
+ IoHash Key1;
+ IoHash Key2;
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = false}, nullptr);
+ const auto Bucket = "teardrinker"sv;
+ const auto CustomNamespace = "mynamespace"sv;
+
+ // Create a cache record
+ Key1 = CreateKey(42);
+ CbObject CacheValue = CreateCacheValue(4096);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {});
+
+ ZenCacheValue GetValue;
+ CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
+
+ // This should just be dropped as we don't allow creating of namespaces on the fly
+ Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {});
+ CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
+ }
+
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ const auto Bucket = "teardrinker"sv;
+ const auto CustomNamespace = "mynamespace"sv;
+
+ Key2 = CreateKey(43);
+ CbObject CacheValue2 = CreateCacheValue(4096);
+
+ IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
+ Buffer2.SetContentType(ZenContentType::kCbObject);
+ ZenCacheValue PutValue2 = {.Value = Buffer2};
+ Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {});
+
+ ZenCacheValue GetValue;
+ CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
+ CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
+ CHECK(Zcs.Get(Context, CustomNamespace, Bucket, Key2, GetValue));
+ }
+}
+
+TEST_CASE("z$.drop.bucket")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ const CacheRequestContext Context;
+ IoHash Key1;
+ IoHash Key2;
+
+ auto PutValue = [&CreateCacheValue,
+ &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ return Key;
+ };
+ auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue GetValue;
+ Zcs.Get(Context, Namespace, Bucket, Key, GetValue);
+ return GetValue;
+ };
+ WorkerThreadPool Workers(1);
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ const auto Bucket = "teardrinker"sv;
+ const auto Namespace = "mynamespace"sv;
+
+ Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096);
+ Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048);
+
+ ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
+ CHECK(Value1.Value);
+
+ std::atomic_bool WorkComplete = false;
+ Workers.ScheduleWork([&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ WorkComplete = true;
+ });
+ // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
+ // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
+ CHECK(Zcs.DropBucket(Namespace, Bucket));
+ while (!WorkComplete)
+ {
+ zen::Sleep(1);
+ }
+
+ // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty
+ Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
+ CHECK(!Value1.Value);
+ ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2);
+ CHECK(!Value2.Value);
+ }
+}
+
+TEST_CASE("z$.drop.namespace")
+{
+ using namespace testutils;
+
+ const CacheRequestContext Context;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ auto PutValue = [&CreateCacheValue,
+ &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ return Key;
+ };
+ auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue GetValue;
+ Zcs.Get(Context, Namespace, Bucket, Key, GetValue);
+ return GetValue;
+ };
+ WorkerThreadPool Workers(1);
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ const auto Bucket1 = "teardrinker1"sv;
+ const auto Bucket2 = "teardrinker2"sv;
+ const auto Namespace1 = "mynamespace1"sv;
+ const auto Namespace2 = "mynamespace2"sv;
+
+ IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096);
+ IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048);
+ IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096);
+ IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048);
+
+ ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
+ CHECK(Value1.Value);
+ ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
+ CHECK(Value2.Value);
+ ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
+ CHECK(Value3.Value);
+ ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
+ CHECK(Value4.Value);
+
+ std::atomic_bool WorkComplete = false;
+ Workers.ScheduleWork([&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ Value2.Value = IoBuffer{};
+ Value3.Value = IoBuffer{};
+ Value4.Value = IoBuffer{};
+ WorkComplete = true;
+ });
+ // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
+ // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
+ CHECK(Zcs.DropNamespace(Namespace1));
+ while (!WorkComplete)
+ {
+ zen::Sleep(1);
+ }
+
+ // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty
+ Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
+ CHECK(!Value1.Value);
+ Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
+ CHECK(!Value2.Value);
+ Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
+ CHECK(Value3.Value);
+ Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
+ CHECK(Value4.Value);
+ }
+}
+
+TEST_CASE_TEMPLATE("z$.blocked.disklayer.put", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size, Size & 0xff);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ GcManager Gc;
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ size_t Key = Buffer.Size();
+ IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {});
+
+ ZenCacheValue BufferGet;
+ CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
+
+ CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1);
+ IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
+ Buffer2.SetContentType(ZenContentType::kCbObject);
+
+ // We should be able to overwrite even if the file is open for read
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {});
+
+ MemoryView OldView = BufferGet.Value.GetView();
+
+ ZenCacheValue BufferGet2;
+ CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2));
+ MemoryView NewView = BufferGet2.Value.GetView();
+
+ // Make sure file openend for read before we wrote it still have old data
+ CHECK(OldView.GetSize() == Buffer.GetSize());
+ CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0);
+
+ // Make sure we get the new data when reading after we write new data
+ CHECK(NewView.GetSize() == Buffer2.GetSize());
+ CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
+}
+
+TEST_CASE_TEMPLATE("z$.scrub", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ ScopedTemporaryDirectory TempDir;
+
+ using namespace testutils;
+
+ struct CacheRecord
+ {
+ IoBuffer Record;
+ std::vector<CompressedBuffer> Attachments;
+ };
+
+ auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) {
+ CacheRecord Result;
+ if (Structured)
+ {
+ Result.Attachments.resize(AttachmentSizes.size());
+ CbObjectWriter Record;
+ Record.BeginObject("Key"sv);
+ {
+ Record << "Bucket"sv << Bucket;
+ Record << "Hash"sv << Key;
+ }
+ Record.EndObject();
+ for (size_t Index = 0; Index < AttachmentSizes.size(); Index++)
+ {
+ IoBuffer AttachmentData = CreateRandomBlob(AttachmentSizes[Index]);
+ CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), CompressedAttachmentData.DecodeRawHash());
+ Result.Attachments[Index] = CompressedAttachmentData;
+ }
+ Result.Record = Record.Save().GetBuffer().AsIoBuffer();
+ Result.Record.SetContentType(ZenContentType::kCbObject);
+ }
+ else
+ {
+ std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString());
+ size_t TotalSize = RecordData.length() + 1;
+ for (size_t AttachmentSize : AttachmentSizes)
+ {
+ TotalSize += AttachmentSize;
+ }
+ Result.Record = IoBuffer(TotalSize);
+ char* DataPtr = (char*)Result.Record.MutableData();
+ memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1);
+ DataPtr += RecordData.length() + 1;
+ for (size_t AttachmentSize : AttachmentSizes)
+ {
+ IoBuffer AttachmentData = CreateRandomBlob(AttachmentSize);
+ memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize());
+ DataPtr += AttachmentData.GetSize();
+ }
+ }
+ return Result;
+ };
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ auto CreateRecords =
+ [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) {
+ for (const IoHash& Cid : Cids)
+ {
+ CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes);
+ std::vector<IoHash> AttachmentHashes;
+ for (const CompressedBuffer& Attachment : Record.Attachments)
+ {
+ AttachmentHashes.push_back(Attachment.DecodeRawHash());
+ CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back());
+ }
+ Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes);
+ }
+ };
+
+ std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000};
+
+ std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)};
+ CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes);
+
+ std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
+ CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
+
+ WorkerThreadPool ThreadPool{1};
+ ScrubContext ScrubCtx{ThreadPool};
+ Zcs.ScrubStorage(ScrubCtx);
+ CidStore.ScrubStorage(ScrubCtx);
+ CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
+ CHECK(ScrubCtx.BadCids().GetSize() == 0);
+}
+
+TEST_CASE_TEMPLATE("z$.newgc.basics", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ struct CacheEntry
+ {
+ IoBuffer Data;
+ std::vector<std::pair<Oid, CompressedBuffer>> Attachments;
+ };
+
+ std::unordered_map<IoHash, CacheEntry> CacheEntries;
+
+ auto CreateCacheRecord =
+ [&](ZenCacheNamespace& Zcs, CidStore& CidStore, std::string_view Bucket, std::span<std::pair<Oid, CompressedBuffer>> Attachments) {
+ std::vector<IoHash> AttachmentKeys;
+ for (const auto& Attachment : Attachments)
+ {
+ AttachmentKeys.push_back(Attachment.second.DecodeRawHash());
+ }
+ auto Record = CreateRecord(Attachments);
+ Zcs.Put(Bucket,
+ Record.first,
+ {.Value = Record.second,
+ .RawSize = Record.second.GetSize(),
+ .RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())},
+ AttachmentKeys);
+ for (const auto& Attachment : Attachments)
+ {
+ CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash());
+ }
+ CacheEntries.insert({Record.first, CacheEntry{.Data = Record.second, .Attachments = {Attachments.begin(), Attachments.end()}}});
+ return Record.first;
+ };
+ auto CreateCacheValue = [&](ZenCacheNamespace& Zcs, std::string_view Bucket, size_t Size) {
+ std::pair<Oid, IoBuffer> CacheValue = CreateBinaryBlob(Size);
+ IoHash Key = ToIoHash(CacheValue.first);
+ Zcs.Put(Bucket,
+ Key,
+ {.Value = CacheValue.second,
+ .RawSize = CacheValue.second.GetSize(),
+ .RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())},
+ {});
+ CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}});
+ return Key;
+ };
+
+ auto ValidateCacheEntry = [&](ZenCacheNamespace& Zcs,
+ CidStore& CidStore,
+ std::string_view Bucket,
+ const IoHash& Key,
+ bool ExpectCacheEntry,
+ bool ExpectAttachments) {
+ const CacheEntry& Entry = CacheEntries[Key];
+ ZenCacheValue Value;
+ bool CacheExists = Zcs.Get(Bucket, Key, Value);
+ if (ExpectCacheEntry)
+ {
+ if (!CacheExists)
+ {
+ return false;
+ }
+ if (Value.Value.GetSize() != Entry.Data.GetSize())
+ {
+ return false;
+ }
+ if (!Value.Value.GetView().EqualBytes(Entry.Data.GetView()))
+ {
+ return false;
+ }
+ }
+ else if (CacheExists)
+ {
+ return false;
+ }
+
+ if (ExpectAttachments)
+ {
+ for (const auto& Attachment : Entry.Attachments)
+ {
+ IoHash AttachmentHash = Attachment.second.DecodeRawHash();
+ IoBuffer StoredData = CidStore.FindChunkByCid(AttachmentHash);
+ if (!StoredData)
+ {
+ return false;
+ }
+ if (!StoredData.GetView().EqualBytes(Attachment.second.GetCompressed().Flatten().GetView()))
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ for (const auto& Attachment : Entry.Attachments)
+ {
+ IoHash AttachmentHash = Attachment.second.DecodeRawHash();
+ if (CidStore.ContainsChunk(AttachmentHash))
+ {
+ return false;
+ }
+ }
+ }
+ return true;
+ };
+
+ std::vector<IoHash> CacheRecords;
+ std::vector<IoHash> UnstructuredCacheValues;
+
+ const auto TearDrinkerBucket = "teardrinker"sv;
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ // Create some basic data
+ {
+ // Structured record with attachments
+ auto Attachments1 = CreateCompressedAttachment(CidStore, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87});
+ CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments1));
+
+ // Structured record with reuse of attachments
+ auto Attachments2 = CreateCompressedAttachment(CidStore, std::vector<size_t>{971});
+ Attachments2.push_back(Attachments1[0]);
+ Attachments2.push_back(Attachments1[1]);
+ CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments2));
+ }
+
+ CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, {}));
+
+ {
+ // Unstructured cache values
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 84));
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 591));
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 1024 * 1024 * 3 + 7));
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 71));
+ }
+ }
+
+ SUBCASE("expire nothing")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() - std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all large objects, delete nothing")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all object, delete nothing")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all large objects, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all objects, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GE(Result.CompactStoresStatSum.RemovedDisk, 0);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, true));
+ }
+ SUBCASE("expire all large objects")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u,
+ Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount); // Only one cache value is pruned/deleted as that is the only
+ // large item in the cache (all other large items as in cas)
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u,
+ Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats
+ .FoundCount); // We won't remove any references since all referencers are small which retains all references
+ CHECK_EQ(0u,
+ Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats
+ .DeletedCount); // We won't remove any references since all referencers are small which retains all references
+ CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all objects")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
+ }
+
+ SUBCASE("keep 1 cache record, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true,
+ .Verbose = true,
+ .CompactBlockUsageThresholdPercent = 100});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(6u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(6u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ uint64_t MinExpectedRemoveSize = CacheEntries[UnstructuredCacheValues[2]].Data.GetSize();
+ CHECK_LT(MinExpectedRemoveSize, Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
+ }
+
+ SUBCASE("keep 2 cache records")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
+ }
+
+ SUBCASE("keep 3 cache value")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+
+ SUBCASE("keep 3 cache value, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ // Prime so we can check GC of memory layer
+ ZenCacheValue Dummy;
+ Zcs.Get(TearDrinkerBucket, CacheRecords[0], Dummy);
+ Zcs.Get(TearDrinkerBucket, CacheRecords[1], Dummy);
+ Zcs.Get(TearDrinkerBucket, CacheRecords[2], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[0], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[1], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[2], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[3], Dummy);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true,
+ .Verbose = true,
+ .CompactBlockUsageThresholdPercent = 100});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
+ uint64_t MemoryClean = CacheEntries[CacheRecords[0]].Data.GetSize() + CacheEntries[CacheRecords[1]].Data.GetSize() +
+ CacheEntries[CacheRecords[2]].Data.GetSize() + CacheEntries[UnstructuredCacheValues[0]].Data.GetSize();
+ CHECK_EQ(MemoryClean, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+
+ SUBCASE("leave write block")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ auto Attachments =
+ CreateCompressedAttachment(CidStore, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187});
+ IoHash CacheRecord = CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments);
+ {
+ // Do get so it ends up in memcache
+ ZenCacheValue _;
+ Zcs.Get(TearDrinkerBucket, CacheRecord, _);
+ }
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecord, GcClock::Now() - std::chrono::hours(2));
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[2], GcClock::Now() + std::chrono::hours(2));
+
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[0], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ // Write block can't be compacted so Compacted will be less than Deleted
+ CHECK_EQ(8u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(9u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(4u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(4u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(Attachments[1].second.GetCompressed().GetSize() + Attachments[3].second.GetCompressed().GetSize(),
+ Result.CompactStoresStatSum.RemovedDisk);
+ uint64_t MemoryClean = CacheEntries[CacheRecord].Data.GetSize();
+ CHECK_EQ(MemoryClean, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+ }
+}
+
+#endif
+
+void
+z$_forcelink()
+{
+}
+
+} // namespace zen