// Copyright Epic Games, Inc. All Rights Reserved. #include "cachedisklayer.h" #include #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// namespace zen { namespace { #pragma pack(push) #pragma pack(1) // We use this to indicate if a on disk bucket needs wiping // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references // to block items. // See: https://github.com/EpicGames/zen/pull/299 static const uint32_t CurrentDiskBucketVersion = 1; struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t Version2 = 2; static constexpr uint32_t CurrentVersion = Version2; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t PayloadAlignment = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); #pragma pack(pop) const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + ".tmp"); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + LogExtension); } bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if (Entry.Location.Reserved != 0) { OutReason = 
fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } if (Entry.Location.GetFlags() & ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); return false; } if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) { return true; } if (Entry.Location.Reserved != 0) { OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } uint64_t Size = Entry.Location.Size(); if (Size == 0) { OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); return false; } return true; } bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) { int DropIndex = 0; do { if (!std::filesystem::exists(Dir)) { return false; } std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; if (std::filesystem::exists(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; std::filesystem::rename(Dir, DroppedBucketPath, Ec); if (!Ec) { DeleteDirectories(DroppedBucketPath); return true; } // TODO: Do we need to bail at some point? 
zen::Sleep(100); } while (true); } } // namespace namespace fs = std::filesystem; static CbObject LoadCompactBinaryObject(const fs::path& Path) { FileContents Result = ReadFile(Path); if (!Result.ErrorCode) { IoBuffer Buffer = Result.Flatten(); if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) { return LoadCompactBinaryObject(Buffer); } } return CbObject(); } static void SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) { WriteFile(Path, Object.GetBuffer().AsIoBuffer()); } ////////////////////////////////////////////////////////////////////////// const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex; const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex; ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, bool EnableReferenceCaching) : m_BucketName(std::move(BucketName)) , m_BucketId(Oid::Zero) , m_EnableReferenceCaching(EnableReferenceCaching) { if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { // This is pretty ad hoc but in order to avoid too many individual files // it makes sense to have a different strategy for legacy values m_LargeObjectThreshold = 16 * 1024 * 1024; } } ZenCacheDiskLayer::CacheBucket::~CacheBucket() { } bool ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { using namespace std::literals; ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate"); ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); m_BlocksBasePath = BucketDir / "blocks"; m_BucketDir = BucketDir; CreateDirectories(m_BucketDir); std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; bool IsNew = false; CbObject Manifest = LoadCompactBinaryObject(ManifestPath); if (Manifest) { m_BucketId = Manifest["BucketId"sv].AsObjectId(); if (m_BucketId == Oid::Zero) { return false; } const uint32_t Version = Manifest["Version"sv].AsUInt32(0); if (Version 
!= CurrentDiskBucketVersion) { ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); IsNew = true; } } else if (AllowCreate) { m_BucketId.Generate(); CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; Writer << "Version"sv << CurrentDiskBucketVersion; Manifest = Writer.Save(); SaveCompactBinaryObject(ManifestPath, Manifest); IsNew = true; } else { return false; } OpenLog(IsNew); if (!IsNew) { ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate::Manifest"); Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); uint64_t Count = Manifest["Count"sv].AsUInt64(0); if (Count != 0) { std::vector KeysIndexes; KeysIndexes.reserve(Count); CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); for (CbFieldView& KeyView : KeyArray) { if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end()) { KeysIndexes.push_back(It.value()); continue; } KeysIndexes.push_back((uint64_t)-1); } size_t KeyIndexOffset = 0; CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); for (CbFieldView& TimeStampView : TimeStampArray) { size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; if (KeyIndex == (uint64_t)-1) { continue; } m_AccessTimes[KeyIndex] = TimeStampView.AsInt64(); } KeyIndexOffset = 0; CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); for (CbFieldView& RawHashView : RawHashArray) { size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; if (KeyIndex == (uint64_t)-1) { continue; } m_Payloads[KeyIndex].RawHash = RawHashView.AsHash(); } KeyIndexOffset = 0; CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); for (CbFieldView& RawSizeView : RawSizeArray) { size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; if (KeyIndex == (uint64_t)-1) { continue; } m_Payloads[KeyIndex].RawSize = RawSizeView.AsUInt64(); } } ////// Legacy format read { for (CbFieldView Entry : Manifest["Timestamps"sv]) { const 
CbObjectView Obj = Entry.AsObjectView(); const IoHash Key = Obj["Key"sv].AsHash(); if (auto It = m_Index.find(Key); It != m_Index.end()) { size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); } } for (CbFieldView Entry : Manifest["RawInfo"sv]) { const CbObjectView Obj = Entry.AsObjectView(); const IoHash Key = Obj["Key"sv].AsHash(); if (auto It = m_Index.find(Key); It != m_Index.end()) { size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); const IoHash RawHash = Obj["RawHash"sv].AsHash(); const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); if (RawHash == IoHash::Zero || RawSize == 0) { ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); } m_Payloads[EntryIndex].RawHash = RawHash; m_Payloads[EntryIndex].RawSize = RawSize; } } } } return true; } void ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeIndexSnapshot"); uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) { return; } ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_BucketDir, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); // Move index away, we keep it if something goes wrong if (fs::is_regular_file(STmpIndexPath)) { std::error_code Ec; if (!fs::remove(STmpIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message()); return; } } try { if (fs::is_regular_file(IndexPath)) { fs::rename(IndexPath, STmpIndexPath); } // Write the current state of the location map to a new index state std::vector Entries; 
Entries.resize(m_Index.size()); { uint64_t EntryIndex = 0; for (auto& Entry : m_Index) { DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = m_Payloads[Entry.second].Location; } } BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount, .PayloadAlignment = gsl::narrow(m_PayloadAlignment)}; Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); ObjectIndexFile.Flush(); ObjectIndexFile.Close(); EntryCount = Entries.size(); m_LogFlushPosition = LogCount; } catch (std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); // Restore any previous snapshot if (fs::is_regular_file(STmpIndexPath)) { std::error_code Ec; fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless fs::rename(STmpIndexPath, IndexPath, Ec); if (Ec) { ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message()); } } } if (fs::is_regular_file(STmpIndexPath)) { std::error_code Ec; if (!fs::remove(STmpIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message()); } } } uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile"); if (std::filesystem::is_regular_file(IndexPath)) { BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CacheBucketIndexHeader)) { CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if ((Header.Magic == 
CacheBucketIndexHeader::ExpectedMagic) && (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) { switch (Header.Version) { case CacheBucketIndexHeader::Version2: { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); if (Header.EntryCount > ExpectedEntryCount) { break; } size_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_PayloadAlignment = Header.PayloadAlignment; std::vector Entries; Entries.resize(Header.EntryCount); ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); m_Payloads.reserve(Header.EntryCount); if (m_EnableReferenceCaching) { m_FirstReferenceIndex.reserve(Header.EntryCount); } m_AccessTimes.reserve(Header.EntryCount); m_Index.reserve(Header.EntryCount); std::string InvalidEntryReason; for (const DiskIndexEntry& Entry : Entries) { if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); m_AccessTimes.emplace_back(GcClock::TickCount()); if (m_EnableReferenceCaching) { m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); } m_Index.insert_or_assign(Entry.Key, EntryIndex); EntryCount++; } OutVersion = CacheBucketIndexHeader::Version2; return Header.LogPosition; } break; default: break; } } } ZEN_WARN("skipping invalid index file '{}'", IndexPath); } return 0; } uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog"); if (std::filesystem::is_regular_file(LogPath)) { uint64_t LogEntryCount = 
0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (CasLog.Initialize()) { uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; m_Index.reserve(LogEntryCount); uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const DiskIndexEntry& Record) { std::string InvalidEntryReason; if (Record.Location.Flags & DiskLocation::kTombStone) { m_Index.erase(Record.Key); return; } if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero}); m_AccessTimes.emplace_back(GcClock::TickCount()); if (m_EnableReferenceCaching) { m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); } m_Index.insert_or_assign(Record.Key, EntryIndex); }, SkipEntryCount); if (InvalidEntryCount) { ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); } return LogEntryCount; } } return 0; }; void ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) { ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); m_TotalStandaloneSize = 0; m_Index.clear(); m_Payloads.clear(); m_AccessTimes.clear(); m_FirstReferenceIndex.clear(); m_ReferenceHashes.clear(); m_NextReferenceHashesIndexes.clear(); m_ReferenceCount = 0; std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); if (IsNew) { fs::remove(LogPath); fs::remove(IndexPath); 
fs::remove_all(m_BlocksBasePath); } CreateDirectories(m_BucketDir); std::unordered_map BlockSizes = m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); if (std::filesystem::is_regular_file(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); std::filesystem::remove(IndexPath); } } uint64_t LogEntryCount = 0; if (std::filesystem::is_regular_file(LogPath)) { if (TCasLogFile::IsValid(LogPath)) { LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); } else if (fs::is_regular_file(LogPath)) { ZEN_WARN("removing invalid cas log at '{}'", LogPath); std::filesystem::remove(LogPath); } } m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); std::vector KnownLocations; KnownLocations.reserve(m_Index.size()); std::vector BadEntries; for (const auto& Entry : m_Index) { size_t EntryIndex = Entry.second; const BucketPayload& Payload = m_Payloads[EntryIndex]; const DiskLocation& Location = Payload.Location; if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); continue; } const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex); if (BlockIt == BlockSizes.end()) { ZEN_WARN("Unknown block {} for entry {} in '{}'", BlockLocation.BlockIndex, Entry.first.ToHexString(), m_BucketDir); } else { uint64_t BlockSize = BlockIt->second; if (BlockLocation.Offset + BlockLocation.Size > BlockSize) { ZEN_WARN("Range is outside of block {} for entry {} in '{}'", BlockLocation.BlockIndex, Entry.first.ToHexString(), m_BucketDir); } else { KnownLocations.push_back(BlockLocation); continue; } } DiskLocation NewLocation = Payload.Location; NewLocation.Flags |= DiskLocation::kTombStone; BadEntries.push_back(DiskIndexEntry{.Key = Entry.first, .Location = 
NewLocation}); } if (!BadEntries.empty()) { m_SlogFile.Append(BadEntries); m_SlogFile.Flush(); LogEntryCount += BadEntries.size(); for (const DiskIndexEntry& BadEntry : BadEntries) { m_Index.erase(BadEntry.Key); } } m_BlockStore.Prune(KnownLocations); if (IsNew || LogEntryCount > 0) { MakeIndexSnapshot(); } // TODO: should validate integrity of container files here } void ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const { char HexString[sizeof(HashKey.Hash) * 2]; ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); Path.Append(m_BucketDir); Path.AppendSeparator(); Path.Append(L"blob"); Path.AppendSeparator(); Path.AppendAsciiRange(HexString, HexString + 3); Path.AppendSeparator(); Path.AppendAsciiRange(HexString + 3, HexString + 5); Path.AppendSeparator(); Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); } IoBuffer ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const { ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue"); BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); IoBuffer Value = m_BlockStore.TryGetChunk(Location); if (Value) { Value.SetContentType(Loc.GetContentType()); } return Value; } IoBuffer ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const { ZEN_TRACE_CPU("Z$::Disk::Bucket::GetStandaloneCacheValue"); ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) { Data.SetContentType(ContentType); return Data; } return {}; } bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { metrics::RequestStats::Scope StatsScope(m_GetOps, 0); RwLock::SharedLockScope _(m_IndexLock); auto It = m_Index.find(HashKey); if (It == m_Index.end()) { m_MissCount++; return false; } size_t EntryIndex = It.value(); 
const BucketPayload& Payload = m_Payloads[EntryIndex]; m_AccessTimes[EntryIndex] = GcClock::TickCount(); DiskLocation Location = Payload.Location; OutValue.RawSize = Payload.RawSize; OutValue.RawHash = Payload.RawHash; if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { // We don't need to hold the index lock when we read a standalone file _.ReleaseNow(); OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey); } else { OutValue.Value = GetInlineCacheValue(Location); } _.ReleaseNow(); if (!Location.IsFlagSet(DiskLocation::kStructured)) { if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0) { if (Location.IsFlagSet(DiskLocation::kCompressed)) { (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize); } else { OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); OutValue.RawSize = OutValue.Value.GetSize(); } RwLock::ExclusiveLockScope __(m_IndexLock); if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) { BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; WritePayload.RawHash = OutValue.RawHash; WritePayload.RawSize = OutValue.RawSize; m_LogFlushPosition = 0; // Force resave of index on exit } } } if (OutValue.Value) { m_HitCount++; StatsScope.SetBytes(OutValue.Value.GetSize()); return true; } else { m_MissCount++; return false; } } void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References) { metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); if (Value.Value.Size() >= m_LargeObjectThreshold) { return PutStandaloneCacheValue(HashKey, Value, References); } PutInlineCacheValue(HashKey, Value, References); m_WriteCount++; } bool ZenCacheDiskLayer::CacheBucket::Drop() { ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop"); RwLock::ExclusiveLockScope _(m_IndexLock); std::vector> ShardLocks; ShardLocks.reserve(256); for (RwLock& Lock : m_ShardedLocks) { 
ShardLocks.push_back(std::make_unique(Lock)); } m_BlockStore.Close(); m_SlogFile.Close(); bool Deleted = MoveAndDeleteDirectory(m_BucketDir); m_Index.clear(); m_Payloads.clear(); m_AccessTimes.clear(); m_FirstReferenceIndex.clear(); m_ReferenceHashes.clear(); m_NextReferenceHashesIndexes.clear(); m_ReferenceCount = 0; return Deleted; } void ZenCacheDiskLayer::CacheBucket::Flush() { ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush"); bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { return; } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); std::vector AccessTimes; std::vector Payloads; IndexMap Index; { RwLock::SharedLockScope IndexLock(m_IndexLock); MakeIndexSnapshot(); Index = m_Index; Payloads = m_Payloads; AccessTimes = m_AccessTimes; } SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads)); } void ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest) { ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); try { SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Manifest); } catch (std::exception& Err) { ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what()); } } CbObject ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index, std::vector&& AccessTimes, const std::vector& Payloads) { using namespace std::literals; ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest"); size_t ItemCount = m_Index.size(); // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth // And we don't need to reallocate theunderying buffer in almost every case const size_t EstimatedSizePerItem = 54u; const size_t ReserveSize = ItemCount == 0 ? 
48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); CbObjectWriter Writer(ReserveSize); Writer << "BucketId"sv << m_BucketId; Writer << "Version"sv << CurrentDiskBucketVersion; if (!m_Index.empty()) { Writer.AddInteger("Count"sv, gsl::narrow(Index.size())); Writer.BeginArray("Keys"sv); for (auto& Kv : Index) { const IoHash& Key = Kv.first; Writer.AddHash(Key); } Writer.EndArray(); Writer.BeginArray("Timestamps"sv); for (auto& Kv : Index) { GcClock::Tick AccessTime = AccessTimes[Kv.second]; Writer.AddInteger(AccessTime); } Writer.EndArray(); Writer.BeginArray("RawHash"sv); for (auto& Kv : Index) { const BucketPayload& Payload = Payloads[Kv.second]; Writer.AddHash(Payload.RawHash); } Writer.EndArray(); Writer.BeginArray("RawSize"sv); for (auto& Kv : Index) { const BucketPayload& Payload = Payloads[Kv.second]; Writer.AddInteger(Payload.RawSize); } Writer.EndArray(); } return Writer.Save(); } IoHash HashBuffer(const CompositeBuffer& Buffer) { IoHashStream Hasher; for (const SharedBuffer& Segment : Buffer.GetSegments()) { Hasher.Append(Segment.GetView()); } return Hasher.GetHash(); } bool ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) { ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType); if (ContentType == ZenContentType::kCbObject) { CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); if (Error == CbValidateError::None) { return true; } ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error)); return false; } else if (ContentType == ZenContentType::kCompressedBinary) { IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer); IoHash HeaderRawHash; uint64_t RawSize = 0; if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize)) { ZEN_SCOPED_ERROR("compressed buffer header validation failed"); return false; } CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out 
*/ RawSize); CompositeBuffer Decompressed = Compressed.DecompressToComposite(); IoHash DecompressedHash = HashBuffer(Decompressed); if (HeaderRawHash != DecompressedHash) { ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); return false; } } else { // No way to verify this kind of content (what is it exactly?) static int Once = [&] { ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType)); return 42; }(); } return true; }; void ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub"); ZEN_INFO("scrubbing '{}'", m_BucketDir); Stopwatch Timer; uint64_t ChunkCount = 0; uint64_t VerifiedChunkBytes = 0; auto LogStats = MakeGuard([&] { const uint32_t DurationMs = gsl::narrow(Timer.GetElapsedTimeMs()); ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", m_BucketName, NiceBytes(VerifiedChunkBytes), NiceTimeSpanMs(DurationMs), ChunkCount, NiceRate(VerifiedChunkBytes, DurationMs)); }); std::vector BadKeys; auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; try { std::vector ChunkLocations; std::vector ChunkIndexToChunkHash; RwLock::SharedLockScope _(m_IndexLock); const size_t BlockChunkInitialCount = m_Index.size() / 4; ChunkLocations.reserve(BlockChunkInitialCount); ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); // Do a pass over the index and verify any standalone file values straight away // all other storage classes are gathered and verified in bulk in order to enable // more efficient I/O scheduling for (auto& Kv : m_Index) { const IoHash& HashKey = Kv.first; const BucketPayload& Payload = m_Payloads[Kv.second]; const DiskLocation& Loc = Payload.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { Ctx.ThrowIfDeadlineExpired(); ++ChunkCount; VerifiedChunkBytes += Loc.Size(); if (Loc.GetContentType() == ZenContentType::kBinary) { // Blob cache value, not much we can do 
about data integrity checking // here since there's no hash available ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); std::error_code Ec; uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); if (Ec) { ReportBadKey(HashKey); } if (size != Loc.Size()) { ReportBadKey(HashKey); } continue; } else { // Structured cache value IoBuffer Buffer = GetStandaloneCacheValue(Loc.GetContentType(), HashKey); if (!Buffer) { ReportBadKey(HashKey); continue; } if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer)) { ReportBadKey(HashKey); continue; } } } else { ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); ChunkIndexToChunkHash.push_back(HashKey); continue; } } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { ++ChunkCount; VerifiedChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { // ChunkLocation out of range of stored blocks ReportBadKey(Hash); return; } if (!Size) { ReportBadKey(Hash); return; } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); if (!Buffer) { ReportBadKey(Hash); return; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) { ReportBadKey(Hash); return; } }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { Ctx.ThrowIfDeadlineExpired(); ++ChunkCount; VerifiedChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); if (!Buffer) { ReportBadKey(Hash); return; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); 
if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) { ReportBadKey(Hash); return; } }; m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); } catch (ScrubDeadlineExpiredException&) { ZEN_INFO("Scrubbing deadline expired, operation incomplete"); } Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes); if (!BadKeys.empty()) { ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); if (Ctx.RunRecovery()) { // Deal with bad chunks by removing them from our lookup map std::vector LogEntries; LogEntries.reserve(BadKeys.size()); { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (const IoHash& BadKey : BadKeys) { // Log a tombstone and delete the in-memory index for the bad entry const auto It = m_Index.find(BadKey); BucketPayload& Payload = m_Payloads[It->second]; if (m_EnableReferenceCaching) { RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]); } DiskLocation Location = Payload.Location; Location.Flags |= DiskLocation::kTombStone; LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); } } for (const DiskIndexEntry& Entry : LogEntries) { if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) { ExtendablePathBuilder<256> Path; BuildPath(Path, Entry.Key); fs::path FilePath = Path.ToPath(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); std::error_code Ec; fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... 
} m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); } } m_SlogFile.Append(LogEntries); // Clean up m_AccessTimes and m_Payloads vectors { std::vector Payloads; std::vector AccessTimes; std::vector FirstReferenceIndex; IndexMap Index; { RwLock::ExclusiveLockScope __(m_IndexLock); size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); if (m_EnableReferenceCaching) { FirstReferenceIndex.reserve(EntryCount); } Index.reserve(EntryCount); for (auto It : m_Index) { size_t EntryIndex = Payloads.size(); Payloads.push_back(m_Payloads[It.second]); AccessTimes.push_back(m_AccessTimes[It.second]); if (m_EnableReferenceCaching) { FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]); } Index.insert({It.first, EntryIndex}); } m_Index.swap(Index); m_Payloads.swap(Payloads); m_AccessTimes.swap(AccessTimes); m_FirstReferenceIndex.swap(FirstReferenceIndex); CompactReferences(__); } } } } // Let whomever it concerns know about the bad chunks. 
// Tail of the preceding function (its definition starts above this view):
// report any bad CID chunks found during the scrub. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
if (!BadKeys.empty())
{
    Ctx.ReportBadCidChunks(BadKeys);
}
}

