aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-10-24 14:54:26 +0200
committerGitHub <[email protected]>2023-10-24 14:54:26 +0200
commit1a144212278aa7158d6b32b63e398db95a7ae868 (patch)
tree58735827b0b706368a82bcaaa8aaa68f211e1d10 /src/zenserver/cache
parentchunking moved to zenstore (#490) (diff)
downloadzen-1a144212278aa7158d6b32b63e398db95a7ae868.tar.xz
zen-1a144212278aa7158d6b32b63e398db95a7ae868.zip
merge disk and memory layers (#493)
- Feature: Added `--cache-memlayer-sizethreshold` option to zenserver to control at which size cache entries get cached in memory - Changed: Merged cache memory layer with cache disk layer to reduce memory and cpu overhead
Diffstat (limited to 'src/zenserver/cache')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp826
-rw-r--r--src/zenserver/cache/cachedisklayer.h136
-rw-r--r--src/zenserver/cache/cachememorylayer.cpp521
-rw-r--r--src/zenserver/cache/cachememorylayer.h124
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp82
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp165
-rw-r--r--src/zenserver/cache/structuredcachestore.h52
7 files changed, 785 insertions, 1121 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 73c02bcc2..f755436e0 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -7,6 +7,7 @@
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
+#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/trace.h>
@@ -167,16 +168,16 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex;
const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex;
-ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, bool EnableReferenceCaching)
+ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const BucketConfiguration& Config)
: m_BucketName(std::move(BucketName))
+, m_Configuration(Config)
, m_BucketId(Oid::Zero)
-, m_EnableReferenceCaching(EnableReferenceCaching)
{
if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
{
// This is pretty ad hoc but in order to avoid too many individual files
// it makes sense to have a different strategy for legacy values
- m_LargeObjectThreshold = 16 * 1024 * 1024;
+ m_Configuration.LargeObjectThreshold = 16 * 1024 * 1024;
}
}
@@ -397,7 +398,7 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
.LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
@@ -470,7 +471,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- m_PayloadAlignment = Header.PayloadAlignment;
+ m_Configuration.PayloadAlignment = Header.PayloadAlignment;
std::vector<DiskIndexEntry> Entries;
Entries.resize(Header.EntryCount);
@@ -479,11 +480,6 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
sizeof(CacheBucketIndexHeader));
m_Payloads.reserve(Header.EntryCount);
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.reserve(Header.EntryCount);
- }
- m_AccessTimes.reserve(Header.EntryCount);
m_Index.reserve(Header.EntryCount);
std::string InvalidEntryReason;
@@ -496,14 +492,18 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
}
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
- }
m_Index.insert_or_assign(Entry.Key, EntryIndex);
EntryCount++;
}
+ m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_CachedPayloads.resize(m_Payloads.size());
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex);
+ }
OutVersion = CacheBucketIndexHeader::Version2;
return Header.LogPosition;
}
@@ -540,8 +540,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
SkipEntryCount = 0;
}
- LogEntryCount = EntryCount - SkipEntryCount;
- m_Index.reserve(LogEntryCount);
+ LogEntryCount = EntryCount - SkipEntryCount;
uint64_t InvalidEntryCount = 0;
CasLog.Replay(
[&](const DiskIndexEntry& Record) {
@@ -559,14 +558,18 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
}
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
- }
m_Index.insert_or_assign(Record.Key, EntryIndex);
},
SkipEntryCount);
+ m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_CachedPayloads.resize(m_Payloads.size());
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex);
+ }
if (InvalidEntryCount)
{
ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
@@ -582,11 +585,12 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog");
- m_TotalStandaloneSize = 0;
+ m_StandaloneSize = 0;
m_Index.clear();
m_Payloads.clear();
m_AccessTimes.clear();
+ m_CachedPayloads.clear();
m_FirstReferenceIndex.clear();
m_ReferenceHashes.clear();
m_NextReferenceHashesIndexes.clear();
@@ -604,7 +608,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
CreateDirectories(m_BucketDir);
- m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
+ m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
if (std::filesystem::is_regular_file(IndexPath))
{
@@ -643,10 +647,10 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
- m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
+ m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
continue;
}
- const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
KnownLocations.push_back(BlockLocation);
}
@@ -681,7 +685,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue");
- BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);
+ BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
IoBuffer Value = m_BlockStore.TryGetChunk(Location);
if (Value)
@@ -713,88 +717,190 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy
}
bool
-ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage)
{
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
- RwLock::SharedLockScope _(m_IndexLock);
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
auto It = m_Index.find(HashKey);
if (It == m_Index.end())
{
- m_MissCount++;
+ m_DiskMissCount++;
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
return false;
}
- size_t EntryIndex = It.value();
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
- DiskLocation Location = Payload.Location;
- OutValue.RawSize = Payload.RawSize;
- OutValue.RawHash = Payload.RawHash;
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+
+ size_t EntryIndex = It.value();
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ DiskLocation Location = m_Payloads[EntryIndex].Location;
+ bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
+ if (FillRawHashAndRawSize)
{
- // We don't need to hold the index lock when we read a standalone file
- _.ReleaseNow();
- OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey);
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ if (Payload.RawHash != IoHash::Zero || Payload.RawSize != 0)
+ {
+ OutValue.RawHash = Payload.RawHash;
+ OutValue.RawSize = Payload.RawSize;
+ FillRawHashAndRawSize = false;
+ }
}
- else
+
+ if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[EntryIndex])
{
- OutValue.Value = GetInlineCacheValue(Location);
+ OutValue.Value = m_CachedPayloads[EntryIndex];
+ IndexLock.ReleaseNow();
+ m_MemoryHitCount++;
}
- _.ReleaseNow();
-
- if (!Location.IsFlagSet(DiskLocation::kStructured))
+ else
{
- if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0)
+ IndexLock.ReleaseNow();
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey);
+ }
+ else
{
- if (Location.IsFlagSet(DiskLocation::kCompressed))
+ OutValue.Value = GetInlineCacheValue(Location);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
{
- (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize);
+ size_t ValueSize = OutValue.Value.GetSize();
+ if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
+ {
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache");
+ OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
+ {
+ // Only update if it has not already been updated by other thread
+ if (!m_CachedPayloads[UpdateIt->second])
+ {
+ m_CachedPayloads[UpdateIt->second] = OutValue.Value;
+ m_MemCachedSize.fetch_add(ValueSize);
+ CacheMemoryUsage.fetch_add(ValueSize);
+ m_MemoryWriteCount++;
+ }
+ }
+ }
}
- else
+ }
+ }
+
+ if (FillRawHashAndRawSize)
+ {
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData");
+ if (Location.IsFlagSet(DiskLocation::kCompressed))
+ {
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
{
- OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
- OutValue.RawSize = OutValue.Value.GetSize();
+ OutValue = ZenCacheValue{};
+ m_DiskMissCount++;
+ return false;
}
- RwLock::ExclusiveLockScope __(m_IndexLock);
- if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
+ }
+ else
+ {
+ OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+ OutValue.RawSize = OutValue.Value.GetSize();
+ }
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
+ {
+ BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
+ if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0)
{
- BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
- WritePayload.RawHash = OutValue.RawHash;
- WritePayload.RawSize = OutValue.RawSize;
-
- m_LogFlushPosition = 0; // Force resave of index on exit
+ WritePayload.RawHash = OutValue.RawHash;
+ WritePayload.RawSize = OutValue.RawSize;
}
}
}
if (OutValue.Value)
{
- m_HitCount++;
+ m_DiskHitCount++;
StatsScope.SetBytes(OutValue.Value.GetSize());
return true;
}
else
{
- m_MissCount++;
+ m_DiskMissCount++;
return false;
}
}
void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ std::atomic_uint64_t& CacheMemoryUsage)
{
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
- if (Value.Value.Size() >= m_LargeObjectThreshold)
+ if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
- return PutStandaloneCacheValue(HashKey, Value, References);
+ PutStandaloneCacheValue(HashKey, Value, References, CacheMemoryUsage);
}
- PutInlineCacheValue(HashKey, Value, References);
+ else
+ {
+ PutInlineCacheValue(HashKey, Value, References, CacheMemoryUsage);
+ }
+
+ m_DiskWriteCount++;
+}
- m_WriteCount++;
+void
+ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage)
+{
+ GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
+
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ for (const auto& Kv : m_Index)
+ {
+ if (m_AccessTimes[Kv.second] < ExpireTicks)
+ {
+ size_t PayloadSize = m_CachedPayloads[Kv.second].GetSize();
+ m_MemCachedSize.fetch_sub(PayloadSize);
+ CacheMemoryUsage.fetch_sub(PayloadSize);
+ m_CachedPayloads[Kv.second] = {};
+ }
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
+ GcClock::Duration SectionLength,
+ std::vector<uint64_t>& InOutUsageSlots)
+{
+ RwLock::SharedLockScope _(m_IndexLock);
+ for (const auto& It : m_Index)
+ {
+ size_t Index = It.second;
+ if (!m_CachedPayloads[Index])
+ {
+ continue;
+ }
+ GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
+ GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
+ uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
+ if (Slot >= InOutUsageSlots.capacity())
+ {
+ Slot = InOutUsageSlots.capacity() - 1;
+ }
+ if (Slot > InOutUsageSlots.size())
+ {
+ InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
+ }
+ InOutUsageSlots[Slot] += m_CachedPayloads[Index].GetSize();
+ }
}
bool
-ZenCacheDiskLayer::CacheBucket::Drop()
+ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop");
@@ -814,10 +920,14 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_Index.clear();
m_Payloads.clear();
m_AccessTimes.clear();
+ m_CachedPayloads.clear();
m_FirstReferenceIndex.clear();
m_ReferenceHashes.clear();
m_NextReferenceHashesIndexes.clear();
m_ReferenceCount = 0;
+ m_StandaloneSize.store(0);
+ CacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
+ m_MemCachedSize.store(0);
return Deleted;
}
@@ -992,7 +1102,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
};
void
-ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
+ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub");
@@ -1083,7 +1193,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
}
else
{
- ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment));
+ ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment));
ChunkIndexToChunkHash.push_back(HashKey);
continue;
}
@@ -1169,11 +1279,24 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
// Log a tombstone and delete the in-memory index for the bad entry
const auto It = m_Index.find(BadKey);
BucketPayload& Payload = m_Payloads[It->second];
- if (m_EnableReferenceCaching)
+ if (m_Configuration.EnableReferenceCaching)
{
RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]);
}
DiskLocation Location = Payload.Location;
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed);
+ }
+
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ size_t CachedSize = m_CachedPayloads[It->second].GetSize();
+ m_MemCachedSize.fetch_sub(CachedSize);
+ CacheMemoryUsage.fetch_sub(CachedSize);
+ m_CachedPayloads[It->second] = IoBuffer{};
+ }
+
Location.Flags |= DiskLocation::kTombStone;
LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
m_Index.erase(BadKey);
@@ -1193,7 +1316,6 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
std::error_code Ec;
fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
}
- m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
}
}
m_SlogFile.Append(LogEntries);
@@ -1202,38 +1324,13 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
+ std::vector<IoBuffer> CachedPayloads;
std::vector<size_t> FirstReferenceIndex;
IndexMap Index;
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- size_t EntryCount = m_Index.size();
- Payloads.reserve(EntryCount);
- AccessTimes.reserve(EntryCount);
- if (m_EnableReferenceCaching)
- {
- FirstReferenceIndex.reserve(EntryCount);
- }
- Index.reserve(EntryCount);
- for (auto It : m_Index)
- {
- size_t EntryIndex = Payloads.size();
- Payloads.push_back(m_Payloads[It.second]);
- AccessTimes.push_back(m_AccessTimes[It.second]);
- if (m_EnableReferenceCaching)
- {
- FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
- }
- Index.insert({It.first, EntryIndex});
- }
- m_Index.swap(Index);
- m_Payloads.swap(Payloads);
- m_AccessTimes.swap(AccessTimes);
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.swap(FirstReferenceIndex);
- CompactReferences(IndexLock);
- }
+ CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock);
}
}
}
@@ -1334,7 +1431,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
continue;
}
- if (m_EnableReferenceCaching)
+ if (m_Configuration.EnableReferenceCaching)
{
if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex))
{
@@ -1397,8 +1494,17 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
#endif // CALCULATE_BLOCKING_TIME
if (auto It = m_Index.find(Entry.first); It != m_Index.end())
{
- BucketPayload& UpdatePayload = m_Payloads[It->second];
- Buffer = GetInlineCacheValue(UpdatePayload.Location);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ Buffer = m_CachedPayloads[It->second];
+ }
+ if (!Buffer)
+ {
+ DiskLocation Location = m_Payloads[It->second].Location;
+ IndexLock.ReleaseNow();
+ Buffer = GetInlineCacheValue(Location);
+ // Don't memcache items when doing GC
+ }
}
if (!Buffer)
{
@@ -1411,7 +1517,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
CbObject Obj(SharedBuffer{Buffer});
size_t CurrentCidCount = Cids.size();
Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
- if (m_EnableReferenceCaching)
+ if (m_Configuration.EnableReferenceCaching)
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
#if CALCULATE_BLOCKING_TIME
@@ -1450,20 +1556,20 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
void
-ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
+ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage");
ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
- Stopwatch TotalTimer;
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = 0;
- uint64_t DeletedSize = 0;
- uint64_t OldTotalSize = TotalSize();
+ Stopwatch TotalTimer;
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ GcStorageSize OldTotalSize = StorageSize();
std::unordered_set<IoHash> DeletedChunks;
uint64_t MovedCount = 0;
@@ -1473,7 +1579,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
"garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
"{} "
"of {} "
- "entires ({}).",
+ "entries ({}/{}).",
m_BucketDir,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
NiceLatencyNs(WriteBlockTimeUs),
@@ -1484,7 +1590,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
DeletedChunks.size(),
MovedCount,
TotalChunkCount,
- NiceBytes(OldTotalSize));
+ NiceBytes(OldTotalSize.DiskSize),
+ NiceBytes(OldTotalSize.MemorySize));
bool Expected = false;
if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
@@ -1514,45 +1621,18 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
// Clean up m_AccessTimes and m_Payloads vectors
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
+ std::vector<IoBuffer> CachedPayloads;
std::vector<size_t> FirstReferenceIndex;
IndexMap Index;
-
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- size_t EntryCount = m_Index.size();
- Payloads.reserve(EntryCount);
- AccessTimes.reserve(EntryCount);
- if (m_EnableReferenceCaching)
- {
- FirstReferenceIndex.reserve(EntryCount);
- }
- Index.reserve(EntryCount);
- for (auto It : m_Index)
- {
- size_t OldEntryIndex = It.second;
- size_t NewEntryIndex = Payloads.size();
- Payloads.push_back(m_Payloads[OldEntryIndex]);
- AccessTimes.push_back(m_AccessTimes[OldEntryIndex]);
- if (m_EnableReferenceCaching)
- {
- FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
- }
- Index.insert({It.first, NewEntryIndex});
- }
- m_Index.swap(Index);
- m_Payloads.swap(Payloads);
- m_AccessTimes.swap(AccessTimes);
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.swap(FirstReferenceIndex);
- CompactReferences(IndexLock);
- }
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock);
}
GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
}
@@ -1610,7 +1690,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
for (const auto& Entry : ExpiredStandaloneEntries)
{
m_Index.erase(Entry.Key);
- m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
DeletedChunks.insert(Entry.Key);
}
m_SlogFile.Append(ExpiredStandaloneEntries);
@@ -1623,20 +1703,18 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete");
- std::error_code Ec;
ExtendablePathBuilder<256> Path;
for (const auto& Entry : ExpiredStandaloneEntries)
{
- const IoHash& Key = Entry.Key;
- const DiskLocation& Loc = Entry.Location;
+ const IoHash& Key = Entry.Key;
Path.Reset();
BuildPath(Path, Key);
fs::path FilePath = Path.ToPath();
{
- RwLock::SharedLockScope __(m_IndexLock);
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
Stopwatch Timer;
const auto ____ = MakeGuard([&] {
uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
@@ -1649,47 +1727,21 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
continue;
}
- __.ReleaseNow();
+ IndexLock.ReleaseNow();
RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
if (fs::is_regular_file(FilePath))
{
ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
+ std::error_code Ec;
fs::remove(FilePath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
+ continue;
+ }
}
}
-
- if (Ec)
- {
- ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
- Ec.clear();
- DiskLocation RestoreLocation = Loc;
- RestoreLocation.Flags &= ~DiskLocation::kTombStone;
-
- RwLock::ExclusiveLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- if (m_Index.contains(Key))
- {
- continue;
- }
- m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
- size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
- {
- m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
- }
- m_Index.insert({Key, EntryIndex});
- m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed);
- DeletedChunks.erase(Key);
- continue;
- }
DeletedSize += Entry.Location.Size();
}
}
@@ -1712,7 +1764,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
continue;
}
- BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
+ BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
size_t ChunkIndex = ChunkLocations.size();
ChunkLocations.push_back(Location);
ChunkIndexToChunkHash[ChunkIndex] = Entry.first;
@@ -1729,13 +1781,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
if (!PerformDelete)
{
- m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
- uint64_t CurrentTotalSize = TotalSize();
- ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}",
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true);
+ GcStorageSize CurrentTotalSize = StorageSize();
+ ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})",
m_BucketDir,
DeleteCount,
TotalChunkCount,
- NiceBytes(CurrentTotalSize));
+ NiceBytes(CurrentTotalSize.DiskSize),
+ NiceBytes(CurrentTotalSize.MemorySize));
return;
}
@@ -1743,7 +1796,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
BlockStoreState,
ChunkLocations,
KeepChunkIndexes,
- m_PayloadAlignment,
+ m_Configuration.PayloadAlignment,
false,
[&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
std::vector<DiskIndexEntry> LogEntries;
@@ -1768,7 +1821,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
// Entry has been updated while GC was running, ignore the move
continue;
}
- Payload.Location = DiskLocation(NewLocation, m_PayloadAlignment, Payload.Location.GetFlags());
+ Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location});
}
for (const size_t ChunkIndex : RemovedChunks)
@@ -1783,9 +1836,16 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
}
const DiskLocation& OldDiskLocation = Payload.Location;
LogEntries.push_back({.Key = ChunkHash,
- .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment),
- m_PayloadAlignment,
+ .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment),
+ m_Configuration.PayloadAlignment,
OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
+ if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[PayloadIndex])
+ {
+ uint64_t CachePayloadSize = m_CachedPayloads[PayloadIndex].Size();
+ m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ m_CachedPayloads[PayloadIndex] = IoBuffer{};
+ }
m_Index.erase(ChunkHash);
DeletedChunks.insert(ChunkHash);
}
@@ -1797,33 +1857,20 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
[&]() { return GcCtx.ClaimGCReserve(); });
}
-void
-ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
-{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::UpdateAccessTimes");
-
- using namespace access_tracking;
-
- for (const KeyAccessTime& KeyTime : AccessTimes)
- {
- if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_AccessTimes[EntryIndex] = KeyTime.LastAccess;
- }
- }
-}
-
ZenCacheDiskLayer::BucketStats
ZenCacheDiskLayer::CacheBucket::Stats()
{
- return ZenCacheDiskLayer::BucketStats{.TotalSize = TotalSize(),
- .HitCount = m_HitCount,
- .MissCount = m_MissCount,
- .WriteCount = m_WriteCount,
- .PutOps = m_PutOps.Snapshot(),
- .GetOps = m_GetOps.Snapshot()};
+ GcStorageSize Size = StorageSize();
+ return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize,
+ .MemorySize = Size.MemorySize,
+ .DiskHitCount = m_DiskHitCount,
+ .DiskMissCount = m_DiskMissCount,
+ .DiskWriteCount = m_DiskWriteCount,
+ .MemoryHitCount = m_MemoryHitCount,
+ .MemoryMissCount = m_MemoryMissCount,
+ .MemoryWriteCount = m_MemoryWriteCount,
+ .PutOps = m_PutOps.Snapshot(),
+ .GetOps = m_GetOps.Snapshot()};
}
uint64_t
@@ -1907,27 +1954,16 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
}
for (CacheBucket* Bucket : Buckets)
{
- Bucket->CollectGarbage(GcCtx);
- }
-}
-
-void
-ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- for (const auto& Kv : AccessTimes.Buckets)
- {
- if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end())
- {
- CacheBucket& Bucket = *It->second;
- Bucket.UpdateAccessTimes(Kv.second);
- }
+ Bucket->CollectGarbage(GcCtx, m_TotalMemCachedSize);
}
+ MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
}
void
-ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue");
@@ -1942,7 +1978,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
}
- bool CleanUpTempFile = false;
+ bool CleanUpTempFile = true;
auto __ = MakeGuard([&] {
if (CleanUpTempFile)
{
@@ -2042,13 +2078,19 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
DiskLocation Loc(NewFileSize, EntryFlags);
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ ValueLock.ReleaseNow();
+
if (auto It = m_Index.find(HashKey); It == m_Index.end())
{
// Previously unknown object
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_CachedPayloads.emplace_back(IoBuffer{});
+ }
+ if (m_Configuration.EnableReferenceCaching)
{
m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
@@ -2057,25 +2099,38 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
}
else
{
- // TODO: should check if write is idempotent and bail out if it is?
size_t EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
BucketPayload& Payload = m_Payloads[EntryIndex];
+ uint64_t OldSize = Payload.Location.Size();
Payload = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash};
- if (m_EnableReferenceCaching)
+ if (m_Configuration.EnableReferenceCaching)
{
SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
}
m_AccessTimes[EntryIndex] = GcClock::TickCount();
- m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ if (m_CachedPayloads[EntryIndex])
+ {
+ uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size();
+ m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed);
+ m_CachedPayloads[EntryIndex] = IoBuffer{};
+ }
+ }
+ m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed);
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
+ m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
}
void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ std::atomic_uint64_t& CacheMemoryUsage)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue");
@@ -2090,38 +2145,73 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
EntryFlags |= DiskLocation::kCompressed;
}
- m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) {
- DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
- m_SlogFile.Append({.Key = HashKey, .Location = Location});
+ uint64_t PayloadSize = Value.Value.GetSize();
+ const bool MemCacheEnabled = (m_Configuration.MemCacheSizeThreshold > 0);
+ IoBuffer MemCacheBuffer = (MemCacheEnabled && (PayloadSize <= m_Configuration.MemCacheSizeThreshold))
+ ? IoBufferBuilder::ReadFromFileMaybe(Value.Value)
+ : IoBuffer{};
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (auto It = m_Index.find(HashKey); It != m_Index.end())
- {
- // TODO: should check if write is idempotent and bail out if it is?
- // this would requiring comparing contents on disk unless we add a
- // content hash to the index entry
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
- if (m_EnableReferenceCaching)
+ m_BlockStore.WriteChunk(
+ Value.Value.Data(),
+ Value.Value.Size(),
+ m_Configuration.PayloadAlignment,
+ [&](const BlockStoreLocation& BlockStoreLocation) {
+ DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
+ m_SlogFile.Append({.Key = HashKey, .Location = Location});
+
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
- SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+
+ if (MemCacheEnabled)
+ {
+ if (m_CachedPayloads[EntryIndex])
+ {
+ uint64_t OldCachedSize = m_CachedPayloads[EntryIndex].GetSize();
+ m_MemCachedSize.fetch_sub(OldCachedSize);
+ CacheMemoryUsage.fetch_sub(OldCachedSize);
+ }
+
+ if (MemCacheBuffer)
+ {
+ m_MemCachedSize.fetch_add(PayloadSize);
+ CacheMemoryUsage.fetch_add(PayloadSize);
+ m_MemoryWriteCount++;
+ }
+ m_CachedPayloads[EntryIndex] = std::move(MemCacheBuffer);
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
+ }
}
- }
- else
- {
- size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- if (m_EnableReferenceCaching)
+ else
{
- m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
- SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
+ size_t EntryIndex = m_Payloads.size();
+ m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ if (MemCacheEnabled)
+ {
+ if (MemCacheBuffer)
+ {
+ m_MemCachedSize.fetch_add(PayloadSize);
+ CacheMemoryUsage.fetch_add(PayloadSize);
+ m_MemoryWriteCount++;
+ }
+ m_CachedPayloads.emplace_back(std::move(MemCacheBuffer));
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
+ SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
+ }
+ m_Index.insert_or_assign(HashKey, EntryIndex);
}
- m_Index.insert_or_assign(HashKey, EntryIndex);
- }
- });
+ });
}
void
@@ -2286,11 +2376,58 @@ ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIn
return true;
}
+void
+ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<IoBuffer>& CachedPayloads,
+ std::vector<size_t>& FirstReferenceIndex,
+ IndexMap& Index,
+ RwLock::ExclusiveLockScope& IndexLock)
+{
+ size_t EntryCount = m_Index.size();
+ Payloads.reserve(EntryCount);
+ AccessTimes.reserve(EntryCount);
+ CachedPayloads.reserve(EntryCount);
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ FirstReferenceIndex.reserve(EntryCount);
+ }
+ Index.reserve(EntryCount);
+ for (auto It : m_Index)
+ {
+ size_t EntryIndex = Payloads.size();
+ Payloads.push_back(m_Payloads[It.second]);
+ AccessTimes.push_back(m_AccessTimes[It.second]);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ CachedPayloads.push_back(std::move(m_CachedPayloads[It.second]));
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
+ }
+ Index.insert({It.first, EntryIndex});
+ }
+ m_Index.swap(Index);
+ m_Payloads.swap(Payloads);
+ m_AccessTimes.swap(AccessTimes);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_CachedPayloads.swap(CachedPayloads);
+ }
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.swap(FirstReferenceIndex);
+ CompactReferences(IndexLock);
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
-ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching)
-: m_RootDir(RootDir)
-, m_EnableReferenceCaching(EnableReferenceCaching)
+ZenCacheDiskLayer::ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
+: m_JobQueue(JobQueue)
+, m_RootDir(RootDir)
+, m_Configuration(Config)
{
}
@@ -2327,7 +2464,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching));
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
@@ -2342,7 +2479,12 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
ZEN_ASSERT(Bucket != nullptr);
- return Bucket->Get(HashKey, OutValue);
+ if (Bucket->Get(HashKey, OutValue, m_TotalMemCachedSize))
+ {
+ TryMemCacheTrim();
+ return true;
+ }
+ return false;
}
void
@@ -2376,7 +2518,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching));
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
@@ -2401,7 +2543,8 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
ZEN_ASSERT(Bucket != nullptr);
- Bucket->Put(HashKey, Value, References);
+ Bucket->Put(HashKey, Value, References, m_TotalMemCachedSize);
+ TryMemCacheTrim();
}
void
@@ -2432,7 +2575,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
continue;
}
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching));
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig));
CacheBucket& Bucket = *InsertResult.first->second;
try
@@ -2489,7 +2632,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It);
- return Bucket.Drop();
+ return Bucket.Drop(m_TotalMemCachedSize);
}
// Make sure we remove the folder even if we don't know about the bucket
@@ -2511,7 +2654,7 @@ ZenCacheDiskLayer::Drop()
CacheBucket& Bucket = *It->second;
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It->first);
- if (!Bucket.Drop())
+ if (!Bucket.Drop(m_TotalMemCachedSize))
{
return false;
}
@@ -2552,11 +2695,11 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
for (auto& Kv : m_Buckets)
{
#if 1
- Results.push_back(
- Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
+ Results.push_back(Ctx.ThreadPool().EnqueueTask(
+ std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx, m_TotalMemCachedSize); }}));
#else
CacheBucket& Bucket = *Kv.second;
- Bucket.ScrubStorage(Ctx);
+ Bucket.ScrubStorage(Ctx, m_TotalMemCachedSize);
#endif
}
@@ -2587,24 +2730,27 @@ ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
}
}
-uint64_t
-ZenCacheDiskLayer::TotalSize() const
+GcStorageSize
+ZenCacheDiskLayer::StorageSize() const
{
- uint64_t TotalSize{};
- RwLock::SharedLockScope _(m_Lock);
+ GcStorageSize StorageSize{};
+ RwLock::SharedLockScope _(m_Lock);
for (auto& Kv : m_Buckets)
{
- TotalSize += Kv.second->TotalSize();
+ GcStorageSize BucketSize = Kv.second->StorageSize();
+ StorageSize.DiskSize += BucketSize.DiskSize;
+ StorageSize.MemorySize += BucketSize.MemorySize;
}
- return TotalSize;
+ return StorageSize;
}
ZenCacheDiskLayer::DiskStats
ZenCacheDiskLayer::Stats() const
{
- ZenCacheDiskLayer::DiskStats Stats = {};
+ GcStorageSize Size = StorageSize();
+ ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize};
{
RwLock::SharedLockScope _(m_Lock);
Stats.BucketStats.reserve(m_Buckets.size());
@@ -2619,8 +2765,7 @@ ZenCacheDiskLayer::Stats() const
ZenCacheDiskLayer::Info
ZenCacheDiskLayer::GetInfo() const
{
- ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir, .EnableReferenceCaching = m_EnableReferenceCaching},
- .TotalSize = TotalSize()};
+ ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration};
{
RwLock::SharedLockScope _(m_Lock);
Info.BucketNames.reserve(m_Buckets.size());
@@ -2628,6 +2773,9 @@ ZenCacheDiskLayer::GetInfo() const
{
Info.BucketNames.push_back(Kv.first);
Info.EntryCount += Kv.second->EntryCount();
+ GcStorageSize BucketSize = Kv.second->StorageSize();
+ Info.StorageSize.DiskSize += BucketSize.DiskSize;
+ Info.StorageSize.MemorySize += BucketSize.MemorySize;
}
}
return Info;
@@ -2640,7 +2788,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
{
- return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()};
+ return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()};
}
return {};
}
@@ -2677,4 +2825,100 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st
return Details;
}
+void
+ZenCacheDiskLayer::MemCacheTrim()
+{
+ ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim");
+
+ ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0);
+
+ const GcClock::TimePoint Now = GcClock::Now();
+
+ const GcClock::Tick NowTick = Now.time_since_epoch().count();
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
+ const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
+ if (NowTick < NextAllowedTrimTick)
+ {
+ return;
+ }
+
+ bool Expected = false;
+ if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
+ {
+ return;
+ }
+
+ // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_LastTickMemCacheTrim.store(NextTrimTick);
+
+ m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) {
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+
+ uint64_t StartSize = m_TotalMemCachedSize.load();
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ uint64_t EndSize = m_TotalMemCachedSize.load();
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0),
+ NiceBytes(m_TotalMemCachedSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ m_IsMemCacheTrimming.store(false);
+ });
+
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
+
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
+
+ std::vector<CacheBucket*> Buckets;
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ RwLock::SharedLockScope _(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
+ }
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
+ }
+
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
+ {
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
+ MemCacheTrim(Buckets, ExpireTime);
+ break;
+ }
+ }
+ });
+}
+
+void
+ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime)
+{
+ if (m_Configuration.MemCacheTargetFootprintBytes == 0)
+ {
+ return;
+ }
+ RwLock::SharedLockScope __(m_Lock);
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->MemCacheTrim(ExpireTime, m_TotalMemCachedSize);
+ }
+ const GcClock::TimePoint Now = GcClock::Now();
+ const GcClock::Tick NowTick = Now.time_since_epoch().count();
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
+ const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+}
+
} // namespace zen
diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h
index 7e05430a2..ea390f807 100644
--- a/src/zenserver/cache/cachedisklayer.h
+++ b/src/zenserver/cache/cachedisklayer.h
@@ -17,6 +17,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
class IoBuffer;
+class JobQueue;
#pragma pack(push)
#pragma pack(1)
@@ -90,32 +91,48 @@ static_assert(sizeof(DiskIndexEntry) == 32);
class ZenCacheDiskLayer
{
public:
+ struct BucketConfiguration
+ {
+ uint64_t MaxBlockSize = 1ull << 30;
+ uint64_t PayloadAlignment = 1ull << 4;
+ uint64_t MemCacheSizeThreshold = 1 * 1024;
+ uint64_t LargeObjectThreshold = 128 * 1024;
+ bool EnableReferenceCaching = false;
+ };
+
struct Configuration
{
- std::filesystem::path RootDir;
- bool EnableReferenceCaching;
+ BucketConfiguration BucketConfig;
+ uint64_t MemCacheTargetFootprintBytes = 512 * 1024 * 1024;
+ uint64_t MemCacheTrimIntervalSeconds = 60;
+ uint64_t MemCacheMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count());
};
struct BucketInfo
{
- uint64_t EntryCount = 0;
- uint64_t TotalSize = 0;
+ uint64_t EntryCount = 0;
+ GcStorageSize StorageSize;
};
struct Info
{
+ std::filesystem::path RootDir;
Configuration Config;
std::vector<std::string> BucketNames;
uint64_t EntryCount = 0;
- uint64_t TotalSize = 0;
+ GcStorageSize StorageSize;
};
struct BucketStats
{
- uint64_t TotalSize;
- uint64_t HitCount;
- uint64_t MissCount;
- uint64_t WriteCount;
+ uint64_t DiskSize;
+ uint64_t MemorySize;
+ uint64_t DiskHitCount;
+ uint64_t DiskMissCount;
+ uint64_t DiskWriteCount;
+ uint64_t MemoryHitCount;
+ uint64_t MemoryMissCount;
+ uint64_t MemoryWriteCount;
metrics::RequestStatsSnapshot PutOps;
metrics::RequestStatsSnapshot GetOps;
};
@@ -129,9 +146,11 @@ public:
struct DiskStats
{
std::vector<NamedBucketStats> BucketStats;
+ uint64_t DiskSize;
+ uint64_t MemorySize;
};
- explicit ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching);
+ explicit ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheDiskLayer();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -142,11 +161,10 @@ public:
void ScrubStorage(ScrubContext& Ctx);
void GatherReferences(GcContext& GcCtx);
void CollectGarbage(GcContext& GcCtx);
- void UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes);
- void DiscoverBuckets();
- uint64_t TotalSize() const;
- DiskStats Stats() const;
+ void DiscoverBuckets();
+ GcStorageSize StorageSize() const;
+ DiskStats Stats() const;
Info GetInfo() const;
std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
@@ -161,38 +179,40 @@ private:
*/
struct CacheBucket
{
- CacheBucket(std::string BucketName, bool EnableReferenceCaching);
+ CacheBucket(std::string BucketName, const BucketConfiguration& Config);
~CacheBucket();
bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- bool Drop();
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, std::atomic_uint64_t& CacheMemoryUsage);
+ void MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage);
+ bool Drop(std::atomic_uint64_t& CacheMemoryUsage);
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
+ void ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage);
void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx);
- void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
+ void CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage);
- inline uint64_t TotalSize() const { return m_TotalStandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(); }
- uint64_t EntryCount() const;
- BucketStats Stats();
+ inline GcStorageSize StorageSize() const
+ {
+ return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(),
+ .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)};
+ }
+ uint64_t EntryCount() const;
+ BucketStats Stats();
CacheValueDetails::BucketDetails GetValueDetails(const std::string_view ValueFilter) const;
void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
- private:
- const uint64_t MaxBlockSize = 1ull << 30;
- uint64_t m_PayloadAlignment = 1ull << 4;
+ void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots);
+ private:
std::string m_BucketName;
std::filesystem::path m_BucketDir;
std::filesystem::path m_BlocksBasePath;
+ BucketConfiguration m_Configuration;
BlockStore m_BlockStore;
Oid m_BucketId;
- uint64_t m_LargeObjectThreshold = 128 * 1024;
std::atomic_bool m_IsFlushing{};
- const bool m_EnableReferenceCaching = false;
// These files are used to manage storage of small objects for this bucket
@@ -216,9 +236,12 @@ private:
using IndexMap = tsl::robin_map<IoHash, size_t, IoHash::Hasher>;
- std::atomic<uint64_t> m_HitCount;
- std::atomic<uint64_t> m_MissCount;
- std::atomic<uint64_t> m_WriteCount;
+ std::atomic<uint64_t> m_DiskHitCount;
+ std::atomic<uint64_t> m_DiskMissCount;
+ std::atomic<uint64_t> m_DiskWriteCount;
+ std::atomic<uint64_t> m_MemoryHitCount;
+ std::atomic<uint64_t> m_MemoryMissCount;
+ std::atomic<uint64_t> m_MemoryWriteCount;
metrics::RequestStats m_PutOps;
metrics::RequestStats m_GetOps;
@@ -226,16 +249,24 @@ private:
IndexMap m_Index;
std::vector<AccessTime> m_AccessTimes;
std::vector<BucketPayload> m_Payloads;
+ std::vector<IoBuffer> m_CachedPayloads;
std::vector<size_t> m_FirstReferenceIndex;
std::vector<IoHash> m_ReferenceHashes;
std::vector<size_t> m_NextReferenceHashesIndexes;
size_t m_ReferenceCount = 0;
- std::atomic_uint64_t m_TotalStandaloneSize{};
+ std::atomic_uint64_t m_StandaloneSize{};
+ std::atomic_uint64_t m_MemCachedSize{};
void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ void PutStandaloneCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ std::atomic_uint64_t& CacheMemoryUsage);
IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const;
- void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ void PutInlineCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ std::atomic_uint64_t& CacheMemoryUsage);
IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
void MakeIndexSnapshot();
uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
@@ -257,6 +288,14 @@ private:
}
size_t AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key);
bool LockedGetReferences(std::size_t FirstReferenceIndex, std::vector<IoHash>& OutReferences) const;
+
+ void CompactState(std::vector<BucketPayload>& TmpPayloads,
+ std::vector<AccessTime>& TmpAccessTimes,
+ std::vector<IoBuffer>& TmpCachedPayloads,
+ std::vector<size_t>& TmpFirstReferenceIndex,
+ IndexMap& TmpIndex,
+ RwLock::ExclusiveLockScope& IndexLock);
+
// These locks are here to avoid contention on file creation, therefore it's sufficient
// that we take the same lock for the same hash
//
@@ -267,11 +306,34 @@ private:
inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; }
};
+ inline void TryMemCacheTrim()
+ {
+ if (m_Configuration.MemCacheTargetFootprintBytes == 0)
+ {
+ return;
+ }
+ if (m_Configuration.MemCacheMaxAgeSeconds == 0 || m_Configuration.MemCacheTrimIntervalSeconds == 0)
+ {
+ return;
+ }
+ if (m_TotalMemCachedSize <= m_Configuration.MemCacheTargetFootprintBytes)
+ {
+ return;
+ }
+ MemCacheTrim();
+ }
+ void MemCacheTrim();
+ void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
+
+ JobQueue& m_JobQueue;
std::filesystem::path m_RootDir;
+ Configuration m_Configuration;
+ std::atomic_uint64_t m_TotalMemCachedSize{};
+ std::atomic_bool m_IsMemCacheTrimming = false;
+ std::atomic<GcClock::Tick> m_LastTickMemCacheTrim;
mutable RwLock m_Lock;
- std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; // TODO: make this case insensitive
+ std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
- const bool m_EnableReferenceCaching;
ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
diff --git a/src/zenserver/cache/cachememorylayer.cpp b/src/zenserver/cache/cachememorylayer.cpp
deleted file mode 100644
index b62791974..000000000
--- a/src/zenserver/cache/cachememorylayer.cpp
+++ /dev/null
@@ -1,521 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "cachememorylayer.h"
-
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compress.h>
-#include <zencore/fmtutils.h>
-#include <zencore/jobqueue.h>
-#include <zencore/scopeguard.h>
-#include <zencore/trace.h>
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-
-ZenCacheMemoryLayer::ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config)
-: m_JobQueue(JobQueue)
-, m_Configuration(Config)
-, m_LastTickTrim(GcClock::Clock::time_point::min().time_since_epoch().count())
-{
-}
-
-ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
-{
-}
-
-bool
-ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- if (m_Configuration.TargetFootprintBytes == 0)
- {
- return false;
- }
- ZEN_TRACE_CPU("Z$::Mem::Get");
-
- RwLock::SharedLockScope _(m_Lock);
-
- auto It = m_Buckets.find(std::string(InBucket));
-
- if (It == m_Buckets.end())
- {
- return false;
- }
-
- CacheBucket* Bucket = It->second.get();
-
- _.ReleaseNow();
-
- // There's a race here. Since the lock is released early to allow
- // inserts, the bucket delete path could end up deleting the
- // underlying data structure
-
- Trim();
-
- return Bucket->Get(HashKey, OutValue);
-}
-
-void
-ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
-{
- if (m_Configuration.TargetFootprintBytes == 0)
- {
- return;
- }
- ZEN_TRACE_CPU("Z$::Mem::Put");
-
- const auto BucketName = std::string(InBucket);
- CacheBucket* Bucket = nullptr;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- }
-
- if (Bucket == nullptr)
- {
- // New bucket
-
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- else
- {
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>());
- Bucket = InsertResult.first->second.get();
- }
- }
-
- // Note that since the underlying IoBuffer is retained, the content type is also
- int64_t Diff = Bucket->Put(HashKey, Value);
-
- if (Diff > 0)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(Diff));
- Trim();
- }
- else if (Diff < 0)
- {
- m_TotalSize.fetch_sub(static_cast<uint64_t>(-Diff));
- }
-}
-
-bool
-ZenCacheMemoryLayer::DropBucket(std::string_view InBucket)
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- auto It = m_Buckets.find(std::string(InBucket));
-
- if (It != m_Buckets.end())
- {
- CacheBucket& Bucket = *It->second;
- m_TotalSize.fetch_sub(Bucket.TotalSize());
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It);
- Bucket.Drop();
- return true;
- }
- return false;
-}
-
-void
-ZenCacheMemoryLayer::Drop()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- while (!m_Buckets.empty())
- {
- const auto& It = m_Buckets.begin();
- CacheBucket& Bucket = *It->second;
- m_TotalSize.fetch_sub(Bucket.TotalSize());
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It->first);
- Bucket.Drop();
- }
-}
-
-void
-ZenCacheMemoryLayer::ScrubStorage(ScrubContext& Ctx)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- Kv.second->ScrubStorage(Ctx);
- }
-}
-
-void
-ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes)
-{
- using namespace zen::access_tracking;
-
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first];
- Kv.second->GatherAccessTimes(Bucket);
- }
-}
-
-uint64_t
-ZenCacheMemoryLayer::CollectGarbage(GcClock::TimePoint ExpireTime)
-{
- uint64_t TrimmedSize = 0;
- RwLock::SharedLockScope __(m_Lock);
- for (auto& Kv : m_Buckets)
- {
- uint64_t BucketTrimmedSize = Kv.second->Trim(ExpireTime);
- if (BucketTrimmedSize > 0)
- {
- m_TotalSize.fetch_sub(BucketTrimmedSize);
- TrimmedSize += BucketTrimmedSize;
- }
- }
- const GcClock::TimePoint Now = GcClock::Now();
- const GcClock::Tick NowTick = Now.time_since_epoch().count();
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickTrim;
- const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
- return TrimmedSize;
-}
-
-void
-ZenCacheMemoryLayer::Trim()
-{
- if (m_TotalSize <= m_Configuration.TargetFootprintBytes)
- {
- return;
- }
- if (m_Configuration.MaxAgeSeconds == 0 || m_Configuration.TrimIntervalSeconds == 0)
- {
- return;
- }
-
- const GcClock::TimePoint Now = GcClock::Now();
-
- const GcClock::Tick NowTick = Now.time_since_epoch().count();
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickTrim;
- const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
- if (NowTick < NextAllowedTrimTick)
- {
- return;
- }
-
- bool Expected = false;
- if (!m_IsTrimming.compare_exchange_strong(Expected, true))
- {
- return;
- }
-
- // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong
- const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickTrim.store(NextTrimTick);
-
- m_JobQueue.QueueJob("ZenCacheMemoryLayer::Trim", [this, Now, TrimInterval](JobContext&) {
- ZEN_TRACE_CPU("Z$::Mem::Trim");
-
- Stopwatch Timer;
- uint64_t TrimmedSize = 0;
- const auto Guard = MakeGuard([&] {
- if (TrimmedSize > 0)
- {
- ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
- NiceBytes(TrimmedSize),
- NiceBytes(m_TotalSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- m_IsTrimming.store(false);
- });
-
- const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MaxAgeSeconds);
-
- std::vector<uint64_t> UsageSlots;
- UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
- {
- RwLock::SharedLockScope __(m_Lock);
- for (auto& Kv : m_Buckets)
- {
- Kv.second->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
- }
- }
- uint64_t TotalSize = 0;
- for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
- {
- TotalSize += UsageSlots[Index];
- if (TotalSize >= m_Configuration.TargetFootprintBytes)
- {
- GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
- TrimmedSize = CollectGarbage(ExpireTime);
- break;
- }
- }
- });
-}
-
-uint64_t
-ZenCacheMemoryLayer::TotalSize() const
-{
- uint64_t TotalSize{};
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- TotalSize += Kv.second->TotalSize();
- }
-
- return TotalSize;
-}
-
-ZenCacheMemoryLayer::Info
-ZenCacheMemoryLayer::GetInfo() const
-{
- ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()};
-
- RwLock::SharedLockScope _(m_Lock);
- Info.BucketNames.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- Info.BucketNames.push_back(Kv.first);
- Info.EntryCount += Kv.second->EntryCount();
- }
- return Info;
-}
-
-std::optional<ZenCacheMemoryLayer::BucketInfo>
-ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const
-{
- RwLock::SharedLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
- {
- return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()};
- }
- return {};
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
-{
- RwLock::SharedLockScope _(m_BucketLock);
-
- std::vector<IoHash> BadHashes;
-
- auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
- if (ContentType == ZenContentType::kCbObject)
- {
- CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
- return Error == CbValidateError::None;
- }
- if (ContentType == ZenContentType::kCompressedBinary)
- {
- IoHash RawHash;
- uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- return false;
- }
- if (Hash != RawHash)
- {
- return false;
- }
- }
- return true;
- };
-
- for (auto& Kv : m_CacheMap)
- {
- const BucketPayload& Payload = m_Payloads[Kv.second];
- if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload))
- {
- BadHashes.push_back(Kv.first);
- }
- }
-
- if (!BadHashes.empty())
- {
- Ctx.ReportBadCidChunks(BadHashes);
- }
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
-{
- RwLock::SharedLockScope _(m_BucketLock);
- std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) {
- return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]};
- });
-}
-
-bool
-ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- ZEN_TRACE_CPU("Z$::Mem::Bucket::Get");
-
- metrics::OperationTiming::Scope $(m_GetOps);
-
- RwLock::SharedLockScope _(m_BucketLock);
-
- if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
- {
- uint32_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
- ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
-
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash};
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
-
- return true;
- }
-
- return false;
-}
-
-int64_t
-ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
-{
- ZEN_TRACE_CPU("Z$::Mem::Bucket::Put");
-
- metrics::OperationTiming::Scope $(m_PutOps);
-
- size_t PayloadSize = Value.Value.GetSize();
- uint64_t OldPayloadSize = 0;
-
- {
- GcClock::Tick AccessTime = GcClock::TickCount();
- RwLock::ExclusiveLockScope _(m_BucketLock);
- if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
- {
- uint32_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-
- BucketPayload& Payload = m_Payloads[EntryIndex];
- OldPayloadSize = Payload.Payload.GetSize();
- Payload.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value);
- Payload.RawHash = Value.RawHash;
- Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize);
- m_AccessTimes[EntryIndex] = AccessTime;
- }
- else if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max())
- {
- // No more space in our memory cache!
- return 0;
- }
- else
- {
- uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value),
- .RawSize = gsl::narrow<uint32_t>(Value.RawSize),
- .RawHash = Value.RawHash});
- m_AccessTimes.emplace_back(AccessTime);
- m_CacheMap.insert_or_assign(HashKey, EntryIndex);
- }
- ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size());
- ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
- }
-
- if (PayloadSize > OldPayloadSize)
- {
- m_TotalSize.fetch_add(PayloadSize - OldPayloadSize);
- return PayloadSize - OldPayloadSize;
- }
- else if (PayloadSize < OldPayloadSize)
- {
- m_TotalSize.fetch_sub(OldPayloadSize - PayloadSize);
- return -static_cast<int64_t>(OldPayloadSize - PayloadSize);
- }
- return 0;
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::Drop()
-{
- RwLock::ExclusiveLockScope _(m_BucketLock);
- m_CacheMap.clear();
- m_AccessTimes.clear();
- m_Payloads.clear();
- m_TotalSize.store(0);
-}
-
-uint64_t
-ZenCacheMemoryLayer::CacheBucket::Trim(GcClock::TimePoint ExpireTime)
-{
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- tsl::robin_map<IoHash, uint32_t> CacheMap;
-
- size_t TrimmedSize = 0;
- GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
-
- RwLock::ExclusiveLockScope _(m_BucketLock);
- {
- AccessTimes.reserve(m_CacheMap.size());
- Payloads.reserve(m_CacheMap.size());
- CacheMap.reserve(m_CacheMap.size());
-
- for (const auto& Kv : m_CacheMap)
- {
- if (m_AccessTimes[Kv.second] < ExpireTicks)
- {
- size_t PayloadSize = m_Payloads[Kv.second].Payload.GetSize();
- m_TotalSize.fetch_sub(PayloadSize);
- TrimmedSize += PayloadSize;
- continue;
- }
- size_t Index = gsl::narrow<uint32_t>(Payloads.size());
- Payloads.emplace_back(m_Payloads[Kv.second]);
- AccessTimes.push_back(m_AccessTimes[Kv.second]);
- CacheMap.insert_or_assign(Kv.first, Index);
- }
-
- m_AccessTimes.swap(AccessTimes);
- m_Payloads.swap(Payloads);
- m_CacheMap.swap(CacheMap);
- }
- return TrimmedSize;
-}
-
-uint64_t
-ZenCacheMemoryLayer::CacheBucket::EntryCount() const
-{
- RwLock::SharedLockScope _(m_BucketLock);
- return static_cast<uint64_t>(m_CacheMap.size());
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
- GcClock::Duration SectionLength,
- std::vector<uint64_t>& InOutUsageSlots)
-{
- RwLock::SharedLockScope _(m_BucketLock);
- for (const auto& It : m_CacheMap)
- {
- uint32_t Index = It.second;
- GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
- GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
- uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
- if (Slot >= InOutUsageSlots.capacity())
- {
- Slot = InOutUsageSlots.capacity() - 1;
- }
- if (Slot > InOutUsageSlots.size())
- {
- InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
- }
- InOutUsageSlots[Slot] += m_Payloads[Index].Payload.GetSize();
- }
-}
-
-} // namespace zen
diff --git a/src/zenserver/cache/cachememorylayer.h b/src/zenserver/cache/cachememorylayer.h
deleted file mode 100644
index f15fe241b..000000000
--- a/src/zenserver/cache/cachememorylayer.h
+++ /dev/null
@@ -1,124 +0,0 @@
-
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "cacheshared.h"
-
-#include <zencore/iohash.h>
-#include <zencore/stats.h>
-#include <zenstore/gc.h>
-
-#include <inttypes.h>
-#include <string_view>
-#include <vector>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <tsl/robin_map.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-namespace zen {
-
-class JobQueue;
-
-/** In-memory cache storage
-
- Intended for small values which are frequently accessed
-
- This should have a better memory management policy to maintain reasonable
- footprint.
- */
-class ZenCacheMemoryLayer
-{
-public:
- struct Configuration
- {
- uint64_t TargetFootprintBytes = 512 * 1024 * 1024;
- uint64_t TrimIntervalSeconds = 60;
- uint64_t MaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count());
- };
-
- struct BucketInfo
- {
- uint64_t EntryCount = 0;
- uint64_t TotalSize = 0;
- };
-
- struct Info
- {
- Configuration Config;
- std::vector<std::string> BucketNames;
- uint64_t EntryCount = 0;
- uint64_t TotalSize = 0;
- };
-
- ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config);
- ~ZenCacheMemoryLayer();
-
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
- uint64_t CollectGarbage(GcClock::TimePoint ExpireTime);
- void Drop();
- bool DropBucket(std::string_view Bucket);
- void ScrubStorage(ScrubContext& Ctx);
- void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes);
- uint64_t TotalSize() const;
-
- Info GetInfo() const;
- std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
-
- const Configuration& GetConfiguration() const { return m_Configuration; }
- void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; }
-
-private:
- void Trim();
-
- struct CacheBucket
- {
-#pragma pack(push)
-#pragma pack(1)
- struct BucketPayload
- {
- IoBuffer Payload; // 8
- uint32_t RawSize; // 4
- IoHash RawHash; // 20
- };
-#pragma pack(pop)
- static_assert(sizeof(BucketPayload) == 32u);
- static_assert(sizeof(AccessTime) == 4u);
-
- metrics::OperationTiming m_PutOps;
- metrics::OperationTiming m_GetOps;
-
- mutable RwLock m_BucketLock;
- std::vector<AccessTime> m_AccessTimes;
- std::vector<BucketPayload> m_Payloads;
- tsl::robin_map<IoHash, uint32_t> m_CacheMap;
-
- std::atomic_uint64_t m_TotalSize{};
-
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- int64_t Put(const IoHash& HashKey, const ZenCacheValue& Value);
- uint64_t Trim(GcClock::TimePoint ExpireTime);
- void Drop();
- void ScrubStorage(ScrubContext& Ctx);
- void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
- inline uint64_t TotalSize() const { return m_TotalSize; }
- uint64_t EntryCount() const;
- void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots);
- };
-
- JobQueue& m_JobQueue;
- mutable RwLock m_Lock;
- std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
- std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
- Configuration m_Configuration;
- std::atomic_uint64_t m_TotalSize{};
- std::atomic_bool m_IsTrimming = false;
- std::atomic<GcClock::Tick> m_LastTickTrim;
-
- ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete;
- ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete;
-};
-
-} // namespace zen
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 91f895b0b..d8bc00c03 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -366,7 +366,7 @@ HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx)
ZenCacheStore::Info Info = m_CacheStore.GetInfo();
- ZEN_INFO("scrubbing '{}'", Info.Config.BasePath);
+ ZEN_INFO("scrubbing '{}'", Info.BasePath);
m_LastScrubTime = Ctx.ScrubTimestamp();
@@ -712,7 +712,7 @@ HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request)
ResponseWriter.BeginObject("Configuration");
{
ExtendableStringBuilder<128> BasePathString;
- BasePathString << Info.Config.BasePath.u8string();
+ BasePathString << Info.BasePath.u8string();
ResponseWriter.AddString("BasePath"sv, BasePathString.ToView());
ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces);
ResponseWriter.BeginObject("Logging");
@@ -742,7 +742,6 @@ HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request)
ResponseWriter.EndObject();
ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount);
- ResponseWriter.AddInteger("MemoryEntryCount", Info.MemoryEntryCount);
return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
}
@@ -772,9 +771,16 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque
ResponseWriter.BeginObject("Configuration");
{
ExtendableStringBuilder<128> BasePathString;
- BasePathString << Info->Config.RootDir.u8string();
+ BasePathString << Info->RootDir.u8string();
ResponseWriter.AddString("RootDir"sv, BasePathString.ToView());
- ResponseWriter.AddInteger("DiskLayerThreshold"sv, Info->Config.DiskLayerThreshold);
+ ResponseWriter.AddInteger("MaxBlockSize"sv, Info->Config.DiskLayerConfig.BucketConfig.MaxBlockSize);
+ ResponseWriter.AddInteger("PayloadAlignment"sv, Info->Config.DiskLayerConfig.BucketConfig.PayloadAlignment);
+ ResponseWriter.AddInteger("MemCacheSizeThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold);
+ ResponseWriter.AddInteger("LargeObjectThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.LargeObjectThreshold);
+ ResponseWriter.AddInteger("MemCacheTargetFootprintBytes"sv, Info->Config.DiskLayerConfig.MemCacheTargetFootprintBytes);
+ ResponseWriter.AddInteger("MemCacheTrimIntervalSeconds"sv, Info->Config.DiskLayerConfig.MemCacheTrimIntervalSeconds);
+ ResponseWriter.AddInteger("MemCacheMaxAgeSeconds"sv, Info->Config.DiskLayerConfig.MemCacheMaxAgeSeconds);
+ ResponseWriter.AddBool("EnableReferenceCaching"sv, Info->Config.DiskLayerConfig.BucketConfig.EnableReferenceCaching);
}
ResponseWriter.EndObject();
@@ -791,13 +797,12 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque
ResponseWriter.BeginObject("StorageSize"sv);
{
- ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.TotalSize);
- ResponseWriter.AddInteger("MemorySize"sv, Info->MemoryLayerInfo.TotalSize);
+ ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.StorageSize.DiskSize);
+ ResponseWriter.AddInteger("MemorySize"sv, Info->DiskLayerInfo.StorageSize.MemorySize);
}
ResponseWriter.EndObject();
- ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
- ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount);
+ ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount);
return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
}
@@ -842,13 +847,12 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
ResponseWriter.BeginObject("StorageSize");
{
- ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.TotalSize);
- ResponseWriter.AddInteger("MemorySize", Info->MemoryLayerInfo.TotalSize);
+ ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.StorageSize.DiskSize);
+ ResponseWriter.AddInteger("MemorySize", Info->DiskLayerInfo.StorageSize.MemorySize);
}
ResponseWriter.EndObject();
ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
- ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount);
return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
}
@@ -3362,6 +3366,12 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
Cbo << "hit_ratio" << (NamespaceTotal > 0 ? (double(NamespaceStats.Stats.HitCount) / double(NamespaceTotal)) : 0.0);
EmitSnapshot("read", NamespaceStats.Stats.GetOps, Cbo);
EmitSnapshot("write", NamespaceStats.Stats.PutOps, Cbo);
+ Cbo.BeginObject("size");
+ {
+ Cbo << "disk" << NamespaceStats.Stats.DiskStats.DiskSize;
+ Cbo << "memory" << NamespaceStats.Stats.DiskStats.MemorySize;
+ }
+ Cbo.EndObject();
if (!NamespaceStats.Stats.DiskStats.BucketStats.empty())
{
Cbo.BeginArray("buckets");
@@ -3369,24 +3379,50 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
{
Cbo.BeginObject();
Cbo.AddString("bucket", BucketStats.BucketName);
- if (BucketStats.Stats.TotalSize == 0 && BucketStats.Stats.HitCount == 0 && BucketStats.Stats.MissCount == 0 &&
- BucketStats.Stats.WriteCount == 0)
+ if (BucketStats.Stats.DiskSize != 0 || BucketStats.Stats.MemorySize != 0)
{
+ Cbo.BeginObject("size");
+ {
+ Cbo << "disk" << BucketStats.Stats.DiskSize;
+ Cbo << "memory" << BucketStats.Stats.MemorySize;
+ }
Cbo.EndObject();
- continue;
}
- Cbo << "size" << BucketStats.Stats.TotalSize;
- const uint64_t BucketTotal = BucketStats.Stats.HitCount + BucketStats.Stats.MissCount;
- if (BucketTotal == 0 && BucketStats.Stats.WriteCount == 0)
+
+ if (BucketStats.Stats.DiskSize == 0 && BucketStats.Stats.DiskHitCount == 0 &&
+ BucketStats.Stats.DiskMissCount == 0 && BucketStats.Stats.DiskWriteCount == 0 &&
+ BucketStats.Stats.MemoryHitCount == 0 && BucketStats.Stats.MemoryMissCount == 0 &&
+ BucketStats.Stats.MemoryWriteCount == 0)
{
Cbo.EndObject();
continue;
}
- Cbo << "hits" << BucketStats.Stats.HitCount << "misses" << BucketStats.Stats.MissCount << "writes"
- << BucketStats.Stats.WriteCount;
- Cbo << "hit_ratio" << (BucketTotal > 0 ? (double(BucketStats.Stats.HitCount) / double(BucketTotal)) : 0.0);
- EmitSnapshot("read", BucketStats.Stats.GetOps, Cbo);
- EmitSnapshot("write", BucketStats.Stats.PutOps, Cbo);
+
+ const uint64_t BucketDiskTotal = BucketStats.Stats.DiskHitCount + BucketStats.Stats.DiskMissCount;
+ if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0)
+ {
+ Cbo << "hits" << BucketStats.Stats.DiskHitCount << "misses" << BucketStats.Stats.DiskMissCount << "writes"
+ << BucketStats.Stats.DiskWriteCount;
+ Cbo << "hit_ratio"
+ << (BucketDiskTotal > 0 ? (double(BucketStats.Stats.DiskHitCount) / double(BucketDiskTotal)) : 0.0);
+ }
+
+ const uint64_t BucketMemoryTotal = BucketStats.Stats.MemoryHitCount + BucketStats.Stats.MemoryMissCount;
+ if (BucketMemoryTotal != 0 || BucketStats.Stats.MemoryWriteCount != 0)
+ {
+ Cbo << "mem_hits" << BucketStats.Stats.MemoryHitCount << "mem_misses" << BucketStats.Stats.MemoryMissCount
+ << "mem_writes" << BucketStats.Stats.MemoryWriteCount;
+ Cbo << "mem_hit_ratio"
+ << (BucketMemoryTotal > 0 ? (double(BucketStats.Stats.MemoryHitCount) / double(BucketMemoryTotal))
+ : 0.0);
+ }
+
+ if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0 || BucketMemoryTotal != 0 ||
+ BucketStats.Stats.MemoryWriteCount != 0)
+ {
+ EmitSnapshot("read", BucketStats.Stats.GetOps, Cbo);
+ EmitSnapshot("write", BucketStats.Stats.PutOps, Cbo);
+ }
Cbo.EndObject();
}
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 77316a5dc..89123a70f 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -58,19 +58,15 @@ IsKnownBadBucketName(std::string_view Bucket)
return false;
}
-ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc,
- JobQueue& JobQueue,
- const std::filesystem::path& RootDir,
- bool EnableReferenceCaching,
- const ZenCacheMemoryLayer::Configuration MemLayerConfig)
+ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config)
: m_Gc(Gc)
-, m_RootDir(RootDir)
, m_JobQueue(JobQueue)
-, m_MemLayer(m_JobQueue, MemLayerConfig)
-, m_DiskLayer(RootDir, EnableReferenceCaching)
+, m_RootDir(RootDir)
+, m_Configuration(Config)
+, m_DiskLayer(m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig)
{
- ZEN_INFO("initializing structured cache at '{}'", RootDir);
- CreateDirectories(RootDir);
+ ZEN_INFO("initializing structured cache at '{}'", m_RootDir);
+ CreateDirectories(m_RootDir);
m_DiskLayer.DiscoverBuckets();
@@ -91,27 +87,13 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
- bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue);
-
- if (Ok)
- {
- ZEN_ASSERT(OutValue.Value.Size());
- StatsScope.SetBytes(OutValue.Value.Size());
- m_HitCount++;
- return true;
- }
-
- Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
+ bool Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
if (Ok)
{
ZEN_ASSERT(OutValue.Value.Size());
StatsScope.SetBytes(OutValue.Value.Size());
- if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold)
- {
- m_MemLayer.Put(InBucket, HashKey, OutValue);
- }
m_HitCount++;
return true;
}
@@ -132,11 +114,6 @@ ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const Z
ZEN_ASSERT(Value.Value.Size());
m_DiskLayer.Put(InBucket, HashKey, Value, References);
-
- if (Value.Value.Size() <= m_DiskLayerSizeThreshold)
- {
- m_MemLayer.Put(InBucket, HashKey, Value);
- }
m_WriteCount++;
}
@@ -145,15 +122,11 @@ ZenCacheNamespace::DropBucket(std::string_view Bucket)
{
ZEN_INFO("dropping bucket '{}'", Bucket);
- // TODO: should ensure this is done atomically across all layers
+ const bool Dropped = m_DiskLayer.DropBucket(Bucket);
- const bool MemDropped = m_MemLayer.DropBucket(Bucket);
- const bool DiskDropped = m_DiskLayer.DropBucket(Bucket);
- const bool AnyDropped = MemDropped || DiskDropped;
+ ZEN_INFO("bucket '{}' was {}", Bucket, Dropped ? "dropped" : "not found");
- ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? "dropped" : "not found");
-
- return AnyDropped;
+ return Dropped;
}
void
@@ -166,7 +139,6 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view
bool
ZenCacheNamespace::Drop()
{
- m_MemLayer.Drop();
return m_DiskLayer.Drop();
}
@@ -189,7 +161,6 @@ ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
m_LastScrubTime = Ctx.ScrubTimestamp();
m_DiskLayer.ScrubStorage(Ctx);
- m_MemLayer.ScrubStorage(Ctx);
}
void
@@ -201,10 +172,6 @@ ZenCacheNamespace::GatherReferences(GcContext& GcCtx)
const auto Guard =
MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- access_tracking::AccessTimes AccessTimes;
- m_MemLayer.GatherAccessTimes(AccessTimes);
-
- m_DiskLayer.UpdateAccessTimes(AccessTimes);
m_DiskLayer.GatherReferences(GcCtx);
}
@@ -213,31 +180,24 @@ ZenCacheNamespace::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::Namespace::CollectGarbage");
- (void)m_MemLayer.CollectGarbage(GcCtx.CacheExpireTime());
m_DiskLayer.CollectGarbage(GcCtx);
}
GcStorageSize
ZenCacheNamespace::StorageSize() const
{
- return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()};
+ return m_DiskLayer.StorageSize();
}
ZenCacheNamespace::Info
ZenCacheNamespace::GetInfo() const
{
- ZenCacheNamespace::Info Info = {.Config = {.RootDir = m_RootDir, .DiskLayerThreshold = m_DiskLayerSizeThreshold},
- .DiskLayerInfo = m_DiskLayer.GetInfo(),
- .MemoryLayerInfo = m_MemLayer.GetInfo()};
+ ZenCacheNamespace::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration, .DiskLayerInfo = m_DiskLayer.GetInfo()};
std::unordered_set<std::string> BucketNames;
for (const std::string& BucketName : Info.DiskLayerInfo.BucketNames)
{
BucketNames.insert(BucketName);
}
- for (const std::string& BucketName : Info.MemoryLayerInfo.BucketNames)
- {
- BucketNames.insert(BucketName);
- }
Info.BucketNames.insert(Info.BucketNames.end(), BucketNames.begin(), BucketNames.end());
return Info;
}
@@ -250,8 +210,7 @@ ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const
{
return {};
}
- ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo,
- .MemoryLayerInfo = m_MemLayer.GetBucketInfo(Bucket).value_or(ZenCacheMemoryLayer::BucketInfo{})};
+ ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo};
return Info;
}
@@ -278,23 +237,25 @@ ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$");
static constinit std::string_view UE4DDCNamespaceName = "ue4.ddc";
-ZenCacheStore::ZenCacheStore(GcManager& Gc,
- JobQueue& JobQueue,
- const Configuration& Configuration,
- const DiskWriteBlocker* InDiskWriteBlocker)
+ZenCacheStore::ZenCacheStore(GcManager& Gc,
+ JobQueue& JobQueue,
+ const std::filesystem::path& BasePath,
+ const Configuration& Configuration,
+ const DiskWriteBlocker* InDiskWriteBlocker)
: m_DiskWriteBlocker(InDiskWriteBlocker)
, m_Gc(Gc)
, m_JobQueue(JobQueue)
+, m_BasePath(BasePath)
, m_Configuration(Configuration)
, m_ExitLogging(false)
{
SetLoggingConfig(m_Configuration.Logging);
- CreateDirectories(m_Configuration.BasePath);
+ CreateDirectories(m_BasePath);
- ZEN_INFO("initializing cache store at '{}'", m_Configuration.BasePath);
+ ZEN_INFO("initializing cache store at '{}'", m_BasePath);
DirectoryContent DirContent;
- GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+ GetDirectoryContent(m_BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
std::vector<std::string> Namespaces;
for (const std::filesystem::path& DirPath : DirContent.Directories)
@@ -307,14 +268,13 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc,
}
}
- ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath);
+ ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_BasePath);
if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
{
// default (unspecified) and ue4-ddc namespace points to the same namespace instance
- std::filesystem::path DefaultNamespaceFolder =
- m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
+ std::filesystem::path DefaultNamespaceFolder = m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
CreateDirectories(DefaultNamespaceFolder);
Namespaces.push_back(std::string(UE4DDCNamespaceName));
}
@@ -324,15 +284,14 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc,
m_Namespaces[NamespaceName] =
std::make_unique<ZenCacheNamespace>(Gc,
m_JobQueue,
- m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName),
- m_Configuration.EnableReferenceCaching,
- m_Configuration.MemLayerConfig);
+ m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName),
+ m_Configuration.NamespaceConfig);
}
}
ZenCacheStore::~ZenCacheStore()
{
- ZEN_INFO("closing cache store at '{}'", m_Configuration.BasePath);
+ ZEN_INFO("closing cache store at '{}'", m_BasePath);
SetLoggingConfig({.EnableWriteLog = false, .EnableAccessLog = false});
m_Namespaces.clear();
}
@@ -564,7 +523,7 @@ ZenCacheStore::DropNamespace(std::string_view InNamespace)
void
ZenCacheStore::Flush()
{
- ZEN_INFO("flushing cache store at '{}'", m_Configuration.BasePath);
+ ZEN_INFO("flushing cache store at '{}'", m_BasePath);
IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
}
@@ -632,13 +591,12 @@ ZenCacheStore::GetNamespace(std::string_view Namespace)
return It->second.get();
}
- auto NewNamespace = m_Namespaces.insert_or_assign(
- std::string(Namespace),
- std::make_unique<ZenCacheNamespace>(m_Gc,
- m_JobQueue,
- m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace),
- m_Configuration.EnableReferenceCaching,
- m_Configuration.MemLayerConfig));
+ auto NewNamespace =
+ m_Namespaces.insert_or_assign(std::string(Namespace),
+ std::make_unique<ZenCacheNamespace>(m_Gc,
+ m_JobQueue,
+ m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace),
+ m_Configuration.NamespaceConfig));
return NewNamespace.first->second.get();
}
@@ -754,7 +712,6 @@ ZenCacheStore::GetInfo() const
Info.NamespaceNames.push_back(std::string(NamespaceName));
ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo();
Info.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount;
- Info.MemoryEntryCount += NamespaceInfo.MemoryLayerInfo.EntryCount;
});
return Info;
@@ -816,7 +773,7 @@ TEST_CASE("z$.store")
GcManager Gc;
auto JobQueue = MakeJobQueue(1, "testqueue");
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false);
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
const int kIterationCount = 100;
@@ -872,9 +829,9 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false);
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256);
+ CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold - 256);
IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
Buffer.SetContentType(ZenContentType::kCbObject);
@@ -895,7 +852,7 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false);
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -906,6 +863,7 @@ TEST_CASE("z$.size")
Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
}
CHECK_EQ(0, Zcs.StorageSize().DiskSize);
+ CHECK_EQ(0, Zcs.StorageSize().MemorySize);
}
}
@@ -918,9 +876,9 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false);
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
- CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64);
+ CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold + 64);
IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
Buffer.SetContentType(ZenContentType::kCbObject);
@@ -938,7 +896,7 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false);
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -977,7 +935,10 @@ TEST_CASE("z$.gc")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true);
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
const auto Bucket = "teardrinker"sv;
// Create a cache record
@@ -1014,7 +975,10 @@ TEST_CASE("z$.gc")
// Expect timestamps to be serialized
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true);
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
std::vector<IoHash> Keep;
// Collect garbage with 1 hour max cache duration
@@ -1035,7 +999,10 @@ TEST_CASE("z$.gc")
{
ScopedTemporaryDirectory TempDir;
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true);
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
const auto Bucket = "fortysixandtwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -1081,7 +1048,10 @@ TEST_CASE("z$.gc")
ScopedTemporaryDirectory TempDir;
GcManager Gc;
{
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true);
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
const auto Bucket = "rightintwo"sv;
std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
@@ -1126,7 +1096,10 @@ TEST_CASE("z$.gc")
}
{
// Unreferenced blocks will be pruned so size should now be zero
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true);
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
CHECK_EQ(0, Zcs.StorageSize().DiskSize);
}
}
@@ -1183,7 +1156,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
WorkerThreadPool ThreadPool(4);
GcManager Gc;
auto JobQueue = MakeJobQueue(1, "testqueue");
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), true);
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
{
std::atomic<size_t> WorkCompleted = 0;
@@ -1402,7 +1375,7 @@ TEST_CASE("z$.namespaces")
IoHash Key2;
{
GcManager Gc;
- ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}, nullptr);
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = false}, nullptr);
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -1427,7 +1400,7 @@ TEST_CASE("z$.namespaces")
{
GcManager Gc;
- ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -1490,7 +1463,7 @@ TEST_CASE("z$.drop.bucket")
WorkerThreadPool Workers(1);
{
GcManager Gc;
- ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
const auto Bucket = "teardrinker"sv;
const auto Namespace = "mynamespace"sv;
@@ -1563,7 +1536,7 @@ TEST_CASE("z$.drop.namespace")
WorkerThreadPool Workers(1);
{
GcManager Gc;
- ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr);
const auto Bucket1 = "teardrinker1"sv;
const auto Bucket2 = "teardrinker2"sv;
const auto Namespace1 = "mynamespace1"sv;
@@ -1629,7 +1602,7 @@ TEST_CASE("z$.blocked.disklayer.put")
GcManager Gc;
auto JobQueue = MakeJobQueue(1, "testqueue");
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false);
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
@@ -1724,7 +1697,7 @@ TEST_CASE("z$.scrub")
GcManager Gc;
CidStore CidStore(Gc);
auto JobQueue = MakeJobQueue(1, "testqueue");
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true);
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h
index 28b2189ae..02d0e31c0 100644
--- a/src/zenserver/cache/structuredcachestore.h
+++ b/src/zenserver/cache/structuredcachestore.h
@@ -3,7 +3,6 @@
#pragma once
#include "cachedisklayer.h"
-#include "cachememorylayer.h"
#include <zencore/compactbinary.h>
#include <zencore/iohash.h>
@@ -52,20 +51,18 @@ class ZenCacheNamespace final : public GcStorage, public GcContributor
public:
struct Configuration
{
- std::filesystem::path RootDir;
- uint64_t DiskLayerThreshold = 0;
+ ZenCacheDiskLayer::Configuration DiskLayerConfig;
};
struct BucketInfo
{
- ZenCacheDiskLayer::BucketInfo DiskLayerInfo;
- ZenCacheMemoryLayer::BucketInfo MemoryLayerInfo;
+ ZenCacheDiskLayer::BucketInfo DiskLayerInfo;
};
struct Info
{
- Configuration Config;
- std::vector<std::string> BucketNames;
- ZenCacheDiskLayer::Info DiskLayerInfo;
- ZenCacheMemoryLayer::Info MemoryLayerInfo;
+ std::filesystem::path RootDir;
+ Configuration Config;
+ std::vector<std::string> BucketNames;
+ ZenCacheDiskLayer::Info DiskLayerInfo;
};
struct NamespaceStats
@@ -78,11 +75,7 @@ public:
ZenCacheDiskLayer::DiskStats DiskStats;
};
- ZenCacheNamespace(GcManager& Gc,
- JobQueue& JobQueue,
- const std::filesystem::path& RootDir,
- bool EnableReferenceCaching,
- const ZenCacheMemoryLayer::Configuration MemLayerConfig = {});
+ ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheNamespace();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -92,12 +85,10 @@ public:
void EnumerateBucketContents(std::string_view Bucket,
std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
- bool Drop();
- void Flush();
- uint64_t DiskLayerThreshold() const { return m_DiskLayerSizeThreshold; }
+ bool Drop();
+ void Flush();
// GcContributor
-
virtual void GatherReferences(GcContext& GcCtx) override;
// GcStorage
@@ -105,6 +96,7 @@ public:
virtual void CollectGarbage(GcContext& GcCtx) override;
virtual GcStorageSize StorageSize() const override;
+ Configuration GetConfig() const { return m_Configuration; }
Info GetInfo() const;
std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
NamespaceStats Stats();
@@ -113,17 +105,16 @@ public:
private:
GcManager& m_Gc;
- std::filesystem::path m_RootDir;
JobQueue& m_JobQueue;
- ZenCacheMemoryLayer m_MemLayer;
+ std::filesystem::path m_RootDir;
+ Configuration m_Configuration;
ZenCacheDiskLayer m_DiskLayer;
std::atomic<uint64_t> m_HitCount{};
std::atomic<uint64_t> m_MissCount{};
std::atomic<uint64_t> m_WriteCount{};
metrics::RequestStats m_PutOps;
metrics::RequestStats m_GetOps;
- uint64_t m_DiskLayerSizeThreshold = 1 * 1024;
- uint64_t m_LastScrubTime = 0;
+ uint64_t m_LastScrubTime = 0;
ZenCacheNamespace(const ZenCacheNamespace&) = delete;
ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete;
@@ -144,10 +135,8 @@ public:
struct Configuration
{
- std::filesystem::path BasePath;
- bool AllowAutomaticCreationOfNamespaces = false;
- bool EnableReferenceCaching = false;
- ZenCacheMemoryLayer::Configuration MemLayerConfig;
+ ZenCacheNamespace::Configuration NamespaceConfig;
+ bool AllowAutomaticCreationOfNamespaces = false;
struct LogConfig
{
bool EnableWriteLog = true;
@@ -157,10 +146,10 @@ public:
struct Info
{
+ std::filesystem::path BasePath;
Configuration Config;
std::vector<std::string> NamespaceNames;
- uint64_t DiskEntryCount = 0;
- uint64_t MemoryEntryCount = 0;
+ uint64_t DiskEntryCount = 0;
GcStorageSize StorageSize;
};
@@ -182,7 +171,11 @@ public:
std::vector<NamedNamespaceStats> NamespaceStats;
};
- ZenCacheStore(GcManager& Gc, JobQueue& JobQueue, const Configuration& Configuration, const DiskWriteBlocker* InDiskWriteBlocker);
+ ZenCacheStore(GcManager& Gc,
+ JobQueue& JobQueue,
+ const std::filesystem::path& BasePath,
+ const Configuration& Configuration,
+ const DiskWriteBlocker* InDiskWriteBlocker);
~ZenCacheStore();
bool Get(const CacheRequestContext& Context,
@@ -233,6 +226,7 @@ private:
GcManager& m_Gc;
JobQueue& m_JobQueue;
+ std::filesystem::path m_BasePath;
Configuration m_Configuration;
std::atomic<uint64_t> m_HitCount{};
std::atomic<uint64_t> m_MissCount{};