// Copyright Epic Games, Inc. All Rights Reserved. #include "compactcas.h" #include "cas.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include # include # include ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END #endif ////////////////////////////////////////////////////////////////////////// namespace zen { const FLLMTag& GetCasContainerTag() { static FLLMTag _("container", FLLMTag("cas")); return _; } struct CasDiskIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t CurrentVersion = 1; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t PayloadAlignment = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; static_assert(sizeof(CasDiskIndexHeader) == 32); namespace cas::impl { const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return RootPath / ContainerBaseName; } std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); } std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); } std::filesystem::path 
GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
{
    // <base>/blocks
    return GetBasePath(RootPath, ContainerBaseName) / "blocks";
}

// Sanity-checks one index/log entry.
// Returns true when the entry is usable; otherwise returns false and stores a
// human readable explanation in OutReason.
bool
ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason)
{
    if (Entry.Key == IoHash::Zero)
    {
        OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
        return false;
    }
    // kTombstone is the only flag bit accepted here
    if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0)
    {
        OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
        return false;
    }
    // Tombstones are accepted without looking at content type or size
    if (Entry.Flags & CasDiskIndexEntry::kTombstone)
    {
        return true;
    }
    if (Entry.ContentType != ZenContentType::kUnknownContentType)
    {
        // NOTE(review): the target type of this static_cast is missing in this copy of the file
        OutReason =
            fmt::format("Invalid content type {} for entry {}", static_cast(Entry.ContentType), Entry.Key.ToHexString());
        return false;
    }
    if (const uint64_t Size = Entry.Location.GetSize(); Size == 0)
    {
        OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
        return false;
    }
    return true;
}

}  // namespace cas::impl

//////////////////////////////////////////////////////////////////////////

// Configures the load factors of the location map and registers this object
// with the GC manager, both as a GC storage and as a GC reference store.
CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("containercas")), m_Gc(Gc)
{
    ZEN_MEMSCOPE(GetCasContainerTag());

    const float IndexMinLoadFactor = 0.2f;
    const float IndexMaxLoadFactor = 0.7f;
    m_LocationMap.min_load_factor(IndexMinLoadFactor);
    m_LocationMap.max_load_factor(IndexMaxLoadFactor);

    m_Gc.AddGcStorage(this);
    m_Gc.AddGcReferenceStore(*this);
}

// Unregisters from the GC manager in the reverse order of registration.
CasContainerStrategy::~CasContainerStrategy()
{
    m_Gc.RemoveGcReferenceStore(*this);
    m_Gc.RemoveGcStorage(this);
}

// Stores the container configuration and opens (or creates) the container on disk.
// Asserts that Alignment is a power of two, that MaxBlockSize is non-zero and that
// the object has not been initialized before.
void
CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory,
                                 const std::string_view ContainerBaseName,
                                 uint32_t MaxBlockSize,
                                 uint32_t Alignment,
                                 bool IsNewStore)
{
    ZEN_MEMSCOPE(GetCasContainerTag());

    ZEN_ASSERT(IsPow2(Alignment));
    ZEN_ASSERT(!m_IsInitialized);
    ZEN_ASSERT(MaxBlockSize > 0);

    m_RootDirectory = RootDirectory;
    m_ContainerBaseName = ContainerBaseName;
    m_PayloadAlignment = Alignment;
m_MaxBlockSize = MaxBlockSize;
    m_BlocksBasePath = cas::impl::GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName);

    OpenContainer(IsNewStore);

    // Only flagged as initialized once the container has been opened
    m_IsInitialized = true;
}

// Inserts one chunk given as a raw memory range.
// Returns {.New = false} without writing anything when the hash is already present
// in the location map, otherwise writes the chunk and returns {.New = true}.
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
    ZEN_TRACE_CPU("CasContainer::InsertChunk");

    // Cheap existence check under the shared lock; the lock is released before writing
    {
        RwLock::SharedLockScope _(m_LocationMapLock);
        if (m_LocationMap.contains(ChunkHash))
        {
            return CasStore::InsertResult{.New = false};
        }
    }

    ZEN_MEMSCOPE(GetCasContainerTag());

    // We can end up in a situation that InsertChunk writes the same chunk data in
    // different locations.
    // We release the insert lock once we have the correct WriteBlock ready and we know
    // where to write the data. If a new InsertChunk request for the same chunk hash/data
    // comes in before we update m_LocationMap below we will have a race.
    // The outcome of that is that we will write the chunk data in more than one location
    // but the chunk hash will only point to one of the chunks.
    // We will in that case waste space until the next GC operation.
    //
    // This should be a rare occasion and the current flow reduces the time we block for
    // reads, insert and GC.
m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) {
        ZEN_MEMSCOPE(GetCasContainerTag());
        ZEN_TRACE_CPU("CasContainer::UpdateLocation");

        BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
        const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
        // The log entry is appended before the in-memory index is updated
        m_CasLog.Append(IndexEntry);
        {
            RwLock::ExclusiveLockScope _(m_LocationMapLock);
            // The map stores an index into m_Locations, not the location itself
            m_LocationMap.emplace(ChunkHash, m_Locations.size());
            m_Locations.push_back(DiskLocation);
        }
    });

    return CasStore::InsertResult{.New = true};
}

