// Copyright Epic Games, Inc. All Rights Reserved. #include "cachedisklayer.h" #include #include #include #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// namespace zen { namespace { #pragma pack(push) #pragma pack(1) struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t Version2 = 2; static constexpr uint32_t CurrentVersion = Version2; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t PayloadAlignment = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } bool IsValid() const { if (Magic != ExpectedMagic) { return false; } if (Checksum != ComputeChecksum(*this)) { return false; } if (PayloadAlignment == 0) { return false; } return true; } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); struct BucketMetaHeader { static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta'; static constexpr uint32_t Version1 = 1; static constexpr uint32_t CurrentVersion = Version1; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t Padding = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const BucketMetaHeader& Header) { return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA); } bool IsValid() const { if (Magic != ExpectedMagic) { return false; } if (Checksum != ComputeChecksum(*this)) { return false; } if (Padding != 0) { return false; } return true; } }; static_assert(sizeof(BucketMetaHeader) == 32); #pragma pack(pop) ////////////////////////////////////////////////////////////////////////// template void Reset(T& V) { T Tmp; V.swap(Tmp); } const char* 
IndexExtension = ".uidx"; const char* LogExtension = ".slog"; const char* MetaExtension = ".meta"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + MetaExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + LogExtension); } std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { ZEN_UNUSED(BucketName); return BucketDir / "zen_manifest"; } bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if (Entry.Location.Reserved != 0) { OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } if (Entry.Location.GetFlags() & ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); return false; } if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) { return true; } if (Entry.Location.Reserved != 0) { OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } uint64_t Size = Entry.Location.Size(); if (Size == 0) { OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); return false; } return true; } bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) { int DropIndex = 0; do { if (!std::filesystem::exists(Dir)) { return false; } std::string DroppedName = 
fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; if (std::filesystem::exists(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; std::filesystem::rename(Dir, DroppedBucketPath, Ec); if (!Ec) { DeleteDirectories(DroppedBucketPath); return true; } // TODO: Do we need to bail at some point? zen::Sleep(100); } while (true); } } // namespace namespace fs = std::filesystem; using namespace std::literals; class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData; using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; public: // We use this to indicate if a on disk bucket needs wiping // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references // to block items. 
// See: https://github.com/EpicGames/zen/pull/299 static inline const uint32_t CurrentDiskBucketVersion = 1; bool Open(std::filesystem::path ManifestPath) { Manifest = LoadCompactBinaryObject(ManifestPath); return !!Manifest; } Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); } bool IsCurrentVersion(uint32_t& OutVersion) const { OutVersion = Manifest["Version"sv].AsUInt32(0); return OutVersion == CurrentDiskBucketVersion; } void ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path ManifestPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads); Oid GenerateNewManifest(std::filesystem::path ManifestPath); IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount); uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); } void WriteSidecarFile(const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, std::vector&& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas); bool ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path SidecarPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads); IoBuffer MakeManifest(const Oid& BucketId, ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, std::vector&& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas); CbObject Manifest; private: CbObject LoadCompactBinaryObject(const fs::path& Path) { FileContents Result = ReadFile(Path); if (!Result.ErrorCode) { IoBuffer Buffer = Result.Flatten(); if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) { return zen::LoadCompactBinaryObject(Buffer); } } return CbObject(); } uint64_t m_ManifestEntryCount = 0; struct ManifestData { IoHash Key; // 20 AccessTime Timestamp; // 4 IoHash RawHash; // 20 uint32_t Padding_0; // 4 size_t 
RawSize; // 8 uint64_t Padding_1; // 8 }; static_assert(sizeof(ManifestData) == 64); }; void BucketManifestSerializer::ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path ManifestPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads) { if (Manifest["UsingMetaFile"sv].AsBool()) { ReadSidecarFile(Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads); return; } ZEN_TRACE_CPU("Z$::ParseManifest"); Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); const uint64_t Count = Manifest["Count"sv].AsUInt64(0); std::vector KeysIndexes; KeysIndexes.reserve(Count); CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); for (CbFieldView& KeyView : KeyArray) { if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) { KeysIndexes.push_back(It.value()); } else { KeysIndexes.push_back(PayloadIndex()); } } size_t KeyIndexOffset = 0; CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); for (CbFieldView& TimeStampView : TimeStampArray) { const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; if (KeyIndex) { AccessTimes[KeyIndex] = TimeStampView.AsInt64(); } } KeyIndexOffset = 0; CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); if (RawHashArray.Num() == RawSizeArray.Num()) { auto RawHashIt = RawHashArray.CreateViewIterator(); auto RawSizeIt = RawSizeArray.CreateViewIterator(); while (RawHashIt != CbFieldViewIterator()) { const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; if (KeyIndex) { uint64_t RawSize = RawSizeIt.AsUInt64(); IoHash RawHash = RawHashIt.AsHash(); if (RawSize != 0 || RawHash != IoHash::Zero) { BucketPayload& Payload = Payloads[KeyIndex]; Bucket.SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); } } RawHashIt++; RawSizeIt++; 
} } else { ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); } } Oid BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath) { const Oid BucketId = Oid::NewOid(); CbObjectWriter Writer; Writer << "BucketId"sv << BucketId; Writer << "Version"sv << CurrentDiskBucketVersion; Manifest = Writer.Save(); WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer()); return BucketId; } IoBuffer BucketManifestSerializer::MakeManifest(const Oid& BucketId, ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, std::vector&& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas) { using namespace std::literals; ZEN_TRACE_CPU("Z$::MakeManifest"); size_t ItemCount = Index.size(); // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth // And we don't need to reallocate the underlying buffer in almost every case const size_t EstimatedSizePerItem = 54u; const size_t ReserveSize = ItemCount == 0 ? 
48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); CbObjectWriter Writer(ReserveSize); Writer << "BucketId"sv << BucketId; Writer << "Version"sv << CurrentDiskBucketVersion; if (!Index.empty()) { Writer.AddInteger("Count"sv, gsl::narrow(Index.size())); Writer.BeginArray("Keys"sv); for (auto& Kv : Index) { const IoHash& Key = Kv.first; Writer.AddHash(Key); } Writer.EndArray(); Writer.BeginArray("Timestamps"sv); for (auto& Kv : Index) { GcClock::Tick AccessTime = AccessTimes[Kv.second]; Writer.AddInteger(AccessTime); } Writer.EndArray(); if (!MetaDatas.empty()) { Writer.BeginArray("RawHash"sv); for (auto& Kv : Index) { const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; if (Payload.MetaData) { Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); } else { Writer.AddHash(IoHash::Zero); } } Writer.EndArray(); Writer.BeginArray("RawSize"sv); for (auto& Kv : Index) { const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; if (Payload.MetaData) { Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); } else { Writer.AddInteger(0); } } Writer.EndArray(); } } Manifest = Writer.Save(); return Manifest.GetBuffer().AsIoBuffer(); } IoBuffer BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount) { m_ManifestEntryCount = EntryCount; CbObjectWriter Writer; Writer << "BucketId"sv << BucketId; Writer << "Version"sv << CurrentDiskBucketVersion; Writer << "Count"sv << EntryCount; Writer << "UsingMetaFile"sv << true; Manifest = Writer.Save(); return Manifest.GetBuffer().AsIoBuffer(); } bool BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path SidecarPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads) { ZEN_ASSERT(AccessTimes.size() == Payloads.size()); std::error_code Ec; BasicFile SidecarFile; SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec); if (Ec) { throw 
std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath)); } uint64_t FileSize = SidecarFile.FileSize(); auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); }); if (FileSize < sizeof(BucketMetaHeader)) { return false; } BasicFileBuffer Sidecar(SidecarFile, 128 * 1024); BucketMetaHeader Header; Sidecar.Read(&Header, sizeof Header, 0); if (!Header.IsValid()) { return false; } if (Header.Version != BucketMetaHeader::Version1) { return false; } const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData); if (Header.EntryCount > ExpectedEntryCount) { return false; } InvalidGuard.Dismiss(); uint64_t RemainingEntryCount = ExpectedEntryCount; uint64_t EntryCount = 0; uint64_t CurrentReadOffset = sizeof(Header); while (RemainingEntryCount--) { const ManifestData* Entry = Sidecar.MakeView(CurrentReadOffset); CurrentReadOffset += sizeof(ManifestData); if (auto It = Index.find(Entry->Key); It != Index.end()) { PayloadIndex PlIndex = It.value(); ZEN_ASSERT(size_t(PlIndex) <= Payloads.size()); ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; AccessTimes[PlIndex] = Entry->Timestamp; if (Entry->RawSize && Entry->RawHash != IoHash::Zero) { Bucket.SetMetaData(PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); } } EntryCount++; } ZEN_ASSERT(EntryCount == ExpectedEntryCount); return true; } void BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, std::vector&& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas) { BucketMetaHeader Header; Header.EntryCount = m_ManifestEntryCount; Header.LogPosition = SnapshotLogPosition; Header.Checksum = Header.ComputeChecksum(Header); std::error_code Ec; TemporaryFile SidecarFile; SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec); 
if (Ec) { throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath())); } SidecarFile.Write(&Header, sizeof Header, 0); // TODO: make this batching for better performance { uint64_t WriteOffset = sizeof Header; BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); for (auto& Kv : Index) { const IoHash& Key = Kv.first; const PayloadIndex PlIndex = Kv.second; IoHash RawHash = IoHash::Zero; uint64_t RawSize = 0; if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData) { RawHash = MetaDatas[MetaIndex].RawHash; RawSize = MetaDatas[MetaIndex].RawSize; } ManifestData ManifestEntry = {.Key = Key, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Padding_0 = 0, .RawSize = RawSize, .Padding_1 = 0}; SidecarWriter.Write(&ManifestEntry, sizeof ManifestEntry, WriteOffset); WriteOffset += sizeof ManifestEntry; } } SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath)); } } ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config) : m_Gc(Gc) , m_OuterCacheMemoryUsage(OuterCacheMemoryUsage) , m_BucketName(std::move(BucketName)) , m_Configuration(Config) , m_BucketId(Oid::Zero) { if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { const uint64_t LegacyOverrideSize = 16 * 1024 * 1024; // This is pretty ad hoc but in order to avoid too many individual files // it makes sense to have a different strategy for legacy values m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, LegacyOverrideSize); } m_Gc.AddGcReferencer(*this); } ZenCacheDiskLayer::CacheBucket::~CacheBucket() { m_Gc.RemoveGcReferencer(*this); } bool 
ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { using namespace std::literals; ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate"); ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); m_BlocksBasePath = BucketDir / "blocks"; m_BucketDir = BucketDir; CreateDirectories(m_BucketDir); std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); bool IsNew = false; BucketManifestSerializer ManifestReader; if (ManifestReader.Open(ManifestPath)) { m_BucketId = ManifestReader.GetBucketId(); if (m_BucketId == Oid::Zero) { return false; } uint32_t Version = 0; if (ManifestReader.IsCurrentVersion(/* out */ Version) == false) { ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, BucketManifestSerializer::CurrentDiskBucketVersion); IsNew = true; } } else if (AllowCreate) { m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath); IsNew = true; } else { return false; } InitializeIndexFromDisk(IsNew); if (IsNew) { return true; } ManifestReader.ParseManifest(*this, ManifestPath, m_Index, m_AccessTimes, m_Payloads); return true; } void ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshot(const std::function& ClaimDiskReserveFunc) { ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot"); const uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) { return; } ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); const uint64_t EntryCount = m_Index.size(); Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_BucketDir, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); try { const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); if (Error) { throw 
std::system_error(Error, fmt::format("get disk space in '{}' FAILED", m_BucketDir)); } bool EnoughSpace = Space.Free >= IndexSize + 1024 * 512; if (!EnoughSpace) { uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); EnoughSpace = (Space.Free + ReclaimedSpace) >= IndexSize + 1024 * 512; } if (!EnoughSpace) { throw std::runtime_error( fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize))); } TemporaryFile ObjectIndexFile; std::error_code Ec; ObjectIndexFile.CreateTemporary(m_BucketDir, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); } { // This is in a separate scope just to ensure IndexWriter goes out // of scope before the file is flushed/closed, in order to ensure // all data is written to the file BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); CacheBucketIndexHeader Header = {.EntryCount = EntryCount, .LogPosition = LogCount, .PayloadAlignment = gsl::narrow(m_Configuration.PayloadAlignment)}; Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); for (auto& Entry : m_Index) { DiskIndexEntry IndexEntry; IndexEntry.Key = Entry.first; IndexEntry.Location = m_Payloads[Entry.second].Location; IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset); IndexWriteOffset += sizeof(DiskIndexEntry); } IndexWriter.Flush(); } ObjectIndexFile.Flush(); ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { std::filesystem::path TempFilePath = ObjectIndexFile.GetPath(); ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message()); if (std::filesystem::is_regular_file(TempFilePath)) { if (!std::filesystem::remove(TempFilePath, Ec) || Ec) { ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message()); 
} } } else { // We must only update the log flush position once the snapshot write succeeds m_LogFlushPosition = LogCount; } } catch (std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); } } uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile"); if (!std::filesystem::is_regular_file(IndexPath)) { return 0; } auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t FileSize = ObjectIndexFile.FileSize(); if (FileSize < sizeof(CacheBucketIndexHeader)) { return 0; } CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if (!Header.IsValid()) { return 0; } if (Header.Version != CacheBucketIndexHeader::Version2) { return 0; } const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); if (Header.EntryCount > ExpectedEntryCount) { return 0; } InvalidGuard.Dismiss(); size_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_Configuration.PayloadAlignment = Header.PayloadAlignment; m_Payloads.reserve(Header.EntryCount); m_Index.reserve(Header.EntryCount); BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); uint64_t RemainingEntryCount = Header.EntryCount; std::string InvalidEntryReason; while (RemainingEntryCount--) { const DiskIndexEntry* Entry = FileBuffer.MakeView(CurrentReadOffset); CurrentReadOffset += sizeof(DiskIndexEntry); if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } const PayloadIndex 
EntryIndex = PayloadIndex(EntryCount); m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location}); m_Index.insert_or_assign(Entry->Key, EntryIndex); EntryCount++; } ZEN_ASSERT(EntryCount == m_Payloads.size()); m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); if (m_Configuration.EnableReferenceCaching) { m_FirstReferenceIndex.resize(EntryCount); } OutVersion = CacheBucketIndexHeader::Version2; return Header.LogPosition; } uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog"); if (!std::filesystem::is_regular_file(LogPath)) { return 0; } uint64_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (!CasLog.Initialize()) { return 0; } const uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const DiskIndexEntry& Record) { std::string InvalidEntryReason; if (Record.Location.Flags & DiskLocation::kTombStone) { // Note: this leaves m_Payloads and other arrays with 'holes' in them m_Index.erase(Record.Key); return; } if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); m_Index.insert_or_assign(Record.Key, EntryIndex); }, SkipEntryCount); m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); if 
(m_Configuration.EnableReferenceCaching) { m_FirstReferenceIndex.resize(m_Payloads.size()); } if (InvalidEntryCount) { ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); } return LogEntryCount; }; void ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(const bool IsNew) { ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); m_StandaloneSize = 0; m_Index.clear(); m_Payloads.clear(); m_AccessTimes.clear(); m_MetaDatas.clear(); m_FreeMetaDatas.clear(); m_MemCachedPayloads.clear(); m_FreeMemCachedPayloads.clear(); m_FirstReferenceIndex.clear(); m_ReferenceHashes.clear(); m_NextReferenceHashesIndexes.clear(); m_ReferenceCount = 0; std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); if (IsNew) { fs::remove(LogPath); fs::remove(IndexPath); fs::remove_all(m_BlocksBasePath); } CreateDirectories(m_BucketDir); m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); if (std::filesystem::is_regular_file(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); std::filesystem::remove(IndexPath); } } uint64_t LogEntryCount = 0; if (std::filesystem::is_regular_file(LogPath)) { if (TCasLogFile::IsValid(LogPath)) { LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); } else if (fs::is_regular_file(LogPath)) { ZEN_WARN("removing invalid log at '{}'", LogPath); std::filesystem::remove(LogPath); } } m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_Index) { size_t EntryIndex = Entry.second; const BucketPayload& Payload = m_Payloads[EntryIndex]; const DiskLocation& Location = Payload.Location; if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { m_StandaloneSize.fetch_add(Location.Size(), 
std::memory_order::relaxed); } else { const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); KnownBlocks.Add(BlockLocation.BlockIndex); } } m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); if (IsNew || LogEntryCount > 0) { WriteIndexSnapshot(); } } void ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const { char HexString[sizeof(HashKey.Hash) * 2]; ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); Path.Append(m_BucketDir); Path.AppendSeparator(); Path.Append(L"blob"); Path.AppendSeparator(); Path.AppendAsciiRange(HexString, HexString + 3); Path.AppendSeparator(); Path.AppendAsciiRange(HexString + 3, HexString + 5); Path.AppendSeparator(); Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); } IoBuffer ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const { ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue"); BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); IoBuffer Value = m_BlockStore.TryGetChunk(Location); if (Value) { Value.SetContentType(Loc.GetContentType()); } return Value; } IoBuffer ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const { ZEN_TRACE_CPU("Z$::Disk::Bucket::GetStandaloneCacheValue"); ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) { Data.SetContentType(ContentType); return Data; } return {}; } bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { metrics::RequestStats::Scope StatsScope(m_GetOps, 0); RwLock::SharedLockScope IndexLock(m_IndexLock); auto It = m_Index.find(HashKey); if (It == m_Index.end()) { m_DiskMissCount++; if (m_Configuration.MemCacheSizeThreshold > 0) { m_MemoryMissCount++; } return false; } 
size_t EntryIndex = It.value(); m_AccessTimes[EntryIndex] = GcClock::TickCount(); DiskLocation Location = m_Payloads[EntryIndex].Location; bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0); const BucketPayload* Payload = &m_Payloads[EntryIndex]; if (Payload->MetaData) { const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; OutValue.RawHash = MetaData.RawHash; OutValue.RawSize = MetaData.RawSize; FillRawHashAndRawSize = false; } if (Payload->MemCached) { OutValue.Value = m_MemCachedPayloads[Payload->MemCached]; Payload = nullptr; IndexLock.ReleaseNow(); m_MemoryHitCount++; } else { Payload = nullptr; IndexLock.ReleaseNow(); if (m_Configuration.MemCacheSizeThreshold > 0) { m_MemoryMissCount++; } if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey); } else { OutValue.Value = GetInlineCacheValue(Location); if (m_Configuration.MemCacheSizeThreshold > 0) { size_t ValueSize = OutValue.Value.GetSize(); if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache"); OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); RwLock::ExclusiveLockScope _(m_IndexLock); if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end()) { BucketPayload& WritePayload = m_Payloads[EntryIndex]; // Only update if it has not already been updated by other thread if (!WritePayload.MemCached) { SetMemCachedData(WritePayload, OutValue.Value); } } } } } } if (FillRawHashAndRawSize) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData"); if (Location.IsFlagSet(DiskLocation::kCompressed)) { if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) { OutValue = ZenCacheValue{}; m_DiskMissCount++; return false; } } else { OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); OutValue.RawSize = OutValue.Value.GetSize(); } 
RwLock::ExclusiveLockScope __(m_IndexLock); if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) { BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; // Only set if no other path has already updated the meta data if (!WritePayload.MetaData) { SetMetaData(WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash}); } } } if (OutValue.Value) { m_DiskHitCount++; StatsScope.SetBytes(OutValue.Value.GetSize()); return true; } else { m_DiskMissCount++; return false; } } void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References) { metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { PutStandaloneCacheValue(HashKey, Value, References); } else { PutInlineCacheValue(HashKey, Value, References); } m_DiskWriteCount++; } void ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) { GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::ExclusiveLockScope _(m_IndexLock); for (const auto& Kv : m_Index) { if (m_AccessTimes[Kv.second] < ExpireTicks) { BucketPayload& Payload = m_Payloads[Kv.second]; RemoveMemCachedData(Payload); } } } void ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector& InOutUsageSlots) { RwLock::SharedLockScope _(m_IndexLock); for (const auto& It : m_Index) { size_t Index = It.second; BucketPayload& Payload = m_Payloads[Index]; if (!Payload.MemCached) { continue; } GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); uint64_t Slot = gsl::narrow(Age.count() > 0 ? 
Age.count() / SectionLength.count() : 0); if (Slot >= InOutUsageSlots.capacity()) { Slot = InOutUsageSlots.capacity() - 1; } if (Slot > InOutUsageSlots.size()) { InOutUsageSlots.resize(uint64_t(Slot + 1), 0); } InOutUsageSlots[Slot] += m_MemCachedPayloads[Payload.MemCached].GetSize(); } } bool ZenCacheDiskLayer::CacheBucket::Drop() { ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop"); RwLock::ExclusiveLockScope _(m_IndexLock); std::vector> ShardLocks; ShardLocks.reserve(256); for (RwLock& Lock : m_ShardedLocks) { ShardLocks.push_back(std::make_unique(Lock)); } m_BlockStore.Close(); m_SlogFile.Close(); bool Deleted = MoveAndDeleteDirectory(m_BucketDir); m_Index.clear(); m_Payloads.clear(); m_AccessTimes.clear(); m_MetaDatas.clear(); m_FreeMetaDatas.clear(); m_MemCachedPayloads.clear(); m_FreeMemCachedPayloads.clear(); m_FirstReferenceIndex.clear(); m_ReferenceHashes.clear(); m_NextReferenceHashesIndexes.clear(); m_ReferenceCount = 0; m_StandaloneSize.store(0); m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); m_MemCachedSize.store(0); return Deleted; } void ZenCacheDiskLayer::CacheBucket::Flush() { ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush"); bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { return; } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); ZEN_INFO("Flushing bucket {}", m_BucketDir); try { m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); SaveSnapshot(); } catch (std::exception& Ex) { ZEN_WARN("Failed to flush bucket in '{}'. 
Reason: '{}'", m_BucketDir, Ex.what()); } } void ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& ClaimDiskReserveFunc) { try { bool UseLegacyScheme = false; uint64_t SidecarSize = 0; IoBuffer Buffer; BucketManifestSerializer ManifestWriter; { std::vector AccessTimes; std::vector Payloads; std::vector MetaDatas; IndexMap Index; { RwLock::SharedLockScope IndexLock(m_IndexLock); WriteIndexSnapshot(); // Note: this copy could be eliminated on shutdown to // reduce memory usage and execution time Index = m_Index; Payloads = m_Payloads; AccessTimes = m_AccessTimes; MetaDatas = m_MetaDatas; } if (UseLegacyScheme) { Buffer = ManifestWriter.MakeManifest(m_BucketId, std::move(Index), std::move(AccessTimes), Payloads, MetaDatas); } else { const uint64_t EntryCount = Index.size(); Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); SidecarSize = ManifestWriter.GetSidecarSize(); } const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512; std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); if (Error) { ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); return; } bool EnoughSpace = Space.Free >= RequiredSpace; if (!EnoughSpace) { uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; } if (!EnoughSpace) { ZEN_WARN("not enough free disk space in '{}'. 
FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize())); return; } if (!UseLegacyScheme) { ManifestWriter.WriteSidecarFile(GetMetaPath(m_BucketDir, m_BucketName), m_LogFlushPosition, std::move(Index), std::move(AccessTimes), Payloads, MetaDatas); } } std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); WriteFile(ManifestPath, Buffer); } catch (std::exception& Err) { ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); } } IoHash HashBuffer(const CompositeBuffer& Buffer) { IoHashStream Hasher; for (const SharedBuffer& Segment : Buffer.GetSegments()) { Hasher.Append(Segment.GetView()); } return Hasher.GetHash(); } bool ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) { ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType); if (ContentType == ZenContentType::kCbObject) { CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); if (Error == CbValidateError::None) { return true; } ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error)); return false; } else if (ContentType == ZenContentType::kCompressedBinary) { IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer); IoHash HeaderRawHash; uint64_t RawSize = 0; if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize)) { ZEN_SCOPED_ERROR("compressed buffer header validation failed"); return false; } CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize); CompositeBuffer Decompressed = Compressed.DecompressToComposite(); IoHash DecompressedHash = HashBuffer(Decompressed); if (HeaderRawHash != DecompressedHash) { ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); return false; } } else { // No way to verify this kind of content (what is it exactly?) 
static int Once = [&] { ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType)); return 42; }(); } return true; }; void ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub"); ZEN_INFO("scrubbing '{}'", m_BucketDir); Stopwatch Timer; uint64_t ChunkCount = 0; uint64_t VerifiedChunkBytes = 0; auto LogStats = MakeGuard([&] { const uint32_t DurationMs = gsl::narrow(Timer.GetElapsedTimeMs()); ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", m_BucketName, NiceBytes(VerifiedChunkBytes), NiceTimeSpanMs(DurationMs), ChunkCount, NiceRate(VerifiedChunkBytes, DurationMs)); }); std::vector BadKeys; auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; try { std::vector ChunkLocations; std::vector ChunkIndexToChunkHash; RwLock::SharedLockScope _(m_IndexLock); const size_t BlockChunkInitialCount = m_Index.size() / 4; ChunkLocations.reserve(BlockChunkInitialCount); ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); // Do a pass over the index and verify any standalone file values straight away // all other storage classes are gathered and verified in bulk in order to enable // more efficient I/O scheduling for (auto& Kv : m_Index) { const IoHash& HashKey = Kv.first; const BucketPayload& Payload = m_Payloads[Kv.second]; const DiskLocation& Loc = Payload.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { Ctx.ThrowIfDeadlineExpired(); ++ChunkCount; VerifiedChunkBytes += Loc.Size(); if (Loc.GetContentType() == ZenContentType::kBinary) { // Blob cache value, not much we can do about data integrity checking // here since there's no hash available ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); std::error_code Ec; uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); if (Ec) { ReportBadKey(HashKey); } if (size != Loc.Size()) { 
ReportBadKey(HashKey); } continue; } else { // Structured cache value IoBuffer Buffer = GetStandaloneCacheValue(Loc.GetContentType(), HashKey); if (!Buffer) { ReportBadKey(HashKey); continue; } if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer)) { ReportBadKey(HashKey); continue; } } } else { ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment)); ChunkIndexToChunkHash.push_back(HashKey); continue; } } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { ++ChunkCount; VerifiedChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { // ChunkLocation out of range of stored blocks ReportBadKey(Hash); return; } if (!Size) { ReportBadKey(Hash); return; } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); if (!Buffer) { ReportBadKey(Hash); return; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) { ReportBadKey(Hash); return; } }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { Ctx.ThrowIfDeadlineExpired(); ++ChunkCount; VerifiedChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); if (!Buffer) { ReportBadKey(Hash); return; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) { ReportBadKey(Hash); return; } }; m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); } catch (ScrubDeadlineExpiredException&) { ZEN_INFO("Scrubbing deadline expired, operation incomplete"); } Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes); if 
(!BadKeys.empty()) { ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); if (Ctx.RunRecovery()) { // Deal with bad chunks by removing them from our lookup map std::vector LogEntries; LogEntries.reserve(BadKeys.size()); { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (const IoHash& BadKey : BadKeys) { // Log a tombstone and delete the in-memory index for the bad entry const auto It = m_Index.find(BadKey); BucketPayload& Payload = m_Payloads[It->second]; if (m_Configuration.EnableReferenceCaching) { RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]); } DiskLocation Location = Payload.Location; if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed); } RemoveMemCachedData(Payload); RemoveMetaData(Payload); Location.Flags |= DiskLocation::kTombStone; LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); } } for (const DiskIndexEntry& Entry : LogEntries) { if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) { ExtendablePathBuilder<256> Path; BuildPath(Path, Entry.Key); fs::path FilePath = Path.ToPath(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); std::error_code Ec; fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... } } } m_SlogFile.Append(LogEntries); // Clean up m_AccessTimes and m_Payloads vectors { std::vector Payloads; std::vector AccessTimes; std::vector MetaDatas; std::vector MemCachedPayloads; std::vector FirstReferenceIndex; IndexMap Index; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); } } } } // Let whomever it concerns know about the bad chunks. 
This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do if (!BadKeys.empty()) { Ctx.ReportBadCidChunks(BadKeys); } } void ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::GatherReferences"); #define CALCULATE_BLOCKING_TIME 0 #if CALCULATE_BLOCKING_TIME uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; #endif // CALCULATE_BLOCKING_TIME Stopwatch TotalTimer; const auto _ = MakeGuard([&] { #if CALCULATE_BLOCKING_TIME ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs)); #else ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); #endif // CALCULATE_BLOCKING_TIME }); const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime(); const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); IndexMap Index; std::vector AccessTimes; std::vector Payloads; std::vector FirstReferenceIndex; { RwLock::SharedLockScope __(m_IndexLock); #if CALCULATE_BLOCKING_TIME Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME Index = m_Index; AccessTimes = m_AccessTimes; Payloads = m_Payloads; FirstReferenceIndex = m_FirstReferenceIndex; } std::vector ExpiredKeys; ExpiredKeys.reserve(1024); std::vector Cids; if (!GcCtx.SkipCid()) { Cids.reserve(1024); } std::vector> StructuredItemsWithUnknownAttachments; for (const auto& Entry : Index) { const IoHash& Key = Entry.first; size_t PayloadIndex = 
Entry.second; GcClock::Tick AccessTime = AccessTimes[PayloadIndex]; if (AccessTime < ExpireTicks) { ExpiredKeys.push_back(Key); continue; } if (GcCtx.SkipCid()) { continue; } BucketPayload& Payload = Payloads[PayloadIndex]; const DiskLocation& Loc = Payload.Location; if (!Loc.IsFlagSet(DiskLocation::kStructured)) { continue; } if (m_Configuration.EnableReferenceCaching) { if (FirstReferenceIndex.empty() || FirstReferenceIndex[PayloadIndex] == ReferenceIndex::Unknown()) { StructuredItemsWithUnknownAttachments.push_back(Entry); continue; } bool ReferencesAreKnown = false; { RwLock::SharedLockScope IndexLock(m_IndexLock); #if CALCULATE_BLOCKING_TIME Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Key); It != m_Index.end()) { ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids); } } if (ReferencesAreKnown) { if (Cids.size() >= 1024) { GcCtx.AddRetainedCids(Cids); Cids.clear(); } continue; } } StructuredItemsWithUnknownAttachments.push_back(Entry); } for (const auto& Entry : StructuredItemsWithUnknownAttachments) { const IoHash& Key = Entry.first; size_t PayloadIndex = Entry.second; BucketPayload& Payload = Payloads[PayloadIndex]; const DiskLocation& Loc = Payload.Location; { IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer) { continue; } } else { RwLock::SharedLockScope IndexLock(m_IndexLock); #if CALCULATE_BLOCKING_TIME Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Key); It != m_Index.end()) { const BucketPayload& 
CachedPayload = Payloads[PayloadIndex]; if (CachedPayload.MemCached) { Buffer = m_MemCachedPayloads[CachedPayload.MemCached]; ZEN_ASSERT_SLOW(Buffer); } else { DiskLocation Location = m_Payloads[It->second].Location; IndexLock.ReleaseNow(); Buffer = GetInlineCacheValue(Location); // Don't memcache items when doing GC } } if (!Buffer) { continue; } } ZEN_ASSERT(Buffer); ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); CbObjectView Obj(Buffer.GetData()); size_t CurrentCidCount = Cids.size(); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); if (m_Configuration.EnableReferenceCaching) { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); #if CALCULATE_BLOCKING_TIME Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Key); It != m_Index.end()) { if (m_FirstReferenceIndex[It->second] == ReferenceIndex::Unknown()) { SetReferences(IndexLock, m_FirstReferenceIndex[It->second], std::span(Cids.data() + CurrentCidCount, Cids.size() - CurrentCidCount)); } else { Cids.resize(CurrentCidCount); (void)GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids); } } } if (Cids.size() >= 1024) { GcCtx.AddRetainedCids(Cids); Cids.clear(); } } } GcCtx.AddRetainedCids(Cids); GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); } void ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage"); ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); Stopwatch TotalTimer; uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; uint64_t TotalChunkCount = 0; uint64_t DeletedSize = 0; GcStorageSize OldTotalSize = StorageSize(); std::unordered_set DeletedChunks; 
uint64_t MovedCount = 0; const auto _ = MakeGuard([&] { ZEN_DEBUG( "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " "{} " "of {} " "entries ({}/{}).", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs), NiceBytes(DeletedSize), DeletedChunks.size(), MovedCount, TotalChunkCount, NiceBytes(OldTotalSize.DiskSize), NiceBytes(OldTotalSize.MemorySize)); bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { return; } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); try { SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); } catch (std::exception& Ex) { ZEN_WARN("Failed to write index and manifest after GC in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); } }); m_SlogFile.Flush(); auto __ = MakeGuard([&]() { if (!DeletedChunks.empty()) { // Clean up m_AccessTimes and m_Payloads vectors std::vector Payloads; std::vector AccessTimes; std::vector MetaDatas; std::vector MemCachedPayloads; std::vector FirstReferenceIndex; IndexMap Index; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); } GcCtx.AddDeletedCids(std::vector(DeletedChunks.begin(), DeletedChunks.end())); } }); std::span ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); std::unordered_set ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end()); std::vector ExpiredStandaloneEntries; IndexMap IndexSnapshot; std::vector PayloadsSnapshot; BlockStore::ReclaimSnapshotState BlockStoreState; { 
bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir); return; } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); RwLock::SharedLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); for (const IoHash& Key : ExpiredCacheKeys) { if (auto It = m_Index.find(Key); It != m_Index.end()) { const BucketPayload& Payload = m_Payloads[It->second]; if (Payload.Location.Flags & DiskLocation::kStandaloneFile) { DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location}; Entry.Location.Flags |= DiskLocation::kTombStone; ExpiredStandaloneEntries.push_back(Entry); } } } PayloadsSnapshot = m_Payloads; IndexSnapshot = m_Index; if (GcCtx.IsDeletionMode()) { IndexLock.ReleaseNow(); RwLock::ExclusiveLockScope __(m_IndexLock); for (const auto& Entry : ExpiredStandaloneEntries) { if (m_Index.erase(Entry.Key) == 1) { m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); DeletedChunks.insert(Entry.Key); } } m_SlogFile.Append(ExpiredStandaloneEntries); } } } if (GcCtx.IsDeletionMode()) { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete"); ExtendablePathBuilder<256> Path; for (const auto& Entry : ExpiredStandaloneEntries) { const IoHash& Key = Entry.Key; Path.Reset(); BuildPath(Path, Key); fs::path FilePath = Path.ToPath(); { RwLock::SharedLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (m_Index.contains(Key)) { 
// Someone added it back, let the file on disk be ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } IndexLock.ReleaseNow(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); std::error_code Ec; fs::remove(FilePath, Ec); if (Ec) { ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); continue; } } } DeletedSize += Entry.Location.Size(); } } TotalChunkCount = IndexSnapshot.size(); std::vector ChunkLocations; BlockStore::ChunkIndexArray KeepChunkIndexes; std::vector ChunkIndexToChunkHash; ChunkLocations.reserve(TotalChunkCount); ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); { TotalChunkCount = 0; for (const auto& Entry : IndexSnapshot) { size_t EntryIndex = Entry.second; const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location; if (DiskLocation.Flags & DiskLocation::kStandaloneFile) { continue; } const IoHash& Key = Entry.first; BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); ChunkIndexToChunkHash[ChunkIndex] = Key; if (ExpiredCacheKeys.contains(Key)) { continue; } KeepChunkIndexes.push_back(ChunkIndex); } } TotalChunkCount = ChunkLocations.size(); size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true); GcStorageSize CurrentTotalSize = StorageSize(); ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})", m_BucketDir, DeleteCount, TotalChunkCount, NiceBytes(CurrentTotalSize.DiskSize), 
NiceBytes(CurrentTotalSize.MemorySize)); return; } m_BlockStore.ReclaimSpace( BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); { RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); for (const auto& Entry : MovedChunks) { size_t ChunkIndex = Entry.first; const BlockStoreLocation& NewLocation = Entry.second; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t EntryIndex = m_Index[ChunkHash]; BucketPayload& Payload = m_Payloads[EntryIndex]; if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location) { // Entry has been updated while GC was running, ignore the move continue; } Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location}); } for (const size_t ChunkIndex : RemovedChunks) { const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t EntryIndex = m_Index[ChunkHash]; BucketPayload& Payload = m_Payloads[EntryIndex]; if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location) { // Entry has been updated while GC was running, ignore the delete continue; } const DiskLocation& OldDiskLocation = Payload.Location; LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment), m_Configuration.PayloadAlignment, OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); RemoveMemCachedData(Payload); RemoveMetaData(Payload); m_Index.erase(ChunkHash); DeletedChunks.insert(ChunkHash); } } 
m_SlogFile.Append(LogEntries); m_SlogFile.Flush(); }, [&]() { return GcCtx.ClaimGCReserve(); }); } ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { GcStorageSize Size = StorageSize(); return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize, .DiskHitCount = m_DiskHitCount, .DiskMissCount = m_DiskMissCount, .DiskWriteCount = m_DiskWriteCount, .MemoryHitCount = m_MemoryHitCount, .MemoryMissCount = m_MemoryMissCount, .MemoryWriteCount = m_MemoryWriteCount, .PutOps = m_PutOps.Snapshot(), .GetOps = m_GetOps.Snapshot()}; } uint64_t ZenCacheDiskLayer::CacheBucket::EntryCount() const { RwLock::SharedLockScope _(m_IndexLock); return static_cast(m_Index.size()); } CacheValueDetails::ValueDetails ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex Index) const { std::vector Attachments; const BucketPayload& Payload = m_Payloads[Index]; if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) { IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? 
GetStandaloneCacheValue(Payload.Location.GetContentType(), Key) : GetInlineCacheValue(Payload.Location); CbObjectView Obj(Value.GetData()); Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); } BucketMetaData MetaData = GetMetaData(Payload); return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), .RawSize = MetaData.RawSize, .RawHash = MetaData.RawHash, .LastAccess = m_AccessTimes[Index], .Attachments = std::move(Attachments), .ContentType = Payload.Location.GetContentType()}; } CacheValueDetails::BucketDetails ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const { CacheValueDetails::BucketDetails Details; RwLock::SharedLockScope _(m_IndexLock); if (ValueFilter.empty()) { Details.Values.reserve(m_Index.size()); for (const auto& It : m_Index) { Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second)); } } else { IoHash Key = IoHash::FromHexString(ValueFilter); if (auto It = m_Index.find(Key); It != m_Index.end()) { Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second)); } } return Details; } void ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents( std::function& Fn) const { RwLock::SharedLockScope _(m_IndexLock); for (const auto& It : m_Index) { CacheValueDetails::ValueDetails Vd = GetValueDetails(It.first, It.second); Fn(It.first, Vd); } } void ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::CollectGarbage"); std::vector Buckets; { RwLock::SharedLockScope _(m_Lock); Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Buckets.push_back(Kv.second.get()); } } for (CacheBucket* Bucket : Buckets) { Bucket->CollectGarbage(GcCtx); } MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); } void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References) { 
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue"); uint64_t NewFileSize = Value.Value.Size(); TemporaryFile DataFile; std::error_code Ec; DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } bool CleanUpTempFile = true; auto __ = MakeGuard([&] { if (CleanUpTempFile) { std::error_code Ec; std::filesystem::remove(DataFile.GetPath(), Ec); if (Ec) { ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", DataFile.GetPath(), m_BucketDir, Ec.message()); } } }); DataFile.WriteAll(Value.Value, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", NiceBytes(NewFileSize), DataFile.GetPath().string(), m_BucketDir)); } ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); // We do a speculative remove of the file instead of probing with a exists call and check the error code instead std::filesystem::remove(FsPath, Ec); if (Ec) { if (Ec.value() != ENOENT) { ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); Sleep(100); Ec.clear(); std::filesystem::remove(FsPath, Ec); if (Ec && Ec.value() != ENOENT) { throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); } } } // Assume parent directory exists DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { CreateDirectories(FsPath.parent_path()); // Try again after we or someone else created the directory Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); // Retry if we still fail to handle contention to file system uint32_t RetriesLeft = 3; while (Ec && RetriesLeft > 0) { ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', 
retries left: {}.", FsPath, DataFile.GetPath(), m_BucketDir, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); RetriesLeft--; } if (Ec) { throw std::system_error( Ec, fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); } } // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file // will be disabled as the file handle has already been closed CleanUpTempFile = false; uint8_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } DiskLocation Loc(NewFileSize, EntryFlags); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); ValueLock.ReleaseNow(); PayloadIndex EntryIndex = {}; if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object EntryIndex = PayloadIndex(m_Payloads.size()); m_Payloads.emplace_back(BucketPayload{.Location = Loc}); m_AccessTimes.emplace_back(GcClock::TickCount()); if (m_Configuration.EnableReferenceCaching) { m_FirstReferenceIndex.emplace_back(ReferenceIndex{}); SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); } m_Index.insert_or_assign(HashKey, EntryIndex); } else { EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); BucketPayload& Payload = m_Payloads[EntryIndex]; uint64_t OldSize = Payload.Location.Size(); Payload = BucketPayload{.Location = Loc}; if (m_Configuration.EnableReferenceCaching) { SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); } m_AccessTimes[EntryIndex] = GcClock::TickCount(); RemoveMemCachedData(Payload); m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed); } if (Value.RawSize != 0 || Value.RawHash != IoHash::Zero) { 
SetMetaData(m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash}); } else { RemoveMetaData(m_Payloads[EntryIndex]); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); } void ZenCacheDiskLayer::CacheBucket::SetMetaData(BucketPayload& Payload, const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData) { if (Payload.MetaData) { m_MetaDatas[Payload.MetaData] = MetaData; } else { if (m_FreeMetaDatas.empty()) { Payload.MetaData = MetaDataIndex(m_MetaDatas.size()); m_MetaDatas.emplace_back(MetaData); } else { Payload.MetaData = m_FreeMetaDatas.back(); m_FreeMetaDatas.pop_back(); m_MetaDatas[Payload.MetaData] = MetaData; } } } void ZenCacheDiskLayer::CacheBucket::RemoveMetaData(BucketPayload& Payload) { if (Payload.MetaData) { m_FreeMetaDatas.push_back(Payload.MetaData); Payload.MetaData = {}; } } void ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffer& MemCachedData) { uint64_t PayloadSize = MemCachedData.GetSize(); ZEN_ASSERT(PayloadSize != 0); if (m_FreeMemCachedPayloads.empty()) { if (m_MemCachedPayloads.size() != std::numeric_limits::max()) { Payload.MemCached = MemCachedIndex(gsl::narrow(m_MemCachedPayloads.size())); m_MemCachedPayloads.push_back(MemCachedData); AddMemCacheUsage(PayloadSize); m_MemoryWriteCount++; } } else { Payload.MemCached = m_FreeMemCachedPayloads.back(); m_FreeMemCachedPayloads.pop_back(); m_MemCachedPayloads[Payload.MemCached] = MemCachedData; AddMemCacheUsage(PayloadSize); m_MemoryWriteCount++; } } size_t ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload) { if (Payload.MemCached) { size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize(); RemoveMemCacheUsage(PayloadSize); m_MemCachedPayloads[Payload.MemCached] = IoBuffer{}; m_FreeMemCachedPayloads.push_back(Payload.MemCached); Payload.MemCached = {}; return PayloadSize; } return 0; } 
// Returns the raw-size/raw-hash metadata attached to a payload, or a
// default-constructed record when the payload has none.
ZenCacheDiskLayer::CacheBucket::BucketMetaData
ZenCacheDiskLayer::CacheBucket::GetMetaData(const BucketPayload& Payload) const
{
    if (Payload.MetaData)
    {
        return m_MetaDatas[Payload.MetaData];
    }
    return {};
}

