// Copyright Epic Games, Inc. All Rights Reserved. #include "zenstore/cache/cachedisklayer.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// #include namespace zen { const FLLMTag& GetCacheDiskTag() { static FLLMTag _("disk", FLLMTag("cache")); return _; } namespace cache::impl { #pragma pack(push) #pragma pack(1) struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t Version2 = 2; static constexpr uint32_t CurrentVersion = Version2; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t PayloadAlignment = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } bool IsValid() const { if (Magic != ExpectedMagic) { return false; } if (Checksum != ComputeChecksum(*this)) { return false; } if (PayloadAlignment == 0) { return false; } return true; } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); struct BucketMetaHeader { static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta'; static constexpr uint32_t Version2 = 2; static constexpr uint32_t CurrentVersion = Version2; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t Padding = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const BucketMetaHeader& Header) { return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA); } bool IsValid() const { if (Magic != ExpectedMagic) { return false; } if (Checksum != ComputeChecksum(*this)) { return false; } if (Padding != 0) { return false; } return true; } }; static_assert(sizeof(BucketMetaHeader) == 
32); static constexpr uint32_t BlockMetaDataExpectedMagic = 0x61'74'6d'62; // 'bmta'; #pragma pack(pop) ////////////////////////////////////////////////////////////////////////// template void Reset(T& V) { T Tmp; V.swap(Tmp); } const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; const char* MetaExtension = ".meta"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + MetaExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + LogExtension); } std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { ZEN_UNUSED(BucketName); return BucketDir / "zen_manifest"; } bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if (Entry.Location.Reserved != 0) { OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } if (Entry.Location.GetFlags() & ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); return false; } if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) { return true; } if (Entry.Location.Reserved != 0) { OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } uint64_t Size = Entry.Location.Size(); if (Size == 0) { OutReason = fmt::format("Invalid size {} for entry 
{}", Size, Entry.Key.ToHexString()); return false; } return true; } std::filesystem::path MoveDroppedDirectory(const std::filesystem::path& Dir) { int DropIndex = 0; do { if (!IsDir(Dir)) { return {}; } std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; if (IsDir(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; RenameDirectory(Dir, DroppedBucketPath, Ec); if (!Ec) { return DroppedBucketPath; } zen::Sleep(100); } while (DropIndex < 10); return {}; } } // namespace cache::impl namespace fs = std::filesystem; using namespace std::literals; } // namespace zen namespace zen::cache::impl { class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData; using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; public: // We use this to indicate if a on disk bucket needs wiping // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references // to block items. 
	// See: https://github.com/EpicGames/zen/pull/299
	static inline const uint32_t CurrentDiskBucketVersion = 1;

	// Loads the manifest object from disk; returns false if missing or invalid.
	bool Open(std::filesystem::path ManifestPath)
	{
		Manifest = LoadCompactBinaryObject(ManifestPath);
		return !!Manifest;
	}

	Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); }

	// Returns true iff the stored version matches CurrentDiskBucketVersion; the stored
	// version (0 if absent) is always written to OutVersion.
	bool IsCurrentVersion(uint32_t& OutVersion) const
	{
		OutVersion = Manifest["Version"sv].AsUInt32(0);
		return OutVersion == CurrentDiskBucketVersion;
	}

	// Populates AccessTimes/Payloads metadata from the loaded manifest (or delegates to
	// ReadSidecarFile when the manifest says "UsingMetaFile").
	void ParseManifest(RwLock::ExclusiveLockScope& BucketLock, ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path ManifestPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads);

	Oid GenerateNewManifest(std::filesystem::path ManifestPath);
	IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount);

	// Size in bytes the sidecar file will occupy for the entry count captured by
	// MakeSidecarManifest.
	uint64_t GetSidecarSize() const { return sizeof(BucketMetaHeader) + m_ManifestEntryCount * sizeof(ManifestData); }

	void WriteSidecarFile(RwLock::SharedLockScope& BucketLock, const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, const std::vector& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas);
	bool ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path SidecarPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads);
	IoBuffer MakeManifest(const Oid& BucketId, ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, std::vector&& AccessTimes, std::vector&& Payloads, std::vector&& MetaDatas);

	CbObject Manifest;

private:
	// Reads and fully validates a compact binary object; returns an empty object on any
	// read or validation failure.
	CbObject LoadCompactBinaryObject(const fs::path& Path)
	{
		FileContents Result = ReadFile(Path);
		if (!Result.ErrorCode)
		{
			IoBuffer Buffer = Result.Flatten();
			if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
			{
				return zen::LoadCompactBinaryObject(Buffer);
			}
		}
		return CbObject();
	}

	// Entry count captured by MakeSidecarManifest; drives GetSidecarSize/WriteSidecarFile.
	uint64_t m_ManifestEntryCount = 0;

#pragma pack(push)
#pragma pack(4)
	// Fixed-size on-disk record of the .meta sidecar file (one per cache entry).
	struct ManifestData
	{
		uint32_t RawSize;			// 4
		uint32_t SecondsSinceEpoch;	// 4
		IoHash	 RawHash;			// 20
		IoHash	 Key;				// 20
	};
#pragma pack(pop)
	static_assert(sizeof(ManifestData) == 48);
};

// Restores access times and raw-hash/raw-size metadata for entries already present in
// Index. Keys absent from Index are skipped (their parallel array slots are ignored).
void BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& BucketLock, ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path ManifestPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads)
{
	if (Manifest["UsingMetaFile"sv].AsBool())
	{
		// Newer format: metadata lives in the .meta sidecar file, not in the manifest.
		ReadSidecarFile(BucketLock, Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads);
		return;
	}
	ZEN_TRACE_CPU("Z$::ParseManifest");
	Stopwatch  Timer;
	const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
	const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
	// Resolve each manifest key to its payload index up front; the parallel arrays below
	// ("Timestamps", "RawHash", "RawSize") are indexed in the same order as "Keys".
	std::vector KeysIndexes;
	KeysIndexes.reserve(Count);
	CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
	for (CbFieldView& KeyView : KeyArray)
	{
		if (auto It = Index.find(KeyView.AsHash()); It != Index.end())
		{
			KeysIndexes.push_back(It.value());
		}
		else
		{
			KeysIndexes.push_back(PayloadIndex());
		}
	}
	size_t		KeyIndexOffset = 0;
	CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
	for (CbFieldView& TimeStampView : TimeStampArray)
	{
		const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
		if (KeyIndex)
		{
			AccessTimes[KeyIndex] = TimeStampView.AsInt64();
		}
	}
	KeyIndexOffset			 = 0;
	CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
	CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
	if (RawHashArray.Num() == RawSizeArray.Num())
	{
		auto RawHashIt = RawHashArray.CreateViewIterator();
		auto RawSizeIt = RawSizeArray.CreateViewIterator();
		while (RawHashIt != CbFieldViewIterator())
		{
			const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
			if (KeyIndex)
			{
				uint64_t RawSize = RawSizeIt.AsUInt64();
				IoHash	 RawHash = RawHashIt.AsHash();
				// All-zero pairs mean "no metadata recorded"; oversized values are dropped.
				if ((RawSize != 0 || RawHash != IoHash::Zero) && RawSize <= std::numeric_limits::max())
				{
					BucketPayload& Payload = Payloads[KeyIndex];
					Bucket.SetMetaData(BucketLock, Payload, BucketMetaData{.RawSize = static_cast(RawSize), .RawHash = RawHash});
				}
			}
			RawHashIt++;
			RawSizeIt++;
		}
	}
	else
	{
		ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
	}
}

// Creates a fresh manifest with a newly generated bucket id, writes it to disk and
// returns the id.
Oid BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath)
{
	const Oid	   BucketId = Oid::NewOid();
	CbObjectWriter Writer;
	Writer << "BucketId"sv << BucketId;
	Writer << "Version"sv << CurrentDiskBucketVersion;
	Manifest = Writer.Save();
	TemporaryFile::SafeWriteFile(ManifestPath, Manifest.GetBuffer().GetView());
	return BucketId;
}

// Builds a full (legacy, non-sidecar) manifest with keys, timestamps and metadata encoded
// as parallel compact binary arrays.
IoBuffer BucketManifestSerializer::MakeManifest(const Oid& BucketId, ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, std::vector&& AccessTimes, std::vector&& Payloads, std::vector&& MetaDatas)
{
	using namespace std::literals;
	ZEN_TRACE_CPU("Z$::MakeManifest");
	size_t ItemCount = Index.size();
	// This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth
	// And we don't need to reallocate the underlying buffer in almost every case
	const size_t EstimatedSizePerItem = 54u;
	const size_t ReserveSize		  = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128);
	CbObjectWriter Writer(ReserveSize);
	Writer << "BucketId"sv << BucketId;
	Writer << "Version"sv << CurrentDiskBucketVersion;
	if (!Index.empty())
	{
		Writer.AddInteger("Count"sv, gsl::narrow(Index.size()));
		Writer.BeginArray("Keys"sv);
		for (auto& Kv : Index)
		{
			const IoHash& Key = Kv.first;
			Writer.AddHash(Key);
		}
		Writer.EndArray();
		Writer.BeginArray("Timestamps"sv);
		for (auto& Kv : Index)
		{
			GcClock::Tick AccessTime = AccessTimes[Kv.second];
			Writer.AddInteger(AccessTime);
		}
		Writer.EndArray();
		if (!MetaDatas.empty())
		{
			// RawHash/RawSize are written in the same iteration order as "Keys"; entries
			// without metadata get zero placeholders.
			Writer.BeginArray("RawHash"sv);
			for (auto& Kv : Index)
			{
				const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
				if (Payload.MetaData)
				{
					Writer.AddHash(MetaDatas[Payload.MetaData].RawHash);
				}
				else
				{
					Writer.AddHash(IoHash::Zero);
				}
			}
			Writer.EndArray();
			Writer.BeginArray("RawSize"sv);
			for (auto& Kv : Index)
			{
				const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
				if (Payload.MetaData)
				{
					Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize);
				}
				else
				{
					Writer.AddInteger(0);
				}
			}
			Writer.EndArray();
		}
	}
	Manifest = Writer.Save();
	return Manifest.GetBuffer().AsIoBuffer();
}

// Builds the small manifest used with the sidecar format and records EntryCount for the
// subsequent WriteSidecarFile/GetSidecarSize calls.
IoBuffer BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount)
{
	m_ManifestEntryCount = EntryCount;
	CbObjectWriter Writer;
	Writer << "BucketId"sv << BucketId;
	Writer << "Version"sv << CurrentDiskBucketVersion;
	Writer << "Count"sv << EntryCount;
	Writer << "UsingMetaFile"sv << true;
	Manifest = Writer.Save();
	return Manifest.GetBuffer().AsIoBuffer();
}

// Reads the .meta sidecar and restores access times and metadata for entries found in
// Index. Returns false (after warning) when the file is unreadable or malformed.
bool BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path SidecarPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, std::vector& Payloads)
{
	ZEN_TRACE_CPU("Z$::ReadSidecarFile");
	ZEN_ASSERT(AccessTimes.size() == Payloads.size());
	std::error_code Ec;
	BasicFile		SidecarFile;
SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec); if (Ec) { ZEN_WARN("Failed to read sidecar file '{}'. Reason: '{}'", SidecarPath, Ec.message()); return false; } uint64_t FileSize = SidecarFile.FileSize(); auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); }); if (FileSize < sizeof(BucketMetaHeader)) { ZEN_WARN("Failed to read sidecar file '{}'. Minimum size {} expected, actual size: ", SidecarPath, sizeof(BucketMetaHeader), FileSize); return false; } BasicFileBuffer Sidecar(SidecarFile, 128 * 1024); BucketMetaHeader Header; Sidecar.Read(&Header, sizeof Header, 0); if (!Header.IsValid()) { ZEN_WARN("Failed to read sidecar file '{}'. Header is invalid", SidecarPath); return false; } if (Header.Version != BucketMetaHeader::Version2) { ZEN_WARN("Failed to read sidecar file '{}'. Unsupported version: {}", SidecarPath, Header.Version); return false; } const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData); if (Header.EntryCount > ExpectedEntryCount) { ZEN_WARN( "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. 
Header count: {}, file size " "count: " "{}", SidecarPath, Header.EntryCount, ExpectedEntryCount); return false; } InvalidGuard.Dismiss(); uint64_t RemainingEntryCount = ExpectedEntryCount; uint64_t EntryCount = 0; uint64_t CurrentReadOffset = sizeof(Header); while (RemainingEntryCount--) { const ManifestData* Entry = Sidecar.MakeView(CurrentReadOffset); CurrentReadOffset += sizeof(ManifestData); if (auto It = Index.find(Entry->Key); It != Index.end()) { PayloadIndex PlIndex = It.value(); ZEN_ASSERT(size_t(PlIndex) <= Payloads.size()); ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; AccessTimes[PlIndex].SetSecondsSinceEpoch(Entry->SecondsSinceEpoch); if (Entry->RawSize && Entry->RawHash != IoHash::Zero) { Bucket.SetMetaData(BucketLock, PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); } } EntryCount++; } ZEN_ASSERT(EntryCount == ExpectedEntryCount); return true; } void BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, const std::vector& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas) { ZEN_TRACE_CPU("Z$::WriteSidecarFile"); ZEN_DEBUG("writing store sidecar for '{}'", SidecarPath); const uint64_t EntryCount = Index.size(); Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store sidecar for '{}' containing {} entries in {}", SidecarPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BucketMetaHeader Header; Header.EntryCount = m_ManifestEntryCount; Header.LogPosition = SnapshotLogPosition; Header.Checksum = Header.ComputeChecksum(Header); std::error_code Ec; TemporaryFile SidecarFile; SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath())); } SidecarFile.Write(&Header, sizeof Header, 0); { uint64_t 
		WriteOffset = sizeof Header;
		// Stream records through a bounded (~512 KiB) staging buffer so huge buckets do
		// not need one allocation per entry count.
		const size_t MaxManifestDataBufferCount = (512u * 1024u) / sizeof(ManifestData);
		std::vector	 ManifestDataBuffer(Min(m_ManifestEntryCount, MaxManifestDataBufferCount));
		auto		 WriteIt = ManifestDataBuffer.begin();
		for (auto& Kv : Index)
		{
			ManifestData&	   Data	   = *WriteIt++;
			const PayloadIndex PlIndex = Kv.second;
			Data.Key				   = Kv.first;
			Data.SecondsSinceEpoch	   = AccessTimes[PlIndex].GetSecondsSinceEpoch();
			if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData)
			{
				Data.RawHash = MetaDatas[MetaIndex].RawHash;
				Data.RawSize = MetaDatas[MetaIndex].RawSize;
			}
			else
			{
				// No metadata recorded for this entry; write zero placeholders.
				Data.RawHash = IoHash::Zero;
				Data.RawSize = 0;
			}
			if (WriteIt == ManifestDataBuffer.end())
			{
				// Buffer full: flush it and start refilling from the beginning.
				uint64_t WriteSize = std::distance(ManifestDataBuffer.begin(), WriteIt) * sizeof(ManifestData);
				SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset);
				WriteOffset += WriteSize;
				WriteIt = ManifestDataBuffer.begin();
			}
		}
		if (WriteIt != ManifestDataBuffer.begin())
		{
			// Flush the final partially-filled buffer.
			uint64_t WriteSize = std::distance(ManifestDataBuffer.begin(), WriteIt) * sizeof(ManifestData);
			SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset);
		}
	}
	SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec);
	if (Ec)
	{
		throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath));
	}
}

//////////////////////////////////////////////////////////////////////////

// Load-factor bounds applied to each bucket's in-memory index map.
static const float IndexMinLoadFactor = 0.2f;
static const float IndexMaxLoadFactor = 0.7f;

}  // namespace zen::cache::impl

//////////////////////////////////////////////////////////////////////////

namespace zen {

ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string_view BucketName, const BucketConfiguration& Config)
: m_Gc(Gc)
, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage)
, m_BucketName(BucketName)
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
	m_Index.min_load_factor(cache::impl::IndexMinLoadFactor);
m_Index.max_load_factor(cache::impl::IndexMaxLoadFactor); if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { const uint64_t LegacyOverrideSize = 16 * 1024 * 1024; // This is pretty ad hoc but in order to avoid too many individual files // it makes sense to have a different strategy for legacy values m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, LegacyOverrideSize); } else if (m_BucketName == std::string_view("iostorecompression")) { const uint64_t IoStoreDDCOverrideSize = 1024 * 1024; // This is pretty ad hoc but in order to avoid too many individual files // it makes sense to have a different strategy for ddc pak compression stores m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize); } m_Gc.AddGcReferencer(*this); } ZenCacheDiskLayer::CacheBucket::~CacheBucket() { try { m_SlogFile.Flush(); m_SlogFile.Close(); m_BlockStore.Close(); } catch (const std::exception& Ex) { ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); } m_Gc.RemoveGcReferencer(*this); } bool ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { using namespace std::literals; ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate"); ZEN_ASSERT(m_IsFlushing.load()); // We want to take the lock here since we register as a GC referencer a construction RwLock::ExclusiveLockScope IndexLock(m_IndexLock); ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); m_BlocksBasePath = BucketDir / "blocks"; m_BucketDir = BucketDir; CreateDirectories(m_BucketDir); std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName); bool IsNew = false; cache::impl::BucketManifestSerializer ManifestReader; if (ManifestReader.Open(ManifestPath)) { m_BucketId = ManifestReader.GetBucketId(); if (m_BucketId == Oid::Zero) { return false; } uint32_t Version = 0; if (ManifestReader.IsCurrentVersion(/* out */ Version) 
			== false)
		{
			// Version mismatch: treat the bucket as new, which wipes its on-disk state in
			// InitializeIndexFromDisk below.
			ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, cache::impl::BucketManifestSerializer::CurrentDiskBucketVersion);
			IsNew = true;
		}
	}
	else if (AllowCreate)
	{
		m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath);
		IsNew	   = true;
	}
	else
	{
		return false;
	}
	InitializeIndexFromDisk(IndexLock, IsNew);
	auto _ = MakeGuard([&]() {
		// We are now initialized, allow flushing when we exit
		m_IsFlushing.store(false);
	});
	if (IsNew)
	{
		return true;
	}
	ManifestReader.ParseManifest(IndexLock, *this, ManifestPath, m_Index, m_AccessTimes, m_Payloads);
	return true;
}

// Writes a .uidx snapshot of the current index (atomically via temporary file). When
// ResetLog is set the .slog file is removed and reopened. ClaimDiskReserveFunc is invoked
// to free reserved disk space when the volume looks too full. Errors are logged, not
// thrown to the caller.
void ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition, bool ResetLog, const std::function& ClaimDiskReserveFunc)
{
	ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
	if (m_LogFlushPosition == LogPosition)
	{
		// Nothing happened since the last snapshot.
		return;
	}
	ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
	const uint64_t EntryCount = m_Index.size();
	Stopwatch	   Timer;
	const auto	   _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_BucketDir, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
	namespace fs			 = std::filesystem;
	const fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
	try
	{
		const uint64_t IndexSize = sizeof(cache::impl::CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
		std::error_code Error;
		DiskSpace		Space = DiskSpaceInfo(m_BucketDir, Error);
		if (Error)
		{
			throw std::system_error(Error, fmt::format("get disk space in '{}' FAILED", m_BucketDir));
		}
		// Require the index size plus 512 KiB of headroom before writing.
		bool EnoughSpace = Space.Free >= IndexSize + 1024 * 512;
		if (!EnoughSpace)
		{
			uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
			EnoughSpace				= (Space.Free + ReclaimedSpace) >= IndexSize + 1024 * 512;
		}
		if (!EnoughSpace)
		{
			throw std::runtime_error(
				fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize)));
		}
		TemporaryFile ObjectIndexFile;
		std::error_code Ec;
ObjectIndexFile.CreateTemporary(m_BucketDir, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); } const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition; cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount, .LogPosition = IndexLogPosition, .PayloadAlignment = gsl::narrow(m_Configuration.PayloadAlignment)}; Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header); ObjectIndexFile.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0); if (EntryCount > 0) { uint64_t IndexWriteOffset = sizeof(cache::impl::CacheBucketIndexHeader); size_t MaxWriteEntryCount = (512u * 1024u) / sizeof(DiskIndexEntry); std::vector DiskEntryBuffer(Min(m_Index.size(), MaxWriteEntryCount)); auto WriteIt = DiskEntryBuffer.begin(); for (auto& Entry : m_Index) { *WriteIt++ = {.Key = Entry.first, .Location = m_Payloads[Entry.second].Location}; if (WriteIt == DiskEntryBuffer.end()) { uint64_t WriteSize = std::distance(DiskEntryBuffer.begin(), WriteIt) * sizeof(DiskIndexEntry); ObjectIndexFile.Write(DiskEntryBuffer.data(), WriteSize, IndexWriteOffset); IndexWriteOffset += WriteSize; WriteIt = DiskEntryBuffer.begin(); } } if (WriteIt != DiskEntryBuffer.begin()) { uint64_t WriteSize = std::distance(DiskEntryBuffer.begin(), WriteIt) * sizeof(DiskIndexEntry); ObjectIndexFile.Write(DiskEntryBuffer.data(), WriteSize, IndexWriteOffset); } } ObjectIndexFile.Flush(); ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", ObjectIndexFile.GetPath(), IndexPath, Ec.message())); } if (ResetLog) { const std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName); if (IsFile(LogPath)) { m_SlogFile.Close(); if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but 
in // the end it will be the same result ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); } } m_LogFlushPosition = IndexLogPosition; } catch (const std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); } } uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); if (!IsFile(IndexPath)) { return 0; } auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t FileSize = ObjectIndexFile.FileSize(); if (FileSize < sizeof(cache::impl::CacheBucketIndexHeader)) { return 0; } cache::impl::CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if (!Header.IsValid()) { return 0; } if (Header.Version != cache::impl::CacheBucketIndexHeader::Version2) { return 0; } const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(cache::impl::CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); if (Header.EntryCount > ExpectedEntryCount) { return 0; } InvalidGuard.Dismiss(); size_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_Configuration.PayloadAlignment = Header.PayloadAlignment; m_Payloads.reserve(Header.EntryCount); m_Index.reserve(Header.EntryCount); BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); uint64_t CurrentReadOffset = sizeof(cache::impl::CacheBucketIndexHeader); uint64_t RemainingEntryCount = Header.EntryCount; std::string InvalidEntryReason; while (RemainingEntryCount--) { const DiskIndexEntry* Entry = FileBuffer.MakeView(CurrentReadOffset); CurrentReadOffset += sizeof(DiskIndexEntry); if 
			(!cache::impl::ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
		{
			ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
			continue;
		}
		const PayloadIndex EntryIndex = PayloadIndex(EntryCount);
		m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location});
		m_Index.insert_or_assign(Entry->Key, EntryIndex);
		EntryCount++;
	}
	ZEN_ASSERT(EntryCount == m_Payloads.size());
	// Snapshots do not carry access times; seed every entry with "now".
	m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount()));
	OutVersion = cache::impl::CacheBucketIndexHeader::Version2;
	return Header.LogPosition;
}

// Replays the .slog file on top of the snapshot-restored index, skipping the first
// SkipEntryCount entries (those already covered by the snapshot). Returns the number of
// log entries replayed.
uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
	ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
	if (!IsFile(LogPath))
	{
		return 0;
	}
	uint64_t  LogEntryCount = 0;
	Stopwatch Timer;
	const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
	TCasLogFile CasLog;
	CasLog.Open(LogPath, CasLogFile::Mode::kRead);
	if (!CasLog.Initialize())
	{
		return 0;
	}
	const uint64_t EntryCount = CasLog.GetLogCount();
	if (EntryCount < SkipEntryCount)
	{
		// Snapshot references a position past the end of the log; replay everything.
		ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
		SkipEntryCount = 0;
	}
	LogEntryCount = EntryCount - SkipEntryCount;
	uint64_t InvalidEntryCount = 0;
	CasLog.Replay(
		[&](const DiskIndexEntry& Record) {
			std::string InvalidEntryReason;
			if (Record.Location.Flags & DiskLocation::kTombStone)
			{
				// Note: this leaves m_Payloads and other arrays with 'holes' in them
				m_Index.erase(Record.Key);
				return;
			}
			if (!cache::impl::ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
			{
				ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
				++InvalidEntryCount;
				return;
			}
			PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
			m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
			m_Index.insert_or_assign(Record.Key, EntryIndex);
		},
		SkipEntryCount);
	// Newly appended payloads (if any) get "now" as their initial access time.
	m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
	if (InvalidEntryCount)
	{
		ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
	}
	return LogEntryCount;
};

// Rebuilds all in-memory state from the on-disk snapshot + log (or wipes disk state when
// IsNew), verifies referenced blocks still exist, and tombstones entries whose blocks
// went missing. Caller holds the exclusive index lock.
void ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew)
{
	ZEN_TRACE_CPU("Z$::Bucket::Initialize");
	m_StandaloneSize = 0;
	m_Index.clear();
	m_Payloads.clear();
	m_AccessTimes.clear();
	m_MetaDatas.clear();
	m_FreeMetaDatas.clear();
	m_MemCachedPayloads.clear();
	m_FreeMemCachedPayloads.clear();
	std::filesystem::path LogPath	= cache::impl::GetLogPath(m_BucketDir, m_BucketName);
	std::filesystem::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
	if (IsNew)
	{
		RemoveFile(LogPath);
		RemoveFile(IndexPath);
		DeleteDirectories(m_BlocksBasePath);
	}
	CreateDirectories(m_BucketDir);
	m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
	if (IsFile(IndexPath))
	{
		uint32_t IndexVersion = 0;
		m_LogFlushPosition	  = ReadIndexFile(IndexLock, IndexPath, IndexVersion);
		if (IndexVersion == 0)
		{
			ZEN_WARN("removing invalid index file at '{}'", IndexPath);
			RemoveFile(IndexPath);
		}
	}
	uint64_t LogEntryCount = 0;
	if (IsFile(LogPath))
	{
		if (TCasLogFile::IsValid(LogPath))
		{
			LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition);
		}
		else if (IsFile(LogPath))
		{
			ZEN_WARN("removing invalid log at '{}'", LogPath);
			RemoveFile(LogPath);
		}
	}
	// Collect the set of block indexes the restored index references so the block store
	// can tell us which of them no longer exist on disk.
	BlockStore::BlockIndexSet KnownBlocks;
	for (const auto& Entry : m_Index)
	{
		size_t				 EntryIndex = Entry.second;
		const BucketPayload& Payload	= m_Payloads[EntryIndex];
		const DiskLocation&	 Location	= Payload.Location;
		if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
		{
			m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
		}
		else
		{
			uint32_t BlockIndex = Location.Location.BlockLocation.GetBlockIndex();
			KnownBlocks.insert(BlockIndex);
		}
	}
	BlockStore::BlockIndexSet MissingBlocks
// Tail of the bucket open/recovery routine (its head precedes this chunk):
// syncs the block store against what actually exists on disk, opens the append
// log, tombstones entries whose backing blocks are gone, compacts the in-memory
// state, and rewrites the index snapshot when anything changed.
      = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
    m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
    bool RemovedEntries = false;
    if (!MissingBlocks.empty())
    {
        std::vector MissingEntries;
        for (auto& It : m_Index)
        {
            BucketPayload& Payload  = m_Payloads[It.second];
            DiskLocation   Location = Payload.Location;
            if (!Location.IsFlagSet(DiskLocation::kStandaloneFile))
            {
                if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex()))
                {
                    // Entry lived in a block that no longer exists on disk - drop
                    // its cached memory copy and metadata before tombstoning.
                    RemoveMemCachedData(IndexLock, Payload);
                    RemoveMetaData(IndexLock, Payload);
                }
            }
            // NOTE(review): the tombstone and push_back below run for EVERY index
            // entry, not only entries whose block is missing - a single missing
            // block therefore drops the whole bucket index. Confirm this is
            // intentional (full rebuild on corruption) and not a misplaced brace.
            Location.Flags |= DiskLocation::kTombStone;
            MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location});
        }
        ZEN_ASSERT(!MissingEntries.empty());
        for (const DiskIndexEntry& Entry : MissingEntries)
        {
            m_Index.erase(Entry.Key);
        }
        // Persist the tombstones before compacting so a crash replays the removal.
        m_SlogFile.Append(MissingEntries);
        m_SlogFile.Flush();
        {
            // CompactState swaps the live arrays with freshly packed copies; the
            // locals here just receive (and discard) the old storage.
            std::vector Payloads;
            std::vector AccessTimes;
            std::vector MetaDatas;
            std::vector MemCachedPayloads;
            IndexMap    Index;
            CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index);
        }
        RemovedEntries = true;
    }
    if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries)
    {
        WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true);
    }
}

// Builds the on-disk path for a standalone cache value:
// <bucket dir>/blob/<hex 0..2>/<hex 3..4>/<hex 5..> (hex digits of the key hash).
void ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const
{
    char HexString[sizeof(HashKey.Hash) * 2];
    ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString);
    Path.Append(m_BucketDir);
    Path.AppendSeparator();
    Path.Append(L"blob");
    Path.AppendSeparator();
    Path.AppendAsciiRange(HexString, HexString + 3);
    Path.AppendSeparator();
    Path.AppendAsciiRange(HexString + 3, HexString + 5);
    Path.AppendSeparator();
    Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString));
}

