// Copyright Epic Games, Inc. All Rights Reserved. #include "cachedisklayer.h" #include #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// namespace zen { namespace { #pragma pack(push) #pragma pack(1) // We use this to indicate if a on disk bucket needs wiping // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references // to block items. // See: https://github.com/EpicGames/zen/pull/299 static const uint32_t CurrentDiskBucketVersion = 1; struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t Version2 = 2; static constexpr uint32_t CurrentVersion = Version2; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t PayloadAlignment = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); #pragma pack(pop) const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + ".tmp"); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + LogExtension); } bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if (Entry.Location.Reserved != 0) { OutReason = 
fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } if (Entry.Location.GetFlags() & ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); return false; } if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) { return true; } if (Entry.Location.Reserved != 0) { OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); return false; } uint64_t Size = Entry.Location.Size(); if (Size == 0) { OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); return false; } return true; } bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) { int DropIndex = 0; do { if (!std::filesystem::exists(Dir)) { return false; } std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; if (std::filesystem::exists(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; std::filesystem::rename(Dir, DroppedBucketPath, Ec); if (!Ec) { DeleteDirectories(DroppedBucketPath); return true; } // TODO: Do we need to bail at some point? 
// (continued) MoveAndDeleteDirectory: the rename failed, wait before retrying.
        zen::Sleep(100);
    } while (true);
}

}  // namespace

namespace fs = std::filesystem;

// Reads the file at Path and parses it as a compact binary object after full
// validation. Returns an empty CbObject if the file cannot be read or fails
// validation. (The inner call is the IoBuffer overload, not recursion.)
static CbObject
LoadCompactBinaryObject(const fs::path& Path)
{
    FileContents Result = ReadFile(Path);

    if (!Result.ErrorCode)
    {
        IoBuffer Buffer = Result.Flatten();
        if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
        {
            return LoadCompactBinaryObject(Buffer);
        }
    }

    return CbObject();
}

// Writes the serialized bytes of Object to Path.
static void
SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
{
    WriteFile(Path, Object.GetBuffer().AsIoBuffer());
}

//////////////////////////////////////////////////////////////////////////

// Buckets whose name starts with "legacy" or ends with "shadermap" get a 16 MiB
// threshold for standalone storage (see Put).
ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero)
{
    if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
    {
        // This is pretty ad hoc but in order to avoid too many individual files
        // it makes sense to have a different strategy for legacy values
        m_LargeObjectThreshold = 16 * 1024 * 1024;
    }
}

ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
}

// Opens the bucket stored in BucketDir, creating a new one (fresh BucketId and manifest)
// when no valid manifest exists and AllowCreate is set. Returns false if the manifest has
// a zero BucketId, or if there is no manifest and AllowCreate is false; true otherwise.
bool
ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
{
    using namespace std::literals;

    ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
    ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir);

    m_BlocksBasePath = BucketDir / "blocks";
    m_BucketDir      = BucketDir;

    CreateDirectories(m_BucketDir);

    std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};

    // When true, OpenLog discards any existing log, index snapshot and block files.
    bool IsNew = false;

    CbObject Manifest = LoadCompactBinaryObject(ManifestPath);

    if (Manifest)
    {
        m_BucketId = Manifest["BucketId"sv].AsObjectId();
        if (m_BucketId == Oid::Zero)
        {
            return false;
        }
        const uint32_t Version = Manifest["Version"sv].AsUInt32(0);
        if (Version != CurrentDiskBucketVersion)
        {
            // NOTE(review): the on-disk manifest is not rewritten on this path; it keeps
            // the old version until SaveManifest runs.
            ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion);
            IsNew = true;
        }
    }
    else if (AllowCreate)
    {
        m_BucketId.Generate();
// (continued) OpenOrCreate: no manifest was found and creation is allowed, so a
// fresh manifest is written for the new bucket.
        CbObjectWriter Writer;
        Writer << "BucketId"sv << m_BucketId;
        Writer << "Version"sv << CurrentDiskBucketVersion;
        Manifest = Writer.Save();
        SaveCompactBinaryObject(ManifestPath, Manifest);
        IsNew = true;
    }
    else
    {
        return false;
    }

    OpenLog(IsNew);

    if (!IsNew)
    {
        // Re-apply the per-entry metadata SaveManifest persisted (last-access ticks and
        // cached raw hash/size) to the index OpenLog just rebuilt. Keys that are no
        // longer in the index are ignored.
        Stopwatch  Timer;
        const auto _ = MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });

        for (CbFieldView Entry : Manifest["Timestamps"sv])
        {
            const CbObjectView Obj = Entry.AsObjectView();
            const IoHash       Key = Obj["Key"sv].AsHash();

            if (auto It = m_Index.find(Key); It != m_Index.end())
            {
                size_t EntryIndex = It.value();
                ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
                m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64();
            }
        }
        for (CbFieldView Entry : Manifest["RawInfo"sv])
        {
            const CbObjectView Obj = Entry.AsObjectView();
            const IoHash       Key = Obj["Key"sv].AsHash();
            if (auto It = m_Index.find(Key); It != m_Index.end())
            {
                size_t EntryIndex = It.value();
                ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());

                const IoHash   RawHash = Obj["RawHash"sv].AsHash();
                const uint64_t RawSize = Obj["RawSize"sv].AsUInt64();

                if (RawHash == IoHash::Zero || RawSize == 0)
                {
                    // Suspicious values are reported but still stored below.
                    ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex);
                }

                m_Payloads[EntryIndex].RawHash = RawHash;
                m_Payloads[EntryIndex].RawSize = RawSize;
            }
        }
    }

    return true;
}

