diff options
| author | Stefan Boberg <[email protected]> | 2021-10-01 22:15:42 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-10-01 22:15:42 +0200 |
| commit | 2a6157b15508541cfd082e8544c78c8f94b18005 (patch) | |
| tree | 7f70bd046afe918c5433e753a68de72ed602532a | |
| parent | Added explicit mimalloc IoBuffer allocation path (diff) | |
| parent | zen: added print/printpackage subcommands to help in debugging or inspecting ... (diff) | |
| download | zen-2a6157b15508541cfd082e8544c78c8f94b18005.tar.xz zen-2a6157b15508541cfd082e8544c78c8f94b18005.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
29 files changed, 810 insertions, 140 deletions
diff --git a/zen/cmds/print.cpp b/zen/cmds/print.cpp new file mode 100644 index 000000000..aac6afd44 --- /dev/null +++ b/zen/cmds/print.cpp @@ -0,0 +1,107 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "print.h" + +#include <zencore/compactbinarypackage.h> +#include <zencore/filesystem.h> +#include <zencore/logging.h> +#include <zencore/string.h> + +using namespace std::literals; + +namespace zen { + +PrintCommand::PrintCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "s", "source", "Object payload file", cxxopts::value(m_Filename), "<file name>"); +} + +PrintCommand::~PrintCommand() = default; + +int +PrintCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + + m_Options.parse_positional({"source"}); + + auto result = m_Options.parse(argc, argv); + + if (result.count("help")) + { + std::cout << m_Options.help({"", "Group"}) << std::endl; + + return 0; + } + + // Validate arguments + + if (m_Filename.empty()) + throw std::runtime_error("No file specified"); + + zen::FileContents Fc = zen::ReadFile(m_Filename); + IoBuffer Data = Fc.Flatten(); + zen::CbObject Object{SharedBuffer(Data)}; + + zen::StringBuilder<1024> ObjStr; + zen::CompactBinaryToJson(Object, ObjStr); + zen::ConsoleLog().info("{}", ObjStr); + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +PrintPackageCommand::PrintPackageCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "s", "source", "Package payload file", cxxopts::value(m_Filename), "<file name>"); +} + +PrintPackageCommand::~PrintPackageCommand() +{ +} + +int +PrintPackageCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + + m_Options.parse_positional({"source"}); + + auto result = m_Options.parse(argc, argv); + + if (result.count("help")) + { + std::cout << m_Options.help({"", "Group"}) << std::endl; + + return 0; + } + + // Validate arguments + + if (m_Filename.empty()) + throw std::runtime_error("No file specified"); + + zen::FileContents Fc = zen::ReadFile(m_Filename); + IoBuffer Data = Fc.Flatten(); + zen::CbPackage Package; + + bool Ok = Package.TryLoad(Data) || zen::legacy::TryLoadCbPackage(Package, Data, &UniqueBuffer::Alloc); + + if (Ok) + { + zen::StringBuilder<1024> ObjStr; + zen::CompactBinaryToJson(Package.GetObject(), ObjStr); + zen::ConsoleLog().info("{}", ObjStr); + } + else + { + zen::ConsoleLog().error("error: malformed package?"); + } + + return 0; +} + +} // namespace zen diff --git a/zen/cmds/print.h b/zen/cmds/print.h new file mode 100644 index 000000000..eed0aa14e --- /dev/null +++ b/zen/cmds/print.h @@ -0,0 +1,41 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +namespace zen { + +/** Print Compact Binary + */ +class PrintCommand : public ZenCmdBase +{ +public: + PrintCommand(); + ~PrintCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options* Options() override { return &m_Options; } + +private: + cxxopts::Options m_Options{"print", "Print compact binary object"}; + std::string m_Filename; +}; + +/** Print Compact Binary Package + */ +class PrintPackageCommand : public ZenCmdBase +{ +public: + PrintPackageCommand(); + ~PrintPackageCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options* Options() override { return &m_Options; } + +private: + cxxopts::Options m_Options{"printpkg", "Print compact binary package"}; + std::string m_Filename; +}; + +} // namespace zen diff --git a/zen/cmds/run.cpp b/zen/cmds/run.cpp index 94eb7ef6d..19b5c8980 100644 --- a/zen/cmds/run.cpp +++ b/zen/cmds/run.cpp @@ -10,6 +10,7 @@ #include <zencore/fmtutils.h> #include <zencore/iohash.h> #include <zencore/logging.h> +#include <zencore/stream.h> #include <zencore/string.h> #include <zencore/timer.h> #include <zenutil/zenserverprocess.h> diff --git a/zen/zen.cpp b/zen/zen.cpp index 86c41d658..3c33ff5e0 100644 --- a/zen/zen.cpp +++ b/zen/zen.cpp @@ -9,6 +9,7 @@ #include "cmds/dedup.h" #include "cmds/deploy.h" #include "cmds/hash.h" +#include "cmds/print.h" #include "cmds/run.h" #include "cmds/status.h" #include "cmds/top.h" @@ -98,18 +99,20 @@ main(int argc, char** argv) auto _ = zen::MakeGuard([] { spdlog::shutdown(); }); - HashCommand HashCmd; - CopyCommand CopyCmd; - DedupCommand DedupCmd; - DeployCommand DeployCmd; - DropCommand DropCmd; - ChunkCommand ChunkCmd; - RunCommand RunCmd; - StatusCommand StatusCmd; - TopCommand TopCmd; - PsCommand PsCmd; - UpCommand UpCmd; - DownCommand DownCmd; + HashCommand HashCmd; + CopyCommand CopyCmd; + DedupCommand DedupCmd; + DeployCommand DeployCmd; + DropCommand DropCmd; + ChunkCommand ChunkCmd; + RunCommand RunCmd; + StatusCommand StatusCmd; + TopCommand TopCmd; + PrintCommand PrintCmd; + PrintPackageCommand PrintPkgCmd; + PsCommand PsCmd; + UpCommand UpCmd; + DownCommand DownCmd; #if ZEN_WITH_TESTS RunTestsCommand RunTestsCmd; @@ -128,6 +131,8 @@ main(int argc, char** argv) {"dedup", &DedupCmd, "Dedup files"}, {"drop", &DropCmd, "Drop cache bucket(s)"}, {"hash", &HashCmd, "Compute file hashes"}, + {"print", &PrintCmd, "Print compact binary object"}, + {"printpackage", &PrintPkgCmd, "Print compact binary package"}, {"run", &RunCmd, "Remote execution"}, {"status", &StatusCmd, "Show zen status"}, {"ps", &PsCmd, "Enumerate running zen server instances"}, diff --git a/zen/zen.vcxproj b/zen/zen.vcxproj index fb0674e87..f31c0bc17 100644 --- a/zen/zen.vcxproj +++ b/zen/zen.vcxproj @@ -99,6 +99,7 @@ <ClCompile Include="cmds\dedup.cpp" /> <ClCompile Include="cmds\deploy.cpp" /> <ClCompile Include="cmds\hash.cpp" /> + <ClCompile Include="cmds\print.cpp" /> <ClCompile Include="cmds\run.cpp" /> <ClCompile Include="cmds\scrub.cpp" /> <ClCompile Include="cmds\status.cpp" /> @@ -114,6 +115,7 @@ <ClInclude Include="cmds\dedup.h" /> <ClInclude Include="cmds\deploy.h" /> <ClInclude Include="cmds\hash.h" /> + <ClInclude Include="cmds\print.h" /> <ClInclude Include="cmds\run.h" /> <ClInclude Include="cmds\scrub.h" /> <ClInclude Include="cmds\status.h" /> diff --git a/zen/zen.vcxproj.filters b/zen/zen.vcxproj.filters index 9002f01c2..d983b413c 100644 --- a/zen/zen.vcxproj.filters +++ b/zen/zen.vcxproj.filters @@ -28,6 +28,7 @@ <ClCompile Include="cmds\up.cpp" /> <ClCompile Include="cmds\cache.cpp" /> <ClCompile Include="cmds\scrub.cpp" /> + <ClCompile Include="cmds\print.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="chunk\chunk.h" /> @@ -57,6 +58,7 @@ <ClInclude Include="cmds\up.h" /> <ClInclude Include="cmds\cache.h" /> <ClInclude Include="cmds\scrub.h" /> + <ClInclude Include="cmds\print.h" /> </ItemGroup> <ItemGroup> <Filter Include="cmds"> diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index a06c00e41..f6ba92f98 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -522,6 +522,23 @@ WriteFile(std::filesystem::path Path, IoBuffer Data) WriteFile(Path, &DataPtr, 1); } +IoBuffer +FileContents::Flatten() +{ + if (Data.size() == 1) + { + return Data[0]; + } + else if (Data.empty()) + { + return {}; + } + else + { + ZEN_NOT_IMPLEMENTED(); + } +} + FileContents ReadFile(std::filesystem::path Path) { @@ -815,7 +832,7 @@ TEST_CASE("filesystem") using namespace std::filesystem; // GetExePath - path BinPath = GetRunningExecutablePath(); + path BinPath = GetRunningExecutablePath(); const bool ExpectedExe = BinPath.stem() == "zencore-test" || BinPath.stem() == "zenserver-test"; CHECK(ExpectedExe); CHECK(is_regular_file(BinPath)); diff --git a/zencore/include/zencore/filesystem.h b/zencore/include/zencore/filesystem.h index 6678528f6..c7ac7140d 100644 --- a/zencore/include/zencore/filesystem.h +++ b/zencore/include/zencore/filesystem.h @@ -2,9 +2,10 @@ #pragma once -#include "stream.h" #include "zencore.h" +#include <zencore/iobuffer.h> + #include <filesystem> #include <functional> @@ -36,6 +37,8 @@ struct FileContents { std::vector<IoBuffer> Data; std::error_code ErrorCode; + + IoBuffer Flatten(); }; ZENCORE_API FileContents ReadFile(std::filesystem::path Path); diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index ed52184d2..263cf672d 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -25,6 +25,7 @@ enum class ZenContentType : uint8_t kCbPackageOffer = 6, kCompressedBinary = 7, kUnknownContentType = 8, + kHTML = 9, kCOUNT }; @@ -54,6 +55,8 @@ ToString(ZenContentType ContentType) return "compressed-binary"sv; case ZenContentType::kYAML: return "yaml"sv; + case ZenContentType::kHTML: + return "html"sv; } } diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h index 0a1e15614..320718f5b 100644 --- a/zencore/include/zencore/refcount.h +++ b/zencore/include/zencore/refcount.h @@ -17,7 +17,7 @@ namespace zen { class RefCounted { public: - RefCounted() = default; + RefCounted() = default; virtual ~RefCounted() = default; inline uint32_t AddRef() const { return AtomicIncrement(const_cast<RefCounted*>(this)->m_RefCount); } diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp index 8e5d61877..cfd1463ba 100644 --- a/zenhttp/httpserver.cpp +++ b/zenhttp/httpserver.cpp @@ -58,6 +58,9 @@ MapContentTypeToString(HttpContentType ContentType) case HttpContentType::kYAML: return "text/yaml"sv; + + case HttpContentType::kHTML: + return "text/html"sv; } } diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 4c89b995a..8ab0276c5 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -405,6 +405,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (!Success) { ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -449,6 +450,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ValidCount, AttachmentCount); + m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); } } @@ -467,6 +469,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + m_CacheStats.HitCount++; Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); } else @@ -478,6 +481,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ToString(Value.Value.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); + m_CacheStats.HitCount++; + if (InUpstreamCache) + { + m_CacheStats.UpstreamHitCount++; + } + Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); } } @@ -667,11 +676,11 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request case kHead: case kGet: { - HandleGetCachePayload(Request, Ref, Policy); if (Verb == kHead) { Request.SetSuppressResponseBody(); } + HandleGetCachePayload(Request, Ref, Policy); } break; case kPut: @@ -712,7 +721,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques if (!Payload) { - ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType())); + m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -724,6 +734,12 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques ToString(Payload.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); + m_CacheStats.HitCount++; + if (InUpstreamCache) + { + m_CacheStats.UpstreamHitCount++; + } + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } @@ -846,6 +862,23 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) EmitSnapshot("requests", m_HttpRequests, Cbo); + const uint64_t HitCount = m_CacheStats.HitCount; + const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; + const uint64_t MissCount = m_CacheStats.MissCount; + const uint64_t TotalCount = HitCount + MissCount; + + Cbo.BeginObject("cache"); + Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount) * 100.0) : 0.0); + Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) * 100.0 : 0.0); + Cbo.EndObject(); + + if (m_UpstreamCache) + { + Cbo.BeginObject("upstream"); + m_UpstreamCache->GetStatus(Cbo); + Cbo.EndObject(); + } + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 47fc173e9..a360878bd 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -71,6 +71,13 @@ private: IoHash PayloadId; }; + struct CacheStats + { + std::atomic_uint64_t HitCount{}; + std::atomic_uint64_t UpstreamHitCount{}; + std::atomic_uint64_t MissCount{}; + }; + [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); @@ -89,6 +96,7 @@ private: std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; metrics::OperationTiming m_HttpRequests; + CacheStats m_CacheStats; }; } // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 5e93ebaa9..b97f0830f 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -116,6 +116,13 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) m_DiskLayer.Scrub(Ctx); m_MemLayer.Scrub(Ctx); } + +void +ZenCacheStore::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() @@ -195,6 +202,12 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) } void +ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + +void ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) { std::vector<IoHash> BadHashes; @@ -294,6 +307,7 @@ struct ZenCacheDiskLayer::CacheBucket void Drop(); void Flush(); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); inline bool IsOk() const { return m_Ok; } @@ -611,6 +625,12 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) } void +ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + +void ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value) { WideStringBuilder<128> DataFilePath; @@ -830,27 +850,10 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) } } -////////////////////////////////////////////////////////////////////////// - -ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore) -{ - ZEN_UNUSED(CacheStore); -} - -ZenCacheTracker::~ZenCacheTracker() -{ -} - -void -ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey) -{ - ZEN_UNUSED(Bucket); - ZEN_UNUSED(HashKey); -} - void -ZenCacheTracker::Flush() +ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx) { + ZEN_UNUSED(GcCtx); } } // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index f96757409..011f13323 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -56,6 +56,7 @@ public: void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); private: struct CacheBucket @@ -83,6 +84,7 @@ public: bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); private: /** A cache bucket manages a single directory containing @@ -107,6 +109,7 @@ public: bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); private: std::filesystem::path m_RootDir; @@ -116,18 +119,4 @@ private: uint64_t m_LastScrubTime = 0; }; -/** Tracks cache entry access, stats and orchestrates cleanup activities - */ -class ZenCacheTracker -{ -public: - ZenCacheTracker(ZenCacheStore& CacheStore); - ~ZenCacheTracker(); - - void TrackAccess(std::string_view Bucket, const IoHash& HashKey); - void Flush(); - -private: -}; - } // namespace zen diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 42f59b26c..759534d58 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -90,6 +90,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(GlobalOptions.IsTest)->default_value("false")); options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(GlobalOptions.LogId)); options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::filesystem::path>(GlobalOptions.DataDir)); + options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::filesystem::path>(GlobalOptions.ContentDir)); options .add_option("lifetime", "", "owner-pid", "Specify owning process id", cxxopts::value<int>(GlobalOptions.OwnerPid), "<identifier>"); diff --git a/zenserver/config.h b/zenserver/config.h index 75c19d690..af1a24455 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -17,6 +17,7 @@ struct ZenServerOptions bool UninstallService = false; // Flag used to initiate service uninstall (temporary) std::string LogId; // Id for tagging log output std::filesystem::path DataDir; // Root directory for state (used for testing) + std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) }; struct ZenUpstreamJupiterConfig diff --git a/zenserver/experimental/frontend.cpp b/zenserver/experimental/frontend.cpp new file mode 100644 index 000000000..79fcf0a17 --- /dev/null +++ b/zenserver/experimental/frontend.cpp @@ -0,0 +1,119 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "frontend.h" + +#include <zencore/filesystem.h> +#include <zencore/string.h> + +namespace zen { + +namespace html { + + constexpr std::string_view Index = R"( +<!DOCTYPE html> +<html> +<head> +<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" integrity="sha384-F3w7mX95PdgyTmZZMECAngseQB83DfGTowi0iMjiWaeVhAn4FJkqJByhZMI3AhiU" crossorigin="anonymous"> +<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js" integrity="sha384-skAcpIdS7UcVUC05LJ9Dxay8AXcDYfBJqt1CJ85S/CFujBsIzCIv+l9liuYLaMQ/" crossorigin="anonymous"></script> +<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/font/bootstrap-icons.css"> +<style type="text/css"> +body { + background-color: #fafafa; +} +</style> +<script type="text/javascript"> + const getCacheStats = () => { + const opts = { headers: { "Accept": "application/json" } }; + fetch("/z$", opts) + .then(response => { + if (!response.ok) { + throw Error(response.statusText); + } + return response.json(); + }) + .then(json => { + document.getElementById("status").innerHTML = "connected" + document.getElementById("stats").innerHTML = JSON.stringify(json, null, 4); + }) + .catch(error => { + document.getElementById("status").innerHTML = "disconnected" + document.getElementById("stats").innerHTML = "" + console.log(error); + }) + .finally(() => { + window.setTimeout(getCacheStats, 1000); + }); + }; + getCacheStats(); +</script> +</head> +<body> + <div class="container"> + <div class="row"> + <div class="text-center mt-5"> + <pre> +__________ _________ __ +\____ / ____ ____ / _____/_/ |_ ____ _______ ____ + / / _/ __ \ / \ \_____ \ \ __\ / _ \ \_ __ \_/ __ \ + / /_ \ ___/ | | \ / \ | | ( <_> ) | | \/\ ___/ +/_______ \ \___ >|___| //_______ / |__| \____/ |__| \___ > + \/ \/ \/ \/ \/ + </pre> + <pre id="status"/> + </div> + </div> + <div class="row"> + <pre class="mb-0">Z$:</pre> + <pre id="stats"></pre> + <div> + </div> +</body> +</html> +)"; + +} // namespace html + +HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Directory(Directory) +{ +} + +HttpFrontendService::~HttpFrontendService() +{ +} + +const char* +HttpFrontendService::BaseUri() const +{ + return "/dashboard"; // in order to use the root path we need to remove HttpAddUrlToUrlGroup in HttpSys.cpp +} + +void +HttpFrontendService::HandleRequest(zen::HttpServerRequest& Request) +{ + using namespace std::literals; + + if (m_Directory.empty()) + { + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, html::Index); + } + else + { + std::string_view Uri = Request.RelativeUri(); + std::filesystem::path RelPath{Uri.empty() ? "index.html" : Uri}; + std::filesystem::path AbsPath = m_Directory / RelPath; + + FileContents File = ReadFile(AbsPath); + + if (!File.ErrorCode) + { + // TODO: Map file extension to MIME type + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, File.Data[0]); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Ooops!"sv); + } + } +} + +} // namespace zen diff --git a/zenserver/experimental/frontend.h b/zenserver/experimental/frontend.h new file mode 100644 index 000000000..2ae20e940 --- /dev/null +++ b/zenserver/experimental/frontend.h @@ -0,0 +1,24 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpserver.h> + +#include <filesystem> + +namespace zen { + +class HttpFrontendService final : public zen::HttpService +{ +public: + HttpFrontendService(std::filesystem::path Directory); + virtual ~HttpFrontendService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + +private: + std::filesystem::path m_Directory; +}; + +} // namespace zen diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 7870f9559..6b24692e1 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -1425,7 +1425,12 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) { - ZEN_ERROR("Received malformed package!"); + std::filesystem::path BadPackagePath = + Oplog.TempPath() / "bad_packages" / "session{}_request{}"_format(HttpReq.SessionId(), HttpReq.RequestId()); + + ZEN_ERROR("Received malformed package! Saving payload to '{}'", BadPackagePath); + + zen::WriteFile(BadPackagePath, Payload); return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); } diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index b93635e76..0397ddaa0 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -299,13 +299,16 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); } -CloudCacheResult +PutRefResult CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); @@ -328,16 +331,102 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer if (Response.error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + PutRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; } else if (!VerifyAccessToken(Response.status_code)) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + PutRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; +} + +FinalizeRefResult +CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, + {"X-Jupiter-IoHash", RefHash.ToHexString()}, + {"Content-Type", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + FinalizeRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; + } + else if (!VerifyAccessToken(Response.status_code)) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + FinalizeRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; } CloudCacheResult diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index d8844279e..1de417008 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/refcount.h> #include <zencore/thread.h> @@ -46,13 +47,23 @@ struct CloudCacheAccessToken struct CloudCacheResult { IoBuffer Response; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - int32_t ErrorCode = {}; + int64_t Bytes{}; + double ElapsedSeconds{}; + int32_t ErrorCode{}; std::string Reason; bool Success = false; }; +struct PutRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + +struct FinalizeRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + /** * Context for performing Jupiter operations * @@ -76,11 +87,13 @@ public: CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); - CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object); + FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key); CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key); CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0dd16cd06..b1966e299 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -9,6 +9,7 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/stats.h> #include <zencore/stream.h> #include <zencore/timer.h> @@ -45,6 +46,7 @@ namespace detail { { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); + m_Size++; } m_NewItemSignal.notify_one(); @@ -64,6 +66,7 @@ namespace detail { { Item = std::move(m_Queue.front()); m_Queue.pop_front(); + m_Size--; return true; } @@ -80,7 +83,7 @@ namespace detail { } } - std::size_t Num() const + std::size_t Size() const { std::unique_lock Lock(m_Lock); return m_Queue.size(); @@ -91,12 +94,15 @@ namespace detail { std::condition_variable m_NewItemSignal; std::deque<T> m_Queue; std::atomic_bool m_CompleteAdding{false}; + std::atomic_uint32_t m_Size; }; class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: - JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) + JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); @@ -180,16 +186,23 @@ namespace detail { } } - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -200,16 +213,23 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -277,16 +297,82 @@ namespace detail { Success = false; for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); + if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RecordValue, + ZenContentType::kCbObject); Result.Success) { TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; Success = true; - break; + + if (!Result.Needs.empty()) + { + for (const IoHash& NeededHash : Result.Needs) + { + Success = false; + + if (auto It = + std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); + It != std::end(CacheRecord.PayloadIds)) + { + const size_t Idx = It - std::begin(CacheRecord.PayloadIds); + + if (CloudCacheResult BlobResult = + Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult.Success) + { + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; + Success = true; + } + else + { + ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + else + { + ZEN_WARN("needed payload '{}/{}/{}' MISSING", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + + if (FinalizeRefResult FinalizeResult = + Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + FinalizeResult.Success) + { + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + Success = true; + + for (const IoHash& MissingHash : FinalizeResult.Needs) + { + ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + MissingHash); + } + } + else + { + ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); + Success = false; + } + } + + if (Success) + { + break; + } } } @@ -302,6 +388,9 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; @@ -354,16 +443,23 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -375,16 +471,23 @@ namespace detail { const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -500,7 +603,7 @@ namespace detail { struct UpstreamStats { - static constexpr uint64_t MaxSampleCount = 100ull; + static constexpr uint64_t MaxSampleCount = 1000ull; UpstreamStats(bool Enabled) : m_Enabled(Enabled) {} @@ -509,13 +612,13 @@ struct UpstreamStats const GetUpstreamCacheResult& Result, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - if (!m_Enabled) + UpstreamEndpointStats& Stats = Endpoint.Stats(); + + if (Result.Error) { - return; + Stats.ErrorCount++; } - - UpstreamEndpointStats& Stats = Endpoint.Stats(); - if (Result.Success) + else if (Result.Success) { Stats.HitCount++; Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); @@ -526,7 +629,7 @@ struct UpstreamStats Stats.MissCount++; } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } @@ -537,11 +640,6 @@ struct UpstreamStats const PutUpstreamCacheResult& Result, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - if (!m_Enabled) - { - return; - } - UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Success) { @@ -549,8 +647,12 @@ struct UpstreamStats Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); } + else + { + Stats.ErrorCount++; + } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } @@ -575,13 +677,13 @@ struct UpstreamStats const uint64_t TotalCount = HitCount + MissCount; const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; - Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), - HitRate, - DownBytes, - DownSpeed, - UpBytes, - UpSpeed); + Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + HitRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); } } @@ -700,6 +802,36 @@ public: return {}; } + virtual void GetStatus(CbObjectWriter& Status) override + { + Status << "reading" << m_Options.ReadUpstream; + Status << "writing" << m_Options.WriteUpstream; + Status << "worker_threads" << m_Options.ThreadCount; + Status << "queue_count" << m_UpstreamQueue.Size(); + + Status.BeginArray("endpoints"); + for (const auto& Ep : m_Endpoints) + { + Status.BeginObject(); + Status << "name" << Ep->DisplayName(); + Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); + + UpstreamEndpointStats& Stats = Ep->Stats(); + const uint64_t HitCount = Stats.HitCount; + const uint64_t MissCount = Stats.MissCount; + const uint64_t TotalCount = HitCount + MissCount; + const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; + + Status << "hit_ratio" << HitRate; + Status << "downloaded_mb" << Stats.DownBytes; + Status << "uploaded_mb" << Stats.UpBytes; + Status << "error_count" << Stats.ErrorCount; + + Status.EndObject(); + } + Status.EndArray(); + } + private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) { diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 0e736480b..08f379b11 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -12,6 +12,7 @@ namespace zen { +class CbObjectWriter; class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; @@ -44,35 +45,29 @@ struct UpstreamCacheOptions bool StatsEnabled = false; }; -enum class UpstreamStatusCode : uint8_t -{ - Ok, - Error -}; - struct UpstreamError { - UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok; - std::string Reason; + int32_t ErrorCode{}; + std::string Reason{}; - explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; } + explicit operator bool() const { return ErrorCode != 0; } }; struct GetUpstreamCacheResult { IoBuffer Value; - UpstreamError Error; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + UpstreamError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; }; struct PutUpstreamCacheResult { std::string Reason; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; }; struct UpstreamEndpointHealth @@ -83,13 +78,14 @@ struct UpstreamEndpointHealth struct UpstreamEndpointStats { - std::atomic_uint64_t HitCount = {}; - std::atomic_uint64_t MissCount = {}; - std::atomic_uint64_t UpCount = {}; - std::atomic<double> UpBytes = {}; - std::atomic<double> DownBytes = {}; - std::atomic<double> SecondsUp = {}; - std::atomic<double> SecondsDown = {}; + std::atomic_uint64_t HitCount{}; + std::atomic_uint64_t MissCount{}; + std::atomic_uint64_t UpCount{}; + std::atomic_uint64_t ErrorCount{}; + std::atomic<double> UpBytes{}; + std::atomic<double> DownBytes{}; + std::atomic<double> SecondsUp{}; + std::atomic<double> SecondsDown{}; }; /** @@ -139,6 +135,8 @@ public: }; virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + + virtual void GetStatus(CbObjectWriter& CbO) = 0; }; std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index b45df9fef..db1be9dea 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -81,6 +81,7 @@ #include "cache/structuredcachestore.h" #include "compute/apply.h" #include "diag/diagsvcs.h" +#include "experimental/frontend.h" #include "experimental/usnjournal.h" #include "projectstore.h" #include "testing/httptest.h" @@ -302,6 +303,12 @@ public: { m_Http->RegisterService(*m_HttpFunctionService); } + + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); + if (m_FrontendService) + { + m_Http->RegisterService(*m_FrontendService); + } } #if ZEN_ENABLE_MESH @@ -364,6 +371,7 @@ public: void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } void SetTestMode(bool State) { m_TestMode = State; } void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } + void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } void EnsureIoRunner() { @@ -447,6 +455,7 @@ private: bool m_IsDedicatedMode = false; bool m_TestMode = false; std::filesystem::path m_DataRoot; + std::filesystem::path m_ContentRoot; std::jthread m_IoRunner; asio::io_context m_IoContext; asio::steady_timer m_PidCheckTimer{m_IoContext}; @@ -471,6 +480,7 @@ private: zen::HttpHealthService m_HealthService; zen::Mesh m_ZenMesh{m_IoContext}; std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; + std::unique_ptr<zen::HttpFrontendService> m_FrontendService; bool m_DebugOptionForcedCrash = false; }; @@ -560,6 +570,7 @@ ZenWindowsService::Run() ZenServer Server; Server.SetDataRoot(GlobalOptions.DataDir); + Server.SetContentRoot(GlobalOptions.ContentDir); Server.SetTestMode(GlobalOptions.IsTest); Server.SetDedicatedMode(GlobalOptions.IsDedicated); Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry); diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index bcb7ea028..335786fbf 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -109,6 +109,7 @@ <ClInclude Include="compute\apply.h" /> <ClInclude Include="config.h" /> <ClInclude Include="diag\logging.h" /> + <ClInclude Include="experimental\frontend.h" /> <ClInclude Include="resource.h" /> <ClInclude Include="sos\sos.h" /> <ClInclude Include="testing\httptest.h" /> @@ -132,6 +133,7 @@ <ClCompile Include="compute\apply.cpp" /> <ClCompile Include="config.cpp" /> <ClCompile Include="diag\logging.cpp" /> + <ClCompile Include="experimental\frontend.cpp" /> <ClCompile Include="projectstore.cpp" /> <ClCompile Include="cache\cacheagent.cpp" /> <ClCompile Include="sos\sos.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 6b99ca8d7..1c5b17fee 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -38,6 +38,9 @@ <ClInclude Include="testing\httptest.h" /> <ClInclude Include="windows\service.h" /> <ClInclude Include="resource.h" /> + <ClInclude Include="experimental\frontend.h"> + <Filter>experimental</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -70,6 +73,10 @@ </ClCompile> <ClCompile Include="testing\httptest.cpp" /> <ClCompile Include="windows\service.cpp" /> + <ClCompile Include="admin\admin.cpp" /> + <ClCompile Include="experimental\frontend.cpp"> + <Filter>experimental</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <Filter Include="cache"> diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index 1db2b50bf..808fc8fb3 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -32,6 +32,15 @@ CasChunkSet::AddChunkToSet(const IoHash& HashToAdd) } void +CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd) +{ + for (const IoHash& Hash : HashesToAdd) + { + m_ChunkSet.insert(Hash); + } +} + +void CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) { for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) @@ -58,6 +67,34 @@ CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callba ////////////////////////////////////////////////////////////////////////// +struct GcContext::GcState +{ + CasChunkSet m_CasChunks; + CasChunkSet m_CidChunks; +}; + +GcContext::GcContext() : m_State(std::make_unique<GcState>()) +{ +} + +GcContext::~GcContext() +{ +} + +void +GcContext::ContributeCids(std::span<const IoHash> Cids) +{ + m_State->m_CidChunks.AddChunksToSet(Cids); +} + +void +GcContext::ContributeCas(std::span<const IoHash> Cas) +{ + m_State->m_CasChunks.AddChunksToSet(Cas); +} + +////////////////////////////////////////////////////////////////////////// + void ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks) { diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index 93454ca6f..1425845a0 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -37,7 +37,16 @@ struct CasStoreConfiguration class GcContext { public: + GcContext(); + ~GcContext(); + + void ContributeCids(std::span<const IoHash> Cid); + void ContributeCas(std::span<const IoHash> Hash); + private: + struct GcState; + + std::unique_ptr<GcState> m_State; }; /** Context object for data scrubbing @@ -58,10 +67,14 @@ private: bool m_Recover = true; }; +/** Manage a set of IoHash values + */ + class CasChunkSet { public: void AddChunkToSet(const IoHash& HashToAdd); + void AddChunksToSet(std::span<const IoHash> HashesToAdd); void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback); inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); } @@ -69,6 +82,7 @@ public: inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } private: + // Q: should we protect this with a lock, or is that a higher level concern? std::unordered_set<IoHash> m_ChunkSet; }; |