// Walks this bucket for the garbage collector and reports:
//  - cache keys whose last access is older than the GC expire time (expired)
//  - CID attachments referenced by still-live structured (CbObject) values
// Operates on a snapshot of the index so normal traffic can proceed; only
// short shared/exclusive lock windows touch the live index.
void
ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
    ZEN_TRACE_CPU("Z$::Disk::Bucket::GatherReferences");
// Compile-time switch for per-lock contention timing (off by default).
#define CALCULATE_BLOCKING_TIME 0
#if CALCULATE_BLOCKING_TIME
    uint64_t WriteBlockTimeUs = 0;
    uint64_t WriteBlockLongestTimeUs = 0;
    uint64_t ReadBlockTimeUs = 0;
    uint64_t ReadBlockLongestTimeUs = 0;
#endif  // CALCULATE_BLOCKING_TIME
    Stopwatch TotalTimer;
    // Scope guard: always emit a summary log when leaving, on any path.
    const auto _ = MakeGuard([&] {
#if CALCULATE_BLOCKING_TIME
        ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
                  m_BucketDir,
                  NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
                  NiceLatencyNs(WriteBlockTimeUs),
                  NiceLatencyNs(WriteBlockLongestTimeUs),
                  NiceLatencyNs(ReadBlockTimeUs),
                  NiceLatencyNs(ReadBlockLongestTimeUs));
#else
        ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
#endif  // CALCULATE_BLOCKING_TIME
    });
    const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime();
    const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
    // Snapshot the index state under a shared lock so the scan below can run
    // without holding m_IndexLock.
    IndexMap Index;
    std::vector AccessTimes;
    std::vector Payloads;
    std::vector FirstReferenceIndex;
    {
        RwLock::SharedLockScope __(m_IndexLock);
#if CALCULATE_BLOCKING_TIME
        // NOTE(review): this is a *shared* (read) lock, yet elapsed time is
        // accumulated into the Write* counters — looks swapped; confirm intent.
        Stopwatch Timer;
        const auto ___ = MakeGuard([&] {
            uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
            WriteBlockTimeUs += ElapsedUs;
            WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
        });
#endif  // CALCULATE_BLOCKING_TIME
        Index = m_Index;
        AccessTimes = m_AccessTimes;
        Payloads = m_Payloads;
        FirstReferenceIndex = m_FirstReferenceIndex;
    }
    std::vector ExpiredKeys;
    ExpiredKeys.reserve(1024);
    std::vector Cids;
    if (!GcCtx.SkipCid())
    {
        Cids.reserve(1024);
    }
    // Structured entries whose attachment set is not cached yet; these need
    // their payload loaded and parsed in the second pass below.
    std::vector> StructuredItemsWithUnknownAttachments;
    for (const auto& Entry : Index)
    {
        const IoHash& Key = Entry.first;
        GcClock::Tick AccessTime = AccessTimes[Entry.second];
        if (AccessTime < ExpireTicks)
        {
            ExpiredKeys.push_back(Key);
            continue;
        }
        if (GcCtx.SkipCid())
        {
            continue;
        }
        BucketPayload& Payload = Payloads[Entry.second];
        const DiskLocation& Loc = Payload.Location;
        if (!Loc.IsFlagSet(DiskLocation::kStructured))
        {
            // Only structured (CbObject) values carry attachment references.
            continue;
        }
        if (m_EnableReferenceCaching)
        {
            if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex))
            {
                StructuredItemsWithUnknownAttachments.push_back(Entry);
                continue;
            }
            // Snapshot says the references are cached; re-check against the
            // live index since it may have changed since the snapshot.
            bool ReferencesAreKnown = false;
            {
                RwLock::SharedLockScope IndexLock(m_IndexLock);
#if CALCULATE_BLOCKING_TIME
                Stopwatch Timer;
                const auto ___ = MakeGuard([&] {
                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
                    WriteBlockTimeUs += ElapsedUs;
                    WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
                });
#endif  // CALCULATE_BLOCKING_TIME
                if (auto It = m_Index.find(Entry.first); It != m_Index.end())
                {
                    ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
                }
            }
            if (ReferencesAreKnown)
            {
                // Flush retained CIDs in batches to bound memory use.
                if (Cids.size() >= 1024)
                {
                    GcCtx.AddRetainedCids(Cids);
                    Cids.clear();
                }
                continue;
            }
        }
        StructuredItemsWithUnknownAttachments.push_back(Entry);
    }
    // Second pass: load each structured payload and extract its attachments.
    for (const auto& Entry : StructuredItemsWithUnknownAttachments)
    {
        BucketPayload& Payload = Payloads[Entry.second];
        const DiskLocation& Loc = Payload.Location;
        {
            IoBuffer Buffer;
            if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
            {
                if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Entry.first); !Buffer)
                {
                    continue;
                }
            }
            else
            {
                RwLock::SharedLockScope IndexLock(m_IndexLock);
#if CALCULATE_BLOCKING_TIME
                Stopwatch Timer;
                const auto ___ = MakeGuard([&] {
                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
                    WriteBlockTimeUs += ElapsedUs;
                    WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
                });
#endif  // CALCULATE_BLOCKING_TIME
                // Re-resolve through the live index: the snapshot's inline
                // location may be stale by now.
                if (auto It = m_Index.find(Entry.first); It != m_Index.end())
                {
                    BucketPayload& UpdatePayload = m_Payloads[It->second];
                    Buffer = GetInlineCacheValue(UpdatePayload.Location);
                }
                if (!Buffer)
                {
                    continue;
                }
            }
            ZEN_ASSERT(Buffer);
            ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
            CbObject Obj(SharedBuffer{Buffer});
            size_t CurrentCidCount = Cids.size();
            Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
            if (m_EnableReferenceCaching)
            {
                // Cache the freshly parsed references for future GC passes.
                RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
#if CALCULATE_BLOCKING_TIME
                // NOTE(review): exclusive (write) lock accumulating into the
                // Read* counters — looks swapped; confirm intent.
                Stopwatch Timer;
                const auto ___ = MakeGuard([&] {
                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
                    ReadBlockTimeUs += ElapsedUs;
                    ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
                });
#endif  // CALCULATE_BLOCKING_TIME
                if (auto It = m_Index.find(Entry.first); It != m_Index.end())
                {
                    if (m_FirstReferenceIndex[It->second] == UnknownReferencesIndex)
                    {
                        SetReferences(IndexLock,
                                      m_FirstReferenceIndex[It->second],
                                      std::span(Cids.data() + CurrentCidCount, Cids.size() - CurrentCidCount));
                    }
                    else
                    {
                        // Another thread cached references concurrently; drop
                        // our parse and take the cached set instead.
                        Cids.resize(CurrentCidCount);
                        (void)GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
                    }
                }
            }
            if (Cids.size() >= 1024)
            {
                GcCtx.AddRetainedCids(Cids);
                Cids.clear();
            }
        }
    }
    GcCtx.AddRetainedCids(Cids);
    GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
}