// Writes the in-memory index to the ".uidx" snapshot, recording the current log entry
// count in the header so OpenLog/ReadLog can skip log records the snapshot already
// covers. Returns immediately when m_LogFlushPosition already equals the log count.
void
ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
    ZEN_TRACE_CPU("Z$::Bucket::MakeIndexSnapshot");

    uint64_t LogCount = m_SlogFile.GetLogCount();
    if (m_LogFlushPosition == LogCount)
    {
        return;
    }

    ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
    uint64_t   EntryCount = 0;
    Stopwatch  Timer;
    const auto _ = MakeGuard([&] {
        ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
                 m_BucketDir,
                 EntryCount,
                 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
    });

    namespace fs = std::filesystem;

    fs::path IndexPath     = GetIndexPath(m_BucketDir, m_BucketName);
    fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
// Move index away, we keep it if something goes wrong if (fs::is_regular_file(STmpIndexPath)) { fs::remove(STmpIndexPath); } if (fs::is_regular_file(IndexPath)) { fs::rename(IndexPath, STmpIndexPath); } try { // Write the current state of the location map to a new index state std::vector Entries; Entries.resize(m_Index.size()); { uint64_t EntryIndex = 0; for (auto& Entry : m_Index) { DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = m_Payloads[Entry.second].Location; } } BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount, .PayloadAlignment = gsl::narrow(m_PayloadAlignment)}; Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); ObjectIndexFile.Flush(); ObjectIndexFile.Close(); EntryCount = Entries.size(); m_LogFlushPosition = LogCount; } catch (std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); // Restore any previous snapshot if (fs::is_regular_file(STmpIndexPath)) { fs::remove(IndexPath); fs::rename(STmpIndexPath, IndexPath); } } if (fs::is_regular_file(STmpIndexPath)) { fs::remove(STmpIndexPath); } } uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); if (std::filesystem::is_regular_file(IndexPath)) { BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CacheBucketIndexHeader)) { CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Checksum == 
CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) { switch (Header.Version) { case CacheBucketIndexHeader::Version2: { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); if (Header.EntryCount > ExpectedEntryCount) { break; } size_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_PayloadAlignment = Header.PayloadAlignment; std::vector Entries; Entries.resize(Header.EntryCount); ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); m_Payloads.reserve(Header.EntryCount); m_AccessTimes.reserve(Header.EntryCount); m_Index.reserve(Header.EntryCount); std::string InvalidEntryReason; for (const DiskIndexEntry& Entry : Entries) { if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(Entry.Key, EntryIndex); EntryCount++; } OutVersion = CacheBucketIndexHeader::Version2; return Header.LogPosition; } break; default: break; } } } ZEN_WARN("skipping invalid index file '{}'", IndexPath); } return 0; } uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); if (std::filesystem::is_regular_file(LogPath)) { uint64_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(LogPath, 
CasLogFile::Mode::kRead); if (CasLog.Initialize()) { uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; m_Index.reserve(LogEntryCount); uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const DiskIndexEntry& Record) { std::string InvalidEntryReason; if (Record.Location.Flags & DiskLocation::kTombStone) { m_Index.erase(Record.Key); return; } if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(Record.Key, EntryIndex); }, SkipEntryCount); if (InvalidEntryCount) { ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); } return LogEntryCount; } } return 0; }; void ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) { ZEN_TRACE_CPU("Z$::Bucket::OpenLog"); m_TotalStandaloneSize = 0; m_Index.clear(); m_Payloads.clear(); m_AccessTimes.clear(); std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); if (IsNew) { fs::remove(LogPath); fs::remove(IndexPath); fs::remove_all(m_BlocksBasePath); } CreateDirectories(m_BucketDir); std::unordered_map BlockSizes = m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); if (std::filesystem::is_regular_file(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); std::filesystem::remove(IndexPath); } } 
// (continued) OpenLog: replay the log on top of the snapshot, then reconcile the
// rebuilt index with the block files that actually exist.
    uint64_t LogEntryCount = 0;
    if (std::filesystem::is_regular_file(LogPath))
    {
        if (TCasLogFile::IsValid(LogPath))
        {
            LogEntryCount = ReadLog(LogPath, m_LogFlushPosition);
        }
        else
        {
            // The enclosing check already established that LogPath is a regular file,
            // so it does not need to be stat'ed a second time.
            ZEN_WARN("removing invalid cas log at '{}'", LogPath);
            std::filesystem::remove(LogPath);
        }
    }

    m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);

    std::vector KnownLocations;
    KnownLocations.reserve(m_Index.size());
    std::vector BadEntries;

    for (const auto& Entry : m_Index)
    {
        size_t               EntryIndex = Entry.second;
        const BucketPayload& Payload    = m_Payloads[EntryIndex];
        const DiskLocation&  Location   = Payload.Location;

        if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
        {
            // Standalone files are only accounted for here; they are not checked on disk.
            m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
            continue;
        }

        const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);

        auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex);
        if (BlockIt == BlockSizes.end())
        {
            ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
        }
        else
        {
            uint64_t BlockSize = BlockIt->second;
            if (BlockLocation.Offset + BlockLocation.Size > BlockSize)
            {
                ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
            }
            else
            {
                KnownLocations.push_back(BlockLocation);
                continue;
            }
        }

        // Missing block or out-of-range chunk: schedule a tombstone for this key.
        DiskLocation NewLocation = Payload.Location;
        NewLocation.Flags |= DiskLocation::kTombStone;
        BadEntries.push_back(DiskIndexEntry{.Key = Entry.first, .Location = NewLocation});
    }

    if (!BadEntries.empty())
    {
        m_SlogFile.Append(BadEntries);
        m_SlogFile.Flush();

        // The tombstones are new log records, so make sure a fresh snapshot is written below.
        LogEntryCount += BadEntries.size();

        // Only the index mapping is removed; the orphaned m_Payloads / m_AccessTimes
        // slots are left in place.
        for (const DiskIndexEntry& BadEntry : BadEntries)
        {
            m_Index.erase(BadEntry.Key);
        }
    }

    // NOTE(review): presumably releases block data not referenced by KnownLocations -
    // confirm against BlockStore::Prune.
    m_BlockStore.Prune(KnownLocations);

    if (IsNew || LogEntryCount > 0)
    {
        MakeIndexSnapshot();
    }

    // TODO: should validate integrity of container files here
}