// Writes a cache value into the shared block store (inline payloads) and
// records it in the in-memory index and the slog journal.
void
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References)
{
    ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue");
    uint8_t EntryFlags = 0;
    if (Value.Value.GetContentType() == ZenContentType::kCbObject)
    {
        EntryFlags |= DiskLocation::kStructured;
    }
    else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
    {
        EntryFlags |= DiskLocation::kCompressed;
    }
    m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_Configuration.PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) {
        DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
        // Journal first, then publish in the index under the exclusive lock.
        m_SlogFile.Append({.Key = HashKey, .Location = Location});
        RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
        if (auto It = m_Index.find(HashKey); It != m_Index.end())
        {
            // Existing entry: release cached memory/metadata before overwrite.
            PayloadIndex EntryIndex = It.value();
            ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
            BucketPayload& Payload = m_Payloads[EntryIndex];
            RemoveMemCachedData(Payload);
            RemoveMetaData(Payload);
            Payload = (BucketPayload{.Location = Location});
            m_AccessTimes[EntryIndex] = GcClock::TickCount();
            if (m_Configuration.EnableReferenceCaching)
            {
                SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
            }
        }
        else
        {
            // New entry: the parallel arrays grow together.
            PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
            m_Payloads.emplace_back(BucketPayload{.Location = Location});
            m_AccessTimes.emplace_back(GcClock::TickCount());
            if (m_Configuration.EnableReferenceCaching)
            {
                m_FirstReferenceIndex.emplace_back(ReferenceIndex{});
                SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
            }
            m_Index.insert_or_assign(HashKey, EntryIndex);
        }
    });
}