// Reads a value stored inline in the block store. Returns an empty buffer on miss.
IoBuffer ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
{
    ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue");
    BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
    IoBuffer           Value    = m_BlockStore.TryGetChunk(Location);
    if (Value)
    {
        Value.SetContentType(Loc.GetContentType());
    }
    return Value;
}

// Reads a value stored as a standalone file. The file size is validated against
// the size recorded in the index; any mismatch yields an empty buffer.
IoBuffer ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const
{
    ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
    ExtendablePathBuilder<256> DataFilePath;
    BuildPath(DataFilePath, HashKey);
    // Per-hash sharded lock guards against a concurrent Put replacing the file.
    RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
    if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
    {
        if (Data.GetSize() == Loc.Size())
        {
            Data.SetContentType(Loc.GetContentType());
            return Data;
        }
    }
    return {};
}

// Accumulates buffers and keys for a batched Put; drained by EndPutBatch.
struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
    PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {}
    struct Entry
    {
        // Element [0] is the cache key hash, [1..] are referenced hashes.
        std::vector HashKeyAndReferences;
    };
    std::vector  Buffers;
    std::vector  Entries;
    // Maps each buffered entry to its slot in OutResults.
    std::vector  EntryResultIndexes;
    std::vector& OutResults;
};

ZenCacheDiskLayer::CacheBucket::PutBatchHandle* ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector& OutResults)
{
    ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch");
    return new PutBatchHandle(OutResults);
}

// Flushes a put batch: writes all buffered payloads to the block store, updates
// the in-memory index under the exclusive index lock, then appends the new
// entries to the structured log (outside the lock). noexcept: failures are
// logged rather than propagated.
void ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
{
    ZEN_TRACE_CPU("Z$::Bucket::EndPutBatch");
    try
    {
        ZEN_ASSERT(Batch);
        if (!Batch->Buffers.empty())
        {
            // Derive per-entry location flags from each buffer's content type.
            std::vector EntryFlags;
            for (const IoBuffer& Buffer : Batch->Buffers)
            {
                uint8_t Flags = 0;
                if (Buffer.GetContentType() == ZenContentType::kCbObject)
                {
                    Flags |= DiskLocation::kStructured;
                }
                else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
                {
                    Flags |= DiskLocation::kCompressed;
                }
                EntryFlags.push_back(Flags);
            }
            // WriteChunks may invoke the callback multiple times, each with the
            // locations of a contiguous run of buffers; IndexOffset tracks how
            // many buffers previous invocations consumed.
            size_t IndexOffset = 0;
            m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span Locations) {
                ZEN_MEMSCOPE(GetCacheDiskTag());
                std::vector DiskEntries;
                {
                    RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
                    for (size_t Index = 0; Index < Locations.size(); Index++)
                    {
                        DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
                        const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
                        // NOTE(review): this asserts at least one reference in
                        // addition to the key, which makes the `size() > 1` guard
                        // below redundant - confirm whether `>= 1` (key only) was
                        // the intent.
                        ZEN_ASSERT(HashKeyAndReferences.size() > 1);
                        const IoHash HashKey = HashKeyAndReferences[0];
                        DiskEntries.push_back({.Key = HashKey, .Location = Location});
                        if (m_TrackedCacheKeys)
                        {
                            m_TrackedCacheKeys->insert(HashKey);
                        }
                        if (m_TrackedReferences && HashKeyAndReferences.size() > 1)
                        {
                            m_TrackedReferences->insert(m_TrackedReferences->end(),
                                                        HashKeyAndReferences.begin() + 1,
                                                        HashKeyAndReferences.end());
                        }
                        if (auto It = m_Index.find(HashKey); It != m_Index.end())
                        {
                            // Overwrite of an existing key: release the cached
                            // memory copy and metadata BEFORE replacing the
                            // payload slot in place.
                            PayloadIndex EntryIndex = It.value();
                            ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
                            BucketPayload& Payload = m_Payloads[EntryIndex];
                            RemoveMemCachedData(IndexLock, Payload);
                            RemoveMetaData(IndexLock, Payload);
                            Payload                   = (BucketPayload{.Location = Location});
                            m_AccessTimes[EntryIndex] = GcClock::TickCount();
                        }
                        else
                        {
                            PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
                            m_Payloads.emplace_back(BucketPayload{.Location = Location});
                            m_AccessTimes.emplace_back(GcClock::TickCount());
                            m_Index.insert_or_assign(HashKey, EntryIndex);
                        }
                    }
                }
                // Log outside the index lock; replay restores the index on restart.
                m_SlogFile.Append(DiskEntries);
                for (size_t Index = 0; Index < Locations.size(); Index++)
                {
                    size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
                    ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
                    Batch->OutResults[ResultIndex] = true;
                }
                IndexOffset += Locations.size();
            });
        }
        // NOTE(review): if anything above throws, Batch leaks (deleted only on the
        // success path) - consider owning it with a unique_ptr/guard.
        delete Batch;
    }
    catch (const std::exception& Ex)
    {
        ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::EndPutBatch: '{}'", Ex.what());
    }
}

// Accumulates keys for a batched Get; drained by EndGetBatch.
struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
{
    GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults)
    {
        Keys.reserve(OutResults.capacity());
        ResultIndexes.reserve(OutResults.capacity());
    }
    ~GetBatchHandle() {}
    std::vector         Keys;
    // Maps each key to its slot in OutResults.
    std::vector         ResultIndexes;
    ZenCacheValueVec_t& OutResults;
};

ZenCacheDiskLayer::CacheBucket::GetBatchHandle*
ZenCacheDiskLayer::CacheBucket::BeginGetBatch(ZenCacheValueVec_t& OutResult)
{
    ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch");
    return new GetBatchHandle(OutResult);
}

// Resolves a get batch. One pass under the shared index lock classifies every
// key (mem-cached / inline block / standalone file / miss); bulk reads then
// service the disk-backed classes so I/O can be scheduled efficiently, and
// RawHash/RawSize metadata plus the memory cache are back-filled under short
// exclusive lock scopes. noexcept: failures are logged rather than propagated.
void ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
    ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch");
    try
    {
        ZEN_ASSERT(Batch);
        ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size());
        metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
        if (!Batch->ResultIndexes.empty())
        {
            eastl::fixed_vector StandaloneDiskLocations;
            eastl::fixed_vector StandaloneKeyIndexes;
            eastl::fixed_vector MemCachedKeyIndexes;
            eastl::fixed_vector InlineDiskLocations;
            eastl::fixed_vector InlineBlockLocations;
            eastl::fixed_vector InlineKeyIndexes;
            // Per key: do we still need to compute RawHash/RawSize metadata?
            eastl::fixed_vector FillRawHashAndRawSize(Batch->Keys.size(), false);
            {
                // Classification pass under the shared index lock only.
                RwLock::SharedLockScope IndexLock(m_IndexLock);
                for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
                {
                    const IoHash& HashKey = Batch->Keys[KeyIndex];
                    auto          It      = m_Index.find(HashKey);
                    if (It != m_Index.end())
                    {
                        size_t         ResultIndex = Batch->ResultIndexes[KeyIndex];
                        ZenCacheValue& OutValue    = Batch->OutResults[ResultIndex];
                        const PayloadIndex PayloadIdx = It.value();
                        m_AccessTimes[PayloadIdx]     = GcClock::TickCount();
                        const BucketPayload& Payload  = m_Payloads[PayloadIdx];
                        const DiskLocation&  Location = Payload.Location;
                        // Structured values never need raw metadata; empty values
                        // have nothing to hash.
                        FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
                        if (Payload.MetaData)
                        {
                            const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
                            OutValue.RawHash = MetaData.RawHash;
                            OutValue.RawSize = MetaData.RawSize;
                            FillRawHashAndRawSize[KeyIndex] = false;
                        }
                        if (Payload.MemCached)
                        {
                            OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload;
                            if (FillRawHashAndRawSize[KeyIndex])
                            {
                                // Value is in memory but metadata is missing;
                                // handled in the MemCached pass below.
                                MemCachedKeyIndexes.push_back(KeyIndex);
                            }
                            m_MemoryHitCount++;
                        }
                        else
                        {
                            if (m_Configuration.MemCacheSizeThreshold > 0)
                            {
                                m_MemoryMissCount++;
                            }
                            if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
                            {
                                StandaloneDiskLocations.push_back(Location);
                                StandaloneKeyIndexes.push_back(KeyIndex);
                            }
                            else
                            {
                                InlineDiskLocations.push_back(Location);
                                InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment));
                                InlineKeyIndexes.push_back(KeyIndex);
                            }
                        }
                    }
                    else
                    {
                        if (m_Configuration.MemCacheSizeThreshold > 0)
                        {
                            m_MemoryMissCount++;
                        }
                    }
                }
            }
            // MemCached and MetaData are set independently so we need to check if meta data has been set or if we need to set it even
            // if we found the data as memcached.
            // Often we will find the metadata due to the thread setting the mem cached part doing it before us so it is worth
            // checking if it is present once more before spending time fetching and setting the RawHash and RawSize in metadata
            auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value, bool UsesTemporaryMemory) {
                if (!Value)
                {
                    return;
                }
                const IoHash&  Key         = Batch->Keys[KeyIndex];
                size_t         ResultIndex = Batch->ResultIndexes[KeyIndex];
                ZenCacheValue& OutValue    = Batch->OutResults[ResultIndex];
                OutValue.Value = std::move(Value);
                OutValue.Value.SetContentType(Location.GetContentType());
                bool AddToMemCache = false;
                bool SetMetaInfo   = FillRawHashAndRawSize[KeyIndex];
                if (m_Configuration.MemCacheSizeThreshold > 0)
                {
                    size_t ValueSize = OutValue.Value.GetSize();
                    if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
                    {
                        OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
                        AddToMemCache  = true;
                    }
                }
                if (AddToMemCache || UsesTemporaryMemory)
                {
                    // We need to own it if we want to add it to the memcache or the buffer is just a range of the block iteration buffer
                    OutValue.Value.MakeOwned();
                }
                if (SetMetaInfo)
                {
                    // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
                    // metadata separately, check if it had time to set the metadata
                    RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
                    if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
                    {
                        BucketPayload& Payload = m_Payloads[UpdateIt->second];
                        if (Payload.MetaData)
                        {
                            const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
                            OutValue.RawHash = MetaData.RawHash;
                            OutValue.RawSize = MetaData.RawSize;
                            SetMetaInfo      = false;
                        }
                    }
                }
                if (SetMetaInfo)
                {
                    ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
                    if (Location.IsFlagSet(DiskLocation::kCompressed))
                    {
                        // Compressed values carry their raw hash/size in the
                        // header; a bad header invalidates the whole result.
                        if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
                        {
                            OutValue      = ZenCacheValue{};
                            AddToMemCache = false;
                            SetMetaInfo   = false;
                        }
                    }
                    else
                    {
                        OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
                        OutValue.RawSize = OutValue.Value.GetSize();
                    }
                    if (OutValue.RawSize > std::numeric_limits::max())
                    {
                        SetMetaInfo = false;
                    }
                }
                if (SetMetaInfo || AddToMemCache)
                {
                    ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache");
                    RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
                    {
                        if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
                        {
                            BucketPayload& Payload = m_Payloads[UpdateIt->second];
                            // Only update if it has not already been updated by other thread
                            if (!Payload.MetaData && SetMetaInfo)
                            {
                                SetMetaData(UpdateIndexLock, Payload, {.RawSize = gsl::narrow(OutValue.RawSize), .RawHash = OutValue.RawHash});
                            }
                            if (!Payload.MemCached && AddToMemCache)
                            {
                                SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
                            }
                        }
                    }
                }
            };
            // We don't want to read into memory if they are to big since we might only want to touch the compressed
            // header before sending it along
            if (!InlineDiskLocations.empty())
            {
                ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
                m_BlockStore.IterateChunks(std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
                                           [&](uint32_t, std::span ChunkIndexes) -> bool {
                                               // Up to 8KB or m_Configuration.MemCacheSizeThreshold depending on configuration
                                               const uint64_t LargeChunkSizeLimit = m_Configuration.MemCacheSizeThreshold == 0 ?
                                                                                        Min(m_Configuration.LargeObjectThreshold, 8u * 1024u) :
                                                                                        Max(m_Configuration.MemCacheSizeThreshold, 8u * 1024u);
                                               m_BlockStore.IterateBlock(
                                                   std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
                                                   ChunkIndexes,
                                                   [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
                                                       if (Data != nullptr)
                                                       {
                                                           FillOne(InlineDiskLocations[ChunkIndex],
                                                                   InlineKeyIndexes[ChunkIndex],
                                                                   IoBufferBuilder::MakeFromMemory(MemoryView(Data, Size)),
                                                                   /*UsesTemporaryMemory*/ true);
                                                       }
                                                       return true;
                                                   },
                                                   [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
                                                       FillOne(InlineDiskLocations[ChunkIndex],
                                                               InlineKeyIndexes[ChunkIndex],
                                                               File.GetChunk(Offset, Size),
                                                               /*UsesTemporaryMemory*/ false);
                                                       return true;
                                                   },
                                                   LargeChunkSizeLimit);
                                               return true;
                                           });
            }
            if (!StandaloneDiskLocations.empty())
            {
                ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone");
                for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++)
                {
                    size_t              KeyIndex = StandaloneKeyIndexes[Index];
                    const DiskLocation& Location = StandaloneDiskLocations[Index];
                    FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]), /*UsesTemporaryMemory*/ false);
                }
            }
            if (!MemCachedKeyIndexes.empty())
            {
                // Values already in memory whose RawHash/RawSize metadata is missing.
                ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCached");
                for (size_t KeyIndex : MemCachedKeyIndexes)
                {
                    const IoHash& Key         = Batch->Keys[KeyIndex];
                    bool          SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
                    ZEN_ASSERT(SetMetaInfo);
                    ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
                    size_t         ResultIndex = Batch->ResultIndexes[KeyIndex];
                    ZenCacheValue& OutValue    = Batch->OutResults[ResultIndex];
                    {
                        // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
                        // metadata separately, check if it had time to set the metadata
                        RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
                        if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
                        {
                            BucketPayload& Payload = m_Payloads[UpdateIt->second];
                            if (Payload.MetaData)
                            {
                                const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
                                OutValue.RawHash = MetaData.RawHash;
                                OutValue.RawSize = MetaData.RawSize;
                                SetMetaInfo      = false;
                            }
                        }
                    }
                    if (SetMetaInfo)
                    {
                        if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
                        {
                            if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
                            {
                                OutValue = ZenCacheValue{};
                            }
                        }
                        else
                        {
                            OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
                            OutValue.RawSize = OutValue.Value.GetSize();
                        }
                        if (OutValue.RawSize <= std::numeric_limits::max())
                        {
                            RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
                            {
                                if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
                                {
                                    BucketPayload& Payload = m_Payloads[UpdateIt->second];
                                    // Only update if it has not already been updated by other thread
                                    if (!Payload.MetaData)
                                    {
                                        SetMetaData(UpdateIndexLock, Payload, {.RawSize = static_cast(OutValue.RawSize), .RawHash = OutValue.RawHash});
                                    }
                                }
                            }
                        }
                    }
                }
            }
            // Final hit/miss accounting over every requested slot.
            for (size_t ResultIndex : Batch->ResultIndexes)
            {
                bool Hit = !!Batch->OutResults[ResultIndex].Value;
                if (Hit)
                {
                    m_DiskHitCount++;
                    StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize());
                }
                else
                {
                    m_DiskMissCount++;
                }
            }
        }
        delete Batch;
    }
    catch (const std::exception& Ex)
    {
        ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::EndGetBatch: '{}'", Ex.what());
    }
}

