aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-10-30 09:32:54 +0100
committerGitHub <[email protected]>2023-10-30 09:32:54 +0100
commit3a6a5855cf36967c6bde31292669bfaf832c6f0b (patch)
tree593e7c21e6840e7ad312207fddc63e1934e19d85 /src
parentset up arch properly when running tests (mac) (#505) (diff)
downloadzen-3a6a5855cf36967c6bde31292669bfaf832c6f0b.tar.xz
zen-3a6a5855cf36967c6bde31292669bfaf832c6f0b.zip
New GC implementation (#459)
- Feature: New garbage collection implementation, still in evaluation mode. Enabled by `--gc-v2` command line option
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/admin_cmd.cpp16
-rw-r--r--src/zen/cmds/admin_cmd.h2
-rw-r--r--src/zencore/workthreadpool.cpp20
-rw-r--r--src/zenserver/admin/admin.cpp10
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp633
-rw-r--r--src/zenserver/cache/cachedisklayer.h57
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp654
-rw-r--r--src/zenserver/cache/structuredcachestore.h4
-rw-r--r--src/zenserver/config.cpp8
-rw-r--r--src/zenserver/config.h1
-rw-r--r--src/zenserver/projectstore/projectstore.cpp258
-rw-r--r--src/zenserver/projectstore/projectstore.h17
-rw-r--r--src/zenserver/zenserver.cpp3
-rw-r--r--src/zenstore/blockstore.cpp190
-rw-r--r--src/zenstore/compactcas.cpp217
-rw-r--r--src/zenstore/compactcas.h7
-rw-r--r--src/zenstore/filecas.cpp167
-rw-r--r--src/zenstore/filecas.h7
-rw-r--r--src/zenstore/gc.cpp326
-rw-r--r--src/zenstore/include/zenstore/blockstore.h58
-rw-r--r--src/zenstore/include/zenstore/gc.h187
21 files changed, 2701 insertions, 141 deletions
diff --git a/src/zen/cmds/admin_cmd.cpp b/src/zen/cmds/admin_cmd.cpp
index 27341fe58..209390e2a 100644
--- a/src/zen/cmds/admin_cmd.cpp
+++ b/src/zen/cmds/admin_cmd.cpp
@@ -93,6 +93,10 @@ GcCommand::GcCommand()
"Max disk usage size (in bytes)",
cxxopts::value(m_DiskSizeSoftLimit)->default_value("0"),
"<disksizesoftlimit>");
+ m_Options
+ .add_option("", "", "usegcv1", "Force use of GC version 1", cxxopts::value(m_ForceUseGCV1)->default_value("false"), "<usegcv1>");
+ m_Options
+ .add_option("", "", "usegcv2", "Force use of GC version 2", cxxopts::value(m_ForceUseGCV2)->default_value("false"), "<usegcv2>");
}
GcCommand::~GcCommand()
@@ -137,6 +141,18 @@ GcCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
Params.Add({"skipdelete", "true"});
}
+ if (m_ForceUseGCV1)
+ {
+ if (m_ForceUseGCV2)
+ {
+ throw OptionParseException("only usegcv1 or usegcv2 can be selected, not both");
+ }
+ Params.Add({"forceusegcv1", "true"});
+ }
+ if (m_ForceUseGCV2)
+ {
+ Params.Add({"forceusegcv2", "true"});
+ }
cpr::Session Session;
Session.SetHeader(cpr::Header{{"Accept", "application/json"}});
diff --git a/src/zen/cmds/admin_cmd.h b/src/zen/cmds/admin_cmd.h
index 9d7a50e09..4a9de89e2 100644
--- a/src/zen/cmds/admin_cmd.h
+++ b/src/zen/cmds/admin_cmd.h
@@ -41,6 +41,8 @@ private:
bool m_SkipDelete{false};
uint64_t m_MaxCacheDuration{0};
uint64_t m_DiskSizeSoftLimit{0};
+ bool m_ForceUseGCV1{false};
+ bool m_ForceUseGCV2{false};
};
class GcStatusCommand : public ZenCmdBase
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index cc21e717a..3a4b1e6a1 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -199,7 +199,10 @@ WorkerThreadPool::WorkerThreadPool(int InThreadCount) : WorkerThreadPool(InThrea
WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName)
{
- m_Impl = std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName);
+ if (InThreadCount > 0)
+ {
+ m_Impl = std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName);
+ }
}
WorkerThreadPool::~WorkerThreadPool()
@@ -210,7 +213,14 @@ WorkerThreadPool::~WorkerThreadPool()
void
WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
{
- m_Impl->ScheduleWork(std::move(Work));
+ if (m_Impl)
+ {
+ m_Impl->ScheduleWork(std::move(Work));
+ }
+ else
+ {
+ Work->Execute();
+ }
}
void
@@ -222,7 +232,11 @@ WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
[[nodiscard]] size_t
WorkerThreadPool::PendingWorkItemCount() const
{
- return m_Impl->PendingWorkItemCount();
+ if (m_Impl)
+ {
+ return m_Impl->PendingWorkItemCount();
+ }
+ return 0;
}
//////////////////////////////////////////////////////////////////////////
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 7c7c729c7..fb376f238 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -298,6 +298,16 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
GcParams.SkipDelete = Param == "true"sv;
}
+ if (auto Param = Params.GetValue("forceusegcv1"); Param.empty() == false)
+ {
+ GcParams.ForceGCVersion = GcVersion::kV1;
+ }
+
+ if (auto Param = Params.GetValue("forceusegcv2"); Param.empty() == false)
+ {
+ GcParams.ForceGCVersion = GcVersion::kV2;
+ }
+
const bool Started = m_GcScheduler.TriggerGc(GcParams);
CbObjectWriter Response;
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 2efec1e66..38cbf3a93 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -168,8 +168,13 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex;
const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex;
-ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const BucketConfiguration& Config)
-: m_BucketName(std::move(BucketName))
+ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
+ std::atomic_uint64_t& OuterCacheMemoryUsage,
+ std::string BucketName,
+ const BucketConfiguration& Config)
+: m_Gc(Gc)
+, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage)
+, m_BucketName(std::move(BucketName))
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
@@ -179,10 +184,12 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const Bucket
// it makes sense to have a different strategy for legacy values
m_Configuration.LargeObjectThreshold = 16 * 1024 * 1024;
}
+ m_Gc.AddGcReferencer(*this);
}
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
+ m_Gc.RemoveGcReferencer(*this);
}
bool
@@ -717,7 +724,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy
}
bool
-ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
@@ -782,8 +789,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
if (!m_CachedPayloads[UpdateIt->second])
{
m_CachedPayloads[UpdateIt->second] = OutValue.Value;
- m_MemCachedSize.fetch_add(ValueSize);
- CacheMemoryUsage.fetch_add(ValueSize);
+ AddMemCacheUsage(ValueSize);
m_MemoryWriteCount++;
}
}
@@ -834,27 +840,24 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
- PutStandaloneCacheValue(HashKey, Value, References, CacheMemoryUsage);
+ PutStandaloneCacheValue(HashKey, Value, References);
}
else
{
- PutInlineCacheValue(HashKey, Value, References, CacheMemoryUsage);
+ PutInlineCacheValue(HashKey, Value, References);
}
m_DiskWriteCount++;
}
void
-ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
{
GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
@@ -864,8 +867,7 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std:
if (m_AccessTimes[Kv.second] < ExpireTicks)
{
size_t PayloadSize = m_CachedPayloads[Kv.second].GetSize();
- m_MemCachedSize.fetch_sub(PayloadSize);
- CacheMemoryUsage.fetch_sub(PayloadSize);
+ RemoveMemCacheUsage(PayloadSize);
m_CachedPayloads[Kv.second] = {};
}
}
@@ -900,7 +902,7 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
}
bool
-ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::Drop()
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop");
@@ -926,7 +928,7 @@ ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage)
m_NextReferenceHashesIndexes.clear();
m_ReferenceCount = 0;
m_StandaloneSize.store(0);
- CacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
+ m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
m_MemCachedSize.store(0);
return Deleted;
@@ -1102,7 +1104,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
};
void
-ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub");
@@ -1292,8 +1294,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint
if (m_Configuration.MemCacheSizeThreshold > 0)
{
size_t CachedSize = m_CachedPayloads[It->second].GetSize();
- m_MemCachedSize.fetch_sub(CachedSize);
- CacheMemoryUsage.fetch_sub(CachedSize);
+ RemoveMemCacheUsage(CachedSize);
m_CachedPayloads[It->second] = IoBuffer{};
}
@@ -1411,8 +1412,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
for (const auto& Entry : Index)
{
- const IoHash& Key = Entry.first;
- GcClock::Tick AccessTime = AccessTimes[Entry.second];
+ const IoHash& Key = Entry.first;
+ size_t PayloadIndex = Entry.second;
+ GcClock::Tick AccessTime = AccessTimes[PayloadIndex];
if (AccessTime < ExpireTicks)
{
ExpiredKeys.push_back(Key);
@@ -1424,7 +1426,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
continue;
}
- BucketPayload& Payload = Payloads[Entry.second];
+ BucketPayload& Payload = Payloads[PayloadIndex];
const DiskLocation& Loc = Payload.Location;
if (!Loc.IsFlagSet(DiskLocation::kStructured))
@@ -1433,7 +1435,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
if (m_Configuration.EnableReferenceCaching)
{
- if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex))
+ if (FirstReferenceIndex.empty() || (FirstReferenceIndex[PayloadIndex] == UnknownReferencesIndex))
{
StructuredItemsWithUnknownAttachments.push_back(Entry);
continue;
@@ -1450,7 +1452,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
#endif // CALCULATE_BLOCKING_TIME
- if (auto It = m_Index.find(Entry.first); It != m_Index.end())
+ if (auto It = m_Index.find(Key); It != m_Index.end())
{
ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
}
@@ -1470,13 +1472,15 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
for (const auto& Entry : StructuredItemsWithUnknownAttachments)
{
- BucketPayload& Payload = Payloads[Entry.second];
- const DiskLocation& Loc = Payload.Location;
+ const IoHash& Key = Entry.first;
+ size_t PayloadIndex = Entry.second;
+ BucketPayload& Payload = Payloads[PayloadIndex];
+ const DiskLocation& Loc = Payload.Location;
{
IoBuffer Buffer;
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Entry.first); !Buffer)
+ if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer)
{
continue;
}
@@ -1492,7 +1496,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
#endif // CALCULATE_BLOCKING_TIME
- if (auto It = m_Index.find(Entry.first); It != m_Index.end())
+ if (auto It = m_Index.find(Key); It != m_Index.end())
{
if (m_Configuration.MemCacheSizeThreshold > 0)
{
@@ -1514,8 +1518,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
ZEN_ASSERT(Buffer);
ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
- CbObject Obj(SharedBuffer{Buffer});
- size_t CurrentCidCount = Cids.size();
+ CbObjectView Obj(Buffer.GetData());
+ size_t CurrentCidCount = Cids.size();
Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
if (m_Configuration.EnableReferenceCaching)
{
@@ -1528,7 +1532,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
#endif // CALCULATE_BLOCKING_TIME
- if (auto It = m_Index.find(Entry.first); It != m_Index.end())
+ if (auto It = m_Index.find(Key); It != m_Index.end())
{
if (m_FirstReferenceIndex[It->second] == UnknownReferencesIndex)
{
@@ -1556,7 +1560,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
void
-ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage");
@@ -1762,17 +1766,19 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin
TotalChunkCount = 0;
for (const auto& Entry : Index)
{
- const DiskLocation& DiskLocation = Payloads[Entry.second].Location;
+ size_t EntryIndex = Entry.second;
+ const DiskLocation& DiskLocation = Payloads[EntryIndex].Location;
if (DiskLocation.Flags & DiskLocation::kStandaloneFile)
{
continue;
}
+ const IoHash& Key = Entry.first;
BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
size_t ChunkIndex = ChunkLocations.size();
ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash[ChunkIndex] = Entry.first;
- if (ExpiredCacheKeys.contains(Entry.first))
+ ChunkIndexToChunkHash[ChunkIndex] = Key;
+ if (ExpiredCacheKeys.contains(Key))
{
continue;
}
@@ -1815,12 +1821,12 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin
});
for (const auto& Entry : MovedChunks)
{
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- size_t PayloadIndex = m_Index[ChunkHash];
- BucketPayload& Payload = m_Payloads[PayloadIndex];
- if (Payloads[Index[ChunkHash]].Location != m_Payloads[PayloadIndex].Location)
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ size_t EntryIndex = m_Index[ChunkHash];
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+ if (Payloads[Index[ChunkHash]].Location != m_Payloads[EntryIndex].Location)
{
// Entry has been updated while GC was running, ignore the move
continue;
@@ -1830,9 +1836,9 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin
}
for (const size_t ChunkIndex : RemovedChunks)
{
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- size_t PayloadIndex = m_Index[ChunkHash];
- const BucketPayload& Payload = m_Payloads[PayloadIndex];
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ size_t EntryIndex = m_Index[ChunkHash];
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
if (Payloads[Index[ChunkHash]].Location != Payload.Location)
{
// Entry has been updated while GC was running, ignore the delete
@@ -1843,12 +1849,11 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin
.Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment),
m_Configuration.PayloadAlignment,
OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
- if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[PayloadIndex])
+ if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[EntryIndex])
{
- uint64_t CachePayloadSize = m_CachedPayloads[PayloadIndex].Size();
- m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
- CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
- m_CachedPayloads[PayloadIndex] = IoBuffer{};
+ uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size();
+ RemoveMemCacheUsage(CachePayloadSize);
+ m_CachedPayloads[EntryIndex] = IoBuffer{};
}
m_Index.erase(ChunkHash);
DeletedChunks.insert(ChunkHash);
@@ -1891,10 +1896,10 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index)
const BucketPayload& Payload = m_Payloads[Index];
if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
{
- IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile)
- ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key)
- : GetInlineCacheValue(Payload.Location);
- CbObject Obj(SharedBuffer{Value});
+ IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile)
+ ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key)
+ : GetInlineCacheValue(Payload.Location);
+ CbObjectView Obj(Value.GetData());
Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
}
return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
@@ -1958,16 +1963,13 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
}
for (CacheBucket* Bucket : Buckets)
{
- Bucket->CollectGarbage(GcCtx, m_TotalMemCachedSize);
+ Bucket->CollectGarbage(GcCtx);
}
MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
}
void
-ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue");
@@ -2118,8 +2120,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey
if (m_CachedPayloads[EntryIndex])
{
uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size();
- m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
- CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ RemoveMemCacheUsage(CachePayloadSize);
m_CachedPayloads[EntryIndex] = IoBuffer{};
}
}
@@ -2131,10 +2132,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey
}
void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue");
@@ -2176,14 +2174,12 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
if (m_CachedPayloads[EntryIndex])
{
uint64_t OldCachedSize = m_CachedPayloads[EntryIndex].GetSize();
- m_MemCachedSize.fetch_sub(OldCachedSize);
- CacheMemoryUsage.fetch_sub(OldCachedSize);
+ RemoveMemCacheUsage(OldCachedSize);
}
if (MemCacheBuffer)
{
- m_MemCachedSize.fetch_add(PayloadSize);
- CacheMemoryUsage.fetch_add(PayloadSize);
+ AddMemCacheUsage(PayloadSize);
m_MemoryWriteCount++;
}
m_CachedPayloads[EntryIndex] = std::move(MemCacheBuffer);
@@ -2202,8 +2198,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
{
if (MemCacheBuffer)
{
- m_MemCachedSize.fetch_add(PayloadSize);
- CacheMemoryUsage.fetch_add(PayloadSize);
+ AddMemCacheUsage(PayloadSize);
m_MemoryWriteCount++;
}
m_CachedPayloads.emplace_back(std::move(MemCacheBuffer));
@@ -2219,6 +2214,409 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
}
void
+ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx)
+{
+ size_t TotalEntries = 0;
+ tsl::robin_set<IoHash, IoHash::Hasher> ExpiredInlineKeys;
+ std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys;
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc cache bucket '{}': removed {} expired keys out of {} in {}",
+ m_BucketDir,
+ ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size(),
+ TotalEntries,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count();
+
+ BlockStoreCompactState BlockCompactState;
+ BlockStore::ReclaimSnapshotState BlockSnapshotState;
+ std::vector<IoHash> BlockCompactStateKeys;
+ std::vector<DiskIndexEntry> ExpiredEntries;
+ uint64_t RemovedStandaloneSize = 0;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (Ctx.Settings.CollectSmallObjects)
+ {
+ BlockSnapshotState = m_BlockStore.GetReclaimSnapshotState();
+ }
+ TotalEntries = m_Index.size();
+
+ // Find out expired keys and affected blocks
+ for (const auto& Entry : m_Index)
+ {
+ const IoHash& Key = Entry.first;
+ size_t EntryIndex = Entry.second;
+ GcClock::Tick AccessTime = m_AccessTimes[EntryIndex];
+ if (AccessTime >= ExpireTicks)
+ {
+ continue;
+ }
+
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location};
+ ExpiredEntry.Location.Flags |= DiskLocation::kTombStone;
+
+ if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()});
+ RemovedStandaloneSize += Payload.Location.Size();
+ ExpiredEntries.push_back(ExpiredEntry);
+ }
+ else if (Ctx.Settings.CollectSmallObjects)
+ {
+ ExpiredInlineKeys.insert(Key);
+ uint32_t BlockIndex = Payload.Location.Location.BlockLocation.GetBlockIndex();
+ bool IsActiveWriteBlock = BlockSnapshotState.m_ActiveWriteBlocks.contains(BlockIndex);
+ if (!IsActiveWriteBlock)
+ {
+ BlockCompactState.AddBlock(BlockIndex);
+ }
+ ExpiredEntries.push_back(ExpiredEntry);
+ }
+ }
+
+ Ctx.ExpiredItems.fetch_add(ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size());
+
+ // Get all locations we need to keep for affected blocks
+ if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty())
+ {
+ for (const auto& Entry : m_Index)
+ {
+ const IoHash& Key = Entry.first;
+ if (ExpiredInlineKeys.contains(Key))
+ {
+ continue;
+ }
+ size_t EntryIndex = Entry.second;
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ continue;
+ }
+ if (BlockCompactState.AddKeepLocation(Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment)))
+ {
+ BlockCompactStateKeys.push_back(Key);
+ }
+ }
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ for (const DiskIndexEntry& Entry : ExpiredEntries)
+ {
+ auto It = m_Index.find(Entry.Key);
+ ZEN_ASSERT(It != m_Index.end());
+ if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[It->second])
+ {
+ size_t PayloadSize = m_CachedPayloads[It->second].GetSize();
+ Ctx.RemovedMemory.fetch_add(PayloadSize);
+ RemoveMemCacheUsage(PayloadSize);
+ }
+ m_Index.erase(It);
+ }
+ m_SlogFile.Append(ExpiredEntries);
+ m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed);
+ }
+ }
+ Ctx.Items.fetch_add(TotalEntries);
+
+ if (ExpiredEntries.empty())
+ {
+ return;
+ }
+
+ if (!Ctx.Settings.IsDeleteMode)
+ {
+ return;
+ }
+
+ Ctx.DeletedItems.fetch_add(ExpiredEntries.size());
+
+ // Compact standalone items
+ ExtendablePathBuilder<256> Path;
+ for (const std::pair<IoHash, uint64_t>& ExpiredKey : ExpiredStandaloneKeys)
+ {
+ Path.Reset();
+ BuildPath(Path, ExpiredKey.first);
+ fs::path FilePath = Path.ToPath();
+
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ if (m_Index.contains(ExpiredKey.first))
+ {
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("gc cache bucket '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back",
+ m_BucketDir,
+ Path.ToUtf8());
+ continue;
+ }
+
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(ExpiredKey.first));
+ IndexLock.ReleaseNow();
+ ZEN_DEBUG("gc cache bucket '{}': deleting standalone cache file '{}'", m_BucketDir, Path.ToUtf8());
+
+ std::error_code Ec;
+ if (!fs::remove(FilePath, Ec))
+ {
+ continue;
+ }
+ if (Ec)
+ {
+ ZEN_WARN("gc cache bucket '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'",
+ m_BucketDir,
+ Path.ToUtf8(),
+ Ec.message());
+ continue;
+ }
+ Ctx.RemovedDiskSpace.fetch_add(ExpiredKey.second);
+ }
+
+ if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty())
+ {
+ // Compact block store
+ m_BlockStore.CompactBlocks(
+ BlockCompactState,
+ m_Configuration.PayloadAlignment,
+ [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
+ std::vector<DiskIndexEntry> MovedEntries;
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
+ {
+ size_t ChunkIndex = Moved.first;
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
+
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ BucketPayload& Payload = m_Payloads[It->second];
+ const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
+ if (Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment) != OldLocation)
+ {
+ // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d at a later
+ // time
+ continue;
+ }
+
+ const BlockStoreLocation& NewLocation = Moved.second;
+
+ Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
+ MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
+ }
+ }
+ m_SlogFile.Append(MovedEntries);
+ Ctx.RemovedDiskSpace.fetch_add(FreedDiskSpace);
+ },
+ [&]() { return 0; });
+ }
+
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<IoBuffer> CachedPayloads;
+ std::vector<size_t> FirstReferenceIndex;
+ IndexMap Index;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock);
+ }
+}
+
+class DiskBucketReferenceChecker : public GcReferenceChecker
+{
+public:
+ DiskBucketReferenceChecker(ZenCacheDiskLayer::CacheBucket& Owner) : m_CacheBucket(Owner) {}
+
+ virtual ~DiskBucketReferenceChecker()
+ {
+ m_IndexLock.reset();
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it
+ m_CacheBucket.ClearReferenceCache();
+ }
+ }
+
+ virtual void LockState(GcCtx&) override
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc cache bucket '{}': found {} references in {}",
+ m_CacheBucket.m_BucketDir,
+ m_CacheBucket.m_ReferenceCount + m_UncachedReferences.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock);
+
+ // Rescan to see if any cache items needs refreshing since last pass when we had the lock
+ for (const auto& Entry : m_CacheBucket.m_Index)
+ {
+ size_t PayloadIndex = Entry.second;
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+ ZEN_ASSERT(!m_CacheBucket.m_FirstReferenceIndex.empty());
+ const IoHash& Key = Entry.first;
+ if (m_CacheBucket.m_FirstReferenceIndex[PayloadIndex] == ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex)
+ {
+ IoBuffer Buffer;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ Buffer = m_CacheBucket.GetStandaloneCacheValue(Loc.GetContentType(), Key);
+ }
+ else
+ {
+ Buffer = m_CacheBucket.GetInlineCacheValue(Loc);
+ }
+
+ if (Buffer)
+ {
+ ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
+ CbObjectView Obj(Buffer.GetData());
+ Obj.IterateAttachments([this](CbFieldView Field) { m_UncachedReferences.insert(Field.AsAttachment()); });
+ }
+ }
+ }
+ }
+
+ virtual void RemoveUsedReferencesFromSet(GcCtx&, HashSet& IoCids) override
+ {
+ ZEN_ASSERT(m_IndexLock);
+
+ for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes)
+ {
+ IoCids.erase(ReferenceHash);
+ }
+
+ for (const IoHash& ReferenceHash : m_UncachedReferences)
+ {
+ IoCids.erase(ReferenceHash);
+ }
+ }
+ ZenCacheDiskLayer::CacheBucket& m_CacheBucket;
+ std::unique_ptr<RwLock::SharedLockScope> m_IndexLock;
+ HashSet m_UncachedReferences;
+};
+
+std::vector<GcReferenceChecker*>
+ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx&)
+{
+ Stopwatch Timer;
+ const auto _ = MakeGuard(
+ [&] { ZEN_DEBUG("gc cache bucket '{}': refreshed reference cache in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+ std::vector<IoHash> UpdateKeys;
+ std::vector<IoHash> StandaloneKeys;
+ std::vector<size_t> ReferenceCounts;
+ std::vector<IoHash> References;
+
+ // Refresh cache
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ for (const auto& Entry : m_Index)
+ {
+ size_t PayloadIndex = Entry.second;
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+ if (m_Configuration.EnableReferenceCaching &&
+ m_FirstReferenceIndex[PayloadIndex] != ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex)
+ {
+ continue;
+ }
+ const IoHash& Key = Entry.first;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneKeys.push_back(Key);
+ continue;
+ }
+ IoBuffer Buffer = GetInlineCacheValue(Loc);
+ if (!Buffer)
+ {
+ UpdateKeys.push_back(Key);
+ ReferenceCounts.push_back(0);
+ continue;
+ }
+ size_t CurrentReferenceCount = References.size();
+ {
+ CbObjectView Obj(Buffer.GetData());
+ Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
+ Buffer = {};
+ }
+ UpdateKeys.push_back(Key);
+ ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
+ }
+ }
+ {
+ for (const IoHash& Key : StandaloneKeys)
+ {
+ IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
+ if (!Buffer)
+ {
+ continue;
+ }
+
+ size_t CurrentReferenceCount = References.size();
+ {
+ CbObjectView Obj(Buffer.GetData());
+ Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
+ Buffer = {};
+ }
+ UpdateKeys.push_back(Key);
+ ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
+ }
+ }
+
+ {
+ size_t ReferenceOffset = 0;
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (!m_Configuration.EnableReferenceCaching)
+ {
+ ZEN_ASSERT(m_FirstReferenceIndex.empty());
+ ZEN_ASSERT(m_ReferenceHashes.empty());
+ ZEN_ASSERT(m_NextReferenceHashesIndexes.empty());
+ ZEN_ASSERT(m_ReferenceCount == 0);
+ // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when
+ // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted.
+ m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex);
+ }
+ for (size_t Index = 0; Index < UpdateKeys.size(); Index++)
+ {
+ const IoHash& Key = UpdateKeys[Index];
+ size_t ReferenceCount = ReferenceCounts[Index];
+ auto It = m_Index.find(Key);
+ if (It == m_Index.end())
+ {
+ ReferenceOffset += ReferenceCount;
+ continue;
+ }
+ if (m_FirstReferenceIndex[It->second] != ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex)
+ {
+ continue;
+ }
+ SetReferences(IndexLock,
+ m_FirstReferenceIndex[It->second],
+ std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount});
+ ReferenceOffset += ReferenceCount;
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ CompactReferences(IndexLock);
+ }
+ }
+
+ return {new DiskBucketReferenceChecker(*this)};
+}
+
+void
ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
{
std::vector<size_t> FirstReferenceIndex;
@@ -2381,6 +2779,19 @@ ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIn
}
void
+ZenCacheDiskLayer::CacheBucket::ClearReferenceCache()
+{
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ m_FirstReferenceIndex.clear();
+ m_FirstReferenceIndex.shrink_to_fit();
+ m_ReferenceHashes.clear();
+ m_ReferenceHashes.shrink_to_fit();
+ m_NextReferenceHashesIndexes.clear();
+ m_NextReferenceHashesIndexes.shrink_to_fit();
+ m_ReferenceCount = 0;
+}
+
+void
ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads,
std::vector<AccessTime>& AccessTimes,
std::vector<IoBuffer>& CachedPayloads,
@@ -2426,16 +2837,34 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payload
}
}
+#if ZEN_WITH_TESTS
+void
+ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time)
+{
+ GcClock::Tick TimeTick = Time.time_since_epoch().count();
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_AccessTimes[EntryIndex] = TimeTick;
+ }
+}
+#endif // ZEN_WITH_TESTS
+
//////////////////////////////////////////////////////////////////////////
-ZenCacheDiskLayer::ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
-: m_JobQueue(JobQueue)
+ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
+: m_Gc(Gc)
+, m_JobQueue(JobQueue)
, m_RootDir(RootDir)
, m_Configuration(Config)
{
}
-ZenCacheDiskLayer::~ZenCacheDiskLayer() = default;
+ZenCacheDiskLayer::~ZenCacheDiskLayer()
+{
+}
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
@@ -2468,8 +2897,10 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
- Bucket = InsertResult.first->second.get();
+ auto InsertResult =
+ m_Buckets.emplace(BucketName,
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+ Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= BucketName;
@@ -2483,7 +2914,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
ZEN_ASSERT(Bucket != nullptr);
- if (Bucket->Get(HashKey, OutValue, m_TotalMemCachedSize))
+ if (Bucket->Get(HashKey, OutValue))
{
TryMemCacheTrim();
return true;
@@ -2522,8 +2953,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
- Bucket = InsertResult.first->second.get();
+ auto InsertResult =
+ m_Buckets.emplace(BucketName,
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+ Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= BucketName;
@@ -2547,7 +2980,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
ZEN_ASSERT(Bucket != nullptr);
- Bucket->Put(HashKey, Value, References, m_TotalMemCachedSize);
+ Bucket->Put(HashKey, Value, References);
TryMemCacheTrim();
}
@@ -2579,8 +3012,10 @@ ZenCacheDiskLayer::DiscoverBuckets()
continue;
}
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
- CacheBucket& Bucket = *InsertResult.first->second;
+ auto InsertResult =
+ m_Buckets.emplace(BucketName,
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+ CacheBucket& Bucket = *InsertResult.first->second;
try
{
@@ -2636,7 +3071,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It);
- return Bucket.Drop(m_TotalMemCachedSize);
+ return Bucket.Drop();
}
// Make sure we remove the folder even if we don't know about the bucket
@@ -2658,7 +3093,7 @@ ZenCacheDiskLayer::Drop()
CacheBucket& Bucket = *It->second;
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It->first);
- if (!Bucket.Drop(m_TotalMemCachedSize))
+ if (!Bucket.Drop())
{
return false;
}
@@ -2700,10 +3135,10 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
#if 1
Results.push_back(Ctx.ThreadPool().EnqueueTask(
- std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx, m_TotalMemCachedSize); }}));
+ std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
#else
CacheBucket& Bucket = *Kv.second;
- Bucket.ScrubStorage(Ctx, m_TotalMemCachedSize);
+ Bucket.ScrubStorage(Ctx);
#endif
}
@@ -2914,7 +3349,7 @@ ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::Tim
RwLock::SharedLockScope __(m_Lock);
for (CacheBucket* Bucket : Buckets)
{
- Bucket->MemCacheTrim(ExpireTime, m_TotalMemCachedSize);
+ Bucket->MemCacheTrim(ExpireTime);
}
const GcClock::TimePoint Now = GcClock::Now();
const GcClock::Tick NowTick = Now.time_since_epoch().count();
@@ -2924,4 +3359,30 @@ ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::Tim
m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
}
+#if ZEN_WITH_TESTS
+void
+ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time)
+{
+ const auto BucketName = std::string(InBucket);
+ CacheBucket* Bucket = nullptr;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ auto It = m_Buckets.find(BucketName);
+
+ if (It != m_Buckets.end())
+ {
+ Bucket = It->second.get();
+ }
+ }
+
+ if (Bucket == nullptr)
+ {
+ return;
+ }
+ Bucket->SetAccessTime(HashKey, Time);
+}
+#endif // ZEN_WITH_TESTS
+
} // namespace zen
diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h
index cc6653e28..d8f51c398 100644
--- a/src/zenserver/cache/cachedisklayer.h
+++ b/src/zenserver/cache/cachedisklayer.h
@@ -151,7 +151,7 @@ public:
uint64_t MemorySize;
};
- explicit ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
+ explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheDiskLayer();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -174,24 +174,28 @@ public:
CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
+#if ZEN_WITH_TESTS
+ void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
+#endif // ZEN_WITH_TESTS
+
private:
/** A cache bucket manages a single directory containing
metadata and data for that bucket
*/
- struct CacheBucket
+ struct CacheBucket : public GcReferencer
{
- CacheBucket(std::string BucketName, const BucketConfiguration& Config);
+ CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config);
~CacheBucket();
bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, std::atomic_uint64_t& CacheMemoryUsage);
- void MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage);
- bool Drop(std::atomic_uint64_t& CacheMemoryUsage);
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ void MemCacheTrim(GcClock::TimePoint ExpireTime);
+ bool Drop();
void Flush();
- void ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage);
+ void ScrubStorage(ScrubContext& Ctx);
void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage);
+ void CollectGarbage(GcContext& GcCtx);
inline GcStorageSize StorageSize() const
{
@@ -205,8 +209,13 @@ private:
void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots);
+#if ZEN_WITH_TESTS
+ void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time);
+#endif // ZEN_WITH_TESTS
private:
+ GcManager& m_Gc;
+ std::atomic_uint64_t& m_OuterCacheMemoryUsage;
std::string m_BucketName;
std::filesystem::path m_BucketDir;
std::filesystem::path m_BlocksBasePath;
@@ -258,16 +267,13 @@ private:
std::atomic_uint64_t m_StandaloneSize{};
std::atomic_uint64_t m_MemCachedSize{};
+ virtual void RemoveExpiredData(GcCtx& Ctx) override;
+ virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
+
void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
- void PutStandaloneCacheValue(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- std::atomic_uint64_t& CacheMemoryUsage);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const;
- void PutInlineCacheValue(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- std::atomic_uint64_t& CacheMemoryUsage);
+ void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
void MakeIndexSnapshot();
uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
@@ -289,6 +295,7 @@ private:
}
size_t AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key);
bool LockedGetReferences(std::size_t FirstReferenceIndex, std::vector<IoHash>& OutReferences) const;
+ void ClearReferenceCache();
void CompactState(std::vector<BucketPayload>& TmpPayloads,
std::vector<AccessTime>& TmpAccessTimes,
@@ -297,6 +304,17 @@ private:
IndexMap& TmpIndex,
RwLock::ExclusiveLockScope& IndexLock);
+ void AddMemCacheUsage(uint64_t ValueSize)
+ {
+ m_MemCachedSize.fetch_add(ValueSize, std::memory_order::relaxed);
+ m_OuterCacheMemoryUsage.fetch_add(ValueSize, std::memory_order::relaxed);
+ }
+ void RemoveMemCacheUsage(uint64_t ValueSize)
+ {
+ m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed);
+ m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed);
+ }
+
// These locks are here to avoid contention on file creation, therefore it's sufficient
// that we take the same lock for the same hash
//
@@ -305,6 +323,8 @@ private:
// an issue in practice
mutable RwLock m_ShardedLocks[256];
inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; }
+
+ friend class DiskBucketReferenceChecker;
};
inline void TryMemCacheTrim()
@@ -326,6 +346,7 @@ private:
void MemCacheTrim();
void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
+ GcManager& m_Gc;
JobQueue& m_JobQueue;
std::filesystem::path m_RootDir;
Configuration m_Configuration;
@@ -338,6 +359,8 @@ private:
ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
+
+ friend class DiskBucketReferenceChecker;
};
} // namespace zen
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 6fab14eee..516532528 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -64,7 +64,7 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st
, m_JobQueue(JobQueue)
, m_RootDir(RootDir)
, m_Configuration(Config)
-, m_DiskLayer(m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig)
+, m_DiskLayer(m_Gc, m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig)
{
ZEN_INFO("initializing structured cache at '{}'", m_RootDir);
CreateDirectories(m_RootDir);
@@ -232,6 +232,14 @@ ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const st
return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter);
}
+#if ZEN_WITH_TESTS
+void
+ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time)
+{
+ m_DiskLayer.SetAccessTime(Bucket, HashKey, Time);
+}
+#endif // ZEN_WITH_TESTS
+
//////////////////////////// ZenCacheStore
ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$");
@@ -784,6 +792,73 @@ namespace testutils {
return Buf;
};
+ IoHash ToIoHash(const Oid& Id)
+ {
+ char OIdString[24 + 1];
+ Id.ToString(OIdString);
+ IoHash Key = IoHash::HashBuffer(OIdString, 24);
+ return Key;
+ }
+
+ std::pair<Oid, IoBuffer> CreateBinaryBlob(size_t Size)
+ {
+ uint64_t seed{Size};
+ auto next = [](uint64_t& seed) {
+ uint64_t z = (seed += UINT64_C(0x9E3779B97F4A7C15));
+ z = (z ^ (z >> 30)) * UINT64_C(0xBF58476D1CE4E5B9);
+ z = (z ^ (z >> 27)) * UINT64_C(0x94D049BB133111EB);
+ return z ^ (z >> 31);
+ };
+
+ IoBuffer Data(Size);
+ uint64_t* DataPtr = reinterpret_cast<uint64_t*>(Data.MutableData());
+ while (Size > sizeof(uint64_t))
+ {
+ *DataPtr++ = next(seed);
+ Size -= sizeof(uint64_t);
+ }
+ uint64_t ByteNext = next(seed);
+ uint8_t* ByteDataPtr = reinterpret_cast<uint8_t*>(DataPtr);
+ while (Size > 0)
+ {
+ *ByteDataPtr++ = static_cast<uint8_t>(ByteNext & 0xff);
+ ByteNext >>= 8;
+ Size--;
+ }
+ return {Oid::NewOid(), Data};
+ }
+
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, const std::span<const size_t>& Sizes)
+ {
+ std::vector<std::pair<Oid, CompressedBuffer>> Result;
+ Result.reserve(Sizes.size());
+ for (size_t Size : Sizes)
+ {
+ auto Blob = CreateBinaryBlob(Size);
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size()));
+ CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash()));
+ Result.emplace_back(std::pair<Oid, CompressedBuffer>(Blob.first, Compressed));
+ }
+ return Result;
+ }
+
+ std::pair<IoHash, IoBuffer> CreateRecord(std::span<std::pair<Oid, CompressedBuffer>> Attachments)
+ {
+ Oid Id = Oid::NewOid();
+ IoHash Key = ToIoHash(Id);
+ CbObjectWriter Record;
+ Record << "Key"sv << Id;
+
+ for (size_t Idx = 0; auto& Cid : Attachments)
+ {
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid.second.DecodeRawHash());
+ }
+
+ IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+ return {Key, Buffer};
+ }
+
} // namespace testutils
TEST_CASE("z$.store")
@@ -1752,6 +1827,583 @@ TEST_CASE("z$.scrub")
CHECK(ScrubCtx.BadCids().GetSize() == 0);
}
+TEST_CASE("z$.newgc.basics")
+{
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
+ struct CacheEntry
+ {
+ IoBuffer Data;
+ std::vector<std::pair<Oid, CompressedBuffer>> Attachments;
+ };
+
+ std::unordered_map<IoHash, CacheEntry> CacheEntries;
+
+ auto CreateCacheRecord =
+ [&](ZenCacheNamespace& Zcs, CidStore& CidStore, std::string_view Bucket, std::span<std::pair<Oid, CompressedBuffer>> Attachments) {
+ std::vector<IoHash> AttachmentKeys;
+ for (const auto& Attachment : Attachments)
+ {
+ AttachmentKeys.push_back(Attachment.second.DecodeRawHash());
+ }
+ auto Record = CreateRecord(Attachments);
+ Zcs.Put(Bucket,
+ Record.first,
+ {.Value = Record.second,
+ .RawSize = Record.second.GetSize(),
+ .RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())},
+ AttachmentKeys);
+ for (const auto& Attachment : Attachments)
+ {
+ CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash());
+ }
+ CacheEntries.insert({Record.first, CacheEntry{.Data = Record.second, .Attachments = {Attachments.begin(), Attachments.end()}}});
+ return Record.first;
+ };
+ auto CreateCacheValue = [&](ZenCacheNamespace& Zcs, std::string_view Bucket, size_t Size) {
+ std::pair<Oid, IoBuffer> CacheValue = CreateBinaryBlob(Size);
+ IoHash Key = ToIoHash(CacheValue.first);
+ Zcs.Put(Bucket,
+ Key,
+ {.Value = CacheValue.second,
+ .RawSize = CacheValue.second.GetSize(),
+ .RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())},
+ {});
+ CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}});
+ return Key;
+ };
+
+ auto ValidateCacheEntry = [&](ZenCacheNamespace& Zcs,
+ CidStore& CidStore,
+ std::string_view Bucket,
+ const IoHash& Key,
+ bool ExpectCacheEntry,
+ bool ExpectAttachments) {
+ const CacheEntry& Entry = CacheEntries[Key];
+ ZenCacheValue Value;
+ bool CacheExists = Zcs.Get(Bucket, Key, Value);
+ if (ExpectCacheEntry)
+ {
+ if (!CacheExists)
+ {
+ return false;
+ }
+ if (Value.Value.GetSize() != Entry.Data.GetSize())
+ {
+ return false;
+ }
+ if (!Value.Value.GetView().EqualBytes(Entry.Data.GetView()))
+ {
+ return false;
+ }
+ }
+ else if (CacheExists)
+ {
+ return false;
+ }
+
+ if (ExpectAttachments)
+ {
+ for (const auto& Attachment : Entry.Attachments)
+ {
+ IoHash AttachmentHash = Attachment.second.DecodeRawHash();
+ IoBuffer StoredData = CidStore.FindChunkByCid(AttachmentHash);
+ if (!StoredData)
+ {
+ return false;
+ }
+ if (!StoredData.GetView().EqualBytes(Attachment.second.GetCompressed().Flatten().GetView()))
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ for (const auto& Attachment : Entry.Attachments)
+ {
+ IoHash AttachmentHash = Attachment.second.DecodeRawHash();
+ if (CidStore.ContainsChunk(AttachmentHash))
+ {
+ return false;
+ }
+ }
+ }
+ return true;
+ };
+
+ std::vector<IoHash> CacheRecords;
+ std::vector<IoHash> UnstructuredCacheValues;
+
+ const auto TearDrinkerBucket = "teardrinker"sv;
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+
+ // Create some basic data
+ {
+ // Structured record with attachments
+ auto Attachments1 = CreateCompressedAttachment(CidStore, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87});
+ CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments1));
+
+ // Structured record with reuse of attachments
+ auto Attachments2 = CreateCompressedAttachment(CidStore, std::vector<size_t>{971});
+ Attachments2.push_back(Attachments1[0]);
+ Attachments2.push_back(Attachments1[1]);
+ CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments2));
+ }
+
+ CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, {}));
+
+ {
+ // Unstructured cache values
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 84));
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 591));
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 1024 * 1024 * 3 + 7));
+ UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 71));
+ }
+ }
+
+ SUBCASE("expire nothing")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() - std::chrono::hours(1),
+ .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = false});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(0u, Result.ExpiredItems);
+ CHECK_EQ(0u, Result.DeletedItems);
+ CHECK_EQ(5u, Result.References);
+ CHECK_EQ(0u, Result.PrunedReferences);
+ CHECK_EQ(0u, Result.CompactedReferences);
+ CHECK_EQ(0u, Result.RemovedDiskSpace);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all large objects, delete nothing")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = false});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(1u, Result.ExpiredItems);
+ CHECK_EQ(0u, Result.DeletedItems);
+ CHECK_EQ(5u, Result.References);
+ CHECK_EQ(0u, Result.PrunedReferences);
+ CHECK_EQ(0u, Result.CompactedReferences);
+ CHECK_EQ(0u, Result.RemovedDiskSpace);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all object, delete nothing")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = false});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(7u, Result.ExpiredItems);
+ CHECK_EQ(0u, Result.DeletedItems);
+ CHECK_EQ(5u, Result.References);
+ CHECK_EQ(0u, Result.PrunedReferences);
+ CHECK_EQ(0u, Result.CompactedReferences);
+ CHECK_EQ(0u, Result.RemovedDiskSpace);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all large objects, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(1u, Result.ExpiredItems);
+ CHECK_EQ(1u, Result.DeletedItems);
+ CHECK_EQ(0u, Result.References);
+ CHECK_EQ(0u, Result.PrunedReferences);
+ CHECK_EQ(0u, Result.CompactedReferences);
+ CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.RemovedDiskSpace);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all objects, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(7u, Result.ExpiredItems);
+ CHECK_EQ(7u, Result.DeletedItems);
+ CHECK_EQ(0u, Result.References);
+ CHECK_EQ(0u, Result.PrunedReferences);
+ CHECK_EQ(0u, Result.CompactedReferences);
+ CHECK_GE(Result.RemovedDiskSpace, 0);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, true));
+ }
+ SUBCASE("expire all large objects")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = false,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(1u, Result.ExpiredItems); // Only one cache value is pruned/deleted as that is the only large item in the cache (all other
+ // large items are in the cas)
+ CHECK_EQ(1u, Result.DeletedItems);
+ CHECK_EQ(5u, Result.References);
+ CHECK_EQ(0u,
+ Result.PrunedReferences); // We won't remove any references since all referencers are small which retains all references
+ CHECK_EQ(0u, Result.CompactedReferences);
+ CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.RemovedDiskSpace);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+ SUBCASE("expire all objects")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(7u, Result.ExpiredItems);
+ CHECK_EQ(7u, Result.DeletedItems);
+ CHECK_EQ(5u, Result.References);
+ CHECK_EQ(5u, Result.PrunedReferences);
+ CHECK_EQ(5u, Result.CompactedReferences);
+ CHECK_GT(Result.RemovedDiskSpace, 0);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
+ }
+
+ SUBCASE("keep 1 cache record, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::minutes(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(6u, Result.ExpiredItems);
+ CHECK_EQ(6u, Result.DeletedItems);
+ CHECK_EQ(0u, Result.References);
+ CHECK_EQ(0u, Result.PrunedReferences);
+ CHECK_EQ(0u, Result.CompactedReferences);
+ CHECK_GT(Result.RemovedDiskSpace, 0);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
+ }
+
+ SUBCASE("keep 2 cache records")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::minutes(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(5u, Result.ExpiredItems);
+ CHECK_EQ(5u, Result.DeletedItems);
+ CHECK_EQ(5u, Result.References);
+ CHECK_EQ(0u, Result.PrunedReferences);
+ CHECK_EQ(0u, Result.CompactedReferences);
+ CHECK_GT(Result.RemovedDiskSpace, 0);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false));
+ }
+
+ SUBCASE("keep 3 cache value")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::minutes(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(4u, Result.ExpiredItems);
+ CHECK_EQ(4u, Result.DeletedItems);
+ CHECK_EQ(5u, Result.References);
+ CHECK_EQ(5u, Result.PrunedReferences);
+ CHECK_EQ(5u, Result.CompactedReferences);
+ CHECK_GT(Result.RemovedDiskSpace, 0);
+ CHECK_EQ(0u, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+
+ SUBCASE("keep 3 cache value, skip cid")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+
+ // Prime so we can check GC of memory layer
+ ZenCacheValue Dummy;
+ Zcs.Get(TearDrinkerBucket, CacheRecords[0], Dummy);
+ Zcs.Get(TearDrinkerBucket, CacheRecords[1], Dummy);
+ Zcs.Get(TearDrinkerBucket, CacheRecords[2], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[0], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[1], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[2], Dummy);
+ Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[3], Dummy);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::minutes(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = true});
+ CHECK_EQ(7u, Result.Items);
+ CHECK_EQ(4u, Result.ExpiredItems);
+ CHECK_EQ(4u, Result.DeletedItems);
+ CHECK_EQ(0u, Result.References);
+ CHECK_EQ(0u, Result.PrunedReferences);
+ CHECK_EQ(0u, Result.CompactedReferences);
+ CHECK_GT(Result.RemovedDiskSpace, 0);
+ uint64_t MemoryClean = CacheEntries[CacheRecords[0]].Data.GetSize() + CacheEntries[CacheRecords[1]].Data.GetSize() +
+ CacheEntries[CacheRecords[2]].Data.GetSize() + CacheEntries[UnstructuredCacheValues[0]].Data.GetSize();
+ CHECK_EQ(MemoryClean, Result.RemovedMemory);
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true));
+
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true));
+ CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true));
+ }
+
+ SUBCASE("leave write block")
+ {
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+
+ auto Attachments =
+ CreateCompressedAttachment(CidStore, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187});
+ IoHash CacheRecord = CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments);
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecord, GcClock::Now() - std::chrono::minutes(2));
+
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[2], GcClock::Now() + std::chrono::minutes(2));
+
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[0], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::minutes(2));
+ Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::minutes(2));
+
+ GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true,
+ .SkipCidDelete = false});
+ CHECK_EQ(8u, Result.Items);
+ CHECK_EQ(1u, Result.ExpiredItems);
+ CHECK_EQ(1u, Result.DeletedItems);
+ CHECK_EQ(9u, Result.References);
+ CHECK_EQ(4u, Result.PrunedReferences);
+ CHECK_EQ(4u, Result.CompactedReferences);
+ CHECK_EQ(Attachments[1].second.GetCompressed().GetSize() + Attachments[3].second.GetCompressed().GetSize(),
+ Result.RemovedDiskSpace);
+ uint64_t MemoryClean = CacheEntries[CacheRecord].Data.GetSize();
+ CHECK_EQ(MemoryClean, Result.RemovedMemory);
+ }
+}
+
#endif
void
diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h
index dacf482d8..a3cac0d44 100644
--- a/src/zenserver/cache/structuredcachestore.h
+++ b/src/zenserver/cache/structuredcachestore.h
@@ -106,6 +106,10 @@ public:
CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
+#if ZEN_WITH_TESTS
+ void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
+#endif // ZEN_WITH_TESTS
+
private:
GcManager& m_Gc;
JobQueue& m_JobQueue;
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index ca438fe38..43d517520 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -865,6 +865,7 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("cache.upstream.zen.url"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Urls);
LuaOptions.AddOption("gc.enabled"sv, ServerOptions.GcConfig.Enabled, "gc-enabled"sv);
+ LuaOptions.AddOption("gc.v2"sv, ServerOptions.GcConfig.UseGCV2, "gc-v2"sv);
LuaOptions.AddOption("gc.monitorintervalseconds"sv, ServerOptions.GcConfig.MonitorIntervalSeconds, "gc-monitor-interval-seconds"sv);
LuaOptions.AddOption("gc.intervalseconds"sv, ServerOptions.GcConfig.IntervalSeconds, "gc-interval-seconds"sv);
@@ -1228,6 +1229,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_option("gc",
"",
+ "gc-v2",
+ "Use V2 of GC implementation or not.",
+ cxxopts::value<bool>(ServerOptions.GcConfig.UseGCV2)->default_value("false"),
+ "");
+
+ options.add_option("gc",
+ "",
"gc-small-objects",
"Whether garbage collection of small objects is enabled or not.",
cxxopts::value<bool>(ServerOptions.GcConfig.CollectSmallObjects)->default_value("true"),
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index a1e091665..d55f0d5a1 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -71,6 +71,7 @@ struct ZenGcConfig
uint64_t DiskSizeSoftLimit = 0;
int32_t LightweightIntervalSeconds = 0;
uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28;
+ bool UseGCV2 = false;
};
struct ZenOpenIdProviderConfig
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index df23db1bd..274876123 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -452,6 +452,8 @@ struct ProjectStore::OplogStorage : public RefCounted
m_OpBlobs.Flush();
}
+ uint32_t GetMaxLsn() const { return m_MaxLsn.load(); }
+
spdlog::logger& Log() { return m_OwnerOplog->Log(); }
private:
@@ -855,6 +857,17 @@ ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key)
return -1;
}
+int
+ProjectStore::Oplog::GetMaxOpIndex() const
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return -1;
+ }
+ return gsl::narrow<int>(m_Storage->GetMaxLsn());
+}
+
std::optional<CbObject>
ProjectStore::Oplog::GetOpByKey(const Oid& Key)
{
@@ -1661,6 +1674,17 @@ ProjectStore::Project::TouchOplog(std::string_view Oplog) const
m_LastAccessTimes.insert_or_assign(std::string(Oplog), GcClock::TickCount());
};
+GcClock::TimePoint
+ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const
+{
+ RwLock::SharedLockScope Lock(m_ProjectLock);
+ if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end())
+ {
+ return GcClock::TimePointFromTick(It->second);
+ }
+ return GcClock::TimePoint::min();
+}
+
//////////////////////////////////////////////////////////////////////////
ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue)
@@ -1675,11 +1699,13 @@ ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcMa
// m_Log.set_level(spdlog::level::debug);
m_Gc.AddGcContributor(this);
m_Gc.AddGcStorage(this);
+ m_Gc.AddGcReferencer(*this);
}
ProjectStore::~ProjectStore()
{
ZEN_INFO("closing project store at '{}'", m_ProjectBasePath);
+ m_Gc.RemoveGcReferencer(*this);
m_Gc.RemoveGcStorage(this);
m_Gc.RemoveGcContributor(this);
}
@@ -3010,6 +3036,238 @@ ProjectStore::AreDiskWritesAllowed() const
return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
}
+void
+ProjectStore::RemoveExpiredData(GcCtx& Ctx)
+{
+ size_t ProjectCount = 0;
+ size_t ExpiredProjectCount = 0;
+ size_t OplogCount = 0;
+ size_t ExpiredOplogCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc project store '{}': removed {} expired projects out of {}, {} expired oplogs out of {} in {}",
+ m_ProjectBasePath,
+ ExpiredProjectCount,
+ ProjectCount,
+ ExpiredOplogCount,
+ OplogCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ std::vector<Ref<Project>> ExpiredProjects;
+ std::vector<Ref<Project>> Projects;
+
+ {
+ RwLock::SharedLockScope Lock(m_ProjectsLock);
+ for (auto& Kv : m_Projects)
+ {
+ if (Kv.second->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime))
+ {
+ ExpiredProjects.push_back(Kv.second);
+ continue;
+ }
+ Projects.push_back(Kv.second);
+ }
+ }
+
+ for (const Ref<Project>& Project : Projects)
+ {
+ std::vector<std::string> ExpiredOplogs;
+ {
+ RwLock::ExclusiveLockScope __(m_ProjectsLock);
+ Project->IterateOplogs(
+ [&Ctx, &Project, &ExpiredOplogs, &OplogCount](const RwLock::SharedLockScope& Lock, ProjectStore::Oplog& Oplog) {
+ OplogCount++;
+ if (Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime, Oplog))
+ {
+ ExpiredOplogs.push_back(Oplog.OplogId());
+ }
+ });
+ }
+ std::filesystem::path ProjectPath = BasePathForProject(Project->Identifier);
+ ExpiredOplogCount += ExpiredOplogs.size();
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ for (const std::string& OplogId : ExpiredOplogs)
+ {
+ std::filesystem::path OplogBasePath = ProjectPath / OplogId;
+ uint64_t OplogSize = Oplog::TotalSize(OplogBasePath);
+ ZEN_DEBUG("gc project store '{}': garbage collected oplog '{}' in project '{}'. Removing storage on disk",
+ m_ProjectBasePath,
+ OplogId,
+ Project->Identifier);
+ Project->DeleteOplog(OplogId);
+ Ctx.RemovedDiskSpace.fetch_add(OplogSize);
+ }
+ Ctx.DeletedItems.fetch_add(ExpiredOplogs.size());
+ Project->Flush();
+ }
+ }
+ ProjectCount = Projects.size();
+ Ctx.Items.fetch_add(ProjectCount + OplogCount);
+ ExpiredProjectCount = ExpiredProjects.size();
+
+ if (ExpiredProjects.empty())
+ {
+ ZEN_DEBUG("gc project store '{}': no expired projects found", m_ProjectBasePath);
+ return;
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ for (const Ref<Project>& Project : ExpiredProjects)
+ {
+ std::filesystem::path PathToRemove;
+ std::string ProjectId = Project->Identifier;
+ {
+ {
+ RwLock::SharedLockScope Lock(m_ProjectsLock);
+ if (!Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime))
+ {
+ ZEN_DEBUG("gc project store '{}': skipped garbage collect of project '{}'. Project no longer expired.",
+ m_ProjectBasePath,
+ ProjectId);
+ continue;
+ }
+ }
+ RwLock::ExclusiveLockScope __(m_ProjectsLock);
+ bool Success = Project->PrepareForDelete(PathToRemove);
+ if (!Success)
+ {
+ ZEN_DEBUG("gc project store '{}': skipped garbage collect of project '{}'. Project folder is locked.",
+ m_ProjectBasePath,
+ ProjectId);
+ continue;
+ }
+ m_Projects.erase(ProjectId);
+ }
+
+            ZEN_DEBUG("gc project store '{}': garbage collected project '{}'. Removing storage on disk", m_ProjectBasePath, ProjectId);
+ if (PathToRemove.empty())
+ {
+ continue;
+ }
+
+ DeleteDirectories(PathToRemove);
+ }
+ Ctx.DeletedItems.fetch_add(ExpiredProjects.size());
+ }
+
+ Ctx.ExpiredItems.fetch_add(ExpiredOplogCount + ExpiredProjectCount);
+}
+
+class ProjectStoreReferenceChecker : public GcReferenceChecker
+{
+public:
+ ProjectStoreReferenceChecker(ProjectStore::Oplog& Owner, bool PreCache) : m_Oplog(Owner)
+ {
+ if (PreCache)
+ {
+ RwLock::SharedLockScope _(m_Oplog.m_OplogLock);
+ m_Oplog.IterateOplog([&](CbObjectView Op) {
+ Op.IterateAttachments([&](CbFieldView Visitor) { m_UncachedReferences.insert(Visitor.AsAttachment()); });
+ });
+ m_PreCachedLsn = m_Oplog.GetMaxOpIndex();
+ }
+ }
+
+ virtual ~ProjectStoreReferenceChecker() {}
+
+ virtual void LockState(GcCtx&) override
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc project oplog '{}': found {} references in {} from {}/{}",
+ m_Oplog.m_BasePath,
+ m_UncachedReferences.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ m_Oplog.m_OuterProject->Identifier,
+ m_Oplog.OplogId());
+ });
+
+ m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock);
+ if (m_Oplog.GetMaxOpIndex() != m_PreCachedLsn)
+ {
+ // TODO: Maybe we could just check the added oplog entries - we might get a few extra references from obsolete entries
+ // but I don't think that would be critical
+ m_UncachedReferences.clear();
+ m_Oplog.IterateOplog([&](CbObjectView Op) {
+ Op.IterateAttachments([&](CbFieldView Visitor) { m_UncachedReferences.insert(Visitor.AsAttachment()); });
+ });
+ }
+ }
+
+ virtual void RemoveUsedReferencesFromSet(GcCtx&, HashSet& IoCids) override
+ {
+ for (const IoHash& ReferenceHash : m_UncachedReferences)
+ {
+ IoCids.erase(ReferenceHash);
+ }
+ }
+ ProjectStore::Oplog& m_Oplog;
+ std::unique_ptr<RwLock::SharedLockScope> m_OplogLock;
+ HashSet m_UncachedReferences;
+ int m_PreCachedLsn = -1;
+};
+
+std::vector<GcReferenceChecker*>
+ProjectStore::CreateReferenceCheckers(GcCtx&)
+{
+ size_t ProjectCount = 0;
+ size_t OplogCount = 0;
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc project store '{}': opened {} projects and {} oplogs in {}",
+ m_ProjectBasePath,
+ ProjectCount,
+ OplogCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ DiscoverProjects();
+
+ std::vector<Ref<ProjectStore::Project>> Projects;
+ {
+ RwLock::SharedLockScope Lock(m_ProjectsLock);
+ Projects.reserve(m_Projects.size());
+
+ for (auto& Kv : m_Projects)
+ {
+ Projects.push_back(Kv.second);
+ }
+ }
+ ProjectCount += Projects.size();
+ std::vector<GcReferenceChecker*> Checkers;
+ try
+ {
+ for (const Ref<ProjectStore::Project>& Project : Projects)
+ {
+ std::vector<std::string> OpLogs = Project->ScanForOplogs();
+ Checkers.reserve(OpLogs.size());
+ for (const std::string& OpLogId : OpLogs)
+ {
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId);
+ GcClock::TimePoint Now = GcClock::Now();
+ bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5));
+ Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog, TryPreCache));
+ }
+ OplogCount += OpLogs.size();
+ }
+ }
+ catch (std::exception&)
+ {
+ while (!Checkers.empty())
+ {
+ delete Checkers.back();
+ Checkers.pop_back();
+ }
+ throw;
+ }
+
+ return Checkers;
+}
+
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 5aede88b0..94e697278 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -60,7 +60,7 @@ static_assert(IsPow2(sizeof(OplogEntry)));
package data split into separate chunks for bulk data, exports and header
information.
*/
-class ProjectStore : public RefCounted, public GcStorage, public GcContributor
+class ProjectStore : public RefCounted, public GcStorage, public GcContributor, public GcReferencer
{
struct OplogStorage;
@@ -98,6 +98,7 @@ public:
std::optional<CbObject> GetOpByKey(const Oid& Key);
std::optional<CbObject> GetOpByIndex(int Index);
int GetOpIndexByKey(const Oid& Key);
+ int GetMaxOpIndex() const;
IoBuffer FindChunk(Oid ChunkId);
@@ -208,6 +209,8 @@ public:
std::string_view ClientPath);
void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash);
void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash);
+
+ friend class ProjectStoreReferenceChecker;
};
struct Project : public RefCounted
@@ -225,10 +228,11 @@ public:
void IterateOplogs(std::function<void(const RwLock::SharedLockScope&, Oplog&)>&& Fn);
std::vector<std::string> ScanForOplogs() const;
bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime);
- bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
- bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
- void TouchProject() const;
- void TouchOplog(std::string_view Oplog) const;
+ bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
+ bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
+ void TouchProject() const;
+ void TouchOplog(std::string_view Oplog) const;
+ GcClock::TimePoint LastOplogAccessTime(std::string_view Oplog) const;
Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath);
virtual ~Project();
@@ -289,6 +293,9 @@ public:
virtual void CollectGarbage(GcContext& GcCtx) override;
virtual GcStorageSize StorageSize() const override;
+ virtual void RemoveExpiredData(GcCtx& Ctx) override;
+ virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
+
CbArray GetProjectsList();
std::pair<HttpResponseCode, std::string> GetProjectFiles(const std::string_view ProjectId,
const std::string_view OplogId,
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index d7fc2d069..f40602769 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -288,7 +288,8 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
.DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
.DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit,
.MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites,
- .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds)};
+ .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds),
+ .UseGCVersion = ServerOptions.GcConfig.UseGCV2 ? GcVersion::kV2 : GcVersion::kV1};
m_GcScheduler.Initialize(GcConfig);
// Create and register admin interface last to make sure all is properly initialized
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 02ee204ad..837185201 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -957,6 +957,196 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
}
}
+void
+BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
+ uint64_t PayloadAlignment,
+ const CompactCallback& ChangeCallback,
+ const ClaimDiskReserveCallback& DiskReserveCallback)
+{
+ uint64_t DeletedSize = 0;
+ uint64_t MovedCount = 0;
+ uint64_t MovedSize = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("compact blocks for '{}' DONE after {}, deleted {} and moved {} chunks ({}) ",
+ m_BlocksBasePath,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceBytes(DeletedSize),
+ MovedCount,
+ NiceBytes(MovedSize));
+ });
+
+ uint64_t WriteOffset = m_MaxBlockSize + 1u; // Force detect a new block
+ uint32_t NewBlockIndex = 0;
+ MovedChunksArray MovedChunks;
+
+ uint64_t RemovedSize = 0;
+
+ Ref<BlockStoreFile> NewBlockFile;
+ auto NewBlockFileGuard = MakeGuard([&]() {
+ if (NewBlockFile)
+ {
+ ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
+ {
+ RwLock::ExclusiveLockScope _l(m_InsertLock);
+ if (m_ChunkBlocks[NewBlockIndex] == NewBlockFile)
+ {
+ m_ChunkBlocks.erase(NewBlockIndex);
+ }
+ }
+ NewBlockFile->MarkAsDeleteOnClose();
+ }
+ });
+
+ std::vector<uint32_t> RemovedBlocks;
+
+ CompactState.IterateBlocks(
+ [&](uint32_t BlockIndex, const std::vector<size_t>& KeepChunkIndexes, const std::vector<BlockStoreLocation>& ChunkLocations) {
+ ZEN_ASSERT(BlockIndex != m_WriteBlockIndex.load());
+
+ Ref<BlockStoreFile> OldBlockFile;
+ {
+ RwLock::SharedLockScope _(m_InsertLock);
+ auto It = m_ChunkBlocks.find(BlockIndex);
+ if (It == m_ChunkBlocks.end())
+ {
+                    // This block is unknown, we can't move anything. Report error?
+ return;
+ }
+ if (!It->second)
+ {
+ // This block has been removed, we can't move anything. Report error?
+ return;
+ }
+ OldBlockFile = It->second;
+ }
+ ZEN_ASSERT(OldBlockFile);
+
+ uint64_t OldBlockSize = OldBlockFile->FileSize();
+
+ // TODO: Add heuristics for determining if it is worth to compact a block (if only a very small part is removed)
+
+ std::vector<uint8_t> Chunk;
+ for (const size_t& ChunkIndex : KeepChunkIndexes)
+ {
+ const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
+ Chunk.resize(ChunkLocation.Size);
+ OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+
+ if ((WriteOffset + Chunk.size()) > m_MaxBlockSize)
+ {
+ if (NewBlockFile)
+ {
+ NewBlockFile->Flush();
+ MovedSize += NewBlockFile->FileSize();
+ NewBlockFile = nullptr;
+
+ ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything
+
+ ChangeCallback(MovedChunks, RemovedSize);
+ DeletedSize += RemovedSize;
+ RemovedSize = 0;
+ MovedCount += MovedChunks.size();
+ MovedChunks.clear();
+ }
+
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+ {
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+ std::filesystem::path NewBlockPath;
+ NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath);
+ if (NextBlockIndex == (uint32_t)m_MaxBlockCount)
+ {
+                        ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exceeded",
+ m_BlocksBasePath,
+ static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ return;
+ }
+
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+ }
+ ZEN_ASSERT(NewBlockFile);
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
+ return;
+ }
+
+ if (Space.Free < m_MaxBlockSize)
+ {
+ uint64_t ReclaimedSpace = DiskReserveCallback();
+ if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ {
+ ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
+ m_BlocksBasePath,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ {
+ RwLock::ExclusiveLockScope _l(m_InsertLock);
+ ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
+ m_ChunkBlocks.erase(NextBlockIndex);
+ }
+ NewBlockFile->MarkAsDeleteOnClose();
+ return;
+ }
+
+ ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ m_BlocksBasePath,
+ ReclaimedSpace,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ }
+ NewBlockFile->Create(m_MaxBlockSize);
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
+ }
+
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}});
+ WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
+ }
+ Chunk.clear();
+
+ // Report what we have moved so we can purge the old block
+ if (!MovedChunks.empty() || RemovedSize > 0)
+ {
+ ChangeCallback(MovedChunks, RemovedSize);
+ DeletedSize += RemovedSize;
+ RemovedSize = 0;
+ MovedCount += MovedChunks.size();
+ MovedChunks.clear();
+ }
+
+ {
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+ ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ OldBlockFile->MarkAsDeleteOnClose();
+ m_ChunkBlocks.erase(BlockIndex);
+ m_TotalSize.fetch_sub(OldBlockSize);
+ RemovedSize += OldBlockSize;
+ }
+ });
+ if (NewBlockFile)
+ {
+ NewBlockFile->Flush();
+ MovedSize += NewBlockFile->FileSize();
+ NewBlockFile = nullptr;
+ }
+
+ if (!MovedChunks.empty() || RemovedSize > 0)
+ {
+ ChangeCallback(MovedChunks, RemovedSize);
+ DeletedSize += RemovedSize;
+ RemovedSize = 0;
+ MovedCount += MovedChunks.size();
+ MovedChunks.clear();
+ }
+}
+
const char*
BlockStore::GetBlockFileExtension()
{
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 115bdcf03..f93dafa21 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -117,10 +117,12 @@ namespace {
CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("containercas")), m_Gc(Gc)
{
m_Gc.AddGcStorage(this);
+ m_Gc.AddGcReferenceStore(*this);
}
CasContainerStrategy::~CasContainerStrategy()
{
+ m_Gc.RemoveGcReferenceStore(*this);
m_Gc.RemoveGcStorage(this);
}
@@ -551,6 +553,221 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
GcCtx.AddDeletedCids(DeletedChunks);
}
+class CasContainerStoreCompactor : public GcReferenceStoreCompactor
+{
+public:
+ CasContainerStoreCompactor(CasContainerStrategy& Owner,
+ BlockStoreCompactState&& CompactState,
+ std::vector<IoHash>&& CompactStateKeys,
+ std::vector<IoHash>&& PrunedKeys)
+ : m_CasContainerStrategy(Owner)
+ , m_CompactState(std::move(CompactState))
+ , m_CompactStateKeys(std::move(CompactStateKeys))
+ , m_PrunedKeys(std::move(PrunedKeys))
+ {
+ }
+
+ virtual void CompactReferenceStore(GcCtx& Ctx)
+ {
+ size_t CompactedCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc block store '{}': compacted {} cids in {}",
+ m_CasContainerStrategy.m_RootDirectory / m_CasContainerStrategy.m_ContainerBaseName,
+ CompactedCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ if (Ctx.Settings.IsDeleteMode && Ctx.Settings.CollectSmallObjects)
+ {
+ // Compact block store
+ m_CasContainerStrategy.m_BlockStore.CompactBlocks(
+ m_CompactState,
+ m_CasContainerStrategy.m_PayloadAlignment,
+ [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
+ std::vector<CasDiskIndexEntry> MovedEntries;
+ RwLock::ExclusiveLockScope _(m_CasContainerStrategy.m_LocationMapLock);
+ for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
+ {
+ size_t ChunkIndex = Moved.first;
+ const IoHash& Key = m_CompactStateKeys[ChunkIndex];
+
+ if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key); It != m_CasContainerStrategy.m_LocationMap.end())
+ {
+ BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second];
+ const BlockStoreLocation& OldLocation = m_CompactState.GetLocation(ChunkIndex);
+ if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation)
+ {
+                            // Someone has moved our chunk so let's just skip the new location we were provided, it will be GC:d at a
+                            // later time
+ continue;
+ }
+
+ const BlockStoreLocation& NewLocation = Moved.second;
+ Location = BlockStoreDiskLocation(NewLocation, m_CasContainerStrategy.m_PayloadAlignment);
+ MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location});
+ }
+ }
+ m_CasContainerStrategy.m_CasLog.Append(MovedEntries);
+ Ctx.RemovedDiskSpace.fetch_add(FreedDiskSpace);
+ },
+ [&]() { return 0; });
+
+ CompactedCount = m_PrunedKeys.size();
+ Ctx.CompactedReferences.fetch_add(
+            CompactedCount); // Slightly misleading, it might not be compacted if the block is the currently writing block
+ }
+ }
+
+ CasContainerStrategy& m_CasContainerStrategy;
+ BlockStoreCompactState m_CompactState;
+ std::vector<IoHash> m_CompactStateKeys;
+ std::vector<IoHash> m_PrunedKeys;
+};
+
+class CasContainerReferencePruner : public GcReferencePruner
+{
+public:
+ CasContainerReferencePruner(CasContainerStrategy& Owner, std::vector<IoHash>&& Cids)
+ : m_CasContainerStrategy(Owner)
+ , m_Cids(std::move(Cids))
+ {
+ }
+
+ virtual GcReferenceStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, const GetUnusedReferencesFunc& GetUnusedReferences)
+ {
+ size_t TotalCount = m_Cids.size();
+ size_t PruneCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc block store '{}': removed {} unused cid out of {} in {}",
+ m_CasContainerStrategy.m_RootDirectory / m_CasContainerStrategy.m_ContainerBaseName,
+ PruneCount,
+ TotalCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ std::vector<IoHash> UnusedCids = GetUnusedReferences(m_Cids);
+ m_Cids.clear();
+
+ if (UnusedCids.empty())
+ {
+ // Nothing to collect
+ return nullptr;
+ }
+
+ BlockStoreCompactState CompactState;
+ BlockStore::ReclaimSnapshotState BlockSnapshotState;
+ std::vector<IoHash> CompactStateKeys;
+ std::vector<CasDiskIndexEntry> ExpiredEntries;
+ ExpiredEntries.reserve(UnusedCids.size());
+ tsl::robin_set<IoHash, IoHash::Hasher> UnusedKeys;
+
+ {
+ RwLock::ExclusiveLockScope __(m_CasContainerStrategy.m_LocationMapLock);
+ if (Ctx.Settings.CollectSmallObjects)
+ {
+ BlockSnapshotState = m_CasContainerStrategy.m_BlockStore.GetReclaimSnapshotState();
+ }
+
+ for (const IoHash& Cid : UnusedCids)
+ {
+ auto It = m_CasContainerStrategy.m_LocationMap.find(Cid);
+ if (It == m_CasContainerStrategy.m_LocationMap.end())
+ {
+ continue;
+ }
+ CasDiskIndexEntry ExpiredEntry = {.Key = Cid,
+ .Location = m_CasContainerStrategy.m_Locations[It->second],
+ .Flags = CasDiskIndexEntry::kTombstone};
+ const BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second];
+ BlockStoreLocation BlockLocation = Location.Get(m_CasContainerStrategy.m_PayloadAlignment);
+ if (Ctx.Settings.CollectSmallObjects)
+ {
+ UnusedKeys.insert(Cid);
+ uint32_t BlockIndex = BlockLocation.BlockIndex;
+ bool IsActiveWriteBlock = BlockSnapshotState.m_ActiveWriteBlocks.contains(BlockIndex);
+ if (!IsActiveWriteBlock)
+ {
+ CompactState.AddBlock(BlockIndex);
+ }
+ ExpiredEntries.push_back(ExpiredEntry);
+ }
+ }
+
+ // Get all locations we need to keep for affected blocks
+ if (Ctx.Settings.CollectSmallObjects && !UnusedKeys.empty())
+ {
+ for (const auto& Entry : m_CasContainerStrategy.m_LocationMap)
+ {
+ const IoHash& Key = Entry.first;
+ if (UnusedKeys.contains(Key))
+ {
+ continue;
+ }
+ const BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[Entry.second];
+ BlockStoreLocation BlockLocation = Location.Get(m_CasContainerStrategy.m_PayloadAlignment);
+ if (CompactState.AddKeepLocation(BlockLocation))
+ {
+ CompactStateKeys.push_back(Key);
+ }
+ }
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ for (const CasDiskIndexEntry& Entry : ExpiredEntries)
+ {
+ m_CasContainerStrategy.m_LocationMap.erase(Entry.Key);
+ }
+ m_CasContainerStrategy.m_CasLog.Append(ExpiredEntries);
+ m_CasContainerStrategy.m_CasLog.Flush();
+ }
+ }
+
+ PruneCount = UnusedKeys.size();
+ Ctx.PrunedReferences.fetch_add(PruneCount);
+ return new CasContainerStoreCompactor(m_CasContainerStrategy,
+ std::move(CompactState),
+ std::move(CompactStateKeys),
+ std::vector<IoHash>(UnusedKeys.begin(), UnusedKeys.end()));
+ }
+
+private:
+ CasContainerStrategy& m_CasContainerStrategy;
+ std::vector<IoHash> m_Cids;
+};
+
+GcReferencePruner*
+CasContainerStrategy::CreateReferencePruner(GcCtx& Ctx)
+{
+ size_t TotalCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc block store '{}': found {} cid keys to check in {}",
+ m_RootDirectory / m_ContainerBaseName,
+ TotalCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ std::vector<IoHash> CidsToCheck;
+ {
+ RwLock::SharedLockScope __(m_LocationMapLock);
+ CidsToCheck.reserve(m_LocationMap.size());
+ for (const auto& It : m_LocationMap)
+ {
+ CidsToCheck.push_back(It.first);
+ }
+ }
+ TotalCount = CidsToCheck.size();
+ if (TotalCount == 0)
+ {
+ return {};
+ }
+ Ctx.References.fetch_add(TotalCount);
+ return new CasContainerReferencePruner(*this, std::move(CidsToCheck));
+}
+
void
CasContainerStrategy::CompactIndex(RwLock::ExclusiveLockScope&)
{
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
index 478a1f78e..9ff4ae4fc 100644
--- a/src/zenstore/compactcas.h
+++ b/src/zenstore/compactcas.h
@@ -49,7 +49,7 @@ static_assert(sizeof(CasDiskIndexEntry) == 32);
*/
-struct CasContainerStrategy final : public GcStorage
+struct CasContainerStrategy final : public GcStorage, public GcReferenceStore
{
CasContainerStrategy(GcManager& Gc);
~CasContainerStrategy();
@@ -71,6 +71,8 @@ struct CasContainerStrategy final : public GcStorage
virtual void CollectGarbage(GcContext& GcCtx) override;
virtual GcStorageSize StorageSize() const override;
+ virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx) override;
+
private:
CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
void MakeIndexSnapshot();
@@ -97,6 +99,9 @@ private:
typedef tsl::robin_map<IoHash, size_t, IoHash::Hasher> LocationMap_t;
LocationMap_t m_LocationMap;
std::vector<BlockStoreDiskLocation> m_Locations;
+
+ friend class CasContainerReferencePruner;
+ friend class CasContainerStoreCompactor;
};
void compactcas_forcelink();
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 24d0a39bb..e28e0dea4 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -122,10 +122,12 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo
FileCasStrategy::FileCasStrategy(GcManager& Gc) : m_Log(logging::Get("filecas")), m_Gc(Gc)
{
m_Gc.AddGcStorage(this);
+ m_Gc.AddGcReferenceStore(*this);
}
FileCasStrategy::~FileCasStrategy()
{
+ m_Gc.RemoveGcReferenceStore(*this);
m_Gc.RemoveGcStorage(this);
}
@@ -1329,7 +1331,170 @@ FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir)
return Entries;
};
- //////////////////////////////////////////////////////////////////////////
+class FileCasStoreCompactor : public GcReferenceStoreCompactor
+{
+public:
+ FileCasStoreCompactor(FileCasStrategy& Owner, std::vector<IoHash>&& ReferencesToClean)
+ : m_FileCasStrategy(Owner)
+ , m_ReferencesToClean(std::move(ReferencesToClean))
+ {
+ }
+
+ virtual void CompactReferenceStore(GcCtx& Ctx)
+ {
+ size_t CompactedCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc file store '{}': removed data for {} unused cids in {}",
+ m_FileCasStrategy.m_RootDirectory,
+ CompactedCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ std::vector<IoHash> ReferencedCleaned;
+ ReferencedCleaned.reserve(m_ReferencesToClean.size());
+
+ for (const IoHash& ChunkHash : m_ReferencesToClean)
+ {
+ FileCasStrategy::ShardingHelper Name(m_FileCasStrategy.m_RootDirectory.c_str(), ChunkHash);
+ {
+ RwLock::SharedLockScope __(m_FileCasStrategy.m_Lock);
+ if (auto It = m_FileCasStrategy.m_Index.find(ChunkHash); It != m_FileCasStrategy.m_Index.end())
+ {
+ // Not regarded as pruned, leave it be
+ continue;
+ }
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ ZEN_DEBUG("deleting CAS payload file '{}'", Name.ShardedPath.ToUtf8());
+ std::error_code Ec;
+ uint64_t SizeOnDisk = std::filesystem::file_size(Name.ShardedPath.c_str(), Ec);
+ if (Ec)
+ {
+ SizeOnDisk = 0;
+ }
+ bool Existed = std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("failed deleting CAS payload file '{}'. Reason '{}'", Name.ShardedPath.ToUtf8(), Ec.message());
+ continue;
+ }
+ if (!Existed)
+ {
+ continue;
+ }
+ Ctx.RemovedDiskSpace.fetch_add(SizeOnDisk);
+ }
+ else
+ {
+ std::error_code Ec;
+ bool Existed = std::filesystem::is_regular_file(Name.ShardedPath.c_str(), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("failed checking CAS payload file '{}'. Reason '{}'", Name.ShardedPath.ToUtf8(), Ec.message());
+ continue;
+ }
+ if (!Existed)
+ {
+ continue;
+ }
+ }
+ ReferencedCleaned.push_back(ChunkHash);
+ }
+ }
+ CompactedCount = ReferencedCleaned.size();
+ Ctx.CompactedReferences.fetch_add(ReferencedCleaned.size());
+ }
+
+private:
+ FileCasStrategy& m_FileCasStrategy;
+ std::vector<IoHash> m_ReferencesToClean;
+};
+
+class FileCasReferencePruner : public GcReferencePruner
+{
+public:
+    FileCasReferencePruner(FileCasStrategy& Owner, std::vector<IoHash>&& Cids) : m_FileCasStrategy(Owner), m_Cids(std::move(Cids)) {}
+
+    virtual GcReferenceStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, const GetUnusedReferencesFunc& GetUnusedReferences)
+    {
+        size_t TotalCount = m_Cids.size();
+        size_t PruneCount = 0;
+        Stopwatch Timer;
+        const auto _ = MakeGuard([&] {
+            ZEN_DEBUG("gc file store '{}': removed {} unused cid out of {} in {}",
+                      m_FileCasStrategy.m_RootDirectory,
+                      PruneCount,
+                      TotalCount,
+                      NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+        });
+
+        std::vector<IoHash> UnusedReferences = GetUnusedReferences(m_Cids);
+        m_Cids.clear();
+
+        std::vector<IoHash> PrunedReferences;
+        PrunedReferences.reserve(UnusedReferences.size());
+        {
+            RwLock::ExclusiveLockScope __(m_FileCasStrategy.m_Lock);
+            for (const IoHash& ChunkHash : UnusedReferences)
+            {
+                auto It = m_FileCasStrategy.m_Index.find(ChunkHash);
+                if (It == m_FileCasStrategy.m_Index.end())
+                {
+                    continue;
+                }
+                if (Ctx.Settings.IsDeleteMode)
+                {
+                    uint64_t FileSize = It->second.Size;
+                    m_FileCasStrategy.m_Index.erase(It);
+                    m_FileCasStrategy.m_CasLog.Append(
+                        {.Key = ChunkHash, .Flags = FileCasStrategy::FileCasIndexEntry::kTombStone, .Size = FileSize});
+                    m_FileCasStrategy.m_TotalSize.fetch_sub(FileSize, std::memory_order_relaxed); // 'It' was invalidated by erase above
+                }
+                PrunedReferences.push_back(ChunkHash);
+            }
+        }
+
+        PruneCount = PrunedReferences.size();
+        Ctx.PrunedReferences.fetch_add(PruneCount);
+        return new FileCasStoreCompactor(m_FileCasStrategy, std::move(PrunedReferences));
+    }
+
+private:
+    FileCasStrategy& m_FileCasStrategy;
+    std::vector<IoHash> m_Cids;
+};
+
+GcReferencePruner*
+FileCasStrategy::CreateReferencePruner(GcCtx& Ctx)
+{
+ // TODO
+ std::size_t TotalCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc file store '{}': found {} cid keys to check in {}",
+ m_RootDirectory,
+ TotalCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ std::vector<IoHash> CidsToCheck;
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ CidsToCheck.reserve(m_Index.size());
+ for (const auto& It : m_Index)
+ {
+ CidsToCheck.push_back(It.first);
+ }
+ }
+ TotalCount = CidsToCheck.size();
+ if (TotalCount == 0)
+ {
+ return {};
+ }
+ Ctx.References.fetch_add(TotalCount);
+ return new FileCasReferencePruner(*this, std::move(CidsToCheck));
+}
+
+//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
index ea7ff8e8c..2e9a1d5dc 100644
--- a/src/zenstore/filecas.h
+++ b/src/zenstore/filecas.h
@@ -27,7 +27,7 @@ class BasicFile;
/** CAS storage strategy using a file-per-chunk storage strategy
*/
-struct FileCasStrategy final : public GcStorage
+struct FileCasStrategy final : public GcStorage, public GcReferenceStore
{
FileCasStrategy(GcManager& Gc);
~FileCasStrategy();
@@ -44,6 +44,8 @@ struct FileCasStrategy final : public GcStorage
virtual void CollectGarbage(GcContext& GcCtx) override;
virtual GcStorageSize StorageSize() const override;
+ virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx) override;
+
private:
void MakeIndexSnapshot();
uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
@@ -97,6 +99,9 @@ private:
size_t Shard2len = 0;
ExtendablePathBuilder<128> ShardedPath;
};
+
+ friend class FileCasReferencePruner;
+ friend class FileCasStoreCompactor;
};
void filecas_forcelink();
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 9743eabf0..e09f46063 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -327,6 +327,280 @@ GcManager::~GcManager()
{
}
+//////// Begin New GC WIP
+
+void
+GcManager::AddGcReferencer(GcReferencer& Referencer)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_GcReferencers.push_back(&Referencer);
+}
+void
+GcManager::RemoveGcReferencer(GcReferencer& Referencer)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ std::erase_if(m_GcReferencers, [&](GcReferencer* $) { return $ == &Referencer; });
+}
+
+void
+GcManager::AddGcReferenceStore(GcReferenceStore& ReferenceStore)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_GcReferenceStores.push_back(&ReferenceStore);
+}
+void
+GcManager::RemoveGcReferenceStore(GcReferenceStore& ReferenceStore)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ std::erase_if(m_GcReferenceStores, [&](GcReferenceStore* $) { return $ == &ReferenceStore; });
+}
+
+GcResult
+GcManager::CollectGarbage(const GcSettings& Settings)
+{
+ GcCtx Ctx{.Settings = Settings};
+
+ Stopwatch TotalTimer;
+ auto __ = MakeGuard([&]() {
+ ZEN_INFO(
+ "GC: Removed {} items out of {}, deleted {} out of {}. Pruned {} Cid entries out of {}, compacted {} Cid entries out of {}, "
+ "freed "
+ "{} on disk and {} of memory in {}",
+ Ctx.ExpiredItems.load(),
+ Ctx.Items.load(),
+ Ctx.DeletedItems.load(),
+ Ctx.ExpiredItems.load(),
+ Ctx.PrunedReferences.load(),
+ Ctx.References.load(),
+ Ctx.CompactedReferences.load(),
+ Ctx.PrunedReferences.load(),
+ NiceBytes(Ctx.RemovedDiskSpace.load()),
+ NiceBytes(Ctx.RemovedMemory.load()),
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
+ });
+
+ RwLock::SharedLockScope GcLock(m_Lock);
+
+ static const bool SingleThread =
+#if ZEN_BUILD_DEBUG
+ true
+#else
+ false
+#endif
+ ;
+ WorkerThreadPool ThreadPool(SingleThread ? 0 : 8);
+
+ if (!m_GcReferencers.empty())
+ {
+ Latch WorkLeft(1);
+ // First remove any cache keys that may own references
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() { ZEN_INFO("GC: Removed expired data in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) });
+ for (GcReferencer* Owner : m_GcReferencers)
+ {
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, Owner, &WorkLeft]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ Owner->RemoveExpiredData(Ctx);
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ if (Ctx.Settings.SkipCidDelete)
+ {
+ return GcResult{.Items = Ctx.Items.load(),
+ .ExpiredItems = Ctx.ExpiredItems.load(),
+ .DeletedItems = Ctx.DeletedItems.load(),
+ .References = Ctx.References.load(),
+ .PrunedReferences = Ctx.PrunedReferences.load(),
+ .CompactedReferences = Ctx.CompactedReferences.load(),
+ .RemovedDiskSpace = Ctx.RemovedDiskSpace.load(),
+ .RemovedMemory = Ctx.RemovedMemory.load()};
+ }
+
+ std::vector<std::unique_ptr<GcReferencePruner>> ReferencePruners;
+ if (!m_GcReferenceStores.empty())
+ {
+ ReferencePruners.reserve(m_GcReferenceStores.size());
+ Latch WorkLeft(1);
+ RwLock ReferencePrunersLock;
+ // Easy to go wide, CreateReferencePruner is usually not very heavy but big data sets change that
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() { ZEN_INFO("GC: Created Cid pruners in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) });
+ for (GcReferenceStore* CidStore : m_GcReferenceStores)
+ {
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, CidStore, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+                // The CidStore will pick a list of Cid entries to check, returning a pruner
+ std::unique_ptr<GcReferencePruner> ReferencePruner(CidStore->CreateReferencePruner(Ctx));
+ if (ReferencePruner)
+ {
+ RwLock::ExclusiveLockScope __(ReferencePrunersLock);
+ ReferencePruners.emplace_back(std::move(ReferencePruner));
+ }
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ std::vector<std::unique_ptr<GcReferenceChecker>> ReferenceCheckers;
+ if (!m_GcReferencers.empty())
+ {
+ ReferenceCheckers.reserve(m_GcReferencers.size());
+ Latch WorkLeft(1);
+ RwLock ReferenceCheckersLock;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() { ZEN_INFO("GC: Created Cid checkers in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) });
+ // Easy to go wide, CreateReferenceCheckers is potentially heavy
+ // Lock all reference owners from changing the reference data and get access to check for referenced data
+ for (GcReferencer* Referencer : m_GcReferencers)
+ {
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, &WorkLeft, Referencer, &ReferenceCheckersLock, &ReferenceCheckers]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+                // The Referencer will create a reference checker that guarantees that the references do not change as long as it lives
+ std::vector<GcReferenceChecker*> Checkers = Referencer->CreateReferenceCheckers(Ctx);
+ try
+ {
+ if (!Checkers.empty())
+ {
+ RwLock::ExclusiveLockScope __(ReferenceCheckersLock);
+ for (auto& Checker : Checkers)
+ {
+ ReferenceCheckers.emplace_back(std::unique_ptr<GcReferenceChecker>(Checker));
+ Checker = nullptr;
+ }
+ }
+ }
+ catch (std::exception&)
+ {
+ while (!Checkers.empty())
+ {
+ delete Checkers.back();
+ Checkers.pop_back();
+ }
+ throw;
+ }
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ Stopwatch LockStateTimer;
+ if (!ReferenceCheckers.empty())
+ {
+        // Easy to go wide, locking all reference checkers so we have a steady state of which references are used
+        // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until we delete the ReferenceCheckers
+ Latch WorkLeft(1);
+
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() { ZEN_INFO("GC: Locked Cid checkers in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) });
+ for (std::unique_ptr<GcReferenceChecker>& ReferenceChecker : ReferenceCheckers)
+ {
+ GcReferenceChecker* Checker = ReferenceChecker.get();
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, Checker, &WorkLeft, &ReferenceCheckers]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ Checker->LockState(Ctx);
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ std::vector<std::unique_ptr<GcReferenceStoreCompactor>> ReferenceStoreCompactors;
+ ReferenceStoreCompactors.reserve(ReferencePruners.size());
+ if (!ReferencePruners.empty())
+ {
+ const auto GetUnusedReferences = [&ReferenceCheckers, &Ctx](std::span<IoHash> References) -> std::vector<IoHash> {
+ HashSet UnusedCids(References.begin(), References.end());
+ for (const std::unique_ptr<GcReferenceChecker>& ReferenceChecker : ReferenceCheckers)
+ {
+ ReferenceChecker->RemoveUsedReferencesFromSet(Ctx, UnusedCids);
+ if (UnusedCids.empty())
+ {
+ return {};
+ }
+ }
+ return std::vector<IoHash>(UnusedCids.begin(), UnusedCids.end());
+ };
+
+        // Easy to go wide, checking all Cids against references in cache
+ // Ask stores to remove data that the ReferenceCheckers says are not references - this should be a lightweight operation that
+ // only updates in-memory index, actual disk changes should be done by the ReferenceStoreCompactors
+
+ Latch WorkLeft(1);
+ RwLock ReferenceStoreCompactorsLock;
+
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() { ZEN_INFO("GC: Pruned unreferenced Cid data in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) });
+ for (std::unique_ptr<GcReferencePruner>& ReferencePruner : ReferencePruners)
+ {
+ GcReferencePruner* Pruner = ReferencePruner.get();
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [&Ctx, Pruner, &WorkLeft, &GetUnusedReferences, &ReferenceStoreCompactorsLock, &ReferenceStoreCompactors]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not.
+ std::unique_ptr<GcReferenceStoreCompactor> ReferenceCompactor(Pruner->RemoveUnreferencedData(Ctx, GetUnusedReferences));
+ if (ReferenceCompactor)
+ {
+ RwLock::ExclusiveLockScope __(ReferenceStoreCompactorsLock);
+ ReferenceStoreCompactors.emplace_back(std::move(ReferenceCompactor));
+ }
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+ // Let the GcReferencers add new data, we will only change on-disk data at this point, adding new data is allowed
+ ReferenceCheckers.clear();
+ ZEN_INFO("GC: Writes blocked for {}", NiceTimeSpanMs(LockStateTimer.GetElapsedTimeMs()))
+
+ // Let go of the pruners
+ ReferencePruners.clear();
+
+ if (!ReferenceStoreCompactors.empty())
+ {
+ Latch WorkLeft(1);
+
+ // Easy to go wide
+ // Remove the stuff we deemed unreferenced from disk - may be heavy operation
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() { ZEN_INFO("GC: Compacted Cid stores in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) });
+ for (std::unique_ptr<GcReferenceStoreCompactor>& StoreCompactor : ReferenceStoreCompactors)
+ {
+ GcReferenceStoreCompactor* Compactor = StoreCompactor.get();
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, Compactor, &WorkLeft]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+                    // Remove the on-disk data that the pruner reported as unreferenced.
+ Compactor->CompactReferenceStore(Ctx);
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ ReferenceStoreCompactors.clear();
+
+ return GcResult{.Items = Ctx.Items.load(),
+ .ExpiredItems = Ctx.ExpiredItems.load(),
+ .DeletedItems = Ctx.DeletedItems.load(),
+ .References = Ctx.References.load(),
+ .PrunedReferences = Ctx.PrunedReferences.load(),
+ .CompactedReferences = Ctx.CompactedReferences.load(),
+ .RemovedDiskSpace = Ctx.RemovedDiskSpace.load(),
+ .RemovedMemory = Ctx.RemovedMemory.load()};
+}
+
+//////// End New GC WIP
+
void
GcManager::AddGcContributor(GcContributor* Contributor)
{
@@ -645,23 +919,19 @@ GcScheduler::Shutdown()
bool
GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params)
{
- if (m_Config.Enabled)
+ std::unique_lock Lock(m_GcMutex);
+ if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
{
- std::unique_lock Lock(m_GcMutex);
- if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
- {
- m_TriggerGcParams = Params;
- uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
+ m_TriggerGcParams = Params;
+ uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
- if (m_Status.compare_exchange_strong(/* expected */ IdleState,
- /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
- {
- m_GcSignal.notify_one();
- return true;
- }
+ if (m_Status.compare_exchange_strong(/* expected */ IdleState,
+ /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
+ {
+ m_GcSignal.notify_one();
+ return true;
}
}
-
return false;
}
@@ -806,7 +1076,7 @@ GcScheduler::SchedulerThread()
break;
}
- if (!m_Config.Enabled && !m_TriggerScrubParams)
+ if (!m_Config.Enabled && !m_TriggerScrubParams && !m_TriggerGcParams)
{
WaitTime = std::chrono::seconds::max();
continue;
@@ -830,6 +1100,7 @@ GcScheduler::SchedulerThread()
std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration;
uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit;
bool SkipCid = false;
+ GcVersion UseGCVersion = m_Config.UseGCVersion;
bool DiskSpaceGCTriggered = false;
bool TimeBasedGCTriggered = false;
@@ -863,6 +1134,8 @@ GcScheduler::SchedulerThread()
{
DoDelete = false;
}
+ UseGCVersion = TriggerParams.ForceGCVersion.value_or(UseGCVersion);
+ DoGc = true;
}
if (m_TriggerScrubParams)
@@ -1067,7 +1340,7 @@ GcScheduler::SchedulerThread()
}
}
- CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, DoDelete, CollectSmallObjects, SkipCid);
+ CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, DoDelete, CollectSmallObjects, SkipCid, UseGCVersion);
uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
@@ -1148,7 +1421,8 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
const GcClock::TimePoint& ProjectStoreExpireTime,
bool Delete,
bool CollectSmallObjects,
- bool SkipCid)
+ bool SkipCid,
+ GcVersion UseGCVersion)
{
ZEN_TRACE_CPU("GcScheduler::CollectGarbage");
@@ -1195,10 +1469,26 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
Stopwatch Timer;
const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- GcStorageSize Diff = m_GcManager.CollectGarbage(GcCtx);
+ GcStorageSize Diff;
+ switch (UseGCVersion)
+ {
+ case GcVersion::kV1:
+ Diff = m_GcManager.CollectGarbage(GcCtx);
+ break;
+ case GcVersion::kV2:
+ {
+ GcResult Result = m_GcManager.CollectGarbage({.CacheExpireTime = CacheExpireTime,
+ .ProjectStoreExpireTime = ProjectStoreExpireTime,
+ .CollectSmallObjects = CollectSmallObjects,
+ .IsDeleteMode = Delete,
+ .SkipCidDelete = SkipCid});
+ Diff.DiskSize = Result.RemovedDiskSpace;
+ Diff.MemorySize = Result.RemovedMemory;
+ }
+ break;
+ }
std::chrono::milliseconds ElapsedMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());
-
if (SkipCid)
{
m_LastLightweightGcTime = GcClock::Now();
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 56906f570..cd475cd8b 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -108,6 +108,8 @@ private:
BasicFile m_File;
};
+class BlockStoreCompactState;
+
class BlockStore
{
public:
@@ -124,6 +126,7 @@ public:
typedef std::vector<size_t> ChunkIndexArray;
typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback;
+ typedef std::function<void(const MovedChunksArray& MovedChunks, uint64_t FreedDiskSpace)> CompactCallback;
typedef std::function<uint64_t()> ClaimDiskReserveCallback;
typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback;
typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
@@ -156,6 +159,12 @@ public:
const IterateChunksSmallSizeCallback& SmallSizeCallback,
const IterateChunksLargeSizeCallback& LargeSizeCallback);
+ void CompactBlocks(
+ const BlockStoreCompactState& CompactState,
+ uint64_t PayloadAlignment,
+ const CompactCallback& ChangeCallback = [](const MovedChunksArray&, uint64_t) {},
+ const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; });
+
static const char* GetBlockFileExtension();
static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);
@@ -179,6 +188,55 @@ private:
std::atomic_uint64_t m_TotalSize{};
};
+class BlockStoreCompactState
+{
+public:
+ BlockStoreCompactState() = default;
+
+ void AddBlock(uint32_t BlockIndex)
+ {
+ auto It = m_BlockIndexToChunkMapIndex.find(BlockIndex);
+ if (It == m_BlockIndexToChunkMapIndex.end())
+ {
+ m_KeepChunks.emplace_back(std::vector<size_t>());
+ m_BlockIndexToChunkMapIndex.insert_or_assign(BlockIndex, m_KeepChunks.size() - 1);
+ }
+ }
+
+ bool AddKeepLocation(const BlockStoreLocation& Location)
+ {
+ auto It = m_BlockIndexToChunkMapIndex.find(Location.BlockIndex);
+ if (It == m_BlockIndexToChunkMapIndex.end())
+ {
+ return false;
+ }
+
+ std::vector<size_t>& KeepChunks = m_KeepChunks[It->second];
+ size_t Index = m_ChunkLocations.size();
+ KeepChunks.push_back(Index);
+ m_ChunkLocations.push_back(Location);
+ return true;
+ };
+
+ const BlockStoreLocation& GetLocation(size_t Index) const { return m_ChunkLocations[Index]; }
+
+ void IterateBlocks(std::function<void(uint32_t BlockIndex,
+ const std::vector<size_t>& KeepChunkIndexes,
+ const std::vector<BlockStoreLocation>& ChunkLocations)> Callback) const
+ {
+ for (auto It : m_BlockIndexToChunkMapIndex)
+ {
+ size_t ChunkMapIndex = It.second;
+ Callback(It.first, m_KeepChunks[ChunkMapIndex], m_ChunkLocations);
+ }
+ }
+
+private:
+ std::unordered_map<uint32_t, size_t> m_BlockIndexToChunkMapIndex; // Maps to which vector in BlockKeepChunks to use for a block
+ std::vector<std::vector<size_t>> m_KeepChunks; // One vector per block index with index into ChunkLocations
+ std::vector<BlockStoreLocation> m_ChunkLocations;
+};
+
void blockstore_forcelink();
} // namespace zen
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 42605804e..fa7dce331 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -20,6 +20,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <span>
#include <thread>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace spdlog {
class logger;
}
@@ -48,6 +52,151 @@ public:
static TimePoint TimePointFromTick(const Tick TickCount) { return TimePoint{Duration{TickCount}}; }
};
+//////// Begin New GC WIP
+
/// Input parameters for one garbage-collection run (see GcManager::CollectGarbage).
struct GcSettings
{
    // Cache entries last accessed before this time point are considered expired.
    // NOTE(review): exact access-time semantics assumed from the name — confirm against RemoveExpiredData implementations.
    GcClock::TimePoint CacheExpireTime = GcClock::Now();
    // Project-store entries last accessed before this time point are considered expired — same caveat as above.
    GcClock::TimePoint ProjectStoreExpireTime = GcClock::Now();
    // Also collect small objects during this run.
    bool CollectSmallObjects = false;
    // When false the run presumably only reports what would be deleted (evaluation mode) — verify against CollectGarbage.
    bool IsDeleteMode = false;
    // Skip deleting the referenced Cid data itself even when references are removed.
    bool SkipCidDelete = false;
};
+
/// Aggregated statistics from one garbage-collection run.
/// Fields map 1:1 onto the atomic counters in GcCtx.
struct GcResult
{
    uint64_t Items = 0;               // Items considered
    uint64_t ExpiredItems = 0;        // Items found to be past their expire time
    uint64_t DeletedItems = 0;        // Items actually deleted
    uint64_t References = 0;          // References examined
    uint64_t PrunedReferences = 0;    // References removed as no longer in use
    uint64_t CompactedReferences = 0; // References whose stored bulk data was compacted away
    uint64_t RemovedDiskSpace = 0;    // Bytes reclaimed on disk
    uint64_t RemovedMemory = 0;       // Bytes reclaimed in memory
};
+
/// Shared state threaded through a garbage-collection run.
/// Settings is immutable for the duration of the run; the atomic counters let
/// concurrent workers accumulate statistics and mirror the fields of GcResult.
struct GcCtx
{
    const GcSettings Settings;
    std::atomic_uint64_t Items = 0;
    std::atomic_uint64_t ExpiredItems = 0;
    std::atomic_uint64_t DeletedItems = 0;
    std::atomic_uint64_t References = 0;
    std::atomic_uint64_t PrunedReferences = 0;
    std::atomic_uint64_t CompactedReferences = 0;
    std::atomic_uint64_t RemovedDiskSpace = 0;
    std::atomic_uint64_t RemovedMemory = 0;
};
+
+typedef tsl::robin_set<IoHash> HashSet;
+
/**
 * @brief An interface to remove the stored data on disk after a GcReferencePruner::RemoveUnreferencedData
 *
 * CompactReferenceStore is called after pruning (GcReferencePruner::RemoveUnreferencedData) and state locking are
 * complete, so the implementor must take care to only remove data that has not been altered since the prune operation.
 *
 * The instance will be deleted after CompactReferenceStore has completed execution.
 *
 * The subclass constructor should be provided with information on what is intended to be removed.
 */
class GcReferenceStoreCompactor
{
public:
    virtual ~GcReferenceStoreCompactor() = default;

    // Remove data on disk based on results from GcReferencePruner::RemoveUnreferencedData.
    // Statistics (e.g. RemovedDiskSpace) are accumulated into Ctx.
    virtual void CompactReferenceStore(GcCtx& Ctx) = 0;
};
+
/**
 * @brief An interface to check if a set of Cids are referenced
 *
 * The instance will be deleted after RemoveUsedReferencesFromSet has been called 0-n times.
 *
 * During construction of the GcReferenceChecker the world is not stopped, so construction is a good
 * place to do caching so that LockState and RemoveUsedReferencesFromSet can execute quickly.
 */
class GcReferenceChecker
{
public:
    // Destructor should unlock what was locked in LockState
    virtual ~GcReferenceChecker() = default;

    // Lock the state and make sure no references change; usually a read-lock is taken until the destruction
    // of the instance. Called once before any calls to RemoveUsedReferencesFromSet.
    // The implementation should be as fast as possible, as LockState is part of a stop-the-world (from changes)
    // phase that lasts until all instances of GcReferenceChecker are deleted.
    virtual void LockState(GcCtx& Ctx) = 0;

    // Go through IoCids and see which ones are referenced. Every Cid that is still referenced
    // must be removed from IoCids, so that only unreferenced Cids remain in the set.
    // This function should use pre-cached information on what is referenced, as we are in stop-the-world mode.
    virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) = 0;
};
+
/**
 * @brief Interface to handle GC of data that references Cid data
 *
 * TODO: Maybe we should split up being a referencer and something that holds cache values?
 *
 * GcCacheStore and GcReferencer?
 *
 * This interface is registered/unregistered to GcManager via AddGcReferencer() and RemoveGcReferencer()
 */
class GcReferencer
{
protected:
    // Non-virtual-public destruction: lifetime is managed by the implementor, not through this interface.
    virtual ~GcReferencer() = default;

public:
    // Remove expired data based on the GcCtx::Settings CacheExpireTime/ProjectStoreExpireTime.
    // TODO: For the disk layer we need to first update it with access times from the memory layer.
    // The implementer of GcReferencer (in our case a disk bucket) does not know about any
    // potential memory cache layer :(
    virtual void RemoveExpiredData(GcCtx& Ctx) = 0;

    // Create 0-n GcReferenceChecker instances for this GcReferencer. The caller will manage the lifetime of the
    // returned instances.
    virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) = 0;
};
+
+/**
+ * @brief Interface to prune - remove pointers to data but not the bulk data on disk - references from a GcReferenceStore
+ */
+class GcReferencePruner
+{
+public:
+ virtual ~GcReferencePruner() = default;
+
+ typedef std::function<std::vector<IoHash>(std::span<IoHash> References)> GetUnusedReferencesFunc;
+
+ // Check a set of references to see if they are in use.
+ // Use the GetUnusedReferences input function to check if references are used and update any pointers
+ // so any query for references determined to be unreferences will not be found.
+ // If any references a found to be unused, return a GcReferenceStoreCompactor instance which will
+ // clean up any stored bulk data mapping to the pruned references.
+ // Caller will manage lifetime of returned instance
+ // This function should execute as fast as possible, so try to prepare a list of references to check ahead of
+ // call to this function and make sure the removal of unreferences items is as lightweight as possible.
+ virtual GcReferenceStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, const GetUnusedReferencesFunc& GetUnusedReferences) = 0;
+};
+
/**
 * @brief An interface to prune referenced (Cid) data from a store
 */
class GcReferenceStore
{
protected:
    // Non-virtual-public destruction: lifetime is managed by the implementor, not through this interface.
    virtual ~GcReferenceStore() = default;

public:
    // Create a GcReferencePruner which can check a set of references (decided by the implementor) for whether
    // they are no longer in use. The caller will manage the lifetime of the returned instance.
    virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx) = 0;
};
+
+//////// End New GC WIP
+
/** Garbage Collection context object
*/
class GcContext
@@ -141,6 +290,18 @@ public:
GcManager();
~GcManager();
+ //////// Begin New GC WIP
+
+ void AddGcReferencer(GcReferencer& Referencer);
+ void RemoveGcReferencer(GcReferencer& Referencer);
+
+ void AddGcReferenceStore(GcReferenceStore& ReferenceStore);
+ void RemoveGcReferenceStore(GcReferenceStore& ReferenceStore);
+
+ GcResult CollectGarbage(const GcSettings& Settings);
+
+ //////// End New GC WIP
+
void AddGcContributor(GcContributor* Contributor);
void RemoveGcContributor(GcContributor* Contributor);
@@ -163,6 +324,9 @@ private:
std::vector<GcStorage*> m_GcStorage;
CidStore* m_CidStore = nullptr;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
+
+ std::vector<GcReferencer*> m_GcReferencers;
+ std::vector<GcReferenceStore*> m_GcReferenceStores;
};
enum class GcSchedulerStatus : uint32_t
@@ -172,6 +336,12 @@ enum class GcSchedulerStatus : uint32_t
kStopped
};
/// Selects which garbage-collection implementation the scheduler runs.
enum class GcVersion : uint32_t
{
    kV1, // Original GC implementation (the default, see GcSchedulerConfig::UseGCVersion)
    kV2  // New GC implementation, still in evaluation
};
+
struct GcSchedulerConfig
{
std::filesystem::path RootDirectory;
@@ -185,6 +355,7 @@ struct GcSchedulerConfig
uint64_t DiskSizeSoftLimit = 0;
uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28;
std::chrono::seconds LightweightInterval{};
+ GcVersion UseGCVersion = GcVersion::kV1;
};
struct GcSchedulerState
@@ -246,12 +417,13 @@ public:
struct TriggerGcParams
{
- bool CollectSmallObjects = false;
- std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max();
- std::chrono::seconds MaxProjectStoreDuration = std::chrono::seconds::max();
- uint64_t DiskSizeSoftLimit = 0;
- bool SkipCid = false;
- bool SkipDelete = false;
+ bool CollectSmallObjects = false;
+ std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max();
+ std::chrono::seconds MaxProjectStoreDuration = std::chrono::seconds::max();
+ uint64_t DiskSizeSoftLimit = 0;
+ bool SkipCid = false;
+ bool SkipDelete = false;
+ std::optional<GcVersion> ForceGCVersion;
};
bool TriggerGc(const TriggerGcParams& Params);
@@ -270,7 +442,8 @@ private:
const GcClock::TimePoint& ProjectStoreExpireTime,
bool Delete,
bool CollectSmallObjects,
- bool SkipCid);
+ bool SkipCid,
+ GcVersion UseGCVersion);
void ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice);
spdlog::logger& Log() { return m_Log; }
virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }