// Copyright Epic Games, Inc. All Rights Reserved. #include "zenstore/cache/cachedisklayer.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// namespace zen { namespace { #pragma pack(push) #pragma pack(1) struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t Version2 = 2; static constexpr uint32_t CurrentVersion = Version2; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t PayloadAlignment = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } bool IsValid() const { if (Magic != ExpectedMagic) { return false; } if (Checksum != ComputeChecksum(*this)) { return false; } if (PayloadAlignment == 0) { return false; } return true; } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); struct BucketMetaHeader { static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta'; static constexpr uint32_t Version2 = 2; static constexpr uint32_t CurrentVersion = Version2; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t Padding = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const BucketMetaHeader& Header) { return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA); } bool IsValid() const { if (Magic != ExpectedMagic) { return false; } if (Checksum != ComputeChecksum(*this)) { return false; } if (Padding != 0) { return false; } return true; } }; static_assert(sizeof(BucketMetaHeader) == 32); static constexpr uint32_t BlockMetaDataExpectedMagic = 0x61'74'6d'62; // 'bmta'; #pragma pack(pop) 
////////////////////////////////////////////////////////////////////////// template void Reset(T& V) { T Tmp; V.swap(Tmp); } const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; const char* MetaExtension = ".meta"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + MetaExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + LogExtension); } std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { ZEN_UNUSED(BucketName); return BucketDir / "zen_manifest"; } bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if (Entry.Location.Reserved != 0) { OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } if (Entry.Location.GetFlags() & ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); return false; } if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) { return true; } if (Entry.Location.Reserved != 0) { OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } uint64_t Size = Entry.Location.Size(); if (Size == 0) { OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); return false; } return true; } bool MoveAndDeleteDirectory(const 
std::filesystem::path& Dir) { int DropIndex = 0; do { if (!std::filesystem::exists(Dir)) { return false; } std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; if (std::filesystem::exists(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; std::filesystem::rename(Dir, DroppedBucketPath, Ec); if (!Ec) { DeleteDirectories(DroppedBucketPath); return true; } // TODO: Do we need to bail at some point? zen::Sleep(100); } while (true); } } // namespace namespace fs = std::filesystem; using namespace std::literals; class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData; using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; public: // We use this to indicate if a on disk bucket needs wiping // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references // to block items. 
    // See: https://github.com/EpicGames/zen/pull/299
    static inline const uint32_t CurrentDiskBucketVersion = 1;

    // Loads the manifest from disk; returns true if a valid compact-binary
    // object was read.
    bool Open(std::filesystem::path ManifestPath)
    {
        Manifest = LoadCompactBinaryObject(ManifestPath);
        return !!Manifest;
    }

    // Bucket id stored in the manifest (Oid::Zero if absent).
    Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); }

    // Returns true if the stored version matches CurrentDiskBucketVersion;
    // always writes the stored version to OutVersion.
    bool IsCurrentVersion(uint32_t& OutVersion) const
    {
        OutVersion = Manifest["Version"sv].AsUInt32(0);
        return OutVersion == CurrentDiskBucketVersion;
    }

    // Populates access times and payload metadata from the manifest (or from
    // the sidecar file when "UsingMetaFile" is set). Requires the bucket's
    // exclusive index lock.
    // NOTE(review): the std::vector element types throughout these
    // signatures were stripped by the same filter that ate the includes.
    void ParseManifest(RwLock::ExclusiveLockScope& BucketLock, ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path ManifestPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads);

    // Creates a fresh manifest with a new bucket id and writes it to disk.
    Oid GenerateNewManifest(std::filesystem::path ManifestPath);

    // Builds a minimal manifest that delegates entry data to the sidecar.
    IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount);

    // Size the sidecar file will have for the last MakeSidecarManifest count.
    uint64_t GetSidecarSize() const { return sizeof(BucketMetaHeader) + m_ManifestEntryCount * sizeof(ManifestData); }

    void WriteSidecarFile(RwLock::SharedLockScope& BucketLock, const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, const std::vector& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas);
    bool ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path SidecarPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads);
    IoBuffer MakeManifest(const Oid& BucketId, ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, std::vector&& AccessTimes, std::vector&& Payloads, std::vector&& MetaDatas);

    CbObject Manifest;

private:
    // Reads and fully validates a compact-binary object from Path; returns
    // an empty object on any read or validation failure.
    CbObject LoadCompactBinaryObject(const fs::path& Path)
    {
        FileContents Result = ReadFile(Path);
        if (!Result.ErrorCode)
        {
            IoBuffer Buffer = Result.Flatten();
            if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
            {
                return zen::LoadCompactBinaryObject(Buffer);
            }
        }
        return CbObject();
    }

    // Entry count captured by the last MakeSidecarManifest call; consumed by
    // GetSidecarSize and WriteSidecarFile.
    uint64_t m_ManifestEntryCount = 0;

#pragma pack(push)
#pragma pack(4)
    // One fixed-size sidecar record per cache entry.
    struct ManifestData
    {
        uint32_t RawSize;     // 4
        AccessTime Timestamp; // 4
        IoHash RawHash;       // 20
        IoHash Key;           // 20
    };
#pragma pack(pop)
    static_assert(sizeof(ManifestData) == 48);
};

// Restores per-entry access times and raw hash/size metadata from the
// manifest into the already-loaded index. Entries present in the manifest
// but missing from Index (e.g. dropped by GC) are skipped.
void BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& BucketLock, ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path ManifestPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads)
{
    if (Manifest["UsingMetaFile"sv].AsBool())
    {
        // Newer buckets keep the per-entry data in the binary sidecar file.
        ReadSidecarFile(BucketLock, Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads);
        return;
    }
    ZEN_TRACE_CPU("Z$::ParseManifest");
    Stopwatch Timer;
    const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
    const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
    // Resolve each manifest key to its payload index once; a default
    // PayloadIndex() marks keys that are no longer in the index.
    std::vector KeysIndexes;
    KeysIndexes.reserve(Count);
    CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
    for (CbFieldView& KeyView : KeyArray)
    {
        if (auto It = Index.find(KeyView.AsHash()); It != Index.end())
        {
            KeysIndexes.push_back(It.value());
        }
        else
        {
            KeysIndexes.push_back(PayloadIndex());
        }
    }
    // The "Timestamps" array is parallel to "Keys".
    size_t KeyIndexOffset = 0;
    CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
    for (CbFieldView& TimeStampView : TimeStampArray)
    {
        const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
        if (KeyIndex)
        {
            AccessTimes[KeyIndex] = TimeStampView.AsInt64();
        }
    }
    // "RawHash" and "RawSize" are also parallel to "Keys"; both must agree
    // in length or the metadata is ignored wholesale.
    KeyIndexOffset = 0;
    CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
    CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
    if (RawHashArray.Num() == RawSizeArray.Num())
    {
        auto RawHashIt = RawHashArray.CreateViewIterator();
        auto RawSizeIt = RawSizeArray.CreateViewIterator();
        while (RawHashIt != CbFieldViewIterator())
        {
            const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
            if (KeyIndex)
            {
                uint64_t RawSize = RawSizeIt.AsUInt64();
                IoHash RawHash = RawHashIt.AsHash();
                if
((RawSize != 0 || RawHash != IoHash::Zero) && RawSize <= std::numeric_limits::max())
                {
                    // Only record metadata that fits the on-disk 32-bit size.
                    BucketPayload& Payload = Payloads[KeyIndex];
                    Bucket.SetMetaData(BucketLock, Payload, BucketMetaData{.RawSize = static_cast(RawSize), .RawHash = RawHash});
                }
            }
            RawHashIt++;
            RawSizeIt++;
        }
    }
    else
    {
        ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
    }
}

// Creates and persists a brand-new manifest containing only a fresh bucket
// id and the current disk format version; returns the new id.
Oid BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath)
{
    const Oid BucketId = Oid::NewOid();
    CbObjectWriter Writer;
    Writer << "BucketId"sv << BucketId;
    Writer << "Version"sv << CurrentDiskBucketVersion;
    Manifest = Writer.Save();
    TemporaryFile::SafeWriteFile(ManifestPath, Manifest.GetBuffer().GetView());
    return BucketId;
}

// Serializes the full (legacy, non-sidecar) manifest: parallel arrays of
// keys, access timestamps and optional raw hash/size metadata.
IoBuffer BucketManifestSerializer::MakeManifest(const Oid& BucketId, ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, std::vector&& AccessTimes, std::vector&& Payloads, std::vector&& MetaDatas)
{
    using namespace std::literals;
    ZEN_TRACE_CPU("Z$::MakeManifest");
    size_t ItemCount = Index.size();
    // This tends to overestimate a little bit but it is still way more
    // accurate than what we get with exponential growth, and we don't need
    // to reallocate the underlying buffer in almost every case.
    const size_t EstimatedSizePerItem = 54u;
    const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128);
    CbObjectWriter Writer(ReserveSize);
    Writer << "BucketId"sv << BucketId;
    Writer << "Version"sv << CurrentDiskBucketVersion;
    if (!Index.empty())
    {
        Writer.AddInteger("Count"sv, gsl::narrow(Index.size()));
        // All arrays below iterate Index in the same order, so they stay
        // element-wise parallel to "Keys".
        Writer.BeginArray("Keys"sv);
        for (auto& Kv : Index)
        {
            const IoHash& Key = Kv.first;
            Writer.AddHash(Key);
        }
        Writer.EndArray();
        Writer.BeginArray("Timestamps"sv);
        for (auto& Kv : Index)
        {
            GcClock::Tick AccessTime = AccessTimes[Kv.second];
            Writer.AddInteger(AccessTime);
        }
        Writer.EndArray();
        if (!MetaDatas.empty())
        {
            // Entries without metadata are written as zero hash/size so the
            // arrays keep their parallel indexing.
            Writer.BeginArray("RawHash"sv);
            for (auto& Kv : Index)
            {
                const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
                if (Payload.MetaData)
                {
                    Writer.AddHash(MetaDatas[Payload.MetaData].RawHash);
                }
                else
                {
                    Writer.AddHash(IoHash::Zero);
                }
            }
            Writer.EndArray();
            Writer.BeginArray("RawSize"sv);
            for (auto& Kv : Index)
            {
                const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
                if (Payload.MetaData)
                {
                    Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize);
                }
                else
                {
                    Writer.AddInteger(0);
                }
            }
            Writer.EndArray();
        }
    }
    Manifest = Writer.Save();
    return Manifest.GetBuffer().AsIoBuffer();
}

// Builds the small manifest variant that defers per-entry data to the
// ".meta" sidecar; remembers EntryCount for GetSidecarSize/WriteSidecarFile.
IoBuffer BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount)
{
    m_ManifestEntryCount = EntryCount;
    CbObjectWriter Writer;
    Writer << "BucketId"sv << BucketId;
    Writer << "Version"sv << CurrentDiskBucketVersion;
    Writer << "Count"sv << EntryCount;
    Writer << "UsingMetaFile"sv << true;
    Manifest = Writer.Save();
    return Manifest.GetBuffer().AsIoBuffer();
}

// Loads access times and raw hash/size metadata from the ".meta" sidecar
// into the already-loaded index. Returns false if the file is missing,
// too small, or fails header validation.
bool BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path SidecarPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads)
{
    ZEN_TRACE_CPU("Z$::ReadSidecarFile");
    ZEN_ASSERT(AccessTimes.size() == Payloads.size());
    std::error_code Ec;
    BasicFile SidecarFile;
SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec); if (Ec) { ZEN_WARN("Failed to read sidecar file '{}'. Reason: '{}'", SidecarPath, Ec.message()); return false; } uint64_t FileSize = SidecarFile.FileSize(); auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); }); if (FileSize < sizeof(BucketMetaHeader)) { ZEN_WARN("Failed to read sidecar file '{}'. Minimum size {} expected, actual size: ", SidecarPath, sizeof(BucketMetaHeader), FileSize); return false; } BasicFileBuffer Sidecar(SidecarFile, 128 * 1024); BucketMetaHeader Header; Sidecar.Read(&Header, sizeof Header, 0); if (!Header.IsValid()) { ZEN_WARN("Failed to read sidecar file '{}'. Header is invalid", SidecarPath); return false; } if (Header.Version != BucketMetaHeader::Version2) { ZEN_WARN("Failed to read sidecar file '{}'. Unsupported version: {}", SidecarPath, Header.Version); return false; } const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData); if (Header.EntryCount > ExpectedEntryCount) { ZEN_WARN( "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. 
Header count: {}, file size count: " "{}", SidecarPath, Header.EntryCount, ExpectedEntryCount); return false; } InvalidGuard.Dismiss(); uint64_t RemainingEntryCount = ExpectedEntryCount; uint64_t EntryCount = 0; uint64_t CurrentReadOffset = sizeof(Header); while (RemainingEntryCount--) { const ManifestData* Entry = Sidecar.MakeView(CurrentReadOffset); CurrentReadOffset += sizeof(ManifestData); if (auto It = Index.find(Entry->Key); It != Index.end()) { PayloadIndex PlIndex = It.value(); ZEN_ASSERT(size_t(PlIndex) <= Payloads.size()); ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; AccessTimes[PlIndex] = Entry->Timestamp; if (Entry->RawSize && Entry->RawHash != IoHash::Zero) { Bucket.SetMetaData(BucketLock, PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); } } EntryCount++; } ZEN_ASSERT(EntryCount == ExpectedEntryCount); return true; } void BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, const std::vector& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas) { ZEN_TRACE_CPU("Z$::WriteSidecarFile"); BucketMetaHeader Header; Header.EntryCount = m_ManifestEntryCount; Header.LogPosition = SnapshotLogPosition; Header.Checksum = Header.ComputeChecksum(Header); std::error_code Ec; TemporaryFile SidecarFile; SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath())); } SidecarFile.Write(&Header, sizeof Header, 0); // TODO: make this batching for better performance { uint64_t WriteOffset = sizeof Header; // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); std::vector ManifestDataBuffer; const size_t MaxManifestDataBufferCount = Min(Index.size(), 8192u); // 512 Kb ManifestDataBuffer.reserve(MaxManifestDataBufferCount); for (auto& Kv : Index) 
{ const IoHash& Key = Kv.first; const PayloadIndex PlIndex = Kv.second; IoHash RawHash = IoHash::Zero; uint32_t RawSize = 0; if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData) { RawHash = MetaDatas[MetaIndex].RawHash; RawSize = MetaDatas[MetaIndex].RawSize; } ManifestDataBuffer.emplace_back( ManifestData{.RawSize = RawSize, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Key = Key}); if (ManifestDataBuffer.size() == MaxManifestDataBufferCount) { const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size(); SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset); WriteOffset += WriteSize; ManifestDataBuffer.clear(); ManifestDataBuffer.reserve(MaxManifestDataBufferCount); } } if (ManifestDataBuffer.size() > 0) { SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset); } } SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath)); } } ////////////////////////////////////////////////////////////////////////// static const float IndexMinLoadFactor = 0.2f; static const float IndexMaxLoadFactor = 0.7f; ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config) : m_Gc(Gc) , m_OuterCacheMemoryUsage(OuterCacheMemoryUsage) , m_BucketName(std::move(BucketName)) , m_Configuration(Config) , m_BucketId(Oid::Zero) { m_Index.min_load_factor(IndexMinLoadFactor); m_Index.max_load_factor(IndexMaxLoadFactor); if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { const uint64_t LegacyOverrideSize = 16 * 1024 * 1024; // This is pretty ad hoc but in order to avoid too many individual files // it makes sense to have a different strategy for legacy values m_Configuration.LargeObjectThreshold = 
Max(m_Configuration.LargeObjectThreshold, LegacyOverrideSize);
    }
    else if (m_BucketName == std::string_view("iostorecompression"))
    {
        const uint64_t IoStoreDDCOverrideSize = 1024 * 1024;
        // This is pretty ad hoc but in order to avoid too many individual
        // files it makes sense to have a different strategy for ddc pak
        // compression stores.
        m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize);
    }
    m_Gc.AddGcReferencer(*this);
}

ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
    m_Gc.RemoveGcReferencer(*this);
}

