diff options
| author | Stefan Boberg <[email protected]> | 2023-12-19 21:49:55 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-19 21:49:55 +0100 |
| commit | 8a90a40b4517d198855c1e52740a7e7fb21ecc20 (patch) | |
| tree | ffd8524bcebf8841b69725934f31d438a03e7746 /src/zenstore/cache/cachedisklayer.cpp | |
| parent | 0.2.38 (diff) | |
| download | zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.tar.xz zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.zip | |
move cachedisklayer and structuredcachestore into zenstore (#624)
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 4397 |
1 files changed, 4397 insertions, 0 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp new file mode 100644 index 000000000..82f190caa --- /dev/null +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -0,0 +1,4397 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenstore/cachedisklayer.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compress.h> +#include <zencore/except.h> +#include <zencore/fmtutils.h> +#include <zencore/jobqueue.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zencore/xxhash.h> +#include <zenutil/workerpools.h> + +#include <future> + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +bool +IsKnownBadBucketName(std::string_view Bucket) +{ + if (Bucket.size() == 32) + { + uint8_t BucketHex[16]; + if (ParseHexBytes(Bucket, BucketHex)) + { + return true; + } + } + + return false; +} + +namespace { + +#pragma pack(push) +#pragma pack(1) + + struct CacheBucketIndexHeader + { + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t Version2 = 2; + static constexpr uint32_t CurrentVersion = Version2; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (PayloadAlignment == 0) + { + return false; + } + + return true; + } + }; + + static_assert(sizeof(CacheBucketIndexHeader) == 32); + + struct 
BucketMetaHeader + { + static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta'; + static constexpr uint32_t Version1 = 1; + static constexpr uint32_t CurrentVersion = Version1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t Padding = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const BucketMetaHeader& Header) + { + return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (Padding != 0) + { + return false; + } + + return true; + } + }; + + static_assert(sizeof(BucketMetaHeader) == 32); + +#pragma pack(pop) + + ////////////////////////////////////////////////////////////////////////// + + template<typename T> + void Reset(T& V) + { + T Tmp; + V.swap(Tmp); + } + + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + const char* MetaExtension = ".meta"; + + std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + IndexExtension); + } + + std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + MetaExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + LogExtension); + } + + std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + ZEN_UNUSED(BucketName); + return BucketDir / "zen_manifest"; + } + + bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + 
return false; + } + if (Entry.Location.Reserved != 0) + { + OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.GetFlags() & + ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + return true; + } + if (Entry.Location.Reserved != 0) + { + OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); + return false; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + + bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) + { + int DropIndex = 0; + do + { + if (!std::filesystem::exists(Dir)) + { + return false; + } + + std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); + std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; + if (std::filesystem::exists(DroppedBucketPath)) + { + DropIndex++; + continue; + } + + std::error_code Ec; + std::filesystem::rename(Dir, DroppedBucketPath, Ec); + if (!Ec) + { + DeleteDirectories(DroppedBucketPath); + return true; + } + // TODO: Do we need to bail at some point? 
+ zen::Sleep(100); + } while (true); + } +} // namespace + +namespace fs = std::filesystem; +using namespace std::literals; + +class BucketManifestSerializer +{ + using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; + using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData; + + using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; + using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; + +public: + // We use this to indicate if a on disk bucket needs wiping + // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references + // to block items. + // See: https://github.com/EpicGames/zen/pull/299 + static inline const uint32_t CurrentDiskBucketVersion = 1; + + bool Open(std::filesystem::path ManifestPath) + { + Manifest = LoadCompactBinaryObject(ManifestPath); + return !!Manifest; + } + + Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); } + + bool IsCurrentVersion(uint32_t& OutVersion) const + { + OutVersion = Manifest["Version"sv].AsUInt32(0); + return OutVersion == CurrentDiskBucketVersion; + } + + void ParseManifest(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); + + Oid GenerateNewManifest(std::filesystem::path ManifestPath); + + IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount); + uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); } + void WriteSidecarFile(RwLock::SharedLockScope& BucketLock, + const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + const std::vector<AccessTime>& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const 
std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas); + bool ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); + + IoBuffer MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas); + + CbObject Manifest; + +private: + CbObject LoadCompactBinaryObject(const fs::path& Path) + { + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return zen::LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); + } + + uint64_t m_ManifestEntryCount = 0; + + struct ManifestData + { + IoHash Key; // 20 + AccessTime Timestamp; // 4 + IoHash RawHash; // 20 + uint32_t Padding_0; // 4 + size_t RawSize; // 8 + uint64_t Padding_1; // 8 + }; + + static_assert(sizeof(ManifestData) == 64); +}; + +void +BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) +{ + if (Manifest["UsingMetaFile"sv].AsBool()) + { + ReadSidecarFile(BucketLock, Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads); + + return; + } + + ZEN_TRACE_CPU("Z$::ParseManifest"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' 
in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + std::vector<PayloadIndex> KeysIndexes; + KeysIndexes.reserve(Count); + + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + for (CbFieldView& KeyView : KeyArray) + { + if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) + { + KeysIndexes.push_back(It.value()); + } + else + { + KeysIndexes.push_back(PayloadIndex()); + } + } + + size_t KeyIndexOffset = 0; + CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); + for (CbFieldView& TimeStampView : TimeStampArray) + { + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex) + { + AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } + } + + KeyIndexOffset = 0; + CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); + CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); + if (RawHashArray.Num() == RawSizeArray.Num()) + { + auto RawHashIt = RawHashArray.CreateViewIterator(); + auto RawSizeIt = RawSizeArray.CreateViewIterator(); + while (RawHashIt != CbFieldViewIterator()) + { + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + + if (KeyIndex) + { + uint64_t RawSize = RawSizeIt.AsUInt64(); + IoHash RawHash = RawHashIt.AsHash(); + if (RawSize != 0 || RawHash != IoHash::Zero) + { + BucketPayload& Payload = Payloads[KeyIndex]; + Bucket.SetMetaData(BucketLock, Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); + } + } + + RawHashIt++; + RawSizeIt++; + } + } + else + { + ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); + } +} + +Oid +BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath) +{ + const Oid BucketId = Oid::NewOid(); + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Manifest = Writer.Save(); + WriteFile(ManifestPath, 
Manifest.GetBuffer().AsIoBuffer()); + + return BucketId; +} + +IoBuffer +BucketManifestSerializer::MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas) +{ + using namespace std::literals; + + ZEN_TRACE_CPU("Z$::MakeManifest"); + + size_t ItemCount = Index.size(); + + // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth + // And we don't need to reallocate the underlying buffer in almost every case + const size_t EstimatedSizePerItem = 54u; + const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); + CbObjectWriter Writer(ReserveSize); + + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + + if (!Index.empty()) + { + Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); + Writer.BeginArray("Keys"sv); + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + Writer.AddHash(Key); + } + Writer.EndArray(); + + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : Index) + { + GcClock::Tick AccessTime = AccessTimes[Kv.second]; + Writer.AddInteger(AccessTime); + } + Writer.EndArray(); + + if (!MetaDatas.empty()) + { + Writer.BeginArray("RawHash"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); + } + else + { + Writer.AddHash(IoHash::Zero); + } + } + Writer.EndArray(); + + Writer.BeginArray("RawSize"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); + } + else + { + 
Writer.AddInteger(0); + } + } + Writer.EndArray(); + } + } + + Manifest = Writer.Save(); + return Manifest.GetBuffer().AsIoBuffer(); +} + +IoBuffer +BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount) +{ + m_ManifestEntryCount = EntryCount; + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Writer << "Count"sv << EntryCount; + Writer << "UsingMetaFile"sv << true; + Manifest = Writer.Save(); + + return Manifest.GetBuffer().AsIoBuffer(); +} + +bool +BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) +{ + ZEN_TRACE_CPU("Z$::ReadSidecarFile"); + + ZEN_ASSERT(AccessTimes.size() == Payloads.size()); + + std::error_code Ec; + + BasicFile SidecarFile; + SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath)); + } + + uint64_t FileSize = SidecarFile.FileSize(); + + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); }); + + if (FileSize < sizeof(BucketMetaHeader)) + { + return false; + } + + BasicFileBuffer Sidecar(SidecarFile, 128 * 1024); + + BucketMetaHeader Header; + Sidecar.Read(&Header, sizeof Header, 0); + + if (!Header.IsValid()) + { + return false; + } + + if (Header.Version != BucketMetaHeader::Version1) + { + return false; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData); + if (Header.EntryCount > ExpectedEntryCount) + { + return false; + } + + InvalidGuard.Dismiss(); + + uint64_t RemainingEntryCount = ExpectedEntryCount; + uint64_t EntryCount = 0; + uint64_t CurrentReadOffset 
= sizeof(Header); + + while (RemainingEntryCount--) + { + const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset); + CurrentReadOffset += sizeof(ManifestData); + + if (auto It = Index.find(Entry->Key); It != Index.end()) + { + PayloadIndex PlIndex = It.value(); + + ZEN_ASSERT(size_t(PlIndex) <= Payloads.size()); + + ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; + + AccessTimes[PlIndex] = Entry->Timestamp; + + if (Entry->RawSize && Entry->RawHash != IoHash::Zero) + { + Bucket.SetMetaData(BucketLock, PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); + } + } + + EntryCount++; + } + + ZEN_ASSERT(EntryCount == ExpectedEntryCount); + + return true; +} + +void +BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, + const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + const std::vector<AccessTime>& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas) +{ + ZEN_TRACE_CPU("Z$::WriteSidecarFile"); + + BucketMetaHeader Header; + Header.EntryCount = m_ManifestEntryCount; + Header.LogPosition = SnapshotLogPosition; + Header.Checksum = Header.ComputeChecksum(Header); + + std::error_code Ec; + + TemporaryFile SidecarFile; + SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath())); + } + + SidecarFile.Write(&Header, sizeof Header, 0); + + // TODO: make this batching for better performance + { + uint64_t WriteOffset = sizeof Header; + + // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); + + std::vector<ManifestData> ManifestDataBuffer; + const size_t MaxManifestDataBufferCount = Min(Index.size(), 4096u); // 256 Kb + 
ManifestDataBuffer.reserve(MaxManifestDataBufferCount); + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + const PayloadIndex PlIndex = Kv.second; + + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; + + if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData) + { + RawHash = MetaDatas[MetaIndex].RawHash; + RawSize = MetaDatas[MetaIndex].RawSize; + } + + ManifestDataBuffer.emplace_back(ManifestData{.Key = Key, + .Timestamp = AccessTimes[PlIndex], + .RawHash = RawHash, + .Padding_0 = 0, + .RawSize = RawSize, + .Padding_1 = 0}); + if (ManifestDataBuffer.size() == MaxManifestDataBufferCount) + { + const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size(); + SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset); + WriteOffset += WriteSize; + ManifestDataBuffer.clear(); + ManifestDataBuffer.reserve(MaxManifestDataBufferCount); + } + } + if (ManifestDataBuffer.size() > 0) + { + SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset); + } + } + + SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath)); + } +} + +////////////////////////////////////////////////////////////////////////// + +static const float IndexMinLoadFactor = 0.2f; +static const float IndexMaxLoadFactor = 0.7f; + +ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, + std::atomic_uint64_t& OuterCacheMemoryUsage, + std::string BucketName, + const BucketConfiguration& Config) +: m_Gc(Gc) +, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage) +, m_BucketName(std::move(BucketName)) +, m_Configuration(Config) +, m_BucketId(Oid::Zero) +{ + m_Index.min_load_factor(IndexMinLoadFactor); + m_Index.max_load_factor(IndexMaxLoadFactor); + + if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) + { + const uint64_t 
LegacyOverrideSize = 16 * 1024 * 1024; + + // This is pretty ad hoc but in order to avoid too many individual files + // it makes sense to have a different strategy for legacy values + m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, LegacyOverrideSize); + } + m_Gc.AddGcReferencer(*this); +} + +ZenCacheDiskLayer::CacheBucket::~CacheBucket() +{ + m_Gc.RemoveGcReferencer(*this); +} + +bool +ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) +{ + using namespace std::literals; + + ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate"); + ZEN_ASSERT(m_IsFlushing.load()); + + // We want to take the lock here since we register as a GC referencer a construction + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + + ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); + + m_BlocksBasePath = BucketDir / "blocks"; + m_BucketDir = BucketDir; + + CreateDirectories(m_BucketDir); + + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); + + bool IsNew = false; + + BucketManifestSerializer ManifestReader; + + if (ManifestReader.Open(ManifestPath)) + { + m_BucketId = ManifestReader.GetBucketId(); + if (m_BucketId == Oid::Zero) + { + return false; + } + + uint32_t Version = 0; + if (ManifestReader.IsCurrentVersion(/* out */ Version) == false) + { + ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", + BucketDir, + Version, + BucketManifestSerializer::CurrentDiskBucketVersion); + IsNew = true; + } + } + else if (AllowCreate) + { + m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath); + IsNew = true; + } + else + { + return false; + } + + InitializeIndexFromDisk(IndexLock, IsNew); + + auto _ = MakeGuard([&]() { + // We are now initialized, allow flushing when we exit + m_IsFlushing.store(false); + }); + + if (IsNew) + { + return true; + } + + ManifestReader.ParseManifest(IndexLock, *this, ManifestPath, m_Index, m_AccessTimes, m_Payloads); + + return true; +} + 
+void +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc) +{ + ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot"); + + const uint64_t LogCount = m_SlogFile.GetLogCount(); + if (m_LogFlushPosition == LogCount) + { + return; + } + + ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); + const uint64_t EntryCount = m_Index.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", + m_BucketDir, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + + try + { + const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) + { + throw std::system_error(Error, fmt::format("get disk space in '{}' FAILED", m_BucketDir)); + } + + bool EnoughSpace = Space.Free >= IndexSize + 1024 * 512; + if (!EnoughSpace) + { + uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); + EnoughSpace = (Space.Free + ReclaimedSpace) >= IndexSize + 1024 * 512; + } + if (!EnoughSpace) + { + throw std::runtime_error( + fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize))); + } + + TemporaryFile ObjectIndexFile; + std::error_code Ec; + ObjectIndexFile.CreateTemporary(m_BucketDir, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); + } + + { + // This is in a separate scope just to ensure IndexWriter goes out + // of scope before the file is flushed/closed, in order to ensure + // all data is written to the file + BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); + + CacheBucketIndexHeader Header = {.EntryCount = EntryCount, + .LogPosition = LogCount, + .PayloadAlignment = 
gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + + uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); + + for (auto& Entry : m_Index) + { + DiskIndexEntry IndexEntry; + IndexEntry.Key = Entry.first; + IndexEntry.Location = m_Payloads[Entry.second].Location; + IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset); + + IndexWriteOffset += sizeof(DiskIndexEntry); + } + + IndexWriter.Flush(); + } + + ObjectIndexFile.Flush(); + ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); + if (Ec) + { + std::filesystem::path TempFilePath = ObjectIndexFile.GetPath(); + ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message()); + + if (std::filesystem::is_regular_file(TempFilePath)) + { + if (!std::filesystem::remove(TempFilePath, Ec) || Ec) + { + ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message()); + } + } + } + else + { + // We must only update the log flush position once the snapshot write succeeds + m_LogFlushPosition = LogCount; + } + } + catch (std::exception& Err) + { + ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + } +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion) +{ + ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); + + if (!std::filesystem::is_regular_file(IndexPath)) + { + return 0; + } + + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t FileSize = ObjectIndexFile.FileSize(); + if (FileSize < sizeof(CacheBucketIndexHeader)) + { + return 0; + } + + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + + if 
(!Header.IsValid()) + { + return 0; + } + + if (Header.Version != CacheBucketIndexHeader::Version2) + { + return 0; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + if (Header.EntryCount > ExpectedEntryCount) + { + return 0; + } + + InvalidGuard.Dismiss(); + + size_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_Configuration.PayloadAlignment = Header.PayloadAlignment; + + m_Payloads.reserve(Header.EntryCount); + m_Index.reserve(Header.EntryCount); + + BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); + + uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); + uint64_t RemainingEntryCount = Header.EntryCount; + + std::string InvalidEntryReason; + while (RemainingEntryCount--) + { + const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset); + CurrentReadOffset += sizeof(DiskIndexEntry); + + if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + + const PayloadIndex EntryIndex = PayloadIndex(EntryCount); + m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location}); + m_Index.insert_or_assign(Entry->Key, EntryIndex); + + EntryCount++; + } + + ZEN_ASSERT(EntryCount == m_Payloads.size()); + + m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(EntryCount); + } + + OutVersion = CacheBucketIndexHeader::Version2; + return Header.LogPosition; +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); + + if 
(!std::filesystem::is_regular_file(LogPath)) + { + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) + { + // Note: this leaves m_Payloads and other arrays with 'holes' in them + m_Index.erase(Record.Key); + return; + } + + if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); + m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); + m_Index.insert_or_assign(Record.Key, EntryIndex); + }, + SkipEntryCount); + + m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(m_Payloads.size()); + } + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + } + + return LogEntryCount; +}; + +void +ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew) +{ + ZEN_TRACE_CPU("Z$::Bucket::Initialize"); + + m_StandaloneSize = 0; + + m_Index.clear(); + m_Payloads.clear(); + m_AccessTimes.clear(); + 
m_MetaDatas.clear(); + m_FreeMetaDatas.clear(); + m_MemCachedPayloads.clear(); + m_FreeMemCachedPayloads.clear(); + m_FirstReferenceIndex.clear(); + m_ReferenceHashes.clear(); + m_NextReferenceHashesIndexes.clear(); + m_ReferenceCount = 0; + + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + + if (IsNew) + { + fs::remove(LogPath); + fs::remove(IndexPath); + fs::remove_all(m_BlocksBasePath); + } + + CreateDirectories(m_BucketDir); + + m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); + + if (std::filesystem::is_regular_file(IndexPath)) + { + uint32_t IndexVersion = 0; + m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion); + if (IndexVersion == 0) + { + ZEN_WARN("removing invalid index file at '{}'", IndexPath); + std::filesystem::remove(IndexPath); + } + } + + uint64_t LogEntryCount = 0; + if (std::filesystem::is_regular_file(LogPath)) + { + if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) + { + LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition); + } + else if (fs::is_regular_file(LogPath)) + { + ZEN_WARN("removing invalid log at '{}'", LogPath); + std::filesystem::remove(LogPath); + } + } + + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + BlockStore::BlockIndexSet KnownBlocks; + for (const auto& Entry : m_Index) + { + size_t EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + const DiskLocation& Location = Payload.Location; + + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); + } + else + { + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); + KnownBlocks.Add(BlockLocation.BlockIndex); + } + } + m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + + if (IsNew || LogEntryCount > 0) + { + 
WriteIndexSnapshot(IndexLock); + } +} + +void +ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const +{ + char HexString[sizeof(HashKey.Hash) * 2]; + ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); + + Path.Append(m_BucketDir); + Path.AppendSeparator(); + Path.Append(L"blob"); + Path.AppendSeparator(); + Path.AppendAsciiRange(HexString, HexString + 3); + Path.AppendSeparator(); + Path.AppendAsciiRange(HexString + 3, HexString + 5); + Path.AppendSeparator(); + Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); +} + +IoBuffer +ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const +{ + ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue"); + + BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); + + IoBuffer Value = m_BlockStore.TryGetChunk(Location); + if (Value) + { + Value.SetContentType(Loc.GetContentType()); + } + + return Value; +} + +IoBuffer +ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const +{ + ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue"); + + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + + RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + + if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) + { + Data.SetContentType(ContentType); + + return Data; + } + + return {}; +} + +bool +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) +{ + ZEN_TRACE_CPU("Z$::Bucket::Get"); + + metrics::RequestStats::Scope StatsScope(m_GetOps, 0); + + RwLock::SharedLockScope IndexLock(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It == m_Index.end()) + { + m_DiskMissCount++; + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } + return false; + } + + PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation 
Location = m_Payloads[EntryIndex].Location; + + bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0); + + const BucketPayload* Payload = &m_Payloads[EntryIndex]; + if (Payload->MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; + OutValue.RawHash = MetaData.RawHash; + OutValue.RawSize = MetaData.RawSize; + FillRawHashAndRawSize = false; + } + + if (Payload->MemCached) + { + OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload; + Payload = nullptr; + IndexLock.ReleaseNow(); + m_MemoryHitCount++; + } + else + { + Payload = nullptr; + IndexLock.ReleaseNow(); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey); + } + else + { + OutValue.Value = GetInlineCacheValue(Location); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + size_t ValueSize = OutValue.Value.GetSize(); + if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) + { + ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache"); + OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); + if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end()) + { + BucketPayload& WritePayload = m_Payloads[UpdateIt->second]; + // Only update if it has not already been updated by other thread + if (!WritePayload.MemCached) + { + SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); + } + } + } + } + } + } + + if (FillRawHashAndRawSize) + { + ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData"); + if (Location.IsFlagSet(DiskLocation::kCompressed)) + { + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) + { + OutValue = ZenCacheValue{}; + m_DiskMissCount++; + return false; + } + } + else + { + OutValue.RawHash = 
IoHash::HashBuffer(OutValue.Value); + OutValue.RawSize = OutValue.Value.GetSize(); + } + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); + if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) + { + BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; + + // Only set if no other path has already updated the meta data + if (!WritePayload.MetaData) + { + SetMetaData(UpdateIndexLock, WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash}); + } + } + } + if (OutValue.Value) + { + m_DiskHitCount++; + StatsScope.SetBytes(OutValue.Value.GetSize()); + return true; + } + else + { + m_DiskMissCount++; + return false; + } +} + +void +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +{ + ZEN_TRACE_CPU("Z$::Bucket::Put"); + + metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) + { + PutStandaloneCacheValue(HashKey, Value, References); + } + else + { + PutInlineCacheValue(HashKey, Value, References); + } + + m_DiskWriteCount++; +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) +{ + ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim"); + + uint64_t Trimmed = 0; + GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) + { + return 0; + } + + uint32_t WriteIndex = 0; + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) + { + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) + { + continue; + } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); + GcClock::Tick AccessTime = m_AccessTimes[Index]; + if (AccessTime < ExpireTicks) + { + size_t PayloadSize = Data.Payload.GetSize(); + 
RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); + Data = {}; + m_Payloads[Index].MemCached = {}; + Trimmed += PayloadSize; + continue; + } + if (ReadIndex > WriteIndex) + { + m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index}; + m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex); + } + WriteIndex++; + } + m_MemCachedPayloads.resize(WriteIndex); + m_MemCachedPayloads.shrink_to_fit(); + zen::Reset(m_FreeMemCachedPayloads); + return Trimmed; +} + +void +ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots) +{ + ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess"); + + size_t SlotCount = InOutUsageSlots.capacity(); + RwLock::SharedLockScope _(m_IndexLock); + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) + { + return; + } + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) + { + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) + { + continue; + } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); + GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); + GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0); + size_t Slot = Age < MaxAge ? 
gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1); + ZEN_ASSERT_SLOW(Slot < SlotCount); + if (Slot >= InOutUsageSlots.size()) + { + InOutUsageSlots.resize(Slot + 1, 0); + } + InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize()); + } +} + +bool +ZenCacheDiskLayer::CacheBucket::Drop() +{ + ZEN_TRACE_CPU("Z$::Bucket::Drop"); + + RwLock::ExclusiveLockScope _(m_IndexLock); + + std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; + ShardLocks.reserve(256); + for (RwLock& Lock : m_ShardedLocks) + { + ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock)); + } + m_BlockStore.Close(); + m_SlogFile.Close(); + + bool Deleted = MoveAndDeleteDirectory(m_BucketDir); + + m_Index.clear(); + m_Payloads.clear(); + m_AccessTimes.clear(); + m_MetaDatas.clear(); + m_FreeMetaDatas.clear(); + m_MemCachedPayloads.clear(); + m_FreeMemCachedPayloads.clear(); + m_FirstReferenceIndex.clear(); + m_ReferenceHashes.clear(); + m_NextReferenceHashesIndexes.clear(); + m_ReferenceCount = 0; + m_StandaloneSize.store(0); + m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); + m_MemCachedSize.store(0); + + return Deleted; +} + +void +ZenCacheDiskLayer::CacheBucket::Flush() +{ + ZEN_TRACE_CPU("Z$::Bucket::Flush"); + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) + { + return; + } + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); + + ZEN_INFO("Flushing bucket {}", m_BucketDir); + + try + { + m_BlockStore.Flush(/*ForceNewBlock*/ false); + m_SlogFile.Flush(); + + SaveSnapshot(); + } + catch (std::exception& Ex) + { + ZEN_WARN("Failed to flush bucket in '{}'. 
Reason: '{}'", m_BucketDir, Ex.what()); + } +} + +void +ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) +{ + ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot"); + try + { + bool UseLegacyScheme = false; + + IoBuffer Buffer; + BucketManifestSerializer ManifestWriter; + + if (UseLegacyScheme) + { + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + std::vector<BucketMetaData> MetaDatas; + IndexMap Index; + + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + WriteIndexSnapshot(IndexLock); + // Note: this copy could be eliminated on shutdown to + // reduce memory usage and execution time + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + MetaDatas = m_MetaDatas; + } + + Buffer = ManifestWriter.MakeManifest(m_BucketId, + std::move(Index), + std::move(AccessTimes), + std::move(Payloads), + std::move(MetaDatas)); + const uint64_t RequiredSpace = Buffer.GetSize() + 1024 * 512; + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) + { + ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; + } + bool EnoughSpace = Space.Free >= RequiredSpace; + if (!EnoughSpace) + { + uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); + EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; + } + if (!EnoughSpace) + { + ZEN_WARN("not enough free disk space in '{}'. 
FAILED to save manifest of size {}", + m_BucketDir, + NiceBytes(Buffer.GetSize())); + return; + } + } + else + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + WriteIndexSnapshot(IndexLock); + const uint64_t EntryCount = m_Index.size(); + Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); + uint64_t SidecarSize = ManifestWriter.GetSidecarSize(); + + const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512; + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) + { + ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; + } + bool EnoughSpace = Space.Free >= RequiredSpace; + if (!EnoughSpace) + { + uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); + EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; + } + if (!EnoughSpace) + { + ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", + m_BucketDir, + NiceBytes(Buffer.GetSize())); + return; + } + + ManifestWriter.WriteSidecarFile(IndexLock, + GetMetaPath(m_BucketDir, m_BucketName), + m_LogFlushPosition, + m_Index, + m_AccessTimes, + m_Payloads, + m_MetaDatas); + } + + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); + WriteFile(ManifestPath, Buffer); + } + catch (std::exception& Err) + { + ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); + } +} + +IoHash +HashBuffer(const CompositeBuffer& Buffer) +{ + IoHashStream Hasher; + + for (const SharedBuffer& Segment : Buffer.GetSegments()) + { + Hasher.Append(Segment.GetView()); + } + + return Hasher.GetHash(); +} + +bool +ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) +{ + ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType); + + if (ContentType == ZenContentType::kCbObject) + { + CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); + + if (Error == CbValidateError::None) + { + return true; + } + 
+ ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error)); + + return false; + } + else if (ContentType == ZenContentType::kCompressedBinary) + { + IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer); + + IoHash HeaderRawHash; + uint64_t RawSize = 0; + if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize)) + { + ZEN_SCOPED_ERROR("compressed buffer header validation failed"); + + return false; + } + + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize); + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + IoHash DecompressedHash = HashBuffer(Decompressed); + + if (HeaderRawHash != DecompressedHash) + { + ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); + + return false; + } + } + else + { + // No way to verify this kind of content (what is it exactly?) 
+ + static int Once = [&] { + ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType)); + return 42; + }(); + } + + return true; +}; + +void +ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) +{ + ZEN_TRACE_CPU("Z$::Bucket::Scrub"); + + ZEN_INFO("scrubbing '{}'", m_BucketDir); + + Stopwatch Timer; + uint64_t ChunkCount = 0; + uint64_t VerifiedChunkBytes = 0; + + auto LogStats = MakeGuard([&] { + const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); + + ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", + m_BucketName, + NiceBytes(VerifiedChunkBytes), + NiceTimeSpanMs(DurationMs), + ChunkCount, + NiceRate(VerifiedChunkBytes, DurationMs)); + }); + + std::vector<IoHash> BadKeys; + auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; + + try + { + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + + RwLock::SharedLockScope _(m_IndexLock); + + const size_t BlockChunkInitialCount = m_Index.size() / 4; + ChunkLocations.reserve(BlockChunkInitialCount); + ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); + + // Do a pass over the index and verify any standalone file values straight away + // all other storage classes are gathered and verified in bulk in order to enable + // more efficient I/O scheduling + + for (auto& Kv : m_Index) + { + const IoHash& HashKey = Kv.first; + const BucketPayload& Payload = m_Payloads[Kv.second]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Ctx.ThrowIfDeadlineExpired(); + + ++ChunkCount; + VerifiedChunkBytes += Loc.Size(); + + if (Loc.GetContentType() == ZenContentType::kBinary) + { + // Blob cache value, not much we can do about data integrity checking + // here since there's no hash available + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + + RwLock::SharedLockScope 
ValueLock(LockForHash(HashKey)); + + std::error_code Ec; + uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); + if (Ec) + { + ReportBadKey(HashKey); + } + if (size != Loc.Size()) + { + ReportBadKey(HashKey); + } + continue; + } + else + { + // Structured cache value + IoBuffer Buffer = GetStandaloneCacheValue(Loc.GetContentType(), HashKey); + if (!Buffer) + { + ReportBadKey(HashKey); + continue; + } + if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer)) + { + ReportBadKey(HashKey); + continue; + } + } + } + else + { + ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment)); + ChunkIndexToChunkHash.push_back(HashKey); + continue; + } + } + + const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { + ++ChunkCount; + VerifiedChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + if (!Data) + { + // ChunkLocation out of range of stored blocks + ReportBadKey(Hash); + return; + } + if (!Size) + { + ReportBadKey(Hash); + return; + } + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); + if (!Buffer) + { + ReportBadKey(Hash); + return; + } + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + Buffer.SetContentType(ContentType); + if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) + { + ReportBadKey(Hash); + return; + } + }; + + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { + Ctx.ThrowIfDeadlineExpired(); + + ++ChunkCount; + VerifiedChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + if (!Buffer) + { + ReportBadKey(Hash); + return; + } + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + 
Buffer.SetContentType(ContentType); + if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) + { + ReportBadKey(Hash); + return; + } + }; + + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + } + catch (ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + } + + Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes); + + if (!BadKeys.empty()) + { + ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); + + if (Ctx.RunRecovery()) + { + // Deal with bad chunks by removing them from our lookup map + + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); + + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + for (const IoHash& BadKey : BadKeys) + { + // Log a tombstone and delete the in-memory index for the bad entry + const auto It = m_Index.find(BadKey); + BucketPayload& Payload = m_Payloads[It->second]; + if (m_Configuration.EnableReferenceCaching) + { + RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]); + } + DiskLocation Location = Payload.Location; + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed); + } + + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); + + Location.Flags |= DiskLocation::kTombStone; + LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); + m_Index.erase(BadKey); + } + } + for (const DiskIndexEntry& Entry : LogEntries) + { + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + ExtendablePathBuilder<256> Path; + BuildPath(Path, Entry.Key); + fs::path FilePath = Path.ToPath(); + RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); + if (fs::is_regular_file(FilePath)) + { + ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); + std::error_code Ec; + fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking 
this file... + } + } + } + m_SlogFile.Append(LogEntries); + + // Clean up m_AccessTimes and m_Payloads vectors + { + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<BucketMetaData> MetaDatas; + std::vector<MemCacheData> MemCachedPayloads; + std::vector<ReferenceIndex> FirstReferenceIndex; + IndexMap Index; + + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); + } + } + } + } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + if (!BadKeys.empty()) + { + Ctx.ReportBadCidChunks(BadKeys); + } +} + +void +ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("Z$::Bucket::GatherReferences"); + +#define CALCULATE_BLOCKING_TIME 0 + +#if CALCULATE_BLOCKING_TIME + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; +#endif // CALCULATE_BLOCKING_TIME + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { +#if CALCULATE_BLOCKING_TIME + ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", + m_BucketDir, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs)); +#else + ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); +#endif // CALCULATE_BLOCKING_TIME + }); + + const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime(); + + const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + IndexMap Index; + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + 
std::vector<ReferenceIndex> FirstReferenceIndex; + { + RwLock::SharedLockScope __(m_IndexLock); +#if CALCULATE_BLOCKING_TIME + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); +#endif // CALCULATE_BLOCKING_TIME + if (m_Index.empty()) + { + return; + } + Index = m_Index; + AccessTimes = m_AccessTimes; + Payloads = m_Payloads; + FirstReferenceIndex = m_FirstReferenceIndex; + } + + std::vector<IoHash> ExpiredKeys; + ExpiredKeys.reserve(1024); + + std::vector<IoHash> Cids; + if (!GcCtx.SkipCid()) + { + Cids.reserve(1024); + } + + std::vector<std::pair<IoHash, size_t>> StructuredItemsWithUnknownAttachments; + + for (const auto& Entry : Index) + { + const IoHash& Key = Entry.first; + size_t PayloadIndex = Entry.second; + GcClock::Tick AccessTime = AccessTimes[PayloadIndex]; + if (AccessTime < ExpireTicks) + { + ExpiredKeys.push_back(Key); + continue; + } + + if (GcCtx.SkipCid()) + { + continue; + } + + BucketPayload& Payload = Payloads[PayloadIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + if (m_Configuration.EnableReferenceCaching) + { + if (FirstReferenceIndex.empty() || FirstReferenceIndex[PayloadIndex] == ReferenceIndex::Unknown()) + { + StructuredItemsWithUnknownAttachments.push_back(Entry); + continue; + } + + bool ReferencesAreKnown = false; + { + RwLock::SharedLockScope IndexLock(m_IndexLock); +#if CALCULATE_BLOCKING_TIME + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); +#endif // CALCULATE_BLOCKING_TIME + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids); + } + } 
+ if (ReferencesAreKnown) + { + if (Cids.size() >= 1024) + { + GcCtx.AddRetainedCids(Cids); + Cids.clear(); + } + continue; + } + } + StructuredItemsWithUnknownAttachments.push_back(Entry); + } + + for (const auto& Entry : StructuredItemsWithUnknownAttachments) + { + const IoHash& Key = Entry.first; + BucketPayload& Payload = Payloads[Entry.second]; + const DiskLocation& Loc = Payload.Location; + { + IoBuffer Buffer; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer) + { + continue; + } + } + else + { + RwLock::SharedLockScope IndexLock(m_IndexLock); +#if CALCULATE_BLOCKING_TIME + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); +#endif // CALCULATE_BLOCKING_TIME + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + const BucketPayload& CachedPayload = m_Payloads[It->second]; + if (CachedPayload.MemCached) + { + Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload; + ZEN_ASSERT_SLOW(Buffer); + } + else + { + DiskLocation Location = m_Payloads[It->second].Location; + IndexLock.ReleaseNow(); + Buffer = GetInlineCacheValue(Location); + // Don't memcache items when doing GC + } + } + if (!Buffer) + { + continue; + } + } + + ZEN_ASSERT(Buffer); + ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); + CbObjectView Obj(Buffer.GetData()); + size_t CurrentCidCount = Cids.size(); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); + if (m_Configuration.EnableReferenceCaching) + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); +#if CALCULATE_BLOCKING_TIME + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, 
ReadBlockLongestTimeUs); + }); +#endif // CALCULATE_BLOCKING_TIME + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + if (m_FirstReferenceIndex[It->second] == ReferenceIndex::Unknown()) + { + SetReferences(IndexLock, + m_FirstReferenceIndex[It->second], + std::span<IoHash>(Cids.data() + CurrentCidCount, Cids.size() - CurrentCidCount)); + } + else + { + Cids.resize(CurrentCidCount); + (void)GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids); + } + } + } + if (Cids.size() >= 1024) + { + GcCtx.AddRetainedCids(Cids); + Cids.clear(); + } + } + } + + GcCtx.AddRetainedCids(Cids); + GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); +} + +void +ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage"); + + ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); + + Stopwatch TotalTimer; + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + GcStorageSize OldTotalSize = StorageSize(); + + std::unordered_set<IoHash> DeletedChunks; + uint64_t MovedCount = 0; + + const auto _ = MakeGuard([&] { + ZEN_DEBUG( + "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " + "{} " + "of {} " + "entries ({}/{}).", + m_BucketDir, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedChunks.size(), + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize.DiskSize), + NiceBytes(OldTotalSize.MemorySize)); + + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) + { + return; + } + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); 
+ + try + { + SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); + } + catch (std::exception& Ex) + { + ZEN_WARN("Failed to write index and manifest after GC in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); + } + }); + + auto __ = MakeGuard([&]() { + if (!DeletedChunks.empty()) + { + // Clean up m_AccessTimes and m_Payloads vectors + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<BucketMetaData> MetaDatas; + std::vector<MemCacheData> MemCachedPayloads; + std::vector<ReferenceIndex> FirstReferenceIndex; + IndexMap Index; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); + } + GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); + } + }); + + std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); + if (ExpiredCacheKeySpan.empty()) + { + return; + } + + m_SlogFile.Flush(); + + std::unordered_set<IoHash, IoHash::Hasher> ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end()); + + std::vector<DiskIndexEntry> ExpiredStandaloneEntries; + IndexMap IndexSnapshot; + std::vector<BucketPayload> PayloadsSnapshot; + BlockStore::ReclaimSnapshotState BlockStoreState; + { + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) + { + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir); + return; + } + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); + + { + ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State"); + RwLock::SharedLockScope IndexLock(m_IndexLock); + + Stopwatch Timer; + const auto ____ = 
MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + + for (const IoHash& Key : ExpiredCacheKeys) + { + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + const BucketPayload& Payload = m_Payloads[It->second]; + if (Payload.Location.Flags & DiskLocation::kStandaloneFile) + { + DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location}; + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); + } + } + } + + PayloadsSnapshot = m_Payloads; + IndexSnapshot = m_Index; + + if (GcCtx.IsDeletionMode()) + { + IndexLock.ReleaseNow(); + RwLock::ExclusiveLockScope __(m_IndexLock); + for (const auto& Entry : ExpiredStandaloneEntries) + { + if (m_Index.erase(Entry.Key) == 1) + { + m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedChunks.insert(Entry.Key); + } + } + m_SlogFile.Append(ExpiredStandaloneEntries); + } + } + } + + if (GcCtx.IsDeletionMode()) + { + ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete"); + + ExtendablePathBuilder<256> Path; + + for (const auto& Entry : ExpiredStandaloneEntries) + { + const IoHash& Key = Entry.Key; + + Path.Reset(); + BuildPath(Path, Key); + fs::path FilePath = Path.ToPath(); + + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); + continue; + } + IndexLock.ReleaseNow(); + + RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); + if 
(fs::is_regular_file(FilePath)) + { + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + std::error_code Ec; + fs::remove(FilePath, Ec); + if (Ec) + { + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); + continue; + } + } + } + DeletedSize += Entry.Location.Size(); + } + } + + TotalChunkCount = IndexSnapshot.size(); + + std::vector<BlockStoreLocation> ChunkLocations; + BlockStore::ChunkIndexArray KeepChunkIndexes; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + { + TotalChunkCount = 0; + for (const auto& Entry : IndexSnapshot) + { + size_t EntryIndex = Entry.second; + const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location; + + if (DiskLocation.Flags & DiskLocation::kStandaloneFile) + { + continue; + } + const IoHash& Key = Entry.first; + BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash.push_back(Key); + if (ExpiredCacheKeys.contains(Key)) + { + continue; + } + KeepChunkIndexes.push_back(ChunkIndex); + } + } + TotalChunkCount = ChunkLocations.size(); + size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); + + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true); + GcStorageSize CurrentTotalSize = StorageSize(); + ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})", + m_BucketDir, + DeleteCount, + TotalChunkCount, + NiceBytes(CurrentTotalSize.DiskSize), + NiceBytes(CurrentTotalSize.MemorySize)); + return; + } + + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + 
KeepChunkIndexes,
    m_Configuration.PayloadAlignment,
    false,
    // Invoked by the block store with the chunks it physically moved and those
    // it dropped; we patch the in-memory index and journal the changes.
    [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
        std::vector<DiskIndexEntry> LogEntries;
        LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
        {
            RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
            Stopwatch Timer;
            const auto ____ = MakeGuard([&] {
                uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
                WriteBlockTimeUs += ElapsedUs;
                WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
            });
            for (const auto& Entry : MovedChunks)
            {
                size_t ChunkIndex = Entry.first;
                const BlockStoreLocation& NewLocation = Entry.second;
                const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
                // NOTE(review): operator[] default-inserts if ChunkHash was erased
                // concurrently — presumably these keys are always still present
                // under the exclusive lock; verify against IndexMap semantics.
                size_t EntryIndex = m_Index[ChunkHash];
                BucketPayload& Payload = m_Payloads[EntryIndex];
                if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location)
                {
                    // Entry has been updated while GC was running, ignore the move
                    continue;
                }
                Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
                LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location});
            }
            for (const size_t ChunkIndex : RemovedChunks)
            {
                const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
                size_t EntryIndex = m_Index[ChunkHash];
                BucketPayload& Payload = m_Payloads[EntryIndex];
                if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location)
                {
                    // Entry has been updated while GC was running, ignore the delete
                    continue;
                }
                // Journal a tombstone at the old location so replay drops the entry.
                const DiskLocation& OldDiskLocation = Payload.Location;
                LogEntries.push_back({.Key = ChunkHash,
                                      .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment),
                                                               m_Configuration.PayloadAlignment,
                                                               OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});

                // Release derived state before dropping the index entry itself.
                RemoveMemCachedData(IndexLock, Payload);
                RemoveMetaData(IndexLock, Payload);

                m_Index.erase(ChunkHash);