// Convenience overload forwarding the buffer's data pointer and size.
// Outside of test builds the buffer is asserted to be tagged as compressed binary.
CasStore::InsertResult
CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
    ZEN_MEMSCOPE(GetCasContainerTag());

#if !ZEN_WITH_TESTS
    ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
#endif
    return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
}

// Batch insert. Chunks[i] belongs to ChunkHashes[i]; the result vector has one entry per
// input chunk telling whether it was new. Only chunks not already present are written.
// NOTE(review): the element types of the std::vector / std::span declarations in this
// function are missing in this copy of the file.
std::vector
CasContainerStrategy::InsertChunks(std::span Chunks, std::span ChunkHashes)
{
    ZEN_MEMSCOPE(GetCasContainerTag());

    ZEN_ASSERT(Chunks.size() == ChunkHashes.size());
    std::vector Result(Chunks.size());
    std::vector NewChunkIndexes;

    // Pass 1: under the shared lock, find out which hashes are not yet known
    {
        RwLock::SharedLockScope _(m_LocationMapLock);
        for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
        {
            const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
            bool IsNew = !m_LocationMap.contains(ChunkHash);
            Result[ChunkIndex] = CasStore::InsertResult{.New = IsNew};
            if (IsNew)
            {
                NewChunkIndexes.push_back(ChunkIndex);
            }
        }
    }

    if (NewChunkIndexes.empty())
    {
        return Result;
    }

    // Pass 2: collect the payloads of the new chunks, in NewChunkIndexes order
    std::vector Datas;
    for (size_t ChunkIndex : NewChunkIndexes)
    {
        const IoBuffer& Chunk = Chunks[ChunkIndex];
#if !ZEN_WITH_TESTS
        ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
#endif
        Datas.emplace_back(Chunk);
    }

    // ChunkOffset persists across callback invocations so that each reported location is
    // paired with the next entry of NewChunkIndexes.
    size_t ChunkOffset = 0;
    m_BlockStore.WriteChunks(Datas, m_PayloadAlignment, [&](std::span Locations) {
        ZEN_MEMSCOPE(GetCasContainerTag());
        std::vector IndexEntries;
        for (const BlockStoreLocation& Location : Locations)
        {
            size_t ChunkIndex = NewChunkIndexes[ChunkOffset++];
IndexEntries.emplace_back(
                CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)});
        }
        // Log first, then publish to the in-memory index under the exclusive lock
        m_CasLog.Append(IndexEntries);
        {
            RwLock::ExclusiveLockScope _(m_LocationMapLock);
            for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
            {
                m_LocationMap.insert_or_assign(DiskIndexEntry.Key, m_Locations.size());
                m_Locations.push_back(DiskIndexEntry.Location);
            }
        }
    });
    return Result;
}

// Looks up ChunkHash and returns the chunk payload from the block store.
// Returns an empty IoBuffer when the hash is not in the location map.
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
    ZEN_TRACE_CPU("CasContainer::FindChunk");

    RwLock::SharedLockScope _(m_LocationMapLock);
    auto KeyIt = m_LocationMap.find(ChunkHash);
    if (KeyIt == m_LocationMap.end())
    {
        return IoBuffer();
    }
    const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);

    IoBuffer Chunk = m_BlockStore.TryGetChunk(Location);
    return Chunk;
}

// Returns true when ChunkHash is present in the location map.
bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
    RwLock::SharedLockScope _(m_LocationMapLock);
    return m_LocationMap.contains(ChunkHash);
}

// Removes from InOutChunks every hash for which HaveChunk() returns true.
void
CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
{
    // This implementation is good enough for relatively small
    // chunk sets (in terms of chunk identifiers), but would
    // benefit from a better implementation which removes
    // items incrementally for large sets, especially when
    // we're likely to already have a large proportion of the
    // chunks in the set

    InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}

