aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/cache/structuredcachestore.cpp
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenserver/cache/structuredcachestore.cpp')
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp3648
1 files changed, 3648 insertions, 0 deletions
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
new file mode 100644
index 000000000..26e970073
--- /dev/null
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -0,0 +1,3648 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "structuredcachestore.h"
+
+#include <zencore/except.h>
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compress.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
+
+#include <xxhash.h>
+
+#include <limits>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#endif
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/core.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <random>
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+namespace {
+
+#pragma pack(push)
+#pragma pack(1)
+
+ struct CacheBucketIndexHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t Version2 = 2;
+ static constexpr uint32_t CurrentVersion = Version2;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t PayloadAlignment = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+
+ static_assert(sizeof(CacheBucketIndexHeader) == 32);
+
+#pragma pack(pop)
+
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".slog";
+
+ std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + IndexExtension);
+ }
+
+ std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + ".tmp");
+ }
+
+ std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + LogExtension);
+ }
+
+ bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.GetFlags() &
+ ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+ {
+ return true;
+ }
+ if (Entry.Location.Reserved != 0)
+ {
+ OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString());
+ return false;
+ }
+ uint64_t Size = Entry.Location.Size();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+ bool MoveAndDeleteDirectory(const std::filesystem::path& Dir)
+ {
+ int DropIndex = 0;
+ do
+ {
+ if (!std::filesystem::exists(Dir))
+ {
+ return false;
+ }
+
+ std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
+ std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName;
+ if (std::filesystem::exists(DroppedBucketPath))
+ {
+ DropIndex++;
+ continue;
+ }
+
+ std::error_code Ec;
+ std::filesystem::rename(Dir, DroppedBucketPath, Ec);
+ if (!Ec)
+ {
+ DeleteDirectories(DroppedBucketPath);
+ return true;
+ }
+ // TODO: Do we need to bail at some point?
+ zen::Sleep(100);
+ } while (true);
+ }
+
+} // namespace
+
+namespace fs = std::filesystem;
+
+static CbObject
+LoadCompactBinaryObject(const fs::path& Path)
+{
+ FileContents Result = ReadFile(Path);
+
+ if (!Result.ErrorCode)
+ {
+ IoBuffer Buffer = Result.Flatten();
+ if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+ {
+ return LoadCompactBinaryObject(Buffer);
+ }
+ }
+
+ return CbObject();
+}
+
+static void
+SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
+{
+ WriteFile(Path, Object.GetBuffer().AsIoBuffer());
+}
+
+ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir)
+: GcStorage(Gc)
+, GcContributor(Gc)
+, m_RootDir(RootDir)
+, m_DiskLayer(RootDir)
+{
+ ZEN_INFO("initializing structured cache at '{}'", RootDir);
+ CreateDirectories(RootDir);
+
+ m_DiskLayer.DiscoverBuckets();
+
+#if ZEN_USE_CACHE_TRACKER
+ m_AccessTracker.reset(new ZenCacheTracker(RootDir));
+#endif
+}
+
+ZenCacheNamespace::~ZenCacheNamespace()
+{
+}
+
+bool
+ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ ZEN_TRACE_CPU("Z$::Get");
+
+ bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue);
+
+#if ZEN_USE_CACHE_TRACKER
+ auto _ = MakeGuard([&] {
+ if (!Ok)
+ return;
+
+ m_AccessTracker->TrackAccess(InBucket, HashKey);
+ });
+#endif
+
+ if (Ok)
+ {
+ ZEN_ASSERT(OutValue.Value.Size());
+
+ return true;
+ }
+
+ Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
+
+ if (Ok)
+ {
+ ZEN_ASSERT(OutValue.Value.Size());
+
+ if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold)
+ {
+ m_MemLayer.Put(InBucket, HashKey, OutValue);
+ }
+ }
+
+ return Ok;
+}
+
+void
+ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ ZEN_TRACE_CPU("Z$::Put");
+
+ // Store value and index
+
+ ZEN_ASSERT(Value.Value.Size());
+
+ m_DiskLayer.Put(InBucket, HashKey, Value);
+
+#if ZEN_USE_REF_TRACKING
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None)
+ {
+ CbObject Object{SharedBuffer(Value.Value)};
+
+ uint8_t TempBuffer[8 * sizeof(IoHash)];
+ std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer};
+ std::pmr::polymorphic_allocator Allocator{&Linear};
+ std::pmr::vector<IoHash> CidReferences{Allocator};
+
+ Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); });
+
+ m_Gc.OnNewCidReferences(CidReferences);
+ }
+ }
+#endif
+
+ if (Value.Value.Size() <= m_DiskLayerSizeThreshold)
+ {
+ m_MemLayer.Put(InBucket, HashKey, Value);
+ }
+}
+
+bool
+ZenCacheNamespace::DropBucket(std::string_view Bucket)
+{
+ ZEN_INFO("dropping bucket '{}'", Bucket);
+
+ // TODO: should ensure this is done atomically across all layers
+
+ const bool MemDropped = m_MemLayer.DropBucket(Bucket);
+ const bool DiskDropped = m_DiskLayer.DropBucket(Bucket);
+ const bool AnyDropped = MemDropped || DiskDropped;
+
+ ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? "dropped" : "not found");
+
+ return AnyDropped;
+}
+
+bool
+ZenCacheNamespace::Drop()
+{
+ m_MemLayer.Drop();
+ return m_DiskLayer.Drop();
+}
+
+void
+ZenCacheNamespace::Flush()
+{
+ m_DiskLayer.Flush();
+}
+
+void
+ZenCacheNamespace::Scrub(ScrubContext& Ctx)
+{
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
+ m_DiskLayer.Scrub(Ctx);
+ m_MemLayer.Scrub(Ctx);
+}
+
+void
+ZenCacheNamespace::GatherReferences(GcContext& GcCtx)
+{
+ Stopwatch Timer;
+ const auto Guard =
+ MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+ access_tracking::AccessTimes AccessTimes;
+ m_MemLayer.GatherAccessTimes(AccessTimes);
+
+ m_DiskLayer.UpdateAccessTimes(AccessTimes);
+ m_DiskLayer.GatherReferences(GcCtx);
+}
+
+void
+ZenCacheNamespace::CollectGarbage(GcContext& GcCtx)
+{
+ m_MemLayer.Reset();
+ m_DiskLayer.CollectGarbage(GcCtx);
+}
+
+GcStorageSize
+ZenCacheNamespace::StorageSize() const
+{
+ return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()};
+}
+
+ZenCacheNamespace::Info
+ZenCacheNamespace::GetInfo() const
+{
+ ZenCacheNamespace::Info Info = {.Config = {.RootDir = m_RootDir, .DiskLayerThreshold = m_DiskLayerSizeThreshold},
+ .DiskLayerInfo = m_DiskLayer.GetInfo(),
+ .MemoryLayerInfo = m_MemLayer.GetInfo()};
+ std::unordered_set<std::string> BucketNames;
+ for (const std::string& BucketName : Info.DiskLayerInfo.BucketNames)
+ {
+ BucketNames.insert(BucketName);
+ }
+ for (const std::string& BucketName : Info.MemoryLayerInfo.BucketNames)
+ {
+ BucketNames.insert(BucketName);
+ }
+ Info.BucketNames.insert(Info.BucketNames.end(), BucketNames.begin(), BucketNames.end());
+ return Info;
+}
+
+std::optional<ZenCacheNamespace::BucketInfo>
+ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const
+{
+ std::optional<ZenCacheDiskLayer::BucketInfo> DiskBucketInfo = m_DiskLayer.GetBucketInfo(Bucket);
+ if (!DiskBucketInfo.has_value())
+ {
+ return {};
+ }
+ ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo,
+ .MemoryLayerInfo = m_MemLayer.GetBucketInfo(Bucket).value_or(ZenCacheMemoryLayer::BucketInfo{})};
+ return Info;
+}
+
+CacheValueDetails::NamespaceDetails
+ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
+{
+ return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenCacheMemoryLayer::ZenCacheMemoryLayer()
+{
+}
+
+ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
+{
+}
+
+bool
+ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ auto It = m_Buckets.find(std::string(InBucket));
+
+ if (It == m_Buckets.end())
+ {
+ return false;
+ }
+
+ CacheBucket* Bucket = It->second.get();
+
+ _.ReleaseNow();
+
+ // There's a race here. Since the lock is released early to allow
+ // inserts, the bucket delete path could end up deleting the
+ // underlying data structure
+
+ return Bucket->Get(HashKey, OutValue);
+}
+
+void
+ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ const auto BucketName = std::string(InBucket);
+ CacheBucket* Bucket = nullptr;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
+ {
+ Bucket = It->second.get();
+ }
+ }
+
+ if (Bucket == nullptr)
+ {
+ // New bucket
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
+ {
+ Bucket = It->second.get();
+ }
+ else
+ {
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>());
+ Bucket = InsertResult.first->second.get();
+ }
+ }
+
+ // Note that since the underlying IoBuffer is retained, the content type is also
+
+ Bucket->Put(HashKey, Value);
+}
+
+bool
+ZenCacheMemoryLayer::DropBucket(std::string_view InBucket)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ auto It = m_Buckets.find(std::string(InBucket));
+
+ if (It != m_Buckets.end())
+ {
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It);
+ Bucket.Drop();
+ return true;
+ }
+ return false;
+}
+
+void
+ZenCacheMemoryLayer::Drop()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ std::vector<std::unique_ptr<CacheBucket>> Buckets;
+ Buckets.reserve(m_Buckets.size());
+ while (!m_Buckets.empty())
+ {
+ const auto& It = m_Buckets.begin();
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It->first);
+ Bucket.Drop();
+ }
+}
+
+void
+ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ Kv.second->Scrub(Ctx);
+ }
+}
+
+void
+ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes)
+{
+ using namespace zen::access_tracking;
+
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first];
+ Kv.second->GatherAccessTimes(Bucket);
+ }
+}
+
+void
+ZenCacheMemoryLayer::Reset()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Buckets.clear();
+}
+
+uint64_t
+ZenCacheMemoryLayer::TotalSize() const
+{
+ uint64_t TotalSize{};
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ TotalSize += Kv.second->TotalSize();
+ }
+
+ return TotalSize;
+}
+
+ZenCacheMemoryLayer::Info
+ZenCacheMemoryLayer::GetInfo() const
+{
+ ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()};
+
+ RwLock::SharedLockScope _(m_Lock);
+ Info.BucketNames.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Info.BucketNames.push_back(Kv.first);
+ Info.EntryCount += Kv.second->EntryCount();
+ }
+ return Info;
+}
+
+std::optional<ZenCacheMemoryLayer::BucketInfo>
+ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
+ {
+ return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()};
+ }
+ return {};
+}
+
+void
+ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
+{
+ RwLock::SharedLockScope _(m_BucketLock);
+
+ std::vector<IoHash> BadHashes;
+
+ auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
+ return Error == CbValidateError::None;
+ }
+ if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ return false;
+ }
+ if (Hash != RawHash)
+ {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ for (auto& Kv : m_CacheMap)
+ {
+ const BucketPayload& Payload = m_Payloads[Kv.second];
+ if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload))
+ {
+ BadHashes.push_back(Kv.first);
+ }
+ }
+
+ if (!BadHashes.empty())
+ {
+ Ctx.ReportBadCidChunks(BadHashes);
+ }
+}
+
+void
+ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
+{
+ RwLock::SharedLockScope _(m_BucketLock);
+ std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) {
+ return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]};
+ });
+}
+
+bool
+ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ RwLock::SharedLockScope _(m_BucketLock);
+
+ if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
+ {
+ uint32_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
+ ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
+
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash};
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+
+ return true;
+ }
+
+ return false;
+}
+
+void
+ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ size_t PayloadSize = Value.Value.GetSize();
+ {
+ GcClock::Tick AccessTime = GcClock::TickCount();
+ RwLock::ExclusiveLockScope _(m_BucketLock);
+ if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max())
+ {
+ // No more space in our memory cache!
+ return;
+ }
+ if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
+ {
+ uint32_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
+
+ m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed);
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+ Payload.Payload = Value.Value;
+ Payload.RawHash = Value.RawHash;
+ Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize);
+ m_AccessTimes[EntryIndex] = AccessTime;
+ }
+ else
+ {
+ uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size());
+ m_Payloads.emplace_back(
+ BucketPayload{.Payload = Value.Value, .RawSize = gsl::narrow<uint32_t>(Value.RawSize), .RawHash = Value.RawHash});
+ m_AccessTimes.emplace_back(AccessTime);
+ m_CacheMap.insert_or_assign(HashKey, EntryIndex);
+ }
+ ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size());
+ ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
+ }
+
+ m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed);
+}
+
+void
+ZenCacheMemoryLayer::CacheBucket::Drop()
+{
+ RwLock::ExclusiveLockScope _(m_BucketLock);
+ m_CacheMap.clear();
+ m_AccessTimes.clear();
+ m_Payloads.clear();
+ m_TotalSize.store(0);
+}
+
+uint64_t
+ZenCacheMemoryLayer::CacheBucket::EntryCount() const
+{
+ RwLock::SharedLockScope _(m_BucketLock);
+ return static_cast<uint64_t>(m_CacheMap.size());
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero)
+{
+}
+
+ZenCacheDiskLayer::CacheBucket::~CacheBucket()
+{
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
+{
+ using namespace std::literals;
+
+ m_BlocksBasePath = BucketDir / "blocks";
+ m_BucketDir = BucketDir;
+
+ CreateDirectories(m_BucketDir);
+
+ std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};
+
+ bool IsNew = false;
+
+ CbObject Manifest = LoadCompactBinaryObject(ManifestPath);
+
+ if (Manifest)
+ {
+ m_BucketId = Manifest["BucketId"sv].AsObjectId();
+ if (m_BucketId == Oid::Zero)
+ {
+ return false;
+ }
+ }
+ else if (AllowCreate)
+ {
+ m_BucketId.Generate();
+
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << m_BucketId;
+ Manifest = Writer.Save();
+ SaveCompactBinaryObject(ManifestPath, Manifest);
+ IsNew = true;
+ }
+ else
+ {
+ return false;
+ }
+
+ OpenLog(IsNew);
+
+ if (!IsNew)
+ {
+ Stopwatch Timer;
+ const auto _ =
+ MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+ for (CbFieldView Entry : Manifest["Timestamps"sv])
+ {
+ const CbObjectView Obj = Entry.AsObjectView();
+ const IoHash Key = Obj["Key"sv].AsHash();
+
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64();
+ }
+ }
+ for (CbFieldView Entry : Manifest["RawInfo"sv])
+ {
+ const CbObjectView Obj = Entry.AsObjectView();
+ const IoHash Key = Obj["Key"sv].AsHash();
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
+ m_Payloads[EntryIndex].RawHash = Obj["RawHash"sv].AsHash();
+ m_Payloads[EntryIndex].RawSize = Obj["RawSize"sv].AsUInt64();
+ }
+ }
+ }
+
+ return true;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
+{
+ uint64_t LogCount = m_SlogFile.GetLogCount();
+ if (m_LogFlushPosition == LogCount)
+ {
+ return;
+ }
+
+ ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
+ m_BucketDir / m_BucketName,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ namespace fs = std::filesystem;
+
+ fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
+
+ // Move index away, we keep it if something goes wrong
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+ if (fs::is_regular_file(IndexPath))
+ {
+ fs::rename(IndexPath, STmpIndexPath);
+ }
+
+ try
+ {
+ // Write the current state of the location map to a new index state
+ std::vector<DiskIndexEntry> Entries;
+
+ {
+ Entries.resize(m_Index.size());
+
+ uint64_t EntryIndex = 0;
+ for (auto& Entry : m_Index)
+ {
+ DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = m_Payloads[Entry.second].Location;
+ }
+ }
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+ CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+ Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
+
+ ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+ ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.Close();
+ EntryCount = Entries.size();
+ m_LogFlushPosition = LogCount;
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
+
+ // Restore any previous snapshot
+
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(IndexPath);
+ fs::rename(STmpIndexPath, IndexPath);
+ }
+ }
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
+{
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CacheBucketIndexHeader))
+ {
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) &&
+ (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0))
+ {
+ switch (Header.Version)
+ {
+ case CacheBucketIndexHeader::Version2:
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ break;
+ }
+ size_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing {} entries in {}",
+ IndexPath,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_PayloadAlignment = Header.PayloadAlignment;
+
+ std::vector<DiskIndexEntry> Entries;
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(),
+ Header.EntryCount * sizeof(DiskIndexEntry),
+ sizeof(CacheBucketIndexHeader));
+
+ m_Payloads.reserve(Header.EntryCount);
+ m_AccessTimes.reserve(Header.EntryCount);
+ m_Index.reserve(Header.EntryCount);
+
+ std::string InvalidEntryReason;
+ for (const DiskIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ size_t EntryIndex = m_Payloads.size();
+ m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert_or_assign(Entry.Key, EntryIndex);
+ EntryCount++;
+ }
+ OutVersion = CacheBucketIndexHeader::Version2;
+ return Header.LogPosition;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+ }
+ return 0;
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
+{
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (CasLog.Initialize())
+ {
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+ LogEntryCount = EntryCount - SkipEntryCount;
+ m_Index.reserve(LogEntryCount);
+ uint64_t InvalidEntryCount = 0;
+ CasLog.Replay(
+ [&](const DiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Location.Flags & DiskLocation::kTombStone)
+ {
+ m_Index.erase(Record.Key);
+ return;
+ }
+ if (!ValidateEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ size_t EntryIndex = m_Payloads.size();
+ m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert_or_assign(Record.Key, EntryIndex);
+ },
+ SkipEntryCount);
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
+ }
+ return LogEntryCount;
+ }
+ }
+ return 0;
+};
+
+void
+ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
+{
+ m_TotalStandaloneSize = 0;
+
+ m_Index.clear();
+ m_Payloads.clear();
+ m_AccessTimes.clear();
+
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+
+ if (IsNew)
+ {
+ fs::remove(LogPath);
+ fs::remove(IndexPath);
+ fs::remove_all(m_BlocksBasePath);
+ }
+
+ uint64_t LogEntryCount = 0;
+ {
+ uint32_t IndexVersion = 0;
+ m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion);
+ if (IndexVersion == 0 && std::filesystem::is_regular_file(IndexPath))
+ {
+ ZEN_WARN("removing invalid index file at '{}'", IndexPath);
+ fs::remove(IndexPath);
+ }
+
+ if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath))
+ {
+ LogEntryCount = ReadLog(LogPath, m_LogFlushPosition);
+ }
+ else
+ {
+ ZEN_WARN("removing invalid cas log at '{}'", LogPath);
+ fs::remove(LogPath);
+ }
+ }
+
+ CreateDirectories(m_BucketDir);
+
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ std::vector<BlockStoreLocation> KnownLocations;
+ KnownLocations.reserve(m_Index.size());
+ for (const auto& Entry : m_Index)
+ {
+ size_t EntryIndex = Entry.second;
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ const DiskLocation& Location = Payload.Location;
+
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
+ continue;
+ }
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
+ KnownLocations.push_back(BlockLocation);
+ }
+
+ m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
+ if (IsNew || LogEntryCount > 0)
+ {
+ MakeIndexSnapshot();
+ }
+ // TODO: should validate integrity of container files here
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const
+{
+ char HexString[sizeof(HashKey.Hash) * 2];
+ ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString);
+
+ Path.Append(m_BucketDir);
+ Path.AppendSeparator();
+ Path.Append(L"blob");
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(HexString, HexString + 3);
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(HexString + 3, HexString + 5);
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString));
+}
+
+IoBuffer
+ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
+{
+ BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);
+
+ IoBuffer Value = m_BlockStore.TryGetChunk(Location);
+ if (Value)
+ {
+ Value.SetContentType(Loc.GetContentType());
+ }
+
+ return Value;
+}
+
+IoBuffer
+ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const
+{
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+
+ RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+
+ if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
+ {
+ Data.SetContentType(Loc.GetContentType());
+
+ return Data;
+ }
+
+ return {};
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ RwLock::SharedLockScope _(m_IndexLock);
+ auto It = m_Index.find(HashKey);
+ if (It == m_Index.end())
+ {
+ return false;
+ }
+ size_t EntryIndex = It.value();
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ DiskLocation Location = Payload.Location;
+ OutValue.RawSize = Payload.RawSize;
+ OutValue.RawHash = Payload.RawHash;
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ // We don't need to hold the index lock when we read a standalone file
+ _.ReleaseNow();
+ OutValue.Value = GetStandaloneCacheValue(Location, HashKey);
+ }
+ else
+ {
+ OutValue.Value = GetInlineCacheValue(Location);
+ }
+ _.ReleaseNow();
+
+ if (!Location.IsFlagSet(DiskLocation::kStructured))
+ {
+ if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0)
+ {
+ if (Location.IsFlagSet(DiskLocation::kCompressed))
+ {
+ (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize);
+ }
+ else
+ {
+ OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+ OutValue.RawSize = OutValue.Value.GetSize();
+ }
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
+ {
+ BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
+ WritePayload.RawHash = OutValue.RawHash;
+ WritePayload.RawSize = OutValue.RawSize;
+
+ m_LogFlushPosition = 0; // Force resave of index on exit
+ }
+ }
+ }
+
+ return (bool)OutValue.Value;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ if (Value.Value.Size() >= m_LargeObjectThreshold)
+ {
+ return PutStandaloneCacheValue(HashKey, Value);
+ }
+ PutInlineCacheValue(HashKey, Value);
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::Drop()
+{
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+
+ std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks;
+ ShardLocks.reserve(256);
+ for (RwLock& Lock : m_ShardedLocks)
+ {
+ ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock));
+ }
+ m_BlockStore.Close();
+ m_SlogFile.Close();
+
+ bool Deleted = MoveAndDeleteDirectory(m_BucketDir);
+
+ m_Index.clear();
+ m_Payloads.clear();
+ m_AccessTimes.clear();
+ return Deleted;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::Flush()
+{
+ m_BlockStore.Flush();
+
+ RwLock::SharedLockScope _(m_IndexLock);
+ m_SlogFile.Flush();
+ MakeIndexSnapshot();
+ SaveManifest();
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::SaveManifest()
+{
+ using namespace std::literals;
+
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << m_BucketId;
+
+ if (!m_Index.empty())
+ {
+ Writer.BeginArray("Timestamps"sv);
+ for (auto& Kv : m_Index)
+ {
+ const IoHash& Key = Kv.first;
+ GcClock::Tick AccessTime = m_AccessTimes[Kv.second];
+
+ Writer.BeginObject();
+ Writer << "Key"sv << Key;
+ Writer << "LastAccess"sv << AccessTime;
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+
+ Writer.BeginArray("RawInfo"sv);
+ {
+ for (auto& Kv : m_Index)
+ {
+ const IoHash& Key = Kv.first;
+ const BucketPayload& Payload = m_Payloads[Kv.second];
+ if (Payload.RawHash != IoHash::Zero)
+ {
+ Writer.BeginObject();
+ Writer << "Key"sv << Key;
+ Writer << "RawHash"sv << Payload.RawHash;
+ Writer << "RawSize"sv << Payload.RawSize;
+ Writer.EndObject();
+ }
+ }
+ }
+ Writer.EndArray();
+ }
+
+ SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save());
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
+{
+ std::vector<IoHash> BadKeys;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
+ std::vector<BlockStoreLocation> ChunkLocations;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+
+ auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
+ return Error == CbValidateError::None;
+ }
+ if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ return false;
+ }
+ if (RawHash != Hash)
+ {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ RwLock::SharedLockScope _(m_IndexLock);
+
+ const size_t BlockChunkInitialCount = m_Index.size() / 4;
+ ChunkLocations.reserve(BlockChunkInitialCount);
+ ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
+
+ for (auto& Kv : m_Index)
+ {
+ const IoHash& HashKey = Kv.first;
+ const BucketPayload& Payload = m_Payloads[Kv.second];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ ++ChunkCount;
+ ChunkBytes += Loc.Size();
+ if (Loc.GetContentType() == ZenContentType::kBinary)
+ {
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+
+ RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+
+ std::error_code Ec;
+ uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
+ if (Ec)
+ {
+ BadKeys.push_back(HashKey);
+ }
+ if (size != Loc.Size())
+ {
+ BadKeys.push_back(HashKey);
+ }
+ continue;
+ }
+ IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
+ if (!Buffer)
+ {
+ BadKeys.push_back(HashKey);
+ continue;
+ }
+ if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer))
+ {
+ BadKeys.push_back(HashKey);
+ continue;
+ }
+ }
+ else
+ {
+ ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment));
+ ChunkIndexToChunkHash.push_back(HashKey);
+ continue;
+ }
+ }
+
+ const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ if (!Data)
+ {
+ // ChunkLocation out of range of stored blocks
+ BadKeys.push_back(Hash);
+ return;
+ }
+ IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
+ if (!Buffer)
+ {
+ BadKeys.push_back(Hash);
+ return;
+ }
+ const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
+ ZenContentType ContentType = Payload.Location.GetContentType();
+ if (!ValidateEntry(Hash, ContentType, Buffer))
+ {
+ BadKeys.push_back(Hash);
+ return;
+ }
+ };
+
+ const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ if (!Buffer)
+ {
+ BadKeys.push_back(Hash);
+ return;
+ }
+ const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
+ ZenContentType ContentType = Payload.Location.GetContentType();
+ if (!ValidateEntry(Hash, ContentType, Buffer))
+ {
+ BadKeys.push_back(Hash);
+ return;
+ }
+ };
+
+ m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+
+ _.ReleaseNow();
+
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
+
+ if (!BadKeys.empty())
+ {
+ ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
+
+ if (Ctx.RunRecovery())
+ {
+ // Deal with bad chunks by removing them from our lookup map
+
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
+
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ for (const IoHash& BadKey : BadKeys)
+ {
+ // Log a tombstone and delete the in-memory index for the bad entry
+ const auto It = m_Index.find(BadKey);
+ const BucketPayload& Payload = m_Payloads[It->second];
+ DiskLocation Location = Payload.Location;
+ Location.Flags |= DiskLocation::kTombStone;
+ LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
+ m_Index.erase(BadKey);
+ }
+ }
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ ExtendablePathBuilder<256> Path;
+ BuildPath(Path, Entry.Key);
+ fs::path FilePath = Path.ToPath();
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key));
+ if (fs::is_regular_file(FilePath))
+ {
+ ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8());
+ std::error_code Ec;
+ fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
+ }
+ m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ }
+ }
+ m_SlogFile.Append(LogEntries);
+
+ // Clean up m_AccessTimes and m_Payloads vectors
+ {
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ IndexMap Index;
+
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ size_t EntryCount = m_Index.size();
+ Payloads.reserve(EntryCount);
+ AccessTimes.reserve(EntryCount);
+ Index.reserve(EntryCount);
+ for (auto It : m_Index)
+ {
+ size_t EntryIndex = Payloads.size();
+ Payloads.push_back(m_Payloads[EntryIndex]);
+ AccessTimes.push_back(m_AccessTimes[EntryIndex]);
+ Index.insert({It.first, EntryIndex});
+ }
+ m_Index.swap(Index);
+ m_Payloads.swap(Payloads);
+ m_AccessTimes.swap(AccessTimes);
+ }
+ }
+ }
+ }
+
+ // Let whomever it concerns know about the bad chunks. This could
+ // be used to invalidate higher level data structures more efficiently
+ // than a full validation pass might be able to do
+ Ctx.ReportBadCidChunks(BadKeys);
+
+ ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences");
+
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
+ m_BucketDir / m_BucketName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs));
+ });
+
+ const GcClock::TimePoint ExpireTime = GcCtx.ExpireTime();
+
+ const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
+
+ IndexMap Index;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ Index = m_Index;
+ AccessTimes = m_AccessTimes;
+ Payloads = m_Payloads;
+ }
+
+ std::vector<IoHash> ExpiredKeys;
+ ExpiredKeys.reserve(1024);
+
+ std::vector<IoHash> Cids;
+ Cids.reserve(1024);
+
+ for (const auto& Entry : Index)
+ {
+ const IoHash& Key = Entry.first;
+ GcClock::Tick AccessTime = AccessTimes[Entry.second];
+ if (AccessTime < ExpireTicks)
+ {
+ ExpiredKeys.push_back(Key);
+ continue;
+ }
+
+ const DiskLocation& Loc = Payloads[Entry.second].Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ if (Cids.size() > 1024)
+ {
+ GcCtx.AddRetainedCids(Cids);
+ Cids.clear();
+ }
+
+ IoBuffer Buffer;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ // We don't need to hold the index lock when we read a standalone file
+ __.ReleaseNow();
+ if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer)
+ {
+ continue;
+ }
+ }
+ else if (Buffer = GetInlineCacheValue(Loc); !Buffer)
+ {
+ continue;
+ }
+ }
+
+ ZEN_ASSERT(Buffer);
+ ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
+ CbObject Obj(SharedBuffer{Buffer});
+ Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
+ }
+ }
+
+ GcCtx.AddRetainedCids(Cids);
+ GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage");
+
+ ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir / m_BucketName);
+
+ Stopwatch TotalTimer;
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = TotalSize();
+
+ std::unordered_set<IoHash> DeletedChunks;
+ uint64_t MovedCount = 0;
+
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG(
+ "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
+ "{} "
+ "of {} "
+ "entires ({}).",
+ m_BucketDir / m_BucketName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedChunks.size(),
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ RwLock::SharedLockScope _(m_IndexLock);
+ SaveManifest();
+ });
+
+ m_SlogFile.Flush();
+
+ std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
+ std::vector<IoHash> DeleteCacheKeys;
+ DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
+ GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
+ if (Keep)
+ {
+ return;
+ }
+ DeleteCacheKeys.push_back(ChunkHash);
+ });
+ if (DeleteCacheKeys.empty())
+ {
+ ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName);
+ return;
+ }
+
+ auto __ = MakeGuard([&]() {
+ if (!DeletedChunks.empty())
+ {
+ // Clean up m_AccessTimes and m_Payloads vectors
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ IndexMap Index;
+
+ {
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ size_t EntryCount = m_Index.size();
+ Payloads.reserve(EntryCount);
+ AccessTimes.reserve(EntryCount);
+ Index.reserve(EntryCount);
+ for (auto It : m_Index)
+ {
+ size_t EntryIndex = Payloads.size();
+ Payloads.push_back(m_Payloads[EntryIndex]);
+ AccessTimes.push_back(m_AccessTimes[EntryIndex]);
+ Index.insert({It.first, EntryIndex});
+ }
+ m_Index.swap(Index);
+ m_Payloads.swap(Payloads);
+ m_AccessTimes.swap(AccessTimes);
+ }
+ GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
+ }
+ });
+
+ std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
+ IndexMap Index;
+ BlockStore::ReclaimSnapshotState BlockStoreState;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.empty())
+ {
+ ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
+ return;
+ }
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
+
+ SaveManifest();
+ Index = m_Index;
+
+ for (const IoHash& Key : DeleteCacheKeys)
+ {
+ if (auto It = Index.find(Key); It != Index.end())
+ {
+ const BucketPayload& Payload = m_Payloads[It->second];
+ DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location};
+ if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ Entry.Location.Flags |= DiskLocation::kTombStone;
+ ExpiredStandaloneEntries.push_back(Entry);
+ }
+ }
+ }
+ if (GcCtx.IsDeletionMode())
+ {
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ m_Index.erase(Entry.Key);
+ m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ DeletedChunks.insert(Entry.Key);
+ }
+ m_SlogFile.Append(ExpiredStandaloneEntries);
+ }
+ }
+
+ if (GcCtx.IsDeletionMode())
+ {
+ std::error_code Ec;
+ ExtendablePathBuilder<256> Path;
+
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ const IoHash& Key = Entry.Key;
+ const DiskLocation& Loc = Entry.Location;
+
+ Path.Reset();
+ BuildPath(Path, Key);
+ fs::path FilePath = Path.ToPath();
+
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
+ {
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
+ continue;
+ }
+ __.ReleaseNow();
+
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
+ if (fs::is_regular_file(FilePath))
+ {
+ ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
+ fs::remove(FilePath, Ec);
+ }
+ }
+
+ if (Ec)
+ {
+ ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
+ Ec.clear();
+ DiskLocation RestoreLocation = Loc;
+ RestoreLocation.Flags &= ~DiskLocation::kTombStone;
+
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
+ {
+ continue;
+ }
+ m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
+ size_t EntryIndex = m_Payloads.size();
+ m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert({Key, EntryIndex});
+ m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed);
+ DeletedChunks.erase(Key);
+ continue;
+ }
+ DeletedSize += Entry.Location.Size();
+ }
+ }
+
+ TotalChunkCount = Index.size();
+
+ std::vector<IoHash> TotalChunkHashes;
+ TotalChunkHashes.reserve(TotalChunkCount);
+ for (const auto& Entry : Index)
+ {
+ const DiskLocation& Location = m_Payloads[Entry.second].Location;
+
+ if (Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ continue;
+ }
+ TotalChunkHashes.push_back(Entry.first);
+ }
+
+ if (TotalChunkHashes.empty())
+ {
+ return;
+ }
+ TotalChunkCount = TotalChunkHashes.size();
+
+ std::vector<BlockStoreLocation> ChunkLocations;
+ BlockStore::ChunkIndexArray KeepChunkIndexes;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
+
+ GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ auto KeyIt = Index.find(ChunkHash);
+ const DiskLocation& DiskLocation = m_Payloads[KeyIt->second].Location;
+ BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ if (Keep)
+ {
+ KeepChunkIndexes.push_back(ChunkIndex);
+ }
+ });
+
+ size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size();
+
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ if (!PerformDelete)
+ {
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
+ uint64_t CurrentTotalSize = TotalSize();
+ ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}",
+ m_BucketDir / m_BucketName,
+ DeleteCount,
+ TotalChunkCount,
+ NiceBytes(CurrentTotalSize));
+ return;
+ }
+
+ m_BlockStore.ReclaimSpace(
+ BlockStoreState,
+ ChunkLocations,
+ KeepChunkIndexes,
+ m_PayloadAlignment,
+ false,
+ [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]];
+ const DiskLocation& OldDiskLocation = OldPayload.Location;
+ LogEntries.push_back(
+ {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())});
+ }
+ for (const size_t ChunkIndex : RemovedChunks)
+ {
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]];
+ const DiskLocation& OldDiskLocation = OldPayload.Location;
+ LogEntries.push_back({.Key = ChunkHash,
+ .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment),
+ m_PayloadAlignment,
+ OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
+ DeletedChunks.insert(ChunkHash);
+ }
+
+ m_SlogFile.Append(LogEntries);
+ m_SlogFile.Flush();
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ if (Entry.Location.GetFlags() & DiskLocation::kTombStone)
+ {
+ m_Index.erase(Entry.Key);
+ continue;
+ }
+ m_Payloads[m_Index[Entry.Key]].Location = Entry.Location;
+ }
+ }
+ },
+ [&]() { return GcCtx.CollectSmallObjects(); });
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
+{
+ using namespace access_tracking;
+
+ for (const KeyAccessTime& KeyTime : AccessTimes)
+ {
+ if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end())
+ {
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_AccessTimes[EntryIndex] = KeyTime.LastAccess;
+ }
+ }
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::EntryCount() const
+{
+ RwLock::SharedLockScope _(m_IndexLock);
+ return static_cast<uint64_t>(m_Index.size());
+}
+
+CacheValueDetails::ValueDetails
+ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const
+{
+ std::vector<IoHash> Attachments;
+ const BucketPayload& Payload = m_Payloads[Index];
+ if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
+ {
+ IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key)
+ : GetInlineCacheValue(Payload.Location);
+ CbObject Obj(SharedBuffer{Value});
+ Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
+ }
+ return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
+ .RawSize = Payload.RawSize,
+ .RawHash = Payload.RawHash,
+ .LastAccess = m_AccessTimes[Index],
+ .Attachments = std::move(Attachments),
+ .ContentType = Payload.Location.GetContentType()};
+}
+
+CacheValueDetails::BucketDetails
+ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const
+{
+ CacheValueDetails::BucketDetails Details;
+ RwLock::SharedLockScope _(m_IndexLock);
+ if (ValueFilter.empty())
+ {
+ Details.Values.reserve(m_Index.size());
+ for (const auto& It : m_Index)
+ {
+ Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second));
+ }
+ }
+ else
+ {
+ IoHash Key = IoHash::FromHexString(ValueFilter);
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second));
+ }
+ }
+ return Details;
+}
+
+void
+ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ CacheBucket& Bucket = *Kv.second;
+ Bucket.CollectGarbage(GcCtx);
+ }
+}
+
+void
+ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (const auto& Kv : AccessTimes.Buckets)
+ {
+ if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end())
+ {
+ CacheBucket& Bucket = *It->second;
+ Bucket.UpdateAccessTimes(Kv.second);
+ }
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ uint64_t NewFileSize = Value.Value.Size();
+
+ TemporaryFile DataFile;
+
+ std::error_code Ec;
+ DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
+ }
+
+ bool CleanUpTempFile = false;
+ auto __ = MakeGuard([&] {
+ if (CleanUpTempFile)
+ {
+ std::error_code Ec;
+ std::filesystem::remove(DataFile.GetPath(), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'",
+ DataFile.GetPath(),
+ m_BucketDir,
+ Ec.message());
+ }
+ }
+ });
+
+ DataFile.WriteAll(Value.Value, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec,
+ fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'",
+ NiceBytes(NewFileSize),
+ DataFile.GetPath().string(),
+ m_BucketDir));
+ }
+
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+ std::filesystem::path FsPath{DataFilePath.ToPath()};
+
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
+
+ // We do a speculative remove of the file instead of probing with a exists call and check the error code instead
+ std::filesystem::remove(FsPath, Ec);
+ if (Ec)
+ {
+ if (Ec.value() != ENOENT)
+ {
+ ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message());
+ Sleep(100);
+ Ec.clear();
+ std::filesystem::remove(FsPath, Ec);
+ if (Ec && Ec.value() != ENOENT)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir));
+ }
+ }
+ }
+
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+ if (Ec)
+ {
+ CreateDirectories(FsPath.parent_path());
+ Ec.clear();
+
+ // Try again
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retrying.",
+ FsPath,
+ DataFile.GetPath(),
+ m_BucketDir,
+ Ec.message());
+ Sleep(100);
+ Ec.clear();
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+ if (Ec)
+ {
+ throw std::system_error(
+ Ec,
+ fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir));
+ }
+ }
+ }
+
+ // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file
+ // will be disabled as the file handle has already been closed
+ CleanUpTempFile = false;
+
+ uint8_t EntryFlags = DiskLocation::kStandaloneFile;
+
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
+
+ DiskLocation Loc(NewFileSize, EntryFlags);
+
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It == m_Index.end())
+ {
+ // Previously unknown object
+ size_t EntryIndex = m_Payloads.size();
+ m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert_or_assign(HashKey, EntryIndex);
+ }
+ else
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash};
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
+ }
+
+ m_SlogFile.Append({.Key = HashKey, .Location = Loc});
+ m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ uint8_t EntryFlags = 0;
+
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
+
+ m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) {
+ DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
+ m_SlogFile.Append({.Key = HashKey, .Location = Location});
+
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ // this would requiring comparing contents on disk unless we add a
+ // content hash to the index entry
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ }
+ else
+ {
+ size_t EntryIndex = m_Payloads.size();
+ m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert_or_assign(HashKey, EntryIndex);
+ }
+ });
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir)
+{
+}
+
+ZenCacheDiskLayer::~ZenCacheDiskLayer() = default;
+
+bool
+ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ const auto BucketName = std::string(InBucket);
+ CacheBucket* Bucket = nullptr;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ auto It = m_Buckets.find(BucketName);
+
+ if (It != m_Buckets.end())
+ {
+ Bucket = It->second.get();
+ }
+ }
+
+ if (Bucket == nullptr)
+ {
+ // Bucket needs to be opened/created
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ {
+ Bucket = It->second.get();
+ }
+ else
+ {
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
+ Bucket = InsertResult.first->second.get();
+
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName;
+
+ if (!Bucket->OpenOrCreate(BucketPath))
+ {
+ m_Buckets.erase(InsertResult.first);
+ return false;
+ }
+ }
+ }
+
+ ZEN_ASSERT(Bucket != nullptr);
+ return Bucket->Get(HashKey, OutValue);
+}
+
+void
+ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ const auto BucketName = std::string(InBucket);
+ CacheBucket* Bucket = nullptr;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ auto It = m_Buckets.find(BucketName);
+
+ if (It != m_Buckets.end())
+ {
+ Bucket = It->second.get();
+ }
+ }
+
+ if (Bucket == nullptr)
+ {
+ // New bucket needs to be created
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ {
+ Bucket = It->second.get();
+ }
+ else
+ {
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
+ Bucket = InsertResult.first->second.get();
+
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName;
+
+ try
+ {
+ if (!Bucket->OpenOrCreate(BucketPath))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ m_Buckets.erase(InsertResult.first);
+ return;
+ }
+ }
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
+ }
+ }
+ }
+
+ ZEN_ASSERT(Bucket != nullptr);
+
+ Bucket->Put(HashKey, Value);
+}
+
+void
+ZenCacheDiskLayer::DiscoverBuckets()
+{
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
+
+ // Initialize buckets
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ for (const std::filesystem::path& BucketPath : DirContent.Directories)
+ {
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
+ // New bucket needs to be created
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ {
+ continue;
+ }
+
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
+ CacheBucket& Bucket = *InsertResult.first->second;
+
+ try
+ {
+ if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+
+ m_Buckets.erase(InsertResult.first);
+ continue;
+ }
+ }
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
+ }
+ ZEN_INFO("Discovered bucket '{}'", BucketName);
+ }
+}
+
+bool
+ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ auto It = m_Buckets.find(std::string(InBucket));
+
+ if (It != m_Buckets.end())
+ {
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It);
+
+ return Bucket.Drop();
+ }
+
+ // Make sure we remove the folder even if we don't know about the bucket
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= std::string(InBucket);
+ return MoveAndDeleteDirectory(BucketPath);
+}
+
+bool
+ZenCacheDiskLayer::Drop()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ std::vector<std::unique_ptr<CacheBucket>> Buckets;
+ Buckets.reserve(m_Buckets.size());
+ while (!m_Buckets.empty())
+ {
+ const auto& It = m_Buckets.begin();
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It->first);
+ if (!Bucket.Drop())
+ {
+ return false;
+ }
+ }
+ return MoveAndDeleteDirectory(m_RootDir);
+}
+
+void
+ZenCacheDiskLayer::Flush()
+{
+ std::vector<CacheBucket*> Buckets;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ CacheBucket* Bucket = Kv.second.get();
+ Buckets.push_back(Bucket);
+ }
+ }
+
+ for (auto& Bucket : Buckets)
+ {
+ Bucket->Flush();
+ }
+}
+
+void
+ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ CacheBucket& Bucket = *Kv.second;
+ Bucket.Scrub(Ctx);
+ }
+}
+
+void
+ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ CacheBucket& Bucket = *Kv.second;
+ Bucket.GatherReferences(GcCtx);
+ }
+}
+
+uint64_t
+ZenCacheDiskLayer::TotalSize() const
+{
+ uint64_t TotalSize{};
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ TotalSize += Kv.second->TotalSize();
+ }
+
+ return TotalSize;
+}
+
+ZenCacheDiskLayer::Info
+ZenCacheDiskLayer::GetInfo() const
+{
+ ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()};
+
+ RwLock::SharedLockScope _(m_Lock);
+ Info.BucketNames.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Info.BucketNames.push_back(Kv.first);
+ Info.EntryCount += Kv.second->EntryCount();
+ }
+ return Info;
+}
+
+std::optional<ZenCacheDiskLayer::BucketInfo>
+ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
+ {
+ return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()};
+ }
+ return {};
+}
+
+CacheValueDetails::NamespaceDetails
+ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
+{
+ RwLock::SharedLockScope _(m_Lock);
+ CacheValueDetails::NamespaceDetails Details;
+ if (BucketFilter.empty())
+ {
+ Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1);
+ for (auto& Kv : m_Buckets)
+ {
+ Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter);
+ }
+ }
+ else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end())
+ {
+ Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter);
+ }
+ return Details;
+}
+
+//////////////////////////// ZenCacheStore
+
+static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc";
+
+ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Gc(Gc), m_Configuration(Configuration)
+{
+ CreateDirectories(m_Configuration.BasePath);
+
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+
+ std::vector<std::string> Namespaces;
+ for (const std::filesystem::path& DirPath : DirContent.Directories)
+ {
+ std::string DirName = PathToUtf8(DirPath.filename());
+ if (DirName.starts_with(NamespaceDiskPrefix))
+ {
+ Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
+ continue;
+ }
+ }
+
+ ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath);
+
+ if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
+ {
+ // default (unspecified) and ue4-ddc namespace points to the same namespace instance
+
+ std::filesystem::path DefaultNamespaceFolder =
+ m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
+ CreateDirectories(DefaultNamespaceFolder);
+ Namespaces.push_back(std::string(UE4DDCNamespaceName));
+ }
+
+ for (const std::string& NamespaceName : Namespaces)
+ {
+ m_Namespaces[NamespaceName] =
+ std::make_unique<ZenCacheNamespace>(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName));
+ }
+}
+
+ZenCacheStore::~ZenCacheStore()
+{
+ m_Namespaces.clear();
+}
+
+bool
+ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->Get(Bucket, HashKey, OutValue);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
+
+ return false;
+}
+
+void
+ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->Put(Bucket, HashKey, Value);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
+}
+
+bool
+ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->DropBucket(Bucket);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket);
+ return false;
+}
+
+bool
+ZenCacheStore::DropNamespace(std::string_view InNamespace)
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end())
+ {
+ ZenCacheNamespace& Namespace = *It->second;
+ m_DroppedNamespaces.push_back(std::move(It->second));
+ m_Namespaces.erase(It);
+ return Namespace.Drop();
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace);
+ return false;
+}
+
+void
+ZenCacheStore::Flush()
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
+}
+
+void
+ZenCacheStore::Scrub(ScrubContext& Ctx)
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); });
+}
+
+CacheValueDetails
+ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter,
+ const std::string_view BucketFilter,
+ const std::string_view ValueFilter) const
+{
+ CacheValueDetails Details;
+ if (NamespaceFilter.empty())
+ {
+ IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace& Store) {
+ Details.Namespaces[std::string(Namespace)] = Store.GetValueDetails(BucketFilter, ValueFilter);
+ });
+ }
+ else if (const ZenCacheNamespace* Store = FindNamespace(NamespaceFilter); Store != nullptr)
+ {
+ Details.Namespaces[std::string(NamespaceFilter)] = Store->GetValueDetails(BucketFilter, ValueFilter);
+ }
+ return Details;
+}
+
+ZenCacheNamespace*
+ZenCacheStore::GetNamespace(std::string_view Namespace)
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ if (Namespace == DefaultNamespace)
+ {
+ if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ }
+ _.ReleaseNow();
+
+ if (!m_Configuration.AllowAutomaticCreationOfNamespaces)
+ {
+ return nullptr;
+ }
+
+ RwLock::ExclusiveLockScope __(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+
+ auto NewNamespace = m_Namespaces.insert_or_assign(
+ std::string(Namespace),
+ std::make_unique<ZenCacheNamespace>(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace)));
+ return NewNamespace.first->second.get();
+}
+
+const ZenCacheNamespace*
+ZenCacheStore::FindNamespace(std::string_view Namespace) const
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ if (Namespace == DefaultNamespace)
+ {
+ if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ }
+ return nullptr;
+}
+
+void
+ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const
+{
+ std::vector<std::pair<std::string, ZenCacheNamespace&>> Namespaces;
+ {
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ Namespaces.reserve(m_Namespaces.size());
+ for (const auto& Entry : m_Namespaces)
+ {
+ if (Entry.first == DefaultNamespace)
+ {
+ continue;
+ }
+ Namespaces.push_back({Entry.first, *Entry.second});
+ }
+ }
+ for (auto& Entry : Namespaces)
+ {
+ Callback(Entry.first, Entry.second);
+ }
+}
+
+GcStorageSize
+ZenCacheStore::StorageSize() const
+{
+ GcStorageSize Size;
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
+ GcStorageSize StoreSize = Store.StorageSize();
+ Size.MemorySize += StoreSize.MemorySize;
+ Size.DiskSize += StoreSize.DiskSize;
+ });
+ return Size;
+}
+
+ZenCacheStore::Info
+ZenCacheStore::GetInfo() const
+{
+ ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()};
+
+ IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) {
+ Info.NamespaceNames.push_back(std::string(NamespaceName));
+ ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo();
+ Info.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount;
+ Info.MemoryEntryCount += NamespaceInfo.MemoryLayerInfo.EntryCount;
+ });
+
+ return Info;
+}
+
+std::optional<ZenCacheNamespace::Info>
+ZenCacheStore::GetNamespaceInfo(std::string_view NamespaceName)
+{
+ if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace)
+ {
+ return Namespace->GetInfo();
+ }
+ return {};
+}
+
+std::optional<ZenCacheNamespace::BucketInfo>
+ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view BucketName)
+{
+ if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace)
+ {
+ return Namespace->GetBucketInfo(BucketName);
+ }
+ return {};
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+using namespace std::literals;
+
+namespace testutils {
+ IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); }
+
+ IoBuffer CreateBinaryCacheValue(uint64_t Size)
+ {
+ static std::random_device rd;
+ static std::mt19937 g(rd());
+
+ std::vector<uint8_t> Values;
+ Values.resize(Size);
+ for (size_t Idx = 0; Idx < Size; ++Idx)
+ {
+ Values[Idx] = static_cast<uint8_t>(Idx);
+ }
+ std::shuffle(Values.begin(), Values.end(), g);
+
+ IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size());
+ Buf.SetContentType(ZenContentType::kBinary);
+ return Buf;
+ };
+
+} // namespace testutils
+
+TEST_CASE("z$.store")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+
+ const int kIterationCount = 100;
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
+
+ CbObjectWriter Cbo;
+ Cbo << "hey" << i;
+ CbObject Obj = Cbo.Save();
+
+ ZenCacheValue Value;
+ Value.Value = Obj.GetBuffer().AsIoBuffer();
+ Value.Value.SetContentType(ZenContentType::kCbObject);
+
+ Zcs.Put("test_bucket"sv, Key, Value);
+ }
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
+
+ ZenCacheValue Value;
+ Zcs.Get("test_bucket"sv, Key, /* out */ Value);
+
+ REQUIRE(Value.Value);
+ CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject);
+ CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None);
+ CbObject Obj = LoadCompactBinaryObject(Value.Value);
+ CHECK_EQ(Obj["hey"].AsInt32(), i);
+ }
+}
+
+TEST_CASE("z$.size")
+{
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ SUBCASE("mem/disklayer")
+ {
+ const size_t Count = 16;
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+
+ CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ for (size_t Key = 0; Key < Count; ++Key)
+ {
+ const size_t Bucket = Key % 4;
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), ZenCacheValue{.Value = Buffer});
+ }
+
+ CacheSize = Zcs.StorageSize();
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
+ }
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+
+ const GcStorageSize SerializedSize = Zcs.StorageSize();
+ CHECK_EQ(SerializedSize.MemorySize, 0);
+ CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
+
+ for (size_t Bucket = 0; Bucket < 4; ++Bucket)
+ {
+ Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
+ }
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+
+ SUBCASE("disklayer")
+ {
+ const size_t Count = 16;
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+
+ CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ for (size_t Key = 0; Key < Count; ++Key)
+ {
+ const size_t Bucket = Key % 4;
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer});
+ }
+
+ CacheSize = Zcs.StorageSize();
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_EQ(0, CacheSize.MemorySize);
+ }
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+
+ const GcStorageSize SerializedSize = Zcs.StorageSize();
+ CHECK_EQ(SerializedSize.MemorySize, 0);
+ CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
+
+ for (size_t Bucket = 0; Bucket < 4; ++Bucket)
+ {
+ Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
+ }
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+}
+
+TEST_CASE("z$.gc")
+{
+ using namespace testutils;
+
+ SUBCASE("gather references does NOT add references for expired cache entries")
+ {
+ ScopedTemporaryDirectory TempDir;
+ std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ const auto CollectAndFilter = [](GcManager& Gc,
+ GcClock::TimePoint Time,
+ GcClock::Duration MaxDuration,
+ std::span<const IoHash> Cids,
+ std::vector<IoHash>& OutKeep) {
+ GcContext GcCtx(Time - MaxDuration);
+ Gc.CollectGarbage(GcCtx);
+ OutKeep.clear();
+ GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); });
+ };
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ const auto Bucket = "teardrinker"sv;
+
+ // Create a cache record
+ const IoHash Key = CreateKey(42);
+ CbObjectWriter Record;
+ Record << "Key"sv
+ << "SomeRecord"sv;
+
+ for (size_t Idx = 0; auto& Cid : Cids)
+ {
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid);
+ }
+
+ IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ Zcs.Put(Bucket, Key, {.Value = Buffer});
+
+ std::vector<IoHash> Keep;
+
+ // Collect garbage with 1 hour max cache duration
+ {
+ CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(Cids.size(), Keep.size());
+ }
+
+ // Move forward in time
+ {
+ CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(0, Keep.size());
+ }
+ }
+
+ // Expect timestamps to be serialized
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ std::vector<IoHash> Keep;
+
+ // Collect garbage with 1 hour max cache duration
+ {
+ CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(3, Keep.size());
+ }
+
+ // Move forward in time
+ {
+ CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(0, Keep.size());
+ }
+ }
+ }
+
+ SUBCASE("gc removes standalone values")
+ {
+ ScopedTemporaryDirectory TempDir;
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ const auto Bucket = "fortysixandtwo"sv;
+ const GcClock::TimePoint CurrentTime = GcClock::Now();
+
+ std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ for (const auto& Key : Keys)
+ {
+ IoBuffer Value = testutils::CreateBinaryCacheValue(128 << 10);
+ Zcs.Put(Bucket, Key, {.Value = Value});
+ }
+
+ {
+ GcContext GcCtx(CurrentTime - std::chrono::hours(46));
+
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(Exists);
+ }
+ }
+
+ // Move forward in time and collect again
+ {
+ GcContext GcCtx(CurrentTime + std::chrono::minutes(2));
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(!Exists);
+ }
+
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+
+ SUBCASE("gc removes small objects")
+ {
+ ScopedTemporaryDirectory TempDir;
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ const auto Bucket = "rightintwo"sv;
+
+ std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ for (const auto& Key : Keys)
+ {
+ IoBuffer Value = testutils::CreateBinaryCacheValue(128);
+ Zcs.Put(Bucket, Key, {.Value = Value});
+ }
+
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(2));
+ GcCtx.CollectSmallObjects(true);
+
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(Exists);
+ }
+ }
+
+ // Move forward in time and collect again
+ {
+ GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2));
+ GcCtx.CollectSmallObjects(true);
+
+ Zcs.Flush();
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(!Exists);
+ }
+
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+}
+
+TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
+{
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 8192;
+
+ struct Chunk
+ {
+ std::string Bucket;
+ IoBuffer Buffer;
+ };
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks;
+ Chunks.reserve(kChunkCount);
+
+ const std::string Bucket1 = "rightinone";
+ const std::string Bucket2 = "rightintwo";
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ while (true)
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ break;
+ }
+ while (true)
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ break;
+ }
+ }
+
+ CreateDirectories(TempDir.Path());
+
+ WorkerThreadPool ThreadPool(4);
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path());
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+
+ const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
+
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+ std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
+ GcChunkHashes.reserve(Chunks.size());
+ for (const auto& Chunk : Chunks)
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ {
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks;
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ }
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ }
+ }
+
+ std::atomic<size_t> WorkCompleted = 0;
+ std::atomic_uint32_t AddedChunkCount = 0;
+ for (const auto& Chunk : NewChunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ }
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (AddedChunkCount.load() < NewChunks.size())
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.AddRetainedCids(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+
+ while (WorkCompleted < NewChunks.size() + Chunks.size())
+ {
+ Sleep(1);
+ }
+
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.AddRetainedCids(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+ }
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : GcChunkHashes)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < GcChunkHashes.size())
+ {
+ Sleep(1);
+ }
+ }
+ }
+}
+
+TEST_CASE("z$.namespaces")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ IoHash Key1;
+ IoHash Key2;
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false});
+ const auto Bucket = "teardrinker"sv;
+ const auto CustomNamespace = "mynamespace"sv;
+
+ // Create a cache record
+ Key1 = CreateKey(42);
+ CbObject CacheValue = CreateCacheValue(4096);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue);
+
+ ZenCacheValue GetValue;
+ CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
+
+ // This should just be dropped as we don't allow creating of namespaces on the fly
+ Zcs.Put(CustomNamespace, Bucket, Key1, PutValue);
+ CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
+ }
+
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
+ const auto Bucket = "teardrinker"sv;
+ const auto CustomNamespace = "mynamespace"sv;
+
+ Key2 = CreateKey(43);
+ CbObject CacheValue2 = CreateCacheValue(4096);
+
+ IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
+ Buffer2.SetContentType(ZenContentType::kCbObject);
+ ZenCacheValue PutValue2 = {.Value = Buffer2};
+ Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2);
+
+ ZenCacheValue GetValue;
+ CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
+ CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
+ CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue));
+ }
+}
+
+TEST_CASE("z$.drop.bucket")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ IoHash Key1;
+ IoHash Key2;
+
+ auto PutValue =
+ [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Namespace, Bucket, Key, PutValue);
+ return Key;
+ };
+ auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue GetValue;
+ Zcs.Get(Namespace, Bucket, Key, GetValue);
+ return GetValue;
+ };
+ WorkerThreadPool Workers(1);
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
+ const auto Bucket = "teardrinker"sv;
+ const auto Namespace = "mynamespace"sv;
+
+ Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096);
+ Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048);
+
+ ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
+ CHECK(Value1.Value);
+
+ std::atomic_bool WorkComplete = false;
+ Workers.ScheduleWork([&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ WorkComplete = true;
+ });
+ // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
+ // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
+ CHECK(Zcs.DropBucket(Namespace, Bucket));
+ while (!WorkComplete)
+ {
+ zen::Sleep(1);
+ }
+
+ // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty
+ Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
+ CHECK(!Value1.Value);
+ ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2);
+ CHECK(!Value2.Value);
+ }
+}
+
+TEST_CASE("z$.drop.namespace")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ auto PutValue =
+ [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Namespace, Bucket, Key, PutValue);
+ return Key;
+ };
+ auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue GetValue;
+ Zcs.Get(Namespace, Bucket, Key, GetValue);
+ return GetValue;
+ };
+ WorkerThreadPool Workers(1);
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
+ const auto Bucket1 = "teardrinker1"sv;
+ const auto Bucket2 = "teardrinker2"sv;
+ const auto Namespace1 = "mynamespace1"sv;
+ const auto Namespace2 = "mynamespace2"sv;
+
+ IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096);
+ IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048);
+ IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096);
+ IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048);
+
+ ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
+ CHECK(Value1.Value);
+ ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
+ CHECK(Value2.Value);
+ ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
+ CHECK(Value3.Value);
+ ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
+ CHECK(Value4.Value);
+
+ std::atomic_bool WorkComplete = false;
+ Workers.ScheduleWork([&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ Value2.Value = IoBuffer{};
+ Value3.Value = IoBuffer{};
+ Value4.Value = IoBuffer{};
+ WorkComplete = true;
+ });
+ // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
+ // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
+ CHECK(Zcs.DropNamespace(Namespace1));
+ while (!WorkComplete)
+ {
+ zen::Sleep(1);
+ }
+
+ // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty
+ Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
+ CHECK(!Value1.Value);
+ Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
+ CHECK(!Value2.Value);
+ Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
+ CHECK(Value3.Value);
+ Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
+ CHECK(Value4.Value);
+ }
+}
+
+TEST_CASE("z$.blocked.disklayer.put")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size, Size & 0xff);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+
+ CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ size_t Key = Buffer.Size();
+ IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer});
+
+ ZenCacheValue BufferGet;
+ CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
+
+ CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1);
+ IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
+ Buffer2.SetContentType(ZenContentType::kCbObject);
+
+ // We should be able to overwrite even if the file is open for read
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2});
+
+ MemoryView OldView = BufferGet.Value.GetView();
+
+ ZenCacheValue BufferGet2;
+ CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2));
+ MemoryView NewView = BufferGet2.Value.GetView();
+
+ // Make sure file openend for read before we wrote it still have old data
+ CHECK(OldView.GetSize() == Buffer.GetSize());
+ CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0);
+
+ // Make sure we get the new data when reading after we write new data
+ CHECK(NewView.GetSize() == Buffer2.GetSize());
+ CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
+}
+
+TEST_CASE("z$.scrub")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ using namespace testutils;
+
+ struct CacheRecord
+ {
+ IoBuffer Record;
+ std::vector<CompressedBuffer> Attachments;
+ };
+
+ auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) {
+ CacheRecord Result;
+ if (Structured)
+ {
+ Result.Attachments.resize(AttachmentSizes.size());
+ CbObjectWriter Record;
+ Record.BeginObject("Key"sv);
+ {
+ Record << "Bucket"sv << Bucket;
+ Record << "Hash"sv << Key;
+ }
+ Record.EndObject();
+ for (size_t Index = 0; Index < AttachmentSizes.size(); Index++)
+ {
+ IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]);
+ CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), CompressedAttachmentData.DecodeRawHash());
+ Result.Attachments[Index] = CompressedAttachmentData;
+ }
+ Result.Record = Record.Save().GetBuffer().AsIoBuffer();
+ Result.Record.SetContentType(ZenContentType::kCbObject);
+ }
+ else
+ {
+ std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString());
+ size_t TotalSize = RecordData.length() + 1;
+ for (size_t AttachmentSize : AttachmentSizes)
+ {
+ TotalSize += AttachmentSize;
+ }
+ Result.Record = IoBuffer(TotalSize);
+ char* DataPtr = (char*)Result.Record.MutableData();
+ memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1);
+ DataPtr += RecordData.length() + 1;
+ for (size_t AttachmentSize : AttachmentSizes)
+ {
+ IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSize);
+ memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize());
+ DataPtr += AttachmentData.GetSize();
+ }
+ }
+ return Result;
+ };
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ auto CreateRecords =
+ [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) {
+ for (const IoHash& Cid : Cids)
+ {
+ CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes);
+ Zcs.Put("mybucket", Cid, {.Value = Record.Record});
+ for (const CompressedBuffer& Attachment : Record.Attachments)
+ {
+ CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), Attachment.DecodeRawHash());
+ }
+ }
+ };
+
+ std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000};
+
+ std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)};
+ CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes);
+
+ std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
+ CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
+
+ ScrubContext ScrubCtx;
+ Zcs.Scrub(ScrubCtx);
+ CidStore.Scrub(ScrubCtx);
+ CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
+ CHECK(ScrubCtx.BadCids().GetSize() == 0);
+}
+
+#endif
+
+void
+z$_forcelink()
+{
+}
+
+} // namespace zen