// Deletes expired and unreferenced values from this bucket: standalone files
// are removed from disk, block-store chunks are reclaimed/compacted, and the
// index is rewritten to drop the deleted entries. (Continues past this view.)
void
ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
    ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage");
    ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
    Stopwatch TotalTimer;
    uint64_t WriteBlockTimeUs = 0;
    uint64_t WriteBlockLongestTimeUs = 0;
    uint64_t ReadBlockTimeUs = 0;
    uint64_t ReadBlockLongestTimeUs = 0;
    uint64_t TotalChunkCount = 0;
    uint64_t DeletedSize = 0;
    uint64_t OldTotalSize = TotalSize();
    std::unordered_set DeletedChunks;
    uint64_t MovedCount = 0;
    // Scope guard: log a summary and persist a fresh manifest on exit.
    const auto _ = MakeGuard([&] {
        ZEN_DEBUG(
            "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
            "{} "
            "of {} "
            "entires ({}).",
            m_BucketDir,
            NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
            NiceLatencyNs(WriteBlockTimeUs),
NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs), NiceBytes(DeletedSize), DeletedChunks.size(), MovedCount, TotalChunkCount, NiceBytes(OldTotalSize)); bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { return; } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); std::vector AccessTimes; std::vector Payloads; IndexMap Index; { RwLock::SharedLockScope IndexLock(m_IndexLock); MakeIndexSnapshot(); Index = m_Index; Payloads = m_Payloads; AccessTimes = m_AccessTimes; } SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads)); }); m_SlogFile.Flush(); std::span ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); std::vector DeleteCacheKeys; DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { if (Keep) { return; } DeleteCacheKeys.push_back(ChunkHash); }); if (DeleteCacheKeys.empty()) { ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir); return; } auto __ = MakeGuard([&]() { if (!DeletedChunks.empty()) { // Clean up m_AccessTimes and m_Payloads vectors std::vector Payloads; std::vector AccessTimes; std::vector FirstReferenceIndex; IndexMap Index; { RwLock::ExclusiveLockScope _(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); if (m_EnableReferenceCaching) { FirstReferenceIndex.reserve(EntryCount); } Index.reserve(EntryCount); for (auto It : m_Index) { size_t OldEntryIndex = It.second; size_t NewEntryIndex = Payloads.size(); Payloads.push_back(m_Payloads[OldEntryIndex]); AccessTimes.push_back(m_AccessTimes[OldEntryIndex]); if 
(m_EnableReferenceCaching) { FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]); } Index.insert({It.first, NewEntryIndex}); } m_Index.swap(Index); m_Payloads.swap(Payloads); m_AccessTimes.swap(AccessTimes); m_FirstReferenceIndex.swap(FirstReferenceIndex); CompactReferences(_); } GcCtx.AddDeletedCids(std::vector(DeletedChunks.begin(), DeletedChunks.end())); } }); std::vector ExpiredStandaloneEntries; IndexMap Index; std::vector Payloads; BlockStore::ReclaimSnapshotState BlockStoreState; { bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir); return; } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); std::vector AccessTimes; { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); RwLock::SharedLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (m_Index.empty()) { ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); return; } BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); Payloads = m_Payloads; AccessTimes = m_AccessTimes; Index = m_Index; for (const IoHash& Key : DeleteCacheKeys) { if (auto It = Index.find(Key); It != Index.end()) { const BucketPayload& Payload = Payloads[It->second]; DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; if (Entry.Location.Flags & DiskLocation::kStandaloneFile) { Entry.Location.Flags |= DiskLocation::kTombStone; ExpiredStandaloneEntries.push_back(Entry); } } } if (GcCtx.IsDeletionMode()) { for (const auto& Entry : ExpiredStandaloneEntries) { m_Index.erase(Entry.Key); m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); DeletedChunks.insert(Entry.Key); } 
m_SlogFile.Append(ExpiredStandaloneEntries); } } SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads)); } if (GcCtx.IsDeletionMode()) { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete"); std::error_code Ec; ExtendablePathBuilder<256> Path; for (const auto& Entry : ExpiredStandaloneEntries) { const IoHash& Key = Entry.Key; const DiskLocation& Loc = Entry.Location; Path.Reset(); BuildPath(Path, Key); fs::path FilePath = Path.ToPath(); { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (m_Index.contains(Key)) { // Someone added it back, let the file on disk be ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } __.ReleaseNow(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); fs::remove(FilePath, Ec); } } if (Ec) { ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); Ec.clear(); DiskLocation RestoreLocation = Loc; RestoreLocation.Flags &= ~DiskLocation::kTombStone; RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); if (m_Index.contains(Key)) { continue; } m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation}); m_AccessTimes.emplace_back(GcClock::TickCount()); if (m_EnableReferenceCaching) { m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); } m_Index.insert({Key, EntryIndex}); 
m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); DeletedChunks.erase(Key); continue; } DeletedSize += Entry.Location.Size(); } } TotalChunkCount = Index.size(); std::vector TotalChunkHashes; TotalChunkHashes.reserve(TotalChunkCount); { for (const auto& Entry : Index) { const DiskLocation& Location = Payloads[Entry.second].Location; if (Location.Flags & DiskLocation::kStandaloneFile) { continue; } TotalChunkHashes.push_back(Entry.first); } } if (TotalChunkHashes.empty()) { return; } TotalChunkCount = TotalChunkHashes.size(); std::vector ChunkLocations; BlockStore::ChunkIndexArray KeepChunkIndexes; std::vector ChunkIndexToChunkHash; ChunkLocations.reserve(TotalChunkCount); ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { auto KeyIt = Index.find(ChunkHash); const DiskLocation& DiskLocation = Payloads[KeyIt->second].Location; BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; if (Keep) { KeepChunkIndexes.push_back(ChunkIndex); } }); size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); uint64_t CurrentTotalSize = TotalSize(); ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", m_BucketDir, DeleteCount, TotalChunkCount, NiceBytes(CurrentTotalSize)); return; } m_BlockStore.ReclaimSpace( BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() 
+ RemovedChunks.size()); { RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); for (const auto& Entry : MovedChunks) { size_t ChunkIndex = Entry.first; const BlockStoreLocation& NewLocation = Entry.second; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t PayloadIndex = m_Index[ChunkHash]; BucketPayload& Payload = m_Payloads[PayloadIndex]; if (Payloads[Index[ChunkHash]].Location != m_Payloads[PayloadIndex].Location) { // Entry has been updated while GC was running, ignore the move continue; } Payload.Location = DiskLocation(NewLocation, m_PayloadAlignment, Payload.Location.GetFlags()); LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location}); } for (const size_t ChunkIndex : RemovedChunks) { const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t PayloadIndex = m_Index[ChunkHash]; const BucketPayload& Payload = m_Payloads[PayloadIndex]; if (Payloads[Index[ChunkHash]].Location != Payload.Location) { // Entry has been updated while GC was running, ignore the delete continue; } const DiskLocation& OldDiskLocation = Payload.Location; LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), m_PayloadAlignment, OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); m_Index.erase(ChunkHash); DeletedChunks.insert(ChunkHash); } } m_SlogFile.Append(LogEntries); m_SlogFile.Flush(); }, [&]() { return GcCtx.CollectSmallObjects(); }); } void ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector& AccessTimes) { ZEN_TRACE_CPU("Z$::Disk::Bucket::UpdateAccessTimes"); using namespace access_tracking; for (const KeyAccessTime& KeyTime : AccessTimes) { if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) { size_t EntryIndex = It.value(); 
            ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
            m_AccessTimes[EntryIndex] = KeyTime.LastAccess;
        }
    }
}