// Opens an existing bucket directory or (when AllowCreate) creates a fresh
// one. Returns false when the manifest is unreadable/invalid and creation is
// not allowed. A version mismatch wipes the bucket (IsNew) rather than
// failing. On success the in-memory index is rebuilt from the index snapshot
// + log, then enriched from the manifest/sidecar.
bool ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
{
    using namespace std::literals;
    ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
    ZEN_ASSERT(m_IsFlushing.load());
    // We want to take the lock here since we register as a GC referencer at construction
    RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
    ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir);
    m_BlocksBasePath = BucketDir / "blocks";
    m_BucketDir = BucketDir;
    CreateDirectories(m_BucketDir);
    std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
    bool IsNew = false;
    BucketManifestSerializer ManifestReader;
    if (ManifestReader.Open(ManifestPath))
    {
        m_BucketId = ManifestReader.GetBucketId();
        if (m_BucketId == Oid::Zero)
        {
            return false;
        }
        uint32_t Version = 0;
        if (ManifestReader.IsCurrentVersion(/* out */ Version) == false)
        {
            // Old on-disk format (see the GC corruption note above): the
            // bucket content is discarded and rebuilt from scratch.
            ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, BucketManifestSerializer::CurrentDiskBucketVersion);
            IsNew = true;
        }
    }
    else if (AllowCreate)
    {
        m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath);
        IsNew = true;
    }
    else
    {
        return false;
    }
    InitializeIndexFromDisk(IndexLock, IsNew);
    auto _ = MakeGuard([&]() {
        // We are now initialized, allow flushing when we exit
        m_IsFlushing.store(false);
    });
    if (IsNew)
    {
        return true;
    }
    ManifestReader.ParseManifest(IndexLock, *this, ManifestPath, m_Index, m_AccessTimes, m_Payloads);
    return true;
}

// Writes an index snapshot (".uidx") for the current in-memory index.
// FlushLockPosition additionally deletes the log and resets the flush
// position to zero. No-op when nothing changed since the last flush. All
// failures are logged, never thrown to the caller. ClaimDiskReserveFunc is
// invoked to free reserved disk space when the volume is nearly full.
void ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, const std::function& ClaimDiskReserveFunc)
{
    ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
    // When flushing, the log is removed afterwards, so the recorded log
    // position becomes zero.
    const uint64_t LogCount = FlushLockPosition ? 0 : m_SlogFile.GetLogCount();
    if (m_LogFlushPosition == LogCount)
    {
        return;
    }
    ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
    const uint64_t EntryCount = m_Index.size();
    Stopwatch Timer;
    const auto _ = MakeGuard([&] {
        ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_BucketDir, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
    });
    namespace fs = std::filesystem;
    fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
    try
    {
        const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
        std::error_code Error;
        DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
        if (Error)
        {
            throw std::system_error(Error, fmt::format("get disk space in '{}' FAILED", m_BucketDir));
        }
        // Require the snapshot size plus a 512 KiB safety margin.
        bool EnoughSpace = Space.Free >= IndexSize + 1024 * 512;
        if (!EnoughSpace)
        {
            uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
            EnoughSpace = (Space.Free + ReclaimedSpace) >= IndexSize + 1024 * 512;
        }
        if (!EnoughSpace)
        {
            throw std::runtime_error(fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize)));
        }
        TemporaryFile ObjectIndexFile;
        std::error_code Ec;
        ObjectIndexFile.CreateTemporary(m_BucketDir, Ec);
        if (Ec)
        {
            throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
        }
        {
            // This is in a separate scope just to ensure IndexWriter goes out
            // of scope before the file is flushed/closed, in order to ensure
            // all data is written to the file
            BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
            CacheBucketIndexHeader Header = {.EntryCount = EntryCount, .LogPosition = LogCount, .PayloadAlignment = gsl::narrow(m_Configuration.PayloadAlignment)};
            Header.Checksum =
CacheBucketIndexHeader::ComputeChecksum(Header); IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); for (auto& Entry : m_Index) { DiskIndexEntry IndexEntry; IndexEntry.Key = Entry.first; IndexEntry.Location = m_Payloads[Entry.second].Location; IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset); IndexWriteOffset += sizeof(DiskIndexEntry); } IndexWriter.Flush(); } ObjectIndexFile.Flush(); ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { std::filesystem::path TempFilePath = ObjectIndexFile.GetPath(); ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message()); } else { // We must only update the log flush position once the snapshot write succeeds if (FlushLockPosition) { std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); if (std::filesystem::is_regular_file(LogPath)) { if (!std::filesystem::remove(LogPath, Ec) || Ec) { ZEN_WARN("snapshot failed to clean log file '{}', removing index at '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); std::error_code RemoveIndexEc; std::filesystem::remove(IndexPath, RemoveIndexEc); } } } if (!Ec) { m_LogFlushPosition = LogCount; } } } catch (const std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); } } uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); if (!std::filesystem::is_regular_file(IndexPath)) { return 0; } auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t FileSize = ObjectIndexFile.FileSize(); if (FileSize < sizeof(CacheBucketIndexHeader)) { return 0; } CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if 
(!Header.IsValid()) { return 0; } if (Header.Version != CacheBucketIndexHeader::Version2) { return 0; } const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); if (Header.EntryCount > ExpectedEntryCount) { return 0; } InvalidGuard.Dismiss(); size_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_Configuration.PayloadAlignment = Header.PayloadAlignment; m_Payloads.reserve(Header.EntryCount); m_Index.reserve(Header.EntryCount); BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); uint64_t RemainingEntryCount = Header.EntryCount; std::string InvalidEntryReason; while (RemainingEntryCount--) { const DiskIndexEntry* Entry = FileBuffer.MakeView(CurrentReadOffset); CurrentReadOffset += sizeof(DiskIndexEntry); if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } const PayloadIndex EntryIndex = PayloadIndex(EntryCount); m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location}); m_Index.insert_or_assign(Entry->Key, EntryIndex); EntryCount++; } ZEN_ASSERT(EntryCount == m_Payloads.size()); m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); OutVersion = CacheBucketIndexHeader::Version2; return Header.LogPosition; } uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); if (!std::filesystem::is_regular_file(LogPath)) { return 0; } uint64_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile 
CasLog;
    CasLog.Open(LogPath, CasLogFile::Mode::kRead);
    if (!CasLog.Initialize())
    {
        return 0;
    }
    const uint64_t EntryCount = CasLog.GetLogCount();
    if (EntryCount < SkipEntryCount)
    {
        ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
        SkipEntryCount = 0;
    }
    LogEntryCount = EntryCount - SkipEntryCount;
    uint64_t InvalidEntryCount = 0;
    CasLog.Replay(
        [&](const DiskIndexEntry& Record) {
            std::string InvalidEntryReason;
            if (Record.Location.Flags & DiskLocation::kTombStone)
            {
                // Note: this leaves m_Payloads and other arrays with 'holes' in them
                m_Index.erase(Record.Key);
                return;
            }
            if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
            {
                ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
                ++InvalidEntryCount;
                return;
            }
            PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
            m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
            m_Index.insert_or_assign(Record.Key, EntryIndex);
        },
        SkipEntryCount);
    // Entries restored from the log get "now" as their access time.
    m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
    if (InvalidEntryCount)
    {
        ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
    }
    return LogEntryCount;
};

// Rebuilds all in-memory state from disk: clears every container, optionally
// wipes the on-disk files (IsNew), loads snapshot + log, consolidates them
// into a fresh snapshot, reopens the log for writing, and tells the block
// store which block indices are still referenced.
void ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew)
{
    ZEN_TRACE_CPU("Z$::Bucket::Initialize");
    m_StandaloneSize = 0;
    m_Index.clear();
    m_Payloads.clear();
    m_AccessTimes.clear();
    m_MetaDatas.clear();
    m_FreeMetaDatas.clear();
    m_MemCachedPayloads.clear();
    m_FreeMemCachedPayloads.clear();
    std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
    std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
    if (IsNew)
    {
        // Wiping: discard all persisted bucket state.
        fs::remove(LogPath);
        fs::remove(IndexPath);
        fs::remove_all(m_BlocksBasePath);
    }
    CreateDirectories(m_BucketDir);
    m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
    if (std::filesystem::is_regular_file(IndexPath))
    {
        uint32_t IndexVersion = 0;
        m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion);
        if (IndexVersion == 0)
        {
            // ReadIndexFile leaves version at 0 for unreadable snapshots.
            ZEN_WARN("removing invalid index file at '{}'", IndexPath);
            std::filesystem::remove(IndexPath);
        }
    }
    uint64_t LogEntryCount = 0;
    if (std::filesystem::is_regular_file(LogPath))
    {
        if (TCasLogFile::IsValid(LogPath))
        {
            LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition);
        }
        else if (fs::is_regular_file(LogPath))
        {
            ZEN_WARN("removing invalid log at '{}'", LogPath);
            std::filesystem::remove(LogPath);
        }
    }
    if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0)
    {
        // Consolidate snapshot + replayed log into one fresh snapshot and
        // truncate the log.
        WriteIndexSnapshot(IndexLock, /*Flush log*/ true);
    }
    m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
    // Collect every block index still referenced by live entries so the
    // block store can drop orphaned block files.
    BlockStore::BlockIndexSet KnownBlocks;
    for (const auto& Entry : m_Index)
    {
        size_t EntryIndex = Entry.second;
        const BucketPayload& Payload = m_Payloads[EntryIndex];
        const DiskLocation& Location = Payload.Location;
        if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
        {
            m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
        }
        else
        {
            const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
            KnownBlocks.Add(BlockLocation.BlockIndex);
        }
    }
    m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
}

// Builds the standalone-file path for a hash key:
// <bucket>/blob/<hex[0:3]>/<hex[3:5]>/<hex[5:]> (fan-out directories keep
// directory sizes manageable).
void ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const
{
    char HexString[sizeof(HashKey.Hash) * 2];
    ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString);
    Path.Append(m_BucketDir);
    Path.AppendSeparator();
    Path.Append(L"blob");
    Path.AppendSeparator();
    Path.AppendAsciiRange(HexString, HexString + 3);
    Path.AppendSeparator();
    Path.AppendAsciiRange(HexString + 3, HexString + 5);
    Path.AppendSeparator();
    Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString));
}

// Fetches a value stored inside a block file; returns an empty buffer when
// the chunk is no longer available.
IoBuffer ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
{
    ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue");
    BlockStoreLocation Location =
Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
    IoBuffer Value = m_BlockStore.TryGetChunk(Location);
    if (Value)
    {
        Value.SetContentType(Loc.GetContentType());
    }
    return Value;
}

// Fetches a value stored as a standalone file; the file size must match the
// indexed size or an empty buffer is returned (guards against torn writes).
IoBuffer ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const
{
    ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
    ExtendablePathBuilder<256> DataFilePath;
    BuildPath(DataFilePath, HashKey);
    // Per-hash lock serializes against writers of the same standalone file.
    RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
    if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
    {
        if (Data.GetSize() == Loc.Size())
        {
            Data.SetContentType(Loc.GetContentType());
            return Data;
        }
    }
    return {};
}

// Accumulates one batch of puts: buffers to write, per-entry key+reference
// lists, and where each entry's success flag goes in OutResults.
struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
    PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {}
    struct Entry
    {
        // Element [0] is the cache key; the remainder are referenced hashes.
        std::vector HashKeyAndReferences;
    };
    std::vector Buffers;
    std::vector Entries;
    std::vector EntryResultIndexes;
    std::vector& OutResults;
};

// Allocates a put batch; must be paired with EndPutBatch (which deletes it).
ZenCacheDiskLayer::CacheBucket::PutBatchHandle* ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector& OutResults)
{
    ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch");
    return new PutBatchHandle(OutResults);
}

// Flushes a put batch: writes all buffers to the block store, then (under
// the exclusive index lock) inserts/replaces index entries, appends them to
// the log, and marks each entry's result slot true. Exceptions are logged,
// not propagated (the method is noexcept).
void ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
{
    ZEN_TRACE_CPU("Z$::Bucket::EndPutBatch");
    try
    {
        ZEN_ASSERT(Batch);
        if (!Batch->Buffers.empty())
        {
            // Derive per-buffer location flags from the declared content type.
            std::vector EntryFlags;
            for (const IoBuffer& Buffer : Batch->Buffers)
            {
                uint8_t Flags = 0;
                if (Buffer.GetContentType() == ZenContentType::kCbObject)
                {
                    Flags |= DiskLocation::kStructured;
                }
                else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
                {
                    Flags |= DiskLocation::kCompressed;
                }
                EntryFlags.push_back(Flags);
            }
            // WriteChunks may invoke this callback several times, each with
            // the locations for the next run of buffers; IndexOffset tracks
            // how far into the batch we are.
            size_t IndexOffset = 0;
            m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span Locations) {
                std::vector DiskEntries;
                {
                    RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
                    for (size_t Index = 0; Index < Locations.size(); Index++)
                    {
                        DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
                        const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
                        ZEN_ASSERT(HashKeyAndReferences.size() > 1);
                        const IoHash HashKey = HashKeyAndReferences[0];
                        DiskEntries.push_back({.Key = HashKey, .Location = Location});
                        if (m_TrackedCacheKeys)
                        {
                            m_TrackedCacheKeys->insert(HashKey);
                        }
                        if (m_TrackedReferences && HashKeyAndReferences.size() > 1)
                        {
                            m_TrackedReferences->insert(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
                        }
                        if (auto It = m_Index.find(HashKey); It != m_Index.end())
                        {
                            // Overwrite: drop stale memcache/metadata before
                            // repointing the payload at the new location.
                            PayloadIndex EntryIndex = It.value();
                            ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
                            BucketPayload& Payload = m_Payloads[EntryIndex];
                            RemoveMemCachedData(IndexLock, Payload);
                            RemoveMetaData(IndexLock, Payload);
                            Payload = (BucketPayload{.Location = Location});
                            m_AccessTimes[EntryIndex] = GcClock::TickCount();
                        }
                        else
                        {
                            PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
                            m_Payloads.emplace_back(BucketPayload{.Location = Location});
                            m_AccessTimes.emplace_back(GcClock::TickCount());
                            m_Index.insert_or_assign(HashKey, EntryIndex);
                        }
                    }
                }
                // Log append and result reporting happen outside the lock.
                m_SlogFile.Append(DiskEntries);
                for (size_t Index = 0; Index < Locations.size(); Index++)
                {
                    size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
                    ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
                    Batch->OutResults[ResultIndex] = true;
                }
                IndexOffset += Locations.size();
            });
        }
        // NOTE(review): if an exception is thrown above, Batch is never
        // deleted and leaks -- consider an owning guard for the handle.
        delete Batch;
    }
    catch (const std::exception& Ex)
    {
        ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::EndPutBatch: '{}'", Ex.what());
    }
}

