diff options
| author | Stefan Boberg <[email protected]> | 2023-05-17 10:31:50 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-17 10:31:50 +0200 |
| commit | 5adba30f4528a7d74090a8391d09b287501846a7 (patch) | |
| tree | 25476b8e49fb5a44170b4d181de60de1f2d88ebe /src/zenserver/cache/cachedisklayer.cpp | |
| parent | amended CHANGELOG.md with recent changes (diff) | |
| download | zen-5adba30f4528a7d74090a8391d09b287501846a7.tar.xz zen-5adba30f4528a7d74090a8391d09b287501846a7.zip | |
Restructured structured cache store (#314)
This change separates out the disk and memory storage strategies into separate cpp/h files to improve maintainability.
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 2127 |
1 file changed, 2127 insertions, 0 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp new file mode 100644 index 000000000..e37d732c0 --- /dev/null +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -0,0 +1,2127 @@
// Copyright Epic Games, Inc. All Rights Reserved.

#include "cachedisklayer.h"

#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zencore/xxhash.h>

#include <future>

//////////////////////////////////////////////////////////////////////////

namespace zen {
namespace {

#pragma pack(push)
#pragma pack(1)

    // We use this to indicate if an on-disk bucket needs wiping.
    // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references
    // to block items.
    // See: https://github.com/EpicGames/zen/pull/299
    static const uint32_t CurrentDiskBucketVersion = 1;

    // Packed 32-byte header written at offset 0 of a bucket index snapshot
    // file (*.uidx). A snapshot is a point-in-time dump of the in-memory
    // location map; LogPosition records how many entries of the append-only
    // log the snapshot already covers, so log replay can resume from there.
    struct CacheBucketIndexHeader
    {
        static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
        static constexpr uint32_t Version2 = 2;
        static constexpr uint32_t CurrentVersion = Version2;

        uint32_t Magic = ExpectedMagic;
        uint32_t Version = CurrentVersion;
        uint64_t EntryCount = 0;      // number of DiskIndexEntry records that follow the header
        uint64_t LogPosition = 0;     // log entry count covered by this snapshot
        uint32_t PayloadAlignment = 0; // alignment used for block-store payload offsets; must be > 0
        uint32_t Checksum = 0;        // XXH32 over every preceding header byte (see ComputeChecksum)

        // Hashes all header fields except Checksum itself (the final uint32_t),
        // so corruption of any other field invalidates the header.
        static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
        {
            return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
        }
    };

    static_assert(sizeof(CacheBucketIndexHeader) == 32);

#pragma pack(pop)

    const char* IndexExtension = ".uidx";
    const char* LogExtension = ".slog";

    // Path of the index snapshot file for a bucket, e.g. <dir>/<name>.uidx
    std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
    {
        return BucketDir / (BucketName + IndexExtension);
+ } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + ".tmp"); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + LogExtension); + } + + bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.Reserved != 0) + { + OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.GetFlags() & + ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + return true; + } + if (Entry.Location.Reserved != 0) + { + OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); + return false; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + + bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) + { + int DropIndex = 0; + do + { + if (!std::filesystem::exists(Dir)) + { + return false; + } + + std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); + std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; + if (std::filesystem::exists(DroppedBucketPath)) + { + DropIndex++; + continue; + } + + std::error_code Ec; + std::filesystem::rename(Dir, DroppedBucketPath, Ec); + if (!Ec) + 
{ + DeleteDirectories(DroppedBucketPath); + return true; + } + // TODO: Do we need to bail at some point? + zen::Sleep(100); + } while (true); + } +} // namespace + +namespace fs = std::filesystem; + +static CbObject +LoadCompactBinaryObject(const fs::path& Path) +{ + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); +} + +static void +SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) +{ + WriteFile(Path, Object.GetBuffer().AsIoBuffer()); +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero) +{ + if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) + { + // This is pretty ad hoc but in order to avoid too many individual files + // it makes sense to have a different strategy for legacy values + m_LargeObjectThreshold = 16 * 1024 * 1024; + } +} + +ZenCacheDiskLayer::CacheBucket::~CacheBucket() +{ +} + +bool +ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) +{ + using namespace std::literals; + + ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate"); + + ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); + + m_BlocksBasePath = BucketDir / "blocks"; + m_BucketDir = BucketDir; + + CreateDirectories(m_BucketDir); + + std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + + bool IsNew = false; + + CbObject Manifest = LoadCompactBinaryObject(ManifestPath); + + if (Manifest) + { + m_BucketId = Manifest["BucketId"sv].AsObjectId(); + if (m_BucketId == Oid::Zero) + { + return false; + } + // uint32_t Version = Manifest["Version"sv].AsUInt32(0); + 
// if (Version != CurrentDiskBucketVersion) + //{ + // ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); + // IsNew = true; + // } + } + else if (AllowCreate) + { + m_BucketId.Generate(); + + CbObjectWriter Writer; + Writer << "BucketId"sv << m_BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Manifest = Writer.Save(); + SaveCompactBinaryObject(ManifestPath, Manifest); + IsNew = true; + } + else + { + return false; + } + + OpenLog(IsNew); + + if (!IsNew) + { + Stopwatch Timer; + const auto _ = + MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + for (CbFieldView Entry : Manifest["Timestamps"sv]) + { + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); + + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); + } + } + for (CbFieldView Entry : Manifest["RawInfo"sv]) + { + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); + + const IoHash RawHash = Obj["RawHash"sv].AsHash(); + const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); + + if (RawHash == IoHash::Zero || RawSize == 0) + { + ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); + } + + m_Payloads[EntryIndex].RawHash = RawHash; + m_Payloads[EntryIndex].RawSize = RawSize; + } + } + } + + return true; +} + +void +ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() +{ + ZEN_TRACE_CPU("Z$::Bucket::MakeIndexSnapshot"); + + uint64_t LogCount = m_SlogFile.GetLogCount(); + if (m_LogFlushPosition == LogCount) + { + return; + } + + ZEN_DEBUG("writing store snapshot for '{}'", 
m_BucketDir); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", + m_BucketDir, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); + + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, STmpIndexPath); + } + + try + { + // Write the current state of the location map to a new index state + std::vector<DiskIndexEntry> Entries; + Entries.resize(m_Index.size()); + + { + uint64_t EntryIndex = 0; + for (auto& Entry : m_Index) + { + DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = m_Payloads[Entry.second].Location; + } + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + m_LogFlushPosition = LogCount; + } + catch (std::exception& Err) + { + ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(IndexPath); + fs::rename(STmpIndexPath, IndexPath); + } + } + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } +} + +uint64_t 
+ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) +{ + ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); + + if (std::filesystem::is_regular_file(IndexPath)) + { + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && + (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) + { + switch (Header.Version) + { + case CacheBucketIndexHeader::Version2: + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + if (Header.EntryCount > ExpectedEntryCount) + { + break; + } + size_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", + IndexPath, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_PayloadAlignment = Header.PayloadAlignment; + + std::vector<DiskIndexEntry> Entries; + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), + Header.EntryCount * sizeof(DiskIndexEntry), + sizeof(CacheBucketIndexHeader)); + + m_Payloads.reserve(Header.EntryCount); + m_AccessTimes.reserve(Header.EntryCount); + m_Index.reserve(Header.EntryCount); + + std::string InvalidEntryReason; + for (const DiskIndexEntry& Entry : Entries) + { + if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(Entry.Key, 
EntryIndex); + EntryCount++; + } + OutVersion = CacheBucketIndexHeader::Version2; + return Header.LogPosition; + } + break; + default: + break; + } + } + } + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + return 0; +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); + + if (std::filesystem::is_regular_file(LogPath)) + { + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + LogEntryCount = EntryCount - SkipEntryCount; + m_Index.reserve(LogEntryCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(Record.Key, EntryIndex); + }, + SkipEntryCount); + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + } + return LogEntryCount; + } + } + return 0; +}; + +void +ZenCacheDiskLayer::CacheBucket::OpenLog(const 
bool IsNew) +{ + ZEN_TRACE_CPU("Z$::Bucket::OpenLog"); + + m_TotalStandaloneSize = 0; + + m_Index.clear(); + m_Payloads.clear(); + m_AccessTimes.clear(); + + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + + if (IsNew) + { + fs::remove(LogPath); + fs::remove(IndexPath); + fs::remove_all(m_BlocksBasePath); + } + + CreateDirectories(m_BucketDir); + + std::unordered_map<uint32_t, uint64_t> BlockSizes = + m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); + + if (std::filesystem::is_regular_file(IndexPath)) + { + uint32_t IndexVersion = 0; + m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); + if (IndexVersion == 0) + { + ZEN_WARN("removing invalid index file at '{}'", IndexPath); + std::filesystem::remove(IndexPath); + } + } + + uint64_t LogEntryCount = 0; + if (std::filesystem::is_regular_file(LogPath)) + { + if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) + { + LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); + } + else if (fs::is_regular_file(LogPath)) + { + ZEN_WARN("removing invalid cas log at '{}'", LogPath); + std::filesystem::remove(LogPath); + } + } + + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + std::vector<BlockStoreLocation> KnownLocations; + KnownLocations.reserve(m_Index.size()); + std::vector<DiskIndexEntry> BadEntries; + for (const auto& Entry : m_Index) + { + size_t EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + const DiskLocation& Location = Payload.Location; + + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); + continue; + } + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); + + auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex); + if (BlockIt == BlockSizes.end()) + { + ZEN_WARN("Unknown block {} 
for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + } + else + { + uint64_t BlockSize = BlockIt->second; + if (BlockLocation.Offset + BlockLocation.Size > BlockSize) + { + ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + } + else + { + KnownLocations.push_back(BlockLocation); + continue; + } + } + + DiskLocation NewLocation = Payload.Location; + NewLocation.Flags |= DiskLocation::kTombStone; + BadEntries.push_back(DiskIndexEntry{.Key = Entry.first, .Location = NewLocation}); + } + + if (!BadEntries.empty()) + { + m_SlogFile.Append(BadEntries); + m_SlogFile.Flush(); + + LogEntryCount += BadEntries.size(); + + for (const DiskIndexEntry& BadEntry : BadEntries) + { + m_Index.erase(BadEntry.Key); + } + } + + m_BlockStore.Prune(KnownLocations); + + if (IsNew || LogEntryCount > 0) + { + MakeIndexSnapshot(); + } + // TODO: should validate integrity of container files here +} + +void +ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const +{ + char HexString[sizeof(HashKey.Hash) * 2]; + ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); + + Path.Append(m_BucketDir); + Path.AppendSeparator(); + Path.Append(L"blob"); + Path.AppendSeparator(); + Path.AppendAsciiRange(HexString, HexString + 3); + Path.AppendSeparator(); + Path.AppendAsciiRange(HexString + 3, HexString + 5); + Path.AppendSeparator(); + Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); +} + +IoBuffer +ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const +{ + BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); + + IoBuffer Value = m_BlockStore.TryGetChunk(Location); + if (Value) + { + Value.SetContentType(Loc.GetContentType()); + } + + return Value; +} + +IoBuffer +ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const +{ + 
ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue"); + + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + + RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + + if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) + { + Data.SetContentType(Loc.GetContentType()); + + return Data; + } + + return {}; +} + +bool +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) +{ + RwLock::SharedLockScope _(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It == m_Index.end()) + { + return false; + } + size_t EntryIndex = It.value(); + const BucketPayload& Payload = m_Payloads[EntryIndex]; + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = Payload.Location; + OutValue.RawSize = Payload.RawSize; + OutValue.RawHash = Payload.RawHash; + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + // We don't need to hold the index lock when we read a standalone file + _.ReleaseNow(); + OutValue.Value = GetStandaloneCacheValue(Location, HashKey); + } + else + { + OutValue.Value = GetInlineCacheValue(Location); + } + _.ReleaseNow(); + + if (!Location.IsFlagSet(DiskLocation::kStructured)) + { + if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0) + { + if (Location.IsFlagSet(DiskLocation::kCompressed)) + { + (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize); + } + else + { + OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); + OutValue.RawSize = OutValue.Value.GetSize(); + } + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) + { + BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; + WritePayload.RawHash = OutValue.RawHash; + WritePayload.RawSize = OutValue.RawSize; + + m_LogFlushPosition = 0; // Force resave of index on exit + } + } + } + + return (bool)OutValue.Value; +} + +void 
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) +{ + if (Value.Value.Size() >= m_LargeObjectThreshold) + { + return PutStandaloneCacheValue(HashKey, Value); + } + PutInlineCacheValue(HashKey, Value); +} + +bool +ZenCacheDiskLayer::CacheBucket::Drop() +{ + ZEN_TRACE_CPU("Z$::Bucket::Drop"); + + RwLock::ExclusiveLockScope _(m_IndexLock); + + std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; + ShardLocks.reserve(256); + for (RwLock& Lock : m_ShardedLocks) + { + ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock)); + } + m_BlockStore.Close(); + m_SlogFile.Close(); + + bool Deleted = MoveAndDeleteDirectory(m_BucketDir); + + m_Index.clear(); + m_Payloads.clear(); + m_AccessTimes.clear(); + return Deleted; +} + +void +ZenCacheDiskLayer::CacheBucket::Flush() +{ + ZEN_TRACE_CPU("Z$::Bucket::Flush"); + + m_BlockStore.Flush(); + + RwLock::SharedLockScope _(m_IndexLock); + m_SlogFile.Flush(); + MakeIndexSnapshot(); + SaveManifest(); +} + +void +ZenCacheDiskLayer::CacheBucket::SaveManifest() +{ + using namespace std::literals; + + ZEN_TRACE_CPU("Z$::Bucket::SaveManifest"); + + CbObjectWriter Writer; + Writer << "BucketId"sv << m_BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + + if (!m_Index.empty()) + { + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : m_Index) + { + const IoHash& Key = Kv.first; + GcClock::Tick AccessTime = m_AccessTimes[Kv.second]; + + Writer.BeginObject(); + Writer << "Key"sv << Key; + Writer << "LastAccess"sv << AccessTime; + Writer.EndObject(); + } + Writer.EndArray(); + + Writer.BeginArray("RawInfo"sv); + { + for (auto& Kv : m_Index) + { + const IoHash& Key = Kv.first; + const BucketPayload& Payload = m_Payloads[Kv.second]; + if (Payload.RawHash != IoHash::Zero) + { + Writer.BeginObject(); + Writer << "Key"sv << Key; + Writer << "RawHash"sv << Payload.RawHash; + Writer << "RawSize"sv << Payload.RawSize; + Writer.EndObject(); + } + } + } + 
Writer.EndArray(); + } + + try + { + SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); + } + catch (std::exception& Err) + { + ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what()); + } +} + +IoHash +HashBuffer(const CompositeBuffer& Buffer) +{ + IoHashStream Hasher; + + for (const SharedBuffer& Segment : Buffer.GetSegments()) + { + Hasher.Append(Segment.GetView()); + } + + return Hasher.GetHash(); +} + +bool +ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) +{ + ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType); + + if (ContentType == ZenContentType::kCbObject) + { + CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); + + if (Error == CbValidateError::None) + { + return true; + } + + ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error)); + + return false; + } + else if (ContentType == ZenContentType::kCompressedBinary) + { + IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer); + + IoHash HeaderRawHash; + uint64_t RawSize = 0; + if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize)) + { + ZEN_SCOPED_ERROR("compressed buffer header validation failed"); + + return false; + } + + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize); + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + IoHash DecompressedHash = HashBuffer(Decompressed); + + if (HeaderRawHash != DecompressedHash) + { + ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); + + return false; + } + } + else + { + // No way to verify this kind of content (what is it exactly?) 
+ + static int Once = [&] { + ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType)); + return 42; + }(); + } + + return true; +}; + +void +ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) +{ + ZEN_TRACE_CPU("Z$::Bucket::Scrub"); + + ZEN_INFO("scrubbing '{}'", m_BucketDir); + + Stopwatch Timer; + uint64_t ChunkCount = 0; + uint64_t VerifiedChunkBytes = 0; + + auto LogStats = MakeGuard([&] { + const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); + + ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", + m_BucketName, + NiceBytes(VerifiedChunkBytes), + NiceTimeSpanMs(DurationMs), + ChunkCount, + NiceRate(VerifiedChunkBytes, DurationMs)); + }); + + std::vector<IoHash> BadKeys; + auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; + + try + { + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + + RwLock::SharedLockScope _(m_IndexLock); + + const size_t BlockChunkInitialCount = m_Index.size() / 4; + ChunkLocations.reserve(BlockChunkInitialCount); + ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); + + // Do a pass over the index and verify any standalone file values straight away + // all other storage classes are gathered and verified in bulk in order to enable + // more efficient I/O scheduling + + for (auto& Kv : m_Index) + { + const IoHash& HashKey = Kv.first; + const BucketPayload& Payload = m_Payloads[Kv.second]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Ctx.ThrowIfDeadlineExpired(); + + ++ChunkCount; + VerifiedChunkBytes += Loc.Size(); + + if (Loc.GetContentType() == ZenContentType::kBinary) + { + // Blob cache value, not much we can do about data integrity checking + // here since there's no hash available + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + + RwLock::SharedLockScope 
ValueLock(LockForHash(HashKey)); + + std::error_code Ec; + uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); + if (Ec) + { + ReportBadKey(HashKey); + } + if (size != Loc.Size()) + { + ReportBadKey(HashKey); + } + continue; + } + else + { + // Structured cache value + IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); + if (!Buffer) + { + ReportBadKey(HashKey); + continue; + } + if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer)) + { + ReportBadKey(HashKey); + continue; + } + } + } + else + { + ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); + ChunkIndexToChunkHash.push_back(HashKey); + continue; + } + } + + const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { + ++ChunkCount; + VerifiedChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + if (!Data) + { + // ChunkLocation out of range of stored blocks + ReportBadKey(Hash); + return; + } + if (!Size) + { + ReportBadKey(Hash); + return; + } + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); + if (!Buffer) + { + ReportBadKey(Hash); + return; + } + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + Buffer.SetContentType(ContentType); + if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) + { + ReportBadKey(Hash); + return; + } + }; + + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { + Ctx.ThrowIfDeadlineExpired(); + + ++ChunkCount; + VerifiedChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + if (!Buffer) + { + ReportBadKey(Hash); + return; + } + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + Buffer.SetContentType(ContentType); + if 
(!ValidateCacheBucketEntryValue(ContentType, Buffer)) + { + ReportBadKey(Hash); + return; + } + }; + + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + } + catch (ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + } + + Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes); + + if (!BadKeys.empty()) + { + ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); + + if (Ctx.RunRecovery()) + { + // Deal with bad chunks by removing them from our lookup map + + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); + + { + RwLock::ExclusiveLockScope __(m_IndexLock); + for (const IoHash& BadKey : BadKeys) + { + // Log a tombstone and delete the in-memory index for the bad entry + const auto It = m_Index.find(BadKey); + const BucketPayload& Payload = m_Payloads[It->second]; + DiskLocation Location = Payload.Location; + Location.Flags |= DiskLocation::kTombStone; + LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); + m_Index.erase(BadKey); + } + } + for (const DiskIndexEntry& Entry : LogEntries) + { + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + ExtendablePathBuilder<256> Path; + BuildPath(Path, Entry.Key); + fs::path FilePath = Path.ToPath(); + RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); + if (fs::is_regular_file(FilePath)) + { + ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); + std::error_code Ec; + fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... 
+ } + m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + } + } + m_SlogFile.Append(LogEntries); + + // Clean up m_AccessTimes and m_Payloads vectors + { + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + IndexMap Index; + + { + RwLock::ExclusiveLockScope __(m_IndexLock); + size_t EntryCount = m_Index.size(); + Payloads.reserve(EntryCount); + AccessTimes.reserve(EntryCount); + Index.reserve(EntryCount); + for (auto It : m_Index) + { + size_t EntryIndex = Payloads.size(); + Payloads.push_back(m_Payloads[EntryIndex]); + AccessTimes.push_back(m_AccessTimes[EntryIndex]); + Index.insert({It.first, EntryIndex}); + } + m_Index.swap(Index); + m_Payloads.swap(Payloads); + m_AccessTimes.swap(AccessTimes); + } + } + } + } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + if (!BadKeys.empty()) + { + Ctx.ReportBadCidChunks(BadKeys); + } +} + +void +ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); + + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", + m_BucketDir, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs)); + }); + + const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime(); + + const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + IndexMap Index; + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + { + RwLock::SharedLockScope 
__(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + Index = m_Index; + AccessTimes = m_AccessTimes; + Payloads = m_Payloads; + } + + std::vector<IoHash> ExpiredKeys; + ExpiredKeys.reserve(1024); + + std::vector<IoHash> Cids; + Cids.reserve(1024); + + for (const auto& Entry : Index) + { + const IoHash& Key = Entry.first; + GcClock::Tick AccessTime = AccessTimes[Entry.second]; + if (AccessTime < ExpireTicks) + { + ExpiredKeys.push_back(Key); + continue; + } + + const DiskLocation& Loc = Payloads[Entry.second].Location; + + if (Loc.IsFlagSet(DiskLocation::kStructured)) + { + if (Cids.size() > 1024) + { + GcCtx.AddRetainedCids(Cids); + Cids.clear(); + } + + IoBuffer Buffer; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + // We don't need to hold the index lock when we read a standalone file + __.ReleaseNow(); + if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) + { + continue; + } + } + else if (Buffer = GetInlineCacheValue(Loc); !Buffer) + { + continue; + } + } + + ZEN_ASSERT(Buffer); + ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); + CbObject Obj(SharedBuffer{Buffer}); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); + } + } + + GcCtx.AddRetainedCids(Cids); + GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); +} + +void +ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); + + ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); + + Stopwatch 
TotalTimer; + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = TotalSize(); + + std::unordered_set<IoHash> DeletedChunks; + uint64_t MovedCount = 0; + + const auto _ = MakeGuard([&] { + ZEN_DEBUG( + "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " + "{} " + "of {} " + "entires ({}).", + m_BucketDir, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedChunks.size(), + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + RwLock::SharedLockScope _(m_IndexLock); + SaveManifest(); + }); + + m_SlogFile.Flush(); + + std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); + std::vector<IoHash> DeleteCacheKeys; + DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); + GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { + if (Keep) + { + return; + } + DeleteCacheKeys.push_back(ChunkHash); + }); + if (DeleteCacheKeys.empty()) + { + ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir); + return; + } + + auto __ = MakeGuard([&]() { + if (!DeletedChunks.empty()) + { + // Clean up m_AccessTimes and m_Payloads vectors + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + IndexMap Index; + + { + RwLock::ExclusiveLockScope _(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + size_t EntryCount = m_Index.size(); + Payloads.reserve(EntryCount); + 
AccessTimes.reserve(EntryCount); + Index.reserve(EntryCount); + for (auto It : m_Index) + { + size_t OldEntryIndex = It.second; + size_t NewEntryIndex = Payloads.size(); + Payloads.push_back(m_Payloads[OldEntryIndex]); + AccessTimes.push_back(m_AccessTimes[OldEntryIndex]); + Index.insert({It.first, NewEntryIndex}); + } + m_Index.swap(Index); + m_Payloads.swap(Payloads); + m_AccessTimes.swap(AccessTimes); + } + GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); + } + }); + + std::vector<DiskIndexEntry> ExpiredStandaloneEntries; + IndexMap Index; + BlockStore::ReclaimSnapshotState BlockStoreState; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.empty()) + { + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); + return; + } + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + + SaveManifest(); + Index = m_Index; + + for (const IoHash& Key : DeleteCacheKeys) + { + if (auto It = Index.find(Key); It != Index.end()) + { + const BucketPayload& Payload = m_Payloads[It->second]; + DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; + if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + { + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); + } + } + } + if (GcCtx.IsDeletionMode()) + { + for (const auto& Entry : ExpiredStandaloneEntries) + { + m_Index.erase(Entry.Key); + m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedChunks.insert(Entry.Key); + } + m_SlogFile.Append(ExpiredStandaloneEntries); + } + } + + if (GcCtx.IsDeletionMode()) + { + std::error_code Ec; + ExtendablePathBuilder<256> Path; + + for (const auto& Entry : ExpiredStandaloneEntries) + { + const 
IoHash& Key = Entry.Key; + const DiskLocation& Loc = Entry.Location; + + Path.Reset(); + BuildPath(Path, Key); + fs::path FilePath = Path.ToPath(); + + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); + continue; + } + __.ReleaseNow(); + + RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); + if (fs::is_regular_file(FilePath)) + { + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + fs::remove(FilePath, Ec); + } + } + + if (Ec) + { + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); + Ec.clear(); + DiskLocation RestoreLocation = Loc; + RestoreLocation.Flags &= ~DiskLocation::kTombStone; + + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) + { + continue; + } + m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert({Key, EntryIndex}); + m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); + DeletedChunks.erase(Key); + continue; + } + DeletedSize += Entry.Location.Size(); + } + } + + TotalChunkCount = Index.size(); + + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : Index) + { + 
const DiskLocation& Location = m_Payloads[Entry.second].Location; + + if (Location.Flags & DiskLocation::kStandaloneFile) + { + continue; + } + TotalChunkHashes.push_back(Entry.first); + } + + if (TotalChunkHashes.empty()) + { + return; + } + TotalChunkCount = TotalChunkHashes.size(); + + std::vector<BlockStoreLocation> ChunkLocations; + BlockStore::ChunkIndexArray KeepChunkIndexes; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + + GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = Index.find(ChunkHash); + const DiskLocation& DiskLocation = m_Payloads[KeyIt->second].Location; + BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + if (Keep) + { + KeepChunkIndexes.push_back(ChunkIndex); + } + }); + + size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); + + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); + uint64_t CurrentTotalSize = TotalSize(); + ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", + m_BucketDir, + DeleteCount, + TotalChunkCount, + NiceBytes(CurrentTotalSize)); + return; + } + + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation 
= Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; + const DiskLocation& OldDiskLocation = OldPayload.Location; + LogEntries.push_back( + {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); + } + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; + const DiskLocation& OldDiskLocation = OldPayload.Location; + LogEntries.push_back({.Key = ChunkHash, + .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), + m_PayloadAlignment, + OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); + DeletedChunks.insert(ChunkHash); + } + + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); + { + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + for (const DiskIndexEntry& Entry : LogEntries) + { + if (Entry.Location.GetFlags() & DiskLocation::kTombStone) + { + m_Index.erase(Entry.Key); + continue; + } + m_Payloads[m_Index[Entry.Key]].Location = Entry.Location; + } + } + }, + [&]() { return GcCtx.CollectSmallObjects(); }); +} + +void +ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) +{ + using namespace access_tracking; + + for (const KeyAccessTime& KeyTime : AccessTimes) + { + if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_AccessTimes[EntryIndex] = KeyTime.LastAccess; + } + } +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::EntryCount() const +{ + RwLock::SharedLockScope _(m_IndexLock); + return 
static_cast<uint64_t>(m_Index.size()); +} + +CacheValueDetails::ValueDetails +ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const +{ + std::vector<IoHash> Attachments; + const BucketPayload& Payload = m_Payloads[Index]; + if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) + { + IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key) + : GetInlineCacheValue(Payload.Location); + CbObject Obj(SharedBuffer{Value}); + Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); + } + return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), + .RawSize = Payload.RawSize, + .RawHash = Payload.RawHash, + .LastAccess = m_AccessTimes[Index], + .Attachments = std::move(Attachments), + .ContentType = Payload.Location.GetContentType()}; +} + +CacheValueDetails::BucketDetails +ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const +{ + CacheValueDetails::BucketDetails Details; + RwLock::SharedLockScope _(m_IndexLock); + if (ValueFilter.empty()) + { + Details.Values.reserve(m_Index.size()); + for (const auto& It : m_Index) + { + Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second)); + } + } + else + { + IoHash Key = IoHash::FromHexString(ValueFilter); + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second)); + } + } + return Details; +} + +void +ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + CacheBucket& Bucket = *Kv.second; + Bucket.CollectGarbage(GcCtx); + } +} + +void +ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) +{ + RwLock::SharedLockScope _(m_Lock); + + for (const auto& Kv : AccessTimes.Buckets) + { + if (auto It = 
m_Buckets.find(Kv.first); It != m_Buckets.end()) + { + CacheBucket& Bucket = *It->second; + Bucket.UpdateAccessTimes(Kv.second); + } + } +} + +void +ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) +{ + ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); + + uint64_t NewFileSize = Value.Value.Size(); + + TemporaryFile DataFile; + + std::error_code Ec; + DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); + } + + bool CleanUpTempFile = false; + auto __ = MakeGuard([&] { + if (CleanUpTempFile) + { + std::error_code Ec; + std::filesystem::remove(DataFile.GetPath(), Ec); + if (Ec) + { + ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", + DataFile.GetPath(), + m_BucketDir, + Ec.message()); + } + } + }); + + DataFile.WriteAll(Value.Value, Ec); + if (Ec) + { + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", + NiceBytes(NewFileSize), + DataFile.GetPath().string(), + m_BucketDir)); + } + + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + std::filesystem::path FsPath{DataFilePath.ToPath()}; + + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + + // We do a speculative remove of the file instead of probing with a exists call and check the error code instead + std::filesystem::remove(FsPath, Ec); + if (Ec) + { + if (Ec.value() != ENOENT) + { + ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); + Sleep(100); + Ec.clear(); + std::filesystem::remove(FsPath, Ec); + if (Ec && Ec.value() != ENOENT) + { + throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); + } + } + } + + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + if (Ec) + { + 
CreateDirectories(FsPath.parent_path()); + Ec.clear(); + + // Try again + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + if (Ec) + { + ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retrying.", + FsPath, + DataFile.GetPath(), + m_BucketDir, + Ec.message()); + Sleep(100); + Ec.clear(); + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + if (Ec) + { + throw std::system_error( + Ec, + fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); + } + } + } + + // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file + // will be disabled as the file handle has already been closed + CleanUpTempFile = false; + + uint8_t EntryFlags = DiskLocation::kStandaloneFile; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + DiskLocation Loc(NewFileSize, EntryFlags); + + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It == m_Index.end()) + { + // Previously unknown object + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(HashKey, EntryIndex); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? 
+ size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); + } + + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); +} + +void +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) +{ + uint8_t EntryFlags = 0; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + m_SlogFile.Append({.Key = HashKey, .Location = Location}); + + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + // TODO: should check if write is idempotent and bail out if it is? 
+ // this would requiring comparing contents on disk unless we add a + // content hash to the index entry + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + } + else + { + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(HashKey, EntryIndex); + } + }); +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) +{ +} + +ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; + +bool +ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) +{ + const auto BucketName = std::string(InBucket); + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto It = m_Buckets.find(BucketName); + + if (It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + } + + if (Bucket == nullptr) + { + // Bucket needs to be opened/created + + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + else + { + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); + Bucket = InsertResult.first->second.get(); + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; + + if (!Bucket->OpenOrCreate(BucketPath)) + { + m_Buckets.erase(InsertResult.first); + return false; + } + } + } + + ZEN_ASSERT(Bucket != nullptr); + return Bucket->Get(HashKey, OutValue); +} + +void +ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) +{ + const 
auto BucketName = std::string(InBucket); + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto It = m_Buckets.find(BucketName); + + if (It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + } + + if (Bucket == nullptr) + { + // New bucket needs to be created + + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + else + { + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); + Bucket = InsertResult.first->second.get(); + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; + + try + { + if (!Bucket->OpenOrCreate(BucketPath)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + m_Buckets.erase(InsertResult.first); + return; + } + } + catch (const std::exception& Err) + { + ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + } + } + + ZEN_ASSERT(Bucket != nullptr); + + Bucket->Put(HashKey, Value); +} + +void +ZenCacheDiskLayer::DiscoverBuckets() +{ + DirectoryContent DirContent; + GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); + + // Initialize buckets + + RwLock::ExclusiveLockScope _(m_Lock); + + for (const std::filesystem::path& BucketPath : DirContent.Directories) + { + const std::string BucketName = PathToUtf8(BucketPath.stem()); + // New bucket needs to be created + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + { + continue; + } + + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); + CacheBucket& Bucket = *InsertResult.first->second; + + try + { + if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + + 
m_Buckets.erase(InsertResult.first); + continue; + } + } + catch (const std::exception& Err) + { + ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + ZEN_INFO("Discovered bucket '{}'", BucketName); + } +} + +bool +ZenCacheDiskLayer::DropBucket(std::string_view InBucket) +{ + RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.find(std::string(InBucket)); + + if (It != m_Buckets.end()) + { + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It); + + return Bucket.Drop(); + } + + // Make sure we remove the folder even if we don't know about the bucket + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= std::string(InBucket); + return MoveAndDeleteDirectory(BucketPath); +} + +bool +ZenCacheDiskLayer::Drop() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + std::vector<std::unique_ptr<CacheBucket>> Buckets; + Buckets.reserve(m_Buckets.size()); + while (!m_Buckets.empty()) + { + const auto& It = m_Buckets.begin(); + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It->first); + if (!Bucket.Drop()) + { + return false; + } + } + return MoveAndDeleteDirectory(m_RootDir); +} + +void +ZenCacheDiskLayer::Flush() +{ + std::vector<CacheBucket*> Buckets; + + { + RwLock::SharedLockScope _(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + CacheBucket* Bucket = Kv.second.get(); + Buckets.push_back(Bucket); + } + } + + for (auto& Bucket : Buckets) + { + Bucket->Flush(); + } +} + +void +ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) +{ + RwLock::SharedLockScope _(m_Lock); + + { + std::vector<std::future<void>> Results; + Results.reserve(m_Buckets.size()); + + for (auto& Kv : m_Buckets) + { +#if 1 + Results.push_back( + Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); +#else + 
CacheBucket& Bucket = *Kv.second; + Bucket.ScrubStorage(Ctx); +#endif + } + + for (auto& Result : Results) + { + Result.get(); + } + } +} + +void +ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + CacheBucket& Bucket = *Kv.second; + Bucket.GatherReferences(GcCtx); + } +} + +uint64_t +ZenCacheDiskLayer::TotalSize() const +{ + uint64_t TotalSize{}; + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + TotalSize += Kv.second->TotalSize(); + } + + return TotalSize; +} + +ZenCacheDiskLayer::Info +ZenCacheDiskLayer::GetInfo() const +{ + ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()}; + + RwLock::SharedLockScope _(m_Lock); + Info.BucketNames.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Info.BucketNames.push_back(Kv.first); + Info.EntryCount += Kv.second->EntryCount(); + } + return Info; +} + +std::optional<ZenCacheDiskLayer::BucketInfo> +ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const +{ + RwLock::SharedLockScope _(m_Lock); + + if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) + { + return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; + } + return {}; +} + +CacheValueDetails::NamespaceDetails +ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const +{ + RwLock::SharedLockScope _(m_Lock); + CacheValueDetails::NamespaceDetails Details; + if (BucketFilter.empty()) + { + Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); + for (auto& Kv : m_Buckets) + { + Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); + } + } + else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) + { + Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); + } + return Details; +} + +} // namespace zen
\ No newline at end of file |