diff options
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 707 |
1 files changed, 607 insertions, 100 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 2be0542db..b00abb2cb 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -15,6 +15,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenstore/scrubcontext.h> +#include <zenutil/parallelwork.h> #include <gsl/gsl-lite.hpp> @@ -144,6 +145,16 @@ CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get(" CasContainerStrategy::~CasContainerStrategy() { + try + { + m_BlockStore.Close(); + m_CasLog.Flush(); + m_CasLog.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CasContainerStrategy failed with: ", Ex.what()); + } m_Gc.RemoveGcReferenceStore(*this); m_Gc.RemoveGcStorage(this); } @@ -203,12 +214,12 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const ZEN_TRACE_CPU("CasContainer::UpdateLocation"); BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; - m_CasLog.Append(IndexEntry); { RwLock::ExclusiveLockScope _(m_LocationMapLock); m_LocationMap.emplace(ChunkHash, m_Locations.size()); m_Locations.push_back(DiskLocation); } + m_CasLog.Append(IndexEntry); }); return CasStore::InsertResult{.New = true}; @@ -226,7 +237,7 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) } std::vector<CasStore::InsertResult> -CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes) +CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes) { ZEN_MEMSCOPE(GetCasContainerTag()); @@ -272,7 +283,6 @@ CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> IndexEntries.emplace_back( CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)}); } - m_CasLog.Append(IndexEntries); { RwLock::ExclusiveLockScope _(m_LocationMapLock); for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries) @@ -281,6 +291,7 @@ CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> m_Locations.push_back(DiskIndexEntry.Location); } } + m_CasLog.Append(IndexEntries); }); return Result; } @@ -306,7 +317,12 @@ bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); - return m_LocationMap.contains(ChunkHash); + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) + { + const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + return m_BlockStore.HasChunk(Location); + } + return false; } void @@ -323,7 +339,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) } bool -CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, +CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) @@ -360,7 +376,11 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, return true; } - auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { + auto DoOneBlock = [this](const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, + uint64_t LargeSizeLimit, + std::span<const size_t> FoundChunkIndexes, + std::span<const BlockStoreLocation> FoundChunkLocations, + std::span<const size_t> ChunkIndexes) { if (ChunkIndexes.size() < 4) { for (size_t ChunkIndex : ChunkIndexes) @@ -376,57 +396,96 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, return m_BlockStore.IterateBlock( FoundChunkLocations, ChunkIndexes, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) { if (Data == nullptr) { return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer()); } return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); }, LargeSizeLimit); }; - Latch WorkLatch(1); - std::atomic_bool AsyncContinue = true; - bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { - if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) + std::atomic<bool> AbortFlag; + { + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + try { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (!AsyncContinue) - { - return; - } - try - { - bool Continue = DoOneBlock(ChunkIndexes); - if (!Continue) + const bool Continue = m_BlockStore.IterateChunks( + FoundChunkLocations, + [this, + &Work, + &AbortFlag, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + &FoundChunkIndexes, + &FoundChunkLocations, + OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) { - AsyncContinue.store(false); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", - m_RootDirectory, + std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, BlockIndex, - Ex.what()); - } - }); - return AsyncContinue.load(); + &FoundChunkIndexes, + &FoundChunkLocations, + ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + try + { + bool Continue = + DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + if (!Continue) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + AbortFlag.store(true); + } + }); + return !AbortFlag.load(); + } + else + { + if (!DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes)) + { + AbortFlag.store(true); + } + return !AbortFlag.load(); + } + }); + if (!Continue) + { + AbortFlag.store(true); + } } - else + catch (const std::exception& Ex) { - return DoOneBlock(ChunkIndexes); + AbortFlag.store(true); + ZEN_WARN("Failed iterating chunks for cas root path {}. Reason: '{}'", m_RootDirectory, Ex.what()); } - }); - WorkLatch.CountDown(); - WorkLatch.Wait(); - return AsyncContinue.load() && Continue; + + Work.Wait(); + } + return !AbortFlag.load(); } void @@ -437,7 +496,7 @@ CasContainerStrategy::Flush() ZEN_TRACE_CPU("CasContainer::Flush"); m_BlockStore.Flush(/*ForceNewBlock*/ false); m_CasLog.Flush(); - MakeIndexSnapshot(); + MakeIndexSnapshot(/*ResetLog*/ false); } void @@ -677,7 +736,9 @@ public: m_CasContainerStrategy.m_BlockStore.CompactBlocks( BlockCompactState, m_CasContainerStrategy.m_PayloadAlignment, - [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + [&](const BlockStore::MovedChunksArray& MovedArray, + const BlockStore::ChunkIndexArray& ScrubbedArray, + uint64_t FreedDiskSpace) { std::vector<CasDiskIndexEntry> MovedEntries; RwLock::ExclusiveLockScope _(m_CasContainerStrategy.m_LocationMapLock); for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) @@ -702,7 +763,27 @@ public: MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location}); } } + for (size_t ChunkIndex : ScrubbedArray) + { + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key); + It != m_CasContainerStrategy.m_LocationMap.end()) + { + BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second]; + const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); + if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation) + { + // Someone has moved our chunk so lets just skip the new location we were provided, it will be + // GC:d at a later time + continue; + } + MovedEntries.push_back( + CasDiskIndexEntry{.Key = Key, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone}); + m_CasContainerStrategy.m_LocationMap.erase(It); + } + } m_CasContainerStrategy.m_CasLog.Append(MovedEntries); + m_CasContainerStrategy.m_CasLog.Flush(); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) { @@ -900,13 +981,12 @@ CasContainerStrategy::StorageSize() const } void -CasContainerStrategy::MakeIndexSnapshot() +CasContainerStrategy::MakeIndexSnapshot(bool ResetLog) { ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::MakeIndexSnapshot"); - uint64_t LogCount = m_CasLog.GetLogCount(); - if (m_LogFlushPosition == LogCount) + if (m_LogFlushPosition == m_CasLog.GetLogCount()) { return; } @@ -923,34 +1003,17 @@ CasContainerStrategy::MakeIndexSnapshot() namespace fs = std::filesystem; - fs::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName); - fs::path TempIndexPath = cas::impl::GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(TempIndexPath)) - { - std::error_code Ec; - if (!fs::remove(TempIndexPath, Ec) || Ec) - { - ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", TempIndexPath, Ec.message()); - return; - } - } + const fs::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName); try { - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, TempIndexPath); - } - // Write the current state of the location map to a new index state std::vector<CasDiskIndexEntry> Entries; - uint64_t IndexLogPosition = 0; + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount(); { RwLock::SharedLockScope ___(m_LocationMapLock); - IndexLogPosition = m_CasLog.GetLogCount(); Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; @@ -960,6 +1023,7 @@ CasContainerStrategy::MakeIndexSnapshot() IndexEntry.Key = Entry.first; IndexEntry.Location = m_Locations[Entry.second]; } + EntryCount = m_LocationMap.size(); } TemporaryFile ObjectIndexFile; @@ -969,7 +1033,7 @@ CasContainerStrategy::MakeIndexSnapshot() { throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath)); } - CasDiskIndexHeader Header = {.EntryCount = Entries.size(), + CasDiskIndexHeader Header = {.EntryCount = EntryCount, .LogPosition = IndexLogPosition, .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; @@ -981,35 +1045,34 @@ CasContainerStrategy::MakeIndexSnapshot() ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath)); + throw std::system_error(Ec, + fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", + ObjectIndexFile.GetPath(), + IndexPath, + Ec.message())); } - EntryCount = Entries.size(); - m_LogFlushPosition = IndexLogPosition; - } - catch (const std::exception& Err) - { - ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); - - // Restore any previous snapshot - if (fs::is_regular_file(TempIndexPath)) + if (ResetLog) { - std::error_code Ec; - fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless - fs::rename(TempIndexPath, IndexPath, Ec); - if (Ec) + const std::filesystem::path LogPath = cas::impl::GetLogPath(m_RootDirectory, m_ContainerBaseName); + + if (IsFile(LogPath)) { - ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", TempIndexPath, Ec.message()); + m_CasLog.Close(); + if (!RemoveFile(LogPath, Ec) || Ec) + { + // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in + // the end it will be the same result + ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); + } + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); } } + m_LogFlushPosition = IndexLogPosition; } - if (fs::is_regular_file(TempIndexPath)) + catch (const std::exception& Err) { - std::error_code Ec; - if (!fs::remove(TempIndexPath, Ec) || Ec) - { - ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempIndexPath, Ec.message()); - } + ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); } } @@ -1092,7 +1155,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski if (!TCasLogFile<CasDiskIndexEntry>::IsValid(LogPath)) { ZEN_WARN("removing invalid cas log at '{}'", LogPath); - std::filesystem::remove(LogPath); + RemoveFile(LogPath); return 0; } @@ -1115,7 +1178,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } - LogEntryCount = EntryCount - SkipEntryCount; + LogEntryCount = SkipEntryCount; CasLog.Replay( [&](const CasDiskIndexEntry& Record) { LogEntryCount++; @@ -1134,7 +1197,6 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski m_Locations.push_back(Record.Location); }, SkipEntryCount); - return LogEntryCount; } return 0; @@ -1155,7 +1217,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) if (IsNewStore) { - std::filesystem::remove_all(BasePath); + DeleteDirectories(BasePath); } CreateDirectories(BasePath); @@ -1165,19 +1227,19 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::path LogPath = cas::impl::GetLogPath(m_RootDirectory, m_ContainerBaseName); std::filesystem::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName); - if (std::filesystem::is_regular_file(IndexPath)) + if (IsFile(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); - std::filesystem::remove(IndexPath); + RemoveFile(IndexPath); } } uint64_t LogEntryCount = 0; - if (std::filesystem::is_regular_file(LogPath)) + if (IsFile(LogPath)) { if (TCasLogFile<CasDiskIndexEntry>::IsValid(LogPath)) { @@ -1186,12 +1248,10 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) else { ZEN_WARN("removing invalid cas log at '{}'", LogPath); - std::filesystem::remove(LogPath); + RemoveFile(LogPath); } } - m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_LocationMap) @@ -1201,11 +1261,41 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) KnownBlocks.insert(BlockIndex); } - m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + bool RemovedEntries = false; + if (!MissingBlocks.empty()) + { + std::vector<CasDiskIndexEntry> MissingEntries; + for (auto& It : m_LocationMap) + { + const uint32_t BlockIndex = m_Locations[It.second].GetBlockIndex(); + if (MissingBlocks.contains(BlockIndex)) + { + MissingEntries.push_back({.Key = It.first, .Location = m_Locations[It.second], .Flags = CasDiskIndexEntry::kTombstone}); + } + } + ZEN_ASSERT(!MissingEntries.empty()); + + for (const CasDiskIndexEntry& Entry : MissingEntries) + { + m_LocationMap.erase(Entry.Key); + } + m_CasLog.Append(MissingEntries); + m_CasLog.Flush(); + + { + RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock); + CompactIndex(IndexLock); + } + RemovedEntries = true; + } - if (IsNewStore || (LogEntryCount > 0)) + if (IsNewStore || (LogEntryCount > 0) || RemovedEntries) { - MakeIndexSnapshot(); + MakeIndexSnapshot(/*ResetLog*/ true); } // TODO: should validate integrity of container files here @@ -1573,6 +1663,423 @@ TEST_CASE("compactcas.threadedinsert") } } +TEST_CASE("compactcas.restart") +{ + uint64_t ExpectedSize = 0; + + auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + + Latch WorkLatch(1); + tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup; + ChunkHashesLookup.reserve(ChunkCount); + RwLock InsertLock; + for (size_t Offset = 0; Offset < ChunkCount;) + { + size_t BatchCount = Min<size_t>(ChunkCount - Offset, 512u); + WorkLatch.AddCount(1); + ThreadPool.ScheduleWork( + [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount, ChunkSize]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + std::vector<IoBuffer> BatchBlobs; + std::vector<IoHash> BatchHashes; + BatchBlobs.reserve(BatchCount); + BatchHashes.reserve(BatchCount); + + while (BatchBlobs.size() < BatchCount) + { + IoBuffer Chunk = + CreateSemiRandomBlob(ChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); + IoHash Hash = IoHash::HashBuffer(Chunk); + { + RwLock::ExclusiveLockScope __(InsertLock); + if (ChunkHashesLookup.contains(Hash)) + { + continue; + } + ChunkHashesLookup.insert(Hash); + ExpectedSize += Chunk.Size(); + } + + BatchBlobs.emplace_back(CompressedBuffer::Compress(SharedBuffer(Chunk)).GetCompressed().Flatten().AsIoBuffer()); + BatchHashes.push_back(Hash); + } + + Cas.InsertChunks(BatchBlobs, BatchHashes); + { + RwLock::ExclusiveLockScope __(InsertLock); + Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); + } + }); + Offset += BatchCount; + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + }; + + ScopedTemporaryDirectory TempDir; + std::filesystem::path CasPath = TempDir.Path(); + CreateDirectories(CasPath); + + bool Generate = false; + if (!Generate) + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + } + + const uint64_t kChunkSize = 1048 + 395; + const size_t kChunkCount = 7167; + + std::vector<IoHash> Hashes; + Hashes.reserve(kChunkCount); + + auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) { + for (const IoHash& Hash : Hashes) + { + if (ShouldExist) + { + CHECK(Cas.HaveChunk(Hash)); + IoBuffer Buffer = Cas.FindChunk(Hash); + CHECK(Buffer); + IoHash ValidateHash; + uint64_t ValidateRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); + CHECK(Compressed); + CHECK(ValidateHash == Hash); + } + else + { + CHECK(!Cas.HaveChunk(Hash)); + IoBuffer Buffer = Cas.FindChunk(Hash); + CHECK(!Buffer); + } + } + }; + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, true); + GenerateChunks(Cas, kChunkCount, kChunkSize, Hashes); + ValidateChunks(Cas, Hashes, true); + Cas.Flush(); + ValidateChunks(Cas, Hashes, true); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + GenerateChunks(Cas, kChunkCount, kChunkSize / 4, Hashes); + ValidateChunks(Cas, Hashes, true); + } + + class GcRefChecker : public GcReferenceChecker + { + public: + explicit GcRefChecker(std::vector<IoHash>&& HashesToKeep) : m_HashesToKeep(std::move(HashesToKeep)) {} + ~GcRefChecker() {} + std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return "test"; + } + void PreCache(GcCtx& Ctx) override { FilterReferences(Ctx, "test", m_HashesToKeep); } + void UpdateLockedState(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } + std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override + { + ZEN_UNUSED(Ctx); + return KeepUnusedReferences(m_HashesToKeep, IoCids); + } + + private: + std::vector<IoHash> m_HashesToKeep; + }; + + class GcRef : public GcReferencer + { + public: + GcRef(GcManager& Gc, std::span<const IoHash> HashesToKeep) : m_Gc(Gc) + { + m_HashesToKeep.insert(m_HashesToKeep.begin(), HashesToKeep.begin(), HashesToKeep.end()); + m_Gc.AddGcReferencer(*this); + } + ~GcRef() { m_Gc.RemoveGcReferencer(*this); } + std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return "test"; + } + GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override + { + ZEN_UNUSED(Ctx, Stats); + return nullptr; + } + std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return {new GcRefChecker(std::move(m_HashesToKeep))}; + } + std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return {}; + } + + private: + GcManager& m_Gc; + std::vector<IoHash> m_HashesToKeep; + }; + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + GenerateChunks(Cas, kChunkCount, kChunkSize / 5, Hashes); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + GenerateChunks(Cas, kChunkCount, kChunkSize / 2, Hashes); + ValidateChunks(Cas, Hashes, true); + if (true) + { + std::vector<IoHash> DropHashes; + std::vector<IoHash> KeepHashes; + for (size_t Index = 0; Index < Hashes.size(); Index++) + { + if (Index % 5 == 0) + { + KeepHashes.push_back(Hashes[Index]); + } + else + { + DropHashes.push_back(Hashes[Index]); + } + } + // std::span<const IoHash> KeepHashes(Hashes); + // ZEN_ASSERT(ExpectedGcCount < Hashes.size()); + // KeepHashes = KeepHashes.subspan(ExpectedGcCount); + GcRef Ref(Gc, KeepHashes); + Gc.CollectGarbage(GcSettings{.CollectSmallObjects = true, .IsDeleteMode = true}); + ValidateChunks(Cas, KeepHashes, true); + ValidateChunks(Cas, DropHashes, false); + Hashes = KeepHashes; + } + GenerateChunks(Cas, kChunkCount, kChunkSize / 3, Hashes); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + Cas.Flush(); + ValidateChunks(Cas, Hashes, true); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + } +} + +TEST_CASE("compactcas.iteratechunks") +{ + std::atomic<size_t> WorkCompleted = 0; + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + + const uint64_t kChunkSize = 1048 + 395; + const size_t kChunkCount = 63840; + + for (uint32_t N = 0; N < 4; N++) + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + ScopedTemporaryDirectory TempDir; + Cas.Initialize(TempDir.Path(), "test", 65536 * 128, 8, true); + + CHECK(Cas.IterateChunks( + {}, + [](size_t Index, const IoBuffer& Payload) { + ZEN_UNUSED(Index, Payload); + return true; + }, + &ThreadPool, + 2048u)); + + uint64_t ExpectedSize = 0; + + std::vector<IoHash> Hashes; + Hashes.reserve(kChunkCount); + + { + Latch WorkLatch(1); + tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup; + ChunkHashesLookup.reserve(kChunkCount); + RwLock InsertLock; + for (size_t Offset = 0; Offset < kChunkCount;) + { + size_t BatchCount = Min<size_t>(kChunkCount - Offset, 512u); + WorkLatch.AddCount(1); + ThreadPool.ScheduleWork( + [N, &WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + std::vector<IoBuffer> BatchBlobs; + std::vector<IoHash> BatchHashes; + BatchBlobs.reserve(BatchCount); + BatchHashes.reserve(BatchCount); + + while (BatchBlobs.size() < BatchCount) + { + IoBuffer Chunk = CreateRandomBlob( + N + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); + IoHash Hash = IoHash::HashBuffer(Chunk); + { + RwLock::ExclusiveLockScope __(InsertLock); + if (ChunkHashesLookup.contains(Hash)) + { + continue; + } + ChunkHashesLookup.insert(Hash); + ExpectedSize += Chunk.Size(); + } + + BatchBlobs.emplace_back(std::move(Chunk)); + BatchHashes.push_back(Hash); + } + + Cas.InsertChunks(BatchBlobs, BatchHashes); + { + RwLock::ExclusiveLockScope __(InsertLock); + Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); + } + }); + Offset += BatchCount; + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + } + + WorkerThreadPool BatchWorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "fetch"); + { + std::vector<std::atomic<bool>> FetchedFlags(Hashes.size()); + std::atomic<uint64_t> FetchedSize = 0; + CHECK(Cas.IterateChunks( + Hashes, + [&Hashes, &FetchedFlags, &FetchedSize](size_t Index, const IoBuffer& Payload) { + CHECK(FetchedFlags[Index].load() == false); + FetchedFlags[Index].store(true); + const IoHash& Hash = Hashes[Index]; + CHECK(Hash == IoHash::HashBuffer(Payload)); + FetchedSize += Payload.GetSize(); + return true; + }, + &BatchWorkerPool, + 2048u)); + for (const auto& Flag : FetchedFlags) + { + CHECK(Flag.load()); + } + CHECK(FetchedSize == ExpectedSize); + } + + Latch WorkLatch(1); + for (size_t I = 0; I < 2; I++) + { + WorkLatch.AddCount(1); + ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + std::vector<IoHash> PartialHashes; + PartialHashes.reserve(Hashes.size() / 4); + for (size_t Index = 0; Index < Hashes.size(); Index++) + { + size_t TestIndex = Index + I; + if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1)) + { + PartialHashes.push_back(Hashes[Index]); + } + } + std::reverse(PartialHashes.begin(), PartialHashes.end()); + + std::vector<IoHash> NoFoundHashes; + std::vector<size_t> NoFindIndexes; + + NoFoundHashes.reserve(9); + for (size_t J = 0; J < 9; J++) + { + std::string Data = fmt::format("oh no, we don't exist {}", J + 1); + NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length())); + } + + NoFindIndexes.reserve(9); + + // Sprinkle in chunks that are not found! + auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + + std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size()); + std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size()); + + CHECK(Cas.IterateChunks( + PartialHashes, + [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) { + CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index)); + uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1); + CHECK(PreviousCount == 0); + FoundFlags[Index] = !!Payload; + const IoHash& Hash = PartialHashes[Index]; + CHECK(Hash == IoHash::HashBuffer(Payload)); + return true; + }, + &BatchWorkerPool, + 2048u)); + + for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++) + { + CHECK(FetchedCounts[FoundIndex].load() <= 1); + if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end()) + { + CHECK(FoundFlags[FoundIndex]); + } + else + { + CHECK(!FoundFlags[FoundIndex]); + } + } + }); + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + } +} + #endif void |