// Copyright Epic Games, Inc. All Rights Reserved. #include "compactcas.h" #include "cas.h" #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include # include # include ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END #endif ////////////////////////////////////////////////////////////////////////// namespace zen { struct CasDiskIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t CurrentVersion = 1; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t PayloadAlignment = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; static_assert(sizeof(CasDiskIndexHeader) == 32); namespace { const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return RootPath / ContainerBaseName; } std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); } std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); } std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / "blocks"; } 
bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); return false; } if (Entry.Flags & CasDiskIndexEntry::kTombstone) { return true; } if (Entry.ContentType != ZenContentType::kUnknownContentType) { OutReason = fmt::format("Invalid content type {} for entry {}", static_cast(Entry.ContentType), Entry.Key.ToHexString()); return false; } if (const uint64_t Size = Entry.Location.GetSize(); Size == 0) { OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); return false; } return true; } } // namespace ////////////////////////////////////////////////////////////////////////// static const float IndexMinLoadFactor = 0.2f; static const float IndexMaxLoadFactor = 0.7f; CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("containercas")), m_Gc(Gc) { m_LocationMap.min_load_factor(IndexMinLoadFactor); m_LocationMap.max_load_factor(IndexMaxLoadFactor); m_Gc.AddGcStorage(this); m_Gc.AddGcReferenceStore(*this); } CasContainerStrategy::~CasContainerStrategy() { m_Gc.RemoveGcReferenceStore(*this); m_Gc.RemoveGcStorage(this); } void CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory, const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint32_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); ZEN_ASSERT(MaxBlockSize > 0); m_RootDirectory = RootDirectory; m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; m_MaxBlockSize = MaxBlockSize; m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName); OpenContainer(IsNewStore); m_IsInitialized = true; } CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* 
ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { ZEN_TRACE_CPU("CasContainer::InsertChunk"); { RwLock::SharedLockScope _(m_LocationMapLock); if (m_LocationMap.contains(ChunkHash)) { return CasStore::InsertResult{.New = false}; } } // We can end up in a situation that InsertChunk writes the same chunk data in // different locations. // We release the insert lock once we have the correct WriteBlock ready and we know // where to write the data. If a new InsertChunk request for the same chunk hash/data // comes in before we update m_LocationMap below we will have a race. // The outcome of that is that we will write the chunk data in more than one location // but the chunk hash will only point to one of the chunks. // We will in that case waste space until the next GC operation. // // This should be a rare occasion and the current flow reduces the time we block for // reads, insert and GC. m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) { BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; m_CasLog.Append(IndexEntry); { RwLock::ExclusiveLockScope _(m_LocationMapLock); m_LocationMap.emplace(ChunkHash, m_Locations.size()); m_Locations.push_back(DiskLocation); } }); return CasStore::InsertResult{.New = true}; } CasStore::InsertResult CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { #if !ZEN_WITH_TESTS ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); #endif return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("CasContainer::FindChunk"); RwLock::SharedLockScope _(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt == m_LocationMap.end()) { return IoBuffer(); } const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); IoBuffer 
Chunk = m_BlockStore.TryGetChunk(Location); return Chunk; } bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); return m_LocationMap.contains(ChunkHash); } void CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) { // This implementation is good enough for relatively small // chunk sets (in terms of chunk identifiers), but would // benefit from a better implementation which removes // items incrementally for large sets, especially when // we're likely to already have a large proportion of the // chunks in the set InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void CasContainerStrategy::Flush() { ZEN_TRACE_CPU("CasContainer::Flush"); m_BlockStore.Flush(/*ForceNewBlock*/ false); m_CasLog.Flush(); MakeIndexSnapshot(); } void CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) { ZEN_TRACE_CPU("CasContainer::ScrubStorage"); if (Ctx.IsSkipCas()) { ZEN_INFO("SKIPPED scrubbing: '{}'", m_BlocksBasePath); return; } ZEN_INFO("scrubbing '{}'", m_BlocksBasePath); std::vector BadKeys; uint64_t ChunkCount{0}, ChunkBytes{0}; std::vector ChunkLocations; std::vector ChunkIndexToChunkHash; try { RwLock::SharedLockScope _(m_LocationMapLock); uint64_t TotalChunkCount = m_LocationMap.size(); ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); { for (const auto& Entry : m_LocationMap) { const IoHash& ChunkHash = Entry.first; const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second]; BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); ChunkLocations.push_back(Location); ChunkIndexToChunkHash.push_back(ChunkHash); } } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { ++ChunkCount; ChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { // ChunkLocation out of range of stored blocks BadKeys.push_back(Hash); return; } IoBuffer Buffer(IoBuffer::Wrap, 
Data, Size); IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) { if (RawHash == Hash) { // TODO: this should also hash the (decompressed) contents return; } } BadKeys.push_back(Hash); }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { Ctx.ThrowIfDeadlineExpired(); ++ChunkCount; ChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); IoHash RawHash; uint64_t RawSize; // TODO: Add API to verify compressed buffer without having to memory-map the whole file if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) { if (RawHash == Hash) { // TODO: this should also hash the (decompressed) contents return; } } BadKeys.push_back(Hash); }; m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); } catch (const ScrubDeadlineExpiredException&) { ZEN_INFO("Scrubbing deadline expired, operation incomplete"); } Ctx.ReportScrubbed(ChunkCount, ChunkBytes); if (!BadKeys.empty()) { ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); if (Ctx.RunRecovery()) { // Deal with bad chunks by removing them from our lookup map std::vector LogEntries; LogEntries.reserve(BadKeys.size()); { RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock); for (const IoHash& ChunkHash : BadKeys) { const auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt == m_LocationMap.end()) { // Might have been GC'd continue; } LogEntries.push_back( {.Key = KeyIt->first, .Location = m_Locations[KeyIt->second], .Flags = CasDiskIndexEntry::kTombstone}); m_LocationMap.erase(KeyIt); } // Clean up m_Locations vectors CompactIndex(IndexLock); } m_CasLog.Append(LogEntries); } } // Let whomever it concerns know about the bad chunks. 
This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do if (!BadKeys.empty()) { Ctx.ReportBadCidChunks(BadKeys); } ZEN_INFO("scrubbed {} chunks ({}) in '{}'", ChunkCount, NiceBytes(ChunkBytes), m_RootDirectory / m_ContainerBaseName); } void CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("CasContainer::CollectGarbage"); if (GcCtx.SkipCid()) { return; } // It collects all the blocks that we want to delete chunks from. For each such // block we keep a list of chunks to retain and a list of chunks to delete. // // If there is a block that we are currently writing to, that block is omitted // from the garbage collection. // // Next it will iterate over all blocks that we want to remove chunks from. // If the block is empty after removal of chunks we mark the block as pending // delete - we want to delete it as soon as there are no IoBuffers using the // block file. // Once complete we update the m_LocationMap by removing the chunks. // // If the block is non-empty we write out the chunks we want to keep to a new // block file (creating new block files as needed). // // We update the index as we complete each new block file. This makes it possible // to break the GC if we want to limit time for execution. // // GC can very parallell to regular operation - it will block while taking // a snapshot of the current m_LocationMap state and while moving blocks it will // do a blocking operation and update the m_LocationMap after each new block is // written and figuring out the path to the next new block. 
ZEN_DEBUG("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; LocationMap_t LocationMap; std::vector Locations; BlockStore::ReclaimSnapshotState BlockStoreState; { ZEN_TRACE_CPU("CasContainer::CollectGarbage::State"); RwLock::SharedLockScope ___(m_LocationMapLock); Stopwatch Timer; const auto ____ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); LocationMap = m_LocationMap; Locations = m_Locations; BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); } uint64_t TotalChunkCount = LocationMap.size(); std::vector TotalChunkHashes; TotalChunkHashes.reserve(TotalChunkCount); for (const auto& Entry : LocationMap) { TotalChunkHashes.push_back(Entry.first); } std::vector ChunkLocations; BlockStore::ChunkIndexArray KeepChunkIndexes; std::vector ChunkIndexToChunkHash; ChunkLocations.reserve(TotalChunkCount); KeepChunkIndexes.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); { ZEN_TRACE_CPU("CasContainer::CollectGarbage::Filter"); GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { auto KeyIt = LocationMap.find(ChunkHash); const BlockStoreDiskLocation& DiskLocation = Locations[KeyIt->second]; BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); ChunkIndexToChunkHash.push_back(ChunkHash); if (Keep) { KeepChunkIndexes.push_back(ChunkIndex); } }); } const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); return; } std::vector DeletedChunks; m_BlockStore.ReclaimSpace( 
BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); { RwLock::ExclusiveLockScope __(m_LocationMapLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); for (const auto& Entry : MovedChunks) { size_t ChunkIndex = Entry.first; const BlockStoreLocation& NewLocation = Entry.second; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t LocationIndex = m_LocationMap[ChunkHash]; BlockStoreDiskLocation& Location = m_Locations[LocationIndex]; if (Locations[LocationMap[ChunkHash]] != Location) { // Entry has been updated while GC was running, ignore the move continue; } Location = {NewLocation, m_PayloadAlignment}; LogEntries.push_back({.Key = ChunkHash, .Location = Location}); } for (const size_t ChunkIndex : RemovedChunks) { const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t LocationIndex = m_LocationMap[ChunkHash]; const BlockStoreDiskLocation& Location = Locations[LocationIndex]; if (Locations[LocationMap[ChunkHash]] != Location) { // Entry has been updated while GC was running, ignore the delete continue; } LogEntries.push_back({.Key = ChunkHash, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone}); m_LocationMap.erase(ChunkHash); DeletedChunks.push_back(ChunkHash); } } m_CasLog.Append(LogEntries); m_CasLog.Flush(); }, [&GcCtx]() { return GcCtx.ClaimGCReserve(); }); if (!DeletedChunks.empty()) { // Clean up m_Locations vectors RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock); CompactIndex(IndexLock); } GcCtx.AddDeletedCids(DeletedChunks); } class CasContainerStoreCompactor : public GcStoreCompactor { public: 
CasContainerStoreCompactor(CasContainerStrategy& Owner) : m_CasContainerStrategy(Owner) {} virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function& ClaimDiskReserveCallback) override { ZEN_TRACE_CPU("CasContainer::CompactStore"); Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: compactcas [COMPACT] '{}': RemovedDisk: {} in {}", m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName, NiceBytes(Stats.RemovedDisk), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); if (Ctx.Settings.CollectSmallObjects) { BlockStore::BlockUsageMap BlockUsage; { RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock); if (Ctx.IsCancelledFlag.load()) { return; } for (const auto& Entry : m_CasContainerStrategy.m_LocationMap) { size_t Index = Entry.second; const BlockStoreDiskLocation& Loc = m_CasContainerStrategy.m_Locations[Index]; uint32_t BlockIndex = Loc.GetBlockIndex(); uint64_t ChunkSize = RoundUp(Loc.GetSize(), m_CasContainerStrategy.m_PayloadAlignment); if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) { It->second.EntryCount++; It->second.DiskUsage += ChunkSize; } else { BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); } } } { BlockStoreCompactState BlockCompactState; std::vector BlockCompactStateKeys; BlockStore::BlockEntryCountMap BlocksToCompact = m_CasContainerStrategy.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); BlockCompactState.IncludeBlocks(BlocksToCompact); if (BlocksToCompact.size() > 0) { { RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock); for (const auto& Entry : m_CasContainerStrategy.m_LocationMap) { size_t Index = Entry.second; const BlockStoreDiskLocation& Loc = m_CasContainerStrategy.m_Locations[Index]; if 
(!BlockCompactState.AddKeepLocation(Loc.Get(m_CasContainerStrategy.m_PayloadAlignment))) { continue; } BlockCompactStateKeys.push_back(Entry.first); } } if (Ctx.Settings.IsDeleteMode) { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: compactcas [COMPACT] '{}': compacting {} blocks", m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName, BlocksToCompact.size()); } m_CasContainerStrategy.m_BlockStore.CompactBlocks( BlockCompactState, m_CasContainerStrategy.m_PayloadAlignment, [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { std::vector MovedEntries; RwLock::ExclusiveLockScope _(m_CasContainerStrategy.m_LocationMapLock); for (const std::pair& Moved : MovedArray) { size_t ChunkIndex = Moved.first; const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key); It != m_CasContainerStrategy.m_LocationMap.end()) { BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second]; const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation) { // Someone has moved our chunk so lets just skip the new location we were provided, it will be // GC:d at a later time continue; } const BlockStoreLocation& NewLocation = Moved.second; Location = BlockStoreDiskLocation(NewLocation, m_CasContainerStrategy.m_PayloadAlignment); MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location}); } } m_CasContainerStrategy.m_CasLog.Append(MovedEntries); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) { return false; } return true; }, ClaimDiskReserveCallback, fmt::format("GCV2: compactcas [COMPACT] '{}': ", m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName)); } else { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: compactcas [COMPACT] '{}': skipped compacting of {} eligible blocks", 
m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName, BlocksToCompact.size()); } } } } } } CasContainerStrategy& m_CasContainerStrategy; }; class CasContainerReferencePruner : public GcReferencePruner { public: CasContainerReferencePruner(CasContainerStrategy& Owner, std::vector&& Cids) : m_CasContainerStrategy(Owner) , m_Cids(std::move(Cids)) { } virtual std::string GetGcName(GcCtx& Ctx) override { return m_CasContainerStrategy.GetGcName(Ctx); } virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, GcStats& Stats, const GetUnusedReferencesFunc& GetUnusedReferences) override { ZEN_TRACE_CPU("CasContainer::RemoveUnreferencedData"); Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: compactcas [PRUNE] '{}': Checked: {}, Deleted: {}, FreedMemory: {} in {}", m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName, Stats.CheckedCount, Stats.DeletedCount, NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::vector UnusedCids = GetUnusedReferences(m_Cids); Stats.CheckedCount = m_Cids.size(); Stats.FoundCount = UnusedCids.size(); if (UnusedCids.empty()) { // Nothing to collect return nullptr; } if (!Ctx.Settings.CollectSmallObjects) { return nullptr; } if (Ctx.Settings.IsDeleteMode) { std::vector ExpiredEntries; ExpiredEntries.reserve(UnusedCids.size()); { RwLock::ExclusiveLockScope __(m_CasContainerStrategy.m_LocationMapLock); if (Ctx.IsCancelledFlag.load()) { return nullptr; } for (const IoHash& Cid : UnusedCids) { if (auto It = m_CasContainerStrategy.m_LocationMap.find(Cid); It != m_CasContainerStrategy.m_LocationMap.end()) { ExpiredEntries.push_back({.Key = Cid, .Location = m_CasContainerStrategy.m_Locations[It->second], .Flags = CasDiskIndexEntry::kTombstone}); } } if (!ExpiredEntries.empty()) { for (const CasDiskIndexEntry& Entry : ExpiredEntries) { 
m_CasContainerStrategy.m_LocationMap.erase(Entry.Key); Stats.DeletedCount++; } m_CasContainerStrategy.m_CasLog.Append(ExpiredEntries); m_CasContainerStrategy.m_CasLog.Flush(); } } } return new CasContainerStoreCompactor(m_CasContainerStrategy); } private: CasContainerStrategy& m_CasContainerStrategy; std::vector m_Cids; }; std::string CasContainerStrategy::GetGcName(GcCtx&) { return fmt::format("compactcas: '{}'", (m_RootDirectory / m_ContainerBaseName).string()); } GcReferencePruner* CasContainerStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&) { ZEN_TRACE_CPU("CasContainer::CreateReferencePruner"); Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: compactcas [CREATE PRUNER] '{}' in {}", m_RootDirectory / m_ContainerBaseName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::vector CidsToCheck; { RwLock::SharedLockScope __(m_LocationMapLock); if (m_LocationMap.empty()) { return {}; } if (Ctx.IsCancelledFlag.load()) { return nullptr; } CidsToCheck.reserve(m_LocationMap.size()); for (const auto& It : m_LocationMap) { CidsToCheck.push_back(It.first); } } return new CasContainerReferencePruner(*this, std::move(CidsToCheck)); } void CasContainerStrategy::CompactIndex(RwLock::ExclusiveLockScope&) { ZEN_TRACE_CPU("CasContainer::CompactIndex"); size_t EntryCount = m_LocationMap.size(); LocationMap_t LocationMap; std::vector Locations; Locations.reserve(EntryCount); LocationMap.reserve(EntryCount); for (auto It : m_LocationMap) { size_t EntryIndex = Locations.size(); Locations.push_back(m_Locations[It.second]); LocationMap.insert({It.first, EntryIndex}); } m_LocationMap.swap(LocationMap); m_Locations.swap(Locations); } GcStorageSize CasContainerStrategy::StorageSize() const { return {.DiskSize = m_BlockStore.TotalSize()}; } void CasContainerStrategy::MakeIndexSnapshot() { ZEN_TRACE_CPU("CasContainer::MakeIndexSnapshot"); uint64_t LogCount = m_CasLog.GetLogCount(); if (m_LogFlushPosition == LogCount) { 
return; } ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_RootDirectory / m_ContainerBaseName, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); // Move index away, we keep it if something goes wrong if (fs::is_regular_file(TempIndexPath)) { std::error_code Ec; if (!fs::remove(TempIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", TempIndexPath, Ec.message()); return; } } try { if (fs::is_regular_file(IndexPath)) { fs::rename(IndexPath, TempIndexPath); } // Write the current state of the location map to a new index state std::vector Entries; uint64_t IndexLogPosition = 0; { RwLock::SharedLockScope ___(m_LocationMapLock); IndexLogPosition = m_CasLog.GetLogCount(); Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; for (auto& Entry : m_LocationMap) { CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = m_Locations[Entry.second]; } } TemporaryFile ObjectIndexFile; std::error_code Ec; ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath)); } CasDiskIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = IndexLogPosition, .PayloadAlignment = gsl::narrow(m_PayloadAlignment)}; Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexHeader), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); ObjectIndexFile.Flush(); 
ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath)); } EntryCount = Entries.size(); m_LogFlushPosition = IndexLogPosition; } catch (const std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); // Restore any previous snapshot if (fs::is_regular_file(TempIndexPath)) { std::error_code Ec; fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless fs::rename(TempIndexPath, IndexPath, Ec); if (Ec) { ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", TempIndexPath, Ec.message()); } } } if (fs::is_regular_file(TempIndexPath)) { std::error_code Ec; if (!fs::remove(TempIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempIndexPath, Ec.message()); } } } uint64_t CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_TRACE_CPU("CasContainer::ReadIndexFile"); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CasDiskIndexHeader)) { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); CasDiskIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && (Header.EntryCount <= ExpectedEntryCount)) { m_PayloadAlignment = Header.PayloadAlignment; m_Locations.reserve(ExpectedEntryCount); 
m_LocationMap.reserve(ExpectedEntryCount); std::vector Entries; Entries.resize(128 * 1024 / sizeof(CasDiskIndexEntry)); uint64_t RemainingEntries = Header.EntryCount; uint64_t ReadOffset = sizeof(CasDiskIndexHeader); do { const uint64_t NumToRead = Min(RemainingEntries, Entries.size()); Entries.resize(NumToRead); ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), ReadOffset); std::string InvalidEntryReason; for (const CasDiskIndexEntry& Entry : Entries) { if (!ValidateEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } m_LocationMap[Entry.Key] = m_Locations.size(); m_Locations.push_back(Entry.Location); ++EntryCount; } RemainingEntries -= NumToRead; ReadOffset += NumToRead * sizeof(CasDiskIndexEntry); } while (RemainingEntries); OutVersion = CasDiskIndexHeader::CurrentVersion; return Header.LogPosition; } else { ZEN_WARN("skipping invalid index file '{}'", IndexPath); } } return 0; } uint64_t CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("CasContainer::ReadLog"); if (!TCasLogFile::IsValid(LogPath)) { ZEN_WARN("removing invalid cas log at '{}'", LogPath); std::filesystem::remove(LogPath); return 0; } size_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing {} entries in {}", m_RootDirectory / m_ContainerBaseName, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (CasLog.Initialize()) { uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; CasLog.Replay( [&](const CasDiskIndexEntry& Record) { LogEntryCount++; std::string InvalidEntryReason; if (Record.Flags & 
CasDiskIndexEntry::kTombstone) { m_LocationMap.erase(Record.Key); return; } if (!ValidateEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); return; } m_LocationMap[Record.Key] = m_Locations.size(); m_Locations.push_back(Record.Location); }, SkipEntryCount); return LogEntryCount; } return 0; } void CasContainerStrategy::OpenContainer(bool IsNewStore) { ZEN_TRACE_CPU("CasContainer::OpenContainer"); // Add .running file and delete on clean on close to detect bad termination m_LocationMap.clear(); m_Locations.clear(); std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName); if (IsNewStore) { std::filesystem::remove_all(BasePath); } CreateDirectories(BasePath); m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); if (std::filesystem::is_regular_file(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); std::filesystem::remove(IndexPath); } } uint64_t LogEntryCount = 0; if (std::filesystem::is_regular_file(LogPath)) { if (TCasLogFile::IsValid(LogPath)) { LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); } else { ZEN_WARN("removing invalid cas log at '{}'", LogPath); std::filesystem::remove(LogPath); } } m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_LocationMap) { const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second]; BlockStoreLocation BlockLocation = DiskLocation.Get(m_PayloadAlignment); KnownBlocks.Add(BlockLocation.BlockIndex); } m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); if (IsNewStore || (LogEntryCount > 0)) { 
MakeIndexSnapshot(); } // TODO: should validate integrity of container files here } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS TEST_CASE("compactcas.hex") { uint32_t Value; std::string HexString; CHECK(!ParseHexNumber("", Value)); char Hex[9]; ToHexNumber(0u, Hex); HexString = std::string(Hex); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0u); ToHexNumber(std::numeric_limits::max(), Hex); HexString = std::string(Hex); CHECK(HexString == "ffffffff"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == std::numeric_limits::max()); ToHexNumber(0xadf14711u, Hex); HexString = std::string(Hex); CHECK(HexString == "adf14711"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0xadf14711u); ToHexNumber(0x80000000u, Hex); HexString = std::string(Hex); CHECK(HexString == "80000000"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0x80000000u); ToHexNumber(0x718293a4u, Hex); HexString = std::string(Hex); CHECK(HexString == "718293a4"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0x718293a4u); } TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; const int kIterationCount = 1000; std::vector Keys(kIterationCount); { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { CbObjectWriter Cbo; Cbo << "id" << i; CbObject Obj = Cbo.Save(); IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); const IoHash Hash = HashBuffer(ObjBuffer); Cas.InsertChunk(ObjBuffer, Hash); Keys[i] = Hash; } for (int i = 0; i < kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } } // Validate that we can still read the inserted data after closing // the original cas store { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); for (int i = 0; i < 
kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } } } TEST_CASE("compactcas.compact.totalsize") { std::random_device rd; std::mt19937 g(rd()); // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1024; const int32_t kChunkCount = 16; { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { IoBuffer Chunk = CreateRandomBlob(kChunkSize); const IoHash Hash = HashBuffer(Chunk); CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); } const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } // Re-open again, this time we should have a snapshot { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } } } TEST_CASE("compactcas.gc.basic") { ScopedTemporaryDirectory TempDir; GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); IoBuffer Chunk = CreateRandomBlob(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); Cas.Flush(); GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); GcCtx.CollectSmallObjects(true); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHash)); } TEST_CASE("compactcas.gc.removefile") { ScopedTemporaryDirectory TempDir; IoBuffer Chunk = 
CreateRandomBlob(128);
    IoHash ChunkHash = IoHash::HashBuffer(Chunk);

    // First session: create the store, insert the chunk twice (the second insert
    // must not be reported as new) and flush.
    {
        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);

        const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
        CHECK(InsertResult.New);
        const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash);
        CHECK(!InsertResultDup.New);
        Cas.Flush();
    }

    // Second session: re-open the existing store and collect garbage without
    // retaining anything; the chunk must be gone afterwards.
    GcManager Gc;
    CasContainerStrategy Cas(Gc);
    Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false);

    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
    GcCtx.CollectSmallObjects(true);

    Cas.CollectGarbage(GcCtx);

    CHECK(!Cas.HaveChunk(ChunkHash));
}

// Inserts nine chunks of different sizes and then runs five garbage collection
// rounds, each retaining a different subset. After every round the test checks
// which chunks are present, validates the content of the retained ones and
// re-inserts some of the removed chunks before the next round.
TEST_CASE("compactcas.gc.compact")
{
    {
        ScopedTemporaryDirectory TempDir;

        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true);

        uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
        // NOTE(review): the element type appears to be missing from the std::vector
        // declarations in this test (usage suggests IoBuffer for Chunks and IoHash
        // for ChunkHashes / KeepChunks) - confirm against the original file.
        std::vector Chunks;
        Chunks.reserve(9);
        for (uint64_t Size : ChunkSizes)
        {
            Chunks.push_back(CreateRandomBlob(Size));
        }

        std::vector ChunkHashes;
        ChunkHashes.reserve(9);
        for (const IoBuffer& Chunk : Chunks)
        {
            ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
        }

        // All nine inserts must be reported as new.
        CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New);
        CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New);
        CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New);
        CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New);
        CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New);
        CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New);
        CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New);
        CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New);
        CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New);

        CHECK(Cas.HaveChunk(ChunkHashes[0]));
        CHECK(Cas.HaveChunk(ChunkHashes[1]));
        CHECK(Cas.HaveChunk(ChunkHashes[2]));
        CHECK(Cas.HaveChunk(ChunkHashes[3]));
        CHECK(Cas.HaveChunk(ChunkHashes[4]));
        CHECK(Cas.HaveChunk(ChunkHashes[5]));
        CHECK(Cas.HaveChunk(ChunkHashes[6]));
        CHECK(Cas.HaveChunk(ChunkHashes[7]));
        CHECK(Cas.HaveChunk(ChunkHashes[8]));

        // Fetches chunk `Index` and checks that it exists and that its content
        // hashes to the recorded key. The formatted comparison only runs on a
        // mismatch, so a failure reports both hashes as text.
        auto ValidateChunkExists = [&](size_t Index) {
            IoBuffer Chunk = Cas.FindChunk(ChunkHashes[Index]);
            bool Exists = !!Chunk;
            CHECK(Exists);
            IoHash Hash = IoHash::HashBuffer(Chunk);
            if (ChunkHashes[Index] != Hash)
            {
                CHECK(fmt::format("{}", ChunkHashes[Index]) == fmt::format("{}", Hash));
            }
        };

        // Keep first and last
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);

            std::vector KeepChunks;
            KeepChunks.push_back(ChunkHashes[0]);
            KeepChunks.push_back(ChunkHashes[8]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(Cas.HaveChunk(ChunkHashes[0]));
            CHECK(!Cas.HaveChunk(ChunkHashes[1]));
            CHECK(!Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(!Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(!Cas.HaveChunk(ChunkHashes[6]));
            CHECK(!Cas.HaveChunk(ChunkHashes[7]));
            CHECK(Cas.HaveChunk(ChunkHashes[8]));

            ValidateChunkExists(0);
            ValidateChunkExists(8);

            // Put the removed chunks back for the next round.
            Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
            Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
            Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
            Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
        }

        // Keep last
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector KeepChunks;
            KeepChunks.push_back(ChunkHashes[8]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(!Cas.HaveChunk(ChunkHashes[0]));
            CHECK(!Cas.HaveChunk(ChunkHashes[1]));
            CHECK(!Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(!Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(!Cas.HaveChunk(ChunkHashes[6]));
            CHECK(!Cas.HaveChunk(ChunkHashes[7]));
            CHECK(Cas.HaveChunk(ChunkHashes[8]));

            ValidateChunkExists(8);

            // Chunk 0 is not re-inserted here; the next round expects it to be absent.
            Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
            Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
            Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
            Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
        }

        // Keep mixed
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector KeepChunks;
            KeepChunks.push_back(ChunkHashes[1]);
            KeepChunks.push_back(ChunkHashes[4]);
            KeepChunks.push_back(ChunkHashes[7]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(!Cas.HaveChunk(ChunkHashes[0]));
            CHECK(Cas.HaveChunk(ChunkHashes[1]));
            CHECK(!Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(!Cas.HaveChunk(ChunkHashes[6]));
            CHECK(Cas.HaveChunk(ChunkHashes[7]));
            CHECK(!Cas.HaveChunk(ChunkHashes[8]));

            ValidateChunkExists(1);
            ValidateChunkExists(4);
            ValidateChunkExists(7);

            // Restore every chunk that is not currently stored.
            Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
            Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
            Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
            Cas.InsertChunk(Chunks[8], ChunkHashes[8]);
        }

        // Keep multiple at end
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector KeepChunks;
            KeepChunks.push_back(ChunkHashes[6]);
            KeepChunks.push_back(ChunkHashes[7]);
            KeepChunks.push_back(ChunkHashes[8]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(!Cas.HaveChunk(ChunkHashes[0]));
            CHECK(!Cas.HaveChunk(ChunkHashes[1]));
            CHECK(!Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(!Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(Cas.HaveChunk(ChunkHashes[6]));
            CHECK(Cas.HaveChunk(ChunkHashes[7]));
            CHECK(Cas.HaveChunk(ChunkHashes[8]));

            ValidateChunkExists(6);
            ValidateChunkExists(7);
            ValidateChunkExists(8);

            Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
            Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
            Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
        }

        // Keep every other
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector KeepChunks;
            KeepChunks.push_back(ChunkHashes[0]);
            KeepChunks.push_back(ChunkHashes[2]);
            KeepChunks.push_back(ChunkHashes[4]);
            KeepChunks.push_back(ChunkHashes[6]);
            KeepChunks.push_back(ChunkHashes[8]);
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            CHECK(Cas.HaveChunk(ChunkHashes[0]));
            CHECK(!Cas.HaveChunk(ChunkHashes[1]));
            CHECK(Cas.HaveChunk(ChunkHashes[2]));
            CHECK(!Cas.HaveChunk(ChunkHashes[3]));
            CHECK(Cas.HaveChunk(ChunkHashes[4]));
            CHECK(!Cas.HaveChunk(ChunkHashes[5]));
            CHECK(Cas.HaveChunk(ChunkHashes[6]));
            CHECK(!Cas.HaveChunk(ChunkHashes[7]));
            CHECK(Cas.HaveChunk(ChunkHashes[8]));

            ValidateChunkExists(0);
            ValidateChunkExists(2);
            ValidateChunkExists(4);
            ValidateChunkExists(6);
            ValidateChunkExists(8);

            Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
            Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
            Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
            Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
        }

        // Verify that we nicely appended blocks even after all GC operations
        ValidateChunkExists(0);
        ValidateChunkExists(1);
        ValidateChunkExists(2);
        ValidateChunkExists(3);
        ValidateChunkExists(4);
        ValidateChunkExists(5);
        ValidateChunkExists(6);
        ValidateChunkExists(7);
        ValidateChunkExists(8);
    }
}

TEST_CASE("compactcas.gc.deleteblockonopen")
{
    ScopedTemporaryDirectory TempDir;

    uint64_t
ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
    // NOTE(review): the element type appears to be missing from the std::vector
    // declarations in this test (usage suggests IoBuffer for Chunks and IoHash for
    // ChunkHashes / KeepChunks) - confirm against the original file.
    std::vector Chunks;
    Chunks.reserve(20);
    for (uint64_t Size : ChunkSizes)
    {
        Chunks.push_back(CreateRandomBlob(Size));
    }

    std::vector ChunkHashes;
    ChunkHashes.reserve(20);
    for (const IoBuffer& Chunk : Chunks)
    {
        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
    }

    // First session: insert all 20 chunks, then collect garbage while retaining
    // only the even-indexed ones.
    {
        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);

        for (size_t i = 0; i < 20; i++)
        {
            CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
        }

        // GC every other block
        {
            GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
            GcCtx.CollectSmallObjects(true);
            std::vector KeepChunks;
            for (size_t i = 0; i < 20; i += 2)
            {
                KeepChunks.push_back(ChunkHashes[i]);
            }
            GcCtx.AddRetainedCids(KeepChunks);

            Cas.Flush();
            Cas.CollectGarbage(GcCtx);

            // Even index: still present with intact content. Odd index: removed.
            for (size_t i = 0; i < 20; i += 2)
            {
                CHECK(Cas.HaveChunk(ChunkHashes[i]));
                CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
                CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
            }
        }
    }
    // Second session: the same expectations must hold after opening the existing
    // store again.
    {
        // Re-open
        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "test", 1024, 16, false);

        for (size_t i = 0; i < 20; i += 2)
        {
            CHECK(Cas.HaveChunk(ChunkHashes[i]));
            CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
            CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
        }
    }
}

// Fetches one chunk into an IoBuffer, garbage collects every chunk in the store
// and then checks that the previously fetched buffer still hashes to its key.
TEST_CASE("compactcas.gc.handleopeniobuffer")
{
    ScopedTemporaryDirectory TempDir;

    uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
    // NOTE(review): element types appear to be missing from the two std::vector
    // declarations below (usage suggests IoBuffer and IoHash) - confirm.
    std::vector Chunks;
    Chunks.reserve(20);
    for (const uint64_t& Size : ChunkSizes)
    {
        Chunks.push_back(CreateRandomBlob(Size));
    }

    std::vector ChunkHashes;
    ChunkHashes.reserve(20);
    for (const IoBuffer& Chunk : Chunks)
    {
        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
    }

    GcManager Gc;
    CasContainerStrategy Cas(Gc);
    Cas.Initialize(TempDir.Path(), "test",
1024, 16, true);

    for (size_t i = 0; i < 20; i++)
    {
        CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
    }

    // Hold on to one chunk's buffer across the garbage collection below.
    IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]);
    Cas.Flush();

    // GC everything
    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
    GcCtx.CollectSmallObjects(true);
    Cas.CollectGarbage(GcCtx);

    for (size_t i = 0; i < 20; i++)
    {
        CHECK(!Cas.HaveChunk(ChunkHashes[i]));
    }

    // The buffer obtained before the collection must still have the original content.
    CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk));
}