DeletedChunks.insert(ChunkHash);
            }
        }

        // Persist and flush the move/tombstone journal before the block store
        // commits the physical reclaim.
        m_SlogFile.Append(LogEntries);
        m_SlogFile.Flush();
    },
    [&]() { return GcCtx.ClaimGCReserve(); });
}

// Snapshot of this bucket's counters and sizes for reporting.
ZenCacheDiskLayer::BucketStats
ZenCacheDiskLayer::CacheBucket::Stats()
{
    GcStorageSize Size = StorageSize();
    return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize,
                                          .MemorySize = Size.MemorySize,
                                          .DiskHitCount = m_DiskHitCount,
                                          .DiskMissCount = m_DiskMissCount,
                                          .DiskWriteCount = m_DiskWriteCount,
                                          .MemoryHitCount = m_MemoryHitCount,
                                          .MemoryMissCount = m_MemoryMissCount,
                                          .MemoryWriteCount = m_MemoryWriteCount,
                                          .PutOps = m_PutOps.Snapshot(),
                                          .GetOps = m_GetOps.Snapshot()};
}

// Number of live entries in the bucket index (shared-locked read).
uint64_t
ZenCacheDiskLayer::CacheBucket::EntryCount() const
{
    RwLock::SharedLockScope _(m_IndexLock);
    return static_cast<uint64_t>(m_Index.size());
}