// Appends to Path the location of the standalone file holding HashKey's value.
void
ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const
{
    char HexString[sizeof(HashKey.Hash) * 2];
// (continued) BuildPath: standalone values live under
// <BucketDir>/blob/<hex[0,3)>/<hex[3,5)>/<hex[5,end)> of the key's hex string.
    ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString);

    Path.Append(m_BucketDir);
    Path.AppendSeparator();
    Path.Append(L"blob");
    Path.AppendSeparator();
    Path.AppendAsciiRange(HexString, HexString + 3);
    Path.AppendSeparator();
    Path.AppendAsciiRange(HexString + 3, HexString + 5);
    Path.AppendSeparator();
    Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString));
}

// Reads a block-store ("inline") value. Returns whatever TryGetChunk returns; when
// that is non-empty, Loc's content type is applied to it.
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
{
    BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);

    IoBuffer Value = m_BlockStore.TryGetChunk(Location);
    if (Value)
    {
        Value.SetContentType(Loc.GetContentType());
    }

    return Value;
}

// Reads a standalone value from its own file (path from BuildPath) while holding
// LockForHash(HashKey) in shared mode. Returns an empty buffer if MakeFromFile does
// not yield data (presumably a missing or unreadable file).
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const
{
    ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");

    ExtendablePathBuilder<256> DataFilePath;
    BuildPath(DataFilePath, HashKey);

    RwLock::SharedLockScope ValueLock(LockForHash(HashKey));

    if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
    {
        Data.SetContentType(Loc.GetContentType());

        return Data;
    }

    return {};
}

// Looks up HashKey and loads its value into OutValue. Returns whether OutValue.Value
// ended up non-empty (false immediately when the key is unknown). For entries without
// the kStructured flag and with no cached raw info, RawHash/RawSize are derived from
// the payload and written back to the index entry if it still exists.
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
    RwLock::SharedLockScope _(m_IndexLock);
    auto                    It = m_Index.find(HashKey);
    if (It == m_Index.end())
    {
        return false;
    }
    size_t               EntryIndex = It.value();
    const BucketPayload& Payload    = m_Payloads[EntryIndex];
    // NOTE(review): m_AccessTimes is written while only the shared index lock is held,
    // so concurrent Get calls may race on the same slot - confirm this is intended.
    m_AccessTimes[EntryIndex] = GcClock::TickCount();
    DiskLocation Location     = Payload.Location;
    OutValue.RawSize          = Payload.RawSize;
    OutValue.RawHash          = Payload.RawHash;
    if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
    {
        // We don't need to hold the index lock when we read a standalone file
        _.ReleaseNow();
        OutValue.Value = GetStandaloneCacheValue(Location, HashKey);
    }
    else
    {
        OutValue.Value = GetInlineCacheValue(Location);
    }
    _.ReleaseNow();

    if (!Location.IsFlagSet(DiskLocation::kStructured))
    {
        if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0)
        {
            if (Location.IsFlagSet(DiskLocation::kCompressed))
            {
                // Only the RawHash/RawSize out-parameters are wanted here.
                (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize);
            }
            else
            {
                OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
                OutValue.RawSize = OutValue.Value.GetSize();
            }
            // Look the key up again: the index lock was released above, so the entry
            // may have been removed in the meantime.
            RwLock::ExclusiveLockScope __(m_IndexLock);
            if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
            {
                BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
                WritePayload.RawHash        = OutValue.RawHash;
                WritePayload.RawSize        = OutValue.RawSize;

                m_LogFlushPosition = 0;  // Force resave of index on exit
            }
        }
    }

    return (bool)OutValue.Value;
}

// Values of at least m_LargeObjectThreshold bytes go to PutStandaloneCacheValue,
// smaller ones to PutInlineCacheValue.
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
    if (Value.Value.Size() >= m_LargeObjectThreshold)
    {
        return PutStandaloneCacheValue(HashKey, Value);
    }
    PutInlineCacheValue(HashKey, Value);
}

// Removes the bucket from disk. With m_IndexLock held exclusively and a scoped lock
// taken on every entry of m_ShardedLocks (presumably exclusive), it closes the block
// store and log, then moves the bucket directory aside and deletes it. Finally it
// clears the in-memory index. Returns MoveAndDeleteDirectory's result.
// NOTE(review): m_TotalStandaloneSize is not reset here - verify nothing queries
// sizes on a dropped bucket.
bool
ZenCacheDiskLayer::CacheBucket::Drop()
{
    ZEN_TRACE_CPU("Z$::Bucket::Drop");

    RwLock::ExclusiveLockScope _(m_IndexLock);

    std::vector> ShardLocks;
    ShardLocks.reserve(256);
    for (RwLock& Lock : m_ShardedLocks)
    {
        ShardLocks.push_back(std::make_unique(Lock));
    }
    m_BlockStore.Close();
    m_SlogFile.Close();

    bool Deleted = MoveAndDeleteDirectory(m_BucketDir);

    m_Index.clear();
    m_Payloads.clear();
    m_AccessTimes.clear();
    return Deleted;
}

// Flushes the block store and the log, then writes the index snapshot and the manifest.
// NOTE(review): MakeIndexSnapshot updates m_LogFlushPosition while only the shared index
// lock is held - confirm concurrent Flush calls are excluded elsewhere.
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
    ZEN_TRACE_CPU("Z$::Bucket::Flush");

    m_BlockStore.Flush();

    RwLock::SharedLockScope _(m_IndexLock);
    m_SlogFile.Flush();
    MakeIndexSnapshot();
    SaveManifest();
}

// Writes <BucketDir>/zen_manifest containing the bucket id and version. When the index
// is non-empty it also stores each entry's last-access tick and, for entries with a
// known raw hash, the raw hash and size. Write failures are logged, not rethrown. This
// reads the index without locking; the visible callers (Flush, CollectGarbage) hold
// m_IndexLock in shared mode around the call.
void
ZenCacheDiskLayer::CacheBucket::SaveManifest()
{
    using namespace std::literals;

    ZEN_TRACE_CPU("Z$::Bucket::SaveManifest");

    CbObjectWriter Writer;
    Writer << "BucketId"sv << m_BucketId;
    Writer << "Version"sv << CurrentDiskBucketVersion;

    if (!m_Index.empty())
    {
        Writer.BeginArray("Timestamps"sv);
        for (auto& Kv : m_Index)
        {
            const IoHash&  Key        = Kv.first;
            GcClock::Tick AccessTime = m_AccessTimes[Kv.second];

            Writer.BeginObject();
            Writer << "Key"sv << Key;
            Writer << "LastAccess"sv << AccessTime;
            Writer.EndObject();
        }
        Writer.EndArray();

        Writer.BeginArray("RawInfo"sv);
        {
            for (auto& Kv : m_Index)
            {
                const IoHash&        Key     = Kv.first;
                const BucketPayload& Payload = m_Payloads[Kv.second];
                if (Payload.RawHash != IoHash::Zero)
                {
                    Writer.BeginObject();
                    Writer << "Key"sv << Key;
                    Writer << "RawHash"sv << Payload.RawHash;
                    Writer << "RawSize"sv << Payload.RawSize;
                    Writer.EndObject();
                }
            }
        }
        Writer.EndArray();
    }

    try
    {
        SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save());
    }
    catch (std::exception& Err)
    {
        ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what());
    }
}

// Hashes all segments of Buffer as one continuous stream.
IoHash
HashBuffer(const CompositeBuffer& Buffer)
{
    IoHashStream Hasher;

    for (const SharedBuffer& Segment : Buffer.GetSegments())
    {
        Hasher.Append(Segment.GetView());
    }

    return Hasher.GetHash();
}

