diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-03 22:23:26 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-05-03 22:23:26 +0200 |
| commit | c5b2435192f382fbaa39a8ff67de16ee3b69b7a6 (patch) | |
| tree | 294bb596d61582744dd7901f6a464c324bdec3d2 | |
| parent | Merge pull request #84 from EpicGames/de/cleanup-lock-sharding-in-iobuffer (diff) | |
| parent | macos compilation fix (diff) | |
| download | zen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.tar.xz zen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.zip | |
Merge pull request #86 from EpicGames/de/block-store-refactor
structured cache with block store
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 1697 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 95 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 2 | ||||
| -rw-r--r-- | zenstore/blockstore.cpp | 1207 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 996 | ||||
| -rw-r--r-- | zenstore/compactcas.h | 7 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 4 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 23 | ||||
| -rw-r--r-- | zenstore/include/zenstore/blockstore.h | 75 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 3 |
10 files changed, 2950 insertions, 1159 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index a9f38e51d..3ba75cd9c 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -14,13 +14,13 @@ #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/trace.h> #include <zenstore/cidstore.h> +#include <xxhash.h> + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> #endif @@ -30,10 +30,183 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <random> +#endif + ////////////////////////////////////////////////////////////////////////// namespace zen { +namespace { + +#pragma pack(push) +#pragma pack(1) + + struct CacheBucketIndexHeader + { + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; + + static_assert(sizeof(CacheBucketIndexHeader) == 32); + + struct LegacyDiskLocation + { + inline LegacyDiskLocation() = default; + + inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) + : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) + , LowerSize(ValueSize & 0xFFFFffff) + , IndexDataSize(IndexSize) + { + } + + static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize) + static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; + static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file + static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary + static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value + static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format + + static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } + + inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t Size() const { return LowerSize; } + inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } + inline ZenContentType GetContentType() const + { + ZenContentType ContentType = ZenContentType::kBinary; + + if (IsFlagSet(LegacyDiskLocation::kStructured)) + { + ContentType = ZenContentType::kCbObject; + } + + if (IsFlagSet(LegacyDiskLocation::kCompressed)) + { + ContentType = ZenContentType::kCompressedBinary; + } + + return ContentType; + } + inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; } + + private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; + }; + + struct LegacyDiskIndexEntry + { + IoHash Key; + LegacyDiskLocation Location; + }; + +#pragma pack(pop) + + static_assert(sizeof(LegacyDiskIndexEntry) == 36); + + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + const char* LegacyDataExtension = ".sobs"; + + std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + IndexExtension); + } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + ".tmp" + IndexExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + LogExtension); + } + + std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir) + { + return BucketDir / (std::string("zen") + LogExtension); + } + + std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir) + { + return BucketDir / (std::string("zen") + LegacyDataExtension); + } + + bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured | + LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString()); + return false; + } + if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + + bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.GetFlags() & + ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + +} // namespace + namespace fs = std::filesystem; static CbObject @@ -193,8 +366,8 @@ void ZenCacheStore::GatherReferences(GcContext& GcCtx) { Stopwatch Timer; - const auto Guard = MakeGuard( - [this, &Timer] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const auto Guard = + MakeGuard([&] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); access_tracking::AccessTimes AccessTimes; m_MemLayer.GatherAccessTimes(AccessTimes); @@ -425,6 +598,8 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { using namespace std::literals; + m_BlocksBasePath = BucketDir / "blocks"; + CreateDirectories(BucketDir); std::filesystem::path ManifestPath{BucketDir / "zen_manifest"}; @@ -470,48 +645,465 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } void -ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { - m_BucketDir = BucketDir; + ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", + m_BucketDir / m_BucketName, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - uint64_t MaxFileOffset = 0; - uint64_t InvalidEntryCount = 0; - m_SobsCursor = 0; - m_TotalSize = 0; + namespace fs = std::filesystem; - m_Index.clear(); + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - std::filesystem::path SobsPath{BucketDir / "zen.sobs"}; - std::filesystem::path SlogPath{BucketDir / "zen.slog"}; + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, STmpIndexPath); + } - m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); - m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + try + { + m_SlogFile.Flush(); - m_SlogFile.Replay( - [&](const DiskIndexEntry& Entry) { - if (Entry.Key == IoHash::Zero) + // Write the current state of the location map to a new index state + uint64_t LogCount = 0; + std::vector<DiskIndexEntry> Entries; + + { + Entries.resize(m_Index.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_Index) { - ++InvalidEntryCount; + DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second.Location; } - else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + + LogCount = m_SlogFile.GetLogCount(); + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(IndexPath); + fs::rename(STmpIndexPath, IndexPath); + } + } + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadIndexFile() +{ + std::vector<DiskIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + if (std::filesystem::is_regular_file(IndexPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing #{} entries in {}", + m_BucketDir / m_BucketName, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) && + (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) { - m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + m_PayloadAlignment = Header.PayloadAlignment; + + std::string InvalidEntryReason; + for (const DiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + + return Header.LogPosition; } else { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); - m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + ZEN_WARN("skipping invalid index file '{}'", IndexPath); } - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); - }, - 0); + } + } + return 0; +} +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) +{ + std::vector<DiskIndexEntry> Entries; + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + if (std::filesystem::is_regular_file(LogPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + uint64_t ReadCount = EntryCount - SkipEntryCount; + m_Index.reserve(ReadCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + m_Index.insert_or_assign(Record.Key, IndexEntry(Record.Location, GcClock::TickCount())); + }, + SkipEntryCount); + if (InvalidEntryCount) + { + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); + } + } + } + return 0; +}; + +uint64_t +ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) +{ + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); + + if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) + { + return 0; + } + + ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName); + + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); + + uint64_t MigratedChunkCount = 0; + uint32_t MigratedBlockCount = 0; + Stopwatch MigrationTimer; + uint64_t TotalSize = 0; + const auto _ = MakeGuard([&] { + ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", + m_BucketDir / m_BucketName, + MigratedChunkCount, + MigratedBlockCount, + NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), + NiceBytes(TotalSize)); + }); + + uint64_t BlockFileSize = 0; + { + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + BlockFileSize = BlockFile.FileSize(); + } + + std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; + uint64_t InvalidEntryCount = 0; + + size_t BlockChunkCount = 0; + TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog; + LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); + { + Stopwatch Timer; + const auto __ = MakeGuard([&] { + ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", + LegacyLogPath, + LegacyDiskIndex.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + if (LegacyCasLog.Initialize()) + { + LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); + LegacyCasLog.Replay( + [&](const LegacyDiskIndexEntry& Record) { + if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) + { + LegacyDiskIndex.erase(Record.Key); + return; + } + std::string InvalidEntryReason; + if (!ValidateLegacyEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + if (m_Index.contains(Record.Key)) + { + return; + } + LegacyDiskIndex[Record.Key] = Record; + }, + 0); + + std::vector<IoHash> BadEntries; + for (const auto& Entry : LegacyDiskIndex) + { + const LegacyDiskIndexEntry& Record(Entry.second); + if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + continue; + } + if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize) + { + BlockChunkCount++; + continue; + } + ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); + BadEntries.push_back(Entry.first); + } + for (const IoHash& BadHash : BadEntries) + { + LegacyDiskIndex.erase(BadHash); + } + InvalidEntryCount += BadEntries.size(); + } + } if (InvalidEntryCount) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath); + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); + } + + if (LegacyDiskIndex.empty()) + { + LegacyCasLog.Close(); + if (CleanSource) + { + // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find + // a manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + } + return 0; + } + + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + CreateDirectories(LogPath.parent_path()); + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; + std::vector<BlockStoreLocation> ChunkLocations; + ChunkIndexToChunkHash.reserve(BlockChunkCount); + ChunkLocations.reserve(BlockChunkCount); + + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount); + + for (const auto& Entry : LegacyDiskIndex) + { + const IoHash& ChunkHash = Entry.first; + const LegacyDiskLocation& Location = Entry.second.Location; + if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) + { + uint8_t Flags = 0xff & (Location.Flags() >> 56); + DiskLocation NewLocation = DiskLocation(Location.Size(), Flags); + LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); + continue; + } + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()}); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + TotalSize += Location.Size(); + } + for (const DiskIndexEntry& Entry : LogEntries) + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + CasLog.Append(LogEntries); + + m_BlockStore.Split( + ChunkLocations, + LegacyDataPath, + m_BlocksBasePath, + MaxBlockSize, + BlockStoreDiskLocation::MaxBlockIndex + 1, + m_PayloadAlignment, + CleanSource, + [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( + const BlockStore::MovedChunksArray& MovedChunks) { + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& OldLocation = OldEntry.Location; + uint8_t Flags = 0xff & (OldLocation.Flags() >> 56); + LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)}); + } + for (const DiskIndexEntry& Entry : LogEntries) + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + CasLog.Append(LogEntries); + CasLog.Flush(); + if (CleanSource) + { + std::vector<LegacyDiskIndexEntry> LegacyLogEntries; + LegacyLogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& OldLocation = OldEntry.Location; + LegacyDiskLocation NewLocation(OldLocation.Offset(), + OldLocation.Size(), + 0, + OldLocation.Flags() | LegacyDiskLocation::kTombStone); + LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation}); + } + LegacyCasLog.Append(LegacyLogEntries); + LegacyCasLog.Flush(); + } + MigratedBlockCount++; + MigratedChunkCount += MovedChunks.size(); + }); + + LegacyCasLog.Close(); + CasLog.Close(); + + if (CleanSource) + { + // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find + // a manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + } + return MigratedChunkCount; +} + +void +ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +{ + m_BucketDir = BucketDir; + + m_TotalSize = 0; + + m_Index.clear(); + + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + + if (IsNew) + { + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); + fs::remove(LegacyLogPath); + fs::remove(LegacyDataPath); + fs::remove(LogPath); + fs::remove(IndexPath); + fs::remove_all(m_BlocksBasePath); } - m_SobsCursor = (MaxFileOffset + 15) & ~15; + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); + uint64_t LegacyLogEntryCount = MigrateLegacyData(true); + + CreateDirectories(m_BucketDir); + + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + std::vector<BlockStoreLocation> KnownLocations; + KnownLocations.reserve(m_Index.size()); + for (const auto& Entry : m_Index) + { + const DiskLocation& Location = Entry.second.Location; + m_TotalSize.fetch_add(Location.Size(), std::memory_order::relaxed); + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); + KnownLocations.push_back(BlockLocation); + } + + m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); + + if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0)) + { + MakeIndexSnapshot(); + } + // TODO: should validate integrity of container files here } void @@ -532,12 +1124,15 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H bool ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) { - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); + + Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); + if (!ChunkBlock) { return false; } - OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size()); + OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size); OutValue.Value.SetContentType(Loc.GetContentType()); return true; @@ -562,23 +1157,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, return false; } -void -ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc, - const IoHash& HashKey, - const fs::path& Path, - std::error_code& Ec) -{ - ZEN_DEBUG("deleting standalone cache file '{}'", Path); - fs::remove(Path, Ec); - - if (!Ec) - { - m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}}); - m_Index.erase(HashKey); - m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); - } -} - bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -593,15 +1171,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal { IndexEntry& Entry = It.value(); Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + DiskLocation Location = Entry.Location; + _.ReleaseNow(); - if (GetInlineCacheValue(Entry.Location, OutValue)) + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { - return true; + return GetStandaloneCacheValue(Location, HashKey, OutValue); } - - _.ReleaseNow(); - - return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue); + return GetInlineCacheValue(Location, OutValue); } return false; @@ -619,54 +1196,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& { return PutStandaloneCacheValue(HashKey, Value); } - else - { - // Small object put - - uint64_t EntryFlags = 0; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - RwLock::ExclusiveLockScope _(m_IndexLock); - - DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags); - - m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16); - - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - m_Index.insert({HashKey, {Loc, GcClock::TickCount()}}); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - IndexEntry& Entry = It.value(); - Entry.Location = Loc; - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); - } + PutInlineCacheValue(HashKey, Value); } void ZenCacheDiskLayer::CacheBucket::Drop() { - // TODO: add error handling - - m_SobsFile.Close(); + m_BlockStore.Close(); m_SlogFile.Close(); DeleteDirectories(m_BucketDir); } @@ -674,11 +1210,10 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { - RwLock::SharedLockScope _(m_IndexLock); - - m_SobsFile.Flush(); - m_SlogFile.Flush(); + m_BlockStore.Flush(); + RwLock::SharedLockScope _(m_IndexLock); + MakeIndexSnapshot(); SaveManifest(); } @@ -724,20 +1259,22 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) ZenCacheValue Value; - if (GetInlineCacheValue(Loc, Value)) - { - // Validate contents - } - else if (GetStandaloneCacheValue(Loc, HashKey, Value)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - // Note: we cannot currently validate contents since we don't - // have a content hash! + if (GetStandaloneCacheValue(Loc, HashKey, Value)) + { + // Note: we cannot currently validate contents since we don't + // have a content hash! + continue; + } } - else + else if (GetInlineCacheValue(Loc, Value)) { - // Value not found - BadKeys.push_back(HashKey); + // Validate contents + continue; } + // Value not found + BadKeys.push_back(HashKey); } } @@ -754,12 +1291,18 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { // Log a tombstone and delete the in-memory index for the bad entry - const auto It = m_Index.find(BadKey); - const DiskLocation& Location = It->second.Location; - m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}}); + const auto It = m_Index.find(BadKey); + DiskLocation Location = It->second.Location; + Location.Flags |= DiskLocation::kTombStone; + m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); } } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCasChunks(BadKeys); } void @@ -767,68 +1310,95 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); - Stopwatch Timer; - const auto Guard = MakeGuard( - [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_INFO("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", + m_BucketDir / m_BucketName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs)); + }); const GcClock::TimePoint ExpireTime = GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration(); const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - RwLock::SharedLockScope _(m_IndexLock); - - std::vector<IoHash> ValidKeys; - std::vector<IoHash> ExpiredKeys; - std::vector<IoHash> Cids; - std::vector<IndexMap::value_type> Entries(m_Index.begin(), m_Index.end()); - - std::sort(Entries.begin(), Entries.end(), [](const auto& LHS, const auto& RHS) { - return LHS.second.LastAccess < RHS.second.LastAccess; - }); + IndexMap Index; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + Index = m_Index; + } - const auto ValidIt = std::lower_bound(Entries.begin(), Entries.end(), ExpireTicks, [](const auto& Kv, auto Ticks) { - const IndexEntry& Entry = Kv.second; - return Entry.LastAccess < Ticks; - }); + std::vector<IoHash> ExpiredKeys; + ExpiredKeys.reserve(1024); + std::vector<IoHash> Cids; Cids.reserve(1024); - for (auto Kv = ValidIt; Kv != Entries.end(); ++Kv) + for (const auto& Entry : Index) { - const IoHash& Key = Kv->first; - const DiskLocation& Loc = Kv->second.Location; + const IoHash& Key = Entry.first; + if (Entry.second.LastAccess < ExpireTicks) + { + ExpiredKeys.push_back(Key); + continue; + } + + const DiskLocation& Loc = Entry.second.Location; if (Loc.IsFlagSet(DiskLocation::kStructured)) { - ZenCacheValue CacheValue; - if (!GetInlineCacheValue(Loc, CacheValue)) + if (Cids.size() > 1024) { - GetStandaloneCacheValue(Loc, Key, CacheValue); + GcCtx.ContributeCids(Cids); + Cids.clear(); } - if (CacheValue.Value) + ZenCacheValue CacheValue; { - ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); - if (Cids.size() > 1024) + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + if (!GetStandaloneCacheValue(Loc, Key, CacheValue)) + { + continue; + } + } + else if (!GetInlineCacheValue(Loc, CacheValue)) { - GcCtx.ContributeCids(Cids); - Cids.clear(); + continue; } - CbObject Obj(SharedBuffer{CacheValue.Value}); - Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } + + ZEN_ASSERT(CacheValue.Value); + ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); + CbObject Obj(SharedBuffer{CacheValue.Value}); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } } - ValidKeys.reserve(std::distance(ValidIt, Entries.end())); - ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt)); - - std::transform(ValidIt, Entries.end(), std::back_inserter(ValidKeys), [](const auto& Kv) { return Kv.first; }); - std::transform(Entries.begin(), ValidIt, std::back_inserter(ExpiredKeys), [](const auto& Kv) { return Kv.first; }); - GcCtx.ContributeCids(Cids); - GcCtx.ContributeCacheKeys(m_BucketName, std::move(ValidKeys), std::move(ExpiredKeys)); + GcCtx.ContributeCacheKeys(m_BucketName, std::move(ExpiredKeys)); } void @@ -836,203 +1406,276 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - Flush(); - - RwLock::ExclusiveLockScope _(m_IndexLock); - - const uint64_t OldCount = m_Index.size(); - const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); - - ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir); - - Stopwatch Timer; - const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] { - const uint64_t NewCount = m_Index.size(); - const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed); - ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})", - m_BucketDir, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - OldCount - NewCount, - NiceBytes(OldTotalSize - NewTotalSize), - OldCount, - NiceBytes(OldTotalSize)); + ZEN_INFO("collecting garbage from '{}'", m_BucketDir / m_BucketName); + + Stopwatch TotalTimer; + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + uint64_t DeletedCount = 0; + uint64_t MovedCount = 0; + + const auto _ = MakeGuard([&] { + ZEN_INFO( + "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " + "#{} " + "of #{} " + "entires ({}).", + m_BucketDir / m_BucketName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedCount, + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + RwLock::SharedLockScope _(m_IndexLock); SaveManifest(); }); - if (m_Index.empty()) + m_SlogFile.Flush(); + + std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName); + std::vector<IoHash> DeleteCacheKeys; + DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); + GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { + if (Keep) + { + return; + } + DeleteCacheKeys.push_back(ChunkHash); + }); + if (DeleteCacheKeys.empty()) { + ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); return; } - auto AddEntries = [this](std::span<const IoHash> Keys, std::vector<IndexMap::value_type>& OutEntries) { - for (const IoHash& Key : Keys) + std::vector<DiskIndexEntry> ExpiredStandaloneEntries; + IndexMap Index; + BlockStore::ReclaimSnapshotState BlockStoreState; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); { - if (auto It = m_Index.find(Key); It != m_Index.end()) + if (m_Index.empty()) { - OutEntries.push_back(*It); + ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); + return; } + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); } - }; - - std::vector<IndexMap::value_type> ValidEntries; - std::vector<IndexMap::value_type> ExpiredEntries; - - AddEntries(GcCtx.ValidCacheKeys(m_BucketName), ValidEntries); - AddEntries(GcCtx.ExpiredCacheKeys(m_BucketName), ExpiredEntries); - - // Remove all standalone file(s) - // NOTE: This can probably be made asynchronously - { - std::error_code Ec; - ExtendablePathBuilder<256> Path; + SaveManifest(); + Index = m_Index; - for (const auto& Entry : ExpiredEntries) + for (const IoHash& Key : DeleteCacheKeys) { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + if (auto It = Index.find(Key); It != Index.end()) { - Path.Reset(); - BuildPath(Path, Key); - - // NOTE: this will update index and log file - DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec); - - if (Ec) + DiskIndexEntry Entry = {.Key = It->first, .Location = It->second.Location}; + if (Entry.Location.Flags & DiskLocation::kStandaloneFile) { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", Path.ToUtf8(), Ec.message()); - Ec.clear(); + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); } } } + if (GcCtx.IsDeletionMode()) + { + for (const auto& Entry : ExpiredStandaloneEntries) + { + m_Index.erase(Entry.Key); + } + m_SlogFile.Append(ExpiredStandaloneEntries); + } } - if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty()) + if (GcCtx.IsDeletionMode()) { - // Naive GC implementation of small objects. Needs enough free - // disk space to store intermediate sob container along side the - // old container - - const auto ResetSobStorage = [this, &ValidEntries]() { - m_SobsFile.Close(); - m_SlogFile.Close(); + std::error_code Ec; + ExtendablePathBuilder<256> Path; - const bool IsNew = true; - m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); - m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + for (const auto& Entry : ExpiredStandaloneEntries) + { + const IoHash& Key = Entry.Key; + const DiskLocation& Loc = Entry.Location; - m_SobsCursor = 0; - m_TotalSize = 0; - m_Index.clear(); + Path.Reset(); + BuildPath(Path, Key); - for (const auto& Entry : ValidEntries) { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) { - m_SlogFile.Append({.Key = Key, .Location = Loc}); - m_Index.insert({Key, {Loc, GcClock::TickCount()}}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + // Someone added it back, let the file on disk be + ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); + continue; } + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + fs::remove(Path.c_str(), Ec); } - }; - - uint64_t NewContainerSize{}; - for (const auto& Entry : ValidEntries) - { - const DiskLocation& Loc = Entry.second.Location; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false) + if (Ec) { - NewContainerSize += (Loc.Size() + sizeof(DiskLocation)); + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); + Ec.clear(); + DiskLocation RestoreLocation = Loc; + RestoreLocation.Flags &= ~DiskLocation::kTombStone; + + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) + { + continue; + } + m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); + m_Index.insert({Key, {Loc, GcClock::TickCount()}}); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + continue; } + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedSize += Entry.Location.Size(); + DeletedCount++; } + } - if (NewContainerSize == 0) - { - ResetSobStorage(); - return; - } + TotalChunkCount = Index.size(); - const uint64_t DiskSpaceMargin = (256 << 10); + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : Index) + { + const DiskLocation& Location = Entry.second.Location; - std::error_code Ec; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec); - if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin) + if (Location.Flags & DiskLocation::kStandaloneFile) { - ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)", - m_BucketDir, - NiceBytes(NewContainerSize), - NiceBytes(Space.Free)); - return; + continue; } + TotalChunkHashes.push_back(Entry.first); + } - std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"}; - std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"}; - - // Copy non expired sob(s) to temporary sob container - + if (TotalChunkHashes.empty()) + { + return; + } + TotalChunkCount = TotalChunkHashes.size(); + + std::vector<BlockStoreLocation> ChunkLocations; + BlockStore::ChunkIndexArray KeepChunkIndexes; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + + GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = Index.find(ChunkHash); + const DiskLocation& DiskLocation = KeyIt->second.Location; + BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + if (Keep) { - BasicFile TmpSobs; - TCasLogFile<DiskIndexEntry> TmpLog; - uint64_t TmpCursor{}; - std::vector<uint8_t> Chunk; + KeepChunkIndexes.push_back(ChunkIndex); + } + }); - TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate); - TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate); + size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); - for (const auto& Entry : ValidEntries) - { - const IoHash& Key = Entry.first; - const DiskLocation& Loc = Entry.second.Location; + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); + uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); + ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_BucketDir / m_BucketName, + DeleteCount, + 0, // NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); + return; + } - DiskLocation NewLoc; + std::vector<IoHash> DeletedChunks; + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; + LogEntries.push_back( + {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); + } + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; + LogEntries.push_back({.Key = ChunkHash, + .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), + m_PayloadAlignment, + OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); + DeletedChunks.push_back(ChunkHash); + } - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - NewLoc = DiskLocation(0, Loc.Size(), 0, Loc.GetFlags()); - } - else + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); + { + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + for (const DiskIndexEntry& Entry : LogEntries) { - Chunk.resize(Loc.Size()); - m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset()); - - NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, Loc.GetFlags()); - TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor); - TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16); + if (Entry.Location.GetFlags() & DiskLocation::kTombStone) + { + m_Index.erase(Entry.Key); + uint64_t ChunkSize = Entry.Location.GetBlockLocation(m_PayloadAlignment).Size; + m_TotalSize.fetch_sub(ChunkSize); + continue; + } + m_Index[Entry.Key].Location = Entry.Location; } - - TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc}); } - } - - // Swap state - try - { - fs::path SobsPath{m_BucketDir / "zen.sobs"}; - fs::path SlogPath{m_BucketDir / "zen.slog"}; - - m_SobsFile.Close(); - m_SlogFile.Close(); - - fs::remove(SobsPath); - fs::remove(SlogPath); - - fs::rename(TmpSobsPath, SobsPath); - fs::rename(TmpSlogPath, SlogPath); + }, + [&]() { return GcCtx.CollectSmallObjects(); }); - const bool IsNew = false; - OpenLog(m_BucketDir, IsNew); - } - catch (std::exception& Err) - { - ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); - ResetSobStorage(); - } - } + GcCtx.DeletedCas(DeletedChunks); } void @@ -1144,16 +1787,20 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c // Update index - uint64_t EntryFlags = DiskLocation::kStandaloneFile; + uint8_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); + DiskLocation Loc(Value.Value.Size(), EntryFlags); IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); if (auto It = m_Index.find(HashKey); It == m_Index.end()) @@ -1171,6 +1818,41 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); } +void +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) +{ + uint8_t EntryFlags = 0; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + BlockStoreLocation BlockStoreLocation = m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment); + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; + m_SlogFile.Append(DiskIndexEntry); + m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed); + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + // TODO: should check if write is idempotent and bail out if it is? + // this would requiring comparing contents on disk unless we add a + // content hash to the index entry + IndexEntry& Entry = It.value(); + Entry.Location = Location; + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + } + else + { + m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); + } +} + ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) @@ -1363,6 +2045,7 @@ void ZenCacheDiskLayer::Flush() { std::vector<CacheBucket*> Buckets; + { RwLock::SharedLockScope _(m_Lock); Buckets.reserve(m_Buckets.size()); @@ -1417,6 +2100,9 @@ ZenCacheDiskLayer::TotalSize() const ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS +} + +namespace zen { using namespace std::literals; @@ -1425,10 +2111,18 @@ namespace testutils { IoBuffer CreateBinaryCacheValue(uint64_t Size) { - std::vector<uint32_t> Data(size_t(Size / sizeof(uint32_t))); - std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; }); + static std::random_device rd; + static std::mt19937 g(rd()); - IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t)); + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size()); Buf.SetContentType(ZenContentType::kBinary); return Buf; }; @@ -1735,6 +2429,7 @@ TEST_CASE("z$.gc") GcCtx.MaxCacheDuration(std::chrono::minutes(2)); GcCtx.CollectSmallObjects(true); + Zcs.Flush(); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) @@ -1749,6 +2444,398 @@ TEST_CASE("z$.gc") } } +TEST_CASE("z$.legacyconversion") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[] = {2041, + 1123, + 1223, + 1239, + 341, + 1412, + 912, + 774, + 341, + 431, + 554, + 1098, + 2048, + 339 + 64 * 1024, + 561 + 64 * 1024, + 16 + 64 * 1024, + 16 + 64 * 1024, + 2048, + 2048}; + size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); + size_t SingleBlockSize = 0; + std::vector<IoBuffer> Chunks; + Chunks.reserve(ChunkCount); + for (uint64_t Size : ChunkSizes) + { + Chunks.push_back(testutils::CreateBinaryCacheValue(Size)); + SingleBlockSize += Size; + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(ChunkCount); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CreateDirectories(TempDir.Path()); + + const std::string Bucket = "rightintwo"; + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); + const GcClock::TimePoint CurrentTime = GcClock::Now(); + + for (size_t i = 0; i < ChunkCount; i++) + { + Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]}); + } + + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < ChunkCount; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcContext GcCtx(CurrentTime + std::chrono::hours(2)); + GcCtx.MaxCacheDuration(std::chrono::minutes(2)); + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepChunks); + Zcs.Flush(); + Gc.CollectGarbage(GcCtx); + } + std::filesystem::path BucketDir = TempDir.Path() / Bucket; + std::filesystem::path BlocksBaseDir = BucketDir / "blocks"; + + std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1); + std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir); + std::filesystem::remove(LegacyDataPath); + std::filesystem::rename(CasPath, LegacyDataPath); + + std::vector<DiskIndexEntry> LogEntries; + std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket); + if (std::filesystem::is_regular_file(IndexPath)) + { + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion && + Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) + { + LogEntries.resize(Header.EntryCount); + ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + } + } + ObjectIndexFile.Close(); + std::filesystem::remove(IndexPath); + } + + std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket); + { + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + LogEntries.reserve(CasLog.GetLogCount()); + CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); + } + TCasLogFile<LegacyDiskIndexEntry> LegacyLog; + std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir); + LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); + + for (const DiskIndexEntry& Entry : LogEntries) + { + uint64_t Size; + uint64_t Offset; + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Size = Entry.Location.Location.StandaloneSize; + Offset = 0; + } + else + { + BlockStoreLocation Location = Entry.Location.GetBlockLocation(16); + Size = Location.Size; + Offset = Location.Offset; + } + LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56); + LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation}; + LegacyLog.Append(LegacyEntry); + } + LegacyLog.Close(); + + std::filesystem::remove_all(BlocksBaseDir); + std::filesystem::remove(LogPath); + std::filesystem::remove(IndexPath); + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); + + for (size_t i = 0; i < ChunkCount; i += 2) + { + ZenCacheValue Value; + CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value)); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value)); + CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value)); + } + } +} + +TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) +{ + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 8192; + + struct Chunk + { + std::string Bucket; + IoBuffer Buffer; + }; + std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks; + Chunks.reserve(kChunkCount); + + const std::string Bucket1 = "rightinone"; + const std::string Bucket2 = "rightintwo"; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + while (true) + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } + Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + break; + } + while (true) + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } + Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + break; + } + } + + CreateDirectories(TempDir.Path()); + + WorkerThreadPool ThreadPool(4); + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); + + { + std::atomic<size_t> WorkCompleted = 0; + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + + const uint64_t TotalSize = Zcs.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * Chunks.size(), TotalSize); + + { + std::atomic<size_t> WorkCompleted = 0; + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { + std::string Bucket = Chunk.second.Bucket; + IoHash ChunkHash = Chunk.first; + ZenCacheValue CacheValue; + + CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); + IoHash Hash = IoHash::HashBuffer(CacheValue.Value); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes; + GcChunkHashes.reserve(Chunks.size()); + for (const auto& Chunk : Chunks) + { + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + { + std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + } + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + } + } + + std::atomic<size_t> WorkCompleted = 0; + std::atomic_uint32_t AddedChunkCount = 0; + for (const auto& Chunk : NewChunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }); + } + + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + } + WorkCompleted.fetch_add(1); + }); + } + while (AddedChunkCount.load() < NewChunks.size()) + { + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + } + std::vector<IoHash> KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Zcs.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + while (WorkCompleted < NewChunks.size() + Chunks.size()) + { + Sleep(1); + } + + { + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + } + std::vector<IoHash> KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Zcs.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + } + { + std::atomic<size_t> WorkCompleted = 0; + for (const auto& Chunk : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < GcChunkHashes.size()) + { + Sleep(1); + } + } + } +} + #endif void diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index f39d01747..0c2a7c0b2 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -8,6 +8,7 @@ #include <zencore/thread.h> #include <zencore/uid.h> #include <zenstore/basicfile.h> +#include <zenstore/blockstore.h> #include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/gc.h> @@ -76,37 +77,32 @@ struct DiskLocation { inline DiskLocation() = default; - inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) - : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) - , LowerSize(ValueSize & 0xFFFFffff) - , IndexDataSize(IndexSize) + inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; } + + inline DiskLocation(const BlockStoreLocation& Location, uint64_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile) { + this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment); } - static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; - static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize) - static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; - static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file - static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary - static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value - static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format - - static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } + inline BlockStoreLocation GetBlockLocation(uint64_t PayloadAlignment) const + { + ZEN_ASSERT(!(Flags & kStandaloneFile)); + return Location.BlockLocation.Get(PayloadAlignment); + } - inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } - inline uint64_t Size() const { return LowerSize; } - inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } - inline uint64_t GetFlags() const { return OffsetAndFlags & kFlagsMask; } + inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); } + inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; } + inline uint8_t GetFlags() const { return Flags; } inline ZenContentType GetContentType() const { ZenContentType ContentType = ZenContentType::kBinary; - if (IsFlagSet(DiskLocation::kStructured)) + if (IsFlagSet(kStructured)) { ContentType = ZenContentType::kCbObject; } - if (IsFlagSet(DiskLocation::kCompressed)) + if (IsFlagSet(kCompressed)) { ContentType = ZenContentType::kCompressedBinary; } @@ -114,21 +110,29 @@ struct DiskLocation return ContentType; } -private: - uint64_t OffsetAndFlags = 0; - uint32_t LowerSize = 0; - uint32_t IndexDataSize = 0; + union + { + BlockStoreDiskLocation BlockLocation; // 10 bytes + uint64_t StandaloneSize = 0; // 8 bytes + } Location; + + static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file + static const uint8_t kStructured = 0x40u; // Serialized as compact binary + static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value + static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format + + uint8_t Flags = 0; + uint8_t Reserved = 0; }; struct DiskIndexEntry { - IoHash Key; - DiskLocation Location; + IoHash Key; // 20 bytes + DiskLocation Location; // 12 bytes }; - #pragma pack(pop) -static_assert(sizeof(DiskIndexEntry) == 36); +static_assert(sizeof(DiskIndexEntry) == 32); /** In-memory cache storage @@ -245,15 +249,19 @@ private: inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); } private: + const uint64_t MaxBlockSize = 1ull << 30; + uint64_t m_PayloadAlignment = 1ull << 4; + std::string m_BucketName; std::filesystem::path m_BucketDir; + std::filesystem::path m_BlocksBasePath; + BlockStore m_BlockStore; Oid m_BucketId; bool m_IsOk = false; uint64_t m_LargeObjectThreshold = 64 * 1024; // These files are used to manage storage of small objects for this bucket - BasicFile m_SobsFile; TCasLogFile<DiskIndexEntry> m_SlogFile; struct IndexEntry @@ -264,10 +272,12 @@ private: IndexEntry() : Location(), LastAccess() {} IndexEntry(const DiskLocation& Loc, const int64_t Timestamp) : Location(Loc), LastAccess(Timestamp) {} IndexEntry(const IndexEntry& E) : Location(E.Location), LastAccess(E.LastAccess.load(std::memory_order_relaxed)) {} - IndexEntry(IndexEntry&& E) : Location(std::move(E.Location)), LastAccess(E.LastAccess.load(std::memory_order_relaxed)) {} + IndexEntry(IndexEntry&& E) noexcept : Location(std::move(E.Location)), LastAccess(E.LastAccess.load(std::memory_order_relaxed)) + { + } IndexEntry& operator=(const IndexEntry& E) { return *this = IndexEntry(E); } - IndexEntry& operator=(IndexEntry&& E) + IndexEntry& operator=(IndexEntry&& E) noexcept { Location = std::move(E.Location); LastAccess.store(E.LastAccess.load(), std::memory_order_relaxed); @@ -277,20 +287,21 @@ private: using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>; - RwLock m_IndexLock; - IndexMap m_Index; - uint64_t m_SobsCursor = 0; + RwLock m_IndexLock; + IndexMap m_Index; + std::atomic_uint64_t m_TotalSize{}; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey); - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); - void DeleteStandaloneCacheValue(const DiskLocation& Loc, - const IoHash& HashKey, - const std::filesystem::path& Path, - std::error_code& Ec); - bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); - void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew); + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); + void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(); + uint64_t ReadLog(uint64_t LogPosition); + uint64_t MigrateLegacyData(bool CleanSource); + void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew); // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 617f50660..aceb2df00 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -976,7 +976,7 @@ ProjectStore::GatherReferences(GcContext& GcCtx) { Stopwatch Timer; const auto Guard = - MakeGuard([this, &Timer] { ZEN_INFO("project store gathered all references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + MakeGuard([&] { ZEN_INFO("project store gathered all references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); DiscoverProjects(); diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index 1eb859d5a..0992662c2 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -1,13 +1,17 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include "compactcas.h" - #include <zenstore/blockstore.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> + #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zencore/workthreadpool.h> # include <algorithm> # include <random> #endif @@ -102,12 +106,814 @@ BlockStoreFile::Flush() m_File.Flush(); } +BasicFile& +BlockStoreFile::GetBasicFile() +{ + return m_File; +} + void BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) { m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); } +constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; + +void +BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + const std::vector<BlockStoreLocation>& KnownLocations) +{ + ZEN_ASSERT(MaxBlockSize > 0); + ZEN_ASSERT(MaxBlockCount > 0); + ZEN_ASSERT(IsPow2(MaxBlockCount)); + + m_BlocksBasePath = BlocksBasePath; + m_MaxBlockSize = MaxBlockSize; + + m_ChunkBlocks.clear(); + + std::unordered_set<uint32_t> KnownBlocks; + for (const auto& Entry : KnownLocations) + { + KnownBlocks.insert(Entry.BlockIndex); + } + + if (std::filesystem::is_directory(m_BlocksBasePath)) + { + std::vector<std::filesystem::path> FoldersToScan; + FoldersToScan.push_back(m_BlocksBasePath); + size_t FolderOffset = 0; + while (FolderOffset < FoldersToScan.size()) + { + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) + { + if (Entry.is_directory()) + { + FoldersToScan.push_back(Entry.path()); + continue; + } + if (Entry.is_regular_file()) + { + const std::filesystem::path Path = Entry.path(); + if (Path.extension() != GetBlockFileExtension()) + { + continue; + } + std::string FileName = Path.stem().string(); + uint32_t BlockIndex; + bool OK = ParseHexNumber(FileName, BlockIndex); + if (!OK) + { + continue; + } + if (!KnownBlocks.contains(BlockIndex)) + { + // Log removing unreferenced block + // Clear out unused blocks + ZEN_INFO("removing unused block at '{}'", Path); + std::error_code Ec; + std::filesystem::remove(Path, Ec); + if (Ec) + { + ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); + } + continue; + } + Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); + BlockFile->Open(); + m_ChunkBlocks[BlockIndex] = BlockFile; + } + } + ++FolderOffset; + } + } + else + { + CreateDirectories(m_BlocksBasePath); + } +} + +void +BlockStore::Close() +{ + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + m_WriteBlock = nullptr; + m_CurrentInsertOffset = 0; + m_WriteBlockIndex = 0; + m_ChunkBlocks.clear(); + m_BlocksBasePath.clear(); +} + +BlockStoreLocation +BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) +{ + ZEN_ASSERT(Data != nullptr); + ZEN_ASSERT(Size > 0u); + ZEN_ASSERT(Size <= m_MaxBlockSize); + ZEN_ASSERT(Alignment > 0u); + + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + bool IsWriting = m_WriteBlock != nullptr; + if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize) + { + if (m_WriteBlock) + { + m_WriteBlock = nullptr; + } + { + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); + } + WriteBlockIndex += IsWriting ? 1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + m_WriteBlock = new BlockStoreFile(BlockPath); + m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + } + m_CurrentInsertOffset = 0; + m_WriteBlock->Create(m_MaxBlockSize); + } + uint64_t InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); + Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + InsertLock.ReleaseNow(); + + WriteBlock->Write(Data, Size, InsertOffset); + + return {.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}; +} + +BlockStore::ReclaimSnapshotState +BlockStore::GetReclaimSnapshotState() +{ + ReclaimSnapshotState State; + RwLock::ExclusiveLockScope _(m_InsertLock); + State.ExcludeBlockIndex = m_WriteBlock ? m_WriteBlockIndex.load(std::memory_order_acquire) : 0xffffffffu; + State.BlockCount = m_ChunkBlocks.size(); + _.ReleaseNow(); + return State; +} + +Ref<BlockStoreFile> +BlockStore::GetChunkBlock(const BlockStoreLocation& Location) +{ + RwLock::SharedLockScope InsertLock(m_InsertLock); + if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) + { + return BlockIt->second; + } + return {}; +} + +void +BlockStore::Flush() +{ + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } +} + +void +BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, + const std::vector<BlockStoreLocation>& ChunkLocations, + const ChunkIndexArray& KeepChunkIndexes, + uint64_t PayloadAlignment, + bool DryRun, + const ReclaimCallback& ChangeCallback, + const ClaimDiskReserveCallback& DiskReserveCallback) +{ + if (ChunkLocations.empty()) + { + return; + } + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = ChunkLocations.size(); + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = 0; + uint64_t NewTotalSize = 0; + + uint64_t MovedCount = 0; + uint64_t DeletedCount = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_INFO( + "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " + "#{} " + "of #{} " + "chunks ({}).", + m_BlocksBasePath, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedCount, + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + }); + + size_t BlockCount = Snapshot.BlockCount; + + std::unordered_set<size_t> KeepChunkMap; + KeepChunkMap.reserve(KeepChunkIndexes.size()); + for (size_t KeepChunkIndex : KeepChunkIndexes) + { + KeepChunkMap.insert(KeepChunkIndex); + } + + std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; + std::vector<ChunkIndexArray> BlockKeepChunks; + std::vector<ChunkIndexArray> BlockDeleteChunks; + + BlockIndexToChunkMapIndex.reserve(BlockCount); + BlockKeepChunks.reserve(BlockCount); + BlockDeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; + + size_t DeleteCount = 0; + for (size_t Index = 0; Index < TotalChunkCount; ++Index) + { + const BlockStoreLocation& Location = ChunkLocations[Index]; + OldTotalSize += Location.Size; + if (Location.BlockIndex == Snapshot.ExcludeBlockIndex) + { + continue; + } + + auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); + size_t ChunkMapIndex = 0; + if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) + { + ChunkMapIndex = BlockKeepChunks.size(); + BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; + BlockKeepChunks.resize(ChunkMapIndex + 1); + BlockKeepChunks.back().reserve(GuesstimateCountPerBlock); + BlockDeleteChunks.resize(ChunkMapIndex + 1); + BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock); + } + else + { + ChunkMapIndex = BlockIndexPtr->second; + } + + if (KeepChunkMap.contains(Index)) + { + ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + NewTotalSize += Location.Size; + continue; + } + ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + DeleteCount++; + } + + std::unordered_set<uint32_t> BlocksToReWrite; + BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); + for (const auto& Entry : BlockIndexToChunkMapIndex) + { + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex]; + if (ChunkMap.empty()) + { + continue; + } + BlocksToReWrite.insert(BlockIndex); + } + + if (DryRun) + { + ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_BlocksBasePath, + DeleteCount, + NiceBytes(OldTotalSize - NewTotalSize), + TotalChunkCount, + OldTotalSize); + return; + } + + Ref<BlockStoreFile> NewBlockFile; + try + { + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; + for (uint32_t BlockIndex : BlocksToReWrite) + { + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; + + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _i(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + OldBlockFile = m_ChunkBlocks[BlockIndex]; + ZEN_ASSERT(OldBlockFile); + } + + const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex]; + if (KeepMap.empty()) + { + const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + DeletedSize += ChunkLocations[DeleteIndex].Size; + } + ChangeCallback({}, DeleteMap); + DeletedCount += DeleteMap.size(); + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks[BlockIndex] = nullptr; + } + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + continue; + } + + MovedChunksArray MovedChunks; + std::vector<uint8_t> Chunk; + for (const size_t& ChunkIndex : KeepMap) + { + const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) + { + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + NewBlockFile = nullptr; + } + { + ChangeCallback(MovedChunks, {}); + MovedCount += KeepMap.size(); + MovedChunks.clear(); + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BlocksBasePath, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; + } + while (m_ChunkBlocks.contains(NextBlockIndex)) + { + NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1); + } + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + } + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); + return; + } + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = DiskReserveCallback(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", + m_BlocksBasePath, + m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + RwLock::ExclusiveLockScope _l(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks.erase(NextBlockIndex); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BlocksBasePath, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; + } + + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); + WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); + } + Chunk.clear(); + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + NewBlockFile = nullptr; + } + + const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + DeletedSize += ChunkLocations[DeleteIndex].Size; + } + + ChangeCallback(MovedChunks, DeleteMap); + MovedCount += KeepMap.size(); + DeletedCount += DeleteMap.size(); + MovedChunks.clear(); + { + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks[BlockIndex] = nullptr; + } + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + OldBlockFile = nullptr; + } + } + catch (std::exception& ex) + { + ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what()); + if (NewBlockFile) + { + ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); + std::error_code Ec; + NewBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", NewBlockFile->GetPath(), Ec.message()); + } + } + } +} + +void +BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, + IterateChunksSmallSizeCallback SmallSizeCallback, + IterateChunksLargeSizeCallback LargeSizeCallback) +{ + // We do a read sweep through the payloads file and validate + // any entries that are contained within each segment, with + // the assumption that most entries will be checked in this + // pass. An alternative strategy would be to use memory mapping. + + { + ChunkIndexArray BigChunks; + IoBuffer ReadBuffer{ScrubSmallChunkWindowSize}; + void* BufferBase = ReadBuffer.MutableData(); + + RwLock::SharedLockScope _(m_InsertLock); + + for (const auto& Block : m_ChunkBlocks) + { + uint64_t WindowStart = 0; + uint64_t WindowEnd = ScrubSmallChunkWindowSize; + uint32_t BlockIndex = Block.first; + const Ref<BlockStoreFile>& BlockFile = Block.second; + const uint64_t FileSize = BlockFile->FileSize(); + + do + { + const uint64_t ChunkSize = Min(ScrubSmallChunkWindowSize, FileSize - WindowStart); + BlockFile->Read(BufferBase, ChunkSize, WindowStart); + + // TODO: We could be smarter here if the ChunkLocations were sorted on block index - we could + // then only scan a subset of ChunkLocations instead of scanning through them all... + for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) + { + const BlockStoreLocation Location = ChunkLocations[ChunkIndex]; + if (BlockIndex != Location.BlockIndex) + { + continue; + } + + const uint64_t EntryOffset = Location.Offset; + if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) + { + const uint64_t EntryEnd = EntryOffset + Location.Size; + + if (EntryEnd >= WindowEnd) + { + BigChunks.push_back(ChunkIndex); + + continue; + } + + SmallSizeCallback(ChunkIndex, + reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, + Location.Size); + } + } + + WindowStart += ScrubSmallChunkWindowSize; + WindowEnd += ScrubSmallChunkWindowSize; + } while (WindowStart < FileSize); + } + + // Deal with large chunks and chunks that extend over a ScrubSmallChunkWindowSize border + for (size_t ChunkIndex : BigChunks) + { + const BlockStoreLocation Location = ChunkLocations[ChunkIndex]; + const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex]; + LargeSizeCallback(ChunkIndex, BlockFile, Location.Offset, Location.Size); + } + } +} + +bool +BlockStore::Split(const std::vector<BlockStoreLocation>& ChunkLocations, + const std::filesystem::path& SourceBlockFilePath, + const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + size_t PayloadAlignment, + bool CleanSource, + const SplitCallback& Callback) +{ + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(BlocksBasePath.parent_path(), Error); + if (Error) + { + ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", BlocksBasePath, Error.message()); + return false; + } + + if (Space.Free < MaxBlockSize) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", + BlocksBasePath, + MaxBlockSize, + NiceBytes(Space.Free)); + return false; + } + + size_t TotalSize = 0; + for (const BlockStoreLocation& Location : ChunkLocations) + { + TotalSize += Location.Size; + } + size_t ChunkCount = ChunkLocations.size(); + uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * ChunkCount); + uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; + if (MaxRequiredBlockCount > MaxBlockCount) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", + BlocksBasePath, + MaxRequiredBlockCount, + MaxBlockCount); + return false; + } + + constexpr const uint64_t DiskReserve = 1ul << 28; + + if (CleanSource) + { + if (Space.Free < (MaxBlockSize + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + BlocksBasePath, + NiceBytes(MaxBlockSize + DiskReserve), + NiceBytes(Space.Free)); + return false; + } + } + else + { + if (Space.Free < (RequiredDiskSpace + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + BlocksBasePath, + NiceBytes(RequiredDiskSpace + DiskReserve), + NiceBytes(Space.Free)); + return false; + } + } + + uint32_t WriteBlockIndex = 0; + while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + + BasicFile BlockFile; + BlockFile.Open(SourceBlockFilePath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + + if (CleanSource && (MaxRequiredBlockCount < 2)) + { + MovedChunksArray Chunks; + Chunks.reserve(ChunkCount); + for (size_t Index = 0; Index < ChunkCount; ++Index) + { + const BlockStoreLocation& ChunkLocation = ChunkLocations[Index]; + Chunks.push_back({Index, {.BlockIndex = WriteBlockIndex, .Offset = ChunkLocation.Offset, .Size = ChunkLocation.Size}}); + } + std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex); + CreateDirectories(BlockPath.parent_path()); + BlockFile.Close(); + std::filesystem::rename(SourceBlockFilePath, BlockPath); + Callback(Chunks); + return true; + } + + ChunkIndexArray ChunkIndexes; + ChunkIndexes.reserve(ChunkCount); + for (size_t Index = 0; Index < ChunkCount; ++Index) + { + ChunkIndexes.push_back(Index); + } + + std::sort(begin(ChunkIndexes), end(ChunkIndexes), [&](size_t Lhs, size_t Rhs) { + const BlockStoreLocation& LhsLocation = ChunkLocations[Lhs]; + const BlockStoreLocation& RhsLocation = ChunkLocations[Rhs]; + return LhsLocation.Offset < RhsLocation.Offset; + }); + + uint64_t BlockSize = 0; + uint64_t BlockOffset = 0; + std::vector<BlockStoreLocation> NewLocations; + struct BlockData + { + MovedChunksArray Chunks; + uint64_t BlockOffset; + uint64_t BlockSize; + uint32_t BlockIndex; + }; + + std::vector<BlockData> BlockRanges; + MovedChunksArray Chunks; + BlockRanges.reserve(MaxRequiredBlockCount); + for (const size_t& ChunkIndex : ChunkIndexes) + { + const BlockStoreLocation& LegacyChunkLocation = ChunkLocations[ChunkIndex]; + + uint64_t ChunkOffset = LegacyChunkLocation.Offset; + uint64_t ChunkSize = LegacyChunkLocation.Size; + uint64_t ChunkEnd = ChunkOffset + ChunkSize; + + if (BlockSize == 0) + { + BlockOffset = ChunkOffset; + } + if ((ChunkEnd - BlockOffset) > MaxBlockSize) + { + BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; + BlockRange.Chunks.swap(Chunks); + BlockRanges.push_back(BlockRange); + + WriteBlockIndex++; + while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + BlockOffset = ChunkOffset; + BlockSize = 0; + } + BlockSize = RoundUp(BlockSize, PayloadAlignment); + BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; + Chunks.push_back({ChunkIndex, ChunkLocation}); + BlockSize = ChunkEnd - BlockOffset; + } + if (BlockSize > 0) + { + BlockRanges.push_back( + {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); + } + + Stopwatch WriteBlockTimer; + + std::reverse(BlockRanges.begin(), BlockRanges.end()); + std::vector<std::uint8_t> Buffer(1 << 28); + for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) + { + const BlockData& BlockRange = BlockRanges[Idx]; + if (Idx > 0) + { + uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; + uint64_t Completed = BlockOffset + BlockSize - Remaining; + uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; + + ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", + BlocksBasePath, + Idx, + BlockRanges.size(), + NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), + NiceBytes(BlockOffset + BlockSize), + NiceTimeSpanMs(ETA)); + } + + std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); + BlockStoreFile ChunkBlock(BlockPath); + ChunkBlock.Create(BlockRange.BlockSize); + uint64_t Offset = 0; + while (Offset < BlockRange.BlockSize) + { + uint64_t Size = BlockRange.BlockSize - Offset; + if (Size > Buffer.size()) + { + Size = Buffer.size(); + } + BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); + ChunkBlock.Write(Buffer.data(), Size, Offset); + Offset += Size; + } + ChunkBlock.Truncate(Offset); + ChunkBlock.Flush(); + + Callback(BlockRange.Chunks); + + if (CleanSource) + { + BlockFile.SetFileSize(BlockRange.BlockOffset); + } + } + BlockFile.Close(); + + return true; +} + +const char* +BlockStore::GetBlockFileExtension() +{ + return ".ucas"; +} + +std::filesystem::path +BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) +{ + ExtendablePathBuilder<256> Path; + + char BlockHexString[9]; + ToHexNumber(BlockIndex, BlockHexString); + + Path.Append(BlocksBasePath); + Path.AppendSeparator(); + Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); + Path.AppendSeparator(); + Path.Append(BlockHexString); + Path.Append(GetBlockFileExtension()); + return Path.ToPath(); +} + #if ZEN_WITH_TESTS static bool @@ -232,6 +1038,403 @@ TEST_CASE("blockstore.blockfile") CHECK(!std::filesystem::exists(RootDirectory / "1")); } +namespace { + BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment) + { + BlockStoreLocation Location = Store.WriteChunk(String.data(), String.length(), PayloadAlignment); + CHECK(Location.Size == String.length()); + return Location; + }; + + std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location) + { + Ref<BlockStoreFile> ChunkBlock(Store.GetChunkBlock(Location)); + if (!ChunkBlock) + { + return ""; + } + IoBuffer ChunkData = ChunkBlock->GetChunk(Location.Offset, Location.Size); + if (!ChunkData) + { + return ""; + } + std::string AsString((const char*)ChunkData.Data(), ChunkData.Size()); + return AsString; + }; + + std::vector<std::filesystem::path> GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories) + { + FileSystemTraversal Traversal; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t) override + { + if (Files) + { + Items.push_back(Parent / File); + } + } + + virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& Dir) override + { + if (Directories) + { + Items.push_back(Parent / Dir); + } + return true; + } + + bool Files; + bool Directories; + std::vector<std::filesystem::path> Items; + } Visit; + Visit.Files = Files; + Visit.Directories = Directories; + + Traversal.TraverseFileSystem(RootDir, Visit); + return Visit.Items; + }; + + static IoBuffer CreateChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } +} // namespace + +TEST_CASE("blockstore.chunks") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory, 128, 1024, {}); + Ref<BlockStoreFile> BadChunk = Store.GetChunkBlock({.BlockIndex = 0, .Offset = 0, .Size = 512}); + CHECK(!BadChunk); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + + CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); + CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); + + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4); + CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex); + + CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); + CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); + CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData); +} + +TEST_CASE("blockstore.clean.stray.blocks") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 128, 1024, {}); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + WriteStringAsChunk(Store, ThirdChunkData, 4); + + Store.Close(); + + // Not referencing the second block means that we should be deleted + Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation}); + + CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1); +} + +TEST_CASE("blockstore.flush.forces.new.block") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 128, 1024, {}); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + Store.Flush(); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + Store.Flush(); + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + WriteStringAsChunk(Store, ThirdChunkData, 4); + + CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3); +} + +TEST_CASE("blockstore.iterate.chunks") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {}); + Ref<BlockStoreFile> BadChunk = Store.GetChunkBlock({.BlockIndex = 0, .Offset = 0, .Size = 512}); + CHECK(!BadChunk); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + Store.Flush(); + + std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); + BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); + + Store.IterateChunks( + {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation}, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + CHECK(Data); + CHECK(Size > 0); + std::string AsString((const char*)Data, Size); + switch (ChunkIndex) + { + case 0: + CHECK(AsString == FirstChunkData); + break; + case 1: + CHECK(AsString == SecondChunkData); + break; + default: + CHECK(false); + break; + } + }, + [&](size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size) { + CHECK(BlockFile); + CHECK(ChunkIndex == 2); + CHECK(Offset == VeryLargeChunkLocation.Offset); + CHECK(Size == VeryLargeChunkLocation.Size); + size_t StreamOffset = 0; + BlockFile->StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { + const char* VeryLargeChunkSection = &(VeryLargeChunk.data()[StreamOffset]); + CHECK(memcmp(VeryLargeChunkSection, Data, Size) == 0); + }); + }); +} + +TEST_CASE("blockstore.reclaim.space") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 512, 1024, {}); + + constexpr size_t ChunkCount = 200; + constexpr size_t Alignment = 8; + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkHashes; + ChunkLocations.reserve(ChunkCount); + ChunkHashes.reserve(ChunkCount); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer Chunk = CreateChunk(57 + ChunkIndex); + ChunkLocations.push_back(Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment)); + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + std::vector<size_t> ChunksToKeep; + ChunksToKeep.reserve(ChunkLocations.size()); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + ChunksToKeep.push_back(ChunkIndex); + } + + Store.Flush(); + BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState(); + Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true); + + // If we keep all the chunks we should not get any callbacks on moved/deleted stuff + Store.ReclaimSpace( + State1, + ChunkLocations, + ChunksToKeep, + Alignment, + false, + [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); }, + []() { + CHECK(false); + return 0; + }); + + size_t DeleteChunkCount = 38; + ChunksToKeep.clear(); + for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex) + { + ChunksToKeep.push_back(ChunkIndex); + } + + std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations; + size_t MovedChunkCount = 0; + size_t DeletedChunkCount = 0; + Store.ReclaimSpace( + State1, + ChunkLocations, + ChunksToKeep, + Alignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { + for (const auto& MovedChunk : MovedChunks) + { + CHECK(MovedChunk.first >= DeleteChunkCount); + NewChunkLocations[MovedChunk.first] = MovedChunk.second; + } + MovedChunkCount += MovedChunks.size(); + for (size_t DeletedIndex : DeletedChunks) + { + CHECK(DeletedIndex < DeleteChunkCount); + } + DeletedChunkCount += DeletedChunks.size(); + }, + []() { + CHECK(false); + return 0; + }); + CHECK(MovedChunkCount <= DeleteChunkCount); + CHECK(DeletedChunkCount == DeleteChunkCount); + ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end()); + + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(NewChunkLocations[ChunkIndex]); + if (ChunkIndex >= DeleteChunkCount) + { + CHECK(ChunkBlock); + IoBuffer VerifyChunk = ChunkBlock->GetChunk(NewChunkLocations[ChunkIndex].Offset, NewChunkLocations[ChunkIndex].Size); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + } + } + + NewChunkLocations = ChunkLocations; + MovedChunkCount = 0; + DeletedChunkCount = 0; + Store.ReclaimSpace( + State1, + ChunkLocations, + {}, + Alignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { + CHECK(MovedChunks.empty()); + DeletedChunkCount += DeletedChunks.size(); + }, + []() { + CHECK(false); + return 0; + }); + CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount); +} + +TEST_CASE("blockstore.thread.read.write") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 1088, 1024, {}); + + constexpr size_t ChunkCount = 1000; + constexpr size_t Alignment = 8; + std::vector<IoBuffer> Chunks; + std::vector<IoHash> ChunkHashes; + Chunks.reserve(ChunkCount); + ChunkHashes.reserve(ChunkCount); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2); + Chunks.push_back(Chunk); + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + std::vector<BlockStoreLocation> ChunkLocations; + ChunkLocations.resize(ChunkCount); + + WorkerThreadPool WorkerPool(8); + std::atomic<size_t> WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + ChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]); + CHECK(ChunkBlock); + IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + std::vector<BlockStoreLocation> SecondChunkLocations; + SecondChunkLocations.resize(ChunkCount); + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + SecondChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + WorkCompleted.fetch_add(1); + }); + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]); + CHECK(ChunkBlock); + IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size() * 2) + { + Sleep(1); + } +} + #endif void diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 920ed965f..7cc742beb 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -48,25 +48,8 @@ struct CasDiskIndexHeader static_assert(sizeof(CasDiskIndexHeader) == 32); namespace { - std::vector<CasDiskIndexEntry> MakeCasDiskEntries(const std::unordered_map<IoHash, BlockStoreDiskLocation>& MovedChunks, - const std::vector<IoHash>& DeletedChunks) - { - std::vector<CasDiskIndexEntry> result; - result.reserve(MovedChunks.size()); - for (const auto& MovedEntry : MovedChunks) - { - result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second}); - } - for (const IoHash& ChunkHash : DeletedChunks) - { - result.push_back({.Key = ChunkHash, .Flags = CasDiskIndexEntry::kTombstone}); - } - return result; - } - const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; - const char* DataExtension = ".ucas"; std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { @@ -93,22 +76,6 @@ namespace { return GetBasePath(RootPath, ContainerBaseName) / "blocks"; } - std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) - { - ExtendablePathBuilder<256> Path; - - char BlockHexString[9]; - ToHexNumber(BlockIndex, BlockHexString); - - Path.Append(BlocksBasePath); - Path.AppendSeparator(); - Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); - Path.AppendSeparator(); - Path.Append(BlockHexString); - Path.Append(DataExtension); - return Path.ToPath(); - } - std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return RootPath / (ContainerBaseName + LogExtension); @@ -116,7 +83,7 @@ namespace { std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { - return RootPath / (ContainerBaseName + DataExtension); + return RootPath / (ContainerBaseName + ".ucas"); } std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) @@ -263,53 +230,12 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3 CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { - uint32_t WriteBlockIndex; - Ref<BlockStoreFile> WriteBlock; - uint64_t InsertOffset; { - RwLock::ExclusiveLockScope _(m_InsertLock); - - { - RwLock::SharedLockScope __(m_LocationMapLock); - if (m_LocationMap.contains(ChunkHash)) - { - return CasStore::InsertResult{.New = false}; - } - } - - // New entry - - WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - bool IsWriting = m_WriteBlock != nullptr; - if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize) + RwLock::SharedLockScope _(m_LocationMapLock); + if (m_LocationMap.contains(ChunkHash)) { - if (m_WriteBlock) - { - m_WriteBlock = nullptr; - } - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) - { - throw std::runtime_error( - fmt::format("unable to allocate a new block in '{}'", m_Config.RootDirectory / m_ContainerBaseName)); - } - WriteBlockIndex += IsWriting ? 1 : 0; - while (m_ChunkBlocks.contains(WriteBlockIndex)) - { - WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - m_WriteBlock = new BlockStoreFile(BlockPath); - m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - } - m_CurrentInsertOffset = 0; - m_WriteBlock->Create(m_MaxBlockSize); + return CasStore::InsertResult{.New = false}; } - InsertOffset = m_CurrentInsertOffset; - m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); - WriteBlock = m_WriteBlock; } // We can end up in a situation that InsertChunk writes the same chunk data in @@ -324,17 +250,15 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const // This should be a rare occasion and the current flow reduces the time we block for // reads, insert and GC. - BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment); - const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; - - WriteBlock->Write(ChunkData, ChunkSize, InsertOffset); + BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment); + BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); + const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; m_CasLog.Append(IndexEntry); - - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst); { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - m_LocationMap.emplace(ChunkHash, Location); + RwLock::ExclusiveLockScope _(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, DiskLocation); } + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); return CasStore::InsertResult{.New = true}; } @@ -348,20 +272,21 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { - Ref<BlockStoreFile> ChunkBlock; - BlockStoreLocation Location; + RwLock::SharedLockScope _(m_LocationMapLock); + auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) { - RwLock::SharedLockScope _(m_LocationMapLock); - if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) - { - Location = KeyIt->second.Get(m_PayloadAlignment); - ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; - } - else - { - return IoBuffer(); - } + return IoBuffer(); + } + BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment); + _.ReleaseNow(); + + Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); + if (!ChunkBlock) + { + return IoBuffer(); } + return ChunkBlock->GetChunk(Location.Offset, Location.Size); } @@ -388,128 +313,94 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) void CasContainerStrategy::Flush() { - { - RwLock::ExclusiveLockScope _(m_InsertLock); - if (m_CurrentInsertOffset > 0) - { - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - m_WriteBlock = nullptr; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - m_CurrentInsertOffset = 0; - } - } + m_BlockStore.Flush(); MakeIndexSnapshot(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { - std::vector<CasDiskIndexEntry> BadChunks; - - // We do a read sweep through the payloads file and validate - // any entries that are contained within each segment, with - // the assumption that most entries will be checked in this - // pass. An alternative strategy would be to use memory mapping. + RwLock::SharedLockScope _(m_LocationMapLock); + uint64_t TotalChunkCount = m_LocationMap.size(); + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); { - std::vector<CasDiskIndexEntry> BigChunks; - const uint64_t WindowSize = 4 * 1024 * 1024; - IoBuffer ReadBuffer{WindowSize}; - void* BufferBase = ReadBuffer.MutableData(); - - RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time? - RwLock::SharedLockScope __(m_LocationMapLock); - - for (const auto& Block : m_ChunkBlocks) + for (const auto& Entry : m_LocationMap) { - uint64_t WindowStart = 0; - uint64_t WindowEnd = WindowSize; - const Ref<BlockStoreFile>& BlockFile = Block.second; - BlockFile->Open(); - const uint64_t FileSize = BlockFile->FileSize(); - - do - { - const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); - BlockFile->Read(BufferBase, ChunkSize, WindowStart); - - for (auto& Entry : m_LocationMap) - { - const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment); - const uint64_t EntryOffset = Location.Offset; + const IoHash& ChunkHash = Entry.first; + const BlockStoreDiskLocation& DiskLocation = Entry.second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); - if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) - { - const uint64_t EntryEnd = EntryOffset + Location.Size; - - if (EntryEnd >= WindowEnd) - { - BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); - - continue; - } - - const IoHash ComputedHash = - IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, Location.Size); - - if (Entry.first != ComputedHash) - { - // Hash mismatch - BadChunks.push_back({.Key = Entry.first, .Location = Entry.second, .Flags = CasDiskIndexEntry::kTombstone}); - } - } - } - - WindowStart += WindowSize; - WindowEnd += WindowSize; - } while (WindowStart < FileSize); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; } + } - // Deal with large chunks - - for (const CasDiskIndexEntry& Entry : BigChunks) - { - IoHashStream Hasher; - const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment); - const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex]; - BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); + std::vector<IoHash> BadKeys; - if (Entry.Key != ComputedHash) + m_BlockStore.IterateChunks( + ChunkLocations, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + const IoHash ComputedHash = IoHash::HashBuffer(Data, Size); + const IoHash& ExpectedHash = ChunkIndexToChunkHash[ChunkIndex]; + if (ComputedHash != ExpectedHash) { - BadChunks.push_back({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); + // Hash mismatch + BadKeys.push_back(ExpectedHash); } - } - } + }, + [&](size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size) { + IoHashStream Hasher; + BlockFile->StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + const IoHash& ExpectedHash = ChunkIndexToChunkHash[ChunkIndex]; + if (ComputedHash != ExpectedHash) + { + // Hash mismatch + BadKeys.push_back(ExpectedHash); + } + }); - if (BadChunks.empty()) + if (BadKeys.empty()) { return; } - ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_Config.RootDirectory / m_ContainerBaseName); + ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_Config.RootDirectory / m_ContainerBaseName); - // Deal with bad chunks by removing them from our lookup map + _.ReleaseNow(); - std::vector<IoHash> BadChunkHashes; - BadChunkHashes.reserve(BadChunks.size()); - - m_CasLog.Append(BadChunks); + if (Ctx.RunRecovery()) { - RwLock::ExclusiveLockScope _(m_LocationMapLock); - for (const CasDiskIndexEntry& Entry : BadChunks) + // Deal with bad chunks by removing them from our lookup map + + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); { - BadChunkHashes.push_back(Entry.Key); - m_LocationMap.erase(Entry.Key); + RwLock::ExclusiveLockScope __(m_LocationMapLock); + for (const IoHash& ChunkHash : BadKeys) + { + const auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) + { + // Might have been GC'd + continue; + } + LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); + m_LocationMap.erase(KeyIt); + } } + m_CasLog.Append(LogEntries); } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - - Ctx.ReportBadCasChunks(BadChunkHashes); + Ctx.ReportBadCasChunks(BadKeys); } void @@ -533,93 +424,33 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // We update the index as we complete each new block file. This makes it possible // to break the GC if we want to limit time for execution. // - // GC can fairly parallell to regular operation - it will block while taking - // a snapshot of the current m_LocationMap state. - // - // While moving blocks it will do a blocking operation and update the m_LocationMap - // after each new block is written and figuring out the path to the next new block. + // GC can very parallell to regular operation - it will block while taking + // a snapshot of the current m_LocationMap state and while moving blocks it will + // do a blocking operation and update the m_LocationMap after each new block is + // written and figuring out the path to the next new block. ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); + uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = 0; - uint64_t DeletedSize = 0; - uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); - std::vector<IoHash> DeletedChunks; - uint64_t MovedCount = 0; - - Stopwatch TotalTimer; - const auto _ = MakeGuard([this, - &TotalTimer, - &WriteBlockTimeUs, - &WriteBlockLongestTimeUs, - &ReadBlockTimeUs, - &ReadBlockLongestTimeUs, - &TotalChunkCount, - &DeletedChunks, - &MovedCount, - &DeletedSize, - OldTotalSize] { - ZEN_INFO( - "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " - "#{} " - "of #{} " - "chunks ({}).", - m_Config.RootDirectory / m_ContainerBaseName, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs), - NiceBytes(DeletedSize), - DeletedChunks.size(), - MovedCount, - TotalChunkCount, - NiceBytes(OldTotalSize)); - }); - - LocationMap_t LocationMap; - size_t BlockCount; - uint64_t ExcludeBlockIndex = 0x800000000ull; + LocationMap_t LocationMap; + BlockStore::ReclaimSnapshotState BlockStoreState; { - RwLock::SharedLockScope __(m_InsertLock); RwLock::SharedLockScope ___(m_LocationMapLock); - { - Stopwatch Timer; - const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_WriteBlock) - { - ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - } - __.ReleaseNow(); - } - LocationMap = m_LocationMap; - BlockCount = m_ChunkBlocks.size(); - } - - if (LocationMap.empty()) - { - ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName); - return; + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + LocationMap = m_LocationMap; + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); } - TotalChunkCount = LocationMap.size(); - - std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; - std::vector<std::vector<IoHash>> KeepChunks; - std::vector<std::vector<IoHash>> DeleteChunks; - - BlockIndexToChunkMapIndex.reserve(BlockCount); - KeepChunks.reserve(BlockCount); - DeleteChunks.reserve(BlockCount); - size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; + uint64_t TotalChunkCount = LocationMap.size(); std::vector<IoHash> TotalChunkHashes; TotalChunkHashes.reserve(TotalChunkCount); @@ -628,272 +459,82 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) TotalChunkHashes.push_back(Entry.first); } - uint64_t DeleteCount = 0; + std::vector<BlockStoreLocation> ChunkLocations; + BlockStore::ChunkIndexArray KeepChunkIndexes; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); - uint64_t NewTotalSize = 0; GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = LocationMap.find(ChunkHash); - const BlockStoreDiskLocation& Location = KeyIt->second; - uint32_t BlockIndex = Location.GetBlockIndex(); + auto KeyIt = LocationMap.find(ChunkHash); + const BlockStoreDiskLocation& DiskLocation = KeyIt->second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); - if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex) - { - return; - } - - auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex); - size_t ChunkMapIndex = 0; - if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) - { - ChunkMapIndex = KeepChunks.size(); - BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; - KeepChunks.resize(ChunkMapIndex + 1); - KeepChunks.back().reserve(GuesstimateCountPerBlock); - DeleteChunks.resize(ChunkMapIndex + 1); - DeleteChunks.back().reserve(GuesstimateCountPerBlock); - } - else - { - ChunkMapIndex = BlockIndexPtr->second; - } + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; if (Keep) { - std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex]; - ChunkMap.push_back(ChunkHash); - NewTotalSize += Location.GetSize(); - } - else - { - std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; - ChunkMap.push_back(ChunkHash); - DeleteCount++; + KeepChunkIndexes.push_back(ChunkIndex); } }); - std::unordered_set<uint32_t> BlocksToReWrite; - BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); - for (const auto& Entry : BlockIndexToChunkMapIndex) - { - uint32_t BlockIndex = Entry.first; - size_t ChunkMapIndex = Entry.second; - const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; - if (ChunkMap.empty()) - { - continue; - } - BlocksToReWrite.insert(BlockIndex); - } - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { - uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); - ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}", - m_Config.RootDirectory / m_ContainerBaseName, - DeleteCount, - NiceBytes(TotalSize - NewTotalSize), - TotalChunkCount, - NiceBytes(TotalSize)); + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); return; } - // Move all chunks in blocks that have chunks removed to new blocks - - Ref<BlockStoreFile> NewBlockFile; - uint64_t WriteOffset = 0; - uint32_t NewBlockIndex = 0; - DeletedChunks.reserve(DeleteCount); - - auto UpdateLocations = [this](const std::span<CasDiskIndexEntry>& Entries) { - for (const CasDiskIndexEntry& Entry : Entries) - { - if (Entry.Flags & CasDiskIndexEntry::kTombstone) + std::vector<IoHash> DeletedChunks; + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) { - auto KeyIt = m_LocationMap.find(Entry.Key); - uint64_t ChunkSize = KeyIt->second.GetSize(); - m_TotalSize.fetch_sub(ChunkSize); - m_LocationMap.erase(KeyIt); - continue; + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}}); + } + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash]; + LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone}); + DeletedChunks.push_back(ChunkHash); } - m_LocationMap[Entry.Key] = Entry.Location; - } - }; - - std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks; - for (uint32_t BlockIndex : BlocksToReWrite) - { - const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; - - Ref<BlockStoreFile> OldBlockFile; - { - RwLock::SharedLockScope _i(m_LocationMapLock); - OldBlockFile = m_ChunkBlocks[BlockIndex]; - } - const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex]; - if (KeepMap.empty()) - { - const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; - std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap); m_CasLog.Append(LogEntries); m_CasLog.Flush(); { - RwLock::ExclusiveLockScope _i(m_LocationMapLock); + RwLock::ExclusiveLockScope __(m_LocationMapLock); Stopwatch Timer; - const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); - UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex] = nullptr; - } - DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); - ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", - m_ContainerBaseName, - BlockIndex, - OldBlockFile->GetPath()); - std::error_code Ec; - OldBlockFile->MarkAsDeleteOnClose(Ec); - if (Ec) - { - ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); - } - continue; - } - - std::vector<uint8_t> Chunk; - for (const IoHash& ChunkHash : KeepMap) - { - auto KeyIt = LocationMap.find(ChunkHash); - const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment); - Chunk.resize(ChunkLocation.Size); - OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - - if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) - { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); - std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {}); - m_CasLog.Append(LogEntries); - m_CasLog.Flush(); - - if (NewBlockFile) - { - NewBlockFile->Truncate(WriteOffset); - NewBlockFile->Flush(); - } - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - UpdateLocations(LogEntries); - if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", - m_Config.RootDirectory / m_ContainerBaseName, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return; - } - while (m_ChunkBlocks.contains(NextBlockIndex)) - { - NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - } - std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); - NewBlockFile = new BlockStoreFile(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; - } - - MovedCount += MovedBlockChunks.size(); - MovedBlockChunks.clear(); - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); - if (Error) + for (const CasDiskIndexEntry& Entry : LogEntries) { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); - return; - } - if (Space.Free < m_MaxBlockSize) - { - uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve(); - if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + if (Entry.Flags & CasDiskIndexEntry::kTombstone) { - ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", - m_Config.RootDirectory / m_ContainerBaseName, - m_MaxBlockSize, - NiceBytes(Space.Free + ReclaimedSpace)); - RwLock::ExclusiveLockScope _l(m_LocationMapLock); - Stopwatch Timer; - const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - m_ChunkBlocks.erase(NextBlockIndex); - return; + m_LocationMap.erase(Entry.Key); + uint64_t ChunkSize = Entry.Location.GetSize(); + m_TotalSize.fetch_sub(ChunkSize); + continue; } - - ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", - m_Config.RootDirectory / m_ContainerBaseName, - ReclaimedSpace, - NiceBytes(Space.Free + ReclaimedSpace)); + m_LocationMap[Entry.Key] = Entry.Location; } - NewBlockFile->Create(m_MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; } - - NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - MovedBlockChunks.emplace( - ChunkHash, - BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment)); - WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); - } - Chunk.clear(); - if (NewBlockFile) - { - NewBlockFile->Truncate(WriteOffset); - NewBlockFile->Flush(); - NewBlockFile = {}; - } - - const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; - std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap); - m_CasLog.Append(LogEntries); - m_CasLog.Flush(); - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex] = nullptr; - } - MovedCount += MovedBlockChunks.size(); - DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); - MovedBlockChunks.clear(); - - ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_ContainerBaseName, BlockIndex, OldBlockFile->GetPath()); - std::error_code Ec; - OldBlockFile->MarkAsDeleteOnClose(Ec); - if (Ec) - { - ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); - } - OldBlockFile = nullptr; - } - - for (const IoHash& ChunkHash : DeletedChunks) - { - DeletedSize += LocationMap[ChunkHash].GetSize(); - } + }, + [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); GcCtx.DeletedCas(DeletedChunks); } @@ -904,7 +545,7 @@ CasContainerStrategy::MakeIndexSnapshot() ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName); uint64_t EntryCount = 0; Stopwatch Timer; - const auto _ = MakeGuard([this, &EntryCount, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, EntryCount, @@ -935,7 +576,6 @@ CasContainerStrategy::MakeIndexSnapshot() std::vector<CasDiskIndexEntry> Entries; { - RwLock::SharedLockScope __(m_InsertLock); RwLock::SharedLockScope ___(m_LocationMapLock); Entries.resize(m_LocationMap.size()); @@ -990,7 +630,7 @@ CasContainerStrategy::ReadIndexFile() if (std::filesystem::is_regular_file(IndexPath)) { Stopwatch Timer; - const auto _ = MakeGuard([this, &Entries, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, Entries.size(), @@ -1043,7 +683,7 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) if (std::filesystem::is_regular_file(LogPath)) { Stopwatch Timer; - const auto _ = MakeGuard([this, &Entries, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, Entries.size(), @@ -1103,7 +743,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) uint32_t MigratedBlockCount = 0; Stopwatch MigrationTimer; uint64_t TotalSize = 0; - const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { + const auto _ = MakeGuard([&] { ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", m_Config.RootDirectory / m_ContainerBaseName, MigratedChunkCount, @@ -1112,32 +752,13 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) NiceBytes(TotalSize)); }); - uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + uint64_t BlockFileSize = 0; { - ++WriteBlockIndex; + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + BlockFileSize = BlockFile.FileSize(); } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); - if (Error) - { - ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); - return 0; - } - - if (Space.Free < m_MaxBlockSize) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", - m_Config.RootDirectory / m_ContainerBaseName, - m_MaxBlockSize, - NiceBytes(Space.Free)); - return 0; - } - - BasicFile BlockFile; - BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; uint64_t InvalidEntryCount = 0; @@ -1145,7 +766,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); { Stopwatch Timer; - const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] { + const auto __ = MakeGuard([&] { ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, LegacyDiskIndex.size(), @@ -1173,7 +794,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) 0); std::vector<IoHash> BadEntries; - uint64_t BlockFileSize = BlockFile.FileSize(); for (const auto& Entry : LegacyDiskIndex) { const LegacyCasDiskIndexEntry& Record(Entry.second); @@ -1199,7 +819,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) if (LegacyDiskIndex.empty()) { - BlockFile.Close(); LegacyCasLog.Close(); if (CleanSource) { @@ -1218,219 +837,75 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) return 0; } - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - TotalSize += Record.Location.GetSize(); - } - - uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size()); - uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize; - if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", - m_Config.RootDirectory / m_ContainerBaseName, - MaxRequiredBlockCount, - BlockStoreDiskLocation::MaxBlockIndex); - return 0; - } - - constexpr const uint64_t DiskReserve = 1ul << 28; - - if (CleanSource) - { - if (Space.Free < (m_MaxBlockSize + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(m_MaxBlockSize + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - else - { - if (Space.Free < (RequiredDiskSpace + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(RequiredDiskSpace + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); CreateDirectories(LogPath.parent_path()); TCasLogFile<CasDiskIndexEntry> CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - if (CleanSource && (MaxRequiredBlockCount < 2)) - { - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(LegacyDiskIndex.size()); - - // We can use the block as is, just move it and add the blocks to our new log - for (auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - - BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()}; - BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment); - LogEntries.push_back( - {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); - } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - CreateDirectories(BlockPath.parent_path()); - BlockFile.Close(); - std::filesystem::rename(LegacyDataPath, BlockPath); - CasLog.Append(LogEntries); - for (const CasDiskIndexEntry& Entry : LogEntries) - { - m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); - } - - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; - } - else + std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; + std::vector<BlockStoreLocation> ChunkLocations; + ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size()); + ChunkLocations.reserve(LegacyDiskIndex.size()); + for (const auto& Entry : LegacyDiskIndex) { - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(LegacyDiskIndex.size()); - for (const auto& Entry : LegacyDiskIndex) - { - ChunkHashes.push_back(Entry.first); - } - - std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { - auto LhsKeyIt = LegacyDiskIndex.find(Lhs); - auto RhsKeyIt = LegacyDiskIndex.find(Rhs); - return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset(); - }); - - uint64_t BlockSize = 0; - uint64_t BlockOffset = 0; - std::vector<BlockStoreLocation> NewLocations; - struct BlockData - { - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - uint64_t BlockOffset; - uint64_t BlockSize; - uint32_t BlockIndex; - }; - - std::vector<BlockData> BlockRanges; - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - BlockRanges.reserve(MaxRequiredBlockCount); - for (const IoHash& ChunkHash : ChunkHashes) - { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; - const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; - - uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); - uint64_t ChunkSize = LegacyChunkLocation.GetSize(); - uint64_t ChunkEnd = ChunkOffset + ChunkSize; - - if (BlockSize == 0) - { - BlockOffset = ChunkOffset; - } - if ((ChunkEnd - BlockOffset) > m_MaxBlockSize) - { - BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; - BlockRange.Chunks.swap(Chunks); - BlockRanges.push_back(BlockRange); - - WriteBlockIndex++; - while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - BlockOffset = ChunkOffset; - BlockSize = 0; - } - BlockSize = RoundUp(BlockSize, m_PayloadAlignment); - BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; - Chunks.push_back({ChunkHash, ChunkLocation}); - BlockSize = ChunkEnd - BlockOffset; - } - if (BlockSize > 0) - { - BlockRanges.push_back( - {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); - } - Stopwatch WriteBlockTimer; - - std::reverse(BlockRanges.begin(), BlockRanges.end()); - std::vector<std::uint8_t> Buffer(1 << 28); - for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) - { - const BlockData& BlockRange = BlockRanges[Idx]; - if (Idx > 0) - { - uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; - uint64_t Completed = BlockOffset + BlockSize - Remaining; - uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; - - ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", - m_Config.RootDirectory / m_ContainerBaseName, - Idx, - BlockRanges.size(), - NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), - NiceBytes(BlockOffset + BlockSize), - NiceTimeSpanMs(ETA)); - } - - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); - BlockStoreFile ChunkBlock(BlockPath); - ChunkBlock.Create(BlockRange.BlockSize); - uint64_t Offset = 0; - while (Offset < BlockRange.BlockSize) - { - uint64_t Size = BlockRange.BlockSize - Offset; - if (Size > Buffer.size()) - { - Size = Buffer.size(); - } - BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); - ChunkBlock.Write(Buffer.data(), Size, Offset); - Offset += Size; - } - ChunkBlock.Truncate(Offset); - ChunkBlock.Flush(); - + const LegacyCasDiskLocation& Location = Entry.second.Location; + const IoHash& ChunkHash = Entry.first; + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()}); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + TotalSize += Location.GetSize(); + } + m_BlockStore.Split( + ChunkLocations, + LegacyDataPath, + m_BlocksBasePath, + m_MaxBlockSize, + BlockStoreDiskLocation::MaxBlockIndex + 1, + m_PayloadAlignment, + CleanSource, + [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( + const BlockStore::MovedChunksArray& MovedChunks) { std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; - BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment); - LogEntries.push_back( - {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags}); + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + LogEntries.push_back({.Key = ChunkHash, + .Location = {NewLocation, m_PayloadAlignment}, + .ContentType = OldEntry.ContentType, + .Flags = OldEntry.Flags}); } - CasLog.Append(LogEntries); for (const CasDiskIndexEntry& Entry : LogEntries) { m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); } - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; - + CasLog.Append(LogEntries); + CasLog.Flush(); if (CleanSource) { std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; - LegacyLogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LegacyLogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone}); + size_t ChunkIndex = Entry.first; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + LegacyLogEntries.push_back( + LegacyCasDiskIndexEntry{.Key = ChunkHash, + .Location = OldEntry.Location, + .ContentType = OldEntry.ContentType, + .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)}); } LegacyCasLog.Append(LegacyLogEntries); - BlockFile.SetFileSize(BlockRange.BlockOffset); + LegacyCasLog.Flush(); } - } - } + MigratedBlockCount++; + MigratedChunkCount += MovedChunks.size(); + }); - BlockFile.Close(); LegacyCasLog.Close(); CasLog.Close(); @@ -1480,67 +955,16 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - std::unordered_set<uint32_t> KnownBlocks; + std::vector<BlockStoreLocation> KnownLocations; + KnownLocations.reserve(m_LocationMap.size()); for (const auto& Entry : m_LocationMap) { const BlockStoreDiskLocation& Location = Entry.second; - m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_seq_cst); - KnownBlocks.insert(Location.GetBlockIndex()); + m_TotalSize.fetch_add(Location.GetSize(), std::memory_order::relaxed); + KnownLocations.push_back(Location.Get(m_PayloadAlignment)); } - if (std::filesystem::is_directory(m_BlocksBasePath)) - { - std::vector<std::filesystem::path> FoldersToScan; - FoldersToScan.push_back(m_BlocksBasePath); - size_t FolderOffset = 0; - while (FolderOffset < FoldersToScan.size()) - { - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) - { - if (Entry.is_directory()) - { - FoldersToScan.push_back(Entry.path()); - continue; - } - if (Entry.is_regular_file()) - { - const std::filesystem::path Path = Entry.path(); - if (Path.extension() != DataExtension) - { - continue; - } - std::string FileName = Path.stem().string(); - uint32_t BlockIndex; - bool OK = ParseHexNumber(FileName, BlockIndex); - if (!OK) - { - continue; - } - if (!KnownBlocks.contains(BlockIndex)) - { - // Log removing unreferenced block - // Clear out unused blocks - ZEN_INFO("removing unused block for '{}' at '{}'", m_ContainerBaseName, Path); - std::error_code Ec; - std::filesystem::remove(Path, Ec); - if (Ec) - { - ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); - } - continue; - } - Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); - BlockFile->Open(); - m_ChunkBlocks[BlockIndex] = BlockFile; - } - } - ++FolderOffset; - } - } - else - { - CreateDirectories(m_BlocksBasePath); - } + m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0)) { @@ -2195,7 +1619,7 @@ TEST_CASE("compactcas.legacyconversion") Gc.CollectGarbage(GcCtx); } - std::filesystem::path BlockPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); + std::filesystem::path BlockPath = BlockStore::GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test"); std::filesystem::rename(BlockPath, LegacyDataPath); @@ -2463,7 +1887,13 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + if (ChunkHash != IoHash::HashBuffer(Cas.FindChunk(ChunkHash))) + { + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + CHECK(Buffer); + IoHash BufferHash = IoHash::HashBuffer(Buffer); + CHECK(ChunkHash == BufferHash); + } WorkCompleted.fetch_add(1); }); } diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index 11da37202..114a6a48c 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -78,17 +78,12 @@ private: TCasLogFile<CasDiskIndexEntry> m_CasLog; std::string m_ContainerBaseName; std::filesystem::path m_BlocksBasePath; + BlockStore m_BlockStore; RwLock m_LocationMapLock; typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t; LocationMap_t m_LocationMap; - std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks; - RwLock m_InsertLock; // used to serialize inserts - Ref<BlockStoreFile> m_WriteBlock; - std::uint64_t m_CurrentInsertOffset = 0; - - std::atomic_uint32_t m_WriteBlockIndex{}; std::atomic_uint64_t m_TotalSize{}; }; diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index b53cfaa54..d074a906f 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -92,7 +92,7 @@ FileCasStrategy::Initialize(bool IsNewStore) m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); Stopwatch Timer; - const auto _ = MakeGuard([this, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); }); @@ -692,7 +692,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); Stopwatch TotalTimer; - const auto _ = MakeGuard([this, &TotalTimer, &DeletedCount, &ChunkCount, OldTotalSize] { + const auto _ = MakeGuard([&] { ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", m_Config.RootDirectory, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 856f9af02..4b50668d9 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -76,7 +76,7 @@ namespace { return MakeErrorCodeFromLastError(); } bool Keep = true; - auto _ = MakeGuard([FileHandle, &Keep, Path]() { + auto _ = MakeGuard([&]() { ::CloseHandle(FileHandle); if (!Keep) { @@ -105,7 +105,7 @@ namespace { } bool Keep = true; - auto _ = MakeGuard([Fd, &Keep, Path]() { + auto _ = MakeGuard([&]() { close(Fd); if (!Keep) { @@ -212,9 +212,8 @@ GcContext::ContributeCas(std::span<const IoHash> Cas) } void -GcContext::ContributeCacheKeys(const std::string& Bucket, std::vector<IoHash> ValidKeys, std::vector<IoHash> ExpiredKeys) +GcContext::ContributeCacheKeys(const std::string& Bucket, std::vector<IoHash>&& ExpiredKeys) { - m_State->m_CacheBuckets[Bucket].ValidKeys = std::move(ValidKeys); m_State->m_CacheBuckets[Bucket].ExpiredKeys = std::move(ExpiredKeys); } @@ -255,12 +254,6 @@ GcContext::DeletedCas() } std::span<const IoHash> -GcContext::ValidCacheKeys(const std::string& Bucket) const -{ - return m_State->m_CacheBuckets[Bucket].ValidKeys; -} - -std::span<const IoHash> GcContext::ExpiredCacheKeys(const std::string& Bucket) const { return m_State->m_CacheBuckets[Bucket].ExpiredKeys; @@ -399,7 +392,7 @@ CasGc::CollectGarbage(GcContext& GcCtx) // First gather reference set { Stopwatch Timer; - const auto Guard = MakeGuard([this, &Timer] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const auto Guard = MakeGuard([&] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); for (GcContributor* Contributor : m_GcContribs) { Contributor->GatherReferences(GcCtx); @@ -440,7 +433,7 @@ CasGc::CollectGarbage(GcContext& GcCtx) { Stopwatch Timer; - const auto Guard = MakeGuard([this, &Timer] { ZEN_INFO("collected garbage in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const auto Guard = MakeGuard([&] { ZEN_INFO("collected garbage in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); for (GcStorage* Storage : m_GcStorage) { Storage->CollectGarbage(GcCtx); @@ -452,8 +445,7 @@ CasGc::CollectGarbage(GcContext& GcCtx) if (CidStore* CidStore = m_CidStore) { Stopwatch Timer; - const auto Guard = - MakeGuard([this, &Timer] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const auto Guard = MakeGuard([&] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); CidStore->RemoveCids(GcCtx.DeletedCas()); } } @@ -679,8 +671,7 @@ GcScheduler::SchedulerThread() NiceTimeSpanMs(uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(GcCtx.MaxCacheDuration()).count()))); { Stopwatch Timer; - const auto __ = - MakeGuard([this, &Timer] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); m_CasGc.CollectGarbage(GcCtx); diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h index 424db461a..e330cc080 100644 --- a/zenstore/include/zenstore/blockstore.h +++ b/zenstore/include/zenstore/blockstore.h @@ -6,6 +6,9 @@ #include <zencore/zencore.h> #include <zenstore/basicfile.h> +#include <unordered_map> +#include <unordered_set> + namespace zen { ////////////////////////////////////////////////////////////////////////// @@ -15,6 +18,8 @@ struct BlockStoreLocation uint32_t BlockIndex; uint64_t Offset; uint64_t Size; + + inline auto operator<=>(const BlockStoreLocation& Rhs) const = default; }; #pragma pack(push) @@ -91,6 +96,7 @@ struct BlockStoreFile : public RefCounted void Write(const void* Data, uint64_t Size, uint64_t FileOffset); void Truncate(uint64_t Size); void Flush(); + BasicFile& GetBasicFile(); void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); private: @@ -99,6 +105,75 @@ private: BasicFile m_File; }; +class BlockStore +{ +public: + struct ReclaimSnapshotState + { + size_t ExcludeBlockIndex; + size_t BlockCount; + }; + + typedef std::vector<std::pair<size_t, BlockStoreLocation>> MovedChunksArray; + typedef std::vector<size_t> ChunkIndexArray; + + typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback; + typedef std::function<uint64_t()> ClaimDiskReserveCallback; + typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback; + typedef std::function<void(size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size)> + IterateChunksLargeSizeCallback; + typedef std::function<void(const MovedChunksArray& MovedChunks)> SplitCallback; + + void Initialize(const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + const std::vector<BlockStoreLocation>& KnownLocations); + void Close(); + + BlockStoreLocation WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment); + + Ref<BlockStoreFile> GetChunkBlock(const BlockStoreLocation& Location); + void Flush(); + + ReclaimSnapshotState GetReclaimSnapshotState(); + void ReclaimSpace( + const ReclaimSnapshotState& Snapshot, + const std::vector<BlockStoreLocation>& ChunkLocations, + const ChunkIndexArray& KeepChunkIndexes, + uint64_t PayloadAlignment, + bool DryRun, + const ReclaimCallback& ChangeCallback = [](const MovedChunksArray&, const ChunkIndexArray&) {}, + const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; }); + + void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, + IterateChunksSmallSizeCallback SmallSizeCallback, + IterateChunksLargeSizeCallback LargeSizeCallback); + + static bool Split(const std::vector<BlockStoreLocation>& ChunkLocations, + const std::filesystem::path& SourceBlockFilePath, + const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + size_t PayloadAlignment, + bool CleanSource, + const SplitCallback& Callback); + + static const char* GetBlockFileExtension(); + static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex); + +private: + std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks; + + RwLock m_InsertLock; // used to serialize inserts + Ref<BlockStoreFile> m_WriteBlock; + std::uint64_t m_CurrentInsertOffset = 0; + std::atomic_uint32_t m_WriteBlockIndex{}; + + uint64_t m_MaxBlockSize = 1u << 28; + uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1; + std::filesystem::path m_BlocksBasePath; +}; + void blockstore_forcelink(); } // namespace zen diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index bc8dee9a3..6268588ec 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -53,7 +53,7 @@ public: void ContributeCids(std::span<const IoHash> Cid); void ContributeCas(std::span<const IoHash> Hash); - void ContributeCacheKeys(const std::string& Bucket, std::vector<IoHash> ValidKeys, std::vector<IoHash> ExpiredKeys); + void ContributeCacheKeys(const std::string& Bucket, std::vector<IoHash>&& ExpiredKeys); void IterateCids(std::function<void(const IoHash&)> Callback); @@ -64,7 +64,6 @@ public: void DeletedCas(std::span<const IoHash> Cas); CasChunkSet& DeletedCas(); - std::span<const IoHash> ValidCacheKeys(const std::string& Bucket) const; std::span<const IoHash> ExpiredCacheKeys(const std::string& Bucket) const; bool IsDeletionMode() const; |