// Batched Get: queues the key and reserves a result slot; resolved by EndGetBatch.
void ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, GetBatchHandle& BatchHandle)
{
    ZEN_TRACE_CPU("Z$::Bucket::Get(Batched)");
    BatchHandle.Keys.push_back(HashKey);
    BatchHandle.ResultIndexes.push_back(BatchHandle.OutResults.size());
    BatchHandle.OutResults.push_back({});
}

// Single-key Get. Returns true and fills OutValue on a hit (memory or disk);
// opportunistically populates the memory cache and RawHash/RawSize metadata.
bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
    ZEN_TRACE_CPU("Z$::Bucket::Get");
    metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
    RwLock::SharedLockScope IndexLock(m_IndexLock);
    auto It = m_Index.find(HashKey);
    if (It == m_Index.end())
    {
        m_DiskMissCount++;
        if (m_Configuration.MemCacheSizeThreshold > 0)
        {
            m_MemoryMissCount++;
        }
        return false;
    }
    PayloadIndex EntryIndex  = It.value();
    m_AccessTimes[EntryIndex] = GcClock::TickCount();
    DiskLocation Location     = m_Payloads[EntryIndex].Location;
    // Raw metadata only needs computing for non-structured, non-empty values.
    bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
    const BucketPayload* Payload = &m_Payloads[EntryIndex];
    if (Payload->MetaData)
    {
        const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData];
        OutValue.RawHash = MetaData.RawHash;
        OutValue.RawSize = MetaData.RawSize;
        FillRawHashAndRawSize = false;
    }
    if (Payload->MemCached)
    {
        OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload;
        // Payload points into m_Payloads; null it before dropping the index lock.
        Payload = nullptr;
        IndexLock.ReleaseNow();
        m_MemoryHitCount++;
    }
    else
    {
        Payload = nullptr;
        IndexLock.ReleaseNow();
        if (m_Configuration.MemCacheSizeThreshold > 0)
        {
            m_MemoryMissCount++;
        }
        if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
        {
            OutValue.Value = GetStandaloneCacheValue(Location, HashKey);
        }
        else
        {
            OutValue.Value = GetInlineCacheValue(Location);
            if (m_Configuration.MemCacheSizeThreshold > 0)
            {
                size_t ValueSize = OutValue.Value.GetSize();
                if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
                {
                    // Small enough to keep in memory: pull fully into RAM and
                    // publish it under the exclusive lock (re-find the key since
                    // the lock was released above).
                    ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache");
                    OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
                    RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
                    if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
                    {
                        BucketPayload& WritePayload = m_Payloads[UpdateIt->second];
                        // Only update if it has not already been updated by other thread
                        if (!WritePayload.MemCached)
                        {
                            SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
                        }
                    }
                }
            }
        }
    }
    if (FillRawHashAndRawSize)
    {
        ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
        if (Location.IsFlagSet(DiskLocation::kCompressed))
        {
            // Compressed values carry raw hash/size in their header; a bad
            // header turns the hit into a miss.
            if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
            {
                OutValue = ZenCacheValue{};
                m_DiskMissCount++;
                return false;
            }
        }
        else
        {
            OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
            OutValue.RawSize = OutValue.Value.GetSize();
        }
        if (OutValue.RawSize <= std::numeric_limits::max())
        {
            RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
            if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
            {
                BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
                // Only set if no other path has already updated the meta data
                if (!WritePayload.MetaData)
                {
                    SetMetaData(UpdateIndexLock, WritePayload, {.RawSize = static_cast(OutValue.RawSize), .RawHash = OutValue.RawHash});
                }
            }
        }
    }
    if (OutValue.Value)
    {
        m_DiskHitCount++;
        StatsScope.SetBytes(OutValue.Value.GetSize());
        return true;
    }
    else
    {
        m_DiskMissCount++;
        return false;
    }
}

// Stores a value, routing by size: at or above LargeObjectThreshold it becomes a
// standalone file, otherwise it goes inline into the block store (possibly via
// the optional put batch).
void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
                                         const ZenCacheValue& Value,
                                         std::span References,
                                         PutBatchHandle* OptionalBatchHandle)
{
    ZEN_TRACE_CPU("Z$::Bucket::Put");
    metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
    if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
    {
        PutStandaloneCacheValue(HashKey, Value, References);
        if (OptionalBatchHandle)
        {
            // Standalone puts complete synchronously; record success directly.
            OptionalBatchHandle->OutResults.push_back(true);
        }
    }
    else
    {
        PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle);
    }
    m_DiskWriteCount++;
}

// Evicts memory-cached payloads whose last access precedes ExpireTime. Returns
// the number of payload bytes released. The actual buffer destruction happens
// after the index lock is dropped (PurgedBuffers holds them until then).
uint64_t ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
{
    ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim");
    uint64_t      Trimmed     = 0;
    GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
    std::vector PurgedBuffers;
    {
        RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
        uint32_t MemCachedCount = gsl::narrow(m_MemCachedPayloads.size());
        if (MemCachedCount == 0)
        {
            return 0;
        }
        PurgedBuffers.reserve(MemCachedCount);
        for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
        {
            MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
            if (!Data.Payload)
            {
                continue;
            }
            PayloadIndex Index = Data.OwnerIndex;
            ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
            GcClock::Tick AccessTime = m_AccessTimes[Index];
            if (AccessTime < ExpireTicks)
            {
                size_t PayloadSize = Data.Payload.GetSize();
                RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
                // Move the buffer out so its destruction happens outside the lock.
                PurgedBuffers.emplace_back(std::move(Data.Payload));
                Data.OwnerIndex = {};
                // Data = {};
                m_Payloads[Index].MemCached = {};
                Trimmed += PayloadSize;
                m_FreeMemCachedPayloads.push_back(MemCachedIndex(ReadIndex));
            }
        }
    }
    // Buffers released here, after the index lock has been dropped.
    PurgedBuffers.clear();
    return Trimmed;
}

// Builds a histogram of memory-cache usage bucketed by last-access age.
// InOutUsageSlots.capacity() determines the slot count; entries older than
// MaxAge land in the last slot.
void ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector& InOutUsageSlots)
{
    ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess");
    std::vector PayloadSizes;
    std::vector AccessTimes;
    // NOTE(review): assumes the caller pre-reserved InOutUsageSlots; a zero
    // capacity makes (SlotCount - 1) underflow below - confirm the contract.
    size_t SlotCount = InOutUsageSlots.capacity();
    {
        // Snapshot sizes and access times under the shared lock, then bin
        // outside it.
        RwLock::SharedLockScope _(m_IndexLock);
        uint32_t MemCachedCount = gsl::narrow(m_MemCachedPayloads.size());
        if (MemCachedCount == 0)
        {
            return;
        }
        PayloadSizes.reserve(MemCachedCount);
        AccessTimes.reserve(MemCachedCount);
        for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
        {
            MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
            if (!Data.Payload)
            {
                continue;
            }
            PayloadIndex Index = Data.OwnerIndex;
            ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
            PayloadSizes.push_back(Data.Payload.GetSize());
            AccessTimes.push_back(m_AccessTimes[Index]);
        }
    }
    auto PayloadSizeIt = PayloadSizes.begin();
    auto AccessTimeIt  = AccessTimes.begin();
    for (PayloadSizeIt = PayloadSizes.begin(); PayloadSizeIt != PayloadSizes.end(); PayloadSizeIt++)
    {
        ZEN_ASSERT_SLOW(AccessTimeIt != AccessTimes.end());
        GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(*AccessTimeIt));
        GcClock::Duration  Age            = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0);
        size_t Slot = Age < MaxAge ? gsl::narrow((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1);
        ZEN_ASSERT_SLOW(Slot < SlotCount);
        if (Slot >= InOutUsageSlots.size())
        {
            InOutUsageSlots.resize(Slot + 1, 0);
        }
        InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(*PayloadSizeIt);
        AccessTimeIt++;
    }
}

// Detaches the bucket from GC, closes its files, moves the bucket directory to
// the "dropped" area and clears all in-memory state. Returns a deferred cleanup
// callable that deletes the moved directory (empty if nothing was moved).
std::function ZenCacheDiskLayer::CacheBucket::Drop()
{
    ZEN_TRACE_CPU("Z$::Bucket::Drop");
    m_Gc.RemoveGcReferencer(*this);
    RwLock::ExclusiveLockScope _(m_IndexLock);
    // Acquire every sharded per-hash lock so no reader/writer touches files
    // while the directory is moved away.
    std::vector> ShardLocks;
    ShardLocks.reserve(256);
    for (RwLock& Lock : m_ShardedLocks)
    {
        ShardLocks.push_back(std::make_unique(Lock));
    }
    m_BlockStore.Close();
    m_SlogFile.Close();
    std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(m_BucketDir);
    m_Index.clear();
    m_Payloads.clear();
    m_AccessTimes.clear();
    m_MetaDatas.clear();
    m_FreeMetaDatas.clear();
    m_MemCachedPayloads.clear();
    m_FreeMemCachedPayloads.clear();
    m_StandaloneSize.store(0);
    m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
    m_MemCachedSize.store(0);
    if (DroppedPath.empty())
    {
        return {};
    }
    else
    {
        return [DroppedPath = std::move(DroppedPath)]() {
            std::error_code Ec;
            (void)DeleteDirectories(DroppedPath, Ec);
            if (Ec)
            {
                ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message());
            }
        };
    }
}

// Flushes the block store, structured log and snapshot. Re-entrancy is guarded
// with an atomic flag so overlapping flush requests become no-ops.
void ZenCacheDiskLayer::CacheBucket::Flush()
{
    ZEN_TRACE_CPU("Z$::Bucket::Flush");
    bool Expected = false;
    if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
    {
        return;
    }
    auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
    ZEN_INFO("Flushing bucket {}", m_BucketDir);
    try
    {
        m_BlockStore.Flush(/*ForceNewBlock*/ false);
        m_SlogFile.Flush();
        SaveSnapshot();
    }
    catch (const std::exception& Ex)
    {
        ZEN_WARN("Failed to flush bucket in '{}'. Reason: '{}'", m_BucketDir, Ex.what());
    }
}

// Persists the bucket manifest (legacy monolithic scheme or the current
// sidecar scheme). Verifies free disk space first, optionally reclaiming the
// disk reserve via ClaimDiskReserveFunc. Failures are logged, never thrown.
void ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& ClaimDiskReserveFunc)
{
    ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
    try
    {
        // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
        const uint64_t LogPosition = m_SlogFile.GetLogCount();
        bool     UseLegacyScheme = false;
        IoBuffer Buffer;
        cache::impl::BucketManifestSerializer ManifestWriter;
        if (UseLegacyScheme)
        {
            std::vector AccessTimes;
            std::vector Payloads;
            std::vector MetaDatas;
            IndexMap    Index;
            {
                RwLock::SharedLockScope IndexLock(m_IndexLock);
                WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
                // Note: this copy could be eliminated on shutdown to
                // reduce memory usage and execution time
                Index       = m_Index;
                Payloads    = m_Payloads;
                AccessTimes = m_AccessTimes;
                MetaDatas   = m_MetaDatas;
            }
            Buffer = ManifestWriter.MakeManifest(m_BucketId, std::move(Index), std::move(AccessTimes), std::move(Payloads), std::move(MetaDatas));
            // Require headroom beyond the manifest itself (512 KiB slack).
            const uint64_t RequiredSpace = Buffer.GetSize() + 1024 * 512;
            std::error_code Error;
            DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
            if (Error)
            {
                ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
                return;
            }
            bool EnoughSpace = Space.Free >= RequiredSpace;
            if (!EnoughSpace)
            {
                uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
                EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
            }
            if (!EnoughSpace)
            {
                ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize()));
                return;
            }
        }
        else
        {
            RwLock::SharedLockScope IndexLock(m_IndexLock);
            WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
            const uint64_t EntryCount = m_Index.size();
            Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
            uint64_t SidecarSize = ManifestWriter.GetSidecarSize();
            const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512;
            std::error_code Error;
            DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
            if (Error)
            {
                ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
                return;
            }
            bool EnoughSpace = Space.Free >= RequiredSpace;
            if (!EnoughSpace)
            {
                uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
                EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
            }
            if (!EnoughSpace)
            {
                ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize()));
                return;
            }
            ManifestWriter.WriteSidecarFile(IndexLock,
                                            cache::impl::GetMetaPath(m_BucketDir, m_BucketName),
                                            m_LogFlushPosition,
                                            m_Index,
                                            m_AccessTimes,
                                            m_Payloads,
                                            m_MetaDatas);
        }
        const std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName);
        // Write-then-rename so a crash never leaves a truncated manifest.
        TemporaryFile::SafeWriteFile(ManifestPath, Buffer.GetView());
    }
    catch (const std::exception& Err)
    {
        ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what());
    }
}

// Verifies bucket storage integrity, logging throughput statistics when done.
// Head of ScrubStorage; continues in the next chunk.
void ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
    ZEN_TRACE_CPU("Z$::Bucket::Scrub");
    ZEN_INFO("scrubbing '{}'", m_BucketDir);
    Stopwatch Timer;
    std::atomic_uint64_t ChunkCount         = 0;
    std::atomic_uint64_t VerifiedChunkBytes = 0;
    auto LogStats = MakeGuard([&] {
        const uint32_t DurationMs = gsl::narrow(Timer.GetElapsedTimeMs());
        ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
                 m_BucketName,
                 NiceBytes(VerifiedChunkBytes.load()),
                 NiceTimeSpanMs(DurationMs),
                 ChunkCount.load(),
                 NiceRate(VerifiedChunkBytes, DurationMs));
    });
