aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-10-02 11:29:00 +0200
committerGitHub <[email protected]>2023-10-02 11:29:00 +0200
commit14deb110acac35e96afa72316f6cd871dfe04168 (patch)
tree9206028a2423c01f562d6e3b45b2e2c34947b611 /src
parentlightweight gc (#431) (diff)
downloadzen-14deb110acac35e96afa72316f6cd871dfe04168.tar.xz
zen-14deb110acac35e96afa72316f6cd871dfe04168.zip
Limit size of memory cache layer (#423)
- Feature: Limit the size ZenCacheMemoryLayer may use - `--cache-memlayer-targetfootprint` option to set which size (in bytes) it should be limited to, zero to have it unbounded - `--cache-memlayer-maxage` option to set how long (in seconds) cache items should be kept in the memory cache Do more "standard" GC rather than clearing everything. Tries to purge memory on Get/Put on the fly if exceeding limit - not sure if we should have a polling thread instead of adding overhead to Get/Put (however light it may be).
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/cachememorylayer.cpp221
-rw-r--r--src/zenserver/cache/cachememorylayer.h23
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp72
-rw-r--r--src/zenserver/cache/structuredcachestore.h15
-rw-r--r--src/zenserver/config.cpp39
-rw-r--r--src/zenserver/config.h72
-rw-r--r--src/zenserver/zenserver.cpp20
7 files changed, 371 insertions, 91 deletions
diff --git a/src/zenserver/cache/cachememorylayer.cpp b/src/zenserver/cache/cachememorylayer.cpp
index be21e60f1..d48ee5aa8 100644
--- a/src/zenserver/cache/cachememorylayer.cpp
+++ b/src/zenserver/cache/cachememorylayer.cpp
@@ -4,13 +4,19 @@
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+#include <zencore/jobqueue.h>
+#include <zencore/scopeguard.h>
#include <zencore/trace.h>
//////////////////////////////////////////////////////////////////////////
namespace zen {
-ZenCacheMemoryLayer::ZenCacheMemoryLayer()
+ZenCacheMemoryLayer::ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config)
+: m_JobQueue(JobQueue)
+, m_Configuration(Config)
+, m_LastTickTrim(GcClock::Clock::time_point::min().time_since_epoch().count())
{
}
@@ -21,6 +27,10 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
bool
ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
+ if (m_Configuration.TargetFootprintBytes == 0)
+ {
+ return false;
+ }
ZEN_TRACE_CPU("Z$::Mem::Get");
RwLock::SharedLockScope _(m_Lock);
@@ -40,12 +50,18 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
// inserts, the bucket delete path could end up deleting the
// underlying data structure
+ Trim();
+
return Bucket->Get(HashKey, OutValue);
}
void
ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
{
+ if (m_Configuration.TargetFootprintBytes == 0)
+ {
+ return;
+ }
ZEN_TRACE_CPU("Z$::Mem::Put");
const auto BucketName = std::string(InBucket);
@@ -77,9 +93,18 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const
}
}
- // Note that since the underlying IoBuffer is retained, the content type is also
+ Trim();
- Bucket->Put(HashKey, Value);
+ // Note that since the underlying IoBuffer is retained, the content type is also
+ int64_t SizeDiff = Bucket->Put(HashKey, Value);
+ if (SizeDiff > 0)
+ {
+ m_TotalSize.fetch_add(gsl::narrow<uint64_t>(SizeDiff));
+ }
+ else if (SizeDiff < 0)
+ {
+ m_TotalSize.fetch_sub(gsl::narrow<uint64_t>(-SizeDiff));
+ }
}
bool
@@ -92,6 +117,7 @@ ZenCacheMemoryLayer::DropBucket(std::string_view InBucket)
if (It != m_Buckets.end())
{
CacheBucket& Bucket = *It->second;
+ m_TotalSize.fetch_sub(Bucket.TotalSize());
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It);
Bucket.Drop();
@@ -110,6 +136,7 @@ ZenCacheMemoryLayer::Drop()
{
const auto& It = m_Buckets.begin();
CacheBucket& Bucket = *It->second;
+ m_TotalSize.fetch_sub(Bucket.TotalSize());
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It->first);
Bucket.Drop();
@@ -141,11 +168,101 @@ ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& Access
}
}
+uint64_t
+ZenCacheMemoryLayer::CollectGarbage(GcClock::TimePoint ExpireTime)
+{
+ uint64_t TrimmedSize = 0;
+ RwLock::SharedLockScope __(m_Lock);
+ for (auto& Kv : m_Buckets)
+ {
+ uint64_t BucketTrimmedSize = Kv.second->Trim(ExpireTime);
+ if (BucketTrimmedSize > 0)
+ {
+ m_TotalSize.fetch_sub(BucketTrimmedSize);
+ TrimmedSize += BucketTrimmedSize;
+ }
+ }
+ const GcClock::TimePoint Now = GcClock::Now();
+ const GcClock::Tick NowTick = Now.time_since_epoch().count();
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds);
+ GcClock::Tick LastTrimTick = m_LastTickTrim;
+ const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_LastTickTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ return TrimmedSize;
+}
+
void
-ZenCacheMemoryLayer::Reset()
+ZenCacheMemoryLayer::Trim()
{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Buckets.clear();
+ if (m_TotalSize <= m_Configuration.TargetFootprintBytes)
+ {
+ return;
+ }
+ if (m_Configuration.MaxAgeSeconds == 0 || m_Configuration.TrimIntervalSeconds == 0)
+ {
+ return;
+ }
+
+ const GcClock::TimePoint Now = GcClock::Now();
+
+ const GcClock::Tick NowTick = Now.time_since_epoch().count();
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds);
+ GcClock::Tick LastTrimTick = m_LastTickTrim;
+ const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
+ if (NowTick < NextAllowedTrimTick)
+ {
+ return;
+ }
+
+ bool Expected = false;
+ if (!m_IsTrimming.compare_exchange_strong(Expected, true))
+ {
+ return;
+ }
+
+ // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_LastTickTrim.store(NextTrimTick);
+
+ m_JobQueue.QueueJob("ZenCacheMemoryLayer::Trim", [this, Now, TrimInterval](JobContext&) {
+ ZEN_TRACE_CPU("Z$::Mem::Trim");
+
+ Stopwatch Timer;
+ uint64_t TrimmedSize = 0;
+ const auto Guard = MakeGuard([&] {
+ if (TrimmedSize > 0)
+ {
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(TrimmedSize),
+ NiceBytes(m_TotalSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ m_IsTrimming.store(false);
+ });
+
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MaxAgeSeconds);
+
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ for (auto& Kv : m_Buckets)
+ {
+ Kv.second->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
+ }
+ }
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
+ {
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= m_Configuration.TargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
+ TrimmedSize = CollectGarbage(ExpireTime);
+ break;
+ }
+ }
+ });
}
uint64_t
@@ -267,34 +384,36 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
return false;
}
-void
+int64_t
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
ZEN_TRACE_CPU("Z$::Mem::Bucket::Put");
metrics::OperationTiming::Scope $(m_PutOps);
- size_t PayloadSize = Value.Value.GetSize();
+ size_t PayloadSize = Value.Value.GetSize();
+ uint64_t OldPayloadSize = 0;
+
{
GcClock::Tick AccessTime = GcClock::TickCount();
RwLock::ExclusiveLockScope _(m_BucketLock);
- if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max())
- {
- // No more space in our memory cache!
- return;
- }
if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
{
uint32_t EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
- m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed);
BucketPayload& Payload = m_Payloads[EntryIndex];
+ OldPayloadSize = Payload.Payload.GetSize();
Payload.Payload = Value.Value;
Payload.RawHash = Value.RawHash;
Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize);
m_AccessTimes[EntryIndex] = AccessTime;
}
+ else if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max())
+ {
+ // No more space in our memory cache!
+ return 0;
+ }
else
{
uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size());
@@ -307,7 +426,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
}
- m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed);
+ if (PayloadSize > OldPayloadSize)
+ {
+ m_TotalSize.fetch_add(PayloadSize - OldPayloadSize);
+ return gsl::narrow<int64_t>(PayloadSize - OldPayloadSize);
+ }
+ else if (PayloadSize < OldPayloadSize)
+ {
+ m_TotalSize.fetch_sub(OldPayloadSize - PayloadSize);
+ return -gsl::narrow<int64_t>(OldPayloadSize - PayloadSize);
+ }
+ return 0;
}
void
@@ -321,10 +450,72 @@ ZenCacheMemoryLayer::CacheBucket::Drop()
}
uint64_t
+ZenCacheMemoryLayer::CacheBucket::Trim(GcClock::TimePoint ExpireTime)
+{
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ tsl::robin_map<IoHash, uint32_t> CacheMap;
+
+ size_t TrimmedSize = 0;
+ GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
+
+ RwLock::ExclusiveLockScope _(m_BucketLock);
+ {
+ AccessTimes.reserve(m_CacheMap.size());
+ Payloads.reserve(m_CacheMap.size());
+ CacheMap.reserve(m_CacheMap.size());
+
+ for (const auto& Kv : m_CacheMap)
+ {
+ if (m_AccessTimes[Kv.second] < ExpireTicks)
+ {
+ size_t PayloadSize = m_Payloads[Kv.second].Payload.GetSize();
+ m_TotalSize.fetch_sub(PayloadSize);
+ TrimmedSize += PayloadSize;
+ continue;
+ }
+ size_t Index = gsl::narrow<uint32_t>(Payloads.size());
+ Payloads.emplace_back(m_Payloads[Kv.second]);
+ AccessTimes.push_back(m_AccessTimes[Kv.second]);
+ CacheMap.insert_or_assign(Kv.first, Index);
+ }
+
+ m_AccessTimes.swap(AccessTimes);
+ m_Payloads.swap(Payloads);
+ m_CacheMap.swap(CacheMap);
+ }
+ return TrimmedSize;
+}
+
+uint64_t
ZenCacheMemoryLayer::CacheBucket::EntryCount() const
{
RwLock::SharedLockScope _(m_BucketLock);
return static_cast<uint64_t>(m_CacheMap.size());
}
+void
+ZenCacheMemoryLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
+ GcClock::Duration SectionLength,
+ std::vector<uint64_t>& InOutUsageSlots)
+{
+ RwLock::SharedLockScope _(m_BucketLock);
+ for (const auto& It : m_CacheMap)
+ {
+ uint32_t Index = It.second;
+ GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
+ GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
+ uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
+ if (Slot >= InOutUsageSlots.capacity())
+ {
+ Slot = InOutUsageSlots.capacity() - 1;
+ }
+ if (Slot > InOutUsageSlots.size())
+ {
+ InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
+ }
+ InOutUsageSlots[Slot] += m_Payloads[Index].Payload.GetSize();
+ }
+}
+
} // namespace zen
diff --git a/src/zenserver/cache/cachememorylayer.h b/src/zenserver/cache/cachememorylayer.h
index 0ef0c905f..f15fe241b 100644
--- a/src/zenserver/cache/cachememorylayer.h
+++ b/src/zenserver/cache/cachememorylayer.h
@@ -19,6 +19,8 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+class JobQueue;
+
/** In-memory cache storage
Intended for small values which are frequently accessed
@@ -31,8 +33,9 @@ class ZenCacheMemoryLayer
public:
struct Configuration
{
- uint64_t TargetFootprintBytes = 16 * 1024 * 1024;
- uint64_t ScavengeThreshold = 4 * 1024 * 1024;
+ uint64_t TargetFootprintBytes = 512 * 1024 * 1024;
+ uint64_t TrimIntervalSeconds = 60;
+ uint64_t MaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count());
};
struct BucketInfo
@@ -49,16 +52,16 @@ public:
uint64_t TotalSize = 0;
};
- ZenCacheMemoryLayer();
+ ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config);
~ZenCacheMemoryLayer();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
+ uint64_t CollectGarbage(GcClock::TimePoint ExpireTime);
void Drop();
bool DropBucket(std::string_view Bucket);
void ScrubStorage(ScrubContext& Ctx);
void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes);
- void Reset();
uint64_t TotalSize() const;
Info GetInfo() const;
@@ -68,6 +71,8 @@ public:
void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; }
private:
+ void Trim();
+
struct CacheBucket
{
#pragma pack(push)
@@ -93,21 +98,27 @@ private:
std::atomic_uint64_t m_TotalSize{};
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value);
+ int64_t Put(const IoHash& HashKey, const ZenCacheValue& Value);
+ uint64_t Trim(GcClock::TimePoint ExpireTime);
void Drop();
void ScrubStorage(ScrubContext& Ctx);
void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
inline uint64_t TotalSize() const { return m_TotalSize; }
uint64_t EntryCount() const;
+ void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots);
};
+ JobQueue& m_JobQueue;
mutable RwLock m_Lock;
std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
Configuration m_Configuration;
+ std::atomic_uint64_t m_TotalSize{};
+ std::atomic_bool m_IsTrimming = false;
+ std::atomic<GcClock::Tick> m_LastTickTrim;
ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete;
ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete;
};
-} // namespace zen \ No newline at end of file
+} // namespace zen
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 9e14892e3..0a2947b16 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -33,6 +33,7 @@ ZEN_THIRD_PARTY_INCLUDES_START
ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
+# include <zencore/jobqueue.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
# include <zencore/workthreadpool.h>
@@ -42,10 +43,15 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
-ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir)
+ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc,
+ JobQueue& JobQueue,
+ const std::filesystem::path& RootDir,
+ const ZenCacheMemoryLayer::Configuration MemLayerConfig)
: GcStorage(Gc)
, GcContributor(Gc)
, m_RootDir(RootDir)
+, m_JobQueue(JobQueue)
+, m_MemLayer(m_JobQueue, MemLayerConfig)
, m_DiskLayer(RootDir)
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
@@ -187,7 +193,7 @@ ZenCacheNamespace::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::Namespace::CollectGarbage");
- m_MemLayer.Reset();
+ (void)m_MemLayer.CollectGarbage(GcCtx.CacheExpireTime());
m_DiskLayer.CollectGarbage(GcCtx);
}
@@ -252,10 +258,14 @@ ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$");
static constinit std::string_view UE4DDCNamespaceName = "ue4.ddc";
-ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration, const DiskWriteBlocker* InDiskWriteBlocker)
+ZenCacheStore::ZenCacheStore(GcManager& Gc,
+ JobQueue& JobQueue,
+ const Configuration& Configuration,
+ const DiskWriteBlocker* InDiskWriteBlocker)
: m_Log(logging::Get("z$"))
, m_DiskWriteBlocker(InDiskWriteBlocker)
, m_Gc(Gc)
+, m_JobQueue(JobQueue)
, m_Configuration(Configuration)
, m_ExitLogging(false)
{
@@ -293,7 +303,10 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration,
for (const std::string& NamespaceName : Namespaces)
{
m_Namespaces[NamespaceName] =
- std::make_unique<ZenCacheNamespace>(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName));
+ std::make_unique<ZenCacheNamespace>(Gc,
+ m_JobQueue,
+ m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName),
+ m_Configuration.MemLayerConfig);
}
}
@@ -577,7 +590,10 @@ ZenCacheStore::GetNamespace(std::string_view Namespace)
auto NewNamespace = m_Namespaces.insert_or_assign(
std::string(Namespace),
- std::make_unique<ZenCacheNamespace>(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace)));
+ std::make_unique<ZenCacheNamespace>(m_Gc,
+ m_JobQueue,
+ m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace),
+ m_Configuration.MemLayerConfig));
return NewNamespace.first->second.get();
}
@@ -752,7 +768,8 @@ TEST_CASE("z$.store")
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
const int kIterationCount = 100;
@@ -788,6 +805,8 @@ TEST_CASE("z$.store")
TEST_CASE("z$.size")
{
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
const auto CreateCacheValue = [](size_t Size) -> CbObject {
std::vector<uint8_t> Buf;
Buf.resize(Size);
@@ -806,7 +825,7 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256);
@@ -826,7 +845,7 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -849,7 +868,7 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64);
@@ -869,7 +888,7 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -888,6 +907,8 @@ TEST_CASE("z$.gc")
{
using namespace testutils;
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
SUBCASE("gather references does NOT add references for expired cache entries")
{
ScopedTemporaryDirectory TempDir;
@@ -906,7 +927,7 @@ TEST_CASE("z$.gc")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
const auto Bucket = "teardrinker"sv;
// Create a cache record
@@ -943,7 +964,7 @@ TEST_CASE("z$.gc")
// Expect timestamps to be serialized
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
std::vector<IoHash> Keep;
// Collect garbage with 1 hour max cache duration
@@ -964,7 +985,7 @@ TEST_CASE("z$.gc")
{
ScopedTemporaryDirectory TempDir;
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
const auto Bucket = "fortysixandtwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -1009,7 +1030,7 @@ TEST_CASE("z$.gc")
{
ScopedTemporaryDirectory TempDir;
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
const auto Bucket = "rightintwo"sv;
std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
@@ -1104,7 +1125,8 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
WorkerThreadPool ThreadPool(4);
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path());
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path());
{
std::atomic<size_t> WorkCompleted = 0;
@@ -1313,6 +1335,8 @@ TEST_CASE("z$.namespaces")
return Writer.Save();
};
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
ScopedTemporaryDirectory TempDir;
CreateDirectories(TempDir.Path());
@@ -1321,7 +1345,7 @@ TEST_CASE("z$.namespaces")
IoHash Key2;
{
GcManager Gc;
- ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}, nullptr);
+ ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}, nullptr);
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -1346,7 +1370,7 @@ TEST_CASE("z$.namespaces")
{
GcManager Gc;
- ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr);
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -1379,6 +1403,8 @@ TEST_CASE("z$.drop.bucket")
return Writer.Save();
};
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
ScopedTemporaryDirectory TempDir;
CreateDirectories(TempDir.Path());
@@ -1407,7 +1433,7 @@ TEST_CASE("z$.drop.bucket")
WorkerThreadPool Workers(1);
{
GcManager Gc;
- ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr);
const auto Bucket = "teardrinker"sv;
const auto Namespace = "mynamespace"sv;
@@ -1454,6 +1480,8 @@ TEST_CASE("z$.drop.namespace")
return Writer.Save();
};
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+
ScopedTemporaryDirectory TempDir;
CreateDirectories(TempDir.Path());
@@ -1478,7 +1506,7 @@ TEST_CASE("z$.drop.namespace")
WorkerThreadPool Workers(1);
{
GcManager Gc;
- ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr);
+ ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr);
const auto Bucket1 = "teardrinker1"sv;
const auto Bucket2 = "teardrinker2"sv;
const auto Namespace1 = "mynamespace1"sv;
@@ -1543,7 +1571,8 @@ TEST_CASE("z$.blocked.disklayer.put")
};
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
@@ -1637,7 +1666,8 @@ TEST_CASE("z$.scrub")
GcManager Gc;
CidStore CidStore(Gc);
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ auto JobQueue = MakeJobQueue(1, "testqueue");
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h
index 0dd160a98..0c87ecc22 100644
--- a/src/zenserver/cache/structuredcachestore.h
+++ b/src/zenserver/cache/structuredcachestore.h
@@ -39,6 +39,7 @@ namespace zen {
class WorkerThreadPool;
class DiskWriteBlocker;
+class JobQueue;
/* Z$ namespace
@@ -77,7 +78,10 @@ public:
ZenCacheDiskLayer::DiskStats DiskStats;
};
- ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir);
+ ZenCacheNamespace(GcManager& Gc,
+ JobQueue& JobQueue,
+ const std::filesystem::path& RootDir,
+ const ZenCacheMemoryLayer::Configuration MemLayerConfig = {});
~ZenCacheNamespace();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -108,6 +112,7 @@ public:
private:
std::filesystem::path m_RootDir;
+ JobQueue& m_JobQueue;
ZenCacheMemoryLayer m_MemLayer;
ZenCacheDiskLayer m_DiskLayer;
std::atomic<uint64_t> m_HitCount{};
@@ -137,8 +142,9 @@ public:
struct Configuration
{
- std::filesystem::path BasePath;
- bool AllowAutomaticCreationOfNamespaces = false;
+ std::filesystem::path BasePath;
+ bool AllowAutomaticCreationOfNamespaces = false;
+ ZenCacheMemoryLayer::Configuration MemLayerConfig;
struct LogConfig
{
bool EnableWriteLog = true;
@@ -171,7 +177,7 @@ public:
std::vector<NamedNamespaceStats> NamespaceStats;
};
- ZenCacheStore(GcManager& Gc, const Configuration& Configuration, const DiskWriteBlocker* InDiskWriteBlocker);
+ ZenCacheStore(GcManager& Gc, JobQueue& JobQueue, const Configuration& Configuration, const DiskWriteBlocker* InDiskWriteBlocker);
~ZenCacheStore();
bool Get(const CacheRequestContext& Context,
@@ -222,6 +228,7 @@ private:
std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces;
GcManager& m_Gc;
+ JobQueue& m_JobQueue;
Configuration m_Configuration;
std::atomic<uint64_t> m_HitCount{};
std::atomic<uint64_t> m_MissCount{};
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 6daf7fe1c..5e24d174b 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -814,9 +814,17 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("trace.file"sv, ServerOptions.TraceFile, "tracefile"sv);
////// cache
- LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheEnabled);
- LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheWriteLogEnabled);
- LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheAccessLogEnabled);
+ LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheConfig.Enabled);
+ LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log");
+ LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheConfig.AccessLogEnabled, "cache-access-log");
+
+ LuaOptions.AddOption("cache.memlayer.targetfootprint"sv,
+ ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes,
+ "cache-memlayer-targetfootprint");
+ LuaOptions.AddOption("cache.memlayer.triminterval"sv,
+ ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds,
+ "cache-memlayer-triminterval");
+ LuaOptions.AddOption("cache.memlayer.maxage"sv, ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds, "cache-memlayer-maxage");
////// cache.upstream
LuaOptions.AddOption("cache.upstream.policy"sv, ServerOptions.UpstreamCacheConfig.CachePolicy, "upstream-cache-policy"sv);
@@ -1236,14 +1244,35 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
"",
"cache-write-log",
"Whether cache write log is enabled",
- cxxopts::value<bool>(ServerOptions.StructuredCacheWriteLogEnabled)->default_value("true"),
+ cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.WriteLogEnabled)->default_value("true"),
"");
options.add_option("cache",
"",
"cache-access-log",
"Whether cache access log is enabled",
- cxxopts::value<bool>(ServerOptions.StructuredCacheAccessLogEnabled)->default_value("true"),
+ cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.AccessLogEnabled)->default_value("true"),
+ "");
+
+ options.add_option("cache",
+ "",
+ "cache-memlayer-targetfootprint",
+ "Max allowed memory used by cache memory layer per namespace in bytes. Default set to 536870912 (512 Mb).",
+ cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes)->default_value("536870912"),
+ "");
+
+ options.add_option("cache",
+ "",
+ "cache-memlayer-triminterval",
+ "Minimum time between each attempt to trim cache memory layers in seconds. Default set to 60 (1 min).",
+ cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds)->default_value("60"),
+ "");
+
+ options.add_option("cache",
+ "",
+ "cache-memlayer-maxage",
+ "Maximum age of payloads when trimming cache memory layers in seconds. Default set to 86400 (1 day).",
+ cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds)->default_value("86400"),
"");
options.add_option("compute",
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index bb6e20bb6..924375a19 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -116,40 +116,48 @@ struct ZenObjectStoreConfig
std::vector<BucketConfig> Buckets;
};
+struct ZenStructuredCacheConfig
+{
+ bool Enabled = true;
+ bool WriteLogEnabled = false;
+ bool AccessLogEnabled = false;
+ uint64_t MemTargetFootprintBytes = 512 * 1024 * 1024;
+ uint64_t MemTrimIntervalSeconds = 60;
+ uint64_t MemMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count());
+};
+
struct ZenServerOptions
{
- ZenUpstreamCacheConfig UpstreamCacheConfig;
- ZenGcConfig GcConfig;
- ZenAuthConfig AuthConfig;
- ZenObjectStoreConfig ObjectStoreConfig;
- zen::HttpServerConfig HttpServerConfig;
- std::filesystem::path DataDir; // Root directory for state (used for testing)
- std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
- std::filesystem::path AbsLogFile; // Absolute path to main log file
- std::filesystem::path ConfigFile; // Path to Lua config file
- std::string ChildId; // Id assigned by parent process (used for lifetime management)
- std::string LogId; // Id for tagging log output
- std::string EncryptionKey; // 256 bit AES encryption key
- std::string EncryptionIV; // 128 bit AES initialization vector
- int BasePort = 1337; // Service listen port (used for both UDP and TCP)
- int OwnerPid = 0; // Parent process id (zero for standalone)
- int WebSocketPort = 0; // Web socket port (Zero = disabled)
- int WebSocketThreads = 0;
- bool InstallService = false; // Flag used to initiate service install (temporary)
- bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
- bool IsDebug = false;
- bool IsTest = false;
- bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
- bool StructuredCacheEnabled = true;
- bool StructuredCacheWriteLogEnabled = false;
- bool StructuredCacheAccessLogEnabled = false;
- bool ExecServiceEnabled = true;
- bool ComputeServiceEnabled = true;
- bool ShouldCrash = false; // Option for testing crash handling
- bool IsFirstRun = false;
- bool NoSentry = false;
- bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports
- bool ObjectStoreEnabled = false;
+ ZenUpstreamCacheConfig UpstreamCacheConfig;
+ ZenGcConfig GcConfig;
+ ZenAuthConfig AuthConfig;
+ ZenObjectStoreConfig ObjectStoreConfig;
+ zen::HttpServerConfig HttpServerConfig;
+ ZenStructuredCacheConfig StructuredCacheConfig;
+ std::filesystem::path DataDir; // Root directory for state (used for testing)
+ std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
+ std::filesystem::path AbsLogFile; // Absolute path to main log file
+ std::filesystem::path ConfigFile; // Path to Lua config file
+ std::string ChildId; // Id assigned by parent process (used for lifetime management)
+ std::string LogId; // Id for tagging log output
+ std::string EncryptionKey; // 256 bit AES encryption key
+ std::string EncryptionIV; // 128 bit AES initialization vector
+ int BasePort = 1337; // Service listen port (used for both UDP and TCP)
+ int OwnerPid = 0; // Parent process id (zero for standalone)
+ int WebSocketPort = 0; // Web socket port (Zero = disabled)
+ int WebSocketThreads = 0;
+ bool InstallService = false; // Flag used to initiate service install (temporary)
+ bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
+ bool IsDebug = false;
+ bool IsTest = false;
+ bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
+ bool ExecServiceEnabled = true;
+ bool ComputeServiceEnabled = true;
+ bool ShouldCrash = false; // Option for testing crash handling
+ bool IsFirstRun = false;
+ bool NoSentry = false;
+ bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports
+ bool ObjectStoreEnabled = false;
#if ZEN_WITH_TRACE
std::string TraceHost; // Host name or IP address to send trace data to
std::string TraceFile; // Path of a file to write a trace
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index e4143dc01..cf9f03d89 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -383,7 +383,7 @@ public:
}
#endif // ZEN_WITH_COMPUTE_SERVICES
- if (ServerOptions.StructuredCacheEnabled)
+ if (ServerOptions.StructuredCacheConfig.Enabled)
{
InitializeStructuredCache(ServerOptions);
}
@@ -946,13 +946,17 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
using namespace std::literals;
ZEN_INFO("instantiating structured cache service");
- m_CacheStore =
- new ZenCacheStore(m_GcManager,
- ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache",
- .AllowAutomaticCreationOfNamespaces = true,
- .Logging = {.EnableWriteLog = ServerOptions.StructuredCacheWriteLogEnabled,
- .EnableAccessLog = ServerOptions.StructuredCacheAccessLogEnabled}},
- m_GcManager.GetDiskWriteBlocker());
+ m_CacheStore = new ZenCacheStore(
+ m_GcManager,
+ *m_JobQueue,
+ ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache",
+ .AllowAutomaticCreationOfNamespaces = true,
+ .MemLayerConfig = {.TargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes,
+ .TrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds,
+ .MaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds},
+ .Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled,
+ .EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled}},
+ m_GcManager.GetDiskWriteBlocker());
const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;