// Human-readable identifier used by the garbage collector for this store.
std::string
ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&)
{
    return fmt::format("cachebucket:'{}'", m_BucketDir.string());
}

// GC compactor: deletes expired standalone files and (optionally) compacts
// under-utilized blocks in the shared block store.
class DiskBucketStoreCompactor : public GcStoreCompactor
{
public:
    DiskBucketStoreCompactor(ZenCacheDiskLayer::CacheBucket& Bucket, std::vector>&& ExpiredStandaloneKeys)
    : m_Bucket(Bucket)
    , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys))
    {
        m_ExpiredStandaloneKeys.shrink_to_fit();
    }

    virtual ~DiskBucketStoreCompactor() {}

    virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function& ClaimDiskReserveCallback) override
    {
        Stopwatch Timer;
        const auto _ = MakeGuard([&] {
            Reset(m_ExpiredStandaloneKeys);
            if (!Ctx.Settings.Verbose)
            {
                return;
            }
            ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}",
                     m_Bucket.m_BucketDir,
                     NiceBytes(Stats.RemovedDisk),
                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
        });
        if (!m_ExpiredStandaloneKeys.empty())
        {
            // Compact standalone items
            size_t Skipped = 0;
            ExtendablePathBuilder<256> Path;
            for (const std::pair& ExpiredKey : m_ExpiredStandaloneKeys)
            {
                if (Ctx.IsCancelledFlag.load())
                {
                    return;
                }
                Path.Reset();
                m_Bucket.BuildPath(Path, ExpiredKey.first);
                fs::path FilePath = Path.ToPath();
                RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock);
                if (m_Bucket.m_Index.contains(ExpiredKey.first))
                {
                    // Someone added it back, let the file on disk be
                    ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back",
                              m_Bucket.m_BucketDir,
                              Path.ToUtf8());
                    continue;
                }
                if (Ctx.Settings.IsDeleteMode)
                {
                    RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first));
                    IndexLock.ReleaseNow();
                    ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
                    std::error_code Ec;
                    // NOTE(review): fs::remove() also returns false when it
                    // fails with an error, so this early 'continue' appears to
                    // bypass the Ec warning below -- confirm the two checks
                    // weren't meant to be ordered the other way around.
                    if (!fs::remove(FilePath, Ec))
                    {
                        continue;
                    }
                    if (Ec)
                    {
                        ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'",
                                 m_Bucket.m_BucketDir,
                                 Path.ToUtf8(),
                                 Ec.message());
                        continue;
                    }
                    Stats.RemovedDisk += ExpiredKey.second;
                }
                else
                {
                    // Dry run: only count the files that would be deleted.
                    std::error_code Ec;
                    bool Existed = std::filesystem::is_regular_file(FilePath, Ec);
                    if (Ec)
                    {
                        ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'", m_Bucket.m_BucketDir, FilePath, Ec.message());
                        continue;
                    }
                    if (!Existed)
                    {
                        continue;
                    }
                    Skipped++;
                }
            }
            if (Skipped > 0)
            {
                ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped);
            }
        }
        if (Ctx.Settings.CollectSmallObjects)
        {
            // Tally how many live bytes each block still holds.
            std::unordered_map BlockUsage;
            {
                RwLock::SharedLockScope __(m_Bucket.m_IndexLock);
                for (const auto& Entry : m_Bucket.m_Index)
                {
                    ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
                    const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index];
                    const DiskLocation& Loc = Payload.Location;
                    if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
                    {
                        continue;
                    }
                    uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex();
                    uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment);
                    auto It = BlockUsage.find(BlockIndex);
                    if (It == BlockUsage.end())
                    {
                        BlockUsage.insert_or_assign(BlockIndex, ChunkSize);
                    }
                    else
                    {
                        It->second += ChunkSize;
                    }
                }
            }
            {
                BlockStoreCompactState BlockCompactState;
                std::vector BlockCompactStateKeys;
                std::vector BlocksToCompact = m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent);
                BlockCompactState.IncludeBlocks(BlocksToCompact);
                if (BlocksToCompact.size() > 0)
                {
                    {
                        // Record which live chunks must survive compaction.
                        RwLock::SharedLockScope __(m_Bucket.m_IndexLock);
                        for (const auto& Entry : m_Bucket.m_Index)
                        {
                            ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
                            const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index];
                            const DiskLocation& Loc = Payload.Location;
                            if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
                            {
                                continue;
                            }
                            if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment)))
                            {
                                continue;
                            }
                            BlockCompactStateKeys.push_back(Entry.first);
                        }
                    }
                    if
