diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-06 15:46:29 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-12 22:20:47 +0200 |
| commit | 31dd0f8906aa5a27b8c453c72f6d10964a3be9eb (patch) | |
| tree | 9f4a7eb3bd9d6614bbf38c89f158c52ab7975b97 /zenserver/cache/structuredcachestore.cpp | |
| parent | Merge pull request #72 from EpicGames/de/set-ulimit (diff) | |
| download | zen-31dd0f8906aa5a27b8c453c72f6d10964a3be9eb.tar.xz zen-31dd0f8906aa5a27b8c453c72f6d10964a3be9eb.zip | |
structured cache with block store
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 1977 |
1 files changed, 1758 insertions, 219 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 738e4c1fd..c5ccef523 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -14,13 +14,13 @@ #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/trace.h> #include <zenstore/cidstore.h> +#include <xxhash.h> + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> #endif @@ -30,10 +30,220 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <random> +#endif + ////////////////////////////////////////////////////////////////////////// +#pragma pack(push) +#pragma pack(1) + namespace zen { +namespace { + +#pragma pack(push) +#pragma pack(1) + + struct CacheBucketIndexHeader + { + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; + + static_assert(sizeof(CacheBucketIndexHeader) == 32); + + struct LegacyDiskLocation + { + inline LegacyDiskLocation() = default; + + inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) + : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) + , LowerSize(ValueSize & 0xFFFFffff) + , IndexDataSize(IndexSize) + { + } + + static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize) + static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; + static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file + static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary + static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value + static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format + + static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } + + inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t Size() const { return LowerSize; } + inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } + inline ZenContentType GetContentType() const + { + ZenContentType ContentType = ZenContentType::kBinary; + + if (IsFlagSet(LegacyDiskLocation::kStructured)) + { + ContentType = ZenContentType::kCbObject; + } + + if (IsFlagSet(LegacyDiskLocation::kCompressed)) + { + ContentType = ZenContentType::kCompressedBinary; + } + + return ContentType; + } + inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; } + + private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; + }; + + struct LegacyDiskIndexEntry + { + IoHash Key; + LegacyDiskLocation Location; + }; + +#pragma pack(pop) + + static_assert(sizeof(LegacyDiskIndexEntry) == 36); + + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + const char* DataExtension = ".sobs"; + + std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) + { + ExtendablePathBuilder<256> Path; + + char BlockHexString[9]; + ToHexNumber(BlockIndex, BlockHexString); + + Path.Append(BlocksBasePath); + Path.AppendSeparator(); + Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); + Path.AppendSeparator(); + Path.Append(BlockHexString); + Path.Append(DataExtension); + return Path.ToPath(); + } + + std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + IndexExtension); + } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + ".tmp" + IndexExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + LogExtension); + } + + std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir) + { + return BucketDir / (std::string("zen") + LogExtension); + } + + std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir) + { + return BucketDir / (std::string("zen") + DataExtension); + } + + std::vector<DiskIndexEntry> MakeDiskIndexEntries(const std::unordered_map<IoHash, DiskLocation>& MovedChunks, + const std::vector<IoHash>& DeletedChunks) + { + std::vector<DiskIndexEntry> result; + result.reserve(MovedChunks.size()); + for (const auto& MovedEntry : MovedChunks) + { + result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second}); + } + for (const IoHash& ChunkHash : DeletedChunks) + { + DiskLocation Location; + Location.Flags |= DiskLocation::kTombStone; + result.push_back({.Key = ChunkHash, .Location = Location}); + } + return result; + } + + bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured | + LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString()); + return false; + } + if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + + bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.GetFlags() & + ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + +} // namespace + namespace fs = std::filesystem; static CbObject @@ -60,9 +270,9 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) } ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) -: GcStorage(Gc) +: m_RootDir(RootDir) +, GcStorage(Gc) , GcContributor(Gc) -, m_RootDir(RootDir) , m_DiskLayer(RootDir) { ZEN_INFO("initializing structured cache at '{}'", RootDir); @@ -425,6 +635,8 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { using namespace std::literals; + m_BlocksBasePath = BucketDir / "blocks"; + CreateDirectories(BucketDir); std::filesystem::path ManifestPath{BucketDir / "zen_manifest"}; @@ -470,48 +682,694 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } void -ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { - m_BucketDir = BucketDir; + ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([this, &EntryCount, &Timer] { + ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", + m_BucketDir / m_BucketName, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - uint64_t MaxFileOffset = 0; - uint64_t InvalidEntryCount = 0; - m_SobsCursor = 0; - m_TotalSize = 0; + namespace fs = std::filesystem; - m_Index.clear(); + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - std::filesystem::path SobsPath{BucketDir / "zen.sobs"}; - std::filesystem::path SlogPath{BucketDir / "zen.slog"}; + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, STmpIndexPath); + } + + try + { + m_SlogFile.Flush(); - m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); - m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + // Write the current state of the location map to a new index state + uint64_t LogCount = 0; + std::vector<DiskIndexEntry> Entries; - m_SlogFile.Replay( - [&](const DiskIndexEntry& Entry) { - if (Entry.Key == IoHash::Zero) + { + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_IndexLock); + Entries.resize(m_Index.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_Index) { - ++InvalidEntryCount; + DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second.Location; } - else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + + LogCount = m_SlogFile.GetLogCount(); + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(IndexPath); + fs::rename(STmpIndexPath, IndexPath); + } + } + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadIndexFile() +{ + std::vector<DiskIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + if (std::filesystem::is_regular_file(IndexPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([this, &Entries, &Timer] { + ZEN_INFO("read store '{}' index containing #{} entries in {}", + m_BucketDir / m_BucketName, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) && + (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) { - m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + m_PayloadAlignment = Header.PayloadAlignment; + + std::string InvalidEntryReason; + for (const DiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + + return Header.LogPosition; } else { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); - m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + ZEN_WARN("skipping invalid index file '{}'", IndexPath); } - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); - }, - 0); + } + } + return 0; +} +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) +{ + std::vector<DiskIndexEntry> Entries; + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + if (std::filesystem::is_regular_file(LogPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([LogPath, &Entries, &Timer] { + ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + uint64_t ReadCount = EntryCount - SkipEntryCount; + m_Index.reserve(ReadCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + m_Index.insert_or_assign(Record.Key, IndexEntry(Record.Location, GcClock::TickCount())); + }, + SkipEntryCount); + if (InvalidEntryCount) + { + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); + } + } + } + return 0; +}; + +uint64_t +ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) +{ + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); + + if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) + { + return 0; + } + + ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName); + + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); + + uint64_t MigratedChunkCount = 0; + uint32_t MigratedBlockCount = 0; + Stopwatch MigrationTimer; + uint64_t TotalSize = 0; + const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { + ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", + m_BucketDir / m_BucketName, + MigratedChunkCount, + MigratedBlockCount, + NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), + NiceBytes(TotalSize)); + }); + + uint32_t WriteBlockIndex = 0; + while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return 0; + } + + if (Space.Free < MaxBlockSize) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", + m_BucketDir / m_BucketName, + MaxBlockSize, + NiceBytes(Space.Free)); + return 0; + } + + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + + std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; + uint64_t InvalidEntryCount = 0; + + TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog; + LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); + { + Stopwatch Timer; + const auto __ = MakeGuard([LegacyLogPath, &LegacyDiskIndex, &Timer] { + ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", + LegacyLogPath, + LegacyDiskIndex.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + if (LegacyCasLog.Initialize()) + { + LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); + LegacyCasLog.Replay( + [&](const LegacyDiskIndexEntry& Record) { + if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) + { + LegacyDiskIndex.erase(Record.Key); + return; + } + std::string InvalidEntryReason; + if (!ValidateLegacyEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (m_Index.contains(Record.Key)) + { + return; + } + LegacyDiskIndex[Record.Key] = Record; + }, + 0); + + std::vector<IoHash> BadEntries; + uint64_t BlockFileSize = BlockFile.FileSize(); + for (const auto& Entry : LegacyDiskIndex) + { + const LegacyDiskIndexEntry& Record(Entry.second); + if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + continue; + } + if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize) + { + continue; + } + ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); + BadEntries.push_back(Entry.first); + } + for (const IoHash& BadHash : BadEntries) + { + LegacyDiskIndex.erase(BadHash); + } + InvalidEntryCount += BadEntries.size(); + } + } if (InvalidEntryCount) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath); + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); } - m_SobsCursor = (MaxFileOffset + 15) & ~15; + if (LegacyDiskIndex.empty()) + { + LegacyCasLog.Close(); + BlockFile.Close(); + if (CleanSource) + { + // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find + // a manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + } + return 0; + } + + uint64_t BlockChunkCount = 0; + uint64_t BlockTotalSize = 0; + for (const auto& Entry : LegacyDiskIndex) + { + const LegacyDiskIndexEntry& Record(Entry.second); + if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + continue; + } + BlockChunkCount++; + BlockTotalSize += Record.Location.Size(); + } + + uint64_t RequiredDiskSpace = BlockTotalSize + ((m_PayloadAlignment - 1) * BlockChunkCount); + uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; + if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", + m_BucketDir / m_BucketName, + MaxRequiredBlockCount, + BlockStoreDiskLocation::MaxBlockIndex); + return 0; + } + + constexpr const uint64_t DiskReserve = 1ul << 28; + + if (CleanSource) + { + if (Space.Free < (MaxBlockSize + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_BucketDir / m_BucketName, + NiceBytes(MaxBlockSize + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } + else + { + if (Space.Free < (RequiredDiskSpace + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_BucketDir / m_BucketName, + NiceBytes(RequiredDiskSpace + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } + + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + CreateDirectories(LogPath.parent_path()); + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + if (CleanSource && (MaxRequiredBlockCount < 2)) + { + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(LegacyDiskIndex.size()); + + // We can use the block as is, just move it and add the blocks to our new log + for (auto& Entry : LegacyDiskIndex) + { + const LegacyDiskIndexEntry& Record(Entry.second); + + DiskLocation NewLocation; + uint8_t Flags = 0xff & (Record.Location.Flags() >> 56); + if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + NewLocation = DiskLocation(Record.Location.Size(), Flags); + } + else + { + BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.Offset(), Record.Location.Size()); + NewLocation = DiskLocation(NewChunkLocation, m_PayloadAlignment, Flags); + } + LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + CreateDirectories(BlockPath.parent_path()); + BlockFile.Close(); + std::filesystem::rename(LegacyDataPath, BlockPath); + CasLog.Append(LogEntries); + for (const DiskIndexEntry& Entry : LogEntries) + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + } + else + { + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(LegacyDiskIndex.size()); + for (const auto& Entry : LegacyDiskIndex) + { + ChunkHashes.push_back(Entry.first); + } + + std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { + auto LhsKeyIt = LegacyDiskIndex.find(Lhs); + auto RhsKeyIt = LegacyDiskIndex.find(Rhs); + return LhsKeyIt->second.Location.Offset() < RhsKeyIt->second.Location.Offset(); + }); + + uint64_t BlockSize = 0; + uint64_t BlockOffset = 0; + std::vector<BlockStoreLocation> NewLocations; + struct BlockData + { + std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; + uint64_t BlockOffset; + uint64_t BlockSize; + uint32_t BlockIndex; + }; + + std::vector<BlockData> BlockRanges; + std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; + BlockRanges.reserve(MaxRequiredBlockCount); + for (const IoHash& ChunkHash : ChunkHashes) + { + const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& LegacyChunkLocation = LegacyEntry.Location; + + if (LegacyChunkLocation.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + // For standalone files we just store the chunk hash an use the size from the legacy index as is + Chunks.push_back({ChunkHash, {}}); + continue; + } + + uint64_t ChunkOffset = LegacyChunkLocation.Offset(); + uint64_t ChunkSize = LegacyChunkLocation.Size(); + uint64_t ChunkEnd = ChunkOffset + ChunkSize; + + if (BlockSize == 0) + { + BlockOffset = ChunkOffset; + } + if ((ChunkEnd - BlockOffset) > MaxBlockSize) + { + BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; + BlockRange.Chunks.swap(Chunks); + BlockRanges.push_back(BlockRange); + + WriteBlockIndex++; + while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + BlockOffset = ChunkOffset; + BlockSize = 0; + } + BlockSize = RoundUp(BlockSize, m_PayloadAlignment); + BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; + Chunks.push_back({ChunkHash, ChunkLocation}); + BlockSize = ChunkEnd - BlockOffset; + } + if (BlockSize > 0) + { + BlockRanges.push_back( + {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); + } + Stopwatch WriteBlockTimer; + + std::reverse(BlockRanges.begin(), BlockRanges.end()); + std::vector<std::uint8_t> Buffer(1 << 28); + for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) + { + const BlockData& BlockRange = BlockRanges[Idx]; + if (Idx > 0) + { + uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; + uint64_t Completed = BlockOffset + BlockSize - Remaining; + uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; + + ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", + m_BucketDir / m_BucketDir, + Idx, + BlockRanges.size(), + NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), + NiceBytes(BlockOffset + BlockSize), + NiceTimeSpanMs(ETA)); + } + + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); + BlockStoreFile ChunkBlock(BlockPath); + ChunkBlock.Create(BlockRange.BlockSize); + uint64_t Offset = 0; + while (Offset < BlockRange.BlockSize) + { + uint64_t Size = BlockRange.BlockSize - Offset; + if (Size > Buffer.size()) + { + Size = Buffer.size(); + } + BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); + ChunkBlock.Write(Buffer.data(), Size, Offset); + Offset += Size; + } + ChunkBlock.Truncate(Offset); + ChunkBlock.Flush(); + + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; + + DiskLocation NewLocation; + uint8_t Flags = 0xff & (LegacyEntry.Location.Flags() >> 56); + if (LegacyEntry.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + NewLocation = DiskLocation(LegacyEntry.Location.Size(), Flags); + } + else + { + NewLocation = DiskLocation(Entry.second, m_PayloadAlignment, Flags); + } + LogEntries.push_back({.Key = Entry.first, .Location = NewLocation}); + } + CasLog.Append(LogEntries); + for (const DiskIndexEntry& Entry : LogEntries) + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + + if (CleanSource) + { + std::vector<LegacyDiskIndexEntry> LegacyLogEntries; + LegacyLogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + LegacyLogEntries.push_back( + {.Key = Entry.first, .Location = LegacyDiskLocation(0, 0, 0, LegacyDiskLocation::kTombStone)}); + } + LegacyCasLog.Append(LegacyLogEntries); + BlockFile.SetFileSize(BlockRange.BlockOffset); + } + } + } + BlockFile.Close(); + LegacyCasLog.Close(); + CasLog.Close(); + + if (CleanSource) + { + // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find + // a manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + } + return MigratedChunkCount; +} + +void +ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +{ + m_BucketDir = BucketDir; + + m_TotalSize = 0; + + m_Index.clear(); + + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + + if (IsNew) + { + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); + fs::remove(LegacyLogPath); + fs::remove(LegacyDataPath); + fs::remove(LogPath); + fs::remove(IndexPath); + fs::remove_all(m_BlocksBasePath); + } + + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); + uint64_t LegacyLogEntryCount = MigrateLegacyData(true); + + CreateDirectories(m_BucketDir); + + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + std::unordered_set<uint32_t> KnownBlocks; + for (const auto& Entry : m_Index) + { + const DiskLocation& Location = Entry.second.Location; + m_TotalSize.fetch_add(Location.Size(), std::memory_order_release); + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + KnownBlocks.insert(Location.GetBlockLocation(m_PayloadAlignment).BlockIndex); + } + + if (std::filesystem::is_directory(m_BlocksBasePath)) + { + std::vector<std::filesystem::path> FoldersToScan; + FoldersToScan.push_back(m_BlocksBasePath); + size_t FolderOffset = 0; + while (FolderOffset < FoldersToScan.size()) + { + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) + { + if (Entry.is_directory()) + { + FoldersToScan.push_back(Entry.path()); + continue; + } + if (Entry.is_regular_file()) + { + const std::filesystem::path Path = Entry.path(); + if (Path.extension() != DataExtension) + { + continue; + } + std::string FileName = Path.stem().string(); + uint32_t BlockIndex; + bool OK = ParseHexNumber(FileName, BlockIndex); + if (!OK) + { + continue; + } + if (!KnownBlocks.contains(BlockIndex)) + { + // Log removing unreferenced block + // Clear out unused blocks + ZEN_INFO("removing unused block for '{}' at '{}'", m_BucketDir / m_BucketName, Path); + std::error_code Ec; + std::filesystem::remove(Path, Ec); + if (Ec) + { + ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); + } + continue; + } + Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); + BlockFile->Open(); + m_ChunkBlocks[BlockIndex] = BlockFile; + } + } + ++FolderOffset; + } + } + else + { + CreateDirectories(m_BlocksBasePath); + } + + if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0)) + { + MakeIndexSnapshot(); + } + // TODO: should validate integrity of container files here } void @@ -537,7 +1395,10 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen return false; } - OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size()); + const BlockStoreLocation& Location = Loc.GetBlockLocation(m_PayloadAlignment); + Ref<BlockStoreFile> ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; + + OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size); OutValue.Value.SetContentType(Loc.GetContentType()); return true; @@ -562,23 +1423,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, return false; } -void -ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc, - const IoHash& HashKey, - const fs::path& Path, - std::error_code& Ec) -{ - ZEN_DEBUG("deleting standalone cache file '{}'", Path); - fs::remove(Path, Ec); - - if (!Ec) - { - m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}}); - m_Index.erase(HashKey); - m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); - } -} - bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -619,54 +1463,91 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& { return PutStandaloneCacheValue(HashKey, Value); } - else - { - // Small object put - uint64_t EntryFlags = 0; + // Small object put - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } + uint8_t EntryFlags = 0; - RwLock::ExclusiveLockScope _(m_IndexLock); + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + uint64_t ChunkSize = Value.Value.Size(); - DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags); + uint32_t WriteBlockIndex; + Ref<BlockStoreFile> WriteBlock; + uint64_t InsertOffset; - m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16); + { + RwLock::ExclusiveLockScope _(m_InsertLock); - if (auto It = m_Index.find(HashKey); It == m_Index.end()) + WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + bool IsWriting = m_WriteBlock != nullptr; + if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > MaxBlockSize) { - // Previously unknown object - m_Index.insert({HashKey, {Loc, GcClock::TickCount()}}); + if (m_WriteBlock) + { + m_WriteBlock = nullptr; + } + { + RwLock::ExclusiveLockScope __(m_IndexLock); + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BucketDir / m_BucketName)); + } + WriteBlockIndex += IsWriting ? 1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + m_WriteBlock = new BlockStoreFile(BlockPath); + m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + } + m_CurrentInsertOffset = 0; + m_WriteBlock->Create(MaxBlockSize); } - else + InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); + WriteBlock = m_WriteBlock; + } + + DiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment, EntryFlags); + const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; + + WriteBlock->Write(Value.Value.Data(), ChunkSize, InsertOffset); + m_SlogFile.Append(DiskIndexEntry); + + m_TotalSize.fetch_add(ChunkSize, std::memory_order::relaxed); + { + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) { // TODO: should check if write is idempotent and bail out if it is? // this would requiring comparing contents on disk unless we add a // content hash to the index entry IndexEntry& Entry = It.value(); - Entry.Location = Loc; + Entry.Location = Location; Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + else + { + m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); + } } } void ZenCacheDiskLayer::CacheBucket::Drop() { + // TODO: close all open files and manage locking // TODO: add error handling - - m_SobsFile.Close(); m_SlogFile.Close(); DeleteDirectories(m_BucketDir); } @@ -674,10 +1555,20 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { + { + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } + } RwLock::SharedLockScope _(m_IndexLock); - m_SobsFile.Flush(); - m_SlogFile.Flush(); + MakeIndexSnapshot(); SaveManifest(); } @@ -754,9 +1645,10 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { // Log a tombstone and delete the in-memory index for the bad entry - const auto It = m_Index.find(BadKey); - const DiskLocation& Location = It->second.Location; - m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}}); + const auto It = m_Index.find(BadKey); + DiskLocation Location = It->second.Location; + Location.Flags |= DiskLocation::kTombStone; + m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); } } @@ -768,8 +1660,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); Stopwatch Timer; - const auto Guard = MakeGuard( - [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const auto Guard = MakeGuard([this, &Timer] { + ZEN_INFO("gathered references from '{}' in {}", m_BucketDir / m_BucketName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); const GcClock::TimePoint ExpireTime = GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration(); @@ -820,6 +1713,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } } } + _.ReleaseNow(); ValidKeys.reserve(std::distance(ValidIt, Entries.end())); ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt)); @@ -836,202 +1730,480 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - Flush(); - - RwLock::ExclusiveLockScope _(m_IndexLock); + std::vector<DiskIndexEntry> ExpiredStandaloneEntries; + + Stopwatch TotalTimer; + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + uint64_t DeletedCount = 0; + uint64_t MovedCount = 0; + + const auto _ = MakeGuard([this, + &TotalTimer, + &WriteBlockTimeUs, + &WriteBlockLongestTimeUs, + &ReadBlockTimeUs, + &ReadBlockLongestTimeUs, + &TotalChunkCount, + &DeletedCount, + &MovedCount, + &DeletedSize, + &OldTotalSize] { + ZEN_INFO( + "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " + "#{} " + "of #{} " + "entires ({}).", + m_BucketDir / m_BucketName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedCount, + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + RwLock::SharedLockScope _(m_IndexLock); + SaveManifest(); + }); - const uint64_t OldCount = m_Index.size(); - const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + m_SlogFile.Flush(); - ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir); + IndexMap Index; + size_t BlockCount; + uint64_t ExcludeBlockIndex = 0x800000000ull; - Stopwatch Timer; - const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] { - const uint64_t NewCount = m_Index.size(); - const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed); - ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})", - m_BucketDir, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - OldCount - NewCount, - NiceBytes(OldTotalSize - NewTotalSize), - OldCount, - NiceBytes(OldTotalSize)); - SaveManifest(); + std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName); + std::vector<IoHash> DeleteCacheKeys; + DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); + GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { + if (Keep) + { + return; + } + DeleteCacheKeys.push_back(ChunkHash); }); - - if (m_Index.empty()) + if (DeleteCacheKeys.empty()) { + ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); return; } - - auto AddEntries = [this](std::span<const IoHash> Keys, std::vector<IndexMap::value_type>& OutEntries) { - for (const IoHash& Key : Keys) + { + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_IndexLock); { - if (auto It = m_Index.find(Key); It != m_Index.end()) + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.empty()) + { + ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); + return; + } + if (m_WriteBlock) { - OutEntries.push_back(*It); + ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); } + __.ReleaseNow(); } - }; - - std::vector<IndexMap::value_type> ValidEntries; - std::vector<IndexMap::value_type> ExpiredEntries; + SaveManifest(); + Index = m_Index; + BlockCount = m_ChunkBlocks.size(); - AddEntries(GcCtx.ValidCacheKeys(m_BucketName), ValidEntries); - AddEntries(GcCtx.ExpiredCacheKeys(m_BucketName), ExpiredEntries); + for (const IoHash& Key : DeleteCacheKeys) + { + if (auto It = Index.find(Key); It != Index.end()) + { + DiskIndexEntry Entry = {.Key = It->first, .Location = It->second.Location}; + if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + { + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); + } + } + } + if (GcCtx.IsDeletionMode()) + { + for (const auto& Entry : ExpiredStandaloneEntries) + { + m_Index.erase(Entry.Key); + } + m_SlogFile.Append(ExpiredStandaloneEntries); + } + } - // Remove all standalone file(s) - // NOTE: This can probably be made asynchronously + if (GcCtx.IsDeletionMode()) { std::error_code Ec; ExtendablePathBuilder<256> Path; - for (const auto& Entry : ExpiredEntries) + for (const auto& Entry : ExpiredStandaloneEntries) { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; + const IoHash& Key = Entry.Key; + const DiskLocation& Loc = Entry.Location; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - Path.Reset(); - BuildPath(Path, Key); + Path.Reset(); + BuildPath(Path, Key); - // NOTE: this will update index and log file - DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec); + { + RwLock::SharedLockScope __(m_IndexLock); + if (m_Index.contains(Key)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); + continue; + } + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + fs::remove(Path.c_str(), Ec); + } - if (Ec) + if (Ec) + { + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); + Ec.clear(); + DiskLocation RestoreLocation = Loc; + RestoreLocation.Flags &= ~DiskLocation::kTombStone; + + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", Path.ToUtf8(), Ec.message()); - Ec.clear(); + continue; } + m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); + m_Index.insert({Key, {Loc, GcClock::TickCount()}}); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + continue; } + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedSize += Entry.Location.Size(); + DeletedCount++; + } + } + + TotalChunkCount = Index.size(); + + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : Index) + { + const DiskLocation& Location = Entry.second.Location; + + if (Location.Flags & DiskLocation::kStandaloneFile) + { + continue; } + TotalChunkHashes.push_back(Entry.first); } - if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty()) + if (TotalChunkHashes.empty()) { - // Naive GC implementation of small objects. Needs enough free - // disk space to store intermediate sob container along side the - // old container + return; + } + std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; + std::vector<std::vector<IoHash>> KeepChunks; + std::vector<std::vector<IoHash>> DeleteChunks; - const auto ResetSobStorage = [this, &ValidEntries]() { - m_SobsFile.Close(); - m_SlogFile.Close(); + BlockIndexToChunkMapIndex.reserve(BlockCount); + KeepChunks.reserve(BlockCount); + DeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkHashes.size() / BlockCount / 2; - const bool IsNew = true; - m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); - m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + uint64_t DeleteCount = 0; - m_SobsCursor = 0; - m_TotalSize = 0; - m_Index.clear(); + uint64_t NewTotalSize = 0; - for (const auto& Entry : ValidEntries) - { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; + std::unordered_set<IoHash, IoHash::Hasher> Expired; + Expired.insert(DeleteCacheKeys.begin(), DeleteCacheKeys.end()); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - m_SlogFile.Append({.Key = Key, .Location = Loc}); - m_Index.insert({Key, {Loc, GcClock::TickCount()}}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); - } - } - }; + GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = Index.find(ChunkHash); + const DiskLocation& Location = KeyIt->second.Location; + BlockStoreLocation BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); - uint64_t NewContainerSize{}; - for (const auto& Entry : ValidEntries) + uint32_t BlockIndex = BlockLocation.BlockIndex; + + if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex) { - const DiskLocation& Loc = Entry.second.Location; + return; + } - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false) - { - NewContainerSize += (Loc.Size() + sizeof(DiskLocation)); - } + auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex); + size_t ChunkMapIndex = 0; + if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) + { + ChunkMapIndex = KeepChunks.size(); + BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; + KeepChunks.resize(ChunkMapIndex + 1); + KeepChunks.back().reserve(GuesstimateCountPerBlock); + DeleteChunks.resize(ChunkMapIndex + 1); + DeleteChunks.back().reserve(GuesstimateCountPerBlock); } + else + { + ChunkMapIndex = BlockIndexPtr->second; + } + if (Keep) + { + std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex]; + ChunkMap.push_back(ChunkHash); + NewTotalSize += BlockLocation.Size; + } + else + { + std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; + ChunkMap.push_back(ChunkHash); + DeleteCount++; + } + }); - if (NewContainerSize == 0) + std::unordered_set<uint32_t> BlocksToReWrite; + BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); + for (const auto& Entry : BlockIndexToChunkMapIndex) + { + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; + if (ChunkMap.empty()) { - ResetSobStorage(); - return; + continue; } + BlocksToReWrite.insert(BlockIndex); + } - const uint64_t DiskSpaceMargin = (256 << 10); + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); + ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_BucketDir / m_BucketName, + DeleteCount, + NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); + return; + } - std::error_code Ec; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec); - if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin) + auto AddToDeleted = [this, &Index, &DeletedCount, &DeletedSize](const std::vector<IoHash>& DeletedEntries) { + for (const IoHash& ChunkHash : DeletedEntries) { - ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)", - m_BucketDir, - NiceBytes(NewContainerSize), - NiceBytes(Space.Free)); - return; + const DiskLocation& Location = Index[ChunkHash].Location; + ZEN_ASSERT(!Location.IsFlagSet(DiskLocation::kStandaloneFile)); + DeletedSize += Index[ChunkHash].Location.GetBlockLocation(m_PayloadAlignment).Size; } + DeletedCount += DeletedEntries.size(); + }; - std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"}; - std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"}; + // Move all chunks in blocks that have chunks removed to new blocks - // Copy non expired sob(s) to temporary sob container + Ref<BlockStoreFile> NewBlockFile; + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; + auto UpdateLocations = [this](const std::span<DiskIndexEntry>& Entries) { + for (const DiskIndexEntry& Entry : Entries) { - BasicFile TmpSobs; - TCasLogFile<DiskIndexEntry> TmpLog; - uint64_t TmpCursor{}; - std::vector<uint8_t> Chunk; + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + auto KeyIt = m_Index.find(Entry.Key); + uint64_t ChunkSize = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment).Size; + m_TotalSize.fetch_sub(ChunkSize); + m_Index.erase(KeyIt); + continue; + } + m_Index[Entry.Key].Location = Entry.Location; + } + }; - TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate); - TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate); + std::unordered_map<IoHash, DiskLocation> MovedBlockChunks; + for (uint32_t BlockIndex : BlocksToReWrite) + { + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; - for (const auto& Entry : ValidEntries) + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _i(m_IndexLock); + OldBlockFile = m_ChunkBlocks[BlockIndex]; + } + + const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex]; + if (KeepMap.empty()) + { + const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; + std::vector<DiskIndexEntry> LogEntries = MakeDiskIndexEntries({}, DeleteMap); + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; + RwLock::ExclusiveLockScope _i(m_IndexLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + m_ChunkBlocks[BlockIndex] = nullptr; + } + AddToDeleted(DeleteMap); + ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'", + m_BucketDir / m_BucketName, + BlockIndex, + OldBlockFile->GetPath()); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion, reason: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + continue; + } - DiskLocation NewLoc; + std::vector<uint8_t> Chunk; + for (const IoHash& ChunkHash : KeepMap) + { + auto KeyIt = Index.find(ChunkHash); + const BlockStoreLocation ChunkLocation = KeyIt->second.Location.GetBlockLocation(m_PayloadAlignment); + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + if (!NewBlockFile || (WriteOffset + Chunk.size() > MaxBlockSize)) + { + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed); + std::vector<DiskIndexEntry> LogEntries = MakeDiskIndexEntries(MovedBlockChunks, {}); + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); + + if (NewBlockFile) { - NewLoc = DiskLocation(0, Loc.Size(), 0, Loc.GetFlags()); + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); } - else { - Chunk.resize(Loc.Size()); - m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset()); - - NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, Loc.GetFlags()); - TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor); - TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16); + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BucketDir / m_BucketName, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; + } + while (m_ChunkBlocks.contains(NextBlockIndex)) + { + NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + } + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } - TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc}); + MovedCount += MovedBlockChunks.size(); + MovedBlockChunks.clear(); + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; + } + if (Space.Free < MaxBlockSize) + { + uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve(); + if (Space.Free + ReclaimedSpace < MaxBlockSize) + { + ZEN_WARN("garbage collect from '{}' FAILED, required disk space {}, free {}", + m_BucketDir / m_BucketName, + MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + RwLock::ExclusiveLockScope _l(m_IndexLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks.erase(NextBlockIndex); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BucketDir / m_BucketName, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; } - } - // Swap state - try + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedBlockChunks.emplace(ChunkHash, + DiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, + m_PayloadAlignment, + KeyIt->second.Location.Flags)); + WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); + } + Chunk.clear(); + if (NewBlockFile) { - fs::path SobsPath{m_BucketDir / "zen.sobs"}; - fs::path SlogPath{m_BucketDir / "zen.slog"}; - - m_SobsFile.Close(); - m_SlogFile.Close(); - - fs::remove(SobsPath); - fs::remove(SlogPath); - - fs::rename(TmpSobsPath, SobsPath); - fs::rename(TmpSlogPath, SlogPath); + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + NewBlockFile = {}; + } - const bool IsNew = false; - OpenLog(m_BucketDir, IsNew); + const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; + std::vector<DiskIndexEntry> LogEntries = MakeDiskIndexEntries(MovedBlockChunks, DeleteMap); + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); + { + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + m_ChunkBlocks[BlockIndex] = nullptr; } - catch (std::exception& Err) + MovedCount += MovedBlockChunks.size(); + AddToDeleted(DeleteMap); + MovedBlockChunks.clear(); + + ZEN_DEBUG("marking cas store file for delete '{}', block #{}, '{}'", + m_BucketDir / m_BucketName, + BlockIndex, + OldBlockFile->GetPath()); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) { - ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); - ResetSobStorage(); + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); } + OldBlockFile = nullptr; } } @@ -1144,16 +2316,20 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c // Update index - uint64_t EntryFlags = DiskLocation::kStandaloneFile; + uint8_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); + DiskLocation Loc(Value.Value.Size(), EntryFlags); IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); if (auto It = m_Index.find(HashKey); It == m_Index.end()) @@ -1255,10 +2431,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z auto It = m_Buckets.try_emplace(BucketName, BucketName); Bucket = &It.first->second; - std::filesystem::path bucketPath = m_RootDir; - bucketPath /= BucketName; + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; - Bucket->OpenOrCreate(bucketPath); + Bucket->OpenOrCreate(BucketPath); } } @@ -1363,11 +2539,11 @@ void ZenCacheDiskLayer::Flush() { std::vector<CacheBucket*> Buckets; - Buckets.reserve(m_Buckets.size()); { RwLock::SharedLockScope _(m_Lock); + Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Buckets.push_back(&Kv.second); @@ -1419,6 +2595,9 @@ ZenCacheDiskLayer::TotalSize() const ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS +} + +namespace zen { using namespace std::literals; @@ -1427,10 +2606,18 @@ namespace testutils { IoBuffer CreateBinaryCacheValue(uint64_t Size) { - std::vector<uint32_t> Data(size_t(Size / sizeof(uint32_t))); - std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; }); + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); - IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t)); + IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size()); Buf.SetContentType(ZenContentType::kBinary); return Buf; }; @@ -1737,6 +2924,7 @@ TEST_CASE("z$.gc") GcCtx.MaxCacheDuration(std::chrono::minutes(2)); GcCtx.CollectSmallObjects(true); + Zcs.Flush(); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) @@ -1751,6 +2939,357 @@ TEST_CASE("z$.gc") } } +TEST_CASE("z$.legacyconversion") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[] = {2041, + 1123, + 1223, + 1239, + 341, + 1412, + 912, + 774, + 341, + 431, + 554, + 1098, + 2048, + 339 + 64 * 1024, + 561 + 64 * 1024, + 16 + 64 * 1024, + 16 + 64 * 1024, + 2048, + 2048}; + size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); + size_t SingleBlockSize = 0; + std::vector<IoBuffer> Chunks; + Chunks.reserve(ChunkCount); + for (uint64_t Size : ChunkSizes) + { + Chunks.push_back(testutils::CreateBinaryCacheValue(Size)); + SingleBlockSize += Size; + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(ChunkCount); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CreateDirectories(TempDir.Path()); + + const std::string Bucket = "rightintwo"; + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); + const GcClock::TimePoint CurrentTime = GcClock::Now(); + + for (size_t i = 0; i < ChunkCount; i++) + { + Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]}); + } + + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < ChunkCount; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcContext GcCtx(CurrentTime + std::chrono::hours(2)); + GcCtx.MaxCacheDuration(std::chrono::minutes(2)); + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepChunks); + Zcs.Flush(); + Gc.CollectGarbage(GcCtx); + } + std::filesystem::path BucketDir = TempDir.Path() / Bucket; + std::filesystem::path BlocksBaseDir = BucketDir / "blocks"; + + std::filesystem::path CasPath = GetBlockPath(BlocksBaseDir, 1); + std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir); + std::filesystem::remove(LegacyDataPath); + std::filesystem::rename(CasPath, LegacyDataPath); + + std::vector<DiskIndexEntry> LogEntries; + std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket); + if (std::filesystem::is_regular_file(IndexPath)) + { + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion && + Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) + { + LogEntries.resize(Header.EntryCount); + ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + } + } + ObjectIndexFile.Close(); + std::filesystem::remove(IndexPath); + } + + std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket); + { + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + LogEntries.reserve(CasLog.GetLogCount()); + CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); + } + TCasLogFile<LegacyDiskIndexEntry> LegacyLog; + std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir); + LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); + + for (const DiskIndexEntry& Entry : LogEntries) + { + uint64_t Size; + uint64_t Offset; + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Size = Entry.Location.Location.StandaloneSize; + Offset = 0; + } + else + { + BlockStoreLocation Location = Entry.Location.GetBlockLocation(16); + Size = Location.Size; + Offset = Location.Offset; + } + LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56); + LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation}; + LegacyLog.Append(LegacyEntry); + } + LegacyLog.Close(); + + std::filesystem::remove_all(BlocksBaseDir); + std::filesystem::remove(LogPath); + std::filesystem::remove(IndexPath); + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); + + for (size_t i = 0; i < ChunkCount; i += 2) + { + ZenCacheValue Value; + CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value)); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value)); + CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value)); + } + } +} + +# if 0 +TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) +{ + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 8192; + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(kChunkCount); + std::vector<IoBuffer> Chunks; + Chunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + ChunkHashes.emplace_back(Hash); + Chunks.emplace_back(Chunk); + } + + WorkerThreadPool ThreadPool(4); + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 32768, 16, true); + { + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + const IoBuffer& Chunk = Chunks[Idx]; + const IoHash& Hash = ChunkHashes[Idx]; + ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + + { + std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + IoBuffer Chunk = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Chunk); + CHECK(ChunkHash == Hash); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } + + std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + { + std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + std::vector<IoHash> NewChunkHashes; + NewChunkHashes.reserve(kChunkCount); + std::vector<IoBuffer> NewChunks; + NewChunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunkHashes.emplace_back(Hash); + NewChunks.emplace_back(Chunk); + } + + RwLock ChunkHashesLock; + std::atomic_uint32_t AddedChunkCount; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + const IoBuffer& Chunk = NewChunks[Idx]; + const IoHash& Hash = NewChunkHashes[Idx]; + ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + AddedChunkCount.fetch_add(1); + }); + ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]); + if (Chunk) + { + CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); + } + }); + } + + while (AddedChunkCount.load() < kChunkCount) + { + std::vector<IoHash> AddedHashes; + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const IoHash& ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + + { + std::vector<IoHash> AddedHashes; + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const IoHash& ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 77 == 0 && C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + } + { + for (const IoHash& ChunkHash : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Cas, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } + } +} +# endif + #endif void |