// Accumulates one batch of gets: the keys to look up and where each result
// goes in OutResults.
struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
{
    GetBatchHandle(std::vector& OutResults) : OutResults(OutResults)
    {
        Keys.reserve(OutResults.capacity());
        ResultIndexes.reserve(OutResults.capacity());
    }
    std::vector Keys;
    std::vector ResultIndexes;
    std::vector& OutResults;
};

// Allocates a get batch; must be paired with EndGetBatch (which deletes it).
ZenCacheDiskLayer::CacheBucket::GetBatchHandle* ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector&
OutResult)
{
	ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch");
	return new GetBatchHandle(OutResult);
}

// Resolves a get batch. Phase 1 (shared index lock): look up every key, refresh
// its access time, copy already-known metadata (RawHash/RawSize), and classify
// each hit as memcached, standalone file, or inline block chunk. Phase 2 (index
// lock released): read standalone/inline payloads in bulk and fill results via
// FillOne, opportunistically populating the memcache and per-entry metadata.
// Finally updates hit/miss statistics. Never throws; failures are logged.
// NOTE(review): unlike EndPutBatch, this does not delete Batch — confirm who
// owns and frees the handle.
void
ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
	ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch");
	try
	{
		ZEN_ASSERT(Batch);
		ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size());
		metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
		if (!Batch->ResultIndexes.empty())
		{
			// Per-storage-class work lists built under the shared index lock.
			std::vector StandaloneDiskLocations;
			std::vector StandaloneKeyIndexes;
			std::vector MemCachedKeyIndexes;
			std::vector InlineDiskLocations;
			std::vector InlineBlockLocations;
			std::vector InlineKeyIndexes;
			// True where RawHash/RawSize still need to be computed for the key.
			std::vector FillRawHashAndRawSize(Batch->Keys.size(), false);
			{
				RwLock::SharedLockScope IndexLock(m_IndexLock);
				for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
				{
					const IoHash& HashKey = Batch->Keys[KeyIndex];
					auto It = m_Index.find(HashKey);
					if (It != m_Index.end())
					{
						size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
						ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
						const PayloadIndex PayloadIdx = It.value();
						m_AccessTimes[PayloadIdx] = GcClock::TickCount();
						const BucketPayload& Payload = m_Payloads[PayloadIdx];
						const DiskLocation& Location = Payload.Location;
						// Structured values never need raw hash/size filled in.
						FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
						if (Payload.MetaData)
						{
							const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
							OutValue.RawHash = MetaData.RawHash;
							OutValue.RawSize = MetaData.RawSize;
							FillRawHashAndRawSize[KeyIndex] = false;
						}
						if (Payload.MemCached)
						{
							OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload;
							if (FillRawHashAndRawSize[KeyIndex])
							{
								// Value served from memory but metadata missing:
								// handled in the MemCachedKeyIndexes pass below.
								MemCachedKeyIndexes.push_back(KeyIndex);
							}
							m_MemoryHitCount++;
						}
						else
						{
							if (m_Configuration.MemCacheSizeThreshold > 0)
							{
								m_MemoryMissCount++;
							}
							if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
							{
								StandaloneDiskLocations.push_back(Location);
								StandaloneKeyIndexes.push_back(KeyIndex);
							}
							else
							{
								InlineDiskLocations.push_back(Location);
								InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment));
								InlineKeyIndexes.push_back(KeyIndex);
							}
						}
					}
				}
			}
			// MemCached and MetaData are set independently so we need to check if meta data has been set or if we need to set it even
			// if we found the data as memcached.
			// Often we will find the metadata due to the thread setting the mem cached part doing it before us so it is worth
			// checking if it is present once more before spending time fetching and setting the RawHash and RawSize in metadata
			auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) {
				if (!Value)
				{
					return;
				}
				const IoHash& Key = Batch->Keys[KeyIndex];
				size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
				ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
				OutValue.Value = std::move(Value);
				OutValue.Value.SetContentType(Location.GetContentType());
				bool AddToMemCache = false;
				bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
				if (m_Configuration.MemCacheSizeThreshold > 0)
				{
					size_t ValueSize = OutValue.Value.GetSize();
					if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
					{
						// Small enough to cache in memory: materialize file-backed
						// buffers into RAM first.
						OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
						AddToMemCache = true;
					}
				}
				if (SetMetaInfo)
				{
					// See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
					// metadata separately, check if it had time to set the metadata
					RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
					if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
					{
						BucketPayload& Payload = m_Payloads[UpdateIt->second];
						if (Payload.MetaData)
						{
							const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
							OutValue.RawHash = MetaData.RawHash;
							OutValue.RawSize = MetaData.RawSize;
							SetMetaInfo = false;
						}
					}
				}
				if (SetMetaInfo)
				{
					ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
					if (Location.IsFlagSet(DiskLocation::kCompressed))
					{
						// A corrupt compressed header invalidates the whole value.
						if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
						{
							OutValue = ZenCacheValue{};
							AddToMemCache = false;
							SetMetaInfo = false;
						}
					}
					else
					{
						OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
						OutValue.RawSize = OutValue.Value.GetSize();
					}
					// RawSize too large for the stored metadata field: skip persisting.
					if (OutValue.RawSize > std::numeric_limits::max())
					{
						SetMetaInfo = false;
					}
				}
				if (SetMetaInfo || AddToMemCache)
				{
					ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache");
					RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
					{
						if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
						{
							BucketPayload& Payload = m_Payloads[UpdateIt->second];
							// Only update if it has not already been updated by other thread
							if (!Payload.MetaData && SetMetaInfo)
							{
								SetMetaData(UpdateIndexLock, Payload, {.RawSize = gsl::narrow(OutValue.RawSize), .RawHash = OutValue.RawHash});
							}
							if (!Payload.MemCached && AddToMemCache)
							{
								SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
							}
						}
					}
				}
			};
			// We don't want to read into memory if they are too big since we might only want to touch the compressed
			// header before sending it along
			if (!InlineDiskLocations.empty())
			{
				ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
				m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span ChunkIndexes) -> bool {
					const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u);
					m_BlockStore.IterateBlock(
						InlineBlockLocations,
						ChunkIndexes,
						// Small chunk: data is handed to us in memory; clone it.
						[this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
							if (Data != nullptr)
							{
								FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], IoBufferBuilder::MakeCloneFromMemory(Data, Size));
							}
							return true;
						},
						// Large chunk: keep it file-backed instead of reading it all in.
						[this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
							FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
							return true;
						},
						LargeChunkSizeLimit);
					return true;
				});
			}
			if (!StandaloneDiskLocations.empty())
			{
				ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone");
				for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++)
				{
					size_t KeyIndex = StandaloneKeyIndexes[Index];
					const DiskLocation& Location = StandaloneDiskLocations[Index];
					FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]));
				}
			}
			// Memcached hits that still lack RawHash/RawSize metadata.
			if (!MemCachedKeyIndexes.empty())
			{
				ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCached");
				for (size_t KeyIndex : MemCachedKeyIndexes)
				{
					const IoHash& Key = Batch->Keys[KeyIndex];
					bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
					ZEN_ASSERT(SetMetaInfo);
					ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
					size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
					ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
					{
						// See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
						// metadata separately, check if it had time to set the metadata
						RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
						if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
						{
							BucketPayload& Payload = m_Payloads[UpdateIt->second];
							if (Payload.MetaData)
							{
								const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
								OutValue.RawHash = MetaData.RawHash;
								OutValue.RawSize = MetaData.RawSize;
								SetMetaInfo = false;
							}
						}
					}
					if (SetMetaInfo)
					{
						if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
						{
							if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
							{
								OutValue = ZenCacheValue{};
							}
						}
						else
						{
							OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
							OutValue.RawSize = OutValue.Value.GetSize();
						}
						if (OutValue.RawSize <= std::numeric_limits::max())
						{
							RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
							{
								if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
								{
									BucketPayload& Payload = m_Payloads[UpdateIt->second];
									// Only update if it has not already been updated by other thread
									if (!Payload.MetaData)
									{
										SetMetaData(UpdateIndexLock, Payload, {.RawSize = static_cast(OutValue.RawSize), .RawHash = OutValue.RawHash});
									}
								}
							}
						}
					}
				}
			}
			// Final accounting pass over all results.
			// NOTE(review): entries classified above (non-memcached hits) already
			// incremented m_MemoryMissCount; a hit whose read then failed is counted
			// again here — confirm the double count is intended.
			for (size_t ResultIndex : Batch->ResultIndexes)
			{
				bool Hit = !!Batch->OutResults[ResultIndex].Value;
				if (Hit)
				{
					m_DiskHitCount++;
					StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize());
				}
				else
				{
					m_DiskMissCount++;
					if (m_Configuration.MemCacheSizeThreshold > 0)
					{
						m_MemoryMissCount++;
					}
				}
			}
		}
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::EndGetBatch: '{}'", Ex.what());
	}
}

// Batched Get: queues the key in the batch handle and reserves an output slot;
// the actual lookup happens in EndGetBatch.
void
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, GetBatchHandle& BatchHandle)
{
	ZEN_TRACE_CPU("Z$::Bucket::Get(Batched)");
	BatchHandle.Keys.push_back(HashKey);
	BatchHandle.ResultIndexes.push_back(BatchHandle.OutResults.size());
	BatchHandle.OutResults.push_back({});
}

// Single-key Get. Looks the key up under the shared index lock, serves from the
// memcache when possible, otherwise releases the lock and reads the standalone
// file or inline block chunk. Small inline values are promoted into the memcache,
// and missing RawHash/RawSize metadata is computed and persisted (double-checked
// under the exclusive lock so concurrent getters don't clobber each other).
// Returns true on a hit with OutValue filled.
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
	ZEN_TRACE_CPU("Z$::Bucket::Get");
	metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
	RwLock::SharedLockScope IndexLock(m_IndexLock);
	auto It = m_Index.find(HashKey);
	if (It == m_Index.end())
	{
		m_DiskMissCount++;
		if (m_Configuration.MemCacheSizeThreshold > 0)
		{
			m_MemoryMissCount++;
		}
		return false;
	}
	PayloadIndex EntryIndex = It.value();
	m_AccessTimes[EntryIndex] = GcClock::TickCount();
	DiskLocation Location = m_Payloads[EntryIndex].Location;
	bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
	const BucketPayload* Payload = &m_Payloads[EntryIndex];
	if (Payload->MetaData)
	{
		const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData];
		OutValue.RawHash = MetaData.RawHash;
		OutValue.RawSize = MetaData.RawSize;
		FillRawHashAndRawSize = false;
	}
	if (Payload->MemCached)
	{
		// Serve from memory; Payload must not be touched after the lock drops.
		OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload;
		Payload = nullptr;
		IndexLock.ReleaseNow();
		m_MemoryHitCount++;
	}
	else
	{
		Payload = nullptr;
		IndexLock.ReleaseNow();
		if (m_Configuration.MemCacheSizeThreshold > 0)
		{
			m_MemoryMissCount++;
		}
		if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
		{
			OutValue.Value = GetStandaloneCacheValue(Location, HashKey);
		}
		else
		{
			OutValue.Value = GetInlineCacheValue(Location);
			if (m_Configuration.MemCacheSizeThreshold > 0)
			{
				size_t ValueSize = OutValue.Value.GetSize();
				if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
				{
					ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache");
					OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
					RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
					if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
					{
						BucketPayload& WritePayload = m_Payloads[UpdateIt->second];
						// Only update if it has not already been updated by other thread
						if (!WritePayload.MemCached)
						{
							SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
						}
					}
				}
			}
		}
	}
	if (FillRawHashAndRawSize)
	{
		ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
		if (Location.IsFlagSet(DiskLocation::kCompressed))
		{
			if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
			{
				OutValue = ZenCacheValue{};
				m_DiskMissCount++;
				return false;
			}
		}
		else
		{
			OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
			OutValue.RawSize = OutValue.Value.GetSize();
		}
		if (OutValue.RawSize <= std::numeric_limits::max())
		{
			RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
			if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
			{
				BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
				// Only set if no other path has already updated the meta data
				if (!WritePayload.MetaData)
				{
					SetMetaData(UpdateIndexLock, WritePayload, {.RawSize = static_cast(OutValue.RawSize), .RawHash = OutValue.RawHash});
				}
			}
		}
	}
	if (OutValue.Value)
	{
		m_DiskHitCount++;
		StatsScope.SetBytes(OutValue.Value.GetSize());
		return true;
	}
	else
	{
		m_DiskMissCount++;
		return false;
	}
}

// Stores one value: payloads at or above LargeObjectThreshold become standalone
// files, everything else is written inline (batched via OptionalBatchHandle when
// supplied).
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, PutBatchHandle*
OptionalBatchHandle)
{
	ZEN_TRACE_CPU("Z$::Bucket::Put");
	metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
	if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
	{
		// Large object: written as its own file; the batch (if any) is marked
		// complete immediately since no block-store write is deferred.
		PutStandaloneCacheValue(HashKey, Value, References);
		if (OptionalBatchHandle)
		{
			OptionalBatchHandle->OutResults.push_back(true);
		}
	}
	else
	{
		PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle);
	}
	m_DiskWriteCount++;
}

// Evicts memcached payloads whose owner entry was last accessed before
// ExpireTime, compacting m_MemCachedPayloads in place (surviving entries are
// moved down and their owners' MemCached indexes rewritten). Returns the number
// of payload bytes trimmed. Runs under the exclusive index lock.
uint64_t
ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
{
	ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim");
	uint64_t Trimmed = 0;
	GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
	RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
	uint32_t MemCachedCount = gsl::narrow(m_MemCachedPayloads.size());
	if (MemCachedCount == 0)
	{
		return 0;
	}
	uint32_t WriteIndex = 0;
	for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
	{
		MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
		// Empty slots (already freed) are dropped by the compaction.
		if (!Data.Payload)
		{
			continue;
		}
		PayloadIndex Index = Data.OwnerIndex;
		ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
		GcClock::Tick AccessTime = m_AccessTimes[Index];
		if (AccessTime < ExpireTicks)
		{
			// Expired: release the payload and unlink it from its owner.
			size_t PayloadSize = Data.Payload.GetSize();
			RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
			Data = {};
			m_Payloads[Index].MemCached = {};
			Trimmed += PayloadSize;
			continue;
		}
		// Survivor: slide down to the next free slot and fix the back-pointer.
		if (ReadIndex > WriteIndex)
		{
			m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index};
			m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex);
		}
		WriteIndex++;
	}
	m_MemCachedPayloads.resize(WriteIndex);
	m_MemCachedPayloads.shrink_to_fit();
	// The free list indexes are invalidated by the compaction above.
	zen::Reset(m_FreeMemCachedPayloads);
	return Trimmed;
}