(Ctx.Settings.IsDeleteMode)
                    {
                        if (Ctx.Settings.Verbose)
                        {
                            ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", m_Bucket.m_BucketDir, BlocksToCompact.size());
                        }
                        // Relocate surviving chunks; the callback patches the
                        // index and journals new locations under the index lock.
                        m_Bucket.m_BlockStore.CompactBlocks(
                            BlockCompactState,
                            m_Bucket.m_Configuration.PayloadAlignment,
                            [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
                                std::vector MovedEntries;
                                RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock);
                                for (const std::pair& Moved : MovedArray)
                                {
                                    size_t ChunkIndex = Moved.first;
                                    const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
                                    if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end())
                                    {
                                        ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second];
                                        const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
                                        if (Payload.Location.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment) != OldLocation)
                                        {
                                            // Someone has moved our chunk so lets just skip the new location we were provided, it will be
                                            // GC:d at a later time
                                            continue;
                                        }
                                        const BlockStoreLocation& NewLocation = Moved.second;
                                        Payload.Location = DiskLocation(NewLocation, m_Bucket.m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
                                        MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
                                    }
                                }
                                m_Bucket.m_SlogFile.Append(MovedEntries);
                                Stats.RemovedDisk += FreedDiskSpace;
                                // Returning false aborts the remaining block moves.
                                if (Ctx.IsCancelledFlag.load())
                                {
                                    return false;
                                }
                                return true;
                            },
                            ClaimDiskReserveCallback);
                    }
                    else
                    {
                        if (Ctx.Settings.Verbose)
                        {
                            ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", m_Bucket.m_BucketDir, BlocksToCompact.size());
                        }
                    }
                }
            }
        }
    }