// Describes one cache value: size, raw metadata, last access and — for
// structured (CbObject) values — the attachments it references.
// Caller must already hold the index lock (passed as IndexLock).
CacheValueDetails::ValueDetails
ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const IoHash& Key, PayloadIndex Index) const
{
    std::vector<IoHash> Attachments;
    const BucketPayload& Payload = m_Payloads[Index];
    if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
    {
        // Load the value from wherever it lives to enumerate its attachments.
        IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile)
                             ?
GetStandaloneCacheValue(Payload.Location.GetContentType(), Key)
                             : GetInlineCacheValue(Payload.Location);
        CbObjectView Obj(Value.GetData());
        Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
    }
    BucketMetaData MetaData = GetMetaData(IndexLock, Payload);
    return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
                                           .RawSize = MetaData.RawSize,
                                           .RawHash = MetaData.RawHash,
                                           .LastAccess = m_AccessTimes[Index],
                                           .Attachments = std::move(Attachments),
                                           .ContentType = Payload.Location.GetContentType()};
}

// Details for all values in the bucket, or for a single value when
// ValueFilter contains a hex-encoded key.
CacheValueDetails::BucketDetails
ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const
{
    CacheValueDetails::BucketDetails Details;
    // NOTE(review): takes an additional shared lock on m_IndexLock while the
    // caller already holds one (IndexLock) — shared re-entry; confirm RwLock
    // permits recursive shared acquisition.
    RwLock::SharedLockScope _(m_IndexLock);
    if (ValueFilter.empty())
    {
        Details.Values.reserve(m_Index.size());
        for (const auto& It : m_Index)
        {
            Details.Values.insert_or_assign(It.first, GetValueDetails(IndexLock, It.first, It.second));
        }
    }
    else
    {
        IoHash Key = IoHash::FromHexString(ValueFilter);
        if (auto It = m_Index.find(Key); It != m_Index.end())
        {
            Details.Values.insert_or_assign(It->first, GetValueDetails(IndexLock, It->first, It->second));
        }
    }
    return Details;
}