// Returns an aggregate statistics snapshot for this bucket.
ZenCacheDiskLayer::BucketStats
ZenCacheDiskLayer::CacheBucket::Stats()
{
    return ZenCacheDiskLayer::BucketStats{.TotalSize = TotalSize(),
                                          .HitCount = m_HitCount,
                                          .MissCount = m_MissCount,
                                          .WriteCount = m_WriteCount,
                                          .PutOps = m_PutOps.Snapshot(),
                                          .GetOps = m_GetOps.Snapshot()};
}

// Number of entries currently in the bucket index (takes a shared lock).
uint64_t
ZenCacheDiskLayer::CacheBucket::EntryCount() const
{
    RwLock::SharedLockScope _(m_IndexLock);
    return static_cast(m_Index.size());
}

// Builds the detail record for one entry. For structured values the payload is
// loaded and its attachment list extracted.
// NOTE(review): reads m_Payloads/m_AccessTimes without locking — callers below
// hold m_IndexLock (shared); confirm all call sites do.
CacheValueDetails::ValueDetails
ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const
{
    std::vector Attachments;
    const BucketPayload& Payload = m_Payloads[Index];
    if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
    {
        IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile)
                             ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key)
                             : GetInlineCacheValue(Payload.Location);
        CbObject Obj(SharedBuffer{Value});
        Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
    }
    return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
                                           .RawSize = Payload.RawSize,
                                           .RawHash = Payload.RawHash,
                                           .LastAccess = m_AccessTimes[Index],
                                           .Attachments = std::move(Attachments),
                                           .ContentType = Payload.Location.GetContentType()};
}

