aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-19 21:49:55 +0100
committerGitHub <[email protected]>2023-12-19 21:49:55 +0100
commit8a90a40b4517d198855c1e52740a7e7fb21ecc20 (patch)
treeffd8524bcebf8841b69725934f31d438a03e7746 /src/zenserver/cache
parent0.2.38 (diff)
downloadzen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.tar.xz
zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.zip
move cachedisklayer and structuredcachestore into zenstore (#624)
Diffstat (limited to 'src/zenserver/cache')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp4381
-rw-r--r--src/zenserver/cache/cachedisklayer.h482
-rw-r--r--src/zenserver/cache/cacheshared.h100
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp2
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp2456
-rw-r--r--src/zenserver/cache/structuredcachestore.h272
6 files changed, 1 insertions, 7692 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
deleted file mode 100644
index 8d046105d..000000000
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ /dev/null
@@ -1,4381 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "cachedisklayer.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compress.h>
-#include <zencore/except.h>
-#include <zencore/fmtutils.h>
-#include <zencore/jobqueue.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
-#include <zencore/xxhash.h>
-#include <zenutil/workerpools.h>
-
-#include <future>
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-namespace {
-
-#pragma pack(push)
-#pragma pack(1)
-
-    /// Fixed-size header stored at the start of a bucket index snapshot.
-    ///
-    /// The layout is byte-packed (see the surrounding #pragma pack) and pinned to 32 bytes by the
-    /// static_assert that follows the struct. Checksum is the last field and is excluded from its
-    /// own computation.
-    struct CacheBucketIndexHeader
-    {
-        static constexpr uint32_t ExpectedMagic  = 0x75696478;  // 'uidx';
-        static constexpr uint32_t Version2       = 2;
-        static constexpr uint32_t CurrentVersion = Version2;
-
-        uint32_t Magic            = ExpectedMagic;
-        uint32_t Version          = CurrentVersion;
-        uint64_t EntryCount       = 0;
-        uint64_t LogPosition      = 0;
-        uint32_t PayloadAlignment = 0;
-        uint32_t Checksum         = 0;
-
-        /// Hashes every header byte that precedes the trailing Checksum field.
-        static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
-        {
-            constexpr size_t HashedByteCount = sizeof(CacheBucketIndexHeader) - sizeof(uint32_t);
-            return XXH32(&Header.Magic, HashedByteCount, 0xC0C0'BABA);
-        }
-
-        /// A header is accepted when the magic matches, the stored checksum matches the recomputed one
-        /// and the payload alignment is non-zero. The checks short-circuit in that order.
-        bool IsValid() const
-        {
-            return Magic == ExpectedMagic && Checksum == ComputeChecksum(*this) && PayloadAlignment != 0;
-        }
-    };
-
-    static_assert(sizeof(CacheBucketIndexHeader) == 32);
-
-    /// Fixed-size header stored at the start of a bucket meta data (sidecar) file.
-    ///
-    /// The layout is byte-packed (see the surrounding #pragma pack) and pinned to 32 bytes by the
-    /// static_assert that follows the struct. Checksum is the last field and is excluded from its
-    /// own computation.
-    struct BucketMetaHeader
-    {
-        static constexpr uint32_t ExpectedMagic  = 0x61'74'65'6d;  // 'meta';
-        static constexpr uint32_t Version1       = 1;
-        static constexpr uint32_t CurrentVersion = Version1;
-
-        uint32_t Magic       = ExpectedMagic;
-        uint32_t Version     = CurrentVersion;
-        uint64_t EntryCount  = 0;
-        uint64_t LogPosition = 0;
-        uint32_t Padding     = 0;
-        uint32_t Checksum    = 0;
-
-        /// Hashes every header byte that precedes the trailing Checksum field.
-        static uint32_t ComputeChecksum(const BucketMetaHeader& Header)
-        {
-            constexpr size_t HashedByteCount = sizeof(BucketMetaHeader) - sizeof(uint32_t);
-            return XXH32(&Header.Magic, HashedByteCount, 0xC0C0'BABA);
-        }
-
-        /// A header is accepted when the magic matches, the stored checksum matches the recomputed one
-        /// and the padding field is zero. The checks short-circuit in that order.
-        bool IsValid() const
-        {
-            return Magic == ExpectedMagic && Checksum == ComputeChecksum(*this) && Padding == 0;
-        }
-    };
-
-    static_assert(sizeof(BucketMetaHeader) == 32);
-
-#pragma pack(pop)
-
- //////////////////////////////////////////////////////////////////////////
-
-    /// Replaces V with a default-constructed T by swapping it with a temporary; whatever V held
-    /// before the call is destroyed together with that temporary.
-    template<typename T>
-    void Reset(T& V)
-    {
-        T().swap(V);
-    }
-
- // File name suffixes appended to the bucket name by the path helpers below
- const char* IndexExtension = ".uidx";
- const char* LogExtension = ".slog";
- const char* MetaExtension = ".meta";
-
-    /// Returns `<BucketDir>/<BucketName>` with IndexExtension appended to the file name.
-    std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
-    {
-        std::filesystem::path Result = BucketDir;
-        Result /= BucketName + IndexExtension;
-        return Result;
-    }
-
-    /// Returns `<BucketDir>/<BucketName>` with MetaExtension appended to the file name.
-    std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
-    {
-        std::filesystem::path Result = BucketDir;
-        Result /= BucketName + MetaExtension;
-        return Result;
-    }
-
-    /// Returns `<BucketDir>/<BucketName>` with LogExtension appended to the file name.
-    std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
-    {
-        std::filesystem::path Result = BucketDir;
-        Result /= BucketName + LogExtension;
-        return Result;
-    }
-
-    /// Returns the manifest location inside BucketDir. The file name is fixed ("zen_manifest");
-    /// BucketName is accepted for symmetry with the other path helpers but is not used.
-    std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
-    {
-        ZEN_UNUSED(BucketName);
-        std::filesystem::path Result = BucketDir;
-        Result /= "zen_manifest";
-        return Result;
-    }
-
-    /// Sanity-checks a single index/log entry.
-    ///
-    /// An entry is rejected when its key is the zero hash, when the location's Reserved field is
-    /// non-zero, when unknown flag bits are set, or - for entries that are not tombstones - when the
-    /// recorded size is zero. Tombstones are accepted once the key, reserved field and flags pass.
-    ///
-    /// @param Entry     Entry to check
-    /// @param OutReason Receives a description of the first failed check; not modified when the entry is accepted
-    /// @return true when the entry passed all checks, false otherwise
-    bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason)
-    {
-        if (Entry.Key == IoHash::Zero)
-        {
-            OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
-            return false;
-        }
-        if (Entry.Location.Reserved != 0)
-        {
-            OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString());
-            return false;
-        }
-
-        constexpr auto KnownFlags =
-            DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed;
-        if (Entry.Location.GetFlags() & ~KnownFlags)
-        {
-            OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
-            return false;
-        }
-        if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
-        {
-            // Tombstones carry no payload, so there is no size to validate
-            return true;
-        }
-
-        // The Reserved field was already rejected above; a second identical check here was unreachable and has been removed
-        const uint64_t Size = Entry.Location.Size();
-        if (Size == 0)
-        {
-            OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
-            return false;
-        }
-        return true;
-    }
-
-    /// Renames Dir to a sibling named "[dropped]<name>(<n>)" and then deletes the renamed directory.
-    ///
-    /// The first <n> (starting at 0) whose target name does not exist yet is used. A failed rename is
-    /// retried after a 100 ms sleep, with no upper bound on the number of attempts.
-    ///
-    /// @return true once the directory was renamed and DeleteDirectories was called on it,
-    ///         false if Dir does not exist (checked at the start of every attempt)
-    bool MoveAndDeleteDirectory(const std::filesystem::path& Dir)
-    {
-        for (int DropIndex = 0;;)
-        {
-            if (!std::filesystem::exists(Dir))
-            {
-                return false;
-            }
-
-            const std::filesystem::path Candidate = Dir.parent_path() / fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
-            if (std::filesystem::exists(Candidate))
-            {
-                // Name already taken by an earlier drop, try the next suffix
-                ++DropIndex;
-                continue;
-            }
-
-            std::error_code RenameError;
-            std::filesystem::rename(Dir, Candidate, RenameError);
-            if (RenameError)
-            {
-                // TODO: Do we need to bail at some point?
-                zen::Sleep(100);
-                continue;
-            }
-
-            DeleteDirectories(Candidate);
-            return true;
-        }
-    }
-} // namespace
-
-namespace fs = std::filesystem;
-using namespace std::literals;
-
-// Reads and writes the per-bucket manifest (a compact binary object kept in the public
-// 'Manifest' member) together with the optional binary sidecar file holding per-entry
-// access times and raw hash/size meta data.
-class BucketManifestSerializer
-{
- using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
- using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData;
-
- using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
- using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload;
-
-public:
- // We use this to indicate if an on-disk bucket needs wiping
- // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references
- // to block items.
- // See: https://github.com/EpicGames/zen/pull/299
- static inline const uint32_t CurrentDiskBucketVersion = 1;
-
- // Loads the manifest object from ManifestPath into 'Manifest'.
- // Returns false when the resulting object is empty (file unreadable or failed validation).
- bool Open(std::filesystem::path ManifestPath)
- {
- Manifest = LoadCompactBinaryObject(ManifestPath);
- return !!Manifest;
- }
-
- // Returns the "BucketId" field of the loaded manifest
- Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); }
-
- // Writes the manifest's "Version" field (0 when absent) to OutVersion and
- // returns whether it equals CurrentDiskBucketVersion
- bool IsCurrentVersion(uint32_t& OutVersion) const
- {
- OutVersion = Manifest["Version"sv].AsUInt32(0);
- return OutVersion == CurrentDiskBucketVersion;
- }
-
- void ParseManifest(RwLock::ExclusiveLockScope& BucketLock,
- ZenCacheDiskLayer::CacheBucket& Bucket,
- std::filesystem::path ManifestPath,
- ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
- std::vector<AccessTime>& AccessTimes,
- std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads);
-
- Oid GenerateNewManifest(std::filesystem::path ManifestPath);
-
- IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount);
- // Size in bytes of the sidecar entries for the count last passed to MakeSidecarManifest (header not included)
- uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); }
- void WriteSidecarFile(RwLock::SharedLockScope& BucketLock,
- const std::filesystem::path& SidecarPath,
- uint64_t SnapshotLogPosition,
- const ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
- const std::vector<AccessTime>& AccessTimes,
- const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
- const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas);
- bool ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock,
- ZenCacheDiskLayer::CacheBucket& Bucket,
- std::filesystem::path SidecarPath,
- ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
- std::vector<AccessTime>& AccessTimes,
- std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads);
-
- IoBuffer MakeManifest(const Oid& BucketId,
- ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
- std::vector<AccessTime>&& AccessTimes,
- std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads,
- std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas);
-
- // Most recently loaded or generated manifest object
- CbObject Manifest;
-
-private:
- // Reads Path and returns its content as a compact binary object.
- // Returns an empty object when the read reports an error or the content fails validation.
- CbObject LoadCompactBinaryObject(const fs::path& Path)
- {
- FileContents Result = ReadFile(Path);
-
- if (!Result.ErrorCode)
- {
- IoBuffer Buffer = Result.Flatten();
- if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
- {
- return zen::LoadCompactBinaryObject(Buffer);
- }
- }
-
- return CbObject();
- }
-
- // Entry count recorded by MakeSidecarManifest; used for GetSidecarSize and the sidecar header
- uint64_t m_ManifestEntryCount = 0;
-
- // One fixed-size record of the sidecar file; the size is pinned to 64 bytes by the static_assert below
- struct ManifestData
- {
- IoHash Key; // 20
- AccessTime Timestamp; // 4
- IoHash RawHash; // 20
- uint32_t Padding_0; // 4
- size_t RawSize; // 8
- uint64_t Padding_1; // 8
- };
-
- static_assert(sizeof(ManifestData) == 64);
-};
-
-/// Applies the access times and raw hash/size meta data stored in the loaded manifest to the bucket.
-///
-/// When the manifest has "UsingMetaFile" set, the data is read from the sidecar file instead and the
-/// manifest arrays are not consulted. Otherwise the "Keys" array is mapped to payload indexes via
-/// Index, and the parallel "Timestamps", "RawHash" and "RawSize" arrays are applied position by
-/// position. Keys that are not present in Index are skipped.
-///
-/// The parallel arrays come from disk and are not guaranteed to match "Keys" in length, so surplus
-/// elements are ignored (with a warning) instead of being used to index past the key table.
-///
-/// @param BucketLock   Exclusive bucket lock, forwarded to Bucket.SetMetaData / ReadSidecarFile
-/// @param Bucket       Bucket receiving the meta data
-/// @param ManifestPath Path of the manifest, used for log messages only
-/// @param Index        Key to payload index map used to resolve manifest keys
-/// @param AccessTimes  Per-payload access times, updated in place
-/// @param Payloads     Per-payload records, updated in place via Bucket.SetMetaData
-void
-BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope&                                 BucketLock,
-                                        ZenCacheDiskLayer::CacheBucket&                             Bucket,
-                                        std::filesystem::path                                       ManifestPath,
-                                        ZenCacheDiskLayer::CacheBucket::IndexMap&                   Index,
-                                        std::vector<AccessTime>&                                    AccessTimes,
-                                        std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
-{
-    if (Manifest["UsingMetaFile"sv].AsBool())
-    {
-        ReadSidecarFile(BucketLock, Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads);
-
-        return;
-    }
-
-    ZEN_TRACE_CPU("Z$::ParseManifest");
-
-    Stopwatch  Timer;
-    const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
-
-    const uint64_t            Count = Manifest["Count"sv].AsUInt64(0);
-    std::vector<PayloadIndex> KeysIndexes;
-    KeysIndexes.reserve(Count);
-
-    // Resolve every manifest key to its payload index; unknown keys get a default (invalid) index so that
-    // positions stay aligned with the parallel arrays below
-    CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
-    for (CbFieldView& KeyView : KeyArray)
-    {
-        if (auto It = Index.find(KeyView.AsHash()); It != Index.end())
-        {
-            KeysIndexes.push_back(It.value());
-        }
-        else
-        {
-            KeysIndexes.push_back(PayloadIndex());
-        }
-    }
-
-    size_t      KeyIndexOffset = 0;
-    CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
-    for (CbFieldView& TimeStampView : TimeStampArray)
-    {
-        if (KeyIndexOffset >= KeysIndexes.size())
-        {
-            // More timestamps than keys - there is nothing to attach the remainder to
-            ZEN_WARN("Mismatch in size between 'Keys' and 'Timestamps' arrays in {}, skipping surplus timestamps", ManifestPath);
-            break;
-        }
-
-        const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
-        if (KeyIndex)
-        {
-            AccessTimes[KeyIndex] = TimeStampView.AsInt64();
-        }
-    }
-
-    KeyIndexOffset           = 0;
-    CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
-    CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
-    if (RawHashArray.Num() == RawSizeArray.Num())
-    {
-        auto RawHashIt = RawHashArray.CreateViewIterator();
-        auto RawSizeIt = RawSizeArray.CreateViewIterator();
-        while (RawHashIt != CbFieldViewIterator())
-        {
-            if (KeyIndexOffset >= KeysIndexes.size())
-            {
-                // More meta data elements than keys - there is nothing to attach the remainder to
-                ZEN_WARN("Mismatch in size between 'Keys' and 'RawHash' arrays in {}, skipping surplus meta data", ManifestPath);
-                break;
-            }
-
-            const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
-
-            if (KeyIndex)
-            {
-                uint64_t RawSize = RawSizeIt.AsUInt64();
-                IoHash   RawHash = RawHashIt.AsHash();
-                // An entry with zero size and zero hash means "no meta data recorded"
-                if (RawSize != 0 || RawHash != IoHash::Zero)
-                {
-                    BucketPayload& Payload = Payloads[KeyIndex];
-                    Bucket.SetMetaData(BucketLock, Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
-                }
-            }
-
-            RawHashIt++;
-            RawSizeIt++;
-        }
-    }
-    else
-    {
-        ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
-    }
-}
-
-/// Creates a manifest holding a freshly generated bucket id and the current disk bucket version,
-/// stores it in 'Manifest' and writes it to ManifestPath.
-///
-/// @return the newly generated bucket id
-Oid
-BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath)
-{
-    const Oid NewBucketId = Oid::NewOid();
-
-    CbObjectWriter ManifestWriter;
-    ManifestWriter << "BucketId"sv << NewBucketId;
-    ManifestWriter << "Version"sv << CurrentDiskBucketVersion;
-
-    Manifest = ManifestWriter.Save();
-    WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer());
-
-    return NewBucketId;
-}
-
-/// Builds a self-contained manifest object (no sidecar file) and returns its buffer.
-///
-/// The object always holds "BucketId" and "Version". For a non-empty index it also holds "Count",
-/// "Keys" and "Timestamps", and - when MetaDatas is non-empty - the "RawHash" and "RawSize" arrays.
-/// All arrays are written by iterating Index, so element N of each array refers to the same key.
-/// Entries without meta data contribute a zero hash / zero size.
-///
-/// The result is also stored in 'Manifest'.
-IoBuffer
-BucketManifestSerializer::MakeManifest(const Oid&                                                    BucketId,
-                                       ZenCacheDiskLayer::CacheBucket::IndexMap&&                    Index,
-                                       std::vector<AccessTime>&&                                     AccessTimes,
-                                       std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&&  Payloads,
-                                       std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas)
-{
-    using namespace std::literals;
-
-    ZEN_TRACE_CPU("Z$::MakeManifest");
-
-    const size_t ItemCount = Index.size();
-
-    // Pre-sizing the writer from a per-item estimate slightly overshoots, but it is far closer than
-    // exponential growth and avoids reallocating the underlying buffer in almost every case
-    const size_t   EstimatedSizePerItem = 54u;
-    const size_t   ReserveSize          = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128);
-    CbObjectWriter Writer(ReserveSize);
-
-    Writer << "BucketId"sv << BucketId;
-    Writer << "Version"sv << CurrentDiskBucketVersion;
-
-    if (ItemCount != 0)
-    {
-        Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(ItemCount));
-
-        Writer.BeginArray("Keys"sv);
-        for (auto& Kv : Index)
-        {
-            Writer.AddHash(Kv.first);
-        }
-        Writer.EndArray();
-
-        Writer.BeginArray("Timestamps"sv);
-        for (auto& Kv : Index)
-        {
-            GcClock::Tick Tick = AccessTimes[Kv.second];
-            Writer.AddInteger(Tick);
-        }
-        Writer.EndArray();
-
-        if (!MetaDatas.empty())
-        {
-            Writer.BeginArray("RawHash"sv);
-            for (auto& Kv : Index)
-            {
-                const ZenCacheDiskLayer::CacheBucket::BucketPayload& Entry = Payloads[Kv.second];
-                if (Entry.MetaData)
-                {
-                    Writer.AddHash(MetaDatas[Entry.MetaData].RawHash);
-                }
-                else
-                {
-                    Writer.AddHash(IoHash::Zero);
-                }
-            }
-            Writer.EndArray();
-
-            Writer.BeginArray("RawSize"sv);
-            for (auto& Kv : Index)
-            {
-                const ZenCacheDiskLayer::CacheBucket::BucketPayload& Entry = Payloads[Kv.second];
-                if (Entry.MetaData)
-                {
-                    Writer.AddInteger(MetaDatas[Entry.MetaData].RawSize);
-                }
-                else
-                {
-                    Writer.AddInteger(0);
-                }
-            }
-            Writer.EndArray();
-        }
-    }
-
-    Manifest = Writer.Save();
-    return Manifest.GetBuffer().AsIoBuffer();
-}
-
-/// Builds the small manifest used together with a sidecar file and returns its buffer.
-///
-/// The object holds "BucketId", "Version", "Count" and "UsingMetaFile" = true. EntryCount is also
-/// remembered in m_ManifestEntryCount, and the object is stored in 'Manifest'.
-IoBuffer
-BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount)
-{
-    CbObjectWriter SidecarManifestWriter;
-    SidecarManifestWriter << "BucketId"sv << BucketId;
-    SidecarManifestWriter << "Version"sv << CurrentDiskBucketVersion;
-    SidecarManifestWriter << "Count"sv << EntryCount;
-    SidecarManifestWriter << "UsingMetaFile"sv << true;
-
-    m_ManifestEntryCount = EntryCount;
-    Manifest             = SidecarManifestWriter.Save();
-
-    return Manifest.GetBuffer().AsIoBuffer();
-}
-
-/// Loads per-entry access times and raw hash/size meta data from the binary sidecar file.
-///
-/// The file consists of a BucketMetaHeader followed by fixed-size ManifestData records. Records whose
-/// key is not present in Index are ignored. Meta data is only applied for records that carry both a
-/// non-zero raw size and a non-zero raw hash.
-///
-/// @param BucketLock  Exclusive bucket lock, forwarded to Bucket.SetMetaData
-/// @param Bucket      Bucket receiving the meta data
-/// @param SidecarPath Path of the sidecar file
-/// @param Index       Key to payload index map used to resolve record keys
-/// @param AccessTimes Per-payload access times, updated in place; must have the same size as Payloads
-/// @param Payloads    Per-payload records, updated in place via Bucket.SetMetaData
-/// @return false (after logging a warning) when the file is smaller than a header, the header is invalid
-///         or has an unexpected version, or the header claims more entries than the file can hold;
-///         true otherwise
-/// @throws std::system_error when the file cannot be opened
-bool
-BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope&                                 BucketLock,
-                                          ZenCacheDiskLayer::CacheBucket&                             Bucket,
-                                          std::filesystem::path                                       SidecarPath,
-                                          ZenCacheDiskLayer::CacheBucket::IndexMap&                   Index,
-                                          std::vector<AccessTime>&                                    AccessTimes,
-                                          std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
-{
-    ZEN_TRACE_CPU("Z$::ReadSidecarFile");
-
-    ZEN_ASSERT(AccessTimes.size() == Payloads.size());
-
-    std::error_code Ec;
-
-    BasicFile SidecarFile;
-    SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec);
-
-    if (Ec)
-    {
-        throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath));
-    }
-
-    uint64_t FileSize = SidecarFile.FileSize();
-
-    // Logs on every early return below; dismissed once the file has been accepted
-    auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); });
-
-    if (FileSize < sizeof(BucketMetaHeader))
-    {
-        return false;
-    }
-
-    BasicFileBuffer Sidecar(SidecarFile, 128 * 1024);
-
-    BucketMetaHeader Header;
-    Sidecar.Read(&Header, sizeof Header, 0);
-
-    if (!Header.IsValid())
-    {
-        return false;
-    }
-
-    if (Header.Version != BucketMetaHeader::Version1)
-    {
-        return false;
-    }
-
-    // Number of complete records that fit after the header. This must subtract the size of the header itself;
-    // the previous sizeof(sizeof(...)) subtracted sizeof(size_t) and could over-count by one record when the
-    // file has trailing bytes
-    const uint64_t ExpectedEntryCount = (FileSize - sizeof(BucketMetaHeader)) / sizeof(ManifestData);
-    if (Header.EntryCount > ExpectedEntryCount)
-    {
-        return false;
-    }
-
-    InvalidGuard.Dismiss();
-
-    uint64_t RemainingEntryCount = ExpectedEntryCount;
-    uint64_t EntryCount          = 0;
-    uint64_t CurrentReadOffset   = sizeof(Header);
-
-    while (RemainingEntryCount--)
-    {
-        const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset);
-        CurrentReadOffset += sizeof(ManifestData);
-
-        if (auto It = Index.find(Entry->Key); It != Index.end())
-        {
-            PayloadIndex PlIndex = It.value();
-
-            // PlIndex is used to index Payloads/AccessTimes directly below, so it must be strictly in range
-            ZEN_ASSERT(size_t(PlIndex) < Payloads.size());
-
-            ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex];
-
-            AccessTimes[PlIndex] = Entry->Timestamp;
-
-            if (Entry->RawSize && Entry->RawHash != IoHash::Zero)
-            {
-                Bucket.SetMetaData(BucketLock, PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash});
-            }
-        }
-
-        EntryCount++;
-    }
-
-    ZEN_ASSERT(EntryCount == ExpectedEntryCount);
-
-    return true;
-}
-
-/// Writes the sidecar file: a BucketMetaHeader followed by one ManifestData record per Index entry.
-///
-/// The data is written to a temporary file created next to SidecarPath and then moved into place.
-/// The header's EntryCount is taken from m_ManifestEntryCount (set by MakeSidecarManifest) and its
-/// LogPosition from SnapshotLogPosition. Records are staged in a buffer of at most 4096 entries
-/// (256 Kb) and written out whenever the buffer fills up.
-///
-/// @throws std::system_error when the temporary file cannot be created or moved into place
-void
-BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
-                                           const std::filesystem::path&                                       SidecarPath,
-                                           uint64_t                                                           SnapshotLogPosition,
-                                           const ZenCacheDiskLayer::CacheBucket::IndexMap&                    Index,
-                                           const std::vector<AccessTime>&                                     AccessTimes,
-                                           const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&  Payloads,
-                                           const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
-{
-    ZEN_TRACE_CPU("Z$::WriteSidecarFile");
-
-    BucketMetaHeader Header;
-    Header.EntryCount  = m_ManifestEntryCount;
-    Header.LogPosition = SnapshotLogPosition;
-    Header.Checksum    = Header.ComputeChecksum(Header);
-
-    std::error_code Ec;
-
-    TemporaryFile SidecarFile;
-    SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec);
-
-    if (Ec)
-    {
-        throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath()));
-    }
-
-    SidecarFile.Write(&Header, sizeof Header, 0);
-
-    {
-        uint64_t WriteOffset = sizeof Header;
-
-        std::vector<ManifestData> Batch;
-        const size_t              BatchCapacity = Min(Index.size(), 4096u);  // 256 Kb
-        Batch.reserve(BatchCapacity);
-
-        // Writes the staged records at the current offset and empties the staging buffer (capacity is retained)
-        auto FlushBatch = [&]() {
-            const uint64_t ByteCount = sizeof(ManifestData) * Batch.size();
-            SidecarFile.Write(Batch.data(), ByteCount, WriteOffset);
-            WriteOffset += ByteCount;
-            Batch.clear();
-        };
-
-        for (auto& Kv : Index)
-        {
-            const IoHash&      Key     = Kv.first;
-            const PayloadIndex PlIndex = Kv.second;
-
-            // Entries without meta data are written with a zero hash and zero size
-            IoHash   RawHash = IoHash::Zero;
-            uint64_t RawSize = 0;
-
-            if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData)
-            {
-                RawHash = MetaDatas[MetaIndex].RawHash;
-                RawSize = MetaDatas[MetaIndex].RawSize;
-            }
-
-            Batch.emplace_back(ManifestData{.Key       = Key,
-                                            .Timestamp = AccessTimes[PlIndex],
-                                            .RawHash   = RawHash,
-                                            .Padding_0 = 0,
-                                            .RawSize   = RawSize,
-                                            .Padding_1 = 0});
-            if (Batch.size() == BatchCapacity)
-            {
-                FlushBatch();
-            }
-        }
-
-        if (!Batch.empty())
-        {
-            FlushBatch();
-        }
-    }
-
-    SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec);
-
-    if (Ec)
-    {
-        throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath));
-    }
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-// Load factor bounds applied to CacheBucket::m_Index in the CacheBucket constructor
-static const float IndexMinLoadFactor = 0.2f;
-static const float IndexMaxLoadFactor = 0.7f;
-
-/// Sets up an unopened bucket: stores the configuration, applies the index load factor bounds and
-/// registers the bucket as a GC referencer with Gc. The bucket id starts out as Oid::Zero.
-///
-/// Buckets whose name starts with "legacy" or ends with "shadermap" get their LargeObjectThreshold
-/// raised to at least 16 MiB.
-ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager&                 Gc,
-                                            std::atomic_uint64_t&      OuterCacheMemoryUsage,
-                                            std::string                BucketName,
-                                            const BucketConfiguration& Config)
-: m_Gc(Gc)
-, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage)
-, m_BucketName(std::move(BucketName))
-, m_Configuration(Config)
-, m_BucketId(Oid::Zero)
-{
-    m_Index.min_load_factor(IndexMinLoadFactor);
-    m_Index.max_load_factor(IndexMaxLoadFactor);
-
-    const bool UseLegacyThreshold =
-        m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"));
-    if (UseLegacyThreshold)
-    {
-        // This is pretty ad hoc but in order to avoid too many individual files
-        // it makes sense to have a different strategy for legacy values
-        const uint64_t LegacyOverrideSize    = 16 * 1024 * 1024;
-        m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, LegacyOverrideSize);
-    }
-
-    m_Gc.AddGcReferencer(*this);
-}
-
-// Undoes the GC referencer registration made in the constructor
-ZenCacheDiskLayer::CacheBucket::~CacheBucket()
-{
- m_Gc.RemoveGcReferencer(*this);
-}
-
-// Opens the bucket stored in BucketDir, or creates it when no manifest exists and AllowCreate is set.
-//
-// Returns false when the manifest exists but holds a zero bucket id, or when there is no manifest and
-// AllowCreate is false. Returns true otherwise, after the index has been initialized from disk and
-// (for existing buckets of the current version) the manifest meta data has been applied.
-//
-// A manifest with a version other than the current one causes the bucket to be treated as new,
-// which makes InitializeIndexFromDisk remove the existing log, index and blocks.
-bool
-ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
-{
- using namespace std::literals;
-
- ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
- // m_IsFlushing is expected to be set on entry; it is cleared by the guard further down
- ZEN_ASSERT(m_IsFlushing.load());
-
- // We want to take the lock here since we register as a GC referencer at construction
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
-
- ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir);
-
- m_BlocksBasePath = BucketDir / "blocks";
- m_BucketDir = BucketDir;
-
- CreateDirectories(m_BucketDir);
-
- std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
-
- bool IsNew = false;
-
- BucketManifestSerializer ManifestReader;
-
- if (ManifestReader.Open(ManifestPath))
- {
- m_BucketId = ManifestReader.GetBucketId();
- if (m_BucketId == Oid::Zero)
- {
- // NOTE(review): this return (and the one in the final else below) happens before the guard that
- // clears m_IsFlushing is created, so the flag stays set on failure - confirm this is intended
- return false;
- }
-
- uint32_t Version = 0;
- if (ManifestReader.IsCurrentVersion(/* out */ Version) == false)
- {
- ZEN_INFO("Wiping bucket '{}', found version {}, required version {}",
- BucketDir,
- Version,
- BucketManifestSerializer::CurrentDiskBucketVersion);
- IsNew = true;
- }
- }
- else if (AllowCreate)
- {
- m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath);
- IsNew = true;
- }
- else
- {
- return false;
- }
-
- InitializeIndexFromDisk(IndexLock, IsNew);
-
- auto _ = MakeGuard([&]() {
- // We are now initialized, allow flushing when we exit
- m_IsFlushing.store(false);
- });
-
- if (IsNew)
- {
- // Nothing to restore from the manifest for a new (or wiped) bucket
- return true;
- }
-
- ManifestReader.ParseManifest(IndexLock, *this, ManifestPath, m_Index, m_AccessTimes, m_Payloads);
-
- return true;
-}
-
-// Writes the current index (header plus one DiskIndexEntry per m_Index entry) to the bucket's index file.
-//
-// Does nothing when the log has not grown since the last successful snapshot. The snapshot is written
-// to a temporary file in the bucket directory and then moved into place; m_LogFlushPosition is only
-// advanced when that move succeeds. Failures are logged as warnings and do not propagate - every
-// std::exception thrown inside is caught at the bottom of the function.
-//
-// ClaimDiskReserveFunc is called only when the free disk space is insufficient; its return value is
-// added to the free space before the check is repeated.
-void
-ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc)
-{
- ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
-
- const uint64_t LogCount = m_SlogFile.GetLogCount();
- if (m_LogFlushPosition == LogCount)
- {
- // No log entries were added since the previous snapshot
- return;
- }
-
- ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
- const uint64_t EntryCount = m_Index.size();
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
- m_BucketDir,
- EntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- namespace fs = std::filesystem;
-
- fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
-
- try
- {
- const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
- if (Error)
- {
- throw std::system_error(Error, fmt::format("get disk space in '{}' FAILED", m_BucketDir));
- }
-
- // Require room for the index plus 512 KiB of headroom
- bool EnoughSpace = Space.Free >= IndexSize + 1024 * 512;
- if (!EnoughSpace)
- {
- uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
- EnoughSpace = (Space.Free + ReclaimedSpace) >= IndexSize + 1024 * 512;
- }
- if (!EnoughSpace)
- {
- throw std::runtime_error(
- fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize)));
- }
-
- TemporaryFile ObjectIndexFile;
- std::error_code Ec;
- ObjectIndexFile.CreateTemporary(m_BucketDir, Ec);
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
- }
-
- {
- // This is in a separate scope just to ensure IndexWriter goes out
- // of scope before the file is flushed/closed, in order to ensure
- // all data is written to the file
- BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
-
- CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
-
- // The checksum is computed last, once all other header fields are final
- Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
- IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
-
- uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader);
-
- for (auto& Entry : m_Index)
- {
- DiskIndexEntry IndexEntry;
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = m_Payloads[Entry.second].Location;
- IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset);
-
- IndexWriteOffset += sizeof(DiskIndexEntry);
- }
-
- IndexWriter.Flush();
- }
-
- ObjectIndexFile.Flush();
- ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
- if (Ec)
- {
- std::filesystem::path TempFilePath = ObjectIndexFile.GetPath();
- ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message());
-
- // Best-effort cleanup of the temporary file that could not be moved into place
- if (std::filesystem::is_regular_file(TempFilePath))
- {
- if (!std::filesystem::remove(TempFilePath, Ec) || Ec)
- {
- ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message());
- }
- }
- }
- else
- {
- // We must only update the log flush position once the snapshot write succeeds
- m_LogFlushPosition = LogCount;
- }
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
- }
-}
-
-/// Loads an index snapshot into m_Index / m_Payloads and sizes the per-entry side arrays to match.
-///
-/// The file consists of a CacheBucketIndexHeader followed by Header.EntryCount DiskIndexEntry records.
-/// Records that fail ValidateCacheBucketIndexEntry are skipped with a warning. On success
-/// m_Configuration.PayloadAlignment is overwritten with the alignment stored in the header and every
-/// loaded entry gets the current tick as its access time.
-///
-/// @param IndexPath  Path of the index snapshot file
-/// @param OutVersion Set to CacheBucketIndexHeader::Version2 when the file was loaded; not modified otherwise
-/// @return the log position stored in the header, or 0 when the file does not exist or is rejected
-///         (too small, invalid header, unexpected version, or more entries claimed than the file can hold)
-uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
-
-    if (!std::filesystem::is_regular_file(IndexPath))
-    {
-        return 0;
-    }
-
-    // Logs on every early return below; dismissed once the file has been accepted
-    auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); });
-
-    BasicFile ObjectIndexFile;
-    ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
-    uint64_t FileSize = ObjectIndexFile.FileSize();
-    if (FileSize < sizeof(CacheBucketIndexHeader))
-    {
-        return 0;
-    }
-
-    CacheBucketIndexHeader Header;
-    ObjectIndexFile.Read(&Header, sizeof(Header), 0);
-
-    if (!Header.IsValid())
-    {
-        return 0;
-    }
-
-    if (Header.Version != CacheBucketIndexHeader::Version2)
-    {
-        return 0;
-    }
-
-    // Number of complete records that fit after the header. This must subtract the size of the header itself;
-    // the previous sizeof(sizeof(...)) subtracted sizeof(size_t) and could accept a header that claims one
-    // record more than the file actually contains
-    const uint64_t ExpectedEntryCount = (FileSize - sizeof(CacheBucketIndexHeader)) / sizeof(DiskIndexEntry);
-    if (Header.EntryCount > ExpectedEntryCount)
-    {
-        return 0;
-    }
-
-    InvalidGuard.Dismiss();
-
-    size_t     EntryCount = 0;
-    Stopwatch  Timer;
-    const auto _ = MakeGuard([&] {
-        ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-    });
-
-    m_Configuration.PayloadAlignment = Header.PayloadAlignment;
-
-    m_Payloads.reserve(Header.EntryCount);
-    m_Index.reserve(Header.EntryCount);
-
-    BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024);
-
-    uint64_t CurrentReadOffset   = sizeof(CacheBucketIndexHeader);
-    uint64_t RemainingEntryCount = Header.EntryCount;
-
-    std::string InvalidEntryReason;
-    while (RemainingEntryCount--)
-    {
-        const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset);
-        CurrentReadOffset += sizeof(DiskIndexEntry);
-
-        if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
-        {
-            ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
-            continue;
-        }
-
-        // EntryCount only counts accepted entries, so it doubles as the index of the payload appended next
-        const PayloadIndex EntryIndex = PayloadIndex(EntryCount);
-        m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location});
-        m_Index.insert_or_assign(Entry->Key, EntryIndex);
-
-        EntryCount++;
-    }
-
-    ZEN_ASSERT(EntryCount == m_Payloads.size());
-
-    m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount()));
-
-    if (m_Configuration.EnableReferenceCaching)
-    {
-        m_FirstReferenceIndex.resize(EntryCount);
-    }
-
-    OutVersion = CacheBucketIndexHeader::Version2;
-    return Header.LogPosition;
-}
-
-/// Replays the bucket log on top of the already loaded index.
-///
-/// The first SkipEntryCount log records are skipped; when that count exceeds the number of records
-/// in the log, the whole log is replayed instead. Tombstone records erase the key from m_Index,
-/// invalid records are skipped with a warning, and every other record appends a payload and
-/// (re)points the key at it. Afterwards the per-entry side arrays are grown to match m_Payloads.
-///
-/// @return the number of log records that were replayed (log count minus skipped count), or 0 when
-///         the log file does not exist or cannot be initialized
-uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
-
-    if (!std::filesystem::is_regular_file(LogPath))
-    {
-        return 0;
-    }
-
-    uint64_t   LogEntryCount = 0;
-    Stopwatch  Timer;
-    const auto _ = MakeGuard([&] {
-        ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-    });
-
-    TCasLogFile<DiskIndexEntry> CasLog;
-    CasLog.Open(LogPath, CasLogFile::Mode::kRead);
-    if (!CasLog.Initialize())
-    {
-        return 0;
-    }
-
-    const uint64_t TotalLogCount = CasLog.GetLogCount();
-    if (SkipEntryCount > TotalLogCount)
-    {
-        ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
-        SkipEntryCount = 0;
-    }
-
-    LogEntryCount            = TotalLogCount - SkipEntryCount;
-    uint64_t RejectedRecords = 0;
-
-    CasLog.Replay(
-        [&](const DiskIndexEntry& Record) {
-            if (Record.Location.Flags & DiskLocation::kTombStone)
-            {
-                // Note: this leaves m_Payloads and other arrays with 'holes' in them
-                m_Index.erase(Record.Key);
-                return;
-            }
-
-            std::string RejectReason;
-            if (!ValidateCacheBucketIndexEntry(Record, RejectReason))
-            {
-                ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, RejectReason);
-                ++RejectedRecords;
-                return;
-            }
-
-            const PayloadIndex NewIndex = PayloadIndex(m_Payloads.size());
-            m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
-            m_Index.insert_or_assign(Record.Key, NewIndex);
-        },
-        SkipEntryCount);
-
-    const size_t PayloadCount = m_Payloads.size();
-    m_AccessTimes.resize(PayloadCount, AccessTime(GcClock::TickCount()));
-
-    if (m_Configuration.EnableReferenceCaching)
-    {
-        m_FirstReferenceIndex.resize(PayloadCount);
-    }
-
-    if (RejectedRecords)
-    {
-        ZEN_WARN("found {} invalid entries in '{}'", RejectedRecords, m_BucketDir);
-    }
-
-    return LogEntryCount;
-}
-
-// Rebuilds the in-memory state of the bucket from its on-disk index snapshot and log.
-//
-// All in-memory containers are cleared first. When IsNew is set, the existing log, index and block
-// directory are removed before anything is read. The index snapshot is loaded (and deleted if it is
-// rejected), the log is replayed from the snapshot's log position (and deleted if it is not valid),
-// the log is opened for writing, the block store is synchronized against the blocks referenced by
-// the index, and finally a new snapshot is written when the bucket is new or log entries were replayed.
-void
-ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew)
-{
- ZEN_TRACE_CPU("Z$::Bucket::Initialize");
-
- m_StandaloneSize = 0;
-
- m_Index.clear();
- m_Payloads.clear();
- m_AccessTimes.clear();
- m_MetaDatas.clear();
- m_FreeMetaDatas.clear();
- m_MemCachedPayloads.clear();
- m_FreeMemCachedPayloads.clear();
- m_FirstReferenceIndex.clear();
- m_ReferenceHashes.clear();
- m_NextReferenceHashesIndexes.clear();
- m_ReferenceCount = 0;
-
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
-
- if (IsNew)
- {
- // Start from a clean slate: drop any previous log, index snapshot and block files
- fs::remove(LogPath);
- fs::remove(IndexPath);
- fs::remove_all(m_BlocksBasePath);
- }
-
- CreateDirectories(m_BucketDir);
-
- m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
-
- // NOTE(review): m_LogFlushPosition is only assigned here when an index file exists; it is not reset
- // alongside the containers above - confirm it cannot carry a stale value into ReadLog below
- if (std::filesystem::is_regular_file(IndexPath))
- {
- uint32_t IndexVersion = 0;
- m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion);
- // ReadIndexFile leaves IndexVersion untouched when it rejects the file
- if (IndexVersion == 0)
- {
- ZEN_WARN("removing invalid index file at '{}'", IndexPath);
- std::filesystem::remove(IndexPath);
- }
- }
-
- uint64_t LogEntryCount = 0;
- if (std::filesystem::is_regular_file(LogPath))
- {
- if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath))
- {
- LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition);
- }
- // NOTE(review): this repeats the is_regular_file check of the enclosing if
- else if (fs::is_regular_file(LogPath))
- {
- ZEN_WARN("removing invalid log at '{}'", LogPath);
- std::filesystem::remove(LogPath);
- }
- }
-
- m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
-
- // Account for standalone file sizes and collect the block indexes still referenced by the index
- BlockStore::BlockIndexSet KnownBlocks;
- for (const auto& Entry : m_Index)
- {
- size_t EntryIndex = Entry.second;
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- const DiskLocation& Location = Payload.Location;
-
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
- }
- else
- {
- const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
- KnownBlocks.Add(BlockLocation.BlockIndex);
- }
- }
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
-
- if (IsNew || LogEntryCount > 0)
- {
- WriteIndexSnapshot(IndexLock);
- }
-}
-
-// Appends the location of the standalone file for HashKey to Path. The layout is
-//   <m_BucketDir>/blob/<hex chars 0..2>/<hex chars 3..4>/<remaining hex chars>
-// where the hex string is the hexadecimal rendering of HashKey.Hash.
-void
-ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const
-{
-    // Two hex characters per hash byte; the buffer is used as a character range, not a C string
-    char Hex[sizeof(HashKey.Hash) * 2];
-    ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, Hex);
-
-    char* FirstDirEnd  = Hex + 3;
-    char* SecondDirEnd = Hex + 5;
-    char* HexEnd       = Hex + sizeof(Hex);
-
-    Path.Append(m_BucketDir);
-    Path.AppendSeparator();
-
-    Path.Append(L"blob");
-    Path.AppendSeparator();
-
-    // First directory level: three hex characters
-    Path.AppendAsciiRange(Hex, FirstDirEnd);
-    Path.AppendSeparator();
-
-    // Second directory level: two hex characters
-    Path.AppendAsciiRange(FirstDirEnd, SecondDirEnd);
-    Path.AppendSeparator();
-
-    // File name: whatever is left of the hex string
-    Path.AppendAsciiRange(SecondDirEnd, HexEnd);
-}
-
-// Fetches a value that lives inside the block store. Loc is translated to a block
-// location using the configured payload alignment. On success the returned buffer is
-// tagged with the content type recorded in Loc; on failure the empty buffer returned
-// by the block store is passed through unchanged.
-IoBuffer
-ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
-{
-    ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue");
-
-    BlockStoreLocation BlockLocation = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
-    IoBuffer           Chunk         = m_BlockStore.TryGetChunk(BlockLocation);
-
-    if (!Chunk)
-    {
-        return Chunk;
-    }
-
-    Chunk.SetContentType(Loc.GetContentType());
-    return Chunk;
-}
-
-// Loads a value stored as a standalone file (see BuildPath for the file location).
-// The per-hash shard lock is held in shared mode while the file is opened. Returns
-// a buffer tagged with ContentType, or an empty buffer if the file could not be
-// turned into a buffer.
-IoBuffer
-ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const
-{
-    ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
-
-    ExtendablePathBuilder<256> FilePath;
-    BuildPath(FilePath, HashKey);
-
-    RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
-
-    IoBuffer FileData = IoBufferBuilder::MakeFromFile(FilePath.ToPath());
-    if (!FileData)
-    {
-        return {};
-    }
-
-    FileData.SetContentType(ContentType);
-    return FileData;
-}
-
-// Looks up HashKey in the bucket and, if present, fills OutValue with the payload
-// and its raw hash/size.
-//
-// Flow:
-//  1. Under the shared index lock: find the entry, refresh its access time, copy its
-//     disk location and any cached meta data (raw hash/size).
-//  2. If the payload is memory cached it is returned from there; otherwise the index
-//     lock is released and the payload is read from a standalone file or the block
-//     store. Block store values no larger than MemCacheSizeThreshold are inserted
-//     into the memory cache (under the exclusive index lock).
-//  3. If no meta data was cached for a non-structured, non-empty entry, the raw
-//     hash/size are derived from the payload and stored back into the index.
-//
-// Returns true and a non-empty OutValue.Value on a hit, false on a miss. Hit/miss
-// counters are updated along the way.
-bool
-ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::Get");
-
-    metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
-
-    RwLock::SharedLockScope IndexLock(m_IndexLock);
-    auto                    It = m_Index.find(HashKey);
-    if (It == m_Index.end())
-    {
-        m_DiskMissCount++;
-        if (m_Configuration.MemCacheSizeThreshold > 0)
-        {
-            m_MemoryMissCount++;
-        }
-        return false;
-    }
-
-    PayloadIndex EntryIndex   = It.value();
-    m_AccessTimes[EntryIndex] = GcClock::TickCount();
-    DiskLocation Location     = m_Payloads[EntryIndex].Location;
-
-    // Meta data only needs to be derived for non-structured, non-empty values that
-    // do not already have it cached in the index
-    bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
-
-    const BucketPayload* Payload = &m_Payloads[EntryIndex];
-    if (Payload->MetaData)
-    {
-        const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData];
-        OutValue.RawHash               = MetaData.RawHash;
-        OutValue.RawSize               = MetaData.RawSize;
-        FillRawHashAndRawSize          = false;
-    }
-
-    if (Payload->MemCached)
-    {
-        OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload;
-        // Payload points into m_Payloads and must not be used once the lock is released
-        Payload = nullptr;
-        IndexLock.ReleaseNow();
-        m_MemoryHitCount++;
-    }
-    else
-    {
-        // Payload points into m_Payloads and must not be used once the lock is released
-        Payload = nullptr;
-        IndexLock.ReleaseNow();
-        if (m_Configuration.MemCacheSizeThreshold > 0)
-        {
-            m_MemoryMissCount++;
-        }
-        if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
-        {
-            OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey);
-        }
-        else
-        {
-            OutValue.Value = GetInlineCacheValue(Location);
-            if (m_Configuration.MemCacheSizeThreshold > 0)
-            {
-                size_t ValueSize = OutValue.Value.GetSize();
-                if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
-                {
-                    ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache");
-                    OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
-                    RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
-                    // The index may have changed while no lock was held, so look the key up again
-                    if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
-                    {
-                        BucketPayload& WritePayload = m_Payloads[UpdateIt->second];
-                        // Only update if it has not already been updated by other thread
-                        if (!WritePayload.MemCached)
-                        {
-                            SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    if (FillRawHashAndRawSize)
-    {
-        ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
-        if (!OutValue.Value)
-        {
-            // The index says this entry is non-empty but the payload could not be read.
-            // Report a miss instead of hashing an empty buffer: recording the hash/size
-            // of "nothing" as this entry's meta data would make later successful reads
-            // return the wrong RawHash/RawSize.
-            OutValue = ZenCacheValue{};
-            m_DiskMissCount++;
-            return false;
-        }
-        if (Location.IsFlagSet(DiskLocation::kCompressed))
-        {
-            if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
-            {
-                OutValue = ZenCacheValue{};
-                m_DiskMissCount++;
-                return false;
-            }
-        }
-        else
-        {
-            OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
-            OutValue.RawSize = OutValue.Value.GetSize();
-        }
-        RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
-        if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
-        {
-            BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
-
-            // Only set if no other path has already updated the meta data
-            if (!WritePayload.MetaData)
-            {
-                SetMetaData(UpdateIndexLock, WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash});
-            }
-        }
-    }
-    if (OutValue.Value)
-    {
-        m_DiskHitCount++;
-        StatsScope.SetBytes(OutValue.Value.GetSize());
-        return true;
-    }
-    else
-    {
-        m_DiskMissCount++;
-        return false;
-    }
-}
-
-// Stores Value under HashKey. Values whose size is at or above LargeObjectThreshold
-// are written as standalone files, everything else goes into the block store.
-// References is forwarded unchanged to the chosen storage path. The put-request
-// statistics and the disk write counter are updated.
-void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::Put");
-
-    // Named like the scope in Get(); '$' in an identifier is a compiler extension
-    metrics::RequestStats::Scope StatsScope(m_PutOps, Value.Value.Size());
-
-    if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
-    {
-        PutStandaloneCacheValue(HashKey, Value, References);
-    }
-    else
-    {
-        PutInlineCacheValue(HashKey, Value, References);
-    }
-
-    m_DiskWriteCount++;
-}
-
-// Evicts memory cached payloads whose owning entry was last accessed before
-// ExpireTime and compacts m_MemCachedPayloads so the surviving payloads are packed
-// at the front. Runs under the exclusive index lock.
-//
-// Returns the summed payload size (in bytes) of the evicted payloads.
-uint64_t
-ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim");
-
-    GcClock::Tick ExpireTicks  = ExpireTime.time_since_epoch().count();
-    uint64_t      TrimmedBytes = 0;
-
-    RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
-
-    uint32_t SlotCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
-    if (SlotCount == 0)
-    {
-        return 0;
-    }
-
-    // KeptCount is both the number of survivors so far and the slot the next survivor moves to
-    uint32_t KeptCount = 0;
-    for (uint32_t SlotIndex = 0; SlotIndex < SlotCount; ++SlotIndex)
-    {
-        MemCacheData& Slot = m_MemCachedPayloads[SlotIndex];
-        if (!Slot.Payload)
-        {
-            // Unused slot, nothing to keep or evict
-            continue;
-        }
-
-        PayloadIndex Owner = Slot.OwnerIndex;
-        ZEN_ASSERT_SLOW(m_Payloads[Owner].MemCached == MemCachedIndex(SlotIndex));
-
-        GcClock::Tick LastAccess = m_AccessTimes[Owner];
-        if (LastAccess < ExpireTicks)
-        {
-            // Expired: release the payload and detach it from its owner
-            size_t EvictedSize = Slot.Payload.GetSize();
-            RemoveMemCacheUsage(EstimateMemCachePayloadMemory(EvictedSize));
-            Slot                      = {};
-            m_Payloads[Owner].MemCached = {};
-            TrimmedBytes += EvictedSize;
-            continue;
-        }
-
-        // Still fresh: slide it down over any gap left by earlier slots
-        if (SlotIndex > KeptCount)
-        {
-            m_MemCachedPayloads[KeptCount] = MemCacheData{.Payload = std::move(Slot.Payload), .OwnerIndex = Owner};
-            m_Payloads[Owner].MemCached    = MemCachedIndex(KeptCount);
-        }
-        KeptCount++;
-    }
-
-    m_MemCachedPayloads.resize(KeptCount);
-    m_MemCachedPayloads.shrink_to_fit();
-    // All survivors are packed, so there are no free slots left to track
-    zen::Reset(m_FreeMemCachedPayloads);
-
-    return TrimmedBytes;
-}
-
-// Adds the estimated memory used by each memory cached payload to a histogram
-// bucketed by how long ago the owning entry was accessed.
-//
-//  Now             - reference time used to compute the age of each entry
-//  MaxAge          - ages at or above this land in the last slot
-//  InOutUsageSlots - histogram to accumulate into. The number of slots is taken from
-//                    the vector's capacity(), not its size(); the vector is grown (zero
-//                    filled) on demand up to that many elements.
-//
-// Does nothing if the vector has no capacity or nothing is memory cached.
-void
-ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess");
-
-    size_t SlotCount = InOutUsageSlots.capacity();
-    if (SlotCount == 0)
-    {
-        // No slots to report into; without this guard 'SlotCount - 1' below would wrap
-        // around and the write into InOutUsageSlots would be out of bounds
-        return;
-    }
-
-    RwLock::SharedLockScope _(m_IndexLock);
-    uint32_t                MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
-    if (MemCachedCount == 0)
-    {
-        return;
-    }
-    for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
-    {
-        const MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
-        if (!Data.Payload)
-        {
-            // Unused slot
-            continue;
-        }
-        PayloadIndex Index = Data.OwnerIndex;
-        ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
-        GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
-        // Access times in the future are treated as age zero
-        GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0);
-        // Scale the age linearly onto [0, SlotCount); anything at or past MaxAge goes into the last slot
-        size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1);
-        ZEN_ASSERT_SLOW(Slot < SlotCount);
-        if (Slot >= InOutUsageSlots.size())
-        {
-            InOutUsageSlots.resize(Slot + 1, 0);
-        }
-        InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize());
-    }
-}
-
-// Removes the bucket: with the index lock and every shard lock held exclusively,
-// closes the block store and the log file, moves-and-deletes the bucket directory
-// and resets all in-memory state and size accounting.
-//
-// Returns the result of MoveAndDeleteDirectory for the bucket directory.
-bool
-ZenCacheDiskLayer::CacheBucket::Drop()
-{
-    ZEN_TRACE_CPU("Z$::Bucket::Drop");
-
-    RwLock::ExclusiveLockScope _(m_IndexLock);
-
-    // Take every shard lock so nothing is touching standalone files while we tear down
-    std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> HeldShardLocks;
-    HeldShardLocks.reserve(256);
-    for (RwLock& ShardLock : m_ShardedLocks)
-    {
-        HeldShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(ShardLock));
-    }
-
-    m_BlockStore.Close();
-    m_SlogFile.Close();
-
-    bool DirectoryDeleted = MoveAndDeleteDirectory(m_BucketDir);
-
-    // Index and per-entry state
-    m_Index.clear();
-    m_Payloads.clear();
-    m_AccessTimes.clear();
-    m_MetaDatas.clear();
-    m_FreeMetaDatas.clear();
-    m_MemCachedPayloads.clear();
-    m_FreeMemCachedPayloads.clear();
-
-    // Reference tracking state
-    m_FirstReferenceIndex.clear();
-    m_ReferenceHashes.clear();
-    m_NextReferenceHashesIndexes.clear();
-    m_ReferenceCount = 0;
-
-    // Size accounting; hand our share of the outer memory usage back before zeroing it
-    m_StandaloneSize.store(0);
-    m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
-    m_MemCachedSize.store(0);
-
-    return DirectoryDeleted;
-}
-
-// Flushes the block store and the log file and then saves a snapshot. Only one
-// flush runs at a time: if m_IsFlushing is already set the call returns without
-// doing anything. Exceptions derived from std::exception are logged as a warning
-// and not propagated.
-void
-ZenCacheDiskLayer::CacheBucket::Flush()
-{
-    ZEN_TRACE_CPU("Z$::Bucket::Flush");
-
-    if (m_IsFlushing)
-    {
-        return;
-    }
-    bool NotFlushing = false;
-    if (!m_IsFlushing.compare_exchange_strong(NotFlushing, true))
-    {
-        // Somebody else claimed the flag between the check above and the exchange
-        return;
-    }
-    auto ClearFlushingFlag = MakeGuard([&] { m_IsFlushing.store(false); });
-
-    ZEN_INFO("Flushing bucket {}", m_BucketDir);
-
-    try
-    {
-        m_BlockStore.Flush(/*ForceNewBlock*/ false);
-        m_SlogFile.Flush();
-
-        SaveSnapshot();
-    }
-    catch (const std::exception& FlushError)
-    {
-        ZEN_WARN("Failed to flush bucket in '{}'. Reason: '{}'", m_BucketDir, FlushError.what());
-    }
-}
-
-// Writes the index snapshot and the bucket manifest to disk.
-//
-// Two schemes exist, selected by the local UseLegacyScheme flag (currently always
-// false):
-//  - legacy:  the index state is copied under the shared index lock and serialized
-//             into a single manifest buffer
-//  - sidecar: a small manifest is produced and the per-entry data is written to a
-//             separate sidecar file while the shared index lock is held
-//
-// Before anything but the index snapshot is written, free disk space is checked;
-// if it is insufficient ClaimDiskReserveFunc is invoked and the number of bytes it
-// returns is added to the free space before re-checking. If there is still not
-// enough room, or the free space cannot be queried, a warning is logged and the
-// manifest is not written. Exceptions derived from std::exception are logged and
-// swallowed.
-void
-ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
-    try
-    {
-        bool UseLegacyScheme = false;
-
-        IoBuffer                 Buffer;
-        BucketManifestSerializer ManifestWriter;
-
-        // Returns true if RequiredBytes fit on the bucket's volume, claiming the disk
-        // reserve if that is what it takes. Logs the reason when returning false.
-        auto HasEnoughDiskSpace = [&](uint64_t RequiredBytes) -> bool {
-            std::error_code SpaceError;
-            DiskSpace       Space = DiskSpaceInfo(m_BucketDir, SpaceError);
-            if (SpaceError)
-            {
-                ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, SpaceError.message());
-                return false;
-            }
-            if (Space.Free >= RequiredBytes)
-            {
-                return true;
-            }
-            uint64_t ReclaimedBytes = ClaimDiskReserveFunc();
-            if ((Space.Free + ReclaimedBytes) >= RequiredBytes)
-            {
-                return true;
-            }
-            ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}",
-                     m_BucketDir,
-                     NiceBytes(Buffer.GetSize()));
-            return false;
-        };
-
-        if (UseLegacyScheme)
-        {
-            std::vector<AccessTime>     AccessTimesCopy;
-            std::vector<BucketPayload>  PayloadsCopy;
-            std::vector<BucketMetaData> MetaDatasCopy;
-            IndexMap                    IndexCopy;
-
-            {
-                RwLock::SharedLockScope IndexLock(m_IndexLock);
-                WriteIndexSnapshot(IndexLock);
-                // Note: this copy could be eliminated on shutdown to
-                // reduce memory usage and execution time
-                IndexCopy       = m_Index;
-                PayloadsCopy    = m_Payloads;
-                AccessTimesCopy = m_AccessTimes;
-                MetaDatasCopy   = m_MetaDatas;
-            }
-
-            Buffer = ManifestWriter.MakeManifest(m_BucketId,
-                                                 std::move(IndexCopy),
-                                                 std::move(AccessTimesCopy),
-                                                 std::move(PayloadsCopy),
-                                                 std::move(MetaDatasCopy));
-
-            // Manifest plus half a megabyte of slack
-            const uint64_t RequiredSpace = Buffer.GetSize() + 1024 * 512;
-            if (!HasEnoughDiskSpace(RequiredSpace))
-            {
-                return;
-            }
-        }
-        else
-        {
-            RwLock::SharedLockScope IndexLock(m_IndexLock);
-            WriteIndexSnapshot(IndexLock);
-
-            const uint64_t EntryCount = m_Index.size();
-            Buffer                    = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
-            uint64_t SidecarSize      = ManifestWriter.GetSidecarSize();
-
-            // Sidecar plus manifest plus half a megabyte of slack
-            const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512;
-            if (!HasEnoughDiskSpace(RequiredSpace))
-            {
-                return;
-            }
-
-            ManifestWriter.WriteSidecarFile(IndexLock,
-                                            GetMetaPath(m_BucketDir, m_BucketName),
-                                            m_LogFlushPosition,
-                                            m_Index,
-                                            m_AccessTimes,
-                                            m_Payloads,
-                                            m_MetaDatas);
-        }
-
-        std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
-        WriteFile(ManifestPath, Buffer);
-    }
-    catch (const std::exception& SaveError)
-    {
-        ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, SaveError.what());
-    }
-}
-
-// Computes the IoHash of a composite buffer by streaming each of its segments, in
-// order, through a single hash stream.
-IoHash
-HashBuffer(const CompositeBuffer& Buffer)
-{
-    IoHashStream Stream;
-
-    for (const SharedBuffer& Part : Buffer.GetSegments())
-    {
-        auto PartView = Part.GetView();
-        Stream.Append(PartView);
-    }
-
-    IoHash Result = Stream.GetHash();
-    return Result;
-}
-
-// Validates the payload of a cache entry according to its content type.
-//
-//  kCbObject         - full compact binary validation (CbValidateMode::All)
-//  kCompressedBinary - the compressed header must validate and the hash of the
-//                      decompressed data must match the raw hash in the header
-//  anything else     - cannot be verified; accepted, with a warning logged the first
-//                      time this happens
-//
-// Returns true if the value is valid (or cannot be checked), false otherwise. The
-// reason for a failure is reported through ZEN_SCOPED_ERROR.
-bool
-ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
-{
-    ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType);
-
-    if (ContentType == ZenContentType::kCbObject)
-    {
-        CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
-
-        if (Error == CbValidateError::None)
-        {
-            return true;
-        }
-
-        ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error));
-
-        return false;
-    }
-    else if (ContentType == ZenContentType::kCompressedBinary)
-    {
-        IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer);
-
-        IoHash   HeaderRawHash;
-        uint64_t RawSize = 0;
-        if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize))
-        {
-            ZEN_SCOPED_ERROR("compressed buffer header validation failed");
-
-            return false;
-        }
-
-        // The header is fine; now make sure the payload really decompresses to what the header claims
-        CompressedBuffer Compressed =
-            CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize);
-        CompositeBuffer Decompressed     = Compressed.DecompressToComposite();
-        IoHash          DecompressedHash = HashBuffer(Decompressed);
-
-        if (HeaderRawHash != DecompressedHash)
-        {
-            ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);
-
-            return false;
-        }
-    }
-    else
-    {
-        // No way to verify this kind of content (what is it exactly?)
-
-        // The static exists only so that its initializer (the warning) runs once;
-        // its value is never read
-        [[maybe_unused]] static int Once = [&] {
-            ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType));
-            return 42;
-        }();
-    }
-
-    return true;
-}
-
-// Verifies the integrity of every value in the bucket and optionally removes the
-// ones that fail.
-//
-// Scan (under the shared index lock):
-//  - standalone binary files are checked by comparing the file size on disk with
-//    the size recorded in the index
-//  - other standalone files are loaded and validated by content type
-//  - block store values are gathered and validated in bulk through
-//    BlockStore::IterateChunks
-// The scan stops early if the scrub deadline expires.
-//
-// Recovery (only if Ctx.RunRecovery()): each bad key still present in the index is
-// removed from it, a tombstone is appended to the log, bad standalone files are
-// deleted and the in-memory state is compacted.
-//
-// The number of chunks/bytes scanned and the list of bad keys are reported to Ctx.
-void
-ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::Scrub");
-
-    ZEN_INFO("scrubbing '{}'", m_BucketDir);
-
-    Stopwatch Timer;
-    uint64_t  ChunkCount         = 0;
-    uint64_t  VerifiedChunkBytes = 0;
-
-    auto LogStats = MakeGuard([&] {
-        const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
-
-        ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
-                 m_BucketName,
-                 NiceBytes(VerifiedChunkBytes),
-                 NiceTimeSpanMs(DurationMs),
-                 ChunkCount,
-                 NiceRate(VerifiedChunkBytes, DurationMs));
-    });
-
-    std::vector<IoHash> BadKeys;
-    auto                ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); };
-
-    try
-    {
-        std::vector<BlockStoreLocation> ChunkLocations;
-        std::vector<IoHash>             ChunkIndexToChunkHash;
-
-        RwLock::SharedLockScope _(m_IndexLock);
-
-        const size_t BlockChunkInitialCount = m_Index.size() / 4;
-        ChunkLocations.reserve(BlockChunkInitialCount);
-        ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
-
-        // Do a pass over the index and verify any standalone file values straight away
-        // all other storage classes are gathered and verified in bulk in order to enable
-        // more efficient I/O scheduling
-
-        for (auto& Kv : m_Index)
-        {
-            const IoHash&        HashKey = Kv.first;
-            const BucketPayload& Payload = m_Payloads[Kv.second];
-            const DiskLocation&  Loc     = Payload.Location;
-
-            if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
-            {
-                Ctx.ThrowIfDeadlineExpired();
-
-                ++ChunkCount;
-                VerifiedChunkBytes += Loc.Size();
-
-                if (Loc.GetContentType() == ZenContentType::kBinary)
-                {
-                    // Blob cache value, not much we can do about data integrity checking
-                    // here since there's no hash available
-                    ExtendablePathBuilder<256> DataFilePath;
-                    BuildPath(DataFilePath, HashKey);
-
-                    RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
-
-                    std::error_code Ec;
-                    uintmax_t       FileSize = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
-                    // Report the key once only: when file_size fails it returns an error
-                    // sentinel that would also fail the size comparison
-                    if (Ec || FileSize != Loc.Size())
-                    {
-                        ReportBadKey(HashKey);
-                    }
-                    continue;
-                }
-                else
-                {
-                    // Structured cache value
-                    IoBuffer Buffer = GetStandaloneCacheValue(Loc.GetContentType(), HashKey);
-                    if (!Buffer)
-                    {
-                        ReportBadKey(HashKey);
-                        continue;
-                    }
-                    if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer))
-                    {
-                        ReportBadKey(HashKey);
-                        continue;
-                    }
-                }
-            }
-            else
-            {
-                ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment));
-                ChunkIndexToChunkHash.push_back(HashKey);
-                continue;
-            }
-        }
-
-        // Validates a chunk that IterateChunks hands over as a memory range
-        const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void {
-            ++ChunkCount;
-            VerifiedChunkBytes += Size;
-            const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
-            if (!Data)
-            {
-                // ChunkLocation out of range of stored blocks
-                ReportBadKey(Hash);
-                return;
-            }
-            if (!Size)
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-            IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
-            if (!Buffer)
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-            const BucketPayload& Payload     = m_Payloads[m_Index.at(Hash)];
-            ZenContentType       ContentType = Payload.Location.GetContentType();
-            Buffer.SetContentType(ContentType);
-            if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-        };
-
-        // Validates a chunk that IterateChunks hands over as a file range
-        const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void {
-            Ctx.ThrowIfDeadlineExpired();
-
-            ++ChunkCount;
-            VerifiedChunkBytes += Size;
-            const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
-            IoBuffer      Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
-            if (!Buffer)
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-            const BucketPayload& Payload     = m_Payloads[m_Index.at(Hash)];
-            ZenContentType       ContentType = Payload.Location.GetContentType();
-            Buffer.SetContentType(ContentType);
-            if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-        };
-
-        m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
-    }
-    catch (ScrubDeadlineExpiredException&)
-    {
-        ZEN_INFO("Scrubbing deadline expired, operation incomplete");
-    }
-
-    Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes);
-
-    if (!BadKeys.empty())
-    {
-        ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir);
-
-        if (Ctx.RunRecovery())
-        {
-            // Deal with bad chunks by removing them from our lookup map
-
-            std::vector<DiskIndexEntry> LogEntries;
-            LogEntries.reserve(BadKeys.size());
-
-            {
-                RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
-                for (const IoHash& BadKey : BadKeys)
-                {
-                    // Log a tombstone and delete the in-memory index for the bad entry
-                    const auto It = m_Index.find(BadKey);
-                    if (It == m_Index.end())
-                    {
-                        // The index lock was not held between the scan and this point, so
-                        // the entry may already be gone - nothing left to remove
-                        continue;
-                    }
-                    BucketPayload& Payload = m_Payloads[It->second];
-                    if (m_Configuration.EnableReferenceCaching)
-                    {
-                        RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]);
-                    }
-                    DiskLocation Location = Payload.Location;
-                    if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
-                    {
-                        m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed);
-                    }
-
-                    RemoveMemCachedData(IndexLock, Payload);
-                    RemoveMetaData(IndexLock, Payload);
-
-                    Location.Flags |= DiskLocation::kTombStone;
-                    LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
-                    m_Index.erase(BadKey);
-                }
-            }
-            for (const DiskIndexEntry& Entry : LogEntries)
-            {
-                if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
-                {
-                    ExtendablePathBuilder<256> Path;
-                    BuildPath(Path, Entry.Key);
-                    fs::path                   FilePath = Path.ToPath();
-                    RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key));
-                    if (fs::is_regular_file(FilePath))
-                    {
-                        ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8());
-                        std::error_code Ec;
-                        fs::remove(FilePath, Ec);  // We don't care if we fail, we are no longer tracking this file...
-                    }
-                }
-            }
-            m_SlogFile.Append(LogEntries);
-
-            // Clean up m_AccessTimes and m_Payloads vectors
-            {
-                std::vector<BucketPayload>  Payloads;
-                std::vector<AccessTime>     AccessTimes;
-                std::vector<BucketMetaData> MetaDatas;
-                std::vector<MemCacheData>   MemCachedPayloads;
-                std::vector<ReferenceIndex> FirstReferenceIndex;
-                IndexMap                    Index;
-
-                {
-                    RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
-                    CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
-                }
-            }
-        }
-    }
-
-    // Let whomever it concerns know about the bad chunks. This could
-    // be used to invalidate higher level data structures more efficiently
-    // than a full validation pass might be able to do
-    if (!BadKeys.empty())
-    {
-        Ctx.ReportBadCidChunks(BadKeys);
-    }
-}
-
-// Garbage collection "mark" step for this bucket.
-//
-// Working from a copy of the index state taken under the shared index lock, this:
-//  - collects every key whose access time is older than GcCtx.CacheExpireTime()
-//    and hands them to GcCtx.SetExpiredCacheKeys() keyed on the bucket directory
-//  - unless GcCtx.SkipCid() is set, collects the attachment hashes of the remaining
-//    structured entries and hands them to GcCtx.AddRetainedCids() in batches
-//
-// When reference caching is enabled, attachment hashes already recorded in the
-// index are used directly; entries without recorded references have their payload
-// loaded and parsed, and the result is recorded in the index for next time.
-//
-// Returns early, without reporting anything, if the index is empty.
-void
-ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::Bucket::GatherReferences");
-
- // Compile-time switch for the lock timing instrumentation below (disabled)
-#define CALCULATE_BLOCKING_TIME 0
-
-#if CALCULATE_BLOCKING_TIME
- // NOTE(review): the shared-lock scopes below accumulate into WriteBlock* and the
- // exclusive-lock scope into ReadBlock*; the values are microseconds but are
- // formatted with NiceLatencyNs - confirm naming and units before enabling
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
-#endif // CALCULATE_BLOCKING_TIME
-
- Stopwatch TotalTimer;
- // Logs the total time spent when the function exits, on any path
- const auto _ = MakeGuard([&] {
-#if CALCULATE_BLOCKING_TIME
- ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
- m_BucketDir,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs));
-#else
- ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
-#endif // CALCULATE_BLOCKING_TIME
- });
-
- const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime();
-
- const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
-
- // Copies of the index state; the main loop iterates these without holding the lock
- IndexMap Index;
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- std::vector<ReferenceIndex> FirstReferenceIndex;
- {
- RwLock::SharedLockScope __(m_IndexLock);
-#if CALCULATE_BLOCKING_TIME
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
-#endif // CALCULATE_BLOCKING_TIME
- if (m_Index.empty())
- {
- return;
- }
- Index = m_Index;
- AccessTimes = m_AccessTimes;
- Payloads = m_Payloads;
- FirstReferenceIndex = m_FirstReferenceIndex;
- }
-
- std::vector<IoHash> ExpiredKeys;
- ExpiredKeys.reserve(1024);
-
- // Attachment hashes gathered so far; handed to GcCtx whenever 1024 or more have piled up
- std::vector<IoHash> Cids;
- if (!GcCtx.SkipCid())
- {
- Cids.reserve(1024);
- }
-
- // Structured entries whose attachments could not be taken from the reference cache;
- // their payloads are loaded and parsed in the second loop
- std::vector<std::pair<IoHash, size_t>> StructuredItemsWithUnknownAttachments;
-
- // First pass: classify every entry of the snapshot
- for (const auto& Entry : Index)
- {
- const IoHash& Key = Entry.first;
- size_t PayloadIndex = Entry.second;
- GcClock::Tick AccessTime = AccessTimes[PayloadIndex];
- if (AccessTime < ExpireTicks)
- {
- ExpiredKeys.push_back(Key);
- continue;
- }
-
- if (GcCtx.SkipCid())
- {
- continue;
- }
-
- BucketPayload& Payload = Payloads[PayloadIndex];
- const DiskLocation& Loc = Payload.Location;
-
- // Only entries flagged as structured are inspected for attachments
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
- if (m_Configuration.EnableReferenceCaching)
- {
- if (FirstReferenceIndex.empty() || FirstReferenceIndex[PayloadIndex] == ReferenceIndex::Unknown())
- {
- StructuredItemsWithUnknownAttachments.push_back(Entry);
- continue;
- }
-
- // The snapshot says references are recorded; read them from the live index
- // since the entry may have changed since the snapshot was taken
- bool ReferencesAreKnown = false;
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
-#if CALCULATE_BLOCKING_TIME
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
-#endif // CALCULATE_BLOCKING_TIME
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
- }
- }
- if (ReferencesAreKnown)
- {
- if (Cids.size() >= 1024)
- {
- GcCtx.AddRetainedCids(Cids);
- Cids.clear();
- }
- continue;
- }
- }
- StructuredItemsWithUnknownAttachments.push_back(Entry);
- }
-
- // Second pass: load and parse the payloads whose attachments are not yet known
- for (const auto& Entry : StructuredItemsWithUnknownAttachments)
- {
- const IoHash& Key = Entry.first;
- BucketPayload& Payload = Payloads[Entry.second];
- const DiskLocation& Loc = Payload.Location;
- {
- IoBuffer Buffer;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer)
- {
- continue;
- }
- }
- else
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
-#if CALCULATE_BLOCKING_TIME
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
-#endif // CALCULATE_BLOCKING_TIME
- // Uses the live index here (not the snapshot) to locate the payload
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- const BucketPayload& CachedPayload = m_Payloads[It->second];
- if (CachedPayload.MemCached)
- {
- Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload;
- ZEN_ASSERT_SLOW(Buffer);
- }
- else
- {
- // Copy the location and drop the lock before reading from the block store
- DiskLocation Location = m_Payloads[It->second].Location;
- IndexLock.ReleaseNow();
- Buffer = GetInlineCacheValue(Location);
- // Don't memcache items when doing GC
- }
- }
- if (!Buffer)
- {
- continue;
- }
- }
-
- ZEN_ASSERT(Buffer);
- ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
- CbObjectView Obj(Buffer.GetData());
- // Everything appended to Cids from this position on belongs to this entry
- size_t CurrentCidCount = Cids.size();
- Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
- if (m_Configuration.EnableReferenceCaching)
- {
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
-#if CALCULATE_BLOCKING_TIME
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
-#endif // CALCULATE_BLOCKING_TIME
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- if (m_FirstReferenceIndex[It->second] == ReferenceIndex::Unknown())
- {
- // Record the attachments just parsed so the next pass can skip loading the payload
- SetReferences(IndexLock,
- m_FirstReferenceIndex[It->second],
- std::span<IoHash>(Cids.data() + CurrentCidCount, Cids.size() - CurrentCidCount));
- }
- else
- {
- // References were recorded in the meantime: discard what was parsed for
- // this entry and take the recorded references instead
- Cids.resize(CurrentCidCount);
- (void)GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
- }
- }
- }
- if (Cids.size() >= 1024)
- {
- GcCtx.AddRetainedCids(Cids);
- Cids.clear();
- }
- }
- }
-
- // Hand over whatever is left in the last partial batch, then the expired keys
- GcCtx.AddRetainedCids(Cids);
- GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage");
-
- ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
-
- Stopwatch TotalTimer;
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = 0;
- uint64_t DeletedSize = 0;
- GcStorageSize OldTotalSize = StorageSize();
-
- std::unordered_set<IoHash> DeletedChunks;
- uint64_t MovedCount = 0;
-
- const auto _ = MakeGuard([&] {
- ZEN_DEBUG(
- "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
- "{} "
- "of {} "
- "entries ({}/{}).",
- m_BucketDir,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs),
- NiceBytes(DeletedSize),
- DeletedChunks.size(),
- MovedCount,
- TotalChunkCount,
- NiceBytes(OldTotalSize.DiskSize),
- NiceBytes(OldTotalSize.MemorySize));
-
- bool Expected = false;
- if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
- {
- return;
- }
- auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
-
- try
- {
- SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); });
- }
- catch (std::exception& Ex)
- {
- ZEN_WARN("Failed to write index and manifest after GC in '{}'. Reason: '{}'", m_BucketDir, Ex.what());
- }
- });
-
- auto __ = MakeGuard([&]() {
- if (!DeletedChunks.empty())
- {
- // Clean up m_AccessTimes and m_Payloads vectors
- std::vector<BucketPayload> Payloads;
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketMetaData> MetaDatas;
- std::vector<MemCacheData> MemCachedPayloads;
- std::vector<ReferenceIndex> FirstReferenceIndex;
- IndexMap Index;
- {
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
- }
- GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
- }
- });
-
- std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
- if (ExpiredCacheKeySpan.empty())
- {
- return;
- }
-
- m_SlogFile.Flush();
-
- std::unordered_set<IoHash, IoHash::Hasher> ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end());
-
- std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
- IndexMap IndexSnapshot;
- std::vector<BucketPayload> PayloadsSnapshot;
- BlockStore::ReclaimSnapshotState BlockStoreState;
- {
- bool Expected = false;
- if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
- {
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir);
- return;
- }
- auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
-
- {
- ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State");
- RwLock::SharedLockScope IndexLock(m_IndexLock);
-
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
-
- BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
-
- for (const IoHash& Key : ExpiredCacheKeys)
- {
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- const BucketPayload& Payload = m_Payloads[It->second];
- if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
- {
- DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location};
- Entry.Location.Flags |= DiskLocation::kTombStone;
- ExpiredStandaloneEntries.push_back(Entry);
- }
- }
- }
-
- PayloadsSnapshot = m_Payloads;
- IndexSnapshot = m_Index;
-
- if (GcCtx.IsDeletionMode())
- {
- IndexLock.ReleaseNow();
- RwLock::ExclusiveLockScope __(m_IndexLock);
- for (const auto& Entry : ExpiredStandaloneEntries)
- {
- if (m_Index.erase(Entry.Key) == 1)
- {
- m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
- DeletedChunks.insert(Entry.Key);
- }
- }
- m_SlogFile.Append(ExpiredStandaloneEntries);
- }
- }
- }
-
- if (GcCtx.IsDeletionMode())
- {
- ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete");
-
- ExtendablePathBuilder<256> Path;
-
- for (const auto& Entry : ExpiredStandaloneEntries)
- {
- const IoHash& Key = Entry.Key;
-
- Path.Reset();
- BuildPath(Path, Key);
- fs::path FilePath = Path.ToPath();
-
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (m_Index.contains(Key))
- {
- // Someone added it back, let the file on disk be
- ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
- continue;
- }
- IndexLock.ReleaseNow();
-
- RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
- if (fs::is_regular_file(FilePath))
- {
- ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
- std::error_code Ec;
- fs::remove(FilePath, Ec);
- if (Ec)
- {
- ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
- continue;
- }
- }
- }
- DeletedSize += Entry.Location.Size();
- }
- }
-
- TotalChunkCount = IndexSnapshot.size();
-
- std::vector<BlockStoreLocation> ChunkLocations;
- BlockStore::ChunkIndexArray KeepChunkIndexes;
- std::vector<IoHash> ChunkIndexToChunkHash;
- ChunkLocations.reserve(TotalChunkCount);
- ChunkLocations.reserve(TotalChunkCount);
- ChunkIndexToChunkHash.reserve(TotalChunkCount);
- {
- TotalChunkCount = 0;
- for (const auto& Entry : IndexSnapshot)
- {
- size_t EntryIndex = Entry.second;
- const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location;
-
- if (DiskLocation.Flags & DiskLocation::kStandaloneFile)
- {
- continue;
- }
- const IoHash& Key = Entry.first;
- BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
- size_t ChunkIndex = ChunkLocations.size();
- ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash.push_back(Key);
- if (ExpiredCacheKeys.contains(Key))
- {
- continue;
- }
- KeepChunkIndexes.push_back(ChunkIndex);
- }
- }
- TotalChunkCount = ChunkLocations.size();
- size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size();
-
- const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
- if (!PerformDelete)
- {
- m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true);
- GcStorageSize CurrentTotalSize = StorageSize();
- ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})",
- m_BucketDir,
- DeleteCount,
- TotalChunkCount,
- NiceBytes(CurrentTotalSize.DiskSize),
- NiceBytes(CurrentTotalSize.MemorySize));
- return;
- }
-
- m_BlockStore.ReclaimSpace(
- BlockStoreState,
- ChunkLocations,
- KeepChunkIndexes,
- m_Configuration.PayloadAlignment,
- false,
- [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
- {
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- size_t EntryIndex = m_Index[ChunkHash];
- BucketPayload& Payload = m_Payloads[EntryIndex];
- if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location)
- {
- // Entry has been updated while GC was running, ignore the move
- continue;
- }
- Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
- LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location});
- }
- for (const size_t ChunkIndex : RemovedChunks)
- {
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- size_t EntryIndex = m_Index[ChunkHash];
- BucketPayload& Payload = m_Payloads[EntryIndex];
- if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location)
- {
- // Entry has been updated while GC was running, ignore the delete
- continue;
- }
- const DiskLocation& OldDiskLocation = Payload.Location;
- LogEntries.push_back({.Key = ChunkHash,
- .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment),
- m_Configuration.PayloadAlignment,
- OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
-
- RemoveMemCachedData(IndexLock, Payload);
- RemoveMetaData(IndexLock, Payload);
-
- m_Index.erase(ChunkHash);
- DeletedChunks.insert(ChunkHash);
- }
- }
-
- m_SlogFile.Append(LogEntries);
- m_SlogFile.Flush();
- },
- [&]() { return GcCtx.ClaimGCReserve(); });
-}
-
-ZenCacheDiskLayer::BucketStats
-ZenCacheDiskLayer::CacheBucket::Stats()
-{
- GcStorageSize Size = StorageSize();
- return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize,
- .MemorySize = Size.MemorySize,
- .DiskHitCount = m_DiskHitCount,
- .DiskMissCount = m_DiskMissCount,
- .DiskWriteCount = m_DiskWriteCount,
- .MemoryHitCount = m_MemoryHitCount,
- .MemoryMissCount = m_MemoryMissCount,
- .MemoryWriteCount = m_MemoryWriteCount,
- .PutOps = m_PutOps.Snapshot(),
- .GetOps = m_GetOps.Snapshot()};
-}
-
-uint64_t
-ZenCacheDiskLayer::CacheBucket::EntryCount() const
-{
- RwLock::SharedLockScope _(m_IndexLock);
- return static_cast<uint64_t>(m_Index.size());
-}
-
-CacheValueDetails::ValueDetails
-ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const IoHash& Key, PayloadIndex Index) const
-{
- std::vector<IoHash> Attachments;
- const BucketPayload& Payload = m_Payloads[Index];
- if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
- {
- IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile)
- ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key)
- : GetInlineCacheValue(Payload.Location);
- CbObjectView Obj(Value.GetData());
- Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
- }
- BucketMetaData MetaData = GetMetaData(IndexLock, Payload);
- return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
- .RawSize = MetaData.RawSize,
- .RawHash = MetaData.RawHash,
- .LastAccess = m_AccessTimes[Index],
- .Attachments = std::move(Attachments),
- .ContentType = Payload.Location.GetContentType()};
-}
-
-CacheValueDetails::BucketDetails
-ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const
-{
- CacheValueDetails::BucketDetails Details;
- RwLock::SharedLockScope _(m_IndexLock);
- if (ValueFilter.empty())
- {
- Details.Values.reserve(m_Index.size());
- for (const auto& It : m_Index)
- {
- Details.Values.insert_or_assign(It.first, GetValueDetails(IndexLock, It.first, It.second));
- }
- }
- else
- {
- IoHash Key = IoHash::FromHexString(ValueFilter);
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- Details.Values.insert_or_assign(It->first, GetValueDetails(IndexLock, It->first, It->second));
- }
- }
- return Details;
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
- std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
-{
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- for (const auto& It : m_Index)
- {
- CacheValueDetails::ValueDetails Vd = GetValueDetails(IndexLock, It.first, It.second);
-
- Fn(It.first, Vd);
- }
-}
-
-void
-ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::CollectGarbage");
-
- std::vector<CacheBucket*> Buckets;
- {
- RwLock::SharedLockScope _(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- Buckets.push_back(Kv.second.get());
- }
- }
- for (CacheBucket* Bucket : Buckets)
- {
- Bucket->CollectGarbage(GcCtx);
- }
- if (!m_IsMemCacheTrimming)
- {
- MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
- }
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
-{
- ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
-
- uint64_t NewFileSize = Value.Value.Size();
-
- TemporaryFile DataFile;
-
- std::error_code Ec;
- DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
- }
-
- bool CleanUpTempFile = true;
- auto __ = MakeGuard([&] {
- if (CleanUpTempFile)
- {
- std::error_code Ec;
- std::filesystem::remove(DataFile.GetPath(), Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'",
- DataFile.GetPath(),
- m_BucketDir,
- Ec.message());
- }
- }
- });
-
- DataFile.WriteAll(Value.Value, Ec);
- if (Ec)
- {
- throw std::system_error(Ec,
- fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'",
- NiceBytes(NewFileSize),
- DataFile.GetPath().string(),
- m_BucketDir));
- }
-
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
- std::filesystem::path FsPath{DataFilePath.ToPath()};
-
- RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
-
- // We do a speculative remove of the file instead of probing with a exists call and check the error code instead
- std::filesystem::remove(FsPath, Ec);
- if (Ec)
- {
- if (Ec.value() != ENOENT)
- {
- ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message());
- Sleep(100);
- Ec.clear();
- std::filesystem::remove(FsPath, Ec);
- if (Ec && Ec.value() != ENOENT)
- {
- throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir));
- }
- }
- }
-
- // Assume parent directory exists
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
- if (Ec)
- {
- CreateDirectories(FsPath.parent_path());
-
- // Try again after we or someone else created the directory
- Ec.clear();
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
-
- // Retry if we still fail to handle contention to file system
- uint32_t RetriesLeft = 3;
- while (Ec && RetriesLeft > 0)
- {
- ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retries left: {}.",
- FsPath,
- DataFile.GetPath(),
- m_BucketDir,
- Ec.message(),
- RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
- Ec.clear();
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
- RetriesLeft--;
- }
- if (Ec)
- {
- throw std::system_error(
- Ec,
- fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir));
- }
- }
-
- // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file
- // will be disabled as the file handle has already been closed
- CleanUpTempFile = false;
-
- uint8_t EntryFlags = DiskLocation::kStandaloneFile;
-
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
-
- DiskLocation Loc(NewFileSize, EntryFlags);
-
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- ValueLock.ReleaseNow();
- if (m_UpdatedKeys)
- {
- m_UpdatedKeys->insert(HashKey);
- }
-
- PayloadIndex EntryIndex = {};
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
- {
- // Previously unknown object
- EntryIndex = PayloadIndex(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Location = Loc});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_Configuration.EnableReferenceCaching)
- {
- m_FirstReferenceIndex.emplace_back(ReferenceIndex{});
- SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
- }
- m_Index.insert_or_assign(HashKey, EntryIndex);
- }
- else
- {
- EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
- BucketPayload& Payload = m_Payloads[EntryIndex];
- uint64_t OldSize = Payload.Location.Size();
- Payload = BucketPayload{.Location = Loc};
- if (m_Configuration.EnableReferenceCaching)
- {
- SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
- }
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
- RemoveMemCachedData(IndexLock, Payload);
- m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed);
- }
- if (Value.RawSize != 0 || Value.RawHash != IoHash::Zero)
- {
- SetMetaData(IndexLock, m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash});
- }
- else
- {
- RemoveMetaData(IndexLock, m_Payloads[EntryIndex]);
- }
-
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::SetMetaData(RwLock::ExclusiveLockScope&,
- BucketPayload& Payload,
- const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData)
-{
- if (Payload.MetaData)
- {
- m_MetaDatas[Payload.MetaData] = MetaData;
- }
- else
- {
- if (m_FreeMetaDatas.empty())
- {
- Payload.MetaData = MetaDataIndex(m_MetaDatas.size());
- m_MetaDatas.emplace_back(MetaData);
- }
- else
- {
- Payload.MetaData = m_FreeMetaDatas.back();
- m_FreeMetaDatas.pop_back();
- m_MetaDatas[Payload.MetaData] = MetaData;
- }
- }
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload)
-{
- if (Payload.MetaData)
- {
- m_FreeMetaDatas.push_back(Payload.MetaData);
- Payload.MetaData = {};
- }
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData)
-{
- BucketPayload& Payload = m_Payloads[PayloadIndex];
- uint64_t PayloadSize = MemCachedData.GetSize();
- ZEN_ASSERT(PayloadSize != 0);
- if (m_FreeMemCachedPayloads.empty())
- {
- if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max())
- {
- Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size()));
- m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex});
- AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
- m_MemoryWriteCount++;
- }
- }
- else
- {
- Payload.MemCached = m_FreeMemCachedPayloads.back();
- m_FreeMemCachedPayloads.pop_back();
- m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex};
- AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
- m_MemoryWriteCount++;
- }
-}
-
-size_t
-ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload)
-{
- if (Payload.MemCached)
- {
- size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize();
- RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
- m_MemCachedPayloads[Payload.MemCached] = {};
- m_FreeMemCachedPayloads.push_back(Payload.MemCached);
- Payload.MemCached = {};
- return PayloadSize;
- }
- return 0;
-}
-
-ZenCacheDiskLayer::CacheBucket::BucketMetaData
-ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const
-{
- if (Payload.MetaData)
- {
- return m_MetaDatas[Payload.MetaData];
- }
- return {};
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
-{
- ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
-
- uint8_t EntryFlags = 0;
-
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
-
- m_BlockStore.WriteChunk(Value.Value.Data(),
- Value.Value.Size(),
- m_Configuration.PayloadAlignment,
- [&](const BlockStoreLocation& BlockStoreLocation) {
- DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
- m_SlogFile.Append({.Key = HashKey, .Location = Location});
-
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (m_UpdatedKeys)
- {
- m_UpdatedKeys->insert(HashKey);
- }
- if (auto It = m_Index.find(HashKey); It != m_Index.end())
- {
- PayloadIndex EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
- BucketPayload& Payload = m_Payloads[EntryIndex];
-
- RemoveMemCachedData(IndexLock, Payload);
- RemoveMetaData(IndexLock, Payload);
-
- Payload = (BucketPayload{.Location = Location});
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
-
- if (m_Configuration.EnableReferenceCaching)
- {
- SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
- }
- }
- else
- {
- PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Location = Location});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_Configuration.EnableReferenceCaching)
- {
- m_FirstReferenceIndex.emplace_back(ReferenceIndex{});
- SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
- }
- m_Index.insert_or_assign(HashKey, EntryIndex);
- }
- });
-}
-
-std::string
-ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&)
-{
- return fmt::format("cachebucket:'{}'", m_BucketDir.string());
-}
-
-class DiskBucketStoreCompactor : public GcStoreCompactor
-{
-public:
- DiskBucketStoreCompactor(ZenCacheDiskLayer::CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys)
- : m_Bucket(Bucket)
- , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys))
- {
- m_ExpiredStandaloneKeys.shrink_to_fit();
- }
-
- virtual ~DiskBucketStoreCompactor() {}
-
- virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
- {
- ZEN_TRACE_CPU("Z$::Bucket::CompactStore");
-
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- Reset(m_ExpiredStandaloneKeys);
- if (!Ctx.Settings.Verbose)
- {
- return;
- }
- ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}",
- m_Bucket.m_BucketDir,
- NiceBytes(Stats.RemovedDisk),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- if (!m_ExpiredStandaloneKeys.empty())
- {
- // Compact standalone items
- size_t Skipped = 0;
- ExtendablePathBuilder<256> Path;
- for (const std::pair<IoHash, uint64_t>& ExpiredKey : m_ExpiredStandaloneKeys)
- {
- if (Ctx.IsCancelledFlag.load())
- {
- return;
- }
- Path.Reset();
- m_Bucket.BuildPath(Path, ExpiredKey.first);
- fs::path FilePath = Path.ToPath();
-
- RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock);
- if (m_Bucket.m_Index.contains(ExpiredKey.first))
- {
- // Someone added it back, let the file on disk be
- ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back",
- m_Bucket.m_BucketDir,
- Path.ToUtf8());
- continue;
- }
-
- if (Ctx.Settings.IsDeleteMode)
- {
- RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first));
- IndexLock.ReleaseNow();
- ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
-
- std::error_code Ec;
- if (!fs::remove(FilePath, Ec))
- {
- continue;
- }
- if (Ec)
- {
- ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'",
- m_Bucket.m_BucketDir,
- Path.ToUtf8(),
- Ec.message());
- continue;
- }
- Stats.RemovedDisk += ExpiredKey.second;
- }
- else
- {
- RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first));
- IndexLock.ReleaseNow();
- ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
-
- std::error_code Ec;
- bool Existed = std::filesystem::is_regular_file(FilePath, Ec);
- if (Ec)
- {
- ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'",
- m_Bucket.m_BucketDir,
- FilePath,
- Ec.message());
- continue;
- }
- if (!Existed)
- {
- continue;
- }
- Skipped++;
- }
- }
- if (Skipped > 0)
- {
- ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped);
- }
- }
-
- if (Ctx.Settings.CollectSmallObjects)
- {
- m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique<HashSet>(); });
- auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); });
-
- size_t InlineEntryCount = 0;
- BlockStore::BlockUsageMap BlockUsage;
- {
- RwLock::SharedLockScope ___(m_Bucket.m_IndexLock);
- for (const auto& Entry : m_Bucket.m_Index)
- {
- ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
- const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index];
- const DiskLocation& Loc = Payload.Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- continue;
- }
- InlineEntryCount++;
- uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex();
- uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment);
- if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end())
- {
- It->second.EntryCount++;
- It->second.DiskUsage += ChunkSize;
- }
- else
- {
- BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
- }
- }
- }
-
- {
- BlockStoreCompactState BlockCompactState;
- std::vector<IoHash> BlockCompactStateKeys;
- BlockCompactStateKeys.reserve(InlineEntryCount);
-
- BlockStore::BlockEntryCountMap BlocksToCompact =
- m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent);
- BlockCompactState.IncludeBlocks(BlocksToCompact);
-
- if (BlocksToCompact.size() > 0)
- {
- {
- RwLock::SharedLockScope ___(m_Bucket.m_IndexLock);
- for (const auto& Entry : m_Bucket.m_Index)
- {
- ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
- const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index];
- const DiskLocation& Loc = Payload.Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- continue;
- }
- if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment)))
- {
- continue;
- }
- BlockCompactStateKeys.push_back(Entry.first);
- }
- }
-
- if (Ctx.Settings.IsDeleteMode)
- {
- if (Ctx.Settings.Verbose)
- {
- ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks",
- m_Bucket.m_BucketDir,
- BlocksToCompact.size());
- }
-
- m_Bucket.m_BlockStore.CompactBlocks(
- BlockCompactState,
- m_Bucket.m_Configuration.PayloadAlignment,
- [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
- std::vector<DiskIndexEntry> MovedEntries;
- MovedEntries.reserve(MovedArray.size());
- RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock);
- for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
- {
- size_t ChunkIndex = Moved.first;
- const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
-
- if (m_Bucket.m_UpdatedKeys->contains(Key))
- {
- continue;
- }
-
- if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end())
- {
- ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second];
- const BlockStoreLocation& NewLocation = Moved.second;
- Payload.Location = DiskLocation(NewLocation,
- m_Bucket.m_Configuration.PayloadAlignment,
- Payload.Location.GetFlags());
- MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
- }
- }
- m_Bucket.m_SlogFile.Append(MovedEntries);
- Stats.RemovedDisk += FreedDiskSpace;
- if (Ctx.IsCancelledFlag.load())
- {
- return false;
- }
- return true;
- },
- ClaimDiskReserveCallback);
- }
- else
- {
- if (Ctx.Settings.Verbose)
- {
- ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks",
- m_Bucket.m_BucketDir,
- BlocksToCompact.size());
- }
- }
- }
- }
- }
- }
-
-private:
- ZenCacheDiskLayer::CacheBucket& m_Bucket;
- std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys;
-};
-
-GcStoreCompactor*
-ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
-{
- ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData");
-
- size_t TotalEntries = 0;
-
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- if (!Ctx.Settings.Verbose)
- {
- return;
- }
- ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}",
- m_BucketDir,
- Stats.CheckedCount,
- Stats.FoundCount,
- Stats.DeletedCount,
- NiceBytes(Stats.FreedMemory),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count();
-
- std::vector<DiskIndexEntry> ExpiredEntries;
- std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys;
- uint64_t RemovedStandaloneSize = 0;
- {
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (Ctx.IsCancelledFlag.load())
- {
- return nullptr;
- }
- if (m_Index.empty())
- {
- return nullptr;
- }
-
- TotalEntries = m_Index.size();
-
- // Find out expired keys
- for (const auto& Entry : m_Index)
- {
- const IoHash& Key = Entry.first;
- PayloadIndex EntryIndex = Entry.second;
- GcClock::Tick AccessTime = m_AccessTimes[EntryIndex];
- if (AccessTime >= ExpireTicks)
- {
- continue;
- }
-
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location};
- ExpiredEntry.Location.Flags |= DiskLocation::kTombStone;
-
- if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
- {
- ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()});
- RemovedStandaloneSize += Payload.Location.Size();
- ExpiredEntries.push_back(ExpiredEntry);
- }
- else if (Ctx.Settings.CollectSmallObjects)
- {
- ExpiredEntries.push_back(ExpiredEntry);
- }
- }
-
- Stats.CheckedCount += TotalEntries;
- Stats.FoundCount += ExpiredEntries.size();
-
- if (Ctx.IsCancelledFlag.load())
- {
- return nullptr;
- }
-
- if (Ctx.Settings.IsDeleteMode)
- {
- for (const DiskIndexEntry& Entry : ExpiredEntries)
- {
- auto It = m_Index.find(Entry.Key);
- ZEN_ASSERT(It != m_Index.end());
- BucketPayload& Payload = m_Payloads[It->second];
- RemoveMetaData(IndexLock, Payload);
- Stats.FreedMemory += RemoveMemCachedData(IndexLock, Payload);
- m_Index.erase(It);
- Stats.DeletedCount++;
- }
- m_SlogFile.Append(ExpiredEntries);
- m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed);
- }
- }
-
- if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty())
- {
- std::vector<BucketPayload> Payloads;
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketMetaData> MetaDatas;
- std::vector<MemCacheData> MemCachedPayloads;
- std::vector<ReferenceIndex> FirstReferenceIndex;
- IndexMap Index;
- {
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
- }
- }
-
- if (Ctx.IsCancelledFlag.load())
- {
- return nullptr;
- }
-
- return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
-}
-
-class DiskBucketReferenceChecker : public GcReferenceChecker
-{
- using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
- using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload;
- using CacheBucket = ZenCacheDiskLayer::CacheBucket;
- using ReferenceIndex = ZenCacheDiskLayer::CacheBucket::ReferenceIndex;
-
-public:
- DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {}
-
- virtual ~DiskBucketReferenceChecker()
- {
- try
- {
- m_IndexLock.reset();
- if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
- {
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
- // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it
- m_CacheBucket.ClearReferenceCache();
- }
- }
- catch (std::exception& Ex)
- {
- ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what());
- }
- }
-
- virtual void PreCache(GcCtx& Ctx) override
- {
- ZEN_TRACE_CPU("Z$::Bucket::PreCache");
-
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- if (!Ctx.Settings.Verbose)
- {
- return;
- }
- ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}",
- m_CacheBucket.m_BucketDir,
- m_CacheBucket.m_ReferenceCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- std::vector<IoHash> UpdateKeys;
- std::vector<size_t> ReferenceCounts;
- std::vector<IoHash> References;
-
- auto GetAttachments = [&References, &ReferenceCounts](const void* CbObjectData) {
- size_t CurrentReferenceCount = References.size();
- CbObjectView Obj(CbObjectData);
- Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
- ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
- };
-
- // Refresh cache
- {
- // If reference caching is enabled the references will be updated at modification for us so we don't need to track modifications
- if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
- {
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys = std::make_unique<HashSet>(); });
- }
-
- std::vector<IoHash> StandaloneKeys;
- {
- std::vector<IoHash> InlineKeys;
- std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex;
- struct InlineEntry
- {
- uint32_t InlineKeyIndex;
- uint32_t Offset;
- uint32_t Size;
- };
- std::vector<std::vector<InlineEntry>> EntriesPerBlock;
- size_t UpdateCount = 0;
- {
- RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
- for (const auto& Entry : m_CacheBucket.m_Index)
- {
- if (Ctx.IsCancelledFlag.load())
- {
- IndexLock.ReleaseNow();
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
- return;
- }
-
- PayloadIndex EntryIndex = Entry.second;
- const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
- const DiskLocation& Loc = Payload.Location;
-
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
- if (m_CacheBucket.m_Configuration.EnableReferenceCaching &&
- m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown())
- {
- continue;
- }
- UpdateCount++;
- const IoHash& Key = Entry.first;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- StandaloneKeys.push_back(Key);
- continue;
- }
-
- BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment);
- InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()),
- .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset),
- .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)};
- InlineKeys.push_back(Key);
-
- if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex);
- It != BlockIndexToEntriesPerBlockIndex.end())
- {
- EntriesPerBlock[It->second].emplace_back(UpdateEntry);
- }
- else
- {
- BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size());
- EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry});
- }
- }
- }
-
- UpdateKeys.reserve(UpdateCount);
-
- for (auto It : BlockIndexToEntriesPerBlockIndex)
- {
- uint32_t BlockIndex = It.first;
-
- Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex);
- if (BlockFile)
- {
- size_t EntriesPerBlockIndex = It.second;
- std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex];
-
- std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool {
- return Lhs.Offset < Rhs.Offset;
- });
-
- uint64_t BlockFileSize = BlockFile->FileSize();
- BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768);
- for (const InlineEntry& InlineEntry : InlineEntries)
- {
- if ((InlineEntry.Offset + InlineEntry.Size) > BlockFileSize)
- {
- ReferenceCounts.push_back(0);
- }
- else
- {
- MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset);
- if (ChunkView.GetSize() == InlineEntry.Size)
- {
- GetAttachments(ChunkView.GetData());
- }
- else
- {
- std::vector<uint8_t> Buffer(InlineEntry.Size);
- BlockBuffer.Read(Buffer.data(), InlineEntry.Size, InlineEntry.Offset);
- GetAttachments(Buffer.data());
- }
- }
- const IoHash& Key = InlineKeys[InlineEntry.InlineKeyIndex];
- UpdateKeys.push_back(Key);
- }
- }
- }
- }
- {
- for (const IoHash& Key : StandaloneKeys)
- {
- if (Ctx.IsCancelledFlag.load())
- {
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
- return;
- }
-
- IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
- if (!Buffer)
- {
- continue;
- }
-
- GetAttachments(Buffer.GetData());
- UpdateKeys.push_back(Key);
- }
- }
- }
-
- {
- size_t ReferenceOffset = 0;
- RwLock::ExclusiveLockScope IndexLock(m_CacheBucket.m_IndexLock);
-
- if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
- {
- ZEN_ASSERT(m_CacheBucket.m_FirstReferenceIndex.empty());
- ZEN_ASSERT(m_CacheBucket.m_ReferenceHashes.empty());
- ZEN_ASSERT(m_CacheBucket.m_NextReferenceHashesIndexes.empty());
- ZEN_ASSERT(m_CacheBucket.m_ReferenceCount == 0);
- ZEN_ASSERT(m_CacheBucket.m_UpdatedKeys);
-
- // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when
- // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted.
- m_CacheBucket.m_FirstReferenceIndex.resize(m_CacheBucket.m_Payloads.size(), ReferenceIndex::Unknown());
- m_CacheBucket.m_ReferenceHashes.reserve(References.size());
- m_CacheBucket.m_NextReferenceHashesIndexes.reserve(References.size());
- }
- else
- {
- ZEN_ASSERT(!m_CacheBucket.m_UpdatedKeys);
- }
-
- for (size_t Index = 0; Index < UpdateKeys.size(); Index++)
- {
- const IoHash& Key = UpdateKeys[Index];
- size_t ReferenceCount = ReferenceCounts[Index];
- if (auto It = m_CacheBucket.m_Index.find(Key); It != m_CacheBucket.m_Index.end())
- {
- PayloadIndex EntryIndex = It->second;
- if (m_CacheBucket.m_Configuration.EnableReferenceCaching)
- {
- if (m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown())
- {
- // The reference data is valid and what we have is old/redundant
- continue;
- }
- }
- else if (m_CacheBucket.m_UpdatedKeys->contains(Key))
- {
- // Our pre-cache data is invalid
- continue;
- }
-
- m_CacheBucket.SetReferences(IndexLock,
- m_CacheBucket.m_FirstReferenceIndex[EntryIndex],
- std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount});
- }
- ReferenceOffset += ReferenceCount;
- }
-
- if (m_CacheBucket.m_Configuration.EnableReferenceCaching && !UpdateKeys.empty())
- {
- m_CacheBucket.CompactReferences(IndexLock);
- }
- }
- }
-
- // Takes the bucket index lock in shared mode and keeps it held in m_IndexLock after returning
- // (RemoveUsedReferencesFromSet asserts that it is held).
- // While the lock is held, every key listed in m_CacheBucket.m_UpdatedKeys (when that set exists) is
- // looked up again; for structured entries the stored object is loaded and each attachment hash is
- // added to m_UncachedReferences.
- // If Ctx.IsCancelledFlag is set, the collected references are discarded, the shared lock is released
- // and m_UpdatedKeys is reset under an exclusive lock before returning.
- virtual void LockState(GcCtx& Ctx) override
- {
- ZEN_TRACE_CPU("Z$::Bucket::LockState");
-
- Stopwatch Timer;
- // Logs the summary on every exit path, but only when verbose GC logging is enabled.
- const auto _ = MakeGuard([&] {
- if (!Ctx.Settings.Verbose)
- {
- return;
- }
- ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}",
- m_CacheBucket.m_BucketDir,
- m_CacheBucket.m_ReferenceCount + m_UncachedReferences.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock);
- if (Ctx.IsCancelledFlag.load())
- {
- m_UncachedReferences.clear();
- // The shared lock must be dropped first: the exclusive lock below is taken on the same RwLock.
- m_IndexLock.reset();
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
- return;
- }
-
- if (m_CacheBucket.m_UpdatedKeys)
- {
- const HashSet& UpdatedKeys(*m_CacheBucket.m_UpdatedKeys);
- for (const IoHash& Key : UpdatedKeys)
- {
- // Cancellation is polled per key; same unwind sequence as above.
- if (Ctx.IsCancelledFlag.load())
- {
- m_UncachedReferences.clear();
- m_IndexLock.reset();
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
- return;
- }
-
- auto It = m_CacheBucket.m_Index.find(Key);
- if (It == m_CacheBucket.m_Index.end())
- {
- // Key is no longer in the index - nothing to reference.
- continue;
- }
-
- PayloadIndex EntryIndex = It->second;
- const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
- const DiskLocation& Loc = Payload.Location;
-
- // Only entries flagged kStructured are inspected for attachments.
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
-
- IoBuffer Buffer;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- Buffer = m_CacheBucket.GetStandaloneCacheValue(Loc.GetContentType(), Key);
- }
- else
- {
- Buffer = m_CacheBucket.GetInlineCacheValue(Loc);
- }
-
- // A null buffer (value could not be loaded) is silently skipped.
- if (Buffer)
- {
- ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
- CbObjectView Obj(Buffer.GetData());
- Obj.IterateAttachments([this](CbFieldView Field) { m_UncachedReferences.insert(Field.AsAttachment()); });
- }
- }
- }
- }
-
- // Erases from IoCids every hash this bucket references: first the bucket's cached reference
- // hashes, then the references gathered into m_UncachedReferences. Stops as soon as IoCids is
- // empty. Requires that the index lock is held in m_IndexLock.
- virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
- {
-     ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet");
-
-     ZEN_ASSERT(m_IndexLock);
-     size_t    InitialCount = IoCids.size();
-     Stopwatch Timer;
-     const auto _ = MakeGuard([&] {
-         if (Ctx.Settings.Verbose)
-         {
-             ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
-                      m_CacheBucket.m_BucketDir,
-                      InitialCount - IoCids.size(),
-                      InitialCount,
-                      NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-         }
-     });
-
-     // Cached references first.
-     for (const IoHash& CachedHash : m_CacheBucket.m_ReferenceHashes)
-     {
-         const bool Erased = IoCids.erase(CachedHash) == 1;
-         if (Erased && IoCids.empty())
-         {
-             return;
-         }
-     }
-
-     // Then the references collected while locking state.
-     for (const IoHash& UncachedHash : m_UncachedReferences)
-     {
-         const bool Erased = IoCids.erase(UncachedHash) == 1;
-         if (Erased && IoCids.empty())
-         {
-             return;
-         }
-     }
- }
- CacheBucket& m_CacheBucket;
- std::unique_ptr<RwLock::SharedLockScope> m_IndexLock;
- HashSet m_UncachedReferences;
-};
-
-// Returns a single DiskBucketReferenceChecker for this bucket, or an empty list when the
-// index holds no entries. The returned pointer is a raw `new` allocation handed to the caller.
-std::vector<GcReferenceChecker*>
-ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers");
-
-    Stopwatch  Timer;
-    const auto _ = MakeGuard([&] {
-        if (Ctx.Settings.Verbose)
-        {
-            ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-        }
-    });
-
-    // The index is only inspected under a shared lock; the lock is gone before the checker is created.
-    const bool HasEntries = [this] {
-        RwLock::SharedLockScope __(m_IndexLock);
-        return !m_Index.empty();
-    }();
-
-    if (!HasEntries)
-    {
-        return {};
-    }
-    return {new DiskBucketReferenceChecker(*this)};
-}
-
-// Rebuilds the reference tables so that every entry's reference chain is stored contiguously and
-// slots released by SetReferences/RemoveReferences are dropped.
-// The caller must hold the index lock exclusively (witnessed by the unnamed parameter).
-//
-// m_FirstReferenceIndex is addressed by payload index (the value stored in m_Index), so the rebuilt
-// table is written at that same payload index. Appending in m_Index iteration order instead would
-// re-associate entries with chains belonging to other entries and could leave the table shorter
-// than the payload array.
-void
-ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::CompactReferences");
-
-    // Same size as before; payload slots not reachable from the index stay Unknown.
-    std::vector<ReferenceIndex> FirstReferenceIndex(m_FirstReferenceIndex.size(), ReferenceIndex::Unknown());
-    std::vector<IoHash>         NewReferenceHashes;
-    std::vector<ReferenceIndex> NewNextReferenceHashesIndexes;
-
-    NewReferenceHashes.reserve(m_ReferenceCount);
-    NewNextReferenceHashesIndexes.reserve(m_ReferenceCount);
-
-    for (const auto& It : m_Index)
-    {
-        ReferenceIndex SourceIndex = m_FirstReferenceIndex[It.second];
-        if (SourceIndex == ReferenceIndex::Unknown())
-        {
-            // Already initialised to Unknown above.
-            continue;
-        }
-        if (SourceIndex == ReferenceIndex::None())
-        {
-            FirstReferenceIndex[It.second] = ReferenceIndex::None();
-            continue;
-        }
-
-        // Copy the head of the chain...
-        FirstReferenceIndex[It.second] = ReferenceIndex{NewNextReferenceHashesIndexes.size()};
-        NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]);
-        NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None());
-
-        // ...and then the remainder, linking each copied element to the one that follows it.
-        SourceIndex = m_NextReferenceHashesIndexes[SourceIndex];
-        while (SourceIndex != ReferenceIndex::None())
-        {
-            NewNextReferenceHashesIndexes.back() = ReferenceIndex{NewReferenceHashes.size()};
-            NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]);
-            NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None());
-            SourceIndex = m_NextReferenceHashesIndexes[SourceIndex];
-        }
-    }
-    m_FirstReferenceIndex.swap(FirstReferenceIndex);
-    m_ReferenceHashes.swap(NewReferenceHashes);
-    m_ReferenceHashes.shrink_to_fit();
-    m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes);
-    m_NextReferenceHashesIndexes.shrink_to_fit();
-    m_ReferenceCount = m_ReferenceHashes.size();
-}
-
-// Appends Key as a new, unlinked element of the reference tables and returns its index.
-// The caller must hold the index lock exclusively (witnessed by the unnamed parameter).
-ZenCacheDiskLayer::CacheBucket::ReferenceIndex
-ZenCacheDiskLayer::CacheBucket::AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key)
-{
-    // The new element lands at the current end of the hash table.
-    const ReferenceIndex Allocated{m_ReferenceHashes.size()};
-
-    m_ReferenceHashes.push_back(Key);
-    m_NextReferenceHashesIndexes.emplace_back(ReferenceIndex::None());
-    ++m_ReferenceCount;
-
-    return Allocated;
-}
-
-// Replaces the reference chain that starts at FirstReferenceIndex with References.
-//
-// Existing chain elements are reused in order, additional elements are allocated when the new list
-// is longer, and surplus elements are released (hash zeroed, unlinked, m_ReferenceCount decremented)
-// when it is shorter. An Unknown head is treated as an empty chain. On return FirstReferenceIndex
-// is None when References is empty, otherwise it is the head of a chain holding exactly References.
-//
-// Lock is the exclusive index lock; it is forwarded to AllocateReferenceEntry.
-void
-ZenCacheDiskLayer::CacheBucket::SetReferences(RwLock::ExclusiveLockScope& Lock,
-                                              ReferenceIndex&             FirstReferenceIndex,
-                                              std::span<IoHash>           References)
-{
-    // Releases Stale and every element linked after it.
-    const auto ReleaseChain = [this](ReferenceIndex Stale) {
-        while (Stale != ReferenceIndex::None())
-        {
-            const ReferenceIndex Next = m_NextReferenceHashesIndexes[Stale];
-            m_ReferenceHashes[Stale]  = IoHash::Zero;
-            ZEN_ASSERT(m_ReferenceCount > 0);
-            m_ReferenceCount--;
-            m_NextReferenceHashesIndexes[Stale] = ReferenceIndex::None();
-            Stale                               = Next;
-        }
-    };
-
-    if (FirstReferenceIndex == ReferenceIndex::Unknown())
-    {
-        FirstReferenceIndex = ReferenceIndex::None();
-    }
-
-    if (References.empty())
-    {
-        // Nothing to keep: drop the whole chain, including its head.
-        ReleaseChain(FirstReferenceIndex);
-        FirstReferenceIndex = ReferenceIndex::None();
-        return;
-    }
-
-    auto ReferenceIt = References.begin();
-
-    // Head element: reuse the existing one or allocate a fresh one.
-    ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
-    ReferenceIndex CurrentIndex = FirstReferenceIndex;
-    if (CurrentIndex == ReferenceIndex::None())
-    {
-        CurrentIndex        = AllocateReferenceEntry(Lock, *ReferenceIt);
-        FirstReferenceIndex = CurrentIndex;
-    }
-    else
-    {
-        m_ReferenceHashes[CurrentIndex] = *ReferenceIt;
-    }
-    ++ReferenceIt;
-
-    // Remaining elements: walk the existing chain, extending it when it runs out.
-    for (; ReferenceIt != References.end(); ++ReferenceIt)
-    {
-        ZEN_ASSERT(CurrentIndex != ReferenceIndex::None());
-        ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
-        ReferenceIndex NextReferenceIndex = m_NextReferenceHashesIndexes[CurrentIndex];
-        if (NextReferenceIndex == ReferenceIndex::None())
-        {
-            // Allocation may grow the tables, so the link is written only after it returns.
-            NextReferenceIndex                         = AllocateReferenceEntry(Lock, *ReferenceIt);
-            m_NextReferenceHashesIndexes[CurrentIndex] = NextReferenceIndex;
-        }
-        else
-        {
-            m_ReferenceHashes[NextReferenceIndex] = *ReferenceIt;
-        }
-        CurrentIndex = NextReferenceIndex;
-    }
-
-    // CurrentIndex is the last element that holds a live reference: cut the chain after it and
-    // release only what followed, leaving the live element itself untouched.
-    const ReferenceIndex Surplus               = m_NextReferenceHashesIndexes[CurrentIndex];
-    m_NextReferenceHashesIndexes[CurrentIndex] = ReferenceIndex::None();
-    ReleaseChain(Surplus);
-}
-
-// Releases every element of the reference chain starting at FirstReferenceIndex (hash zeroed,
-// m_ReferenceCount decremented) and resets FirstReferenceIndex to its default value.
-// An Unknown head means there is nothing recorded for the entry, so nothing is touched.
-// The caller must hold the index lock exclusively (witnessed by the unnamed parameter).
-void
-ZenCacheDiskLayer::CacheBucket::RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex)
-{
-    if (FirstReferenceIndex == ReferenceIndex::Unknown())
-    {
-        return;
-    }
-    ReferenceIndex CurrentIndex = FirstReferenceIndex;
-    // Walk until the end-of-chain marker; None itself is never a valid table index.
-    while (CurrentIndex != ReferenceIndex::None())
-    {
-        m_ReferenceHashes[CurrentIndex] = IoHash::Zero;
-        ZEN_ASSERT(m_ReferenceCount > 0);
-        m_ReferenceCount--;
-        CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex];
-    }
-    FirstReferenceIndex = {};
-}
-
-// Appends the hashes of the chain starting at FirstReferenceIndex to OutReferences.
-// Returns false (and appends nothing) when the head is Unknown, true otherwise - including
-// for an empty (None) chain.
-bool
-ZenCacheDiskLayer::CacheBucket::LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const
-{
-    if (FirstReferenceIndex == ReferenceIndex::Unknown())
-    {
-        return false;
-    }
-
-    for (ReferenceIndex Cursor = FirstReferenceIndex; Cursor != ReferenceIndex::None(); Cursor = m_NextReferenceHashesIndexes[Cursor])
-    {
-        ZEN_ASSERT_SLOW(m_ReferenceHashes[Cursor] != IoHash::Zero);
-        OutReferences.push_back(m_ReferenceHashes[Cursor]);
-    }
-    return true;
-}
-
-// Drops all cached reference data for the bucket while holding the index lock exclusively.
-void
-ZenCacheDiskLayer::CacheBucket::ClearReferenceCache()
-{
-    RwLock::ExclusiveLockScope ExclusiveIndexLock(m_IndexLock);
-
-    // The three tables and the counter are always cleared together.
-    Reset(m_FirstReferenceIndex);
-    Reset(m_ReferenceHashes);
-    Reset(m_NextReferenceHashesIndexes);
-    m_ReferenceCount = 0;
-}
-
-// Rebuilds the index and all per-entry arrays densely, keeping only entries reachable from m_Index.
-//
-// The new state is built into the caller-supplied containers (Payloads, AccessTimes, MetaDatas,
-// MemCachedPayloads, FirstReferenceIndex, Index) and then swapped with the members, so on return
-// those containers hold the previous state. Entries are renumbered 0..N-1 in m_Index iteration order.
-// Both lock parameters are exclusive lock scopes; IndexLock is forwarded to CompactReferences.
-// NOTE(review): the containers are appended to without being cleared here - presumably callers pass
-// them in empty; confirm.
-void
-ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
- std::vector<BucketPayload>& Payloads,
- std::vector<AccessTime>& AccessTimes,
- std::vector<BucketMetaData>& MetaDatas,
- std::vector<MemCacheData>& MemCachedPayloads,
- std::vector<ReferenceIndex>& FirstReferenceIndex,
- IndexMap& Index,
- RwLock::ExclusiveLockScope& IndexLock)
-{
- ZEN_TRACE_CPU("Z$::Bucket::CompactState");
-
- size_t EntryCount = m_Index.size();
- Payloads.reserve(EntryCount);
- AccessTimes.reserve(EntryCount);
- if (m_Configuration.EnableReferenceCaching)
- {
- FirstReferenceIndex.reserve(EntryCount);
- }
- Index.reserve(EntryCount);
- Index.min_load_factor(IndexMinLoadFactor);
- Index.max_load_factor(IndexMaxLoadFactor);
- for (auto It : m_Index)
- {
- // The entry's new payload index is its position in the rebuilt Payloads array.
- PayloadIndex EntryIndex = PayloadIndex(Payloads.size());
- Payloads.push_back(m_Payloads[It.second]);
- BucketPayload& Payload = Payloads.back();
- AccessTimes.push_back(m_AccessTimes[It.second]);
- if (Payload.MetaData)
- {
- // Copy the metadata and repoint the payload at its new slot.
- MetaDatas.push_back(m_MetaDatas[Payload.MetaData]);
- Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1);
- }
- if (Payload.MemCached)
- {
- // The in-memory payload is moved (not copied) and its owner updated to the new index.
- MemCachedPayloads.emplace_back(
- MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex});
- Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1));
- }
- if (m_Configuration.EnableReferenceCaching)
- {
- FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
- }
- Index.insert({It.first, EntryIndex});
- }
- // Publish the rebuilt state; the free lists are meaningless for the dense arrays and are cleared.
- m_Index.swap(Index);
- m_Payloads.swap(Payloads);
- m_AccessTimes.swap(AccessTimes);
- m_MetaDatas.swap(MetaDatas);
- Reset(m_FreeMetaDatas);
- m_MemCachedPayloads.swap(MemCachedPayloads);
- Reset(m_FreeMemCachedPayloads);
- if (m_Configuration.EnableReferenceCaching)
- {
- m_FirstReferenceIndex.swap(FirstReferenceIndex);
- CompactReferences(IndexLock);
- }
-}
-
-#if ZEN_WITH_TESTS
-// Test helper: overwrites the access time recorded for HashKey. Unknown keys are ignored.
-void
-ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time)
-{
-    const GcClock::Tick     Ticks = Time.time_since_epoch().count();
-    RwLock::SharedLockScope IndexLock(m_IndexLock);
-
-    auto Found = m_Index.find(HashKey);
-    if (Found == m_Index.end())
-    {
-        return;
-    }
-
-    const size_t Slot = Found.value();
-    ZEN_ASSERT_SLOW(Slot < m_AccessTimes.size());
-    m_AccessTimes[Slot] = Ticks;
-}
-#endif // ZEN_WITH_TESTS
-
-//////////////////////////////////////////////////////////////////////////
-
-// Stores the GC manager, job queue, root directory and configuration.
-// The body is empty: no buckets are opened or discovered by the constructor.
-ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
-: m_Gc(Gc)
-, m_JobQueue(JobQueue)
-, m_RootDir(RootDir)
-, m_Configuration(Config)
-{
-}
-
-// Destroys all buckets. Exceptions are logged and swallowed so the destructor never throws.
-ZenCacheDiskLayer::~ZenCacheDiskLayer()
-{
-    try
-    {
-        // Step 1: under the lock, only transfer ownership of every live bucket to m_DroppedBuckets.
-        {
-            RwLock::ExclusiveLockScope _(m_Lock);
-            for (auto& Entry : m_Buckets)
-            {
-                m_DroppedBuckets.emplace_back(std::move(Entry.second));
-            }
-            m_Buckets.clear();
-        }
-
-        // Step 2: destroy them with m_Lock released. The bucket destructor calls
-        // GcManager::RemoveGcReferencer, which takes an exclusive lock; if GC is running we would
-        // otherwise block while holding ZenCacheDiskLayer::m_Lock and risk a deadlock.
-        m_DroppedBuckets.clear();
-    }
-    catch (std::exception& Ex)
-    {
-        ZEN_ERROR("~ZenCacheDiskLayer() failed. Reason: '{}'", Ex.what());
-    }
-}
-
-// Returns the bucket named InBucket, opening or creating it under m_RootDir on first use.
-// Returns nullptr when the directory exists but is not a valid bucket; rethrows any exception
-// raised while opening the bucket after logging it.
-ZenCacheDiskLayer::CacheBucket*
-ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
-{
-    ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
-    const std::string BucketName(InBucket);
-
-    // Fast path: the bucket is already known.
-    {
-        RwLock::SharedLockScope SharedLock(m_Lock);
-        const auto              Existing = m_Buckets.find(BucketName);
-        if (Existing != m_Buckets.end())
-        {
-            return Existing->second.get();
-        }
-    }
-
-    // The bucket object is constructed without holding m_Lock: its constructor calls
-    // GcManager::AddGcReferencer, which takes an exclusive lock. If GC is running we would
-    // otherwise block while holding ZenCacheDiskLayer::m_Lock, which can deadlock.
-    auto NewBucket = std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig);
-
-    RwLock::ExclusiveLockScope Lock(m_Lock);
-
-    // Somebody else may have registered the bucket while we were unlocked.
-    const auto Raced = m_Buckets.find(BucketName);
-    if (Raced != m_Buckets.end())
-    {
-        return Raced->second.get();
-    }
-
-    const std::filesystem::path BucketPath = m_RootDir / BucketName;
-    try
-    {
-        const bool Opened = NewBucket->OpenOrCreate(BucketPath);
-        if (!Opened)
-        {
-            ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
-            return nullptr;
-        }
-    }
-    catch (const std::exception& Err)
-    {
-        ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
-        throw;
-    }
-
-    CacheBucket* const Registered = NewBucket.get();
-    m_Buckets.emplace(BucketName, std::move(NewBucket));
-    return Registered;
-}
-
-// Looks up HashKey in the named bucket. On a hit OutValue is filled by the bucket, a memory-cache
-// trim is attempted and true is returned; otherwise false.
-bool
-ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
-    ZEN_TRACE_CPU("Z$::Get");
-
-    CacheBucket* const Bucket = GetOrCreateBucket(InBucket);
-    if (Bucket == nullptr || !Bucket->Get(HashKey, OutValue))
-    {
-        return false;
-    }
-
-    TryMemCacheTrim();
-    return true;
-}
-
-// Stores Value (with its References) under HashKey in the named bucket and then attempts a
-// memory-cache trim. Does nothing when the bucket cannot be obtained.
-void
-ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
-{
-    ZEN_TRACE_CPU("Z$::Put");
-
-    CacheBucket* const Bucket = GetOrCreateBucket(InBucket);
-    if (Bucket == nullptr)
-    {
-        return;
-    }
-
-    Bucket->Put(HashKey, Value, References);
-    TryMemCacheTrim();
-}
-
-// Scans m_RootDir for bucket directories and registers them in m_Buckets.
-//
-// Directories whose name is already registered are skipped, directories with a known-bad name are
-// deleted, and every remaining directory is opened (never created) on the large worker pool.
-// m_Lock is held exclusively from the directory classification until all workers have finished.
-// NOTE(review): the bucket name is taken from BucketPath.stem(), so two directories that differ only
-// in their extension would map to the same name; the emplace result below is not checked for that
-// case - confirm whether such directory names can occur.
-void
-ZenCacheDiskLayer::DiscoverBuckets()
-{
- ZEN_TRACE_CPU("Z$::DiscoverBuckets");
-
- DirectoryContent DirContent;
- GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
-
- // Initialize buckets
-
- std::vector<std::filesystem::path> BadBucketDirectories;
- std::vector<std::filesystem::path> FoundBucketDirectories;
-
- RwLock::ExclusiveLockScope _(m_Lock);
-
- // Pass 1: classify every directory as already known, bad, or a candidate to open.
- for (const std::filesystem::path& BucketPath : DirContent.Directories)
- {
- const std::string BucketName = PathToUtf8(BucketPath.stem());
-
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
- {
- continue;
- }
-
- if (IsKnownBadBucketName(BucketName))
- {
- BadBucketDirectories.push_back(BucketPath);
-
- continue;
- }
-
- FoundBucketDirectories.push_back(BucketPath);
- ZEN_INFO("Discovered bucket '{}'", BucketName);
- }
-
- // Pass 2: best-effort removal of bad bucket directories; failures are only logged.
- for (const std::filesystem::path& BadBucketPath : BadBucketDirectories)
- {
- bool IsOk = false;
-
- try
- {
- IsOk = DeleteDirectories(BadBucketPath);
- }
- catch (std::exception&)
- {
- // Deliberately ignored - IsOk stays false and the failure is reported below.
- }
-
- if (IsOk)
- {
- ZEN_INFO("found bad bucket at '{}', deleted contents", BadBucketPath);
- }
- else
- {
- ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath);
- }
- }
-
- // Serialises the workers' modifications of m_Buckets (m_Lock itself is already held by this thread).
- RwLock SyncLock;
-
- // Pass 3: open the candidates in parallel. The latch starts at 1 so it cannot reach zero before
- // all work has been scheduled; that initial count is released after the loop.
- WorkerThreadPool& Pool = GetLargeWorkerPool();
- Latch WorkLatch(1);
- for (auto& BucketPath : FoundBucketDirectories)
- {
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([&]() {
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
-
- const std::string BucketName = PathToUtf8(BucketPath.stem());
- try
- {
- std::unique_ptr<CacheBucket> NewBucket =
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig);
-
- // The bucket is registered first and opened afterwards, outside SyncLock.
- CacheBucket* Bucket = nullptr;
- {
- RwLock::ExclusiveLockScope __(SyncLock);
- auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
- Bucket = InsertResult.first->second.get();
- }
- ZEN_ASSERT(Bucket);
-
- if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
-
- {
- RwLock::ExclusiveLockScope __(SyncLock);
- m_Buckets.erase(BucketName);
- }
- }
- }
- catch (const std::exception& Err)
- {
- // NOTE(review): if OpenOrCreate throws, the bucket registered above stays in m_Buckets
- // without having been opened - confirm this is intended.
- ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- return;
- }
- });
- }
- WorkLatch.CountDown();
- WorkLatch.Wait();
-}
-
-// Drops the bucket named InBucket. A registered bucket is unregistered, parked in
-// m_DroppedBuckets and asked to drop itself; for an unknown name the matching directory under
-// m_RootDir is moved and deleted. Returns the result of whichever operation was performed.
-bool
-ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
-{
-    ZEN_TRACE_CPU("Z$::DropBucket");
-
-    RwLock::ExclusiveLockScope _(m_Lock);
-
-    auto Found = m_Buckets.find(std::string(InBucket));
-    if (Found == m_Buckets.end())
-    {
-        // Make sure we remove the folder even if we don't know about the bucket
-        const std::filesystem::path BucketPath = m_RootDir / std::string(InBucket);
-        return MoveAndDeleteDirectory(BucketPath);
-    }
-
-    // Ownership moves to m_DroppedBuckets, so the reference stays valid after the erase.
-    CacheBucket& Dropped = *Found->second;
-    m_DroppedBuckets.push_back(std::move(Found->second));
-    m_Buckets.erase(Found);
-
-    return Dropped.Drop();
-}
-
-// Drops every bucket and then removes the whole root directory.
-//
-// Each bucket is unregistered, parked in m_DroppedBuckets (which keeps the object alive) and asked
-// to drop itself. The first failing bucket aborts the operation with false, leaving the remaining
-// buckets registered; otherwise the result of MoveAndDeleteDirectory(m_RootDir) is returned.
-bool
-ZenCacheDiskLayer::Drop()
-{
-    ZEN_TRACE_CPU("Z$::Drop");
-
-    RwLock::ExclusiveLockScope _(m_Lock);
-
-    while (!m_Buckets.empty())
-    {
-        auto         It     = m_Buckets.begin();
-        CacheBucket& Bucket = *It->second;
-        m_DroppedBuckets.push_back(std::move(It->second));
-        // Erase by iterator: no second key lookup, same as DropBucket.
-        m_Buckets.erase(It);
-        if (!Bucket.Drop())
-        {
-            return false;
-        }
-    }
-    return MoveAndDeleteDirectory(m_RootDir);
-}
-
-// Flushes all buckets in parallel on the small worker pool and waits for completion, emitting a
-// debug message for every second still spent waiting. A summary is logged when at least one
-// bucket was flushed.
-void
-ZenCacheDiskLayer::Flush()
-{
-    ZEN_TRACE_CPU("Z$::Flush");
-
-    std::vector<CacheBucket*> Buckets;
-    Stopwatch                 Timer;
-    const auto                _ = MakeGuard([&] {
-        if (!Buckets.empty())
-        {
-            ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-        }
-    });
-
-    // Snapshot the bucket pointers; m_Lock is not held while flushing.
-    {
-        RwLock::SharedLockScope __(m_Lock);
-        Buckets.reserve(m_Buckets.size());
-        for (auto& Kv : m_Buckets)
-        {
-            Buckets.push_back(Kv.second.get());
-        }
-    }
-
-    WorkerThreadPool& Pool = GetSmallWorkerPool();
-    // Starts at 1 so the latch cannot complete before everything has been scheduled.
-    Latch WorkLatch(1);
-    for (CacheBucket* Bucket : Buckets)
-    {
-        WorkLatch.AddCount(1);
-        Pool.ScheduleWork([&WorkLatch, Bucket]() {
-            auto Done = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
-            Bucket->Flush();
-        });
-    }
-    WorkLatch.CountDown();
-
-    while (!WorkLatch.Wait(1000))
-    {
-        ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
-    }
-}
-
-// Scrubs every bucket on the scrub context's thread pool and waits for all of them.
-// m_Lock is held in shared mode for the whole operation. Result.get() rethrows any exception
-// stored in a task's future.
-void
-ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
-{
-    ZEN_TRACE_CPU("Z$::ScrubStorage");
-
-    RwLock::SharedLockScope _(m_Lock);
-
-    std::vector<std::future<void>> Pending;
-    Pending.reserve(m_Buckets.size());
-
-    for (auto& Kv : m_Buckets)
-    {
-        CacheBucket* const Bucket = Kv.second.get();
-        Pending.push_back(Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket, &Ctx] { Bucket->ScrubStorage(Ctx); }}));
-    }
-
-    for (auto& Task : Pending)
-    {
-        Task.get();
-    }
-}
-
-// Lets every bucket contribute its references to GcCtx. The bucket list is snapshotted under a
-// shared lock; the buckets themselves are visited one after another with m_Lock released.
-void
-ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
-{
-    ZEN_TRACE_CPU("Z$::GatherReferences");
-
-    const std::vector<CacheBucket*> Snapshot = [this] {
-        std::vector<CacheBucket*> Buckets;
-        RwLock::SharedLockScope   _(m_Lock);
-        Buckets.reserve(m_Buckets.size());
-        for (auto& Kv : m_Buckets)
-        {
-            Buckets.push_back(Kv.second.get());
-        }
-        return Buckets;
-    }();
-
-    for (CacheBucket* Bucket : Snapshot)
-    {
-        Bucket->GatherReferences(GcCtx);
-    }
-}
-
-// Sums the disk and memory footprint reported by every bucket, under a shared lock.
-GcStorageSize
-ZenCacheDiskLayer::StorageSize() const
-{
-    GcStorageSize Total{};
-
-    RwLock::SharedLockScope _(m_Lock);
-    for (auto& Kv : m_Buckets)
-    {
-        const GcStorageSize PerBucket = Kv.second->StorageSize();
-        Total.DiskSize += PerBucket.DiskSize;
-        Total.MemorySize += PerBucket.MemorySize;
-    }
-
-    return Total;
-}
-
-// Returns the aggregate storage size together with the per-bucket statistics.
-// The totals and the per-bucket list are gathered in two separate lock acquisitions.
-ZenCacheDiskLayer::DiskStats
-ZenCacheDiskLayer::Stats() const
-{
-    const GcStorageSize          Totals = StorageSize();
-    ZenCacheDiskLayer::DiskStats Result = {.DiskSize = Totals.DiskSize, .MemorySize = Totals.MemorySize};
-
-    RwLock::SharedLockScope _(m_Lock);
-    Result.BucketStats.reserve(m_Buckets.size());
-    for (auto& Kv : m_Buckets)
-    {
-        Result.BucketStats.emplace_back(NamedBucketStats{.BucketName = Kv.first, .Stats = Kv.second->Stats()});
-    }
-    return Result;
-}
-
-// Returns root directory, configuration, bucket names and the summed entry count and storage
-// size of all buckets, collected under a shared lock.
-ZenCacheDiskLayer::Info
-ZenCacheDiskLayer::GetInfo() const
-{
-    ZenCacheDiskLayer::Info Result = {.RootDir = m_RootDir, .Config = m_Configuration};
-
-    RwLock::SharedLockScope _(m_Lock);
-    Result.BucketNames.reserve(m_Buckets.size());
-    for (auto& Kv : m_Buckets)
-    {
-        Result.BucketNames.push_back(Kv.first);
-        Result.EntryCount += Kv.second->EntryCount();
-
-        const GcStorageSize PerBucket = Kv.second->StorageSize();
-        Result.StorageSize.DiskSize += PerBucket.DiskSize;
-        Result.StorageSize.MemorySize += PerBucket.MemorySize;
-    }
-    return Result;
-}
-
-// Returns entry count and storage size of the named bucket, or an empty optional when no such
-// bucket is registered.
-std::optional<ZenCacheDiskLayer::BucketInfo>
-ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
-{
-    RwLock::SharedLockScope _(m_Lock);
-
-    const auto Found = m_Buckets.find(std::string(Bucket));
-    if (Found == m_Buckets.end())
-    {
-        return std::nullopt;
-    }
-    return ZenCacheDiskLayer::BucketInfo{.EntryCount = Found->second->EntryCount(), .StorageSize = Found->second->StorageSize()};
-}
-
-// Forwards Fn to the named bucket's EnumerateBucketContents while holding m_Lock in shared mode.
-// Unknown bucket names are ignored.
-void
-ZenCacheDiskLayer::EnumerateBucketContents(std::string_view                                                                        Bucket,
-                                           std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
-{
-    RwLock::SharedLockScope _(m_Lock);
-
-    const auto Found = m_Buckets.find(std::string(Bucket));
-    if (Found == m_Buckets.end())
-    {
-        return;
-    }
-    Found->second->EnumerateBucketContents(Fn);
-}
-
-// Collects value details per bucket. An empty BucketFilter selects all buckets, otherwise only
-// the bucket with exactly that name (if registered). ValueFilter is passed through to the buckets.
-CacheValueDetails::NamespaceDetails
-ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
-{
-    CacheValueDetails::NamespaceDetails Details;
-    {
-        RwLock::SharedLockScope IndexLock(m_Lock);
-        if (!BucketFilter.empty())
-        {
-            const auto Found = m_Buckets.find(std::string(BucketFilter));
-            if (Found != m_Buckets.end())
-            {
-                Details.Buckets[Found->first] = Found->second->GetValueDetails(IndexLock, ValueFilter);
-            }
-        }
-        else
-        {
-            Details.Buckets.reserve(m_Buckets.size());
-            for (auto& Kv : m_Buckets)
-            {
-                Details.Buckets[Kv.first] = Kv.second->GetValueDetails(IndexLock, ValueFilter);
-            }
-        }
-    }
-    return Details;
-}
-
-// Queues an asynchronous job that trims the in-memory cache towards
-// m_Configuration.MemCacheTargetFootprintBytes.
-//
-// Only one trim can be in flight: m_IsMemCacheTrimming is claimed with a compare-exchange here and
-// released when the job finishes (or immediately if queueing the job throws).
-// NOTE(review): the queued job captures `this`; presumably the job queue is drained before this
-// object is destroyed - confirm.
-void
-ZenCacheDiskLayer::MemCacheTrim()
-{
- ZEN_TRACE_CPU("Z$::MemCacheTrim");
-
- ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0);
- ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0);
- ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0);
-
- // Claim the "trimming" flag; bail out if another trim already holds it.
- bool Expected = false;
- if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
- {
- return;
- }
-
- try
- {
- m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) {
- ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
-
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- uint64_t TrimmedSize = 0;
- Stopwatch Timer;
- // Runs on every exit from the job: logs the result, schedules the next allowed trim
- // and releases the "trimming" flag.
- const auto Guard = MakeGuard([&] {
- ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
- NiceBytes(TrimmedSize),
- NiceBytes(m_TotalMemCachedSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-
- const GcClock::Tick NowTick = GcClock::TickCount();
- const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_NextAllowedTrimTick.store(NextTrimTick);
- m_IsMemCacheTrimming.store(false);
- });
-
- const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
-
- // NOTE(review): UsageSlots is only reserved here, not sized; the loop further down iterates
- // UsageSlots.size(), so it relies on GetUsageByAccess growing the vector - confirm.
- static const size_t UsageSlotCount = 2048;
- std::vector<uint64_t> UsageSlots;
- UsageSlots.reserve(UsageSlotCount);
-
- // Snapshot the bucket pointers; they are used below after m_Lock has been released.
- std::vector<CacheBucket*> Buckets;
- {
- RwLock::SharedLockScope __(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- Buckets.push_back(Kv.second.get());
- }
- }
-
- const GcClock::TimePoint Now = GcClock::Now();
- {
- ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
- for (CacheBucket* Bucket : Buckets)
- {
- Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
- }
- }
-
- // Accumulate slot sizes until the target footprint is reached; the index of that slot is
- // scaled to a fraction of MaxAge to derive the expiry time, and a single trim is run.
- uint64_t TotalSize = 0;
- for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
- {
- TotalSize += UsageSlots[Index];
- if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
- {
- GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
- TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
- break;
- }
- }
- });
- }
- catch (std::exception& Ex)
- {
- // The job never ran, so its guard cannot release the flag - do it here.
- ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
- m_IsMemCacheTrimming.store(false);
- }
-}
-
-// Trims the memory cache of every bucket in Buckets using ExpireTime and returns the total number
-// of bytes the buckets report as trimmed. Afterwards the next allowed trim tick is advanced by the
-// configured trim interval, unless it was changed concurrently. Returns 0 without doing anything
-// when no target footprint is configured.
-uint64_t
-ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime)
-{
-    if (m_Configuration.MemCacheTargetFootprintBytes == 0)
-    {
-        return 0;
-    }
-
-    uint64_t Reclaimed = 0;
-    for (CacheBucket* Bucket : Buckets)
-    {
-        Reclaimed += Bucket->MemCacheTrim(ExpireTime);
-    }
-
-    // The clock is sampled after trimming so the interval counts from the end of the trim.
-    const GcClock::Tick        NowTick = GcClock::Now().time_since_epoch().count();
-    const std::chrono::seconds TrimInterval(m_Configuration.MemCacheTrimIntervalSeconds);
-
-    GcClock::Tick       ObservedTick        = m_NextAllowedTrimTick;
-    const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
-    m_NextAllowedTrimTick.compare_exchange_strong(ObservedTick, NextAllowedTrimTick);
-
-    return Reclaimed;
-}
-
-#if ZEN_WITH_TESTS
-// Test helper: forwards to CacheBucket::SetAccessTime of the named bucket. Unknown bucket names
-// are ignored. m_Lock is only held while the bucket is looked up.
-void
-ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time)
-{
-    const std::string BucketName(InBucket);
-
-    CacheBucket* const Bucket = [&]() -> CacheBucket* {
-        RwLock::SharedLockScope _(m_Lock);
-        const auto              Found = m_Buckets.find(BucketName);
-        return Found != m_Buckets.end() ? Found->second.get() : nullptr;
-    }();
-
-    if (Bucket != nullptr)
-    {
-        Bucket->SetAccessTime(HashKey, Time);
-    }
-}
-#endif // ZEN_WITH_TESTS
-
-} // namespace zen
diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h
deleted file mode 100644
index 6997a12e4..000000000
--- a/src/zenserver/cache/cachedisklayer.h
+++ /dev/null
@@ -1,482 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "cacheshared.h"
-
-#include <zencore/stats.h>
-#include <zenstore/blockstore.h>
-#include <zenstore/caslog.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <tsl/robin_map.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#include <filesystem>
-#include <unordered_map>
-
-namespace zen {
-
-class IoBuffer;
-class JobQueue;
-
-#pragma pack(push)
-#pragma pack(1)
-
-// Packed location record for a cache value. The kStandaloneFile bit in Flags selects which
-// member of the Location union is meaningful: the standalone size or the block location.
-struct DiskLocation
-{
-    inline DiskLocation() = default;
-
-    // Standalone value: kStandaloneFile is forced on and only the size is recorded
-    inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; }
-
-    // Block stored value: kStandaloneFile is forced off and the block location is recorded
-    inline DiskLocation(const BlockStoreLocation& Location, uint32_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile)
-    {
-        this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment);
-    }
-
-    // Differs when the flags differ, or when the union member selected by the flags differs
-    inline bool operator!=(const DiskLocation& Rhs) const
-    {
-        if (Flags != Rhs.Flags)
-        {
-            return true;
-        }
-        const bool IsStandalone = (Flags & kStandaloneFile) != 0;
-        if (IsStandalone)
-        {
-            return Location.StandaloneSize != Rhs.Location.StandaloneSize;
-        }
-        return Location.BlockLocation != Rhs.Location.BlockLocation;
-    }
-
-    // Only valid for block stored values; asserts that this is not a standalone file
-    inline BlockStoreLocation GetBlockLocation(uint32_t PayloadAlignment) const
-    {
-        ZEN_ASSERT(!(Flags & kStandaloneFile));
-        return Location.BlockLocation.Get(PayloadAlignment);
-    }
-
-    inline uint64_t Size() const
-    {
-        if (Flags & kStandaloneFile)
-        {
-            return Location.StandaloneSize;
-        }
-        return Location.BlockLocation.GetSize();
-    }
-    inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; }
-    inline uint8_t GetFlags() const { return Flags; }
-
-    // kCompressed takes precedence over kStructured; with neither set the value is binary
-    inline ZenContentType GetContentType() const
-    {
-        if (IsFlagSet(kCompressed))
-        {
-            return ZenContentType::kCompressedBinary;
-        }
-        if (IsFlagSet(kStructured))
-        {
-            return ZenContentType::kCbObject;
-        }
-        return ZenContentType::kBinary;
-    }
-
-    union
-    {
-        BlockStoreDiskLocation BlockLocation;       // 10 bytes
-        uint64_t               StandaloneSize = 0;  // 8 bytes
-    } Location;
-
-    static const uint8_t kStandaloneFile = 0x80u;  // Stored as a separate file
-    static const uint8_t kStructured     = 0x40u;  // Serialized as compact binary
-    static const uint8_t kTombStone      = 0x20u;  // Represents a deleted key/value
-    static const uint8_t kCompressed     = 0x10u;  // Stored in compressed buffer format
-
-    uint8_t Flags    = 0;
-    uint8_t Reserved = 0;
-};
-
-struct DiskIndexEntry // Packed key + location record; the static_assert below pins it to 32 bytes
-{
- IoHash Key; // 20 bytes - hash key of the cache entry
- DiskLocation Location; // 12 bytes - where/how the value is stored, including its flags
-};
-
-#pragma pack(pop)
-
-static_assert(sizeof(DiskIndexEntry) == 32);
-
-//////////////////////////////////////////////////////////////////////////
-
-class ZenCacheDiskLayer
-{
-public:
- struct BucketConfiguration
- {
- uint64_t MaxBlockSize = 1ull << 30;
- uint32_t PayloadAlignment = 1u << 4;
- uint64_t MemCacheSizeThreshold = 1 * 1024;
- uint64_t LargeObjectThreshold = 128 * 1024;
- bool EnableReferenceCaching = false;
- };
-
- struct Configuration
- {
- BucketConfiguration BucketConfig;
- uint64_t MemCacheTargetFootprintBytes = 512 * 1024 * 1024;
- uint64_t MemCacheTrimIntervalSeconds = 60;
- uint64_t MemCacheMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count());
- };
-
- struct BucketInfo
- {
- uint64_t EntryCount = 0;
- GcStorageSize StorageSize;
- };
-
- struct Info
- {
- std::filesystem::path RootDir;
- Configuration Config;
- std::vector<std::string> BucketNames;
- uint64_t EntryCount = 0;
- GcStorageSize StorageSize;
- };
-
- struct BucketStats
- {
- uint64_t DiskSize;
- uint64_t MemorySize;
- uint64_t DiskHitCount;
- uint64_t DiskMissCount;
- uint64_t DiskWriteCount;
- uint64_t MemoryHitCount;
- uint64_t MemoryMissCount;
- uint64_t MemoryWriteCount;
- metrics::RequestStatsSnapshot PutOps;
- metrics::RequestStatsSnapshot GetOps;
- };
-
- struct NamedBucketStats
- {
- std::string BucketName;
- BucketStats Stats;
- };
-
- struct DiskStats
- {
- std::vector<NamedBucketStats> BucketStats;
- uint64_t DiskSize;
- uint64_t MemorySize;
- };
-
- explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
- ~ZenCacheDiskLayer();
-
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- bool Drop();
- bool DropBucket(std::string_view Bucket);
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
- void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx);
-
- void DiscoverBuckets();
- GcStorageSize StorageSize() const;
- DiskStats Stats() const;
-
- Info GetInfo() const;
- std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
- void EnumerateBucketContents(std::string_view Bucket,
- std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
-
- CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
-
-#if ZEN_WITH_TESTS
- void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
-#endif // ZEN_WITH_TESTS
-
- /** A cache bucket manages a single directory containing
- metadata and data for that bucket
- */
- struct CacheBucket : public GcReferencer
- {
- CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config);
- ~CacheBucket();
-
- bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
- bool Drop();
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
- void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx);
-
- inline GcStorageSize StorageSize() const
- {
- return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(),
- .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)};
- }
- uint64_t EntryCount() const;
- BucketStats Stats();
-
- CacheValueDetails::BucketDetails GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const;
- void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
-
- void GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots);
-#if ZEN_WITH_TESTS
- void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time);
-#endif // ZEN_WITH_TESTS
-
- private:
-#pragma pack(push)
-#pragma pack(1)
- struct MetaDataIndex
- {
- uint32_t Index = std::numeric_limits<uint32_t>::max();
-
- operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
- MetaDataIndex() = default;
- explicit MetaDataIndex(size_t InIndex) : Index(uint32_t(InIndex)) {}
- operator size_t() const { return Index; };
- inline auto operator<=>(const MetaDataIndex& Other) const = default;
- };
-
- struct MemCachedIndex
- {
- uint32_t Index = std::numeric_limits<uint32_t>::max();
-
- operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
- MemCachedIndex() = default;
- explicit MemCachedIndex(uint32_t InIndex) : Index(InIndex) {}
- operator size_t() const { return Index; };
- inline auto operator<=>(const MemCachedIndex& Other) const = default;
- };
-
- struct ReferenceIndex
- {
- uint32_t Index = std::numeric_limits<uint32_t>::max();
-
- static const ReferenceIndex Unknown() { return ReferenceIndex{std::numeric_limits<uint32_t>::max()}; }
- static const ReferenceIndex None() { return ReferenceIndex{std::numeric_limits<uint32_t>::max() - 1}; }
-
- ReferenceIndex() = default;
- explicit ReferenceIndex(size_t InIndex) : Index(uint32_t(InIndex)) {}
- operator size_t() const { return Index; };
- operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
- inline auto operator<=>(const ReferenceIndex& Other) const = default;
- };
-
- struct PayloadIndex
- {
- uint32_t Index = std::numeric_limits<uint32_t>::max();
-
- operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
- PayloadIndex() = default;
- explicit PayloadIndex(size_t InIndex) : Index(uint32_t(InIndex)) {}
- operator size_t() const { return Index; };
- inline auto operator<=>(const PayloadIndex& Other) const = default;
- };
-
- struct BucketPayload
- {
- DiskLocation Location; // 12
- MetaDataIndex MetaData; // 4
- MemCachedIndex MemCached; // 4
- };
- struct BucketMetaData
- {
- uint64_t RawSize = 0; // 8
- IoHash RawHash; // 20
-
- operator bool() const { return RawSize != 0 || RawHash != IoHash::Zero; };
- };
- struct MemCacheData
- {
- IoBuffer Payload;
- PayloadIndex OwnerIndex;
- };
-#pragma pack(pop)
- static_assert(sizeof(BucketPayload) == 20u);
- static_assert(sizeof(BucketMetaData) == 28u);
- static_assert(sizeof(AccessTime) == 4u);
-
- using IndexMap = tsl::robin_map<IoHash, PayloadIndex, IoHash::Hasher>;
-
- GcManager& m_Gc;
- std::atomic_uint64_t& m_OuterCacheMemoryUsage;
- std::string m_BucketName;
- std::filesystem::path m_BucketDir;
- std::filesystem::path m_BlocksBasePath;
- BucketConfiguration m_Configuration;
- BlockStore m_BlockStore;
- Oid m_BucketId;
- std::atomic_bool m_IsFlushing{true}; // Don't allow flush until we are properly initialized
-
- // These files are used to manage storage of small objects for this bucket
-
- TCasLogFile<DiskIndexEntry> m_SlogFile;
- uint64_t m_LogFlushPosition = 0;
-
- std::atomic<uint64_t> m_DiskHitCount;
- std::atomic<uint64_t> m_DiskMissCount;
- std::atomic<uint64_t> m_DiskWriteCount;
- std::atomic<uint64_t> m_MemoryHitCount;
- std::atomic<uint64_t> m_MemoryMissCount;
- std::atomic<uint64_t> m_MemoryWriteCount;
- metrics::RequestStats m_PutOps;
- metrics::RequestStats m_GetOps;
-
- mutable RwLock m_IndexLock;
- IndexMap m_Index;
- std::vector<AccessTime> m_AccessTimes;
- std::vector<BucketPayload> m_Payloads;
- std::vector<BucketMetaData> m_MetaDatas;
- std::vector<MetaDataIndex> m_FreeMetaDatas;
- std::vector<MemCacheData> m_MemCachedPayloads;
- std::vector<MemCachedIndex> m_FreeMemCachedPayloads;
- std::vector<ReferenceIndex> m_FirstReferenceIndex;
- std::vector<IoHash> m_ReferenceHashes;
- std::vector<ReferenceIndex> m_NextReferenceHashesIndexes;
- std::unique_ptr<HashSet> m_UpdatedKeys;
- size_t m_ReferenceCount = 0;
- std::atomic_uint64_t m_StandaloneSize{};
- std::atomic_uint64_t m_MemCachedSize{};
-
- virtual std::string GetGcName(GcCtx& Ctx) override;
- virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
- virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
-
- void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const;
- void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
- CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const;
-
- void CompactReferences(RwLock::ExclusiveLockScope&);
- void SetReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex, std::span<IoHash> References);
- void RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex);
- inline bool GetReferences(RwLock::SharedLockScope&, ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const
- {
- return LockedGetReferences(FirstReferenceIndex, OutReferences);
- }
- inline bool GetReferences(RwLock::ExclusiveLockScope&, ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const
- {
- return LockedGetReferences(FirstReferenceIndex, OutReferences);
- }
- ReferenceIndex AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key);
- bool LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const;
- void ClearReferenceCache();
-
- void SetMetaData(RwLock::ExclusiveLockScope&,
- BucketPayload& Payload,
- const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData);
- void RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
- BucketMetaData GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const;
- void SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData);
- size_t RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
-
- void InitializeIndexFromDisk(RwLock::ExclusiveLockScope&, bool IsNew);
- uint64_t ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion);
- uint64_t ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t LogPosition);
-
- void SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; });
- void WriteIndexSnapshot(
- RwLock::ExclusiveLockScope&,
- const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; })
- {
- WriteIndexSnapshotLocked(ClaimDiskReserveFunc);
- }
- void WriteIndexSnapshot(
- RwLock::SharedLockScope&,
- const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; })
- {
- WriteIndexSnapshotLocked(ClaimDiskReserveFunc);
- }
- void WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; });
-
- void CompactState(RwLock::ExclusiveLockScope&,
- std::vector<BucketPayload>& Payloads,
- std::vector<AccessTime>& AccessTimes,
- std::vector<BucketMetaData>& MetaDatas,
- std::vector<MemCacheData>& MemCachedPayloads,
- std::vector<ReferenceIndex>& FirstReferenceIndex,
- IndexMap& Index,
- RwLock::ExclusiveLockScope& IndexLock);
-
- void AddMemCacheUsage(uint64_t ValueSize)
- {
- m_MemCachedSize.fetch_add(ValueSize, std::memory_order::relaxed);
- m_OuterCacheMemoryUsage.fetch_add(ValueSize, std::memory_order::relaxed);
- }
- void RemoveMemCacheUsage(uint64_t ValueSize)
- {
- m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed);
- m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed);
- }
- static inline uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize)
- {
- return sizeof(MemCacheData) + sizeof(IoBufferCore) + RoundUp(PayloadSize, 8u);
- }
-
- // These locks are here to avoid contention on file creation, therefore it's sufficient
- // that we take the same lock for the same hash
- //
- // These locks are small and should really be spaced out so they don't share cache lines,
- // but we don't currently access them at particularly high frequency so it should not be
- // an issue in practice
- mutable RwLock m_ShardedLocks[256];
- inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; }
-
- friend class DiskBucketReferenceChecker;
- friend class DiskBucketStoreCompactor;
- friend class BucketManifestSerializer;
- };
-
-private:
- CacheBucket* GetOrCreateBucket(std::string_view InBucket);
- // Starts a memory cache trim only when trimming is configured (non-zero target footprint,
- // max age and trim interval), the total cached size exceeds the target, no trim is flagged
- // as running and the current tick has reached m_NextAllowedTrimTick.
- inline void TryMemCacheTrim()
- {
-     const bool TrimmingConfigured = m_Configuration.MemCacheTargetFootprintBytes != 0 && m_Configuration.MemCacheMaxAgeSeconds != 0 &&
-                                     m_Configuration.MemCacheTrimIntervalSeconds != 0;
-     if (!TrimmingConfigured)
-     {
-         return;
-     }
-
-     if (m_TotalMemCachedSize <= m_Configuration.MemCacheTargetFootprintBytes || m_IsMemCacheTrimming)
-     {
-         return;
-     }
-
-     const GcClock::Tick CurrentTick = GcClock::TickCount();
-     if (CurrentTick >= m_NextAllowedTrimTick)
-     {
-         MemCacheTrim();
-     }
- }
- void MemCacheTrim();
- uint64_t MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
-
- GcManager& m_Gc;
- JobQueue& m_JobQueue;
- std::filesystem::path m_RootDir;
- Configuration m_Configuration;
- std::atomic_uint64_t m_TotalMemCachedSize{};
- std::atomic_bool m_IsMemCacheTrimming = false;
- std::atomic<GcClock::Tick> m_NextAllowedTrimTick;
- mutable RwLock m_Lock;
- std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
- std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
-
- ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
- ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
-
- friend class DiskBucketStoreCompactor;
- friend class DiskBucketReferenceChecker;
-};
-
-} // namespace zen
diff --git a/src/zenserver/cache/cacheshared.h b/src/zenserver/cache/cacheshared.h
deleted file mode 100644
index e3e8a2f84..000000000
--- a/src/zenserver/cache/cacheshared.h
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zenstore/gc.h>
-
-#include <gsl/gsl-lite.hpp>
-#include <unordered_map>
-
-namespace zen {
-
-namespace access_tracking {
-
- struct KeyAccessTime
- {
- IoHash Key;
- GcClock::Tick LastAccess{};
- };
-
- struct AccessTimes
- {
- std::unordered_map<std::string, std::vector<KeyAccessTime>> Buckets;
- };
-}; // namespace access_tracking
-
-struct ZenCacheValue
-{
- IoBuffer Value;
- uint64_t RawSize = 0;
- IoHash RawHash = IoHash::Zero;
-};
-
-struct CacheValueDetails
-{
- struct ValueDetails
- {
- uint64_t Size;
- uint64_t RawSize;
- IoHash RawHash;
- GcClock::Tick LastAccess{};
- std::vector<IoHash> Attachments;
- ZenContentType ContentType;
- };
-
- struct BucketDetails
- {
- std::unordered_map<IoHash, ValueDetails, IoHash::Hasher> Values;
- };
-
- struct NamespaceDetails
- {
- std::unordered_map<std::string, BucketDetails> Buckets;
- };
-
- std::unordered_map<std::string, NamespaceDetails> Namespaces;
-};
-
-bool IsKnownBadBucketName(std::string_view BucketName);
-
-//////////////////////////////////////////////////////////////////////////
-
-// This stores the access time as seconds since epoch internally in a 32-bit value, giving us a range of 136 years since epoch
-struct AccessTime
-{
-    explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {}
-
-    // Replaces the stored time with Tick converted to whole seconds
-    AccessTime& operator=(GcClock::Tick Tick) noexcept
-    {
-        const uint32_t Seconds = ToSeconds(Tick);
-        SecondsSinceEpoch.store(Seconds, std::memory_order_relaxed);
-        return *this;
-    }
-
-    // Converts the stored seconds back into a tick count
-    operator GcClock::Tick() const noexcept
-    {
-        const std::chrono::seconds Stored(SecondsSinceEpoch.load(std::memory_order_relaxed));
-        return std::chrono::duration_cast<GcClock::Duration>(Stored).count();
-    }
-
-    // Copy and move are spelled out by hand and all simply transfer the current value of the atomic
-    AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
-    AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
-    AccessTime& operator=(AccessTime&& Rhs) noexcept
-    {
-        const uint32_t Seconds = Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed);
-        SecondsSinceEpoch.store(Seconds, std::memory_order_relaxed);
-        return *this;
-    }
-    AccessTime& operator=(const AccessTime& Rhs) noexcept
-    {
-        const uint32_t Seconds = Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed);
-        SecondsSinceEpoch.store(Seconds, std::memory_order_relaxed);
-        return *this;
-    }
-
-private:
-    // Tick -> whole seconds, narrowed to 32 bits via gsl::narrow
-    static uint32_t ToSeconds(GcClock::Tick Tick)
-    {
-        const auto WholeSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count();
-        return gsl::narrow<uint32_t>(WholeSeconds);
-    }
-    std::atomic_uint32_t SecondsSinceEpoch;
-};
-
-} // namespace zen
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index f61fbd8bc..f9cd7ae13 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -19,10 +19,10 @@
#include <zenhttp/httpshared.h>
#include <zenhttp/httpstats.h>
#include <zenstore/gc.h>
+#include <zenstore/structuredcachestore.h>
#include <zenutil/cache/cache.h>
#include <zenutil/cache/rpcrecording.h>
-#include "structuredcachestore.h"
#include "upstream/jupiter.h"
#include "upstream/upstreamcache.h"
#include "upstream/zen.h"
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
deleted file mode 100644
index 9155e209c..000000000
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ /dev/null
@@ -1,2456 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "structuredcachestore.h"
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compress.h>
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/string.h>
-#include <zencore/thread.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
-#include <zennet/statsdclient.h>
-#include <zenstore/scrubcontext.h>
-#include <zenutil/cache/cache.h>
-
-#include <future>
-#include <limits>
-
-#if ZEN_PLATFORM_WINDOWS
-# include <zencore/windows.h>
-#endif
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <fmt/core.h>
-#include <xxhash.h>
-#include <gsl/gsl-lite.hpp>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#if ZEN_WITH_TESTS
-# include <zencore/jobqueue.h>
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-# include <zencore/workthreadpool.h>
-# include <zenstore/cidstore.h>
-# include <random>
-# include <unordered_map>
-#endif
-
-namespace zen {
-
-// Returns true when Bucket is exactly 32 characters long and ParseHexBytes accepts it as
-// 16 hex-encoded bytes; every other name yields false.
-bool
-IsKnownBadBucketName(std::string_view Bucket)
-{
-    if (Bucket.size() != 32)
-    {
-        return false;
-    }
-
-    uint8_t Scratch[16];
-    if (ParseHexBytes(Bucket, Scratch))
-    {
-        return true;
-    }
-    return false;
-}
-
-ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
-: m_Gc(Gc)
-, m_JobQueue(JobQueue)
-, m_RootDir(RootDir)
-, m_Configuration(Config)
-, m_DiskLayer(m_Gc, m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig) // built from the members initialized above
-{
- ZEN_INFO("initializing structured cache at '{}'", m_RootDir);
- CreateDirectories(m_RootDir); // make sure the namespace root exists before buckets are discovered
-
- m_DiskLayer.DiscoverBuckets();
-
- m_Gc.AddGcContributor(this); // register with the GC manager; the destructor removes both registrations
- m_Gc.AddGcStorage(this);
-}
-
-ZenCacheNamespace::~ZenCacheNamespace()
-{
- m_Gc.RemoveGcStorage(this); // unregister in the reverse order of the constructor's registrations
- m_Gc.RemoveGcContributor(this);
-}
-
-// Fetches HashKey from InBucket via the disk layer. On a hit the value size is recorded in
-// the request stats and the hit counter is bumped; otherwise the miss counter is bumped.
-// Returns whether the value was found.
-bool
-ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
-    ZEN_TRACE_CPU("Z$::Namespace::Get");
-
-    metrics::RequestStats::Scope GetScope(m_GetOps, 0);
-
-    const bool Found = m_DiskLayer.Get(InBucket, HashKey, OutValue);
-    if (!Found)
-    {
-        m_MissCount++;
-        return false;
-    }
-
-    // A hit is expected to always carry a non-empty payload
-    ZEN_ASSERT(OutValue.Value.Size());
-    GetScope.SetBytes(OutValue.Value.Size());
-
-    m_HitCount++;
-    return true;
-}
-
-// Stores Value (with its References) for HashKey in InBucket through the disk layer and
-// bumps the write counter. The value payload is asserted to be non-empty.
-void
-ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
-{
-    ZEN_TRACE_CPU("Z$::Namespace::Put");
-
-    // Request stats scope covering the whole put, sized by the payload
-    metrics::RequestStats::Scope PutScope(m_PutOps, Value.Value.Size());
-
-    ZEN_ASSERT(Value.Value.Size());
-
-    // Store value and index
-    m_DiskLayer.Put(InBucket, HashKey, Value, References);
-    m_WriteCount++;
-}
-
-// Drops Bucket via the disk layer, logging before and after. Returns the disk layer's result.
-bool
-ZenCacheNamespace::DropBucket(std::string_view Bucket)
-{
-    ZEN_INFO("dropping bucket '{}'", Bucket);
-
-    const bool  WasDropped = m_DiskLayer.DropBucket(Bucket);
-    const char* Outcome    = WasDropped ? "dropped" : "not found";
-
-    ZEN_INFO("bucket '{}' was {}", Bucket, Outcome);
-
-    return WasDropped;
-}
-
-void
-ZenCacheNamespace::EnumerateBucketContents(std::string_view Bucket,
- std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
-{
- m_DiskLayer.EnumerateBucketContents(Bucket, Fn); // pure forwarder: the disk layer invokes Fn
-}
-
-bool
-ZenCacheNamespace::Drop()
-{
- return m_DiskLayer.Drop(); // pure forwarder: returns the disk layer's result unchanged
-}
-
-void
-ZenCacheNamespace::Flush()
-{
- m_DiskLayer.Flush(); // pure forwarder to the disk layer
-}
-
-// Scrubs the disk layer once per scrub pass: a pass whose timestamp equals the one recorded
-// by the previous call is skipped.
-void
-ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
-{
-    if (m_LastScrubTime != Ctx.ScrubTimestamp())
-    {
-        ZEN_INFO("scrubbing '{}'", m_RootDir);
-
-        // Remember this pass before scrubbing so a repeated call with the same context is a no-op
-        m_LastScrubTime = Ctx.ScrubTimestamp();
-
-        m_DiskLayer.ScrubStorage(Ctx);
-    }
-}
-
-// Gathers references from the disk layer into GcCtx and emits a debug log line with the
-// elapsed time.
-void
-ZenCacheNamespace::GatherReferences(GcContext& GcCtx)
-{
-    ZEN_TRACE_CPU("Z$::ZenCacheNamespace::GatherReferences");
-
-    Stopwatch Elapsed;
-    // NOTE(review): MakeGuard presumably runs the lambda when the guard leaves scope - confirm
-    const auto LogOnExit = MakeGuard([&] {
-        ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Elapsed.GetElapsedTimeMs()));
-    });
-
-    m_DiskLayer.GatherReferences(GcCtx);
-}
-
-void
-ZenCacheNamespace::CollectGarbage(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::Namespace::CollectGarbage");
-
- m_DiskLayer.CollectGarbage(GcCtx); // pure forwarder to the disk layer
-}
-
-GcStorageSize
-ZenCacheNamespace::StorageSize() const
-{
- return m_DiskLayer.StorageSize(); // pure forwarder: returns the disk layer's size unchanged
-}
-
-// Assembles the namespace info from the root directory, the configuration and the disk
-// layer info, then fills BucketNames with the de-duplicated disk layer bucket names.
-ZenCacheNamespace::Info
-ZenCacheNamespace::GetInfo() const
-{
-    ZenCacheNamespace::Info Result = {.RootDir = m_RootDir, .Config = m_Configuration, .DiskLayerInfo = m_DiskLayer.GetInfo()};
-
-    // Funnel the names through a set so each one is reported once
-    std::unordered_set<std::string> UniqueNames;
-    for (const std::string& Name : Result.DiskLayerInfo.BucketNames)
-    {
-        UniqueNames.insert(Name);
-    }
-
-    Result.BucketNames.insert(Result.BucketNames.end(), UniqueNames.begin(), UniqueNames.end());
-    return Result;
-}
-
-// Wraps the disk layer's info for Bucket; yields an empty optional when the disk layer
-// has no info for that bucket.
-std::optional<ZenCacheNamespace::BucketInfo>
-ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const
-{
-    const std::optional<ZenCacheDiskLayer::BucketInfo> FromDisk = m_DiskLayer.GetBucketInfo(Bucket);
-    if (FromDisk.has_value())
-    {
-        ZenCacheNamespace::BucketInfo Result = {.DiskLayerInfo = *FromDisk};
-        return Result;
-    }
-    return {};
-}
-
-ZenCacheNamespace::NamespaceStats
-ZenCacheNamespace::Stats()
-{
- return ZenCacheNamespace::NamespaceStats{.HitCount = m_HitCount, // counters are read individually, one after another
- .MissCount = m_MissCount,
- .WriteCount = m_WriteCount,
- .PutOps = m_PutOps.Snapshot(),
- .GetOps = m_GetOps.Snapshot(),
- .DiskStats = m_DiskLayer.Stats()}; // per-bucket stats come from the disk layer
-}
-
-CacheValueDetails::NamespaceDetails
-ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
-{
- return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter); // pure forwarder: both filters are passed through untouched
-}
-
-#if ZEN_WITH_TESTS
-void
-ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time)
-{
- m_DiskLayer.SetAccessTime(Bucket, HashKey, Time); // test-build only (ZEN_WITH_TESTS): forwards to the disk layer
-}
-#endif // ZEN_WITH_TESTS
-
-//////////////////////////// ZenCacheStore
-
-ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$");
-
-static constinit std::string_view UE4DDCNamespaceName = "ue4.ddc";
-
-// Opens the cache store rooted at BasePath: applies the logging configuration, scans the
-// base directory for sub-directories carrying the namespace prefix, makes sure the
-// "ue4.ddc" namespace exists and finally instantiates one ZenCacheNamespace per name.
-ZenCacheStore::ZenCacheStore(GcManager&                   Gc,
-                             JobQueue&                    JobQueue,
-                             const std::filesystem::path& BasePath,
-                             const Configuration&         Configuration,
-                             const DiskWriteBlocker*      InDiskWriteBlocker)
-: m_DiskWriteBlocker(InDiskWriteBlocker)
-, m_Gc(Gc)
-, m_JobQueue(JobQueue)
-, m_BasePath(BasePath)
-, m_Configuration(Configuration)
-, m_ExitLogging(false)
-{
-    SetLoggingConfig(m_Configuration.Logging);
-    CreateDirectories(m_BasePath);
-
-    ZEN_INFO("initializing cache store at '{}'", m_BasePath);
-
-    DirectoryContent Listing;
-    GetDirectoryContent(m_BasePath, DirectoryContent::IncludeDirsFlag, Listing);
-
-    // Every directory named <prefix><name> contributes the namespace <name>
-    std::vector<std::string> FoundNamespaces;
-    for (const std::filesystem::path& Dir : Listing.Directories)
-    {
-        const std::string LeafName = PathToUtf8(Dir.filename());
-        if (!LeafName.starts_with(NamespaceDiskPrefix))
-        {
-            continue;
-        }
-        FoundNamespaces.push_back(LeafName.substr(NamespaceDiskPrefix.length()));
-    }
-
-    ZEN_INFO("Found {} namespaces in '{}'", FoundNamespaces.size(), m_BasePath);
-
-    const bool HasDefaultNamespace =
-        std::find(FoundNamespaces.begin(), FoundNamespaces.end(), UE4DDCNamespaceName) != FoundNamespaces.end();
-    if (!HasDefaultNamespace)
-    {
-        // default (unspecified) and ue4-ddc namespace points to the same namespace instance
-
-        const std::filesystem::path DefaultFolder = m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
-        CreateDirectories(DefaultFolder);
-        FoundNamespaces.push_back(std::string(UE4DDCNamespaceName));
-    }
-
-    for (const std::string& Name : FoundNamespaces)
-    {
-        const std::filesystem::path NamespaceDir = m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Name);
-        m_Namespaces[Name] = std::make_unique<ZenCacheNamespace>(Gc, m_JobQueue, NamespaceDir, m_Configuration.NamespaceConfig);
-    }
-}
-
-ZenCacheStore::~ZenCacheStore()
-{
- ZEN_INFO("closing cache store at '{}'", m_BasePath);
- SetLoggingConfig({.EnableWriteLog = false, .EnableAccessLog = false}); // switch both logs off before the namespaces go away
- m_Namespaces.clear(); // destroys every live namespace instance
-}
-
-// Thread body that drains m_LogQueue and writes one cache activity log line per item.
-//
-// Each round swaps the shared queue into a local batch under the queue lock, writes the
-// batch (unless the disk write blocker currently forbids writes, in which case the batch
-// is discarded), and then either loops immediately if the batch was non-empty, exits if
-// m_ExitLogging is set, or waits on m_LogEvent for more work.
-void
-ZenCacheStore::LogWorker()
-{
-    SetCurrentThreadName("ZenCacheStore::LogWorker");
-
-    LoggerRef ZCacheLog(logging::Get("z$"));
-
-    auto Log = [&ZCacheLog]() -> LoggerRef { return ZCacheLog; };
-
-    std::vector<AccessLogItem> Items;
-    while (true)
-    {
-        try
-        {
-            m_LogQueueLock.WithExclusiveLock([this, &Items]() { Items.swap(m_LogQueue); });
-            if (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed())
-            {
-                for (const auto& Item : Items)
-                {
-                    if (Item.Value.Value)
-                    {
-                        const bool IsCbObject = Item.Value.Value.GetContentType() == ZenContentType::kCbObject;
-
-                        try
-                        {
-                            // Compact binary objects are hashed/sized from the payload itself,
-                            // everything else reports the raw hash/size carried by the value
-                            const IoHash ObjectHash = IsCbObject ? IoHash::HashBuffer(Item.Value.Value.GetView()) : Item.Value.RawHash;
-                            const size_t ObjectSize = IsCbObject ? Item.Value.Value.GetSize() : Item.Value.RawSize;
-
-                            ZEN_LOG_INFO(LogCacheActivity,
-                                         "{} [{}] {}/{}/{} -> {} {} {}",
-                                         Item.Op,
-                                         Item.Context,
-                                         Item.Namespace,
-                                         Item.Bucket,
-                                         Item.HashKey,
-                                         ObjectHash,
-                                         ObjectSize,
-                                         ToString(Item.Value.Value.GetContentType()));
-                        }
-                        catch (std::exception& Ex)
-                        {
-                            ZEN_LOG_INFO(LogCacheActivity,
-                                         "{} [{}] {}/{}/{} failed: Reason: '{}'",
-                                         Item.Op,
-                                         Item.Context,
-                                         Item.Namespace,
-                                         Item.Bucket,
-                                         Item.HashKey,
-                                         Ex.what());
-                        }
-                    }
-                    else
-                    {
-                        ZEN_LOG_INFO(LogCacheActivity,
-                                     "{} [{}] {}/{}/{}",
-                                     Item.Op,
-                                     Item.Context,
-                                     Item.Namespace,
-                                     Item.Bucket,
-                                     Item.HashKey);
-                    }
-                }
-            }
-            if (!Items.empty())
-            {
-                // There was work this round, so check the queue again before sleeping
-                Items.resize(0);
-                continue;
-            }
-            if (m_ExitLogging)
-            {
-                break;
-            }
-            m_LogEvent.Wait();
-            m_LogEvent.Reset();
-        }
-        catch (std::exception& Ex)
-        {
-            ZEN_WARN("Log writer failed: '{}'", Ex.what());
-
-            // Discard the failed batch. Left in place, the swap at the top of the loop would
-            // push these already-processed items back into m_LogQueue and they would be
-            // logged again (out of order) on a later round.
-            Items.clear();
-        }
-    }
-}
-
-// Looks up HashKey in Bucket within Namespace and fills OutValue on a hit.
-//
-// Bucket names matching IsKnownBadBucketName are rejected up front and counted in
-// m_RejectedReadCount. When access logging is enabled a "GET HIT "/"GET MISS" item is
-// queued for the log worker, which is signalled only if the queue was empty beforehand.
-// Returns true on a hit. An unknown namespace is logged as a warning and counted as a miss.
-bool
-ZenCacheStore::Get(const CacheRequestContext& Context,
-                   std::string_view           Namespace,
-                   std::string_view           Bucket,
-                   const IoHash&              HashKey,
-                   ZenCacheValue&             OutValue)
-{
-    // Ad hoc rejection of known bad usage patterns for DDC bucket names
-
-    if (IsKnownBadBucketName(Bucket))
-    {
-        m_RejectedReadCount++;
-        return false;
-    }
-
-    ZEN_TRACE_CPU("Z$::Get");
-
-    metrics::RequestStats::Scope OpScope(m_GetOps, 0);
-
-    if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
-    {
-        bool Result = Store->Get(Bucket, HashKey, OutValue);
-
-        if (m_AccessLogEnabled)
-        {
-            ZEN_TRACE_CPU("Z$::Get::AccessLog");
-            bool Signal = false;
-            m_LogQueueLock.WithExclusiveLock([&]() {
-                // Wake the worker only on the empty -> non-empty transition
-                Signal = m_LogQueue.empty();
-                m_LogQueue.emplace_back(AccessLogItem{.Op        = Result ? "GET HIT " : "GET MISS",
-                                                      .Context   = Context,
-                                                      .Namespace = std::string(Namespace),
-                                                      .Bucket    = std::string(Bucket),
-                                                      .HashKey   = HashKey,
-                                                      .Value     = OutValue /*,
-                                                      .Result    = Result*/});
-            });
-            if (Signal)
-            {
-                m_LogEvent.Set();
-            }
-        }
-        if (Result)
-        {
-            m_HitCount++;
-            OpScope.SetBytes(OutValue.Value.GetSize());
-            return true;
-        }
-
-        m_MissCount++;
-        return false;
-    }
-
-    // Argument order follows the placeholders: namespace first, then the request context
-    ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get [{}], bucket '{}', key '{}'",
-             Namespace,
-             Context,
-             Bucket,
-             HashKey.ToHexString());
-
-    m_MissCount++;
-    return false;
-}
-
-// Stores Value (with its References) for HashKey in Bucket within Namespace.
-//
-// Bucket names matching IsKnownBadBucketName are rejected up front and counted in
-// m_RejectedWriteCount. When write logging is enabled a "PUT     " item is queued for the
-// log worker before the namespace lookup, so puts to unknown namespaces are logged too.
-// An unknown namespace results in a warning and nothing is stored.
-void
-ZenCacheStore::Put(const CacheRequestContext& Context,
-                   std::string_view           Namespace,
-                   std::string_view           Bucket,
-                   const IoHash&              HashKey,
-                   const ZenCacheValue&       Value,
-                   std::span<IoHash>          References)
-{
-    // Ad hoc rejection of known bad usage patterns for DDC bucket names
-
-    if (IsKnownBadBucketName(Bucket))
-    {
-        m_RejectedWriteCount++;
-        return;
-    }
-
-    ZEN_TRACE_CPU("Z$::Put");
-
-    metrics::RequestStats::Scope $(m_PutOps, Value.Value.GetSize());
-
-    if (m_WriteLogEnabled)
-    {
-        ZEN_TRACE_CPU("Z$::Put::WriteLog");
-        bool Signal = false;
-        m_LogQueueLock.WithExclusiveLock([&]() {
-            // Wake the worker only on the empty -> non-empty transition
-            Signal = m_LogQueue.empty();
-            m_LogQueue.emplace_back(AccessLogItem{.Op        = "PUT     ",
-                                                  .Context   = Context,
-                                                  .Namespace = std::string(Namespace),
-                                                  .Bucket    = std::string(Bucket),
-                                                  .HashKey   = HashKey,
-                                                  .Value     = Value /*,
-                                                  .Result    = true*/});
-        });
-        if (Signal)
-        {
-            m_LogEvent.Set();
-        }
-    }
-
-    if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
-    {
-        Store->Put(Bucket, HashKey, Value, References);
-        m_WriteCount++;
-        return;
-    }
-
-    // Argument order follows the placeholders: namespace first, then the request context
-    ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'",
-             Namespace,
-             Context,
-             Bucket,
-             HashKey.ToHexString());
-}
-
-// Drops Bucket from Namespace. Returns the result of the namespace-level drop,
-// or false (after logging a warning) when GetNamespace() yields no store.
-bool
-ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket)
-{
-    ZenCacheNamespace* const Target = GetNamespace(Namespace);
-    if (Target == nullptr)
-    {
-        ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket);
-        return false;
-    }
-    return Target->DropBucket(Bucket);
-}
-
-// Removes InNamespace from the set of live namespaces and drops its contents.
-//
-// Ownership of the namespace object is moved to m_DroppedNamespaces, so the
-// object itself stays alive after it has been unregistered. Returns the result
-// of ZenCacheNamespace::Drop(), or false (after logging a warning) when the
-// namespace is not registered.
-bool
-ZenCacheStore::DropNamespace(std::string_view InNamespace)
-{
-    // Both m_Namespaces and m_DroppedNamespaces are mutated below, so the lock
-    // must be exclusive; a shared lock would let readers and other droppers run
-    // concurrently with the erase/push_back.
-    RwLock::ExclusiveLockScope _(m_NamespacesLock);
-    if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end())
-    {
-        // Take the reference before the unique_ptr is moved out of the map entry
-        ZenCacheNamespace& Namespace = *It->second;
-        m_DroppedNamespaces.push_back(std::move(It->second));
-        m_Namespaces.erase(It);
-        return Namespace.Drop();
-    }
-    ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace);
-    return false;
-}
-
-// Logs the flush and then flushes every namespace visited by IterateNamespaces().
-void
-ZenCacheStore::Flush()
-{
-    ZEN_INFO("flushing cache store at '{}'", m_BasePath);
-
-    const auto FlushOne = [](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); };
-    IterateNamespaces(FlushOne);
-}
-
-// Forwards the scrub request, with the shared context, to every namespace
-// visited by IterateNamespaces().
-void
-ZenCacheStore::ScrubStorage(ScrubContext& Ctx)
-{
-    const auto ScrubOne = [&Ctx](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); };
-    IterateNamespaces(ScrubOne);
-}
-
-// Collects per-namespace value details.
-//
-// An empty NamespaceFilter gathers details from every namespace visited by
-// IterateNamespaces(); otherwise only the namespace found by FindNamespace()
-// contributes (none if it is not found). BucketFilter and ValueFilter are
-// passed through unchanged to the namespace.
-CacheValueDetails
-ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter,
-                               const std::string_view BucketFilter,
-                               const std::string_view ValueFilter) const
-{
-    CacheValueDetails Details;
-
-    if (!NamespaceFilter.empty())
-    {
-        const ZenCacheNamespace* Store = FindNamespace(NamespaceFilter);
-        if (Store != nullptr)
-        {
-            Details.Namespaces[std::string(NamespaceFilter)] = Store->GetValueDetails(BucketFilter, ValueFilter);
-        }
-        return Details;
-    }
-
-    IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace& Store) {
-        Details.Namespaces[std::string(Namespace)] = Store.GetValueDetails(BucketFilter, ValueFilter);
-    });
-    return Details;
-}
-
-// Invokes Fn through ZenCacheNamespace::EnumerateBucketContents() for Bucket in
-// Namespace. Does nothing when FindNamespace() does not find the namespace.
-void
-ZenCacheStore::EnumerateBucketContents(std::string_view Namespace,
-                                       std::string_view Bucket,
-                                       std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn)
-{
-    const ZenCacheNamespace* const Target = FindNamespace(Namespace);
-    if (Target == nullptr)
-    {
-        return;
-    }
-    Target->EnumerateBucketContents(Bucket, Fn);
-}
-
-// Resolves Namespace to its store, creating the namespace on demand when
-// m_Configuration.AllowAutomaticCreationOfNamespaces is set.
-//
-// A request for DefaultNamespace that has no entry of its own falls back to the
-// entry registered as UE4DDCNamespaceName. Returns nullptr when the namespace is
-// not registered and automatic creation is disabled.
-ZenCacheNamespace*
-ZenCacheStore::GetNamespace(std::string_view Namespace)
-{
- // First attempt: lookup under the shared lock
- RwLock::SharedLockScope _(m_NamespacesLock);
- if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
- {
- return It->second.get();
- }
- if (Namespace == DefaultNamespace)
- {
- if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
- {
- return It->second.get();
- }
- }
- // The shared lock is released before the exclusive lock is taken further down
- _.ReleaseNow();
-
- if (!m_Configuration.AllowAutomaticCreationOfNamespaces)
- {
- return nullptr;
- }
-
- RwLock::ExclusiveLockScope __(m_NamespacesLock);
- // Re-check: the entry may have been inserted while no lock was held
- if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
- {
- return It->second.get();
- }
-
- // The new namespace is rooted at m_BasePath / (NamespaceDiskPrefix + Namespace)
- auto NewNamespace =
- m_Namespaces.insert_or_assign(std::string(Namespace),
- std::make_unique<ZenCacheNamespace>(m_Gc,
- m_JobQueue,
- m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace),
- m_Configuration.NamespaceConfig));
- return NewNamespace.first->second.get();
-}
-
-// Read-only namespace lookup; never creates a namespace.
-//
-// A request for DefaultNamespace that has no entry of its own falls back to the
-// entry registered as UE4DDCNamespaceName. Returns nullptr when nothing matches.
-const ZenCacheNamespace*
-ZenCacheStore::FindNamespace(std::string_view Namespace) const
-{
-    RwLock::SharedLockScope _(m_NamespacesLock);
-
-    auto Found = m_Namespaces.find(std::string(Namespace));
-    if (Found == m_Namespaces.end() && Namespace == DefaultNamespace)
-    {
-        Found = m_Namespaces.find(std::string(UE4DDCNamespaceName));
-    }
-
-    if (Found == m_Namespaces.end())
-    {
-        return nullptr;
-    }
-    return Found->second.get();
-}
-
-// Returns the names of all namespaces visited by IterateNamespaces().
-std::vector<std::string>
-ZenCacheStore::GetNamespaces()
-{
-    std::vector<std::string> Names;
-
-    const auto Collect = [&Names](std::string_view Name, ZenCacheNamespace&) { Names.push_back(std::string(Name)); };
-    IterateNamespaces(Collect);
-
-    return Names;
-}
-
-// Calls Callback once per registered namespace, skipping any entry whose key
-// equals DefaultNamespace.
-//
-// The (name, store) pairs are gathered under the shared lock and the callbacks
-// are invoked after that lock scope has ended.
-void
-ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const
-{
-    using NamespaceRef = std::pair<std::string, ZenCacheNamespace&>;
-
-    std::vector<NamespaceRef> Snapshot;
-    {
-        RwLock::SharedLockScope _(m_NamespacesLock);
-        Snapshot.reserve(m_Namespaces.size());
-        for (const auto& Entry : m_Namespaces)
-        {
-            const bool IsDefaultAlias = (Entry.first == DefaultNamespace);
-            if (!IsDefaultAlias)
-            {
-                Snapshot.push_back({Entry.first, *Entry.second});
-            }
-        }
-    }
-
-    for (NamespaceRef& Item : Snapshot)
-    {
-        Callback(Item.first, Item.second);
-    }
-}
-
-// Sums the memory and disk sizes reported by every namespace visited by
-// IterateNamespaces().
-GcStorageSize
-ZenCacheStore::StorageSize() const
-{
-    GcStorageSize Total;
-
-    const auto Accumulate = [&Total](std::string_view, ZenCacheNamespace& Store) {
-        const GcStorageSize Part = Store.StorageSize();
-        Total.MemorySize += Part.MemorySize;
-        Total.DiskSize += Part.DiskSize;
-    };
-    IterateNamespaces(Accumulate);
-
-    return Total;
-}
-
-// Captures the store-level counters and request statistics. When
-// IncludeNamespaceStats is true, one NamedNamespaceStats entry is appended per
-// namespace visited by IterateNamespaces().
-ZenCacheStore::CacheStoreStats
-ZenCacheStore::Stats(bool IncludeNamespaceStats)
-{
-    ZenCacheStore::CacheStoreStats Result{.HitCount = m_HitCount,
-                                          .MissCount = m_MissCount,
-                                          .WriteCount = m_WriteCount,
-                                          .RejectedWriteCount = m_RejectedWriteCount,
-                                          .RejectedReadCount = m_RejectedReadCount,
-                                          .PutOps = m_PutOps.Snapshot(),
-                                          .GetOps = m_GetOps.Snapshot()};
-
-    if (!IncludeNamespaceStats)
-    {
-        return Result;
-    }
-
-    IterateNamespaces([&Result](std::string_view NamespaceName, ZenCacheNamespace& Store) {
-        Result.NamespaceStats.emplace_back(NamedNamespaceStats{.NamespaceName = std::string(NamespaceName), .Stats = Store.Stats()});
-    });
-
-    return Result;
-}
-
-// Reports the change in hit, miss and write counts since the previous call as
-// meters, then remembers the current counters for the next call.
-void
-ZenCacheStore::ReportMetrics(StatsMetrics& Statsd)
-{
-    const CacheStoreStats Current = Stats(/* IncludeNamespaceStats */ false);
-
-    const auto HitDelta = Current.HitCount - m_LastReportedMetrics.HitCount;
-    const auto MissDelta = Current.MissCount - m_LastReportedMetrics.MissCount;
-    const auto WriteDelta = Current.WriteCount - m_LastReportedMetrics.WriteCount;
-
-    Statsd.Meter("zen.cache_hits", HitDelta);
-    Statsd.Meter("zen.cache_misses", MissDelta);
-    Statsd.Meter("zen.cache_writes", WriteDelta);
-
-    m_LastReportedMetrics = Current;
-}
-
-// Applies a new access/write logging configuration.
-//
-// With both logs disabled the flags are cleared, the worker is told to exit via
-// m_ExitLogging and m_LogEvent, and the logging thread is joined if joinable.
-// Otherwise a logging thread running LogWorker() is started when neither log was
-// enabled before, and the enable flags are updated to the requested values.
-void
-ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig)
-{
-    if (!Loggingconfig.EnableAccessLog && !Loggingconfig.EnableWriteLog)
-    {
-        m_AccessLogEnabled.store(false);
-        m_WriteLogEnabled.store(false);
-        m_ExitLogging.store(true);
-        m_LogEvent.Set();
-        if (m_AsyncLoggingThread.joinable())
-        {
-            m_AsyncLoggingThread.join();
-        }
-        m_Configuration.Logging = Loggingconfig;
-        return;
-    }
-    if (!m_AccessLogEnabled.load() && !m_WriteLogEnabled.load())
-    {
-        // The disable path above leaves m_ExitLogging set; clear it before starting a
-        // new worker so that re-enabling logging does not hand the worker a stale exit
-        // request. NOTE(review): LogWorker() is not visible here - presumably it polls
-        // m_ExitLogging; confirm.
-        m_ExitLogging.store(false);
-        m_AsyncLoggingThread = std::thread(&ZenCacheStore::LogWorker, this);
-    }
-    m_WriteLogEnabled.store(Loggingconfig.EnableWriteLog);
-    m_AccessLogEnabled.store(Loggingconfig.EnableAccessLog);
-    m_Configuration.Logging = Loggingconfig;
-}
-
-// Builds a summary of the store: its configuration, total storage size, the
-// names of the namespaces visited by IterateNamespaces() and the sum of their
-// disk layer entry counts.
-ZenCacheStore::Info
-ZenCacheStore::GetInfo() const
-{
-    ZenCacheStore::Info Summary = {.Config = m_Configuration, .StorageSize = StorageSize()};
-
-    const auto AddNamespace = [&Summary](std::string_view NamespaceName, ZenCacheNamespace& Namespace) {
-        Summary.NamespaceNames.push_back(std::string(NamespaceName));
-        const ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo();
-        Summary.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount;
-    };
-    IterateNamespaces(AddNamespace);
-
-    return Summary;
-}
-
-// Returns the info of the namespace found by FindNamespace(), or an empty
-// optional when there is no such namespace.
-std::optional<ZenCacheNamespace::Info>
-ZenCacheStore::GetNamespaceInfo(std::string_view NamespaceName)
-{
-    const ZenCacheNamespace* const Namespace = FindNamespace(NamespaceName);
-    if (Namespace == nullptr)
-    {
-        return {};
-    }
-    return Namespace->GetInfo();
-}
-
-// Returns the namespace's answer for BucketName, or an empty optional when
-// FindNamespace() does not find NamespaceName.
-std::optional<ZenCacheNamespace::BucketInfo>
-ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view BucketName)
-{
-    const ZenCacheNamespace* const Namespace = FindNamespace(NamespaceName);
-    if (Namespace == nullptr)
-    {
-        return {};
-    }
-    return Namespace->GetBucketInfo(BucketName);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-
-using namespace std::literals;
-
-// Helpers shared by the cache store tests below.
-namespace testutils {
-    // Hashes the raw bytes of KeyValue into a cache key.
-    IoHash CreateKey(size_t KeyValue)
-    {
-        return IoHash::HashBuffer(&KeyValue, sizeof(size_t));
-    }
-
-    // Hashes the first 24 characters of the Oid's string form.
-    IoHash ToIoHash(const Oid& Id)
-    {
-        char OIdString[24 + 1];
-        Id.ToString(OIdString);
-        return IoHash::HashBuffer(OIdString, 24);
-    }
-
-    // Pairs a fresh Oid with a random blob of Size bytes.
-    std::pair<Oid, IoBuffer> CreateBinaryBlob(size_t Size)
-    {
-        return {Oid::NewOid(), CreateRandomBlob(Size)};
-    }
-
-    // Creates one compressed random blob per entry in Sizes and checks that
-    // Store does not already contain a chunk with the blob's raw hash.
-    std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, const std::span<const size_t>& Sizes)
-    {
-        std::vector<std::pair<Oid, CompressedBuffer>> Attachments;
-        Attachments.reserve(Sizes.size());
-        for (size_t Size : Sizes)
-        {
-            auto Blob = CreateBinaryBlob(Size);
-            const SharedBuffer RawView = SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size());
-            CompressedBuffer Compressed = CompressedBuffer::Compress(RawView);
-            CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash()));
-            Attachments.emplace_back(std::pair<Oid, CompressedBuffer>(Blob.first, Compressed));
-        }
-        return Attachments;
-    }
-
-    // Builds a compact binary record with a fresh "Key" Oid and one binary
-    // attachment field ("attachment-<index>") per entry in Attachments.
-    // Returns the record's key hash together with the serialized buffer.
-    std::pair<IoHash, IoBuffer> CreateRecord(std::span<std::pair<Oid, CompressedBuffer>> Attachments)
-    {
-        Oid Id = Oid::NewOid();
-        IoHash Key = ToIoHash(Id);
-
-        CbObjectWriter Record;
-        Record << "Key"sv << Id;
-
-        size_t Idx = 0;
-        for (auto& Cid : Attachments)
-        {
-            Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid.second.DecodeRawHash());
-        }
-
-        IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
-        Buffer.SetContentType(ZenContentType::kCbObject);
-        return {Key, Buffer};
-    }
-
-    // Tag types used as TEST_CASE_TEMPLATE parameters to toggle a feature.
-    struct FalseType
-    {
-        static const bool Enabled = false;
-    };
-    struct TrueType
-    {
-        static const bool Enabled = true;
-    };
-
-}  // namespace testutils
-
-// Round trip: stores kIterationCount compact binary objects in one bucket and
-// reads each of them back, validating content type, structure and payload.
-TEST_CASE_TEMPLATE("z$.store", ReferenceCaching, testutils::FalseType, testutils::TrueType)
-{
-    ScopedTemporaryDirectory TempDir;
-
-    GcManager Gc;
-
-    auto JobQueue = MakeJobQueue(1, "testqueue");
-    ZenCacheNamespace Zcs(Gc,
-                          *JobQueue,
-                          TempDir.Path() / "cache",
-                          {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
-
-    const int kIterationCount = 100;
-
-    // Write phase: the key is the hash of the loop index, the payload is {"hey": index}
-    for (int Index = 0; Index < kIterationCount; ++Index)
-    {
-        const IoHash Key = IoHash::HashBuffer(&Index, sizeof Index);
-
-        CbObjectWriter Writer;
-        Writer << "hey" << Index;
-        CbObject Obj = Writer.Save();
-
-        ZenCacheValue Stored;
-        Stored.Value = Obj.GetBuffer().AsIoBuffer();
-        Stored.Value.SetContentType(ZenContentType::kCbObject);
-
-        Zcs.Put("test_bucket"sv, Key, Stored, {});
-    }
-
-    // Read phase: every key must resolve to the object written for it
-    for (int Index = 0; Index < kIterationCount; ++Index)
-    {
-        const IoHash Key = IoHash::HashBuffer(&Index, sizeof Index);
-
-        ZenCacheValue Loaded;
-        Zcs.Get("test_bucket"sv, Key, /* out */ Loaded);
-
-        REQUIRE(Loaded.Value);
-        CHECK(Loaded.Value.GetContentType() == ZenContentType::kCbObject);
-        CHECK_EQ(ValidateCompactBinary(Loaded.Value, CbValidateMode::All), CbValidateError::None);
-        CbObject Obj = LoadCompactBinaryObject(Loaded.Value);
-        CHECK_EQ(Obj["hey"].AsInt32(), Index);
-    }
-}
-
-// Verifies the sizes reported by ZenCacheNamespace::StorageSize().
-//
-// "mem/disklayer" stores values 256 bytes below MemCacheSizeThreshold and
-// expects memory usage to be reported only after the values have been read.
-// "disklayer" stores values 64 bytes above the threshold and expects no memory
-// usage. Both subcases then reopen the cache, check the size seen after the
-// reopen, and drop all buckets.
-TEST_CASE_TEMPLATE("z$.size", ReferenceCaching, testutils::FalseType, testutils::TrueType)
-{
-    auto JobQueue = MakeJobQueue(1, "testqueue");
-
-    // Builds an object with a single zero-filled binary field of Size bytes
-    const auto CreateCacheValue = [](size_t Size) -> CbObject {
-        std::vector<uint8_t> Buf;
-        Buf.resize(Size);
-
-        CbObjectWriter Writer;
-        Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
-        return Writer.Save();
-    };
-
-    SUBCASE("mem/disklayer")
-    {
-        const size_t Count = 16;
-        ScopedTemporaryDirectory TempDir;
-
-        GcStorageSize CacheSize;
-
-        {
-            GcManager Gc;
-            ZenCacheNamespace Zcs(Gc,
-                                  *JobQueue,
-                                  TempDir.Path() / "cache",
-                                  {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
-
-            CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold - 256);
-
-            IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
-            Buffer.SetContentType(ZenContentType::kCbObject);
-
-            std::vector<std::pair<std::string, IoHash>> Keys;
-
-            for (size_t Key = 0; Key < Count; ++Key)
-            {
-                const size_t Bucket = Key % 4;
-                std::string BucketName = fmt::format("test_bucket-{}", Bucket);
-                // Hash the whole key object; hashing only sizeof(uint32_t) bytes of a
-                // size_t depends on byte order and could make every key collide
-                IoHash Hash = IoHash::HashBuffer(&Key, sizeof(Key));
-                Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {});
-                Keys.push_back({BucketName, Hash});
-            }
-            CacheSize = Zcs.StorageSize();
-            CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
-            CHECK_EQ(0, CacheSize.MemorySize);
-
-            // Read every value back; the checks below expect memory usage afterwards
-            for (const auto& Key : Keys)
-            {
-                ZenCacheValue _;
-                Zcs.Get(Key.first, Key.second, _);
-            }
-
-            CacheSize = Zcs.StorageSize();
-            CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
-            CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
-        }
-
-        {
-            GcManager Gc;
-            ZenCacheNamespace Zcs(Gc,
-                                  *JobQueue,
-                                  TempDir.Path() / "cache",
-                                  {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
-
-            const GcStorageSize SerializedSize = Zcs.StorageSize();
-            CHECK_EQ(SerializedSize.MemorySize, 0);
-            CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
-
-            for (size_t Bucket = 0; Bucket < 4; ++Bucket)
-            {
-                Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
-            }
-            CHECK_EQ(0, Zcs.StorageSize().DiskSize);
-            CHECK_EQ(0, Zcs.StorageSize().MemorySize);
-        }
-    }
-
-    SUBCASE("disklayer")
-    {
-        const size_t Count = 16;
-        ScopedTemporaryDirectory TempDir;
-
-        GcStorageSize CacheSize;
-
-        {
-            GcManager Gc;
-            ZenCacheNamespace Zcs(Gc,
-                                  *JobQueue,
-                                  TempDir.Path() / "cache",
-                                  {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
-
-            CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold + 64);
-
-            IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
-            Buffer.SetContentType(ZenContentType::kCbObject);
-
-            for (size_t Key = 0; Key < Count; ++Key)
-            {
-                const size_t Bucket = Key % 4;
-                // See above: hash the full width of Key
-                Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(Key)), {.Value = Buffer}, {});
-            }
-
-            CacheSize = Zcs.StorageSize();
-            CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
-            CHECK_EQ(0, CacheSize.MemorySize);
-        }
-
-        {
-            GcManager Gc;
-            ZenCacheNamespace Zcs(Gc,
-                                  *JobQueue,
-                                  TempDir.Path() / "cache",
-                                  {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
-
-            const GcStorageSize SerializedSize = Zcs.StorageSize();
-            CHECK_EQ(SerializedSize.MemorySize, 0);
-            CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
-
-            for (size_t Bucket = 0; Bucket < 4; ++Bucket)
-            {
-                Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
-            }
-            CHECK_EQ(0, Zcs.StorageSize().DiskSize);
-        }
-    }
-}
-
-// Garbage collection behaviour of ZenCacheNamespace, in three subcases:
-// - references of expired cache records are not retained
-// - expired standalone (128 KiB) values are removed
-// - expired small (128 byte) values are removed when CollectSmallObjects is set
-// Expiry is simulated by passing GcContext time points relative to GcClock::Now().
-TEST_CASE_TEMPLATE("z$.gc", ReferenceCaching, testutils::FalseType, testutils::TrueType)
-{
- using namespace testutils;
-
- auto JobQueue = MakeJobQueue(1, "testqueue");
-
- SUBCASE("gather references does NOT add references for expired cache entries")
- {
- ScopedTemporaryDirectory TempDir;
- std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)};
-
- // Runs a GC pass with both GcContext time points set to (Time - MaxDuration) and
- // stores in OutKeep whichever of Cids the context's FilterCids() passes to the callback
- const auto CollectAndFilter = [](GcManager& Gc,
- GcClock::TimePoint Time,
- GcClock::Duration MaxDuration,
- std::span<const IoHash> Cids,
- std::vector<IoHash>& OutKeep) {
- GcContext GcCtx(Time - MaxDuration, Time - MaxDuration);
- Gc.CollectGarbage(GcCtx);
- OutKeep.clear();
- GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); });
- };
-
- {
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- const auto Bucket = "teardrinker"sv;
-
- // Create a cache record
- const IoHash Key = CreateKey(42);
- CbObjectWriter Record;
- Record << "Key"sv
- << "SomeRecord"sv;
-
- for (size_t Idx = 0; auto& Cid : Cids)
- {
- Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid);
- }
-
- IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- Zcs.Put(Bucket, Key, {.Value = Buffer}, Cids);
-
- std::vector<IoHash> Keep;
-
- // Collect garbage with 1 hour max cache duration
- {
- CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(Cids.size(), Keep.size());
- }
-
- // Move forward in time
- {
- CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(0, Keep.size());
- }
- }
-
- // Expect timestamps to be serialized
- {
- // Second cache instance opened on the same directory as the one above
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- std::vector<IoHash> Keep;
-
- // Collect garbage with 1 hour max cache duration
- {
- CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(3, Keep.size());
- }
-
- // Move forward in time
- {
- CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(0, Keep.size());
- }
- }
- }
-
- SUBCASE("gc removes standalone values")
- {
- ScopedTemporaryDirectory TempDir;
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- const auto Bucket = "fortysixandtwo"sv;
- const GcClock::TimePoint CurrentTime = GcClock::Now();
-
- std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
-
- for (const auto& Key : Keys)
- {
- IoBuffer Value = CreateRandomBlob(128 << 10);
- Zcs.Put(Bucket, Key, {.Value = Value}, {});
- }
-
- // Time points 46 hours in the past: the values just written must survive
- {
- GcContext GcCtx(CurrentTime - std::chrono::hours(46), CurrentTime - std::chrono::hours(46));
-
- Gc.CollectGarbage(GcCtx);
-
- for (const auto& Key : Keys)
- {
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(Exists);
- }
- }
-
- // Move forward in time and collect again
- {
- GcContext GcCtx(CurrentTime + std::chrono::minutes(2), CurrentTime + std::chrono::minutes(2));
- Gc.CollectGarbage(GcCtx);
-
- for (const auto& Key : Keys)
- {
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(!Exists);
- }
-
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- }
- }
-
- SUBCASE("gc removes small objects")
- {
- ScopedTemporaryDirectory TempDir;
- GcManager Gc;
- {
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- const auto Bucket = "rightintwo"sv;
-
- std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
-
- for (const auto& Key : Keys)
- {
- IoBuffer Value = CreateRandomBlob(128);
- Zcs.Put(Bucket, Key, {.Value = Value}, {});
- }
-
- // Time points 2 hours in the past: the values just written must survive
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2));
- GcCtx.CollectSmallObjects(true);
-
- Gc.CollectGarbage(GcCtx);
-
- for (const auto& Key : Keys)
- {
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(Exists);
- }
- }
-
- // Move forward in time and collect again
- {
- GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2));
- GcCtx.CollectSmallObjects(true);
-
- Zcs.Flush();
- Gc.CollectGarbage(GcCtx);
-
- for (const auto& Key : Keys)
- {
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(!Exists);
- }
- // GC could not remove the currently written block so size will not be zero
- CHECK_NE(0, Zcs.StorageSize().DiskSize);
- }
- }
- {
- // Unreferenced blocks will be pruned so size should now be zero
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- }
- }
-}
-
-// Concurrency test for ZenCacheNamespace using a 4-thread worker pool:
-// 1. inserts 2 * kChunkCount unique random chunks spread over two buckets
-// 2. reads all of them back and verifies their hashes
-// 3. inserts a second batch while reading the first batch and repeatedly running
-// garbage collection with a retained-hash list that deliberately omits some hashes
-// 4. verifies that every hash still tracked in GcChunkHashes is readable
-TEST_CASE_TEMPLATE("z$.threadedinsert", ReferenceCaching, testutils::FalseType, testutils::TrueType) // * doctest::skip(true))
-{
- // for (uint32_t i = 0; i < 100; ++i)
- {
- ScopedTemporaryDirectory TempDir;
-
- const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 8192;
-
- struct Chunk
- {
- std::string Bucket;
- IoBuffer Buffer;
- };
- std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks;
- Chunks.reserve(kChunkCount);
-
- const std::string Bucket1 = "rightinone";
- const std::string Bucket2 = "rightintwo";
-
- // Generate one chunk per bucket per iteration, retrying until the hash is unique
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- while (true)
- {
- IoBuffer Chunk = CreateRandomBlob(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- if (Chunks.contains(Hash))
- {
- continue;
- }
- Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
- break;
- }
- while (true)
- {
- IoBuffer Chunk = CreateRandomBlob(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- if (Chunks.contains(Hash))
- {
- continue;
- }
- Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
- break;
- }
- }
-
- CreateDirectories(TempDir.Path());
-
- WorkerThreadPool ThreadPool(4);
- GcManager Gc;
- auto JobQueue = MakeJobQueue(1, "testqueue");
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path(),
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
-
- // Phase 1: insert all chunks from the pool and wait for completion by polling the counter
- {
- std::atomic<size_t> WorkCompleted = 0;
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
- }
-
- const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
- CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
-
- // Phase 2: read every chunk back and compare the payload hash with its key
- {
- std::atomic<size_t> WorkCompleted = 0;
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- std::string Bucket = Chunk.second.Bucket;
- IoHash ChunkHash = Chunk.first;
- ZenCacheValue CacheValue;
-
- CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
- IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
- }
- // Hash -> bucket for every chunk that is expected to still be present in the cache
- std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
- GcChunkHashes.reserve(Chunks.size());
- for (const auto& Chunk : Chunks)
- {
- GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
- }
- // Phase 3: concurrent inserts, reads and garbage collection
- {
- std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks;
-
- // Unlike the first batch, hashes are not checked for uniqueness here
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- {
- IoBuffer Chunk = CreateRandomBlob(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
- }
- {
- IoBuffer Chunk = CreateRandomBlob(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
- }
- }
-
- std::atomic<size_t> WorkCompleted = 0;
- std::atomic_uint32_t AddedChunkCount = 0;
- for (const auto& Chunk : NewChunks)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
- }
-
- // Readers tolerate misses because GC may remove a chunk concurrently
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- }
- WorkCompleted.fetch_add(1);
- });
- }
- // Keep running GC passes on this thread until all new chunks have been inserted
- while (AddedChunkCount.load() < NewChunks.size())
- {
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
- {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
- }
- }
- std::vector<IoHash> KeepHashes;
- KeepHashes.reserve(GcChunkHashes.size());
- for (const auto& Entry : GcChunkHashes)
- {
- KeepHashes.push_back(Entry.first);
- }
- // At every 155th index, overwrite entries C and C + 3 with the last element
- // and pop it, so some known hashes are left out of the retained list
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 155 == 0)
- {
- if (C < KeepHashes.size() - 1)
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- }
- C++;
- }
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- GcCtx.AddRetainedCids(KeepHashes);
- Zcs.CollectGarbage(GcCtx);
- // Stop tracking whatever this pass reported as deleted
- const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
- }
-
- while (WorkCompleted < NewChunks.size() + Chunks.size())
- {
- Sleep(1);
- }
-
- // One more GC pass after all scheduled work has completed; same steps as in the loop above
- {
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
- {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
- }
- }
- std::vector<IoHash> KeepHashes;
- KeepHashes.reserve(GcChunkHashes.size());
- for (const auto& Entry : GcChunkHashes)
- {
- KeepHashes.push_back(Entry.first);
- }
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 155 == 0)
- {
- if (C < KeepHashes.size() - 1)
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- }
- C++;
- }
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- GcCtx.AddRetainedCids(KeepHashes);
- Zcs.CollectGarbage(GcCtx);
- const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
- }
- }
- // Phase 4: everything still tracked must be readable and intact
- {
- std::atomic<size_t> WorkCompleted = 0;
- for (const auto& Chunk : GcChunkHashes)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < GcChunkHashes.size())
- {
- Sleep(1);
- }
- }
- }
-}
-
-// Namespace handling in ZenCacheStore: values written to the default namespace
-// are not visible through a custom namespace, writes to an unknown namespace are
-// dropped when automatic creation is disabled, and succeed when it is enabled.
-TEST_CASE("z$.namespaces")
-{
- using namespace testutils;
-
- // Builds an object with a single zero-filled binary field of Size bytes
- const auto CreateCacheValue = [](size_t Size) -> CbObject {
- std::vector<uint8_t> Buf;
- Buf.resize(Size);
-
- CbObjectWriter Writer;
- Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
- return Writer.Save();
- };
-
- auto JobQueue = MakeJobQueue(1, "testqueue");
-
- ScopedTemporaryDirectory TempDir;
- CreateDirectories(TempDir.Path());
-
- const CacheRequestContext Context;
- IoHash Key1;
- IoHash Key2;
- // First store instance: automatic namespace creation disabled
- {
- GcManager Gc;
- ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = false}, nullptr);
- const auto Bucket = "teardrinker"sv;
- const auto CustomNamespace = "mynamespace"sv;
-
- // Create a cache record
- Key1 = CreateKey(42);
- CbObject CacheValue = CreateCacheValue(4096);
-
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {});
-
- ZenCacheValue GetValue;
- CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
- CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
-
- // This should just be dropped as we don't allow creating of namespaces on the fly
- Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {});
- CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
- }
-
- // Second store instance on the same directory: automatic namespace creation enabled
- {
- GcManager Gc;
- ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
- const auto Bucket = "teardrinker"sv;
- const auto CustomNamespace = "mynamespace"sv;
-
- Key2 = CreateKey(43);
- CbObject CacheValue2 = CreateCacheValue(4096);
-
- IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
- Buffer2.SetContentType(ZenContentType::kCbObject);
- ZenCacheValue PutValue2 = {.Value = Buffer2};
- Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {});
-
- // Key1 (written by the first instance) lives only in the default namespace,
- // Key2 only in the custom namespace
- ZenCacheValue GetValue;
- CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
- CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
- CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
- CHECK(Zcs.Get(Context, CustomNamespace, Bucket, Key2, GetValue));
- }
-}
-
-// Drops a bucket while another thread still holds a buffer obtained from it, and
-// checks that the bucket's values are gone afterwards.
-TEST_CASE("z$.drop.bucket")
-{
- using namespace testutils;
-
- // Builds an object with a single zero-filled binary field of Size bytes
- const auto CreateCacheValue = [](size_t Size) -> CbObject {
- std::vector<uint8_t> Buf;
- Buf.resize(Size);
-
- CbObjectWriter Writer;
- Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
- return Writer.Save();
- };
-
- auto JobQueue = MakeJobQueue(1, "testqueue");
-
- ScopedTemporaryDirectory TempDir;
- CreateDirectories(TempDir.Path());
-
- const CacheRequestContext Context;
- IoHash Key1;
- IoHash Key2;
-
- // Stores a value of the given size under the key derived from KeyIndex and returns that key
- auto PutValue = [&CreateCacheValue,
- &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
- // Create a cache record
- IoHash Key = CreateKey(KeyIndex);
- CbObject CacheValue = CreateCacheValue(Size);
-
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
- return Key;
- };
- // Returns the value for Key; the result of Get() is ignored, callers test the returned .Value
- auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
- ZenCacheValue GetValue;
- Zcs.Get(Context, Namespace, Bucket, Key, GetValue);
- return GetValue;
- };
- WorkerThreadPool Workers(1);
- {
- GcManager Gc;
- ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
- const auto Bucket = "teardrinker"sv;
- const auto Namespace = "mynamespace"sv;
-
- Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096);
- Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048);
-
- ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
- CHECK(Value1.Value);
-
- // Release the held buffer from the worker thread after 100 ms
- std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- WorkComplete = true;
- });
- // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
- // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
- CHECK(Zcs.DropBucket(Namespace, Bucket));
- while (!WorkComplete)
- {
- zen::Sleep(1);
- }
-
- // The entire bucket should be dropped; the lookups below must not find any of the old values
- Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
- CHECK(!Value1.Value);
- ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2);
- CHECK(!Value2.Value);
- }
-}
-
-// Drops one of two namespaces while another thread still holds buffers obtained
-// from both, and checks that only the dropped namespace lost its values.
-TEST_CASE("z$.drop.namespace")
-{
- using namespace testutils;
-
- const CacheRequestContext Context;
-
- // Builds an object with a single zero-filled binary field of Size bytes
- const auto CreateCacheValue = [](size_t Size) -> CbObject {
- std::vector<uint8_t> Buf;
- Buf.resize(Size);
-
- CbObjectWriter Writer;
- Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
- return Writer.Save();
- };
-
- auto JobQueue = MakeJobQueue(1, "testqueue");
-
- ScopedTemporaryDirectory TempDir;
- CreateDirectories(TempDir.Path());
-
- // Stores a value of the given size under the key derived from KeyIndex and returns that key
- auto PutValue = [&CreateCacheValue,
- &Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
- // Create a cache record
- IoHash Key = CreateKey(KeyIndex);
- CbObject CacheValue = CreateCacheValue(Size);
-
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
- return Key;
- };
- // Returns the value for Key; the result of Get() is ignored, callers test the returned .Value
- auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
- ZenCacheValue GetValue;
- Zcs.Get(Context, Namespace, Bucket, Key, GetValue);
- return GetValue;
- };
- WorkerThreadPool Workers(1);
- {
- GcManager Gc;
- ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
- const auto Bucket1 = "teardrinker1"sv;
- const auto Bucket2 = "teardrinker2"sv;
- const auto Namespace1 = "mynamespace1"sv;
- const auto Namespace2 = "mynamespace2"sv;
-
- IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096);
- IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048);
- IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096);
- IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048);
-
- ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
- CHECK(Value1.Value);
- ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
- CHECK(Value2.Value);
- ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
- CHECK(Value3.Value);
- ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
- CHECK(Value4.Value);
-
- // Release all held buffers from the worker thread after 100 ms
- std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- Value2.Value = IoBuffer{};
- Value3.Value = IoBuffer{};
- Value4.Value = IoBuffer{};
- WorkComplete = true;
- });
- // On Windows, DropNamespace() will be blocked as long as we hold a reference to a buffer in the namespace
- // Our DropNamespace execution blocks any incoming request from completing until we are done with the drop
- CHECK(Zcs.DropNamespace(Namespace1));
- while (!WorkComplete)
- {
- zen::Sleep(1);
- }
-
- // The entire namespace should be dropped. A later request re-creates it (automatic creation is
- // enabled for this store), but it must still be empty; Namespace2 must be unaffected
- Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
- CHECK(!Value1.Value);
- Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
- CHECK(!Value2.Value);
- Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
- CHECK(Value3.Value);
- Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
- CHECK(Value4.Value);
- }
-}
-
-// Overwrites a cache entry while a buffer obtained from an earlier Get() of the
-// same key is still alive. The earlier buffer must keep showing the old bytes and
-// a fresh Get() must return the new bytes. Runs once per ReferenceCaching setting.
-TEST_CASE_TEMPLATE("z$.blocked.disklayer.put", ReferenceCaching, testutils::FalseType, testutils::TrueType)
-{
-    ScopedTemporaryDirectory TempDir;
-
-    // Builds a compact binary object with one "Binary" field of `Size` bytes, every
-    // byte set to the low 8 bits of `Size` so differently sized values also differ in content
-    const auto CreateCacheValue = [](size_t Size) -> CbObject {
-        std::vector<uint8_t> Buf;
-        Buf.resize(Size, static_cast<uint8_t>(Size & 0xff));
-
-        CbObjectWriter Writer;
-        Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
-        return Writer.Save();
-    };
-
-    GcManager         Gc;
-    auto              JobQueue = MakeJobQueue(1, "testqueue");
-    ZenCacheNamespace Zcs(Gc,
-                          *JobQueue,
-                          TempDir.Path() / "cache",
-                          {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
-
-    // NOTE(review): the payload size (just above 64 KiB) is presumably chosen so the value is
-    // stored as its own file by the disk layer - confirm against the bucket configuration
-    CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
-
-    IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
-    Buffer.SetContentType(ZenContentType::kCbObject);
-
-    // The key is the hash of the buffer size as a 32-bit integer. The variable is declared with
-    // exactly the width that gets hashed, so the hashed bytes do not depend on the width or
-    // byte order of size_t.
-    const uint32_t Key     = static_cast<uint32_t>(Buffer.Size());
-    IoHash         HashKey = IoHash::HashBuffer(&Key, sizeof(Key));
-    Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {});
-
-    // Keep this value alive across the overwrite below
-    ZenCacheValue BufferGet;
-    CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
-
-    CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1);
-    IoBuffer Buffer2     = CacheValue2.GetBuffer().AsIoBuffer();
-    Buffer2.SetContentType(ZenContentType::kCbObject);
-
-    // We should be able to overwrite even if the file is open for read
-    Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {});
-
-    MemoryView OldView = BufferGet.Value.GetView();
-
-    ZenCacheValue BufferGet2;
-    CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2));
-    MemoryView NewView = BufferGet2.Value.GetView();
-
-    // Make sure the file opened for read before the overwrite still has the old data
-    CHECK(OldView.GetSize() == Buffer.GetSize());
-    CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0);
-
-    // Make sure we get the new data when reading after the overwrite
-    CHECK(NewView.GetSize() == Buffer2.GetSize());
-    CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
-}
-
-// Fills a bucket with structured records (compact binary with compressed attachments stored in the
-// CidStore) and unstructured records (raw bytes), scrubs both stores and checks that every chunk
-// was visited and none was reported bad. Runs once per ReferenceCaching setting.
-TEST_CASE_TEMPLATE("z$.scrub", ReferenceCaching, testutils::FalseType, testutils::TrueType)
-{
-    ScopedTemporaryDirectory TempDir;
-
-    using namespace testutils;
-
-    // A record payload plus the compressed attachments it references (empty for unstructured records)
-    struct CacheRecord
-    {
-        IoBuffer                      Record;
-        std::vector<CompressedBuffer> Attachments;
-    };
-
-    // Builds one record for `Key`.
-    //  - Structured: a compact binary object with a "Key" sub-object and one binary attachment
-    //    reference per entry of `AttachmentSizes`; the compressed attachment data is returned
-    //    alongside the record.
-    //  - Unstructured: a single flat buffer holding "<bucket>:<hex key>" with its terminating NUL
-    //    followed by one random blob per entry of `AttachmentSizes`; no attachments are returned.
-    auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) {
-        CacheRecord Result;
-        if (Structured)
-        {
-            Result.Attachments.resize(AttachmentSizes.size());
-            CbObjectWriter Record;
-            Record.BeginObject("Key"sv);
-            {
-                Record << "Bucket"sv << Bucket;
-                Record << "Hash"sv << Key;
-            }
-            Record.EndObject();
-            for (size_t Index = 0; Index < AttachmentSizes.size(); Index++)
-            {
-                IoBuffer         AttachmentData           = CreateRandomBlob(AttachmentSizes[Index]);
-                CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
-                Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), CompressedAttachmentData.DecodeRawHash());
-                Result.Attachments[Index] = CompressedAttachmentData;
-            }
-            Result.Record = Record.Save().GetBuffer().AsIoBuffer();
-            Result.Record.SetContentType(ZenContentType::kCbObject);
-        }
-        else
-        {
-            std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString());
-            // Header string including its NUL terminator, followed by all blobs back to back
-            size_t TotalSize = RecordData.length() + 1;
-            for (size_t AttachmentSize : AttachmentSizes)
-            {
-                TotalSize += AttachmentSize;
-            }
-            Result.Record = IoBuffer(TotalSize);
-            char* DataPtr = (char*)Result.Record.MutableData();
-            memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1);
-            DataPtr += RecordData.length() + 1;
-            for (size_t AttachmentSize : AttachmentSizes)
-            {
-                IoBuffer AttachmentData = CreateRandomBlob(AttachmentSize);
-                memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize());
-                DataPtr += AttachmentData.GetSize();
-            }
-        }
-        return Result;
-    };
-
-    GcManager         Gc;
-    CidStore          CidStore(Gc);
-    auto              JobQueue = MakeJobQueue(1, "testqueue");
-    ZenCacheNamespace Zcs(Gc,
-                          *JobQueue,
-                          TempDir.Path() / "cache",
-                          {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
-    CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
-    CidStore.Initialize(CidConfig);
-
-    // Creates one record per entry of `Cids` in bucket `BucketName`, adding every attachment to the
-    // CidStore before the record that references it is put into the cache
-    auto CreateRecords =
-        [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) {
-            for (const IoHash& Cid : Cids)
-            {
-                CacheRecord         Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes);
-                std::vector<IoHash> AttachmentHashes;
-                AttachmentHashes.reserve(Record.Attachments.size());
-                for (const CompressedBuffer& Attachment : Record.Attachments)
-                {
-                    AttachmentHashes.push_back(Attachment.DecodeRawHash());
-                    CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back());
-                }
-                // Store into the bucket the caller asked for (previously the name was hard-coded here,
-                // silently ignoring the BucketName argument)
-                Zcs.Put(BucketName, Cid, {.Value = Record.Record}, AttachmentHashes);
-            }
-        };
-
-    std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000};
-
-    std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)};
-    CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes);
-
-    std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
-    CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
-
-    WorkerThreadPool ThreadPool{1};
-    ScrubContext     ScrubCtx{ThreadPool};
-    Zcs.ScrubStorage(ScrubCtx);
-    CidStore.ScrubStorage(ScrubCtx);
-    // Expected: one chunk per structured record, one per attachment of each structured record,
-    // and one per unstructured record
-    CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
-    CHECK(ScrubCtx.BadCids().GetSize() == 0);
-}
-
-TEST_CASE_TEMPLATE("z$.newgc.basics", ReferenceCaching, testutils::FalseType, testutils::TrueType)
-{
- using namespace testutils;
-
- ScopedTemporaryDirectory TempDir;
-
- auto JobQueue = MakeJobQueue(1, "testqueue");
-
- struct CacheEntry
- {
- IoBuffer Data;
- std::vector<std::pair<Oid, CompressedBuffer>> Attachments;
- };
-
- std::unordered_map<IoHash, CacheEntry> CacheEntries;
-
- auto CreateCacheRecord =
- [&](ZenCacheNamespace& Zcs, CidStore& CidStore, std::string_view Bucket, std::span<std::pair<Oid, CompressedBuffer>> Attachments) {
- std::vector<IoHash> AttachmentKeys;
- for (const auto& Attachment : Attachments)
- {
- AttachmentKeys.push_back(Attachment.second.DecodeRawHash());
- }
- auto Record = CreateRecord(Attachments);
- Zcs.Put(Bucket,
- Record.first,
- {.Value = Record.second,
- .RawSize = Record.second.GetSize(),
- .RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())},
- AttachmentKeys);
- for (const auto& Attachment : Attachments)
- {
- CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash());
- }
- CacheEntries.insert({Record.first, CacheEntry{.Data = Record.second, .Attachments = {Attachments.begin(), Attachments.end()}}});
- return Record.first;
- };
- auto CreateCacheValue = [&](ZenCacheNamespace& Zcs, std::string_view Bucket, size_t Size) {
- std::pair<Oid, IoBuffer> CacheValue = CreateBinaryBlob(Size);
- IoHash Key = ToIoHash(CacheValue.first);
- Zcs.Put(Bucket,
- Key,
- {.Value = CacheValue.second,
- .RawSize = CacheValue.second.GetSize(),
- .RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())},
- {});
- CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}});
- return Key;
- };
-
- auto ValidateCacheEntry = [&](ZenCacheNamespace& Zcs,
- CidStore& CidStore,
- std::string_view Bucket,
- const IoHash& Key,
- bool ExpectCacheEntry,
- bool ExpectAttachments) {
- const CacheEntry& Entry = CacheEntries[Key];
- ZenCacheValue Value;
- bool CacheExists = Zcs.Get(Bucket, Key, Value);
- if (ExpectCacheEntry)
- {
- if (!CacheExists)
- {
- return false;
- }
- if (Value.Value.GetSize() != Entry.Data.GetSize())
- {
- return false;
- }
- if (!Value.Value.GetView().EqualBytes(Entry.Data.GetView()))
- {
- return false;
- }
- }
- else if (CacheExists)
- {
- return false;
- }
-
- if (ExpectAttachments)
- {
- for (const auto& Attachment : Entry.Attachments)
- {
- IoHash AttachmentHash = Attachment.second.DecodeRawHash();
- IoBuffer StoredData = CidStore.FindChunkByCid(AttachmentHash);
- if (!StoredData)
- {
- return false;
- }
- if (!StoredData.GetView().EqualBytes(Attachment.second.GetCompressed().Flatten().GetView()))
- {
- return false;
- }
- }
- }
- else
- {
- for (const auto& Attachment : Entry.Attachments)
- {
- IoHash AttachmentHash = Attachment.second.DecodeRawHash();
- if (CidStore.ContainsChunk(AttachmentHash))
- {
- return false;
- }
- }
- }
- return true;
- };
-
- std::vector<IoHash> CacheRecords;
- std::vector<IoHash> UnstructuredCacheValues;
-
- const auto TearDrinkerBucket = "teardrinker"sv;
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
-
- // Create some basic data
- {
- // Structured record with attachments
- auto Attachments1 = CreateCompressedAttachment(CidStore, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87});
- CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments1));
-
- // Structured record with reuse of attachments
- auto Attachments2 = CreateCompressedAttachment(CidStore, std::vector<size_t>{971});
- Attachments2.push_back(Attachments1[0]);
- Attachments2.push_back(Attachments1[1]);
- CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments2));
- }
-
- CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, {}));
-
- {
- // Unstructured cache values
- UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 84));
- UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 591));
- UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 1024 * 1024 * 3 + 7));
- UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 71));
- }
- }
-
- SUBCASE("expire nothing")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() - std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
- .CollectSmallObjects = false,
- .IsDeleteMode = false,
- .Verbose = true});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_EQ(0u, Result.CompactStoresStatSum.RemovedDisk);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
- }
- SUBCASE("expire all large objects, delete nothing")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = false,
- .IsDeleteMode = false,
- .Verbose = true});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_EQ(0u, Result.CompactStoresStatSum.RemovedDisk);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
- }
- SUBCASE("expire all object, delete nothing")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = true,
- .IsDeleteMode = false,
- .Verbose = true});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_EQ(0u, Result.CompactStoresStatSum.RemovedDisk);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
- }
- SUBCASE("expire all large objects, skip cid")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = false,
- .IsDeleteMode = true,
- .SkipCidDelete = true,
- .Verbose = true});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.CompactStoresStatSum.RemovedDisk);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
- }
- SUBCASE("expire all objects, skip cid")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = true,
- .IsDeleteMode = true,
- .SkipCidDelete = true,
- .Verbose = true});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_GE(Result.CompactStoresStatSum.RemovedDisk, 0);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, true));
- }
- SUBCASE("expire all large objects")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = false,
- .IsDeleteMode = true,
- .SkipCidDelete = false,
- .Verbose = true});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(1u,
- Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount); // Only one cache value is pruned/deleted as that is the only
- // large item in the cache (all other large items as in cas)
- CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(0u,
- Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats
- .FoundCount); // We won't remove any references since all referencers are small which retains all references
- CHECK_EQ(0u,
- Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats
- .DeletedCount); // We won't remove any references since all referencers are small which retains all references
- CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.CompactStoresStatSum.RemovedDisk);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
- }
- SUBCASE("expire all objects")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = true,
- .IsDeleteMode = true,
- .SkipCidDelete = false,
- .Verbose = true});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
- }
-
- SUBCASE("keep 1 cache record, skip cid")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = true,
- .IsDeleteMode = true,
- .SkipCidDelete = true,
- .Verbose = true,
- .CompactBlockUsageThresholdPercent = 100});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(6u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(6u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- uint64_t MinExpectedRemoveSize = CacheEntries[UnstructuredCacheValues[2]].Data.GetSize();
- CHECK_LT(MinExpectedRemoveSize, Result.CompactStoresStatSum.RemovedDisk);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
- }
-
- SUBCASE("keep 2 cache records")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::hours(2));
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = true,
- .IsDeleteMode = true,
- .SkipCidDelete = false,
- .Verbose = true});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
- }
-
- SUBCASE("keep 3 cache value")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::hours(2));
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = true,
- .IsDeleteMode = true,
- .SkipCidDelete = false,
- .Verbose = true});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(5u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
- CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
- }
-
- SUBCASE("keep 3 cache value, skip cid")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- // Prime so we can check GC of memory layer
- ZenCacheValue Dummy;
- Zcs.Get(TearDrinkerBucket, CacheRecords[0], Dummy);
- Zcs.Get(TearDrinkerBucket, CacheRecords[1], Dummy);
- Zcs.Get(TearDrinkerBucket, CacheRecords[2], Dummy);
- Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[0], Dummy);
- Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[1], Dummy);
- Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[2], Dummy);
- Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[3], Dummy);
-
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::hours(2));
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = true,
- .IsDeleteMode = true,
- .SkipCidDelete = true,
- .Verbose = true,
- .CompactBlockUsageThresholdPercent = 100});
- CHECK_EQ(7u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_GT(Result.CompactStoresStatSum.RemovedDisk, 0);
- uint64_t MemoryClean = CacheEntries[CacheRecords[0]].Data.GetSize() + CacheEntries[CacheRecords[1]].Data.GetSize() +
- CacheEntries[CacheRecords[2]].Data.GetSize() + CacheEntries[UnstructuredCacheValues[0]].Data.GetSize();
- CHECK_EQ(MemoryClean, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
-
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
- CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
- }
-
- SUBCASE("leave write block")
- {
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
- ZenCacheNamespace Zcs(Gc,
- *JobQueue,
- TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
- CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
-
- auto Attachments =
- CreateCompressedAttachment(CidStore, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187});
- IoHash CacheRecord = CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments);
- {
- // Do get so it ends up in memcache
- ZenCacheValue _;
- Zcs.Get(TearDrinkerBucket, CacheRecord, _);
- }
-
- Zcs.SetAccessTime(TearDrinkerBucket, CacheRecord, GcClock::Now() - std::chrono::hours(2));
-
- Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[2], GcClock::Now() + std::chrono::hours(2));
-
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[0], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::hours(2));
- Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::hours(2));
-
- GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
- .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(1),
- .CollectSmallObjects = true,
- .IsDeleteMode = true,
- .SkipCidDelete = false,
- .Verbose = true});
- // Write block can't be compacted so Compacted will be less than Deleted
- CHECK_EQ(8u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
- CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.FoundCount);
- CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
- CHECK_EQ(9u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
- CHECK_EQ(4u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.FoundCount);
- CHECK_EQ(4u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
- CHECK_EQ(Attachments[1].second.GetCompressed().GetSize() + Attachments[3].second.GetCompressed().GetSize(),
- Result.CompactStoresStatSum.RemovedDisk);
- uint64_t MemoryClean = CacheEntries[CacheRecord].Data.GetSize();
- CHECK_EQ(MemoryClean, Result.ReferencerStatSum.RemoveExpiredDataStats.FreedMemory);
- }
-}
-
-#endif
-
-void
-z$_forcelink()
-{
-}
-
-} // namespace zen
diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h
deleted file mode 100644
index 4ab8f90f0..000000000
--- a/src/zenserver/cache/structuredcachestore.h
+++ /dev/null
@@ -1,272 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "cachedisklayer.h"
-#include "stats/statsreporter.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/iohash.h>
-#include <zencore/stats.h>
-#include <zenstore/gc.h>
-#include <zenutil/cache/cache.h>
-
-#include <atomic>
-#include <compare>
-#include <filesystem>
-#include <string_view>
-#include <unordered_map>
-
-namespace zen {
-
-class StatsDaemonClient;
-
-/******************************************************************************
-
- /$$$$$$$$ /$$$$$$ /$$
- |_____ $$ /$$__ $$ | $$
- /$$/ /$$$$$$ /$$$$$$$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$
- /$$/ /$$__ $| $$__ $$ | $$ |____ $$/$$_____| $$__ $$/$$__ $$
- /$$/ | $$$$$$$| $$ \ $$ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$
- /$$/ | $$_____| $$ | $$ | $$ $$/$$__ $| $$ | $$ | $| $$_____/
- /$$$$$$$| $$$$$$| $$ | $$ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$
- |________/\_______|__/ |__/ \______/ \_______/\_______|__/ |__/\_______/
-
- Cache store for UE5. Restricts keys to "{bucket}/{hash}" pairs where the hash
- is 40 (hex) chars in size. Values may be opaque blobs or structured objects
- which can in turn contain references to other objects (or blobs). Buckets are
- organized in namespaces to enable project isolation.
-
-******************************************************************************/
-
-class WorkerThreadPool;
-class DiskWriteBlocker;
-class JobQueue;
-
-/* Z$ namespace
-
- A namespace scopes a set of buckets, and would typically be used to isolate
- projects from each other.
-
- */
-class ZenCacheNamespace final : public GcStorage, public GcContributor
-{
-public:
- struct Configuration
- {
- ZenCacheDiskLayer::Configuration DiskLayerConfig;
- };
- struct BucketInfo
- {
- ZenCacheDiskLayer::BucketInfo DiskLayerInfo;
- };
- struct Info
- {
- std::filesystem::path RootDir;
- Configuration Config;
- std::vector<std::string> BucketNames;
- ZenCacheDiskLayer::Info DiskLayerInfo;
- };
-
- struct NamespaceStats
- {
- uint64_t HitCount;
- uint64_t MissCount;
- uint64_t WriteCount;
- metrics::RequestStatsSnapshot PutOps;
- metrics::RequestStatsSnapshot GetOps;
- ZenCacheDiskLayer::DiskStats DiskStats;
- };
-
- ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
- ~ZenCacheNamespace();
-
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
-
- bool DropBucket(std::string_view Bucket);
- void EnumerateBucketContents(std::string_view Bucket,
- std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
-
- bool Drop();
- void Flush();
-
- // GcContributor
- virtual void GatherReferences(GcContext& GcCtx) override;
-
- // GcStorage
- virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
- virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override;
-
- Configuration GetConfig() const { return m_Configuration; }
- Info GetInfo() const;
- std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
- NamespaceStats Stats();
-
- CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
-
-#if ZEN_WITH_TESTS
- void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
-#endif // ZEN_WITH_TESTS
-
-private:
- GcManager& m_Gc;
- JobQueue& m_JobQueue;
- std::filesystem::path m_RootDir;
- Configuration m_Configuration;
- ZenCacheDiskLayer m_DiskLayer;
- std::atomic<uint64_t> m_HitCount{};
- std::atomic<uint64_t> m_MissCount{};
- std::atomic<uint64_t> m_WriteCount{};
- metrics::RequestStats m_PutOps;
- metrics::RequestStats m_GetOps;
- uint64_t m_LastScrubTime = 0;
-
- ZenCacheNamespace(const ZenCacheNamespace&) = delete;
- ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete;
-};
-
-/** Cache store interface
-
- This manages a set of namespaces used for derived data caching purposes.
-
- */
-
-class ZenCacheStore final : public RefCounted, public StatsProvider
-{
-public:
- static constexpr std::string_view DefaultNamespace =
- "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given
- static constexpr std::string_view NamespaceDiskPrefix = "ns_";
-
- struct Configuration
- {
- ZenCacheNamespace::Configuration NamespaceConfig;
- bool AllowAutomaticCreationOfNamespaces = false;
- struct LogConfig
- {
- bool EnableWriteLog = true;
- bool EnableAccessLog = true;
- } Logging;
- };
-
- struct Info
- {
- std::filesystem::path BasePath;
- Configuration Config;
- std::vector<std::string> NamespaceNames;
- uint64_t DiskEntryCount = 0;
- GcStorageSize StorageSize;
- };
-
- struct NamedNamespaceStats
- {
- std::string NamespaceName;
- ZenCacheNamespace::NamespaceStats Stats;
- };
-
- struct CacheStoreStats
- {
- uint64_t HitCount = 0;
- uint64_t MissCount = 0;
- uint64_t WriteCount = 0;
- uint64_t RejectedWriteCount = 0;
- uint64_t RejectedReadCount = 0;
- metrics::RequestStatsSnapshot PutOps;
- metrics::RequestStatsSnapshot GetOps;
- std::vector<NamedNamespaceStats> NamespaceStats;
- };
-
- ZenCacheStore(GcManager& Gc,
- JobQueue& JobQueue,
- const std::filesystem::path& BasePath,
- const Configuration& Configuration,
- const DiskWriteBlocker* InDiskWriteBlocker);
- ~ZenCacheStore();
-
- bool Get(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::string_view Bucket,
- const IoHash& HashKey,
- ZenCacheValue& OutValue);
- void Put(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References);
- bool DropBucket(std::string_view Namespace, std::string_view Bucket);
- bool DropNamespace(std::string_view Namespace);
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
-
- CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter,
- const std::string_view BucketFilter,
- const std::string_view ValueFilter) const;
-
- GcStorageSize StorageSize() const;
- CacheStoreStats Stats(bool IncludeNamespaceStats = true);
-
- Configuration GetConfiguration() const { return m_Configuration; }
- void SetLoggingConfig(const Configuration::LogConfig& Loggingconfig);
- Info GetInfo() const;
- std::optional<ZenCacheNamespace::Info> GetNamespaceInfo(std::string_view Namespace);
- std::optional<ZenCacheNamespace::BucketInfo> GetBucketInfo(std::string_view Namespace, std::string_view Bucket);
- std::vector<std::string> GetNamespaces();
-
- void EnumerateBucketContents(std::string_view Namespace,
- std::string_view Bucket,
- std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn);
-
- // StatsProvider
- virtual void ReportMetrics(StatsMetrics& Statsd) override;
-
-private:
- const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const;
- ZenCacheNamespace* GetNamespace(std::string_view Namespace);
- void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const;
-
- typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap;
-
- CacheStoreStats m_LastReportedMetrics;
- const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
- mutable RwLock m_NamespacesLock;
- NamespaceMap m_Namespaces;
- std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces;
-
- GcManager& m_Gc;
- JobQueue& m_JobQueue;
- std::filesystem::path m_BasePath;
- Configuration m_Configuration;
- std::atomic<uint64_t> m_HitCount{};
- std::atomic<uint64_t> m_MissCount{};
- std::atomic<uint64_t> m_WriteCount{};
- std::atomic<uint64_t> m_RejectedWriteCount{};
- std::atomic<uint64_t> m_RejectedReadCount{};
- metrics::RequestStats m_PutOps;
- metrics::RequestStats m_GetOps;
-
- struct AccessLogItem
- {
- const char* Op;
- CacheRequestContext Context;
- std::string Namespace;
- std::string Bucket;
- IoHash HashKey;
- ZenCacheValue Value;
- };
-
- void LogWorker();
- RwLock m_LogQueueLock;
- std::vector<AccessLogItem> m_LogQueue;
- std::atomic_bool m_ExitLogging;
- Event m_LogEvent;
- std::thread m_AsyncLoggingThread;
- std::atomic_bool m_WriteLogEnabled;
- std::atomic_bool m_AccessLogEnabled;
-};
-
-void z$_forcelink();
-
-} // namespace zen