// Invokes AsyncCallback(IndexIntoChunkHashes, Payload) for every hash in ChunkHashes that
// is present in the location map. Iteration stops early once a callback returns false.
// Returns false when iteration was stopped (callback returned false, abort flag set or an
// exception was caught in a scheduled work item), true otherwise.
// When OptionalWorkerPool is given, blocks with more than three requested chunks are
// processed as scheduled work items.
// NOTE(review): template argument lists (std::span, std::function, std::vector, std::atomic)
// are missing in this copy of the file.
bool
CasContainerStrategy::IterateChunks(std::span ChunkHashes,
                                    const std::function& AsyncCallback,
                                    WorkerThreadPool* OptionalWorkerPool,
                                    uint64_t LargeSizeLimit)
{
    ZEN_MEMSCOPE(GetCasContainerTag());

    const size_t ChunkCount = ChunkHashes.size();
    if (ChunkCount < 0)
    {
    }
    std::vector FoundChunkIndexes;
    std::vector FoundChunkLocations;
    FoundChunkIndexes.reserve(ChunkCount);
    FoundChunkLocations.reserve(ChunkCount);
    // Resolve all locations up front so the lock is not held while reading payloads
    {
        RwLock::SharedLockScope _(m_LocationMapLock);
        for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
        {
            if (auto KeyIt = m_LocationMap.find(ChunkHashes[ChunkIndex]); KeyIt != m_LocationMap.end())
            {
                FoundChunkIndexes.push_back(ChunkIndex);
                FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment));
            }
        }
    }

    // Fewer than four hits: fetch them one by one
    if (FoundChunkLocations.size() < 4)
    {
        for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
        {
            IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[Index]);
            size_t OuterIndex = FoundChunkIndexes[Index];
            if (!AsyncCallback(OuterIndex, Chunk))
            {
                return false;
            }
        }
        return true;
    }

    // Handles the chunks of one block. ChunkIndexes index into FoundChunkLocations /
    // FoundChunkIndexes; the callback always receives the caller's original index.
    auto DoOneBlock = [this](const std::function& AsyncCallback,
                             uint64_t LargeSizeLimit,
                             std::span FoundChunkIndexes,
                             std::span FoundChunkLocations,
                             std::span ChunkIndexes) {
        if (ChunkIndexes.size() < 4)
        {
            for (size_t ChunkIndex : ChunkIndexes)
            {
                IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]);
                if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk))
                {
                    return false;
                }
            }
            return true;
        }
        return m_BlockStore.IterateBlock(
            FoundChunkLocations,
            ChunkIndexes,
            // First callback: receives the chunk as a memory range; a null Data pointer is
            // forwarded as an empty IoBuffer
            [AsyncCallback, FoundChunkIndexes, LargeSizeLimit](size_t ChunkIndex, const void* Data, uint64_t Size) {
                if (Data == nullptr)
                {
                    return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer());
                }
                return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
            },
            // Second callback: receives file + offset/size and fetches the chunk from the file
            [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
                return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
            },
            LargeSizeLimit);
    };

    std::atomic AsyncContinue = true;
    {
        std::atomic AbortFlag;
        ParallelWork Work(AbortFlag);
        const bool Continue = m_BlockStore.IterateChunks(
            FoundChunkLocations,
            [this,
             &Work,
             &AsyncContinue,
             &AsyncCallback,
             LargeSizeLimit,
             DoOneBlock,
             &FoundChunkIndexes,
             &FoundChunkLocations,
             OptionalWorkerPool](uint32_t BlockIndex, std::span ChunkIndexes) {
                if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
                {
                    // The index list is copied and moved into the work item's closure
                    std::vector TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
                    Work.ScheduleWork(
                        *OptionalWorkerPool,
                        [this,
                         &AsyncContinue,
                         &AsyncCallback,
                         LargeSizeLimit,
                         DoOneBlock,
                         BlockIndex,
                         &FoundChunkIndexes,
                         &FoundChunkLocations,
                         ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic& AbortFlag) {
                            if (AbortFlag)
                            {
                                AsyncContinue.store(false);
                            }
                            if (!AsyncContinue)
                            {
                                return;
                            }
                            try
                            {
                                bool Continue = DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
                                if (!Continue)
                                {
                                    AsyncContinue.store(false);
                                }
                            }
                            catch (const std::exception& Ex)
                            {
                                // Exceptions are logged and turned into "stop iterating"
                                ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
                                         m_RootDirectory,
                                         BlockIndex,
                                         Ex.what());
                                AsyncContinue.store(false);
                            }
                        });
                    return AsyncContinue.load();
                }
                else
                {
                    return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
                }
            });
        if (!Continue)
        {
            AsyncContinue.store(false);
        }
        // Wait for all scheduled work before the captured locals go out of scope
        Work.Wait();
    }
    return AsyncContinue.load();
}

// Flushes the block store and the log, then writes an index snapshot without resetting the log.
void
CasContainerStrategy::Flush()
{
    ZEN_MEMSCOPE(GetCasContainerTag());

    ZEN_TRACE_CPU("CasContainer::Flush");
    m_BlockStore.Flush(/*ForceNewBlock*/ false);
    m_CasLog.Flush();
    MakeIndexSnapshot(/*ResetLog*/ false);
}

// Validates every chunk known to the location map by checking its compressed buffer header
// and comparing the raw hash stored there against the chunk's key. Chunks that fail are
// collected as bad keys, reported to Ctx and - when Ctx.RunRecovery() is set - removed
// from the index with tombstones appended to the log.
void
CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
{
    ZEN_MEMSCOPE(GetCasContainerTag());
    ZEN_TRACE_CPU("CasContainer::ScrubStorage");

    if (Ctx.IsSkipCas())
    {
        ZEN_INFO("SKIPPED scrubbing: '{}'", m_BlocksBasePath);
        return;
    }

    ZEN_INFO("scrubbing '{}'", m_BlocksBasePath);

    RwLock BadKeysLock;
    std::vector BadKeys;
    std::atomic_uint64_t ChunkCount{0}, ChunkBytes{0};

    // Parallel arrays: ChunkLocations[i] is the location of the chunk with hash ChunkIndexToChunkHash[i]
    std::vector ChunkLocations;
    std::vector ChunkIndexToChunkHash;

    try
    {
        // The shared lock is held for the whole validation pass
        RwLock::SharedLockScope _(m_LocationMapLock);

        uint64_t TotalChunkCount = m_LocationMap.size();
        ChunkLocations.reserve(TotalChunkCount);
        ChunkIndexToChunkHash.reserve(TotalChunkCount);
        {
            for (const auto& Entry : m_LocationMap)
            {
                const IoHash& ChunkHash = Entry.first;
                const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second];
                BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);

                ChunkLocations.push_back(Location);
                ChunkIndexToChunkHash.push_back(ChunkHash);
            }
        }

        // Both validators always return true so that iteration continues past bad chunks
        const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
            ChunkCount.fetch_add(1);
            ChunkBytes.fetch_add(Size);

            const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
            if (!Data)
            {
                // ChunkLocation out of range of stored blocks
                BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
                return true;
            }

            IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
            IoHash RawHash;
            uint64_t RawSize;
            if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
            {
                if (RawHash == Hash)
                {
                    // TODO: this should also hash the (decompressed) contents
                    return true;
                }
            }
            BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
            return true;
        };

        const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
            // May throw ScrubDeadlineExpiredException, handled by the catch below
            Ctx.ThrowIfDeadlineExpired();

            ChunkCount.fetch_add(1);
            ChunkBytes.fetch_add(Size);

            const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
            IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
            IoHash RawHash;
            uint64_t RawSize;
            // TODO: Add API to verify compressed buffer without having to memory-map the whole file
            if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
            {
                if (RawHash == Hash)
                {
                    // TODO: this should also hash the (decompressed) contents
                    return true;
                }
            }
            BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
            return true;
        };

        m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span ChunkIndexes) {
            return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
        });
    }
    catch (const ScrubDeadlineExpiredException&)
    {
        // Whatever was found before the deadline is still reported and acted upon below
        ZEN_INFO("Scrubbing deadline expired, operation incomplete");
    }

    Ctx.ReportScrubbed(ChunkCount, ChunkBytes);

    if (!BadKeys.empty())
    {
        ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);

        if (Ctx.RunRecovery())
        {
            // Deal with bad chunks by removing them from our lookup map

            std::vector LogEntries;
LogEntries.reserve(BadKeys.size());
            {
                RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock);
                for (const IoHash& ChunkHash : BadKeys)
                {
                    const auto KeyIt = m_LocationMap.find(ChunkHash);
                    if (KeyIt == m_LocationMap.end())
                    {
                        // Might have been GC'd
                        continue;
                    }
                    // Record a tombstone for the log, then drop the key from the index
                    LogEntries.push_back(
                        {.Key = KeyIt->first, .Location = m_Locations[KeyIt->second], .Flags = CasDiskIndexEntry::kTombstone});
                    m_LocationMap.erase(KeyIt);
                }

                // Clean up m_Locations vectors
                CompactIndex(IndexLock);
            }
            // The tombstones are appended after the index lock has been released
            m_CasLog.Append(LogEntries);
        }
    }

    // Let whomever it concerns know about the bad chunks. This could
    // be used to invalidate higher level data structures more efficiently
    // than a full validation pass might be able to do
    if (!BadKeys.empty())
    {
        Ctx.ReportBadCidChunks(BadKeys);
    }

    ZEN_INFO("scrubbed {} chunks ({}) in '{}'", ChunkCount.load(), NiceBytes(ChunkBytes.load()), m_RootDirectory / m_ContainerBaseName);
}

// GC compaction step for a CasContainerStrategy: works out how much of each block is still
// referenced by the location map and asks the block store to compact blocks selected by
// GetBlocksToCompact().
class CasContainerStoreCompactor : public GcStoreCompactor
{
public:
    CasContainerStoreCompactor(CasContainerStrategy& Owner) : m_CasContainerStrategy(Owner) {}

