aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-10-07 08:29:50 +0200
committerMartin Ridgers <[email protected]>2021-10-07 08:29:50 +0200
commit03232621d183f22e12e798a753e4a606763e63d6 (patch)
tree5701d202392dd4ab947139e4046a44ab9bc6cdf7 /zenserver
parentMerged main (diff)
parentOnly enable the MSVC debug output sink for sessions when the --debug mode is ... (diff)
downloadzen-03232621d183f22e12e798a753e4a606763e63d6.tar.xz
zen-03232621d183f22e12e798a753e4a606763e63d6.zip
Merged main
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/structuredcache.cpp275
-rw-r--r--zenserver/cache/structuredcache.h42
-rw-r--r--zenserver/cache/structuredcachestore.cpp275
-rw-r--r--zenserver/cache/structuredcachestore.h48
-rw-r--r--zenserver/config.cpp22
-rw-r--r--zenserver/config.h4
-rw-r--r--zenserver/diag/logging.cpp5
-rw-r--r--zenserver/experimental/frontend.cpp119
-rw-r--r--zenserver/experimental/frontend.h24
-rw-r--r--zenserver/monitoring/httpstats.cpp50
-rw-r--r--zenserver/monitoring/httpstats.h37
-rw-r--r--zenserver/monitoring/httpstatus.cpp50
-rw-r--r--zenserver/monitoring/httpstatus.h37
-rw-r--r--zenserver/projectstore.cpp12
-rw-r--r--zenserver/testing/httptest.cpp41
-rw-r--r--zenserver/upstream/jupiter.h6
-rw-r--r--zenserver/upstream/upstreamcache.cpp270
-rw-r--r--zenserver/upstream/upstreamcache.h46
-rw-r--r--zenserver/upstream/zen.cpp6
-rw-r--r--zenserver/upstream/zen.h5
-rw-r--r--zenserver/xmake.lua15
-rw-r--r--zenserver/zenserver.cpp292
-rw-r--r--zenserver/zenserver.vcxproj9
-rw-r--r--zenserver/zenserver.vcxproj.filters14
24 files changed, 1213 insertions, 491 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index dc96aecae..4a2a3748a 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -12,6 +12,7 @@
#include <zenhttp/httpserver.h>
#include <zenstore/CAS.h>
+#include "monitoring/httpstats.h"
#include "structuredcache.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
@@ -149,13 +150,19 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
CasStore& InStore,
CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
std::unique_ptr<UpstreamCache> UpstreamCache)
: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
+, m_StatsService(StatsService)
+, m_StatusService(StatusService)
, m_CasStore(InStore)
, m_CidStore(InCidStore)
, m_UpstreamCache(std::move(UpstreamCache))
{
+ StatsService.RegisterHandler("z$", *this);
+ StatusService.RegisterHandler("z$", *this);
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -200,11 +207,6 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
std::string_view Key = Request.RelativeUri();
- if (Key.empty())
- {
- return HandleStatusRequest(Request);
- }
-
if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); }))
{
// Bucket reference
@@ -270,10 +272,6 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
case kHead:
case kGet:
{
- if (Verb == kHead)
- {
- Request.SetSuppressResponseBody();
- }
HandleGetCacheRecord(Request, Ref, Policy);
}
break;
@@ -288,28 +286,104 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
void
HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
{
- const ZenContentType AcceptType = Request.AcceptContentType();
+ const ZenContentType AcceptType = Request.AcceptContentType();
+ const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
+ const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
+ const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
- ZenCacheValue Value;
- bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
- bool InUpstreamCache = false;
+ bool Success = false;
+ ZenCacheValue LocalCacheValue;
- const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote));
+ if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue))
+ {
+ Success = true;
- if (QueryUpstream)
+ if (!SkipData && AcceptType == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+ CbObjectView CacheRecord(LocalCacheValue.Value.Data());
+ uint32_t AttachmentCount = 0;
+ uint32_t ValidCount = 0;
+
+ if (!SkipAttachments)
+ {
+ CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ ValidCount++;
+ }
+ AttachmentCount++;
+ });
+
+ if (ValidCount != AttachmentCount)
+ {
+ Success = false;
+ ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType),
+ ValidCount,
+ AttachmentCount);
+ }
+ }
+
+ Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value));
+
+ BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage);
+ }
+ }
+
+ if (Success)
{
- const ZenContentType CacheRecordType = AcceptType;
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(LocalCacheValue.Value.Size()),
+ ToString(LocalCacheValue.Value.GetContentType()));
+
+ m_CacheStats.HitCount++;
+
+ if (SkipData)
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value);
+ }
+ }
+ else if (!QueryUpstream)
+ {
+ ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
+ m_CacheStats.MissCount++;
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ // Issue upstream query asynchronously in order to keep requests flowing without
+ // hogging I/O servicing threads with blocking work
+
+ Request.WriteResponseAsync([this, AcceptType, SkipData, SkipAttachments, Ref](HttpServerRequest& AsyncRequest) {
+ bool Success = false;
+ ZenCacheValue UpstreamCacheValue;
- if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
+ metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
+
+ if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType);
UpstreamResult.Success)
{
- Value.Value = UpstreamResult.Value;
- Success = true;
- InUpstreamCache = true;
+ Success = true;
+ UpstreamCacheValue.Value = UpstreamResult.Value;
+
+ UpstreamCacheValue.Value.SetContentType(AcceptType);
- if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject)
+ if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject)
{
- if (CacheRecordType == ZenContentType::kCbObject)
+ if (AcceptType == ZenContentType::kCbObject)
{
const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
@@ -322,7 +396,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
IndexData.EndArray();
- Value.IndexData = IndexData.Save();
+ UpstreamCacheValue.IndexData = IndexData.Save();
}
else
{
@@ -336,19 +410,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (Success)
{
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue);
}
}
- else
+ else if (AcceptType == ZenContentType::kCbPackage)
{
- ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage);
-
CbPackage Package;
- if (Package.TryLoad(UpstreamResult.Value))
+ if (Package.TryLoad(UpstreamCacheValue.Value))
{
+ CbObject CacheRecord = Package.GetObject();
uint32_t AttachmentCount = 0;
uint32_t ValidCount = 0;
- CbObject CacheRecord = Package.GetObject();
CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) {
if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
@@ -373,7 +445,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
- if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments))
+ if (SkipAttachments)
{
CbPackage PackageWithoutAttachments;
PackageWithoutAttachments.SetObject(CacheRecord);
@@ -381,7 +453,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
BinaryWriter MemStream;
PackageWithoutAttachments.Save(MemStream);
- Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
}
}
else
@@ -400,86 +472,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
}
}
- }
-
- if (!Success)
- {
- ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache)
- {
- CbObjectView CacheRecord(Value.Value.Data());
-
- const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All);
- if (ValidationResult != CbValidateError::None)
+ if (Success)
{
- ZEN_WARN("GET - '{}/{}' '{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
- return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
- }
-
- const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments);
- uint32_t AttachmentCount = 0;
- uint32_t ValidCount = 0;
- uint64_t AttachmentBytes = 0ull;
-
- CbPackage Package;
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(UpstreamCacheValue.Value.Size()),
+ ToString(UpstreamCacheValue.Value.GetContentType()));
- if (!SkipAttachments)
- {
- CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- AttachmentBytes += Chunk.Size();
- ValidCount++;
- }
- AttachmentCount++;
- });
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
- if (ValidCount != AttachmentCount)
+ if (SkipData)
{
- ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- ValidCount,
- AttachmentCount);
-
- return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ AsyncRequest.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value);
}
}
-
- Package.SetObject(LoadCompactBinaryObject(Value.Value));
-
- ZEN_DEBUG("HIT - '{}/{}' {} '{}', {} attachments (LOCAL)",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(AttachmentBytes + Value.Value.Size()),
- ToString(HttpContentType::kCbPackage),
- AttachmentCount);
-
- BinaryWriter MemStream;
- Package.Save(MemStream);
-
- IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
-
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
- }
- else
- {
- ZEN_DEBUG("HIT - '{}/{}' {} '{}' ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Value.Value.Size()),
- ToString(Value.Value.GetContentType()),
- InUpstreamCache ? "UPSTREAM" : "LOCAL");
-
- Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
- }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
+ m_CacheStats.MissCount++;
+ AsyncRequest.WriteResponse(HttpResponseCode::NotFound);
+ }
+ });
}
void
@@ -668,10 +688,6 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request
case kGet:
{
HandleGetCachePayload(Request, Ref, Policy);
- if (Verb == kHead)
- {
- Request.SetSuppressResponseBody();
- }
}
break;
case kPut:
@@ -712,7 +728,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
if (!Payload)
{
- ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId);
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType()));
+ m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -724,7 +741,20 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
ToString(Payload.GetContentType()),
InUpstreamCache ? "UPSTREAM" : "LOCAL");
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
+ m_CacheStats.HitCount++;
+ if (InUpstreamCache)
+ {
+ m_CacheStats.UpstreamHitCount++;
+ }
+
+ if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData)
+ {
+ Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
+ }
}
void
@@ -839,12 +869,25 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
void
-HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
+HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
{
CbObjectWriter Cbo;
- Cbo << "ok" << true;
EmitSnapshot("requests", m_HttpRequests, Cbo);
+ EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo);
+
+ const uint64_t HitCount = m_CacheStats.HitCount;
+ const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
+ const uint64_t MissCount = m_CacheStats.MissCount;
+ const uint64_t TotalCount = HitCount + MissCount;
+
+ Cbo.BeginObject("cache");
+ Cbo << "hits" << HitCount << "misses" << MissCount;
+ Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount) * 100.0) : 0.0);
+ Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount;
+ Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) * 100.0 : 0.0);
+ Cbo.EndObject();
+
if (m_UpstreamCache)
{
Cbo.BeginObject("upstream");
@@ -855,4 +898,12 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
+void
+HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
} // namespace zen
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 47fc173e9..ad7253f79 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -5,6 +5,9 @@
#include <zencore/stats.h>
#include <zenhttp/httpserver.h>
+#include "monitoring/httpstats.h"
+#include "monitoring/httpstatus.h"
+
#include <memory>
namespace spdlog {
@@ -27,12 +30,12 @@ enum class CachePolicy : uint8_t;
*
* {BucketId}/{KeyHash}
*
- * Where BucketId is an alphanumeric string, and KeyHash is a 40-character hexadecimal
- * sequence. The hash value may be derived in any number of ways, it's up to the
- * application to pick an approach.
+ * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character
+ * hexadecimal sequence. The hash value may be derived in any number of ways, it's
+ * up to the application to pick an approach.
*
* Values may be structured or unstructured. Structured values are encoded using Unreal
- * Engine's compact binary encoding
+ * Engine's compact binary encoding (see CbObject)
*
* Additionally, attachments may be addressed as:
*
@@ -47,18 +50,19 @@ enum class CachePolicy : uint8_t;
*
*/
-class HttpStructuredCacheService : public zen::HttpService
+class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
{
public:
HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- zen::CasStore& InCasStore,
- zen::CidStore& InCidStore,
+ CasStore& InCasStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
std::unique_ptr<UpstreamCache> UpstreamCache);
~HttpStructuredCacheService();
virtual const char* BaseUri() const override;
-
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
void Flush();
void Scrub(ScrubContext& Ctx);
@@ -71,6 +75,13 @@ private:
IoHash PayloadId;
};
+ struct CacheStats
+ {
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t UpstreamHitCount{};
+ std::atomic_uint64_t MissCount{};
+ };
+
[[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef);
void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
@@ -79,16 +90,21 @@ private:
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
- void HandleStatusRequest(zen::HttpServerRequest& Request);
+ virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- zen::ZenCacheStore& m_CacheStore;
- zen::CasStore& m_CasStore;
- zen::CidStore& m_CidStore;
+ ZenCacheStore& m_CacheStore;
+ HttpStatsService& m_StatsService;
+ HttpStatusService& m_StatusService;
+ CasStore& m_CasStore;
+ CidStore& m_CidStore;
std::unique_ptr<UpstreamCache> m_UpstreamCache;
uint64_t m_LastScrubTime = 0;
metrics::OperationTiming m_HttpRequests;
+ metrics::OperationTiming m_UpstreamGetRequestTiming;
+ CacheStats m_CacheStats;
};
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 5e93ebaa9..580446473 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -32,6 +32,8 @@ ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
CreateDirectories(RootDir);
+
+ m_DiskLayer.DiscoverBuckets();
}
ZenCacheStore::~ZenCacheStore()
@@ -116,6 +118,13 @@ ZenCacheStore::Scrub(ScrubContext& Ctx)
m_DiskLayer.Scrub(Ctx);
m_MemLayer.Scrub(Ctx);
}
+
+void
+ZenCacheStore::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
//////////////////////////////////////////////////////////////////////////
ZenCacheMemoryLayer::ZenCacheMemoryLayer()
@@ -142,6 +151,10 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
_.ReleaseNow();
+ // There's a race here. Since the lock is released early to allow
+ // inserts, the bucket delete path could end up deleting the
+ // underlying data structure
+
return Bucket->Get(HashKey, OutValue);
}
@@ -195,13 +208,21 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
}
void
+ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
+void
ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
+ RwLock::SharedLockScope _(m_bucketLock);
+
std::vector<IoHash> BadHashes;
for (auto& Kv : m_cacheMap)
{
- if (Kv.first != IoHash::HashBuffer(Kv.second))
+ if (Kv.first != IoHash::HashBuffer(Kv.second.Payload))
{
BadHashes.push_back(Kv.first);
}
@@ -209,10 +230,16 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- Ctx.ReportBadChunks(BadHashes);
+ Ctx.ReportBadCasChunks(BadHashes);
}
}
+void
+ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
bool
ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -224,18 +251,26 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
}
else
{
- OutValue.Value = bucketIt->second;
+ BucketValue& Value = bucketIt.value();
+ OutValue.Value = Value.Payload;
+ Value.LastAccess = GetCurrentTimeStamp();
return true;
}
}
+uint64_t
+ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp()
+{
+ return GetLofreqTimerValue();
+}
+
void
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
RwLock::ExclusiveLockScope _(m_bucketLock);
- m_cacheMap[HashKey] = Value.Value;
+ m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value});
}
//////////////////////////////////////////////////////////////////////////
@@ -245,11 +280,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
struct DiskLocation
{
- uint64_t OffsetAndFlags;
- uint32_t Size;
- uint32_t IndexDataSize;
+ inline DiskLocation() = default;
+
+ inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+ : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+ , LowerSize(ValueSize & 0xFFFFffff)
+ , IndexDataSize(IndexSize)
+ {
+ }
- static const uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull;
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
@@ -257,6 +298,7 @@ struct DiskLocation
static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ inline uint64_t Size() const { return LowerSize; }
inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
inline ZenContentType GetContentType() const
{
@@ -269,6 +311,11 @@ struct DiskLocation
return ContentType;
}
+
+private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
};
struct DiskIndexEntry
@@ -286,7 +333,7 @@ struct ZenCacheDiskLayer::CacheBucket
CacheBucket(CasStore& Cas);
~CacheBucket();
- void OpenOrCreate(std::filesystem::path BucketDir);
+ void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
static bool Delete(std::filesystem::path BucketDir);
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -294,14 +341,15 @@ struct ZenCacheDiskLayer::CacheBucket
void Drop();
void Flush();
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
- inline bool IsOk() const { return m_Ok; }
+ inline bool IsOk() const { return m_IsOk; }
private:
CasStore& m_CasStore;
std::filesystem::path m_BucketDir;
Oid m_BucketId;
- bool m_Ok = false;
+ bool m_IsOk = false;
uint64_t m_LargeObjectThreshold = 64 * 1024;
// These files are used to manage storage of small objects for this bucket
@@ -314,9 +362,19 @@ private:
uint64_t m_WriteCursor = 0;
void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
- void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value);
- bool GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
+
+ // These locks are here to avoid contention on file creation, therefore it's sufficient
+ // that we take the same lock for the same hash
+ //
+ // These locks are small and should really be spaced out so they don't share cache lines,
+ // but we don't currently access them at particularly high frequency so it should not be
+ // an issue in practice
+
+ RwLock m_ShardedLocks[256];
+ inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; }
};
ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas)
@@ -341,7 +399,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir)
}
void
-ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
+ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
{
CreateDirectories(BucketDir);
@@ -368,17 +426,23 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
{
ManifestFile.Read(&m_BucketId, sizeof(Oid), 0);
- m_Ok = true;
+ m_IsOk = true;
}
- if (!m_Ok)
+ if (!m_IsOk)
{
ManifestFile.Close();
}
}
- if (!m_Ok)
+ if (!m_IsOk)
{
+ if (AllowCreate == false)
+ {
+ // Invalid bucket
+ return;
+ }
+
// No manifest file found, this is a new bucket
ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec);
@@ -410,13 +474,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
m_SlogFile.Replay([&](const DiskIndexEntry& Record) {
m_Index[Record.Key] = Record.Location;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size);
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size());
});
m_WriteCursor = (MaxFileOffset + 15) & ~15;
}
- m_Ok = true;
+ m_IsOk = true;
}
void
@@ -437,23 +501,25 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoH
bool
ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue)
{
- if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size);
- OutValue.Value.SetContentType(Loc.GetContentType());
-
- return true;
+ return false;
}
- return false;
+ OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size());
+ OutValue.Value.SetContentType(Loc.GetContentType());
+
+ return true;
}
bool
-ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc)
+ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue)
{
WideStringBuilder<128> DataFilePath;
BuildPath(DataFilePath, HashKey);
+ RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+
if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()))
{
OutValue.Value = Data;
@@ -468,7 +534,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, Z
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
- if (!m_Ok)
+ if (!m_IsOk)
{
return false;
}
@@ -486,7 +552,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
_.ReleaseNow();
- return GetStandaloneCacheValue(HashKey, OutValue, Loc);
+ return GetStandaloneCacheValue(Loc, HashKey, OutValue);
}
return false;
@@ -495,14 +561,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
- if (!m_Ok)
+ if (!m_IsOk)
{
return;
}
if (Value.Value.Size() >= m_LargeObjectThreshold)
{
- return PutLargeObject(HashKey, Value);
+ return PutStandaloneCacheValue(HashKey, Value);
}
else
{
@@ -517,10 +583,9 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags),
- .Size = gsl::narrow<uint32_t>(Value.Value.Size())};
+ DiskLocation Loc(m_WriteCursor, Value.Value.Size(), 0, EntryFlags);
- m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16);
+ m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16);
if (auto it = m_Index.find(HashKey); it == m_Index.end())
{
@@ -530,11 +595,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
else
{
// TODO: should check if write is idempotent and bail out if it is?
+ // this would requiring comparing contents on disk unless we add a
+ // content hash to the index entry
it.value() = Loc;
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset());
+ m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset());
}
}
@@ -558,61 +625,69 @@ ZenCacheDiskLayer::CacheBucket::Flush()
void
ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
- std::vector<DiskIndexEntry> StandaloneFiles;
+ std::atomic<uint64_t> ScrubbedChunks{0}, ScrubbedBytes{0};
- std::vector<IoHash> BadChunks;
- std::vector<IoBuffer> BadStandaloneChunks;
+ std::vector<IoHash> BadChunks;
{
RwLock::SharedLockScope _(m_IndexLock);
for (auto& Kv : m_Index)
{
- const IoHash& Hash = Kv.first;
- const DiskLocation& Loc = Kv.second;
+ const IoHash& HashKey = Kv.first;
+ const DiskLocation& Loc = Kv.second;
ZenCacheValue Value;
- if (!GetInlineCacheValue(Loc, Value))
+ if (GetInlineCacheValue(Loc, Value))
{
- ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile));
- StandaloneFiles.push_back({.Key = Hash, .Location = Loc});
+ // Validate contents
}
else
{
- if (GetStandaloneCacheValue(Hash, Value, Loc))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- // Hash contents
-
- const IoHash ComputedHash = HashBuffer(Value.Value);
-
- if (ComputedHash != Hash)
+ if (GetStandaloneCacheValue(Loc, HashKey, Value))
{
- BadChunks.push_back(Hash);
+ // Note: we cannot currently validate contents since we don't
+ // have a content hash!
+ }
+ else
+ {
+ // Value not found
+ BadChunks.push_back(HashKey);
}
- }
- else
- {
- // Non-existent
}
}
}
}
- if (Ctx.RunRecovery())
+ Ctx.ReportScrubbed(ScrubbedChunks, ScrubbedBytes);
+
+ if (BadChunks.empty())
{
- // Clean out bad chunks
+ return;
}
- if (!BadChunks.empty())
+ Ctx.ReportBadCasChunks(BadChunks);
+
+ if (Ctx.RunRecovery())
{
- Ctx.ReportBadChunks(BadChunks);
+ // Clean out bad data
}
}
void
-ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx)
{
+ ZEN_UNUSED(GcCtx);
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
+
WideStringBuilder<128> DataFilePath;
BuildPath(DataFilePath, HashKey);
@@ -661,7 +736,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenC
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0};
+ DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags);
if (auto it = m_Index.find(HashKey); it == m_Index.end())
{
@@ -719,7 +794,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= std::string(InBucket);
- Bucket->OpenOrCreate(BucketPath.c_str());
+ Bucket->OpenOrCreate(BucketPath);
}
}
@@ -762,7 +837,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
std::filesystem::path bucketPath = m_RootDir;
bucketPath /= std::string(InBucket);
- Bucket->OpenOrCreate(bucketPath.c_str());
+ Bucket->OpenOrCreate(bucketPath);
}
}
@@ -774,6 +849,63 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
}
+void
+ZenCacheDiskLayer::DiscoverBuckets()
+{
+ FileSystemTraversal Traversal;
+ struct Visitor : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
+ [[maybe_unused]] const path_view& File,
+ [[maybe_unused]] uint64_t FileSize) override
+ {
+ }
+
+ virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
+ {
+ Dirs.push_back(std::wstring(DirectoryName));
+ return false;
+ }
+
+ std::vector<std::wstring> Dirs;
+ } Visit;
+
+ Traversal.TraverseFileSystem(m_RootDir, Visit);
+
+ // Initialize buckets
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ for (const std::wstring& BucketName : Visit.Dirs)
+ {
+ // New bucket needs to be created
+
+ std::string BucketName8 = WideToUtf8(BucketName);
+
+ if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end())
+ {
+ }
+ else
+ {
+ auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore);
+
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName8;
+
+ CacheBucket& Bucket = InsertResult.first->second;
+
+ Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false);
+
+ if (!Bucket.IsOk())
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir);
+
+ m_Buckets.erase(InsertResult.first);
+ }
+ }
+ }
+}
+
bool
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
@@ -830,27 +962,10 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
}
}
-//////////////////////////////////////////////////////////////////////////
-
-ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore)
-{
- ZEN_UNUSED(CacheStore);
-}
-
-ZenCacheTracker::~ZenCacheTracker()
-{
-}
-
-void
-ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey)
-{
- ZEN_UNUSED(Bucket);
- ZEN_UNUSED(HashKey);
-}
-
void
-ZenCacheTracker::Flush()
+ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx)
{
+ ZEN_UNUSED(GcCtx);
}
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index f96757409..4753af627 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -46,6 +46,11 @@ struct ZenCacheValue
CbObject IndexData;
};
+/** In-memory cache storage
+
+ Intended for small values which are frequently accessed
+
+ */
class ZenCacheMemoryLayer
{
public:
@@ -56,20 +61,41 @@ public:
void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
bool DropBucket(std::string_view Bucket);
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ struct Configuration
+ {
+ uint64_t TargetFootprintBytes = 16 * 1024 * 1024;
+ uint64_t ScavengeThreshold = 4 * 1024 * 1024;
+ };
+
+ const Configuration& GetConfiguration() const { return m_Configuration; }
+ void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; }
private:
struct CacheBucket
{
- RwLock m_bucketLock;
- tsl::robin_map<IoHash, IoBuffer> m_cacheMap;
+ struct BucketValue
+ {
+ uint64_t LastAccess = 0;
+ IoBuffer Payload;
+ };
+
+ RwLock m_bucketLock;
+ tsl::robin_map<IoHash, BucketValue> m_cacheMap;
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(const IoHash& HashKey, const ZenCacheValue& Value);
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ private:
+ uint64_t GetCurrentTimeStamp();
};
RwLock m_Lock;
std::unordered_map<std::string, CacheBucket> m_Buckets;
+ Configuration m_Configuration;
};
class ZenCacheDiskLayer
@@ -83,6 +109,9 @@ public:
bool DropBucket(std::string_view Bucket);
void Flush();
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ void DiscoverBuckets();
private:
/** A cache bucket manages a single directory containing
@@ -107,6 +136,7 @@ public:
bool DropBucket(std::string_view Bucket);
void Flush();
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
private:
std::filesystem::path m_RootDir;
@@ -116,18 +146,4 @@ private:
uint64_t m_LastScrubTime = 0;
};
-/** Tracks cache entry access, stats and orchestrates cleanup activities
- */
-class ZenCacheTracker
-{
-public:
- ZenCacheTracker(ZenCacheStore& CacheStore);
- ~ZenCacheTracker();
-
- void TrackAccess(std::string_view Bucket, const IoHash& HashKey);
- void Flush();
-
-private:
-};
-
} // namespace zen
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 254032226..df3259542 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -90,6 +90,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(GlobalOptions.IsTest)->default_value("false"));
options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(GlobalOptions.LogId));
options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::filesystem::path>(GlobalOptions.DataDir));
+ options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::filesystem::path>(GlobalOptions.ContentDir));
+ options.add_options()("abslog", "Path to log file", cxxopts::value<std::filesystem::path>(GlobalOptions.AbsLogFile));
options
.add_option("lifetime", "", "owner-pid", "Specify owning process id", cxxopts::value<int>(GlobalOptions.OwnerPid), "<identifier>");
@@ -212,8 +214,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_option("cache",
"",
"upstream-zen-url",
- "URL to a remote Zen server instance",
- cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Url)->default_value(""),
+ "URL to remote Zen server. Use a comma separated list to choose the one with the best latency.",
+ cxxopts::value<std::vector<std::string>>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Urls)->default_value(""),
"");
options.add_option("cache",
@@ -227,7 +229,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
"",
"upstream-stats",
"Collect performance metrics for upstream endpoints",
- cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("true"),
+ cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"),
"");
try
@@ -366,9 +368,17 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv
if (auto ZenConfig = UpstreamConfig->get<sol::optional<sol::table>>("zen"))
{
- UpdateStringValueFromConfig(ZenConfig.value(),
- std::string_view("url"),
- ServiceConfig.UpstreamCacheConfig.ZenConfig.Url);
+ if (auto Url = ZenConfig.value().get<sol::optional<std::string>>("url"))
+ {
+ ServiceConfig.UpstreamCacheConfig.ZenConfig.Urls.push_back(Url.value());
+ }
+ else if (auto Urls = ZenConfig.value().get<sol::optional<sol::table>>("url"))
+ {
+ for (const auto& Kv : Urls.value())
+ {
+ ServiceConfig.UpstreamCacheConfig.ZenConfig.Urls.push_back(Kv.second.as<std::string>());
+ }
+ }
}
}
}
diff --git a/zenserver/config.h b/zenserver/config.h
index 75c19d690..405e22739 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -17,6 +17,8 @@ struct ZenServerOptions
bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
std::string LogId; // Id for tagging log output
std::filesystem::path DataDir; // Root directory for state (used for testing)
+ std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
+ std::filesystem::path AbsLogFile;
};
struct ZenUpstreamJupiterConfig
@@ -34,7 +36,7 @@ struct ZenUpstreamJupiterConfig
struct ZenUpstreamZenConfig
{
- std::string Url;
+ std::vector<std::string> Urls;
};
enum class UpstreamCachePolicy : uint8_t
diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp
index bc7b883b5..7a7773cba 100644
--- a/zenserver/diag/logging.cpp
+++ b/zenserver/diag/logging.cpp
@@ -196,7 +196,8 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
EnableVTMode();
- std::filesystem::path LogPath = GlobalOptions.DataDir / "logs/zenserver.log";
+ std::filesystem::path LogPath =
+ !GlobalOptions.AbsLogFile.empty() ? GlobalOptions.AbsLogFile : GlobalOptions.DataDir / "logs/zenserver.log";
bool IsAsync = true;
spdlog::level::level_enum LogLevel = spdlog::level::info;
@@ -250,7 +251,7 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
Sinks.push_back(FileSink);
#if ZEN_PLATFORM_WINDOWS
- if (zen::IsDebuggerPresent())
+ if (zen::IsDebuggerPresent() && GlobalOptions.IsDebug)
{
auto DebugSink = std::make_shared<spdlog::sinks::msvc_sink_mt>();
DebugSink->set_level(spdlog::level::debug);
diff --git a/zenserver/experimental/frontend.cpp b/zenserver/experimental/frontend.cpp
new file mode 100644
index 000000000..98d570cfe
--- /dev/null
+++ b/zenserver/experimental/frontend.cpp
@@ -0,0 +1,119 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "frontend.h"
+
+#include <zencore/filesystem.h>
+#include <zencore/string.h>
+
+namespace zen {
+
+namespace html {
+
+ constexpr std::string_view Index = R"(
+<!DOCTYPE html>
+<html>
+<head>
+<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" integrity="sha384-F3w7mX95PdgyTmZZMECAngseQB83DfGTowi0iMjiWaeVhAn4FJkqJByhZMI3AhiU" crossorigin="anonymous">
+<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js" integrity="sha384-skAcpIdS7UcVUC05LJ9Dxay8AXcDYfBJqt1CJ85S/CFujBsIzCIv+l9liuYLaMQ/" crossorigin="anonymous"></script>
+<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/font/bootstrap-icons.css">
+<style type="text/css">
+body {
+ background-color: #fafafa;
+}
+</style>
+<script type="text/javascript">
+ const getCacheStats = () => {
+ const opts = { headers: { "Accept": "application/json" } };
+ fetch("/stats/z$", opts)
+ .then(response => {
+ if (!response.ok) {
+ throw Error(response.statusText);
+ }
+ return response.json();
+ })
+ .then(json => {
+ document.getElementById("status").innerHTML = "connected"
+ document.getElementById("stats").innerHTML = JSON.stringify(json, null, 4);
+ })
+ .catch(error => {
+ document.getElementById("status").innerHTML = "disconnected"
+ document.getElementById("stats").innerHTML = ""
+ console.log(error);
+ })
+ .finally(() => {
+ window.setTimeout(getCacheStats, 1000);
+ });
+ };
+ getCacheStats();
+</script>
+</head>
+<body>
+ <div class="container">
+ <div class="row">
+ <div class="text-center mt-5">
+ <pre>
+__________ _________ __
+\____ / ____ ____ / _____/_/ |_ ____ _______ ____
+ / / _/ __ \ / \ \_____ \ \ __\ / _ \ \_ __ \_/ __ \
+ / /_ \ ___/ | | \ / \ | | ( <_> ) | | \/\ ___/
+/_______ \ \___ >|___| //_______ / |__| \____/ |__| \___ >
+ \/ \/ \/ \/ \/
+ </pre>
+ <pre id="status"/>
+ </div>
+ </div>
+ <div class="row">
+ <pre class="mb-0">Z$:</pre>
+ <pre id="stats"></pre>
+ <div>
+ </div>
+</body>
+</html>
+)";
+
+} // namespace html
+
+HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Directory(Directory)
+{
+}
+
+HttpFrontendService::~HttpFrontendService()
+{
+}
+
+const char*
+HttpFrontendService::BaseUri() const
+{
+ return "/dashboard"; // in order to use the root path we need to remove HttpAddUrlToUrlGroup in HttpSys.cpp
+}
+
+void
+HttpFrontendService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ if (m_Directory.empty())
+ {
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, html::Index);
+ }
+ else
+ {
+ std::string_view Uri = Request.RelativeUri();
+ std::filesystem::path RelPath{Uri.empty() ? "index.html" : Uri};
+ std::filesystem::path AbsPath = m_Directory / RelPath;
+
+ FileContents File = ReadFile(AbsPath);
+
+ if (!File.ErrorCode)
+ {
+ // TODO: Map file extension to MIME type
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, File.Data[0]);
+ }
+ else
+ {
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Ooops!"sv);
+ }
+ }
+}
+
+} // namespace zen
diff --git a/zenserver/experimental/frontend.h b/zenserver/experimental/frontend.h
new file mode 100644
index 000000000..2ae20e940
--- /dev/null
+++ b/zenserver/experimental/frontend.h
@@ -0,0 +1,24 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+
+#include <filesystem>
+
+namespace zen {
+
+class HttpFrontendService final : public zen::HttpService
+{
+public:
+ HttpFrontendService(std::filesystem::path Directory);
+ virtual ~HttpFrontendService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+private:
+ std::filesystem::path m_Directory;
+};
+
+} // namespace zen
diff --git a/zenserver/monitoring/httpstats.cpp b/zenserver/monitoring/httpstats.cpp
new file mode 100644
index 000000000..de04294d0
--- /dev/null
+++ b/zenserver/monitoring/httpstats.cpp
@@ -0,0 +1,50 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpstats.h"
+
+namespace zen {
+
+HttpStatsService::HttpStatsService() : m_Log(logging::Get("stats"))
+{
+}
+
+HttpStatsService::~HttpStatsService()
+{
+}
+
+const char*
+HttpStatsService::BaseUri() const
+{
+ return "/stats/";
+}
+
+void
+HttpStatsService::RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Providers.insert_or_assign(std::string(Id), &Provider);
+}
+
+void
+HttpStatsService::HandleRequest(HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ std::string_view Key = Request.RelativeUri();
+
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers))
+ {
+ return It->second->HandleStatsRequest(Request);
+ }
+
+ [[fallthrough]];
+ default:
+ return;
+ }
+}
+
+} // namespace zen
diff --git a/zenserver/monitoring/httpstats.h b/zenserver/monitoring/httpstats.h
new file mode 100644
index 000000000..1c3c79dd0
--- /dev/null
+++ b/zenserver/monitoring/httpstats.h
@@ -0,0 +1,37 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zenhttp/httpserver.h>
+
+#include <map>
+
+namespace zen {
+
+struct IHttpStatsProvider
+{
+ virtual void HandleStatsRequest(HttpServerRequest& Request) = 0;
+};
+
+class HttpStatsService : public HttpService
+{
+public:
+ HttpStatsService();
+ ~HttpStatsService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+ void RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider);
+
+private:
+ spdlog::logger& m_Log;
+ HttpRequestRouter m_Router;
+
+ inline spdlog::logger& Log() { return m_Log; }
+
+ RwLock m_Lock;
+ std::map<std::string, IHttpStatsProvider*> m_Providers;
+};
+
+} // namespace zen \ No newline at end of file
diff --git a/zenserver/monitoring/httpstatus.cpp b/zenserver/monitoring/httpstatus.cpp
new file mode 100644
index 000000000..e12662b1c
--- /dev/null
+++ b/zenserver/monitoring/httpstatus.cpp
@@ -0,0 +1,50 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpstatus.h"
+
+namespace zen {
+
+HttpStatusService::HttpStatusService() : m_Log(logging::Get("status"))
+{
+}
+
+HttpStatusService::~HttpStatusService()
+{
+}
+
+const char*
+HttpStatusService::BaseUri() const
+{
+ return "/status/";
+}
+
+void
+HttpStatusService::RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Providers.insert_or_assign(std::string(Id), &Provider);
+}
+
+void
+HttpStatusService::HandleRequest(HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ std::string_view Key = Request.RelativeUri();
+
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers))
+ {
+ return It->second->HandleStatusRequest(Request);
+ }
+
+ [[fallthrough]];
+ default:
+ return;
+ }
+}
+
+} // namespace zen
diff --git a/zenserver/monitoring/httpstatus.h b/zenserver/monitoring/httpstatus.h
new file mode 100644
index 000000000..8f069f760
--- /dev/null
+++ b/zenserver/monitoring/httpstatus.h
@@ -0,0 +1,37 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zenhttp/httpserver.h>
+
+#include <map>
+
+namespace zen {
+
+struct IHttpStatusProvider
+{
+ virtual void HandleStatusRequest(HttpServerRequest& Request) = 0;
+};
+
+class HttpStatusService : public HttpService
+{
+public:
+ HttpStatusService();
+ ~HttpStatusService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+ void RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider);
+
+private:
+ spdlog::logger& m_Log;
+ HttpRequestRouter m_Router;
+
+ RwLock m_Lock;
+ std::map<std::string, IHttpStatusProvider*> m_Providers;
+
+ inline spdlog::logger& Log() { return m_Log; }
+};
+
+} // namespace zen \ No newline at end of file
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 7870f9559..5c4983472 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -1200,11 +1200,6 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
- if (Verb == HttpVerb::kHead)
- {
- HttpReq.SetSuppressResponseBody();
- }
-
if (IsOffset)
{
if (Offset > Value.Size())
@@ -1425,7 +1420,12 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver))
{
- ZEN_ERROR("Received malformed package!");
+ std::filesystem::path BadPackagePath =
+ Oplog.TempPath() / "bad_packages" / "session{}_request{}"_format(HttpReq.SessionId(), HttpReq.RequestId());
+
+ ZEN_ERROR("Received malformed package! Saving payload to '{}'", BadPackagePath);
+
+ zen::WriteFile(BadPackagePath, Payload);
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
}
diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp
index 01866a63b..924546762 100644
--- a/zenserver/testing/httptest.cpp
+++ b/zenserver/testing/httptest.cpp
@@ -4,9 +4,12 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
+#include <zencore/timer.h>
namespace zen {
+using namespace fmt::literals;
+
HttpTestingService::HttpTestingService()
{
m_Router.RegisterRoute(
@@ -15,6 +18,44 @@ HttpTestingService::HttpTestingService()
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "hello_slow",
+ [this](HttpRouterRequest& Req) {
+ Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) {
+ Stopwatch Timer;
+ Sleep(1000);
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kText,
+ "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
+ });
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "hello_veryslow",
+ [this](HttpRouterRequest& Req) {
+ Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) {
+ Stopwatch Timer;
+ Sleep(60000);
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kText,
+ "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
+ });
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "hello_throw",
+ [this](HttpRouterRequest& Req) {
+ Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) { throw std::runtime_error("intentional error"); });
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "hello_noresponse",
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) {}); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"metrics",
[this](HttpRouterRequest& Req) {
metrics::OperationTiming::Scope _(m_TimingStats);
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 9573a1631..1de417008 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -47,9 +47,9 @@ struct CloudCacheAccessToken
struct CloudCacheResult
{
IoBuffer Response;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- int32_t ErrorCode = {};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ int32_t ErrorCode{};
std::string Reason;
bool Success = false;
};
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 03054b542..5b2629f72 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -4,6 +4,7 @@
#include "jupiter.h"
#include "zen.h"
+#include <zencore/blockingqueue.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
@@ -23,7 +24,6 @@
#include <algorithm>
#include <atomic>
-#include <deque>
#include <thread>
#include <unordered_map>
@@ -33,70 +33,6 @@ using namespace std::literals;
namespace detail {
- template<typename T>
- class BlockingQueue
- {
- public:
- BlockingQueue() = default;
-
- ~BlockingQueue() { CompleteAdding(); }
-
- void Enqueue(T&& Item)
- {
- {
- std::lock_guard Lock(m_Lock);
- m_Queue.emplace_back(std::move(Item));
- m_Size++;
- }
-
- m_NewItemSignal.notify_one();
- }
-
- bool WaitAndDequeue(T& Item)
- {
- if (m_CompleteAdding.load())
- {
- return false;
- }
-
- std::unique_lock Lock(m_Lock);
- m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
-
- if (!m_Queue.empty())
- {
- Item = std::move(m_Queue.front());
- m_Queue.pop_front();
- m_Size--;
-
- return true;
- }
-
- return false;
- }
-
- void CompleteAdding()
- {
- if (!m_CompleteAdding.load())
- {
- m_CompleteAdding.store(true);
- m_NewItemSignal.notify_all();
- }
- }
-
- std::size_t Size() const
- {
- std::unique_lock Lock(m_Lock);
- return m_Queue.size();
- }
-
- private:
- mutable std::mutex m_Lock;
- std::condition_variable m_NewItemSignal;
- std::deque<T> m_Queue;
- std::atomic_bool m_CompleteAdding{false};
- std::atomic_uint32_t m_Size;
- };
-
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
@@ -105,12 +41,14 @@ namespace detail {
, m_UseLegacyDdc(Options.UseLegacyDdc)
{
using namespace fmt::literals;
- m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
+ m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl);
m_Client = new CloudCacheClient(Options);
}
virtual ~JupiterUpstreamEndpoint() = default;
+ virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
+
virtual bool IsHealthy() const override { return m_HealthOk.load(); }
virtual UpstreamEndpointHealth CheckHealth() override
@@ -186,16 +124,23 @@ namespace detail {
}
}
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -206,16 +151,23 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -386,22 +338,70 @@ namespace detail {
class ZenUpstreamEndpoint final : public UpstreamEndpoint
{
+ struct ZenEndpoint
+ {
+ std::string Url;
+ std::string Reason;
+ double Latency{};
+ bool Ok = false;
+
+ bool operator<(const ZenEndpoint& RHS) const { return Ok && RHS.Ok ? Latency < RHS.Latency : Ok; }
+ };
+
public:
- ZenUpstreamEndpoint(std::string_view ServiceUrl)
+ ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN")
{
- using namespace fmt::literals;
- m_DisplayName = "Zen - {}"_format(ServiceUrl);
- m_Client = new ZenStructuredCacheClient(ServiceUrl);
+ for (const auto& Url : Urls)
+ {
+ m_Endpoints.push_back({.Url = Url});
+ }
}
~ZenUpstreamEndpoint() = default;
+ virtual UpstreamEndpointHealth Initialize() override
+ {
+ using namespace fmt::literals;
+
+ const ZenEndpoint& Ep = GetEndpoint();
+ if (Ep.Ok)
+ {
+ m_ServiceUrl = Ep.Url;
+ m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
+ m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+
+ m_HealthOk = true;
+ return {.Ok = true};
+ }
+
+ m_HealthOk = false;
+ return {.Reason = Ep.Reason};
+ }
+
virtual bool IsHealthy() const override { return m_HealthOk; }
virtual UpstreamEndpointHealth CheckHealth() override
{
+ using namespace fmt::literals;
+
try
{
+ if (m_Client.IsNull())
+ {
+ const ZenEndpoint& Ep = GetEndpoint();
+ if (Ep.Ok)
+ {
+ m_ServiceUrl = Ep.Url;
+ m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
+ m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+
+ m_HealthOk = true;
+ return {.Ok = true};
+ }
+
+ return {.Reason = Ep.Reason};
+ }
+
ZenStructuredCacheSession Session(*m_Client);
ZenCacheResult Result;
@@ -429,16 +429,23 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -450,16 +457,23 @@ namespace detail {
const ZenCacheResult Result =
Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -563,6 +577,42 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ const ZenEndpoint& GetEndpoint()
+ {
+ for (ZenEndpoint& Ep : m_Endpoints)
+ {
+ ZenStructuredCacheClient Client(Ep.Url);
+ ZenStructuredCacheSession Session(Client);
+ const int32_t SampleCount = 2;
+
+ Ep.Ok = false;
+ Ep.Latency = {};
+
+ for (int32_t Sample = 0; Sample < SampleCount; ++Sample)
+ {
+ ZenCacheResult Result = Session.CheckHealth();
+ Ep.Ok = Result.Success;
+ Ep.Reason = std::move(Result.Reason);
+ Ep.Latency += Result.ElapsedSeconds;
+ }
+ Ep.Latency /= double(SampleCount);
+ }
+
+ std::sort(std::begin(m_Endpoints), std::end(m_Endpoints));
+
+ for (const auto& Ep : m_Endpoints)
+ {
+ ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason);
+ }
+
+ return m_Endpoints.front();
+ }
+
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
+ std::vector<ZenEndpoint> m_Endpoints;
std::string m_DisplayName;
RefPtr<ZenStructuredCacheClient> m_Client;
UpstreamEndpointStats m_Stats;
@@ -575,7 +625,7 @@ namespace detail {
struct UpstreamStats
{
- static constexpr uint64_t MaxSampleCount = 100ull;
+ static constexpr uint64_t MaxSampleCount = 1000ull;
UpstreamStats(bool Enabled) : m_Enabled(Enabled) {}
@@ -584,11 +634,6 @@ struct UpstreamStats
const GetUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
- {
- return;
- }
-
UpstreamEndpointStats& Stats = Endpoint.Stats();
if (Result.Error)
@@ -606,7 +651,7 @@ struct UpstreamStats
Stats.MissCount++;
}
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -617,11 +662,6 @@ struct UpstreamStats
const PutUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
- {
- return;
- }
-
UpstreamEndpointStats& Stats = Endpoint.Stats();
if (Result.Success)
{
@@ -634,7 +674,7 @@ struct UpstreamStats
Stats.ErrorCount++;
}
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -693,7 +733,7 @@ public:
{
for (auto& Endpoint : m_Endpoints)
{
- const UpstreamEndpointHealth Health = Endpoint->CheckHealth();
+ const UpstreamEndpointHealth Health = Endpoint->Initialize();
if (Health.Ok)
{
ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName());
@@ -925,7 +965,7 @@ private:
spdlog::logger& Log() { return m_Log; }
- using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
+ using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>;
struct RunState
{
@@ -975,9 +1015,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
}
std::unique_ptr<UpstreamEndpoint>
-MakeZenUpstreamEndpoint(std::string_view Url)
+MakeZenUpstreamEndpoint(std::span<std::string const> Urls)
{
- return std::make_unique<detail::ZenUpstreamEndpoint>(Url);
+ return std::make_unique<detail::ZenUpstreamEndpoint>(Urls);
}
} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index a6b1e9784..edc995da6 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -45,35 +45,29 @@ struct UpstreamCacheOptions
bool StatsEnabled = false;
};
-enum class UpstreamStatusCode : uint8_t
-{
- Ok,
- Error
-};
-
struct UpstreamError
{
- UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok;
- std::string Reason;
+ int32_t ErrorCode{};
+ std::string Reason{};
- explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; }
+ explicit operator bool() const { return ErrorCode != 0; }
};
struct GetUpstreamCacheResult
{
IoBuffer Value;
- UpstreamError Error;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ UpstreamError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
};
struct PutUpstreamCacheResult
{
std::string Reason;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
};
struct UpstreamEndpointHealth
@@ -84,14 +78,14 @@ struct UpstreamEndpointHealth
struct UpstreamEndpointStats
{
- std::atomic_uint64_t HitCount = {};
- std::atomic_uint64_t MissCount = {};
- std::atomic_uint64_t UpCount = {};
- std::atomic_uint64_t ErrorCount = {};
- std::atomic<double> UpBytes = {};
- std::atomic<double> DownBytes = {};
- std::atomic<double> SecondsUp = {};
- std::atomic<double> SecondsDown = {};
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t MissCount{};
+ std::atomic_uint64_t UpCount{};
+ std::atomic_uint64_t ErrorCount{};
+ std::atomic<double> UpBytes{};
+ std::atomic<double> DownBytes{};
+ std::atomic<double> SecondsUp{};
+ std::atomic<double> SecondsDown{};
};
/**
@@ -102,6 +96,8 @@ class UpstreamEndpoint
public:
virtual ~UpstreamEndpoint() = default;
+ virtual UpstreamEndpointHealth Initialize() = 0;
+
virtual bool IsHealthy() const = 0;
virtual UpstreamEndpointHealth CheckHealth() = 0;
@@ -149,6 +145,6 @@ std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Opt
std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options);
-std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::string_view Url);
+std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::span<std::string const> Urls);
} // namespace zen
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index c988a6b0b..6141fd397 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -328,7 +328,9 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
-ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) : m_ServiceUrl(ServiceUrl)
+ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl)
+: m_Log(logging::Get(std::string_view("zenclient")))
+, m_ServiceUrl(ServiceUrl)
{
}
@@ -369,7 +371,7 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
using namespace std::literals;
ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient)
-: m_Log(logging::Get("zenclient"sv))
+: m_Log(OuterClient.Log())
, m_Client(OuterClient)
{
m_SessionState = m_Client.AllocSessionState();
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 158be668a..12e46bd8d 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -138,8 +138,11 @@ public:
std::string_view ServiceUrl() const { return m_ServiceUrl; }
+ inline spdlog::logger& Log() { return m_Log; }
+
private:
- std::string m_ServiceUrl;
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
RwLock m_SessionStateLock;
std::list<detail::ZenCacheSessionState*> m_SessionStateCache;
diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua
index 7a6981fcd..fb1ba651d 100644
--- a/zenserver/xmake.lua
+++ b/zenserver/xmake.lua
@@ -32,3 +32,18 @@ target("zenserver")
add_packages(
"vcpkg::cxxopts",
"vcpkg::mimalloc")
+
+ on_load(function(target)
+ local commit, err = os.iorun("git log -1 --format=\"%h-%cI\"")
+ if commit ~= nil then
+ commit = commit:gsub("%s+", "")
+ commit = commit:gsub("\n", "")
+ if is_mode("release") then
+ commit = "rel-" .. commit
+ else
+ commit = "dbg-" .. commit
+ end
+ target:add("defines","BUILD_VERSION=\"" .. commit .. "\"")
+ print("build version " .. commit)
+ end
+ end)
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index b45df9fef..18c59636d 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -1,5 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinarybuilder.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
@@ -31,6 +32,10 @@
#include <set>
#include <unordered_map>
+#if !defined(BUILD_VERSION)
+# define BUILD_VERSION ("dev-build")
+#endif
+
//////////////////////////////////////////////////////////////////////////
// We don't have any doctest code in this file but this is needed to bring
// in some shared code into the executable
@@ -81,7 +86,10 @@
#include "cache/structuredcachestore.h"
#include "compute/apply.h"
#include "diag/diagsvcs.h"
+#include "experimental/frontend.h"
#include "experimental/usnjournal.h"
+#include "monitoring/httpstats.h"
+#include "monitoring/httpstatus.h"
#include "projectstore.h"
#include "testing/httptest.h"
#include "testing/launch.h"
@@ -95,17 +103,16 @@
namespace zen {
-class ZenServer
-{
- ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
+using namespace std::literals;
+class ZenServer : public IHttpStatusProvider
+{
public:
void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry)
{
- m_ServerEntry = ServerEntry;
using namespace fmt::literals;
- ZEN_INFO(ZEN_APP_NAME " initializing");
+ m_ServerEntry = ServerEntry;
m_DebugOptionForcedCrash = ServiceConfig.ShouldCrash;
if (ParentPid)
@@ -139,6 +146,15 @@ public:
// Ok so now we're configured, let's kick things off
+ m_Http = zen::CreateHttpServer();
+ m_Http->Initialize(BasePort);
+ m_Http->RegisterService(m_HealthService);
+ m_Http->RegisterService(m_StatsService);
+ m_Http->RegisterService(m_StatusService);
+ m_StatusService.RegisterHandler("status", *this);
+
+ // Initialize storage and services
+
ZEN_INFO("initializing storage");
zen::CasStoreConfiguration Config;
@@ -166,95 +182,7 @@ public:
if (ServiceConfig.StructuredCacheEnabled)
{
- using namespace std::literals;
- auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
-
- ZEN_INFO("instantiating structured cache service");
- m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
-
- std::unique_ptr<zen::UpstreamCache> UpstreamCache;
- if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
- {
- const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
-
- zen::UpstreamCacheOptions UpstreamOptions;
- UpstreamOptions.ReadUpstream =
- (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
- UpstreamOptions.WriteUpstream =
- (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
-
- if (UpstreamConfig.UpstreamThreadCount < 32)
- {
- UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
- }
-
- UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
-
- UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
-
- if (!UpstreamConfig.ZenConfig.Url.empty())
- {
- std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Url);
- UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
- }
-
- {
- zen::CloudCacheClientOptions Options;
- if (UpstreamConfig.JupiterConfig.UseProductionSettings)
- {
- Options = zen::CloudCacheClientOptions{
- .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
- .DdcNamespace = "ue.ddc"sv,
- .BlobStoreNamespace = "ue.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
- }
- else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
- {
- Options = zen::CloudCacheClientOptions{
- .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
- .DdcNamespace = "ue4.ddc"sv,
- .BlobStoreNamespace = "test.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
- }
-
- Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
- Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
- Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
- Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
- Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
- Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
- Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc;
-
- if (!Options.ServiceUrl.empty())
- {
- std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
- UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
- }
- }
-
- if (UpstreamCache->Initialize())
- {
- ZEN_INFO("upstream cache active ({})",
- UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
- : UpstreamOptions.ReadUpstream ? "READONLY"
- : UpstreamOptions.WriteUpstream ? "WRITEONLY"
- : "DISABLED");
- }
- else
- {
- UpstreamCache.reset();
- ZEN_INFO("NOT using upstream cache");
- }
- }
-
- m_StructuredCacheService.reset(
- new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache)));
+ InitializeStructuredCache(ServiceConfig);
}
else
{
@@ -272,13 +200,8 @@ public:
}
#endif
- m_Http = zen::CreateHttpServer();
- m_Http->Initialize(BasePort);
- m_Http->RegisterService(m_HealthService);
-
m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
m_Http->RegisterService(m_TestingService);
-
m_Http->RegisterService(m_AdminService);
if (m_HttpProjectService)
@@ -302,8 +225,17 @@ public:
{
m_Http->RegisterService(*m_HttpFunctionService);
}
+
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot);
+
+ if (m_FrontendService)
+ {
+ m_Http->RegisterService(*m_FrontendService);
+ }
}
+ void InitializeStructuredCache(ZenServiceConfig& ServiceConfig);
+
#if ZEN_ENABLE_MESH
void StartMesh(int BasePort)
{
@@ -344,8 +276,12 @@ public:
const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode;
+ m_CurrentState = kRunning;
+
m_Http->Run(IsInteractiveMode);
+ m_CurrentState = kShuttingDown;
+
ZEN_INFO(ZEN_APP_NAME " exiting");
m_IoContext.stop();
@@ -364,6 +300,7 @@ public:
void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; }
void SetTestMode(bool State) { m_TestMode = State; }
void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; }
+ void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
void EnsureIoRunner()
{
@@ -417,6 +354,7 @@ public:
void Scrub()
{
+ Stopwatch Timer;
ZEN_INFO("Storage validation STARTING");
ScrubContext Ctx;
@@ -425,7 +363,13 @@ public:
m_ProjectStore->Scrub(Ctx);
m_StructuredCacheService->Scrub(Ctx);
- ZEN_INFO("Storage validation DONE");
+ const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
+
+ ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})",
+ NiceTimeSpanMs(ElapsedTimeMs),
+ NiceBytes(Ctx.ScrubbedBytes()),
+ Ctx.ScrubbedChunks(),
+ NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs));
}
void Flush()
@@ -443,17 +387,51 @@ public:
m_ProjectStore->Flush();
}
+ virtual void HandleStatusRequest(HttpServerRequest& Request) override
+ {
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Cbo << "state" << ToString(m_CurrentState);
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
private:
- bool m_IsDedicatedMode = false;
- bool m_TestMode = false;
- std::filesystem::path m_DataRoot;
- std::jthread m_IoRunner;
- asio::io_context m_IoContext;
- asio::steady_timer m_PidCheckTimer{m_IoContext};
- zen::ProcessMonitor m_ProcessMonitor;
- zen::NamedMutex m_ServerMutex;
+ ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
+ bool m_IsDedicatedMode = false;
+ bool m_TestMode = false;
+ std::filesystem::path m_DataRoot;
+ std::filesystem::path m_ContentRoot;
+ std::jthread m_IoRunner;
+ asio::io_context m_IoContext;
+ asio::steady_timer m_PidCheckTimer{m_IoContext};
+ zen::ProcessMonitor m_ProcessMonitor;
+ zen::NamedMutex m_ServerMutex;
+
+ enum ServerState
+ {
+ kInitializing,
+ kRunning,
+ kShuttingDown
+ } m_CurrentState = kInitializing;
+
+ std::string_view ToString(ServerState Value)
+ {
+ switch (Value)
+ {
+ case kInitializing:
+ return "initializing"sv;
+ case kRunning:
+ return "running"sv;
+ case kShuttingDown:
+ return "shutdown"sv;
+ default:
+ return "unknown"sv;
+ }
+ }
zen::Ref<zen::HttpServer> m_Http;
+ zen::HttpStatusService m_StatusService;
+ zen::HttpStatsService m_StatsService;
std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()};
std::unique_ptr<zen::CidStore> m_CidStore;
std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
@@ -471,10 +449,105 @@ private:
zen::HttpHealthService m_HealthService;
zen::Mesh m_ZenMesh{m_IoContext};
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
+ std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
bool m_DebugOptionForcedCrash = false;
};
+void
+ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
+{
+ using namespace std::literals;
+ auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
+
+ ZEN_INFO("instantiating structured cache service");
+ m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
+
+ std::unique_ptr<zen::UpstreamCache> UpstreamCache;
+ if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
+ {
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
+
+ zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
+
+ if (UpstreamConfig.UpstreamThreadCount < 32)
+ {
+ UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
+ }
+
+ UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
+
+ UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+
+ if (!UpstreamConfig.ZenConfig.Urls.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls);
+ UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
+ }
+
+ {
+ zen::CloudCacheClientOptions Options;
+ if (UpstreamConfig.JupiterConfig.UseProductionSettings)
+ {
+ Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
+ .DdcNamespace = "ue.ddc"sv,
+ .BlobStoreNamespace = "ue.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .UseLegacyDdc = false};
+ }
+ else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
+ {
+ Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
+ .DdcNamespace = "ue4.ddc"sv,
+ .BlobStoreNamespace = "test.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .UseLegacyDdc = false};
+ }
+
+ Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
+ Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
+ Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
+ Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
+ Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
+ Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
+ Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc;
+
+ if (!Options.ServiceUrl.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
+ UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
+ }
+ }
+
+ if (UpstreamCache->Initialize())
+ {
+ ZEN_INFO("upstream cache active ({})",
+ UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
+ : UpstreamOptions.ReadUpstream ? "READONLY"
+ : UpstreamOptions.WriteUpstream ? "WRITEONLY"
+ : "DISABLED");
+ }
+ else
+ {
+ UpstreamCache.reset();
+ ZEN_INFO("NOT using upstream cache");
+ }
+ }
+
+ m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore,
+ *m_CasStore,
+ *m_CidStore,
+ m_StatsService,
+ m_StatusService,
+ std::move(UpstreamCache)));
+}
+
} // namespace zen
class ZenWindowsService : public WindowsService
@@ -522,7 +595,7 @@ ZenWindowsService::Run()
ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig);
- ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort);
+ ZEN_INFO(ZEN_APP_NAME " - starting on port {}, build '{}'", GlobalOptions.BasePort, BUILD_VERSION);
ZenServerState ServerState;
ServerState.Initialize();
@@ -560,6 +633,7 @@ ZenWindowsService::Run()
ZenServer Server;
Server.SetDataRoot(GlobalOptions.DataDir);
+ Server.SetContentRoot(GlobalOptions.ContentDir);
Server.SetTestMode(GlobalOptions.IsTest);
Server.SetDedicatedMode(GlobalOptions.IsDedicated);
Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry);
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index bcb7ea028..7fad477a1 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -108,7 +108,12 @@
<ClInclude Include="cache\structuredcachestore.h" />
<ClInclude Include="compute\apply.h" />
<ClInclude Include="config.h" />
+ <ClInclude Include="diag\formatters.h" />
<ClInclude Include="diag\logging.h" />
+ <ClInclude Include="experimental\frontend.h" />
+ <ClInclude Include="experimental\vfs.h" />
+ <ClInclude Include="monitoring\httpstats.h" />
+ <ClInclude Include="monitoring\httpstatus.h" />
<ClInclude Include="resource.h" />
<ClInclude Include="sos\sos.h" />
<ClInclude Include="testing\httptest.h" />
@@ -132,6 +137,10 @@
<ClCompile Include="compute\apply.cpp" />
<ClCompile Include="config.cpp" />
<ClCompile Include="diag\logging.cpp" />
+ <ClCompile Include="experimental\frontend.cpp" />
+ <ClCompile Include="experimental\vfs.cpp" />
+ <ClCompile Include="monitoring\httpstats.cpp" />
+ <ClCompile Include="monitoring\httpstatus.cpp" />
<ClCompile Include="projectstore.cpp" />
<ClCompile Include="cache\cacheagent.cpp" />
<ClCompile Include="sos\sos.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index 6b99ca8d7..04e639a33 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -38,6 +38,13 @@
<ClInclude Include="testing\httptest.h" />
<ClInclude Include="windows\service.h" />
<ClInclude Include="resource.h" />
+ <ClInclude Include="experimental\frontend.h">
+ <Filter>experimental</Filter>
+ </ClInclude>
+ <ClInclude Include="diag\formatters.h" />
+ <ClInclude Include="experimental\vfs.h" />
+ <ClInclude Include="monitoring\httpstats.h" />
+ <ClInclude Include="monitoring\httpstatus.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -70,6 +77,13 @@
</ClCompile>
<ClCompile Include="testing\httptest.cpp" />
<ClCompile Include="windows\service.cpp" />
+ <ClCompile Include="admin\admin.cpp" />
+ <ClCompile Include="experimental\frontend.cpp">
+ <Filter>experimental</Filter>
+ </ClCompile>
+ <ClCompile Include="experimental\vfs.cpp" />
+ <ClCompile Include="monitoring\httpstats.cpp" />
+ <ClCompile Include="monitoring\httpstatus.cpp" />
</ItemGroup>
<ItemGroup>
<Filter Include="cache">