// Invokes Fn for every entry in the bucket under a shared index lock.
void
ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
    std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
{
    RwLock::SharedLockScope IndexLock(m_IndexLock);
    for (const auto& It : m_Index)
    {
        CacheValueDetails::ValueDetails Vd = GetValueDetails(IndexLock, It.first, It.second);

        Fn(It.first, Vd);
    }
}

// Runs a GC pass over every bucket, then trims the memory cache unless a
// trim is already in flight.
void
ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
    ZEN_TRACE_CPU("Z$::CollectGarbage");

    // Copy the bucket pointers out under the lock so GC runs without holding it.
    std::vector<CacheBucket*> Buckets;
    {
        RwLock::SharedLockScope _(m_Lock);
        Buckets.reserve(m_Buckets.size());
        for (auto& Kv : m_Buckets)
        {
            Buckets.push_back(Kv.second.get());
        }
    }
    for (CacheBucket*
Bucket : Buckets) + { + Bucket->CollectGarbage(GcCtx); + } + if (!m_IsMemCacheTrimming) + { + MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); + } +} + +void +ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +{ + ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); + + uint64_t NewFileSize = Value.Value.Size(); + + TemporaryFile DataFile; + + std::error_code Ec; + DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); + } + + bool CleanUpTempFile = true; + auto __ = MakeGuard([&] { + if (CleanUpTempFile) + { + std::error_code Ec; + std::filesystem::remove(DataFile.GetPath(), Ec); + if (Ec) + { + ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", + DataFile.GetPath(), + m_BucketDir, + Ec.message()); + } + } + }); + + DataFile.WriteAll(Value.Value, Ec); + if (Ec) + { + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", + NiceBytes(NewFileSize), + DataFile.GetPath().string(), + m_BucketDir)); + } + + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + std::filesystem::path FsPath{DataFilePath.ToPath()}; + + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + + // We do a speculative remove of the file instead of probing with a exists call and check the error code instead + std::filesystem::remove(FsPath, Ec); + if (Ec) + { + if (Ec.value() != ENOENT) + { + ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); + Sleep(100); + Ec.clear(); + std::filesystem::remove(FsPath, Ec); + if (Ec && Ec.value() != ENOENT) + { + throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); + } + } + } + + // Assume parent directory exists + 
DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + if (Ec) + { + CreateDirectories(FsPath.parent_path()); + + // Try again after we or someone else created the directory + Ec.clear(); + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + + // Retry if we still fail to handle contention to file system + uint32_t RetriesLeft = 3; + while (Ec && RetriesLeft > 0) + { + ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retries left: {}.", + FsPath, + DataFile.GetPath(), + m_BucketDir, + Ec.message(), + RetriesLeft); + Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms + Ec.clear(); + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + RetriesLeft--; + } + if (Ec) + { + throw std::system_error( + Ec, + fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); + } + } + + // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file + // will be disabled as the file handle has already been closed + CleanUpTempFile = false; + + uint8_t EntryFlags = DiskLocation::kStandaloneFile; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + DiskLocation Loc(NewFileSize, EntryFlags); + + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + ValueLock.ReleaseNow(); + if (m_UpdatedKeys) + { + m_UpdatedKeys->insert(HashKey); + } + + PayloadIndex EntryIndex = {}; + if (auto It = m_Index.find(HashKey); It == m_Index.end()) + { + // Previously unknown object + EntryIndex = PayloadIndex(m_Payloads.size()); + m_Payloads.emplace_back(BucketPayload{.Location = Loc}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.emplace_back(ReferenceIndex{}); + SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); + } 
+ m_Index.insert_or_assign(HashKey, EntryIndex); + } + else + { + EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); + BucketPayload& Payload = m_Payloads[EntryIndex]; + uint64_t OldSize = Payload.Location.Size(); + Payload = BucketPayload{.Location = Loc}; + if (m_Configuration.EnableReferenceCaching) + { + SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); + } + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + RemoveMemCachedData(IndexLock, Payload); + m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed); + } + if (Value.RawSize != 0 || Value.RawHash != IoHash::Zero) + { + SetMetaData(IndexLock, m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash}); + } + else + { + RemoveMetaData(IndexLock, m_Payloads[EntryIndex]); + } + + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); +} + +void +ZenCacheDiskLayer::CacheBucket::SetMetaData(RwLock::ExclusiveLockScope&, + BucketPayload& Payload, + const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData) +{ + if (Payload.MetaData) + { + m_MetaDatas[Payload.MetaData] = MetaData; + } + else + { + if (m_FreeMetaDatas.empty()) + { + Payload.MetaData = MetaDataIndex(m_MetaDatas.size()); + m_MetaDatas.emplace_back(MetaData); + } + else + { + Payload.MetaData = m_FreeMetaDatas.back(); + m_FreeMetaDatas.pop_back(); + m_MetaDatas[Payload.MetaData] = MetaData; + } + } +} + +void +ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) +{ + if (Payload.MetaData) + { + m_FreeMetaDatas.push_back(Payload.MetaData); + Payload.MetaData = {}; + } +} + +void +ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData) +{ + BucketPayload& Payload = m_Payloads[PayloadIndex]; + uint64_t PayloadSize = MemCachedData.GetSize(); + ZEN_ASSERT(PayloadSize != 0); 
    if (m_FreeMemCachedPayloads.empty())
    {
        // Only grow while the index still fits in 32 bits; otherwise silently
        // skip caching this payload.
        if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max())
        {
            Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size()));
            m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex});
            AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
            m_MemoryWriteCount++;
        }
    }
    else
    {
        // Reuse a previously freed slot.
        Payload.MemCached = m_FreeMemCachedPayloads.back();
        m_FreeMemCachedPayloads.pop_back();
        m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex};
        AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
        m_MemoryWriteCount++;
    }
}