// Detail records for the whole bucket, or for a single value when ValueFilter
// holds a hex hash.
CacheValueDetails::BucketDetails
ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const
{
    CacheValueDetails::BucketDetails Details;
    RwLock::SharedLockScope _(m_IndexLock);
    if (ValueFilter.empty())
    {
        Details.Values.reserve(m_Index.size());
        for (const auto& It : m_Index)
        {
            Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second));
        }
    }
    else
    {
        IoHash Key = IoHash::FromHexString(ValueFilter);
        if (auto It = m_Index.find(Key); It != m_Index.end())
        {
            Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second));
        }
    }
    return Details;
}

void
ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents( std::function& Fn) const { RwLock::SharedLockScope _(m_IndexLock); for (const auto& It : m_Index) { CacheValueDetails::ValueDetails Vd = GetValueDetails(It.first, It.second); Fn(It.first, Vd); } } void ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::CollectGarbage"); std::vector Buckets; { RwLock::SharedLockScope _(m_Lock); Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Buckets.push_back(Kv.second.get()); } } for (CacheBucket* Bucket : Buckets) { Bucket->CollectGarbage(GcCtx); } } void ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) { RwLock::SharedLockScope _(m_Lock); for (const auto& Kv : AccessTimes.Buckets) { if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; Bucket.UpdateAccessTimes(Kv.second); } } } void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue"); uint64_t NewFileSize = Value.Value.Size(); TemporaryFile DataFile; std::error_code Ec; DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } bool CleanUpTempFile = false; auto __ = MakeGuard([&] { if (CleanUpTempFile) { std::error_code Ec; std::filesystem::remove(DataFile.GetPath(), Ec); if (Ec) { ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", DataFile.GetPath(), m_BucketDir, Ec.message()); } } }); DataFile.WriteAll(Value.Value, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", NiceBytes(NewFileSize), DataFile.GetPath().string(), m_BucketDir)); } ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); std::filesystem::path 
FsPath{DataFilePath.ToPath()}; RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); // We do a speculative remove of the file instead of probing with a exists call and check the error code instead std::filesystem::remove(FsPath, Ec); if (Ec) { if (Ec.value() != ENOENT) { ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); Sleep(100); Ec.clear(); std::filesystem::remove(FsPath, Ec); if (Ec && Ec.value() != ENOENT) { throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); } } } // Assume parent directory exists DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { CreateDirectories(FsPath.parent_path()); // Try again after we or someone else created the directory Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); // Retry if we still fail to handle contention to file system uint32_t RetriesLeft = 3; while (Ec && RetriesLeft > 0) { ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retries left: {}.", FsPath, DataFile.GetPath(), m_BucketDir, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); RetriesLeft--; } if (Ec) { throw std::system_error( Ec, fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); } } // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file // will be disabled as the file handle has already been closed CleanUpTempFile = false; uint8_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } DiskLocation Loc(NewFileSize, EntryFlags); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (auto It = 
m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); if (m_EnableReferenceCaching) { m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); } m_Index.insert_or_assign(HashKey, EntryIndex); } else { // TODO: should check if write is idempotent and bail out if it is? size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); BucketPayload& Payload = m_Payloads[EntryIndex]; Payload = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; if (m_EnableReferenceCaching) { SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); } m_AccessTimes[EntryIndex] = GcClock::TickCount(); m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); } void ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue"); uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); m_SlogFile.Append({.Key = HashKey, .Location = Location}); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) { // TODO: should check if 
write is idempotent and bail out if it is? // this would requiring comparing contents on disk unless we add a // content hash to the index entry size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes[EntryIndex] = GcClock::TickCount(); if (m_EnableReferenceCaching) { SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); } } else { size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); if (m_EnableReferenceCaching) { m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); } m_Index.insert_or_assign(HashKey, EntryIndex); } }); } void ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) { std::vector FirstReferenceIndex; std::vector NewReferenceHashes; std::vector NewNextReferenceHashesIndexes; FirstReferenceIndex.reserve(m_ReferenceCount); NewReferenceHashes.reserve(m_ReferenceCount); NewNextReferenceHashesIndexes.reserve(m_ReferenceCount); for (const auto& It : m_Index) { size_t SourceIndex = m_FirstReferenceIndex[It.second]; if (SourceIndex == UnknownReferencesIndex) { continue; } if (SourceIndex == NoReferencesIndex) { continue; } FirstReferenceIndex.push_back(NewNextReferenceHashesIndexes.size()); NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]); NewNextReferenceHashesIndexes.push_back(NoReferencesIndex); SourceIndex = m_NextReferenceHashesIndexes[SourceIndex]; while (SourceIndex != NoReferencesIndex) { NewNextReferenceHashesIndexes.back() = NewReferenceHashes.size(); NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]); NewNextReferenceHashesIndexes.push_back(NoReferencesIndex); SourceIndex = 
m_NextReferenceHashesIndexes[SourceIndex];
        }
    }
    m_FirstReferenceIndex.swap(FirstReferenceIndex);
    m_ReferenceHashes.swap(NewReferenceHashes);
    m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes);
    m_ReferenceCount = m_ReferenceHashes.size();
}

// Appends a new reference node holding Key and returns its index.
// Requires m_IndexLock held exclusively (enforced by the tag parameter).
size_t
ZenCacheDiskLayer::CacheBucket::AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key)
{
    size_t ReferenceIndex = m_ReferenceHashes.size();
    m_ReferenceHashes.push_back(Key);
    m_NextReferenceHashesIndexes.push_back(NoReferencesIndex);
    m_ReferenceCount++;
    return ReferenceIndex;
}

// Replaces the reference chain starting at FirstReferenceIndex with the given
// References, reusing existing chain nodes where possible and releasing any
// surplus tail nodes. Requires m_IndexLock held exclusively.
void
ZenCacheDiskLayer::CacheBucket::SetReferences(RwLock::ExclusiveLockScope& Lock,
                                              std::size_t& FirstReferenceIndex,
                                              std::span References)
{
    auto ReferenceIt = References.begin();
    if (FirstReferenceIndex == UnknownReferencesIndex)
    {
        FirstReferenceIndex = NoReferencesIndex;
    }
    size_t CurrentIndex = FirstReferenceIndex;
    if (CurrentIndex != NoReferencesIndex)
    {
        // Reuse the head node of the existing chain for the first reference.
        if (ReferenceIt != References.end())
        {
            ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
            if (CurrentIndex == NoReferencesIndex)
            {
                // NOTE(review): unreachable — the enclosing branch guarantees
                // CurrentIndex != NoReferencesIndex. Dead code; confirm and remove.
                CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
                FirstReferenceIndex = CurrentIndex;
            }
            else
            {
                m_ReferenceHashes[CurrentIndex] = *ReferenceIt;
            }
            ReferenceIt++;
        }
    }
    else
    {
        // No existing chain: allocate a head node if there is a reference.
        if (ReferenceIt != References.end())
        {
            ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
            CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
            ReferenceIt++;
        }
        FirstReferenceIndex = CurrentIndex;
    }
    // Write remaining references, reusing chain nodes and extending as needed.
    while (ReferenceIt != References.end())
    {
        ZEN_ASSERT(CurrentIndex != NoReferencesIndex);
        ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
        size_t ReferenceIndex = m_NextReferenceHashesIndexes[CurrentIndex];
        if (ReferenceIndex == NoReferencesIndex)
        {
            ReferenceIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
            m_NextReferenceHashesIndexes[CurrentIndex] = ReferenceIndex;
        }
        else
        {
            m_ReferenceHashes[ReferenceIndex] = *ReferenceIt;
        }
        CurrentIndex = ReferenceIndex;
        ReferenceIt++;
    }
    // Release surplus tail nodes left over from a longer previous chain.
    // NOTE(review): when a surplus tail exists, this zeroes and unlinks
    // m_ReferenceHashes[CurrentIndex] — i.e. the LAST WRITTEN reference, not
    // the first surplus node — and the final surplus node keeps its stale hash
    // and is never decremented. Looks off by one; verify against
    // LockedGetReferences, which asserts that walked hashes are non-zero.
    while (CurrentIndex != NoReferencesIndex)
    {
        size_t NextIndex = m_NextReferenceHashesIndexes[CurrentIndex];
        if (NextIndex != NoReferencesIndex)
        {
            m_ReferenceHashes[CurrentIndex] = IoHash::Zero;
            ZEN_ASSERT(m_ReferenceCount > 0);
            m_ReferenceCount--;
            m_NextReferenceHashesIndexes[CurrentIndex] = NoReferencesIndex;
        }
        CurrentIndex = NextIndex;
    }
}

// Clears the whole reference chain starting at FirstReferenceIndex and resets
// it to "unknown". Requires m_IndexLock held exclusively.
void
ZenCacheDiskLayer::CacheBucket::RemoveReferences(RwLock::ExclusiveLockScope&, std::size_t& FirstReferenceIndex)
{
    if (FirstReferenceIndex == UnknownReferencesIndex)
    {
        return;
    }
    size_t CurrentIndex = FirstReferenceIndex;
    while (CurrentIndex != NoReferencesIndex)
    {
        m_ReferenceHashes[CurrentIndex] = IoHash::Zero;
        ZEN_ASSERT(m_ReferenceCount > 0);
        m_ReferenceCount--;
        CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex];
    }
    FirstReferenceIndex = UnknownReferencesIndex;
}

// Copies the chain's references into OutReferences. Returns false when the
// references are unknown (not cached). Caller must hold m_IndexLock.
bool
ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIndex, std::vector& OutReferences) const
{
    if (FirstReferenceIndex == UnknownReferencesIndex)
    {
        return false;
    }
    size_t CurrentIndex = FirstReferenceIndex;
    while (CurrentIndex != NoReferencesIndex)
    {
        ZEN_ASSERT_SLOW(m_ReferenceHashes[CurrentIndex] != IoHash::Zero);
        OutReferences.push_back(m_ReferenceHashes[CurrentIndex]);
        CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex];
    }
    return true;
}

