aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-10-24 14:54:26 +0200
committerGitHub <[email protected]>2023-10-24 14:54:26 +0200
commit1a144212278aa7158d6b32b63e398db95a7ae868 (patch)
tree58735827b0b706368a82bcaaa8aaa68f211e1d10 /src/zenserver/cache/cachedisklayer.cpp
parentchunking moved to zenstore (#490) (diff)
downloadzen-1a144212278aa7158d6b32b63e398db95a7ae868.tar.xz
zen-1a144212278aa7158d6b32b63e398db95a7ae868.zip
merge disk and memory layers (#493)
- Feature: Added `--cache-memlayer-sizethreshold` option to zenserver to control at which size cache entries get cached in memory - Changed: Merged cache memory layer with cache disk layer to reduce memory and cpu overhead
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp826
1 files changed, 535 insertions, 291 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 73c02bcc2..f755436e0 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -7,6 +7,7 @@
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
+#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/trace.h>
@@ -167,16 +168,16 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex;
const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex;
-ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, bool EnableReferenceCaching)
+ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const BucketConfiguration& Config)
: m_BucketName(std::move(BucketName))
+, m_Configuration(Config)
, m_BucketId(Oid::Zero)
-, m_EnableReferenceCaching(EnableReferenceCaching)
{
if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
{
// This is pretty ad hoc but in order to avoid too many individual files
// it makes sense to have a different strategy for legacy values
- m_LargeObjectThreshold = 16 * 1024 * 1024;
+ m_Configuration.LargeObjectThreshold = 16 * 1024 * 1024;
}
}
@@ -397,7 +398,7 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
.LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
@@ -470,7 +471,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- m_PayloadAlignment = Header.PayloadAlignment;
+ m_Configuration.PayloadAlignment = Header.PayloadAlignment;
std::vector<DiskIndexEntry> Entries;
Entries.resize(Header.EntryCount);
@@ -479,11 +480,6 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
sizeof(CacheBucketIndexHeader));
m_Payloads.reserve(Header.EntryCount);
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.reserve(Header.EntryCount);
- }
- m_AccessTimes.reserve(Header.EntryCount);
m_Index.reserve(Header.EntryCount);
std::string InvalidEntryReason;
@@ -496,14 +492,18 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
}
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
- }
m_Index.insert_or_assign(Entry.Key, EntryIndex);
EntryCount++;
}
+ m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_CachedPayloads.resize(m_Payloads.size());
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex);
+ }
OutVersion = CacheBucketIndexHeader::Version2;
return Header.LogPosition;
}
@@ -540,8 +540,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
SkipEntryCount = 0;
}
- LogEntryCount = EntryCount - SkipEntryCount;
- m_Index.reserve(LogEntryCount);
+ LogEntryCount = EntryCount - SkipEntryCount;
uint64_t InvalidEntryCount = 0;
CasLog.Replay(
[&](const DiskIndexEntry& Record) {
@@ -559,14 +558,18 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
}
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
- }
m_Index.insert_or_assign(Record.Key, EntryIndex);
},
SkipEntryCount);
+ m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_CachedPayloads.resize(m_Payloads.size());
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex);
+ }
if (InvalidEntryCount)
{
ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
@@ -582,11 +585,12 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog");
- m_TotalStandaloneSize = 0;
+ m_StandaloneSize = 0;
m_Index.clear();
m_Payloads.clear();
m_AccessTimes.clear();
+ m_CachedPayloads.clear();
m_FirstReferenceIndex.clear();
m_ReferenceHashes.clear();
m_NextReferenceHashesIndexes.clear();
@@ -604,7 +608,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
CreateDirectories(m_BucketDir);
- m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
+ m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
if (std::filesystem::is_regular_file(IndexPath))
{
@@ -643,10 +647,10 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
- m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
+ m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
continue;
}
- const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
KnownLocations.push_back(BlockLocation);
}
@@ -681,7 +685,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue");
- BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);
+ BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
IoBuffer Value = m_BlockStore.TryGetChunk(Location);
if (Value)
@@ -713,88 +717,190 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy
}
bool
-ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage)
{
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
- RwLock::SharedLockScope _(m_IndexLock);
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
auto It = m_Index.find(HashKey);
if (It == m_Index.end())
{
- m_MissCount++;
+ m_DiskMissCount++;
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
return false;
}
- size_t EntryIndex = It.value();
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
- DiskLocation Location = Payload.Location;
- OutValue.RawSize = Payload.RawSize;
- OutValue.RawHash = Payload.RawHash;
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+
+ size_t EntryIndex = It.value();
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ DiskLocation Location = m_Payloads[EntryIndex].Location;
+ bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
+ if (FillRawHashAndRawSize)
{
- // We don't need to hold the index lock when we read a standalone file
- _.ReleaseNow();
- OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey);
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ if (Payload.RawHash != IoHash::Zero || Payload.RawSize != 0)
+ {
+ OutValue.RawHash = Payload.RawHash;
+ OutValue.RawSize = Payload.RawSize;
+ FillRawHashAndRawSize = false;
+ }
}
- else
+
+ if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[EntryIndex])
{
- OutValue.Value = GetInlineCacheValue(Location);
+ OutValue.Value = m_CachedPayloads[EntryIndex];
+ IndexLock.ReleaseNow();
+ m_MemoryHitCount++;
}
- _.ReleaseNow();
-
- if (!Location.IsFlagSet(DiskLocation::kStructured))
+ else
{
- if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0)
+ IndexLock.ReleaseNow();
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey);
+ }
+ else
{
- if (Location.IsFlagSet(DiskLocation::kCompressed))
+ OutValue.Value = GetInlineCacheValue(Location);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
{
- (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize);
+ size_t ValueSize = OutValue.Value.GetSize();
+ if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
+ {
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache");
+ OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
+ {
+ // Only update if it has not already been updated by other thread
+ if (!m_CachedPayloads[UpdateIt->second])
+ {
+ m_CachedPayloads[UpdateIt->second] = OutValue.Value;
+ m_MemCachedSize.fetch_add(ValueSize);
+ CacheMemoryUsage.fetch_add(ValueSize);
+ m_MemoryWriteCount++;
+ }
+ }
+ }
}
- else
+ }
+ }
+
+ if (FillRawHashAndRawSize)
+ {
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData");
+ if (Location.IsFlagSet(DiskLocation::kCompressed))
+ {
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
{
- OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
- OutValue.RawSize = OutValue.Value.GetSize();
+ OutValue = ZenCacheValue{};
+ m_DiskMissCount++;
+ return false;
}
- RwLock::ExclusiveLockScope __(m_IndexLock);
- if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
+ }
+ else
+ {
+ OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+ OutValue.RawSize = OutValue.Value.GetSize();
+ }
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
+ {
+ BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
+ if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0)
{
- BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
- WritePayload.RawHash = OutValue.RawHash;
- WritePayload.RawSize = OutValue.RawSize;
-
- m_LogFlushPosition = 0; // Force resave of index on exit
+ WritePayload.RawHash = OutValue.RawHash;
+ WritePayload.RawSize = OutValue.RawSize;
}
}
}
if (OutValue.Value)
{
- m_HitCount++;
+ m_DiskHitCount++;
StatsScope.SetBytes(OutValue.Value.GetSize());
return true;
}
else
{
- m_MissCount++;
+ m_DiskMissCount++;
return false;
}
}
void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ std::atomic_uint64_t& CacheMemoryUsage)
{
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
- if (Value.Value.Size() >= m_LargeObjectThreshold)
+ if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
- return PutStandaloneCacheValue(HashKey, Value, References);
+ PutStandaloneCacheValue(HashKey, Value, References, CacheMemoryUsage);
}
- PutInlineCacheValue(HashKey, Value, References);
+ else
+ {
+ PutInlineCacheValue(HashKey, Value, References, CacheMemoryUsage);
+ }
+
+ m_DiskWriteCount++;
+}
- m_WriteCount++;
+void
+ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage)
+{
+ GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
+
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ for (const auto& Kv : m_Index)
+ {
+ if (m_AccessTimes[Kv.second] < ExpireTicks)
+ {
+ size_t PayloadSize = m_CachedPayloads[Kv.second].GetSize();
+ m_MemCachedSize.fetch_sub(PayloadSize);
+ CacheMemoryUsage.fetch_sub(PayloadSize);
+ m_CachedPayloads[Kv.second] = {};
+ }
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
+ GcClock::Duration SectionLength,
+ std::vector<uint64_t>& InOutUsageSlots)
+{
+ RwLock::SharedLockScope _(m_IndexLock);
+ for (const auto& It : m_Index)
+ {
+ size_t Index = It.second;
+ if (!m_CachedPayloads[Index])
+ {
+ continue;
+ }
+ GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
+ GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
+ uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
+ if (Slot >= InOutUsageSlots.capacity())
+ {
+ Slot = InOutUsageSlots.capacity() - 1;
+ }
+ if (Slot > InOutUsageSlots.size())
+ {
+ InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
+ }
+ InOutUsageSlots[Slot] += m_CachedPayloads[Index].GetSize();
+ }
}
bool
-ZenCacheDiskLayer::CacheBucket::Drop()
+ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop");
@@ -814,10 +920,14 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_Index.clear();
m_Payloads.clear();
m_AccessTimes.clear();
+ m_CachedPayloads.clear();
m_FirstReferenceIndex.clear();
m_ReferenceHashes.clear();
m_NextReferenceHashesIndexes.clear();
m_ReferenceCount = 0;
+ m_StandaloneSize.store(0);
+ CacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
+ m_MemCachedSize.store(0);
return Deleted;
}
@@ -992,7 +1102,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
};
void
-ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
+ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub");
@@ -1083,7 +1193,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
}
else
{
- ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment));
+ ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment));
ChunkIndexToChunkHash.push_back(HashKey);
continue;
}
@@ -1169,11 +1279,24 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
// Log a tombstone and delete the in-memory index for the bad entry
const auto It = m_Index.find(BadKey);
BucketPayload& Payload = m_Payloads[It->second];
- if (m_EnableReferenceCaching)
+ if (m_Configuration.EnableReferenceCaching)
{
RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]);
}
DiskLocation Location = Payload.Location;
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed);
+ }
+
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ size_t CachedSize = m_CachedPayloads[It->second].GetSize();
+ m_MemCachedSize.fetch_sub(CachedSize);
+ CacheMemoryUsage.fetch_sub(CachedSize);
+ m_CachedPayloads[It->second] = IoBuffer{};
+ }
+
Location.Flags |= DiskLocation::kTombStone;
LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
m_Index.erase(BadKey);
@@ -1193,7 +1316,6 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
std::error_code Ec;
fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
}
- m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
}
}
m_SlogFile.Append(LogEntries);
@@ -1202,38 +1324,13 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
+ std::vector<IoBuffer> CachedPayloads;
std::vector<size_t> FirstReferenceIndex;
IndexMap Index;
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- size_t EntryCount = m_Index.size();
- Payloads.reserve(EntryCount);
- AccessTimes.reserve(EntryCount);
- if (m_EnableReferenceCaching)
- {
- FirstReferenceIndex.reserve(EntryCount);
- }
- Index.reserve(EntryCount);
- for (auto It : m_Index)
- {
- size_t EntryIndex = Payloads.size();
- Payloads.push_back(m_Payloads[It.second]);
- AccessTimes.push_back(m_AccessTimes[It.second]);
- if (m_EnableReferenceCaching)
- {
- FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
- }
- Index.insert({It.first, EntryIndex});
- }
- m_Index.swap(Index);
- m_Payloads.swap(Payloads);
- m_AccessTimes.swap(AccessTimes);
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.swap(FirstReferenceIndex);
- CompactReferences(IndexLock);
- }
+ CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock);
}
}
}
@@ -1334,7 +1431,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
continue;
}
- if (m_EnableReferenceCaching)
+ if (m_Configuration.EnableReferenceCaching)
{
if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex))
{
@@ -1397,8 +1494,17 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
#endif // CALCULATE_BLOCKING_TIME
if (auto It = m_Index.find(Entry.first); It != m_Index.end())
{
- BucketPayload& UpdatePayload = m_Payloads[It->second];
- Buffer = GetInlineCacheValue(UpdatePayload.Location);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ Buffer = m_CachedPayloads[It->second];
+ }
+ if (!Buffer)
+ {
+ DiskLocation Location = m_Payloads[It->second].Location;
+ IndexLock.ReleaseNow();
+ Buffer = GetInlineCacheValue(Location);
+ // Don't memcache items when doing GC
+ }
}
if (!Buffer)
{
@@ -1411,7 +1517,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
CbObject Obj(SharedBuffer{Buffer});
size_t CurrentCidCount = Cids.size();
Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
- if (m_EnableReferenceCaching)
+ if (m_Configuration.EnableReferenceCaching)
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
#if CALCULATE_BLOCKING_TIME
@@ -1450,20 +1556,20 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
void
-ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
+ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage");
ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
- Stopwatch TotalTimer;
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = 0;
- uint64_t DeletedSize = 0;
- uint64_t OldTotalSize = TotalSize();
+ Stopwatch TotalTimer;
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ GcStorageSize OldTotalSize = StorageSize();
std::unordered_set<IoHash> DeletedChunks;
uint64_t MovedCount = 0;
@@ -1473,7 +1579,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
"garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
"{} "
"of {} "
- "entires ({}).",
+ "entries ({}/{}).",
m_BucketDir,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
NiceLatencyNs(WriteBlockTimeUs),
@@ -1484,7 +1590,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
DeletedChunks.size(),
MovedCount,
TotalChunkCount,
- NiceBytes(OldTotalSize));
+ NiceBytes(OldTotalSize.DiskSize),
+ NiceBytes(OldTotalSize.MemorySize));
bool Expected = false;
if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
@@ -1514,45 +1621,18 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
// Clean up m_AccessTimes and m_Payloads vectors
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
+ std::vector<IoBuffer> CachedPayloads;
std::vector<size_t> FirstReferenceIndex;
IndexMap Index;
-
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- size_t EntryCount = m_Index.size();
- Payloads.reserve(EntryCount);
- AccessTimes.reserve(EntryCount);
- if (m_EnableReferenceCaching)
- {
- FirstReferenceIndex.reserve(EntryCount);
- }
- Index.reserve(EntryCount);
- for (auto It : m_Index)
- {
- size_t OldEntryIndex = It.second;
- size_t NewEntryIndex = Payloads.size();
- Payloads.push_back(m_Payloads[OldEntryIndex]);
- AccessTimes.push_back(m_AccessTimes[OldEntryIndex]);
- if (m_EnableReferenceCaching)
- {
- FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
- }
- Index.insert({It.first, NewEntryIndex});
- }
- m_Index.swap(Index);
- m_Payloads.swap(Payloads);
- m_AccessTimes.swap(AccessTimes);
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.swap(FirstReferenceIndex);
- CompactReferences(IndexLock);
- }
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock);
}
GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
}
@@ -1610,7 +1690,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
for (const auto& Entry : ExpiredStandaloneEntries)
{
m_Index.erase(Entry.Key);
- m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
DeletedChunks.insert(Entry.Key);
}
m_SlogFile.Append(ExpiredStandaloneEntries);
@@ -1623,20 +1703,18 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete");
- std::error_code Ec;
ExtendablePathBuilder<256> Path;
for (const auto& Entry : ExpiredStandaloneEntries)
{
- const IoHash& Key = Entry.Key;
- const DiskLocation& Loc = Entry.Location;
+ const IoHash& Key = Entry.Key;
Path.Reset();
BuildPath(Path, Key);
fs::path FilePath = Path.ToPath();
{
- RwLock::SharedLockScope __(m_IndexLock);
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
Stopwatch Timer;
const auto ____ = MakeGuard([&] {
uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
@@ -1649,47 +1727,21 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
continue;
}
- __.ReleaseNow();
+ IndexLock.ReleaseNow();
RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
if (fs::is_regular_file(FilePath))
{
ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
+ std::error_code Ec;
fs::remove(FilePath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
+ continue;
+ }
}
}
-
- if (Ec)
- {
- ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
- Ec.clear();
- DiskLocation RestoreLocation = Loc;
- RestoreLocation.Flags &= ~DiskLocation::kTombStone;
-
- RwLock::ExclusiveLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- if (m_Index.contains(Key))
- {
- continue;
- }
- m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
- size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
- }
- m_Index.insert({Key, EntryIndex});
- m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed);
- DeletedChunks.erase(Key);
- continue;
- }
DeletedSize += Entry.Location.Size();
}
}
@@ -1712,7 +1764,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
continue;
}
- BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
+ BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
size_t ChunkIndex = ChunkLocations.size();
ChunkLocations.push_back(Location);
ChunkIndexToChunkHash[ChunkIndex] = Entry.first;
@@ -1729,13 +1781,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
if (!PerformDelete)
{
- m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
- uint64_t CurrentTotalSize = TotalSize();
- ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}",
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true);
+ GcStorageSize CurrentTotalSize = StorageSize();
+ ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})",
m_BucketDir,
DeleteCount,
TotalChunkCount,
- NiceBytes(CurrentTotalSize));
+ NiceBytes(CurrentTotalSize.DiskSize),
+ NiceBytes(CurrentTotalSize.MemorySize));
return;
}
@@ -1743,7 +1796,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
BlockStoreState,
ChunkLocations,
KeepChunkIndexes,
- m_PayloadAlignment,
+ m_Configuration.PayloadAlignment,
false,
[&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
std::vector<DiskIndexEntry> LogEntries;
@@ -1768,7 +1821,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
// Entry has been updated while GC was running, ignore the move
continue;
}
- Payload.Location = DiskLocation(NewLocation, m_PayloadAlignment, Payload.Location.GetFlags());
+ Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location});
}
for (const size_t ChunkIndex : RemovedChunks)
@@ -1783,9 +1836,16 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
}
const DiskLocation& OldDiskLocation = Payload.Location;
LogEntries.push_back({.Key = ChunkHash,
- .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment),
- m_PayloadAlignment,
+ .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment),
+ m_Configuration.PayloadAlignment,
OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
+ if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[PayloadIndex])
+ {
+ uint64_t CachePayloadSize = m_CachedPayloads[PayloadIndex].Size();
+ m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ m_CachedPayloads[PayloadIndex] = IoBuffer{};
+ }
m_Index.erase(ChunkHash);
DeletedChunks.insert(ChunkHash);
}
@@ -1797,33 +1857,20 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
[&]() { return GcCtx.ClaimGCReserve(); });
}
-void
-ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
-{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::UpdateAccessTimes");
-
- using namespace access_tracking;
-
- for (const KeyAccessTime& KeyTime : AccessTimes)
- {
- if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_AccessTimes[EntryIndex] = KeyTime.LastAccess;
- }
- }
-}
-
ZenCacheDiskLayer::BucketStats
ZenCacheDiskLayer::CacheBucket::Stats()
{
- return ZenCacheDiskLayer::BucketStats{.TotalSize = TotalSize(),
- .HitCount = m_HitCount,
- .MissCount = m_MissCount,
- .WriteCount = m_WriteCount,
- .PutOps = m_PutOps.Snapshot(),
- .GetOps = m_GetOps.Snapshot()};
+ GcStorageSize Size = StorageSize();
+ return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize,
+ .MemorySize = Size.MemorySize,
+ .DiskHitCount = m_DiskHitCount,
+ .DiskMissCount = m_DiskMissCount,
+ .DiskWriteCount = m_DiskWriteCount,
+ .MemoryHitCount = m_MemoryHitCount,
+ .MemoryMissCount = m_MemoryMissCount,
+ .MemoryWriteCount = m_MemoryWriteCount,
+ .PutOps = m_PutOps.Snapshot(),
+ .GetOps = m_GetOps.Snapshot()};
}
uint64_t
@@ -1907,27 +1954,16 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
}
for (CacheBucket* Bucket : Buckets)
{
- Bucket->CollectGarbage(GcCtx);
- }
-}
-
-void
-ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- for (const auto& Kv : AccessTimes.Buckets)
- {
- if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end())
- {
- CacheBucket& Bucket = *It->second;
- Bucket.UpdateAccessTimes(Kv.second);
- }
+ Bucket->CollectGarbage(GcCtx, m_TotalMemCachedSize);
}
+ MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
}
void
-ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue");
@@ -1942,7 +1978,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
}
- bool CleanUpTempFile = false;
+ bool CleanUpTempFile = true;
auto __ = MakeGuard([&] {
if (CleanUpTempFile)
{
@@ -2042,13 +2078,19 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
DiskLocation Loc(NewFileSize, EntryFlags);
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ ValueLock.ReleaseNow();
+
if (auto It = m_Index.find(HashKey); It == m_Index.end())
{
// Previously unknown object
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_CachedPayloads.emplace_back(IoBuffer{});
+ }
+ if (m_Configuration.EnableReferenceCaching)
{
m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
@@ -2057,25 +2099,38 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
}
else
{
- // TODO: should check if write is idempotent and bail out if it is?
size_t EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
BucketPayload& Payload = m_Payloads[EntryIndex];
+ uint64_t OldSize = Payload.Location.Size();
Payload = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash};
- if (m_EnableReferenceCaching)
+ if (m_Configuration.EnableReferenceCaching)
{
SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
}
m_AccessTimes[EntryIndex] = GcClock::TickCount();
- m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ if (m_CachedPayloads[EntryIndex])
+ {
+ uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size();
+ m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ m_CachedPayloads[EntryIndex] = IoBuffer{};
+ }
+ }
+ m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed);
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
+ m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
}
void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue");
@@ -2090,38 +2145,73 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
EntryFlags |= DiskLocation::kCompressed;
}
- m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) {
- DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
- m_SlogFile.Append({.Key = HashKey, .Location = Location});
+ uint64_t PayloadSize = Value.Value.GetSize();
+ const bool MemCacheEnabled = (m_Configuration.MemCacheSizeThreshold > 0);
+ IoBuffer MemCacheBuffer = (MemCacheEnabled && (PayloadSize <= m_Configuration.MemCacheSizeThreshold))
+ ? IoBufferBuilder::ReadFromFileMaybe(Value.Value)
+ : IoBuffer{};
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (auto It = m_Index.find(HashKey); It != m_Index.end())
- {
- // TODO: should check if write is idempotent and bail out if it is?
- // this would requiring comparing contents on disk unless we add a
- // content hash to the index entry
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
- if (m_EnableReferenceCaching)
+ m_BlockStore.WriteChunk(
+ Value.Value.Data(),
+ Value.Value.Size(),
+ m_Configuration.PayloadAlignment,
+ [&](const BlockStoreLocation& BlockStoreLocation) {
+ DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
+ m_SlogFile.Append({.Key = HashKey, .Location = Location});
+
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
- SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+
+ if (MemCacheEnabled)
+ {
+ if (m_CachedPayloads[EntryIndex])
+ {
+ uint64_t OldCachedSize = m_CachedPayloads[EntryIndex].GetSize();
+ m_MemCachedSize.fetch_sub(OldCachedSize);
+ CacheMemoryUsage.fetch_sub(OldCachedSize);
+ }
+
+ if (MemCacheBuffer)
+ {
+ m_MemCachedSize.fetch_add(PayloadSize);
+ CacheMemoryUsage.fetch_add(PayloadSize);
+ m_MemoryWriteCount++;
+ }
+ m_CachedPayloads[EntryIndex] = std::move(MemCacheBuffer);
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
+ }
}
- }
- else
- {
- size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
+ else
{
- m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
- SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
+ size_t EntryIndex = m_Payloads.size();
+ m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ if (MemCacheEnabled)
+ {
+ if (MemCacheBuffer)
+ {
+ m_MemCachedSize.fetch_add(PayloadSize);
+ CacheMemoryUsage.fetch_add(PayloadSize);
+ m_MemoryWriteCount++;
+ }
+ m_CachedPayloads.emplace_back(std::move(MemCacheBuffer));
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
+ SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
+ }
+ m_Index.insert_or_assign(HashKey, EntryIndex);
}
- m_Index.insert_or_assign(HashKey, EntryIndex);
- }
- });
+ });
}
void
@@ -2286,11 +2376,58 @@ ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIn
return true;
}
+void
+ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<IoBuffer>& CachedPayloads,
+ std::vector<size_t>& FirstReferenceIndex,
+ IndexMap& Index,
+ RwLock::ExclusiveLockScope& IndexLock)
+{
+ size_t EntryCount = m_Index.size();
+ Payloads.reserve(EntryCount);
+ AccessTimes.reserve(EntryCount);
+ CachedPayloads.reserve(EntryCount);
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ FirstReferenceIndex.reserve(EntryCount);
+ }
+ Index.reserve(EntryCount);
+ for (auto It : m_Index)
+ {
+ size_t EntryIndex = Payloads.size();
+ Payloads.push_back(m_Payloads[It.second]);
+ AccessTimes.push_back(m_AccessTimes[It.second]);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ CachedPayloads.push_back(std::move(m_CachedPayloads[It.second]));
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
+ }
+ Index.insert({It.first, EntryIndex});
+ }
+ m_Index.swap(Index);
+ m_Payloads.swap(Payloads);
+ m_AccessTimes.swap(AccessTimes);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_CachedPayloads.swap(CachedPayloads);
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.swap(FirstReferenceIndex);
+ CompactReferences(IndexLock);
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
-ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching)
-: m_RootDir(RootDir)
-, m_EnableReferenceCaching(EnableReferenceCaching)
+ZenCacheDiskLayer::ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
+: m_JobQueue(JobQueue)
+, m_RootDir(RootDir)
+, m_Configuration(Config)
{
}
@@ -2327,7 +2464,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching));
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
@@ -2342,7 +2479,12 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
ZEN_ASSERT(Bucket != nullptr);
- return Bucket->Get(HashKey, OutValue);
+ if (Bucket->Get(HashKey, OutValue, m_TotalMemCachedSize))
+ {
+ TryMemCacheTrim();
+ return true;
+ }
+ return false;
}
void
@@ -2376,7 +2518,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching));
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
@@ -2401,7 +2543,8 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
ZEN_ASSERT(Bucket != nullptr);
- Bucket->Put(HashKey, Value, References);
+ Bucket->Put(HashKey, Value, References, m_TotalMemCachedSize);
+ TryMemCacheTrim();
}
void
@@ -2432,7 +2575,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
continue;
}
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching));
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
CacheBucket& Bucket = *InsertResult.first->second;
try
@@ -2489,7 +2632,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It);
- return Bucket.Drop();
+ return Bucket.Drop(m_TotalMemCachedSize);
}
// Make sure we remove the folder even if we don't know about the bucket
@@ -2511,7 +2654,7 @@ ZenCacheDiskLayer::Drop()
CacheBucket& Bucket = *It->second;
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It->first);
- if (!Bucket.Drop())
+ if (!Bucket.Drop(m_TotalMemCachedSize))
{
return false;
}
@@ -2552,11 +2695,11 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
for (auto& Kv : m_Buckets)
{
#if 1
- Results.push_back(
- Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
+ Results.push_back(Ctx.ThreadPool().EnqueueTask(
+ std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx, m_TotalMemCachedSize); }}));
#else
CacheBucket& Bucket = *Kv.second;
- Bucket.ScrubStorage(Ctx);
+ Bucket.ScrubStorage(Ctx, m_TotalMemCachedSize);
#endif
}
@@ -2587,24 +2730,27 @@ ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
}
}
-uint64_t
-ZenCacheDiskLayer::TotalSize() const
+GcStorageSize
+ZenCacheDiskLayer::StorageSize() const
{
- uint64_t TotalSize{};
- RwLock::SharedLockScope _(m_Lock);
+ GcStorageSize StorageSize{};
+ RwLock::SharedLockScope _(m_Lock);
for (auto& Kv : m_Buckets)
{
- TotalSize += Kv.second->TotalSize();
+ GcStorageSize BucketSize = Kv.second->StorageSize();
+ StorageSize.DiskSize += BucketSize.DiskSize;
+ StorageSize.MemorySize += BucketSize.MemorySize;
}
- return TotalSize;
+ return StorageSize;
}
ZenCacheDiskLayer::DiskStats
ZenCacheDiskLayer::Stats() const
{
- ZenCacheDiskLayer::DiskStats Stats = {};
+ GcStorageSize Size = StorageSize();
+ ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
{
RwLock::SharedLockScope _(m_Lock);
Stats.BucketStats.reserve(m_Buckets.size());
@@ -2619,8 +2765,7 @@ ZenCacheDiskLayer::Stats() const
ZenCacheDiskLayer::Info
ZenCacheDiskLayer::GetInfo() const
{
- ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir, .EnableReferenceCaching = m_EnableReferenceCaching},
- .TotalSize = TotalSize()};
+ ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration};
{
RwLock::SharedLockScope _(m_Lock);
Info.BucketNames.reserve(m_Buckets.size());
@@ -2628,6 +2773,9 @@ ZenCacheDiskLayer::GetInfo() const
{
Info.BucketNames.push_back(Kv.first);
Info.EntryCount += Kv.second->EntryCount();
+ GcStorageSize BucketSize = Kv.second->StorageSize();
+ Info.StorageSize.DiskSize += BucketSize.DiskSize;
+ Info.StorageSize.MemorySize += BucketSize.MemorySize;
}
}
return Info;
@@ -2640,7 +2788,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
{
- return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()};
+ return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()};
}
return {};
}
@@ -2677,4 +2825,100 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st
return Details;
}
+void
+ZenCacheDiskLayer::MemCacheTrim()
+{
+ ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim");
+
+ ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0);
+
+ const GcClock::TimePoint Now = GcClock::Now();
+
+ const GcClock::Tick NowTick = Now.time_since_epoch().count();
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
+ const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
+ if (NowTick < NextAllowedTrimTick)
+ {
+ return;
+ }
+
+ bool Expected = false;
+ if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
+ {
+ return;
+ }
+
+    // Bump time forward so we don't keep trying to do m_IsMemCacheTrimming.compare_exchange_strong
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_LastTickMemCacheTrim.store(NextTrimTick);
+
+ m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) {
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+
+ uint64_t StartSize = m_TotalMemCachedSize.load();
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ uint64_t EndSize = m_TotalMemCachedSize.load();
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0),
+ NiceBytes(m_TotalMemCachedSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ m_IsMemCacheTrimming.store(false);
+ });
+
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
+
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
+
+ std::vector<CacheBucket*> Buckets;
+ {
+        RwLock::SharedLockScope _(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
+ }
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
+ }
+
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
+ {
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
+ MemCacheTrim(Buckets, ExpireTime);
+ break;
+ }
+ }
+ });
+}
+
+void
+ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime)
+{
+ if (m_Configuration.MemCacheTargetFootprintBytes == 0)
+ {
+ return;
+ }
+ RwLock::SharedLockScope __(m_Lock);
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->MemCacheTrim(ExpireTime, m_TotalMemCachedSize);
+ }
+ const GcClock::TimePoint Now = GcClock::Now();
+ const GcClock::Tick NowTick = Now.time_since_epoch().count();
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
+ const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+}
+
} // namespace zen