// Drops a payload's in-memory copy, updates the usage accounting and returns
// the number of bytes freed (0 if nothing was cached). Requires the exclusive
// index lock.
size_t
ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload)
{
    if (Payload.MemCached)
    {
        size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize();
        RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
        m_MemCachedPayloads[Payload.MemCached] = {};
        m_FreeMemCachedPayloads.push_back(Payload.MemCached);
        Payload.MemCached = {};
        return PayloadSize;
    }
    return 0;
}

// Returns the payload's metadata, or a default-constructed value when none is
// attached. Requires at least the shared index lock.
ZenCacheDiskLayer::CacheBucket::BucketMetaData
ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const
{
    if (Payload.MetaData)
    {
        return m_MetaDatas[Payload.MetaData];
    }
    return {};
}

// Stores a cache value inline in the block store (for payloads small enough
// not to warrant a standalone file), then updates the index and journal from
// the block store's completion callback.
void
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
    ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");

    uint8_t EntryFlags = 0;

    if (Value.Value.GetContentType() == ZenContentType::kCbObject)
    {
        EntryFlags |= DiskLocation::kStructured;
    }
    else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
    {
        EntryFlags |= DiskLocation::kCompressed;
    }

    m_BlockStore.WriteChunk(Value.Value.Data(),
                            Value.Value.Size(),
m_Configuration.PayloadAlignment, + [&](const BlockStoreLocation& BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); + m_SlogFile.Append({.Key = HashKey, .Location = Location}); + + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (m_UpdatedKeys) + { + m_UpdatedKeys->insert(HashKey); + } + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + PayloadIndex EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); + BucketPayload& Payload = m_Payloads[EntryIndex]; + + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); + + Payload = (BucketPayload{.Location = Location}); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + + if (m_Configuration.EnableReferenceCaching) + { + SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); + } + } + else + { + PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); + m_Payloads.emplace_back(BucketPayload{.Location = Location}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.emplace_back(ReferenceIndex{}); + SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); + } + m_Index.insert_or_assign(HashKey, EntryIndex); + } + }); +} + +std::string +ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&) +{ + return fmt::format("cachebucket:'{}'", m_BucketDir.string()); +} + +class DiskBucketStoreCompactor : public GcStoreCompactor +{ +public: + DiskBucketStoreCompactor(ZenCacheDiskLayer::CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys) + : m_Bucket(Bucket) + , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys)) + { + m_ExpiredStandaloneKeys.shrink_to_fit(); + } + + virtual ~DiskBucketStoreCompactor() {} + + virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override + { + 
ZEN_TRACE_CPU("Z$::Bucket::CompactStore"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + Reset(m_ExpiredStandaloneKeys); + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}", + m_Bucket.m_BucketDir, + NiceBytes(Stats.RemovedDisk), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + if (!m_ExpiredStandaloneKeys.empty()) + { + // Compact standalone items + size_t Skipped = 0; + ExtendablePathBuilder<256> Path; + for (const std::pair<IoHash, uint64_t>& ExpiredKey : m_ExpiredStandaloneKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + return; + } + Path.Reset(); + m_Bucket.BuildPath(Path, ExpiredKey.first); + fs::path FilePath = Path.ToPath(); + + RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock); + if (m_Bucket.m_Index.contains(ExpiredKey.first)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", + m_Bucket.m_BucketDir, + Path.ToUtf8()); + continue; + } + + if (Ctx.Settings.IsDeleteMode) + { + RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); + IndexLock.ReleaseNow(); + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); + + std::error_code Ec; + if (!fs::remove(FilePath, Ec)) + { + continue; + } + if (Ec) + { + ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", + m_Bucket.m_BucketDir, + Path.ToUtf8(), + Ec.message()); + continue; + } + Stats.RemovedDisk += ExpiredKey.second; + } + else + { + RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); + IndexLock.ReleaseNow(); + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); + + std::error_code Ec; + bool Existed = std::filesystem::is_regular_file(FilePath, Ec); + if (Ec) + { 
+ ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'", + m_Bucket.m_BucketDir, + FilePath, + Ec.message()); + continue; + } + if (!Existed) + { + continue; + } + Skipped++; + } + } + if (Skipped > 0) + { + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped); + } + } + + if (Ctx.Settings.CollectSmallObjects) + { + m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique<HashSet>(); }); + auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); }); + + size_t InlineEntryCount = 0; + BlockStore::BlockUsageMap BlockUsage; + { + RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); + for (const auto& Entry : m_Bucket.m_Index) + { + ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + InlineEntryCount++; + uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex(); + uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment); + if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) + { + It->second.EntryCount++; + It->second.DiskUsage += ChunkSize; + } + else + { + BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); + } + } + } + + { + BlockStoreCompactState BlockCompactState; + std::vector<IoHash> BlockCompactStateKeys; + BlockCompactStateKeys.reserve(InlineEntryCount); + + BlockStore::BlockEntryCountMap BlocksToCompact = + m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); + BlockCompactState.IncludeBlocks(BlocksToCompact); + + if (BlocksToCompact.size() > 0) + { + { + RwLock::SharedLockScope 
___(m_Bucket.m_IndexLock); + for (const auto& Entry : m_Bucket.m_Index) + { + ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment))) + { + continue; + } + BlockCompactStateKeys.push_back(Entry.first); + } + } + + if (Ctx.Settings.IsDeleteMode) + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", + m_Bucket.m_BucketDir, + BlocksToCompact.size()); + } + + m_Bucket.m_BlockStore.CompactBlocks( + BlockCompactState, + m_Bucket.m_Configuration.PayloadAlignment, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + std::vector<DiskIndexEntry> MovedEntries; + MovedEntries.reserve(MovedArray.size()); + RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); + for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) + { + size_t ChunkIndex = Moved.first; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + + if (m_Bucket.m_UpdatedKeys->contains(Key)) + { + continue; + } + + if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) + { + ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; + const BlockStoreLocation& NewLocation = Moved.second; + Payload.Location = DiskLocation(NewLocation, + m_Bucket.m_Configuration.PayloadAlignment, + Payload.Location.GetFlags()); + MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); + } + } + m_Bucket.m_SlogFile.Append(MovedEntries); + Stats.RemovedDisk += FreedDiskSpace; + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + return true; + }, + ClaimDiskReserveCallback); + } + else + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: cachebucket 
[COMPACT] '{}': skipped compacting of {} eligible blocks", + m_Bucket.m_BucketDir, + BlocksToCompact.size()); + } + } + } + } + } + } + +private: + ZenCacheDiskLayer::CacheBucket& m_Bucket; + std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys; +}; + +GcStoreCompactor* +ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) +{ + ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData"); + + size_t TotalEntries = 0; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", + m_BucketDir, + Stats.CheckedCount, + Stats.FoundCount, + Stats.DeletedCount, + NiceBytes(Stats.FreedMemory), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count(); + + std::vector<DiskIndexEntry> ExpiredEntries; + std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys; + uint64_t RemovedStandaloneSize = 0; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + if (m_Index.empty()) + { + return nullptr; + } + + TotalEntries = m_Index.size(); + + // Find out expired keys + for (const auto& Entry : m_Index) + { + const IoHash& Key = Entry.first; + PayloadIndex EntryIndex = Entry.second; + GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; + if (AccessTime >= ExpireTicks) + { + continue; + } + + const BucketPayload& Payload = m_Payloads[EntryIndex]; + DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location}; + ExpiredEntry.Location.Flags |= DiskLocation::kTombStone; + + if (Payload.Location.Flags & DiskLocation::kStandaloneFile) + { + ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()}); + RemovedStandaloneSize += Payload.Location.Size(); + ExpiredEntries.push_back(ExpiredEntry); + } + else if (Ctx.Settings.CollectSmallObjects) + { 
+ ExpiredEntries.push_back(ExpiredEntry); + } + } + + Stats.CheckedCount += TotalEntries; + Stats.FoundCount += ExpiredEntries.size(); + + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + + if (Ctx.Settings.IsDeleteMode) + { + for (const DiskIndexEntry& Entry : ExpiredEntries) + { + auto It = m_Index.find(Entry.Key); + ZEN_ASSERT(It != m_Index.end()); + BucketPayload& Payload = m_Payloads[It->second]; + RemoveMetaData(IndexLock, Payload); + Stats.FreedMemory += RemoveMemCachedData(IndexLock, Payload); + m_Index.erase(It); + Stats.DeletedCount++; + } + m_SlogFile.Append(ExpiredEntries); + m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed); + } + } + + if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty()) + { + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<BucketMetaData> MetaDatas; + std::vector<MemCacheData> MemCachedPayloads; + std::vector<ReferenceIndex> FirstReferenceIndex; + IndexMap Index; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); + } + } + + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + + return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); +} + +class DiskBucketReferenceChecker : public GcReferenceChecker +{ + using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; + using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; + using CacheBucket = ZenCacheDiskLayer::CacheBucket; + using ReferenceIndex = ZenCacheDiskLayer::CacheBucket::ReferenceIndex; + +public: + DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {} + + virtual ~DiskBucketReferenceChecker() + { + try + { + m_IndexLock.reset(); + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + // If 
reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it + m_CacheBucket.ClearReferenceCache(); + } + } + catch (std::exception& Ex) + { + ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what()); + } + } + + virtual void PreCache(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Z$::Bucket::PreCache"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", + m_CacheBucket.m_BucketDir, + m_CacheBucket.m_ReferenceCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::vector<IoHash> UpdateKeys; + std::vector<size_t> ReferenceCounts; + std::vector<IoHash> References; + + auto GetAttachments = [&References, &ReferenceCounts](const void* CbObjectData) { + size_t CurrentReferenceCount = References.size(); + CbObjectView Obj(CbObjectData); + Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); + ReferenceCounts.push_back(References.size() - CurrentReferenceCount); + }; + + // Refresh cache + { + // If reference caching is enabled the references will be updated at modification for us so we don't need to track modifications + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys = std::make_unique<HashSet>(); }); + } + + std::vector<IoHash> StandaloneKeys; + { + std::vector<IoHash> InlineKeys; + std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex; + struct InlineEntry + { + uint32_t InlineKeyIndex; + uint32_t Offset; + uint32_t Size; + }; + std::vector<std::vector<InlineEntry>> EntriesPerBlock; + size_t UpdateCount = 0; + { + RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); + for (const auto& Entry : m_CacheBucket.m_Index) + { + if (Ctx.IsCancelledFlag.load()) + { + IndexLock.ReleaseNow(); + 
m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } + + PayloadIndex EntryIndex = Entry.second; + const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + if (m_CacheBucket.m_Configuration.EnableReferenceCaching && + m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown()) + { + continue; + } + UpdateCount++; + const IoHash& Key = Entry.first; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneKeys.push_back(Key); + continue; + } + + BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment); + InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()), + .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset), + .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)}; + InlineKeys.push_back(Key); + + if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex); + It != BlockIndexToEntriesPerBlockIndex.end()) + { + EntriesPerBlock[It->second].emplace_back(UpdateEntry); + } + else + { + BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size()); + EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry}); + } + } + } + + UpdateKeys.reserve(UpdateCount); + + for (auto It : BlockIndexToEntriesPerBlockIndex) + { + uint32_t BlockIndex = It.first; + + Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex); + if (BlockFile) + { + size_t EntriesPerBlockIndex = It.second; + std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex]; + + std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool { + return Lhs.Offset < Rhs.Offset; + }); + + uint64_t BlockFileSize = BlockFile->FileSize(); + BasicFileBuffer 
BlockBuffer(BlockFile->GetBasicFile(), 32768); + for (const InlineEntry& InlineEntry : InlineEntries) + { + if ((InlineEntry.Offset + InlineEntry.Size) > BlockFileSize) + { + ReferenceCounts.push_back(0); + } + else + { + MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset); + if (ChunkView.GetSize() == InlineEntry.Size) + { + GetAttachments(ChunkView.GetData()); + } + else + { + std::vector<uint8_t> Buffer(InlineEntry.Size); + BlockBuffer.Read(Buffer.data(), InlineEntry.Size, InlineEntry.Offset); + GetAttachments(Buffer.data()); + } + } + const IoHash& Key = InlineKeys[InlineEntry.InlineKeyIndex]; + UpdateKeys.push_back(Key); + } + } + } + } + { + for (const IoHash& Key : StandaloneKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } + + IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(ZenContentType::kCbObject, Key); + if (!Buffer) + { + continue; + } + + GetAttachments(Buffer.GetData()); + UpdateKeys.push_back(Key); + } + } + } + + { + size_t ReferenceOffset = 0; + RwLock::ExclusiveLockScope IndexLock(m_CacheBucket.m_IndexLock); + + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + ZEN_ASSERT(m_CacheBucket.m_FirstReferenceIndex.empty()); + ZEN_ASSERT(m_CacheBucket.m_ReferenceHashes.empty()); + ZEN_ASSERT(m_CacheBucket.m_NextReferenceHashesIndexes.empty()); + ZEN_ASSERT(m_CacheBucket.m_ReferenceCount == 0); + ZEN_ASSERT(m_CacheBucket.m_UpdatedKeys); + + // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when + // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted. 
+ m_CacheBucket.m_FirstReferenceIndex.resize(m_CacheBucket.m_Payloads.size(), ReferenceIndex::Unknown()); + m_CacheBucket.m_ReferenceHashes.reserve(References.size()); + m_CacheBucket.m_NextReferenceHashesIndexes.reserve(References.size()); + } + else + { + ZEN_ASSERT(!m_CacheBucket.m_UpdatedKeys); + } + + for (size_t Index = 0; Index < UpdateKeys.size(); Index++) + { + const IoHash& Key = UpdateKeys[Index]; + size_t ReferenceCount = ReferenceCounts[Index]; + if (auto It = m_CacheBucket.m_Index.find(Key); It != m_CacheBucket.m_Index.end()) + { + PayloadIndex EntryIndex = It->second; + if (m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + if (m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown()) + { + // The reference data is valid and what we have is old/redundant + continue; + } + } + else if (m_CacheBucket.m_UpdatedKeys->contains(Key)) + { + // Our pre-cache data is invalid + continue; + } + + m_CacheBucket.SetReferences(IndexLock, + m_CacheBucket.m_FirstReferenceIndex[EntryIndex], + std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount}); + } + ReferenceOffset += ReferenceCount; + } + + if (m_CacheBucket.m_Configuration.EnableReferenceCaching && !UpdateKeys.empty()) + { + m_CacheBucket.CompactReferences(IndexLock); + } + } + } + + virtual void LockState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Z$::Bucket::LockState"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}", + m_CacheBucket.m_BucketDir, + m_CacheBucket.m_ReferenceCount + m_UncachedReferences.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); + if (Ctx.IsCancelledFlag.load()) + { + m_UncachedReferences.clear(); + m_IndexLock.reset(); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + 
return; + } + + if (m_CacheBucket.m_UpdatedKeys) + { + const HashSet& UpdatedKeys(*m_CacheBucket.m_UpdatedKeys); + for (const IoHash& Key : UpdatedKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + m_UncachedReferences.clear(); + m_IndexLock.reset(); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } + + auto It = m_CacheBucket.m_Index.find(Key); + if (It == m_CacheBucket.m_Index.end()) + { + continue; + } + + PayloadIndex EntryIndex = It->second; + const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + + IoBuffer Buffer; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Buffer = m_CacheBucket.GetStandaloneCacheValue(Loc.GetContentType(), Key); + } + else + { + Buffer = m_CacheBucket.GetInlineCacheValue(Loc); + } + + if (Buffer) + { + ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); + CbObjectView Obj(Buffer.GetData()); + Obj.IterateAttachments([this](CbFieldView Field) { m_UncachedReferences.insert(Field.AsAttachment()); }); + } + } + } + } + + virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override + { + ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); + + ZEN_ASSERT(m_IndexLock); + size_t InitialCount = IoCids.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + m_CacheBucket.m_BucketDir, + InitialCount - IoCids.size(), + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes) + { + if (IoCids.erase(ReferenceHash) == 1) + { + if (IoCids.empty()) + { + return; + } + } + } + + for (const IoHash& ReferenceHash : m_UncachedReferences) + { + if (IoCids.erase(ReferenceHash) == 1) + { + 
if (IoCids.empty()) + { + return; + } + } + } + } + CacheBucket& m_CacheBucket; + std::unique_ptr<RwLock::SharedLockScope> m_IndexLock; + HashSet m_UncachedReferences; +}; + +std::vector<GcReferenceChecker*> +ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) +{ + ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + { + RwLock::SharedLockScope __(m_IndexLock); + if (m_Index.empty()) + { + return {}; + } + } + + return {new DiskBucketReferenceChecker(*this)}; +} + +void +ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) +{ + ZEN_TRACE_CPU("Z$::Bucket::CompactReferences"); + + std::vector<ReferenceIndex> FirstReferenceIndex; + std::vector<IoHash> NewReferenceHashes; + std::vector<ReferenceIndex> NewNextReferenceHashesIndexes; + + FirstReferenceIndex.reserve(m_ReferenceCount); + NewReferenceHashes.reserve(m_ReferenceCount); + NewNextReferenceHashesIndexes.reserve(m_ReferenceCount); + + for (const auto& It : m_Index) + { + ReferenceIndex SourceIndex = m_FirstReferenceIndex[It.second]; + if (SourceIndex == ReferenceIndex::Unknown()) + { + FirstReferenceIndex.push_back(ReferenceIndex{}); + continue; + } + if (SourceIndex == ReferenceIndex::None()) + { + FirstReferenceIndex.push_back(ReferenceIndex::None()); + continue; + } + FirstReferenceIndex.push_back(ReferenceIndex{NewNextReferenceHashesIndexes.size()}); + NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]); + NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None()); + + SourceIndex = m_NextReferenceHashesIndexes[SourceIndex]; + while (SourceIndex != ReferenceIndex::None()) + { + NewNextReferenceHashesIndexes.back() = ReferenceIndex{NewReferenceHashes.size()}; + 
            NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]);
            NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None());
            SourceIndex = m_NextReferenceHashesIndexes[SourceIndex];
        }
    }
    m_FirstReferenceIndex.swap(FirstReferenceIndex);
    m_ReferenceHashes.swap(NewReferenceHashes);
    m_ReferenceHashes.shrink_to_fit();
    m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes);
    m_NextReferenceHashesIndexes.shrink_to_fit();
    // After compaction every remaining entry is live, so the count is exactly the array size.
    m_ReferenceCount = m_ReferenceHashes.size();
}