RwLock BadKeysLock; std::vector BadKeys; auto ReportBadKey = [&](const IoHash& Key) { BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Key); }); }; try { std::vector ChunkLocations; std::vector ChunkIndexToChunkHash; RwLock::SharedLockScope _(m_IndexLock); const size_t BlockChunkInitialCount = m_Index.size() / 4; ChunkLocations.reserve(BlockChunkInitialCount); ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); // Do a pass over the index and verify any standalone file values straight away // all other storage classes are gathered and verified in bulk in order to enable // more efficient I/O scheduling for (auto& Kv : m_Index) { const IoHash& HashKey = Kv.first; const BucketPayload& Payload = m_Payloads[Kv.second]; const DiskLocation& Loc = Payload.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { Ctx.ThrowIfDeadlineExpired(); ChunkCount.fetch_add(1); VerifiedChunkBytes.fetch_add(Loc.Size()); if (Loc.GetContentType() == ZenContentType::kBinary) { // Blob cache value, not much we can do about data integrity checking // here since there's no hash available ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); std::error_code Ec; uintmax_t size = FileSizeFromPath(DataFilePath.ToPath(), Ec); if (Ec) { ReportBadKey(HashKey); } if (size != Loc.Size()) { ReportBadKey(HashKey); } continue; } else { // Structured cache value IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); if (!Buffer) { ReportBadKey(HashKey); continue; } if (!ValidateIoBuffer(Loc.GetContentType(), Buffer)) { ReportBadKey(HashKey); continue; } } } else { ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment)); ChunkIndexToChunkHash.push_back(HashKey); continue; } } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { ChunkCount.fetch_add(1); VerifiedChunkBytes.fetch_add(Size); const IoHash& Hash = 
ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { // ChunkLocation out of range of stored blocks ReportBadKey(Hash); return true; } if (!Size) { ReportBadKey(Hash); return true; } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); if (!Buffer) { ReportBadKey(Hash); return true; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); if (!ValidateIoBuffer(ContentType, Buffer)) { ReportBadKey(Hash); return true; } return true; }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { Ctx.ThrowIfDeadlineExpired(); ChunkCount.fetch_add(1); VerifiedChunkBytes.fetch_add(Size); const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); if (!Buffer) { ReportBadKey(Hash); return true; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); if (!ValidateIoBuffer(ContentType, Buffer)) { ReportBadKey(Hash); return true; } return true; }; m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span ChunkIndexes) { return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); }); } catch (ScrubDeadlineExpiredException&) { ZEN_INFO("Scrubbing deadline expired, operation incomplete"); } Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes); if (!BadKeys.empty()) { ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); if (Ctx.RunRecovery()) { // Deal with bad chunks by removing them from our lookup map std::vector LogEntries; LogEntries.reserve(BadKeys.size()); { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (const IoHash& BadKey : BadKeys) { // Log a tombstone and delete the in-memory index for the bad entry const auto It = m_Index.find(BadKey); 
BucketPayload& Payload = m_Payloads[It->second]; DiskLocation Location = Payload.Location; if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed); } RemoveMemCachedData(IndexLock, Payload); RemoveMetaData(IndexLock, Payload); Location.Flags |= DiskLocation::kTombStone; LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); } } for (const DiskIndexEntry& Entry : LogEntries) { if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) { ExtendablePathBuilder<256> Path; BuildPath(Path, Entry.Key); fs::path FilePath = Path.ToPath(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); if (IsFile(FilePath)) { ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); std::error_code Ec; RemoveFile(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... } } } m_SlogFile.Append(LogEntries); // Clean up m_AccessTimes and m_Payloads vectors { std::vector Payloads; std::vector AccessTimes; std::vector MetaDatas; std::vector MemCachedPayloads; IndexMap Index; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index); } } } } // Let whomever it concerns know about the bad chunks. 
This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do if (!BadKeys.empty()) { Ctx.ReportBadCidChunks(BadKeys); } } ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { GcStorageSize Size = StorageSize(); return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize, .DiskHitCount = m_DiskHitCount, .DiskMissCount = m_DiskMissCount, .DiskWriteCount = m_DiskWriteCount, .MemoryHitCount = m_MemoryHitCount, .MemoryMissCount = m_MemoryMissCount, .MemoryWriteCount = m_MemoryWriteCount, .PutOps = m_PutOps.Snapshot(), .GetOps = m_GetOps.Snapshot()}; } uint64_t ZenCacheDiskLayer::CacheBucket::EntryCount() const { RwLock::SharedLockScope _(m_IndexLock); return static_cast(m_Index.size()); } CacheValueDetails::ValueDetails ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const IoHash& Key, PayloadIndex Index) const { std::vector Attachments; const BucketPayload& Payload = m_Payloads[Index]; if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) { IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? 
                             GetStandaloneCacheValue(Payload.Location, Key) :
                             GetInlineCacheValue(Payload.Location);
        CbObjectView Obj(Value.GetData());
        Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
    }
    BucketMetaData MetaData = GetMetaData(IndexLock, Payload);
    return CacheValueDetails::ValueDetails{.Size        = Payload.Location.Size(),
                                           .RawSize     = MetaData.RawSize,
                                           .RawHash     = MetaData.RawHash,
                                           .LastAccess  = m_AccessTimes[Index],
                                           .Attachments = std::move(Attachments),
                                           .ContentType = Payload.Location.GetContentType()};
}

// Detail records for the whole bucket, or for the single value whose hex key
// matches ValueFilter.
CacheValueDetails::BucketDetails ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock,
                                                                                const std::string_view ValueFilter) const
{
    CacheValueDetails::BucketDetails Details;
    RwLock::SharedLockScope _(m_IndexLock);
    if (ValueFilter.empty())
    {
        Details.Values.reserve(m_Index.size());
        for (const auto& It : m_Index)
        {
            Details.Values.insert_or_assign(It.first, GetValueDetails(IndexLock, It.first, It.second));
        }
    }
    else
    {
        IoHash Key = IoHash::FromHexString(ValueFilter);
        if (auto It = m_Index.find(Key); It != m_Index.end())
        {
            Details.Values.insert_or_assign(It->first, GetValueDetails(IndexLock, It->first, It->second));
        }
    }
    return Details;
}

// Invokes Fn for every entry in the bucket, under the shared index lock.
void ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
    std::function& Fn) const
{
    RwLock::SharedLockScope IndexLock(m_IndexLock);
    for (const auto& It : m_Index)
    {
        CacheValueDetails::ValueDetails Vd = GetValueDetails(IndexLock, It.first, It.second);
        Fn(It.first, Vd);
    }
}

