aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-10-01 22:15:42 +0200
committerStefan Boberg <[email protected]>2021-10-01 22:15:42 +0200
commit2a6157b15508541cfd082e8544c78c8f94b18005 (patch)
tree7f70bd046afe918c5433e753a68de72ed602532a
parentAdded explicit mimalloc IoBuffer allocation path (diff)
parentzen: added print/printpackage subcommands to help in debugging or inspecting ... (diff)
downloadzen-2a6157b15508541cfd082e8544c78c8f94b18005.tar.xz
zen-2a6157b15508541cfd082e8544c78c8f94b18005.zip
Merge branch 'main' of https://github.com/EpicGames/zen
-rw-r--r--zen/cmds/print.cpp107
-rw-r--r--zen/cmds/print.h41
-rw-r--r--zen/cmds/run.cpp1
-rw-r--r--zen/zen.cpp29
-rw-r--r--zen/zen.vcxproj2
-rw-r--r--zen/zen.vcxproj.filters2
-rw-r--r--zencore/filesystem.cpp19
-rw-r--r--zencore/include/zencore/filesystem.h5
-rw-r--r--zencore/include/zencore/iobuffer.h3
-rw-r--r--zencore/include/zencore/refcount.h2
-rw-r--r--zenhttp/httpserver.cpp3
-rw-r--r--zenserver/cache/structuredcache.cpp37
-rw-r--r--zenserver/cache/structuredcache.h8
-rw-r--r--zenserver/cache/structuredcachestore.cpp41
-rw-r--r--zenserver/cache/structuredcachestore.h17
-rw-r--r--zenserver/config.cpp1
-rw-r--r--zenserver/config.h1
-rw-r--r--zenserver/experimental/frontend.cpp119
-rw-r--r--zenserver/experimental/frontend.h24
-rw-r--r--zenserver/projectstore.cpp7
-rw-r--r--zenserver/upstream/jupiter.cpp103
-rw-r--r--zenserver/upstream/jupiter.h21
-rw-r--r--zenserver/upstream/upstreamcache.cpp242
-rw-r--r--zenserver/upstream/upstreamcache.h44
-rw-r--r--zenserver/zenserver.cpp11
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenserver/zenserver.vcxproj.filters7
-rw-r--r--zenstore/CAS.cpp37
-rw-r--r--zenstore/include/zenstore/CAS.h14
29 files changed, 810 insertions, 140 deletions
diff --git a/zen/cmds/print.cpp b/zen/cmds/print.cpp
new file mode 100644
index 000000000..aac6afd44
--- /dev/null
+++ b/zen/cmds/print.cpp
@@ -0,0 +1,107 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "print.h"
+
+#include <zencore/compactbinarypackage.h>
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
+
+using namespace std::literals;
+
+namespace zen {
+
+PrintCommand::PrintCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "s", "source", "Object payload file", cxxopts::value(m_Filename), "<file name>");
+}
+
+PrintCommand::~PrintCommand() = default;
+
+int
+PrintCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions, argc, argv);
+
+ m_Options.parse_positional({"source"});
+
+ auto result = m_Options.parse(argc, argv);
+
+ if (result.count("help"))
+ {
+ std::cout << m_Options.help({"", "Group"}) << std::endl;
+
+ return 0;
+ }
+
+ // Validate arguments
+
+ if (m_Filename.empty())
+ throw std::runtime_error("No file specified");
+
+ zen::FileContents Fc = zen::ReadFile(m_Filename);
+ IoBuffer Data = Fc.Flatten();
+ zen::CbObject Object{SharedBuffer(Data)};
+
+ zen::StringBuilder<1024> ObjStr;
+ zen::CompactBinaryToJson(Object, ObjStr);
+ zen::ConsoleLog().info("{}", ObjStr);
+
+ return 0;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+PrintPackageCommand::PrintPackageCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "s", "source", "Package payload file", cxxopts::value(m_Filename), "<file name>");
+}
+
+PrintPackageCommand::~PrintPackageCommand()
+{
+}
+
+int
+PrintPackageCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions, argc, argv);
+
+ m_Options.parse_positional({"source"});
+
+ auto result = m_Options.parse(argc, argv);
+
+ if (result.count("help"))
+ {
+ std::cout << m_Options.help({"", "Group"}) << std::endl;
+
+ return 0;
+ }
+
+ // Validate arguments
+
+ if (m_Filename.empty())
+ throw std::runtime_error("No file specified");
+
+ zen::FileContents Fc = zen::ReadFile(m_Filename);
+ IoBuffer Data = Fc.Flatten();
+ zen::CbPackage Package;
+
+ bool Ok = Package.TryLoad(Data) || zen::legacy::TryLoadCbPackage(Package, Data, &UniqueBuffer::Alloc);
+
+ if (Ok)
+ {
+ zen::StringBuilder<1024> ObjStr;
+ zen::CompactBinaryToJson(Package.GetObject(), ObjStr);
+ zen::ConsoleLog().info("{}", ObjStr);
+ }
+ else
+ {
+ zen::ConsoleLog().error("error: malformed package?");
+ }
+
+ return 0;
+}
+
+} // namespace zen
diff --git a/zen/cmds/print.h b/zen/cmds/print.h
new file mode 100644
index 000000000..eed0aa14e
--- /dev/null
+++ b/zen/cmds/print.h
@@ -0,0 +1,41 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../zen.h"
+
+namespace zen {
+
+/** Print Compact Binary
+ */
+class PrintCommand : public ZenCmdBase
+{
+public:
+ PrintCommand();
+ ~PrintCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options* Options() override { return &m_Options; }
+
+private:
+ cxxopts::Options m_Options{"print", "Print compact binary object"};
+ std::string m_Filename;
+};
+
+/** Print Compact Binary Package
+ */
+class PrintPackageCommand : public ZenCmdBase
+{
+public:
+ PrintPackageCommand();
+ ~PrintPackageCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options* Options() override { return &m_Options; }
+
+private:
+ cxxopts::Options m_Options{"printpkg", "Print compact binary package"};
+ std::string m_Filename;
+};
+
+} // namespace zen
diff --git a/zen/cmds/run.cpp b/zen/cmds/run.cpp
index 94eb7ef6d..19b5c8980 100644
--- a/zen/cmds/run.cpp
+++ b/zen/cmds/run.cpp
@@ -10,6 +10,7 @@
#include <zencore/fmtutils.h>
#include <zencore/iohash.h>
#include <zencore/logging.h>
+#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/timer.h>
#include <zenutil/zenserverprocess.h>
diff --git a/zen/zen.cpp b/zen/zen.cpp
index 86c41d658..3c33ff5e0 100644
--- a/zen/zen.cpp
+++ b/zen/zen.cpp
@@ -9,6 +9,7 @@
#include "cmds/dedup.h"
#include "cmds/deploy.h"
#include "cmds/hash.h"
+#include "cmds/print.h"
#include "cmds/run.h"
#include "cmds/status.h"
#include "cmds/top.h"
@@ -98,18 +99,20 @@ main(int argc, char** argv)
auto _ = zen::MakeGuard([] { spdlog::shutdown(); });
- HashCommand HashCmd;
- CopyCommand CopyCmd;
- DedupCommand DedupCmd;
- DeployCommand DeployCmd;
- DropCommand DropCmd;
- ChunkCommand ChunkCmd;
- RunCommand RunCmd;
- StatusCommand StatusCmd;
- TopCommand TopCmd;
- PsCommand PsCmd;
- UpCommand UpCmd;
- DownCommand DownCmd;
+ HashCommand HashCmd;
+ CopyCommand CopyCmd;
+ DedupCommand DedupCmd;
+ DeployCommand DeployCmd;
+ DropCommand DropCmd;
+ ChunkCommand ChunkCmd;
+ RunCommand RunCmd;
+ StatusCommand StatusCmd;
+ TopCommand TopCmd;
+ PrintCommand PrintCmd;
+ PrintPackageCommand PrintPkgCmd;
+ PsCommand PsCmd;
+ UpCommand UpCmd;
+ DownCommand DownCmd;
#if ZEN_WITH_TESTS
RunTestsCommand RunTestsCmd;
@@ -128,6 +131,8 @@ main(int argc, char** argv)
{"dedup", &DedupCmd, "Dedup files"},
{"drop", &DropCmd, "Drop cache bucket(s)"},
{"hash", &HashCmd, "Compute file hashes"},
+ {"print", &PrintCmd, "Print compact binary object"},
+ {"printpackage", &PrintPkgCmd, "Print compact binary package"},
{"run", &RunCmd, "Remote execution"},
{"status", &StatusCmd, "Show zen status"},
{"ps", &PsCmd, "Enumerate running zen server instances"},
diff --git a/zen/zen.vcxproj b/zen/zen.vcxproj
index fb0674e87..f31c0bc17 100644
--- a/zen/zen.vcxproj
+++ b/zen/zen.vcxproj
@@ -99,6 +99,7 @@
<ClCompile Include="cmds\dedup.cpp" />
<ClCompile Include="cmds\deploy.cpp" />
<ClCompile Include="cmds\hash.cpp" />
+ <ClCompile Include="cmds\print.cpp" />
<ClCompile Include="cmds\run.cpp" />
<ClCompile Include="cmds\scrub.cpp" />
<ClCompile Include="cmds\status.cpp" />
@@ -114,6 +115,7 @@
<ClInclude Include="cmds\dedup.h" />
<ClInclude Include="cmds\deploy.h" />
<ClInclude Include="cmds\hash.h" />
+ <ClInclude Include="cmds\print.h" />
<ClInclude Include="cmds\run.h" />
<ClInclude Include="cmds\scrub.h" />
<ClInclude Include="cmds\status.h" />
diff --git a/zen/zen.vcxproj.filters b/zen/zen.vcxproj.filters
index 9002f01c2..d983b413c 100644
--- a/zen/zen.vcxproj.filters
+++ b/zen/zen.vcxproj.filters
@@ -28,6 +28,7 @@
<ClCompile Include="cmds\up.cpp" />
<ClCompile Include="cmds\cache.cpp" />
<ClCompile Include="cmds\scrub.cpp" />
+ <ClCompile Include="cmds\print.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="chunk\chunk.h" />
@@ -57,6 +58,7 @@
<ClInclude Include="cmds\up.h" />
<ClInclude Include="cmds\cache.h" />
<ClInclude Include="cmds\scrub.h" />
+ <ClInclude Include="cmds\print.h" />
</ItemGroup>
<ItemGroup>
<Filter Include="cmds">
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index a06c00e41..f6ba92f98 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -522,6 +522,23 @@ WriteFile(std::filesystem::path Path, IoBuffer Data)
WriteFile(Path, &DataPtr, 1);
}
+IoBuffer
+FileContents::Flatten()
+{
+ if (Data.size() == 1)
+ {
+ return Data[0];
+ }
+ else if (Data.empty())
+ {
+ return {};
+ }
+ else
+ {
+ ZEN_NOT_IMPLEMENTED();
+ }
+}
+
FileContents
ReadFile(std::filesystem::path Path)
{
@@ -815,7 +832,7 @@ TEST_CASE("filesystem")
using namespace std::filesystem;
// GetExePath
- path BinPath = GetRunningExecutablePath();
+ path BinPath = GetRunningExecutablePath();
const bool ExpectedExe = BinPath.stem() == "zencore-test" || BinPath.stem() == "zenserver-test";
CHECK(ExpectedExe);
CHECK(is_regular_file(BinPath));
diff --git a/zencore/include/zencore/filesystem.h b/zencore/include/zencore/filesystem.h
index 6678528f6..c7ac7140d 100644
--- a/zencore/include/zencore/filesystem.h
+++ b/zencore/include/zencore/filesystem.h
@@ -2,9 +2,10 @@
#pragma once
-#include "stream.h"
#include "zencore.h"
+#include <zencore/iobuffer.h>
+
#include <filesystem>
#include <functional>
@@ -36,6 +37,8 @@ struct FileContents
{
std::vector<IoBuffer> Data;
std::error_code ErrorCode;
+
+ IoBuffer Flatten();
};
ZENCORE_API FileContents ReadFile(std::filesystem::path Path);
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h
index ed52184d2..263cf672d 100644
--- a/zencore/include/zencore/iobuffer.h
+++ b/zencore/include/zencore/iobuffer.h
@@ -25,6 +25,7 @@ enum class ZenContentType : uint8_t
kCbPackageOffer = 6,
kCompressedBinary = 7,
kUnknownContentType = 8,
+ kHTML = 9,
kCOUNT
};
@@ -54,6 +55,8 @@ ToString(ZenContentType ContentType)
return "compressed-binary"sv;
case ZenContentType::kYAML:
return "yaml"sv;
+ case ZenContentType::kHTML:
+ return "html"sv;
}
}
diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h
index 0a1e15614..320718f5b 100644
--- a/zencore/include/zencore/refcount.h
+++ b/zencore/include/zencore/refcount.h
@@ -17,7 +17,7 @@ namespace zen {
class RefCounted
{
public:
- RefCounted() = default;
+ RefCounted() = default;
virtual ~RefCounted() = default;
inline uint32_t AddRef() const { return AtomicIncrement(const_cast<RefCounted*>(this)->m_RefCount); }
diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp
index 8e5d61877..cfd1463ba 100644
--- a/zenhttp/httpserver.cpp
+++ b/zenhttp/httpserver.cpp
@@ -58,6 +58,9 @@ MapContentTypeToString(HttpContentType ContentType)
case HttpContentType::kYAML:
return "text/yaml"sv;
+
+ case HttpContentType::kHTML:
+ return "text/html"sv;
}
}
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 4c89b995a..8ab0276c5 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -405,6 +405,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (!Success)
{
ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
+ m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -449,6 +450,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
ValidCount,
AttachmentCount);
+ m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
}
}
@@ -467,6 +469,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ m_CacheStats.HitCount++;
Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
}
else
@@ -478,6 +481,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
ToString(Value.Value.GetContentType()),
InUpstreamCache ? "UPSTREAM" : "LOCAL");
+ m_CacheStats.HitCount++;
+ if (InUpstreamCache)
+ {
+ m_CacheStats.UpstreamHitCount++;
+ }
+
Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
}
}
@@ -667,11 +676,11 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request
case kHead:
case kGet:
{
- HandleGetCachePayload(Request, Ref, Policy);
if (Verb == kHead)
{
Request.SetSuppressResponseBody();
}
+ HandleGetCachePayload(Request, Ref, Policy);
}
break;
case kPut:
@@ -712,7 +721,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
if (!Payload)
{
- ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId);
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType()));
+ m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -724,6 +734,12 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
ToString(Payload.GetContentType()),
InUpstreamCache ? "UPSTREAM" : "LOCAL");
+ m_CacheStats.HitCount++;
+ if (InUpstreamCache)
+ {
+ m_CacheStats.UpstreamHitCount++;
+ }
+
Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
}
@@ -846,6 +862,23 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
EmitSnapshot("requests", m_HttpRequests, Cbo);
+ const uint64_t HitCount = m_CacheStats.HitCount;
+ const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
+ const uint64_t MissCount = m_CacheStats.MissCount;
+ const uint64_t TotalCount = HitCount + MissCount;
+
+ Cbo.BeginObject("cache");
+ Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount) * 100.0) : 0.0);
+ Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) * 100.0 : 0.0);
+ Cbo.EndObject();
+
+ if (m_UpstreamCache)
+ {
+ Cbo.BeginObject("upstream");
+ m_UpstreamCache->GetStatus(Cbo);
+ Cbo.EndObject();
+ }
+
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 47fc173e9..a360878bd 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -71,6 +71,13 @@ private:
IoHash PayloadId;
};
+ struct CacheStats
+ {
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t UpstreamHitCount{};
+ std::atomic_uint64_t MissCount{};
+ };
+
[[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef);
void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
@@ -89,6 +96,7 @@ private:
std::unique_ptr<UpstreamCache> m_UpstreamCache;
uint64_t m_LastScrubTime = 0;
metrics::OperationTiming m_HttpRequests;
+ CacheStats m_CacheStats;
};
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 5e93ebaa9..b97f0830f 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -116,6 +116,13 @@ ZenCacheStore::Scrub(ScrubContext& Ctx)
m_DiskLayer.Scrub(Ctx);
m_MemLayer.Scrub(Ctx);
}
+
+void
+ZenCacheStore::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
//////////////////////////////////////////////////////////////////////////
ZenCacheMemoryLayer::ZenCacheMemoryLayer()
@@ -195,6 +202,12 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
}
void
+ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
+void
ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
std::vector<IoHash> BadHashes;
@@ -294,6 +307,7 @@ struct ZenCacheDiskLayer::CacheBucket
void Drop();
void Flush();
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
inline bool IsOk() const { return m_Ok; }
@@ -611,6 +625,12 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
void
+ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
+void
ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value)
{
WideStringBuilder<128> DataFilePath;
@@ -830,27 +850,10 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
}
}
-//////////////////////////////////////////////////////////////////////////
-
-ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore)
-{
- ZEN_UNUSED(CacheStore);
-}
-
-ZenCacheTracker::~ZenCacheTracker()
-{
-}
-
-void
-ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey)
-{
- ZEN_UNUSED(Bucket);
- ZEN_UNUSED(HashKey);
-}
-
void
-ZenCacheTracker::Flush()
+ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx)
{
+ ZEN_UNUSED(GcCtx);
}
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index f96757409..011f13323 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -56,6 +56,7 @@ public:
void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
bool DropBucket(std::string_view Bucket);
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
private:
struct CacheBucket
@@ -83,6 +84,7 @@ public:
bool DropBucket(std::string_view Bucket);
void Flush();
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
private:
/** A cache bucket manages a single directory containing
@@ -107,6 +109,7 @@ public:
bool DropBucket(std::string_view Bucket);
void Flush();
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
private:
std::filesystem::path m_RootDir;
@@ -116,18 +119,4 @@ private:
uint64_t m_LastScrubTime = 0;
};
-/** Tracks cache entry access, stats and orchestrates cleanup activities
- */
-class ZenCacheTracker
-{
-public:
- ZenCacheTracker(ZenCacheStore& CacheStore);
- ~ZenCacheTracker();
-
- void TrackAccess(std::string_view Bucket, const IoHash& HashKey);
- void Flush();
-
-private:
-};
-
} // namespace zen
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 42f59b26c..759534d58 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -90,6 +90,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(GlobalOptions.IsTest)->default_value("false"));
options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(GlobalOptions.LogId));
options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::filesystem::path>(GlobalOptions.DataDir));
+ options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::filesystem::path>(GlobalOptions.ContentDir));
options
.add_option("lifetime", "", "owner-pid", "Specify owning process id", cxxopts::value<int>(GlobalOptions.OwnerPid), "<identifier>");
diff --git a/zenserver/config.h b/zenserver/config.h
index 75c19d690..af1a24455 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -17,6 +17,7 @@ struct ZenServerOptions
bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
std::string LogId; // Id for tagging log output
std::filesystem::path DataDir; // Root directory for state (used for testing)
+ std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
};
struct ZenUpstreamJupiterConfig
diff --git a/zenserver/experimental/frontend.cpp b/zenserver/experimental/frontend.cpp
new file mode 100644
index 000000000..79fcf0a17
--- /dev/null
+++ b/zenserver/experimental/frontend.cpp
@@ -0,0 +1,119 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "frontend.h"
+
+#include <zencore/filesystem.h>
+#include <zencore/string.h>
+
+namespace zen {
+
+namespace html {
+
+ constexpr std::string_view Index = R"(
+<!DOCTYPE html>
+<html>
+<head>
+<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" integrity="sha384-F3w7mX95PdgyTmZZMECAngseQB83DfGTowi0iMjiWaeVhAn4FJkqJByhZMI3AhiU" crossorigin="anonymous">
+<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js" integrity="sha384-skAcpIdS7UcVUC05LJ9Dxay8AXcDYfBJqt1CJ85S/CFujBsIzCIv+l9liuYLaMQ/" crossorigin="anonymous"></script>
+<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/font/bootstrap-icons.css">
+<style type="text/css">
+body {
+ background-color: #fafafa;
+}
+</style>
+<script type="text/javascript">
+ const getCacheStats = () => {
+ const opts = { headers: { "Accept": "application/json" } };
+ fetch("/z$", opts)
+ .then(response => {
+ if (!response.ok) {
+ throw Error(response.statusText);
+ }
+ return response.json();
+ })
+ .then(json => {
+ document.getElementById("status").innerHTML = "connected"
+ document.getElementById("stats").innerHTML = JSON.stringify(json, null, 4);
+ })
+ .catch(error => {
+ document.getElementById("status").innerHTML = "disconnected"
+ document.getElementById("stats").innerHTML = ""
+ console.log(error);
+ })
+ .finally(() => {
+ window.setTimeout(getCacheStats, 1000);
+ });
+ };
+ getCacheStats();
+</script>
+</head>
+<body>
+ <div class="container">
+ <div class="row">
+ <div class="text-center mt-5">
+ <pre>
+__________ _________ __
+\____ / ____ ____ / _____/_/ |_ ____ _______ ____
+ / / _/ __ \ / \ \_____ \ \ __\ / _ \ \_ __ \_/ __ \
+ / /_ \ ___/ | | \ / \ | | ( <_> ) | | \/\ ___/
+/_______ \ \___ >|___| //_______ / |__| \____/ |__| \___ >
+ \/ \/ \/ \/ \/
+ </pre>
+ <pre id="status"/>
+ </div>
+ </div>
+ <div class="row">
+ <pre class="mb-0">Z$:</pre>
+ <pre id="stats"></pre>
+ <div>
+ </div>
+</body>
+</html>
+)";
+
+} // namespace html
+
+HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Directory(Directory)
+{
+}
+
+HttpFrontendService::~HttpFrontendService()
+{
+}
+
+const char*
+HttpFrontendService::BaseUri() const
+{
+ return "/dashboard"; // in order to use the root path we need to remove HttpAddUrlToUrlGroup in HttpSys.cpp
+}
+
+void
+HttpFrontendService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ if (m_Directory.empty())
+ {
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, html::Index);
+ }
+ else
+ {
+ std::string_view Uri = Request.RelativeUri();
+ std::filesystem::path RelPath{Uri.empty() ? "index.html" : Uri};
+ std::filesystem::path AbsPath = m_Directory / RelPath;
+
+ FileContents File = ReadFile(AbsPath);
+
+ if (!File.ErrorCode)
+ {
+ // TODO: Map file extension to MIME type
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, File.Data[0]);
+ }
+ else
+ {
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Ooops!"sv);
+ }
+ }
+}
+
+} // namespace zen
diff --git a/zenserver/experimental/frontend.h b/zenserver/experimental/frontend.h
new file mode 100644
index 000000000..2ae20e940
--- /dev/null
+++ b/zenserver/experimental/frontend.h
@@ -0,0 +1,24 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+
+#include <filesystem>
+
+namespace zen {
+
+class HttpFrontendService final : public zen::HttpService
+{
+public:
+ HttpFrontendService(std::filesystem::path Directory);
+ virtual ~HttpFrontendService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+private:
+ std::filesystem::path m_Directory;
+};
+
+} // namespace zen
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 7870f9559..6b24692e1 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -1425,7 +1425,12 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver))
{
- ZEN_ERROR("Received malformed package!");
+ std::filesystem::path BadPackagePath =
+ Oplog.TempPath() / "bad_packages" / "session{}_request{}"_format(HttpReq.SessionId(), HttpReq.RequestId());
+
+ ZEN_ERROR("Received malformed package! Saving payload to '{}'", BadPackagePath);
+
+ zen::WriteFile(BadPackagePath, Payload);
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
}
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index b93635e76..0397ddaa0 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -299,13 +299,16 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key,
return PutDerivedData(BucketId, Key.ToHexString(), DerivedData);
}
-CloudCacheResult
+PutRefResult
CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
{
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ PutRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
}
IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
@@ -328,16 +331,102 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
if (Response.error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ PutRefResult Result;
+ Result.ErrorCode = static_cast<int32_t>(Response.error.code);
+ Result.Reason = std::move(Response.error.message);
+ return Result;
}
else if (!VerifyAccessToken(Response.status_code))
{
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ PutRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
}
- return {.Bytes = Response.uploaded_bytes,
- .ElapsedSeconds = Response.elapsed,
- .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ PutRefResult Result;
+ Result.Success = (Response.status_code == 200 || Response.status_code == 201);
+ Result.Bytes = Response.uploaded_bytes;
+ Result.ElapsedSeconds = Response.elapsed;
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+
+ return Result;
+}
+
+FinalizeRefResult
+CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ FinalizeRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
+ << Key.ToHexString() << "/finalize/" << RefHash.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
+ {"X-Jupiter-IoHash", RefHash.ToHexString()},
+ {"Content-Type", "application/x-ue-cb"}});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ FinalizeRefResult Result;
+ Result.ErrorCode = static_cast<int32_t>(Response.error.code);
+ Result.Reason = std::move(Response.error.message);
+ return Result;
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ FinalizeRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
+ }
+
+ FinalizeRefResult Result;
+ Result.Success = (Response.status_code == 200 || Response.status_code == 201);
+ Result.Bytes = Response.uploaded_bytes;
+ Result.ElapsedSeconds = Response.elapsed;
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+
+ return Result;
}
CloudCacheResult
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index d8844279e..1de417008 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/refcount.h>
#include <zencore/thread.h>
@@ -46,13 +47,23 @@ struct CloudCacheAccessToken
struct CloudCacheResult
{
IoBuffer Response;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- int32_t ErrorCode = {};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ int32_t ErrorCode{};
std::string Reason;
bool Success = false;
};
+struct PutRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
+struct FinalizeRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
/**
* Context for performing Jupiter operations
*
@@ -76,11 +87,13 @@ public:
CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData);
CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData);
- CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob);
CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob);
CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object);
+ FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
+
CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key);
CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key);
CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key);
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 0dd16cd06..b1966e299 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -9,6 +9,7 @@
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stats.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
@@ -45,6 +46,7 @@ namespace detail {
{
std::lock_guard Lock(m_Lock);
m_Queue.emplace_back(std::move(Item));
+ m_Size++;
}
m_NewItemSignal.notify_one();
@@ -64,6 +66,7 @@ namespace detail {
{
Item = std::move(m_Queue.front());
m_Queue.pop_front();
+ m_Size--;
return true;
}
@@ -80,7 +83,7 @@ namespace detail {
}
}
- std::size_t Num() const
+ std::size_t Size() const
{
std::unique_lock Lock(m_Lock);
return m_Queue.size();
@@ -91,12 +94,15 @@ namespace detail {
std::condition_variable m_NewItemSignal;
std::deque<T> m_Queue;
std::atomic_bool m_CompleteAdding{false};
+ std::atomic_uint32_t m_Size;
};
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
- JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc)
+ JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_UseLegacyDdc(Options.UseLegacyDdc)
{
using namespace fmt::literals;
m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
@@ -180,16 +186,23 @@ namespace detail {
}
}
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -200,16 +213,23 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -277,16 +297,82 @@ namespace detail {
Success = false;
for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
{
- if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
+ if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RecordValue,
+ ZenContentType::kCbObject);
Result.Success)
{
TotalBytes += Result.Bytes;
TotalElapsedSeconds += Result.ElapsedSeconds;
Success = true;
- break;
+
+ if (!Result.Needs.empty())
+ {
+ for (const IoHash& NeededHash : Result.Needs)
+ {
+ Success = false;
+
+ if (auto It =
+ std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash);
+ It != std::end(CacheRecord.PayloadIds))
+ {
+ const size_t Idx = It - std::begin(CacheRecord.PayloadIds);
+
+ if (CloudCacheResult BlobResult =
+ Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ BlobResult.Success)
+ {
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ Success = true;
+ }
+ else
+ {
+ ZEN_WARN("upload missing payload '{}/{}/{}' FAILED",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("needed payload '{}/{}/{}' MISSING",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+
+ const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+
+ if (FinalizeRefResult FinalizeResult =
+ Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+ FinalizeResult.Success)
+ {
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+ Success = true;
+
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ MissingHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
+ Success = false;
+ }
+ }
+
+ if (Success)
+ {
+ break;
+ }
}
}
@@ -302,6 +388,9 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
@@ -354,16 +443,23 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -375,16 +471,23 @@ namespace detail {
const ZenCacheResult Result =
Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -500,7 +603,7 @@ namespace detail {
struct UpstreamStats
{
- static constexpr uint64_t MaxSampleCount = 100ull;
+ static constexpr uint64_t MaxSampleCount = 1000ull;
UpstreamStats(bool Enabled) : m_Enabled(Enabled) {}
@@ -509,13 +612,13 @@ struct UpstreamStats
const GetUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
+ UpstreamEndpointStats& Stats = Endpoint.Stats();
+
+ if (Result.Error)
{
- return;
+ Stats.ErrorCount++;
}
-
- UpstreamEndpointStats& Stats = Endpoint.Stats();
- if (Result.Success)
+ else if (Result.Success)
{
Stats.HitCount++;
Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
@@ -526,7 +629,7 @@ struct UpstreamStats
Stats.MissCount++;
}
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -537,11 +640,6 @@ struct UpstreamStats
const PutUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
- {
- return;
- }
-
UpstreamEndpointStats& Stats = Endpoint.Stats();
if (Result.Success)
{
@@ -549,8 +647,12 @@ struct UpstreamStats
Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
}
+ else
+ {
+ Stats.ErrorCount++;
+ }
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -575,13 +677,13 @@ struct UpstreamStats
const uint64_t TotalCount = HitCount + MissCount;
const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
- Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
- HitRate,
- DownBytes,
- DownSpeed,
- UpBytes,
- UpSpeed);
+ Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
+ Ep->DisplayName(),
+ HitRate,
+ DownBytes,
+ DownSpeed,
+ UpBytes,
+ UpSpeed);
}
}
@@ -700,6 +802,36 @@ public:
return {};
}
+ virtual void GetStatus(CbObjectWriter& Status) override
+ {
+ Status << "reading" << m_Options.ReadUpstream;
+ Status << "writing" << m_Options.WriteUpstream;
+ Status << "worker_threads" << m_Options.ThreadCount;
+ Status << "queue_count" << m_UpstreamQueue.Size();
+
+ Status.BeginArray("endpoints");
+ for (const auto& Ep : m_Endpoints)
+ {
+ Status.BeginObject();
+ Status << "name" << Ep->DisplayName();
+ Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
+
+ UpstreamEndpointStats& Stats = Ep->Stats();
+ const uint64_t HitCount = Stats.HitCount;
+ const uint64_t MissCount = Stats.MissCount;
+ const uint64_t TotalCount = HitCount + MissCount;
+ const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
+
+ Status << "hit_ratio" << HitRate;
+ Status << "downloaded_mb" << Stats.DownBytes;
+ Status << "uploaded_mb" << Stats.UpBytes;
+ Status << "error_count" << Stats.ErrorCount;
+
+ Status.EndObject();
+ }
+ Status.EndArray();
+ }
+
private:
void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
{
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 0e736480b..08f379b11 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -12,6 +12,7 @@
namespace zen {
+class CbObjectWriter;
class CidStore;
class ZenCacheStore;
struct CloudCacheClientOptions;
@@ -44,35 +45,29 @@ struct UpstreamCacheOptions
bool StatsEnabled = false;
};
-enum class UpstreamStatusCode : uint8_t
-{
- Ok,
- Error
-};
-
struct UpstreamError
{
- UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok;
- std::string Reason;
+ int32_t ErrorCode{};
+ std::string Reason{};
- explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; }
+ explicit operator bool() const { return ErrorCode != 0; }
};
struct GetUpstreamCacheResult
{
IoBuffer Value;
- UpstreamError Error;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ UpstreamError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
};
struct PutUpstreamCacheResult
{
std::string Reason;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
};
struct UpstreamEndpointHealth
@@ -83,13 +78,14 @@ struct UpstreamEndpointHealth
struct UpstreamEndpointStats
{
- std::atomic_uint64_t HitCount = {};
- std::atomic_uint64_t MissCount = {};
- std::atomic_uint64_t UpCount = {};
- std::atomic<double> UpBytes = {};
- std::atomic<double> DownBytes = {};
- std::atomic<double> SecondsUp = {};
- std::atomic<double> SecondsDown = {};
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t MissCount{};
+ std::atomic_uint64_t UpCount{};
+ std::atomic_uint64_t ErrorCount{};
+ std::atomic<double> UpBytes{};
+ std::atomic<double> DownBytes{};
+ std::atomic<double> SecondsUp{};
+ std::atomic<double> SecondsDown{};
};
/**
@@ -139,6 +135,8 @@ public:
};
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+
+ virtual void GetStatus(CbObjectWriter& CbO) = 0;
};
std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index b45df9fef..db1be9dea 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -81,6 +81,7 @@
#include "cache/structuredcachestore.h"
#include "compute/apply.h"
#include "diag/diagsvcs.h"
+#include "experimental/frontend.h"
#include "experimental/usnjournal.h"
#include "projectstore.h"
#include "testing/httptest.h"
@@ -302,6 +303,12 @@ public:
{
m_Http->RegisterService(*m_HttpFunctionService);
}
+
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot);
+ if (m_FrontendService)
+ {
+ m_Http->RegisterService(*m_FrontendService);
+ }
}
#if ZEN_ENABLE_MESH
@@ -364,6 +371,7 @@ public:
void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; }
void SetTestMode(bool State) { m_TestMode = State; }
void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; }
+ void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
void EnsureIoRunner()
{
@@ -447,6 +455,7 @@ private:
bool m_IsDedicatedMode = false;
bool m_TestMode = false;
std::filesystem::path m_DataRoot;
+ std::filesystem::path m_ContentRoot;
std::jthread m_IoRunner;
asio::io_context m_IoContext;
asio::steady_timer m_PidCheckTimer{m_IoContext};
@@ -471,6 +480,7 @@ private:
zen::HttpHealthService m_HealthService;
zen::Mesh m_ZenMesh{m_IoContext};
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
+ std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
bool m_DebugOptionForcedCrash = false;
};
@@ -560,6 +570,7 @@ ZenWindowsService::Run()
ZenServer Server;
Server.SetDataRoot(GlobalOptions.DataDir);
+ Server.SetContentRoot(GlobalOptions.ContentDir);
Server.SetTestMode(GlobalOptions.IsTest);
Server.SetDedicatedMode(GlobalOptions.IsDedicated);
Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry);
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index bcb7ea028..335786fbf 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -109,6 +109,7 @@
<ClInclude Include="compute\apply.h" />
<ClInclude Include="config.h" />
<ClInclude Include="diag\logging.h" />
+ <ClInclude Include="experimental\frontend.h" />
<ClInclude Include="resource.h" />
<ClInclude Include="sos\sos.h" />
<ClInclude Include="testing\httptest.h" />
@@ -132,6 +133,7 @@
<ClCompile Include="compute\apply.cpp" />
<ClCompile Include="config.cpp" />
<ClCompile Include="diag\logging.cpp" />
+ <ClCompile Include="experimental\frontend.cpp" />
<ClCompile Include="projectstore.cpp" />
<ClCompile Include="cache\cacheagent.cpp" />
<ClCompile Include="sos\sos.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index 6b99ca8d7..1c5b17fee 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -38,6 +38,9 @@
<ClInclude Include="testing\httptest.h" />
<ClInclude Include="windows\service.h" />
<ClInclude Include="resource.h" />
+ <ClInclude Include="experimental\frontend.h">
+ <Filter>experimental</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -70,6 +73,10 @@
</ClCompile>
<ClCompile Include="testing\httptest.cpp" />
<ClCompile Include="windows\service.cpp" />
+ <ClCompile Include="admin\admin.cpp" />
+ <ClCompile Include="experimental\frontend.cpp">
+ <Filter>experimental</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="cache">
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index 1db2b50bf..808fc8fb3 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -32,6 +32,15 @@ CasChunkSet::AddChunkToSet(const IoHash& HashToAdd)
}
void
+CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd)
+{
+ for (const IoHash& Hash : HashesToAdd)
+ {
+ m_ChunkSet.insert(Hash);
+ }
+}
+
+void
CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
{
for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;)
@@ -58,6 +67,34 @@ CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callba
//////////////////////////////////////////////////////////////////////////
+struct GcContext::GcState
+{
+ CasChunkSet m_CasChunks;
+ CasChunkSet m_CidChunks;
+};
+
+GcContext::GcContext() : m_State(std::make_unique<GcState>())
+{
+}
+
+GcContext::~GcContext()
+{
+}
+
+void
+GcContext::ContributeCids(std::span<const IoHash> Cids)
+{
+ m_State->m_CidChunks.AddChunksToSet(Cids);
+}
+
+void
+GcContext::ContributeCas(std::span<const IoHash> Cas)
+{
+ m_State->m_CasChunks.AddChunksToSet(Cas);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
void
ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks)
{
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index 93454ca6f..1425845a0 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -37,7 +37,16 @@ struct CasStoreConfiguration
class GcContext
{
public:
+ GcContext();
+ ~GcContext();
+
+ void ContributeCids(std::span<const IoHash> Cid);
+ void ContributeCas(std::span<const IoHash> Hash);
+
private:
+ struct GcState;
+
+ std::unique_ptr<GcState> m_State;
};
/** Context object for data scrubbing
@@ -58,10 +67,14 @@ private:
bool m_Recover = true;
};
+/** Manage a set of IoHash values
+ */
+
class CasChunkSet
{
public:
void AddChunkToSet(const IoHash& HashToAdd);
+ void AddChunksToSet(std::span<const IoHash> HashesToAdd);
void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback);
inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); }
@@ -69,6 +82,7 @@ public:
inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); }
private:
+ // Q: should we protect this with a lock, or is that a higher level concern?
std::unordered_set<IoHash> m_ChunkSet;
};