// Appends a new entry to the reference chain storage and returns its index.
// The new entry initially has no successor (next link = None). The unnamed
// lock-scope parameter documents that the caller must hold the index lock exclusively.
ZenCacheDiskLayer::CacheBucket::ReferenceIndex
ZenCacheDiskLayer::CacheBucket::AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key)
{
    ReferenceIndex NewIndex = ReferenceIndex{m_ReferenceHashes.size()};
    m_ReferenceHashes.push_back(Key);
    m_NextReferenceHashesIndexes.emplace_back(ReferenceIndex::None());
    m_ReferenceCount++;
    return NewIndex;
}

// Replaces the singly-linked reference list anchored at FirstReferenceIndex with
// the hashes in 'References'. Existing chain slots are overwritten in place where
// possible; additional slots are allocated as needed. Caller must hold the index
// lock exclusively (the Lock parameter is forwarded to AllocateReferenceEntry).
void
ZenCacheDiskLayer::CacheBucket::SetReferences(RwLock::ExclusiveLockScope& Lock,
                                              ReferenceIndex& FirstReferenceIndex,
                                              std::span<IoHash> References)
{
    auto ReferenceIt = References.begin();

    // An Unknown anchor means "never populated"; normalize it to an empty chain.
    if (FirstReferenceIndex == ReferenceIndex::Unknown())
    {
        FirstReferenceIndex = ReferenceIndex::None();
    }

    ReferenceIndex CurrentIndex = FirstReferenceIndex;
    if (CurrentIndex != ReferenceIndex::None())
    {
        // Existing chain: overwrite its head entry in place.
        if (ReferenceIt != References.end())
        {
            ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
            // NOTE(review): CurrentIndex cannot equal None inside this branch (the
            // enclosing 'if' excludes it), so the then-arm below is dead code.
            if (CurrentIndex == ReferenceIndex::None())
            {
                CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
                FirstReferenceIndex = CurrentIndex;
            }
            else
            {
                m_ReferenceHashes[CurrentIndex] = *ReferenceIt;
            }
            ReferenceIt++;
        }
    }
    else
    {
        // No existing chain: allocate a head entry for the first reference, if any.
        if (ReferenceIt != References.end())
        {
            ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
            CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
            ReferenceIt++;
        }
        FirstReferenceIndex = CurrentIndex;
    }

    // Write the remaining references, reusing existing chain slots when present
    // and growing the chain otherwise.
    while (ReferenceIt != References.end())
    {
        ZEN_ASSERT(CurrentIndex != ReferenceIndex::None());
        ZEN_ASSERT_SLOW(*ReferenceIt != 
IoHash::Zero); + ReferenceIndex NextReferenceIndex = m_NextReferenceHashesIndexes[CurrentIndex]; + if (NextReferenceIndex == ReferenceIndex::None()) + { + NextReferenceIndex = AllocateReferenceEntry(Lock, *ReferenceIt); + m_NextReferenceHashesIndexes[CurrentIndex] = NextReferenceIndex; + } + else + { + m_ReferenceHashes[NextReferenceIndex] = *ReferenceIt; + } + CurrentIndex = NextReferenceIndex; + ReferenceIt++; + } + + while (CurrentIndex != ReferenceIndex::None()) + { + ReferenceIndex NextIndex = m_NextReferenceHashesIndexes[CurrentIndex]; + if (NextIndex != ReferenceIndex::None()) + { + m_ReferenceHashes[CurrentIndex] = IoHash::Zero; + ZEN_ASSERT(m_ReferenceCount > 0); + m_ReferenceCount--; + m_NextReferenceHashesIndexes[CurrentIndex] = ReferenceIndex::None(); + } + CurrentIndex = NextIndex; + } +} + +void +ZenCacheDiskLayer::CacheBucket::RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex) +{ + if (FirstReferenceIndex == ReferenceIndex::Unknown()) + { + return; + } + ReferenceIndex CurrentIndex = FirstReferenceIndex; + while (CurrentIndex == ReferenceIndex::None()) + { + m_ReferenceHashes[CurrentIndex] = IoHash::Zero; + ZEN_ASSERT(m_ReferenceCount > 0); + m_ReferenceCount--; + CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex]; + } + FirstReferenceIndex = {}; +} + +bool +ZenCacheDiskLayer::CacheBucket::LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const +{ + if (FirstReferenceIndex == ReferenceIndex::Unknown()) + { + return false; + } + + ReferenceIndex CurrentIndex = FirstReferenceIndex; + while (CurrentIndex != ReferenceIndex::None()) + { + ZEN_ASSERT_SLOW(m_ReferenceHashes[CurrentIndex] != IoHash::Zero); + OutReferences.push_back(m_ReferenceHashes[CurrentIndex]); + CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex]; + } + return true; +} + +void +ZenCacheDiskLayer::CacheBucket::ClearReferenceCache() +{ + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + 
    Reset(m_FirstReferenceIndex);
    Reset(m_ReferenceHashes);
    Reset(m_NextReferenceHashesIndexes);
    m_ReferenceCount = 0;
}

