diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-02 11:29:00 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-02 11:29:00 +0200 |
| commit | 14deb110acac35e96afa72316f6cd871dfe04168 (patch) | |
| tree | 9206028a2423c01f562d6e3b45b2e2c34947b611 /src | |
| parent | lightweight gc (#431) (diff) | |
| download | zen-14deb110acac35e96afa72316f6cd871dfe04168.tar.xz zen-14deb110acac35e96afa72316f6cd871dfe04168.zip | |
Limit size of memory cache layer (#423)
- Feature: Limit the size ZenCacheMemoryLayer may use
- `--cache-memlayer-targetfootprint` option to set which size (in bytes) it should be limited to, zero to have it unbounded
- `--cache-memlayer-maxage` option to set how long (in seconds) cache items should be kept in the memory cache
Do more "standard" GC rather than clearing everything.
Tries to purge memory on Get/Put on the fly if exceeding limit - not sure if we should have a polling thread instead of adding overhead to Get/Put (however light it may be).
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/cachememorylayer.cpp | 221 | ||||
| -rw-r--r-- | src/zenserver/cache/cachememorylayer.h | 23 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 72 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.h | 15 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 39 | ||||
| -rw-r--r-- | src/zenserver/config.h | 72 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 20 |
7 files changed, 371 insertions, 91 deletions
diff --git a/src/zenserver/cache/cachememorylayer.cpp b/src/zenserver/cache/cachememorylayer.cpp index be21e60f1..d48ee5aa8 100644 --- a/src/zenserver/cache/cachememorylayer.cpp +++ b/src/zenserver/cache/cachememorylayer.cpp @@ -4,13 +4,19 @@ #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> +#include <zencore/fmtutils.h> +#include <zencore/jobqueue.h> +#include <zencore/scopeguard.h> #include <zencore/trace.h> ////////////////////////////////////////////////////////////////////////// namespace zen { -ZenCacheMemoryLayer::ZenCacheMemoryLayer() +ZenCacheMemoryLayer::ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config) +: m_JobQueue(JobQueue) +, m_Configuration(Config) +, m_LastTickTrim(GcClock::Clock::time_point::min().time_since_epoch().count()) { } @@ -21,6 +27,10 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer() bool ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { + if (m_Configuration.TargetFootprintBytes == 0) + { + return false; + } ZEN_TRACE_CPU("Z$::Mem::Get"); RwLock::SharedLockScope _(m_Lock); @@ -40,12 +50,18 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa // inserts, the bucket delete path could end up deleting the // underlying data structure + Trim(); + return Bucket->Get(HashKey, OutValue); } void ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { + if (m_Configuration.TargetFootprintBytes == 0) + { + return; + } ZEN_TRACE_CPU("Z$::Mem::Put"); const auto BucketName = std::string(InBucket); @@ -77,9 +93,18 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const } } - // Note that since the underlying IoBuffer is retained, the content type is also + Trim(); - Bucket->Put(HashKey, Value); + // Note that since the underlying IoBuffer is retained, the content type is also + int64_t SizeDiff = Bucket->Put(HashKey, Value); + if (SizeDiff > 0) + { + m_TotalSize.fetch_add(gsl::narrow<uint64_t>(SizeDiff)); + } + else if (SizeDiff < 0) + { + m_TotalSize.fetch_sub(gsl::narrow<uint64_t>(-SizeDiff)); + } } bool @@ -92,6 +117,7 @@ ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) if (It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; + m_TotalSize.fetch_sub(Bucket.TotalSize()); m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); Bucket.Drop(); @@ -110,6 +136,7 @@ ZenCacheMemoryLayer::Drop() { const auto& It = m_Buckets.begin(); CacheBucket& Bucket = *It->second; + m_TotalSize.fetch_sub(Bucket.TotalSize()); m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); Bucket.Drop(); @@ -141,11 +168,101 @@ ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& Access } } +uint64_t +ZenCacheMemoryLayer::CollectGarbage(GcClock::TimePoint ExpireTime) +{ + uint64_t TrimmedSize = 0; + RwLock::SharedLockScope __(m_Lock); + for (auto& Kv : m_Buckets) + { + uint64_t BucketTrimmedSize = Kv.second->Trim(ExpireTime); + if (BucketTrimmedSize > 0) + { + m_TotalSize.fetch_sub(BucketTrimmedSize); + TrimmedSize += BucketTrimmedSize; + } + } + const GcClock::TimePoint Now = GcClock::Now(); + const GcClock::Tick NowTick = Now.time_since_epoch().count(); + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); + GcClock::Tick LastTrimTick = m_LastTickTrim; + const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_LastTickTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + return TrimmedSize; +} + void -ZenCacheMemoryLayer::Reset() +ZenCacheMemoryLayer::Trim() { - RwLock::ExclusiveLockScope _(m_Lock); - m_Buckets.clear(); + if (m_TotalSize <= m_Configuration.TargetFootprintBytes) + { + return; + } + if (m_Configuration.MaxAgeSeconds == 0 || m_Configuration.TrimIntervalSeconds == 0) + { + return; + } + + const GcClock::TimePoint Now = GcClock::Now(); + + const GcClock::Tick NowTick = Now.time_since_epoch().count(); + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); + GcClock::Tick LastTrimTick = m_LastTickTrim; + const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count(); + if (NowTick < NextAllowedTrimTick) + { + return; + } + + bool Expected = false; + if (!m_IsTrimming.compare_exchange_strong(Expected, true)) + { + return; + } + + // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_LastTickTrim.store(NextTrimTick); + + m_JobQueue.QueueJob("ZenCacheMemoryLayer::Trim", [this, Now, TrimInterval](JobContext&) { + ZEN_TRACE_CPU("Z$::Mem::Trim"); + + Stopwatch Timer; + uint64_t TrimmedSize = 0; + const auto Guard = MakeGuard([&] { + if (TrimmedSize > 0) + { + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + NiceBytes(TrimmedSize), + NiceBytes(m_TotalSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + m_IsTrimming.store(false); + }); + + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MaxAgeSeconds); + + std::vector<uint64_t> UsageSlots; + UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); + { + RwLock::SharedLockScope __(m_Lock); + for (auto& Kv : m_Buckets) + { + Kv.second->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); + } + } + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) + { + TotalSize += UsageSlots[Index]; + if (TotalSize >= m_Configuration.TargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); + TrimmedSize = CollectGarbage(ExpireTime); + break; + } + } + }); } uint64_t @@ -267,34 +384,36 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV return false; } -void +int64_t ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { ZEN_TRACE_CPU("Z$::Mem::Bucket::Put"); metrics::OperationTiming::Scope $(m_PutOps); - size_t PayloadSize = Value.Value.GetSize(); + size_t PayloadSize = Value.Value.GetSize(); + uint64_t OldPayloadSize = 0; + { GcClock::Tick AccessTime = GcClock::TickCount(); RwLock::ExclusiveLockScope _(m_BucketLock); - if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) - { - // No more space in our memory cache! - return; - } if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) { uint32_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed); BucketPayload& Payload = m_Payloads[EntryIndex]; + OldPayloadSize = Payload.Payload.GetSize(); Payload.Payload = Value.Value; Payload.RawHash = Value.RawHash; Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize); m_AccessTimes[EntryIndex] = AccessTime; } + else if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) + { + // No more space in our memory cache! + return 0; + } else { uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size()); @@ -307,7 +426,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); } - m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed); + if (PayloadSize > OldPayloadSize) + { + m_TotalSize.fetch_add(PayloadSize - OldPayloadSize); + return gsl::narrow<int64_t>(PayloadSize - OldPayloadSize); + } + else if (PayloadSize < OldPayloadSize) + { + m_TotalSize.fetch_sub(OldPayloadSize - PayloadSize); + return -gsl::narrow<int64_t>(OldPayloadSize - PayloadSize); + } + return 0; } void @@ -321,10 +450,72 @@ ZenCacheMemoryLayer::CacheBucket::Drop() } uint64_t +ZenCacheMemoryLayer::CacheBucket::Trim(GcClock::TimePoint ExpireTime) +{ + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + tsl::robin_map<IoHash, uint32_t> CacheMap; + + size_t TrimmedSize = 0; + GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + RwLock::ExclusiveLockScope _(m_BucketLock); + { + AccessTimes.reserve(m_CacheMap.size()); + Payloads.reserve(m_CacheMap.size()); + CacheMap.reserve(m_CacheMap.size()); + + for (const auto& Kv : m_CacheMap) + { + if (m_AccessTimes[Kv.second] < ExpireTicks) + { + size_t PayloadSize = m_Payloads[Kv.second].Payload.GetSize(); + m_TotalSize.fetch_sub(PayloadSize); + TrimmedSize += PayloadSize; + continue; + } + size_t Index = gsl::narrow<uint32_t>(Payloads.size()); + Payloads.emplace_back(m_Payloads[Kv.second]); + AccessTimes.push_back(m_AccessTimes[Kv.second]); + CacheMap.insert_or_assign(Kv.first, Index); + } + + m_AccessTimes.swap(AccessTimes); + m_Payloads.swap(Payloads); + m_CacheMap.swap(CacheMap); + } + return TrimmedSize; +} + +uint64_t ZenCacheMemoryLayer::CacheBucket::EntryCount() const { RwLock::SharedLockScope _(m_BucketLock); return static_cast<uint64_t>(m_CacheMap.size()); } +void +ZenCacheMemoryLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, + GcClock::Duration SectionLength, + std::vector<uint64_t>& InOutUsageSlots) +{ + RwLock::SharedLockScope _(m_BucketLock); + for (const auto& It : m_CacheMap) + { + uint32_t Index = It.second; + GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); + GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); + uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0); + if (Slot >= InOutUsageSlots.capacity()) + { + Slot = InOutUsageSlots.capacity() - 1; + } + if (Slot > InOutUsageSlots.size()) + { + InOutUsageSlots.resize(uint64_t(Slot + 1), 0); + } + InOutUsageSlots[Slot] += m_Payloads[Index].Payload.GetSize(); + } +} + } // namespace zen diff --git a/src/zenserver/cache/cachememorylayer.h b/src/zenserver/cache/cachememorylayer.h index 0ef0c905f..f15fe241b 100644 --- a/src/zenserver/cache/cachememorylayer.h +++ b/src/zenserver/cache/cachememorylayer.h @@ -19,6 +19,8 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +class JobQueue; + /** In-memory cache storage Intended for small values which are frequently accessed @@ -31,8 +33,9 @@ class ZenCacheMemoryLayer public: struct Configuration { - uint64_t TargetFootprintBytes = 16 * 1024 * 1024; - uint64_t ScavengeThreshold = 4 * 1024 * 1024; + uint64_t TargetFootprintBytes = 512 * 1024 * 1024; + uint64_t TrimIntervalSeconds = 60; + uint64_t MaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count()); }; struct BucketInfo @@ -49,16 +52,16 @@ public: uint64_t TotalSize = 0; }; - ZenCacheMemoryLayer(); + ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config); ~ZenCacheMemoryLayer(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + uint64_t CollectGarbage(GcClock::TimePoint ExpireTime); void Drop(); bool DropBucket(std::string_view Bucket); void ScrubStorage(ScrubContext& Ctx); void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes); - void Reset(); uint64_t TotalSize() const; Info GetInfo() const; @@ -68,6 +71,8 @@ public: void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; } private: + void Trim(); + struct CacheBucket { #pragma pack(push) @@ -93,21 +98,27 @@ private: std::atomic_uint64_t m_TotalSize{}; bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value); + int64_t Put(const IoHash& HashKey, const ZenCacheValue& Value); + uint64_t Trim(GcClock::TimePoint ExpireTime); void Drop(); void ScrubStorage(ScrubContext& Ctx); void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); inline uint64_t TotalSize() const { return m_TotalSize; } uint64_t EntryCount() const; + void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots); }; + JobQueue& m_JobQueue; mutable RwLock m_Lock; std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; Configuration m_Configuration; + std::atomic_uint64_t m_TotalSize{}; + std::atomic_bool m_IsTrimming = false; + std::atomic<GcClock::Tick> m_LastTickTrim; ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete; ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete; }; -} // namespace zen
\ No newline at end of file +} // namespace zen diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 9e14892e3..0a2947b16 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -33,6 +33,7 @@ ZEN_THIRD_PARTY_INCLUDES_START ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS +# include <zencore/jobqueue.h> # include <zencore/testing.h> # include <zencore/testutils.h> # include <zencore/workthreadpool.h> @@ -42,10 +43,15 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir) +ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, + JobQueue& JobQueue, + const std::filesystem::path& RootDir, + const ZenCacheMemoryLayer::Configuration MemLayerConfig) : GcStorage(Gc) , GcContributor(Gc) , m_RootDir(RootDir) +, m_JobQueue(JobQueue) +, m_MemLayer(m_JobQueue, MemLayerConfig) , m_DiskLayer(RootDir) { ZEN_INFO("initializing structured cache at '{}'", RootDir); @@ -187,7 +193,7 @@ ZenCacheNamespace::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Namespace::CollectGarbage"); - m_MemLayer.Reset(); + (void)m_MemLayer.CollectGarbage(GcCtx.CacheExpireTime()); m_DiskLayer.CollectGarbage(GcCtx); } @@ -252,10 +258,14 @@ ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$"); static constinit std::string_view UE4DDCNamespaceName = "ue4.ddc"; -ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration, const DiskWriteBlocker* InDiskWriteBlocker) +ZenCacheStore::ZenCacheStore(GcManager& Gc, + JobQueue& JobQueue, + const Configuration& Configuration, + const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(logging::Get("z$")) , m_DiskWriteBlocker(InDiskWriteBlocker) , m_Gc(Gc) +, m_JobQueue(JobQueue) , m_Configuration(Configuration) , m_ExitLogging(false) { @@ -293,7 +303,10 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration, for (const std::string& NamespaceName : Namespaces) { m_Namespaces[NamespaceName] = - std::make_unique<ZenCacheNamespace>(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName)); + std::make_unique<ZenCacheNamespace>(Gc, + m_JobQueue, + m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName), + m_Configuration.MemLayerConfig); } } @@ -577,7 +590,10 @@ ZenCacheStore::GetNamespace(std::string_view Namespace) auto NewNamespace = m_Namespaces.insert_or_assign( std::string(Namespace), - std::make_unique<ZenCacheNamespace>(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace))); + std::make_unique<ZenCacheNamespace>(m_Gc, + m_JobQueue, + m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace), + m_Configuration.MemLayerConfig)); return NewNamespace.first->second.get(); } @@ -752,7 +768,8 @@ TEST_CASE("z$.store") GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + auto JobQueue = MakeJobQueue(1, "testqueue"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); const int kIterationCount = 100; @@ -788,6 +805,8 @@ TEST_CASE("z$.store") TEST_CASE("z$.size") { + auto JobQueue = MakeJobQueue(1, "testqueue"); + const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector<uint8_t> Buf; Buf.resize(Size); @@ -806,7 +825,7 @@ TEST_CASE("z$.size") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); @@ -826,7 +845,7 @@ TEST_CASE("z$.size") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); @@ -849,7 +868,7 @@ TEST_CASE("z$.size") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); @@ -869,7 +888,7 @@ TEST_CASE("z$.size") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); @@ -888,6 +907,8 @@ TEST_CASE("z$.gc") { using namespace testutils; + auto JobQueue = MakeJobQueue(1, "testqueue"); + SUBCASE("gather references does NOT add references for expired cache entries") { ScopedTemporaryDirectory TempDir; @@ -906,7 +927,7 @@ TEST_CASE("z$.gc") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); const auto Bucket = "teardrinker"sv; // Create a cache record @@ -943,7 +964,7 @@ TEST_CASE("z$.gc") // Expect timestamps to be serialized { GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); std::vector<IoHash> Keep; // Collect garbage with 1 hour max cache duration @@ -964,7 +985,7 @@ TEST_CASE("z$.gc") { ScopedTemporaryDirectory TempDir; GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); const auto Bucket = "fortysixandtwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); @@ -1009,7 +1030,7 @@ TEST_CASE("z$.gc") { ScopedTemporaryDirectory TempDir; GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); const auto Bucket = "rightintwo"sv; std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; @@ -1104,7 +1125,8 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) WorkerThreadPool ThreadPool(4); GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path()); + auto JobQueue = MakeJobQueue(1, "testqueue"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path()); { std::atomic<size_t> WorkCompleted = 0; @@ -1313,6 +1335,8 @@ TEST_CASE("z$.namespaces") return Writer.Save(); }; + auto JobQueue = MakeJobQueue(1, "testqueue"); + ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); @@ -1321,7 +1345,7 @@ TEST_CASE("z$.namespaces") IoHash Key2; { GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}, nullptr); + ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}, nullptr); const auto Bucket = "teardrinker"sv; const auto CustomNamespace = "mynamespace"sv; @@ -1346,7 +1370,7 @@ TEST_CASE("z$.namespaces") { GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr); + ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr); const auto Bucket = "teardrinker"sv; const auto CustomNamespace = "mynamespace"sv; @@ -1379,6 +1403,8 @@ TEST_CASE("z$.drop.bucket") return Writer.Save(); }; + auto JobQueue = MakeJobQueue(1, "testqueue"); + ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); @@ -1407,7 +1433,7 @@ TEST_CASE("z$.drop.bucket") WorkerThreadPool Workers(1); { GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr); + ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr); const auto Bucket = "teardrinker"sv; const auto Namespace = "mynamespace"sv; @@ -1454,6 +1480,8 @@ TEST_CASE("z$.drop.namespace") return Writer.Save(); }; + auto JobQueue = MakeJobQueue(1, "testqueue"); + ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); @@ -1478,7 +1506,7 @@ TEST_CASE("z$.drop.namespace") WorkerThreadPool Workers(1); { GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr); + ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr); const auto Bucket1 = "teardrinker1"sv; const auto Bucket2 = "teardrinker2"sv; const auto Namespace1 = "mynamespace1"sv; @@ -1543,7 +1571,8 @@ TEST_CASE("z$.blocked.disklayer.put") }; GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + auto JobQueue = MakeJobQueue(1, "testqueue"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(64 * 1024 + 64); @@ -1637,7 +1666,8 @@ TEST_CASE("z$.scrub") GcManager Gc; CidStore CidStore(Gc); - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + auto JobQueue = MakeJobQueue(1, "testqueue"); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h index 0dd160a98..0c87ecc22 100644 --- a/src/zenserver/cache/structuredcachestore.h +++ b/src/zenserver/cache/structuredcachestore.h @@ -39,6 +39,7 @@ namespace zen { class WorkerThreadPool; class DiskWriteBlocker; +class JobQueue; /* Z$ namespace @@ -77,7 +78,10 @@ public: ZenCacheDiskLayer::DiskStats DiskStats; }; - ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir); + ZenCacheNamespace(GcManager& Gc, + JobQueue& JobQueue, + const std::filesystem::path& RootDir, + const ZenCacheMemoryLayer::Configuration MemLayerConfig = {}); ~ZenCacheNamespace(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -108,6 +112,7 @@ public: private: std::filesystem::path m_RootDir; + JobQueue& m_JobQueue; ZenCacheMemoryLayer m_MemLayer; ZenCacheDiskLayer m_DiskLayer; std::atomic<uint64_t> m_HitCount{}; @@ -137,8 +142,9 @@ public: struct Configuration { - std::filesystem::path BasePath; - bool AllowAutomaticCreationOfNamespaces = false; + std::filesystem::path BasePath; + bool AllowAutomaticCreationOfNamespaces = false; + ZenCacheMemoryLayer::Configuration MemLayerConfig; struct LogConfig { bool EnableWriteLog = true; @@ -171,7 +177,7 @@ public: std::vector<NamedNamespaceStats> NamespaceStats; }; - ZenCacheStore(GcManager& Gc, const Configuration& Configuration, const DiskWriteBlocker* InDiskWriteBlocker); + ZenCacheStore(GcManager& Gc, JobQueue& JobQueue, const Configuration& Configuration, const DiskWriteBlocker* InDiskWriteBlocker); ~ZenCacheStore(); bool Get(const CacheRequestContext& Context, @@ -222,6 +228,7 @@ private: std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces; GcManager& m_Gc; + JobQueue& m_JobQueue; Configuration m_Configuration; std::atomic<uint64_t> m_HitCount{}; std::atomic<uint64_t> m_MissCount{}; diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 6daf7fe1c..5e24d174b 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -814,9 +814,17 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("trace.file"sv, ServerOptions.TraceFile, "tracefile"sv); ////// cache - LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheEnabled); - LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheWriteLogEnabled); - LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheAccessLogEnabled); + LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheConfig.Enabled); + LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log"); + LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheConfig.AccessLogEnabled, "cache-access-log"); + + LuaOptions.AddOption("cache.memlayer.targetfootprint"sv, + ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes, + "cache-memlayer-targetfootprint"); + LuaOptions.AddOption("cache.memlayer.triminterval"sv, + ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds, + "cache-memlayer-triminterval"); + LuaOptions.AddOption("cache.memlayer.maxage"sv, ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds, "cache-memlayer-maxage"); ////// cache.upstream LuaOptions.AddOption("cache.upstream.policy"sv, ServerOptions.UpstreamCacheConfig.CachePolicy, "upstream-cache-policy"sv); @@ -1236,14 +1244,35 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) "", "cache-write-log", "Whether cache write log is enabled", - cxxopts::value<bool>(ServerOptions.StructuredCacheWriteLogEnabled)->default_value("true"), + cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.WriteLogEnabled)->default_value("true"), ""); options.add_option("cache", "", "cache-access-log", "Whether cache access log is enabled", - cxxopts::value<bool>(ServerOptions.StructuredCacheAccessLogEnabled)->default_value("true"), + cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.AccessLogEnabled)->default_value("true"), + ""); + + options.add_option("cache", + "", + "cache-memlayer-targetfootprint", + "Max allowed memory used by cache memory layer per namespace in bytes. Default set to 536870912 (512 Mb).", + cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes)->default_value("536870912"), + ""); + + options.add_option("cache", + "", + "cache-memlayer-triminterval", + "Minimum time between each attempt to trim cache memory layers in seconds. Default set to 60 (1 min).", + cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds)->default_value("60"), + ""); + + options.add_option("cache", + "", + "cache-memlayer-maxage", + "Maximum age of payloads when trimming cache memory layers in seconds. Default set to 86400 (1 day).", + cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds)->default_value("86400"), ""); options.add_option("compute", diff --git a/src/zenserver/config.h b/src/zenserver/config.h index bb6e20bb6..924375a19 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -116,40 +116,48 @@ struct ZenObjectStoreConfig std::vector<BucketConfig> Buckets; }; +struct ZenStructuredCacheConfig +{ + bool Enabled = true; + bool WriteLogEnabled = false; + bool AccessLogEnabled = false; + uint64_t MemTargetFootprintBytes = 512 * 1024 * 1024; + uint64_t MemTrimIntervalSeconds = 60; + uint64_t MemMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count()); +}; + struct ZenServerOptions { - ZenUpstreamCacheConfig UpstreamCacheConfig; - ZenGcConfig GcConfig; - ZenAuthConfig AuthConfig; - ZenObjectStoreConfig ObjectStoreConfig; - zen::HttpServerConfig HttpServerConfig; - std::filesystem::path DataDir; // Root directory for state (used for testing) - std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) - std::filesystem::path AbsLogFile; // Absolute path to main log file - std::filesystem::path ConfigFile; // Path to Lua config file - std::string ChildId; // Id assigned by parent process (used for lifetime management) - std::string LogId; // Id for tagging log output - std::string EncryptionKey; // 256 bit AES encryption key - std::string EncryptionIV; // 128 bit AES initialization vector - int BasePort = 1337; // Service listen port (used for both UDP and TCP) - int OwnerPid = 0; // Parent process id (zero for standalone) - int WebSocketPort = 0; // Web socket port (Zero = disabled) - int WebSocketThreads = 0; - bool InstallService = false; // Flag used to initiate service install (temporary) - bool UninstallService = false; // Flag used to initiate service uninstall (temporary) - bool IsDebug = false; - bool IsTest = false; - bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements - bool StructuredCacheEnabled = true; - bool StructuredCacheWriteLogEnabled = false; - bool StructuredCacheAccessLogEnabled = false; - bool ExecServiceEnabled = true; - bool ComputeServiceEnabled = true; - bool ShouldCrash = false; // Option for testing crash handling - bool IsFirstRun = false; - bool NoSentry = false; - bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports - bool ObjectStoreEnabled = false; + ZenUpstreamCacheConfig UpstreamCacheConfig; + ZenGcConfig GcConfig; + ZenAuthConfig AuthConfig; + ZenObjectStoreConfig ObjectStoreConfig; + zen::HttpServerConfig HttpServerConfig; + ZenStructuredCacheConfig StructuredCacheConfig; + std::filesystem::path DataDir; // Root directory for state (used for testing) + std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) + std::filesystem::path AbsLogFile; // Absolute path to main log file + std::filesystem::path ConfigFile; // Path to Lua config file + std::string ChildId; // Id assigned by parent process (used for lifetime management) + std::string LogId; // Id for tagging log output + std::string EncryptionKey; // 256 bit AES encryption key + std::string EncryptionIV; // 128 bit AES initialization vector + int BasePort = 1337; // Service listen port (used for both UDP and TCP) + int OwnerPid = 0; // Parent process id (zero for standalone) + int WebSocketPort = 0; // Web socket port (Zero = disabled) + int WebSocketThreads = 0; + bool InstallService = false; // Flag used to initiate service install (temporary) + bool UninstallService = false; // Flag used to initiate service uninstall (temporary) + bool IsDebug = false; + bool IsTest = false; + bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements + bool ExecServiceEnabled = true; + bool ComputeServiceEnabled = true; + bool ShouldCrash = false; // Option for testing crash handling + bool IsFirstRun = false; + bool NoSentry = false; + bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports + bool ObjectStoreEnabled = false; #if ZEN_WITH_TRACE std::string TraceHost; // Host name or IP address to send trace data to std::string TraceFile; // Path of a file to write a trace diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index e4143dc01..cf9f03d89 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -383,7 +383,7 @@ public: } #endif // ZEN_WITH_COMPUTE_SERVICES - if (ServerOptions.StructuredCacheEnabled) + if (ServerOptions.StructuredCacheConfig.Enabled) { InitializeStructuredCache(ServerOptions); } @@ -946,13 +946,17 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) using namespace std::literals; ZEN_INFO("instantiating structured cache service"); - m_CacheStore = - new ZenCacheStore(m_GcManager, - ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", - .AllowAutomaticCreationOfNamespaces = true, - .Logging = {.EnableWriteLog = ServerOptions.StructuredCacheWriteLogEnabled, - .EnableAccessLog = ServerOptions.StructuredCacheAccessLogEnabled}}, - m_GcManager.GetDiskWriteBlocker()); + m_CacheStore = new ZenCacheStore( + m_GcManager, + *m_JobQueue, + ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", + .AllowAutomaticCreationOfNamespaces = true, + .MemLayerConfig = {.TargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes, + .TrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds, + .MaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds}, + .Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled, + .EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled}}, + m_GcManager.GetDiskWriteBlocker()); const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; |