aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-10-30 09:32:54 +0100
committerGitHub <[email protected]>2023-10-30 09:32:54 +0100
commit3a6a5855cf36967c6bde31292669bfaf832c6f0b (patch)
tree593e7c21e6840e7ad312207fddc63e1934e19d85 /src/zenserver/cache/cachedisklayer.cpp
parentset up arch properly when running tests (mac) (#505) (diff)
downloadzen-3a6a5855cf36967c6bde31292669bfaf832c6f0b.tar.xz
zen-3a6a5855cf36967c6bde31292669bfaf832c6f0b.zip
New GC implementation (#459)
- Feature: New garbage collection implementation, still in evaluation mode. Enabled by `--gc-v2` command line option
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp633
1 files changed, 547 insertions, 86 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 2efec1e66..38cbf3a93 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -168,8 +168,13 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex;
const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex;
-ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const BucketConfiguration& Config)
-: m_BucketName(std::move(BucketName))
+ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
+ std::atomic_uint64_t& OuterCacheMemoryUsage,
+ std::string BucketName,
+ const BucketConfiguration& Config)
+: m_Gc(Gc)
+, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage)
+, m_BucketName(std::move(BucketName))
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
@@ -179,10 +184,12 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const Bucket
// it makes sense to have a different strategy for legacy values
m_Configuration.LargeObjectThreshold = 16 * 1024 * 1024;
}
+ m_Gc.AddGcReferencer(*this);
}
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
+ m_Gc.RemoveGcReferencer(*this);
}
bool
@@ -717,7 +724,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy
}
bool
-ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
@@ -782,8 +789,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
if (!m_CachedPayloads[UpdateIt->second])
{
m_CachedPayloads[UpdateIt->second] = OutValue.Value;
- m_MemCachedSize.fetch_add(ValueSize);
- CacheMemoryUsage.fetch_add(ValueSize);
+ AddMemCacheUsage(ValueSize);
m_MemoryWriteCount++;
}
}
@@ -834,27 +840,24 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
- PutStandaloneCacheValue(HashKey, Value, References, CacheMemoryUsage);
+ PutStandaloneCacheValue(HashKey, Value, References);
}
else
{
- PutInlineCacheValue(HashKey, Value, References, CacheMemoryUsage);
+ PutInlineCacheValue(HashKey, Value, References);
}
m_DiskWriteCount++;
}
void
-ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
{
GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
@@ -864,8 +867,7 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std:
if (m_AccessTimes[Kv.second] < ExpireTicks)
{
size_t PayloadSize = m_CachedPayloads[Kv.second].GetSize();
- m_MemCachedSize.fetch_sub(PayloadSize);
- CacheMemoryUsage.fetch_sub(PayloadSize);
+ RemoveMemCacheUsage(PayloadSize);
m_CachedPayloads[Kv.second] = {};
}
}
@@ -900,7 +902,7 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
}
bool
-ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::Drop()
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop");
@@ -926,7 +928,7 @@ ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage)
m_NextReferenceHashesIndexes.clear();
m_ReferenceCount = 0;
m_StandaloneSize.store(0);
- CacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
+ m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
m_MemCachedSize.store(0);
return Deleted;
@@ -1102,7 +1104,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
};
void
-ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub");
@@ -1292,8 +1294,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint
if (m_Configuration.MemCacheSizeThreshold > 0)
{
size_t CachedSize = m_CachedPayloads[It->second].GetSize();
- m_MemCachedSize.fetch_sub(CachedSize);
- CacheMemoryUsage.fetch_sub(CachedSize);
+ RemoveMemCacheUsage(CachedSize);
m_CachedPayloads[It->second] = IoBuffer{};
}
@@ -1411,8 +1412,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
for (const auto& Entry : Index)
{
- const IoHash& Key = Entry.first;
- GcClock::Tick AccessTime = AccessTimes[Entry.second];
+ const IoHash& Key = Entry.first;
+ size_t PayloadIndex = Entry.second;
+ GcClock::Tick AccessTime = AccessTimes[PayloadIndex];
if (AccessTime < ExpireTicks)
{
ExpiredKeys.push_back(Key);
@@ -1424,7 +1426,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
continue;
}
- BucketPayload& Payload = Payloads[Entry.second];
+ BucketPayload& Payload = Payloads[PayloadIndex];
const DiskLocation& Loc = Payload.Location;
if (!Loc.IsFlagSet(DiskLocation::kStructured))
@@ -1433,7 +1435,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
if (m_Configuration.EnableReferenceCaching)
{
- if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex))
+ if (FirstReferenceIndex.empty() || (FirstReferenceIndex[PayloadIndex] == UnknownReferencesIndex))
{
StructuredItemsWithUnknownAttachments.push_back(Entry);
continue;
@@ -1450,7 +1452,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
#endif // CALCULATE_BLOCKING_TIME
- if (auto It = m_Index.find(Entry.first); It != m_Index.end())
+ if (auto It = m_Index.find(Key); It != m_Index.end())
{
ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
}
@@ -1470,13 +1472,15 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
for (const auto& Entry : StructuredItemsWithUnknownAttachments)
{
- BucketPayload& Payload = Payloads[Entry.second];
- const DiskLocation& Loc = Payload.Location;
+ const IoHash& Key = Entry.first;
+ size_t PayloadIndex = Entry.second;
+ BucketPayload& Payload = Payloads[PayloadIndex];
+ const DiskLocation& Loc = Payload.Location;
{
IoBuffer Buffer;
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Entry.first); !Buffer)
+ if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer)
{
continue;
}
@@ -1492,7 +1496,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
#endif // CALCULATE_BLOCKING_TIME
- if (auto It = m_Index.find(Entry.first); It != m_Index.end())
+ if (auto It = m_Index.find(Key); It != m_Index.end())
{
if (m_Configuration.MemCacheSizeThreshold > 0)
{
@@ -1514,8 +1518,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
ZEN_ASSERT(Buffer);
ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
- CbObject Obj(SharedBuffer{Buffer});
- size_t CurrentCidCount = Cids.size();
+ CbObjectView Obj(Buffer.GetData());
+ size_t CurrentCidCount = Cids.size();
Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
if (m_Configuration.EnableReferenceCaching)
{
@@ -1528,7 +1532,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
#endif // CALCULATE_BLOCKING_TIME
- if (auto It = m_Index.find(Entry.first); It != m_Index.end())
+ if (auto It = m_Index.find(Key); It != m_Index.end())
{
if (m_FirstReferenceIndex[It->second] == UnknownReferencesIndex)
{
@@ -1556,7 +1560,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
void
-ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage");
@@ -1762,17 +1766,19 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin
TotalChunkCount = 0;
for (const auto& Entry : Index)
{
- const DiskLocation& DiskLocation = Payloads[Entry.second].Location;
+ size_t EntryIndex = Entry.second;
+ const DiskLocation& DiskLocation = Payloads[EntryIndex].Location;
if (DiskLocation.Flags & DiskLocation::kStandaloneFile)
{
continue;
}
+ const IoHash& Key = Entry.first;
BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
size_t ChunkIndex = ChunkLocations.size();
ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash[ChunkIndex] = Entry.first;
- if (ExpiredCacheKeys.contains(Entry.first))
+ ChunkIndexToChunkHash[ChunkIndex] = Key;
+ if (ExpiredCacheKeys.contains(Key))
{
continue;
}
@@ -1815,12 +1821,12 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin
});
for (const auto& Entry : MovedChunks)
{
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- size_t PayloadIndex = m_Index[ChunkHash];
- BucketPayload& Payload = m_Payloads[PayloadIndex];
- if (Payloads[Index[ChunkHash]].Location != m_Payloads[PayloadIndex].Location)
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ size_t EntryIndex = m_Index[ChunkHash];
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+ if (Payloads[Index[ChunkHash]].Location != m_Payloads[EntryIndex].Location)
{
// Entry has been updated while GC was running, ignore the move
continue;
@@ -1830,9 +1836,9 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin
}
for (const size_t ChunkIndex : RemovedChunks)
{
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- size_t PayloadIndex = m_Index[ChunkHash];
- const BucketPayload& Payload = m_Payloads[PayloadIndex];
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ size_t EntryIndex = m_Index[ChunkHash];
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
if (Payloads[Index[ChunkHash]].Location != Payload.Location)
{
// Entry has been updated while GC was running, ignore the delete
@@ -1843,12 +1849,11 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin
.Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment),
m_Configuration.PayloadAlignment,
OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
- if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[PayloadIndex])
+ if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[EntryIndex])
{
- uint64_t CachePayloadSize = m_CachedPayloads[PayloadIndex].Size();
- m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
- CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
- m_CachedPayloads[PayloadIndex] = IoBuffer{};
+ uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size();
+ RemoveMemCacheUsage(CachePayloadSize);
+ m_CachedPayloads[EntryIndex] = IoBuffer{};
}
m_Index.erase(ChunkHash);
DeletedChunks.insert(ChunkHash);
@@ -1891,10 +1896,10 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index)
const BucketPayload& Payload = m_Payloads[Index];
if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
{
- IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile)
- ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key)
- : GetInlineCacheValue(Payload.Location);
- CbObject Obj(SharedBuffer{Value});
+ IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile)
+ ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key)
+ : GetInlineCacheValue(Payload.Location);
+ CbObjectView Obj(Value.GetData());
Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
}
return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
@@ -1958,16 +1963,13 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
}
for (CacheBucket* Bucket : Buckets)
{
- Bucket->CollectGarbage(GcCtx, m_TotalMemCachedSize);
+ Bucket->CollectGarbage(GcCtx);
}
MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
}
void
-ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue");
@@ -2118,8 +2120,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey
if (m_CachedPayloads[EntryIndex])
{
uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size();
- m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
- CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ RemoveMemCacheUsage(CachePayloadSize);
m_CachedPayloads[EntryIndex] = IoBuffer{};
}
}
@@ -2131,10 +2132,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey
}
void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- std::atomic_uint64_t& CacheMemoryUsage)
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue");
@@ -2176,14 +2174,12 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
if (m_CachedPayloads[EntryIndex])
{
uint64_t OldCachedSize = m_CachedPayloads[EntryIndex].GetSize();
- m_MemCachedSize.fetch_sub(OldCachedSize);
- CacheMemoryUsage.fetch_sub(OldCachedSize);
+ RemoveMemCacheUsage(OldCachedSize);
}
if (MemCacheBuffer)
{
- m_MemCachedSize.fetch_add(PayloadSize);
- CacheMemoryUsage.fetch_add(PayloadSize);
+ AddMemCacheUsage(PayloadSize);
m_MemoryWriteCount++;
}
m_CachedPayloads[EntryIndex] = std::move(MemCacheBuffer);
@@ -2202,8 +2198,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
{
if (MemCacheBuffer)
{
- m_MemCachedSize.fetch_add(PayloadSize);
- CacheMemoryUsage.fetch_add(PayloadSize);
+ AddMemCacheUsage(PayloadSize);
m_MemoryWriteCount++;
}
m_CachedPayloads.emplace_back(std::move(MemCacheBuffer));
@@ -2219,6 +2214,409 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
}
void
+ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx)
+{
+ size_t TotalEntries = 0;
+ tsl::robin_set<IoHash, IoHash::Hasher> ExpiredInlineKeys;
+ std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys;
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc cache bucket '{}': removed {} expired keys out of {} in {}",
+ m_BucketDir,
+ ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size(),
+ TotalEntries,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count();
+
+ BlockStoreCompactState BlockCompactState;
+ BlockStore::ReclaimSnapshotState BlockSnapshotState;
+ std::vector<IoHash> BlockCompactStateKeys;
+ std::vector<DiskIndexEntry> ExpiredEntries;
+ uint64_t RemovedStandaloneSize = 0;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (Ctx.Settings.CollectSmallObjects)
+ {
+ BlockSnapshotState = m_BlockStore.GetReclaimSnapshotState();
+ }
+ TotalEntries = m_Index.size();
+
+ // Find out expired keys and affected blocks
+ for (const auto& Entry : m_Index)
+ {
+ const IoHash& Key = Entry.first;
+ size_t EntryIndex = Entry.second;
+ GcClock::Tick AccessTime = m_AccessTimes[EntryIndex];
+ if (AccessTime >= ExpireTicks)
+ {
+ continue;
+ }
+
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location};
+ ExpiredEntry.Location.Flags |= DiskLocation::kTombStone;
+
+ if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()});
+ RemovedStandaloneSize += Payload.Location.Size();
+ ExpiredEntries.push_back(ExpiredEntry);
+ }
+ else if (Ctx.Settings.CollectSmallObjects)
+ {
+ ExpiredInlineKeys.insert(Key);
+ uint32_t BlockIndex = Payload.Location.Location.BlockLocation.GetBlockIndex();
+ bool IsActiveWriteBlock = BlockSnapshotState.m_ActiveWriteBlocks.contains(BlockIndex);
+ if (!IsActiveWriteBlock)
+ {
+ BlockCompactState.AddBlock(BlockIndex);
+ }
+ ExpiredEntries.push_back(ExpiredEntry);
+ }
+ }
+
+ Ctx.ExpiredItems.fetch_add(ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size());
+
+ // Get all locations we need to keep for affected blocks
+ if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty())
+ {
+ for (const auto& Entry : m_Index)
+ {
+ const IoHash& Key = Entry.first;
+ if (ExpiredInlineKeys.contains(Key))
+ {
+ continue;
+ }
+ size_t EntryIndex = Entry.second;
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ continue;
+ }
+ if (BlockCompactState.AddKeepLocation(Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment)))
+ {
+ BlockCompactStateKeys.push_back(Key);
+ }
+ }
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ for (const DiskIndexEntry& Entry : ExpiredEntries)
+ {
+ auto It = m_Index.find(Entry.Key);
+ ZEN_ASSERT(It != m_Index.end());
+ if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[It->second])
+ {
+ size_t PayloadSize = m_CachedPayloads[It->second].GetSize();
+ Ctx.RemovedMemory.fetch_add(PayloadSize);
+ RemoveMemCacheUsage(PayloadSize);
+ }
+ m_Index.erase(It);
+ }
+ m_SlogFile.Append(ExpiredEntries);
+ m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed);
+ }
+ }
+ Ctx.Items.fetch_add(TotalEntries);
+
+ if (ExpiredEntries.empty())
+ {
+ return;
+ }
+
+ if (!Ctx.Settings.IsDeleteMode)
+ {
+ return;
+ }
+
+ Ctx.DeletedItems.fetch_add(ExpiredEntries.size());
+
+ // Compact standalone items
+ ExtendablePathBuilder<256> Path;
+ for (const std::pair<IoHash, uint64_t>& ExpiredKey : ExpiredStandaloneKeys)
+ {
+ Path.Reset();
+ BuildPath(Path, ExpiredKey.first);
+ fs::path FilePath = Path.ToPath();
+
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ if (m_Index.contains(ExpiredKey.first))
+ {
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("gc cache bucket '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back",
+ m_BucketDir,
+ Path.ToUtf8());
+ continue;
+ }
+
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(ExpiredKey.first));
+ IndexLock.ReleaseNow();
+ ZEN_DEBUG("gc cache bucket '{}': deleting standalone cache file '{}'", m_BucketDir, Path.ToUtf8());
+
+ std::error_code Ec;
+ if (!fs::remove(FilePath, Ec))
+ {
+ continue;
+ }
+ if (Ec)
+ {
+ ZEN_WARN("gc cache bucket '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'",
+ m_BucketDir,
+ Path.ToUtf8(),
+ Ec.message());
+ continue;
+ }
+ Ctx.RemovedDiskSpace.fetch_add(ExpiredKey.second);
+ }
+
+ if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty())
+ {
+ // Compact block store
+ m_BlockStore.CompactBlocks(
+ BlockCompactState,
+ m_Configuration.PayloadAlignment,
+ [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
+ std::vector<DiskIndexEntry> MovedEntries;
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
+ {
+ size_t ChunkIndex = Moved.first;
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
+
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ BucketPayload& Payload = m_Payloads[It->second];
+ const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
+ if (Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment) != OldLocation)
+ {
+ // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d at a later
+ // time
+ continue;
+ }
+
+ const BlockStoreLocation& NewLocation = Moved.second;
+
+ Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
+ MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
+ }
+ }
+ m_SlogFile.Append(MovedEntries);
+ Ctx.RemovedDiskSpace.fetch_add(FreedDiskSpace);
+ },
+ [&]() { return 0; });
+ }
+
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<IoBuffer> CachedPayloads;
+ std::vector<size_t> FirstReferenceIndex;
+ IndexMap Index;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock);
+ }
+}
+
+class DiskBucketReferenceChecker : public GcReferenceChecker
+{
+public:
+ DiskBucketReferenceChecker(ZenCacheDiskLayer::CacheBucket& Owner) : m_CacheBucket(Owner) {}
+
+ virtual ~DiskBucketReferenceChecker()
+ {
+ m_IndexLock.reset();
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it
+ m_CacheBucket.ClearReferenceCache();
+ }
+ }
+
+ virtual void LockState(GcCtx&) override
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("gc cache bucket '{}': found {} references in {}",
+ m_CacheBucket.m_BucketDir,
+ m_CacheBucket.m_ReferenceCount + m_UncachedReferences.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock);
+
+ // Rescan to see if any cache items needs refreshing since last pass when we had the lock
+ for (const auto& Entry : m_CacheBucket.m_Index)
+ {
+ size_t PayloadIndex = Entry.second;
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+ ZEN_ASSERT(!m_CacheBucket.m_FirstReferenceIndex.empty());
+ const IoHash& Key = Entry.first;
+ if (m_CacheBucket.m_FirstReferenceIndex[PayloadIndex] == ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex)
+ {
+ IoBuffer Buffer;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ Buffer = m_CacheBucket.GetStandaloneCacheValue(Loc.GetContentType(), Key);
+ }
+ else
+ {
+ Buffer = m_CacheBucket.GetInlineCacheValue(Loc);
+ }
+
+ if (Buffer)
+ {
+ ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
+ CbObjectView Obj(Buffer.GetData());
+ Obj.IterateAttachments([this](CbFieldView Field) { m_UncachedReferences.insert(Field.AsAttachment()); });
+ }
+ }
+ }
+ }
+
+ virtual void RemoveUsedReferencesFromSet(GcCtx&, HashSet& IoCids) override
+ {
+ ZEN_ASSERT(m_IndexLock);
+
+ for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes)
+ {
+ IoCids.erase(ReferenceHash);
+ }
+
+ for (const IoHash& ReferenceHash : m_UncachedReferences)
+ {
+ IoCids.erase(ReferenceHash);
+ }
+ }
+ ZenCacheDiskLayer::CacheBucket& m_CacheBucket;
+ std::unique_ptr<RwLock::SharedLockScope> m_IndexLock;
+ HashSet m_UncachedReferences;
+};
+
+std::vector<GcReferenceChecker*>
+ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx&)
+{
+ Stopwatch Timer;
+ const auto _ = MakeGuard(
+ [&] { ZEN_DEBUG("gc cache bucket '{}': refreshed reference cache in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+ std::vector<IoHash> UpdateKeys;
+ std::vector<IoHash> StandaloneKeys;
+ std::vector<size_t> ReferenceCounts;
+ std::vector<IoHash> References;
+
+ // Refresh cache
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ for (const auto& Entry : m_Index)
+ {
+ size_t PayloadIndex = Entry.second;
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+ if (m_Configuration.EnableReferenceCaching &&
+ m_FirstReferenceIndex[PayloadIndex] != ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex)
+ {
+ continue;
+ }
+ const IoHash& Key = Entry.first;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneKeys.push_back(Key);
+ continue;
+ }
+ IoBuffer Buffer = GetInlineCacheValue(Loc);
+ if (!Buffer)
+ {
+ UpdateKeys.push_back(Key);
+ ReferenceCounts.push_back(0);
+ continue;
+ }
+ size_t CurrentReferenceCount = References.size();
+ {
+ CbObjectView Obj(Buffer.GetData());
+ Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
+ Buffer = {};
+ }
+ UpdateKeys.push_back(Key);
+ ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
+ }
+ }
+ {
+ for (const IoHash& Key : StandaloneKeys)
+ {
+ IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
+ if (!Buffer)
+ {
+ continue;
+ }
+
+ size_t CurrentReferenceCount = References.size();
+ {
+ CbObjectView Obj(Buffer.GetData());
+ Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
+ Buffer = {};
+ }
+ UpdateKeys.push_back(Key);
+ ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
+ }
+ }
+
+ {
+ size_t ReferenceOffset = 0;
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (!m_Configuration.EnableReferenceCaching)
+ {
+ ZEN_ASSERT(m_FirstReferenceIndex.empty());
+ ZEN_ASSERT(m_ReferenceHashes.empty());
+ ZEN_ASSERT(m_NextReferenceHashesIndexes.empty());
+ ZEN_ASSERT(m_ReferenceCount == 0);
+ // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when
+ // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted.
+ m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex);
+ }
+ for (size_t Index = 0; Index < UpdateKeys.size(); Index++)
+ {
+ const IoHash& Key = UpdateKeys[Index];
+ size_t ReferenceCount = ReferenceCounts[Index];
+ auto It = m_Index.find(Key);
+ if (It == m_Index.end())
+ {
+ ReferenceOffset += ReferenceCount;
+ continue;
+ }
+ if (m_FirstReferenceIndex[It->second] != ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex)
+ {
+ continue;
+ }
+ SetReferences(IndexLock,
+ m_FirstReferenceIndex[It->second],
+ std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount});
+ ReferenceOffset += ReferenceCount;
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ CompactReferences(IndexLock);
+ }
+ }
+
+ return {new DiskBucketReferenceChecker(*this)};
+}
+
+void
ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
{
std::vector<size_t> FirstReferenceIndex;
@@ -2381,6 +2779,19 @@ ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIn
}
void
+ZenCacheDiskLayer::CacheBucket::ClearReferenceCache()
+{
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ m_FirstReferenceIndex.clear();
+ m_FirstReferenceIndex.shrink_to_fit();
+ m_ReferenceHashes.clear();
+ m_ReferenceHashes.shrink_to_fit();
+ m_NextReferenceHashesIndexes.clear();
+ m_NextReferenceHashesIndexes.shrink_to_fit();
+ m_ReferenceCount = 0;
+}
+
+void
ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads,
std::vector<AccessTime>& AccessTimes,
std::vector<IoBuffer>& CachedPayloads,
@@ -2426,16 +2837,34 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payload
}
}
+#if ZEN_WITH_TESTS
+void
+ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time)
+{
+ GcClock::Tick TimeTick = Time.time_since_epoch().count();
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_AccessTimes[EntryIndex] = TimeTick;
+ }
+}
+#endif // ZEN_WITH_TESTS
+
//////////////////////////////////////////////////////////////////////////
-ZenCacheDiskLayer::ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
-: m_JobQueue(JobQueue)
+ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
+: m_Gc(Gc)
+, m_JobQueue(JobQueue)
, m_RootDir(RootDir)
, m_Configuration(Config)
{
}
-ZenCacheDiskLayer::~ZenCacheDiskLayer() = default;
+ZenCacheDiskLayer::~ZenCacheDiskLayer()
+{
+}
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
@@ -2468,8 +2897,10 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
- Bucket = InsertResult.first->second.get();
+ auto InsertResult =
+ m_Buckets.emplace(BucketName,
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+ Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= BucketName;
@@ -2483,7 +2914,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
ZEN_ASSERT(Bucket != nullptr);
- if (Bucket->Get(HashKey, OutValue, m_TotalMemCachedSize))
+ if (Bucket->Get(HashKey, OutValue))
{
TryMemCacheTrim();
return true;
@@ -2522,8 +2953,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
- Bucket = InsertResult.first->second.get();
+ auto InsertResult =
+ m_Buckets.emplace(BucketName,
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+ Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= BucketName;
@@ -2547,7 +2980,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
ZEN_ASSERT(Bucket != nullptr);
- Bucket->Put(HashKey, Value, References, m_TotalMemCachedSize);
+ Bucket->Put(HashKey, Value, References);
TryMemCacheTrim();
}
@@ -2579,8 +3012,10 @@ ZenCacheDiskLayer::DiscoverBuckets()
continue;
}
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
- CacheBucket& Bucket = *InsertResult.first->second;
+ auto InsertResult =
+ m_Buckets.emplace(BucketName,
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+ CacheBucket& Bucket = *InsertResult.first->second;
try
{
@@ -2636,7 +3071,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It);
- return Bucket.Drop(m_TotalMemCachedSize);
+ return Bucket.Drop();
}
// Make sure we remove the folder even if we don't know about the bucket
@@ -2658,7 +3093,7 @@ ZenCacheDiskLayer::Drop()
CacheBucket& Bucket = *It->second;
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It->first);
- if (!Bucket.Drop(m_TotalMemCachedSize))
+ if (!Bucket.Drop())
{
return false;
}
@@ -2700,10 +3135,10 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
#if 1
Results.push_back(Ctx.ThreadPool().EnqueueTask(
- std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx, m_TotalMemCachedSize); }}));
+ std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
#else
CacheBucket& Bucket = *Kv.second;
- Bucket.ScrubStorage(Ctx, m_TotalMemCachedSize);
+ Bucket.ScrubStorage(Ctx);
#endif
}
@@ -2914,7 +3349,7 @@ ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::Tim
RwLock::SharedLockScope __(m_Lock);
for (CacheBucket* Bucket : Buckets)
{
- Bucket->MemCacheTrim(ExpireTime, m_TotalMemCachedSize);
+ Bucket->MemCacheTrim(ExpireTime);
}
const GcClock::TimePoint Now = GcClock::Now();
const GcClock::Tick NowTick = Now.time_since_epoch().count();
@@ -2924,4 +3359,30 @@ ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::Tim
m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
}
+#if ZEN_WITH_TESTS
+void
+ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time)
+{
+ const auto BucketName = std::string(InBucket);
+ CacheBucket* Bucket = nullptr;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ auto It = m_Buckets.find(BucketName);
+
+ if (It != m_Buckets.end())
+ {
+ Bucket = It->second.get();
+ }
+ }
+
+ if (Bucket == nullptr)
+ {
+ return;
+ }
+ Bucket->SetAccessTime(HashKey, Time);
+}
+#endif // ZEN_WITH_TESTS
+
} // namespace zen