    // NOTE(review): the signature of the std::function parameter is missing in this copy of the file.
    virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function& ClaimDiskReserveCallback) override
    {
        ZEN_MEMSCOPE(GetCasContainerTag());

        ZEN_TRACE_CPU("CasContainer::CompactStore");

        // 'Log' is not referenced by name below - presumably the ZEN_* logging macros pick it up; TODO confirm
        auto Log = [&Ctx]() { return Ctx.Logger; };

        Stopwatch Timer;
        // Summary line emitted on scope exit, only in verbose mode
        const auto _ = MakeGuard([&] {
            if (!Ctx.Settings.Verbose)
            {
                return;
            }
            ZEN_INFO("GCV2: compactcas [COMPACT] '{}': RemovedDisk: {} in {}",
                     m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName,
                     NiceBytes(Stats.RemovedDisk),
                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
        });

        if (Ctx.Settings.CollectSmallObjects)
        {
            // Per block: number of live entries and their alignment-rounded disk usage
            BlockStore::BlockUsageMap BlockUsage;
            {
                RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock);
                if (Ctx.IsCancelledFlag.load())
                {
                    return;
                }

                for (const auto& Entry : m_CasContainerStrategy.m_LocationMap)
                {
                    size_t Index = Entry.second;
                    const BlockStoreDiskLocation& Loc = m_CasContainerStrategy.m_Locations[Index];
uint32_t BlockIndex = Loc.GetBlockIndex(); uint64_t ChunkSize = RoundUp(Loc.GetSize(), m_CasContainerStrategy.m_PayloadAlignment); if (BlockStore::BlockUsageMap::iterator It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) { BlockStore::BlockUsageInfo& Info = It.value(); Info.EntryCount++; Info.DiskUsage += ChunkSize; } else { BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); } } } { BlockStoreCompactState BlockCompactState; std::vector BlockCompactStateKeys; BlockStore::BlockEntryCountMap BlocksToCompact = m_CasContainerStrategy.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); BlockCompactState.IncludeBlocks(BlocksToCompact); if (BlocksToCompact.size() > 0) { { RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock); for (const auto& Entry : m_CasContainerStrategy.m_LocationMap) { size_t Index = Entry.second; const BlockStoreDiskLocation& Loc = m_CasContainerStrategy.m_Locations[Index]; if (!BlockCompactState.AddKeepLocation(Loc.Get(m_CasContainerStrategy.m_PayloadAlignment))) { continue; } BlockCompactStateKeys.push_back(Entry.first); } } if (Ctx.Settings.IsDeleteMode) { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: compactcas [COMPACT] '{}': compacting {} blocks", m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName, BlocksToCompact.size()); } m_CasContainerStrategy.m_BlockStore.CompactBlocks( BlockCompactState, m_CasContainerStrategy.m_PayloadAlignment, [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { std::vector MovedEntries; RwLock::ExclusiveLockScope _(m_CasContainerStrategy.m_LocationMapLock); for (const std::pair& Moved : MovedArray) { size_t ChunkIndex = Moved.first; const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key); It != m_CasContainerStrategy.m_LocationMap.end()) { BlockStoreDiskLocation& Location 
= m_CasContainerStrategy.m_Locations[It->second]; const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation) { // Someone has moved our chunk so lets just skip the new location we were provided, it will be // GC:d at a later time continue; } const BlockStoreLocation& NewLocation = Moved.second; Location = BlockStoreDiskLocation(NewLocation, m_CasContainerStrategy.m_PayloadAlignment); MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location}); } } m_CasContainerStrategy.m_CasLog.Append(MovedEntries); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) { return false; } return true; }, ClaimDiskReserveCallback, fmt::format("GCV2: compactcas [COMPACT] '{}': ", m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName)); } else { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: compactcas [COMPACT] '{}': skipped compacting of {} eligible blocks", m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName, BlocksToCompact.size()); } } } } } } virtual std::string GetGcName(GcCtx& Ctx) override { return m_CasContainerStrategy.GetGcName(Ctx); } CasContainerStrategy& m_CasContainerStrategy; }; class CasContainerReferencePruner : public GcReferencePruner { public: CasContainerReferencePruner(CasContainerStrategy& Owner, std::vector&& Cids) : m_CasContainerStrategy(Owner) , m_Cids(std::move(Cids)) { } virtual std::string GetGcName(GcCtx& Ctx) override { return m_CasContainerStrategy.GetGcName(Ctx); } virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, GcStats& Stats, const GetUnusedReferencesFunc& GetUnusedReferences) override { ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::RemoveUnreferencedData"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: 
compactcas [PRUNE] '{}': Checked: {}, Deleted: {}, FreedMemory: {} in {}", m_CasContainerStrategy.m_RootDirectory.filename() / m_CasContainerStrategy.m_ContainerBaseName, Stats.CheckedCount, Stats.DeletedCount, NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::span UnusedCids = GetUnusedReferences(m_Cids); Stats.CheckedCount = m_Cids.size(); Stats.FoundCount = UnusedCids.size(); if (!Ctx.Settings.CollectSmallObjects) { return nullptr; } if (!UnusedCids.empty()) { if (Ctx.Settings.IsDeleteMode) { std::vector ExpiredEntries; ExpiredEntries.reserve(UnusedCids.size()); { RwLock::ExclusiveLockScope __(m_CasContainerStrategy.m_LocationMapLock); if (Ctx.IsCancelledFlag.load()) { return nullptr; } for (const IoHash& Cid : UnusedCids) { if (auto It = m_CasContainerStrategy.m_LocationMap.find(Cid); It != m_CasContainerStrategy.m_LocationMap.end()) { ExpiredEntries.push_back({.Key = Cid, .Location = m_CasContainerStrategy.m_Locations[It->second], .Flags = CasDiskIndexEntry::kTombstone}); } } if (!ExpiredEntries.empty()) { for (const CasDiskIndexEntry& Entry : ExpiredEntries) { m_CasContainerStrategy.m_LocationMap.erase(Entry.Key); Stats.DeletedCount++; } m_CasContainerStrategy.m_CasLog.Append(ExpiredEntries); m_CasContainerStrategy.m_CasLog.Flush(); } } } } return new CasContainerStoreCompactor(m_CasContainerStrategy); } private: CasContainerStrategy& m_CasContainerStrategy; std::vector m_Cids; }; std::string CasContainerStrategy::GetGcName(GcCtx&) { return fmt::format("compactcas: '{}'", (m_RootDirectory / m_ContainerBaseName).string()); } GcReferencePruner* CasContainerStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&) { ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::CreateReferencePruner"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: compactcas [CREATE PRUNER] '{}' in {}", m_RootDirectory / 
m_ContainerBaseName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::vector CidsToCheck; { RwLock::SharedLockScope __(m_LocationMapLock); if (m_LocationMap.empty()) { return {}; } if (Ctx.IsCancelledFlag.load()) { return nullptr; } CidsToCheck.reserve(m_LocationMap.size()); for (const auto& It : m_LocationMap) { CidsToCheck.push_back(It.first); } } if (FilterReferences(Ctx, fmt::format("compactcas [CREATE PRUNER] '{}'", m_RootDirectory / m_ContainerBaseName), CidsToCheck)) { return new CasContainerReferencePruner(*this, std::move(CidsToCheck)); } return nullptr; } void CasContainerStrategy::CompactIndex(RwLock::ExclusiveLockScope&) { ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::CompactIndex"); size_t EntryCount = m_LocationMap.size(); LocationMap_t LocationMap; std::vector Locations; Locations.reserve(EntryCount); LocationMap.reserve(EntryCount); for (auto It : m_LocationMap) { size_t EntryIndex = Locations.size(); Locations.push_back(m_Locations[It.second]); LocationMap.insert({It.first, EntryIndex}); } m_LocationMap.swap(LocationMap); m_Locations.swap(Locations); } GcStorageSize CasContainerStrategy::StorageSize() const { return {.DiskSize = m_BlockStore.TotalSize()}; } void CasContainerStrategy::MakeIndexSnapshot(bool ResetLog) { ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::MakeIndexSnapshot"); if (m_LogFlushPosition == m_CasLog.GetLogCount()) { return; } ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_RootDirectory / m_ContainerBaseName, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; const fs::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName); try { // Write the current state of the location map to a new index state std::vector Entries; uint64_t IndexLogPosition = 0; { 
RwLock::SharedLockScope ___(m_LocationMapLock); if (!ResetLog) { IndexLogPosition = m_CasLog.GetLogCount(); } Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; for (auto& Entry : m_LocationMap) { CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = m_Locations[Entry.second]; } EntryCount = m_LocationMap.size(); } TemporaryFile ObjectIndexFile; std::error_code Ec; ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath)); } CasDiskIndexHeader Header = {.EntryCount = EntryCount, .LogPosition = IndexLogPosition, .PayloadAlignment = gsl::narrow(m_PayloadAlignment)}; Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexHeader), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); ObjectIndexFile.Flush(); ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", ObjectIndexFile.GetPath(), IndexPath, Ec.message())); } if (ResetLog) { const std::filesystem::path LogPath = cas::impl::GetLogPath(m_RootDirectory, m_ContainerBaseName); if (IsFile(LogPath)) { if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in // the end it will be the same result ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } } } m_LogFlushPosition = IndexLogPosition; } catch (const std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); } } uint64_t CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_MEMSCOPE(GetCasContainerTag()); 
ZEN_TRACE_CPU("CasContainer::ReadIndexFile"); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CasDiskIndexHeader)) { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); CasDiskIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && (Header.EntryCount <= ExpectedEntryCount)) { m_PayloadAlignment = Header.PayloadAlignment; m_Locations.reserve(ExpectedEntryCount); m_LocationMap.reserve(ExpectedEntryCount); std::vector Entries; Entries.resize(128 * 1024 / sizeof(CasDiskIndexEntry)); uint64_t RemainingEntries = Header.EntryCount; uint64_t ReadOffset = sizeof(CasDiskIndexHeader); do { const uint64_t NumToRead = Min(RemainingEntries, Entries.size()); Entries.resize(NumToRead); ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), ReadOffset); std::string InvalidEntryReason; for (const CasDiskIndexEntry& Entry : Entries) { if (!cas::impl::ValidateEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } m_LocationMap[Entry.Key] = m_Locations.size(); m_Locations.push_back(Entry.Location); ++EntryCount; } RemainingEntries -= NumToRead; ReadOffset += NumToRead * sizeof(CasDiskIndexEntry); } while (RemainingEntries); OutVersion = CasDiskIndexHeader::CurrentVersion; return Header.LogPosition; } else { ZEN_WARN("skipping invalid index file '{}'", IndexPath); } } return 0; } uint64_t 
CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::ReadLog"); if (!TCasLogFile::IsValid(LogPath)) { ZEN_WARN("removing invalid cas log at '{}'", LogPath); RemoveFile(LogPath); return 0; } size_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing {} entries in {}", m_RootDirectory / m_ContainerBaseName, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (CasLog.Initialize()) { uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; CasLog.Replay( [&](const CasDiskIndexEntry& Record) { LogEntryCount++; std::string InvalidEntryReason; if (Record.Flags & CasDiskIndexEntry::kTombstone) { m_LocationMap.erase(Record.Key); return; } if (!cas::impl::ValidateEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); return; } m_LocationMap[Record.Key] = m_Locations.size(); m_Locations.push_back(Record.Location); }, SkipEntryCount); return LogEntryCount; } return 0; } void CasContainerStrategy::OpenContainer(bool IsNewStore) { ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::OpenContainer"); // Add .running file and delete on clean on close to detect bad termination m_LocationMap.clear(); m_Locations.clear(); std::filesystem::path BasePath = cas::impl::GetBasePath(m_RootDirectory, m_ContainerBaseName); if (IsNewStore) { DeleteDirectories(BasePath); } CreateDirectories(BasePath); m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); std::filesystem::path LogPath = cas::impl::GetLogPath(m_RootDirectory, 
m_ContainerBaseName); std::filesystem::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName); if (IsFile(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); RemoveFile(IndexPath); } } uint64_t LogEntryCount = 0; if (IsFile(LogPath)) { if (TCasLogFile::IsValid(LogPath)) { LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); } else { ZEN_WARN("removing invalid cas log at '{}'", LogPath); RemoveFile(LogPath); } } m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_LocationMap) { const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second]; uint32_t BlockIndex = DiskLocation.GetBlockIndex(); KnownBlocks.insert(BlockIndex); } m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); if (IsNewStore || (LogEntryCount > 0)) { MakeIndexSnapshot(/*ResetLog*/ true); } // TODO: should validate integrity of container files here } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS TEST_CASE("compactcas.hex") { uint32_t Value; std::string HexString; CHECK(!ParseHexNumber("", Value)); char Hex[9]; ToHexNumber(0u, Hex); HexString = std::string(Hex); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0u); ToHexNumber(std::numeric_limits::max(), Hex); HexString = std::string(Hex); CHECK(HexString == "ffffffff"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == std::numeric_limits::max()); ToHexNumber(0xadf14711u, Hex); HexString = std::string(Hex); CHECK(HexString == "adf14711"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0xadf14711u); ToHexNumber(0x80000000u, Hex); HexString = std::string(Hex); CHECK(HexString == "80000000"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0x80000000u); ToHexNumber(0x718293a4u, Hex); HexString = std::string(Hex); CHECK(HexString == "718293a4"); 
CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0x718293a4u); } TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; const int kIterationCount = 1000; std::vector Keys(kIterationCount); { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { CbObjectWriter Cbo; Cbo << "id" << i; CbObject Obj = Cbo.Save(); IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); const IoHash Hash = IoHash::HashBuffer(ObjBuffer); Cas.InsertChunk(ObjBuffer, Hash); Keys[i] = Hash; } for (int i = 0; i < kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } } // Validate that we can still read the inserted data after closing // the original cas store { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } } } TEST_CASE("compactcas.compact.totalsize") { std::random_device rd; std::mt19937 g(rd()); // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1024; const int32_t kChunkCount = 16; { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { IoBuffer Chunk = CreateRandomBlob(kChunkSize); const IoHash Hash = IoHash::HashBuffer(Chunk); CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); } const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; 
CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } // Re-open again, this time we should have a snapshot { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } } } TEST_CASE("compactcas.threadedinsert") { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1048; const int32_t kChunkCount = 4096; uint64_t ExpectedSize = 0; tsl::robin_map Chunks; Chunks.reserve(kChunkCount); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { while (true) { IoBuffer Chunk = CreateRandomBlob(kChunkSize); IoHash Hash = IoHash::HashBuffer(Chunk); if (Chunks.contains(Hash)) { continue; } Chunks[Hash] = Chunk; ExpectedSize += Chunk.Size(); break; } } std::atomic WorkCompleted = 0; WorkerThreadPool ThreadPool(4); GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { for (const auto& Chunk : Chunks) { const IoHash& Hash = Chunk.first; const IoBuffer& Buffer = Chunk.second; ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); ZEN_ASSERT(InsertResult.New); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < Chunks.size()) { Sleep(1); } } WorkCompleted = 0; const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_LE(ExpectedSize, TotalSize); CHECK_GE(ExpectedSize + 32768, TotalSize); { for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); IoHash Hash = IoHash::HashBuffer(Buffer); CHECK(ChunkHash == Hash); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < Chunks.size()) { Sleep(1); } } tsl::robin_set GcChunkHashes; GcChunkHashes.reserve(Chunks.size()); for (const auto& Chunk : Chunks) { GcChunkHashes.insert(Chunk.first); } { WorkCompleted = 
0; tsl::robin_map NewChunks; NewChunks.reserve(kChunkCount); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { IoBuffer Chunk = CreateRandomBlob(kChunkSize); IoHash Hash = IoHash::HashBuffer(Chunk); NewChunks[Hash] = Chunk; GcChunkHashes.insert(Hash); } std::atomic_uint32_t AddedChunkCount; for (const auto& Chunk : NewChunks) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { Cas.InsertChunk(Chunk.second, Chunk.first); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); }); } for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); if (Buffer) { CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); } WorkCompleted.fetch_add(1); }); } tsl::robin_set ChunksToDelete; std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); size_t C = 0; while (C < KeepHashes.size()) { if (C % 155 == 0) { if (C < KeepHashes.size() - 1) { ChunksToDelete.insert(KeepHashes[C]); KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } if (C + 3 < KeepHashes.size() - 1) { ChunksToDelete.insert(KeepHashes[C + 3]); KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } } C++; } auto DoGC = [](CasContainerStrategy& Cas, const tsl::robin_set& ChunksToDelete, const std::vector& KeepHashes, tsl::robin_set& GcChunkHashes) { std::atomic_bool IsCancelledFlag = false; GcCtx Ctx = {.Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), .CollectSmallObjects = true, .IsDeleteMode = true, .CompactBlockUsageThresholdPercent = 100}, .IsCancelledFlag = IsCancelledFlag}; GcReferenceStoreStats PrunerStats; GcReferencePruner* Pruner = Cas.CreateReferencePruner(Ctx, PrunerStats); CHECK(Pruner); HashKeySet Deleted; GcStats Stats; GcStoreCompactor* Compactor = Pruner->RemoveUnreferencedData(Ctx, Stats, [&](std::span References) -> 
std::span { std::vector Unreferenced; HashKeySet Retain; Retain.AddHashesToSet(KeepHashes); for (const IoHash& ChunkHash : References) { if (!Retain.ContainsHash(ChunkHash)) { Unreferenced.push_back(ChunkHash); } } Deleted.AddHashesToSet(Unreferenced); return Unreferenced; }); if (Compactor) { Deleted.IterateHashes([&GcChunkHashes, &ChunksToDelete](const IoHash& ChunkHash) { CHECK(ChunksToDelete.contains(ChunkHash)); GcChunkHashes.erase(ChunkHash); }); GcCompactStoreStats CompactStats; Compactor->CompactStore(Ctx, CompactStats, []() { return 0; }); } }; while (AddedChunkCount.load() < NewChunks.size()) { DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes); } while (WorkCompleted < NewChunks.size() + Chunks.size()) { Sleep(1); } DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes); } { WorkCompleted = 0; for (const IoHash& ChunkHash : GcChunkHashes) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { CHECK(Cas.HaveChunk(ChunkHash)); CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < GcChunkHashes.size()) { Sleep(1); } } } } TEST_CASE("compactcas.iteratechunks") { std::atomic WorkCompleted = 0; WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); const uint64_t kChunkSize = 1048 + 395; const size_t kChunkCount = 63840; for (uint32_t N = 0; N < 4; N++) { GcManager Gc; CasContainerStrategy Cas(Gc); ScopedTemporaryDirectory TempDir; Cas.Initialize(TempDir.Path(), "test", 65536 * 128, 8, true); CHECK(Cas.IterateChunks( {}, [](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index, Payload); return true; }, &ThreadPool, 2048u)); uint64_t ExpectedSize = 0; std::vector Hashes; Hashes.reserve(kChunkCount); { Latch WorkLatch(1); tsl::robin_set ChunkHashesLookup; ChunkHashesLookup.reserve(kChunkCount); RwLock InsertLock; for (size_t Offset = 0; Offset < kChunkCount;) { size_t BatchCount = Min(kChunkCount - Offset, 512u); WorkLatch.AddCount(1); 
ThreadPool.ScheduleWork( [N, &WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); std::vector BatchBlobs; std::vector BatchHashes; BatchBlobs.reserve(BatchCount); BatchHashes.reserve(BatchCount); while (BatchBlobs.size() < BatchCount) { IoBuffer Chunk = CreateRandomBlob( N + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); IoHash Hash = IoHash::HashBuffer(Chunk); { RwLock::ExclusiveLockScope __(InsertLock); if (ChunkHashesLookup.contains(Hash)) { continue; } ChunkHashesLookup.insert(Hash); ExpectedSize += Chunk.Size(); } BatchBlobs.emplace_back(std::move(Chunk)); BatchHashes.push_back(Hash); } Cas.InsertChunks(BatchBlobs, BatchHashes); { RwLock::ExclusiveLockScope __(InsertLock); Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } }); Offset += BatchCount; } WorkLatch.CountDown(); WorkLatch.Wait(); } WorkerThreadPool BatchWorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "fetch"); { std::vector> FetchedFlags(Hashes.size()); std::atomic FetchedSize = 0; CHECK(Cas.IterateChunks( Hashes, [&Hashes, &FetchedFlags, &FetchedSize](size_t Index, const IoBuffer& Payload) { CHECK(FetchedFlags[Index].load() == false); FetchedFlags[Index].store(true); const IoHash& Hash = Hashes[Index]; CHECK(Hash == IoHash::HashBuffer(Payload)); FetchedSize += Payload.GetSize(); return true; }, &BatchWorkerPool, 2048u)); for (const auto& Flag : FetchedFlags) { CHECK(Flag.load()); } CHECK(FetchedSize == ExpectedSize); } Latch WorkLatch(1); for (size_t I = 0; I < 2; I++) { WorkLatch.AddCount(1); ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); std::vector PartialHashes; PartialHashes.reserve(Hashes.size() / 4); for (size_t Index = 0; Index < Hashes.size(); Index++) { size_t TestIndex = Index + I; if ((TestIndex % 7 == 1) || 
(TestIndex % 13 == 1) || (TestIndex % 17 == 1)) { PartialHashes.push_back(Hashes[Index]); } } std::reverse(PartialHashes.begin(), PartialHashes.end()); std::vector NoFoundHashes; std::vector NoFindIndexes; NoFoundHashes.reserve(9); for (size_t J = 0; J < 9; J++) { std::string Data = fmt::format("oh no, we don't exist {}", J + 1); NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length())); } NoFindIndexes.reserve(9); // Sprinkle in chunks that are not found! auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]); NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]); NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]); NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]); NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]); NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]); NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]); NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]); NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]); NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); std::vector> 
FoundFlags(PartialHashes.size() + NoFoundHashes.size()); std::vector> FetchedCounts(PartialHashes.size() + NoFoundHashes.size()); CHECK(Cas.IterateChunks( PartialHashes, [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) { CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index)); uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1); CHECK(PreviousCount == 0); FoundFlags[Index] = !!Payload; const IoHash& Hash = PartialHashes[Index]; CHECK(Hash == IoHash::HashBuffer(Payload)); return true; }, &BatchWorkerPool, 2048u)); for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++) { CHECK(FetchedCounts[FoundIndex].load() <= 1); if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end()) { CHECK(FoundFlags[FoundIndex]); } else { CHECK(!FoundFlags[FoundIndex]); } } }); } WorkLatch.CountDown(); WorkLatch.Wait(); } } #endif void compactcas_forcelink() { } } // namespace zen