//////////////////////////////////////////////////////////////////////////

ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching)
: m_RootDir(RootDir)
, m_EnableReferenceCaching(EnableReferenceCaching)
{
}

ZenCacheDiskLayer::~ZenCacheDiskLayer() = default;

// Looks up HashKey in the named bucket, lazily opening/creating the bucket on
// first use. Returns true and fills OutValue on a hit.
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
    ZEN_TRACE_CPU("Z$::Disk::Get");
    const auto BucketName = std::string(InBucket);
    CacheBucket* Bucket = nullptr;
    {
        // Fast path: bucket already known, shared lock only.
        RwLock::SharedLockScope _(m_Lock);
        auto It = m_Buckets.find(BucketName);
        if (It != m_Buckets.end())
        {
            Bucket = It->second.get();
        }
    }
    if (Bucket == nullptr)
    {
        // Bucket needs to be opened/created
        RwLock::ExclusiveLockScope _(m_Lock);
if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { Bucket = It->second.get(); } else { auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(BucketName, m_EnableReferenceCaching)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; if (!Bucket->OpenOrCreate(BucketPath)) { m_Buckets.erase(InsertResult.first); return false; } } } ZEN_ASSERT(Bucket != nullptr); return Bucket->Get(HashKey, OutValue); } void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References) { ZEN_TRACE_CPU("Z$::Disk::Put"); const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto It = m_Buckets.find(BucketName); if (It != m_Buckets.end()) { Bucket = It->second.get(); } } if (Bucket == nullptr) { // New bucket needs to be created RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { Bucket = It->second.get(); } else { auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(BucketName, m_EnableReferenceCaching)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; try { if (!Bucket->OpenOrCreate(BucketPath)) { ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); m_Buckets.erase(InsertResult.first); return; } } catch (const std::exception& Err) { ZEN_WARN("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); throw; } } } ZEN_ASSERT(Bucket != nullptr); Bucket->Put(HashKey, Value, References); } void ZenCacheDiskLayer::DiscoverBuckets() { DirectoryContent DirContent; GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); // Initialize buckets std::vector BadBucketDirectories; RwLock::ExclusiveLockScope _(m_Lock); for (const 
std::filesystem::path& BucketPath : DirContent.Directories) { const std::string BucketName = PathToUtf8(BucketPath.stem()); if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { continue; } if (IsKnownBadBucketName(BucketName)) { BadBucketDirectories.push_back(BucketPath); continue; } auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(BucketName, m_EnableReferenceCaching)); CacheBucket& Bucket = *InsertResult.first->second; try { if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) { ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); m_Buckets.erase(InsertResult.first); continue; } } catch (const std::exception& Err) { ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); return; } ZEN_INFO("Discovered bucket '{}'", BucketName); } for (const std::filesystem::path& BadBucketPath : BadBucketDirectories) { bool IsOk = false; try { IsOk = DeleteDirectories(BadBucketPath); } catch (std::exception&) { } if (IsOk) { ZEN_INFO("found bad bucket at '{}', deleted contents", BadBucketPath); } else { ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath); } } } bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); auto It = m_Buckets.find(std::string(InBucket)); if (It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); return Bucket.Drop(); } // Make sure we remove the folder even if we don't know about the bucket std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); return MoveAndDeleteDirectory(BucketPath); } bool ZenCacheDiskLayer::Drop() { RwLock::ExclusiveLockScope _(m_Lock); std::vector> Buckets; Buckets.reserve(m_Buckets.size()); while (!m_Buckets.empty()) { const auto& It = m_Buckets.begin(); CacheBucket& Bucket = *It->second; 
m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); if (!Bucket.Drop()) { return false; } } return MoveAndDeleteDirectory(m_RootDir); } void ZenCacheDiskLayer::Flush() { std::vector Buckets; { RwLock::SharedLockScope _(m_Lock); Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { CacheBucket* Bucket = Kv.second.get(); Buckets.push_back(Bucket); } } for (auto& Bucket : Buckets) { Bucket->Flush(); } } void ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); { std::vector> Results; Results.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { #if 1 Results.push_back( Ctx.ThreadPool().EnqueueTask(std::packaged_task{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); #else CacheBucket& Bucket = *Kv.second; Bucket.ScrubStorage(Ctx); #endif } for (auto& Result : Results) { Result.get(); } } } void ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::GatherReferences"); std::vector Buckets; { RwLock::SharedLockScope _(m_Lock); Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Buckets.push_back(Kv.second.get()); } } for (CacheBucket* Bucket : Buckets) { Bucket->GatherReferences(GcCtx); } } uint64_t ZenCacheDiskLayer::TotalSize() const { uint64_t TotalSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { TotalSize += Kv.second->TotalSize(); } return TotalSize; } ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const { ZenCacheDiskLayer::DiskStats Stats = {}; { RwLock::SharedLockScope _(m_Lock); Stats.BucketStats.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Stats.BucketStats.emplace_back(NamedBucketStats{.BucketName = Kv.first, .Stats = Kv.second->Stats()}); } } return Stats; } ZenCacheDiskLayer::Info ZenCacheDiskLayer::GetInfo() const { ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir, .EnableReferenceCaching = m_EnableReferenceCaching}, .TotalSize = TotalSize()}; { RwLock::SharedLockScope 
_(m_Lock); Info.BucketNames.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); } } return Info; } std::optional ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; } return {}; } void ZenCacheDiskLayer::EnumerateBucketContents(std::string_view Bucket, std::function& Fn) const { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { It->second->EnumerateBucketContents(Fn); } } CacheValueDetails::NamespaceDetails ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const { RwLock::SharedLockScope _(m_Lock); CacheValueDetails::NamespaceDetails Details; if (BucketFilter.empty()) { Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); for (auto& Kv : m_Buckets) { Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); } } else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) { Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); } return Details; } } // namespace zen