// Stores a large value as a standalone file: write to a temp file, atomically
// move into the hashed path, then update index/metadata and append to the log.
// Head of the function; its body continues in the next chunk.
void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey,
                                                            const ZenCacheValue& Value,
                                                            std::span References)
{
    ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
    uint64_t      NewFileSize = Value.Value.Size();
    TemporaryFile DataFile;
    std::error_code Ec;
    DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
    if (Ec)
    {
        throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
    }
    bool CleanUpTempFile = true;
    auto __ = MakeGuard([&] {
if (CleanUpTempFile) { std::error_code Ec; RemoveFile(DataFile.GetPath(), Ec); if (Ec) { ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", DataFile.GetPath(), m_BucketDir, Ec.message()); } } }); DataFile.WriteAll(Value.Value, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", NiceBytes(NewFileSize), DataFile.GetPath().string(), m_BucketDir)); } ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); // We do a speculative remove of the file instead of probing with a exists call and check the error code instead RemoveFile(FsPath, Ec); if (Ec) { if (Ec.value() != ENOENT) { ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); Sleep(100); Ec.clear(); RemoveFile(FsPath, Ec); if (Ec && Ec.value() != ENOENT) { throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); } } } // Assume parent directory exists DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { CreateDirectories(FsPath.parent_path()); // Try again after we or someone else created the directory Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); // Retry if we still fail to handle contention to file system uint32_t RetriesLeft = 3; while (Ec && RetriesLeft > 0) { ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retries left: {}.", FsPath, DataFile.GetPath(), m_BucketDir, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); RetriesLeft--; } if (Ec) { throw std::system_error( Ec, fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); } } // Once we have called 
MoveTemporaryIntoPlace automatic clean up the temp file // will be disabled as the file handle has already been closed CleanUpTempFile = false; uint8_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } DiskLocation Loc(NewFileSize, EntryFlags); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); ValueLock.ReleaseNow(); if (m_TrackedCacheKeys) { m_TrackedCacheKeys->insert(HashKey); } if (m_TrackedReferences) { m_TrackedReferences->insert(m_TrackedReferences->end(), References.begin(), References.end()); } PayloadIndex EntryIndex = {}; if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object EntryIndex = PayloadIndex(m_Payloads.size()); m_Payloads.emplace_back(BucketPayload{.Location = Loc}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } else { EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); BucketPayload& Payload = m_Payloads[EntryIndex]; uint64_t OldSize = Payload.Location.Size(); Payload = BucketPayload{.Location = Loc}; m_AccessTimes[EntryIndex] = GcClock::TickCount(); RemoveMemCachedData(IndexLock, Payload); m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed); } if ((Value.RawSize != 0 || Value.RawHash != IoHash::Zero) && Value.RawSize <= std::numeric_limits::max()) { SetMetaData(IndexLock, m_Payloads[EntryIndex], {.RawSize = static_cast(Value.RawSize), .RawHash = Value.RawHash}); } else { RemoveMetaData(IndexLock, m_Payloads[EntryIndex]); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); } void ZenCacheDiskLayer::CacheBucket::SetMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, const 
ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData) { if (Payload.MetaData) { m_MetaDatas[Payload.MetaData] = MetaData; } else { if (m_FreeMetaDatas.empty()) { Payload.MetaData = MetaDataIndex(m_MetaDatas.size()); m_MetaDatas.emplace_back(MetaData); } else { Payload.MetaData = m_FreeMetaDatas.back(); m_FreeMetaDatas.pop_back(); m_MetaDatas[Payload.MetaData] = MetaData; } } } void ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) { if (Payload.MetaData) { m_FreeMetaDatas.push_back(Payload.MetaData); Payload.MetaData = {}; } } void ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData) { BucketPayload& Payload = m_Payloads[PayloadIndex]; uint64_t PayloadSize = MemCachedData.GetSize(); ZEN_ASSERT(PayloadSize != 0); if (m_FreeMemCachedPayloads.empty()) { if (m_MemCachedPayloads.size() != std::numeric_limits::max()) { Payload.MemCached = MemCachedIndex(gsl::narrow(m_MemCachedPayloads.size())); m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}); AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } else { Payload.MemCached = m_FreeMemCachedPayloads.back(); m_FreeMemCachedPayloads.pop_back(); m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}; AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } size_t ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) { if (Payload.MemCached) { size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize(); RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemCachedPayloads[Payload.MemCached] = {}; m_FreeMemCachedPayloads.push_back(Payload.MemCached); Payload.MemCached = {}; return PayloadSize; } return 0; } 
ZenCacheDiskLayer::CacheBucket::BucketMetaData ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const { if (Payload.MetaData) { return m_MetaDatas[Payload.MetaData]; } return {}; } void ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); if (OptionalBatchHandle != nullptr) { OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); OptionalBatchHandle->OutResults.push_back(false); std::vector& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); HashKeyAndReferences.push_back(HashKey); HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); return; } uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_Configuration.PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { ZEN_MEMSCOPE(GetCacheDiskTag()); ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation"); DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (m_TrackedCacheKeys) { m_TrackedCacheKeys->insert(HashKey); } if (m_TrackedReferences) { m_TrackedReferences->insert(m_TrackedReferences->end(), References.begin(), References.end()); } if (auto It = m_Index.find(HashKey); It != m_Index.end()) { PayloadIndex EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < 
PayloadIndex(m_AccessTimes.size())); BucketPayload& Payload = m_Payloads[EntryIndex]; RemoveMemCachedData(IndexLock, Payload); RemoveMetaData(IndexLock, Payload); Payload = (BucketPayload{.Location = Location}); m_AccessTimes[EntryIndex] = GcClock::TickCount(); } else { PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); m_Payloads.emplace_back(BucketPayload{.Location = Location}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); } std::string ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&) { return fmt::format("cachebucket: '{}'", m_BucketDir.string()); } class DiskBucketStoreCompactor : public GcStoreCompactor { using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; using CacheBucket = ZenCacheDiskLayer::CacheBucket; public: DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector>&& ExpiredStandaloneKeys, bool FlushBucket) : m_Bucket(Bucket) , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys)) , m_FlushBucket(FlushBucket) { m_ExpiredStandaloneKeys.shrink_to_fit(); } virtual ~DiskBucketStoreCompactor() {} virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function& ClaimDiskReserveCallback) override { ZEN_TRACE_CPU("Z$::Bucket::CompactStore"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { cache::impl::Reset(m_ExpiredStandaloneKeys); if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}", m_Bucket.m_BucketDir, NiceBytes(Stats.RemovedDisk), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); if (!m_ExpiredStandaloneKeys.empty()) { // Compact standalone items size_t Skipped = 0; ExtendablePathBuilder<256> Path; for (const std::pair& ExpiredKey : m_ExpiredStandaloneKeys) { if (Ctx.IsCancelledFlag.load()) { return; } Path.Reset(); 
m_Bucket.BuildPath(Path, ExpiredKey.first); fs::path FilePath = Path.ToPath(); RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock); if (m_Bucket.m_Index.contains(ExpiredKey.first)) { // Someone added it back, let the file on disk be ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", m_Bucket.m_BucketDir, Path.ToUtf8()); continue; } if (Ctx.Settings.IsDeleteMode) { RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); IndexLock.ReleaseNow(); ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); std::error_code Ec; if (!RemoveFile(FilePath, Ec)) { continue; } if (Ec) { ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", m_Bucket.m_BucketDir, Path.ToUtf8(), Ec.message()); continue; } Stats.RemovedDisk += ExpiredKey.second; } else { RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); IndexLock.ReleaseNow(); ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); std::error_code Ec; bool Existed = IsFile(FilePath, Ec); if (Ec) { ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. 
Reason '{}'", m_Bucket.m_BucketDir, FilePath, Ec.message()); continue; } if (!Existed) { continue; } Skipped++; } } if (Skipped > 0) { ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped); } } if (Ctx.Settings.CollectSmallObjects) { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_TrackedCacheKeys = std::make_unique(); }); auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_TrackedCacheKeys.reset(); }); }); size_t InlineEntryCount = 0; BlockStore::BlockUsageMap BlockUsage; { RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); for (const auto& Entry : m_Bucket.m_Index) { PayloadIndex Index = Entry.second; const BucketPayload& Payload = m_Bucket.m_Payloads[Index]; const DiskLocation& Loc = Payload.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { continue; } InlineEntryCount++; uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex(); uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment); if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) { BlockStore::BlockUsageInfo& Info = It.value(); Info.EntryCount++; Info.DiskUsage += ChunkSize; } else { BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); } } } { BlockStoreCompactState BlockCompactState; std::vector BlockCompactStateKeys; BlockCompactStateKeys.reserve(InlineEntryCount); BlockStore::BlockEntryCountMap BlocksToCompact = m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); BlockCompactState.IncludeBlocks(BlocksToCompact); if (BlocksToCompact.size() > 0) { { RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); for (const auto& Entry : m_Bucket.m_Index) { PayloadIndex Index = Entry.second; const BucketPayload& Payload = m_Bucket.m_Payloads[Index]; const DiskLocation& Loc = Payload.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { 
continue; } if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment))) { continue; } BlockCompactStateKeys.push_back(Entry.first); } } if (Ctx.Settings.IsDeleteMode) { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", m_Bucket.m_BucketDir, BlocksToCompact.size()); } m_Bucket.m_BlockStore.CompactBlocks( BlockCompactState, m_Bucket.m_Configuration.PayloadAlignment, [&](const BlockStore::MovedChunksArray& MovedArray, const BlockStore::ChunkIndexArray& ScrubbedArray, uint64_t FreedDiskSpace) { std::vector MovedEntries; MovedEntries.reserve(MovedArray.size()); RwLock::ExclusiveLockScope IndexLock(m_Bucket.m_IndexLock); for (const std::pair& Moved : MovedArray) { size_t ChunkIndex = Moved.first; const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; ZEN_ASSERT(m_Bucket.m_TrackedCacheKeys); if (m_Bucket.m_TrackedCacheKeys->contains(Key)) { continue; } if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) { BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; const BlockStoreLocation& NewLocation = Moved.second; Payload.Location = DiskLocation(NewLocation, m_Bucket.m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); } } for (size_t ScrubbedIndex : ScrubbedArray) { const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex]; if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) { BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; DiskLocation Location = Payload.Location; m_Bucket.RemoveMemCachedData(IndexLock, Payload); m_Bucket.RemoveMetaData(IndexLock, Payload); Location.Flags |= DiskLocation::kTombStone; MovedEntries.push_back(DiskIndexEntry{.Key = Key, .Location = Location}); } } m_Bucket.m_SlogFile.Append(MovedEntries); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) { return false; } return true; }, ClaimDiskReserveCallback, 
fmt::format("GCV2: cachebucket [COMPACT] '{}': ", m_Bucket.m_BucketDir)); } else { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", m_Bucket.m_BucketDir, BlocksToCompact.size()); } } } } } if (m_FlushBucket) { m_Bucket.Flush(); } } virtual std::string GetGcName(GcCtx& Ctx) override { return m_Bucket.GetGcName(Ctx); } private: ZenCacheDiskLayer::CacheBucket& m_Bucket; std::vector> m_ExpiredStandaloneKeys; bool m_FlushBucket = false; }; GcStoreCompactor* ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData"); auto Log = [&Ctx]() { return Ctx.Logger; }; size_t TotalEntries = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", m_BucketDir, Stats.CheckedCount, Stats.FoundCount, Stats.DeletedCount, NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } }); const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count(); std::vector ExpiredEntries; std::vector> ExpiredStandaloneKeys; uint64_t RemovedStandaloneSize = 0; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (Ctx.IsCancelledFlag.load()) { return nullptr; } TotalEntries = m_Index.size(); // Find out expired keys for (const auto& Entry : m_Index) { const IoHash& Key = Entry.first; PayloadIndex EntryIndex = Entry.second; GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; if (AccessTime >= ExpireTicks) { continue; } const BucketPayload& Payload = m_Payloads[EntryIndex]; DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location}; ExpiredEntry.Location.Flags |= DiskLocation::kTombStone; if (Payload.Location.Flags & DiskLocation::kStandaloneFile) { ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()}); RemovedStandaloneSize += Payload.Location.Size(); 
ExpiredEntries.push_back(ExpiredEntry); } else if (Ctx.Settings.CollectSmallObjects) { ExpiredEntries.push_back(ExpiredEntry); } } Stats.CheckedCount += TotalEntries; Stats.FoundCount += ExpiredEntries.size(); if (Ctx.IsCancelledFlag.load()) { return nullptr; } if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty()) { for (const DiskIndexEntry& Entry : ExpiredEntries) { auto It = m_Index.find(Entry.Key); ZEN_ASSERT(It != m_Index.end()); BucketPayload& Payload = m_Payloads[It->second]; RemoveMetaData(IndexLock, Payload); Stats.FreedMemory += RemoveMemCachedData(IndexLock, Payload); m_Index.erase(It); Stats.DeletedCount++; } m_SlogFile.Append(ExpiredEntries); m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed); } } if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty()) { std::vector Payloads; std::vector AccessTimes; std::vector MetaDatas; std::vector MemCachedPayloads; IndexMap Index; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index); } } if (Ctx.IsCancelledFlag.load()) { return nullptr; } return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys), /*FlushBucket*/ Stats.DeletedCount > 0); } bool ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockIndex, std::span InlineKeys, std::span ChunkIndexes, std::vector& OutReferences) const { ZEN_TRACE_CPU("Z$::Bucket::ReadAttachmentsFromMetaData"); IoBuffer MetaDataPayload = m_BlockStore.GetMetaData(BlockIndex); if (MetaDataPayload) { tsl::robin_set WantedKeys; WantedKeys.reserve(ChunkIndexes.size()); for (const size_t ChunkIndex : ChunkIndexes) { WantedKeys.insert(InlineKeys[ChunkIndex]); } ZEN_TRACE_CPU("Z$::Bucket::GetAttachmentsFromMetaData"); return GetAttachmentsFromMetaData( MetaDataPayload, cache::impl::BlockMetaDataExpectedMagic, [&](std::span Keys, std::span AttachmentCounts, std::span Attachments) { auto AttachmentReadIt = Attachments.begin(); 
OutReferences.resize(OutReferences.size() + Attachments.size()); auto OutReferencesWriteIt = OutReferences.end() - Attachments.size(); auto KeyIt = Keys.begin(); for (uint32_t AttachmentCount : AttachmentCounts) { if (AttachmentCount > 0) { if (WantedKeys.contains(*KeyIt)) { memcpy(&(*OutReferencesWriteIt), &(*AttachmentReadIt), sizeof(IoHash) * AttachmentCount); OutReferencesWriteIt += AttachmentCount; AttachmentReadIt += AttachmentCount; } else { AttachmentReadIt += AttachmentCount; } } KeyIt++; } OutReferences.erase(OutReferencesWriteIt, OutReferences.end()); }); } return false; } bool ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger, std::atomic_bool& IsCancelledFlag, bool StateIsAlreadyLocked, bool ReadCacheAttachmentMetaData, bool WriteCacheAttachmentMetaData, std::vector& OutReferences, ReferencesStats* OptionalOutReferencesStats) { ZEN_TRACE_CPU("Z$::Bucket::GetReferencesLocked"); auto Log = [&Logger]() { return Logger; }; auto GetAttachments = [&](MemoryView Data) -> bool { if (ValidateCompactBinary(Data, CbValidateMode::Default) == CbValidateError::None) { CbObjectView Obj(Data.GetData()); Obj.IterateAttachments([&](CbFieldView Field) { OutReferences.emplace_back(Field.AsAttachment()); }); return true; } return false; }; std::vector> StandaloneKeys; { std::vector InlineKeys; std::vector InlineLocations; std::vector> InlineBlockChunkIndexes; { std::unordered_map BlockIndexToChunkIndexes; std::unique_ptr StateLock; if (!StateIsAlreadyLocked) { StateLock = std::make_unique(m_IndexLock); } for (const auto& Entry : m_Index) { if (IsCancelledFlag.load()) { return false; } PayloadIndex EntryIndex = Entry.second; const BucketPayload& Payload = m_Payloads[EntryIndex]; const DiskLocation& Loc = Payload.Location; if (OptionalOutReferencesStats != nullptr) { OptionalOutReferencesStats->ValueSizes.push_back(Loc.Size()); } if (!Loc.IsFlagSet(DiskLocation::kStructured)) { continue; } if (OptionalOutReferencesStats) { 
OptionalOutReferencesStats->StructuredValuesCount++; } const IoHash& Key = Entry.first; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { StandaloneKeys.push_back(std::make_pair(Key, Loc)); if (OptionalOutReferencesStats) { OptionalOutReferencesStats->StandaloneValuesCount++; } continue; } BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = InlineLocations.size(); InlineLocations.push_back(ChunkLocation); InlineKeys.push_back(Key); if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) { InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); } else { BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); InlineBlockChunkIndexes.emplace_back(std::vector{ChunkIndex}); } } } OutReferences.reserve(OutReferences.size() + InlineKeys.size() + StandaloneKeys.size()); // Make space for at least one attachment per record for (const std::vector& ChunkIndexes : InlineBlockChunkIndexes) { ZEN_ASSERT(!ChunkIndexes.empty()); uint32_t BlockIndex = InlineLocations[ChunkIndexes[0]].BlockIndex; if (!ReadCacheAttachmentMetaData || !ReadAttachmentsFromMetaData(BlockIndex, InlineKeys, ChunkIndexes, OutReferences)) { std::vector Keys; std::vector AttachmentCounts; size_t PrecachedReferencesStart = OutReferences.size(); size_t NextPrecachedReferencesStart = PrecachedReferencesStart; bool WriteMetaData = WriteCacheAttachmentMetaData && !m_BlockStore.IsWriting(BlockIndex); if (WriteMetaData) { Keys.reserve(InlineLocations.size()); } auto CaptureAttachments = [&](size_t ChunkIndex, MemoryView Data) { if (GetAttachments(Data)) { if (WriteMetaData) { size_t AttachmentCount = OutReferences.size() - NextPrecachedReferencesStart; if (AttachmentCount > 0) { Keys.push_back(InlineKeys[ChunkIndex]); AttachmentCounts.push_back(gsl::narrow(AttachmentCount)); NextPrecachedReferencesStart += AttachmentCount; } } } }; bool Continue = 
m_BlockStore.IterateBlock( InlineLocations, ChunkIndexes, [&](size_t ChunkIndex, const void* Data, uint64_t Size) { ZEN_UNUSED(ChunkIndex); CaptureAttachments(ChunkIndex, MemoryView(Data, Size)); return !IsCancelledFlag.load(); }, [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { ZEN_UNUSED(ChunkIndex); CaptureAttachments(ChunkIndex, File.GetChunk(Offset, Size).GetView()); return !IsCancelledFlag.load(); }, 32u * 1024); if (Continue) { if (WriteMetaData) { ZEN_ASSERT(Keys.size() == AttachmentCounts.size()); IoBuffer MetaDataPayload = BuildReferenceMetaData( cache::impl::BlockMetaDataExpectedMagic, Keys, AttachmentCounts, std::span(OutReferences) .subspan(PrecachedReferencesStart, OutReferences.size() - PrecachedReferencesStart)) .Flatten() .AsIoBuffer(); m_BlockStore.SetMetaData(BlockIndex, MetaDataPayload); } } else { return false; } } if (IsCancelledFlag.load()) { return false; } } } for (const auto& It : StandaloneKeys) { if (IsCancelledFlag.load()) { return false; } IoBuffer Buffer = GetStandaloneCacheValue(It.second, It.first); if (Buffer) { GetAttachments(Buffer.GetView()); } } return true; } class DiskBucketReferenceChecker : public GcReferenceChecker { using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; using CacheBucket = ZenCacheDiskLayer::CacheBucket; public: DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {} virtual ~DiskBucketReferenceChecker() { try { m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } catch (const std::exception& Ex) { ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what()); } } virtual std::string GetGcName(GcCtx& Ctx) override { return m_CacheBucket.GetGcName(Ctx); } virtual void PreCache(GcCtx& Ctx) override { ZEN_TRACE_CPU("Z$::Bucket::PreCache"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = 
MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, m_PrecachedReferences.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique>(); }); bool Continue = m_CacheBucket.GetReferences(Ctx.Logger, Ctx.IsCancelledFlag, /*StateIsAlreadyLocked*/ false, Ctx.Settings.StoreCacheAttachmentMetaData, Ctx.Settings.StoreCacheAttachmentMetaData, m_PrecachedReferences, /*OptionalOutReferencesStats*/ nullptr); if (!Continue) { m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); return; } FilterReferences(Ctx, fmt::format("cachebucket [PRECACHE] '{}'", m_CacheBucket.m_BucketDir), m_PrecachedReferences); } virtual void UpdateLockedState(GcCtx& Ctx) override { ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, m_PrecachedReferences.size() + m_AddedReferences.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); if (Ctx.IsCancelledFlag.load()) { m_PrecachedReferences = {}; m_CacheBucket.m_TrackedReferences.reset(); return; } ZEN_ASSERT(m_CacheBucket.m_TrackedReferences); m_AddedReferences = std::move(*m_CacheBucket.m_TrackedReferences); FilterReferences(Ctx, fmt::format("cachebucket [LOCKSTATE] '{}'", m_CacheBucket.m_BucketDir), m_AddedReferences); } virtual std::span GetUnusedReferences(GcCtx& Ctx, std::span IoCids) override { ZEN_TRACE_CPU("Z$::Bucket::GetUnusedReferences"); auto Log = [&Ctx]() { return Ctx.Logger; }; const size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket 
[FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", m_CacheBucket.m_BucketDir, UsedCount, InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::span UnusedReferences = KeepUnusedReferences(m_PrecachedReferences, IoCids); UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences); UsedCount = IoCids.size() - UnusedReferences.size(); return UnusedReferences; } CacheBucket& m_CacheBucket; std::vector m_PrecachedReferences; std::vector m_AddedReferences; }; std::vector ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); { RwLock::SharedLockScope __(m_IndexLock); if (m_Index.empty()) { return {}; } } return {new DiskBucketReferenceChecker(*this)}; } std::vector ZenCacheDiskLayer::CacheBucket::CreateReferenceValidators(GcCtx& /*Ctx*/) { return {}; } void ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, std::vector& Payloads, std::vector& AccessTimes, std::vector& MetaDatas, std::vector& MemCachedPayloads, IndexMap& Index) { ZEN_TRACE_CPU("Z$::Bucket::CompactState"); size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); Index.reserve(EntryCount); Index.min_load_factor(cache::impl::IndexMinLoadFactor); Index.max_load_factor(cache::impl::IndexMaxLoadFactor); for (auto It : m_Index) { PayloadIndex EntryIndex = PayloadIndex(Payloads.size()); Payloads.push_back(m_Payloads[It.second]); BucketPayload& Payload = Payloads.back(); AccessTimes.push_back(m_AccessTimes[It.second]); if (Payload.MetaData) { MetaDatas.push_back(m_MetaDatas[Payload.MetaData]); Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1); } if (Payload.MemCached) 
{ MemCachedPayloads.emplace_back( MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex}); Payload.MemCached = MemCachedIndex(gsl::narrow(MemCachedPayloads.size() - 1)); } Index.insert({It.first, EntryIndex}); } m_Index.swap(Index); m_Payloads.swap(Payloads); m_AccessTimes.swap(AccessTimes); m_MetaDatas.swap(MetaDatas); cache::impl::Reset(m_FreeMetaDatas); m_MemCachedPayloads.swap(MemCachedPayloads); cache::impl::Reset(m_FreeMemCachedPayloads); } RwLock::SharedLockScope ZenCacheDiskLayer::CacheBucket::GetGcReferencerLock() { return RwLock::SharedLockScope(m_IndexLock); } #if ZEN_WITH_TESTS void ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time) { GcClock::Tick TimeTick = Time.time_since_epoch().count(); RwLock::SharedLockScope IndexLock(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) { size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); m_AccessTimes[EntryIndex] = TimeTick; } } #endif // ZEN_WITH_TESTS ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config) : m_Gc(Gc) , m_JobQueue(JobQueue) , m_RootDir(RootDir) , m_Configuration(Config) { } ZenCacheDiskLayer::~ZenCacheDiskLayer() { try { { RwLock::ExclusiveLockScope _(m_Lock); for (auto& It : m_Buckets) { m_DroppedBuckets.emplace_back(std::move(It.second)); } m_Buckets.clear(); } // We destroy the buckets without holding a lock since destructor calls GcManager::RemoveGcReferencer which takes an exclusive lock. // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock m_DroppedBuckets.clear(); } catch (const std::exception& Ex) { ZEN_ERROR("~ZenCacheDiskLayer() failed. 
Reason: '{}'", Ex.what()); } } ZenCacheDiskLayer::CacheBucket* ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) { ZEN_TRACE_CPU("Z$::GetOrCreateBucket"); { RwLock::SharedLockScope SharedLock(m_Lock); if (auto It = m_Buckets.find_as(InBucket, std::hash(), eastl::equal_to_2()); It != m_Buckets.end()) { return It->second.get(); } } // We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock. // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig; if (auto It = m_Configuration.BucketConfigMap.find_as(InBucket, std::hash(), eastl::equal_to_2()); It != m_Configuration.BucketConfigMap.end()) { BucketConfig = &It->second; } std::unique_ptr Bucket(std::make_unique(m_Gc, m_TotalMemCachedSize, InBucket, *BucketConfig)); RwLock::ExclusiveLockScope Lock(m_Lock); if (auto It = m_Buckets.find_as(InBucket, std::hash(), eastl::equal_to_2()); It != m_Buckets.end()) { return It->second.get(); } std::filesystem::path BucketPath = m_RootDir; BucketPath /= InBucket; try { if (!Bucket->OpenOrCreate(BucketPath)) { ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", InBucket, m_RootDir); return nullptr; } } catch (const std::exception& Err) { ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", InBucket, BucketPath, Err.what()); throw; } std::string BucketName{InBucket}; CacheBucket* Result = Bucket.get(); m_Buckets.emplace(BucketName, std::move(Bucket)); if (m_CapturedBuckets) { m_CapturedBuckets->push_back(std::string(BucketName)); } return Result; } struct ZenCacheDiskLayer::PutBatchHandle { PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; CacheBucket::PutBatchHandle* Handle; }; void ForEach(const std::function& CB) noexcept { try { RwLock::SharedLockScope _(Lock); for 
// NOTE(review): angle-bracketed template argument lists (e.g. std::vector<...>,
// std::function<...>, std::span<...>) appear to have been stripped from this
// file by an external tool; code tokens below are preserved exactly as found.
// TODO: restore the original template arguments from source control.
//
// Tail of ZenCacheDiskLayer::PutBatchHandle::ForEach (the struct and the
// function head open before this chunk): walks every registered per-bucket
// put-batch handle under a shared lock and hands it to the callback.
// Exceptions are logged, never propagated.
(ZenCacheDiskLayer::PutBatchHandle::BucketHandle& BucketHandle : BucketHandles)
		{
			ZEN_ASSERT(BucketHandle.Bucket);
			ZEN_ASSERT(BucketHandle.Handle);
			CB(BucketHandle.Bucket, BucketHandle.Handle);
		}
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Exception in ZenCacheDiskLayer::PutBatchHandle::ForEach: '{}'", Ex.what());
	}
}

	// Returns the put-batch handle for `Bucket`, creating and registering one
	// on first use. Fast path: shared-lock lookup. Slow path: speculatively
	// begin a bucket batch *outside* the lock, then re-check under the
	// exclusive lock; the loser of the race releases the lock before ending
	// its speculative batch, so EndPutBatch never runs while `Lock` is held.
	// Returns nullptr if the bucket refuses to begin a batch.
	CacheBucket::PutBatchHandle* GetHandle(CacheBucket* Bucket)
	{
		{
			RwLock::SharedLockScope _(Lock);
			if (auto It = std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
				It != BucketHandles.end())
			{
				return It->Handle;
			}
		}
		CacheBucket::PutBatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
		if (NewBucketHandle == nullptr)
		{
			return nullptr;
		}
		RwLock::ExclusiveLockScope _(Lock);
		if (auto It = std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
			It != BucketHandles.end())
		{
			// Another thread registered a handle while we were unlocked; keep
			// theirs and discard our speculative batch.
			CacheBucket::PutBatchHandle* Result = It->Handle;
			ZEN_ASSERT(Result != nullptr);
			_.ReleaseNow();
			Bucket->EndPutBatch(NewBucketHandle);
			return Result;
		}
		BucketHandles.push_back(ZenCacheDiskLayer::PutBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
		return NewBucketHandle;
	}

	RwLock		 Lock;			// Guards BucketHandles
	std::vector	 BucketHandles;	// NOTE(review): element type stripped -- presumably BucketHandle
	std::vector& OutResults;	// NOTE(review): element type stripped
};

