// Copyright Epic Games, Inc. All Rights Reserved. #include "compactcas.h" #include "cas.h" #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include # include # include #endif ////////////////////////////////////////////////////////////////////////// namespace zen { struct CasDiskIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t CurrentVersion = 1; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t PayloadAlignment = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; static_assert(sizeof(CasDiskIndexHeader) == 32); namespace { const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return RootPath / ContainerBaseName; } std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); } std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); } std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return GetBasePath(RootPath, ContainerBaseName) / "blocks"; } bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); return false; } if (Entry.Flags & CasDiskIndexEntry::kTombstone) { return true; } if (Entry.ContentType != ZenContentType::kUnknownContentType) { OutReason = fmt::format("Invalid content type {} for entry {}", static_cast(Entry.ContentType), Entry.Key.ToHexString()); return false; } uint64_t Size = Entry.Location.GetSize(); if (Size == 0) { OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); return false; } return true; } } // namespace ////////////////////////////////////////////////////////////////////////// CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas")) { } CasContainerStrategy::~CasContainerStrategy() { } void CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory, const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); ZEN_ASSERT(MaxBlockSize > 0); m_RootDirectory = RootDirectory; m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; m_MaxBlockSize = MaxBlockSize; m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName); OpenContainer(IsNewStore); m_IsInitialized = true; } CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { { RwLock::SharedLockScope _(m_LocationMapLock); if (m_LocationMap.contains(ChunkHash)) { return CasStore::InsertResult{.New = false}; } } // We can end up in a situation that InsertChunk writes the same chunk data in // different locations. // We release the insert lock once we have the correct WriteBlock ready and we know // where to write the data. If a new InsertChunk request for the same chunk hash/data // comes in before we update m_LocationMap below we will have a race. // The outcome of that is that we will write the chunk data in more than one location // but the chunk hash will only point to one of the chunks. // We will in that case waste space until the next GC operation. // // This should be a rare occasion and the current flow reduces the time we block for // reads, insert and GC. m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) { BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; m_CasLog.Append(IndexEntry); { RwLock::ExclusiveLockScope _(m_LocationMapLock); m_LocationMap.emplace(ChunkHash, DiskLocation); } }); m_TotalSize.fetch_add(static_cast(ChunkSize), std::memory_order::relaxed); return CasStore::InsertResult{.New = true}; } CasStore::InsertResult CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { #if !ZEN_WITH_TESTS ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); #endif return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt == m_LocationMap.end()) { return IoBuffer(); } const BlockStoreLocation& Location = KeyIt->second.Get(m_PayloadAlignment); IoBuffer Chunk = m_BlockStore.TryGetChunk(Location); return Chunk; } bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); return m_LocationMap.contains(ChunkHash); } void CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) { // This implementation is good enough for relatively small // chunk sets (in terms of chunk identifiers), but would // benefit from a better implementation which removes // items incrementally for large sets, especially when // we're likely to already have a large proportion of the // chunks in the set InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void CasContainerStrategy::Flush() { m_BlockStore.Flush(); MakeIndexSnapshot(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { std::vector BadKeys; uint64_t ChunkCount{0}, ChunkBytes{0}; std::vector ChunkLocations; std::vector ChunkIndexToChunkHash; RwLock::SharedLockScope _(m_LocationMapLock); uint64_t TotalChunkCount = m_LocationMap.size(); ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); { for (const auto& Entry : m_LocationMap) { const IoHash& ChunkHash = Entry.first; const BlockStoreDiskLocation& DiskLocation = Entry.second; BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); ChunkLocations.push_back(Location); ChunkIndexToChunkHash.push_back(ChunkHash); } } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { ++ChunkCount; ChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { // ChunkLocation out of range of stored blocks BadKeys.push_back(Hash); return; } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed) { if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) { // Hash mismatch BadKeys.push_back(Hash); return; } return; } #if ZEN_WITH_TESTS IoHash ComputedHash = IoHash::HashBuffer(Data, Size); if (ComputedHash == Hash) { return; } #endif BadKeys.push_back(Hash); }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { ++ChunkCount; ChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); // TODO: Add API to verify compressed buffer without having to memorymap the whole file if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed) { if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) { // Hash mismatch BadKeys.push_back(Hash); return; } return; } #if ZEN_WITH_TESTS IoHashStream Hasher; File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); if (ComputedHash == Hash) { return; } #endif BadKeys.push_back(Hash); }; m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); _.ReleaseNow(); Ctx.ReportScrubbed(ChunkCount, ChunkBytes); if (!BadKeys.empty()) { ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); if (Ctx.RunRecovery()) { // Deal with bad chunks by removing them from our lookup map std::vector LogEntries; LogEntries.reserve(BadKeys.size()); { RwLock::ExclusiveLockScope __(m_LocationMapLock); for (const IoHash& ChunkHash : BadKeys) { const auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt == m_LocationMap.end()) { // Might have been GC'd continue; } LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); m_LocationMap.erase(KeyIt); } } m_CasLog.Append(LogEntries); } } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do Ctx.ReportBadCidChunks(BadKeys); ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); } void CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { // It collects all the blocks that we want to delete chunks from. For each such // block we keep a list of chunks to retain and a list of chunks to delete. // // If there is a block that we are currently writing to, that block is omitted // from the garbage collection. // // Next it will iterate over all blocks that we want to remove chunks from. // If the block is empty after removal of chunks we mark the block as pending // delete - we want to delete it as soon as there are no IoBuffers using the // block file. // Once complete we update the m_LocationMap by removing the chunks. // // If the block is non-empty we write out the chunks we want to keep to a new // block file (creating new block files as needed). // // We update the index as we complete each new block file. This makes it possible // to break the GC if we want to limit time for execution. // // GC can very parallell to regular operation - it will block while taking // a snapshot of the current m_LocationMap state and while moving blocks it will // do a blocking operation and update the m_LocationMap after each new block is // written and figuring out the path to the next new block. ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; LocationMap_t LocationMap; BlockStore::ReclaimSnapshotState BlockStoreState; { RwLock::SharedLockScope ___(m_LocationMapLock); Stopwatch Timer; const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); LocationMap = m_LocationMap; BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); } uint64_t TotalChunkCount = LocationMap.size(); std::vector TotalChunkHashes; TotalChunkHashes.reserve(TotalChunkCount); for (const auto& Entry : LocationMap) { TotalChunkHashes.push_back(Entry.first); } std::vector ChunkLocations; BlockStore::ChunkIndexArray KeepChunkIndexes; std::vector ChunkIndexToChunkHash; ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { auto KeyIt = LocationMap.find(ChunkHash); const BlockStoreDiskLocation& DiskLocation = KeyIt->second; BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; if (Keep) { KeepChunkIndexes.push_back(ChunkIndex); } }); const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); return; } std::vector DeletedChunks; m_BlockStore.ReclaimSpace( BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); for (const auto& Entry : MovedChunks) { size_t ChunkIndex = Entry.first; const BlockStoreLocation& NewLocation = Entry.second; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}}); } for (const size_t ChunkIndex : RemovedChunks) { const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash]; LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone}); DeletedChunks.push_back(ChunkHash); } m_CasLog.Append(LogEntries); m_CasLog.Flush(); { RwLock::ExclusiveLockScope __(m_LocationMapLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); for (const CasDiskIndexEntry& Entry : LogEntries) { if (Entry.Flags & CasDiskIndexEntry::kTombstone) { m_LocationMap.erase(Entry.Key); uint64_t ChunkSize = Entry.Location.GetSize(); m_TotalSize.fetch_sub(ChunkSize); continue; } m_LocationMap[Entry.Key] = Entry.Location; } } }, [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); GcCtx.AddDeletedCids(DeletedChunks); } void CasContainerStrategy::MakeIndexSnapshot() { ZEN_INFO("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", m_RootDirectory / m_ContainerBaseName, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); // Move index away, we keep it if something goes wrong if (fs::is_regular_file(TempIndexPath)) { fs::remove(TempIndexPath); } if (fs::is_regular_file(IndexPath)) { fs::rename(IndexPath, TempIndexPath); } try { m_CasLog.Flush(); // Write the current state of the location map to a new index state uint64_t LogCount = 0; std::vector Entries; { RwLock::SharedLockScope ___(m_LocationMapLock); Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; for (auto& Entry : m_LocationMap) { CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = Entry.second; } LogCount = m_CasLog.GetLogCount(); } BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); CasDiskIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount, .PayloadAlignment = gsl::narrow(m_PayloadAlignment)}; Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry)); ObjectIndexFile.Flush(); ObjectIndexFile.Close(); EntryCount = Entries.size(); } catch (std::exception& Err) { ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); // Restore any previous snapshot if (fs::is_regular_file(TempIndexPath)) { fs::remove(IndexPath); fs::rename(TempIndexPath, IndexPath); } } if (fs::is_regular_file(TempIndexPath)) { fs::remove(TempIndexPath); } } uint64_t CasContainerStrategy::ReadIndexFile() { std::vector Entries; std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); if (std::filesystem::is_regular_file(IndexPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing #{} entries in {}", m_RootDirectory / m_ContainerBaseName, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CasDiskIndexHeader)) { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); CasDiskIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && (Header.EntryCount <= ExpectedEntryCount)) { Entries.resize(Header.EntryCount); ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); m_PayloadAlignment = Header.PayloadAlignment; std::string InvalidEntryReason; for (const CasDiskIndexEntry& Entry : Entries) { if (!ValidateEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } m_LocationMap[Entry.Key] = Entry.Location; } return Header.LogPosition; } else { ZEN_WARN("skipping invalid index file '{}'", IndexPath); } } } return 0; } uint64_t CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) { std::vector Entries; std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); if (std::filesystem::is_regular_file(LogPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing #{} entries in {}", m_RootDirectory / m_ContainerBaseName, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (CasLog.Initialize()) { uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } uint64_t ReadCount = EntryCount - SkipEntryCount; Entries.reserve(ReadCount); CasLog.Replay( [&](const CasDiskIndexEntry& Record) { std::string InvalidEntryReason; if (Record.Flags & CasDiskIndexEntry::kTombstone) { m_LocationMap.erase(Record.Key); return; } if (!ValidateEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); return; } m_LocationMap[Record.Key] = Record.Location; }, SkipEntryCount); return ReadCount; } } return 0; } void CasContainerStrategy::OpenContainer(bool IsNewStore) { // Add .running file and delete on clean on close to detect bad termination m_TotalSize = 0; m_LocationMap.clear(); std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName); if (IsNewStore) { std::filesystem::remove_all(BasePath); } uint64_t LogPosition = ReadIndexFile(); uint64_t LogEntryCount = ReadLog(LogPosition); CreateDirectories(BasePath); std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); std::vector KnownLocations; KnownLocations.reserve(m_LocationMap.size()); for (const auto& Entry : m_LocationMap) { const BlockStoreDiskLocation& Location = Entry.second; m_TotalSize.fetch_add(Location.GetSize(), std::memory_order::relaxed); KnownLocations.push_back(Location.Get(m_PayloadAlignment)); } m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); if (IsNewStore || (LogEntryCount > 0)) { MakeIndexSnapshot(); } // TODO: should validate integrity of container files here } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS namespace { static IoBuffer CreateChunk(uint64_t Size) { static std::random_device rd; static std::mt19937 g(rd()); std::vector Values; Values.resize(Size); for (size_t Idx = 0; Idx < Size; ++Idx) { Values[Idx] = static_cast(Idx); } std::shuffle(Values.begin(), Values.end(), g); return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); } } // namespace TEST_CASE("compactcas.hex") { uint32_t Value; std::string HexString; CHECK(!ParseHexNumber("", Value)); char Hex[9]; ToHexNumber(0u, Hex); HexString = std::string(Hex); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0u); ToHexNumber(std::numeric_limits::max(), Hex); HexString = std::string(Hex); CHECK(HexString == "ffffffff"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == std::numeric_limits::max()); ToHexNumber(0xadf14711u, Hex); HexString = std::string(Hex); CHECK(HexString == "adf14711"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0xadf14711u); ToHexNumber(0x80000000u, Hex); HexString = std::string(Hex); CHECK(HexString == "80000000"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0x80000000u); ToHexNumber(0x718293a4u, Hex); HexString = std::string(Hex); CHECK(HexString == "718293a4"); CHECK(ParseHexNumber(HexString, Value)); CHECK(Value == 0x718293a4u); } TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; const int kIterationCount = 1000; std::vector Keys(kIterationCount); { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { CbObjectWriter Cbo; Cbo << "id" << i; CbObject Obj = Cbo.Save(); IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); const IoHash Hash = HashBuffer(ObjBuffer); Cas.InsertChunk(ObjBuffer, Hash); Keys[i] = Hash; } for (int i = 0; i < kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } } // Validate that we can still read the inserted data after closing // the original cas store { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { IoBuffer Chunk = Cas.FindChunk(Keys[i]); CHECK(!!Chunk); CbObject Value = LoadCompactBinaryObject(Chunk); CHECK_EQ(Value["id"].AsInt32(), i); } } } TEST_CASE("compactcas.compact.totalsize") { std::random_device rd; std::mt19937 g(rd()); // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1024; const int32_t kChunkCount = 16; { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { IoBuffer Chunk = CreateChunk(kChunkSize); const IoHash Hash = HashBuffer(Chunk); CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); } const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } // Re-open again, this time we should have a snapshot { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } } } TEST_CASE("compactcas.gc.basic") { ScopedTemporaryDirectory TempDir; GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); Cas.Flush(); GcContext GcCtx; GcCtx.CollectSmallObjects(true); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHash)); } TEST_CASE("compactcas.gc.removefile") { ScopedTemporaryDirectory TempDir; IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); CHECK(!InsertResultDup.New); Cas.Flush(); } GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false); GcContext GcCtx; GcCtx.CollectSmallObjects(true); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHash)); } TEST_CASE("compactcas.gc.compact") { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true); uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; std::vector Chunks; Chunks.reserve(9); for (uint64_t Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); } std::vector ChunkHashes; ChunkHashes.reserve(9); for (const IoBuffer& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New); CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New); CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New); CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New); CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New); CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New); CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New); CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New); CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New); CHECK(Cas.HaveChunk(ChunkHashes[0])); CHECK(Cas.HaveChunk(ChunkHashes[1])); CHECK(Cas.HaveChunk(ChunkHashes[2])); CHECK(Cas.HaveChunk(ChunkHashes[3])); CHECK(Cas.HaveChunk(ChunkHashes[4])); CHECK(Cas.HaveChunk(ChunkHashes[5])); CHECK(Cas.HaveChunk(ChunkHashes[6])); CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); uint64_t InitialSize = Cas.StorageSize().DiskSize; // Keep first and last { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[8]); GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(!Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(!Cas.HaveChunk(ChunkHashes[6])); CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); } Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[4], ChunkHashes[4]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[6], ChunkHashes[6]); Cas.InsertChunk(Chunks[7], ChunkHashes[7]); // Keep last { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[8]); GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(!Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(!Cas.HaveChunk(ChunkHashes[6])); CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[4], ChunkHashes[4]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[6], ChunkHashes[6]); Cas.InsertChunk(Chunks[7], ChunkHashes[7]); } // Keep mixed { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[1]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[7]); GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); CHECK(Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(!Cas.HaveChunk(ChunkHashes[6])); CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(!Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); Cas.InsertChunk(Chunks[0], ChunkHashes[0]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[6], ChunkHashes[6]); Cas.InsertChunk(Chunks[8], ChunkHashes[8]); } // Keep multiple at end { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[7]); KeepChunks.push_back(ChunkHashes[8]); GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(!Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(!Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(Cas.HaveChunk(ChunkHashes[6])); CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); Cas.InsertChunk(Chunks[0], ChunkHashes[0]); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[4], ChunkHashes[4]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); } // Keep every other { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[2]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[8]); GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(Cas.HaveChunk(ChunkHashes[0])); CHECK(!Cas.HaveChunk(ChunkHashes[1])); CHECK(Cas.HaveChunk(ChunkHashes[2])); CHECK(!Cas.HaveChunk(ChunkHashes[3])); CHECK(Cas.HaveChunk(ChunkHashes[4])); CHECK(!Cas.HaveChunk(ChunkHashes[5])); CHECK(Cas.HaveChunk(ChunkHashes[6])); CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); Cas.InsertChunk(Chunks[5], ChunkHashes[5]); Cas.InsertChunk(Chunks[7], ChunkHashes[7]); } // Verify that we nicely appended blocks even after all GC operations CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); uint64_t FinalSize = Cas.StorageSize().DiskSize; CHECK(InitialSize == FinalSize); } } TEST_CASE("compactcas.gc.deleteblockonopen") { ScopedTemporaryDirectory TempDir; uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; std::vector Chunks; Chunks.reserve(20); for (uint64_t Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); } std::vector ChunkHashes; ChunkHashes.reserve(20); for (const IoBuffer& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } { GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); for (size_t i = 0; i < 20; i++) { CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); } // GC every other block { GcContext GcCtx; GcCtx.CollectSmallObjects(true); std::vector KeepChunks; for (size_t i = 0; i < 20; i += 2) { KeepChunks.push_back(ChunkHashes[i]); } GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); for (size_t i = 0; i < 20; i += 2) { CHECK(Cas.HaveChunk(ChunkHashes[i])); CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); } } } { // Re-open GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 1024, 16, false); for (size_t i = 0; i < 20; i += 2) { CHECK(Cas.HaveChunk(ChunkHashes[i])); CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); } } } TEST_CASE("compactcas.gc.handleopeniobuffer") { ScopedTemporaryDirectory TempDir; uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; std::vector Chunks; Chunks.reserve(20); for (const uint64_t& Size : ChunkSizes) { Chunks.push_back(CreateChunk(Size)); } std::vector ChunkHashes; ChunkHashes.reserve(20); for (const IoBuffer& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); for (size_t i = 0; i < 20; i++) { CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); } IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]); Cas.Flush(); // GC everything GcContext GcCtx; GcCtx.CollectSmallObjects(true); Cas.CollectGarbage(GcCtx); for (size_t i = 0; i < 20; i++) { CHECK(!Cas.HaveChunk(ChunkHashes[i])); } CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); } TEST_CASE("compactcas.threadedinsert") { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1048; const int32_t kChunkCount = 4096; uint64_t ExpectedSize = 0; std::unordered_map Chunks; Chunks.reserve(kChunkCount); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { while (true) { IoBuffer Chunk = CreateChunk(kChunkSize); IoHash Hash = HashBuffer(Chunk); if (Chunks.contains(Hash)) { continue; } Chunks[Hash] = Chunk; ExpectedSize += Chunk.Size(); break; } } std::atomic WorkCompleted = 0; WorkerThreadPool ThreadPool(4); GcManager Gc; CasContainerStrategy Cas(Gc); Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { for (const auto& Chunk : Chunks) { const IoHash& Hash = Chunk.first; const IoBuffer& Buffer = Chunk.second; ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); ZEN_ASSERT(InsertResult.New); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < Chunks.size()) { Sleep(1); } } WorkCompleted = 0; const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(ExpectedSize, TotalSize); { for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); IoHash Hash = IoHash::HashBuffer(Buffer); CHECK(ChunkHash == Hash); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < Chunks.size()) { Sleep(1); } } std::unordered_set GcChunkHashes; GcChunkHashes.reserve(Chunks.size()); for (const auto& Chunk : Chunks) { GcChunkHashes.insert(Chunk.first); } { WorkCompleted = 0; std::unordered_map NewChunks; NewChunks.reserve(kChunkCount); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { IoBuffer Chunk = CreateChunk(kChunkSize); IoHash Hash = HashBuffer(Chunk); NewChunks[Hash] = Chunk; } std::atomic_uint32_t AddedChunkCount; for (const auto& Chunk : NewChunks) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { Cas.InsertChunk(Chunk.second, Chunk.first); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); }); } for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); if (Buffer) { CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); } WorkCompleted.fetch_add(1); }); } while (AddedChunkCount.load() < NewChunks.size()) { // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope for (const auto& Chunk : NewChunks) { if (Cas.HaveChunk(Chunk.first)) { GcChunkHashes.emplace(Chunk.first); } } std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); size_t C = 0; while (C < KeepHashes.size()) { if (C % 155 == 0) { if (C < KeepHashes.size() - 1) { KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } if (C + 3 < KeepHashes.size() - 1) { KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } } C++; } GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.AddRetainedCids(KeepHashes); Cas.CollectGarbage(GcCtx); const HashKeySet& Deleted = GcCtx.DeletedCids(); Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } while (WorkCompleted < NewChunks.size() + Chunks.size()) { Sleep(1); } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope for (const auto& Chunk : NewChunks) { if (Cas.HaveChunk(Chunk.first)) { GcChunkHashes.emplace(Chunk.first); } } std::vector KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); size_t C = 0; while (C < KeepHashes.size()) { if (C % 155 == 0) { if (C < KeepHashes.size() - 1) { KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } if (C + 3 < KeepHashes.size() - 1) { KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } } C++; } GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.AddRetainedCids(KeepHashes); Cas.CollectGarbage(GcCtx); const HashKeySet& Deleted = GcCtx.DeletedCids(); Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } { WorkCompleted = 0; for (const IoHash& ChunkHash : GcChunkHashes) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { CHECK(Cas.HaveChunk(ChunkHash)); CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < GcChunkHashes.size()) { Sleep(1); } } } } #endif void compactcas_forcelink() { } } // namespace zen