// Exercises the store from a 4-thread worker pool. The test is instantiated for
// FalseType and TrueType; GCV2::Enabled selects which garbage collection path
// DoGC uses.
//
// Phases:
//   1. insert kChunkCount unique random chunks concurrently and check the
//      reported disk size against the summed chunk sizes
//   2. read every chunk back concurrently and verify its hash
//   3. insert a second batch while concurrently reading the first batch and
//      repeatedly running garbage collection on the test thread
//   4. verify concurrently that every chunk still tracked in GcChunkHashes is
//      present with intact content
TEST_CASE_TEMPLATE("compactcas.threadedinsert", GCV2, FalseType, TrueType)
{
    // for (uint32_t i = 0; i < 100; ++i)
    {
        ScopedTemporaryDirectory TempDir;

        const uint64_t kChunkSize = 1048;
        const int32_t kChunkCount = 4096;
        uint64_t ExpectedSize = 0;

        // NOTE(review): template argument lists appear to be missing from the
        // std::unordered_map, std::unordered_set, std::vector, std::atomic and
        // std::span declarations in this test (keys are IoHash, mapped values
        // IoBuffer) - confirm against the original file.
        std::unordered_map Chunks;
        Chunks.reserve(kChunkCount);

        for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
        {
            // Retry until the random blob has a hash that is not in the map yet,
            // so exactly kChunkCount distinct chunks are produced.
            while (true)
            {
                IoBuffer Chunk = CreateRandomBlob(kChunkSize);
                IoHash Hash = HashBuffer(Chunk);
                if (Chunks.contains(Hash))
                {
                    continue;
                }
                Chunks[Hash] = Chunk;
                ExpectedSize += Chunk.Size();
                break;
            }
        }

        // Number of scheduled work items that have finished in the current phase.
        std::atomic WorkCompleted = 0;
        WorkerThreadPool ThreadPool(4);
        GcManager Gc;
        CasContainerStrategy Cas(Gc);
        Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);
        // Phase 1: concurrent inserts; buffer and hash are copied into each work item.
        {
            for (const auto& Chunk : Chunks)
            {
                const IoHash& Hash = Chunk.first;
                const IoBuffer& Buffer = Chunk.second;
                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
                    CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
                    ZEN_ASSERT(InsertResult.New);
                    WorkCompleted.fetch_add(1);
                });
            }
            // Poll until every scheduled insert has reported completion.
            while (WorkCompleted < Chunks.size())
            {
                Sleep(1);
            }
        }

        WorkCompleted = 0;
        // The disk size is only bounded, not matched exactly: at least the summed
        // payload and at most 32768 bytes more (32768 is also the size value passed
        // to Initialize above).
        const uint64_t TotalSize = Cas.StorageSize().DiskSize;
        CHECK_LE(ExpectedSize, TotalSize);
        CHECK_GE(ExpectedSize + 32768, TotalSize);

        // Phase 2: concurrent reads. Each work item refers to its entry in Chunks,
        // which is not modified while the work is outstanding.
        {
            for (const auto& Chunk : Chunks)
            {
                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
                    IoHash ChunkHash = Chunk.first;
                    IoBuffer Buffer = Cas.FindChunk(ChunkHash);
                    IoHash Hash = IoHash::HashBuffer(Buffer);
                    CHECK(ChunkHash == Hash);
                    WorkCompleted.fetch_add(1);
                });
            }
            while (WorkCompleted < Chunks.size())
            {
                Sleep(1);
            }
        }

        // Hashes expected to be in the store; entries are erased when garbage
        // collection reports them as deleted.
        std::unordered_set GcChunkHashes;
        GcChunkHashes.reserve(Chunks.size());
        for (const auto& Chunk : Chunks)
        {
            GcChunkHashes.insert(Chunk.first);
        }
        // Phase 3: insert a second batch while reading the first batch and running
        // garbage collection.
        {
            WorkCompleted = 0;
            std::unordered_map NewChunks;
            NewChunks.reserve(kChunkCount);

            for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
            {
                IoBuffer Chunk = CreateRandomBlob(kChunkSize);
                IoHash Hash = HashBuffer(Chunk);
                NewChunks[Hash] = Chunk;
                GcChunkHashes.insert(Hash);
            }

            // NOTE(review): no explicit initial value; this relies on the C++20 rule
            // that std::atomic's default constructor value-initializes - confirm
            // the language standard this file is built with.
            std::atomic_uint32_t AddedChunkCount;

            for (const auto& Chunk : NewChunks)
            {
                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
                    Cas.InsertChunk(Chunk.second, Chunk.first);
                    AddedChunkCount.fetch_add(1);
                    WorkCompleted.fetch_add(1);
                });
            }
            for (const auto& Chunk : Chunks)
            {
                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
                    IoHash ChunkHash = Chunk.first;
                    IoBuffer Buffer = Cas.FindChunk(ChunkHash);
                    // The chunk may not be found here; content is only verified
                    // when a buffer was returned.
                    if (Buffer)
                    {
                        CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
                    }
                    WorkCompleted.fetch_add(1);
                });
            }

            // Split the known hashes into a keep list and a delete set: at every
            // index that is a multiple of 155, the entry at C (and the one at C + 3)
            // is moved to ChunksToDelete and its slot is refilled with the current
            // last element of KeepHashes.
            std::unordered_set ChunksToDelete;
            std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());

            size_t C = 0;
            while (C < KeepHashes.size())
            {
                if (C % 155 == 0)
                {
                    if (C < KeepHashes.size() - 1)
                    {
                        ChunksToDelete.insert(KeepHashes[C]);
                        KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
                        KeepHashes.pop_back();
                    }
                    if (C + 3 < KeepHashes.size() - 1)
                    {
                        ChunksToDelete.insert(KeepHashes[C + 3]);
                        KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
                        KeepHashes.pop_back();
                    }
                }
                C++;
            }

            // Runs one garbage collection pass that retains KeepHashes. Every hash
            // reported as deleted must be in ChunksToDelete and is erased from
            // GcChunkHashes.
            auto DoGC = [](CasContainerStrategy& Cas,
                           const std::unordered_set& ChunksToDelete,
                           const std::vector& KeepHashes,
                           std::unordered_set& GcChunkHashes) {
                if (GCV2::Enabled)
                {
                    std::atomic_bool IsCancelledFlag = false;
                    GcCtx Ctx = {.Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24),
                                              .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24),
                                              .CollectSmallObjects = true,
                                              .IsDeleteMode = true,
                                              .CompactBlockUsageThresholdPercent = 100},
                                 .IsCancelledFlag = IsCancelledFlag};
                    GcReferenceStoreStats PrunerStats;
                    // NOTE(review): Pruner and Compactor are raw pointers that are
                    // never released in this test; if the callee transfers
                    // ownership they leak - confirm the ownership contract.
                    GcReferencePruner* Pruner = Cas.CreateReferencePruner(Ctx, PrunerStats);
                    CHECK(Pruner);
                    HashKeySet Deleted;
                    GcStats Stats;
                    // The callback returns the hashes in References that are not in
                    // KeepHashes and also records them in Deleted.
                    GcStoreCompactor* Compactor =
                        Pruner->RemoveUnreferencedData(Ctx, Stats, [&](std::span References) -> std::vector {
                            std::vector Unreferenced;
                            HashKeySet Retain;
                            Retain.AddHashesToSet(KeepHashes);
                            for (const IoHash& ChunkHash : References)
                            {
                                if (!Retain.ContainsHash(ChunkHash))
                                {
                                    Unreferenced.push_back(ChunkHash);
                                }
                            }
                            Deleted.AddHashesToSet(Unreferenced);
                            return Unreferenced;
                        });
                    // Bookkeeping and compaction only run when a compactor was returned.
                    if (Compactor)
                    {
                        Deleted.IterateHashes([&GcChunkHashes, &ChunksToDelete](const IoHash& ChunkHash) {
                            CHECK(ChunksToDelete.contains(ChunkHash));
                            GcChunkHashes.erase(ChunkHash);
                        });
                        GcCompactStoreStats CompactStats;
                        Compactor->CompactStore(Ctx, CompactStats, []() { return 0; });
                    }
                }
                else
                {
                    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
                    GcCtx.CollectSmallObjects(true);
                    GcCtx.AddRetainedCids(KeepHashes);
                    Cas.CollectGarbage(GcCtx);
                    const HashKeySet& Deleted = GcCtx.DeletedCids();
                    Deleted.IterateHashes([&GcChunkHashes, &ChunksToDelete](const IoHash& ChunkHash) {
                        CHECK(ChunksToDelete.contains(ChunkHash));
                        GcChunkHashes.erase(ChunkHash);
                    });
                }
            };

            // Keep collecting while the second batch is still being inserted.
            while (AddedChunkCount.load() < NewChunks.size())
            {
                DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes);
            }

            // Wait for all insert and read work items of this phase.
            while (WorkCompleted < NewChunks.size() + Chunks.size())
            {
                Sleep(1);
            }

            // One final pass once no work is outstanding.
            DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes);
        }
        // Phase 4: everything still tracked must be present with intact content.
        {
            WorkCompleted = 0;
            for (const IoHash& ChunkHash : GcChunkHashes)
            {
                ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
                    CHECK(Cas.HaveChunk(ChunkHash));
                    CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
                    WorkCompleted.fetch_add(1);
                });
            }
            while (WorkCompleted < GcChunkHashes.size())
            {
                Sleep(1);
            }
        }
    }
}

#endif

// Intentionally empty. Presumably referenced from elsewhere so that this
// translation unit (and its test cases) is pulled in by the linker - confirm
// against the callers.
void compactcas_forcelink()
{
}

} // namespace zen