// Allocates a new layer-level put batch that appends its results to
// OutResults; released via EndPutBatch.
ZenCacheDiskLayer::PutBatchHandle* ZenCacheDiskLayer::BeginPutBatch(std::vector& OutResults)
{
	return new PutBatchHandle(OutResults);
}

// Ends every per-bucket put batch owned by `Batch`, then destroys it.
// noexcept: failures are logged and swallowed.
void ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
{
	try
	{
		ZEN_ASSERT(Batch);
		Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
		delete Batch;
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Exception in ZenCacheDiskLayer::EndPutBatch: '{}'", Ex.what());
	}
}

// Layer-level get-batch handle: fans a batched Get out to lazily-created
// per-bucket CacheBucket::GetBatchHandle instances. Mirrors PutBatchHandle
// above.
struct ZenCacheDiskLayer::GetBatchHandle
{
	GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) {}

	struct BucketHandle
	{
		CacheBucket*				 Bucket;
		CacheBucket::GetBatchHandle* Handle;
	};

	// Invokes CB for each registered (bucket, handle) pair under a shared
	// lock; exceptions are logged, never propagated.
	void ForEach(const std::function& CB) noexcept
	{
		try
		{
			RwLock::SharedLockScope _(Lock);
			for (ZenCacheDiskLayer::GetBatchHandle::BucketHandle& BucketHandle : BucketHandles)
			{
				ZEN_ASSERT(BucketHandle.Bucket);
				ZEN_ASSERT(BucketHandle.Handle);
				CB(BucketHandle.Bucket, BucketHandle.Handle);
			}
		}
		catch (const std::exception& Ex)
		{
			ZEN_ERROR("Exception in ZenCacheDiskLayer::GetBatchHandle::ForEach: '{}'", Ex.what());
		}
	}

	// Same double-checked create-or-reuse scheme as PutBatchHandle::GetHandle.
	// May return nullptr if CacheBucket::BeginGetBatch fails.
	CacheBucket::GetBatchHandle* GetHandle(CacheBucket* Bucket)
	{
		{
			RwLock::SharedLockScope _(Lock);
			if (auto It = std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
				It != BucketHandles.end())
			{
				return It->Handle;
			}
		}
		CacheBucket::GetBatchHandle* NewBucketHandle = Bucket->BeginGetBatch(OutResults);
		if (NewBucketHandle == nullptr)
		{
			return nullptr;
		}
		RwLock::ExclusiveLockScope _(Lock);
		if (auto It = std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
			It != BucketHandles.end())
		{
			// Lost the race; keep the registered handle, discard ours.
			CacheBucket::GetBatchHandle* Result = It->Handle;
			ZEN_ASSERT(Result != nullptr);
			_.ReleaseNow();
			Bucket->EndGetBatch(NewBucketHandle);
			return Result;
		}
		BucketHandles.push_back(ZenCacheDiskLayer::GetBatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
		return NewBucketHandle;
	}

	RwLock				Lock;			// Guards BucketHandles
	eastl::fixed_vector	BucketHandles;	// NOTE(review): template args stripped -- element type + inline capacity
	ZenCacheValueVec_t&	OutResults;
};

ZenCacheDiskLayer::GetBatchHandle* ZenCacheDiskLayer::BeginGetBatch(ZenCacheValueVec_t& OutResults)
{
	return new GetBatchHandle(OutResults);
}

// Ends every per-bucket get batch, opportunistically trims the memory cache,
// then destroys the batch. noexcept: failures are logged and swallowed.
void ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept
{
	ZEN_TRACE_CPU("Z$::EndGetBatch");
	try
	{
		ZEN_ASSERT(Batch);
		Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); });
		TryMemCacheTrim();
		delete Batch;
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Exception in ZenCacheDiskLayer::EndGetBatch: '{}'", Ex.what());
	}
}

// Single-value lookup: resolves (or creates) the bucket and queries it.
// Returns true and fills OutValue on a hit.
bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
	ZEN_TRACE_CPU("Z$::Get");
	if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
	{
		if (Bucket->Get(HashKey, OutValue))
		{
			TryMemCacheTrim();
			return true;
		}
	}
	return false;
}

// Batched lookup variant: routes the request to the bucket's batch handle.
void ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatchHandle& BatchHandle)
{
	ZEN_TRACE_CPU("Z$::Get(Batched)");
	if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
	{
		// NOTE(review): GetHandle() can return nullptr (BeginGetBatch
		// failure), which would be dereferenced here -- confirm that this
		// cannot happen in practice or add a null check.
		Bucket->Get(HashKey, *BatchHandle.GetHandle(Bucket));
	}
}

// Stores a value (plus its references) into the named bucket, optionally as
// part of a layer-level put batch.
void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, PutBatchHandle* OptionalBatchHandle)
{
	ZEN_TRACE_CPU("Z$::Put");
	if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
	{
		CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
		Bucket->Put(HashKey, Value, References, BucketBatchHandle);
		TryMemCacheTrim();
	}
}

// Scans m_RootDir for bucket sub-directories, deletes known-bad/"[dropped]"
// leftovers, and opens every remaining directory as a CacheBucket in parallel
// (continues past this chunk). Holds m_Lock exclusively for the whole scan;
// worker tasks synchronize m_Buckets mutation via a separate SyncLock.
void ZenCacheDiskLayer::DiscoverBuckets()
{
	ZEN_TRACE_CPU("Z$::DiscoverBuckets");
	DirectoryContent DirContent;
	GetDirectoryContent(m_RootDir, DirectoryContentFlags::IncludeDirs, DirContent);

	// Initialize buckets
	std::vector BadBucketDirectories;	// NOTE(review): element types stripped -- presumably std::filesystem::path
	std::vector FoundBucketDirectories;
	RwLock::ExclusiveLockScope _(m_Lock);
	for (const std::filesystem::path& BucketPath : DirContent.Directories)
	{
		const std::string BucketName = PathToUtf8(BucketPath.stem());
		if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
		{
			continue;	// Already loaded
		}
		if (IsKnownBadBucketName(BucketName))
		{
			BadBucketDirectories.push_back(BucketPath);
			continue;
		}
		else if (BucketName.starts_with("[dropped]"))
		{
			// Leftover from an interrupted drop; schedule for deletion.
			BadBucketDirectories.push_back(BucketPath);
			continue;
		}
		FoundBucketDirectories.push_back(BucketPath);
		ZEN_INFO("Discovered bucket '{}'", BucketName);
	}
	// Best-effort removal of bad bucket directories.
	for (const std::filesystem::path& BadBucketPath : BadBucketDirectories)
	{
		bool IsOk = false;
		try
		{
			IsOk = DeleteDirectories(BadBucketPath);
		}
		catch (const std::exception&)
		{
		}
		if
// (continuation of DiscoverBuckets: outcome of the bad-bucket deletion above)
		(IsOk)
		{
			ZEN_INFO("found bad bucket at '{}', deleted contents", BadBucketPath);
		}
		else
		{
			ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath);
		}
	}

	// Open the surviving directories as buckets on the large worker pool.
	RwLock			  SyncLock;		// Guards m_Buckets mutation from the tasks below
	WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
	std::atomic		  AbortFlag;	// NOTE(review): template args stripped -- presumably std::atomic<bool>
	std::atomic		  PauseFlag;
	ParallelWork	  Work(AbortFlag, PauseFlag);
	try
	{
		for (auto& BucketPath : FoundBucketDirectories)
		{
			Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic&) {
				ZEN_MEMSCOPE(GetCacheDiskTag());
				const std::string BucketName = PathToUtf8(BucketPath.stem());
				try
				{
					// Prefer a per-bucket configuration override, falling back
					// to the layer-wide default.
					BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig;
					if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName), std::hash(), eastl::equal_to_2());
						It != m_Configuration.BucketConfigMap.end())
					{
						BucketConfig = &It->second;
					}
					std::unique_ptr NewBucket = std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig);
					CacheBucket*	Bucket = nullptr;
					{
						RwLock::ExclusiveLockScope __(SyncLock);
						auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
						Bucket = InsertResult.first->second.get();
					}
					ZEN_ASSERT(Bucket);
					if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
					{
						ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
						// Back out the speculative registration.
						{
							RwLock::ExclusiveLockScope __(SyncLock);
							m_Buckets.erase(BucketName);
						}
					}
				}
				catch (const std::exception& Err)
				{
					ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
					return;
				}
			});
		}
	}
	catch (const std::exception& Ex)
	{
		AbortFlag.store(true);
		ZEN_WARN("Failed discovering buckets in {}. Reason: '{}'", m_RootDir, Ex.what());
	}
	Work.Wait();
}

