diff options
| author | Dan Engelbrecht <[email protected]> | 2023-11-24 13:26:51 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-11-24 13:26:51 +0100 |
| commit | 254d2f89c110fc5f14e658505559a7e7534a984d (patch) | |
| tree | 511e8dcbae633ae4ccaea20f29b9b04bc41ea875 /src | |
| parent | fix truncation of sentry hostname (diff) | |
| download | zen-254d2f89c110fc5f14e658505559a7e7534a984d.tar.xz zen-254d2f89c110fc5f14e658505559a7e7534a984d.zip | |
Add GC Cancel/Stop (#568)
- GcScheduler will now cancel any running GC when it shuts down.
- The old GC is rather limited in *when* it reacts to a GC cancel request; GCv2 is more responsive.
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 71 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 255 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 19 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 22 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 103 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 12 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 9 |
7 files changed, 353 insertions, 138 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index af8b6227b..6ab3c7746 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -54,6 +54,13 @@ namespace { #pragma pack(pop) + template<typename T> + void Reset(T& V) + { + T Tmp; + V.swap(Tmp); + } + const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; @@ -2367,6 +2374,7 @@ public: { Stopwatch Timer; const auto _ = MakeGuard([&] { + Reset(m_ExpiredStandaloneKeys); if (!Ctx.Settings.Verbose) { return; @@ -2384,6 +2392,10 @@ public: ExtendablePathBuilder<256> Path; for (const std::pair<IoHash, uint64_t>& ExpiredKey : m_ExpiredStandaloneKeys) { + if (Ctx.IsCancelledFlag.load()) + { + return; + } Path.Reset(); m_Bucket.BuildPath(Path, ExpiredKey.first); fs::path FilePath = Path.ToPath(); @@ -2543,6 +2555,11 @@ public: } m_Bucket.m_SlogFile.Append(MovedEntries); Stats.RemovedDisk += FreedDiskSpace; + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + return true; }, ClaimDiskReserveCallback); } @@ -2558,7 +2575,6 @@ public: } } } - m_ExpiredStandaloneKeys.clear(); } private: @@ -2593,6 +2609,11 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) uint64_t RemovedStandaloneSize = 0; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + TotalEntries = m_Index.size(); // Find out expired keys @@ -2625,6 +2646,11 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) Stats.CheckedCount += TotalEntries; Stats.FoundCount += ExpiredEntries.size(); + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + if (Ctx.Settings.IsDeleteMode) { for (const DiskIndexEntry& Entry : ExpiredEntries) @@ -2656,6 +2682,11 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) } } + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + return new DiskBucketStoreCompactor(*this, 
std::move(ExpiredStandaloneKeys)); } @@ -2689,10 +2720,23 @@ public: }); m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); + if (Ctx.IsCancelledFlag.load()) + { + m_UncachedReferences.clear(); + m_IndexLock.reset(); + return; + } // Rescan to see if any cache items needs refreshing since last pass when we had the lock for (const auto& Entry : m_CacheBucket.m_Index) { + if (Ctx.IsCancelledFlag.load()) + { + m_UncachedReferences.clear(); + m_IndexLock.reset(); + return; + } + size_t PayloadIndex = Entry.second; const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex]; const DiskLocation& Loc = Payload.Location; @@ -2782,6 +2826,11 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) RwLock::SharedLockScope IndexLock(m_IndexLock); for (const auto& Entry : m_Index) { + if (Ctx.IsCancelledFlag.load()) + { + return {}; + } + size_t PayloadIndex = Entry.second; const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex]; const DiskLocation& Loc = Payload.Location; @@ -2821,6 +2870,11 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { for (const IoHash& Key : StandaloneKeys) { + if (Ctx.IsCancelledFlag.load()) + { + return {}; + } + IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key); if (!Buffer) { @@ -3047,12 +3101,9 @@ void ZenCacheDiskLayer::CacheBucket::ClearReferenceCache() { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - m_FirstReferenceIndex.clear(); - m_FirstReferenceIndex.shrink_to_fit(); - m_ReferenceHashes.clear(); - m_ReferenceHashes.shrink_to_fit(); - m_NextReferenceHashesIndexes.clear(); - m_NextReferenceHashesIndexes.shrink_to_fit(); + Reset(m_FirstReferenceIndex); + Reset(m_ReferenceHashes); + Reset(m_NextReferenceHashesIndexes); m_ReferenceCount = 0; } @@ -3099,11 +3150,9 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa m_Payloads.swap(Payloads); 
m_AccessTimes.swap(AccessTimes); m_MetaDatas.swap(MetaDatas); - m_FreeMetaDatas.clear(); - m_FreeMetaDatas.shrink_to_fit(); + Reset(m_FreeMetaDatas); m_MemCachedPayloads.swap(MemCachedPayloads); - m_FreeMemCachedPayloads.clear(); - m_FreeMetaDatas.shrink_to_fit(); + Reset(m_FreeMemCachedPayloads); if (m_Configuration.EnableReferenceCaching) { m_FirstReferenceIndex.swap(FirstReferenceIndex); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index ec299092d..e4a66daf4 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1056,156 +1056,172 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, } }); - auto ReportChanges = [&]() { + auto ReportChanges = [&]() -> bool { + bool Continue = true; if (!MovedChunks.empty() || RemovedSize > 0) { - ChangeCallback(MovedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0); + Continue = ChangeCallback(MovedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0); DeletedSize += RemovedSize; RemovedSize = 0; AddedSize = 0; MovedCount += MovedChunks.size(); MovedChunks.clear(); } + return Continue; }; std::vector<uint32_t> RemovedBlocks; - CompactState.IterateBlocks( - [&](uint32_t BlockIndex, const std::vector<size_t>& KeepChunkIndexes, const std::vector<BlockStoreLocation>& ChunkLocations) { - Ref<BlockStoreFile> OldBlockFile; + CompactState.IterateBlocks([&](uint32_t BlockIndex, + const std::vector<size_t>& KeepChunkIndexes, + const std::vector<BlockStoreLocation>& ChunkLocations) -> bool { + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _(m_InsertLock); + if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock) { - RwLock::SharedLockScope _(m_InsertLock); - if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock) - { - // You are trying to collect the currently writing block, Report error? 
- return; - } - auto It = m_ChunkBlocks.find(BlockIndex); - if (It == m_ChunkBlocks.end()) - { - // This block has unknown, we can't move anything. Report error? - return; - } - if (!It->second) - { - // This block has been removed, we can't move anything. Report error? - return; - } - OldBlockFile = It->second; + ZEN_ERROR("Compact Block was requested to rewrite the currently active write block in '{}', Block index {}", + m_BlocksBasePath, + BlockIndex); + return false; } - ZEN_ASSERT(OldBlockFile); + auto It = m_ChunkBlocks.find(BlockIndex); + if (It == m_ChunkBlocks.end()) + { + ZEN_WARN("Compact Block was requested to rewrite an unknown block in '{}', Block index {}", m_BlocksBasePath, BlockIndex); + return true; + } + if (!It->second) + { + ZEN_WARN("Compact Block was requested to rewrite a deleted block in '{}', Block index {}", m_BlocksBasePath, BlockIndex); + return true; + } + OldBlockFile = It->second; + } + ZEN_ASSERT(OldBlockFile); - uint64_t OldBlockSize = OldBlockFile->FileSize(); + uint64_t OldBlockSize = OldBlockFile->FileSize(); - std::vector<uint8_t> Chunk; - for (const size_t& ChunkIndex : KeepChunkIndexes) + std::vector<uint8_t> Chunk; + for (const size_t& ChunkIndex : KeepChunkIndexes) + { + const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) { - const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; - if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) - { - ZEN_WARN( - "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block " - "size {}", - m_BlocksBasePath, - ChunkLocation.Offset, - ChunkLocation.Size, - OldBlockFile->GetPath(), - OldBlockSize); - continue; - } + ZEN_WARN( + "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block " + "size {}", + m_BlocksBasePath, + ChunkLocation.Offset, + ChunkLocation.Size, + 
OldBlockFile->GetPath(), + OldBlockSize); + continue; + } - Chunk.resize(ChunkLocation.Size); - OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - if ((WriteOffset + Chunk.size()) > m_MaxBlockSize) + if ((WriteOffset + Chunk.size()) > m_MaxBlockSize) + { + if (NewBlockFile) { - if (NewBlockFile) - { - NewBlockFile->Flush(); - MovedSize += NewBlockFile->FileSize(); - NewBlockFile = nullptr; + NewBlockFile->Flush(); + MovedSize += NewBlockFile->FileSize(); + NewBlockFile = nullptr; - ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything + ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything - ChangeCallback(MovedChunks, RemovedSize); - DeletedSize += RemovedSize; - RemovedSize = 0; - MovedCount += MovedChunks.size(); - MovedChunks.clear(); + if (!ReportChanges()) + { + return false; } + } - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + { + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + std::filesystem::path NewBlockPath; + NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); + if (NextBlockIndex == (uint32_t)m_MaxBlockCount) { - RwLock::ExclusiveLockScope InsertLock(m_InsertLock); - std::filesystem::path NewBlockPath; - NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); - if (NextBlockIndex == (uint32_t)m_MaxBlockCount) - { - ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", - m_BlocksBasePath, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return; - } - - NewBlockFile = new BlockStoreFile(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + ZEN_ERROR("unable to allocate a new block in 
'{}', count limit {} exeeded", + m_BlocksBasePath, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return false; } - ZEN_ASSERT(NewBlockFile); - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); - if (Error) + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + } + ZEN_ASSERT(NewBlockFile); + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); - return; + RwLock::ExclusiveLockScope _l(m_InsertLock); + ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); + m_ChunkBlocks.erase(NextBlockIndex); } + NewBlockFile->MarkAsDeleteOnClose(); + NewBlockFile = nullptr; + return false; + } - if (Space.Free < m_MaxBlockSize) + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = DiskReserveCallback(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) { - uint64_t ReclaimedSpace = DiskReserveCallback(); - if (Space.Free + ReclaimedSpace < m_MaxBlockSize) - { - ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", - m_BlocksBasePath, - m_MaxBlockSize, - NiceBytes(Space.Free + ReclaimedSpace)); - { - RwLock::ExclusiveLockScope _l(m_InsertLock); - ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); - m_ChunkBlocks.erase(NextBlockIndex); - } - NewBlockFile = nullptr; - return; - } - - ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", m_BlocksBasePath, - ReclaimedSpace, + m_MaxBlockSize, NiceBytes(Space.Free + ReclaimedSpace)); + { + RwLock::ExclusiveLockScope _l(m_InsertLock); + ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); + m_ChunkBlocks.erase(NextBlockIndex); + } + 
NewBlockFile->MarkAsDeleteOnClose(); + NewBlockFile = nullptr; + return false; } - NewBlockFile->Create(m_MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; - } - NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); - WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); - AddedSize += Chunk.size(); + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BlocksBasePath, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; } - Chunk.clear(); - ReportChanges(); + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); + WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); + AddedSize += Chunk.size(); + } + Chunk.clear(); + + if (!ReportChanges()) + { + return false; + } - { - RwLock::ExclusiveLockScope InsertLock(m_InsertLock); - ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); - OldBlockFile->MarkAsDeleteOnClose(); - m_ChunkBlocks.erase(BlockIndex); - m_TotalSize.fetch_sub(OldBlockSize); - RemovedSize += OldBlockSize; - } - }); + { + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + OldBlockFile->MarkAsDeleteOnClose(); + m_ChunkBlocks.erase(BlockIndex); + m_TotalSize.fetch_sub(OldBlockSize); + RemovedSize += OldBlockSize; + } + return true; + }); if (NewBlockFile) { @@ -1825,7 +1841,10 @@ TEST_CASE("blockstore.compact.blocks") Store.CompactBlocks( State, Alignment, - [&](const BlockStore::MovedChunksArray&, uint64_t) { CHECK(false); }, + [&](const BlockStore::MovedChunksArray&, uint64_t) { + 
CHECK(false); + return true; + }, []() { CHECK(false); return 0; @@ -1850,6 +1869,7 @@ TEST_CASE("blockstore.compact.blocks") [&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) { RemovedSize += Removed; CHECK(Moved.empty()); + return true; }, []() { return 0; }); CHECK_EQ(RemovedSize, PreSize); @@ -1875,6 +1895,7 @@ TEST_CASE("blockstore.compact.blocks") [&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) { RemovedSize += Removed; CHECK(Moved.empty()); + return true; }, []() { return 0; }); CHECK_EQ(Store.TotalSize() + RemovedSize, PreSize); @@ -1895,7 +1916,10 @@ TEST_CASE("blockstore.compact.blocks") Store.CompactBlocks( State, Alignment, - [&](const BlockStore::MovedChunksArray&, uint64_t) { CHECK(false); }, + [&](const BlockStore::MovedChunksArray&, uint64_t) { + CHECK(false); + return true; + }, []() { CHECK(false); return 0; @@ -1927,6 +1951,7 @@ TEST_CASE("blockstore.compact.blocks") [&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) { CHECK(Moved.empty()); RemovedSize += Removed; + return true; }, []() { CHECK(false); @@ -1970,6 +1995,7 @@ TEST_CASE("blockstore.compact.blocks") (*It) = Move.second; } RemovedSize += Removed; + return true; }, []() { CHECK(false); @@ -2046,6 +2072,7 @@ TEST_CASE("blockstore.compact.blocks") (*It) = Move.second; } RemovedSize += Removed; + return true; }, []() { CHECK(false); diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 02db5f848..f28601771 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -577,6 +577,11 @@ public: std::unordered_map<uint32_t, uint64_t> BlockUsage; { RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock); + if (Ctx.IsCancelledFlag.load()) + { + return; + } + for (const auto& Entry : m_CasContainerStrategy.m_LocationMap) { size_t Index = Entry.second; @@ -660,6 +665,11 @@ public: } m_CasContainerStrategy.m_CasLog.Append(MovedEntries); Stats.RemovedDisk += FreedDiskSpace; + if 
(Ctx.IsCancelledFlag.load()) + { + return false; + } + return true; }, ClaimDiskReserveCallback); } @@ -720,6 +730,10 @@ public: { RwLock::ExclusiveLockScope __(m_CasContainerStrategy.m_LocationMapLock); + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } for (const IoHash& Cid : UnusedCids) { @@ -784,6 +798,11 @@ CasContainerStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&) { return {}; } + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + CidsToCheck.reserve(m_LocationMap.size()); for (const auto& It : m_LocationMap) { diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 2623e157d..6ba282163 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -44,6 +44,15 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +namespace { + template<typename T> + void Reset(T& V) + { + T Tmp; + V.swap(Tmp); + } +} // namespace + namespace filecas::impl { const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; @@ -1345,6 +1354,7 @@ public: { Stopwatch Timer; const auto _ = MakeGuard([&] { + Reset(m_ReferencesToClean); if (!Ctx.Settings.Verbose) { return; @@ -1366,6 +1376,11 @@ public: // Not regarded as pruned, leave it be continue; } + if (Ctx.IsCancelledFlag.load()) + { + return; + } + if (Ctx.Settings.IsDeleteMode) { ZEN_DEBUG("GCV2: filecas [COMPACT] '{}': Deleting CAS payload file '{}'", @@ -1418,8 +1433,7 @@ public: ZEN_DEBUG("GCV2: filecas [COMPACT] '{}': Skipped deleting of {} eligible files", m_FileCasStrategy.m_RootDirectory, Skipped); } - m_ReferencesToClean.clear(); - m_ReferencesToClean.shrink_to_fit(); + Reset(m_ReferencesToClean); } private: @@ -1521,6 +1535,10 @@ FileCasStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&) { return {}; } + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } CidsToCheck.reserve(m_Index.size()); for (const auto& It : m_Index) { diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 64958cdea..2cd1f6aeb 100644 --- 
a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -424,6 +424,7 @@ WriteGCResult(CbObjectWriter& Writer, const GcResult& Result, bool HumanReadable } Writer << "WriteBlock" << ToTimeSpan(Result.WriteBlockMS); Writer << "Elapsed" << ToTimeSpan(Result.ElapsedMS); + Writer << "Cancelled" << Result.WasCancelled; return; } @@ -538,8 +539,8 @@ Add(GcReferenceStoreStats& Sum, const GcReferenceStoreStats& Sub) Sum.ElapsedMS += Sub.ElapsedMS; } -void -Sum(GcResult& Stat) +GcResult& +Sum(GcResult& Stat, bool Cancelled = false) { for (std::pair<std::string, GcReferencerStats>& Referencer : Stat.ReferencerStats) { @@ -559,6 +560,10 @@ Sum(GcResult& Stat) Add(Stat.CompactStoresStatSum, Stat.ReferencerStatSum.CompactStoreStats); Add(Stat.CompactStoresStatSum, Stat.ReferenceStoreStatSum.CompactStoreStats); + + Stat.WasCancelled = Cancelled; + + return Stat; } void @@ -594,7 +599,9 @@ GcManager::RemoveGcReferenceStore(GcReferenceStore& ReferenceStore) GcResult GcManager::CollectGarbage(const GcSettings& Settings) { - GcCtx Ctx{.Settings = Settings}; + ZEN_TRACE_CPU("Gc::CollectGarbage(v2)"); + + GcCtx Ctx{.Settings = Settings, .IsCancelledFlag = m_CancelGC}; GcResult Result; { @@ -619,6 +626,11 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size()); if (!m_GcReferencers.empty()) { + if (CheckGCCancel()) + { + return Sum(Result, true); + } + Latch WorkLeft(1); { // First remove any cache keys that may own references @@ -629,6 +641,12 @@ GcManager::CollectGarbage(const GcSettings& Settings) }); for (size_t Index = 0; Index < m_GcReferencers.size(); Index++) { + if (CheckGCCancel()) + { + WorkLeft.CountDown(); + WorkLeft.Wait(); + return Sum(Result, true); + } GcReferencer* Owner = m_GcReferencers[Index]; std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index]; WorkLeft.AddCount(1); @@ -652,6 +670,11 @@ GcManager::CollectGarbage(const GcSettings& Settings) if 
(!Ctx.Settings.SkipCidDelete) { + if (CheckGCCancel()) + { + return Sum(Result, true); + } + Result.ReferenceStoreStats.resize(m_GcReferenceStores.size()); ZEN_INFO("GCV2: Creating reference pruners from {} reference stores", m_GcReferenceStores.size()); @@ -672,6 +695,13 @@ GcManager::CollectGarbage(const GcSettings& Settings) }); for (size_t Index = 0; Index < m_GcReferenceStores.size(); Index++) { + if (CheckGCCancel()) + { + WorkLeft.CountDown(); + WorkLeft.Wait(); + return Sum(Result, true); + } + GcReferenceStore* ReferenceStore = m_GcReferenceStores[Index]; std::pair<std::string, GcReferenceStoreStats>& Stats = Result.ReferenceStoreStats[Index]; WorkLeft.AddCount(1); @@ -701,6 +731,11 @@ GcManager::CollectGarbage(const GcSettings& Settings) if (!ReferencePruners.empty()) { + if (CheckGCCancel()) + { + return Sum(Result, true); + } + ZEN_INFO("GCV2: Creating reference checkers from {} referencers", m_GcReferencers.size()); std::unordered_map<std::unique_ptr<GcReferenceChecker>, size_t> ReferenceCheckers; if (!m_GcReferencers.empty()) @@ -719,6 +754,13 @@ GcManager::CollectGarbage(const GcSettings& Settings) // Lock all reference owners from changing the reference data and get access to check for referenced data for (size_t Index = 0; Index < m_GcReferencers.size(); Index++) { + if (CheckGCCancel()) + { + WorkLeft.CountDown(); + WorkLeft.Wait(); + return Sum(Result, true); + } + GcReferencer* Referencer = m_GcReferencers[Index]; std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index]; WorkLeft.AddCount(1); @@ -767,6 +809,11 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS))); if (!ReferenceCheckers.empty()) { + if (CheckGCCancel()) + { + return Sum(Result, true); + } + // Locking all references checkers so we have a steady state of which references are used // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until // we delete 
the ReferenceCheckers @@ -781,6 +828,13 @@ GcManager::CollectGarbage(const GcSettings& Settings) }); for (auto& It : ReferenceCheckers) { + if (CheckGCCancel()) + { + WorkLeft.CountDown(); + WorkLeft.Wait(); + return Sum(Result, true); + } + GcReferenceChecker* Checker = It.first.get(); size_t Index = It.second; std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index]; @@ -827,6 +881,13 @@ GcManager::CollectGarbage(const GcSettings& Settings) }); for (auto& It : ReferencePruners) { + if (CheckGCCancel()) + { + WorkLeft.CountDown(); + WorkLeft.Wait(); + return Sum(Result, true); + } + GcReferencePruner* Pruner = It.second.get(); size_t Index = It.first; GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second; @@ -866,6 +927,11 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_INFO("GCV2: Compacting using {} store compactors", StoreCompactors.size()); if (!StoreCompactors.empty()) { + if (CheckGCCancel()) + { + return Sum(Result, true); + } + auto ClaimDiskReserve = [&]() -> uint64_t { if (!std::filesystem::is_regular_file(Settings.DiskReservePath)) { @@ -886,6 +952,11 @@ GcManager::CollectGarbage(const GcSettings& Settings) }); for (auto& It : StoreCompactors) { + if (CheckGCCancel()) + { + return Sum(Result, true); + } + GcStoreCompactor* Compactor = It.first.get(); GcCompactStoreStats& Stats = *It.second; { @@ -901,8 +972,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) ZEN_INFO("GCV2: Completed in {}", NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); } - Sum(Result); - return Result; + return Sum(Result); } #undef SCOPED_TIMER @@ -910,6 +980,12 @@ GcManager::CollectGarbage(const GcSettings& Settings) //////// End GC V2 void +GcManager::SetCancelGC(bool CancelFlag) +{ + m_CancelGC.store(CancelFlag); +} + +void GcManager::AddGcContributor(GcContributor* Contributor) { RwLock::ExclusiveLockScope _(m_Lock); @@ -965,6 +1041,10 @@ GcManager::CollectGarbage(GcContext& GcCtx) const auto Guard = MakeGuard([&] { 
ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); for (GcContributor* Contributor : m_GcContribs) { + if (CheckGCCancel()) + { + return GCTotalSizeDiff; + } Contributor->GatherReferences(GcCtx); } } @@ -982,6 +1062,11 @@ GcManager::CollectGarbage(GcContext& GcCtx) }); for (GcStorage* Storage : m_GcStorage) { + if (CheckGCCancel()) + { + break; + } + const auto PreSize = Storage->StorageSize(); Storage->CollectGarbage(GcCtx); const auto PostSize = Storage->StorageSize(); @@ -1208,7 +1293,11 @@ GcScheduler::Shutdown() if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status) { bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning); - m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped); + if (GcIsRunning) + { + m_GcManager.SetCancelGC(true); + } + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped); m_GcSignal.notify_one(); if (m_GcThread.joinable()) @@ -1758,6 +1847,8 @@ GcScheduler::SchedulerThread() CompactBlockUsageThresholdPercent, Verbose); + m_GcManager.SetCancelGC(false); + uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning); if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle))) { diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 1429a6b02..b748fc8f6 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -126,7 +126,7 @@ public: typedef std::vector<size_t> ChunkIndexArray; typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback; - typedef std::function<void(const MovedChunksArray& MovedChunks, uint64_t FreedDiskSpace)> CompactCallback; + typedef std::function<bool(const MovedChunksArray& MovedChunks, uint64_t FreedDiskSpace)> CompactCallback; typedef std::function<uint64_t()> ClaimDiskReserveCallback; typedef 
std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback; typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; @@ -163,7 +163,7 @@ public: void CompactBlocks( const BlockStoreCompactState& CompactState, uint64_t PayloadAlignment, - const CompactCallback& ChangeCallback = [](const MovedChunksArray&, uint64_t) {}, + const CompactCallback& ChangeCallback = [](const MovedChunksArray&, uint64_t) { return true; }, const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; }); static const char* GetBlockFileExtension(); @@ -230,14 +230,18 @@ public: const BlockStoreLocation& GetLocation(size_t Index) const { return m_ChunkLocations[Index]; } - void IterateBlocks(std::function<void(uint32_t BlockIndex, + void IterateBlocks(std::function<bool(uint32_t BlockIndex, const std::vector<size_t>& KeepChunkIndexes, const std::vector<BlockStoreLocation>& ChunkLocations)> Callback) const { for (auto It : m_BlockIndexToChunkMapIndex) { size_t ChunkMapIndex = It.second; - Callback(It.first, m_KeepChunks[ChunkMapIndex], m_ChunkLocations); + bool Continue = Callback(It.first, m_KeepChunks[ChunkMapIndex], m_ChunkLocations); + if (!Continue) + { + break; + } } } diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index f50af8006..486dca3c6 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -121,6 +121,8 @@ struct GcResult std::chrono::milliseconds WriteBlockMS = {}; std::chrono::milliseconds ElapsedMS = {}; + + bool WasCancelled = false; }; class CbObjectWriter; @@ -129,7 +131,8 @@ void WriteGCResult(CbObjectWriter& Writer, const GcResult& Result, bool HumanRea struct GcCtx { - const GcSettings Settings; + const GcSettings Settings; + std::atomic_bool& IsCancelledFlag; }; typedef tsl::robin_set<IoHash> HashSet; @@ -341,6 +344,7 @@ public: void 
RemoveGcReferenceStore(GcReferenceStore& ReferenceStore); GcResult CollectGarbage(const GcSettings& Settings); + void SetCancelGC(bool CancelFlag); //////// End GC V2 @@ -359,6 +363,7 @@ public: void SetDiskWriteBlocker(const DiskWriteBlocker* Monitor) { m_DiskWriteBlocker = Monitor; } private: + bool CheckGCCancel() { return m_CancelGC.load(); } LoggerRef Log() { return m_Log; } LoggerRef m_Log; mutable RwLock m_Lock; @@ -369,6 +374,8 @@ private: std::vector<GcReferencer*> m_GcReferencers; std::vector<GcReferenceStore*> m_GcReferenceStores; + + std::atomic_bool m_CancelGC{false}; }; enum class GcSchedulerStatus : uint32_t |