private:
    ZenCacheDiskLayer::CacheBucket& m_Bucket;
    std::vector> m_ExpiredStandaloneKeys;
};

// Scans the bucket for entries whose last access predates the GC expiry time,
// tombstones them in the slog (delete mode only), compacts the in-memory
// tables, and hands the expired standalone keys to a DiskBucketStoreCompactor
// for file deletion.
GcStoreCompactor*
ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
    size_t TotalEntries = 0;
    Stopwatch Timer;
    const auto _ = MakeGuard([&] {
        if (!Ctx.Settings.Verbose)
        {
            return;
        }
        ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}",
                 m_BucketDir,
                 Stats.CheckedCount,
                 Stats.FoundCount,
                 Stats.DeletedCount,
                 NiceBytes(Stats.FreedMemory),
                 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
    });
    const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count();
    std::vector ExpiredEntries;
    std::vector> ExpiredStandaloneKeys;
    uint64_t RemovedStandaloneSize = 0;
    {
        RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
        if (Ctx.IsCancelledFlag.load())
        {
            return nullptr;
        }
        TotalEntries = m_Index.size();
        // Find out expired keys
        for (const auto& Entry : m_Index)
        {
            const IoHash& Key = Entry.first;
            ZenCacheDiskLayer::CacheBucket::PayloadIndex EntryIndex = Entry.second;
            GcClock::Tick AccessTime = m_AccessTimes[EntryIndex];
            if (AccessTime >= ExpireTicks)
            {
                continue;
            }
            const BucketPayload& Payload = m_Payloads[EntryIndex];
            DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location};
            // The tombstone flag tells index/slog readers the entry is gone.
            ExpiredEntry.Location.Flags |= DiskLocation::kTombStone;
            if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
            {
                ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()});
                RemovedStandaloneSize += Payload.Location.Size();
                ExpiredEntries.push_back(ExpiredEntry);
            }
            else if (Ctx.Settings.CollectSmallObjects)
            {
                ExpiredEntries.push_back(ExpiredEntry);
            }
        }
        Stats.CheckedCount += TotalEntries;
        Stats.FoundCount += ExpiredEntries.size();
        if (Ctx.IsCancelledFlag.load())
        {
            return nullptr;
        }
        if (Ctx.Settings.IsDeleteMode)
        {
            for (const DiskIndexEntry& Entry : ExpiredEntries)
            {
                auto It = m_Index.find(Entry.Key);
                ZEN_ASSERT(It != m_Index.end());
                BucketPayload& Payload = m_Payloads[It->second];
                RemoveMetaData(Payload);
                Stats.FreedMemory += RemoveMemCachedData(Payload);
                m_Index.erase(It);
                Stats.DeletedCount++;
            }
            m_SlogFile.Append(ExpiredEntries);
            m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed);
        }
    }
    if (!ExpiredEntries.empty())
    {
        // Rebuild the parallel arrays without the removed entries.
        std::vector Payloads;
        std::vector AccessTimes;
        std::vector MetaDatas;
        std::vector MemCachedPayloads;
        std::vector FirstReferenceIndex;
        IndexMap Index;
        {
            RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
            CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
        }
    }
    if (Ctx.IsCancelledFlag.load())
    {
        return nullptr;
    }
    return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
}

