aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-11-24 13:26:51 +0100
committerGitHub <[email protected]>2023-11-24 13:26:51 +0100
commit254d2f89c110fc5f14e658505559a7e7534a984d (patch)
tree511e8dcbae633ae4ccaea20f29b9b04bc41ea875 /src
parentfix truncation of sentry hostname (diff)
downloadzen-254d2f89c110fc5f14e658505559a7e7534a984d.tar.xz
zen-254d2f89c110fc5f14e658505559a7e7534a984d.zip
Add GC Cancel/Stop (#568)
- GcScheduler will now cancel any running GC when it shuts down. - The old GC is rather limited in *when* it reacts to a cancellation request; GCv2 is more responsive.
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp71
-rw-r--r--src/zenstore/blockstore.cpp255
-rw-r--r--src/zenstore/compactcas.cpp19
-rw-r--r--src/zenstore/filecas.cpp22
-rw-r--r--src/zenstore/gc.cpp103
-rw-r--r--src/zenstore/include/zenstore/blockstore.h12
-rw-r--r--src/zenstore/include/zenstore/gc.h9
7 files changed, 353 insertions, 138 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index af8b6227b..6ab3c7746 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -54,6 +54,13 @@ namespace {
#pragma pack(pop)
+ template<typename T>
+ void Reset(T& V)
+ {
+ T Tmp;
+ V.swap(Tmp);
+ }
+
const char* IndexExtension = ".uidx";
const char* LogExtension = ".slog";
@@ -2367,6 +2374,7 @@ public:
{
Stopwatch Timer;
const auto _ = MakeGuard([&] {
+ Reset(m_ExpiredStandaloneKeys);
if (!Ctx.Settings.Verbose)
{
return;
@@ -2384,6 +2392,10 @@ public:
ExtendablePathBuilder<256> Path;
for (const std::pair<IoHash, uint64_t>& ExpiredKey : m_ExpiredStandaloneKeys)
{
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return;
+ }
Path.Reset();
m_Bucket.BuildPath(Path, ExpiredKey.first);
fs::path FilePath = Path.ToPath();
@@ -2543,6 +2555,11 @@ public:
}
m_Bucket.m_SlogFile.Append(MovedEntries);
Stats.RemovedDisk += FreedDiskSpace;
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return false;
+ }
+ return true;
},
ClaimDiskReserveCallback);
}
@@ -2558,7 +2575,6 @@ public:
}
}
}
- m_ExpiredStandaloneKeys.clear();
}
private:
@@ -2593,6 +2609,11 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
uint64_t RemovedStandaloneSize = 0;
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
+
TotalEntries = m_Index.size();
// Find out expired keys
@@ -2625,6 +2646,11 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
Stats.CheckedCount += TotalEntries;
Stats.FoundCount += ExpiredEntries.size();
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
+
if (Ctx.Settings.IsDeleteMode)
{
for (const DiskIndexEntry& Entry : ExpiredEntries)
@@ -2656,6 +2682,11 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
}
}
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
+
return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
}
@@ -2689,10 +2720,23 @@ public:
});
m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock);
+ if (Ctx.IsCancelledFlag.load())
+ {
+ m_UncachedReferences.clear();
+ m_IndexLock.reset();
+ return;
+ }
// Rescan to see if any cache items needs refreshing since last pass when we had the lock
for (const auto& Entry : m_CacheBucket.m_Index)
{
+ if (Ctx.IsCancelledFlag.load())
+ {
+ m_UncachedReferences.clear();
+ m_IndexLock.reset();
+ return;
+ }
+
size_t PayloadIndex = Entry.second;
const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex];
const DiskLocation& Loc = Payload.Location;
@@ -2782,6 +2826,11 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
RwLock::SharedLockScope IndexLock(m_IndexLock);
for (const auto& Entry : m_Index)
{
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return {};
+ }
+
size_t PayloadIndex = Entry.second;
const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex];
const DiskLocation& Loc = Payload.Location;
@@ -2821,6 +2870,11 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
{
for (const IoHash& Key : StandaloneKeys)
{
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return {};
+ }
+
IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
if (!Buffer)
{
@@ -3047,12 +3101,9 @@ void
ZenCacheDiskLayer::CacheBucket::ClearReferenceCache()
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- m_FirstReferenceIndex.clear();
- m_FirstReferenceIndex.shrink_to_fit();
- m_ReferenceHashes.clear();
- m_ReferenceHashes.shrink_to_fit();
- m_NextReferenceHashesIndexes.clear();
- m_NextReferenceHashesIndexes.shrink_to_fit();
+ Reset(m_FirstReferenceIndex);
+ Reset(m_ReferenceHashes);
+ Reset(m_NextReferenceHashesIndexes);
m_ReferenceCount = 0;
}
@@ -3099,11 +3150,9 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa
m_Payloads.swap(Payloads);
m_AccessTimes.swap(AccessTimes);
m_MetaDatas.swap(MetaDatas);
- m_FreeMetaDatas.clear();
- m_FreeMetaDatas.shrink_to_fit();
+ Reset(m_FreeMetaDatas);
m_MemCachedPayloads.swap(MemCachedPayloads);
- m_FreeMemCachedPayloads.clear();
- m_FreeMetaDatas.shrink_to_fit();
+ Reset(m_FreeMemCachedPayloads);
if (m_Configuration.EnableReferenceCaching)
{
m_FirstReferenceIndex.swap(FirstReferenceIndex);
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index ec299092d..e4a66daf4 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1056,156 +1056,172 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
}
});
- auto ReportChanges = [&]() {
+ auto ReportChanges = [&]() -> bool {
+ bool Continue = true;
if (!MovedChunks.empty() || RemovedSize > 0)
{
- ChangeCallback(MovedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0);
+ Continue = ChangeCallback(MovedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0);
DeletedSize += RemovedSize;
RemovedSize = 0;
AddedSize = 0;
MovedCount += MovedChunks.size();
MovedChunks.clear();
}
+ return Continue;
};
std::vector<uint32_t> RemovedBlocks;
- CompactState.IterateBlocks(
- [&](uint32_t BlockIndex, const std::vector<size_t>& KeepChunkIndexes, const std::vector<BlockStoreLocation>& ChunkLocations) {
- Ref<BlockStoreFile> OldBlockFile;
+ CompactState.IterateBlocks([&](uint32_t BlockIndex,
+ const std::vector<size_t>& KeepChunkIndexes,
+ const std::vector<BlockStoreLocation>& ChunkLocations) -> bool {
+ Ref<BlockStoreFile> OldBlockFile;
+ {
+ RwLock::SharedLockScope _(m_InsertLock);
+ if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock)
{
- RwLock::SharedLockScope _(m_InsertLock);
- if ((BlockIndex == m_WriteBlockIndex.load()) && m_WriteBlock)
- {
- // You are trying to collect the currently writing block, Report error?
- return;
- }
- auto It = m_ChunkBlocks.find(BlockIndex);
- if (It == m_ChunkBlocks.end())
- {
- // This block has unknown, we can't move anything. Report error?
- return;
- }
- if (!It->second)
- {
- // This block has been removed, we can't move anything. Report error?
- return;
- }
- OldBlockFile = It->second;
+ ZEN_ERROR("Compact Block was requested to rewrite the currently active write block in '{}', Block index {}",
+ m_BlocksBasePath,
+ BlockIndex);
+ return false;
}
- ZEN_ASSERT(OldBlockFile);
+ auto It = m_ChunkBlocks.find(BlockIndex);
+ if (It == m_ChunkBlocks.end())
+ {
+ ZEN_WARN("Compact Block was requested to rewrite an unknown block in '{}', Block index {}", m_BlocksBasePath, BlockIndex);
+ return true;
+ }
+ if (!It->second)
+ {
+ ZEN_WARN("Compact Block was requested to rewrite a deleted block in '{}', Block index {}", m_BlocksBasePath, BlockIndex);
+ return true;
+ }
+ OldBlockFile = It->second;
+ }
+ ZEN_ASSERT(OldBlockFile);
- uint64_t OldBlockSize = OldBlockFile->FileSize();
+ uint64_t OldBlockSize = OldBlockFile->FileSize();
- std::vector<uint8_t> Chunk;
- for (const size_t& ChunkIndex : KeepChunkIndexes)
+ std::vector<uint8_t> Chunk;
+ for (const size_t& ChunkIndex : KeepChunkIndexes)
+ {
+ const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
+ if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize)
{
- const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
- if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize)
- {
- ZEN_WARN(
- "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block "
- "size {}",
- m_BlocksBasePath,
- ChunkLocation.Offset,
- ChunkLocation.Size,
- OldBlockFile->GetPath(),
- OldBlockSize);
- continue;
- }
+ ZEN_WARN(
+ "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block "
+ "size {}",
+ m_BlocksBasePath,
+ ChunkLocation.Offset,
+ ChunkLocation.Size,
+ OldBlockFile->GetPath(),
+ OldBlockSize);
+ continue;
+ }
- Chunk.resize(ChunkLocation.Size);
- OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+ Chunk.resize(ChunkLocation.Size);
+ OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
- if ((WriteOffset + Chunk.size()) > m_MaxBlockSize)
+ if ((WriteOffset + Chunk.size()) > m_MaxBlockSize)
+ {
+ if (NewBlockFile)
{
- if (NewBlockFile)
- {
- NewBlockFile->Flush();
- MovedSize += NewBlockFile->FileSize();
- NewBlockFile = nullptr;
+ NewBlockFile->Flush();
+ MovedSize += NewBlockFile->FileSize();
+ NewBlockFile = nullptr;
- ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything
+ ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything
- ChangeCallback(MovedChunks, RemovedSize);
- DeletedSize += RemovedSize;
- RemovedSize = 0;
- MovedCount += MovedChunks.size();
- MovedChunks.clear();
+ if (!ReportChanges())
+ {
+ return false;
}
+ }
- uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+ {
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+ std::filesystem::path NewBlockPath;
+ NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath);
+ if (NextBlockIndex == (uint32_t)m_MaxBlockCount)
{
- RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
- std::filesystem::path NewBlockPath;
- NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath);
- if (NextBlockIndex == (uint32_t)m_MaxBlockCount)
- {
- ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
- m_BlocksBasePath,
- static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
- return;
- }
-
- NewBlockFile = new BlockStoreFile(NewBlockPath);
- m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+ ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
+ m_BlocksBasePath,
+ static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ return false;
}
- ZEN_ASSERT(NewBlockFile);
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
- if (Error)
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+ }
+ ZEN_ASSERT(NewBlockFile);
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
{
- ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
- return;
+ RwLock::ExclusiveLockScope _l(m_InsertLock);
+ ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
+ m_ChunkBlocks.erase(NextBlockIndex);
}
+ NewBlockFile->MarkAsDeleteOnClose();
+ NewBlockFile = nullptr;
+ return false;
+ }
- if (Space.Free < m_MaxBlockSize)
+ if (Space.Free < m_MaxBlockSize)
+ {
+ uint64_t ReclaimedSpace = DiskReserveCallback();
+ if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
{
- uint64_t ReclaimedSpace = DiskReserveCallback();
- if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
- {
- ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
- m_BlocksBasePath,
- m_MaxBlockSize,
- NiceBytes(Space.Free + ReclaimedSpace));
- {
- RwLock::ExclusiveLockScope _l(m_InsertLock);
- ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
- m_ChunkBlocks.erase(NextBlockIndex);
- }
- NewBlockFile = nullptr;
- return;
- }
-
- ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
m_BlocksBasePath,
- ReclaimedSpace,
+ m_MaxBlockSize,
NiceBytes(Space.Free + ReclaimedSpace));
+ {
+ RwLock::ExclusiveLockScope _l(m_InsertLock);
+ ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
+ m_ChunkBlocks.erase(NextBlockIndex);
+ }
+ NewBlockFile->MarkAsDeleteOnClose();
+ NewBlockFile = nullptr;
+ return false;
}
- NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
- }
- NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
- MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}});
- WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
- AddedSize += Chunk.size();
+ ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ m_BlocksBasePath,
+ ReclaimedSpace,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ }
+ NewBlockFile->Create(m_MaxBlockSize);
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
}
- Chunk.clear();
- ReportChanges();
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}});
+ WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
+ AddedSize += Chunk.size();
+ }
+ Chunk.clear();
+
+ if (!ReportChanges())
+ {
+ return false;
+ }
- {
- RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
- ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
- OldBlockFile->MarkAsDeleteOnClose();
- m_ChunkBlocks.erase(BlockIndex);
- m_TotalSize.fetch_sub(OldBlockSize);
- RemovedSize += OldBlockSize;
- }
- });
+ {
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+ ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ OldBlockFile->MarkAsDeleteOnClose();
+ m_ChunkBlocks.erase(BlockIndex);
+ m_TotalSize.fetch_sub(OldBlockSize);
+ RemovedSize += OldBlockSize;
+ }
+ return true;
+ });
if (NewBlockFile)
{
@@ -1825,7 +1841,10 @@ TEST_CASE("blockstore.compact.blocks")
Store.CompactBlocks(
State,
Alignment,
- [&](const BlockStore::MovedChunksArray&, uint64_t) { CHECK(false); },
+ [&](const BlockStore::MovedChunksArray&, uint64_t) {
+ CHECK(false);
+ return true;
+ },
[]() {
CHECK(false);
return 0;
@@ -1850,6 +1869,7 @@ TEST_CASE("blockstore.compact.blocks")
[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
RemovedSize += Removed;
CHECK(Moved.empty());
+ return true;
},
[]() { return 0; });
CHECK_EQ(RemovedSize, PreSize);
@@ -1875,6 +1895,7 @@ TEST_CASE("blockstore.compact.blocks")
[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
RemovedSize += Removed;
CHECK(Moved.empty());
+ return true;
},
[]() { return 0; });
CHECK_EQ(Store.TotalSize() + RemovedSize, PreSize);
@@ -1895,7 +1916,10 @@ TEST_CASE("blockstore.compact.blocks")
Store.CompactBlocks(
State,
Alignment,
- [&](const BlockStore::MovedChunksArray&, uint64_t) { CHECK(false); },
+ [&](const BlockStore::MovedChunksArray&, uint64_t) {
+ CHECK(false);
+ return true;
+ },
[]() {
CHECK(false);
return 0;
@@ -1927,6 +1951,7 @@ TEST_CASE("blockstore.compact.blocks")
[&](const BlockStore::MovedChunksArray& Moved, uint64_t Removed) {
CHECK(Moved.empty());
RemovedSize += Removed;
+ return true;
},
[]() {
CHECK(false);
@@ -1970,6 +1995,7 @@ TEST_CASE("blockstore.compact.blocks")
(*It) = Move.second;
}
RemovedSize += Removed;
+ return true;
},
[]() {
CHECK(false);
@@ -2046,6 +2072,7 @@ TEST_CASE("blockstore.compact.blocks")
(*It) = Move.second;
}
RemovedSize += Removed;
+ return true;
},
[]() {
CHECK(false);
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 02db5f848..f28601771 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -577,6 +577,11 @@ public:
std::unordered_map<uint32_t, uint64_t> BlockUsage;
{
RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock);
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return;
+ }
+
for (const auto& Entry : m_CasContainerStrategy.m_LocationMap)
{
size_t Index = Entry.second;
@@ -660,6 +665,11 @@ public:
}
m_CasContainerStrategy.m_CasLog.Append(MovedEntries);
Stats.RemovedDisk += FreedDiskSpace;
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return false;
+ }
+ return true;
},
ClaimDiskReserveCallback);
}
@@ -720,6 +730,10 @@ public:
{
RwLock::ExclusiveLockScope __(m_CasContainerStrategy.m_LocationMapLock);
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
for (const IoHash& Cid : UnusedCids)
{
@@ -784,6 +798,11 @@ CasContainerStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&)
{
return {};
}
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
+
CidsToCheck.reserve(m_LocationMap.size());
for (const auto& It : m_LocationMap)
{
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 2623e157d..6ba282163 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -44,6 +44,15 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+namespace {
+ template<typename T>
+ void Reset(T& V)
+ {
+ T Tmp;
+ V.swap(Tmp);
+ }
+} // namespace
+
namespace filecas::impl {
const char* IndexExtension = ".uidx";
const char* LogExtension = ".ulog";
@@ -1345,6 +1354,7 @@ public:
{
Stopwatch Timer;
const auto _ = MakeGuard([&] {
+ Reset(m_ReferencesToClean);
if (!Ctx.Settings.Verbose)
{
return;
@@ -1366,6 +1376,11 @@ public:
// Not regarded as pruned, leave it be
continue;
}
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return;
+ }
+
if (Ctx.Settings.IsDeleteMode)
{
ZEN_DEBUG("GCV2: filecas [COMPACT] '{}': Deleting CAS payload file '{}'",
@@ -1418,8 +1433,7 @@ public:
ZEN_DEBUG("GCV2: filecas [COMPACT] '{}': Skipped deleting of {} eligible files", m_FileCasStrategy.m_RootDirectory, Skipped);
}
- m_ReferencesToClean.clear();
- m_ReferencesToClean.shrink_to_fit();
+ Reset(m_ReferencesToClean);
}
private:
@@ -1521,6 +1535,10 @@ FileCasStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&)
{
return {};
}
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
CidsToCheck.reserve(m_Index.size());
for (const auto& It : m_Index)
{
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 64958cdea..2cd1f6aeb 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -424,6 +424,7 @@ WriteGCResult(CbObjectWriter& Writer, const GcResult& Result, bool HumanReadable
}
Writer << "WriteBlock" << ToTimeSpan(Result.WriteBlockMS);
Writer << "Elapsed" << ToTimeSpan(Result.ElapsedMS);
+ Writer << "Cancelled" << Result.WasCancelled;
return;
}
@@ -538,8 +539,8 @@ Add(GcReferenceStoreStats& Sum, const GcReferenceStoreStats& Sub)
Sum.ElapsedMS += Sub.ElapsedMS;
}
-void
-Sum(GcResult& Stat)
+GcResult&
+Sum(GcResult& Stat, bool Cancelled = false)
{
for (std::pair<std::string, GcReferencerStats>& Referencer : Stat.ReferencerStats)
{
@@ -559,6 +560,10 @@ Sum(GcResult& Stat)
Add(Stat.CompactStoresStatSum, Stat.ReferencerStatSum.CompactStoreStats);
Add(Stat.CompactStoresStatSum, Stat.ReferenceStoreStatSum.CompactStoreStats);
+
+ Stat.WasCancelled = Cancelled;
+
+ return Stat;
}
void
@@ -594,7 +599,9 @@ GcManager::RemoveGcReferenceStore(GcReferenceStore& ReferenceStore)
GcResult
GcManager::CollectGarbage(const GcSettings& Settings)
{
- GcCtx Ctx{.Settings = Settings};
+ ZEN_TRACE_CPU("Gc::CollectGarbage(v2)");
+
+ GcCtx Ctx{.Settings = Settings, .IsCancelledFlag = m_CancelGC};
GcResult Result;
{
@@ -619,6 +626,11 @@ GcManager::CollectGarbage(const GcSettings& Settings)
ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size());
if (!m_GcReferencers.empty())
{
+ if (CheckGCCancel())
+ {
+ return Sum(Result, true);
+ }
+
Latch WorkLeft(1);
{
// First remove any cache keys that may own references
@@ -629,6 +641,12 @@ GcManager::CollectGarbage(const GcSettings& Settings)
});
for (size_t Index = 0; Index < m_GcReferencers.size(); Index++)
{
+ if (CheckGCCancel())
+ {
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ return Sum(Result, true);
+ }
GcReferencer* Owner = m_GcReferencers[Index];
std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
@@ -652,6 +670,11 @@ GcManager::CollectGarbage(const GcSettings& Settings)
if (!Ctx.Settings.SkipCidDelete)
{
+ if (CheckGCCancel())
+ {
+ return Sum(Result, true);
+ }
+
Result.ReferenceStoreStats.resize(m_GcReferenceStores.size());
ZEN_INFO("GCV2: Creating reference pruners from {} reference stores", m_GcReferenceStores.size());
@@ -672,6 +695,13 @@ GcManager::CollectGarbage(const GcSettings& Settings)
});
for (size_t Index = 0; Index < m_GcReferenceStores.size(); Index++)
{
+ if (CheckGCCancel())
+ {
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ return Sum(Result, true);
+ }
+
GcReferenceStore* ReferenceStore = m_GcReferenceStores[Index];
std::pair<std::string, GcReferenceStoreStats>& Stats = Result.ReferenceStoreStats[Index];
WorkLeft.AddCount(1);
@@ -701,6 +731,11 @@ GcManager::CollectGarbage(const GcSettings& Settings)
if (!ReferencePruners.empty())
{
+ if (CheckGCCancel())
+ {
+ return Sum(Result, true);
+ }
+
ZEN_INFO("GCV2: Creating reference checkers from {} referencers", m_GcReferencers.size());
std::unordered_map<std::unique_ptr<GcReferenceChecker>, size_t> ReferenceCheckers;
if (!m_GcReferencers.empty())
@@ -719,6 +754,13 @@ GcManager::CollectGarbage(const GcSettings& Settings)
// Lock all reference owners from changing the reference data and get access to check for referenced data
for (size_t Index = 0; Index < m_GcReferencers.size(); Index++)
{
+ if (CheckGCCancel())
+ {
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ return Sum(Result, true);
+ }
+
GcReferencer* Referencer = m_GcReferencers[Index];
std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
@@ -767,6 +809,11 @@ GcManager::CollectGarbage(const GcSettings& Settings)
ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS)));
if (!ReferenceCheckers.empty())
{
+ if (CheckGCCancel())
+ {
+ return Sum(Result, true);
+ }
+
// Locking all references checkers so we have a steady state of which references are used
// From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until
// we delete the ReferenceCheckers
@@ -781,6 +828,13 @@ GcManager::CollectGarbage(const GcSettings& Settings)
});
for (auto& It : ReferenceCheckers)
{
+ if (CheckGCCancel())
+ {
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ return Sum(Result, true);
+ }
+
GcReferenceChecker* Checker = It.first.get();
size_t Index = It.second;
std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
@@ -827,6 +881,13 @@ GcManager::CollectGarbage(const GcSettings& Settings)
});
for (auto& It : ReferencePruners)
{
+ if (CheckGCCancel())
+ {
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ return Sum(Result, true);
+ }
+
GcReferencePruner* Pruner = It.second.get();
size_t Index = It.first;
GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second;
@@ -866,6 +927,11 @@ GcManager::CollectGarbage(const GcSettings& Settings)
ZEN_INFO("GCV2: Compacting using {} store compactors", StoreCompactors.size());
if (!StoreCompactors.empty())
{
+ if (CheckGCCancel())
+ {
+ return Sum(Result, true);
+ }
+
auto ClaimDiskReserve = [&]() -> uint64_t {
if (!std::filesystem::is_regular_file(Settings.DiskReservePath))
{
@@ -886,6 +952,11 @@ GcManager::CollectGarbage(const GcSettings& Settings)
});
for (auto& It : StoreCompactors)
{
+ if (CheckGCCancel())
+ {
+ return Sum(Result, true);
+ }
+
GcStoreCompactor* Compactor = It.first.get();
GcCompactStoreStats& Stats = *It.second;
{
@@ -901,8 +972,7 @@ GcManager::CollectGarbage(const GcSettings& Settings)
ZEN_INFO("GCV2: Completed in {}", NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
}
- Sum(Result);
- return Result;
+ return Sum(Result);
}
#undef SCOPED_TIMER
@@ -910,6 +980,12 @@ GcManager::CollectGarbage(const GcSettings& Settings)
//////// End GC V2
void
+GcManager::SetCancelGC(bool CancelFlag)
+{
+ m_CancelGC.store(CancelFlag);
+}
+
+void
GcManager::AddGcContributor(GcContributor* Contributor)
{
RwLock::ExclusiveLockScope _(m_Lock);
@@ -965,6 +1041,10 @@ GcManager::CollectGarbage(GcContext& GcCtx)
const auto Guard = MakeGuard([&] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
for (GcContributor* Contributor : m_GcContribs)
{
+ if (CheckGCCancel())
+ {
+ return GCTotalSizeDiff;
+ }
Contributor->GatherReferences(GcCtx);
}
}
@@ -982,6 +1062,11 @@ GcManager::CollectGarbage(GcContext& GcCtx)
});
for (GcStorage* Storage : m_GcStorage)
{
+ if (CheckGCCancel())
+ {
+ break;
+ }
+
const auto PreSize = Storage->StorageSize();
Storage->CollectGarbage(GcCtx);
const auto PostSize = Storage->StorageSize();
@@ -1208,7 +1293,11 @@ GcScheduler::Shutdown()
if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
{
bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning);
- m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
+ if (GcIsRunning)
+ {
+ m_GcManager.SetCancelGC(true);
+ }
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
m_GcSignal.notify_one();
if (m_GcThread.joinable())
@@ -1758,6 +1847,8 @@ GcScheduler::SchedulerThread()
CompactBlockUsageThresholdPercent,
Verbose);
+ m_GcManager.SetCancelGC(false);
+
uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
{
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 1429a6b02..b748fc8f6 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -126,7 +126,7 @@ public:
typedef std::vector<size_t> ChunkIndexArray;
typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback;
- typedef std::function<void(const MovedChunksArray& MovedChunks, uint64_t FreedDiskSpace)> CompactCallback;
+ typedef std::function<bool(const MovedChunksArray& MovedChunks, uint64_t FreedDiskSpace)> CompactCallback;
typedef std::function<uint64_t()> ClaimDiskReserveCallback;
typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback;
typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
@@ -163,7 +163,7 @@ public:
void CompactBlocks(
const BlockStoreCompactState& CompactState,
uint64_t PayloadAlignment,
- const CompactCallback& ChangeCallback = [](const MovedChunksArray&, uint64_t) {},
+ const CompactCallback& ChangeCallback = [](const MovedChunksArray&, uint64_t) { return true; },
const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; });
static const char* GetBlockFileExtension();
@@ -230,14 +230,18 @@ public:
const BlockStoreLocation& GetLocation(size_t Index) const { return m_ChunkLocations[Index]; }
- void IterateBlocks(std::function<void(uint32_t BlockIndex,
+ void IterateBlocks(std::function<bool(uint32_t BlockIndex,
const std::vector<size_t>& KeepChunkIndexes,
const std::vector<BlockStoreLocation>& ChunkLocations)> Callback) const
{
for (auto It : m_BlockIndexToChunkMapIndex)
{
size_t ChunkMapIndex = It.second;
- Callback(It.first, m_KeepChunks[ChunkMapIndex], m_ChunkLocations);
+ bool Continue = Callback(It.first, m_KeepChunks[ChunkMapIndex], m_ChunkLocations);
+ if (!Continue)
+ {
+ break;
+ }
}
}
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index f50af8006..486dca3c6 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -121,6 +121,8 @@ struct GcResult
std::chrono::milliseconds WriteBlockMS = {};
std::chrono::milliseconds ElapsedMS = {};
+
+ bool WasCancelled = false;
};
class CbObjectWriter;
@@ -129,7 +131,8 @@ void WriteGCResult(CbObjectWriter& Writer, const GcResult& Result, bool HumanRea
struct GcCtx
{
- const GcSettings Settings;
+ const GcSettings Settings;
+ std::atomic_bool& IsCancelledFlag;
};
typedef tsl::robin_set<IoHash> HashSet;
@@ -341,6 +344,7 @@ public:
void RemoveGcReferenceStore(GcReferenceStore& ReferenceStore);
GcResult CollectGarbage(const GcSettings& Settings);
+ void SetCancelGC(bool CancelFlag);
//////// End GC V2
@@ -359,6 +363,7 @@ public:
void SetDiskWriteBlocker(const DiskWriteBlocker* Monitor) { m_DiskWriteBlocker = Monitor; }
private:
+ bool CheckGCCancel() { return m_CancelGC.load(); }
LoggerRef Log() { return m_Log; }
LoggerRef m_Log;
mutable RwLock m_Lock;
@@ -369,6 +374,8 @@ private:
std::vector<GcReferencer*> m_GcReferencers;
std::vector<GcReferenceStore*> m_GcReferenceStores;
+
+ std::atomic_bool m_CancelGC{false};
};
enum class GcSchedulerStatus : uint32_t