aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/cache')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp323
-rw-r--r--src/zenserver/cache/cachedisklayer.h52
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp71
-rw-r--r--src/zenserver/cache/httpstructuredcache.h6
4 files changed, 283 insertions, 169 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 13f3c9e58..8d046105d 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -209,9 +209,6 @@ namespace {
zen::Sleep(100);
} while (true);
}
-
- uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return 8u + 32u + RoundUp(PayloadSize, 8u); }
-
} // namespace
namespace fs = std::filesystem;
@@ -507,6 +504,8 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B
std::vector<AccessTime>& AccessTimes,
std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
{
+ ZEN_TRACE_CPU("Z$::ReadSidecarFile");
+
ZEN_ASSERT(AccessTimes.size() == Payloads.size());
std::error_code Ec;
@@ -593,6 +592,8 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
{
+ ZEN_TRACE_CPU("Z$::WriteSidecarFile");
+
BucketMetaHeader Header;
Header.EntryCount = m_ManifestEntryCount;
Header.LogPosition = SnapshotLogPosition;
@@ -701,7 +702,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
using namespace std::literals;
- ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate");
+ ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
ZEN_ASSERT(m_IsFlushing.load());
// We want to take the lock here since we register as a GC referencer a construction
@@ -768,7 +769,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
void
ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot");
+ ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
const uint64_t LogCount = m_SlogFile.GetLogCount();
if (m_LogFlushPosition == LogCount)
@@ -878,7 +879,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uin
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile");
+ ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
if (!std::filesystem::is_regular_file(IndexPath))
{
@@ -967,7 +968,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog");
+ ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
if (!std::filesystem::is_regular_file(LogPath))
{
@@ -1037,7 +1038,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::
void
ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog");
+ ZEN_TRACE_CPU("Z$::Bucket::Initialize");
m_StandaloneSize = 0;
@@ -1139,7 +1140,7 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue");
BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
@@ -1155,7 +1156,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::GetStandaloneCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
ExtendablePathBuilder<256> DataFilePath;
BuildPath(DataFilePath, HashKey);
@@ -1175,6 +1176,8 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
+ ZEN_TRACE_CPU("Z$::Bucket::Get");
+
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
RwLock::SharedLockScope IndexLock(m_IndexLock);
@@ -1189,7 +1192,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
return false;
}
- size_t EntryIndex = It.value();
+ PayloadIndex EntryIndex = It.value();
m_AccessTimes[EntryIndex] = GcClock::TickCount();
DiskLocation Location = m_Payloads[EntryIndex].Location;
@@ -1206,7 +1209,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
if (Payload->MemCached)
{
- OutValue.Value = m_MemCachedPayloads[Payload->MemCached];
+ OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload;
Payload = nullptr;
IndexLock.ReleaseNow();
m_MemoryHitCount++;
@@ -1231,7 +1234,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
size_t ValueSize = OutValue.Value.GetSize();
if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache");
+ ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache");
OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
@@ -1240,7 +1243,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
// Only update if it has not already been updated by other thread
if (!WritePayload.MemCached)
{
- SetMemCachedData(UpdateIndexLock, WritePayload, OutValue.Value);
+ SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
}
}
}
@@ -1250,7 +1253,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
if (FillRawHashAndRawSize)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData");
+ ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
if (Location.IsFlagSet(DiskLocation::kCompressed))
{
if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
@@ -1293,6 +1296,8 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
+ ZEN_TRACE_CPU("Z$::Bucket::Put");
+
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
@@ -1307,71 +1312,91 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
m_DiskWriteCount++;
}
-void
+uint64_t
ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
{
+ ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim");
+
+ uint64_t Trimmed = 0;
GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (m_MemCachedPayloads.empty())
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
- return;
+ return 0;
}
- for (const auto& Kv : m_Index)
+
+ uint32_t WriteIndex = 0;
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
{
- size_t Index = Kv.second;
- BucketPayload& Payload = m_Payloads[Index];
- if (!Payload.MemCached)
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
+ {
+ continue;
+ }
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
+ GcClock::Tick AccessTime = m_AccessTimes[Index];
+ if (AccessTime < ExpireTicks)
{
+ size_t PayloadSize = Data.Payload.GetSize();
+ RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ Data = {};
+ m_Payloads[Index].MemCached = {};
+ Trimmed += PayloadSize;
continue;
}
- if (m_AccessTimes[Index] < ExpireTicks)
+ if (ReadIndex > WriteIndex)
{
- RemoveMemCachedData(IndexLock, Payload);
+ m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index};
+ m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex);
}
+ WriteIndex++;
}
+ m_MemCachedPayloads.resize(WriteIndex);
m_MemCachedPayloads.shrink_to_fit();
- m_FreeMemCachedPayloads.shrink_to_fit();
- m_FreeMetaDatas.shrink_to_fit();
+ zen::Reset(m_FreeMemCachedPayloads);
+ return Trimmed;
}
void
-ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
- GcClock::Duration SectionLength,
- std::vector<uint64_t>& InOutUsageSlots)
+ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots)
{
+ ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess");
+
+ size_t SlotCount = InOutUsageSlots.capacity();
RwLock::SharedLockScope _(m_IndexLock);
- if (m_MemCachedPayloads.empty())
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
return;
}
- for (const auto& It : m_Index)
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
{
- size_t Index = It.second;
- BucketPayload& Payload = m_Payloads[Index];
- if (!Payload.MemCached)
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
{
continue;
}
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
- GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
- uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
- if (Slot >= InOutUsageSlots.capacity())
- {
- Slot = InOutUsageSlots.capacity() - 1;
- }
- if (Slot > InOutUsageSlots.size())
+ GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0);
+ size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1);
+ ZEN_ASSERT_SLOW(Slot < SlotCount);
+ if (Slot >= InOutUsageSlots.size())
{
- InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
+ InOutUsageSlots.resize(Slot + 1, 0);
}
- InOutUsageSlots[Slot] += m_MemCachedPayloads[Payload.MemCached].GetSize();
+ InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize());
}
}
bool
ZenCacheDiskLayer::CacheBucket::Drop()
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop");
+ ZEN_TRACE_CPU("Z$::Bucket::Drop");
RwLock::ExclusiveLockScope _(m_IndexLock);
@@ -1407,7 +1432,7 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush");
+ ZEN_TRACE_CPU("Z$::Bucket::Flush");
bool Expected = false;
if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
{
@@ -1433,6 +1458,7 @@ ZenCacheDiskLayer::CacheBucket::Flush()
void
ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
+ ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
try
{
bool UseLegacyScheme = false;
@@ -1607,7 +1633,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
void
ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub");
+ ZEN_TRACE_CPU("Z$::Bucket::Scrub");
ZEN_INFO("scrubbing '{}'", m_BucketDir);
@@ -1823,7 +1849,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
@@ -1847,7 +1873,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
void
ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::GatherReferences");
+ ZEN_TRACE_CPU("Z$::Bucket::GatherReferences");
#define CALCULATE_BLOCKING_TIME 0
@@ -1999,10 +2025,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
#endif // CALCULATE_BLOCKING_TIME
if (auto It = m_Index.find(Key); It != m_Index.end())
{
- const BucketPayload& CachedPayload = Payloads[It->second];
+ const BucketPayload& CachedPayload = m_Payloads[It->second];
if (CachedPayload.MemCached)
{
- Buffer = m_MemCachedPayloads[CachedPayload.MemCached];
+ Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload;
ZEN_ASSERT_SLOW(Buffer);
}
else
@@ -2065,7 +2091,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
void
ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage");
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage");
ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
@@ -2124,7 +2150,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
{
@@ -2165,7 +2191,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State");
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State");
RwLock::SharedLockScope IndexLock(m_IndexLock);
Stopwatch Timer;
@@ -2213,7 +2239,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
if (GcCtx.IsDeletionMode())
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete");
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete");
ExtendablePathBuilder<256> Path;
@@ -2281,7 +2307,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
size_t ChunkIndex = ChunkLocations.size();
ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash[ChunkIndex] = Key;
+ ChunkIndexToChunkHash.push_back(Key);
if (ExpiredCacheKeys.contains(Key))
{
continue;
@@ -2453,7 +2479,7 @@ ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
void
ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::CollectGarbage");
+ ZEN_TRACE_CPU("Z$::CollectGarbage");
std::vector<CacheBucket*> Buckets;
{
@@ -2468,13 +2494,16 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
Bucket->CollectGarbage(GcCtx);
}
- MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
+ if (!m_IsMemCacheTrimming)
+ {
+ MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
+ }
}
void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
uint64_t NewFileSize = Value.Value.Size();
@@ -2671,16 +2700,17 @@ ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, Buck
}
void
-ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData)
+ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData)
{
- uint64_t PayloadSize = MemCachedData.GetSize();
+ BucketPayload& Payload = m_Payloads[PayloadIndex];
+ uint64_t PayloadSize = MemCachedData.GetSize();
ZEN_ASSERT(PayloadSize != 0);
if (m_FreeMemCachedPayloads.empty())
{
if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max())
{
Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size()));
- m_MemCachedPayloads.push_back(MemCachedData);
+ m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex});
AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
@@ -2689,7 +2719,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, Bu
{
Payload.MemCached = m_FreeMemCachedPayloads.back();
m_FreeMemCachedPayloads.pop_back();
- m_MemCachedPayloads[Payload.MemCached] = MemCachedData;
+ m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex};
AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
@@ -2700,9 +2730,9 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&,
{
if (Payload.MemCached)
{
- size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize();
+ size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize();
RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
- m_MemCachedPayloads[Payload.MemCached] = IoBuffer{};
+ m_MemCachedPayloads[Payload.MemCached] = {};
m_FreeMemCachedPayloads.push_back(Payload.MemCached);
Payload.MemCached = {};
return PayloadSize;
@@ -2723,7 +2753,7 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
void
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
uint8_t EntryFlags = 0;
@@ -2800,7 +2830,7 @@ public:
virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactStore");
+ ZEN_TRACE_CPU("Z$::Bucket::CompactStore");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3023,7 +3053,7 @@ private:
GcStoreCompactor*
ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData");
+ ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData");
size_t TotalEntries = 0;
@@ -3117,7 +3147,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
{
@@ -3164,7 +3194,7 @@ public:
virtual void PreCache(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::PreCache");
+ ZEN_TRACE_CPU("Z$::Bucket::PreCache");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3385,7 +3415,7 @@ public:
virtual void LockState(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::LockState");
+ ZEN_TRACE_CPU("Z$::Bucket::LockState");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3458,7 +3488,7 @@ public:
virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveUsedReferencesFromSet");
+ ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet");
ZEN_ASSERT(m_IndexLock);
size_t InitialCount = IoCids.size();
@@ -3505,7 +3535,7 @@ public:
std::vector<GcReferenceChecker*>
ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CreateReferenceCheckers");
+ ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3530,7 +3560,7 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
void
ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactReferences");
+ ZEN_TRACE_CPU("Z$::Bucket::CompactReferences");
std::vector<ReferenceIndex> FirstReferenceIndex;
std::vector<IoHash> NewReferenceHashes;
@@ -3708,12 +3738,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
std::vector<BucketPayload>& Payloads,
std::vector<AccessTime>& AccessTimes,
std::vector<BucketMetaData>& MetaDatas,
- std::vector<IoBuffer>& MemCachedPayloads,
+ std::vector<MemCacheData>& MemCachedPayloads,
std::vector<ReferenceIndex>& FirstReferenceIndex,
IndexMap& Index,
RwLock::ExclusiveLockScope& IndexLock)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactState");
+ ZEN_TRACE_CPU("Z$::Bucket::CompactState");
size_t EntryCount = m_Index.size();
Payloads.reserve(EntryCount);
@@ -3738,7 +3768,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
}
if (Payload.MemCached)
{
- MemCachedPayloads.push_back(std::move(m_MemCachedPayloads[Payload.MemCached]));
+ MemCachedPayloads.emplace_back(
+ MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex});
Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1));
}
if (m_Configuration.EnableReferenceCaching)
@@ -3811,7 +3842,7 @@ ZenCacheDiskLayer::~ZenCacheDiskLayer()
ZenCacheDiskLayer::CacheBucket*
ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
{
- ZEN_TRACE_CPU("Z$::Disk::GetOrCreateBucket");
+ ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
const auto BucketName = std::string(InBucket);
{
@@ -3858,7 +3889,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
- ZEN_TRACE_CPU("Z$::Disk::Get");
+ ZEN_TRACE_CPU("Z$::Get");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
@@ -3874,7 +3905,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
void
ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
- ZEN_TRACE_CPU("Z$::Disk::Put");
+ ZEN_TRACE_CPU("Z$::Put");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
@@ -3886,6 +3917,8 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
void
ZenCacheDiskLayer::DiscoverBuckets()
{
+ ZEN_TRACE_CPU("Z$::DiscoverBuckets");
+
DirectoryContent DirContent;
GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
@@ -3986,6 +4019,8 @@ ZenCacheDiskLayer::DiscoverBuckets()
bool
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
+ ZEN_TRACE_CPU("Z$::DropBucket");
+
RwLock::ExclusiveLockScope _(m_Lock);
auto It = m_Buckets.find(std::string(InBucket));
@@ -4008,6 +4043,8 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
bool
ZenCacheDiskLayer::Drop()
{
+ ZEN_TRACE_CPU("Z$::Drop");
+
RwLock::ExclusiveLockScope _(m_Lock);
std::vector<std::unique_ptr<CacheBucket>> Buckets;
@@ -4029,6 +4066,8 @@ ZenCacheDiskLayer::Drop()
void
ZenCacheDiskLayer::Flush()
{
+ ZEN_TRACE_CPU("Z$::Flush");
+
std::vector<CacheBucket*> Buckets;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -4070,6 +4109,8 @@ ZenCacheDiskLayer::Flush()
void
ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
+ ZEN_TRACE_CPU("Z$::ScrubStorage");
+
RwLock::SharedLockScope _(m_Lock);
{
std::vector<std::future<void>> Results;
@@ -4096,7 +4137,7 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
void
ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::GatherReferences");
+ ZEN_TRACE_CPU("Z$::GatherReferences");
std::vector<CacheBucket*> Buckets;
{
@@ -4213,20 +4254,11 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st
void
ZenCacheDiskLayer::MemCacheTrim()
{
- ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim");
+ ZEN_TRACE_CPU("Z$::MemCacheTrim");
ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0);
-
- const GcClock::TimePoint Now = GcClock::Now();
-
- const GcClock::Tick NowTick = Now.time_since_epoch().count();
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
- const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
- if (NowTick < NextAllowedTrimTick)
- {
- return;
- }
+ ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0);
+ ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0);
bool Expected = false;
if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
@@ -4234,75 +4266,90 @@ ZenCacheDiskLayer::MemCacheTrim()
return;
}
- // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong
- const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickMemCacheTrim.store(NextTrimTick);
+ try
+ {
+ m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) {
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ uint64_t TrimmedSize = 0;
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(TrimmedSize),
+ NiceBytes(m_TotalMemCachedSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_NextAllowedTrimTick.store(NextTrimTick);
+ m_IsMemCacheTrimming.store(false);
+ });
- m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) {
- ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
- uint64_t StartSize = m_TotalMemCachedSize.load();
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] {
- uint64_t EndSize = m_TotalMemCachedSize.load();
- ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
- NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0),
- NiceBytes(m_TotalMemCachedSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- m_IsMemCacheTrimming.store(false);
- });
+ static const size_t UsageSlotCount = 2048;
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(UsageSlotCount);
- const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
-
- std::vector<uint64_t> UsageSlots;
- UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
+ std::vector<CacheBucket*> Buckets;
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
+ }
- std::vector<CacheBucket*> Buckets;
- {
- RwLock::SharedLockScope __(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
+ const GcClock::TimePoint Now = GcClock::Now();
{
- Buckets.push_back(Kv.second.get());
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
+ }
}
- }
- for (CacheBucket* Bucket : Buckets)
- {
- Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
- }
- uint64_t TotalSize = 0;
- for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
- {
- TotalSize += UsageSlots[Index];
- if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
{
- GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
- MemCacheTrim(Buckets, ExpireTime);
- break;
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
+ TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
+ break;
+ }
}
- }
- });
+ });
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
+ m_IsMemCacheTrimming.store(false);
+ }
}
-void
+uint64_t
ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime)
{
if (m_Configuration.MemCacheTargetFootprintBytes == 0)
{
- return;
+ return 0;
}
- RwLock::SharedLockScope __(m_Lock);
+ uint64_t TrimmedSize = 0;
for (CacheBucket* Bucket : Buckets)
{
- Bucket->MemCacheTrim(ExpireTime);
+ TrimmedSize += Bucket->MemCacheTrim(ExpireTime);
}
const GcClock::TimePoint Now = GcClock::Now();
const GcClock::Tick NowTick = Now.time_since_epoch().count();
const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
+ GcClock::Tick LastTrimTick = m_NextAllowedTrimTick;
const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ return TrimmedSize;
}
#if ZEN_WITH_TESTS
diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h
index 277371f2c..6997a12e4 100644
--- a/src/zenserver/cache/cachedisklayer.h
+++ b/src/zenserver/cache/cachedisklayer.h
@@ -197,15 +197,15 @@ public:
CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config);
~CacheBucket();
- bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- void MemCacheTrim(GcClock::TimePoint ExpireTime);
- bool Drop();
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
- void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx);
+ bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
+ bool Drop();
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
+ void GatherReferences(GcContext& GcCtx);
+ void CollectGarbage(GcContext& GcCtx);
inline GcStorageSize StorageSize() const
{
@@ -218,7 +218,7 @@ public:
CacheValueDetails::BucketDetails GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const;
void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
- void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots);
+ void GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots);
#if ZEN_WITH_TESTS
void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time);
#endif // ZEN_WITH_TESTS
@@ -286,6 +286,11 @@ public:
operator bool() const { return RawSize != 0 || RawHash != IoHash::Zero; };
};
+ struct MemCacheData
+ {
+ IoBuffer Payload;
+ PayloadIndex OwnerIndex;
+ };
#pragma pack(pop)
static_assert(sizeof(BucketPayload) == 20u);
static_assert(sizeof(BucketMetaData) == 28u);
@@ -323,7 +328,7 @@ public:
std::vector<BucketPayload> m_Payloads;
std::vector<BucketMetaData> m_MetaDatas;
std::vector<MetaDataIndex> m_FreeMetaDatas;
- std::vector<IoBuffer> m_MemCachedPayloads;
+ std::vector<MemCacheData> m_MemCachedPayloads;
std::vector<MemCachedIndex> m_FreeMemCachedPayloads;
std::vector<ReferenceIndex> m_FirstReferenceIndex;
std::vector<IoHash> m_ReferenceHashes;
@@ -364,7 +369,7 @@ public:
const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData);
void RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
BucketMetaData GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const;
- void SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData);
+ void SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData);
size_t RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
void InitializeIndexFromDisk(RwLock::ExclusiveLockScope&, bool IsNew);
@@ -390,7 +395,7 @@ public:
std::vector<BucketPayload>& Payloads,
std::vector<AccessTime>& AccessTimes,
std::vector<BucketMetaData>& MetaDatas,
- std::vector<IoBuffer>& MemCachedPayloads,
+ std::vector<MemCacheData>& MemCachedPayloads,
std::vector<ReferenceIndex>& FirstReferenceIndex,
IndexMap& Index,
RwLock::ExclusiveLockScope& IndexLock);
@@ -405,6 +410,10 @@ public:
m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed);
m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed);
}
+ static inline uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize)
+ {
+ return sizeof(MemCacheData) + sizeof(IoBufferCore) + RoundUp(PayloadSize, 8u);
+ }
// These locks are here to avoid contention on file creation, therefore it's sufficient
// that we take the same lock for the same hash
@@ -436,10 +445,21 @@ private:
{
return;
}
+ if (m_IsMemCacheTrimming)
+ {
+ return;
+ }
+
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ if (NowTick < m_NextAllowedTrimTick)
+ {
+ return;
+ }
+
MemCacheTrim();
}
- void MemCacheTrim();
- void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
+ void MemCacheTrim();
+ uint64_t MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
GcManager& m_Gc;
JobQueue& m_JobQueue;
@@ -447,7 +467,7 @@ private:
Configuration m_Configuration;
std::atomic_uint64_t m_TotalMemCachedSize{};
std::atomic_bool m_IsMemCacheTrimming = false;
- std::atomic<GcClock::Tick> m_LastTickMemCacheTrim;
+ std::atomic<GcClock::Tick> m_NextAllowedTrimTick;
mutable RwLock m_Lock;
std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 8db96f914..f61fbd8bc 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -338,7 +338,11 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
HttpStructuredCacheService::~HttpStructuredCacheService()
{
ZEN_INFO("closing structured cache");
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
m_StatsService.UnregisterHandler("z$", *this);
m_StatusService.UnregisterHandler("z$", *this);
@@ -615,24 +619,44 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
if (Key == HttpZCacheUtilStartRecording)
{
- m_RequestRecorder.reset();
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
- m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
+
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+
+ m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
+ m_RequestRecordingEnabled.store(true);
+ }
+ ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath);
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key == HttpZCacheUtilStopRecording)
{
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+ ZEN_INFO("cache RPC recording STOPPED");
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key == HttpZCacheUtilReplayRecording)
{
CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
uint32_t ThreadCount = std::thread::hardware_concurrency();
@@ -643,11 +667,18 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
ThreadCount = gsl::narrow<uint32_t>(Value.value());
}
}
+
+ ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath);
+
std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+
+ ZEN_INFO("cache RPC replay STARTED");
+
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key.starts_with(HttpZCacheDetailsPrefix))
{
HandleDetailsRequest(Request);
@@ -1776,11 +1807,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
[this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
uint64_t RequestIndex = ~0ull;
- if (m_RequestRecorder)
+ if (m_RequestRecordingEnabled)
{
- RequestIndex = m_RequestRecorder->RecordRequest(
- {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
- Body);
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ RequestIndex = m_RequestRecorder->RecordRequest(
+ {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
+ Body);
+ }
}
uint32_t AcceptMagic = 0;
@@ -1816,8 +1851,11 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
if (RequestIndex != ~0ull)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
}
AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
@@ -1828,10 +1866,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
if (RequestIndex != ~0ull)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ m_RequestRecorder->RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
}
AsyncRequest.WriteResponse(HttpResponseCode::OK,
HttpContentType::kCbPackage,
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 57a533029..2feaaead8 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -190,6 +190,12 @@ private:
void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
+ // This exists to avoid taking locks when recording is not enabled
+ std::atomic_bool m_RequestRecordingEnabled{false};
+
+ // This lock should be taken in SHARED mode when calling into the recorder,
+ // and taken in EXCLUSIVE mode whenever the recorder is created or destroyed
+ RwLock m_RequestRecordingLock;
std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};