aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/cache')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp4397
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp2440
2 files changed, 6837 insertions, 0 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
new file mode 100644
index 000000000..82f190caa
--- /dev/null
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -0,0 +1,4397 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenstore/cachedisklayer.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compress.h>
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/jobqueue.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zencore/xxhash.h>
+#include <zenutil/workerpools.h>
+
+#include <future>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+bool
+IsKnownBadBucketName(std::string_view Bucket)
+{
+ if (Bucket.size() == 32)
+ {
+ uint8_t BucketHex[16];
+ if (ParseHexBytes(Bucket, BucketHex))
+ {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+namespace {
+
+#pragma pack(push)
+#pragma pack(1)
+
+ struct CacheBucketIndexHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t Version2 = 2;
+ static constexpr uint32_t CurrentVersion = Version2;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t PayloadAlignment = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+
+ bool IsValid() const
+ {
+ if (Magic != ExpectedMagic)
+ {
+ return false;
+ }
+
+ if (Checksum != ComputeChecksum(*this))
+ {
+ return false;
+ }
+
+ if (PayloadAlignment == 0)
+ {
+ return false;
+ }
+
+ return true;
+ }
+ };
+
+ static_assert(sizeof(CacheBucketIndexHeader) == 32);
+
+ struct BucketMetaHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta';
+ static constexpr uint32_t Version1 = 1;
+ static constexpr uint32_t CurrentVersion = Version1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t Padding = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const BucketMetaHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+
+ bool IsValid() const
+ {
+ if (Magic != ExpectedMagic)
+ {
+ return false;
+ }
+
+ if (Checksum != ComputeChecksum(*this))
+ {
+ return false;
+ }
+
+ if (Padding != 0)
+ {
+ return false;
+ }
+
+ return true;
+ }
+ };
+
+ static_assert(sizeof(BucketMetaHeader) == 32);
+
+#pragma pack(pop)
+
+ //////////////////////////////////////////////////////////////////////////
+
+ template<typename T>
+ void Reset(T& V)
+ {
+ T Tmp;
+ V.swap(Tmp);
+ }
+
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".slog";
+ const char* MetaExtension = ".meta";
+
+ std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + IndexExtension);
+ }
+
+ std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + MetaExtension);
+ }
+
+ std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + LogExtension);
+ }
+
+ std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ ZEN_UNUSED(BucketName);
+ return BucketDir / "zen_manifest";
+ }
+
+ bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.Reserved != 0)
+ {
+ OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.GetFlags() &
+ ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+ {
+ return true;
+ }
+ if (Entry.Location.Reserved != 0)
+ {
+ OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString());
+ return false;
+ }
+ uint64_t Size = Entry.Location.Size();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+ bool MoveAndDeleteDirectory(const std::filesystem::path& Dir)
+ {
+ int DropIndex = 0;
+ do
+ {
+ if (!std::filesystem::exists(Dir))
+ {
+ return false;
+ }
+
+ std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
+ std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName;
+ if (std::filesystem::exists(DroppedBucketPath))
+ {
+ DropIndex++;
+ continue;
+ }
+
+ std::error_code Ec;
+ std::filesystem::rename(Dir, DroppedBucketPath, Ec);
+ if (!Ec)
+ {
+ DeleteDirectories(DroppedBucketPath);
+ return true;
+ }
+ // TODO: Do we need to bail at some point?
+ zen::Sleep(100);
+ } while (true);
+ }
+} // namespace
+
+namespace fs = std::filesystem;
+using namespace std::literals;
+
+class BucketManifestSerializer
+{
+ using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
+ using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData;
+
+ using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
+ using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload;
+
+public:
+ // We use this to indicate if a on disk bucket needs wiping
+ // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references
+ // to block items.
+ // See: https://github.com/EpicGames/zen/pull/299
+ static inline const uint32_t CurrentDiskBucketVersion = 1;
+
+ bool Open(std::filesystem::path ManifestPath)
+ {
+ Manifest = LoadCompactBinaryObject(ManifestPath);
+ return !!Manifest;
+ }
+
+ Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); }
+
+ bool IsCurrentVersion(uint32_t& OutVersion) const
+ {
+ OutVersion = Manifest["Version"sv].AsUInt32(0);
+ return OutVersion == CurrentDiskBucketVersion;
+ }
+
+ void ParseManifest(RwLock::ExclusiveLockScope& BucketLock,
+ ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path ManifestPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads);
+
+ Oid GenerateNewManifest(std::filesystem::path ManifestPath);
+
+ IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount);
+ uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); }
+ void WriteSidecarFile(RwLock::SharedLockScope& BucketLock,
+ const std::filesystem::path& SidecarPath,
+ uint64_t SnapshotLogPosition,
+ const ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ const std::vector<AccessTime>& AccessTimes,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas);
+ bool ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock,
+ ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path SidecarPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads);
+
+ IoBuffer MakeManifest(const Oid& BucketId,
+ ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
+ std::vector<AccessTime>&& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas);
+
+ CbObject Manifest;
+
+private:
+ CbObject LoadCompactBinaryObject(const fs::path& Path)
+ {
+ FileContents Result = ReadFile(Path);
+
+ if (!Result.ErrorCode)
+ {
+ IoBuffer Buffer = Result.Flatten();
+ if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+ {
+ return zen::LoadCompactBinaryObject(Buffer);
+ }
+ }
+
+ return CbObject();
+ }
+
+ uint64_t m_ManifestEntryCount = 0;
+
+ struct ManifestData
+ {
+ IoHash Key; // 20
+ AccessTime Timestamp; // 4
+ IoHash RawHash; // 20
+ uint32_t Padding_0; // 4
+ size_t RawSize; // 8
+ uint64_t Padding_1; // 8
+ };
+
+ static_assert(sizeof(ManifestData) == 64);
+};
+
+void
+BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& BucketLock,
+ ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path ManifestPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
+{
+ if (Manifest["UsingMetaFile"sv].AsBool())
+ {
+ ReadSidecarFile(BucketLock, Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads);
+
+ return;
+ }
+
+ ZEN_TRACE_CPU("Z$::ParseManifest");
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+ const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
+ std::vector<PayloadIndex> KeysIndexes;
+ KeysIndexes.reserve(Count);
+
+ CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
+ for (CbFieldView& KeyView : KeyArray)
+ {
+ if (auto It = Index.find(KeyView.AsHash()); It != Index.end())
+ {
+ KeysIndexes.push_back(It.value());
+ }
+ else
+ {
+ KeysIndexes.push_back(PayloadIndex());
+ }
+ }
+
+ size_t KeyIndexOffset = 0;
+ CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
+ for (CbFieldView& TimeStampView : TimeStampArray)
+ {
+ const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
+ if (KeyIndex)
+ {
+ AccessTimes[KeyIndex] = TimeStampView.AsInt64();
+ }
+ }
+
+ KeyIndexOffset = 0;
+ CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
+ CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
+ if (RawHashArray.Num() == RawSizeArray.Num())
+ {
+ auto RawHashIt = RawHashArray.CreateViewIterator();
+ auto RawSizeIt = RawSizeArray.CreateViewIterator();
+ while (RawHashIt != CbFieldViewIterator())
+ {
+ const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
+
+ if (KeyIndex)
+ {
+ uint64_t RawSize = RawSizeIt.AsUInt64();
+ IoHash RawHash = RawHashIt.AsHash();
+ if (RawSize != 0 || RawHash != IoHash::Zero)
+ {
+ BucketPayload& Payload = Payloads[KeyIndex];
+ Bucket.SetMetaData(BucketLock, Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
+ }
+ }
+
+ RawHashIt++;
+ RawSizeIt++;
+ }
+ }
+ else
+ {
+ ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
+ }
+}
+
+Oid
+BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath)
+{
+ const Oid BucketId = Oid::NewOid();
+
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << BucketId;
+ Writer << "Version"sv << CurrentDiskBucketVersion;
+ Manifest = Writer.Save();
+ WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer());
+
+ return BucketId;
+}
+
+IoBuffer
+BucketManifestSerializer::MakeManifest(const Oid& BucketId,
+ ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
+ std::vector<AccessTime>&& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas)
+{
+ using namespace std::literals;
+
+ ZEN_TRACE_CPU("Z$::MakeManifest");
+
+ size_t ItemCount = Index.size();
+
+ // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth
+ // And we don't need to reallocate the underlying buffer in almost every case
+ const size_t EstimatedSizePerItem = 54u;
+ const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128);
+ CbObjectWriter Writer(ReserveSize);
+
+ Writer << "BucketId"sv << BucketId;
+ Writer << "Version"sv << CurrentDiskBucketVersion;
+
+ if (!Index.empty())
+ {
+ Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size()));
+ Writer.BeginArray("Keys"sv);
+ for (auto& Kv : Index)
+ {
+ const IoHash& Key = Kv.first;
+ Writer.AddHash(Key);
+ }
+ Writer.EndArray();
+
+ Writer.BeginArray("Timestamps"sv);
+ for (auto& Kv : Index)
+ {
+ GcClock::Tick AccessTime = AccessTimes[Kv.second];
+ Writer.AddInteger(AccessTime);
+ }
+ Writer.EndArray();
+
+ if (!MetaDatas.empty())
+ {
+ Writer.BeginArray("RawHash"sv);
+ for (auto& Kv : Index)
+ {
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
+ if (Payload.MetaData)
+ {
+ Writer.AddHash(MetaDatas[Payload.MetaData].RawHash);
+ }
+ else
+ {
+ Writer.AddHash(IoHash::Zero);
+ }
+ }
+ Writer.EndArray();
+
+ Writer.BeginArray("RawSize"sv);
+ for (auto& Kv : Index)
+ {
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
+ if (Payload.MetaData)
+ {
+ Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize);
+ }
+ else
+ {
+ Writer.AddInteger(0);
+ }
+ }
+ Writer.EndArray();
+ }
+ }
+
+ Manifest = Writer.Save();
+ return Manifest.GetBuffer().AsIoBuffer();
+}
+
+IoBuffer
+BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount)
+{
+ m_ManifestEntryCount = EntryCount;
+
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << BucketId;
+ Writer << "Version"sv << CurrentDiskBucketVersion;
+ Writer << "Count"sv << EntryCount;
+ Writer << "UsingMetaFile"sv << true;
+ Manifest = Writer.Save();
+
+ return Manifest.GetBuffer().AsIoBuffer();
+}
+
+bool
+BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock,
+ ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path SidecarPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
+{
+ ZEN_TRACE_CPU("Z$::ReadSidecarFile");
+
+ ZEN_ASSERT(AccessTimes.size() == Payloads.size());
+
+ std::error_code Ec;
+
+ BasicFile SidecarFile;
+ SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath));
+ }
+
+ uint64_t FileSize = SidecarFile.FileSize();
+
+ auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); });
+
+ if (FileSize < sizeof(BucketMetaHeader))
+ {
+ return false;
+ }
+
+ BasicFileBuffer Sidecar(SidecarFile, 128 * 1024);
+
+ BucketMetaHeader Header;
+ Sidecar.Read(&Header, sizeof Header, 0);
+
+ if (!Header.IsValid())
+ {
+ return false;
+ }
+
+ if (Header.Version != BucketMetaHeader::Version1)
+ {
+ return false;
+ }
+
+ const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ return false;
+ }
+
+ InvalidGuard.Dismiss();
+
+ uint64_t RemainingEntryCount = ExpectedEntryCount;
+ uint64_t EntryCount = 0;
+ uint64_t CurrentReadOffset = sizeof(Header);
+
+ while (RemainingEntryCount--)
+ {
+ const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset);
+ CurrentReadOffset += sizeof(ManifestData);
+
+ if (auto It = Index.find(Entry->Key); It != Index.end())
+ {
+ PayloadIndex PlIndex = It.value();
+
+ ZEN_ASSERT(size_t(PlIndex) <= Payloads.size());
+
+ ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex];
+
+ AccessTimes[PlIndex] = Entry->Timestamp;
+
+ if (Entry->RawSize && Entry->RawHash != IoHash::Zero)
+ {
+ Bucket.SetMetaData(BucketLock, PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash});
+ }
+ }
+
+ EntryCount++;
+ }
+
+ ZEN_ASSERT(EntryCount == ExpectedEntryCount);
+
+ return true;
+}
+
+void
+BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
+ const std::filesystem::path& SidecarPath,
+ uint64_t SnapshotLogPosition,
+ const ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ const std::vector<AccessTime>& AccessTimes,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
+{
+ ZEN_TRACE_CPU("Z$::WriteSidecarFile");
+
+ BucketMetaHeader Header;
+ Header.EntryCount = m_ManifestEntryCount;
+ Header.LogPosition = SnapshotLogPosition;
+ Header.Checksum = Header.ComputeChecksum(Header);
+
+ std::error_code Ec;
+
+ TemporaryFile SidecarFile;
+ SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath()));
+ }
+
+ SidecarFile.Write(&Header, sizeof Header, 0);
+
+ // TODO: make this batching for better performance
+ {
+ uint64_t WriteOffset = sizeof Header;
+
+ // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024);
+
+ std::vector<ManifestData> ManifestDataBuffer;
+ const size_t MaxManifestDataBufferCount = Min(Index.size(), 4096u); // 256 Kb
+ ManifestDataBuffer.reserve(MaxManifestDataBufferCount);
+ for (auto& Kv : Index)
+ {
+ const IoHash& Key = Kv.first;
+ const PayloadIndex PlIndex = Kv.second;
+
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0;
+
+ if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData)
+ {
+ RawHash = MetaDatas[MetaIndex].RawHash;
+ RawSize = MetaDatas[MetaIndex].RawSize;
+ }
+
+ ManifestDataBuffer.emplace_back(ManifestData{.Key = Key,
+ .Timestamp = AccessTimes[PlIndex],
+ .RawHash = RawHash,
+ .Padding_0 = 0,
+ .RawSize = RawSize,
+ .Padding_1 = 0});
+ if (ManifestDataBuffer.size() == MaxManifestDataBufferCount)
+ {
+ const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size();
+ SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset);
+ WriteOffset += WriteSize;
+ ManifestDataBuffer.clear();
+ ManifestDataBuffer.reserve(MaxManifestDataBufferCount);
+ }
+ }
+ if (ManifestDataBuffer.size() > 0)
+ {
+ SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset);
+ }
+ }
+
+ SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath));
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+static const float IndexMinLoadFactor = 0.2f;
+static const float IndexMaxLoadFactor = 0.7f;
+
+ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
+ std::atomic_uint64_t& OuterCacheMemoryUsage,
+ std::string BucketName,
+ const BucketConfiguration& Config)
+: m_Gc(Gc)
+, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage)
+, m_BucketName(std::move(BucketName))
+, m_Configuration(Config)
+, m_BucketId(Oid::Zero)
+{
+ m_Index.min_load_factor(IndexMinLoadFactor);
+ m_Index.max_load_factor(IndexMaxLoadFactor);
+
+ if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
+ {
+ const uint64_t LegacyOverrideSize = 16 * 1024 * 1024;
+
+ // This is pretty ad hoc but in order to avoid too many individual files
+ // it makes sense to have a different strategy for legacy values
+ m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, LegacyOverrideSize);
+ }
+ m_Gc.AddGcReferencer(*this);
+}
+
+ZenCacheDiskLayer::CacheBucket::~CacheBucket()
+{
+ m_Gc.RemoveGcReferencer(*this);
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
+{
+ using namespace std::literals;
+
+ ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
+ ZEN_ASSERT(m_IsFlushing.load());
+
+ // We want to take the lock here since we register as a GC referencer a construction
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+
+ ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir);
+
+ m_BlocksBasePath = BucketDir / "blocks";
+ m_BucketDir = BucketDir;
+
+ CreateDirectories(m_BucketDir);
+
+ std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
+
+ bool IsNew = false;
+
+ BucketManifestSerializer ManifestReader;
+
+ if (ManifestReader.Open(ManifestPath))
+ {
+ m_BucketId = ManifestReader.GetBucketId();
+ if (m_BucketId == Oid::Zero)
+ {
+ return false;
+ }
+
+ uint32_t Version = 0;
+ if (ManifestReader.IsCurrentVersion(/* out */ Version) == false)
+ {
+ ZEN_INFO("Wiping bucket '{}', found version {}, required version {}",
+ BucketDir,
+ Version,
+ BucketManifestSerializer::CurrentDiskBucketVersion);
+ IsNew = true;
+ }
+ }
+ else if (AllowCreate)
+ {
+ m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath);
+ IsNew = true;
+ }
+ else
+ {
+ return false;
+ }
+
+ InitializeIndexFromDisk(IndexLock, IsNew);
+
+ auto _ = MakeGuard([&]() {
+ // We are now initialized, allow flushing when we exit
+ m_IsFlushing.store(false);
+ });
+
+ if (IsNew)
+ {
+ return true;
+ }
+
+ ManifestReader.ParseManifest(IndexLock, *this, ManifestPath, m_Index, m_AccessTimes, m_Payloads);
+
+ return true;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
+
+ const uint64_t LogCount = m_SlogFile.GetLogCount();
+ if (m_LogFlushPosition == LogCount)
+ {
+ return;
+ }
+
+ ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
+ const uint64_t EntryCount = m_Index.size();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
+ m_BucketDir,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ namespace fs = std::filesystem;
+
+ fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+
+ try
+ {
+ const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
+ if (Error)
+ {
+ throw std::system_error(Error, fmt::format("get disk space in '{}' FAILED", m_BucketDir));
+ }
+
+ bool EnoughSpace = Space.Free >= IndexSize + 1024 * 512;
+ if (!EnoughSpace)
+ {
+ uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
+ EnoughSpace = (Space.Free + ReclaimedSpace) >= IndexSize + 1024 * 512;
+ }
+ if (!EnoughSpace)
+ {
+ throw std::runtime_error(
+ fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize)));
+ }
+
+ TemporaryFile ObjectIndexFile;
+ std::error_code Ec;
+ ObjectIndexFile.CreateTemporary(m_BucketDir, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
+ }
+
+ {
+ // This is in a separate scope just to ensure IndexWriter goes out
+ // of scope before the file is flushed/closed, in order to ensure
+ // all data is written to the file
+ BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
+
+ CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+
+ Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
+ IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+
+ uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader);
+
+ for (auto& Entry : m_Index)
+ {
+ DiskIndexEntry IndexEntry;
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = m_Payloads[Entry.second].Location;
+ IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset);
+
+ IndexWriteOffset += sizeof(DiskIndexEntry);
+ }
+
+ IndexWriter.Flush();
+ }
+
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
+ if (Ec)
+ {
+ std::filesystem::path TempFilePath = ObjectIndexFile.GetPath();
+ ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message());
+
+ if (std::filesystem::is_regular_file(TempFilePath))
+ {
+ if (!std::filesystem::remove(TempFilePath, Ec) || Ec)
+ {
+ ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message());
+ }
+ }
+ }
+ else
+ {
+ // We must only update the log flush position once the snapshot write succeeds
+ m_LogFlushPosition = LogCount;
+ }
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
+ }
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
+
+ if (!std::filesystem::is_regular_file(IndexPath))
+ {
+ return 0;
+ }
+
+ auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); });
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t FileSize = ObjectIndexFile.FileSize();
+ if (FileSize < sizeof(CacheBucketIndexHeader))
+ {
+ return 0;
+ }
+
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+
+ if (!Header.IsValid())
+ {
+ return 0;
+ }
+
+ if (Header.Version != CacheBucketIndexHeader::Version2)
+ {
+ return 0;
+ }
+
+ const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ return 0;
+ }
+
+ InvalidGuard.Dismiss();
+
+ size_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_Configuration.PayloadAlignment = Header.PayloadAlignment;
+
+ m_Payloads.reserve(Header.EntryCount);
+ m_Index.reserve(Header.EntryCount);
+
+ BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024);
+
+ uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader);
+ uint64_t RemainingEntryCount = Header.EntryCount;
+
+ std::string InvalidEntryReason;
+ while (RemainingEntryCount--)
+ {
+ const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset);
+ CurrentReadOffset += sizeof(DiskIndexEntry);
+
+ if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+
+ const PayloadIndex EntryIndex = PayloadIndex(EntryCount);
+ m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location});
+ m_Index.insert_or_assign(Entry->Key, EntryIndex);
+
+ EntryCount++;
+ }
+
+ ZEN_ASSERT(EntryCount == m_Payloads.size());
+
+ m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount()));
+
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(EntryCount);
+ }
+
+ OutVersion = CacheBucketIndexHeader::Version2;
+ return Header.LogPosition;
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
+
+ if (!std::filesystem::is_regular_file(LogPath))
+ {
+ return 0;
+ }
+
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (!CasLog.Initialize())
+ {
+ return 0;
+ }
+
+ const uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+
+ LogEntryCount = EntryCount - SkipEntryCount;
+ uint64_t InvalidEntryCount = 0;
+
+ CasLog.Replay(
+ [&](const DiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Location.Flags & DiskLocation::kTombStone)
+ {
+ // Note: this leaves m_Payloads and other arrays with 'holes' in them
+ m_Index.erase(Record.Key);
+ return;
+ }
+
+ if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
+ m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
+ m_Index.insert_or_assign(Record.Key, EntryIndex);
+ },
+ SkipEntryCount);
+
+ m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
+
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(m_Payloads.size());
+ }
+
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
+ }
+
+ return LogEntryCount;
+};
+
+void
+ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::Initialize");
+
+ m_StandaloneSize = 0;
+
+ m_Index.clear();
+ m_Payloads.clear();
+ m_AccessTimes.clear();
+ m_MetaDatas.clear();
+ m_FreeMetaDatas.clear();
+ m_MemCachedPayloads.clear();
+ m_FreeMemCachedPayloads.clear();
+ m_FirstReferenceIndex.clear();
+ m_ReferenceHashes.clear();
+ m_NextReferenceHashesIndexes.clear();
+ m_ReferenceCount = 0;
+
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+
+ if (IsNew)
+ {
+ fs::remove(LogPath);
+ fs::remove(IndexPath);
+ fs::remove_all(m_BlocksBasePath);
+ }
+
+ CreateDirectories(m_BucketDir);
+
+ m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
+
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ uint32_t IndexVersion = 0;
+ m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion);
+ if (IndexVersion == 0)
+ {
+ ZEN_WARN("removing invalid index file at '{}'", IndexPath);
+ std::filesystem::remove(IndexPath);
+ }
+ }
+
+ uint64_t LogEntryCount = 0;
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath))
+ {
+ LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition);
+ }
+ else if (fs::is_regular_file(LogPath))
+ {
+ ZEN_WARN("removing invalid log at '{}'", LogPath);
+ std::filesystem::remove(LogPath);
+ }
+ }
+
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ BlockStore::BlockIndexSet KnownBlocks;
+ for (const auto& Entry : m_Index)
+ {
+ size_t EntryIndex = Entry.second;
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ const DiskLocation& Location = Payload.Location;
+
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
+ }
+ else
+ {
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
+ KnownBlocks.Add(BlockLocation.BlockIndex);
+ }
+ }
+ m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+
+ if (IsNew || LogEntryCount > 0)
+ {
+ WriteIndexSnapshot(IndexLock);
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const
+{
+ char HexString[sizeof(HashKey.Hash) * 2];
+ ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString);
+
+ Path.Append(m_BucketDir);
+ Path.AppendSeparator();
+ Path.Append(L"blob");
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(HexString, HexString + 3);
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(HexString + 3, HexString + 5);
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString));
+}
+
+IoBuffer
+ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue");
+
+ BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
+
+ IoBuffer Value = m_BlockStore.TryGetChunk(Location);
+ if (Value)
+ {
+ Value.SetContentType(Loc.GetContentType());
+ }
+
+ return Value;
+}
+
+IoBuffer
+ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
+
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+
+ RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+
+ if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
+ {
+ Data.SetContentType(ContentType);
+
+ return Data;
+ }
+
+ return {};
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::Get");
+
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ auto It = m_Index.find(HashKey);
+ if (It == m_Index.end())
+ {
+ m_DiskMissCount++;
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ return false;
+ }
+
+ PayloadIndex EntryIndex = It.value();
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ DiskLocation Location = m_Payloads[EntryIndex].Location;
+
+ bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
+
+ const BucketPayload* Payload = &m_Payloads[EntryIndex];
+ if (Payload->MetaData)
+ {
+ const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData];
+ OutValue.RawHash = MetaData.RawHash;
+ OutValue.RawSize = MetaData.RawSize;
+ FillRawHashAndRawSize = false;
+ }
+
+ if (Payload->MemCached)
+ {
+ OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload;
+ Payload = nullptr;
+ IndexLock.ReleaseNow();
+ m_MemoryHitCount++;
+ }
+ else
+ {
+ Payload = nullptr;
+ IndexLock.ReleaseNow();
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey);
+ }
+ else
+ {
+ OutValue.Value = GetInlineCacheValue(Location);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ size_t ValueSize = OutValue.Value.GetSize();
+ if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache");
+ OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
+ RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
+ if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
+ {
+ BucketPayload& WritePayload = m_Payloads[UpdateIt->second];
+ // Only update if it has not already been updated by other thread
+ if (!WritePayload.MemCached)
+ {
+ SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if (FillRawHashAndRawSize)
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
+ if (Location.IsFlagSet(DiskLocation::kCompressed))
+ {
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ {
+ OutValue = ZenCacheValue{};
+ m_DiskMissCount++;
+ return false;
+ }
+ }
+ else
+ {
+ OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+ OutValue.RawSize = OutValue.Value.GetSize();
+ }
+ RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
+ if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
+ {
+ BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
+
+ // Only set if no other path has already updated the meta data
+ if (!WritePayload.MetaData)
+ {
+ SetMetaData(UpdateIndexLock, WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash});
+ }
+ }
+ }
+ if (OutValue.Value)
+ {
+ m_DiskHitCount++;
+ StatsScope.SetBytes(OutValue.Value.GetSize());
+ return true;
+ }
+ else
+ {
+ m_DiskMissCount++;
+ return false;
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::Put");
+
+ metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+
+ if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
+ {
+ PutStandaloneCacheValue(HashKey, Value, References);
+ }
+ else
+ {
+ PutInlineCacheValue(HashKey, Value, References);
+ }
+
+ m_DiskWriteCount++;
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim");
+
+ uint64_t Trimmed = 0;
+ GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
+
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
+ {
+ return 0;
+ }
+
+ uint32_t WriteIndex = 0;
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
+ {
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
+ {
+ continue;
+ }
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
+ GcClock::Tick AccessTime = m_AccessTimes[Index];
+ if (AccessTime < ExpireTicks)
+ {
+ size_t PayloadSize = Data.Payload.GetSize();
+ RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ Data = {};
+ m_Payloads[Index].MemCached = {};
+ Trimmed += PayloadSize;
+ continue;
+ }
+ if (ReadIndex > WriteIndex)
+ {
+ m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index};
+ m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex);
+ }
+ WriteIndex++;
+ }
+ m_MemCachedPayloads.resize(WriteIndex);
+ m_MemCachedPayloads.shrink_to_fit();
+ zen::Reset(m_FreeMemCachedPayloads);
+ return Trimmed;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess");
+
+ size_t SlotCount = InOutUsageSlots.capacity();
+ RwLock::SharedLockScope _(m_IndexLock);
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
+ {
+ return;
+ }
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
+ {
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
+ {
+ continue;
+ }
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
+ GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
+ GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0);
+ size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1);
+ ZEN_ASSERT_SLOW(Slot < SlotCount);
+ if (Slot >= InOutUsageSlots.size())
+ {
+ InOutUsageSlots.resize(Slot + 1, 0);
+ }
+ InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize());
+ }
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::Drop()
+{
+ ZEN_TRACE_CPU("Z$::Bucket::Drop");
+
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+
+ std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks;
+ ShardLocks.reserve(256);
+ for (RwLock& Lock : m_ShardedLocks)
+ {
+ ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock));
+ }
+ m_BlockStore.Close();
+ m_SlogFile.Close();
+
+ bool Deleted = MoveAndDeleteDirectory(m_BucketDir);
+
+ m_Index.clear();
+ m_Payloads.clear();
+ m_AccessTimes.clear();
+ m_MetaDatas.clear();
+ m_FreeMetaDatas.clear();
+ m_MemCachedPayloads.clear();
+ m_FreeMemCachedPayloads.clear();
+ m_FirstReferenceIndex.clear();
+ m_ReferenceHashes.clear();
+ m_NextReferenceHashesIndexes.clear();
+ m_ReferenceCount = 0;
+ m_StandaloneSize.store(0);
+ m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
+ m_MemCachedSize.store(0);
+
+ return Deleted;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::Flush()
+{
+ ZEN_TRACE_CPU("Z$::Bucket::Flush");
+ bool Expected = false;
+ if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
+ {
+ return;
+ }
+ auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
+
+ ZEN_INFO("Flushing bucket {}", m_BucketDir);
+
+ try
+ {
+ m_BlockStore.Flush(/*ForceNewBlock*/ false);
+ m_SlogFile.Flush();
+
+ SaveSnapshot();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("Failed to flush bucket in '{}'. Reason: '{}'", m_BucketDir, Ex.what());
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
+ try
+ {
+ bool UseLegacyScheme = false;
+
+ IoBuffer Buffer;
+ BucketManifestSerializer ManifestWriter;
+
+ if (UseLegacyScheme)
+ {
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ std::vector<BucketMetaData> MetaDatas;
+ IndexMap Index;
+
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ WriteIndexSnapshot(IndexLock);
+ // Note: this copy could be eliminated on shutdown to
+ // reduce memory usage and execution time
+ Index = m_Index;
+ Payloads = m_Payloads;
+ AccessTimes = m_AccessTimes;
+ MetaDatas = m_MetaDatas;
+ }
+
+ Buffer = ManifestWriter.MakeManifest(m_BucketId,
+ std::move(Index),
+ std::move(AccessTimes),
+ std::move(Payloads),
+ std::move(MetaDatas));
+ const uint64_t RequiredSpace = Buffer.GetSize() + 1024 * 512;
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
+ if (Error)
+ {
+ ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
+ return;
+ }
+ bool EnoughSpace = Space.Free >= RequiredSpace;
+ if (!EnoughSpace)
+ {
+ uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
+ EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
+ }
+ if (!EnoughSpace)
+ {
+ ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}",
+ m_BucketDir,
+ NiceBytes(Buffer.GetSize()));
+ return;
+ }
+ }
+ else
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ WriteIndexSnapshot(IndexLock);
+ const uint64_t EntryCount = m_Index.size();
+ Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
+ uint64_t SidecarSize = ManifestWriter.GetSidecarSize();
+
+ const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512;
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
+ if (Error)
+ {
+ ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
+ return;
+ }
+ bool EnoughSpace = Space.Free >= RequiredSpace;
+ if (!EnoughSpace)
+ {
+ uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
+ EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
+ }
+ if (!EnoughSpace)
+ {
+ ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}",
+ m_BucketDir,
+ NiceBytes(Buffer.GetSize()));
+ return;
+ }
+
+ ManifestWriter.WriteSidecarFile(IndexLock,
+ GetMetaPath(m_BucketDir, m_BucketName),
+ m_LogFlushPosition,
+ m_Index,
+ m_AccessTimes,
+ m_Payloads,
+ m_MetaDatas);
+ }
+
+ std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
+ WriteFile(ManifestPath, Buffer);
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what());
+ }
+}
+
+IoHash
+HashBuffer(const CompositeBuffer& Buffer)
+{
+ IoHashStream Hasher;
+
+ for (const SharedBuffer& Segment : Buffer.GetSegments())
+ {
+ Hasher.Append(Segment.GetView());
+ }
+
+ return Hasher.GetHash();
+}
+
+bool
+ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
+{
+ ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType);
+
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
+
+ if (Error == CbValidateError::None)
+ {
+ return true;
+ }
+
+ ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error));
+
+ return false;
+ }
+ else if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer);
+
+ IoHash HeaderRawHash;
+ uint64_t RawSize = 0;
+ if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize))
+ {
+ ZEN_SCOPED_ERROR("compressed buffer header validation failed");
+
+ return false;
+ }
+
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize);
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ IoHash DecompressedHash = HashBuffer(Decompressed);
+
+ if (HeaderRawHash != DecompressedHash)
+ {
+ ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);
+
+ return false;
+ }
+ }
+ else
+ {
+ // No way to verify this kind of content (what is it exactly?)
+
+ static int Once = [&] {
+ ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType));
+ return 42;
+ }();
+ }
+
+ return true;
+};
+
+void
+ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::Scrub");
+
+ ZEN_INFO("scrubbing '{}'", m_BucketDir);
+
+ Stopwatch Timer;
+ uint64_t ChunkCount = 0;
+ uint64_t VerifiedChunkBytes = 0;
+
+ auto LogStats = MakeGuard([&] {
+ const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
+
+ ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
+ m_BucketName,
+ NiceBytes(VerifiedChunkBytes),
+ NiceTimeSpanMs(DurationMs),
+ ChunkCount,
+ NiceRate(VerifiedChunkBytes, DurationMs));
+ });
+
+ std::vector<IoHash> BadKeys;
+ auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); };
+
+ try
+ {
+ std::vector<BlockStoreLocation> ChunkLocations;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+
+ RwLock::SharedLockScope _(m_IndexLock);
+
+ const size_t BlockChunkInitialCount = m_Index.size() / 4;
+ ChunkLocations.reserve(BlockChunkInitialCount);
+ ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
+
+ // Do a pass over the index and verify any standalone file values straight away
+ // all other storage classes are gathered and verified in bulk in order to enable
+ // more efficient I/O scheduling
+
+ for (auto& Kv : m_Index)
+ {
+ const IoHash& HashKey = Kv.first;
+ const BucketPayload& Payload = m_Payloads[Kv.second];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ Ctx.ThrowIfDeadlineExpired();
+
+ ++ChunkCount;
+ VerifiedChunkBytes += Loc.Size();
+
+ if (Loc.GetContentType() == ZenContentType::kBinary)
+ {
+ // Blob cache value, not much we can do about data integrity checking
+ // here since there's no hash available
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+
+ RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+
+ std::error_code Ec;
+ uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
+ if (Ec)
+ {
+ ReportBadKey(HashKey);
+ }
+ if (size != Loc.Size())
+ {
+ ReportBadKey(HashKey);
+ }
+ continue;
+ }
+ else
+ {
+ // Structured cache value
+ IoBuffer Buffer = GetStandaloneCacheValue(Loc.GetContentType(), HashKey);
+ if (!Buffer)
+ {
+ ReportBadKey(HashKey);
+ continue;
+ }
+ if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer))
+ {
+ ReportBadKey(HashKey);
+ continue;
+ }
+ }
+ }
+ else
+ {
+ ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment));
+ ChunkIndexToChunkHash.push_back(HashKey);
+ continue;
+ }
+ }
+
+ const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void {
+ ++ChunkCount;
+ VerifiedChunkBytes += Size;
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ if (!Data)
+ {
+ // ChunkLocation out of range of stored blocks
+ ReportBadKey(Hash);
+ return;
+ }
+ if (!Size)
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
+ if (!Buffer)
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
+ ZenContentType ContentType = Payload.Location.GetContentType();
+ Buffer.SetContentType(ContentType);
+ if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ };
+
+ const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void {
+ Ctx.ThrowIfDeadlineExpired();
+
+ ++ChunkCount;
+ VerifiedChunkBytes += Size;
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ if (!Buffer)
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
+ ZenContentType ContentType = Payload.Location.GetContentType();
+ Buffer.SetContentType(ContentType);
+ if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ };
+
+ m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ }
+ catch (ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ }
+
+ Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes);
+
+ if (!BadKeys.empty())
+ {
+ ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir);
+
+ if (Ctx.RunRecovery())
+ {
+ // Deal with bad chunks by removing them from our lookup map
+
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
+
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ for (const IoHash& BadKey : BadKeys)
+ {
+ // Log a tombstone and delete the in-memory index for the bad entry
+ const auto It = m_Index.find(BadKey);
+ BucketPayload& Payload = m_Payloads[It->second];
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]);
+ }
+ DiskLocation Location = Payload.Location;
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed);
+ }
+
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+
+ Location.Flags |= DiskLocation::kTombStone;
+ LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
+ m_Index.erase(BadKey);
+ }
+ }
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ ExtendablePathBuilder<256> Path;
+ BuildPath(Path, Entry.Key);
+ fs::path FilePath = Path.ToPath();
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key));
+ if (fs::is_regular_file(FilePath))
+ {
+ ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8());
+ std::error_code Ec;
+ fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
+ }
+ }
+ }
+ m_SlogFile.Append(LogEntries);
+
+ // Clean up m_AccessTimes and m_Payloads vectors
+ {
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketMetaData> MetaDatas;
+ std::vector<MemCacheData> MemCachedPayloads;
+ std::vector<ReferenceIndex> FirstReferenceIndex;
+ IndexMap Index;
+
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
+ }
+ }
+ }
+ }
+
+ // Let whomever it concerns know about the bad chunks. This could
+ // be used to invalidate higher level data structures more efficiently
+ // than a full validation pass might be able to do
+ if (!BadKeys.empty())
+ {
+ Ctx.ReportBadCidChunks(BadKeys);
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::GatherReferences");
+
+#define CALCULATE_BLOCKING_TIME 0
+
+#if CALCULATE_BLOCKING_TIME
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+#endif // CALCULATE_BLOCKING_TIME
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([&] {
+#if CALCULATE_BLOCKING_TIME
+ ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
+ m_BucketDir,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs));
+#else
+ ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
+#endif // CALCULATE_BLOCKING_TIME
+ });
+
+ const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime();
+
+ const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
+
+ IndexMap Index;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ std::vector<ReferenceIndex> FirstReferenceIndex;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+#if CALCULATE_BLOCKING_TIME
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+#endif // CALCULATE_BLOCKING_TIME
+ if (m_Index.empty())
+ {
+ return;
+ }
+ Index = m_Index;
+ AccessTimes = m_AccessTimes;
+ Payloads = m_Payloads;
+ FirstReferenceIndex = m_FirstReferenceIndex;
+ }
+
+ std::vector<IoHash> ExpiredKeys;
+ ExpiredKeys.reserve(1024);
+
+ std::vector<IoHash> Cids;
+ if (!GcCtx.SkipCid())
+ {
+ Cids.reserve(1024);
+ }
+
+ std::vector<std::pair<IoHash, size_t>> StructuredItemsWithUnknownAttachments;
+
+ for (const auto& Entry : Index)
+ {
+ const IoHash& Key = Entry.first;
+ size_t PayloadIndex = Entry.second;
+ GcClock::Tick AccessTime = AccessTimes[PayloadIndex];
+ if (AccessTime < ExpireTicks)
+ {
+ ExpiredKeys.push_back(Key);
+ continue;
+ }
+
+ if (GcCtx.SkipCid())
+ {
+ continue;
+ }
+
+ BucketPayload& Payload = Payloads[PayloadIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ if (FirstReferenceIndex.empty() || FirstReferenceIndex[PayloadIndex] == ReferenceIndex::Unknown())
+ {
+ StructuredItemsWithUnknownAttachments.push_back(Entry);
+ continue;
+ }
+
+ bool ReferencesAreKnown = false;
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+#if CALCULATE_BLOCKING_TIME
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+#endif // CALCULATE_BLOCKING_TIME
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
+ }
+ }
+ if (ReferencesAreKnown)
+ {
+ if (Cids.size() >= 1024)
+ {
+ GcCtx.AddRetainedCids(Cids);
+ Cids.clear();
+ }
+ continue;
+ }
+ }
+ StructuredItemsWithUnknownAttachments.push_back(Entry);
+ }
+
+ for (const auto& Entry : StructuredItemsWithUnknownAttachments)
+ {
+ const IoHash& Key = Entry.first;
+ BucketPayload& Payload = Payloads[Entry.second];
+ const DiskLocation& Loc = Payload.Location;
+ {
+ IoBuffer Buffer;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer)
+ {
+ continue;
+ }
+ }
+ else
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+#if CALCULATE_BLOCKING_TIME
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+#endif // CALCULATE_BLOCKING_TIME
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ const BucketPayload& CachedPayload = m_Payloads[It->second];
+ if (CachedPayload.MemCached)
+ {
+ Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload;
+ ZEN_ASSERT_SLOW(Buffer);
+ }
+ else
+ {
+ DiskLocation Location = m_Payloads[It->second].Location;
+ IndexLock.ReleaseNow();
+ Buffer = GetInlineCacheValue(Location);
+ // Don't memcache items when doing GC
+ }
+ }
+ if (!Buffer)
+ {
+ continue;
+ }
+ }
+
+ ZEN_ASSERT(Buffer);
+ ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
+ CbObjectView Obj(Buffer.GetData());
+ size_t CurrentCidCount = Cids.size();
+ Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+#if CALCULATE_BLOCKING_TIME
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+#endif // CALCULATE_BLOCKING_TIME
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ if (m_FirstReferenceIndex[It->second] == ReferenceIndex::Unknown())
+ {
+ SetReferences(IndexLock,
+ m_FirstReferenceIndex[It->second],
+ std::span<IoHash>(Cids.data() + CurrentCidCount, Cids.size() - CurrentCidCount));
+ }
+ else
+ {
+ Cids.resize(CurrentCidCount);
+ (void)GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
+ }
+ }
+ }
+ if (Cids.size() >= 1024)
+ {
+ GcCtx.AddRetainedCids(Cids);
+ Cids.clear();
+ }
+ }
+ }
+
+ GcCtx.AddRetainedCids(Cids);
+ GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage");
+
+ ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
+
+ Stopwatch TotalTimer;
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ GcStorageSize OldTotalSize = StorageSize();
+
+ std::unordered_set<IoHash> DeletedChunks;
+ uint64_t MovedCount = 0;
+
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG(
+ "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
+ "{} "
+ "of {} "
+ "entries ({}/{}).",
+ m_BucketDir,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedChunks.size(),
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize.DiskSize),
+ NiceBytes(OldTotalSize.MemorySize));
+
+ bool Expected = false;
+ if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
+ {
+ return;
+ }
+ auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
+
+ try
+ {
+ SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); });
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("Failed to write index and manifest after GC in '{}'. Reason: '{}'", m_BucketDir, Ex.what());
+ }
+ });
+
+ auto __ = MakeGuard([&]() {
+ if (!DeletedChunks.empty())
+ {
+ // Clean up m_AccessTimes and m_Payloads vectors
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketMetaData> MetaDatas;
+ std::vector<MemCacheData> MemCachedPayloads;
+ std::vector<ReferenceIndex> FirstReferenceIndex;
+ IndexMap Index;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
+ }
+ GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
+ }
+ });
+
+ std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
+ if (ExpiredCacheKeySpan.empty())
+ {
+ return;
+ }
+
+ m_SlogFile.Flush();
+
+ std::unordered_set<IoHash, IoHash::Hasher> ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end());
+
+ std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
+ IndexMap IndexSnapshot;
+ std::vector<BucketPayload> PayloadsSnapshot;
+ BlockStore::ReclaimSnapshotState BlockStoreState;
+ {
+ bool Expected = false;
+ if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
+ {
+ ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir);
+ return;
+ }
+ auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
+
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State");
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
+
+ for (const IoHash& Key : ExpiredCacheKeys)
+ {
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ const BucketPayload& Payload = m_Payloads[It->second];
+ if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location};
+ Entry.Location.Flags |= DiskLocation::kTombStone;
+ ExpiredStandaloneEntries.push_back(Entry);
+ }
+ }
+ }
+
+ PayloadsSnapshot = m_Payloads;
+ IndexSnapshot = m_Index;
+
+ if (GcCtx.IsDeletionMode())
+ {
+ IndexLock.ReleaseNow();
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ if (m_Index.erase(Entry.Key) == 1)
+ {
+ m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ DeletedChunks.insert(Entry.Key);
+ }
+ }
+ m_SlogFile.Append(ExpiredStandaloneEntries);
+ }
+ }
+ }
+
+ if (GcCtx.IsDeletionMode())
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete");
+
+ ExtendablePathBuilder<256> Path;
+
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ const IoHash& Key = Entry.Key;
+
+ Path.Reset();
+ BuildPath(Path, Key);
+ fs::path FilePath = Path.ToPath();
+
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
+ {
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
+ continue;
+ }
+ IndexLock.ReleaseNow();
+
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
+ if (fs::is_regular_file(FilePath))
+ {
+ ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
+ std::error_code Ec;
+ fs::remove(FilePath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
+ continue;
+ }
+ }
+ }
+ DeletedSize += Entry.Location.Size();
+ }
+ }
+
+ TotalChunkCount = IndexSnapshot.size();
+
+ std::vector<BlockStoreLocation> ChunkLocations;
+ BlockStore::ChunkIndexArray KeepChunkIndexes;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
+ {
+ TotalChunkCount = 0;
+ for (const auto& Entry : IndexSnapshot)
+ {
+ size_t EntryIndex = Entry.second;
+ const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location;
+
+ if (DiskLocation.Flags & DiskLocation::kStandaloneFile)
+ {
+ continue;
+ }
+ const IoHash& Key = Entry.first;
+ BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash.push_back(Key);
+ if (ExpiredCacheKeys.contains(Key))
+ {
+ continue;
+ }
+ KeepChunkIndexes.push_back(ChunkIndex);
+ }
+ }
+ TotalChunkCount = ChunkLocations.size();
+ size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size();
+
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ if (!PerformDelete)
+ {
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true);
+ GcStorageSize CurrentTotalSize = StorageSize();
+ ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})",
+ m_BucketDir,
+ DeleteCount,
+ TotalChunkCount,
+ NiceBytes(CurrentTotalSize.DiskSize),
+ NiceBytes(CurrentTotalSize.MemorySize));
+ return;
+ }
+
+ m_BlockStore.ReclaimSpace(
+ BlockStoreState,
+ ChunkLocations,
+ KeepChunkIndexes,
+ m_Configuration.PayloadAlignment,
+ false,
+ [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ size_t EntryIndex = m_Index[ChunkHash];
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+ if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location)
+ {
+ // Entry has been updated while GC was running, ignore the move
+ continue;
+ }
+ Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
+ LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location});
+ }
+ for (const size_t ChunkIndex : RemovedChunks)
+ {
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ size_t EntryIndex = m_Index[ChunkHash];
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+ if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location)
+ {
+ // Entry has been updated while GC was running, ignore the delete
+ continue;
+ }
+ const DiskLocation& OldDiskLocation = Payload.Location;
+ LogEntries.push_back({.Key = ChunkHash,
+ .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment),
+ m_Configuration.PayloadAlignment,
+ OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
+
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+
+ m_Index.erase(ChunkHash);
+ DeletedChunks.insert(ChunkHash);
+ }
+ }
+
+ m_SlogFile.Append(LogEntries);
+ m_SlogFile.Flush();
+ },
+ [&]() { return GcCtx.ClaimGCReserve(); });
+}
+
+ZenCacheDiskLayer::BucketStats
+ZenCacheDiskLayer::CacheBucket::Stats()
+{
+ GcStorageSize Size = StorageSize();
+ return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize,
+ .MemorySize = Size.MemorySize,
+ .DiskHitCount = m_DiskHitCount,
+ .DiskMissCount = m_DiskMissCount,
+ .DiskWriteCount = m_DiskWriteCount,
+ .MemoryHitCount = m_MemoryHitCount,
+ .MemoryMissCount = m_MemoryMissCount,
+ .MemoryWriteCount = m_MemoryWriteCount,
+ .PutOps = m_PutOps.Snapshot(),
+ .GetOps = m_GetOps.Snapshot()};
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::EntryCount() const
+{
+ RwLock::SharedLockScope _(m_IndexLock);
+ return static_cast<uint64_t>(m_Index.size());
+}
+
+CacheValueDetails::ValueDetails
+ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const IoHash& Key, PayloadIndex Index) const
+{
+ std::vector<IoHash> Attachments;
+ const BucketPayload& Payload = m_Payloads[Index];
+ if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
+ {
+ IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile)
+ ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key)
+ : GetInlineCacheValue(Payload.Location);
+ CbObjectView Obj(Value.GetData());
+ Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
+ }
+ BucketMetaData MetaData = GetMetaData(IndexLock, Payload);
+ return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
+ .RawSize = MetaData.RawSize,
+ .RawHash = MetaData.RawHash,
+ .LastAccess = m_AccessTimes[Index],
+ .Attachments = std::move(Attachments),
+ .ContentType = Payload.Location.GetContentType()};
+}
+
+CacheValueDetails::BucketDetails
+ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const
+{
+ CacheValueDetails::BucketDetails Details;
+ RwLock::SharedLockScope _(m_IndexLock);
+ if (ValueFilter.empty())
+ {
+ Details.Values.reserve(m_Index.size());
+ for (const auto& It : m_Index)
+ {
+ Details.Values.insert_or_assign(It.first, GetValueDetails(IndexLock, It.first, It.second));
+ }
+ }
+ else
+ {
+ IoHash Key = IoHash::FromHexString(ValueFilter);
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ Details.Values.insert_or_assign(It->first, GetValueDetails(IndexLock, It->first, It->second));
+ }
+ }
+ return Details;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
+ std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
+{
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ for (const auto& It : m_Index)
+ {
+ CacheValueDetails::ValueDetails Vd = GetValueDetails(IndexLock, It.first, It.second);
+
+ Fn(It.first, Vd);
+ }
+}
+
+void
+ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::CollectGarbage");
+
+ std::vector<CacheBucket*> Buckets;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
+ }
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->CollectGarbage(GcCtx);
+ }
+ if (!m_IsMemCacheTrimming)
+ {
+ MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
+
+ uint64_t NewFileSize = Value.Value.Size();
+
+ TemporaryFile DataFile;
+
+ std::error_code Ec;
+ DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
+ }
+
+ bool CleanUpTempFile = true;
+ auto __ = MakeGuard([&] {
+ if (CleanUpTempFile)
+ {
+ std::error_code Ec;
+ std::filesystem::remove(DataFile.GetPath(), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'",
+ DataFile.GetPath(),
+ m_BucketDir,
+ Ec.message());
+ }
+ }
+ });
+
+ DataFile.WriteAll(Value.Value, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec,
+ fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'",
+ NiceBytes(NewFileSize),
+ DataFile.GetPath().string(),
+ m_BucketDir));
+ }
+
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+ std::filesystem::path FsPath{DataFilePath.ToPath()};
+
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
+
+ // We do a speculative remove of the file instead of probing with a exists call and check the error code instead
+ std::filesystem::remove(FsPath, Ec);
+ if (Ec)
+ {
+ if (Ec.value() != ENOENT)
+ {
+ ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message());
+ Sleep(100);
+ Ec.clear();
+ std::filesystem::remove(FsPath, Ec);
+ if (Ec && Ec.value() != ENOENT)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir));
+ }
+ }
+ }
+
+ // Assume parent directory exists
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+ if (Ec)
+ {
+ CreateDirectories(FsPath.parent_path());
+
+ // Try again after we or someone else created the directory
+ Ec.clear();
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+
+ // Retry if we still fail to handle contention to file system
+ uint32_t RetriesLeft = 3;
+ while (Ec && RetriesLeft > 0)
+ {
+ ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retries left: {}.",
+ FsPath,
+ DataFile.GetPath(),
+ m_BucketDir,
+ Ec.message(),
+ RetriesLeft);
+ Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
+ Ec.clear();
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+ RetriesLeft--;
+ }
+ if (Ec)
+ {
+ throw std::system_error(
+ Ec,
+ fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir));
+ }
+ }
+
+ // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file
+ // will be disabled as the file handle has already been closed
+ CleanUpTempFile = false;
+
+ uint8_t EntryFlags = DiskLocation::kStandaloneFile;
+
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
+
+ DiskLocation Loc(NewFileSize, EntryFlags);
+
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ ValueLock.ReleaseNow();
+ if (m_UpdatedKeys)
+ {
+ m_UpdatedKeys->insert(HashKey);
+ }
+
+ PayloadIndex EntryIndex = {};
+ if (auto It = m_Index.find(HashKey); It == m_Index.end())
+ {
+ // Previously unknown object
+ EntryIndex = PayloadIndex(m_Payloads.size());
+ m_Payloads.emplace_back(BucketPayload{.Location = Loc});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.emplace_back(ReferenceIndex{});
+ SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
+ }
+ m_Index.insert_or_assign(HashKey, EntryIndex);
+ }
+ else
+ {
+ EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+ uint64_t OldSize = Payload.Location.Size();
+ Payload = BucketPayload{.Location = Loc};
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
+ }
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ RemoveMemCachedData(IndexLock, Payload);
+ m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed);
+ }
+ if (Value.RawSize != 0 || Value.RawHash != IoHash::Zero)
+ {
+ SetMetaData(IndexLock, m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash});
+ }
+ else
+ {
+ RemoveMetaData(IndexLock, m_Payloads[EntryIndex]);
+ }
+
+ m_SlogFile.Append({.Key = HashKey, .Location = Loc});
+ m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::SetMetaData(RwLock::ExclusiveLockScope&,
+ BucketPayload& Payload,
+ const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData)
+{
+ if (Payload.MetaData)
+ {
+ m_MetaDatas[Payload.MetaData] = MetaData;
+ }
+ else
+ {
+ if (m_FreeMetaDatas.empty())
+ {
+ Payload.MetaData = MetaDataIndex(m_MetaDatas.size());
+ m_MetaDatas.emplace_back(MetaData);
+ }
+ else
+ {
+ Payload.MetaData = m_FreeMetaDatas.back();
+ m_FreeMetaDatas.pop_back();
+ m_MetaDatas[Payload.MetaData] = MetaData;
+ }
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload)
+{
+ if (Payload.MetaData)
+ {
+ m_FreeMetaDatas.push_back(Payload.MetaData);
+ Payload.MetaData = {};
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData)
+{
+ BucketPayload& Payload = m_Payloads[PayloadIndex];
+ uint64_t PayloadSize = MemCachedData.GetSize();
+ ZEN_ASSERT(PayloadSize != 0);
+ if (m_FreeMemCachedPayloads.empty())
+ {
+ if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max())
+ {
+ Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size()));
+ m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex});
+ AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ m_MemoryWriteCount++;
+ }
+ }
+ else
+ {
+ Payload.MemCached = m_FreeMemCachedPayloads.back();
+ m_FreeMemCachedPayloads.pop_back();
+ m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex};
+ AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ m_MemoryWriteCount++;
+ }
+}
+
+size_t
+ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload)
+{
+ if (Payload.MemCached)
+ {
+ size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize();
+ RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ m_MemCachedPayloads[Payload.MemCached] = {};
+ m_FreeMemCachedPayloads.push_back(Payload.MemCached);
+ Payload.MemCached = {};
+ return PayloadSize;
+ }
+ return 0;
+}
+
+ZenCacheDiskLayer::CacheBucket::BucketMetaData
+ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const
+{
+ if (Payload.MetaData)
+ {
+ return m_MetaDatas[Payload.MetaData];
+ }
+ return {};
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
+
+ uint8_t EntryFlags = 0;
+
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
+
+ m_BlockStore.WriteChunk(Value.Value.Data(),
+ Value.Value.Size(),
+ m_Configuration.PayloadAlignment,
+ [&](const BlockStoreLocation& BlockStoreLocation) {
+ DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
+ m_SlogFile.Append({.Key = HashKey, .Location = Location});
+
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (m_UpdatedKeys)
+ {
+ m_UpdatedKeys->insert(HashKey);
+ }
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ PayloadIndex EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+
+ Payload = (BucketPayload{.Location = Location});
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
+ }
+ }
+ else
+ {
+ PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
+ m_Payloads.emplace_back(BucketPayload{.Location = Location});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.emplace_back(ReferenceIndex{});
+ SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
+ }
+ m_Index.insert_or_assign(HashKey, EntryIndex);
+ }
+ });
+}
+
+std::string
+ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&)
+{
+ return fmt::format("cachebucket:'{}'", m_BucketDir.string());
+}
+
+class DiskBucketStoreCompactor : public GcStoreCompactor
+{
+public:
+ DiskBucketStoreCompactor(ZenCacheDiskLayer::CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys)
+ : m_Bucket(Bucket)
+ , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys))
+ {
+ m_ExpiredStandaloneKeys.shrink_to_fit();
+ }
+
+ virtual ~DiskBucketStoreCompactor() {}
+
+ virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::CompactStore");
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ Reset(m_ExpiredStandaloneKeys);
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}",
+ m_Bucket.m_BucketDir,
+ NiceBytes(Stats.RemovedDisk),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ if (!m_ExpiredStandaloneKeys.empty())
+ {
+ // Compact standalone items
+ size_t Skipped = 0;
+ ExtendablePathBuilder<256> Path;
+ for (const std::pair<IoHash, uint64_t>& ExpiredKey : m_ExpiredStandaloneKeys)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return;
+ }
+ Path.Reset();
+ m_Bucket.BuildPath(Path, ExpiredKey.first);
+ fs::path FilePath = Path.ToPath();
+
+ RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock);
+ if (m_Bucket.m_Index.contains(ExpiredKey.first))
+ {
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back",
+ m_Bucket.m_BucketDir,
+ Path.ToUtf8());
+ continue;
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first));
+ IndexLock.ReleaseNow();
+ ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
+
+ std::error_code Ec;
+ if (!fs::remove(FilePath, Ec))
+ {
+ continue;
+ }
+ if (Ec)
+ {
+ ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'",
+ m_Bucket.m_BucketDir,
+ Path.ToUtf8(),
+ Ec.message());
+ continue;
+ }
+ Stats.RemovedDisk += ExpiredKey.second;
+ }
+ else
+ {
+ RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first));
+ IndexLock.ReleaseNow();
+ ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
+
+ std::error_code Ec;
+ bool Existed = std::filesystem::is_regular_file(FilePath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'",
+ m_Bucket.m_BucketDir,
+ FilePath,
+ Ec.message());
+ continue;
+ }
+ if (!Existed)
+ {
+ continue;
+ }
+ Skipped++;
+ }
+ }
+ if (Skipped > 0)
+ {
+ ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped);
+ }
+ }
+
+ if (Ctx.Settings.CollectSmallObjects)
+ {
+ m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique<HashSet>(); });
+ auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); });
+
+ size_t InlineEntryCount = 0;
+ BlockStore::BlockUsageMap BlockUsage;
+ {
+ RwLock::SharedLockScope ___(m_Bucket.m_IndexLock);
+ for (const auto& Entry : m_Bucket.m_Index)
+ {
+ ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ InlineEntryCount++;
+ uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex();
+ uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment);
+ if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end())
+ {
+ It->second.EntryCount++;
+ It->second.DiskUsage += ChunkSize;
+ }
+ else
+ {
+ BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
+ }
+ }
+ }
+
+ {
+ BlockStoreCompactState BlockCompactState;
+ std::vector<IoHash> BlockCompactStateKeys;
+ BlockCompactStateKeys.reserve(InlineEntryCount);
+
+ BlockStore::BlockEntryCountMap BlocksToCompact =
+ m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent);
+ BlockCompactState.IncludeBlocks(BlocksToCompact);
+
+ if (BlocksToCompact.size() > 0)
+ {
+ {
+ RwLock::SharedLockScope ___(m_Bucket.m_IndexLock);
+ for (const auto& Entry : m_Bucket.m_Index)
+ {
+ ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment)))
+ {
+ continue;
+ }
+ BlockCompactStateKeys.push_back(Entry.first);
+ }
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks",
+ m_Bucket.m_BucketDir,
+ BlocksToCompact.size());
+ }
+
+ m_Bucket.m_BlockStore.CompactBlocks(
+ BlockCompactState,
+ m_Bucket.m_Configuration.PayloadAlignment,
+ [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
+ std::vector<DiskIndexEntry> MovedEntries;
+ MovedEntries.reserve(MovedArray.size());
+ RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock);
+ for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
+ {
+ size_t ChunkIndex = Moved.first;
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
+
+ if (m_Bucket.m_UpdatedKeys->contains(Key))
+ {
+ continue;
+ }
+
+ if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end())
+ {
+ ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second];
+ const BlockStoreLocation& NewLocation = Moved.second;
+ Payload.Location = DiskLocation(NewLocation,
+ m_Bucket.m_Configuration.PayloadAlignment,
+ Payload.Location.GetFlags());
+ MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
+ }
+ }
+ m_Bucket.m_SlogFile.Append(MovedEntries);
+ Stats.RemovedDisk += FreedDiskSpace;
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return false;
+ }
+ return true;
+ },
+ ClaimDiskReserveCallback);
+ }
+ else
+ {
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks",
+ m_Bucket.m_BucketDir,
+ BlocksToCompact.size());
+ }
+ }
+ }
+ }
+ }
+ }
+
+private:
+ ZenCacheDiskLayer::CacheBucket& m_Bucket;
+ std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys;
+};
+
+GcStoreCompactor*
+ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData");
+
+ size_t TotalEntries = 0;
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}",
+ m_BucketDir,
+ Stats.CheckedCount,
+ Stats.FoundCount,
+ Stats.DeletedCount,
+ NiceBytes(Stats.FreedMemory),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count();
+
+ std::vector<DiskIndexEntry> ExpiredEntries;
+ std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys;
+ uint64_t RemovedStandaloneSize = 0;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
+ if (m_Index.empty())
+ {
+ return nullptr;
+ }
+
+ TotalEntries = m_Index.size();
+
+ // Find out expired keys
+ for (const auto& Entry : m_Index)
+ {
+ const IoHash& Key = Entry.first;
+ PayloadIndex EntryIndex = Entry.second;
+ GcClock::Tick AccessTime = m_AccessTimes[EntryIndex];
+ if (AccessTime >= ExpireTicks)
+ {
+ continue;
+ }
+
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location};
+ ExpiredEntry.Location.Flags |= DiskLocation::kTombStone;
+
+ if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()});
+ RemovedStandaloneSize += Payload.Location.Size();
+ ExpiredEntries.push_back(ExpiredEntry);
+ }
+ else if (Ctx.Settings.CollectSmallObjects)
+ {
+ ExpiredEntries.push_back(ExpiredEntry);
+ }
+ }
+
+ Stats.CheckedCount += TotalEntries;
+ Stats.FoundCount += ExpiredEntries.size();
+
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ for (const DiskIndexEntry& Entry : ExpiredEntries)
+ {
+ auto It = m_Index.find(Entry.Key);
+ ZEN_ASSERT(It != m_Index.end());
+ BucketPayload& Payload = m_Payloads[It->second];
+ RemoveMetaData(IndexLock, Payload);
+ Stats.FreedMemory += RemoveMemCachedData(IndexLock, Payload);
+ m_Index.erase(It);
+ Stats.DeletedCount++;
+ }
+ m_SlogFile.Append(ExpiredEntries);
+ m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed);
+ }
+ }
+
+ if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty())
+ {
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketMetaData> MetaDatas;
+ std::vector<MemCacheData> MemCachedPayloads;
+ std::vector<ReferenceIndex> FirstReferenceIndex;
+ IndexMap Index;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
+ }
+ }
+
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
+
+ return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
+}
+
+class DiskBucketReferenceChecker : public GcReferenceChecker
+{
+ using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
+ using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload;
+ using CacheBucket = ZenCacheDiskLayer::CacheBucket;
+ using ReferenceIndex = ZenCacheDiskLayer::CacheBucket::ReferenceIndex;
+
+public:
+ DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {}
+
+ virtual ~DiskBucketReferenceChecker()
+ {
+ try
+ {
+ m_IndexLock.reset();
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it
+ m_CacheBucket.ClearReferenceCache();
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what());
+ }
+ }
+
+ virtual void PreCache(GcCtx& Ctx) override
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::PreCache");
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}",
+ m_CacheBucket.m_BucketDir,
+ m_CacheBucket.m_ReferenceCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ std::vector<IoHash> UpdateKeys;
+ std::vector<size_t> ReferenceCounts;
+ std::vector<IoHash> References;
+
+ auto GetAttachments = [&References, &ReferenceCounts](const void* CbObjectData) {
+ size_t CurrentReferenceCount = References.size();
+ CbObjectView Obj(CbObjectData);
+ Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
+ ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
+ };
+
+ // Refresh cache
+ {
+ // If reference caching is enabled the references will be updated at modification for us so we don't need to track modifications
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys = std::make_unique<HashSet>(); });
+ }
+
+ std::vector<IoHash> StandaloneKeys;
+ {
+ std::vector<IoHash> InlineKeys;
+ std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex;
+ struct InlineEntry
+ {
+ uint32_t InlineKeyIndex;
+ uint32_t Offset;
+ uint32_t Size;
+ };
+ std::vector<std::vector<InlineEntry>> EntriesPerBlock;
+ size_t UpdateCount = 0;
+ {
+ RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
+ for (const auto& Entry : m_CacheBucket.m_Index)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ IndexLock.ReleaseNow();
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ PayloadIndex EntryIndex = Entry.second;
+ const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+ if (m_CacheBucket.m_Configuration.EnableReferenceCaching &&
+ m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown())
+ {
+ continue;
+ }
+ UpdateCount++;
+ const IoHash& Key = Entry.first;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneKeys.push_back(Key);
+ continue;
+ }
+
+ BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment);
+ InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()),
+ .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset),
+ .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)};
+ InlineKeys.push_back(Key);
+
+ if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex);
+ It != BlockIndexToEntriesPerBlockIndex.end())
+ {
+ EntriesPerBlock[It->second].emplace_back(UpdateEntry);
+ }
+ else
+ {
+ BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size());
+ EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry});
+ }
+ }
+ }
+
+ UpdateKeys.reserve(UpdateCount);
+
+ for (auto It : BlockIndexToEntriesPerBlockIndex)
+ {
+ uint32_t BlockIndex = It.first;
+
+ Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex);
+ if (BlockFile)
+ {
+ size_t EntriesPerBlockIndex = It.second;
+ std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex];
+
+ std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool {
+ return Lhs.Offset < Rhs.Offset;
+ });
+
+ uint64_t BlockFileSize = BlockFile->FileSize();
+ BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768);
+ for (const InlineEntry& InlineEntry : InlineEntries)
+ {
+ if ((InlineEntry.Offset + InlineEntry.Size) > BlockFileSize)
+ {
+ ReferenceCounts.push_back(0);
+ }
+ else
+ {
+ MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset);
+ if (ChunkView.GetSize() == InlineEntry.Size)
+ {
+ GetAttachments(ChunkView.GetData());
+ }
+ else
+ {
+ std::vector<uint8_t> Buffer(InlineEntry.Size);
+ BlockBuffer.Read(Buffer.data(), InlineEntry.Size, InlineEntry.Offset);
+ GetAttachments(Buffer.data());
+ }
+ }
+ const IoHash& Key = InlineKeys[InlineEntry.InlineKeyIndex];
+ UpdateKeys.push_back(Key);
+ }
+ }
+ }
+ }
+ {
+ for (const IoHash& Key : StandaloneKeys)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
+ if (!Buffer)
+ {
+ continue;
+ }
+
+ GetAttachments(Buffer.GetData());
+ UpdateKeys.push_back(Key);
+ }
+ }
+ }
+
+ {
+ size_t ReferenceOffset = 0;
+ RwLock::ExclusiveLockScope IndexLock(m_CacheBucket.m_IndexLock);
+
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ ZEN_ASSERT(m_CacheBucket.m_FirstReferenceIndex.empty());
+ ZEN_ASSERT(m_CacheBucket.m_ReferenceHashes.empty());
+ ZEN_ASSERT(m_CacheBucket.m_NextReferenceHashesIndexes.empty());
+ ZEN_ASSERT(m_CacheBucket.m_ReferenceCount == 0);
+ ZEN_ASSERT(m_CacheBucket.m_UpdatedKeys);
+
+ // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when
+ // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted.
+ m_CacheBucket.m_FirstReferenceIndex.resize(m_CacheBucket.m_Payloads.size(), ReferenceIndex::Unknown());
+ m_CacheBucket.m_ReferenceHashes.reserve(References.size());
+ m_CacheBucket.m_NextReferenceHashesIndexes.reserve(References.size());
+ }
+ else
+ {
+ ZEN_ASSERT(!m_CacheBucket.m_UpdatedKeys);
+ }
+
+ for (size_t Index = 0; Index < UpdateKeys.size(); Index++)
+ {
+ const IoHash& Key = UpdateKeys[Index];
+ size_t ReferenceCount = ReferenceCounts[Index];
+ if (auto It = m_CacheBucket.m_Index.find(Key); It != m_CacheBucket.m_Index.end())
+ {
+ PayloadIndex EntryIndex = It->second;
+ if (m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ if (m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown())
+ {
+ // The reference data is valid and what we have is old/redundant
+ continue;
+ }
+ }
+ else if (m_CacheBucket.m_UpdatedKeys->contains(Key))
+ {
+ // Our pre-cache data is invalid
+ continue;
+ }
+
+ m_CacheBucket.SetReferences(IndexLock,
+ m_CacheBucket.m_FirstReferenceIndex[EntryIndex],
+ std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount});
+ }
+ ReferenceOffset += ReferenceCount;
+ }
+
+ if (m_CacheBucket.m_Configuration.EnableReferenceCaching && !UpdateKeys.empty())
+ {
+ m_CacheBucket.CompactReferences(IndexLock);
+ }
+ }
+ }
+
+ virtual void LockState(GcCtx& Ctx) override
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::LockState");
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}",
+ m_CacheBucket.m_BucketDir,
+ m_CacheBucket.m_ReferenceCount + m_UncachedReferences.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock);
+ if (Ctx.IsCancelledFlag.load())
+ {
+ m_UncachedReferences.clear();
+ m_IndexLock.reset();
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ if (m_CacheBucket.m_UpdatedKeys)
+ {
+ const HashSet& UpdatedKeys(*m_CacheBucket.m_UpdatedKeys);
+ for (const IoHash& Key : UpdatedKeys)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ m_UncachedReferences.clear();
+ m_IndexLock.reset();
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ auto It = m_CacheBucket.m_Index.find(Key);
+ if (It == m_CacheBucket.m_Index.end())
+ {
+ continue;
+ }
+
+ PayloadIndex EntryIndex = It->second;
+ const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+
+ IoBuffer Buffer;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ Buffer = m_CacheBucket.GetStandaloneCacheValue(Loc.GetContentType(), Key);
+ }
+ else
+ {
+ Buffer = m_CacheBucket.GetInlineCacheValue(Loc);
+ }
+
+ if (Buffer)
+ {
+ ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
+ CbObjectView Obj(Buffer.GetData());
+ Obj.IterateAttachments([this](CbFieldView Field) { m_UncachedReferences.insert(Field.AsAttachment()); });
+ }
+ }
+ }
+ }
+
+ virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet");
+
+ ZEN_ASSERT(m_IndexLock);
+ size_t InitialCount = IoCids.size();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
+ m_CacheBucket.m_BucketDir,
+ InitialCount - IoCids.size(),
+ InitialCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes)
+ {
+ if (IoCids.erase(ReferenceHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return;
+ }
+ }
+ }
+
+ for (const IoHash& ReferenceHash : m_UncachedReferences)
+ {
+ if (IoCids.erase(ReferenceHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return;
+ }
+ }
+ }
+ }
+ CacheBucket& m_CacheBucket;
+ std::unique_ptr<RwLock::SharedLockScope> m_IndexLock;
+ HashSet m_UncachedReferences;
+};
+
+std::vector<GcReferenceChecker*>
+ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers");
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ if (m_Index.empty())
+ {
+ return {};
+ }
+ }
+
+ return {new DiskBucketReferenceChecker(*this)};
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::CompactReferences");
+
+ std::vector<ReferenceIndex> FirstReferenceIndex;
+ std::vector<IoHash> NewReferenceHashes;
+ std::vector<ReferenceIndex> NewNextReferenceHashesIndexes;
+
+ FirstReferenceIndex.reserve(m_ReferenceCount);
+ NewReferenceHashes.reserve(m_ReferenceCount);
+ NewNextReferenceHashesIndexes.reserve(m_ReferenceCount);
+
+ for (const auto& It : m_Index)
+ {
+ ReferenceIndex SourceIndex = m_FirstReferenceIndex[It.second];
+ if (SourceIndex == ReferenceIndex::Unknown())
+ {
+ FirstReferenceIndex.push_back(ReferenceIndex{});
+ continue;
+ }
+ if (SourceIndex == ReferenceIndex::None())
+ {
+ FirstReferenceIndex.push_back(ReferenceIndex::None());
+ continue;
+ }
+ FirstReferenceIndex.push_back(ReferenceIndex{NewNextReferenceHashesIndexes.size()});
+ NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]);
+ NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None());
+
+ SourceIndex = m_NextReferenceHashesIndexes[SourceIndex];
+ while (SourceIndex != ReferenceIndex::None())
+ {
+ NewNextReferenceHashesIndexes.back() = ReferenceIndex{NewReferenceHashes.size()};
+ NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]);
+ NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None());
+ SourceIndex = m_NextReferenceHashesIndexes[SourceIndex];
+ }
+ }
+ m_FirstReferenceIndex.swap(FirstReferenceIndex);
+ m_ReferenceHashes.swap(NewReferenceHashes);
+ m_ReferenceHashes.shrink_to_fit();
+ m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes);
+ m_NextReferenceHashesIndexes.shrink_to_fit();
+ m_ReferenceCount = m_ReferenceHashes.size();
+}
+
+ZenCacheDiskLayer::CacheBucket::ReferenceIndex
+ZenCacheDiskLayer::CacheBucket::AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key)
+{
+ ReferenceIndex NewIndex = ReferenceIndex{m_ReferenceHashes.size()};
+ m_ReferenceHashes.push_back(Key);
+ m_NextReferenceHashesIndexes.emplace_back(ReferenceIndex::None());
+ m_ReferenceCount++;
+ return NewIndex;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::SetReferences(RwLock::ExclusiveLockScope& Lock,
+ ReferenceIndex& FirstReferenceIndex,
+ std::span<IoHash> References)
+{
+ auto ReferenceIt = References.begin();
+
+ if (FirstReferenceIndex == ReferenceIndex::Unknown())
+ {
+ FirstReferenceIndex = ReferenceIndex::None();
+ }
+
+ ReferenceIndex CurrentIndex = FirstReferenceIndex;
+ if (CurrentIndex != ReferenceIndex::None())
+ {
+ if (ReferenceIt != References.end())
+ {
+ ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
+ if (CurrentIndex == ReferenceIndex::None())
+ {
+ CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
+ FirstReferenceIndex = CurrentIndex;
+ }
+ else
+ {
+ m_ReferenceHashes[CurrentIndex] = *ReferenceIt;
+ }
+ ReferenceIt++;
+ }
+ }
+ else
+ {
+ if (ReferenceIt != References.end())
+ {
+ ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
+ CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
+ ReferenceIt++;
+ }
+ FirstReferenceIndex = CurrentIndex;
+ }
+
+ while (ReferenceIt != References.end())
+ {
+ ZEN_ASSERT(CurrentIndex != ReferenceIndex::None());
+ ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
+ ReferenceIndex NextReferenceIndex = m_NextReferenceHashesIndexes[CurrentIndex];
+ if (NextReferenceIndex == ReferenceIndex::None())
+ {
+ NextReferenceIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
+ m_NextReferenceHashesIndexes[CurrentIndex] = NextReferenceIndex;
+ }
+ else
+ {
+ m_ReferenceHashes[NextReferenceIndex] = *ReferenceIt;
+ }
+ CurrentIndex = NextReferenceIndex;
+ ReferenceIt++;
+ }
+
+ while (CurrentIndex != ReferenceIndex::None())
+ {
+ ReferenceIndex NextIndex = m_NextReferenceHashesIndexes[CurrentIndex];
+ if (NextIndex != ReferenceIndex::None())
+ {
+ m_ReferenceHashes[CurrentIndex] = IoHash::Zero;
+ ZEN_ASSERT(m_ReferenceCount > 0);
+ m_ReferenceCount--;
+ m_NextReferenceHashesIndexes[CurrentIndex] = ReferenceIndex::None();
+ }
+ CurrentIndex = NextIndex;
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex)
+{
+ if (FirstReferenceIndex == ReferenceIndex::Unknown())
+ {
+ return;
+ }
+ ReferenceIndex CurrentIndex = FirstReferenceIndex;
+ while (CurrentIndex == ReferenceIndex::None())
+ {
+ m_ReferenceHashes[CurrentIndex] = IoHash::Zero;
+ ZEN_ASSERT(m_ReferenceCount > 0);
+ m_ReferenceCount--;
+ CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex];
+ }
+ FirstReferenceIndex = {};
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const
+{
+ if (FirstReferenceIndex == ReferenceIndex::Unknown())
+ {
+ return false;
+ }
+
+ ReferenceIndex CurrentIndex = FirstReferenceIndex;
+ while (CurrentIndex != ReferenceIndex::None())
+ {
+ ZEN_ASSERT_SLOW(m_ReferenceHashes[CurrentIndex] != IoHash::Zero);
+ OutReferences.push_back(m_ReferenceHashes[CurrentIndex]);
+ CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex];
+ }
+ return true;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::ClearReferenceCache()
+{
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ Reset(m_FirstReferenceIndex);
+ Reset(m_ReferenceHashes);
+ Reset(m_NextReferenceHashesIndexes);
+ m_ReferenceCount = 0;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
+ std::vector<BucketPayload>& Payloads,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<BucketMetaData>& MetaDatas,
+ std::vector<MemCacheData>& MemCachedPayloads,
+ std::vector<ReferenceIndex>& FirstReferenceIndex,
+ IndexMap& Index,
+ RwLock::ExclusiveLockScope& IndexLock)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::CompactState");
+
+ size_t EntryCount = m_Index.size();
+ Payloads.reserve(EntryCount);
+ AccessTimes.reserve(EntryCount);
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ FirstReferenceIndex.reserve(EntryCount);
+ }
+ Index.reserve(EntryCount);
+ Index.min_load_factor(IndexMinLoadFactor);
+ Index.max_load_factor(IndexMaxLoadFactor);
+ for (auto It : m_Index)
+ {
+ PayloadIndex EntryIndex = PayloadIndex(Payloads.size());
+ Payloads.push_back(m_Payloads[It.second]);
+ BucketPayload& Payload = Payloads.back();
+ AccessTimes.push_back(m_AccessTimes[It.second]);
+ if (Payload.MetaData)
+ {
+ MetaDatas.push_back(m_MetaDatas[Payload.MetaData]);
+ Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1);
+ }
+ if (Payload.MemCached)
+ {
+ MemCachedPayloads.emplace_back(
+ MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex});
+ Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1));
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
+ }
+ Index.insert({It.first, EntryIndex});
+ }
+ m_Index.swap(Index);
+ m_Payloads.swap(Payloads);
+ m_AccessTimes.swap(AccessTimes);
+ m_MetaDatas.swap(MetaDatas);
+ Reset(m_FreeMetaDatas);
+ m_MemCachedPayloads.swap(MemCachedPayloads);
+ Reset(m_FreeMemCachedPayloads);
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.swap(FirstReferenceIndex);
+ CompactReferences(IndexLock);
+ }
+}
+
+#if ZEN_WITH_TESTS
+void
+ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time)
+{
+ GcClock::Tick TimeTick = Time.time_since_epoch().count();
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_AccessTimes[EntryIndex] = TimeTick;
+ }
+}
+#endif // ZEN_WITH_TESTS
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
+: m_Gc(Gc)
+, m_JobQueue(JobQueue)
+, m_RootDir(RootDir)
+, m_Configuration(Config)
+{
+}
+
+ZenCacheDiskLayer::~ZenCacheDiskLayer()
+{
+ try
+ {
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ for (auto& It : m_Buckets)
+ {
+ m_DroppedBuckets.emplace_back(std::move(It.second));
+ }
+ m_Buckets.clear();
+ }
+ // We destroy the buckets without holding a lock since destructor calls GcManager::RemoveGcReferencer which takes an exclusive lock.
+ // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock
+ m_DroppedBuckets.clear();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("~ZenCacheDiskLayer() failed. Reason: '{}'", Ex.what());
+ }
+}
+
+ZenCacheDiskLayer::CacheBucket*
+ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
+{
+ ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
+ const auto BucketName = std::string(InBucket);
+
+ {
+ RwLock::SharedLockScope SharedLock(m_Lock);
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ {
+ return It->second.get();
+ }
+ }
+
+ // We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock.
+ // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock
+ std::unique_ptr<CacheBucket> Bucket(
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ {
+ return It->second.get();
+ }
+
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName;
+ try
+ {
+ if (!Bucket->OpenOrCreate(BucketPath))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ return nullptr;
+ }
+ }
+ catch (const std::exception& Err)
+ {
+ ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ throw;
+ }
+
+ CacheBucket* Result = Bucket.get();
+ m_Buckets.emplace(BucketName, std::move(Bucket));
+
+ return Result;
+}
+
+bool
+ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ ZEN_TRACE_CPU("Z$::Get");
+
+ if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
+ {
+ if (Bucket->Get(HashKey, OutValue))
+ {
+ TryMemCacheTrim();
+ return true;
+ }
+ }
+ return false;
+}
+
+void
+ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+{
+ ZEN_TRACE_CPU("Z$::Put");
+
+ if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
+ {
+ Bucket->Put(HashKey, Value, References);
+ TryMemCacheTrim();
+ }
+}
+
+void
+ZenCacheDiskLayer::DiscoverBuckets()
+{
+ ZEN_TRACE_CPU("Z$::DiscoverBuckets");
+
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
+
+ // Initialize buckets
+
+ std::vector<std::filesystem::path> BadBucketDirectories;
+ std::vector<std::filesystem::path> FoundBucketDirectories;
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ for (const std::filesystem::path& BucketPath : DirContent.Directories)
+ {
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
+
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ {
+ continue;
+ }
+
+ if (IsKnownBadBucketName(BucketName))
+ {
+ BadBucketDirectories.push_back(BucketPath);
+
+ continue;
+ }
+
+ FoundBucketDirectories.push_back(BucketPath);
+ ZEN_INFO("Discovered bucket '{}'", BucketName);
+ }
+
+ for (const std::filesystem::path& BadBucketPath : BadBucketDirectories)
+ {
+ bool IsOk = false;
+
+ try
+ {
+ IsOk = DeleteDirectories(BadBucketPath);
+ }
+ catch (std::exception&)
+ {
+ }
+
+ if (IsOk)
+ {
+ ZEN_INFO("found bad bucket at '{}', deleted contents", BadBucketPath);
+ }
+ else
+ {
+ ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath);
+ }
+ }
+
+ RwLock SyncLock;
+
+ WorkerThreadPool& Pool = GetLargeWorkerPool();
+ Latch WorkLatch(1);
+ for (auto& BucketPath : FoundBucketDirectories)
+ {
+ WorkLatch.AddCount(1);
+ Pool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
+
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
+ try
+ {
+ std::unique_ptr<CacheBucket> NewBucket =
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig);
+
+ CacheBucket* Bucket = nullptr;
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
+ Bucket = InsertResult.first->second.get();
+ }
+ ZEN_ASSERT(Bucket);
+
+ if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ m_Buckets.erase(BucketName);
+ }
+ }
+ }
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
+ }
+ });
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+}
+
+bool
+ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
+{
+ ZEN_TRACE_CPU("Z$::DropBucket");
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ auto It = m_Buckets.find(std::string(InBucket));
+
+ if (It != m_Buckets.end())
+ {
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It);
+
+ return Bucket.Drop();
+ }
+
+ // Make sure we remove the folder even if we don't know about the bucket
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= std::string(InBucket);
+ return MoveAndDeleteDirectory(BucketPath);
+}
+
+bool
+ZenCacheDiskLayer::Drop()
+{
+ ZEN_TRACE_CPU("Z$::Drop");
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ std::vector<std::unique_ptr<CacheBucket>> Buckets;
+ Buckets.reserve(m_Buckets.size());
+ while (!m_Buckets.empty())
+ {
+ const auto& It = m_Buckets.begin();
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It->first);
+ if (!Bucket.Drop())
+ {
+ return false;
+ }
+ }
+ return MoveAndDeleteDirectory(m_RootDir);
+}
+
+void
+ZenCacheDiskLayer::Flush()
+{
+ ZEN_TRACE_CPU("Z$::Flush");
+
+ std::vector<CacheBucket*> Buckets;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (Buckets.empty())
+ {
+ return;
+ }
+ ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ CacheBucket* Bucket = Kv.second.get();
+ Buckets.push_back(Bucket);
+ }
+ }
+ {
+ WorkerThreadPool& Pool = GetSmallWorkerPool();
+ Latch WorkLatch(1);
+ for (auto& Bucket : Buckets)
+ {
+ WorkLatch.AddCount(1);
+ Pool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
+ Bucket->Flush();
+ });
+ }
+ WorkLatch.CountDown();
+ while (!WorkLatch.Wait(1000))
+ {
+ ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
+ }
+ }
+}
+
+void
+ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
+{
+ ZEN_TRACE_CPU("Z$::ScrubStorage");
+
+ RwLock::SharedLockScope _(m_Lock);
+ {
+ std::vector<std::future<void>> Results;
+ Results.reserve(m_Buckets.size());
+
+ for (auto& Kv : m_Buckets)
+ {
+#if 1
+ Results.push_back(Ctx.ThreadPool().EnqueueTask(
+ std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
+#else
+ CacheBucket& Bucket = *Kv.second;
+ Bucket.ScrubStorage(Ctx);
+#endif
+ }
+
+ for (auto& Result : Results)
+ {
+ Result.get();
+ }
+ }
+}
+
+void
+ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::GatherReferences");
+
+ std::vector<CacheBucket*> Buckets;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
+ }
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->GatherReferences(GcCtx);
+ }
+}
+
+GcStorageSize
+ZenCacheDiskLayer::StorageSize() const
+{
+ GcStorageSize StorageSize{};
+
+ RwLock::SharedLockScope _(m_Lock);
+ for (auto& Kv : m_Buckets)
+ {
+ GcStorageSize BucketSize = Kv.second->StorageSize();
+ StorageSize.DiskSize += BucketSize.DiskSize;
+ StorageSize.MemorySize += BucketSize.MemorySize;
+ }
+
+ return StorageSize;
+}
+
+ZenCacheDiskLayer::DiskStats
+ZenCacheDiskLayer::Stats() const
+{
+ GcStorageSize Size = StorageSize();
+ ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ Stats.BucketStats.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Stats.BucketStats.emplace_back(NamedBucketStats{.BucketName = Kv.first, .Stats = Kv.second->Stats()});
+ }
+ }
+ return Stats;
+}
+
+ZenCacheDiskLayer::Info
+ZenCacheDiskLayer::GetInfo() const
+{
+ ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration};
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ Info.BucketNames.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Info.BucketNames.push_back(Kv.first);
+ Info.EntryCount += Kv.second->EntryCount();
+ GcStorageSize BucketSize = Kv.second->StorageSize();
+ Info.StorageSize.DiskSize += BucketSize.DiskSize;
+ Info.StorageSize.MemorySize += BucketSize.MemorySize;
+ }
+ }
+ return Info;
+}
+
+std::optional<ZenCacheDiskLayer::BucketInfo>
+ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
+ {
+ return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()};
+ }
+ return {};
+}
+
+void
+ZenCacheDiskLayer::EnumerateBucketContents(std::string_view Bucket,
+ std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
+ {
+ It->second->EnumerateBucketContents(Fn);
+ }
+}
+
+CacheValueDetails::NamespaceDetails
+ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
+{
+ CacheValueDetails::NamespaceDetails Details;
+ {
+ RwLock::SharedLockScope IndexLock(m_Lock);
+ if (BucketFilter.empty())
+ {
+ Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1);
+ for (auto& Kv : m_Buckets)
+ {
+ Details.Buckets[Kv.first] = Kv.second->GetValueDetails(IndexLock, ValueFilter);
+ }
+ }
+ else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end())
+ {
+ Details.Buckets[It->first] = It->second->GetValueDetails(IndexLock, ValueFilter);
+ }
+ }
+ return Details;
+}
+
+void
+ZenCacheDiskLayer::MemCacheTrim()
+{
+ ZEN_TRACE_CPU("Z$::MemCacheTrim");
+
+ ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0);
+ ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0);
+ ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0);
+
+ bool Expected = false;
+ if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
+ {
+ return;
+ }
+
+ try
+ {
+ m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) {
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ uint64_t TrimmedSize = 0;
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(TrimmedSize),
+ NiceBytes(m_TotalMemCachedSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_NextAllowedTrimTick.store(NextTrimTick);
+ m_IsMemCacheTrimming.store(false);
+ });
+
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
+
+ static const size_t UsageSlotCount = 2048;
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(UsageSlotCount);
+
+ std::vector<CacheBucket*> Buckets;
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
+ }
+
+ const GcClock::TimePoint Now = GcClock::Now();
+ {
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
+ }
+ }
+
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
+ {
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
+ TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
+ break;
+ }
+ }
+ });
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
+ m_IsMemCacheTrimming.store(false);
+ }
+}
+
+uint64_t
+ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime)
+{
+ if (m_Configuration.MemCacheTargetFootprintBytes == 0)
+ {
+ return 0;
+ }
+ uint64_t TrimmedSize = 0;
+ for (CacheBucket* Bucket : Buckets)
+ {
+ TrimmedSize += Bucket->MemCacheTrim(ExpireTime);
+ }
+ const GcClock::TimePoint Now = GcClock::Now();
+ const GcClock::Tick NowTick = Now.time_since_epoch().count();
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ GcClock::Tick LastTrimTick = m_NextAllowedTrimTick;
+ const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ return TrimmedSize;
+}
+
+#if ZEN_WITH_TESTS
+void
+ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time)
+{
+ const auto BucketName = std::string(InBucket);
+ CacheBucket* Bucket = nullptr;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ auto It = m_Buckets.find(BucketName);
+
+ if (It != m_Buckets.end())
+ {
+ Bucket = It->second.get();
+ }
+ }
+
+ if (Bucket == nullptr)
+ {
+ return;
+ }
+ Bucket->SetAccessTime(HashKey, Time);
+}
+#endif // ZEN_WITH_TESTS
+
+} // namespace zen
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
new file mode 100644
index 000000000..f92baaf0b
--- /dev/null
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -0,0 +1,2440 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenstore/structuredcachestore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compress.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenstore/scrubcontext.h>
+#include <zenutil/cache/cache.h>
+
+#include <future>
+#include <limits>
+
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#endif
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/core.h>
+#include <xxhash.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_WITH_TESTS
+# include <zencore/jobqueue.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <zenstore/cidstore.h>
+# include <random>
+# include <unordered_map>
+#endif
+
+namespace zen {
+
+ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
+: m_Gc(Gc)
+, m_JobQueue(JobQueue)
+, m_RootDir(RootDir)
+, m_Configuration(Config)
+, m_DiskLayer(m_Gc, m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig)
+{
+ ZEN_INFO("initializing structured cache at '{}'", m_RootDir);
+ CreateDirectories(m_RootDir);
+
+ m_DiskLayer.DiscoverBuckets();
+
+ m_Gc.AddGcContributor(this);
+ m_Gc.AddGcStorage(this);
+}
+
+ZenCacheNamespace::~ZenCacheNamespace()
+{
+ m_Gc.RemoveGcStorage(this);
+ m_Gc.RemoveGcContributor(this);
+}
+
+bool
+ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ ZEN_TRACE_CPU("Z$::Namespace::Get");
+
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+
+ bool Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
+
+ if (Ok)
+ {
+ ZEN_ASSERT(OutValue.Value.Size());
+ StatsScope.SetBytes(OutValue.Value.Size());
+
+ m_HitCount++;
+ return true;
+ }
+
+ m_MissCount++;
+ return false;
+}
+
+void
+ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+{
+ ZEN_TRACE_CPU("Z$::Namespace::Put");
+
+ metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+
+ // Store value and index
+
+ ZEN_ASSERT(Value.Value.Size());
+
+ m_DiskLayer.Put(InBucket, HashKey, Value, References);
+ m_WriteCount++;
+}
+
+bool
+ZenCacheNamespace::DropBucket(std::string_view Bucket)
+{
+ ZEN_INFO("dropping bucket '{}'", Bucket);
+
+ const bool Dropped = m_DiskLayer.DropBucket(Bucket);
+
+ ZEN_INFO("bucket '{}' was {}", Bucket, Dropped ? "dropped" : "not found");
+
+ return Dropped;
+}
+
+void
+ZenCacheNamespace::EnumerateBucketContents(std::string_view Bucket,
+ std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
+{
+ m_DiskLayer.EnumerateBucketContents(Bucket, Fn);
+}
+
+bool
+ZenCacheNamespace::Drop()
+{
+ return m_DiskLayer.Drop();
+}
+
+void
+ZenCacheNamespace::Flush()
+{
+ m_DiskLayer.Flush();
+}
+
+void
+ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
+{
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ ZEN_INFO("scrubbing '{}'", m_RootDir);
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
+ m_DiskLayer.ScrubStorage(Ctx);
+}
+
+void
+ZenCacheNamespace::GatherReferences(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::ZenCacheNamespace::GatherReferences");
+
+ Stopwatch Timer;
+ const auto Guard =
+ MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+ m_DiskLayer.GatherReferences(GcCtx);
+}
+
+void
+ZenCacheNamespace::CollectGarbage(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("Z$::Namespace::CollectGarbage");
+
+ m_DiskLayer.CollectGarbage(GcCtx);
+}
+
+GcStorageSize
+ZenCacheNamespace::StorageSize() const
+{
+ return m_DiskLayer.StorageSize();
+}
+
+ZenCacheNamespace::Info
+ZenCacheNamespace::GetInfo() const
+{
+ ZenCacheNamespace::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration, .DiskLayerInfo = m_DiskLayer.GetInfo()};
+ std::unordered_set<std::string> BucketNames;
+ for (const std::string& BucketName : Info.DiskLayerInfo.BucketNames)
+ {
+ BucketNames.insert(BucketName);
+ }
+ Info.BucketNames.insert(Info.BucketNames.end(), BucketNames.begin(), BucketNames.end());
+ return Info;
+}
+
+std::optional<ZenCacheNamespace::BucketInfo>
+ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const
+{
+ std::optional<ZenCacheDiskLayer::BucketInfo> DiskBucketInfo = m_DiskLayer.GetBucketInfo(Bucket);
+ if (!DiskBucketInfo.has_value())
+ {
+ return {};
+ }
+ ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo};
+ return Info;
+}
+
+ZenCacheNamespace::NamespaceStats
+ZenCacheNamespace::Stats()
+{
+ return ZenCacheNamespace::NamespaceStats{.HitCount = m_HitCount,
+ .MissCount = m_MissCount,
+ .WriteCount = m_WriteCount,
+ .PutOps = m_PutOps.Snapshot(),
+ .GetOps = m_GetOps.Snapshot(),
+ .DiskStats = m_DiskLayer.Stats()};
+}
+
+CacheValueDetails::NamespaceDetails
+ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
+{
+ return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter);
+}
+
+#if ZEN_WITH_TESTS
+void
+ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time)
+{
+ m_DiskLayer.SetAccessTime(Bucket, HashKey, Time);
+}
+#endif // ZEN_WITH_TESTS
+
+//////////////////////////// ZenCacheStore
+
+ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$");
+
+static constinit std::string_view UE4DDCNamespaceName = "ue4.ddc";
+
+ZenCacheStore::ZenCacheStore(GcManager& Gc,
+ JobQueue& JobQueue,
+ const std::filesystem::path& BasePath,
+ const Configuration& Configuration,
+ const DiskWriteBlocker* InDiskWriteBlocker)
+: m_DiskWriteBlocker(InDiskWriteBlocker)
+, m_Gc(Gc)
+, m_JobQueue(JobQueue)
+, m_BasePath(BasePath)
+, m_Configuration(Configuration)
+, m_ExitLogging(false)
+{
+ SetLoggingConfig(m_Configuration.Logging);
+ CreateDirectories(m_BasePath);
+
+ ZEN_INFO("initializing cache store at '{}'", m_BasePath);
+
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+
+ std::vector<std::string> Namespaces;
+ for (const std::filesystem::path& DirPath : DirContent.Directories)
+ {
+ std::string DirName = PathToUtf8(DirPath.filename());
+ if (DirName.starts_with(NamespaceDiskPrefix))
+ {
+ Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
+ continue;
+ }
+ }
+
+ ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_BasePath);
+
+ if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
+ {
+ // default (unspecified) and ue4-ddc namespace points to the same namespace instance
+
+ std::filesystem::path DefaultNamespaceFolder = m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
+ CreateDirectories(DefaultNamespaceFolder);
+ Namespaces.push_back(std::string(UE4DDCNamespaceName));
+ }
+
+ for (const std::string& NamespaceName : Namespaces)
+ {
+ m_Namespaces[NamespaceName] =
+ std::make_unique<ZenCacheNamespace>(Gc,
+ m_JobQueue,
+ m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName),
+ m_Configuration.NamespaceConfig);
+ }
+}
+
+ZenCacheStore::~ZenCacheStore()
+{
+ ZEN_INFO("closing cache store at '{}'", m_BasePath);
+ SetLoggingConfig({.EnableWriteLog = false, .EnableAccessLog = false});
+ m_Namespaces.clear();
+}
+
+void
+ZenCacheStore::LogWorker()
+{
+ SetCurrentThreadName("ZenCacheStore::LogWorker");
+
+ LoggerRef ZCacheLog(logging::Get("z$"));
+
+ auto Log = [&ZCacheLog]() -> LoggerRef { return ZCacheLog; };
+
+ std::vector<AccessLogItem> Items;
+ while (true)
+ {
+ try
+ {
+ m_LogQueueLock.WithExclusiveLock([this, &Items]() { Items.swap(m_LogQueue); });
+ if (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed())
+ {
+ for (const auto& Item : Items)
+ {
+ if (Item.Value.Value)
+ {
+ const bool IsCbObject = Item.Value.Value.GetContentType() == ZenContentType::kCbObject;
+
+ try
+ {
+ const IoHash ObjectHash = IsCbObject ? IoHash::HashBuffer(Item.Value.Value.GetView()) : Item.Value.RawHash;
+ const size_t ObjectSize = IsCbObject ? Item.Value.Value.GetSize() : Item.Value.RawSize;
+
+ ZEN_LOG_INFO(LogCacheActivity,
+ "{} [{}] {}/{}/{} -> {} {} {}",
+ Item.Op,
+ Item.Context,
+ Item.Namespace,
+ Item.Bucket,
+ Item.HashKey,
+ ObjectHash,
+ ObjectSize,
+ ToString(Item.Value.Value.GetContentType()))
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_LOG_INFO(LogCacheActivity,
+ "{} [{}] {}/{}/{} failed: Reason: '{}'",
+ Item.Op,
+ Item.Context,
+ Item.Namespace,
+ Item.Bucket,
+ Item.HashKey,
+ Ex.what())
+ }
+ }
+ else
+ {
+ ZEN_LOG_INFO(LogCacheActivity,
+ "{} [{}] {}/{}/{}",
+ Item.Op,
+ Item.Context,
+ Item.Namespace,
+ Item.Bucket,
+ Item.HashKey);
+ }
+ }
+ }
+ if (!Items.empty())
+ {
+ Items.resize(0);
+ continue;
+ }
+ if (m_ExitLogging)
+ {
+ break;
+ }
+ m_LogEvent.Wait();
+ m_LogEvent.Reset();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("Log writer failed: '{}'", Ex.what());
+ }
+ }
+}
+
+bool
+ZenCacheStore::Get(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ ZenCacheValue& OutValue)
+{
+ // Ad hoc rejection of known bad usage patterns for DDC bucket names
+
+ if (IsKnownBadBucketName(Bucket))
+ {
+ m_RejectedReadCount++;
+ return false;
+ }
+
+ ZEN_TRACE_CPU("Z$::Get");
+
+ metrics::RequestStats::Scope OpScope(m_GetOps, 0);
+
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ bool Result = Store->Get(Bucket, HashKey, OutValue);
+
+ if (m_AccessLogEnabled)
+ {
+ ZEN_TRACE_CPU("Z$::Get::AccessLog");
+ bool Signal = false;
+ m_LogQueueLock.WithExclusiveLock([&]() {
+ Signal = m_LogQueue.empty();
+ m_LogQueue.emplace_back(AccessLogItem{.Op = Result ? "GET HIT " : "GET MISS",
+ .Context = Context,
+ .Namespace = std::string(Namespace),
+ .Bucket = std::string(Bucket),
+ .HashKey = HashKey,
+ .Value = OutValue /*,
+ .Result = Result*/});
+ });
+ if (Signal)
+ {
+ m_LogEvent.Set();
+ }
+ }
+ if (Result)
+ {
+ m_HitCount++;
+ OpScope.SetBytes(OutValue.Value.GetSize());
+ return true;
+ }
+
+ m_MissCount++;
+ return false;
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get [{}], bucket '{}', key '{}'",
+ Context,
+ Namespace,
+ Bucket,
+ HashKey.ToHexString());
+
+ m_MissCount++;
+ return false;
+}
+
+void
+ZenCacheStore::Put(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References)
+{
+ // Ad hoc rejection of known bad usage patterns for DDC bucket names
+
+ if (IsKnownBadBucketName(Bucket))
+ {
+ m_RejectedWriteCount++;
+ return;
+ }
+
+ ZEN_TRACE_CPU("Z$::Put");
+
+ metrics::RequestStats::Scope $(m_PutOps, Value.Value.GetSize());
+
+ if (m_WriteLogEnabled)
+ {
+ ZEN_TRACE_CPU("Z$::Get::WriteLog");
+ bool Signal = false;
+ m_LogQueueLock.WithExclusiveLock([&]() {
+ Signal = m_LogQueue.empty();
+ m_LogQueue.emplace_back(AccessLogItem{.Op = "PUT ",
+ .Context = Context,
+ .Namespace = std::string(Namespace),
+ .Bucket = std::string(Bucket),
+ .HashKey = HashKey,
+ .Value = Value /*,
+ .Result = true*/});
+ });
+ if (Signal)
+ {
+ m_LogEvent.Set();
+ }
+ }
+
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ Store->Put(Bucket, HashKey, Value, References);
+ m_WriteCount++;
+ return;
+ }
+
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'",
+ Context,
+ Namespace,
+ Bucket,
+ HashKey.ToHexString());
+}
+
+bool
+ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->DropBucket(Bucket);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket);
+ return false;
+}
+
+bool
+ZenCacheStore::DropNamespace(std::string_view InNamespace)
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end())
+ {
+ ZenCacheNamespace& Namespace = *It->second;
+ m_DroppedNamespaces.push_back(std::move(It->second));
+ m_Namespaces.erase(It);
+ return Namespace.Drop();
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace);
+ return false;
+}
+
+void
+ZenCacheStore::Flush()
+{
+ ZEN_INFO("flushing cache store at '{}'", m_BasePath);
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
+}
+
+void
+ZenCacheStore::ScrubStorage(ScrubContext& Ctx)
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); });
+}
+
+CacheValueDetails
+ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter,
+ const std::string_view BucketFilter,
+ const std::string_view ValueFilter) const
+{
+ CacheValueDetails Details;
+ if (NamespaceFilter.empty())
+ {
+ IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace& Store) {
+ Details.Namespaces[std::string(Namespace)] = Store.GetValueDetails(BucketFilter, ValueFilter);
+ });
+ }
+ else if (const ZenCacheNamespace* Store = FindNamespace(NamespaceFilter); Store != nullptr)
+ {
+ Details.Namespaces[std::string(NamespaceFilter)] = Store->GetValueDetails(BucketFilter, ValueFilter);
+ }
+ return Details;
+}
+
+void
+ZenCacheStore::EnumerateBucketContents(std::string_view Namespace,
+ std::string_view Bucket,
+ std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn)
+{
+ if (const ZenCacheNamespace* Ns = FindNamespace(Namespace))
+ {
+ Ns->EnumerateBucketContents(Bucket, Fn);
+ }
+}
+
+ZenCacheNamespace*
+ZenCacheStore::GetNamespace(std::string_view Namespace)
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ if (Namespace == DefaultNamespace)
+ {
+ if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ }
+ _.ReleaseNow();
+
+ if (!m_Configuration.AllowAutomaticCreationOfNamespaces)
+ {
+ return nullptr;
+ }
+
+ RwLock::ExclusiveLockScope __(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+
+ auto NewNamespace =
+ m_Namespaces.insert_or_assign(std::string(Namespace),
+ std::make_unique<ZenCacheNamespace>(m_Gc,
+ m_JobQueue,
+ m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace),
+ m_Configuration.NamespaceConfig));
+ return NewNamespace.first->second.get();
+}
+
+const ZenCacheNamespace*
+ZenCacheStore::FindNamespace(std::string_view Namespace) const
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ if (Namespace == DefaultNamespace)
+ {
+ if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ }
+ return nullptr;
+}
+
+std::vector<std::string>
+ZenCacheStore::GetNamespaces()
+{
+ std::vector<std::string> Namespaces;
+
+ IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace&) { Namespaces.push_back(std::string(Namespace)); });
+
+ return Namespaces;
+}
+
+void
+ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const
+{
+ std::vector<std::pair<std::string, ZenCacheNamespace&>> Namespaces;
+ {
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ Namespaces.reserve(m_Namespaces.size());
+ for (const auto& Entry : m_Namespaces)
+ {
+ if (Entry.first == DefaultNamespace)
+ {
+ continue;
+ }
+ Namespaces.push_back({Entry.first, *Entry.second});
+ }
+ }
+ for (auto& Entry : Namespaces)
+ {
+ Callback(Entry.first, Entry.second);
+ }
+}
+
+GcStorageSize
+ZenCacheStore::StorageSize() const
+{
+ GcStorageSize Size;
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
+ GcStorageSize StoreSize = Store.StorageSize();
+ Size.MemorySize += StoreSize.MemorySize;
+ Size.DiskSize += StoreSize.DiskSize;
+ });
+ return Size;
+}
+
+ZenCacheStore::CacheStoreStats
+ZenCacheStore::Stats(bool IncludeNamespaceStats)
+{
+ ZenCacheStore::CacheStoreStats Result{.HitCount = m_HitCount,
+ .MissCount = m_MissCount,
+ .WriteCount = m_WriteCount,
+ .RejectedWriteCount = m_RejectedWriteCount,
+ .RejectedReadCount = m_RejectedReadCount,
+ .PutOps = m_PutOps.Snapshot(),
+ .GetOps = m_GetOps.Snapshot()};
+
+ if (IncludeNamespaceStats)
+ {
+ IterateNamespaces([&](std::string_view NamespaceName, ZenCacheNamespace& Store) {
+ Result.NamespaceStats.emplace_back(NamedNamespaceStats{.NamespaceName = std::string(NamespaceName), .Stats = Store.Stats()});
+ });
+ }
+
+ return Result;
+}
+
+void
+ZenCacheStore::ReportMetrics(StatsMetrics& Statsd)
+{
+ const bool IncludeNamespaceStats = false;
+ const CacheStoreStats Now = Stats(IncludeNamespaceStats);
+ const CacheStoreStats& Old = m_LastReportedMetrics;
+
+ Statsd.Meter("zen.cache_hits", Now.HitCount - Old.HitCount);
+ Statsd.Meter("zen.cache_misses", Now.MissCount - Old.MissCount);
+ Statsd.Meter("zen.cache_writes", Now.WriteCount - Old.WriteCount);
+
+ m_LastReportedMetrics = Now;
+}
+
+void
+ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig)
+{
+ if (!Loggingconfig.EnableAccessLog && !Loggingconfig.EnableWriteLog)
+ {
+ m_AccessLogEnabled.store(false);
+ m_WriteLogEnabled.store(false);
+ m_ExitLogging.store(true);
+ m_LogEvent.Set();
+ if (m_AsyncLoggingThread.joinable())
+ {
+ m_AsyncLoggingThread.join();
+ }
+ m_Configuration.Logging = Loggingconfig;
+ return;
+ }
+ if (!m_AccessLogEnabled.load() && !m_WriteLogEnabled.load())
+ {
+ m_AsyncLoggingThread = std::thread(&ZenCacheStore::LogWorker, this);
+ }
+ m_WriteLogEnabled.store(Loggingconfig.EnableWriteLog);
+ m_AccessLogEnabled.store(Loggingconfig.EnableAccessLog);
+ m_Configuration.Logging = Loggingconfig;
+}
+
+ZenCacheStore::Info
+ZenCacheStore::GetInfo() const
+{
+ ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()};
+
+ IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) {
+ Info.NamespaceNames.push_back(std::string(NamespaceName));
+ ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo();
+ Info.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount;
+ });
+
+ return Info;
+}
+
+std::optional<ZenCacheNamespace::Info>
+ZenCacheStore::GetNamespaceInfo(std::string_view NamespaceName)
+{
+ if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace)
+ {
+ return Namespace->GetInfo();
+ }
+ return {};
+}
+
+std::optional<ZenCacheNamespace::BucketInfo>
+ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view BucketName)
+{
+ if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace)
+ {
+ return Namespace->GetBucketInfo(BucketName);
+ }
+ return {};
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+using namespace std::literals;
+
+namespace testutils {
+ IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); }
+
+ IoHash ToIoHash(const Oid& Id)
+ {
+ char OIdString[24 + 1];
+ Id.ToString(OIdString);
+ IoHash Key = IoHash::HashBuffer(OIdString, 24);
+ return Key;
+ }
+
+ std::pair<Oid, IoBuffer> CreateBinaryBlob(size_t Size) { return {Oid::NewOid(), CreateRandomBlob(Size)}; }
+
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, const std::span<const size_t>& Sizes)
+ {
+ std::vector<std::pair<Oid, CompressedBuffer>> Result;
+ Result.reserve(Sizes.size());
+ for (size_t Size : Sizes)
+ {
+ auto Blob = CreateBinaryBlob(Size);
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size()));
+ CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash()));
+ Result.emplace_back(std::pair<Oid, CompressedBuffer>(Blob.first, Compressed));
+ }
+ return Result;
+ }
+
+ std::pair<IoHash, IoBuffer> CreateRecord(std::span<std::pair<Oid, CompressedBuffer>> Attachments)
+ {
+ Oid Id = Oid::NewOid();
+ IoHash Key = ToIoHash(Id);
+ CbObjectWriter Record;
+ Record << "Key"sv << Id;
+
+ for (size_t Idx = 0; auto& Cid : Attachments)
+ {
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid.second.DecodeRawHash());
+ }
+
+ IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+ return {Key, Buffer};
+ }
+
+ struct FalseType
+ {
+ static const bool Enabled = false;
+ };
+ struct TrueType
+ {
+ static const bool Enabled = true;
+ };
+
+} // namespace testutils
+
+TEST_CASE_TEMPLATE("z$.store", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ const int kIterationCount = 100;
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
+
+ CbObjectWriter Cbo;
+ Cbo << "hey" << i;
+ CbObject Obj = Cbo.Save();
+
+ ZenCacheValue Value;
+ Value.Value = Obj.GetBuffer().AsIoBuffer();
+ Value.Value.SetContentType(ZenContentType::kCbObject);
+
+ Zcs.Put("test_bucket"sv, Key, Value, {});
+ }
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
+
+ ZenCacheValue Value;
+ Zcs.Get("test_bucket"sv, Key, /* out */ Value);
+
+ REQUIRE(Value.Value);
+ CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject);
+ CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None);
+ CbObject Obj = LoadCompactBinaryObject(Value.Value);
+ CHECK_EQ(Obj["hey"].AsInt32(), i);
+ }
+}
+
+TEST_CASE_TEMPLATE("z$.size", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ SUBCASE("mem/disklayer")
+ {
+ const size_t Count = 16;
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold - 256);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ std::vector<std::pair<std::string, IoHash>> Keys;
+
+ for (size_t Key = 0; Key < Count; ++Key)
+ {
+ const size_t Bucket = Key % 4;
+ std::string BucketName = fmt::format("test_bucket-{}", Bucket);
+ IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t));
+ Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {});
+ Keys.push_back({BucketName, Hash});
+ }
+ CacheSize = Zcs.StorageSize();
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_EQ(0, CacheSize.MemorySize);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue _;
+ Zcs.Get(Key.first, Key.second, _);
+ }
+
+ CacheSize = Zcs.StorageSize();
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
+ }
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ const GcStorageSize SerializedSize = Zcs.StorageSize();
+ CHECK_EQ(SerializedSize.MemorySize, 0);
+ CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
+
+ for (size_t Bucket = 0; Bucket < 4; ++Bucket)
+ {
+ Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
+ }
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ CHECK_EQ(0, Zcs.StorageSize().MemorySize);
+ }
+ }
+
+ SUBCASE("disklayer")
+ {
+ const size_t Count = 16;
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold + 64);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ for (size_t Key = 0; Key < Count; ++Key)
+ {
+ const size_t Bucket = Key % 4;
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {});
+ }
+
+ CacheSize = Zcs.StorageSize();
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_EQ(0, CacheSize.MemorySize);
+ }
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ const GcStorageSize SerializedSize = Zcs.StorageSize();
+ CHECK_EQ(SerializedSize.MemorySize, 0);
+ CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
+
+ for (size_t Bucket = 0; Bucket < 4; ++Bucket)
+ {
+ Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
+ }
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+}
+
+TEST_CASE_TEMPLATE("z$.gc", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ using namespace testutils;
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ SUBCASE("gather references does NOT add references for expired cache entries")
+ {
+ ScopedTemporaryDirectory TempDir;
+ std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ const auto CollectAndFilter = [](GcManager& Gc,
+ GcClock::TimePoint Time,
+ GcClock::Duration MaxDuration,
+ std::span<const IoHash> Cids,
+ std::vector<IoHash>& OutKeep) {
+ GcContext GcCtx(Time - MaxDuration, Time - MaxDuration);
+ Gc.CollectGarbage(GcCtx);
+ OutKeep.clear();
+ GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); });
+ };
+
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ const auto Bucket = "teardrinker"sv;
+
+ // Create a cache record
+ const IoHash Key = CreateKey(42);
+ CbObjectWriter Record;
+ Record << "Key"sv
+ << "SomeRecord"sv;
+
+ for (size_t Idx = 0; auto& Cid : Cids)
+ {
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid);
+ }
+
+ IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ Zcs.Put(Bucket, Key, {.Value = Buffer}, Cids);
+
+ std::vector<IoHash> Keep;
+
+ // Collect garbage with 1 hour max cache duration
+ {
+ CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(Cids.size(), Keep.size());
+ }
+
+ // Move forward in time
+ {
+ CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(0, Keep.size());
+ }
+ }
+
+ // Expect timestamps to be serialized
+ {
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ std::vector<IoHash> Keep;
+
+ // Collect garbage with 1 hour max cache duration
+ {
+ CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(3, Keep.size());
+ }
+
+ // Move forward in time
+ {
+ CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
+ CHECK_EQ(0, Keep.size());
+ }
+ }
+ }
+
+ SUBCASE("gc removes standalone values")
+ {
+ ScopedTemporaryDirectory TempDir;
+ GcManager Gc;
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ const auto Bucket = "fortysixandtwo"sv;
+ const GcClock::TimePoint CurrentTime = GcClock::Now();
+
+ std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ for (const auto& Key : Keys)
+ {
+ IoBuffer Value = CreateRandomBlob(128 << 10);
+ Zcs.Put(Bucket, Key, {.Value = Value}, {});
+ }
+
+ {
+ GcContext GcCtx(CurrentTime - std::chrono::hours(46), CurrentTime - std::chrono::hours(46));
+
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(Exists);
+ }
+ }
+
+ // Move forward in time and collect again
+ {
+ GcContext GcCtx(CurrentTime + std::chrono::minutes(2), CurrentTime + std::chrono::minutes(2));
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(!Exists);
+ }
+
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+
+ SUBCASE("gc removes small objects")
+ {
+ ScopedTemporaryDirectory TempDir;
+ GcManager Gc;
+ {
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ const auto Bucket = "rightintwo"sv;
+
+ std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
+
+ for (const auto& Key : Keys)
+ {
+ IoBuffer Value = CreateRandomBlob(128);
+ Zcs.Put(Bucket, Key, {.Value = Value}, {});
+ }
+
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2));
+ GcCtx.CollectSmallObjects(true);
+
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(Exists);
+ }
+ }
+
+ // Move forward in time and collect again
+ {
+ GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2));
+ GcCtx.CollectSmallObjects(true);
+
+ Zcs.Flush();
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(!Exists);
+ }
+ // GC could not remove the currently written block so size will not be zero
+ CHECK_NE(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+ {
+ // Unreferenced blocks will be pruned so size should now be zero
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ }
+ }
+}
+
+TEST_CASE_TEMPLATE("z$.threadedinsert", ReferenceCaching, testutils::FalseType, testutils::TrueType) // * doctest::skip(true))
+{
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 8192;
+
+ struct Chunk
+ {
+ std::string Bucket;
+ IoBuffer Buffer;
+ };
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks;
+ Chunks.reserve(kChunkCount);
+
+ const std::string Bucket1 = "rightinone";
+ const std::string Bucket2 = "rightintwo";
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ while (true)
+ {
+ IoBuffer Chunk = CreateRandomBlob(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ break;
+ }
+ while (true)
+ {
+ IoBuffer Chunk = CreateRandomBlob(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ break;
+ }
+ }
+
+ CreateDirectories(TempDir.Path());
+
+ WorkerThreadPool ThreadPool(4);
+ GcManager Gc;
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path(),
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+
+ const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
+
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+ std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
+ GcChunkHashes.reserve(Chunks.size());
+ for (const auto& Chunk : Chunks)
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ {
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks;
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ {
+ IoBuffer Chunk = CreateRandomBlob(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ }
+ {
+ IoBuffer Chunk = CreateRandomBlob(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ }
+ }
+
+ std::atomic<size_t> WorkCompleted = 0;
+ std::atomic_uint32_t AddedChunkCount = 0;
+ for (const auto& Chunk : NewChunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ }
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (AddedChunkCount.load() < NewChunks.size())
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.AddRetainedCids(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+
+ while (WorkCompleted < NewChunks.size() + Chunks.size())
+ {
+ Sleep(1);
+ }
+
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.AddRetainedCids(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+ }
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : GcChunkHashes)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < GcChunkHashes.size())
+ {
+ Sleep(1);
+ }
+ }
+ }
+}
+
+TEST_CASE("z$.namespaces")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ const CacheRequestContext Context;
+ IoHash Key1;
+ IoHash Key2;
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = false}, nullptr);
+ const auto Bucket = "teardrinker"sv;
+ const auto CustomNamespace = "mynamespace"sv;
+
+ // Create a cache record
+ Key1 = CreateKey(42);
+ CbObject CacheValue = CreateCacheValue(4096);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {});
+
+ ZenCacheValue GetValue;
+ CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
+
+ // This should just be dropped as we don't allow creating of namespaces on the fly
+ Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {});
+ CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
+ }
+
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ const auto Bucket = "teardrinker"sv;
+ const auto CustomNamespace = "mynamespace"sv;
+
+ Key2 = CreateKey(43);
+ CbObject CacheValue2 = CreateCacheValue(4096);
+
+ IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
+ Buffer2.SetContentType(ZenContentType::kCbObject);
+ ZenCacheValue PutValue2 = {.Value = Buffer2};
+ Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {});
+
+ ZenCacheValue GetValue;
+ CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
+ CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
+ CHECK(Zcs.Get(Context, CustomNamespace, Bucket, Key2, GetValue));
+ }
+}
+
+TEST_CASE("z$.drop.bucket")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ const CacheRequestContext Context;
+ IoHash Key1;
+ IoHash Key2;
+
+ auto PutValue = [&CreateCacheValue,
+ &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ return Key;
+ };
+ auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue GetValue;
+ Zcs.Get(Context, Namespace, Bucket, Key, GetValue);
+ return GetValue;
+ };
+ WorkerThreadPool Workers(1);
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ const auto Bucket = "teardrinker"sv;
+ const auto Namespace = "mynamespace"sv;
+
+ Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096);
+ Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048);
+
+ ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
+ CHECK(Value1.Value);
+
+ std::atomic_bool WorkComplete = false;
+ Workers.ScheduleWork([&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ WorkComplete = true;
+ });
+ // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
+ // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
+ CHECK(Zcs.DropBucket(Namespace, Bucket));
+ while (!WorkComplete)
+ {
+ zen::Sleep(1);
+ }
+
+ // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty
+ Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
+ CHECK(!Value1.Value);
+ ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2);
+ CHECK(!Value2.Value);
+ }
+}
+
+TEST_CASE("z$.drop.namespace")
+{
+ using namespace testutils;
+
+ const CacheRequestContext Context;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ auto PutValue = [&CreateCacheValue,
+ &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ return Key;
+ };
+ auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue GetValue;
+ Zcs.Get(Context, Namespace, Bucket, Key, GetValue);
+ return GetValue;
+ };
+ WorkerThreadPool Workers(1);
+ {
+ GcManager Gc;
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ const auto Bucket1 = "teardrinker1"sv;
+ const auto Bucket2 = "teardrinker2"sv;
+ const auto Namespace1 = "mynamespace1"sv;
+ const auto Namespace2 = "mynamespace2"sv;
+
+ IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096);
+ IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048);
+ IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096);
+ IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048);
+
+ ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
+ CHECK(Value1.Value);
+ ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
+ CHECK(Value2.Value);
+ ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
+ CHECK(Value3.Value);
+ ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
+ CHECK(Value4.Value);
+
+ std::atomic_bool WorkComplete = false;
+ Workers.ScheduleWork([&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ Value2.Value = IoBuffer{};
+ Value3.Value = IoBuffer{};
+ Value4.Value = IoBuffer{};
+ WorkComplete = true;
+ });
+ // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
+ // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
+ CHECK(Zcs.DropNamespace(Namespace1));
+ while (!WorkComplete)
+ {
+ zen::Sleep(1);
+ }
+
+ // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty
+ Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
+ CHECK(!Value1.Value);
+ Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
+ CHECK(!Value2.Value);
+ Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
+ CHECK(Value3.Value);
+ Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
+ CHECK(Value4.Value);
+ }
+}
+
+TEST_CASE_TEMPLATE("z$.blocked.disklayer.put", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size, Size & 0xff);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ GcManager Gc;
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ size_t Key = Buffer.Size();
+ IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {});
+
+ ZenCacheValue BufferGet;
+ CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
+
+ CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1);
+ IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
+ Buffer2.SetContentType(ZenContentType::kCbObject);
+
+ // We should be able to overwrite even if the file is open for read
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {});
+
+ MemoryView OldView = BufferGet.Value.GetView();
+
+ ZenCacheValue BufferGet2;
+ CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2));
+ MemoryView NewView = BufferGet2.Value.GetView();
+
+ // Make sure file openend for read before we wrote it still have old data
+ CHECK(OldView.GetSize() == Buffer.GetSize());
+ CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0);
+
+ // Make sure we get the new data when reading after we write new data
+ CHECK(NewView.GetSize() == Buffer2.GetSize());
+ CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
+}
+
+TEST_CASE_TEMPLATE("z$.scrub", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ ScopedTemporaryDirectory TempDir;
+
+ using namespace testutils;
+
+ struct CacheRecord
+ {
+ IoBuffer Record;
+ std::vector<CompressedBuffer> Attachments;
+ };
+
+ auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) {
+ CacheRecord Result;
+ if (Structured)
+ {
+ Result.Attachments.resize(AttachmentSizes.size());
+ CbObjectWriter Record;
+ Record.BeginObject("Key"sv);
+ {
+ Record << "Bucket"sv << Bucket;
+ Record << "Hash"sv << Key;
+ }
+ Record.EndObject();
+ for (size_t Index = 0; Index < AttachmentSizes.size(); Index++)
+ {
+ IoBuffer AttachmentData = CreateRandomBlob(AttachmentSizes[Index]);
+ CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), CompressedAttachmentData.DecodeRawHash());
+ Result.Attachments[Index] = CompressedAttachmentData;
+ }
+ Result.Record = Record.Save().GetBuffer().AsIoBuffer();
+ Result.Record.SetContentType(ZenContentType::kCbObject);
+ }
+ else
+ {
+ std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString());
+ size_t TotalSize = RecordData.length() + 1;
+ for (size_t AttachmentSize : AttachmentSizes)
+ {
+ TotalSize += AttachmentSize;
+ }
+ Result.Record = IoBuffer(TotalSize);
+ char* DataPtr = (char*)Result.Record.MutableData();
+ memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1);
+ DataPtr += RecordData.length() + 1;
+ for (size_t AttachmentSize : AttachmentSizes)
+ {
+ IoBuffer AttachmentData = CreateRandomBlob(AttachmentSize);
+ memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize());
+ DataPtr += AttachmentData.GetSize();
+ }
+ }
+ return Result;
+ };
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ auto CreateRecords =
+ [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) {
+ for (const IoHash& Cid : Cids)
+ {
+ CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes);
+ std::vector<IoHash> AttachmentHashes;
+ for (const CompressedBuffer& Attachment : Record.Attachments)
+ {
+ AttachmentHashes.push_back(Attachment.DecodeRawHash());
+ CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back());
+ }
+ Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes);
+ }
+ };
+
+ std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000};
+
+ std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)};
+ CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes);
+
+ std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
+ CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
+
+ WorkerThreadPool ThreadPool{1};
+ ScrubContext ScrubCtx{ThreadPool};
+ Zcs.ScrubStorage(ScrubCtx);
+ CidStore.ScrubStorage(ScrubCtx);
+ CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
+ CHECK(ScrubCtx.BadCids().GetSize() == 0);
+}
+
+TEST_CASE_TEMPLATE("z$.newgc.basics", ReferenceCaching, testutils::FalseType, testutils::TrueType)
+{
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ struct CacheEntry
+ {
+ IoBuffer Data;
+ std::vector<std::pair<Oid, CompressedBuffer>> Attachments;
+ };
+
+ std::unordered_map<IoHash, CacheEntry> CacheEntries;
+
+ auto CreateCacheRecord =
+ [&](ZenCacheNamespace& Zcs, CidStore& CidStore, std::string_view Bucket, std::span<std::pair<Oid, CompressedBuffer>> Attachments) {
+ std::vector<IoHash> AttachmentKeys;
+ for (const auto& Attachment : Attachments)
+ {
+ AttachmentKeys.push_back(Attachment.second.DecodeRawHash());
+ }
+ auto Record = CreateRecord(Attachments);
+ Zcs.Put(Bucket,
+ Record.first,
+ {.Value = Record.second,
+ .RawSize = Record.second.GetSize(),
+ .RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())},
+ AttachmentKeys);
+ for (const auto& Attachment : Attachments)
+ {
+ CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash());
+ }
+ CacheEntries.insert({Record.first, CacheEntry{.Data = Record.second, .Attachments = {Attachments.begin(), Attachments.end()}}});
+ return Record.first;
+ };
+ auto CreateCacheValue = [&](ZenCacheNamespace& Zcs, std::string_view Bucket, size_t Size) {
+ std::pair<Oid, IoBuffer> CacheValue = CreateBinaryBlob(Size);
+ IoHash Key = ToIoHash(CacheValue.first);
+ Zcs.Put(Bucket,
+ Key,
+ {.Value = CacheValue.second,
+ .RawSize = CacheValue.second.GetSize(),
+ .RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())},
+ {});
+ CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}});
+ return Key;
+ };
+
+ auto ValidateCacheEntry = [&](ZenCacheNamespace& Zcs,
+ CidStore& CidStore,
+ std::string_view Bucket,
+ const IoHash& Key,
+ bool ExpectCacheEntry,
+ bool ExpectAttachments) {
+ const CacheEntry& Entry = CacheEntries[Key];
+ ZenCacheValue Value;
+ bool CacheExists = Zcs.Get(Bucket, Key, Value);
+ if (ExpectCacheEntry)
+ {
+ if (!CacheExists)
+ {
+ return false;
+ }
+ if (Value.Value.GetSize() != Entry.Data.GetSize())
+ {
+ return false;
+ }
+ if (!Value.Value.GetView().EqualBytes(Entry.Data.GetView()))
+ {
+ return false;
+ }
+ }
+ else if (CacheExists)
+ {
+ return false;
+ }
+
+ if (ExpectAttachments)
+ {
+ for (const auto& Attachment : Entry.Attachments)
+ {
+ IoHash AttachmentHash = Attachment.second.DecodeRawHash();
+ IoBuffer StoredData = CidStore.FindChunkByCid(AttachmentHash);
+ if (!StoredData)
+ {
+ return false;
+ }
+ if (!StoredData.GetView().EqualBytes(Attachment.second.GetCompressed().Flatten().GetView()))
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ for (const auto& Attachment : Entry.Attachments)
+ {
+ IoHash AttachmentHash = Attachment.second.DecodeRawHash();
+ if (CidStore.ContainsChunk(AttachmentHash))
+ {
+ return false;
+ }
+ }
+ }
+ return true;
+ };
+
+ std::vector<IoHash> CacheRecords;
+ std::vector<IoHash> UnstructuredCacheValues;
+
+ const auto TearDrinkerBucket = "teardrinker"sv;
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+
+ // Create some basic data
+ {
+ // Structured record with attachments
+ auto Attachments1 = CreateCompressedAttachment(CidStore, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87});
+ CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments1));
+
+ // Structured record with reuse of attachments
+ auto Attachments2 = CreateCompressedAttachment(CidStore, std::vector<size_t>{971});
+ Attachments2.push_back(Attachments1[0]);
+ Attachments2.push_back(Attachments1[1]);
+ CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments2));
+ }
+
+ CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, {}));
+
+ {
+ // Unstructured cache values
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 84));
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 591));
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 1024 * 1024 * 3 + 7));
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 71));
+ }
+ }
+
+ SUBCASE("expire nothing")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() - std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all large objects, delete nothing")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all object, delete nothing")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all large objects, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all objects, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GE(Result.CompactStoresStatSum.RemovedDisk, 0);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, true));
+ }
+ SUBCASE("expire all large objects")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u,
+ Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount); // Only one cache value is pruned/deleted as that is the only
+ // large item in the cache (all other large items as in cas)
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u,
+ Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats
+ .FoundCount); // We won't remove any references since all referencers are small which retains all references
+ CHECK_EQ(0u,
+ Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats
+ .DeletedCount); // We won't remove any references since all referencers are small which retains all references
+ CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all objects")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
+ }
+
+ SUBCASE("keep 1 cache record, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true,
+ .Verbose = true,
+ .CompactBlockUsageThresholdPercent = 100});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(6u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(6u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ uint64_t MinExpectedRemoveSize = CacheEntries[UnstructuredCacheValues[2]].Data.GetSize();
+ CHECK_LT(MinExpectedRemoveSize, Result.CompactStoresStatSum.RemovedDisk);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
+ }
+
+ SUBCASE("keep 2 cache records")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
+ }
+
+ SUBCASE("keep 3 cache value")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+
+ SUBCASE("keep 3 cache value, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ // Prime so we can check GC of memory layer
+ ZenCacheValue Dummy;
+ Zcs.Get(TearDrinkerBucket, CacheRecords[0], Dummy);
+ Zcs.Get(TearDrinkerBucket, CacheRecords[1], Dummy);
+ Zcs.Get(TearDrinkerBucket, CacheRecords[2], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[0], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[1], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[2], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[3], Dummy);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true,
+ .Verbose = true,
+ .CompactBlockUsageThresholdPercent = 100});
+ CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
+ uint64_t MemoryClean = CacheEntries[CacheRecords[0]].Data.GetSize() + CacheEntries[CacheRecords[1]].Data.GetSize() +
+ CacheEntries[CacheRecords[2]].Data.GetSize() + CacheEntries[UnstructuredCacheValues[0]].Data.GetSize();
+ CHECK_EQ(MemoryClean, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+
+ SUBCASE("leave write block")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
+ CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
+
+ auto Attachments =
+ CreateCompressedAttachment(CidStore, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187});
+ IoHash CacheRecord = CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments);
+ {
+ // Do get so it ends up in memcache
+ ZenCacheValue _;
+ Zcs.Get(TearDrinkerBucket, CacheRecord, _);
+ }
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecord, GcClock::Now() - std::chrono::hours(2));
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[2], GcClock::Now() + std::chrono::hours(2));
+
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[0], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::hours(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::hours(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false,
+ .Verbose = true});
+ // Write block can't be compacted so Compacted will be less than Deleted
+ CHECK_EQ(8u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(9u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(4u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
+ CHECK_EQ(4u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK_EQ(Attachments[1].second.GetCompressed().GetSize() + Attachments[3].second.GetCompressed().GetSize(),
+ Result.CompactStoresStatSum.RemovedDisk);
+ uint64_t MemoryClean = CacheEntries[CacheRecord].Data.GetSize();
+ CHECK_EQ(MemoryClean, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
+ }
+}
+
+#endif
+
+void
+z$_forcelink()
+{
+}
+
+} // namespace zen