diff options
| author | Stefan Boberg <[email protected]> | 2023-12-19 12:06:13 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-19 12:06:13 +0100 |
| commit | 519d942d809e740a3b1fe5a1f6a57a4cfe43408b (patch) | |
| tree | 9b3c084e21bb7fd5e6bb3335e890647062d0703b /src/zenserver/cache | |
| parent | added mimalloc_hooks (diff) | |
| parent | ensure we can build without trace (#619) (diff) | |
| download | zen-273-integrated-memory-tracking.tar.xz zen-273-integrated-memory-tracking.zip | |
Merge branch 'main' into 273-integrated-memory-tracking
Diffstat (limited to 'src/zenserver/cache')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 323 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.h | 52 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 71 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.h | 6 |
4 files changed, 283 insertions, 169 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 13f3c9e58..8d046105d 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -209,9 +209,6 @@ namespace { zen::Sleep(100); } while (true); } - - uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return 8u + 32u + RoundUp(PayloadSize, 8u); } - } // namespace namespace fs = std::filesystem; @@ -507,6 +504,8 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B std::vector<AccessTime>& AccessTimes, std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) { + ZEN_TRACE_CPU("Z$::ReadSidecarFile"); + ZEN_ASSERT(AccessTimes.size() == Payloads.size()); std::error_code Ec; @@ -593,6 +592,8 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas) { + ZEN_TRACE_CPU("Z$::WriteSidecarFile"); + BucketMetaHeader Header; Header.EntryCount = m_ManifestEntryCount; Header.LogPosition = SnapshotLogPosition; @@ -701,7 +702,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { using namespace std::literals; - ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate"); + ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate"); ZEN_ASSERT(m_IsFlushing.load()); // We want to take the lock here since we register as a GC referencer a construction @@ -768,7 +769,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo void ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot"); + ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot"); const uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) @@ -878,7 +879,7 @@ 
ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uin uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile"); + ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); if (!std::filesystem::is_regular_file(IndexPath)) { @@ -967,7 +968,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog"); + ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); if (!std::filesystem::is_regular_file(LogPath)) { @@ -1037,7 +1038,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std:: void ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); + ZEN_TRACE_CPU("Z$::Bucket::Initialize"); m_StandaloneSize = 0; @@ -1139,7 +1140,7 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H IoBuffer ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const { - ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue"); + ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue"); BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); @@ -1155,7 +1156,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con IoBuffer ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const { - ZEN_TRACE_CPU("Z$::Disk::Bucket::GetStandaloneCacheValue"); + ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue"); ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); @@ -1175,6 +1176,8 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType 
ContentTy bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { + ZEN_TRACE_CPU("Z$::Bucket::Get"); + metrics::RequestStats::Scope StatsScope(m_GetOps, 0); RwLock::SharedLockScope IndexLock(m_IndexLock); @@ -1189,7 +1192,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal return false; } - size_t EntryIndex = It.value(); + PayloadIndex EntryIndex = It.value(); m_AccessTimes[EntryIndex] = GcClock::TickCount(); DiskLocation Location = m_Payloads[EntryIndex].Location; @@ -1206,7 +1209,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal if (Payload->MemCached) { - OutValue.Value = m_MemCachedPayloads[Payload->MemCached]; + OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload; Payload = nullptr; IndexLock.ReleaseNow(); m_MemoryHitCount++; @@ -1231,7 +1234,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal size_t ValueSize = OutValue.Value.GetSize(); if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache"); + ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache"); OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end()) @@ -1240,7 +1243,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal // Only update if it has not already been updated by other thread if (!WritePayload.MemCached) { - SetMemCachedData(UpdateIndexLock, WritePayload, OutValue.Value); + SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); } } } @@ -1250,7 +1253,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal if (FillRawHashAndRawSize) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData"); + ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData"); if 
(Location.IsFlagSet(DiskLocation::kCompressed)) { if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) @@ -1293,6 +1296,8 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { + ZEN_TRACE_CPU("Z$::Bucket::Put"); + metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) @@ -1307,71 +1312,91 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& m_DiskWriteCount++; } -void +uint64_t ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) { + ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim"); + + uint64_t Trimmed = 0; GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (m_MemCachedPayloads.empty()) + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { - return; + return 0; } - for (const auto& Kv : m_Index) + + uint32_t WriteIndex = 0; + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) { - size_t Index = Kv.second; - BucketPayload& Payload = m_Payloads[Index]; - if (!Payload.MemCached) + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) + { + continue; + } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); + GcClock::Tick AccessTime = m_AccessTimes[Index]; + if (AccessTime < ExpireTicks) { + size_t PayloadSize = Data.Payload.GetSize(); + RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); + Data = {}; + m_Payloads[Index].MemCached = {}; + Trimmed += PayloadSize; continue; } - if (m_AccessTimes[Index] < ExpireTicks) + if (ReadIndex > WriteIndex) { - RemoveMemCachedData(IndexLock, Payload); + 
m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index}; + m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex); } + WriteIndex++; } + m_MemCachedPayloads.resize(WriteIndex); m_MemCachedPayloads.shrink_to_fit(); - m_FreeMemCachedPayloads.shrink_to_fit(); - m_FreeMetaDatas.shrink_to_fit(); + zen::Reset(m_FreeMemCachedPayloads); + return Trimmed; } void -ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, - GcClock::Duration SectionLength, - std::vector<uint64_t>& InOutUsageSlots) +ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots) { + ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess"); + + size_t SlotCount = InOutUsageSlots.capacity(); RwLock::SharedLockScope _(m_IndexLock); - if (m_MemCachedPayloads.empty()) + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { return; } - for (const auto& It : m_Index) + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) { - size_t Index = It.second; - BucketPayload& Payload = m_Payloads[Index]; - if (!Payload.MemCached) + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) { continue; } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); - GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); - uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0); - if (Slot >= InOutUsageSlots.capacity()) - { - Slot = InOutUsageSlots.capacity() - 1; - } - if (Slot > InOutUsageSlots.size()) + GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0); + size_t Slot = Age < MaxAge ? 
gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1); + ZEN_ASSERT_SLOW(Slot < SlotCount); + if (Slot >= InOutUsageSlots.size()) { - InOutUsageSlots.resize(uint64_t(Slot + 1), 0); + InOutUsageSlots.resize(Slot + 1, 0); } - InOutUsageSlots[Slot] += m_MemCachedPayloads[Payload.MemCached].GetSize(); + InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize()); } } bool ZenCacheDiskLayer::CacheBucket::Drop() { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop"); + ZEN_TRACE_CPU("Z$::Bucket::Drop"); RwLock::ExclusiveLockScope _(m_IndexLock); @@ -1407,7 +1432,7 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush"); + ZEN_TRACE_CPU("Z$::Bucket::Flush"); bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { @@ -1433,6 +1458,7 @@ ZenCacheDiskLayer::CacheBucket::Flush() void ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) { + ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot"); try { bool UseLegacyScheme = false; @@ -1607,7 +1633,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) void ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub"); + ZEN_TRACE_CPU("Z$::Bucket::Scrub"); ZEN_INFO("scrubbing '{}'", m_BucketDir); @@ -1823,7 +1849,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; std::vector<BucketMetaData> MetaDatas; - std::vector<IoBuffer> MemCachedPayloads; + std::vector<MemCacheData> MemCachedPayloads; std::vector<ReferenceIndex> FirstReferenceIndex; IndexMap Index; @@ -1847,7 +1873,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) void ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::GatherReferences"); + 
ZEN_TRACE_CPU("Z$::Bucket::GatherReferences"); #define CALCULATE_BLOCKING_TIME 0 @@ -1999,10 +2025,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Key); It != m_Index.end()) { - const BucketPayload& CachedPayload = Payloads[It->second]; + const BucketPayload& CachedPayload = m_Payloads[It->second]; if (CachedPayload.MemCached) { - Buffer = m_MemCachedPayloads[CachedPayload.MemCached]; + Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload; ZEN_ASSERT_SLOW(Buffer); } else @@ -2065,7 +2091,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) void ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage"); + ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage"); ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); @@ -2124,7 +2150,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; std::vector<BucketMetaData> MetaDatas; - std::vector<IoBuffer> MemCachedPayloads; + std::vector<MemCacheData> MemCachedPayloads; std::vector<ReferenceIndex> FirstReferenceIndex; IndexMap Index; { @@ -2165,7 +2191,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); + ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State"); RwLock::SharedLockScope IndexLock(m_IndexLock); Stopwatch Timer; @@ -2213,7 +2239,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) if (GcCtx.IsDeletionMode()) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete"); + ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete"); ExtendablePathBuilder<256> Path; @@ -2281,7 +2307,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) BlockStoreLocation Location = 
DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = Key; + ChunkIndexToChunkHash.push_back(Key); if (ExpiredCacheKeys.contains(Key)) { continue; @@ -2453,7 +2479,7 @@ ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents( void ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { - ZEN_TRACE_CPU("Z$::Disk::CollectGarbage"); + ZEN_TRACE_CPU("Z$::CollectGarbage"); std::vector<CacheBucket*> Buckets; { @@ -2468,13 +2494,16 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { Bucket->CollectGarbage(GcCtx); } - MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); + if (!m_IsMemCacheTrimming) + { + MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); + } } void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue"); + ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); uint64_t NewFileSize = Value.Value.Size(); @@ -2671,16 +2700,17 @@ ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, Buck } void -ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData) +ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData) { - uint64_t PayloadSize = MemCachedData.GetSize(); + BucketPayload& Payload = m_Payloads[PayloadIndex]; + uint64_t PayloadSize = MemCachedData.GetSize(); ZEN_ASSERT(PayloadSize != 0); if (m_FreeMemCachedPayloads.empty()) { if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max()) { Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size())); - m_MemCachedPayloads.push_back(MemCachedData); + m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}); 
AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } @@ -2689,7 +2719,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, Bu { Payload.MemCached = m_FreeMemCachedPayloads.back(); m_FreeMemCachedPayloads.pop_back(); - m_MemCachedPayloads[Payload.MemCached] = MemCachedData; + m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}; AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } @@ -2700,9 +2730,9 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, { if (Payload.MemCached) { - size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize(); + size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize(); RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); - m_MemCachedPayloads[Payload.MemCached] = IoBuffer{}; + m_MemCachedPayloads[Payload.MemCached] = {}; m_FreeMemCachedPayloads.push_back(Payload.MemCached); Payload.MemCached = {}; return PayloadSize; @@ -2723,7 +2753,7 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck void ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue"); + ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); uint8_t EntryFlags = 0; @@ -2800,7 +2830,7 @@ public: virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactStore"); + ZEN_TRACE_CPU("Z$::Bucket::CompactStore"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3023,7 +3053,7 @@ private: GcStoreCompactor* ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData"); + 
ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData"); size_t TotalEntries = 0; @@ -3117,7 +3147,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; std::vector<BucketMetaData> MetaDatas; - std::vector<IoBuffer> MemCachedPayloads; + std::vector<MemCacheData> MemCachedPayloads; std::vector<ReferenceIndex> FirstReferenceIndex; IndexMap Index; { @@ -3164,7 +3194,7 @@ public: virtual void PreCache(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Z$::Disk::Bucket::PreCache"); + ZEN_TRACE_CPU("Z$::Bucket::PreCache"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3385,7 +3415,7 @@ public: virtual void LockState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Z$::Disk::Bucket::LockState"); + ZEN_TRACE_CPU("Z$::Bucket::LockState"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3458,7 +3488,7 @@ public: virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { - ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveUsedReferencesFromSet"); + ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); ZEN_ASSERT(m_IndexLock); size_t InitialCount = IoCids.size(); @@ -3505,7 +3535,7 @@ public: std::vector<GcReferenceChecker*> ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CreateReferenceCheckers"); + ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3530,7 +3560,7 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) void ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactReferences"); + ZEN_TRACE_CPU("Z$::Bucket::CompactReferences"); std::vector<ReferenceIndex> FirstReferenceIndex; std::vector<IoHash> NewReferenceHashes; @@ -3708,12 +3738,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, std::vector<BucketPayload>& Payloads, std::vector<AccessTime>& 
AccessTimes, std::vector<BucketMetaData>& MetaDatas, - std::vector<IoBuffer>& MemCachedPayloads, + std::vector<MemCacheData>& MemCachedPayloads, std::vector<ReferenceIndex>& FirstReferenceIndex, IndexMap& Index, RwLock::ExclusiveLockScope& IndexLock) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactState"); + ZEN_TRACE_CPU("Z$::Bucket::CompactState"); size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); @@ -3738,7 +3768,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, } if (Payload.MemCached) { - MemCachedPayloads.push_back(std::move(m_MemCachedPayloads[Payload.MemCached])); + MemCachedPayloads.emplace_back( + MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex}); Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1)); } if (m_Configuration.EnableReferenceCaching) @@ -3811,7 +3842,7 @@ ZenCacheDiskLayer::~ZenCacheDiskLayer() ZenCacheDiskLayer::CacheBucket* ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) { - ZEN_TRACE_CPU("Z$::Disk::GetOrCreateBucket"); + ZEN_TRACE_CPU("Z$::GetOrCreateBucket"); const auto BucketName = std::string(InBucket); { @@ -3858,7 +3889,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { - ZEN_TRACE_CPU("Z$::Disk::Get"); + ZEN_TRACE_CPU("Z$::Get"); if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { @@ -3874,7 +3905,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { - ZEN_TRACE_CPU("Z$::Disk::Put"); + ZEN_TRACE_CPU("Z$::Put"); if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { @@ -3886,6 +3917,8 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& 
HashKey, const Z void ZenCacheDiskLayer::DiscoverBuckets() { + ZEN_TRACE_CPU("Z$::DiscoverBuckets"); + DirectoryContent DirContent; GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); @@ -3986,6 +4019,8 @@ ZenCacheDiskLayer::DiscoverBuckets() bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { + ZEN_TRACE_CPU("Z$::DropBucket"); + RwLock::ExclusiveLockScope _(m_Lock); auto It = m_Buckets.find(std::string(InBucket)); @@ -4008,6 +4043,8 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) bool ZenCacheDiskLayer::Drop() { + ZEN_TRACE_CPU("Z$::Drop"); + RwLock::ExclusiveLockScope _(m_Lock); std::vector<std::unique_ptr<CacheBucket>> Buckets; @@ -4029,6 +4066,8 @@ ZenCacheDiskLayer::Drop() void ZenCacheDiskLayer::Flush() { + ZEN_TRACE_CPU("Z$::Flush"); + std::vector<CacheBucket*> Buckets; Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -4070,6 +4109,8 @@ ZenCacheDiskLayer::Flush() void ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { + ZEN_TRACE_CPU("Z$::ScrubStorage"); + RwLock::SharedLockScope _(m_Lock); { std::vector<std::future<void>> Results; @@ -4096,7 +4137,7 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) void ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) { - ZEN_TRACE_CPU("Z$::Disk::GatherReferences"); + ZEN_TRACE_CPU("Z$::GatherReferences"); std::vector<CacheBucket*> Buckets; { @@ -4213,20 +4254,11 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st void ZenCacheDiskLayer::MemCacheTrim() { - ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim"); + ZEN_TRACE_CPU("Z$::MemCacheTrim"); ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); - - const GcClock::TimePoint Now = GcClock::Now(); - - const GcClock::Tick NowTick = Now.time_since_epoch().count(); - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; - const GcClock::Tick NextAllowedTrimTick = LastTrimTick + 
GcClock::Duration(TrimInterval).count(); - if (NowTick < NextAllowedTrimTick) - { - return; - } + ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0); + ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0); bool Expected = false; if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) @@ -4234,75 +4266,90 @@ ZenCacheDiskLayer::MemCacheTrim() return; } - // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong - const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickMemCacheTrim.store(NextTrimTick); + try + { + m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + uint64_t TrimmedSize = 0; + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + NiceBytes(TrimmedSize), + NiceBytes(m_TotalMemCachedSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + const GcClock::Tick NowTick = GcClock::TickCount(); + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_NextAllowedTrimTick.store(NextTrimTick); + m_IsMemCacheTrimming.store(false); + }); - m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) { - ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); - uint64_t StartSize = m_TotalMemCachedSize.load(); - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - uint64_t EndSize = m_TotalMemCachedSize.load(); - ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", - NiceBytes(StartSize > EndSize ? 
StartSize - EndSize : 0), - NiceBytes(m_TotalMemCachedSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - m_IsMemCacheTrimming.store(false); - }); + static const size_t UsageSlotCount = 2048; + std::vector<uint64_t> UsageSlots; + UsageSlots.reserve(UsageSlotCount); - const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); - - std::vector<uint64_t> UsageSlots; - UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); + std::vector<CacheBucket*> Buckets; + { + RwLock::SharedLockScope __(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Buckets.push_back(Kv.second.get()); + } + } - std::vector<CacheBucket*> Buckets; - { - RwLock::SharedLockScope __(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) + const GcClock::TimePoint Now = GcClock::Now(); { - Buckets.push_back(Kv.second.get()); + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess"); + for (CacheBucket* Bucket : Buckets) + { + Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots); + } } - } - for (CacheBucket* Bucket : Buckets) - { - Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); - } - uint64_t TotalSize = 0; - for (size_t Index = 0; Index < UsageSlots.size(); ++Index) - { - TotalSize += UsageSlots[Index]; - if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) { - GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); - MemCacheTrim(Buckets, ExpireTime); - break; + TotalSize += UsageSlots[Index]; + if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount); + TrimmedSize = MemCacheTrim(Buckets, ExpireTime); + break; + } } - } - }); + }); + } + catch (std::exception& Ex) + { + ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. 
Reason: '{}'", Ex.what()); + m_IsMemCacheTrimming.store(false); + } } -void +uint64_t ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime) { if (m_Configuration.MemCacheTargetFootprintBytes == 0) { - return; + return 0; } - RwLock::SharedLockScope __(m_Lock); + uint64_t TrimmedSize = 0; for (CacheBucket* Bucket : Buckets) { - Bucket->MemCacheTrim(ExpireTime); + TrimmedSize += Bucket->MemCacheTrim(ExpireTime); } const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; + GcClock::Tick LastTrimTick = m_NextAllowedTrimTick; const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + return TrimmedSize; } #if ZEN_WITH_TESTS diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h index 277371f2c..6997a12e4 100644 --- a/src/zenserver/cache/cachedisklayer.h +++ b/src/zenserver/cache/cachedisklayer.h @@ -197,15 +197,15 @@ public: CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config); ~CacheBucket(); - bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); - void MemCacheTrim(GcClock::TimePoint ExpireTime); - bool Drop(); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); - void GatherReferences(GcContext& GcCtx); - void CollectGarbage(GcContext& GcCtx); + bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + 
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + bool Drop(); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); + void CollectGarbage(GcContext& GcCtx); inline GcStorageSize StorageSize() const { @@ -218,7 +218,7 @@ public: CacheValueDetails::BucketDetails GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const; void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; - void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots); + void GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots); #if ZEN_WITH_TESTS void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS @@ -286,6 +286,11 @@ public: operator bool() const { return RawSize != 0 || RawHash != IoHash::Zero; }; }; + struct MemCacheData + { + IoBuffer Payload; + PayloadIndex OwnerIndex; + }; #pragma pack(pop) static_assert(sizeof(BucketPayload) == 20u); static_assert(sizeof(BucketMetaData) == 28u); @@ -323,7 +328,7 @@ public: std::vector<BucketPayload> m_Payloads; std::vector<BucketMetaData> m_MetaDatas; std::vector<MetaDataIndex> m_FreeMetaDatas; - std::vector<IoBuffer> m_MemCachedPayloads; + std::vector<MemCacheData> m_MemCachedPayloads; std::vector<MemCachedIndex> m_FreeMemCachedPayloads; std::vector<ReferenceIndex> m_FirstReferenceIndex; std::vector<IoHash> m_ReferenceHashes; @@ -364,7 +369,7 @@ public: const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData); void RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload); BucketMetaData GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const; - void 
SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData); + void SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData); size_t RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload); void InitializeIndexFromDisk(RwLock::ExclusiveLockScope&, bool IsNew); @@ -390,7 +395,7 @@ public: std::vector<BucketPayload>& Payloads, std::vector<AccessTime>& AccessTimes, std::vector<BucketMetaData>& MetaDatas, - std::vector<IoBuffer>& MemCachedPayloads, + std::vector<MemCacheData>& MemCachedPayloads, std::vector<ReferenceIndex>& FirstReferenceIndex, IndexMap& Index, RwLock::ExclusiveLockScope& IndexLock); @@ -405,6 +410,10 @@ public: m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed); m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed); } + static inline uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) + { + return sizeof(MemCacheData) + sizeof(IoBufferCore) + RoundUp(PayloadSize, 8u); + } // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash @@ -436,10 +445,21 @@ private: { return; } + if (m_IsMemCacheTrimming) + { + return; + } + + const GcClock::Tick NowTick = GcClock::TickCount(); + if (NowTick < m_NextAllowedTrimTick) + { + return; + } + MemCacheTrim(); } - void MemCacheTrim(); - void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime); + void MemCacheTrim(); + uint64_t MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime); GcManager& m_Gc; JobQueue& m_JobQueue; @@ -447,7 +467,7 @@ private: Configuration m_Configuration; std::atomic_uint64_t m_TotalMemCachedSize{}; std::atomic_bool m_IsMemCacheTrimming = false; - std::atomic<GcClock::Tick> m_LastTickMemCacheTrim; + std::atomic<GcClock::Tick> m_NextAllowedTrimTick; mutable RwLock m_Lock; std::unordered_map<std::string, std::unique_ptr<CacheBucket>> 
m_Buckets; std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 8db96f914..f61fbd8bc 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -338,7 +338,11 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } m_StatsService.UnregisterHandler("z$", *this); m_StatusService.UnregisterHandler("z$", *this); @@ -615,24 +619,44 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) if (Key == HttpZCacheUtilStartRecording) { - m_RequestRecorder.reset(); HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); - m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + + m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + m_RequestRecordingEnabled.store(true); + } + ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath); Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key == HttpZCacheUtilStopRecording) { - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + ZEN_INFO("cache RPC recording STOPPED"); Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key == HttpZCacheUtilReplayRecording) { CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; - m_RequestRecorder.reset(); + { 
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); uint32_t ThreadCount = std::thread::hardware_concurrency(); @@ -643,11 +667,18 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) ThreadCount = gsl::narrow<uint32_t>(Value.value()); } } + + ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath); + std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount); + + ZEN_INFO("cache RPC replay STARTED"); + Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key.starts_with(HttpZCacheDetailsPrefix)) { HandleDetailsRequest(Request); @@ -1776,11 +1807,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { uint64_t RequestIndex = ~0ull; - if (m_RequestRecorder) + if (m_RequestRecordingEnabled) { - RequestIndex = m_RequestRecorder->RecordRequest( - {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, - Body); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + RequestIndex = m_RequestRecorder->RecordRequest( + {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, + Body); + } } uint32_t AcceptMagic = 0; @@ -1816,8 +1851,11 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); if (RequestIndex != ~0ull) { - ZEN_ASSERT(m_RequestRecorder); - 
m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + } } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } @@ -1828,10 +1866,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) if (RequestIndex != ~0ull) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + m_RequestRecorder->RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h index 57a533029..2feaaead8 100644 --- a/src/zenserver/cache/httpstructuredcache.h +++ b/src/zenserver/cache/httpstructuredcache.h @@ -190,6 +190,12 @@ private: void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); + // This exists to avoid taking locks when recording is not enabled + std::atomic_bool m_RequestRecordingEnabled{false}; + + // This lock should be taken in SHARED mode when calling into the recorder, + // and taken in EXCLUSIVE mode whenever the recorder is created or destroyed + RwLock m_RequestRecordingLock; std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder; }; |