// Checks that Buffer is well-formed for ContentType.
// - kCbObject: fully validated as compact binary.
// - kCompressedBinary: the header is validated, then the payload is decompressed and
//   its hash compared with the header's raw hash.
// - Any other type: cannot be checked and is accepted.
// Returns false (with a scoped error) on any validation failure.
bool
ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
{
    ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType);

    if (ContentType == ZenContentType::kCbObject)
    {
        CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);

        if (Error == CbValidateError::None)
        {
            return true;
        }

        ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error));

        return false;
    }
    else if (ContentType == ZenContentType::kCompressedBinary)
    {
        IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer);

        IoHash   HeaderRawHash;
        uint64_t RawSize = 0;
        if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize))
        {
            ZEN_SCOPED_ERROR("compressed buffer header validation failed");

            return false;
        }

        CompressedBuffer Compressed =
            CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize);
        CompositeBuffer Decompressed     = Compressed.DecompressToComposite();
        IoHash          DecompressedHash = HashBuffer(Decompressed);

        if (HeaderRawHash != DecompressedHash)
        {
            ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);

            return false;
        }
    }
    else
    {
// No way to verify this kind of content (what is it exactly?) static int Once = [&] { ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType)); return 42; }(); } return true; }; void ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::Bucket::Scrub"); ZEN_INFO("scrubbing '{}'", m_BucketDir); Stopwatch Timer; uint64_t ChunkCount = 0; uint64_t VerifiedChunkBytes = 0; auto LogStats = MakeGuard([&] { const uint32_t DurationMs = gsl::narrow(Timer.GetElapsedTimeMs()); ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", m_BucketName, NiceBytes(VerifiedChunkBytes), NiceTimeSpanMs(DurationMs), ChunkCount, NiceRate(VerifiedChunkBytes, DurationMs)); }); std::vector BadKeys; auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; try { std::vector ChunkLocations; std::vector ChunkIndexToChunkHash; RwLock::SharedLockScope _(m_IndexLock); const size_t BlockChunkInitialCount = m_Index.size() / 4; ChunkLocations.reserve(BlockChunkInitialCount); ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); // Do a pass over the index and verify any standalone file values straight away // all other storage classes are gathered and verified in bulk in order to enable // more efficient I/O scheduling for (auto& Kv : m_Index) { const IoHash& HashKey = Kv.first; const BucketPayload& Payload = m_Payloads[Kv.second]; const DiskLocation& Loc = Payload.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { Ctx.ThrowIfDeadlineExpired(); ++ChunkCount; VerifiedChunkBytes += Loc.Size(); if (Loc.GetContentType() == ZenContentType::kBinary) { // Blob cache value, not much we can do about data integrity checking // here since there's no hash available ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); std::error_code Ec; uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); if (Ec) { 
ReportBadKey(HashKey); } if (size != Loc.Size()) { ReportBadKey(HashKey); } continue; } else { // Structured cache value IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); if (!Buffer) { ReportBadKey(HashKey); continue; } if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer)) { ReportBadKey(HashKey); continue; } } } else { ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); ChunkIndexToChunkHash.push_back(HashKey); continue; } } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { ++ChunkCount; VerifiedChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { // ChunkLocation out of range of stored blocks ReportBadKey(Hash); return; } if (!Size) { ReportBadKey(Hash); return; } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); if (!Buffer) { ReportBadKey(Hash); return; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) { ReportBadKey(Hash); return; } }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { Ctx.ThrowIfDeadlineExpired(); ++ChunkCount; VerifiedChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); if (!Buffer) { ReportBadKey(Hash); return; } const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) { ReportBadKey(Hash); return; } }; m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); } catch (ScrubDeadlineExpiredException&) { ZEN_INFO("Scrubbing deadline expired, operation incomplete"); } Ctx.ReportScrubbed(ChunkCount, 
VerifiedChunkBytes); if (!BadKeys.empty()) { ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); if (Ctx.RunRecovery()) { // Deal with bad chunks by removing them from our lookup map std::vector LogEntries; LogEntries.reserve(BadKeys.size()); { RwLock::ExclusiveLockScope __(m_IndexLock); for (const IoHash& BadKey : BadKeys) { // Log a tombstone and delete the in-memory index for the bad entry const auto It = m_Index.find(BadKey); const BucketPayload& Payload = m_Payloads[It->second]; DiskLocation Location = Payload.Location; Location.Flags |= DiskLocation::kTombStone; LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); } } for (const DiskIndexEntry& Entry : LogEntries) { if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) { ExtendablePathBuilder<256> Path; BuildPath(Path, Entry.Key); fs::path FilePath = Path.ToPath(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); std::error_code Ec; fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... } m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); } } m_SlogFile.Append(LogEntries); // Clean up m_AccessTimes and m_Payloads vectors { std::vector Payloads; std::vector AccessTimes; IndexMap Index; { RwLock::ExclusiveLockScope __(m_IndexLock); size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); Index.reserve(EntryCount); for (auto It : m_Index) { size_t EntryIndex = Payloads.size(); Payloads.push_back(m_Payloads[EntryIndex]); AccessTimes.push_back(m_AccessTimes[EntryIndex]); Index.insert({It.first, EntryIndex}); } m_Index.swap(Index); m_Payloads.swap(Payloads); m_AccessTimes.swap(AccessTimes); } } } } // Let whomever it concerns know about the bad chunks. 
This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do if (!BadKeys.empty()) { Ctx.ReportBadCidChunks(BadKeys); } } void ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs)); }); const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime(); const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); IndexMap Index; std::vector AccessTimes; std::vector Payloads; { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); Index = m_Index; AccessTimes = m_AccessTimes; Payloads = m_Payloads; } std::vector ExpiredKeys; ExpiredKeys.reserve(1024); std::vector Cids; Cids.reserve(1024); for (const auto& Entry : Index) { const IoHash& Key = Entry.first; GcClock::Tick AccessTime = AccessTimes[Entry.second]; if (AccessTime < ExpireTicks) { ExpiredKeys.push_back(Key); continue; } const DiskLocation& Loc = Payloads[Entry.second].Location; if (Loc.IsFlagSet(DiskLocation::kStructured)) { if (Cids.size() > 1024) { GcCtx.AddRetainedCids(Cids); Cids.clear(); } IoBuffer Buffer; { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; 
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { // We don't need to hold the index lock when we read a standalone file __.ReleaseNow(); if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) { continue; } } else if (Buffer = GetInlineCacheValue(Loc); !Buffer) { continue; } } ZEN_ASSERT(Buffer); ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); CbObject Obj(SharedBuffer{Buffer}); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } } GcCtx.AddRetainedCids(Cids); GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); } void ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); Stopwatch TotalTimer; uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; uint64_t TotalChunkCount = 0; uint64_t DeletedSize = 0; uint64_t OldTotalSize = TotalSize(); std::unordered_set DeletedChunks; uint64_t MovedCount = 0; const auto _ = MakeGuard([&] { ZEN_DEBUG( "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " "{} " "of {} " "entires ({}).", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs), NiceBytes(DeletedSize), DeletedChunks.size(), MovedCount, TotalChunkCount, NiceBytes(OldTotalSize)); RwLock::SharedLockScope _(m_IndexLock); SaveManifest(); }); m_SlogFile.Flush(); std::span ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); std::vector DeleteCacheKeys; DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { if 
(Keep) { return; } DeleteCacheKeys.push_back(ChunkHash); }); if (DeleteCacheKeys.empty()) { ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir); return; } auto __ = MakeGuard([&]() { if (!DeletedChunks.empty()) { // Clean up m_AccessTimes and m_Payloads vectors std::vector Payloads; std::vector AccessTimes; IndexMap Index; { RwLock::ExclusiveLockScope _(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); Index.reserve(EntryCount); for (auto It : m_Index) { size_t OldEntryIndex = It.second; size_t NewEntryIndex = Payloads.size(); Payloads.push_back(m_Payloads[OldEntryIndex]); AccessTimes.push_back(m_AccessTimes[OldEntryIndex]); Index.insert({It.first, NewEntryIndex}); } m_Index.swap(Index); m_Payloads.swap(Payloads); m_AccessTimes.swap(AccessTimes); } GcCtx.AddDeletedCids(std::vector(DeletedChunks.begin(), DeletedChunks.end())); } }); std::vector ExpiredStandaloneEntries; IndexMap Index; BlockStore::ReclaimSnapshotState BlockStoreState; { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (m_Index.empty()) { ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); return; } BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); SaveManifest(); Index = m_Index; for (const IoHash& Key : DeleteCacheKeys) { if (auto It = Index.find(Key); It != Index.end()) { const BucketPayload& Payload = m_Payloads[It->second]; DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; if (Entry.Location.Flags & DiskLocation::kStandaloneFile) { 
Entry.Location.Flags |= DiskLocation::kTombStone; ExpiredStandaloneEntries.push_back(Entry); } } } if (GcCtx.IsDeletionMode()) { for (const auto& Entry : ExpiredStandaloneEntries) { m_Index.erase(Entry.Key); m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); DeletedChunks.insert(Entry.Key); } m_SlogFile.Append(ExpiredStandaloneEntries); } } if (GcCtx.IsDeletionMode()) { std::error_code Ec; ExtendablePathBuilder<256> Path; for (const auto& Entry : ExpiredStandaloneEntries) { const IoHash& Key = Entry.Key; const DiskLocation& Loc = Entry.Location; Path.Reset(); BuildPath(Path, Key); fs::path FilePath = Path.ToPath(); { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (m_Index.contains(Key)) { // Someone added it back, let the file on disk be ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } __.ReleaseNow(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); fs::remove(FilePath, Ec); } } if (Ec) { ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); Ec.clear(); DiskLocation RestoreLocation = Loc; RestoreLocation.Flags &= ~DiskLocation::kTombStone; RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); if (m_Index.contains(Key)) { continue; } m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation}); 
m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert({Key, EntryIndex}); m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); DeletedChunks.erase(Key); continue; } DeletedSize += Entry.Location.Size(); } } TotalChunkCount = Index.size(); std::vector TotalChunkHashes; TotalChunkHashes.reserve(TotalChunkCount); for (const auto& Entry : Index) { const DiskLocation& Location = m_Payloads[Entry.second].Location; if (Location.Flags & DiskLocation::kStandaloneFile) { continue; } TotalChunkHashes.push_back(Entry.first); } if (TotalChunkHashes.empty()) { return; } TotalChunkCount = TotalChunkHashes.size(); std::vector ChunkLocations; BlockStore::ChunkIndexArray KeepChunkIndexes; std::vector ChunkIndexToChunkHash; ChunkLocations.reserve(TotalChunkCount); ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { auto KeyIt = Index.find(ChunkHash); const DiskLocation& DiskLocation = m_Payloads[KeyIt->second].Location; BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; if (Keep) { KeepChunkIndexes.push_back(ChunkIndex); } }); size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); uint64_t CurrentTotalSize = TotalSize(); ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", m_BucketDir, DeleteCount, TotalChunkCount, NiceBytes(CurrentTotalSize)); return; } m_BlockStore.ReclaimSpace( BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const 
BlockStore::ChunkIndexArray& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); for (const auto& Entry : MovedChunks) { size_t ChunkIndex = Entry.first; const BlockStoreLocation& NewLocation = Entry.second; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; const DiskLocation& OldDiskLocation = OldPayload.Location; LogEntries.push_back( {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); } for (const size_t ChunkIndex : RemovedChunks) { const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; const DiskLocation& OldDiskLocation = OldPayload.Location; LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), m_PayloadAlignment, OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); DeletedChunks.insert(ChunkHash); } m_SlogFile.Append(LogEntries); m_SlogFile.Flush(); { RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); for (const DiskIndexEntry& Entry : LogEntries) { if (Entry.Location.GetFlags() & DiskLocation::kTombStone) { m_Index.erase(Entry.Key); continue; } m_Payloads[m_Index[Entry.Key]].Location = Entry.Location; } } }, [&]() { return GcCtx.CollectSmallObjects(); }); } void ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector& AccessTimes) { using namespace access_tracking; for (const KeyAccessTime& KeyTime : AccessTimes) { if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) { size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); m_AccessTimes[EntryIndex] = KeyTime.LastAccess; } } } uint64_t 
ZenCacheDiskLayer::CacheBucket::EntryCount() const { RwLock::SharedLockScope _(m_IndexLock); return static_cast(m_Index.size()); } CacheValueDetails::ValueDetails ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const { std::vector Attachments; const BucketPayload& Payload = m_Payloads[Index]; if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) { IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key) : GetInlineCacheValue(Payload.Location); CbObject Obj(SharedBuffer{Value}); Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); } return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), .RawSize = Payload.RawSize, .RawHash = Payload.RawHash, .LastAccess = m_AccessTimes[Index], .Attachments = std::move(Attachments), .ContentType = Payload.Location.GetContentType()}; } CacheValueDetails::BucketDetails ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const { CacheValueDetails::BucketDetails Details; RwLock::SharedLockScope _(m_IndexLock); if (ValueFilter.empty()) { Details.Values.reserve(m_Index.size()); for (const auto& It : m_Index) { Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second)); } } else { IoHash Key = IoHash::FromHexString(ValueFilter); if (auto It = m_Index.find(Key); It != m_Index.end()) { Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second)); } } return Details; } void ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { CacheBucket& Bucket = *Kv.second; Bucket.CollectGarbage(GcCtx); } } void ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) { RwLock::SharedLockScope _(m_Lock); for (const auto& Kv : AccessTimes.Buckets) { if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) 
{ CacheBucket& Bucket = *It->second; Bucket.UpdateAccessTimes(Kv.second); } } } void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); uint64_t NewFileSize = Value.Value.Size(); TemporaryFile DataFile; std::error_code Ec; DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } bool CleanUpTempFile = false; auto __ = MakeGuard([&] { if (CleanUpTempFile) { std::error_code Ec; std::filesystem::remove(DataFile.GetPath(), Ec); if (Ec) { ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", DataFile.GetPath(), m_BucketDir, Ec.message()); } } }); DataFile.WriteAll(Value.Value, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", NiceBytes(NewFileSize), DataFile.GetPath().string(), m_BucketDir)); } ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); // We do a speculative remove of the file instead of probing with a exists call and check the error code instead std::filesystem::remove(FsPath, Ec); if (Ec) { if (Ec.value() != ENOENT) { ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); Sleep(100); Ec.clear(); std::filesystem::remove(FsPath, Ec); if (Ec && Ec.value() != ENOENT) { throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); } } } // Assume parent directory exists DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { CreateDirectories(FsPath.parent_path()); // Try again after we or someone else created the directory Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); // Retry if we still 
fail to handle contention to file system uint32_t RetriesLeft = 3; while (Ec && RetriesLeft > 0) { ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retries left: {}.", FsPath, DataFile.GetPath(), m_BucketDir, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); RetriesLeft--; } if (Ec) { throw std::system_error( Ec, fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); } } // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file // will be disabled as the file handle has already been closed CleanUpTempFile = false; uint8_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } DiskLocation Loc(NewFileSize, EntryFlags); RwLock::ExclusiveLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } else { // TODO: should check if write is idempotent and bail out if it is? 
size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; m_AccessTimes.emplace_back(GcClock::TickCount()); m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); } void ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); m_SlogFile.Append({.Key = HashKey, .Location = Location}); RwLock::ExclusiveLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) { // TODO: should check if write is idempotent and bail out if it is? 
// this would requiring comparing contents on disk unless we add a // content hash to the index entry size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes[EntryIndex] = GcClock::TickCount(); } else { size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } }); } ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) { } ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto It = m_Buckets.find(BucketName); if (It != m_Buckets.end()) { Bucket = It->second.get(); } } if (Bucket == nullptr) { // Bucket needs to be opened/created RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { Bucket = It->second.get(); } else { auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(BucketName)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; if (!Bucket->OpenOrCreate(BucketPath)) { m_Buckets.erase(InsertResult.first); return false; } } } ZEN_ASSERT(Bucket != nullptr); return Bucket->Get(HashKey, OutValue); } void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto It = m_Buckets.find(BucketName); if (It 
!= m_Buckets.end()) { Bucket = It->second.get(); } } if (Bucket == nullptr) { // New bucket needs to be created RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { Bucket = It->second.get(); } else { auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(BucketName)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; try { if (!Bucket->OpenOrCreate(BucketPath)) { ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); m_Buckets.erase(InsertResult.first); return; } } catch (const std::exception& Err) { ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); return; } } } ZEN_ASSERT(Bucket != nullptr); Bucket->Put(HashKey, Value); } void ZenCacheDiskLayer::DiscoverBuckets() { DirectoryContent DirContent; GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); // Initialize buckets RwLock::ExclusiveLockScope _(m_Lock); for (const std::filesystem::path& BucketPath : DirContent.Directories) { const std::string BucketName = PathToUtf8(BucketPath.stem()); // New bucket needs to be created if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { continue; } auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(BucketName)); CacheBucket& Bucket = *InsertResult.first->second; try { if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) { ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); m_Buckets.erase(InsertResult.first); continue; } } catch (const std::exception& Err) { ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); return; } ZEN_INFO("Discovered bucket '{}'", BucketName); } } bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); 
auto It = m_Buckets.find(std::string(InBucket)); if (It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); return Bucket.Drop(); } // Make sure we remove the folder even if we don't know about the bucket std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); return MoveAndDeleteDirectory(BucketPath); } bool ZenCacheDiskLayer::Drop() { RwLock::ExclusiveLockScope _(m_Lock); std::vector> Buckets; Buckets.reserve(m_Buckets.size()); while (!m_Buckets.empty()) { const auto& It = m_Buckets.begin(); CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); if (!Bucket.Drop()) { return false; } } return MoveAndDeleteDirectory(m_RootDir); } void ZenCacheDiskLayer::Flush() { std::vector Buckets; { RwLock::SharedLockScope _(m_Lock); Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { CacheBucket* Bucket = Kv.second.get(); Buckets.push_back(Bucket); } } for (auto& Bucket : Buckets) { Bucket->Flush(); } } void ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); { std::vector> Results; Results.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { #if 1 Results.push_back( Ctx.ThreadPool().EnqueueTask(std::packaged_task{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); #else CacheBucket& Bucket = *Kv.second; Bucket.ScrubStorage(Ctx); #endif } for (auto& Result : Results) { Result.get(); } } } void ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { CacheBucket& Bucket = *Kv.second; Bucket.GatherReferences(GcCtx); } } uint64_t ZenCacheDiskLayer::TotalSize() const { uint64_t TotalSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { TotalSize += Kv.second->TotalSize(); } return TotalSize; } ZenCacheDiskLayer::Info ZenCacheDiskLayer::GetInfo() const { ZenCacheDiskLayer::Info 
Info = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()}; RwLock::SharedLockScope _(m_Lock); Info.BucketNames.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); } return Info; } std::optional ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; } return {}; } CacheValueDetails::NamespaceDetails ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const { RwLock::SharedLockScope _(m_Lock); CacheValueDetails::NamespaceDetails Details; if (BucketFilter.empty()) { Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); for (auto& Kv : m_Buckets) { Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); } } else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) { Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); } return Details; } } // namespace zen