diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-09 09:03:39 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-09 09:03:39 +0200 |
| commit | 6f2d68d2c11011d541259d0037908dd76eadeb8a (patch) | |
| tree | 4fa165343dd42544dded51fad0e13ebae44dd442 /src/zenstore/compactcas.cpp | |
| parent | 5.6.10-pre0 (diff) | |
| download | zen-6f2d68d2c11011d541259d0037908dd76eadeb8a.tar.xz zen-6f2d68d2c11011d541259d0037908dd76eadeb8a.zip | |
missing chunks bugfix (#424)
* make sure to close log file when resetting log
* drop entries that refers to missing blocks
* Don't scrub keys that has been rewritten
* currectly count added bytes / m_TotalSize
* fix negative sleep time in BlockStoreFile::Open()
* be defensive when fetching log position
* append to log files *after* we updated all state successfully
* explicitly close stuff in destructors with exception catching
* clean up empty size block store files
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 306 |
1 files changed, 290 insertions, 16 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 0c9302ec8..2ab5752ff 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -145,6 +145,16 @@ CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get(" CasContainerStrategy::~CasContainerStrategy() { + try + { + m_BlockStore.Close(); + m_CasLog.Flush(); + m_CasLog.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CasContainerStrategy failed with: ", Ex.what()); + } m_Gc.RemoveGcReferenceStore(*this); m_Gc.RemoveGcStorage(this); } @@ -204,12 +214,12 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const ZEN_TRACE_CPU("CasContainer::UpdateLocation"); BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; - m_CasLog.Append(IndexEntry); { RwLock::ExclusiveLockScope _(m_LocationMapLock); m_LocationMap.emplace(ChunkHash, m_Locations.size()); m_Locations.push_back(DiskLocation); } + m_CasLog.Append(IndexEntry); }); return CasStore::InsertResult{.New = true}; @@ -273,7 +283,6 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c IndexEntries.emplace_back( CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)}); } - m_CasLog.Append(IndexEntries); { RwLock::ExclusiveLockScope _(m_LocationMapLock); for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries) @@ -282,6 +291,7 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c m_Locations.push_back(DiskIndexEntry.Location); } } + m_CasLog.Append(IndexEntries); }); return Result; } @@ -746,19 +756,27 @@ public: MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location}); } } - for (size_t ScrubbedIndex : ScrubbedArray) + for (size_t ChunkIndex : ScrubbedArray) { - const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex]; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key); It != m_CasContainerStrategy.m_LocationMap.end()) { - BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second]; + BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second]; + const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); + if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation) + { + // Someone has moved our chunk so lets just skip the new location we were provided, it will be + // GC:d at a later time + continue; + } MovedEntries.push_back( CasDiskIndexEntry{.Key = Key, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone}); m_CasContainerStrategy.m_LocationMap.erase(It); } } m_CasContainerStrategy.m_CasLog.Append(MovedEntries); + m_CasContainerStrategy.m_CasLog.Flush(); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) { @@ -984,14 +1002,11 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog) { // Write the current state of the location map to a new index state std::vector<CasDiskIndexEntry> Entries; - uint64_t IndexLogPosition = 0; + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount(); { RwLock::SharedLockScope ___(m_LocationMapLock); - if (!ResetLog) - { - IndexLogPosition = m_CasLog.GetLogCount(); - } Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; @@ -1036,12 +1051,14 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog) if (IsFile(LogPath)) { + m_CasLog.Close(); if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in // the end it will be the same result ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); } } m_LogFlushPosition = IndexLogPosition; @@ -1154,7 +1171,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } - LogEntryCount = EntryCount - SkipEntryCount; + LogEntryCount = SkipEntryCount; CasLog.Replay( [&](const CasDiskIndexEntry& Record) { LogEntryCount++; @@ -1173,7 +1190,6 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski m_Locations.push_back(Record.Location); }, SkipEntryCount); - return LogEntryCount; } return 0; @@ -1229,8 +1245,6 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } } - m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_LocationMap) @@ -1240,9 +1254,39 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) KnownBlocks.insert(BlockIndex); } - m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + bool RemovedEntries = false; + if (!MissingBlocks.empty()) + { + std::vector<CasDiskIndexEntry> MissingEntries; + for (auto& It : m_LocationMap) + { + const uint32_t BlockIndex = m_Locations[It.second].GetBlockIndex(); + if (MissingBlocks.contains(BlockIndex)) + { + MissingEntries.push_back({.Key = It.first, .Location = m_Locations[It.second], .Flags = CasDiskIndexEntry::kTombstone}); + } + } + ZEN_ASSERT(!MissingEntries.empty()); + + for (const CasDiskIndexEntry& Entry : MissingEntries) + { + m_LocationMap.erase(Entry.Key); + } + m_CasLog.Append(MissingEntries); + m_CasLog.Flush(); + + { + RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock); + CompactIndex(IndexLock); + } + RemovedEntries = true; + } - if (IsNewStore || (LogEntryCount > 0)) + if (IsNewStore || (LogEntryCount > 0) || RemovedEntries) { MakeIndexSnapshot(/*ResetLog*/ true); } @@ -1612,6 +1656,236 @@ TEST_CASE("compactcas.threadedinsert") } } +TEST_CASE("compactcas.restart") +{ + uint64_t ExpectedSize = 0; + + auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + + Latch WorkLatch(1); + tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup; + ChunkHashesLookup.reserve(ChunkCount); + RwLock InsertLock; + for (size_t Offset = 0; Offset < ChunkCount;) + { + size_t BatchCount = Min<size_t>(ChunkCount - Offset, 512u); + WorkLatch.AddCount(1); + ThreadPool.ScheduleWork( + [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount, ChunkSize]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + std::vector<IoBuffer> BatchBlobs; + std::vector<IoHash> BatchHashes; + BatchBlobs.reserve(BatchCount); + BatchHashes.reserve(BatchCount); + + while (BatchBlobs.size() < BatchCount) + { + IoBuffer Chunk = + CreateSemiRandomBlob(ChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); + IoHash Hash = IoHash::HashBuffer(Chunk); + { + RwLock::ExclusiveLockScope __(InsertLock); + if (ChunkHashesLookup.contains(Hash)) + { + continue; + } + ChunkHashesLookup.insert(Hash); + ExpectedSize += Chunk.Size(); + } + + BatchBlobs.emplace_back(CompressedBuffer::Compress(SharedBuffer(Chunk)).GetCompressed().Flatten().AsIoBuffer()); + BatchHashes.push_back(Hash); + } + + Cas.InsertChunks(BatchBlobs, BatchHashes); + { + RwLock::ExclusiveLockScope __(InsertLock); + Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); + } + }); + Offset += BatchCount; + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + }; + + ScopedTemporaryDirectory TempDir; + std::filesystem::path CasPath = TempDir.Path(); + CreateDirectories(CasPath); + + bool Generate = false; + if (!Generate) + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + } + + const uint64_t kChunkSize = 1048 + 395; + const size_t kChunkCount = 7167; + + std::vector<IoHash> Hashes; + Hashes.reserve(kChunkCount); + + auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) { + for (const IoHash& Hash : Hashes) + { + if (ShouldExist) + { + CHECK(Cas.HaveChunk(Hash)); + IoBuffer Buffer = Cas.FindChunk(Hash); + CHECK(Buffer); + IoHash ValidateHash; + uint64_t ValidateRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); + CHECK(Compressed); + CHECK(ValidateHash == Hash); + } + else + { + CHECK(!Cas.HaveChunk(Hash)); + IoBuffer Buffer = Cas.FindChunk(Hash); + CHECK(!Buffer); + } + } + }; + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, true); + GenerateChunks(Cas, kChunkCount, kChunkSize, Hashes); + ValidateChunks(Cas, Hashes, true); + Cas.Flush(); + ValidateChunks(Cas, Hashes, true); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + GenerateChunks(Cas, kChunkCount, kChunkSize / 4, Hashes); + ValidateChunks(Cas, Hashes, true); + } + + class GcRefChecker : public GcReferenceChecker + { + public: + explicit GcRefChecker(std::vector<IoHash>&& HashesToKeep) : m_HashesToKeep(std::move(HashesToKeep)) {} + ~GcRefChecker() {} + std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return "test"; + } + void PreCache(GcCtx& Ctx) override { FilterReferences(Ctx, "test", m_HashesToKeep); } + void UpdateLockedState(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } + std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override + { + ZEN_UNUSED(Ctx); + return KeepUnusedReferences(m_HashesToKeep, IoCids); + } + + private: + std::vector<IoHash> m_HashesToKeep; + }; + + class GcRef : public GcReferencer + { + public: + GcRef(GcManager& Gc, std::span<const IoHash> HashesToKeep) : m_Gc(Gc) + { + m_HashesToKeep.insert(m_HashesToKeep.begin(), HashesToKeep.begin(), HashesToKeep.end()); + m_Gc.AddGcReferencer(*this); + } + ~GcRef() { m_Gc.RemoveGcReferencer(*this); } + std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return "test"; + } + GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override + { + ZEN_UNUSED(Ctx, Stats); + return nullptr; + } + std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return {new GcRefChecker(std::move(m_HashesToKeep))}; + } + std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return {}; + } + + private: + GcManager& m_Gc; + std::vector<IoHash> m_HashesToKeep; + }; + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + GenerateChunks(Cas, kChunkCount, kChunkSize / 5, Hashes); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + GenerateChunks(Cas, kChunkCount, kChunkSize / 2, Hashes); + ValidateChunks(Cas, Hashes, true); + if (true) + { + std::vector<IoHash> DropHashes; + std::vector<IoHash> KeepHashes; + for (size_t Index = 0; Index < Hashes.size(); Index++) + { + if (Index % 5 == 0) + { + KeepHashes.push_back(Hashes[Index]); + } + else + { + DropHashes.push_back(Hashes[Index]); + } + } + // std::span<const IoHash> KeepHashes(Hashes); + // ZEN_ASSERT(ExpectedGcCount < Hashes.size()); + // KeepHashes = KeepHashes.subspan(ExpectedGcCount); + GcRef Ref(Gc, KeepHashes); + Gc.CollectGarbage(GcSettings{.CollectSmallObjects = true, .IsDeleteMode = true}); + ValidateChunks(Cas, KeepHashes, true); + ValidateChunks(Cas, DropHashes, false); + Hashes = KeepHashes; + } + GenerateChunks(Cas, kChunkCount, kChunkSize / 3, Hashes); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + Cas.Flush(); + ValidateChunks(Cas, Hashes, true); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + } +} + TEST_CASE("compactcas.iteratechunks") { std::atomic<size_t> WorkCompleted = 0; |