// Builds a histogram of memcache memory usage bucketed by entry age: slot 0 is
// the youngest, the last slot collects everything at or beyond MaxAge. The slot
// count is taken from InOutUsageSlots' *capacity* (callers pre-reserve it); the
// vector is grown on demand as slots are touched.
void
ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector& InOutUsageSlots)
{
	ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess");
	size_t SlotCount = InOutUsageSlots.capacity();
	RwLock::SharedLockScope _(m_IndexLock);
	uint32_t MemCachedCount = gsl::narrow(m_MemCachedPayloads.size());
	if (MemCachedCount == 0)
	{
		return;
	}
	for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
	{
		MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
		if (!Data.Payload)
		{
			continue;
		}
		PayloadIndex Index = Data.OwnerIndex;
		ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
		GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
		GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0);
		size_t Slot = Age < MaxAge ? gsl::narrow((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1);
		ZEN_ASSERT_SLOW(Slot < SlotCount);
		if (Slot >= InOutUsageSlots.size())
		{
			InOutUsageSlots.resize(Slot + 1, 0);
		}
		InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize());
	}
}

// Deletes this bucket entirely: takes the index lock plus every sharded per-hash
// lock exclusively, closes the block store and structured log, removes the bucket
// directory, and resets all in-memory state and counters. Returns whether the
// directory removal succeeded.
bool
ZenCacheDiskLayer::CacheBucket::Drop()
{
	ZEN_TRACE_CPU("Z$::Bucket::Drop");
	RwLock::ExclusiveLockScope _(m_IndexLock);
	// Hold every shard lock so no reader can touch standalone files mid-delete.
	// NOTE(review): the make_unique template argument was stripped from this
	// excerpt — restore from version control.
	std::vector> ShardLocks;
	ShardLocks.reserve(256);
	for (RwLock& Lock : m_ShardedLocks)
	{
		ShardLocks.push_back(std::make_unique(Lock));
	}
	m_BlockStore.Close();
	m_SlogFile.Close();
	bool Deleted = MoveAndDeleteDirectory(m_BucketDir);
	m_Index.clear();
	m_Payloads.clear();
	m_AccessTimes.clear();
	m_MetaDatas.clear();
	m_FreeMetaDatas.clear();
	m_MemCachedPayloads.clear();
	m_FreeMemCachedPayloads.clear();
	m_StandaloneSize.store(0);
	// Return this bucket's memcache usage to the shared accounting.
	m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
	m_MemCachedSize.store(0);
	return Deleted;
}

// Flushes the block store, structured log, and index snapshot to disk. A CAS on
// m_IsFlushing makes concurrent flushes a no-op (the losing caller returns
// immediately rather than waiting).
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
	ZEN_TRACE_CPU("Z$::Bucket::Flush");
	bool Expected = false;
	if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
	{
		return;
	}
	auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
	ZEN_INFO("Flushing bucket {}", m_BucketDir);
	try
	{
		m_BlockStore.Flush(/*ForceNewBlock*/ false);
		m_SlogFile.Flush();
		SaveSnapshot();
	}
	catch (const std::exception& Ex)
	{
		ZEN_WARN("Failed to flush bucket in '{}'. Reason: '{}'", m_BucketDir, Ex.what());
	}
}

// Persists the bucket's index state: writes the log snapshot, then emits either a
// full legacy manifest (dead branch, UseLegacyScheme is hard-coded false) or the
// current sidecar scheme (small manifest plus a separate .meta sidecar file).
// Before writing it verifies free disk space and, if short, asks
// ClaimDiskReserveFunc to release reserve space. Errors are logged, never thrown.
void
ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& ClaimDiskReserveFunc)
{
	ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
	try
	{
		bool UseLegacyScheme = false;
		IoBuffer Buffer;
		BucketManifestSerializer ManifestWriter;
		if (UseLegacyScheme)
		{
			std::vector AccessTimes;
			std::vector Payloads;
			std::vector MetaDatas;
			IndexMap Index;
			{
				RwLock::SharedLockScope IndexLock(m_IndexLock);
				WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
				// Note: this copy could be eliminated on shutdown to
				// reduce memory usage and execution time
				Index = m_Index;
				Payloads = m_Payloads;
				AccessTimes = m_AccessTimes;
				MetaDatas = m_MetaDatas;
			}
			Buffer = ManifestWriter.MakeManifest(m_BucketId, std::move(Index), std::move(AccessTimes), std::move(Payloads), std::move(MetaDatas));
			// Keep ~512 KiB of headroom beyond the manifest itself.
			const uint64_t RequiredSpace = Buffer.GetSize() + 1024 * 512;
			std::error_code Error;
			DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
			if (Error)
			{
				ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
				return;
			}
			bool EnoughSpace = Space.Free >= RequiredSpace;
			if (!EnoughSpace)
			{
				uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
				EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
			}
			if (!EnoughSpace)
			{
				ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize()));
				return;
			}
		}
		else
		{
			// Sidecar scheme: the lock is held across the sidecar write so the
			// index cannot change between snapshot and serialization.
			RwLock::SharedLockScope IndexLock(m_IndexLock);
			WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
			const uint64_t EntryCount = m_Index.size();
			Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
			uint64_t SidecarSize = ManifestWriter.GetSidecarSize();
			const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512;
			std::error_code Error;
			DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
			if (Error)
			{
				ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
				return;
			}
			bool EnoughSpace = Space.Free >= RequiredSpace;
			if (!EnoughSpace)
			{
				uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
				EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
			}
			if (!EnoughSpace)
			{
				ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize()));
				return;
			}
			ManifestWriter.WriteSidecarFile(IndexLock, GetMetaPath(m_BucketDir, m_BucketName), m_LogFlushPosition, m_Index, m_AccessTimes, m_Payloads, m_MetaDatas);
		}
		// Atomically replace the manifest (write temp + rename).
		std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
		TemporaryFile::SafeWriteFile(ManifestPath, Buffer.GetView());
	}
	catch (const std::exception& Err)
	{
		ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what());
	}
}

// Verifies the integrity of every stored value: standalone files are checked
// inline (size check for opaque binaries, full validation for structured data)
// while block-store chunks are gathered and validated in bulk for better I/O
// scheduling. Bad entries are collected, optionally tombstoned and removed
// (recovery mode), and reported to the scrub context.
void
ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
	ZEN_TRACE_CPU("Z$::Bucket::Scrub");
	ZEN_INFO("scrubbing '{}'", m_BucketDir);
	Stopwatch Timer;
	std::atomic_uint64_t ChunkCount = 0;
	std::atomic_uint64_t VerifiedChunkBytes = 0;
	auto LogStats = MakeGuard([&] {
		const uint32_t DurationMs = gsl::narrow(Timer.GetElapsedTimeMs());
		ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
				 m_BucketName,
				 NiceBytes(VerifiedChunkBytes.load()),
				 NiceTimeSpanMs(DurationMs),
				 ChunkCount.load(),
				 NiceRate(VerifiedChunkBytes, DurationMs));
	});
	RwLock BadKeysLock;
	std::vector BadKeys;
	auto
ReportBadKey = [&](const IoHash& Key) {
		// Validation lambdas may run concurrently, so appends are serialized.
		BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Key); });
	};
	try
	{
		std::vector ChunkLocations;
		std::vector ChunkIndexToChunkHash;
		RwLock::SharedLockScope _(m_IndexLock);
		// Heuristic pre-size: assume roughly a quarter of entries are inline chunks.
		const size_t BlockChunkInitialCount = m_Index.size() / 4;
		ChunkLocations.reserve(BlockChunkInitialCount);
		ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
		// Do a pass over the index and verify any standalone file values straight away
		// all other storage classes are gathered and verified in bulk in order to enable
		// more efficient I/O scheduling
		for (auto& Kv : m_Index)
		{
			const IoHash& HashKey = Kv.first;
			const BucketPayload& Payload = m_Payloads[Kv.second];
			const DiskLocation& Loc = Payload.Location;
			if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
			{
				Ctx.ThrowIfDeadlineExpired();
				ChunkCount.fetch_add(1);
				VerifiedChunkBytes.fetch_add(Loc.Size());
				if (Loc.GetContentType() == ZenContentType::kBinary)
				{
					// Blob cache value, not much we can do about data integrity checking
					// here since there's no hash available
					ExtendablePathBuilder<256> DataFilePath;
					BuildPath(DataFilePath, HashKey);
					RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
					std::error_code Ec;
					uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
					// NOTE(review): when file_size errors, both checks fire and the
					// same key is pushed to BadKeys twice (size is then unspecified);
					// the recovery pass below does not tolerate duplicates — see note
					// there. Consider `else if`.
					if (Ec)
					{
						ReportBadKey(HashKey);
					}
					if (size != Loc.Size())
					{
						ReportBadKey(HashKey);
					}
					continue;
				}
				else
				{
					// Structured cache value
					IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
					if (!Buffer)
					{
						ReportBadKey(HashKey);
						continue;
					}
					if (!ValidateIoBuffer(Loc.GetContentType(), Buffer))
					{
						ReportBadKey(HashKey);
						continue;
					}
				}
			}
			else
			{
				// Inline chunk: defer to the bulk block-store validation below.
				ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment));
				ChunkIndexToChunkHash.push_back(HashKey);
				continue;
			}
		}
		// Validates a chunk small enough to be handed to us in memory.
		const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
			ChunkCount.fetch_add(1);
			VerifiedChunkBytes.fetch_add(Size);
			const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
			if (!Data)
			{
				// ChunkLocation out of range of stored blocks
				ReportBadKey(Hash);
				return true;
			}
			if (!Size)
			{
				ReportBadKey(Hash);
				return true;
			}
			IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
			if (!Buffer)
			{
				ReportBadKey(Hash);
				return true;
			}
			const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
			ZenContentType ContentType = Payload.Location.GetContentType();
			Buffer.SetContentType(ContentType);
			if (!ValidateIoBuffer(ContentType, Buffer))
			{
				ReportBadKey(Hash);
				return true;
			}
			return true;
		};
		// Validates a large chunk directly from its backing block file.
		const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
			Ctx.ThrowIfDeadlineExpired();
			ChunkCount.fetch_add(1);
			VerifiedChunkBytes.fetch_add(Size);
			const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
			IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
			if (!Buffer)
			{
				ReportBadKey(Hash);
				return true;
			}
			const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
			ZenContentType ContentType = Payload.Location.GetContentType();
			Buffer.SetContentType(ContentType);
			if (!ValidateIoBuffer(ContentType, Buffer))
			{
				ReportBadKey(Hash);
				return true;
			}
			return true;
		};
		m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span ChunkIndexes) {
			return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk);
		});
	}
	catch (ScrubDeadlineExpiredException&)
	{
		ZEN_INFO("Scrubbing deadline expired, operation incomplete");
	}
	Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes);
	if (!BadKeys.empty())
	{
		ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir);
		if (Ctx.RunRecovery())
		{
			// Deal with bad chunks by removing them from our lookup map
			std::vector LogEntries;
			LogEntries.reserve(BadKeys.size());
			{
				RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
				for (const IoHash& BadKey : BadKeys)
				{
					// Log a tombstone and delete the in-memory index for the bad entry
					// NOTE(review): It is dereferenced without an end() check. The
					// shared lock was released before this exclusive section, so a
					// concurrent erase — or a duplicate BadKey (see file_size note
					// above, the first iteration erases the key) — makes It == end()
					// and this is undefined behavior. Guard with `if (It == m_Index.end()) continue;`.
					const auto It = m_Index.find(BadKey);
					BucketPayload& Payload = m_Payloads[It->second];
					DiskLocation Location = Payload.Location;
					if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
					{
						m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed);
					}
					RemoveMemCachedData(IndexLock, Payload);
					RemoveMetaData(IndexLock, Payload);
					Location.Flags |= DiskLocation::kTombStone;
					LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
					m_Index.erase(BadKey);
				}
			}
			// Remove the on-disk files for bad standalone entries (best effort).
			for (const DiskIndexEntry& Entry : LogEntries)
			{
				if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
				{
					ExtendablePathBuilder<256> Path;
					BuildPath(Path, Entry.Key);
					fs::path FilePath = Path.ToPath();
					RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key));
					if (fs::is_regular_file(FilePath))
					{
						ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8());
						std::error_code Ec;
						fs::remove(FilePath, Ec);
						// We don't care if we fail, we are no longer tracking this file...
					}
				}
			}
			m_SlogFile.Append(LogEntries);
			// Clean up m_AccessTimes and m_Payloads vectors
			{
				std::vector Payloads;
				std::vector AccessTimes;
				std::vector MetaDatas;
				std::vector MemCachedPayloads;
				IndexMap Index;
				{
					RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
					CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index);
				}
			}
		}
	}
	// Let whomever it concerns know about the bad chunks. This could
	// be used to invalidate higher level data structures more efficiently
	// than a full validation pass might be able to do
	if (!BadKeys.empty())
	{
		Ctx.ReportBadCidChunks(BadKeys);
	}
}

// GC mark phase: snapshots the index, reports keys whose last access predates the
// GC expiry as expired, and (unless CID scanning is skipped) loads every live
// structured (CbObject) value to collect its attachment CIDs so the referenced
// blobs are retained. CIDs are flushed to the context in batches of 1024.
void
ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
	ZEN_TRACE_CPU("Z$::Bucket::GatherReferences");
#define CALCULATE_BLOCKING_TIME 0
#if CALCULATE_BLOCKING_TIME
	uint64_t WriteBlockTimeUs = 0;
	uint64_t WriteBlockLongestTimeUs = 0;
	uint64_t ReadBlockTimeUs = 0;
	uint64_t ReadBlockLongestTimeUs = 0;