// Drops a single bucket by name. If the bucket is loaded, its ownership moves
// to m_DroppedBuckets (keeping outstanding raw pointers valid) and the
// bucket's own Drop() continuation is returned. If not loaded, the on-disk
// directory is renamed aside and a deferred-delete closure is returned; an
// empty function signals failure.
std::function ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
	ZEN_TRACE_CPU("Z$::DropBucket");
	RwLock::ExclusiveLockScope _(m_Lock);
	auto It = m_Buckets.find(std::string(InBucket));
	if (It != m_Buckets.end())
	{
		CacheBucket& Bucket = *It->second;
		m_DroppedBuckets.push_back(std::move(It->second));
		m_Buckets.erase(It);
		return Bucket.Drop();
	}
	std::filesystem::path BucketPath = m_RootDir;
	BucketPath /= std::string(InBucket);
	std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(BucketPath);
	if (DroppedPath.empty())
	{
		return {};
	}
	else
	{
		return [DroppedPath = std::move(DroppedPath)]() {
			std::error_code Ec;
			(void)DeleteDirectories(DroppedPath, Ec);
			if (Ec)
			{
				ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message());
			}
		};
	}
}

// Drops every bucket and then the whole root directory. Returns an empty
// function if any bucket fails to produce a post-drop op (NOTE(review):
// buckets detached before that point stay dropped -- confirm this
// partial-drop behavior is intended). On success returns a closure that runs
// all per-bucket post-drop ops and deletes the renamed root directory.
std::function ZenCacheDiskLayer::Drop()
{
	ZEN_TRACE_CPU("Z$::Drop");
	std::vector> PostDropOps;	// NOTE(review): nested template args partially stripped
	{
		RwLock::ExclusiveLockScope _(m_Lock);
		PostDropOps.reserve(m_Buckets.size());
		while (!m_Buckets.empty())
		{
			const auto&	 It = m_Buckets.begin();
			CacheBucket& Bucket = *It->second;
			m_DroppedBuckets.push_back(std::move(It->second));
			m_Buckets.erase(It->first);
			if (std::function PostDropOp = Bucket.Drop(); !PostDropOp)
			{
				return {};
			}
			else
			{
				PostDropOps.emplace_back(std::move(PostDropOp));
			}
		}
	}
	std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(m_RootDir);
	if (DroppedPath.empty())
	{
		return {};
	}
	else
	{
		return [DroppedPath = std::move(DroppedPath), PostDropOps = std::move(PostDropOps)]() {
			for (auto& PostDropOp : PostDropOps)
			{
				PostDropOp();
			}
			std::error_code Ec;
			(void)DeleteDirectories(DroppedPath, Ec);
			if (Ec)
			{
				ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message());
			}
		};
	}
}

// Flushes every bucket to disk in parallel on the medium worker pool and
// logs the total time taken (continues past this chunk).
void ZenCacheDiskLayer::Flush()
{
	ZEN_MEMSCOPE(GetCacheDiskTag());
	ZEN_TRACE_CPU("Z$::Flush");
	std::vector Buckets;	// NOTE(review): element type stripped -- presumably CacheBucket*
	Stopwatch	Timer;
	const auto	_ = MakeGuard([&] {
		if (Buckets.empty())
		{
			return;
		}
// (continuation of Flush: completion-log scope guard body)
		ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
	});
	// Snapshot the bucket pointers under the shared lock; flush outside it.
	{
		RwLock::SharedLockScope __(m_Lock);
		Buckets.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
			CacheBucket* Bucket = Kv.second.get();
			Buckets.push_back(Bucket);
		}
	}
	{
		WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
		std::atomic		  AbortFlag;	// NOTE(review): template args stripped -- presumably std::atomic<bool>
		std::atomic		  PauseFlag;
		ParallelWork	  Work(AbortFlag, PauseFlag);
		try
		{
			for (auto& Bucket : Buckets)
			{
				Work.ScheduleWork(Pool, [Bucket](std::atomic&) {
					ZEN_MEMSCOPE(GetCacheDiskTag());
					try
					{
						Bucket->Flush();
					}
					catch (const std::exception& Ex)
					{
						ZEN_ERROR("Failed flushing bucket. Reason: '{}'", Ex.what());
					}
				});
			}
		}
		catch (const std::exception& Ex)
		{
			AbortFlag.store(true);
			ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
		}
		// Emit a progress line while waiting for outstanding flushes.
		Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) {
			ZEN_UNUSED(IsAborted, IsPaused);
			ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir);
		});
	}
}

// Runs a storage scrub over every bucket via the scrub context's thread pool.
// All tasks are waited for first, then get() re-throws the first stored
// exception, so a failing bucket does not leave others mid-scrub.
void ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
	ZEN_TRACE_CPU("Z$::ScrubStorage");
	RwLock::SharedLockScope _(m_Lock);
	{
		std::vector> Results;	// NOTE(review): nested template args partially stripped -- presumably std::vector<std::future<...>>
		Results.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
#if 1
			Results.push_back(Ctx.ThreadPool().EnqueueTask(
				std::packaged_task{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
#else
			// Serial fallback kept for debugging.
			CacheBucket& Bucket = *Kv.second;
			Bucket.ScrubStorage(Ctx);
#endif
		}
		// Wait for everything first...
		for (auto& Result : Results)
		{
			if (Result.valid())
			{
				Result.wait();
			}
		}
		// ...then propagate any stored exception.
		for (auto& Result : Results)
		{
			Result.get();
		}
	}
}

// Sums disk and memory footprint over all buckets.
GcStorageSize ZenCacheDiskLayer::StorageSize() const
{
	GcStorageSize StorageSize{};
	RwLock::SharedLockScope _(m_Lock);
	for (auto& Kv : m_Buckets)
	{
		GcStorageSize BucketSize = Kv.second->StorageSize();
		StorageSize.DiskSize += BucketSize.DiskSize;
		StorageSize.MemorySize += BucketSize.MemorySize;
	}
	return StorageSize;
}

// Aggregated layer statistics plus a per-bucket breakdown.
ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const
{
	GcStorageSize				 Size = StorageSize();
	ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
	{
		RwLock::SharedLockScope _(m_Lock);
		Stats.BucketStats.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
			Stats.BucketStats.emplace_back(NamedBucketStats{.BucketName = Kv.first, .Stats = Kv.second->Stats()});
		}
	}
	return Stats;
}

// Snapshot of root dir, configuration, bucket names, and aggregate sizes.
ZenCacheDiskLayer::Info ZenCacheDiskLayer::GetInfo() const
{
	ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration};
	{
		RwLock::SharedLockScope _(m_Lock);
		Info.BucketNames.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
			Info.BucketNames.push_back(Kv.first);
			Info.EntryCount += Kv.second->EntryCount();
			GcStorageSize BucketSize = Kv.second->StorageSize();
			Info.StorageSize.DiskSize += BucketSize.DiskSize;
			Info.StorageSize.MemorySize += BucketSize.MemorySize;
		}
	}
	return Info;
}

// Entry count and storage size for one bucket; empty optional if unknown.
std::optional ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
{
	RwLock::SharedLockScope _(m_Lock);
	if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
	{
		return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()};
	}
	return {};
}

// Invokes Fn for each entry of the named bucket; no-op for unknown buckets.
void ZenCacheDiskLayer::EnumerateBucketContents(std::string_view Bucket, std::function& Fn) const
{
	RwLock::SharedLockScope _(m_Lock);
	if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
	{
		It->second->EnumerateBucketContents(Fn);
	}
}

// Collects per-value details, optionally narrowed to one bucket and filtered
// by value. Holds the index lock shared for the whole traversal (continues
// past this chunk). NOTE(review): the ternary below is redundant --
// BucketFilter is already known to be empty on this branch.
CacheValueDetails::NamespaceDetails ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
{
	CacheValueDetails::NamespaceDetails Details;
	{
		RwLock::SharedLockScope IndexLock(m_Lock);
		if (BucketFilter.empty())
		{
			Details.Buckets.reserve(BucketFilter.empty() ?
m_Buckets.size() : 1); for (auto& Kv : m_Buckets) { Details.Buckets[Kv.first] = Kv.second->GetValueDetails(IndexLock, ValueFilter); } } else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) { Details.Buckets[It->first] = It->second->GetValueDetails(IndexLock, ValueFilter); } } return Details; } std::vector ZenCacheDiskLayer::GetGcReferencerLocks() { std::vector Locks; Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); for (auto& Kv : m_Buckets) { Locks.emplace_back(Kv.second->GetGcReferencerLock()); } return Locks; } void ZenCacheDiskLayer::EnableUpdateCapture() { m_Lock.WithExclusiveLock([&]() { if (m_UpdateCaptureRefCounter == 0) { ZEN_ASSERT(!m_CapturedBuckets); m_CapturedBuckets = std::make_unique>(); } else { ZEN_ASSERT(m_CapturedBuckets); } m_UpdateCaptureRefCounter++; }); } void ZenCacheDiskLayer::DisableUpdateCapture() { m_Lock.WithExclusiveLock([&]() { ZEN_ASSERT(m_CapturedBuckets); ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); m_UpdateCaptureRefCounter--; if (m_UpdateCaptureRefCounter == 0) { m_CapturedBuckets.reset(); } }); } std::vector ZenCacheDiskLayer::GetCapturedBucketsLocked() { if (m_CapturedBuckets) { return *m_CapturedBuckets; } return {}; } bool ZenCacheDiskLayer::GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const { std::atomic_bool CancelFlag = false; if (auto It = m_Buckets.find(std::string(BucketName)); It != m_Buckets.end()) { CacheBucket::ReferencesStats BucketStats; if (It->second->GetReferences(Log(), CancelFlag, false, true, false, OutContentStats.Attachments, &BucketStats)) { OutContentStats.ValueSizes = std::move(BucketStats.ValueSizes); OutContentStats.StructuredValuesCount = BucketStats.StructuredValuesCount; OutContentStats.StandaloneValuesCount = BucketStats.StandaloneValuesCount; return true; } } return false; } bool ZenCacheDiskLayer::StartAsyncMemCacheTrim() { ZEN_TRACE_CPU("Z$::MemCacheTrim"); ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); 
// (continuation of StartAsyncMemCacheTrim: config sanity checks and schedule)
	ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0);
	ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0);
	// Only one trim job at a time.
	bool Expected = false;
	if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
	{
		return false;
	}
	try
	{
		m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { MemCacheTrim(); });
	}
	catch (const std::exception& Ex)
	{
		ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
		m_IsMemCacheTrimming.store(false);
		return false;
	}
	return true;
}

// Memory-cache trim pass: builds a histogram of cached bytes by last-access
// age (UsageSlotCount slots spanning MemCacheMaxAgeSeconds), finds the age at
// which cumulative usage crosses 75% of the configured target footprint, and
// evicts everything at least that old. The scope guard always logs the
// result, schedules the next allowed trim tick, and clears the in-progress
// flag -- even on an exception path.
void ZenCacheDiskLayer::MemCacheTrim()
{
	ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim");
	const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
	uint64_t				   TrimmedSize = 0;
	Stopwatch				   Timer;
	const auto				   Guard = MakeGuard([&] {
		ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
				 NiceBytes(TrimmedSize),
				 NiceBytes(m_TotalMemCachedSize),
				 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
		const GcClock::Tick NowTick = GcClock::TickCount();
		const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
		m_NextAllowedTrimTick.store(NextTrimTick);
		m_IsMemCacheTrimming.store(false);
	});
	const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
	static const size_t		   UsageSlotCount = 2048;
	std::vector				   UsageSlots;	// NOTE(review): element type stripped -- accumulates bytes per age slot
	UsageSlots.reserve(UsageSlotCount);
	// Snapshot the buckets under the shared lock; gather usage outside it.
	std::vector Buckets;
	{
		RwLock::SharedLockScope __(m_Lock);
		Buckets.reserve(m_Buckets.size());
		for (auto& Kv : m_Buckets)
		{
			Buckets.push_back(Kv.second.get());
		}
	}
	const GcClock::TimePoint Now = GcClock::Now();
	{
		ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
		for (CacheBucket* Bucket : Buckets)
		{
			Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
		}
	}
	// Trim down to 75% of the target to leave headroom before the next pass.
	const uint64_t MemCacheTargetFootprintBytes = (m_Configuration.MemCacheTargetFootprintBytes * 75) / 100;
	uint64_t	   TotalSize = 0;
	for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
	{
		TotalSize += UsageSlots[Index];
		if (TotalSize >= MemCacheTargetFootprintBytes)
		{
			// Slot Index maps linearly onto [0, MaxAge]; evict anything older
			// than the corresponding time point.
			GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
			TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
			break;
		}
	}
}

// Evicts memory-cached entries older than ExpireTime from the given buckets
// and pushes out the next allowed trim tick (CAS so a concurrent update
// wins). Returns the number of bytes trimmed; no-op when the memory cache is
// unbounded (target footprint == 0).
uint64_t ZenCacheDiskLayer::MemCacheTrim(std::vector& Buckets, GcClock::TimePoint ExpireTime)
{
	if (m_Configuration.MemCacheTargetFootprintBytes == 0)
	{
		return 0;
	}
	uint64_t TrimmedSize = 0;
	for (CacheBucket* Bucket : Buckets)
	{
		TrimmedSize += Bucket->MemCacheTrim(ExpireTime);
	}
	const GcClock::TimePoint   Now = GcClock::Now();
	const GcClock::Tick		   NowTick = Now.time_since_epoch().count();
	const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
	GcClock::Tick			   LastTrimTick = m_NextAllowedTrimTick;
	const GcClock::Tick		   NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
	m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
	return TrimmedSize;
}

#if ZEN_WITH_TESTS
// Test hook: overrides the recorded access time for one cache entry. Looks
// the bucket up under the shared lock, then calls through the raw pointer
// outside it.
void ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time)
{
	const auto	 BucketName = std::string(InBucket);
	CacheBucket* Bucket = nullptr;
	{
		RwLock::SharedLockScope _(m_Lock);
		auto It = m_Buckets.find(BucketName);
		if (It != m_Buckets.end())
		{
			Bucket = It->second.get();
		}
	}
	if (Bucket == nullptr)
	{
		return;
	}
	Bucket->SetAccessTime(HashKey, Time);
}
#endif // ZEN_WITH_TESTS

} // namespace zen