// GC reference checker: holds the bucket's index lock for the duration of a
// GC pass and knows every content hash the bucket's structured values
// reference.
class DiskBucketReferenceChecker : public GcReferenceChecker
{
public:
    DiskBucketReferenceChecker(ZenCacheDiskLayer::CacheBucket& Owner)
    : m_CacheBucket(Owner)
    {
    }

    virtual ~DiskBucketReferenceChecker()
    {
        m_IndexLock.reset();
        if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
        {
            // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it
            m_CacheBucket.ClearReferenceCache();
        }
    }

    // Takes (and keeps) the bucket's index lock, then collects references
    // from any structured entries whose reference list is not cached yet.
    virtual void LockState(GcCtx& Ctx) override
    {
        Stopwatch Timer;
        const auto _ = MakeGuard([&] {
            if (!Ctx.Settings.Verbose)
            {
                return;
            }
            ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}",
                     m_CacheBucket.m_BucketDir,
                     m_CacheBucket.m_ReferenceCount + m_UncachedReferences.size(),
                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
        });
        m_IndexLock = std::make_unique(m_CacheBucket.m_IndexLock);
        if (Ctx.IsCancelledFlag.load())
        {
            m_UncachedReferences.clear();
            m_IndexLock.reset();
            return;
        }
        // Rescan to see if any cache items needs refreshing since last pass when we had the lock
        for (const auto& Entry : m_CacheBucket.m_Index)
        {
            if (Ctx.IsCancelledFlag.load())
            {
                m_UncachedReferences.clear();
                m_IndexLock.reset();
                return;
            }
            size_t PayloadIndex = Entry.second;
            const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex];
            const DiskLocation& Loc = Payload.Location;
            if (!Loc.IsFlagSet(DiskLocation::kStructured))
            {
                continue;
            }
            ZEN_ASSERT(!m_CacheBucket.m_FirstReferenceIndex.empty());
            const IoHash& Key = Entry.first;
            if (m_CacheBucket.m_FirstReferenceIndex[PayloadIndex] == ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown())
            {
                IoBuffer Buffer;
                if
(Loc.IsFlagSet(DiskLocation::kStandaloneFile))
                {
                    Buffer = m_CacheBucket.GetStandaloneCacheValue(Loc.GetContentType(), Key);
                }
                else
                {
                    Buffer = m_CacheBucket.GetInlineCacheValue(Loc);
                }
                if (Buffer)
                {
                    ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
                    CbObjectView Obj(Buffer.GetData());
                    // Every attachment hash in the object counts as a reference.
                    Obj.IterateAttachments([this](CbFieldView Field) { m_UncachedReferences.insert(Field.AsAttachment()); });
                }
            }
        }
    }

    // Erases every hash this bucket references (cached + freshly scanned)
    // from the GC's candidate-delete set.
    virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
    {
        ZEN_ASSERT(m_IndexLock);
        size_t InitialCount = IoCids.size();
        Stopwatch Timer;
        const auto _ = MakeGuard([&] {
            if (!Ctx.Settings.Verbose)
            {
                return;
            }
            ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
                     m_CacheBucket.m_BucketDir,
                     InitialCount - IoCids.size(),
                     InitialCount,
                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
        });
        for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes)
        {
            IoCids.erase(ReferenceHash);
        }
        for (const IoHash& ReferenceHash : m_UncachedReferences)
        {
            IoCids.erase(ReferenceHash);
        }
    }

    ZenCacheDiskLayer::CacheBucket& m_CacheBucket;
    std::unique_ptr m_IndexLock;
    HashSet m_UncachedReferences;
};

// Populates the reference cache for all structured entries (reading payloads
// from the block store / standalone files as needed) and returns a checker
// the GC uses to filter out hashes this bucket still references.
std::vector
ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
{
    Stopwatch Timer;
    const auto _ = MakeGuard([&] {
        if (!Ctx.Settings.Verbose)
        {
            return;
        }
        ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': found {} references in {}", m_BucketDir, m_ReferenceCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
    });
    std::vector UpdateKeys;
    std::vector StandaloneKeys;
    std::vector ReferenceCounts;
    std::vector References;
    // Refresh cache
    {
        RwLock::SharedLockScope IndexLock(m_IndexLock);
        for (const auto& Entry : m_Index)
        {
            if (Ctx.IsCancelledFlag.load())
            {
                return {};
            }
            size_t PayloadIndex = Entry.second;
            const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex];
            const DiskLocation& Loc = Payload.Location;
            if (!Loc.IsFlagSet(DiskLocation::kStructured))
            {
                continue;
            }
            if (m_Configuration.EnableReferenceCaching && m_FirstReferenceIndex[PayloadIndex] != ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown())
            {
                continue;
            }
            const IoHash& Key = Entry.first;
            if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
            {
                // Standalone payloads are read outside the shared lock below.
                StandaloneKeys.push_back(Key);
                continue;
            }
            IoBuffer Buffer = GetInlineCacheValue(Loc);
            if (!Buffer)
            {
                UpdateKeys.push_back(Key);
                ReferenceCounts.push_back(0);
                continue;
            }
            size_t CurrentReferenceCount = References.size();
            {
                CbObjectView Obj(Buffer.GetData());
                Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
                Buffer = {};
            }
            UpdateKeys.push_back(Key);
            ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
        }
    }
    {
        for (const IoHash& Key : StandaloneKeys)
        {
            if (Ctx.IsCancelledFlag.load())
            {
                return {};
            }
            IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
            if (!Buffer)
            {
                continue;
            }
            size_t CurrentReferenceCount = References.size();
            {
                CbObjectView Obj(Buffer.GetData());
                Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
                Buffer = {};
            }
            UpdateKeys.push_back(Key);
            ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
        }
    }
    {
        size_t ReferenceOffset = 0;
        RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
        if (!m_Configuration.EnableReferenceCaching)
        {
            ZEN_ASSERT(m_FirstReferenceIndex.empty());
            ZEN_ASSERT(m_ReferenceHashes.empty());
            ZEN_ASSERT(m_NextReferenceHashesIndexes.empty());
            ZEN_ASSERT(m_ReferenceCount == 0);
            // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when
            // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted.
m_FirstReferenceIndex.resize(m_Payloads.size()); } for (size_t Index = 0; Index < UpdateKeys.size(); Index++) { const IoHash& Key = UpdateKeys[Index]; size_t ReferenceCount = ReferenceCounts[Index]; auto It = m_Index.find(Key); if (It == m_Index.end()) { ReferenceOffset += ReferenceCount; continue; } if (m_FirstReferenceIndex[It->second] != ReferenceIndex::Unknown()) { continue; } SetReferences(IndexLock, m_FirstReferenceIndex[It->second], std::span{References.data() + ReferenceOffset, ReferenceCount}); ReferenceOffset += ReferenceCount; } if (m_Configuration.EnableReferenceCaching) { CompactReferences(IndexLock); } } return {new DiskBucketReferenceChecker(*this)}; } void ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) { std::vector FirstReferenceIndex; std::vector NewReferenceHashes; std::vector NewNextReferenceHashesIndexes; FirstReferenceIndex.reserve(m_ReferenceCount); NewReferenceHashes.reserve(m_ReferenceCount); NewNextReferenceHashesIndexes.reserve(m_ReferenceCount); for (const auto& It : m_Index) { ReferenceIndex SourceIndex = m_FirstReferenceIndex[It.second]; if (SourceIndex == ReferenceIndex::Unknown()) { FirstReferenceIndex.push_back(ReferenceIndex{}); continue; } if (SourceIndex == ReferenceIndex::None()) { FirstReferenceIndex.push_back(ReferenceIndex::None()); continue; } FirstReferenceIndex.push_back(ReferenceIndex{NewNextReferenceHashesIndexes.size()}); NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]); NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None()); SourceIndex = m_NextReferenceHashesIndexes[SourceIndex]; while (SourceIndex != ReferenceIndex::None()) { NewNextReferenceHashesIndexes.back() = ReferenceIndex{NewReferenceHashes.size()}; NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]); NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None()); SourceIndex = m_NextReferenceHashesIndexes[SourceIndex]; } } m_FirstReferenceIndex.swap(FirstReferenceIndex); 
m_ReferenceHashes.swap(NewReferenceHashes); m_ReferenceHashes.shrink_to_fit(); m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes); m_NextReferenceHashesIndexes.shrink_to_fit(); m_ReferenceCount = m_ReferenceHashes.size(); } ZenCacheDiskLayer::CacheBucket::ReferenceIndex ZenCacheDiskLayer::CacheBucket::AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key) { ReferenceIndex NewIndex = ReferenceIndex{m_ReferenceHashes.size()}; m_ReferenceHashes.push_back(Key); m_NextReferenceHashesIndexes.emplace_back(ReferenceIndex::None()); m_ReferenceCount++; return NewIndex; } void ZenCacheDiskLayer::CacheBucket::SetReferences(RwLock::ExclusiveLockScope& Lock, ReferenceIndex& FirstReferenceIndex, std::span References) { auto ReferenceIt = References.begin(); if (FirstReferenceIndex == ReferenceIndex::Unknown()) { FirstReferenceIndex = ReferenceIndex::None(); } ReferenceIndex CurrentIndex = FirstReferenceIndex; if (CurrentIndex != ReferenceIndex::None()) { if (ReferenceIt != References.end()) { ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero); if (CurrentIndex == ReferenceIndex::None()) { CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt); FirstReferenceIndex = CurrentIndex; } else { m_ReferenceHashes[CurrentIndex] = *ReferenceIt; } ReferenceIt++; } } else { if (ReferenceIt != References.end()) { ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero); CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt); ReferenceIt++; } FirstReferenceIndex = CurrentIndex; } while (ReferenceIt != References.end()) { ZEN_ASSERT(CurrentIndex != ReferenceIndex::None()); ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero); ReferenceIndex NextReferenceIndex = m_NextReferenceHashesIndexes[CurrentIndex]; if (NextReferenceIndex == ReferenceIndex::None()) { NextReferenceIndex = AllocateReferenceEntry(Lock, *ReferenceIt); m_NextReferenceHashesIndexes[CurrentIndex] = NextReferenceIndex; } else { m_ReferenceHashes[NextReferenceIndex] = *ReferenceIt; } CurrentIndex = NextReferenceIndex; 
ReferenceIt++; } while (CurrentIndex != ReferenceIndex::None()) { ReferenceIndex NextIndex = m_NextReferenceHashesIndexes[CurrentIndex]; if (NextIndex != ReferenceIndex::None()) { m_ReferenceHashes[CurrentIndex] = IoHash::Zero; ZEN_ASSERT(m_ReferenceCount > 0); m_ReferenceCount--; m_NextReferenceHashesIndexes[CurrentIndex] = ReferenceIndex::None(); } CurrentIndex = NextIndex; } } void ZenCacheDiskLayer::CacheBucket::RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex) { if (FirstReferenceIndex == ReferenceIndex::Unknown()) { return; } ReferenceIndex CurrentIndex = FirstReferenceIndex; while (CurrentIndex == ReferenceIndex::None()) { m_ReferenceHashes[CurrentIndex] = IoHash::Zero; ZEN_ASSERT(m_ReferenceCount > 0); m_ReferenceCount--; CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex]; } FirstReferenceIndex = {}; } bool ZenCacheDiskLayer::CacheBucket::LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector& OutReferences) const { if (FirstReferenceIndex == ReferenceIndex::Unknown()) { return false; } ReferenceIndex CurrentIndex = FirstReferenceIndex; while (CurrentIndex != ReferenceIndex::None()) { ZEN_ASSERT_SLOW(m_ReferenceHashes[CurrentIndex] != IoHash::Zero); OutReferences.push_back(m_ReferenceHashes[CurrentIndex]); CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex]; } return true; } void ZenCacheDiskLayer::CacheBucket::ClearReferenceCache() { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); Reset(m_FirstReferenceIndex); Reset(m_ReferenceHashes); Reset(m_NextReferenceHashesIndexes); m_ReferenceCount = 0; } void ZenCacheDiskLayer::CacheBucket::CompactState(std::vector& Payloads, std::vector& AccessTimes, std::vector& MetaDatas, std::vector& MemCachedPayloads, std::vector& FirstReferenceIndex, IndexMap& Index, RwLock::ExclusiveLockScope& IndexLock) { size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); if (m_Configuration.EnableReferenceCaching) { 
FirstReferenceIndex.reserve(EntryCount);
    }
    Index.reserve(EntryCount);
    for (auto It : m_Index)
    {
        PayloadIndex EntryIndex = PayloadIndex(Payloads.size());
        Payloads.push_back(m_Payloads[It.second]);
        BucketPayload& Payload = Payloads.back();
        AccessTimes.push_back(m_AccessTimes[It.second]);
        if (Payload.MetaData)
        {
            // Re-home the metadata slot into the densely packed table.
            MetaDatas.push_back(m_MetaDatas[Payload.MetaData]);
            Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1);
        }
        if (Payload.MemCached)
        {
            MemCachedPayloads.push_back(std::move(m_MemCachedPayloads[Payload.MemCached]));
            Payload.MemCached = MemCachedIndex(gsl::narrow(MemCachedPayloads.size() - 1));
        }
        if (m_Configuration.EnableReferenceCaching)
        {
            FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
        }
        Index.insert({It.first, EntryIndex});
    }
    // Swap the rebuilt tables in; the caller's containers now hold the old
    // state and the free lists can be dropped wholesale.
    m_Index.swap(Index);
    m_Payloads.swap(Payloads);
    m_AccessTimes.swap(AccessTimes);
    m_MetaDatas.swap(MetaDatas);
    Reset(m_FreeMetaDatas);
    m_MemCachedPayloads.swap(MemCachedPayloads);
    Reset(m_FreeMemCachedPayloads);
    if (m_Configuration.EnableReferenceCaching)
    {
        m_FirstReferenceIndex.swap(FirstReferenceIndex);
        CompactReferences(IndexLock);
    }
}

#if ZEN_WITH_TESTS
// Test hook: force a specific access time on an entry.
void
ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time)
{
    GcClock::Tick TimeTick = Time.time_since_epoch().count();
    RwLock::SharedLockScope IndexLock(m_IndexLock);
    if (auto It = m_Index.find(HashKey); It != m_Index.end())
    {
        size_t EntryIndex = It.value();
        ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
        m_AccessTimes[EntryIndex] = TimeTick;
    }
}
#endif // ZEN_WITH_TESTS

//////////////////////////////////////////////////////////////////////////

ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
: m_Gc(Gc)
, m_JobQueue(JobQueue)
, m_RootDir(RootDir)
, m_Configuration(Config)
{
}

ZenCacheDiskLayer::~ZenCacheDiskLayer()
{
}

// Looks up HashKey in the named bucket, lazily opening/creating the bucket
// on first touch. Returns true and fills OutValue on a hit.
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
    ZEN_TRACE_CPU("Z$::Disk::Get");
    const auto BucketName = std::string(InBucket);
    CacheBucket* Bucket = nullptr;
    {
        // Fast path: bucket already known, shared lock suffices.
        RwLock::SharedLockScope _(m_Lock);
        auto It = m_Buckets.find(BucketName);
        if (It != m_Buckets.end())
        {
            Bucket = It->second.get();
        }
    }
    if (Bucket == nullptr)
    {
        // Bucket needs to be opened/created
        RwLock::ExclusiveLockScope _(m_Lock);
        if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
        {
            Bucket = It->second.get();
        }
        else
        {
            auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
            Bucket = InsertResult.first->second.get();
            std::filesystem::path BucketPath = m_RootDir;
            BucketPath /= BucketName;
            if (!Bucket->OpenOrCreate(BucketPath))
            {
                m_Buckets.erase(InsertResult.first);
                return false;
            }
        }
    }
    ZEN_ASSERT(Bucket != nullptr);
    if (Bucket->Get(HashKey, OutValue))
    {
        TryMemCacheTrim();
        return true;
    }
    return false;
}