#endif // CALCULATE_BLOCKING_TIME
	Stopwatch TotalTimer;
	const auto _ = MakeGuard([&] {
#if CALCULATE_BLOCKING_TIME
		ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
				  m_BucketDir,
				  NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
				  NiceLatencyNs(WriteBlockTimeUs),
				  NiceLatencyNs(WriteBlockLongestTimeUs),
				  NiceLatencyNs(ReadBlockTimeUs),
				  NiceLatencyNs(ReadBlockLongestTimeUs));
#else
		ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
#endif // CALCULATE_BLOCKING_TIME
	});
	const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime();
	const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
	// Work from copies so the index lock is not held while reading payloads.
	IndexMap Index;
	std::vector AccessTimes;
	std::vector Payloads;
	{
		RwLock::SharedLockScope __(m_IndexLock);
#if CALCULATE_BLOCKING_TIME
		Stopwatch Timer;
		const auto ___ = MakeGuard([&] {
			uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
			WriteBlockTimeUs += ElapsedUs;
			WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
		});
#endif // CALCULATE_BLOCKING_TIME
		if (m_Index.empty())
		{
			return;
		}
		Index = m_Index;
		AccessTimes = m_AccessTimes;
		Payloads = m_Payloads;
	}
	std::vector ExpiredKeys;
	ExpiredKeys.reserve(1024);
	std::vector Cids;
	if (!GcCtx.SkipCid())
	{
		Cids.reserve(1024);
	}
	std::vector> StructuredItemsWithUnknownAttachments;
	for (const auto& Entry : Index)
	{
		const IoHash& Key = Entry.first;
		size_t PayloadIndex = Entry.second;
		GcClock::Tick AccessTime =
AccessTimes[PayloadIndex]; if (AccessTime < ExpireTicks) { ExpiredKeys.push_back(Key); continue; } if (GcCtx.SkipCid()) { continue; } BucketPayload& Payload = Payloads[PayloadIndex]; const DiskLocation& Loc = Payload.Location; if (!Loc.IsFlagSet(DiskLocation::kStructured)) { continue; } StructuredItemsWithUnknownAttachments.push_back(Entry); } for (const auto& Entry : StructuredItemsWithUnknownAttachments) { const IoHash& Key = Entry.first; BucketPayload& Payload = Payloads[Entry.second]; const DiskLocation& Loc = Payload.Location; { IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) { continue; } } else { RwLock::SharedLockScope IndexLock(m_IndexLock); #if CALCULATE_BLOCKING_TIME Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Key); It != m_Index.end()) { const BucketPayload& CachedPayload = m_Payloads[It->second]; if (CachedPayload.MemCached) { Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload; ZEN_ASSERT_SLOW(Buffer); } else { DiskLocation Location = m_Payloads[It->second].Location; IndexLock.ReleaseNow(); Buffer = GetInlineCacheValue(Location); // Don't memcache items when doing GC } } if (!Buffer) { continue; } } ZEN_ASSERT(Buffer); ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); CbObjectView Obj(Buffer.GetData()); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); if (Cids.size() >= 1024) { GcCtx.AddRetainedCids(Cids); Cids.clear(); } } } GcCtx.AddRetainedCids(Cids); GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); } void ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage"); ZEN_DEBUG("collecting garbage from '{}'", 
m_BucketDir); Stopwatch TotalTimer; uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; uint64_t TotalChunkCount = 0; uint64_t DeletedSize = 0; GcStorageSize OldTotalSize = StorageSize(); std::unordered_set DeletedChunks; uint64_t MovedCount = 0; const auto _ = MakeGuard([&] { ZEN_DEBUG( "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " "{} " "of {} " "entries ({}/{}).", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs), NiceBytes(DeletedSize), DeletedChunks.size(), MovedCount, TotalChunkCount, NiceBytes(OldTotalSize.DiskSize), NiceBytes(OldTotalSize.MemorySize)); bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { return; } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); try { SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); } catch (const std::exception& Ex) { ZEN_WARN("Failed to write index and manifest after GC in '{}'. 
Reason: '{}'", m_BucketDir, Ex.what()); } }); auto __ = MakeGuard([&]() { if (!DeletedChunks.empty()) { // Clean up m_AccessTimes and m_Payloads vectors std::vector Payloads; std::vector AccessTimes; std::vector MetaDatas; std::vector MemCachedPayloads; IndexMap Index; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index); } GcCtx.AddDeletedCids(std::vector(DeletedChunks.begin(), DeletedChunks.end())); } }); std::span ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); if (ExpiredCacheKeySpan.empty()) { return; } m_SlogFile.Flush(); std::unordered_set ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end()); std::vector ExpiredStandaloneEntries; IndexMap IndexSnapshot; std::vector PayloadsSnapshot; BlockStore::ReclaimSnapshotState BlockStoreState; { bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir); return; } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); { ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State"); RwLock::SharedLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); for (const IoHash& Key : ExpiredCacheKeys) { if (auto It = m_Index.find(Key); It != m_Index.end()) { const BucketPayload& Payload = m_Payloads[It->second]; if (Payload.Location.Flags & DiskLocation::kStandaloneFile) { DiskIndexEntry Entry = {.Key = Key, .Location = 
Payload.Location}; Entry.Location.Flags |= DiskLocation::kTombStone; ExpiredStandaloneEntries.push_back(Entry); } } } PayloadsSnapshot = m_Payloads; IndexSnapshot = m_Index; if (GcCtx.IsDeletionMode()) { IndexLock.ReleaseNow(); RwLock::ExclusiveLockScope __(m_IndexLock); for (const auto& Entry : ExpiredStandaloneEntries) { if (m_Index.erase(Entry.Key) == 1) { m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); DeletedChunks.insert(Entry.Key); } } m_SlogFile.Append(ExpiredStandaloneEntries); } } } if (GcCtx.IsDeletionMode()) { ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete"); ExtendablePathBuilder<256> Path; for (const auto& Entry : ExpiredStandaloneEntries) { const IoHash& Key = Entry.Key; Path.Reset(); BuildPath(Path, Key); fs::path FilePath = Path.ToPath(); { RwLock::SharedLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (m_Index.contains(Key)) { // Someone added it back, let the file on disk be ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } IndexLock.ReleaseNow(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); std::error_code Ec; fs::remove(FilePath, Ec); if (Ec) { ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); continue; } } } DeletedSize += Entry.Location.Size(); } } TotalChunkCount = IndexSnapshot.size(); std::vector ChunkLocations; BlockStore::ChunkIndexArray KeepChunkIndexes; std::vector ChunkIndexToChunkHash; ChunkLocations.reserve(TotalChunkCount); ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); { TotalChunkCount = 0; for (const auto& Entry : IndexSnapshot) { size_t 
EntryIndex = Entry.second; const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location; if (DiskLocation.Flags & DiskLocation::kStandaloneFile) { continue; } const IoHash& Key = Entry.first; BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); ChunkIndexToChunkHash.push_back(Key); if (ExpiredCacheKeys.contains(Key)) { continue; } KeepChunkIndexes.push_back(ChunkIndex); } } TotalChunkCount = ChunkLocations.size(); size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true); GcStorageSize CurrentTotalSize = StorageSize(); ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})", m_BucketDir, DeleteCount, TotalChunkCount, NiceBytes(CurrentTotalSize.DiskSize), NiceBytes(CurrentTotalSize.MemorySize)); return; } m_BlockStore.ReclaimSpace( BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); for (const auto& Entry : MovedChunks) { size_t ChunkIndex = Entry.first; const BlockStoreLocation& NewLocation = Entry.second; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t EntryIndex = m_Index[ChunkHash]; BucketPayload& Payload = m_Payloads[EntryIndex]; if 
(PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location)
{
    // Entry has been updated while GC was running, ignore the move
    continue;
}
// Record the chunk's new block-store location and log it so slog replay sees the move
Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location});
}
// Chunks the block store dropped entirely: tombstone them in the log and drop all in-memory state
for (const size_t ChunkIndex : RemovedChunks)
{
    const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
    size_t EntryIndex = m_Index[ChunkHash];
    BucketPayload& Payload = m_Payloads[EntryIndex];
    if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location)
    {
        // Entry has been updated while GC was running, ignore the delete
        continue;
    }
    const DiskLocation& OldDiskLocation = Payload.Location;
    LogEntries.push_back({.Key = ChunkHash,
                          .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment),
                                                   m_Configuration.PayloadAlignment,
                                                   OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
    RemoveMemCachedData(IndexLock, Payload);
    RemoveMetaData(IndexLock, Payload);
    m_Index.erase(ChunkHash);
    DeletedChunks.insert(ChunkHash);
}
}
m_SlogFile.Append(LogEntries);
m_SlogFile.Flush();
},
[&]() { return GcCtx.ClaimGCReserve(); });
}

// Snapshot of this bucket's storage sizes and hit/miss/write counters for reporting.
ZenCacheDiskLayer::BucketStats
ZenCacheDiskLayer::CacheBucket::Stats()
{
    GcStorageSize Size = StorageSize();
    return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize,
                                          .MemorySize = Size.MemorySize,
                                          .DiskHitCount = m_DiskHitCount,
                                          .DiskMissCount = m_DiskMissCount,
                                          .DiskWriteCount = m_DiskWriteCount,
                                          .MemoryHitCount = m_MemoryHitCount,
                                          .MemoryMissCount = m_MemoryMissCount,
                                          .MemoryWriteCount = m_MemoryWriteCount,
                                          .PutOps = m_PutOps.Snapshot(),
                                          .GetOps = m_GetOps.Snapshot()};
}

// Number of live entries in the bucket index (takes a shared index lock).
uint64_t
ZenCacheDiskLayer::CacheBucket::EntryCount() const
{
    RwLock::SharedLockScope _(m_IndexLock);
    return static_cast(m_Index.size());
}

// Builds the externally visible details for one cache value: its on-disk size, the
// raw size/hash recorded in the metadata side table, the last-access tick and, for
// structured (compact-binary) values, the attachment hashes referenced by the object.
// The caller must hold the shared index lock it passes in.
CacheValueDetails::ValueDetails
ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const IoHash& Key, PayloadIndex Index) const
{
    std::vector Attachments;
    const BucketPayload& Payload = m_Payloads[Index];
    if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
    {
        // Structured payloads are compact-binary objects; load the value (from its own
        // standalone file or from the shared block store) and collect its attachments.
        IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key)
                                                                                   : GetInlineCacheValue(Payload.Location);
        CbObjectView Obj(Value.GetData());
        Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
    }
    BucketMetaData MetaData = GetMetaData(IndexLock, Payload);
    return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
                                           .RawSize = MetaData.RawSize,
                                           .RawHash = MetaData.RawHash,
                                           .LastAccess = m_AccessTimes[Index],
                                           .Attachments = std::move(Attachments),
                                           .ContentType = Payload.Location.GetContentType()};
}

// Details for every value in the bucket, or for the single value named by ValueFilter
// (a hex-encoded IoHash).
// NOTE(review): this acquires m_IndexLock shared even though the caller already passes
// a SharedLockScope in as IndexLock -- verify the double shared acquisition is intended
// and that the lock supports it.
CacheValueDetails::BucketDetails
ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const
{
    CacheValueDetails::BucketDetails Details;
    RwLock::SharedLockScope _(m_IndexLock);
    if (ValueFilter.empty())
    {
        Details.Values.reserve(m_Index.size());
        for (const auto& It : m_Index)
        {
            Details.Values.insert_or_assign(It.first, GetValueDetails(IndexLock, It.first, It.second));
        }
    }
    else
    {
        IoHash Key = IoHash::FromHexString(ValueFilter);
        if (auto It = m_Index.find(Key); It != m_Index.end())
        {
            Details.Values.insert_or_assign(It->first, GetValueDetails(IndexLock, It->first, It->second));
        }
    }
    return Details;
}

// Invokes Fn for every entry in the bucket while holding the shared index lock.
// Fn must not re-enter the bucket in a way that takes the index lock exclusively.
void
ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
    std::function& Fn) const
{
    RwLock::SharedLockScope IndexLock(m_IndexLock);
    for (const auto& It : m_Index)
    {
        CacheValueDetails::ValueDetails Vd = GetValueDetails(IndexLock, It.first, It.second);
        Fn(It.first, Vd);
    }
}

// Runs garbage collection for every bucket, then trims the mem cache unless a trim is
// already in flight. Bucket pointers are snapshotted under the layer lock so the lock
// is not held across the per-bucket GC work.
void
ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
    ZEN_TRACE_CPU("Z$::CollectGarbage");
    std::vector Buckets;
    {
        RwLock::SharedLockScope _(m_Lock);
        Buckets.reserve(m_Buckets.size());
        for (auto& Kv : m_Buckets)
        {
            Buckets.push_back(Kv.second.get());
        }
    }
    for
