aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/admin/admin.cpp129
-rw-r--r--src/zenserver/admin/admin.h29
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp323
-rw-r--r--src/zenserver/cache/cachedisklayer.h52
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp71
-rw-r--r--src/zenserver/cache/httpstructuredcache.h6
-rw-r--r--src/zenserver/config.cpp77
-rw-r--r--src/zenserver/config.h4
-rw-r--r--src/zenserver/projectstore/projectstore.cpp6
-rw-r--r--src/zenserver/upstream/zen.cpp5
-rw-r--r--src/zenserver/zenserver.cpp52
11 files changed, 550 insertions, 204 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index c2df847ad..cc1ffdcdc 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -3,6 +3,7 @@
#include "admin.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/jobqueue.h>
#include <zencore/logging.h>
@@ -20,24 +21,86 @@
#include <zenstore/gc.h>
#include "cache/structuredcachestore.h"
+#include "config.h"
#include "projectstore/projectstore.h"
#include <chrono>
namespace zen {
-HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
- JobQueue& BackgroundJobQueue,
- ZenCacheStore* CacheStore,
- CidStore* CidStore,
- ProjectStore* ProjectStore,
- const LogPaths& LogPaths)
+struct DirStats
+{
+ uint64_t FileCount = 0;
+ uint64_t DirCount = 0;
+ uint64_t ByteCount = 0;
+};
+
+DirStats
+GetStatsForDirectory(std::filesystem::path Dir)
+{
+ if (!std::filesystem::exists(Dir))
+ return {};
+
+ FileSystemTraversal Traversal;
+
+ struct StatsTraversal : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override
+ {
+ ZEN_UNUSED(Parent, File);
+ ++TotalFileCount;
+ TotalBytes += FileSize;
+ }
+ virtual bool VisitDirectory(const std::filesystem::path&, const path_view&) override
+ {
+ ++TotalDirCount;
+ return true;
+ }
+
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFileCount = 0;
+ uint64_t TotalDirCount = 0;
+
+ DirStats GetStats() { return {.FileCount = TotalFileCount, .DirCount = TotalDirCount, .ByteCount = TotalBytes}; }
+ };
+
+ StatsTraversal DirTraverser;
+ Traversal.TraverseFileSystem(Dir, DirTraverser);
+
+ return DirTraverser.GetStats();
+}
+
+struct StateDiskStats
+{
+ DirStats CacheStats;
+ DirStats CasStats;
+ DirStats ProjectStats;
+};
+
+StateDiskStats
+GetStatsForStateDirectory(std::filesystem::path StateDir)
+{
+ StateDiskStats Stats;
+ Stats.CacheStats = GetStatsForDirectory(StateDir / "cache");
+ Stats.CasStats = GetStatsForDirectory(StateDir / "cas");
+ Stats.ProjectStats = GetStatsForDirectory(StateDir / "projects");
+ return Stats;
+}
+
+HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
+ JobQueue& BackgroundJobQueue,
+ ZenCacheStore* CacheStore,
+ CidStore* CidStore,
+ ProjectStore* ProjectStore,
+ const LogPaths& LogPaths,
+ const ZenServerOptions& ServerOptions)
: m_GcScheduler(Scheduler)
, m_BackgroundJobQueue(BackgroundJobQueue)
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
, m_ProjectStore(ProjectStore)
, m_LogPaths(LogPaths)
+, m_ServerOptions(ServerOptions)
{
using namespace std::literals;
@@ -509,6 +572,60 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
#endif // ZEN_WITH_TRACE
m_Router.RegisterRoute(
+ "info",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Obj;
+
+ Obj << "root" << m_ServerOptions.SystemRootDir.generic_wstring();
+ Obj << "install" << (m_ServerOptions.SystemRootDir / "Install").generic_wstring();
+
+ Obj.BeginObject("primary");
+ Obj << "data" << m_ServerOptions.DataDir.generic_wstring();
+
+ try
+ {
+ auto Stats = GetStatsForStateDirectory(m_ServerOptions.DataDir);
+
+ auto EmitStats = [&](std::string_view Tag, const DirStats& Stats) {
+ Obj.BeginObject(Tag);
+ Obj << "bytes" << Stats.ByteCount;
+ Obj << "files" << Stats.FileCount;
+ Obj << "dirs" << Stats.DirCount;
+ Obj.EndObject();
+ };
+
+ EmitStats("cache", Stats.CacheStats);
+ EmitStats("cas", Stats.CasStats);
+ EmitStats("project", Stats.ProjectStats);
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("exception in disk stats gathering for '{}': {}", m_ServerOptions.DataDir, Ex.what());
+ }
+ Obj.EndObject();
+
+ try
+ {
+ std::vector<CbObject> Manifests = ReadAllCentralManifests(m_ServerOptions.SystemRootDir);
+
+ Obj.BeginArray("known");
+
+ for (const auto& Manifest : Manifests)
+ {
+ Obj.AddObject(Manifest);
+ }
+
+ Obj.EndArray();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("exception in state gathering for '{}': {}", m_ServerOptions.SystemRootDir, Ex.what());
+ }
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"logs",
[this](HttpRouterRequest& Req) {
CbObjectWriter Obj;
diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h
index 9d8bdfe50..563c4f536 100644
--- a/src/zenserver/admin/admin.h
+++ b/src/zenserver/admin/admin.h
@@ -12,6 +12,7 @@ class JobQueue;
class ZenCacheStore;
class CidStore;
class ProjectStore;
+struct ZenServerOptions;
class HttpAdminService : public zen::HttpService
{
@@ -22,25 +23,27 @@ public:
std::filesystem::path HttpLogPath;
std::filesystem::path CacheLogPath;
};
- HttpAdminService(GcScheduler& Scheduler,
- JobQueue& BackgroundJobQueue,
- ZenCacheStore* CacheStore,
- CidStore* CidStore,
- ProjectStore* ProjectStore,
- const LogPaths& LogPaths);
+ HttpAdminService(GcScheduler& Scheduler,
+ JobQueue& BackgroundJobQueue,
+ ZenCacheStore* CacheStore,
+ CidStore* CidStore,
+ ProjectStore* ProjectStore,
+ const LogPaths& LogPaths,
+ const ZenServerOptions& ServerOptions);
~HttpAdminService();
virtual const char* BaseUri() const override;
virtual void HandleRequest(zen::HttpServerRequest& Request) override;
private:
- HttpRequestRouter m_Router;
- GcScheduler& m_GcScheduler;
- JobQueue& m_BackgroundJobQueue;
- ZenCacheStore* m_CacheStore;
- CidStore* m_CidStore;
- ProjectStore* m_ProjectStore;
- LogPaths m_LogPaths;
+ HttpRequestRouter m_Router;
+ GcScheduler& m_GcScheduler;
+ JobQueue& m_BackgroundJobQueue;
+ ZenCacheStore* m_CacheStore;
+ CidStore* m_CidStore;
+ ProjectStore* m_ProjectStore;
+ LogPaths m_LogPaths;
+ const ZenServerOptions& m_ServerOptions;
};
} // namespace zen
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 13f3c9e58..8d046105d 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -209,9 +209,6 @@ namespace {
zen::Sleep(100);
} while (true);
}
-
- uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return 8u + 32u + RoundUp(PayloadSize, 8u); }
-
} // namespace
namespace fs = std::filesystem;
@@ -507,6 +504,8 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B
std::vector<AccessTime>& AccessTimes,
std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
{
+ ZEN_TRACE_CPU("Z$::ReadSidecarFile");
+
ZEN_ASSERT(AccessTimes.size() == Payloads.size());
std::error_code Ec;
@@ -593,6 +592,8 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
{
+ ZEN_TRACE_CPU("Z$::WriteSidecarFile");
+
BucketMetaHeader Header;
Header.EntryCount = m_ManifestEntryCount;
Header.LogPosition = SnapshotLogPosition;
@@ -701,7 +702,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
using namespace std::literals;
- ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate");
+ ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
ZEN_ASSERT(m_IsFlushing.load());
// We want to take the lock here since we register as a GC referencer a construction
@@ -768,7 +769,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
void
ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot");
+ ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
const uint64_t LogCount = m_SlogFile.GetLogCount();
if (m_LogFlushPosition == LogCount)
@@ -878,7 +879,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uin
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile");
+ ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
if (!std::filesystem::is_regular_file(IndexPath))
{
@@ -967,7 +968,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog");
+ ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
if (!std::filesystem::is_regular_file(LogPath))
{
@@ -1037,7 +1038,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::
void
ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog");
+ ZEN_TRACE_CPU("Z$::Bucket::Initialize");
m_StandaloneSize = 0;
@@ -1139,7 +1140,7 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue");
BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
@@ -1155,7 +1156,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::GetStandaloneCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
ExtendablePathBuilder<256> DataFilePath;
BuildPath(DataFilePath, HashKey);
@@ -1175,6 +1176,8 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
+ ZEN_TRACE_CPU("Z$::Bucket::Get");
+
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
RwLock::SharedLockScope IndexLock(m_IndexLock);
@@ -1189,7 +1192,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
return false;
}
- size_t EntryIndex = It.value();
+ PayloadIndex EntryIndex = It.value();
m_AccessTimes[EntryIndex] = GcClock::TickCount();
DiskLocation Location = m_Payloads[EntryIndex].Location;
@@ -1206,7 +1209,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
if (Payload->MemCached)
{
- OutValue.Value = m_MemCachedPayloads[Payload->MemCached];
+ OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload;
Payload = nullptr;
IndexLock.ReleaseNow();
m_MemoryHitCount++;
@@ -1231,7 +1234,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
size_t ValueSize = OutValue.Value.GetSize();
if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache");
+ ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache");
OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
@@ -1240,7 +1243,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
// Only update if it has not already been updated by other thread
if (!WritePayload.MemCached)
{
- SetMemCachedData(UpdateIndexLock, WritePayload, OutValue.Value);
+ SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
}
}
}
@@ -1250,7 +1253,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
if (FillRawHashAndRawSize)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData");
+ ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
if (Location.IsFlagSet(DiskLocation::kCompressed))
{
if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
@@ -1293,6 +1296,8 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
+ ZEN_TRACE_CPU("Z$::Bucket::Put");
+
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
@@ -1307,71 +1312,91 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
m_DiskWriteCount++;
}
-void
+uint64_t
ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
{
+ ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim");
+
+ uint64_t Trimmed = 0;
GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (m_MemCachedPayloads.empty())
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
- return;
+ return 0;
}
- for (const auto& Kv : m_Index)
+
+ uint32_t WriteIndex = 0;
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
{
- size_t Index = Kv.second;
- BucketPayload& Payload = m_Payloads[Index];
- if (!Payload.MemCached)
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
+ {
+ continue;
+ }
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
+ GcClock::Tick AccessTime = m_AccessTimes[Index];
+ if (AccessTime < ExpireTicks)
{
+ size_t PayloadSize = Data.Payload.GetSize();
+ RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ Data = {};
+ m_Payloads[Index].MemCached = {};
+ Trimmed += PayloadSize;
continue;
}
- if (m_AccessTimes[Index] < ExpireTicks)
+ if (ReadIndex > WriteIndex)
{
- RemoveMemCachedData(IndexLock, Payload);
+ m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index};
+ m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex);
}
+ WriteIndex++;
}
+ m_MemCachedPayloads.resize(WriteIndex);
m_MemCachedPayloads.shrink_to_fit();
- m_FreeMemCachedPayloads.shrink_to_fit();
- m_FreeMetaDatas.shrink_to_fit();
+ zen::Reset(m_FreeMemCachedPayloads);
+ return Trimmed;
}
void
-ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
- GcClock::Duration SectionLength,
- std::vector<uint64_t>& InOutUsageSlots)
+ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots)
{
+ ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess");
+
+ size_t SlotCount = InOutUsageSlots.capacity();
RwLock::SharedLockScope _(m_IndexLock);
- if (m_MemCachedPayloads.empty())
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
return;
}
- for (const auto& It : m_Index)
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
{
- size_t Index = It.second;
- BucketPayload& Payload = m_Payloads[Index];
- if (!Payload.MemCached)
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
{
continue;
}
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
- GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
- uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
- if (Slot >= InOutUsageSlots.capacity())
- {
- Slot = InOutUsageSlots.capacity() - 1;
- }
- if (Slot > InOutUsageSlots.size())
+ GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0);
+ size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1);
+ ZEN_ASSERT_SLOW(Slot < SlotCount);
+ if (Slot >= InOutUsageSlots.size())
{
- InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
+ InOutUsageSlots.resize(Slot + 1, 0);
}
- InOutUsageSlots[Slot] += m_MemCachedPayloads[Payload.MemCached].GetSize();
+ InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize());
}
}
bool
ZenCacheDiskLayer::CacheBucket::Drop()
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop");
+ ZEN_TRACE_CPU("Z$::Bucket::Drop");
RwLock::ExclusiveLockScope _(m_IndexLock);
@@ -1407,7 +1432,7 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush");
+ ZEN_TRACE_CPU("Z$::Bucket::Flush");
bool Expected = false;
if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
{
@@ -1433,6 +1458,7 @@ ZenCacheDiskLayer::CacheBucket::Flush()
void
ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
+ ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
try
{
bool UseLegacyScheme = false;
@@ -1607,7 +1633,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
void
ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub");
+ ZEN_TRACE_CPU("Z$::Bucket::Scrub");
ZEN_INFO("scrubbing '{}'", m_BucketDir);
@@ -1823,7 +1849,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
@@ -1847,7 +1873,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
void
ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::GatherReferences");
+ ZEN_TRACE_CPU("Z$::Bucket::GatherReferences");
#define CALCULATE_BLOCKING_TIME 0
@@ -1999,10 +2025,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
#endif // CALCULATE_BLOCKING_TIME
if (auto It = m_Index.find(Key); It != m_Index.end())
{
- const BucketPayload& CachedPayload = Payloads[It->second];
+ const BucketPayload& CachedPayload = m_Payloads[It->second];
if (CachedPayload.MemCached)
{
- Buffer = m_MemCachedPayloads[CachedPayload.MemCached];
+ Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload;
ZEN_ASSERT_SLOW(Buffer);
}
else
@@ -2065,7 +2091,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
void
ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage");
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage");
ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
@@ -2124,7 +2150,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
{
@@ -2165,7 +2191,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State");
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State");
RwLock::SharedLockScope IndexLock(m_IndexLock);
Stopwatch Timer;
@@ -2213,7 +2239,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
if (GcCtx.IsDeletionMode())
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete");
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete");
ExtendablePathBuilder<256> Path;
@@ -2281,7 +2307,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
size_t ChunkIndex = ChunkLocations.size();
ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash[ChunkIndex] = Key;
+ ChunkIndexToChunkHash.push_back(Key);
if (ExpiredCacheKeys.contains(Key))
{
continue;
@@ -2453,7 +2479,7 @@ ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
void
ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::CollectGarbage");
+ ZEN_TRACE_CPU("Z$::CollectGarbage");
std::vector<CacheBucket*> Buckets;
{
@@ -2468,13 +2494,16 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
Bucket->CollectGarbage(GcCtx);
}
- MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
+ if (!m_IsMemCacheTrimming)
+ {
+ MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
+ }
}
void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
uint64_t NewFileSize = Value.Value.Size();
@@ -2671,16 +2700,17 @@ ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, Buck
}
void
-ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData)
+ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData)
{
- uint64_t PayloadSize = MemCachedData.GetSize();
+ BucketPayload& Payload = m_Payloads[PayloadIndex];
+ uint64_t PayloadSize = MemCachedData.GetSize();
ZEN_ASSERT(PayloadSize != 0);
if (m_FreeMemCachedPayloads.empty())
{
if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max())
{
Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size()));
- m_MemCachedPayloads.push_back(MemCachedData);
+ m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex});
AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
@@ -2689,7 +2719,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, Bu
{
Payload.MemCached = m_FreeMemCachedPayloads.back();
m_FreeMemCachedPayloads.pop_back();
- m_MemCachedPayloads[Payload.MemCached] = MemCachedData;
+ m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex};
AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
@@ -2700,9 +2730,9 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&,
{
if (Payload.MemCached)
{
- size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize();
+ size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize();
RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
- m_MemCachedPayloads[Payload.MemCached] = IoBuffer{};
+ m_MemCachedPayloads[Payload.MemCached] = {};
m_FreeMemCachedPayloads.push_back(Payload.MemCached);
Payload.MemCached = {};
return PayloadSize;
@@ -2723,7 +2753,7 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
void
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
uint8_t EntryFlags = 0;
@@ -2800,7 +2830,7 @@ public:
virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactStore");
+ ZEN_TRACE_CPU("Z$::Bucket::CompactStore");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3023,7 +3053,7 @@ private:
GcStoreCompactor*
ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData");
+ ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData");
size_t TotalEntries = 0;
@@ -3117,7 +3147,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
{
@@ -3164,7 +3194,7 @@ public:
virtual void PreCache(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::PreCache");
+ ZEN_TRACE_CPU("Z$::Bucket::PreCache");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3385,7 +3415,7 @@ public:
virtual void LockState(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::LockState");
+ ZEN_TRACE_CPU("Z$::Bucket::LockState");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3458,7 +3488,7 @@ public:
virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveUsedReferencesFromSet");
+ ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet");
ZEN_ASSERT(m_IndexLock);
size_t InitialCount = IoCids.size();
@@ -3505,7 +3535,7 @@ public:
std::vector<GcReferenceChecker*>
ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CreateReferenceCheckers");
+ ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3530,7 +3560,7 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
void
ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactReferences");
+ ZEN_TRACE_CPU("Z$::Bucket::CompactReferences");
std::vector<ReferenceIndex> FirstReferenceIndex;
std::vector<IoHash> NewReferenceHashes;
@@ -3708,12 +3738,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
std::vector<BucketPayload>& Payloads,
std::vector<AccessTime>& AccessTimes,
std::vector<BucketMetaData>& MetaDatas,
- std::vector<IoBuffer>& MemCachedPayloads,
+ std::vector<MemCacheData>& MemCachedPayloads,
std::vector<ReferenceIndex>& FirstReferenceIndex,
IndexMap& Index,
RwLock::ExclusiveLockScope& IndexLock)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactState");
+ ZEN_TRACE_CPU("Z$::Bucket::CompactState");
size_t EntryCount = m_Index.size();
Payloads.reserve(EntryCount);
@@ -3738,7 +3768,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
}
if (Payload.MemCached)
{
- MemCachedPayloads.push_back(std::move(m_MemCachedPayloads[Payload.MemCached]));
+ MemCachedPayloads.emplace_back(
+ MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex});
Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1));
}
if (m_Configuration.EnableReferenceCaching)
@@ -3811,7 +3842,7 @@ ZenCacheDiskLayer::~ZenCacheDiskLayer()
ZenCacheDiskLayer::CacheBucket*
ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
{
- ZEN_TRACE_CPU("Z$::Disk::GetOrCreateBucket");
+ ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
const auto BucketName = std::string(InBucket);
{
@@ -3858,7 +3889,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
- ZEN_TRACE_CPU("Z$::Disk::Get");
+ ZEN_TRACE_CPU("Z$::Get");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
@@ -3874,7 +3905,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
void
ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
- ZEN_TRACE_CPU("Z$::Disk::Put");
+ ZEN_TRACE_CPU("Z$::Put");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
@@ -3886,6 +3917,8 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
void
ZenCacheDiskLayer::DiscoverBuckets()
{
+ ZEN_TRACE_CPU("Z$::DiscoverBuckets");
+
DirectoryContent DirContent;
GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
@@ -3986,6 +4019,8 @@ ZenCacheDiskLayer::DiscoverBuckets()
bool
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
+ ZEN_TRACE_CPU("Z$::DropBucket");
+
RwLock::ExclusiveLockScope _(m_Lock);
auto It = m_Buckets.find(std::string(InBucket));
@@ -4008,6 +4043,8 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
bool
ZenCacheDiskLayer::Drop()
{
+ ZEN_TRACE_CPU("Z$::Drop");
+
RwLock::ExclusiveLockScope _(m_Lock);
std::vector<std::unique_ptr<CacheBucket>> Buckets;
@@ -4029,6 +4066,8 @@ ZenCacheDiskLayer::Drop()
void
ZenCacheDiskLayer::Flush()
{
+ ZEN_TRACE_CPU("Z$::Flush");
+
std::vector<CacheBucket*> Buckets;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -4070,6 +4109,8 @@ ZenCacheDiskLayer::Flush()
void
ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
+ ZEN_TRACE_CPU("Z$::ScrubStorage");
+
RwLock::SharedLockScope _(m_Lock);
{
std::vector<std::future<void>> Results;
@@ -4096,7 +4137,7 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
void
ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::GatherReferences");
+ ZEN_TRACE_CPU("Z$::GatherReferences");
std::vector<CacheBucket*> Buckets;
{
@@ -4213,20 +4254,11 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st
void
ZenCacheDiskLayer::MemCacheTrim()
{
- ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim");
+ ZEN_TRACE_CPU("Z$::MemCacheTrim");
ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0);
-
- const GcClock::TimePoint Now = GcClock::Now();
-
- const GcClock::Tick NowTick = Now.time_since_epoch().count();
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
- const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
- if (NowTick < NextAllowedTrimTick)
- {
- return;
- }
+ ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0);
+ ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0);
bool Expected = false;
if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
@@ -4234,75 +4266,90 @@ ZenCacheDiskLayer::MemCacheTrim()
return;
}
- // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong
- const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickMemCacheTrim.store(NextTrimTick);
+ try
+ {
+ m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) {
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ uint64_t TrimmedSize = 0;
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(TrimmedSize),
+ NiceBytes(m_TotalMemCachedSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_NextAllowedTrimTick.store(NextTrimTick);
+ m_IsMemCacheTrimming.store(false);
+ });
- m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) {
- ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
- uint64_t StartSize = m_TotalMemCachedSize.load();
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] {
- uint64_t EndSize = m_TotalMemCachedSize.load();
- ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
- NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0),
- NiceBytes(m_TotalMemCachedSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- m_IsMemCacheTrimming.store(false);
- });
+ static const size_t UsageSlotCount = 2048;
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(UsageSlotCount);
- const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
-
- std::vector<uint64_t> UsageSlots;
- UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
+ std::vector<CacheBucket*> Buckets;
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
+ }
- std::vector<CacheBucket*> Buckets;
- {
- RwLock::SharedLockScope __(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
+ const GcClock::TimePoint Now = GcClock::Now();
{
- Buckets.push_back(Kv.second.get());
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
+ }
}
- }
- for (CacheBucket* Bucket : Buckets)
- {
- Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
- }
- uint64_t TotalSize = 0;
- for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
- {
- TotalSize += UsageSlots[Index];
- if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
{
- GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
- MemCacheTrim(Buckets, ExpireTime);
- break;
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
+ TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
+ break;
+ }
}
- }
- });
+ });
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
+ m_IsMemCacheTrimming.store(false);
+ }
}
-void
+uint64_t
ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime)
{
if (m_Configuration.MemCacheTargetFootprintBytes == 0)
{
- return;
+ return 0;
}
- RwLock::SharedLockScope __(m_Lock);
+ uint64_t TrimmedSize = 0;
for (CacheBucket* Bucket : Buckets)
{
- Bucket->MemCacheTrim(ExpireTime);
+ TrimmedSize += Bucket->MemCacheTrim(ExpireTime);
}
const GcClock::TimePoint Now = GcClock::Now();
const GcClock::Tick NowTick = Now.time_since_epoch().count();
const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
+ GcClock::Tick LastTrimTick = m_NextAllowedTrimTick;
const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ return TrimmedSize;
}
#if ZEN_WITH_TESTS
diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h
index 277371f2c..6997a12e4 100644
--- a/src/zenserver/cache/cachedisklayer.h
+++ b/src/zenserver/cache/cachedisklayer.h
@@ -197,15 +197,15 @@ public:
CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config);
~CacheBucket();
- bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- void MemCacheTrim(GcClock::TimePoint ExpireTime);
- bool Drop();
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
- void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx);
+ bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
+ bool Drop();
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
+ void GatherReferences(GcContext& GcCtx);
+ void CollectGarbage(GcContext& GcCtx);
inline GcStorageSize StorageSize() const
{
@@ -218,7 +218,7 @@ public:
CacheValueDetails::BucketDetails GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const;
void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
- void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots);
+ void GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots);
#if ZEN_WITH_TESTS
void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time);
#endif // ZEN_WITH_TESTS
@@ -286,6 +286,11 @@ public:
operator bool() const { return RawSize != 0 || RawHash != IoHash::Zero; };
};
+ struct MemCacheData
+ {
+ IoBuffer Payload;
+ PayloadIndex OwnerIndex;
+ };
#pragma pack(pop)
static_assert(sizeof(BucketPayload) == 20u);
static_assert(sizeof(BucketMetaData) == 28u);
@@ -323,7 +328,7 @@ public:
std::vector<BucketPayload> m_Payloads;
std::vector<BucketMetaData> m_MetaDatas;
std::vector<MetaDataIndex> m_FreeMetaDatas;
- std::vector<IoBuffer> m_MemCachedPayloads;
+ std::vector<MemCacheData> m_MemCachedPayloads;
std::vector<MemCachedIndex> m_FreeMemCachedPayloads;
std::vector<ReferenceIndex> m_FirstReferenceIndex;
std::vector<IoHash> m_ReferenceHashes;
@@ -364,7 +369,7 @@ public:
const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData);
void RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
BucketMetaData GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const;
- void SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData);
+ void SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData);
size_t RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
void InitializeIndexFromDisk(RwLock::ExclusiveLockScope&, bool IsNew);
@@ -390,7 +395,7 @@ public:
std::vector<BucketPayload>& Payloads,
std::vector<AccessTime>& AccessTimes,
std::vector<BucketMetaData>& MetaDatas,
- std::vector<IoBuffer>& MemCachedPayloads,
+ std::vector<MemCacheData>& MemCachedPayloads,
std::vector<ReferenceIndex>& FirstReferenceIndex,
IndexMap& Index,
RwLock::ExclusiveLockScope& IndexLock);
@@ -405,6 +410,10 @@ public:
m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed);
m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed);
}
+ static inline uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize)
+ {
+ return sizeof(MemCacheData) + sizeof(IoBufferCore) + RoundUp(PayloadSize, 8u);
+ }
// These locks are here to avoid contention on file creation, therefore it's sufficient
// that we take the same lock for the same hash
@@ -436,10 +445,21 @@ private:
{
return;
}
+ if (m_IsMemCacheTrimming)
+ {
+ return;
+ }
+
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ if (NowTick < m_NextAllowedTrimTick)
+ {
+ return;
+ }
+
MemCacheTrim();
}
- void MemCacheTrim();
- void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
+ void MemCacheTrim();
+ uint64_t MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
GcManager& m_Gc;
JobQueue& m_JobQueue;
@@ -447,7 +467,7 @@ private:
Configuration m_Configuration;
std::atomic_uint64_t m_TotalMemCachedSize{};
std::atomic_bool m_IsMemCacheTrimming = false;
- std::atomic<GcClock::Tick> m_LastTickMemCacheTrim;
+ std::atomic<GcClock::Tick> m_NextAllowedTrimTick;
mutable RwLock m_Lock;
std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 8db96f914..f61fbd8bc 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -338,7 +338,11 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
HttpStructuredCacheService::~HttpStructuredCacheService()
{
ZEN_INFO("closing structured cache");
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
m_StatsService.UnregisterHandler("z$", *this);
m_StatusService.UnregisterHandler("z$", *this);
@@ -615,24 +619,44 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
if (Key == HttpZCacheUtilStartRecording)
{
- m_RequestRecorder.reset();
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
- m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
+
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+
+ m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
+ m_RequestRecordingEnabled.store(true);
+ }
+ ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath);
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key == HttpZCacheUtilStopRecording)
{
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+ ZEN_INFO("cache RPC recording STOPPED");
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key == HttpZCacheUtilReplayRecording)
{
CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
uint32_t ThreadCount = std::thread::hardware_concurrency();
@@ -643,11 +667,18 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
ThreadCount = gsl::narrow<uint32_t>(Value.value());
}
}
+
+ ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath);
+
std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+
+ ZEN_INFO("cache RPC replay STARTED");
+
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key.starts_with(HttpZCacheDetailsPrefix))
{
HandleDetailsRequest(Request);
@@ -1776,11 +1807,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
[this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
uint64_t RequestIndex = ~0ull;
- if (m_RequestRecorder)
+ if (m_RequestRecordingEnabled)
{
- RequestIndex = m_RequestRecorder->RecordRequest(
- {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
- Body);
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ RequestIndex = m_RequestRecorder->RecordRequest(
+ {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
+ Body);
+ }
}
uint32_t AcceptMagic = 0;
@@ -1816,8 +1851,11 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
if (RequestIndex != ~0ull)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
}
AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
@@ -1828,10 +1866,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
if (RequestIndex != ~0ull)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ m_RequestRecorder->RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
}
AsyncRequest.WriteResponse(HttpResponseCode::OK,
HttpContentType::kCbPackage,
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 57a533029..2feaaead8 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -190,6 +190,12 @@ private:
void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
+ // This exists to avoid taking locks when recording is not enabled
+ std::atomic_bool m_RequestRecordingEnabled{false};
+
+ // This lock should be taken in SHARED mode when calling into the recorder,
+ // and taken in EXCLUSIVE mode whenever the recorder is created or destroyed
+ RwLock m_RequestRecordingLock;
std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 5f2c3351e..012925b51 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -5,6 +5,8 @@
#include "config/luaconfig.h"
#include "diag/logging.h"
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/crypto.h>
#include <zencore/except.h>
#include <zencore/fmtutils.h>
@@ -41,7 +43,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
std::filesystem::path
-PickDefaultStateDirectory()
+PickDefaultSystemRootDirectory()
{
// Pick sensible default
PWSTR ProgramDataDir = nullptr;
@@ -50,7 +52,7 @@ PickDefaultStateDirectory()
if (SUCCEEDED(hRes))
{
std::filesystem::path FinalPath(ProgramDataDir);
- FinalPath /= L"Epic\\Zen\\Data";
+ FinalPath /= L"Epic\\Zen";
::CoTaskMemFree(ProgramDataDir);
return FinalPath;
@@ -66,7 +68,7 @@ PickDefaultStateDirectory()
namespace zen {
std::filesystem::path
-PickDefaultStateDirectory()
+PickDefaultSystemRootDirectory()
{
int UserId = getuid();
const passwd* Passwd = getpwuid(UserId);
@@ -79,6 +81,62 @@ PickDefaultStateDirectory()
namespace zen {
+std::filesystem::path
+PickDefaultStateDirectory(std::filesystem::path SystemRoot)
+{
+ if (SystemRoot.empty())
+ return SystemRoot;
+
+ return SystemRoot / "Data";
+}
+
+void
+EmitCentralManifest(const std::filesystem::path& SystemRoot, Oid Identifier, CbObject Manifest, std::filesystem::path ManifestPath)
+{
+ CbObjectWriter Cbo;
+ Cbo << "path" << ManifestPath.generic_wstring();
+ Cbo << "manifest" << Manifest;
+
+ const std::filesystem::path StatesPath = SystemRoot / "States";
+
+ CreateDirectories(StatesPath);
+ WriteFile(StatesPath / fmt::format("{}", Identifier), Cbo.Save().GetBuffer().AsIoBuffer());
+}
+
+std::vector<CbObject>
+ReadAllCentralManifests(const std::filesystem::path& SystemRoot)
+{
+ std::vector<CbObject> Manifests;
+
+ DirectoryContent Content;
+ GetDirectoryContent(SystemRoot / "States", DirectoryContent::IncludeFilesFlag, Content);
+
+ for (std::filesystem::path& File : Content.Files)
+ {
+ try
+ {
+ FileContents FileData = ReadFile(File);
+ IoBuffer DataBuffer = FileData.Flatten();
+ CbValidateError ValidateError = ValidateCompactBinary(DataBuffer, CbValidateMode::All);
+
+ if (ValidateError == CbValidateError::None)
+ {
+ Manifests.push_back(LoadCompactBinaryObject(DataBuffer));
+ }
+ else
+ {
+ ZEN_WARN("failed to load manifest '{}': {}", File, ToString(ValidateError));
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("failed to load manifest '{}': {}", File, Ex.what());
+ }
+ }
+
+ return Manifests;
+}
+
void
ValidateOptions(ZenServerOptions& ServerOptions)
{
@@ -343,6 +401,7 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("server.logid"sv, ServerOptions.LogId, "log-id"sv);
LuaOptions.AddOption("server.sentry.disable"sv, ServerOptions.NoSentry, "no-sentry"sv);
LuaOptions.AddOption("server.sentry.allowpersonalinfo"sv, ServerOptions.SentryAllowPII, "sentry-allow-personal-info"sv);
+ LuaOptions.AddOption("server.systemrootdir"sv, ServerOptions.SystemRootDir, "system-dir"sv);
LuaOptions.AddOption("server.datadir"sv, ServerOptions.DataDir, "data-dir"sv);
LuaOptions.AddOption("server.contentdir"sv, ServerOptions.ContentDir, "content-dir"sv);
LuaOptions.AddOption("server.abslog"sv, ServerOptions.AbsLogFile, "abslog"sv);
@@ -370,9 +429,11 @@ ParseConfigFile(const std::filesystem::path& Path,
ServerOptions.HttpServerConfig.HttpSys.IsRequestLoggingEnabled,
"httpsys-enable-request-logging"sv);
+#if ZEN_WITH_TRACE
////// trace
LuaOptions.AddOption("trace.host"sv, ServerOptions.TraceHost, "tracehost"sv);
LuaOptions.AddOption("trace.file"sv, ServerOptions.TraceFile, "tracefile"sv);
+#endif
////// stats
LuaOptions.AddOption("stats.enable"sv, ServerOptions.StatsConfig.Enabled);
@@ -503,6 +564,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
// stream operator to convert argv value into the options type. std::fs::path
// expects paths in streams to be quoted but argv paths are unquoted. By
// going into a std::string first, paths with whitespace parse correctly.
+ std::string SystemRootDir;
std::string DataDir;
std::string ContentDir;
std::string AbsLogFile;
@@ -525,6 +587,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_options()("help", "Show command line help");
options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(ServerOptions.IsTest)->default_value("false"));
options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::string>(DataDir));
+ options.add_options()("system-dir", "Specify system root", cxxopts::value<std::string>(SystemRootDir));
options.add_options()("snapshot-dir",
"Specify a snapshot of server state to mirror into the persistence root at startup",
cxxopts::value<std::string>(BaseSnapshotDir));
@@ -975,6 +1038,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
}
logging::RefreshLogLevels();
+ ServerOptions.SystemRootDir = MakeSafePath(SystemRootDir);
ServerOptions.DataDir = MakeSafePath(DataDir);
ServerOptions.BaseSnapshotDir = MakeSafePath(BaseSnapshotDir);
ServerOptions.ContentDir = MakeSafePath(ContentDir);
@@ -1022,9 +1086,14 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
throw;
}
+ if (ServerOptions.SystemRootDir.empty())
+ {
+ ServerOptions.SystemRootDir = PickDefaultSystemRootDirectory();
+ }
+
if (ServerOptions.DataDir.empty())
{
- ServerOptions.DataDir = PickDefaultStateDirectory();
+ ServerOptions.DataDir = PickDefaultStateDirectory(ServerOptions.SystemRootDir);
}
if (ServerOptions.AbsLogFile.empty())
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index cd2d92523..b5314b600 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -128,6 +128,7 @@ struct ZenServerOptions
zen::HttpServerConfig HttpServerConfig;
ZenStructuredCacheConfig StructuredCacheConfig;
ZenStatsConfig StatsConfig;
+ std::filesystem::path SystemRootDir; // System root directory (used for machine level config)
std::filesystem::path DataDir; // Root directory for state (used for testing)
std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
std::filesystem::path AbsLogFile; // Absolute path to main log file
@@ -162,4 +163,7 @@ struct ZenServerOptions
void ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions);
+void EmitCentralManifest(const std::filesystem::path& SystemRoot, Oid Identifier, CbObject Manifest, std::filesystem::path ManifestPath);
+std::vector<CbObject> ReadAllCentralManifests(const std::filesystem::path& SystemRoot);
+
} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 73cb35fb8..b7507bd17 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1176,8 +1176,6 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
const OplogEntryMapping& OpMapping,
const OplogEntry& OpEntry)
{
- ZEN_TRACE_CPU("Store::Oplog::RegisterOplogEntry");
-
// For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing
// too many problems. Longer term we'll probably want to ensure we can do concurrent updates however
@@ -3662,11 +3660,11 @@ namespace testutils {
return Result;
}
- uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
+ uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset)
{
if (RawOffset > 0)
{
- uint64 BlockSize = 0;
+ uint64_t BlockSize = 0;
OodleCompressor Compressor;
OodleCompressionLevel CompressionLevel;
if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp
index 8ae33597a..2d52236b3 100644
--- a/src/zenserver/upstream/zen.cpp
+++ b/src/zenserver/upstream/zen.cpp
@@ -59,6 +59,11 @@ ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClien
ZenStructuredCacheClient::~ZenStructuredCacheClient()
{
+ RwLock::ExclusiveLockScope _(m_SessionStateLock);
+ for (auto& CacheEntry : m_SessionStateCache)
+ {
+ delete CacheEntry;
+ }
}
detail::ZenCacheSessionState*
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 336f715f4..f80f95f8e 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -305,7 +305,8 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_ProjectStore,
HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile,
.HttpLogPath = ServerOptions.DataDir / "logs" / "http.log",
- .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"});
+ .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"},
+ ServerOptions);
m_Http->RegisterService(*m_AdminService);
return EffectiveBasePort;
@@ -329,6 +330,8 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
bool UpdateManifest = false;
std::filesystem::path ManifestPath = m_DataRoot / "root_manifest";
+ Oid StateId = Oid::Zero;
+ DateTime CreatedWhen{0};
if (!WipeState)
{
@@ -365,6 +368,8 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
m_RootManifest = LoadCompactBinaryObject(Manifest);
const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0);
+ StateId = m_RootManifest["state_id"].AsObjectId();
+ CreatedWhen = m_RootManifest["created"].AsDateTime();
if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION)
{
@@ -391,6 +396,20 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
}
}
+ if (StateId == Oid::Zero)
+ {
+ StateId = Oid::NewOid();
+ UpdateManifest = true;
+ }
+
+ const DateTime Now = DateTime::Now();
+
+ if (CreatedWhen.GetTicks() == 0)
+ {
+ CreatedWhen = Now;
+ UpdateManifest = true;
+ }
+
// Handle any state wipe
if (WipeState)
@@ -418,19 +437,36 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
UpdateManifest = true;
}
- if (UpdateManifest)
- {
- // Write new manifest
-
- const DateTime Now = DateTime::Now();
+ // Write manifest
+ {
CbObjectWriter Cbo;
- Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << Now << "updated" << Now << "state_id" << Oid::NewOid();
+ Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId;
m_RootManifest = Cbo.Save();
- WriteFile(ManifestPath, m_RootManifest.GetBuffer().AsIoBuffer());
+ if (UpdateManifest)
+ {
+ IoBuffer ManifestBuffer = m_RootManifest.GetBuffer().AsIoBuffer();
+
+ WriteFile(ManifestPath, ManifestBuffer);
+ }
+
+ if (!ServerOptions.IsTest)
+ {
+ try
+ {
+ EmitCentralManifest(ServerOptions.SystemRootDir, StateId, m_RootManifest, ManifestPath);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Unable to emit central manifest: ", Ex.what());
+ }
+ }
}
+
+ // Write state marker
+
{
std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker";
static const std::string_view StateMarkerContent = "deleting this file will cause " ZEN_APP_NAME " to exit"sv;