// Stores a value in the named bucket, creating the bucket on demand.
void
ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References)
{
    ZEN_TRACE_CPU("Z$::Disk::Put");
    const auto BucketName = std::string(InBucket);
    CacheBucket* Bucket = nullptr;
    {
        RwLock::SharedLockScope _(m_Lock);
        auto It = m_Buckets.find(BucketName);
        if (It != m_Buckets.end())
        {
            Bucket = It->second.get();
        }
    }
    if (Bucket == nullptr)
    {
        // New bucket needs to be created
        RwLock::ExclusiveLockScope _(m_Lock);
        if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
        {
            Bucket = It->second.get();
        }
        else
        {
            auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
            Bucket = InsertResult.first->second.get();
            std::filesystem::path BucketPath = m_RootDir;
            BucketPath /= BucketName;
            try
            {
                if (!Bucket->OpenOrCreate(BucketPath))
                {
                    ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
                    m_Buckets.erase(InsertResult.first);
                    return;
                }
            }
            catch (const std::exception& Err)
            {
ZEN_WARN("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); throw; } } } ZEN_ASSERT(Bucket != nullptr); Bucket->Put(HashKey, Value, References); TryMemCacheTrim(); } void ZenCacheDiskLayer::DiscoverBuckets() { DirectoryContent DirContent; GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); // Initialize buckets std::vector BadBucketDirectories; std::vector FoundBucketDirectories; RwLock::ExclusiveLockScope _(m_Lock); for (const std::filesystem::path& BucketPath : DirContent.Directories) { const std::string BucketName = PathToUtf8(BucketPath.stem()); if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { continue; } if (IsKnownBadBucketName(BucketName)) { BadBucketDirectories.push_back(BucketPath); continue; } FoundBucketDirectories.push_back(BucketPath); ZEN_INFO("Discovered bucket '{}'", BucketName); } for (const std::filesystem::path& BadBucketPath : BadBucketDirectories) { bool IsOk = false; try { IsOk = DeleteDirectories(BadBucketPath); } catch (std::exception&) { } if (IsOk) { ZEN_INFO("found bad bucket at '{}', deleted contents", BadBucketPath); } else { ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath); } } RwLock SyncLock; const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, FoundBucketDirectories.size())); WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::OpenOrCreate"); Latch WorkLatch(1); for (auto& BucketPath : FoundBucketDirectories) { WorkLatch.AddCount(1); Pool.ScheduleWork([&]() { auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); const std::string BucketName = PathToUtf8(BucketPath.stem()); try { std::unique_ptr NewBucket = std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig); CacheBucket* Bucket = nullptr; { RwLock::ExclusiveLockScope __(SyncLock); auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); Bucket = 
InsertResult.first->second.get(); } ZEN_ASSERT(Bucket); if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) { ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); { RwLock::ExclusiveLockScope __(SyncLock); m_Buckets.erase(BucketName); } } } catch (const std::exception& Err) { ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); return; } }); } WorkLatch.CountDown(); WorkLatch.Wait(); } bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); auto It = m_Buckets.find(std::string(InBucket)); if (It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); return Bucket.Drop(); } // Make sure we remove the folder even if we don't know about the bucket std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); return MoveAndDeleteDirectory(BucketPath); } bool ZenCacheDiskLayer::Drop() { RwLock::ExclusiveLockScope _(m_Lock); std::vector> Buckets; Buckets.reserve(m_Buckets.size()); while (!m_Buckets.empty()) { const auto& It = m_Buckets.begin(); CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); if (!Bucket.Drop()) { return false; } } return MoveAndDeleteDirectory(m_RootDir); } void ZenCacheDiskLayer::Flush() { std::vector Buckets; Stopwatch Timer; const auto _ = MakeGuard([&] { if (Buckets.empty()) { return; } ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); { RwLock::SharedLockScope __(m_Lock); if (m_Buckets.empty()) { return; } Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { CacheBucket* Bucket = Kv.second.get(); Buckets.push_back(Bucket); } } { const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); const int WorkerThreadPoolCount = 
gsl::narrow(Min(MaxHwTreadUse, Buckets.size()));
        WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::Flush");
        Latch WorkLatch(1);
        for (auto& Bucket : Buckets)
        {
            WorkLatch.AddCount(1);
            Pool.ScheduleWork([&]() {
                auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
                Bucket->Flush();
            });
        }
        WorkLatch.CountDown();
        // Poll with a timeout so long-running flushes leave a trace in the log.
        while (!WorkLatch.Wait(1000))
        {
            ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
        }
    }
}

// Runs a storage scrub pass over every bucket, one task per bucket on the
// scrub context's thread pool.
void
ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
    RwLock::SharedLockScope _(m_Lock);
    {
        std::vector> Results;
        Results.reserve(m_Buckets.size());
        for (auto& Kv : m_Buckets)
        {
#if 1
            Results.push_back(Ctx.ThreadPool().EnqueueTask(std::packaged_task{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
#else
            CacheBucket& Bucket = *Kv.second;
            Bucket.ScrubStorage(Ctx);
#endif
        }
        // Wait for (and propagate exceptions from) every scrub task.
        for (auto& Result : Results)
        {
            Result.get();
        }
    }
}

// Forwards GC reference gathering to every bucket.
void
ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
{
    ZEN_TRACE_CPU("Z$::Disk::GatherReferences");
    std::vector Buckets;
    {
        RwLock::SharedLockScope _(m_Lock);
        Buckets.reserve(m_Buckets.size());
        for (auto& Kv : m_Buckets)
        {
            Buckets.push_back(Kv.second.get());
        }
    }
    // Buckets are processed outside the layer lock.
    for (CacheBucket* Bucket : Buckets)
    {
        Bucket->GatherReferences(GcCtx);
    }
}

// Sums disk and memory usage over all buckets.
GcStorageSize
ZenCacheDiskLayer::StorageSize() const
{
    GcStorageSize StorageSize{};
    RwLock::SharedLockScope _(m_Lock);
    for (auto& Kv : m_Buckets)
    {
        GcStorageSize BucketSize = Kv.second->StorageSize();
        StorageSize.DiskSize += BucketSize.DiskSize;
        StorageSize.MemorySize += BucketSize.MemorySize;
    }
    return StorageSize;
}

// Aggregated layer statistics plus a per-bucket breakdown.
ZenCacheDiskLayer::DiskStats
ZenCacheDiskLayer::Stats() const
{
    GcStorageSize Size = StorageSize();
    ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
    {
        RwLock::SharedLockScope _(m_Lock);
        Stats.BucketStats.reserve(m_Buckets.size());
        for (auto& Kv : m_Buckets)
        {
            Stats.BucketStats.emplace_back(NamedBucketStats{.BucketName = Kv.first, .Stats = Kv.second->Stats()});
        }
    }
    return Stats;
}
ZenCacheDiskLayer::Info ZenCacheDiskLayer::GetInfo() const { ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration}; { RwLock::SharedLockScope _(m_Lock); Info.BucketNames.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); GcStorageSize BucketSize = Kv.second->StorageSize(); Info.StorageSize.DiskSize += BucketSize.DiskSize; Info.StorageSize.MemorySize += BucketSize.MemorySize; } } return Info; } std::optional ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()}; } return {}; } void ZenCacheDiskLayer::EnumerateBucketContents(std::string_view Bucket, std::function& Fn) const { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { It->second->EnumerateBucketContents(Fn); } } CacheValueDetails::NamespaceDetails ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const { RwLock::SharedLockScope _(m_Lock); CacheValueDetails::NamespaceDetails Details; if (BucketFilter.empty()) { Details.Buckets.reserve(BucketFilter.empty() ? 
m_Buckets.size() : 1); for (auto& Kv : m_Buckets) { Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); } } else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) { Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); } return Details; } void ZenCacheDiskLayer::MemCacheTrim() { ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim"); ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count(); if (NowTick < NextAllowedTrimTick) { return; } bool Expected = false; if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) { return; } // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); m_LastTickMemCacheTrim.store(NextTrimTick); m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) { ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); uint64_t StartSize = m_TotalMemCachedSize.load(); Stopwatch Timer; const auto Guard = MakeGuard([&] { uint64_t EndSize = m_TotalMemCachedSize.load(); ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", NiceBytes(StartSize > EndSize ? 
StartSize - EndSize : 0), NiceBytes(m_TotalMemCachedSize), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); m_IsMemCacheTrimming.store(false); }); const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); std::vector UsageSlots; UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); std::vector Buckets; { RwLock::SharedLockScope __(m_Lock); Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Buckets.push_back(Kv.second.get()); } } for (CacheBucket* Bucket : Buckets) { Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); } uint64_t TotalSize = 0; for (size_t Index = 0; Index < UsageSlots.size(); ++Index) { TotalSize += UsageSlots[Index]; if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) { GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); MemCacheTrim(Buckets, ExpireTime); break; } } }); } void ZenCacheDiskLayer::MemCacheTrim(std::vector& Buckets, GcClock::TimePoint ExpireTime) { if (m_Configuration.MemCacheTargetFootprintBytes == 0) { return; } RwLock::SharedLockScope __(m_Lock); for (CacheBucket* Bucket : Buckets) { Bucket->MemCacheTrim(ExpireTime); } const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); } #if ZEN_WITH_TESTS void ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time) { const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto It = m_Buckets.find(BucketName); if (It != m_Buckets.end()) { Bucket = It->second.get(); } } if (Bucket == nullptr) { 
return; } Bucket->SetAccessTime(HashKey, Time); } #endif // ZEN_WITH_TESTS } // namespace zen