(CacheBucket* Bucket : Buckets) { Bucket->CollectGarbage(GcCtx); } if (!m_IsMemCacheTrimming) { MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); } } void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References) { ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); uint64_t NewFileSize = Value.Value.Size(); TemporaryFile DataFile; std::error_code Ec; DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } bool CleanUpTempFile = true; auto __ = MakeGuard([&] { if (CleanUpTempFile) { std::error_code Ec; std::filesystem::remove(DataFile.GetPath(), Ec); if (Ec) { ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", DataFile.GetPath(), m_BucketDir, Ec.message()); } } }); DataFile.WriteAll(Value.Value, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", NiceBytes(NewFileSize), DataFile.GetPath().string(), m_BucketDir)); } ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); // We do a speculative remove of the file instead of probing with a exists call and check the error code instead std::filesystem::remove(FsPath, Ec); if (Ec) { if (Ec.value() != ENOENT) { ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); Sleep(100); Ec.clear(); std::filesystem::remove(FsPath, Ec); if (Ec && Ec.value() != ENOENT) { throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); } } } // Assume parent directory exists DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { CreateDirectories(FsPath.parent_path()); // Try again after we or someone else created the 
directory Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); // Retry if we still fail to handle contention to file system uint32_t RetriesLeft = 3; while (Ec && RetriesLeft > 0) { ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retries left: {}.", FsPath, DataFile.GetPath(), m_BucketDir, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); RetriesLeft--; } if (Ec) { throw std::system_error( Ec, fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); } } // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file // will be disabled as the file handle has already been closed CleanUpTempFile = false; uint8_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } DiskLocation Loc(NewFileSize, EntryFlags); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); ValueLock.ReleaseNow(); if (m_TrackedCacheKeys) { m_TrackedCacheKeys->insert(HashKey); } if (m_TrackedReferences) { m_TrackedReferences->insert(References.begin(), References.end()); } PayloadIndex EntryIndex = {}; if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object EntryIndex = PayloadIndex(m_Payloads.size()); m_Payloads.emplace_back(BucketPayload{.Location = Loc}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } else { EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); BucketPayload& Payload = m_Payloads[EntryIndex]; uint64_t OldSize = Payload.Location.Size(); Payload = BucketPayload{.Location = Loc}; m_AccessTimes[EntryIndex] = GcClock::TickCount(); 
RemoveMemCachedData(IndexLock, Payload); m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed); } if ((Value.RawSize != 0 || Value.RawHash != IoHash::Zero) && Value.RawSize <= std::numeric_limits::max()) { SetMetaData(IndexLock, m_Payloads[EntryIndex], {.RawSize = static_cast(Value.RawSize), .RawHash = Value.RawHash}); } else { RemoveMetaData(IndexLock, m_Payloads[EntryIndex]); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); } void ZenCacheDiskLayer::CacheBucket::SetMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData) { if (Payload.MetaData) { m_MetaDatas[Payload.MetaData] = MetaData; } else { if (m_FreeMetaDatas.empty()) { Payload.MetaData = MetaDataIndex(m_MetaDatas.size()); m_MetaDatas.emplace_back(MetaData); } else { Payload.MetaData = m_FreeMetaDatas.back(); m_FreeMetaDatas.pop_back(); m_MetaDatas[Payload.MetaData] = MetaData; } } } void ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) { if (Payload.MetaData) { m_FreeMetaDatas.push_back(Payload.MetaData); Payload.MetaData = {}; } } void ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData) { BucketPayload& Payload = m_Payloads[PayloadIndex]; uint64_t PayloadSize = MemCachedData.GetSize(); ZEN_ASSERT(PayloadSize != 0); if (m_FreeMemCachedPayloads.empty()) { if (m_MemCachedPayloads.size() != std::numeric_limits::max()) { Payload.MemCached = MemCachedIndex(gsl::narrow(m_MemCachedPayloads.size())); m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}); AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } else { Payload.MemCached = m_FreeMemCachedPayloads.back(); m_FreeMemCachedPayloads.pop_back(); m_MemCachedPayloads[Payload.MemCached] = 
MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}; AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } size_t ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) { if (Payload.MemCached) { size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize(); RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemCachedPayloads[Payload.MemCached] = {}; m_FreeMemCachedPayloads.push_back(Payload.MemCached); Payload.MemCached = {}; return PayloadSize; } return 0; } ZenCacheDiskLayer::CacheBucket::BucketMetaData ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const { if (Payload.MetaData) { return m_MetaDatas[Payload.MetaData]; } return {}; } void ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); if (OptionalBatchHandle != nullptr) { OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); OptionalBatchHandle->OutResults.push_back(false); std::vector& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); HashKeyAndReferences.push_back(HashKey); HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); return; } uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_Configuration.PayloadAlignment, 
[&](const BlockStoreLocation& BlockStoreLocation) { ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation"); DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); m_SlogFile.Append({.Key = HashKey, .Location = Location}); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (m_TrackedCacheKeys) { m_TrackedCacheKeys->insert(HashKey); } if (m_TrackedReferences) { m_TrackedReferences->insert(References.begin(), References.end()); } if (auto It = m_Index.find(HashKey); It != m_Index.end()) { PayloadIndex EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); BucketPayload& Payload = m_Payloads[EntryIndex]; RemoveMemCachedData(IndexLock, Payload); RemoveMetaData(IndexLock, Payload); Payload = (BucketPayload{.Location = Location}); m_AccessTimes[EntryIndex] = GcClock::TickCount(); } else { PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); m_Payloads.emplace_back(BucketPayload{.Location = Location}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } }); } std::string ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&) { return fmt::format("cachebucket: '{}'", m_BucketDir.string()); } class DiskBucketStoreCompactor : public GcStoreCompactor { using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; using CacheBucket = ZenCacheDiskLayer::CacheBucket; public: DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector>&& ExpiredStandaloneKeys) : m_Bucket(Bucket) , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys)) { m_ExpiredStandaloneKeys.shrink_to_fit(); } virtual ~DiskBucketStoreCompactor() {} virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function& ClaimDiskReserveCallback) override { ZEN_TRACE_CPU("Z$::Bucket::CompactStore"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { 
Reset(m_ExpiredStandaloneKeys); if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}", m_Bucket.m_BucketDir, NiceBytes(Stats.RemovedDisk), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); if (!m_ExpiredStandaloneKeys.empty()) { // Compact standalone items size_t Skipped = 0; ExtendablePathBuilder<256> Path; for (const std::pair& ExpiredKey : m_ExpiredStandaloneKeys) { if (Ctx.IsCancelledFlag.load()) { return; } Path.Reset(); m_Bucket.BuildPath(Path, ExpiredKey.first); fs::path FilePath = Path.ToPath(); RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock); if (m_Bucket.m_Index.contains(ExpiredKey.first)) { // Someone added it back, let the file on disk be ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", m_Bucket.m_BucketDir, Path.ToUtf8()); continue; } if (Ctx.Settings.IsDeleteMode) { RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); IndexLock.ReleaseNow(); ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); std::error_code Ec; if (!fs::remove(FilePath, Ec)) { continue; } if (Ec) { ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", m_Bucket.m_BucketDir, Path.ToUtf8(), Ec.message()); continue; } Stats.RemovedDisk += ExpiredKey.second; } else { RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); IndexLock.ReleaseNow(); ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); std::error_code Ec; bool Existed = std::filesystem::is_regular_file(FilePath, Ec); if (Ec) { ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. 
Reason '{}'", m_Bucket.m_BucketDir, FilePath, Ec.message()); continue; } if (!Existed) { continue; } Skipped++; } } if (Skipped > 0) { ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped); } } if (Ctx.Settings.CollectSmallObjects) { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_TrackedCacheKeys = std::make_unique(); }); auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_TrackedCacheKeys.reset(); }); }); size_t InlineEntryCount = 0; BlockStore::BlockUsageMap BlockUsage; { RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); for (const auto& Entry : m_Bucket.m_Index) { PayloadIndex Index = Entry.second; const BucketPayload& Payload = m_Bucket.m_Payloads[Index]; const DiskLocation& Loc = Payload.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { continue; } InlineEntryCount++; uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex(); uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment); if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) { BlockStore::BlockUsageInfo& Info = It.value(); Info.EntryCount++; Info.DiskUsage += ChunkSize; } else { BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); } } } { BlockStoreCompactState BlockCompactState; std::vector BlockCompactStateKeys; BlockCompactStateKeys.reserve(InlineEntryCount); BlockStore::BlockEntryCountMap BlocksToCompact = m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); BlockCompactState.IncludeBlocks(BlocksToCompact); if (BlocksToCompact.size() > 0) { { RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); for (const auto& Entry : m_Bucket.m_Index) { PayloadIndex Index = Entry.second; const BucketPayload& Payload = m_Bucket.m_Payloads[Index]; const DiskLocation& Loc = Payload.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { 
continue; } if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment))) { continue; } BlockCompactStateKeys.push_back(Entry.first); } } if (Ctx.Settings.IsDeleteMode) { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", m_Bucket.m_BucketDir, BlocksToCompact.size()); } m_Bucket.m_BlockStore.CompactBlocks( BlockCompactState, m_Bucket.m_Configuration.PayloadAlignment, [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { std::vector MovedEntries; MovedEntries.reserve(MovedArray.size()); RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); for (const std::pair& Moved : MovedArray) { size_t ChunkIndex = Moved.first; const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; ZEN_ASSERT(m_Bucket.m_TrackedCacheKeys); if (m_Bucket.m_TrackedCacheKeys->contains(Key)) { continue; } if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) { BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; const BlockStoreLocation& NewLocation = Moved.second; Payload.Location = DiskLocation(NewLocation, m_Bucket.m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); } } m_Bucket.m_SlogFile.Append(MovedEntries); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) { return false; } return true; }, ClaimDiskReserveCallback, fmt::format("GCV2: cachebucket [COMPACT] '{}': ", m_Bucket.m_BucketDir)); } else { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", m_Bucket.m_BucketDir, BlocksToCompact.size()); } } } } } } virtual std::string GetGcName(GcCtx& Ctx) override { return m_Bucket.GetGcName(Ctx); } private: ZenCacheDiskLayer::CacheBucket& m_Bucket; std::vector> m_ExpiredStandaloneKeys; }; GcStoreCompactor* ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { 
ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData");
    auto Log = [&Ctx]() { return Ctx.Logger; };
    size_t    TotalEntries = 0;
    Stopwatch Timer;
    // On exit: report stats and, if anything was deleted, persist a fresh snapshot so a
    // crash does not resurrect the tombstoned entries on the next slog replay.
    const auto _ = MakeGuard([&] {
        if (Ctx.Settings.Verbose)
        {
            ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", m_BucketDir, Stats.CheckedCount, Stats.FoundCount, Stats.DeletedCount, NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
        }
        if (Stats.DeletedCount > 0)
        {
            // Skip the snapshot if another flush is already in progress
            bool Expected = false;
            if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
            {
                return;
            }
            auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
            try
            {
                SaveSnapshot([]() { return 0; });
            }
            catch (const std::exception& Ex)
            {
                ZEN_WARN("Failed to write index and manifest after RemoveExpiredData in '{}'. Reason: '{}'", m_BucketDir, Ex.what());
            }
        }
    });
    const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count();
    std::vector         ExpiredEntries;
    std::vector>        ExpiredStandaloneKeys;
    uint64_t            RemovedStandaloneSize = 0;
    {
        RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
        if (Ctx.IsCancelledFlag.load())
        {
            return nullptr;
        }
        if (m_Index.empty())
        {
            return nullptr;
        }
        TotalEntries = m_Index.size();
        // Find out expired keys
        for (const auto& Entry : m_Index)
        {
            const IoHash& Key        = Entry.first;
            PayloadIndex  EntryIndex = Entry.second;
            GcClock::Tick AccessTime = m_AccessTimes[EntryIndex];
            if (AccessTime >= ExpireTicks)
            {
                continue;
            }
            const BucketPayload& Payload      = m_Payloads[EntryIndex];
            DiskIndexEntry       ExpiredEntry = {.Key = Key, .Location = Payload.Location};
            ExpiredEntry.Location.Flags |= DiskLocation::kTombStone;
            if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
            {
                // Standalone files are always eligible; remember size for accounting
                ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()});
                RemovedStandaloneSize += Payload.Location.Size();
                ExpiredEntries.push_back(ExpiredEntry);
            }
            else if (Ctx.Settings.CollectSmallObjects)
            {
                // Inline (block-store) entries are only expired when small-object collection is enabled
                ExpiredEntries.push_back(ExpiredEntry);
            }
        }
        Stats.CheckedCount += TotalEntries;
        Stats.FoundCount += ExpiredEntries.size();
        if (Ctx.IsCancelledFlag.load())
        {
            return nullptr;
        }
        if (Ctx.Settings.IsDeleteMode)
        {
            // Drop the expired entries from the in-memory state and append their
            // tombstones to the slog (still under the exclusive index lock)
            for (const DiskIndexEntry& Entry : ExpiredEntries)
            {
                auto It = m_Index.find(Entry.Key);
                ZEN_ASSERT(It != m_Index.end());
                BucketPayload& Payload = m_Payloads[It->second];
                RemoveMetaData(IndexLock, Payload);
                Stats.FreedMemory += RemoveMemCachedData(IndexLock, Payload);
                m_Index.erase(It);
                Stats.DeletedCount++;
            }
            m_SlogFile.Append(ExpiredEntries);
            m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed);
        }
    }
    if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty())
    {
        // Rebuild the dense payload/access-time/metadata arrays so the erased entries
        // no longer occupy slots (the old vectors are swapped out and destroyed here)
        std::vector Payloads;
        std::vector AccessTimes;
        std::vector MetaDatas;
        std::vector MemCachedPayloads;
        IndexMap    Index;
        {
            RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
            CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index);
        }
    }
    if (Ctx.IsCancelledFlag.load())
    {
        return nullptr;
    }
    return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
}