// Rebuilds all per-entry arrays so live entries are densely packed in index
// order, dropping free-list slots for metadata and mem-cached payloads. The
// caller supplies empty scratch vectors/map, which receive the old state via
// swap (and are freed by the caller), plus the exclusive index lock that is
// forwarded to CompactReferences.
void
ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
                                             std::vector<BucketPayload>& Payloads,
                                             std::vector<AccessTime>& AccessTimes,
                                             std::vector<BucketMetaData>& MetaDatas,
                                             std::vector<MemCacheData>& MemCachedPayloads,
                                             std::vector<ReferenceIndex>& FirstReferenceIndex,
                                             IndexMap& Index,
                                             RwLock::ExclusiveLockScope& IndexLock)
{
    ZEN_TRACE_CPU("Z$::Bucket::CompactState");

    size_t EntryCount = m_Index.size();
    Payloads.reserve(EntryCount);
    AccessTimes.reserve(EntryCount);
    if (m_Configuration.EnableReferenceCaching)
    {
        FirstReferenceIndex.reserve(EntryCount);
    }
    Index.reserve(EntryCount);
    Index.min_load_factor(IndexMinLoadFactor);
    Index.max_load_factor(IndexMaxLoadFactor);
    for (auto It : m_Index)
    {
        // Each live entry gets a fresh, contiguous index in the rebuilt arrays.
        PayloadIndex EntryIndex = PayloadIndex(Payloads.size());
        Payloads.push_back(m_Payloads[It.second]);
        BucketPayload& Payload = Payloads.back();
        AccessTimes.push_back(m_AccessTimes[It.second]);
        if (Payload.MetaData)
        {
            // Re-pack metadata and patch the entry's index to the new slot.
            MetaDatas.push_back(m_MetaDatas[Payload.MetaData]);
            Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1);
        }
        if (Payload.MemCached)
        {
            // Move the mem-cached payload across and record its new owner index.
            MemCachedPayloads.emplace_back(
                MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex});
            Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1));
        }
        if (m_Configuration.EnableReferenceCaching)
        {
            FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
        }
        Index.insert({It.first, EntryIndex});
    }
    // Swap the rebuilt state in; the old state leaves via the caller's scratch buffers.
    m_Index.swap(Index);
    m_Payloads.swap(Payloads);
    m_AccessTimes.swap(AccessTimes);
    m_MetaDatas.swap(MetaDatas);
    Reset(m_FreeMetaDatas);
    m_MemCachedPayloads.swap(MemCachedPayloads);
    Reset(m_FreeMemCachedPayloads);
    if (m_Configuration.EnableReferenceCaching)
    {
        m_FirstReferenceIndex.swap(FirstReferenceIndex);
        // Reference chains still point at pre-compaction slots; rebuild them too.
        CompactReferences(IndexLock);
    }
}

#if ZEN_WITH_TESTS
// Test-only hook: overwrites the stored access time for an entry so expiry /
// trim behavior can be exercised deterministically.
void
ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time)
{
    GcClock::Tick TimeTick = Time.time_since_epoch().count();
    // NOTE(review): this writes m_AccessTimes under a *shared* lock — presumably
    // safe because access times are updated concurrently on the read path too,
    // but worth confirming against the AccessTime type's guarantees.
    RwLock::SharedLockScope IndexLock(m_IndexLock);
    if (auto It = m_Index.find(HashKey); It != m_Index.end())
    {
        size_t EntryIndex = It.value();
        ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
        m_AccessTimes[EntryIndex] = TimeTick;
    }
}
#endif // ZEN_WITH_TESTS

//////////////////////////////////////////////////////////////////////////

ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
: m_Gc(Gc)
, m_JobQueue(JobQueue)
, m_RootDir(RootDir)
, m_Configuration(Config)
{
}

ZenCacheDiskLayer::~ZenCacheDiskLayer()
{
    try
    {
        {
            RwLock::ExclusiveLockScope _(m_Lock);
            for (auto& It : m_Buckets)
            {
                m_DroppedBuckets.emplace_back(std::move(It.second));
            }
            m_Buckets.clear();
        }
        // We destroy the buckets without holding a lock since destructor calls GcManager::RemoveGcReferencer which takes an exclusive lock.
        // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock
        m_DroppedBuckets.clear();
    }
    catch (std::exception& Ex)
    {
        ZEN_ERROR("~ZenCacheDiskLayer() failed. Reason: '{}'", Ex.what());
    }
}

// Returns the named bucket, creating and opening it on first use. Returns
// nullptr when the directory exists but is not a valid bucket. Uses a
// check / create-unlocked / re-check pattern (see comments below).
ZenCacheDiskLayer::CacheBucket*
ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
{
    ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
    const auto BucketName = std::string(InBucket);

    {
        // Fast path: bucket already exists.
        RwLock::SharedLockScope SharedLock(m_Lock);
        if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
        {
            return It->second.get();
        }
    }

    // We create the bucket without holding a lock since constructor calls GcManager::AddGcReferencer which takes an exclusive lock.
    // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock
    std::unique_ptr<CacheBucket> Bucket(
        std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));

    RwLock::ExclusiveLockScope Lock(m_Lock);
    // Re-check under the exclusive lock: another thread may have created the
    // bucket while we were constructing ours.
    if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
    {
        return It->second.get();
    }

    std::filesystem::path BucketPath = m_RootDir;
    BucketPath /= BucketName;
    try
    {
        if (!Bucket->OpenOrCreate(BucketPath))
        {
            ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
            return nullptr;
        }
    }
    catch (const std::exception& Err)
    {
        ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
        throw;
    }

    CacheBucket* Result = Bucket.get();
    m_Buckets.emplace(BucketName, std::move(Bucket));

    return Result;
}

// Looks up a value by key in the named bucket. Returns true and fills OutValue
// on a hit; may trigger an opportunistic memory-cache trim after the lookup.
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
    ZEN_TRACE_CPU("Z$::Get");

    if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
    {
        if (Bucket->Get(HashKey, OutValue))
        {
            TryMemCacheTrim();
            return true;
        }
    }
    return false;
}

// Stores a value (plus its attachment references) in the named bucket, then
// gives the memory cache a chance to trim.
void
ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
    ZEN_TRACE_CPU("Z$::Put");

    if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
    {
        Bucket->Put(HashKey, Value, References);
        TryMemCacheTrim();
    }
}

// Scans the root directory for bucket subdirectories, deletes known-bad ones,
// and opens the rest in parallel on the large worker pool.
void
ZenCacheDiskLayer::DiscoverBuckets()
{
    ZEN_TRACE_CPU("Z$::DiscoverBuckets");

    DirectoryContent DirContent;
    GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);

    // Initialize buckets

    std::vector<std::filesystem::path> BadBucketDirectories;
    std::vector<std::filesystem::path> FoundBucketDirectories;

    RwLock::ExclusiveLockScope _(m_Lock);

    for (const 
std::filesystem::path& BucketPath : DirContent.Directories) + { + const std::string BucketName = PathToUtf8(BucketPath.stem()); + + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + { + continue; + } + + if (IsKnownBadBucketName(BucketName)) + { + BadBucketDirectories.push_back(BucketPath); + + continue; + } + + FoundBucketDirectories.push_back(BucketPath); + ZEN_INFO("Discovered bucket '{}'", BucketName); + } + + for (const std::filesystem::path& BadBucketPath : BadBucketDirectories) + { + bool IsOk = false; + + try + { + IsOk = DeleteDirectories(BadBucketPath); + } + catch (std::exception&) + { + } + + if (IsOk) + { + ZEN_INFO("found bad bucket at '{}', deleted contents", BadBucketPath); + } + else + { + ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath); + } + } + + RwLock SyncLock; + + WorkerThreadPool& Pool = GetLargeWorkerPool(); + Latch WorkLatch(1); + for (auto& BucketPath : FoundBucketDirectories) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + + const std::string BucketName = PathToUtf8(BucketPath.stem()); + try + { + std::unique_ptr<CacheBucket> NewBucket = + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig); + + CacheBucket* Bucket = nullptr; + { + RwLock::ExclusiveLockScope __(SyncLock); + auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); + Bucket = InsertResult.first->second.get(); + } + ZEN_ASSERT(Bucket); + + if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + + { + RwLock::ExclusiveLockScope __(SyncLock); + m_Buckets.erase(BucketName); + } + } + } + catch (const std::exception& Err) + { + ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + }); + } + WorkLatch.CountDown(); + WorkLatch.Wait(); 
+} + +bool +ZenCacheDiskLayer::DropBucket(std::string_view InBucket) +{ + ZEN_TRACE_CPU("Z$::DropBucket"); + + RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.find(std::string(InBucket)); + + if (It != m_Buckets.end()) + { + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It); + + return Bucket.Drop(); + } + + // Make sure we remove the folder even if we don't know about the bucket + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= std::string(InBucket); + return MoveAndDeleteDirectory(BucketPath); +} + +bool +ZenCacheDiskLayer::Drop() +{ + ZEN_TRACE_CPU("Z$::Drop"); + + RwLock::ExclusiveLockScope _(m_Lock); + + std::vector<std::unique_ptr<CacheBucket>> Buckets; + Buckets.reserve(m_Buckets.size()); + while (!m_Buckets.empty()) + { + const auto& It = m_Buckets.begin(); + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It->first); + if (!Bucket.Drop()) + { + return false; + } + } + return MoveAndDeleteDirectory(m_RootDir); +} + +void +ZenCacheDiskLayer::Flush() +{ + ZEN_TRACE_CPU("Z$::Flush"); + + std::vector<CacheBucket*> Buckets; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (Buckets.empty()) + { + return; + } + ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + { + RwLock::SharedLockScope __(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + CacheBucket* Bucket = Kv.second.get(); + Buckets.push_back(Bucket); + } + } + { + WorkerThreadPool& Pool = GetSmallWorkerPool(); + Latch WorkLatch(1); + for (auto& Bucket : Buckets) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + Bucket->Flush(); + }); + } + WorkLatch.CountDown(); + while (!WorkLatch.Wait(1000)) + { + ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); + } 
    }
}

// Runs storage scrubbing for every bucket, dispatching one task per bucket onto
// the scrub context's thread pool and waiting for all of them to finish.
void
ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
    ZEN_TRACE_CPU("Z$::ScrubStorage");

    RwLock::SharedLockScope _(m_Lock);
    {
        std::vector<std::future<void>> Results;
        Results.reserve(m_Buckets.size());

        for (auto& Kv : m_Buckets)
        {
#if 1
            Results.push_back(Ctx.ThreadPool().EnqueueTask(
                std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
#else
            // Serial fallback kept for debugging.
            CacheBucket& Bucket = *Kv.second;
            Bucket.ScrubStorage(Ctx);
#endif
        }

        // Propagates any exception thrown inside a scrub task.
        for (auto& Result : Results)
        {
            Result.get();
        }
    }
}

// Forwards GC reference gathering to every bucket. The bucket list is
// snapshotted under the shared lock so the (potentially slow) gathering itself
// runs without holding m_Lock.
void
ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
{
    ZEN_TRACE_CPU("Z$::GatherReferences");

    std::vector<CacheBucket*> Buckets;
    {
        RwLock::SharedLockScope _(m_Lock);
        Buckets.reserve(m_Buckets.size());
        for (auto& Kv : m_Buckets)
        {
            Buckets.push_back(Kv.second.get());
        }
    }
    for (CacheBucket* Bucket : Buckets)
    {
        Bucket->GatherReferences(GcCtx);
    }
}

// Sums disk and memory usage across all buckets.
GcStorageSize
ZenCacheDiskLayer::StorageSize() const
{
    GcStorageSize StorageSize{};

    RwLock::SharedLockScope _(m_Lock);
    for (auto& Kv : m_Buckets)
    {
        GcStorageSize BucketSize = Kv.second->StorageSize();
        StorageSize.DiskSize += BucketSize.DiskSize;
        StorageSize.MemorySize += BucketSize.MemorySize;
    }

    return StorageSize;
}

// Aggregated layer statistics plus per-bucket breakdowns.
ZenCacheDiskLayer::DiskStats
ZenCacheDiskLayer::Stats() const
{
    GcStorageSize Size = StorageSize();
    ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
    {
        RwLock::SharedLockScope _(m_Lock);
        Stats.BucketStats.reserve(m_Buckets.size());
        for (auto& Kv : m_Buckets)
        {
            Stats.BucketStats.emplace_back(NamedBucketStats{.BucketName = Kv.first, .Stats = Kv.second->Stats()});
        }
    }
    return Stats;
}

// Summary info for the whole layer: root dir, config, bucket names, entry
// counts and storage sizes.
ZenCacheDiskLayer::Info
ZenCacheDiskLayer::GetInfo() const
{
    ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration};
    {
        RwLock::SharedLockScope _(m_Lock);
        Info.BucketNames.reserve(m_Buckets.size());
        for (auto& Kv : m_Buckets)
        {
            Info.BucketNames.push_back(Kv.first);
            Info.EntryCount += Kv.second->EntryCount();
            GcStorageSize BucketSize = Kv.second->StorageSize();
            Info.StorageSize.DiskSize += BucketSize.DiskSize;
            Info.StorageSize.MemorySize += BucketSize.MemorySize;
        }
    }
    return Info;
}

// Returns entry count and storage size for a single bucket, or nullopt when the
// bucket is not loaded.
std::optional<ZenCacheDiskLayer::BucketInfo>
ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
{
    RwLock::SharedLockScope _(m_Lock);

    if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
    {
        return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()};
    }
    return {};
}

// Invokes Fn for every entry in the named bucket; no-op when the bucket is not
// loaded. Fn runs while the layer lock is held shared.
void
ZenCacheDiskLayer::EnumerateBucketContents(std::string_view Bucket,
                                           std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
{
    RwLock::SharedLockScope _(m_Lock);

    if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
    {
        It->second->EnumerateBucketContents(Fn);
    }
}

// Collects value details for all buckets (empty BucketFilter) or for the single
// named bucket, applying ValueFilter within each bucket.
CacheValueDetails::NamespaceDetails
ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
{
    CacheValueDetails::NamespaceDetails Details;
    {
        RwLock::SharedLockScope IndexLock(m_Lock);
        if (BucketFilter.empty())
        {
            // NOTE(review): inside this branch BucketFilter.empty() is always true,
            // so the ternary below always yields m_Buckets.size(); the ': 1' arm is dead.
            Details.Buckets.reserve(BucketFilter.empty() ? 
m_Buckets.size() : 1); + for (auto& Kv : m_Buckets) + { + Details.Buckets[Kv.first] = Kv.second->GetValueDetails(IndexLock, ValueFilter); + } + } + else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) + { + Details.Buckets[It->first] = It->second->GetValueDetails(IndexLock, ValueFilter); + } + } + return Details; +} + +void +ZenCacheDiskLayer::MemCacheTrim() +{ + ZEN_TRACE_CPU("Z$::MemCacheTrim"); + + ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); + ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0); + ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0); + + bool Expected = false; + if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) + { + return; + } + + try + { + m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + uint64_t TrimmedSize = 0; + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + NiceBytes(TrimmedSize), + NiceBytes(m_TotalMemCachedSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + const GcClock::Tick NowTick = GcClock::TickCount(); + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_NextAllowedTrimTick.store(NextTrimTick); + m_IsMemCacheTrimming.store(false); + }); + + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); + + static const size_t UsageSlotCount = 2048; + std::vector<uint64_t> UsageSlots; + UsageSlots.reserve(UsageSlotCount); + + std::vector<CacheBucket*> Buckets; + { + RwLock::SharedLockScope __(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Buckets.push_back(Kv.second.get()); + } + } + + const GcClock::TimePoint Now = GcClock::Now(); + { + 
ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess"); + for (CacheBucket* Bucket : Buckets) + { + Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots); + } + } + + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) + { + TotalSize += UsageSlots[Index]; + if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount); + TrimmedSize = MemCacheTrim(Buckets, ExpireTime); + break; + } + } + }); + } + catch (std::exception& Ex) + { + ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what()); + m_IsMemCacheTrimming.store(false); + } +} + +uint64_t +ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime) +{ + if (m_Configuration.MemCacheTargetFootprintBytes == 0) + { + return 0; + } + uint64_t TrimmedSize = 0; + for (CacheBucket* Bucket : Buckets) + { + TrimmedSize += Bucket->MemCacheTrim(ExpireTime); + } + const GcClock::TimePoint Now = GcClock::Now(); + const GcClock::Tick NowTick = Now.time_since_epoch().count(); + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + GcClock::Tick LastTrimTick = m_NextAllowedTrimTick; + const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + return TrimmedSize; +} + +#if ZEN_WITH_TESTS +void +ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time) +{ + const auto BucketName = std::string(InBucket); + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto It = m_Buckets.find(BucketName); + + if (It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + } + + if (Bucket == nullptr) + { + return; + } + Bucket->SetAccessTime(HashKey, Time); +} +#endif // 
ZEN_WITH_TESTS + +} // namespace zen |