diff options
| author | Stefan Boberg <[email protected]> | 2021-10-01 22:15:42 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-10-01 22:15:42 +0200 |
| commit | 2a6157b15508541cfd082e8544c78c8f94b18005 (patch) | |
| tree | 7f70bd046afe918c5433e753a68de72ed602532a /zenserver | |
| parent | Added explicit mimalloc IoBuffer allocation path (diff) | |
| parent | zen: added print/printpackage subcommands to help in debugging or inspecting ... (diff) | |
| download | zen-2a6157b15508541cfd082e8544c78c8f94b18005.tar.xz zen-2a6157b15508541cfd082e8544c78c8f94b18005.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 37 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 8 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 41 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 17 | ||||
| -rw-r--r-- | zenserver/config.cpp | 1 | ||||
| -rw-r--r-- | zenserver/config.h | 1 | ||||
| -rw-r--r-- | zenserver/experimental/frontend.cpp | 119 | ||||
| -rw-r--r-- | zenserver/experimental/frontend.h | 24 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 7 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 103 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 21 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 242 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 44 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 11 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj.filters | 7 |
16 files changed, 560 insertions, 125 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 4c89b995a..8ab0276c5 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -405,6 +405,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (!Success) { ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -449,6 +450,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ValidCount, AttachmentCount); + m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); } } @@ -467,6 +469,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + m_CacheStats.HitCount++; Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); } else @@ -478,6 +481,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ToString(Value.Value.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); + m_CacheStats.HitCount++; + if (InUpstreamCache) + { + m_CacheStats.UpstreamHitCount++; + } + Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); } } @@ -667,11 +676,11 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request case kHead: case kGet: { - HandleGetCachePayload(Request, Ref, Policy); if (Verb == kHead) { Request.SetSuppressResponseBody(); } + HandleGetCachePayload(Request, Ref, Policy); } break; case kPut: @@ -712,7 +721,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques if (!Payload) { - ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType())); + m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -724,6 +734,12 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques ToString(Payload.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); + m_CacheStats.HitCount++; + if (InUpstreamCache) + { + m_CacheStats.UpstreamHitCount++; + } + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } @@ -846,6 +862,23 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) EmitSnapshot("requests", m_HttpRequests, Cbo); + const uint64_t HitCount = m_CacheStats.HitCount; + const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; + const uint64_t MissCount = m_CacheStats.MissCount; + const uint64_t TotalCount = HitCount + MissCount; + + Cbo.BeginObject("cache"); + Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount) * 100.0) : 0.0); + Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) * 100.0 : 0.0); + Cbo.EndObject(); + + if (m_UpstreamCache) + { + Cbo.BeginObject("upstream"); + m_UpstreamCache->GetStatus(Cbo); + Cbo.EndObject(); + } + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 47fc173e9..a360878bd 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -71,6 +71,13 @@ private: IoHash PayloadId; }; + struct CacheStats + { + std::atomic_uint64_t HitCount{}; + std::atomic_uint64_t UpstreamHitCount{}; + std::atomic_uint64_t MissCount{}; + }; + [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); @@ -89,6 +96,7 @@ private: std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; metrics::OperationTiming m_HttpRequests; + CacheStats m_CacheStats; }; } // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 5e93ebaa9..b97f0830f 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -116,6 +116,13 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) m_DiskLayer.Scrub(Ctx); m_MemLayer.Scrub(Ctx); } + +void +ZenCacheStore::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() @@ -195,6 +202,12 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) } void +ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + +void ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) { std::vector<IoHash> BadHashes; @@ -294,6 +307,7 @@ struct ZenCacheDiskLayer::CacheBucket void Drop(); void Flush(); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); inline bool IsOk() const { return m_Ok; } @@ -611,6 +625,12 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) } void +ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + +void ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value) { WideStringBuilder<128> DataFilePath; @@ -830,27 +850,10 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) } } -////////////////////////////////////////////////////////////////////////// - -ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore) -{ - ZEN_UNUSED(CacheStore); -} - -ZenCacheTracker::~ZenCacheTracker() -{ -} - -void -ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey) -{ - ZEN_UNUSED(Bucket); - ZEN_UNUSED(HashKey); -} - void -ZenCacheTracker::Flush() +ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx) { + ZEN_UNUSED(GcCtx); } } // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index f96757409..011f13323 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -56,6 +56,7 @@ public: void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); private: struct CacheBucket @@ -83,6 +84,7 @@ public: bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); private: /** A cache bucket manages a single directory containing @@ -107,6 +109,7 @@ public: bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); private: std::filesystem::path m_RootDir; @@ -116,18 +119,4 @@ private: uint64_t m_LastScrubTime = 0; }; -/** Tracks cache entry access, stats and orchestrates cleanup activities - */ -class ZenCacheTracker -{ -public: - ZenCacheTracker(ZenCacheStore& CacheStore); - ~ZenCacheTracker(); - - void TrackAccess(std::string_view Bucket, const IoHash& HashKey); - void Flush(); - -private: -}; - } // namespace zen diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 42f59b26c..759534d58 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -90,6 +90,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(GlobalOptions.IsTest)->default_value("false")); options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(GlobalOptions.LogId)); options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::filesystem::path>(GlobalOptions.DataDir)); + options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::filesystem::path>(GlobalOptions.ContentDir)); options .add_option("lifetime", "", "owner-pid", "Specify owning process id", cxxopts::value<int>(GlobalOptions.OwnerPid), "<identifier>"); diff --git a/zenserver/config.h b/zenserver/config.h index 75c19d690..af1a24455 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -17,6 +17,7 @@ struct ZenServerOptions bool UninstallService = false; // Flag used to initiate service uninstall (temporary) std::string LogId; // Id for tagging log output std::filesystem::path DataDir; // Root directory for state (used for testing) + std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) }; struct ZenUpstreamJupiterConfig diff --git a/zenserver/experimental/frontend.cpp b/zenserver/experimental/frontend.cpp new file mode 100644 index 000000000..79fcf0a17 --- /dev/null +++ b/zenserver/experimental/frontend.cpp @@ -0,0 +1,119 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "frontend.h" + +#include <zencore/filesystem.h> +#include <zencore/string.h> + +namespace zen { + +namespace html { + + constexpr std::string_view Index = R"( +<!DOCTYPE html> +<html> +<head> +<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" integrity="sha384-F3w7mX95PdgyTmZZMECAngseQB83DfGTowi0iMjiWaeVhAn4FJkqJByhZMI3AhiU" crossorigin="anonymous"> +<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js" integrity="sha384-skAcpIdS7UcVUC05LJ9Dxay8AXcDYfBJqt1CJ85S/CFujBsIzCIv+l9liuYLaMQ/" crossorigin="anonymous"></script> +<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/font/bootstrap-icons.css"> +<style type="text/css"> +body { + background-color: #fafafa; +} +</style> +<script type="text/javascript"> + const getCacheStats = () => { + const opts = { headers: { "Accept": "application/json" } }; + fetch("/z$", opts) + .then(response => { + if (!response.ok) { + throw Error(response.statusText); + } + return response.json(); + }) + .then(json => { + document.getElementById("status").innerHTML = "connected" + document.getElementById("stats").innerHTML = JSON.stringify(json, null, 4); + }) + .catch(error => { + document.getElementById("status").innerHTML = "disconnected" + document.getElementById("stats").innerHTML = "" + console.log(error); + }) + .finally(() => { + window.setTimeout(getCacheStats, 1000); + }); + }; + getCacheStats(); +</script> +</head> +<body> + <div class="container"> + <div class="row"> + <div class="text-center mt-5"> + <pre> +__________ _________ __ +\____ / ____ ____ / _____/_/ |_ ____ _______ ____ + / / _/ __ \ / \ \_____ \ \ __\ / _ \ \_ __ \_/ __ \ + / /_ \ ___/ | | \ / \ | | ( <_> ) | | \/\ ___/ +/_______ \ \___ >|___| //_______ / |__| \____/ |__| \___ > + \/ \/ \/ \/ \/ + </pre> + <pre id="status"/> + </div> + </div> + <div class="row"> + <pre class="mb-0">Z$:</pre> + <pre id="stats"></pre> + <div> + </div> +</body> +</html> +)"; + +} // namespace html + +HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Directory(Directory) +{ +} + +HttpFrontendService::~HttpFrontendService() +{ +} + +const char* +HttpFrontendService::BaseUri() const +{ + return "/dashboard"; // in order to use the root path we need to remove HttpAddUrlToUrlGroup in HttpSys.cpp +} + +void +HttpFrontendService::HandleRequest(zen::HttpServerRequest& Request) +{ + using namespace std::literals; + + if (m_Directory.empty()) + { + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, html::Index); + } + else + { + std::string_view Uri = Request.RelativeUri(); + std::filesystem::path RelPath{Uri.empty() ? "index.html" : Uri}; + std::filesystem::path AbsPath = m_Directory / RelPath; + + FileContents File = ReadFile(AbsPath); + + if (!File.ErrorCode) + { + // TODO: Map file extension to MIME type + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, File.Data[0]); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Ooops!"sv); + } + } +} + +} // namespace zen diff --git a/zenserver/experimental/frontend.h b/zenserver/experimental/frontend.h new file mode 100644 index 000000000..2ae20e940 --- /dev/null +++ b/zenserver/experimental/frontend.h @@ -0,0 +1,24 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpserver.h> + +#include <filesystem> + +namespace zen { + +class HttpFrontendService final : public zen::HttpService +{ +public: + HttpFrontendService(std::filesystem::path Directory); + virtual ~HttpFrontendService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + +private: + std::filesystem::path m_Directory; +}; + +} // namespace zen diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 7870f9559..6b24692e1 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -1425,7 +1425,12 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) { - ZEN_ERROR("Received malformed package!"); + std::filesystem::path BadPackagePath = + Oplog.TempPath() / "bad_packages" / "session{}_request{}"_format(HttpReq.SessionId(), HttpReq.RequestId()); + + ZEN_ERROR("Received malformed package! Saving payload to '{}'", BadPackagePath); + + zen::WriteFile(BadPackagePath, Payload); return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); } diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index b93635e76..0397ddaa0 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -299,13 +299,16 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); } -CloudCacheResult +PutRefResult CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); @@ -328,16 +331,102 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer if (Response.error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + PutRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; } else if (!VerifyAccessToken(Response.status_code)) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + PutRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; +} + +FinalizeRefResult +CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, + {"X-Jupiter-IoHash", RefHash.ToHexString()}, + {"Content-Type", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + FinalizeRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; + } + else if (!VerifyAccessToken(Response.status_code)) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + FinalizeRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; } CloudCacheResult diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index d8844279e..1de417008 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/refcount.h> #include <zencore/thread.h> @@ -46,13 +47,23 @@ struct CloudCacheAccessToken struct CloudCacheResult { IoBuffer Response; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - int32_t ErrorCode = {}; + int64_t Bytes{}; + double ElapsedSeconds{}; + int32_t ErrorCode{}; std::string Reason; bool Success = false; }; +struct PutRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + +struct FinalizeRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + /** * Context for performing Jupiter operations * @@ -76,11 +87,13 @@ public: CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); - CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object); + FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key); CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key); CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0dd16cd06..b1966e299 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -9,6 +9,7 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/stats.h> #include <zencore/stream.h> #include <zencore/timer.h> @@ -45,6 +46,7 @@ namespace detail { { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); + m_Size++; } m_NewItemSignal.notify_one(); @@ -64,6 +66,7 @@ namespace detail { { Item = std::move(m_Queue.front()); m_Queue.pop_front(); + m_Size--; return true; } @@ -80,7 +83,7 @@ namespace detail { } } - std::size_t Num() const + std::size_t Size() const { std::unique_lock Lock(m_Lock); return m_Queue.size(); @@ -91,12 +94,15 @@ namespace detail { std::condition_variable m_NewItemSignal; std::deque<T> m_Queue; std::atomic_bool m_CompleteAdding{false}; + std::atomic_uint32_t m_Size; }; class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: - JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) + JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); @@ -180,16 +186,23 @@ namespace detail { } } - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -200,16 +213,23 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -277,16 +297,82 @@ namespace detail { Success = false; for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); + if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RecordValue, + ZenContentType::kCbObject); Result.Success) { TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; Success = true; - break; + + if (!Result.Needs.empty()) + { + for (const IoHash& NeededHash : Result.Needs) + { + Success = false; + + if (auto It = + std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); + It != std::end(CacheRecord.PayloadIds)) + { + const size_t Idx = It - std::begin(CacheRecord.PayloadIds); + + if (CloudCacheResult BlobResult = + Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult.Success) + { + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; + Success = true; + } + else + { + ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + else + { + ZEN_WARN("needed payload '{}/{}/{}' MISSING", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + + if (FinalizeRefResult FinalizeResult = + Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + FinalizeResult.Success) + { + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + Success = true; + + for (const IoHash& MissingHash : FinalizeResult.Needs) + { + ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + MissingHash); + } + } + else + { + ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); + Success = false; + } + } + + if (Success) + { + break; + } } } @@ -302,6 +388,9 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; @@ -354,16 +443,23 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -375,16 +471,23 @@ namespace detail { const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -500,7 +603,7 @@ namespace detail { struct UpstreamStats { - static constexpr uint64_t MaxSampleCount = 100ull; + static constexpr uint64_t MaxSampleCount = 1000ull; UpstreamStats(bool Enabled) : m_Enabled(Enabled) {} @@ -509,13 +612,13 @@ struct UpstreamStats const GetUpstreamCacheResult& Result, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - if (!m_Enabled) + UpstreamEndpointStats& Stats = Endpoint.Stats(); + + if (Result.Error) { - return; + Stats.ErrorCount++; } - - UpstreamEndpointStats& Stats = Endpoint.Stats(); - if (Result.Success) + else if (Result.Success) { Stats.HitCount++; Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); @@ -526,7 +629,7 @@ struct UpstreamStats Stats.MissCount++; } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } @@ -537,11 +640,6 @@ struct UpstreamStats const PutUpstreamCacheResult& Result, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - if (!m_Enabled) - { - return; - } - UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Success) { @@ -549,8 +647,12 @@ struct UpstreamStats Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); } + else + { + Stats.ErrorCount++; + } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } @@ -575,13 +677,13 @@ struct UpstreamStats const uint64_t TotalCount = HitCount + MissCount; const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; - Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), - HitRate, - DownBytes, - DownSpeed, - UpBytes, - UpSpeed); + Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + HitRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); } } @@ -700,6 +802,36 @@ public: return {}; } + virtual void GetStatus(CbObjectWriter& Status) override + { + Status << "reading" << m_Options.ReadUpstream; + Status << "writing" << m_Options.WriteUpstream; + Status << "worker_threads" << m_Options.ThreadCount; + Status << "queue_count" << m_UpstreamQueue.Size(); + + Status.BeginArray("endpoints"); + for (const auto& Ep : m_Endpoints) + { + Status.BeginObject(); + Status << "name" << Ep->DisplayName(); + Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); + + UpstreamEndpointStats& Stats = Ep->Stats(); + const uint64_t HitCount = Stats.HitCount; + const uint64_t MissCount = Stats.MissCount; + const uint64_t TotalCount = HitCount + MissCount; + const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; + + Status << "hit_ratio" << HitRate; + Status << "downloaded_mb" << Stats.DownBytes; + Status << "uploaded_mb" << Stats.UpBytes; + Status << "error_count" << Stats.ErrorCount; + + Status.EndObject(); + } + Status.EndArray(); + } + private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) { diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 0e736480b..08f379b11 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -12,6 +12,7 @@ namespace zen { +class CbObjectWriter; class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; @@ -44,35 +45,29 @@ struct UpstreamCacheOptions bool StatsEnabled = false; }; -enum class UpstreamStatusCode : uint8_t -{ - Ok, - Error -}; - struct UpstreamError { - UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok; - std::string Reason; + int32_t ErrorCode{}; + std::string Reason{}; - explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; } + explicit operator bool() const { return ErrorCode != 0; } }; struct GetUpstreamCacheResult { IoBuffer Value; - UpstreamError Error; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + UpstreamError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; }; struct PutUpstreamCacheResult { std::string Reason; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; }; struct UpstreamEndpointHealth @@ -83,13 +78,14 @@ struct UpstreamEndpointHealth struct UpstreamEndpointStats { - std::atomic_uint64_t HitCount = {}; - std::atomic_uint64_t MissCount = {}; - std::atomic_uint64_t UpCount = {}; - std::atomic<double> UpBytes = {}; - std::atomic<double> DownBytes = {}; - std::atomic<double> SecondsUp = {}; - std::atomic<double> SecondsDown = {}; + std::atomic_uint64_t HitCount{}; + std::atomic_uint64_t MissCount{}; + std::atomic_uint64_t UpCount{}; + std::atomic_uint64_t ErrorCount{}; + std::atomic<double> UpBytes{}; + std::atomic<double> DownBytes{}; + std::atomic<double> SecondsUp{}; + std::atomic<double> SecondsDown{}; }; /** @@ -139,6 +135,8 @@ public: }; virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + + virtual void GetStatus(CbObjectWriter& CbO) = 0; }; std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index b45df9fef..db1be9dea 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -81,6 +81,7 @@ #include "cache/structuredcachestore.h" #include "compute/apply.h" #include "diag/diagsvcs.h" +#include "experimental/frontend.h" #include "experimental/usnjournal.h" #include "projectstore.h" #include "testing/httptest.h" @@ -302,6 +303,12 @@ public: { m_Http->RegisterService(*m_HttpFunctionService); } + + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); + if (m_FrontendService) + { + m_Http->RegisterService(*m_FrontendService); + } } #if ZEN_ENABLE_MESH @@ -364,6 +371,7 @@ public: void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } void SetTestMode(bool State) { m_TestMode = State; } void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } + void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } void EnsureIoRunner() { @@ -447,6 +455,7 @@ private: bool m_IsDedicatedMode = false; bool m_TestMode = false; std::filesystem::path m_DataRoot; + std::filesystem::path m_ContentRoot; std::jthread m_IoRunner; asio::io_context m_IoContext; asio::steady_timer m_PidCheckTimer{m_IoContext}; @@ -471,6 +480,7 @@ private: zen::HttpHealthService m_HealthService; zen::Mesh m_ZenMesh{m_IoContext}; std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; + std::unique_ptr<zen::HttpFrontendService> m_FrontendService; bool m_DebugOptionForcedCrash = false; }; @@ -560,6 +570,7 @@ ZenWindowsService::Run() ZenServer Server; Server.SetDataRoot(GlobalOptions.DataDir); + Server.SetContentRoot(GlobalOptions.ContentDir); Server.SetTestMode(GlobalOptions.IsTest); Server.SetDedicatedMode(GlobalOptions.IsDedicated); Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry); diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index bcb7ea028..335786fbf 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -109,6 +109,7 @@ <ClInclude Include="compute\apply.h" /> <ClInclude Include="config.h" /> <ClInclude Include="diag\logging.h" /> + <ClInclude Include="experimental\frontend.h" /> <ClInclude Include="resource.h" /> <ClInclude Include="sos\sos.h" /> <ClInclude Include="testing\httptest.h" /> @@ -132,6 +133,7 @@ <ClCompile Include="compute\apply.cpp" /> <ClCompile Include="config.cpp" /> <ClCompile Include="diag\logging.cpp" /> + <ClCompile Include="experimental\frontend.cpp" /> <ClCompile Include="projectstore.cpp" /> <ClCompile Include="cache\cacheagent.cpp" /> <ClCompile Include="sos\sos.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 6b99ca8d7..1c5b17fee 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -38,6 +38,9 @@ <ClInclude Include="testing\httptest.h" /> <ClInclude Include="windows\service.h" /> <ClInclude Include="resource.h" /> + <ClInclude Include="experimental\frontend.h"> + <Filter>experimental</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -70,6 +73,10 @@ </ClCompile> <ClCompile Include="testing\httptest.cpp" /> <ClCompile Include="windows\service.cpp" /> + <ClCompile Include="admin\admin.cpp" /> + <ClCompile Include="experimental\frontend.cpp"> + <Filter>experimental</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <Filter Include="cache"> |