// Reads the cached per-block attachment metadata (if any) and appends the attachment
// hashes for the requested chunks to OutReferences. Returns false when the block has no
// usable metadata payload, in which case the caller falls back to reading the chunks.
bool
ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockIndex, std::span InlineKeys, std::span ChunkIndexes, std::vector& OutReferences) const
{
    ZEN_TRACE_CPU("Z$::Bucket::ReadAttachmentsFromMetaData");
    IoBuffer MetaDataPayload = m_BlockStore.GetMetaData(BlockIndex);
    if (MetaDataPayload)
    {
        // Only the keys for the chunks we were asked about are of interest
        tsl::robin_set WantedKeys;
        WantedKeys.reserve(ChunkIndexes.size());
        for (const size_t ChunkIndex : ChunkIndexes)
        {
            WantedKeys.insert(InlineKeys[ChunkIndex]);
        }
        ZEN_TRACE_CPU("Z$::Bucket::GetAttachmentsFromMetaData");
        return GetAttachmentsFromMetaData(
            MetaDataPayload,
            BlockMetaDataExpectedMagic,
            [&](std::span Keys, std::span AttachmentCounts, std::span Attachments) {
                // Copy only the attachment runs belonging to wanted keys; the write
                // iterator compacts them in place at the end of OutReferences.
                auto AttachmentReadIt = Attachments.begin();
                OutReferences.resize(OutReferences.size() + Attachments.size());
                auto OutReferencesWriteIt = OutReferences.end() - Attachments.size();
                auto KeyIt                = Keys.begin();
                for (uint32_t AttachmentCount : AttachmentCounts)
                {
                    if (AttachmentCount > 0)
                    {
                        if
(WantedKeys.contains(*KeyIt)) { for (uint32_t It = 0u; It < AttachmentCount; It++) { *OutReferencesWriteIt++ = *AttachmentReadIt++; } } else { AttachmentReadIt += AttachmentCount; } } KeyIt++; } OutReferences.erase(OutReferencesWriteIt, OutReferences.end()); }); } return false; } bool ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector& OutReferences) { ZEN_TRACE_CPU("Z$::Bucket::GetReferencesLocked"); auto Log = [&Ctx]() { return Ctx.Logger; }; auto GetAttachments = [&](MemoryView Data) -> bool { if (ValidateCompactBinary(Data, CbValidateMode::Default) == CbValidateError::None) { CbObjectView Obj(Data.GetData()); Obj.IterateAttachments([&](CbFieldView Field) { OutReferences.emplace_back(Field.AsAttachment()); }); return true; } return false; }; std::vector> StandaloneKeys; { std::vector InlineKeys; std::vector InlineLocations; std::vector> InlineBlockChunkIndexes; { std::unordered_map BlockIndexToChunkIndexes; for (const auto& Entry : m_Index) { if (Ctx.IsCancelledFlag.load()) { return false; } PayloadIndex EntryIndex = Entry.second; const BucketPayload& Payload = m_Payloads[EntryIndex]; const DiskLocation& Loc = Payload.Location; if (!Loc.IsFlagSet(DiskLocation::kStructured)) { continue; } const IoHash& Key = Entry.first; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { StandaloneKeys.push_back(std::make_pair(Key, Loc)); continue; } BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = InlineLocations.size(); InlineLocations.push_back(ChunkLocation); InlineKeys.push_back(Key); if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) { InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); } else { BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); InlineBlockChunkIndexes.emplace_back(std::vector{ChunkIndex}); } } } for (std::vector ChunkIndexes : InlineBlockChunkIndexes) { 
ZEN_ASSERT(!ChunkIndexes.empty()); uint32_t BlockIndex = InlineLocations[ChunkIndexes[0]].BlockIndex; if (!m_Configuration.StoreAttachmentMetaData || !ReadAttachmentsFromMetaData(BlockIndex, InlineKeys, ChunkIndexes, OutReferences)) { std::vector Keys; std::vector AttachmentCounts; size_t PrecachedReferencesStart = OutReferences.size(); size_t NextPrecachedReferencesStart = PrecachedReferencesStart; bool WriteMetaData = m_Configuration.StoreAttachmentMetaData && !m_BlockStore.IsWriting(BlockIndex); if (WriteMetaData) { Keys.reserve(InlineLocations.size()); } auto CaptureAttachments = [&](size_t ChunkIndex, MemoryView Data) { if (GetAttachments(Data)) { size_t AttachmentCount = OutReferences.size() - NextPrecachedReferencesStart; if (WriteMetaData && AttachmentCount > 0) { Keys.push_back(InlineKeys[ChunkIndex]); AttachmentCounts.push_back(gsl::narrow(AttachmentCount)); NextPrecachedReferencesStart += AttachmentCount; } } }; bool Continue = m_BlockStore.IterateBlock( InlineLocations, ChunkIndexes, [&](size_t ChunkIndex, const void* Data, uint64_t Size) { ZEN_UNUSED(ChunkIndex); CaptureAttachments(ChunkIndex, MemoryView(Data, Size)); return !Ctx.IsCancelledFlag.load(); }, [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { ZEN_UNUSED(ChunkIndex); CaptureAttachments(ChunkIndex, File.GetChunk(Offset, Size).GetView()); return !Ctx.IsCancelledFlag.load(); }); if (Continue) { if (WriteMetaData) { ZEN_ASSERT(Keys.size() == AttachmentCounts.size()); IoBuffer MetaDataPayload = BuildReferenceMetaData( BlockMetaDataExpectedMagic, Keys, AttachmentCounts, std::span(OutReferences) .subspan(PrecachedReferencesStart, OutReferences.size() - PrecachedReferencesStart)) .Flatten() .AsIoBuffer(); m_BlockStore.SetMetaData(BlockIndex, MetaDataPayload); } } else { return false; } } if (Ctx.IsCancelledFlag.load()) { return false; } } } for (const auto& It : StandaloneKeys) { if (Ctx.IsCancelledFlag.load()) { return false; } IoBuffer Buffer = 
GetStandaloneCacheValue(It.second, It.first); if (Buffer) { GetAttachments(Buffer.GetView()); } } return true; } class DiskBucketReferenceChecker : public GcReferenceChecker { using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; using CacheBucket = ZenCacheDiskLayer::CacheBucket; public: DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {} virtual ~DiskBucketReferenceChecker() { try { m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } catch (const std::exception& Ex) { ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what()); } } virtual std::string GetGcName(GcCtx& Ctx) override { return m_CacheBucket.GetGcName(Ctx); } virtual void PreCache(GcCtx& Ctx) override { ZEN_TRACE_CPU("Z$::Bucket::PreCache"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique(); }); RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); bool Continue = m_CacheBucket.GetReferencesLocked(Ctx, m_References); IndexLock.ReleaseNow(); if (!Continue) { m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } } virtual void UpdateLockedState(GcCtx& Ctx) override { ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); if 
(Ctx.IsCancelledFlag.load()) { m_References = {}; m_CacheBucket.m_TrackedReferences.reset(); return; } ZEN_ASSERT(m_CacheBucket.m_TrackedReferences); HashSet& AddedReferences(*m_CacheBucket.m_TrackedReferences); m_References.reserve(m_References.size() + AddedReferences.size()); m_References.insert(m_References.end(), AddedReferences.begin(), AddedReferences.end()); AddedReferences = {}; } virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); auto Log = [&Ctx]() { return Ctx.Logger; }; size_t InitialCount = IoCids.size(); Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", m_CacheBucket.m_BucketDir, InitialCount - IoCids.size(), InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); for (const IoHash& ReferenceHash : m_References) { if (IoCids.erase(ReferenceHash) == 1) { if (IoCids.empty()) { return; } } } } CacheBucket& m_CacheBucket; std::vector m_References; }; std::vector ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); { RwLock::SharedLockScope __(m_IndexLock); if (m_Index.empty()) { return {}; } } return {new DiskBucketReferenceChecker(*this)}; } void ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, std::vector& Payloads, std::vector& AccessTimes, std::vector& MetaDatas, std::vector& MemCachedPayloads, IndexMap& Index) { ZEN_TRACE_CPU("Z$::Bucket::CompactState"); size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); 
Index.reserve(EntryCount);
	Index.min_load_factor(IndexMinLoadFactor);
	Index.max_load_factor(IndexMaxLoadFactor);
	// Copy each live entry into the fresh arrays, remapping the optional
	// metadata / mem-cache slots to their new dense positions.
	for (auto It : m_Index)
	{
		PayloadIndex EntryIndex = PayloadIndex(Payloads.size());
		Payloads.push_back(m_Payloads[It.second]);
		BucketPayload& Payload = Payloads.back();
		AccessTimes.push_back(m_AccessTimes[It.second]);
		if (Payload.MetaData)
		{
			MetaDatas.push_back(m_MetaDatas[Payload.MetaData]);
			Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1);
		}
		if (Payload.MemCached)
		{
			MemCachedPayloads.emplace_back(
				MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex});
			Payload.MemCached = MemCachedIndex(gsl::narrow(MemCachedPayloads.size() - 1));
		}
		Index.insert({It.first, EntryIndex});
	}
	// Swap the compacted state in. The caller's containers now hold the old
	// (fragmented) state; the free lists can simply be dropped.
	m_Index.swap(Index);
	m_Payloads.swap(Payloads);
	m_AccessTimes.swap(AccessTimes);
	m_MetaDatas.swap(MetaDatas);
	Reset(m_FreeMetaDatas);
	m_MemCachedPayloads.swap(MemCachedPayloads);
	Reset(m_FreeMemCachedPayloads);
}

// Hands the GC framework a shared lock on this bucket's index.
RwLock::SharedLockScope ZenCacheDiskLayer::CacheBucket::GetGcReferencerLock()
{
	return RwLock::SharedLockScope(m_IndexLock);
}

#if ZEN_WITH_TESTS
// Test-only backdoor: overwrites the stored access tick for HashKey so tests
// can simulate entry aging without waiting.
void ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time)
{
	GcClock::Tick			TimeTick = Time.time_since_epoch().count();
	RwLock::SharedLockScope IndexLock(m_IndexLock);
	if (auto It = m_Index.find(HashKey); It != m_Index.end())
	{
		size_t EntryIndex = It.value();
		ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
		m_AccessTimes[EntryIndex] = TimeTick;
	}
}
#endif	// ZEN_WITH_TESTS

//////////////////////////////////////////////////////////////////////////

ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
: m_Gc(Gc)
, m_JobQueue(JobQueue)
, m_RootDir(RootDir)
, m_Configuration(Config)
{
}

ZenCacheDiskLayer::~ZenCacheDiskLayer()
{
	try
	{
		{
			// Move all buckets out while holding the lock...
			RwLock::ExclusiveLockScope _(m_Lock);
			for (auto& It : m_Buckets)
			{
				m_DroppedBuckets.emplace_back(std::move(It.second));
			}
			m_Buckets.clear();
		}
		// We destroy the buckets without holding a lock since destructor calls GcManager::RemoveGcReferencer which takes an exclusive lock.
		// This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock
		m_DroppedBuckets.clear();
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("~ZenCacheDiskLayer() failed. Reason: '{}'", Ex.what());
	}
}

// Looks up the bucket named InBucket, creating and opening it on first use.
// Returns nullptr when a directory of that name exists on disk but is not a
// valid bucket.
ZenCacheDiskLayer::CacheBucket* ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
{
	ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
	const auto BucketName = std::string(InBucket);
	{
		// Fast path: bucket already known.
		RwLock::SharedLockScope SharedLock(m_Lock);
		if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
		{
			return It->second.get();
		}
	}
	// We create the bucket without holding a lock since constructor calls GcManager::AddGcReferencer which takes an exclusive lock.
	// This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock
	std::unique_ptr Bucket(
		std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
	RwLock::ExclusiveLockScope Lock(m_Lock);
	// Re-check under the exclusive lock: another thread may have won the race
	// while we were constructing.
	if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
	{
		return It->second.get();
	}
	std::filesystem::path BucketPath = m_RootDir;
	BucketPath /= BucketName;
	try
	{
		if (!Bucket->OpenOrCreate(BucketPath))
		{
			ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
			return nullptr;
		}
	}
	catch (const std::exception& Err)
	{
		ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
		throw;
	}
	CacheBucket* Result = Bucket.get();
	m_Buckets.emplace(BucketName, std::move(Bucket));
	// Record the new bucket name if update capture is active.
	m_UpdateCaptureLock.WithExclusiveLock([&]() {
		if (m_CapturedBuckets)
		{
			m_CapturedBuckets->push_back(std::string(BucketName));
		}
	});
	return Result;
}

// Fans a multi-bucket put batch out into per-bucket batch handles, each
// created lazily on first use of that bucket.
struct ZenCacheDiskLayer::PutBatchHandle
{
	PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {}

	struct BucketHandle
	{
		CacheBucket* Bucket;
CacheBucket::PutBatchHandle* Handle;
	};

	// Invokes CB for every bucket handle in the batch; never throws.
	void ForEach(const std::function& CB) noexcept
	{
		try
		{
			RwLock::SharedLockScope _(Lock);
			for (ZenCacheDiskLayer::PutBatchHandle::BucketHandle& BucketHandle : BucketHandles)
			{
				ZEN_ASSERT(BucketHandle.Bucket);
				ZEN_ASSERT(BucketHandle.Handle);
				CB(BucketHandle.Bucket, BucketHandle.Handle);
			}
		}
		catch (const std::exception& Ex)
		{
			ZEN_ERROR("Exception in ZenCacheDiskLayer::PutBatchHandle::ForEach: '{}'", Ex.what());
		}
	}

	// Returns the per-bucket batch handle for Bucket, creating it on demand.
	// Returns nullptr when the bucket declines to start a batch.
	CacheBucket::PutBatchHandle* GetHandle(CacheBucket* Bucket)
	{
		{
			// Fast path: handle already created for this bucket.
			RwLock::SharedLockScope _(Lock);
			if (auto It =
					std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
				It != BucketHandles.end())
			{
				return It->Handle;
			}
		}
		// Begin the bucket batch outside our lock, then re-check for a racing
		// creation before publishing it.
		CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
		if (NewBucketHandle == nullptr)
		{
			return nullptr;
		}
		RwLock::ExclusiveLockScope _(Lock);
		if (auto It = std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
			It != BucketHandles.end())
		{
			// Lost the race: keep the winner's handle and end ours (outside
			// the lock, since EndPutBatch may do real work).
			CacheBucket::PutBatchHandle* Result = It->Handle;
			ZEN_ASSERT(Result != nullptr);
			_.ReleaseNow();
			Bucket->EndPutBatch(NewBucketHandle);
			return Result;
		}
		BucketHandles.push_back(ZenCacheDiskLayer::PutBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
		return NewBucketHandle;
	}

	RwLock				 Lock;
	std::vector			 BucketHandles;
	std::vector&		 OutResults;
};

ZenCacheDiskLayer::PutBatchHandle* ZenCacheDiskLayer::BeginPutBatch(std::vector& OutResults)
{
	return new PutBatchHandle(OutResults);
}

// Completes every per-bucket put batch and destroys the batch object.
void ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
{
	try
	{
		ZEN_ASSERT(Batch);
		Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
		delete Batch;
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Exception in ZenCacheDiskLayer::EndPutBatch: '{}'", Ex.what());
	}
}

// Get-side twin of PutBatchHandle: same lazy creation and race resolution,
// but for per-bucket get batches.
struct ZenCacheDiskLayer::GetBatchHandle
{
	GetBatchHandle(std::vector& OutResults) : OutResults(OutResults) {}

	struct BucketHandle
	{
		CacheBucket*				 Bucket;
		CacheBucket::GetBatchHandle* Handle;
	};

	// Invokes CB for every bucket handle in the batch; never throws.
	void ForEach(const std::function& CB) noexcept
	{
		try
		{
			RwLock::SharedLockScope _(Lock);
			for (ZenCacheDiskLayer::GetBatchHandle::BucketHandle& BucketHandle : BucketHandles)
			{
				ZEN_ASSERT(BucketHandle.Bucket);
				ZEN_ASSERT(BucketHandle.Handle);
				CB(BucketHandle.Bucket, BucketHandle.Handle);
			}
		}
		catch (const std::exception& Ex)
		{
			ZEN_ERROR("Exception in ZenCacheDiskLayer::GetBatchHandle::ForEach: '{}'", Ex.what());
		}
	}

	// Returns the per-bucket batch handle for Bucket, creating it on demand.
	// Returns nullptr when BeginGetBatch fails.
	CacheBucket::GetBatchHandle* GetHandle(CacheBucket* Bucket)
	{
		{
			RwLock::SharedLockScope _(Lock);
			if (auto It =
					std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
				It != BucketHandles.end())
			{
				return It->Handle;
			}
		}
		CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->BeginGetBatch(OutResults);
		if (NewBucketHandle == nullptr)
		{
			return nullptr;
		}
		RwLock::ExclusiveLockScope _(Lock);
		if (auto It = std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
			It != BucketHandles.end())
		{
			// Lost the creation race: use the existing handle, discard ours.
			CacheBucket::GetBatchHandle* Result = It->Handle;
			ZEN_ASSERT(Result != nullptr);
			_.ReleaseNow();
			Bucket->EndGetBatch(NewBucketHandle);
			return Result;
		}
		BucketHandles.push_back(ZenCacheDiskLayer::GetBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
		return NewBucketHandle;
	}

	RwLock				 Lock;
	std::vector			 BucketHandles;
	std::vector&		 OutResults;
};

ZenCacheDiskLayer::GetBatchHandle* ZenCacheDiskLayer::BeginGetBatch(std::vector& OutResults)
{
	return new GetBatchHandle(OutResults);
}

// Completes every per-bucket get batch, opportunistically trims the memory
// cache, and destroys the batch object.
void ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept
{
	ZEN_TRACE_CPU("Z$::EndGetBatch");
	try
	{
		ZEN_ASSERT(Batch);
		Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); });
		TryMemCacheTrim();
		delete Batch;
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Exception in ZenCacheDiskLayer::EndGetBatch: '{}'", Ex.what());
	}
}

// Synchronous single-value fetch from the named bucket.
bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
	ZEN_TRACE_CPU("Z$::Get");
	if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
	{
		if (Bucket->Get(HashKey, OutValue))
		{
			TryMemCacheTrim();
			return true;
		}
	}
	return false;
}

// Batched fetch variant. NOTE(review): BatchHandle.GetHandle() can return
// nullptr (when BeginGetBatch fails) and is dereferenced unconditionally
// here -- confirm BeginGetBatch cannot fail on this path.
void ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle)
{
	ZEN_TRACE_CPU("Z$::Get(Batched)");
	if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
	{
		Bucket->Get(HashKey, *BatchHandle.GetHandle(Bucket));
	}
}

// Stores Value (with its references) under HashKey, optionally as part of a
// batch.
void ZenCacheDiskLayer::Put(std::string_view InBucket,
							const IoHash& HashKey,
							const ZenCacheValue& Value,
							std::span References,
							PutBatchHandle* OptionalBatchHandle)
{
	ZEN_TRACE_CPU("Z$::Put");
	if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
	{
		CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
		Bucket->Put(HashKey, Value, References, BucketBatchHandle);
		TryMemCacheTrim();
	}
}

// Scans the root directory for bucket subdirectories, deletes known-bad
// directories, and opens the remaining buckets in parallel on the large
// worker pool.
void ZenCacheDiskLayer::DiscoverBuckets()
{
	ZEN_TRACE_CPU("Z$::DiscoverBuckets");
	DirectoryContent DirContent;
	GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
	// Initialize buckets
	std::vector BadBucketDirectories;
	std::vector FoundBucketDirectories;
	RwLock::ExclusiveLockScope _(m_Lock);
	for (const std::filesystem::path& BucketPath : DirContent.Directories)
	{
		const std::string BucketName = PathToUtf8(BucketPath.stem());
		if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
		{
			// Already open -- nothing to do.
			continue;
		}
		if (IsKnownBadBucketName(BucketName))
		{
			BadBucketDirectories.push_back(BucketPath);
			continue;
		}
		FoundBucketDirectories.push_back(BucketPath);
		ZEN_INFO("Discovered bucket '{}'", BucketName);
	}
	// Best-effort cleanup of directories with known-bad names.
	for (const std::filesystem::path& BadBucketPath : BadBucketDirectories)
	{
		bool IsOk = false;
		try
		{
			IsOk = DeleteDirectories(BadBucketPath);
		}
		catch (const
std::exception&)
		{
		}
		if (IsOk)
		{
			ZEN_INFO("found bad bucket at '{}', deleted contents", BadBucketPath);
		}
		else
		{
			ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath);
		}
	}
	// Open the surviving bucket directories concurrently. SyncLock guards
	// m_Buckets against the worker threads while this function still holds
	// m_Lock exclusively.
	RwLock			  SyncLock;
	WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
	Latch			  WorkLatch(1);
	for (auto& BucketPath : FoundBucketDirectories)
	{
		WorkLatch.AddCount(1);
		Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() {
			auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
			const std::string BucketName = PathToUtf8(BucketPath.stem());
			try
			{
				std::unique_ptr NewBucket = std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig);
				CacheBucket* Bucket = nullptr;
				{
					RwLock::ExclusiveLockScope __(SyncLock);
					auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
					Bucket = InsertResult.first->second.get();
				}
				ZEN_ASSERT(Bucket);
				if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
				{
					ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
					{
						// Open failed: take the bucket out again.
						RwLock::ExclusiveLockScope __(SyncLock);
						m_Buckets.erase(BucketName);
					}
				}
			}
			catch (const std::exception& Err)
			{
				ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
				return;
			}
		});
	}
	// Drop the latch's initial count and wait for all workers to finish.
	WorkLatch.CountDown();
	WorkLatch.Wait();
}

// Drops the named bucket, moving it to m_DroppedBuckets so its destructor
// runs later outside m_Lock, and removes its directory from disk.
bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
	ZEN_TRACE_CPU("Z$::DropBucket");
	RwLock::ExclusiveLockScope _(m_Lock);
	auto It = m_Buckets.find(std::string(InBucket));
	if (It != m_Buckets.end())
	{
		CacheBucket& Bucket = *It->second;
		m_DroppedBuckets.push_back(std::move(It->second));
		m_Buckets.erase(It);
		return Bucket.Drop();
	}
	// Make sure we remove the folder even if we don't know about the bucket
	std::filesystem::path BucketPath = m_RootDir;
	BucketPath /= std::string(InBucket);
	return MoveAndDeleteDirectory(BucketPath);
}

// Drops every bucket, then removes the whole root directory.
bool ZenCacheDiskLayer::Drop()
{
	ZEN_TRACE_CPU("Z$::Drop");
	RwLock::ExclusiveLockScope _(m_Lock);
	// NOTE(review): Buckets is reserved but never used below -- looks like
	// leftover scaffolding that could be removed.
	std::vector> Buckets;
	Buckets.reserve(m_Buckets.size());
	while (!m_Buckets.empty())
	{
		const auto&	 It = m_Buckets.begin();
		CacheBucket& Bucket = *It->second;
		m_DroppedBuckets.push_back(std::move(It->second));
		m_Buckets.erase(It->first);
		if (!Bucket.Drop())
		{
			return false;
		}
	}
	return MoveAndDeleteDirectory(m_RootDir);
}

// Flushes every bucket to disk in parallel on the medium worker pool.
void ZenCacheDiskLayer::Flush()
{
	ZEN_TRACE_CPU("Z$::Flush");
	std::vector Buckets;
	Stopwatch	Timer;
	const auto _ = MakeGuard([&] {
		if (Buckets.empty())
		{
			return;
		}
		ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
	});
	{
		// Snapshot the bucket pointers so m_Lock is not held during the
		// flushes themselves.
		RwLock::SharedLockScope __(m_Lock);
		Buckets.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
			CacheBucket* Bucket = Kv.second.get();
			Buckets.push_back(Bucket);
		}
	}
	{
		WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
		Latch			  WorkLatch(1);
		try
		{
			for (auto& Bucket : Buckets)
			{
				WorkLatch.AddCount(1);
				Pool.ScheduleWork([&WorkLatch, Bucket]() {
					auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
					try
					{
						Bucket->Flush();
					}
					catch (const std::exception& Ex)
					{
						ZEN_ERROR("Failed flushing bucket. Reason: '{}'", Ex.what());
					}
				});
			}
		}
		catch (const std::exception& Ex)
		{
			ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
		}
		// Drop the initial latch count, then wait -- logging periodically so a
		// stuck flush is visible.
		WorkLatch.CountDown();
		while (!WorkLatch.Wait(1000))
		{
			ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
		}
	}
}

// Runs a storage scrub across all buckets via the scrub context's thread
// pool; waits for all tasks, then re-walks the futures so stored exceptions
// propagate.
void ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
	ZEN_TRACE_CPU("Z$::ScrubStorage");
	RwLock::SharedLockScope _(m_Lock);
	{
		std::vector> Results;
		Results.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
#if 1
			Results.push_back(Ctx.ThreadPool().EnqueueTask(
				std::packaged_task{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
#else
			CacheBucket& Bucket = *Kv.second;
			Bucket.ScrubStorage(Ctx);
#endif
		}
		for (auto& Result : Results)
		{
			if (Result.valid())
			{
				Result.wait();
			}
		}
		for (auto& Result : Results)
		{
			Result.get();
		}
	}
}

// Collects GC references from every bucket. Bucket pointers are snapshotted
// so m_Lock is not held during the per-bucket work.
void ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
{
	ZEN_TRACE_CPU("Z$::GatherReferences");
	std::vector Buckets;
	{
		RwLock::SharedLockScope _(m_Lock);
		Buckets.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
			Buckets.push_back(Kv.second.get());
		}
	}
	for (CacheBucket* Bucket : Buckets)
	{
		Bucket->GatherReferences(GcCtx);
	}
}

// Sums disk and memory footprint over all buckets.
GcStorageSize ZenCacheDiskLayer::StorageSize() const
{
	GcStorageSize			StorageSize{};
	RwLock::SharedLockScope _(m_Lock);
	for (auto& Kv : m_Buckets)
	{
		GcStorageSize BucketSize = Kv.second->StorageSize();
		StorageSize.DiskSize += BucketSize.DiskSize;
		StorageSize.MemorySize += BucketSize.MemorySize;
	}
	return StorageSize;
}

// Aggregate layer statistics plus a per-bucket breakdown.
ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const
{
	GcStorageSize				 Size = StorageSize();
	ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
	{
		RwLock::SharedLockScope _(m_Lock);
		Stats.BucketStats.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
			Stats.BucketStats.emplace_back(NamedBucketStats{.BucketName = Kv.first, .Stats = Kv.second->Stats()});
		}
	}
	return Stats;
}

// Snapshot of the layer's root dir, configuration, bucket names, entry count
// and total footprint.
ZenCacheDiskLayer::Info ZenCacheDiskLayer::GetInfo() const
{
	ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration};
	{
RwLock::SharedLockScope _(m_Lock);
		Info.BucketNames.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
			Info.BucketNames.push_back(Kv.first);
			Info.EntryCount += Kv.second->EntryCount();
			GcStorageSize BucketSize = Kv.second->StorageSize();
			Info.StorageSize.DiskSize += BucketSize.DiskSize;
			Info.StorageSize.MemorySize += BucketSize.MemorySize;
		}
	}
	return Info;
}

// Entry count and footprint for a single bucket; empty optional if unknown.
std::optional ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
{
	RwLock::SharedLockScope _(m_Lock);
	if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
	{
		return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()};
	}
	return {};
}

// Invokes Fn for the contents of the named bucket; no-op if it is unknown.
void ZenCacheDiskLayer::EnumerateBucketContents(std::string_view Bucket, std::function& Fn) const
{
	RwLock::SharedLockScope _(m_Lock);
	if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
	{
		It->second->EnumerateBucketContents(Fn);
	}
}

// Detailed value listing across all buckets, or one bucket when BucketFilter
// is non-empty; ValueFilter is forwarded to each bucket.
CacheValueDetails::NamespaceDetails ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter,
																	   const std::string_view ValueFilter) const
{
	CacheValueDetails::NamespaceDetails Details;
	{
		RwLock::SharedLockScope IndexLock(m_Lock);
		if (BucketFilter.empty())
		{
			// NOTE(review): BucketFilter.empty() is always true on this
			// branch, so the ternary always yields m_Buckets.size().
			Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1);
			for (auto& Kv : m_Buckets)
			{
				Details.Buckets[Kv.first] = Kv.second->GetValueDetails(IndexLock, ValueFilter);
			}
		}
		else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end())
		{
			Details.Buckets[It->first] = It->second->GetValueDetails(IndexLock, ValueFilter);
		}
	}
	return Details;
}

// Hands the GC the layer lock plus every bucket's index lock (all shared).
std::vector ZenCacheDiskLayer::GetGcReferencerLocks()
{
	std::vector Locks;
	Locks.emplace_back(RwLock::SharedLockScope(m_Lock));
	for (auto& Kv : m_Buckets)
	{
		Locks.emplace_back(Kv.second->GetGcReferencerLock());
	}
	return Locks;
}

// Reference-counted enable of bucket-update capture; the captured names are
// read back via GetCapturedBuckets().
void ZenCacheDiskLayer::EnableUpdateCapture()
{
	m_UpdateCaptureLock.WithExclusiveLock([&]() {
		if (m_UpdateCaptureRefCounter == 0)
		{
			ZEN_ASSERT(!m_CapturedBuckets);
			m_CapturedBuckets = std::make_unique>();
		}
		else
		{
			ZEN_ASSERT(m_CapturedBuckets);
		}
		m_UpdateCaptureRefCounter++;
	});
}

// Reference-counted disable; the capture list is destroyed when the last
// enabler goes away.
void ZenCacheDiskLayer::DisableUpdateCapture()
{
	m_UpdateCaptureLock.WithExclusiveLock([&]() {
		ZEN_ASSERT(m_CapturedBuckets);
		ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
		m_UpdateCaptureRefCounter--;
		if (m_UpdateCaptureRefCounter == 0)
		{
			m_CapturedBuckets.reset();
		}
	});
}

// Copy of the bucket names recorded since capture was enabled (empty when
// capture is off).
std::vector ZenCacheDiskLayer::GetCapturedBuckets()
{
	RwLock::SharedLockScope _(m_UpdateCaptureLock);
	if (m_CapturedBuckets)
	{
		return *m_CapturedBuckets;
	}
	return {};
}

// Schedules an asynchronous memory-cache trim. At most one trim is in flight
// at a time (m_IsMemCacheTrimming CAS). The job builds a per-access-age usage
// histogram over UsageSlotCount slots, finds the cut-off slot at which the
// accumulated size reaches MemCacheTargetFootprintBytes, and evicts entries
// older than the corresponding time.
void ZenCacheDiskLayer::MemCacheTrim()
{
	ZEN_TRACE_CPU("Z$::MemCacheTrim");
	ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0);
	ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0);
	ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0);
	bool Expected = false;
	if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
	{
		// A trim is already in flight.
		return;
	}
	try
	{
		m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) {
			ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
			const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
			uint64_t				   TrimmedSize = 0;
			Stopwatch				   Timer;
			const auto Guard = MakeGuard([&] {
				ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
						 NiceBytes(TrimmedSize),
						 NiceBytes(m_TotalMemCachedSize),
						 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
				// Re-arm the next-trim tick and clear the in-flight flag.
				const GcClock::Tick NowTick = GcClock::TickCount();
				const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
				m_NextAllowedTrimTick.store(NextTrimTick);
				m_IsMemCacheTrimming.store(false);
			});
			const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
			static const size_t		   UsageSlotCount = 2048;
			std::vector				   UsageSlots;
			UsageSlots.reserve(UsageSlotCount);
			std::vector Buckets;
			{
				// Snapshot bucket pointers without holding m_Lock for the scan.
				RwLock::SharedLockScope __(m_Lock);
				Buckets.reserve(m_Buckets.size());
				for (auto& Kv : m_Buckets)
				{
					Buckets.push_back(Kv.second.get());
				}
			}
			const GcClock::TimePoint Now = GcClock::Now();
			{
				ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
				for (CacheBucket* Bucket : Buckets)
				{
					Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
				}
			}
			// Accumulate slot sizes; once the total reaches the target, expire
			// everything older than the time mapped to that slot index.
			uint64_t TotalSize = 0;
			for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
			{
				TotalSize += UsageSlots[Index];
				if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
				{
					GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
					TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
					break;
				}
			}
		});
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
		m_IsMemCacheTrimming.store(false);
	}
}

// Trims mem-cached data older than ExpireTime from the given buckets and
// pushes m_NextAllowedTrimTick forward. Returns the number of bytes trimmed.
uint64_t ZenCacheDiskLayer::MemCacheTrim(std::vector& Buckets, GcClock::TimePoint ExpireTime)
{
	if (m_Configuration.MemCacheTargetFootprintBytes == 0)
	{
		// Trimming disabled by configuration.
		return 0;
	}
	uint64_t TrimmedSize = 0;
	for (CacheBucket* Bucket : Buckets)
	{
		TrimmedSize += Bucket->MemCacheTrim(ExpireTime);
	}
	const GcClock::TimePoint   Now = GcClock::Now();
	const GcClock::Tick		   NowTick = Now.time_since_epoch().count();
	const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
	GcClock::Tick			   LastTrimTick = m_NextAllowedTrimTick;
	const GcClock::Tick		   NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
	// CAS so a concurrent writer of the next-trim tick is not clobbered.
	m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
	return TrimmedSize;
}

#if ZEN_WITH_TESTS
// Test-only: forwards to the bucket-level SetAccessTime backdoor for the
// named bucket (silently ignores unknown buckets).
void ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time)
{
	const auto	 BucketName = std::string(InBucket);
	CacheBucket* Bucket = nullptr;
	{
		RwLock::SharedLockScope _(m_Lock);
		auto It = m_Buckets.find(BucketName);
		if (It != m_Buckets.end())
		{
			Bucket = It->second.get();
		}
	}
	if (Bucket == nullptr)
	{
		return;
	}
	Bucket->SetAccessTime(HashKey, Time);
}
#endif	// ZEN_WITH_TESTS

}  // namespace zen