aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-10-07 08:29:50 +0200
committerMartin Ridgers <[email protected]>2021-10-07 08:29:50 +0200
commit03232621d183f22e12e798a753e4a606763e63d6 (patch)
tree5701d202392dd4ab947139e4046a44ab9bc6cdf7
parentMerged main (diff)
parentOnly enable the MSVC debug output sink for sessions when the --debug mode is ... (diff)
downloadzen-03232621d183f22e12e798a753e4a606763e63d6.tar.xz
zen-03232621d183f22e12e798a753e4a606763e63d6.zip
Merged main
-rw-r--r--scripts/deploybuild.py37
-rw-r--r--zen/cmds/print.cpp107
-rw-r--r--zen/cmds/print.h41
-rw-r--r--zen/cmds/run.cpp1
-rw-r--r--zen/zen.cpp29
-rw-r--r--zen/zen.vcxproj2
-rw-r--r--zen/zen.vcxproj.filters2
-rw-r--r--zencore/compactbinary.cpp51
-rw-r--r--zencore/filesystem.cpp24
-rw-r--r--zencore/include/zencore/blockingqueue.h75
-rw-r--r--zencore/include/zencore/filesystem.h5
-rw-r--r--zencore/include/zencore/iobuffer.h8
-rw-r--r--zencore/include/zencore/refcount.h10
-rw-r--r--zencore/include/zencore/stats.h64
-rw-r--r--zencore/include/zencore/timer.h15
-rw-r--r--zencore/iobuffer.cpp81
-rw-r--r--zencore/stats.cpp91
-rw-r--r--zencore/timer.cpp16
-rw-r--r--zenhttp/httpserver.cpp45
-rw-r--r--zenhttp/httpsys.cpp299
-rw-r--r--zenhttp/httpsys.h21
-rw-r--r--zenhttp/include/zenhttp/httpserver.h2
-rw-r--r--zenhttp/iothreadpool.cpp6
-rw-r--r--zenhttp/iothreadpool.h8
-rw-r--r--zenhttp/workthreadpool.cpp77
-rw-r--r--zenhttp/workthreadpool.h47
-rw-r--r--zenhttp/zenhttp.vcxproj2
-rw-r--r--zenhttp/zenhttp.vcxproj.filters2
-rw-r--r--zenserver-test/zenserver-test.cpp106
-rw-r--r--zenserver/cache/structuredcache.cpp275
-rw-r--r--zenserver/cache/structuredcache.h42
-rw-r--r--zenserver/cache/structuredcachestore.cpp275
-rw-r--r--zenserver/cache/structuredcachestore.h48
-rw-r--r--zenserver/config.cpp22
-rw-r--r--zenserver/config.h4
-rw-r--r--zenserver/diag/logging.cpp5
-rw-r--r--zenserver/experimental/frontend.cpp119
-rw-r--r--zenserver/experimental/frontend.h24
-rw-r--r--zenserver/monitoring/httpstats.cpp50
-rw-r--r--zenserver/monitoring/httpstats.h37
-rw-r--r--zenserver/monitoring/httpstatus.cpp50
-rw-r--r--zenserver/monitoring/httpstatus.h37
-rw-r--r--zenserver/projectstore.cpp12
-rw-r--r--zenserver/testing/httptest.cpp41
-rw-r--r--zenserver/upstream/jupiter.h6
-rw-r--r--zenserver/upstream/upstreamcache.cpp270
-rw-r--r--zenserver/upstream/upstreamcache.h46
-rw-r--r--zenserver/upstream/zen.cpp6
-rw-r--r--zenserver/upstream/zen.h5
-rw-r--r--zenserver/xmake.lua15
-rw-r--r--zenserver/zenserver.cpp292
-rw-r--r--zenserver/zenserver.vcxproj9
-rw-r--r--zenserver/zenserver.vcxproj.filters14
-rw-r--r--zenstore/CAS.cpp48
-rw-r--r--zenstore/cidstore.cpp2
-rw-r--r--zenstore/compactcas.cpp2
-rw-r--r--zenstore/filecas.cpp12
-rw-r--r--zenstore/include/zenstore/CAS.h56
58 files changed, 2421 insertions, 677 deletions
diff --git a/scripts/deploybuild.py b/scripts/deploybuild.py
index 971f34ff9..c81235a8c 100644
--- a/scripts/deploybuild.py
+++ b/scripts/deploybuild.py
@@ -17,6 +17,10 @@ def jazz_print(tag, detail = ""):
def jazz_fail(tag, detail = ""):
print(f"{Fore.RED}{Style.BRIGHT}||> {tag}{Style.RESET_ALL} {detail}")
+def copy_file(src, dst):
+ print(f"{Fore.WHITE}{Style.BRIGHT}||> COPY {Style.RESET_ALL} {src} -> {Fore.GREEN}{Style.BRIGHT}{dst}")
+ shutil.copy(src, dst)
+
colorama.init()
origcwd = os.getcwd()
@@ -25,9 +29,13 @@ origcwd = os.getcwd()
parser = argparse.ArgumentParser(description='Deploy a zen build to an UE tree')
parser.add_argument("root", help="Path to an UE5 root directory")
+parser.add_argument("--sentry", action="store_true", help="Whether to upload symobls to Sentry")
+parser.add_argument("--xmake", action="store_true", help="Build with XMake")
args = parser.parse_args()
engineroot = args.root
+upload_symbols = args.sentry
+use_xmake = args.xmake
if not os.path.isfile(os.path.join(engineroot, "RunUAT.bat")):
print(f"{Fore.RED}Not a valid UE5 engine root directory: '{engineroot}'")
@@ -45,20 +53,29 @@ jazz_print("Zen root:", zenroot)
# Build fresh binaries
-vs_path = vswhere.get_latest_path() # can also specify prerelease=True
-jazz_print("BUILDING CODE", f"using VS root: {vs_path}")
-devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com")
+if use_xmake:
+ build_cmd = ["xmake", "-b", "zenserver"]
+ build_output_dir = r'build\windows\x64\release'
+else:
+ vs_path = vswhere.get_latest_path() # can also specify prerelease=True
+ jazz_print("BUILDING CODE", f"using VS root: {vs_path}")
+ devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com")
+ build_cmd = [devenv_path, "/build", "Release", "zen.sln"]
+ build_output_dir = r'x64\Release'
try:
- subprocess.run([devenv_path, "/build", "Release", "zen.sln"], check=True)
+ subprocess.run(build_cmd, check=True)
except:
jazz_fail("Build failed!")
exit(1)
-# Upload symbols etc to Sentry
+build_output_binary_path = os.path.join(zenroot, build_output_dir, "zenserver.exe")
+build_output_binary_pdb_path = os.path.join(zenroot, build_output_dir, "zenserver.pdb")
-jazz_print("Uploading symbols", "to Sentry")
-subprocess.run(["scripts\sentry-cli.exe", "upload-dif", "--org", "to", "--project", "zen-server", "x64\\Release\\zenserver.exe", "x64\\Release\\zenserver.pdb"])
+# Upload symbols etc to Sentry
+if upload_symbols:
+ jazz_print("Uploading symbols", "to Sentry")
+ subprocess.run(["scripts\sentry-cli.exe", "upload-dif", "--org", "to", "--project", "zen-server", build_output_binary_path, build_output_binary_pdb_path])
# Change into root directory to pick up Perforce environment
@@ -105,9 +122,9 @@ jazz_print("Placing zenserver", f"executables into tree at '{target_bin_dir}'")
crashpadtarget = os.path.join(target_bin_dir, "crashpad_handler.exe")
try:
- shutil.copy(os.path.join(zenroot, "x64\Release\zenserver.exe"), os.path.join(target_bin_dir, "zenserver.exe"))
- shutil.copy(os.path.join(zenroot, "x64\Release\zenserver.pdb"), os.path.join(target_bin_dir, "zenserver.pdb"))
- shutil.copy(os.path.join(zenroot, r'vcpkg_installed\x64-windows-static\x64-windows-static\tools\sentry-native\crashpad_handler.exe'), crashpadtarget)
+ copy_file(build_output_binary_path, os.path.join(target_bin_dir, "zenserver.exe"))
+ copy_file(build_output_binary_pdb_path, os.path.join(target_bin_dir, "zenserver.pdb"))
+ copy_file(os.path.join(zenroot, r'vcpkg_installed\x64-windows-static\tools\sentry-native\crashpad_handler.exe'), crashpadtarget)
P4.add(crashpadtarget).run()
except Exception as e:
print(f"Caught exception while copying: {e.args}")
diff --git a/zen/cmds/print.cpp b/zen/cmds/print.cpp
new file mode 100644
index 000000000..aac6afd44
--- /dev/null
+++ b/zen/cmds/print.cpp
@@ -0,0 +1,107 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "print.h"
+
+#include <zencore/compactbinarypackage.h>
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
+
+using namespace std::literals;
+
+namespace zen {
+
+PrintCommand::PrintCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "s", "source", "Object payload file", cxxopts::value(m_Filename), "<file name>");
+}
+
+PrintCommand::~PrintCommand() = default;
+
+int
+PrintCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions, argc, argv);
+
+ m_Options.parse_positional({"source"});
+
+ auto result = m_Options.parse(argc, argv);
+
+ if (result.count("help"))
+ {
+ std::cout << m_Options.help({"", "Group"}) << std::endl;
+
+ return 0;
+ }
+
+ // Validate arguments
+
+ if (m_Filename.empty())
+ throw std::runtime_error("No file specified");
+
+ zen::FileContents Fc = zen::ReadFile(m_Filename);
+ IoBuffer Data = Fc.Flatten();
+ zen::CbObject Object{SharedBuffer(Data)};
+
+ zen::StringBuilder<1024> ObjStr;
+ zen::CompactBinaryToJson(Object, ObjStr);
+ zen::ConsoleLog().info("{}", ObjStr);
+
+ return 0;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+PrintPackageCommand::PrintPackageCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "s", "source", "Package payload file", cxxopts::value(m_Filename), "<file name>");
+}
+
+PrintPackageCommand::~PrintPackageCommand()
+{
+}
+
+int
+PrintPackageCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions, argc, argv);
+
+ m_Options.parse_positional({"source"});
+
+ auto result = m_Options.parse(argc, argv);
+
+ if (result.count("help"))
+ {
+ std::cout << m_Options.help({"", "Group"}) << std::endl;
+
+ return 0;
+ }
+
+ // Validate arguments
+
+ if (m_Filename.empty())
+ throw std::runtime_error("No file specified");
+
+ zen::FileContents Fc = zen::ReadFile(m_Filename);
+ IoBuffer Data = Fc.Flatten();
+ zen::CbPackage Package;
+
+ bool Ok = Package.TryLoad(Data) || zen::legacy::TryLoadCbPackage(Package, Data, &UniqueBuffer::Alloc);
+
+ if (Ok)
+ {
+ zen::StringBuilder<1024> ObjStr;
+ zen::CompactBinaryToJson(Package.GetObject(), ObjStr);
+ zen::ConsoleLog().info("{}", ObjStr);
+ }
+ else
+ {
+ zen::ConsoleLog().error("error: malformed package?");
+ }
+
+ return 0;
+}
+
+} // namespace zen
diff --git a/zen/cmds/print.h b/zen/cmds/print.h
new file mode 100644
index 000000000..eed0aa14e
--- /dev/null
+++ b/zen/cmds/print.h
@@ -0,0 +1,41 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../zen.h"
+
+namespace zen {
+
+/** Print Compact Binary
+ */
+class PrintCommand : public ZenCmdBase
+{
+public:
+ PrintCommand();
+ ~PrintCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options* Options() override { return &m_Options; }
+
+private:
+ cxxopts::Options m_Options{"print", "Print compact binary object"};
+ std::string m_Filename;
+};
+
+/** Print Compact Binary Package
+ */
+class PrintPackageCommand : public ZenCmdBase
+{
+public:
+ PrintPackageCommand();
+ ~PrintPackageCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options* Options() override { return &m_Options; }
+
+private:
+ cxxopts::Options m_Options{"printpkg", "Print compact binary package"};
+ std::string m_Filename;
+};
+
+} // namespace zen
diff --git a/zen/cmds/run.cpp b/zen/cmds/run.cpp
index 94eb7ef6d..19b5c8980 100644
--- a/zen/cmds/run.cpp
+++ b/zen/cmds/run.cpp
@@ -10,6 +10,7 @@
#include <zencore/fmtutils.h>
#include <zencore/iohash.h>
#include <zencore/logging.h>
+#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/timer.h>
#include <zenutil/zenserverprocess.h>
diff --git a/zen/zen.cpp b/zen/zen.cpp
index 86c41d658..3c33ff5e0 100644
--- a/zen/zen.cpp
+++ b/zen/zen.cpp
@@ -9,6 +9,7 @@
#include "cmds/dedup.h"
#include "cmds/deploy.h"
#include "cmds/hash.h"
+#include "cmds/print.h"
#include "cmds/run.h"
#include "cmds/status.h"
#include "cmds/top.h"
@@ -98,18 +99,20 @@ main(int argc, char** argv)
auto _ = zen::MakeGuard([] { spdlog::shutdown(); });
- HashCommand HashCmd;
- CopyCommand CopyCmd;
- DedupCommand DedupCmd;
- DeployCommand DeployCmd;
- DropCommand DropCmd;
- ChunkCommand ChunkCmd;
- RunCommand RunCmd;
- StatusCommand StatusCmd;
- TopCommand TopCmd;
- PsCommand PsCmd;
- UpCommand UpCmd;
- DownCommand DownCmd;
+ HashCommand HashCmd;
+ CopyCommand CopyCmd;
+ DedupCommand DedupCmd;
+ DeployCommand DeployCmd;
+ DropCommand DropCmd;
+ ChunkCommand ChunkCmd;
+ RunCommand RunCmd;
+ StatusCommand StatusCmd;
+ TopCommand TopCmd;
+ PrintCommand PrintCmd;
+ PrintPackageCommand PrintPkgCmd;
+ PsCommand PsCmd;
+ UpCommand UpCmd;
+ DownCommand DownCmd;
#if ZEN_WITH_TESTS
RunTestsCommand RunTestsCmd;
@@ -128,6 +131,8 @@ main(int argc, char** argv)
{"dedup", &DedupCmd, "Dedup files"},
{"drop", &DropCmd, "Drop cache bucket(s)"},
{"hash", &HashCmd, "Compute file hashes"},
+ {"print", &PrintCmd, "Print compact binary object"},
+ {"printpackage", &PrintPkgCmd, "Print compact binary package"},
{"run", &RunCmd, "Remote execution"},
{"status", &StatusCmd, "Show zen status"},
{"ps", &PsCmd, "Enumerate running zen server instances"},
diff --git a/zen/zen.vcxproj b/zen/zen.vcxproj
index fb0674e87..f31c0bc17 100644
--- a/zen/zen.vcxproj
+++ b/zen/zen.vcxproj
@@ -99,6 +99,7 @@
<ClCompile Include="cmds\dedup.cpp" />
<ClCompile Include="cmds\deploy.cpp" />
<ClCompile Include="cmds\hash.cpp" />
+ <ClCompile Include="cmds\print.cpp" />
<ClCompile Include="cmds\run.cpp" />
<ClCompile Include="cmds\scrub.cpp" />
<ClCompile Include="cmds\status.cpp" />
@@ -114,6 +115,7 @@
<ClInclude Include="cmds\dedup.h" />
<ClInclude Include="cmds\deploy.h" />
<ClInclude Include="cmds\hash.h" />
+ <ClInclude Include="cmds\print.h" />
<ClInclude Include="cmds\run.h" />
<ClInclude Include="cmds\scrub.h" />
<ClInclude Include="cmds\status.h" />
diff --git a/zen/zen.vcxproj.filters b/zen/zen.vcxproj.filters
index 9002f01c2..d983b413c 100644
--- a/zen/zen.vcxproj.filters
+++ b/zen/zen.vcxproj.filters
@@ -28,6 +28,7 @@
<ClCompile Include="cmds\up.cpp" />
<ClCompile Include="cmds\cache.cpp" />
<ClCompile Include="cmds\scrub.cpp" />
+ <ClCompile Include="cmds\print.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="chunk\chunk.h" />
@@ -57,6 +58,7 @@
<ClInclude Include="cmds\up.h" />
<ClInclude Include="cmds\cache.h" />
<ClInclude Include="cmds\scrub.h" />
+ <ClInclude Include="cmds\print.h" />
</ItemGroup>
<ItemGroup>
<Filter Include="cmds">
diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp
index f3fbf312c..2ce6987d2 100644
--- a/zencore/compactbinary.cpp
+++ b/zencore/compactbinary.cpp
@@ -1474,10 +1474,30 @@ public:
Builder << Accessor.AsIntegerNegative();
break;
case CbFieldType::Float32:
- Builder.Append("{:.9g}"_format(Accessor.AsFloat32()));
+ {
+ const float Value = Accessor.AsFloat32();
+ if (std::isfinite(Value))
+ {
+ Builder.Append("{:.9g}"_format(Value));
+ }
+ else
+ {
+ Builder << "null"sv;
+ }
+ }
break;
case CbFieldType::Float64:
- Builder.Append("{:.17g}"_format(Accessor.AsFloat64()));
+ {
+ const double Value = Accessor.AsFloat64();
+ if (std::isfinite(Value))
+ {
+ Builder.Append("{:.17g}"_format(Value));
+ }
+ else
+ {
+ Builder << "null"sv;
+ }
+ }
break;
case CbFieldType::BoolFalse:
Builder << "false"sv;
@@ -1834,7 +1854,7 @@ TEST_CASE("uson.json")
SUBCASE("number")
{
- const double ExpectedFloatValue = 21.21f;
+ const float ExpectedFloatValue = 21.21f;
const double ExpectedDoubleValue = 42.42;
CbObjectWriter Writer;
@@ -1856,6 +1876,31 @@ TEST_CASE("uson.json")
CHECK(FloatValue == doctest::Approx(ExpectedFloatValue));
CHECK(DoubleValue == doctest::Approx(ExpectedDoubleValue));
}
+
+ SUBCASE("number.nan")
+ {
+ const float FloatNan = std::numeric_limits<float>::quiet_NaN();
+ const double DoubleNan = std::numeric_limits<double>::quiet_NaN();
+
+ CbObjectWriter Writer;
+ Writer << "FloatNan" << FloatNan;
+ Writer << "DoubleNan" << DoubleNan;
+
+ CbObject Obj = Writer.Save();
+
+ StringBuilder<128> Sb;
+ const std::string_view JsonText = Obj.ToJson(Sb).ToView();
+
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(JsonText.data(), JsonError);
+
+ const double FloatValue = Json["FloatNan"].number_value();
+ const double DoubleValue = Json["DoubleNan"].number_value();
+
+ CHECK(JsonError.empty());
+ CHECK(FloatValue == 0);
+ CHECK(DoubleValue == 0);
+ }
}
#endif
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index 2d2603434..9936f30ec 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -63,6 +63,13 @@ DeleteReparsePoint(const wchar_t* Path, DWORD dwReparseTag)
bool
CreateDirectories(const wchar_t* Dir)
{
+ // This may be suboptimal, in that it appears to try and create directories
+ // from the root on up instead of from some directory which is known to
+ // be present
+ //
+ // We should implement a smarter version at some point since this can be
+ // pretty expensive in aggregate
+
return std::filesystem::create_directories(Dir);
}
@@ -522,6 +529,23 @@ WriteFile(std::filesystem::path Path, IoBuffer Data)
WriteFile(Path, &DataPtr, 1);
}
+IoBuffer
+FileContents::Flatten()
+{
+ if (Data.size() == 1)
+ {
+ return Data[0];
+ }
+ else if (Data.empty())
+ {
+ return {};
+ }
+ else
+ {
+ ZEN_NOT_IMPLEMENTED();
+ }
+}
+
FileContents
ReadFile(std::filesystem::path Path)
{
diff --git a/zencore/include/zencore/blockingqueue.h b/zencore/include/zencore/blockingqueue.h
new file mode 100644
index 000000000..277095689
--- /dev/null
+++ b/zencore/include/zencore/blockingqueue.h
@@ -0,0 +1,75 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <atomic>
+#include <deque>
+#include <mutex>
+
+namespace zen {
+
+template<typename T>
+class BlockingQueue
+{
+public:
+ BlockingQueue() = default;
+
+ ~BlockingQueue() { CompleteAdding(); }
+
+ void Enqueue(T&& Item)
+ {
+ {
+ std::lock_guard Lock(m_Lock);
+ m_Queue.emplace_back(std::move(Item));
+ m_Size++;
+ }
+
+ m_NewItemSignal.notify_one();
+ }
+
+ bool WaitAndDequeue(T& Item)
+ {
+ if (m_CompleteAdding.load())
+ {
+ return false;
+ }
+
+ std::unique_lock Lock(m_Lock);
+ m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
+
+ if (!m_Queue.empty())
+ {
+ Item = std::move(m_Queue.front());
+ m_Queue.pop_front();
+ m_Size--;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ void CompleteAdding()
+ {
+ if (!m_CompleteAdding.load())
+ {
+ m_CompleteAdding.store(true);
+ m_NewItemSignal.notify_all();
+ }
+ }
+
+ std::size_t Size() const
+ {
+ std::unique_lock Lock(m_Lock);
+ return m_Queue.size();
+ }
+
+private:
+ mutable std::mutex m_Lock;
+ std::condition_variable m_NewItemSignal;
+ std::deque<T> m_Queue;
+ std::atomic_bool m_CompleteAdding{false};
+ std::atomic_uint32_t m_Size;
+};
+
+} // namespace zen
diff --git a/zencore/include/zencore/filesystem.h b/zencore/include/zencore/filesystem.h
index 6678528f6..c7ac7140d 100644
--- a/zencore/include/zencore/filesystem.h
+++ b/zencore/include/zencore/filesystem.h
@@ -2,9 +2,10 @@
#pragma once
-#include "stream.h"
#include "zencore.h"
+#include <zencore/iobuffer.h>
+
#include <filesystem>
#include <functional>
@@ -36,6 +37,8 @@ struct FileContents
{
std::vector<IoBuffer> Data;
std::error_code ErrorCode;
+
+ IoBuffer Flatten();
};
ZENCORE_API FileContents ReadFile(std::filesystem::path Path);
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h
index 36ecbd9a7..110cd7d9d 100644
--- a/zencore/include/zencore/iobuffer.h
+++ b/zencore/include/zencore/iobuffer.h
@@ -25,6 +25,7 @@ enum class ZenContentType : uint8_t
kCbPackageOffer = 6,
kCompressedBinary = 7,
kUnknownContentType = 8,
+ kHTML = 9,
kCOUNT
};
@@ -54,6 +55,8 @@ ToString(ZenContentType ContentType)
return "compressed-binary"sv;
case ZenContentType::kYAML:
return "yaml"sv;
+ case ZenContentType::kHTML:
+ return "html"sv;
}
}
@@ -218,6 +221,7 @@ protected:
kIsMaterialized = 1 << 3, // Data pointers are valid
kLowLevelAlloc = 1 << 4, // Using direct memory allocation
kIsWholeFile = 1 << 5, // References an entire file
+ kIoBufferAlloc = 1 << 6, // Using IoBuffer allocator
kIsOwnedByThis = 1 << 7,
// Note that we have some extended flags defined below
@@ -229,8 +233,8 @@ protected:
kContentTypeBit3 = 1 << (24 + 3), // bits are reserved
};
- void* AllocateBuffer(size_t InSize, size_t Alignment);
- void FreeBuffer();
+ void AllocateBuffer(size_t InSize, size_t Alignment);
+ void FreeBuffer();
};
/**
diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h
index 320718f5b..1873ce48e 100644
--- a/zencore/include/zencore/refcount.h
+++ b/zencore/include/zencore/refcount.h
@@ -4,6 +4,8 @@
#include "atomic.h"
#include "zencore.h"
+#include <concepts>
+
namespace zen {
/**
@@ -114,6 +116,11 @@ public:
inline Ref(T* Ptr) : m_Ref(Ptr) { m_Ref && m_Ref->AddRef(); }
inline ~Ref() { m_Ref && m_Ref->Release(); }
+ template<typename DerivedType>
+ requires std::derived_from<DerivedType, T> inline Ref(const Ref<DerivedType>& Rhs) : Ref(Rhs.m_Ref)
+ {
+ }
+
[[nodiscard]] inline bool IsNull() const { return m_Ref == nullptr; }
inline explicit operator bool() const { return m_Ref != nullptr; }
inline T* operator->() const { return m_Ref; }
@@ -152,6 +159,9 @@ public:
private:
T* m_Ref = nullptr;
+
+ template<class T>
+ friend class Ref;
};
void refcount_forcelink();
diff --git a/zencore/include/zencore/stats.h b/zencore/include/zencore/stats.h
index dfa8dac34..884bb53f6 100644
--- a/zencore/include/zencore/stats.h
+++ b/zencore/include/zencore/stats.h
@@ -209,6 +209,8 @@ public:
Scope(OperationTiming& Outer);
~Scope();
+ void Cancel();
+
private:
OperationTiming& m_Outer;
uint64_t m_StartTick;
@@ -219,10 +221,70 @@ private:
Histogram m_Histogram;
};
+/** Metrics for network requests
+
+ Aggregates tracking of duration, payload sizes into a single
+ class
+
+ */
+class RequestStats
+{
+public:
+ RequestStats(int32_t SampleCount = 514);
+ ~RequestStats();
+
+ void Update(int64_t Duration, int64_t Bytes);
+ uint64_t Count() const;
+
+ // Timing
+
+ int64_t MaxDuration() const { return m_BytesHistogram.Max(); }
+ int64_t MinDuration() const { return m_BytesHistogram.Min(); }
+ double MeanDuration() const { return m_BytesHistogram.Mean(); }
+ SampleSnapshot DurationSnapshot() const { return m_RequestTimeHistogram.Snapshot(); }
+ double Rate1() { return m_RequestMeter.Rate1(); }
+ double Rate5() { return m_RequestMeter.Rate5(); }
+ double Rate15() { return m_RequestMeter.Rate15(); }
+ double MeanRate() const { return m_RequestMeter.MeanRate(); }
+
+ // Bytes
+
+ int64_t MaxBytes() const { return m_BytesHistogram.Max(); }
+ int64_t MinBytes() const { return m_BytesHistogram.Min(); }
+ double MeanBytes() const { return m_BytesHistogram.Mean(); }
+ SampleSnapshot BytesSnapshot() const { return m_BytesHistogram.Snapshot(); }
+ double ByteRate1() { return m_BytesMeter.Rate1(); }
+ double ByteRate5() { return m_BytesMeter.Rate5(); }
+ double ByteRate15() { return m_BytesMeter.Rate15(); }
+ double ByteMeanRate() const { return m_BytesMeter.MeanRate(); }
+
+ struct Scope
+ {
+ Scope(OperationTiming& Outer);
+ ~Scope();
+
+ void Cancel();
+
+ private:
+ OperationTiming& m_Outer;
+ uint64_t m_StartTick;
+ };
+
+ void EmitSnapshot(std::string_view Tag, CbObjectWriter& Cbo);
+
+private:
+ Meter m_RequestMeter;
+ Meter m_BytesMeter;
+ Histogram m_RequestTimeHistogram;
+ Histogram m_BytesHistogram;
+};
+
void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo);
-void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo);
+void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor);
void EmitSnapshot(std::string_view Tag, Meter& Stat, CbObjectWriter& Cbo);
+void EmitSnapshot(const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor);
+
} // namespace zen::metrics
namespace zen {
diff --git a/zencore/include/zencore/timer.h b/zencore/include/zencore/timer.h
index 693b6daaa..e4ddc3505 100644
--- a/zencore/include/zencore/timer.h
+++ b/zencore/include/zencore/timer.h
@@ -38,6 +38,21 @@ private:
uint64_t m_StartValue;
};
+// Low frequency timers
+
+namespace detail {
+ extern ZENCORE_API uint64_t g_LofreqTimerValue;
+} // namespace detail
+
+inline uint64_t
+GetLofreqTimerValue()
+{
+ return detail::g_LofreqTimerValue;
+}
+
+ZENCORE_API void UpdateLofreqTimerValue();
+ZENCORE_API uint64_t GetLofreqTimerFrequency();
+
void timer_forcelink(); // internal
} // namespace zen
diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp
index 244425761..04685defc 100644
--- a/zencore/iobuffer.cpp
+++ b/zencore/iobuffer.cpp
@@ -31,26 +31,29 @@ namespace zen {
//////////////////////////////////////////////////////////////////////////
-void*
+void
IoBufferCore::AllocateBuffer(size_t InSize, size_t Alignment)
{
#if ZEN_PLATFORM_WINDOWS
if (((InSize & 0xffFF) == 0) && (Alignment == 0x10000))
{
m_Flags |= kLowLevelAlloc;
- return VirtualAlloc(nullptr, InSize, MEM_COMMIT, PAGE_READWRITE);
+ m_DataPtr = VirtualAlloc(nullptr, InSize, MEM_COMMIT, PAGE_READWRITE);
+
+ return;
}
#endif // ZEN_PLATFORM_WINDOWS
-#if ZEN_USE_MIMALLOC && 0
+#if ZEN_USE_MIMALLOC
void* Ptr = mi_aligned_alloc(Alignment, RoundUp(InSize, Alignment));
+ m_Flags |= kIoBufferAlloc;
#else
void* Ptr = Memory::Alloc(InSize, Alignment);
#endif
ZEN_ASSERT(Ptr);
- return Ptr;
+ m_DataPtr = Ptr;
}
void
@@ -70,11 +73,14 @@ IoBufferCore::FreeBuffer()
}
#endif // ZEN_PLATFORM_WINDOWS
-#if ZEN_USE_MIMALLOC && 0
- return mi_free(const_cast<void*>(m_DataPtr));
-#else
- return Memory::Free(const_cast<void*>(m_DataPtr));
+#if ZEN_USE_MIMALLOC
+ if (m_Flags & kIoBufferAlloc)
+ {
+ return mi_free(const_cast<void*>(m_DataPtr));
+ }
#endif
+
+ return Memory::Free(const_cast<void*>(m_DataPtr));
}
//////////////////////////////////////////////////////////////////////////
@@ -85,7 +91,7 @@ IoBufferCore::IoBufferCore(size_t InSize)
{
ZEN_ASSERT(InSize);
- m_DataPtr = AllocateBuffer(InSize, sizeof(void*));
+ AllocateBuffer(InSize, sizeof(void*));
m_DataBytes = InSize;
SetIsOwnedByThis(true);
@@ -95,7 +101,7 @@ IoBufferCore::IoBufferCore(size_t InSize, size_t Alignment)
{
ZEN_ASSERT(InSize);
- m_DataPtr = AllocateBuffer(InSize, Alignment);
+ AllocateBuffer(InSize, Alignment);
m_DataBytes = InSize;
SetIsOwnedByThis(true);
@@ -138,10 +144,9 @@ IoBufferCore::MakeOwned(bool Immutable)
{
if (!IsOwned())
{
- void* OwnedDataPtr = AllocateBuffer(m_DataBytes, sizeof(void*));
- memcpy(OwnedDataPtr, m_DataPtr, m_DataBytes);
-
- m_DataPtr = OwnedDataPtr;
+ const void* OldDataPtr = m_DataPtr;
+ AllocateBuffer(m_DataBytes, sizeof(void*));
+ memcpy(const_cast<void*>(m_DataPtr), OldDataPtr, m_DataBytes);
SetIsOwnedByThis(true);
}
@@ -183,29 +188,29 @@ IoBufferExtendedCore::~IoBufferExtendedCore()
{
if (m_MappedPointer)
{
-#if ZEN_PLATFORM_WINDOWS
+# if ZEN_PLATFORM_WINDOWS
UnmapViewOfFile(m_MappedPointer);
-#else
+# else
uint64_t MapSize = ~uint64_t(uintptr_t(m_MmapHandle));
munmap(m_MappedPointer, MapSize);
-#endif
+# endif
}
-#if ZEN_PLATFORM_WINDOWS
+# if ZEN_PLATFORM_WINDOWS
if (m_Flags & kOwnsMmap)
{
CloseHandle(m_MmapHandle);
}
-#endif
+# endif
if (m_Flags & kOwnsFile)
{
-#if ZEN_PLATFORM_WINDOWS
+# if ZEN_PLATFORM_WINDOWS
BOOL Success = CloseHandle(m_FileHandle);
-#else
+# else
int Fd = int(uintptr_t(m_FileHandle));
bool Success = (close(Fd) == 0);
-#endif
+# endif
if (!Success)
{
@@ -239,7 +244,7 @@ IoBufferExtendedCore::Materialize() const
const uint64_t MappedOffsetDisplacement = m_FileOffset - MapOffset;
const uint64_t MapSize = m_DataBytes + MappedOffsetDisplacement;
-#if ZEN_PLATFORM_WINDOWS
+# if ZEN_PLATFORM_WINDOWS
m_MmapHandle = CreateFileMapping(m_FileHandle,
/* lpFileMappingAttributes */ nullptr,
/* flProtect */ PAGE_READONLY,
@@ -260,7 +265,7 @@ IoBufferExtendedCore::Materialize() const
/* FileOffsetHigh */ uint32_t(MapOffset >> 32),
/* FileOffsetLow */ uint32_t(MapOffset & 0xffFFffFFu),
/* dwNumberOfBytesToMap */ MapSize);
-#else
+# else
m_MmapHandle = (void*)uintptr_t(~MapSize); // ~ so it's never null (assuming MapSize >= 0)
m_Flags |= kOwnsMmap;
@@ -271,7 +276,7 @@ IoBufferExtendedCore::Materialize() const
/* flags */ MAP_SHARED | MAP_NORESERVE,
/* fd */ int(uintptr_t(m_FileHandle)),
/* offset */ MapOffset);
-#endif // ZEN_PLATFORM_WINDOWS
+# endif // ZEN_PLATFORM_WINDOWS
if (MappedBase == nullptr)
{
@@ -376,7 +381,7 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint
{
uint64_t FileSize;
-#if ZEN_PLATFORM_WINDOWS
+# if ZEN_PLATFORM_WINDOWS
CAtlFile DataFile;
HRESULT hRes = DataFile.Create(FileName, GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
@@ -387,7 +392,7 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint
}
DataFile.GetSize((ULONGLONG&)FileSize);
-#else
+# else
int Fd = open(FileName, O_RDONLY);
if (Fd < 0)
{
@@ -398,7 +403,7 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint
struct stat Stat;
fstat(Fd, &Stat);
FileSize = Stat.st_size;
-#endif // ZEN_PLATFORM_WINDOWS
+# endif // ZEN_PLATFORM_WINDOWS
// TODO: should validate that offset is in range
@@ -417,15 +422,15 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint
if (Size)
{
-#if ZEN_PLATFORM_WINDOWS
+# if ZEN_PLATFORM_WINDOWS
void* Fd = DataFile.Detach();
-#endif
+# endif
return IoBuffer(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size);
}
-#if !ZEN_PLATFORM_WINDOWS
+# if !ZEN_PLATFORM_WINDOWS
close(Fd);
-#endif
+# endif
// For an empty file, we may as well just return an empty memory IoBuffer
return IoBuffer(IoBuffer::Wrap, "", 0);
@@ -437,7 +442,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const path_char_t* FileName)
uint64_t FileSize;
void* Handle;
-#if ZEN_PLATFORM_WINDOWS
+# if ZEN_PLATFORM_WINDOWS
CAtlFile DataFile;
// We need to open with DELETE since this is used for the case
@@ -454,8 +459,8 @@ IoBufferBuilder::MakeFromTemporaryFile(const path_char_t* FileName)
DataFile.GetSize((ULONGLONG&)FileSize);
Handle = DataFile.Detach();
-#else
- int Fd = open(FileName, O_RDONLY);
+# else
+ int Fd = open(FileName, O_RDONLY);
if (Fd < 0)
{
return {};
@@ -467,7 +472,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const path_char_t* FileName)
FileSize = Stat.st_size;
Handle = (void*)uintptr_t(Fd);
-#endif // ZEN_PLATFORM_WINDOWS
+# endif // ZEN_PLATFORM_WINDOWS
IoBuffer Iob(IoBuffer::File, Handle, 0, FileSize);
Iob.m_Core->SetIsWholeFile(true);
@@ -484,7 +489,7 @@ HashBuffer(IoBuffer& Buffer)
//////////////////////////////////////////////////////////////////////////
-#if ZEN_WITH_TESTS
+# if ZEN_WITH_TESTS
void
iobuffer_forcelink()
@@ -498,6 +503,6 @@ TEST_CASE("IoBuffer")
zen::IoBuffer buffer3(buffer2, 0, buffer2.Size());
}
-#endif
+# endif
} // namespace zen
diff --git a/zencore/stats.cpp b/zencore/stats.cpp
index 34dc2828f..0c0647999 100644
--- a/zencore/stats.cpp
+++ b/zencore/stats.cpp
@@ -295,7 +295,14 @@ Histogram::Min() const
double
Histogram::Mean() const
{
- return double(m_Sum.load(std::memory_order_relaxed)) / m_Count;
+ if (m_Count)
+ {
+ return double(m_Sum.load(std::memory_order_relaxed)) / m_Count;
+ }
+ else
+ {
+ return 0.0;
+ }
}
uint64_t
@@ -398,12 +405,73 @@ OperationTiming::Scope::Scope(OperationTiming& Outer) : m_Outer(Outer), m_StartT
OperationTiming::Scope::~Scope()
{
- m_Outer.Update(GetHifreqTimerValue() - m_StartTick);
+ if (m_StartTick != 0)
+ {
+ m_Outer.Update(GetHifreqTimerValue() - m_StartTick);
+ }
+}
+
+void
+OperationTiming::Scope::Cancel()
+{
+ m_StartTick = 0;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+RequestStats::RequestStats(int32_t SampleCount) : m_RequestTimeHistogram{SampleCount}, m_BytesHistogram{SampleCount}
+{
+}
+
+RequestStats::~RequestStats()
+{
+}
+
+void
+RequestStats::Update(int64_t Duration, int64_t Bytes)
+{
+ m_RequestMeter.Mark(1);
+ m_RequestTimeHistogram.Update(Duration);
+
+ m_BytesMeter.Mark(Bytes);
+ m_BytesHistogram.Update(Bytes);
+}
+
+uint64_t
+RequestStats::Count() const
+{
+ return m_RequestMeter.Count();
}
//////////////////////////////////////////////////////////////////////////
void
+EmitSnapshot(Meter& Stat, CbObjectWriter& Cbo)
+{
+ Cbo << "count" << Stat.Count();
+ Cbo << "rate_mean" << Stat.MeanRate();
+ Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15();
+}
+
+void
+RequestStats::EmitSnapshot(std::string_view Tag, CbObjectWriter& Cbo)
+{
+ Cbo.BeginObject(Tag);
+
+ Cbo.BeginObject("requests");
+ metrics::EmitSnapshot(m_RequestMeter, Cbo);
+ metrics::EmitSnapshot(m_RequestTimeHistogram, Cbo, GetHifreqTimerToSeconds());
+ Cbo.EndObject();
+
+ Cbo.BeginObject("bytes");
+ metrics::EmitSnapshot(m_BytesMeter, Cbo);
+ metrics::EmitSnapshot(m_BytesHistogram, Cbo, 1.0);
+ Cbo.EndObject();
+
+ Cbo.EndObject();
+}
+
+void
EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo)
{
Cbo.BeginObject(Tag);
@@ -411,7 +479,6 @@ EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo)
SampleSnapshot Snap = Stat.Snapshot();
Cbo << "count" << Stat.Count();
-
Cbo << "rate_mean" << Stat.MeanRate();
Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15();
@@ -426,18 +493,22 @@ EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo)
}
void
-EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo)
+EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor)
{
Cbo.BeginObject(Tag);
+ EmitSnapshot(Stat, Cbo, ConversionFactor);
+ Cbo.EndObject();
+}
+void
+EmitSnapshot(const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor)
+{
SampleSnapshot Snap = Stat.Snapshot();
- Cbo << "count" << Stat.Count() << "avg" << Stat.Mean();
- Cbo << "min" << Stat.Min() << "max" << Stat.Max();
- Cbo << "p75" << Snap.Get75Percentile() << "p95" << Snap.Get95Percentile() << "p99" << Snap.Get99Percentile() << "p999"
- << Snap.Get999Percentile();
-
- Cbo.EndObject();
+ Cbo << "count" << Stat.Count() * ConversionFactor << "avg" << Stat.Mean() * ConversionFactor;
+ Cbo << "min" << Stat.Min() * ConversionFactor << "max" << Stat.Max() * ConversionFactor;
+ Cbo << "p75" << Snap.Get75Percentile() * ConversionFactor << "p95" << Snap.Get95Percentile() * ConversionFactor << "p99"
+ << Snap.Get99Percentile() * ConversionFactor << "p999" << Snap.Get999Percentile() * ConversionFactor;
}
void
diff --git a/zencore/timer.cpp b/zencore/timer.cpp
index 5d30d9b29..9180519bd 100644
--- a/zencore/timer.cpp
+++ b/zencore/timer.cpp
@@ -69,6 +69,22 @@ GetHifreqTimerFrequencySafe()
}
//////////////////////////////////////////////////////////////////////////
+
+uint64_t detail::g_LofreqTimerValue = GetHifreqTimerValue();
+
+void
+UpdateLofreqTimerValue()
+{
+ detail::g_LofreqTimerValue = GetHifreqTimerValue();
+}
+
+uint64_t
+GetLofreqTimerFrequency()
+{
+ return GetHifreqTimerFrequencySafe();
+}
+
+//////////////////////////////////////////////////////////////////////////
//
// Testing related code follows...
//
diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp
index 8e5d61877..69974ca06 100644
--- a/zenhttp/httpserver.cpp
+++ b/zenhttp/httpserver.cpp
@@ -58,19 +58,25 @@ MapContentTypeToString(HttpContentType ContentType)
case HttpContentType::kYAML:
return "text/yaml"sv;
+
+ case HttpContentType::kHTML:
+ return "text/html"sv;
}
}
//////////////////////////////////////////////////////////////////////////
static constinit uint32_t HashBinary = HashStringDjb2("application/octet-stream"sv);
-static constinit uint32_t HashJson = HashStringDjb2("application/json"sv);
-static constinit uint32_t HashYaml = HashStringDjb2("text/yaml"sv);
+static constinit uint32_t HashApplicationJson = HashStringDjb2("application/json"sv);
+static constinit uint32_t HashApplicationYaml = HashStringDjb2("text/yaml"sv);
static constinit uint32_t HashText = HashStringDjb2("text/plain"sv);
static constinit uint32_t HashCompactBinary = HashStringDjb2("application/x-ue-cb"sv);
static constinit uint32_t HashCompactBinaryPackage = HashStringDjb2("application/x-ue-cbpkg"sv);
static constinit uint32_t HashCompactBinaryPackageOffer = HashStringDjb2("application/x-ue-offer"sv);
static constinit uint32_t HashCompressedBinary = HashStringDjb2("application/x-ue-comp"sv);
+static constinit uint32_t HashJson = HashStringDjb2("json"sv);
+static constinit uint32_t HashYaml = HashStringDjb2("yaml"sv);
+static constinit uint32_t HashHtml = HashStringDjb2("text/html"sv);
std::once_flag InitContentTypeLookup;
@@ -78,14 +84,21 @@ struct HashedTypeEntry
{
uint32_t Hash;
HttpContentType Type;
-} TypeHashTable[] = {{HashBinary, HttpContentType::kBinary},
- {HashCompactBinary, HttpContentType::kCbObject},
- {HashCompactBinaryPackage, HttpContentType::kCbPackage},
- {HashCompactBinaryPackageOffer, HttpContentType::kCbPackageOffer},
- {HashJson, HttpContentType::kJSON},
- {HashYaml, HttpContentType::kYAML},
- {HashText, HttpContentType::kText},
- {HashCompressedBinary, HttpContentType::kCompressedBinary}};
+} TypeHashTable[] = {
+ // clang-format off
+ {HashBinary, HttpContentType::kBinary},
+ {HashCompactBinary, HttpContentType::kCbObject},
+ {HashCompactBinaryPackage, HttpContentType::kCbPackage},
+ {HashCompactBinaryPackageOffer, HttpContentType::kCbPackageOffer},
+ {HashJson, HttpContentType::kJSON},
+ {HashApplicationJson, HttpContentType::kJSON},
+ {HashYaml, HttpContentType::kYAML},
+ {HashApplicationYaml, HttpContentType::kYAML},
+ {HashText, HttpContentType::kText},
+ {HashCompressedBinary, HttpContentType::kCompressedBinary},
+ {HashHtml, HttpContentType::kHTML},
+ // clang-format on
+};
HttpContentType
ParseContentTypeImpl(const std::string_view& ContentTypeString)
@@ -117,6 +130,16 @@ ParseContentTypeInit(const std::string_view& ContentTypeString)
std::sort(std::begin(TypeHashTable), std::end(TypeHashTable), [](const HashedTypeEntry& Lhs, const HashedTypeEntry& Rhs) {
return Lhs.Hash < Rhs.Hash;
});
+
+ // validate that there are no hash collisions
+
+ uint32_t LastHash = 0;
+
+ for (const auto& Item : TypeHashTable)
+ {
+ ZEN_ASSERT(LastHash != Item.Hash);
+ LastHash = Item.Hash;
+ }
});
ParseContentType = ParseContentTypeImpl;
@@ -548,7 +571,7 @@ CreateHttpServer()
#if 0
return new HttpUwsServer;
#elif ZEN_WITH_HTTPSYS
- return new HttpSysServer{std::thread::hardware_concurrency()};
+ return new HttpSysServer(std::thread::hardware_concurrency(), /* background worker threads */ 16);
#else
return new HttpNullServer;
#endif
diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp
index 9b2e7f832..f88563097 100644
--- a/zenhttp/httpsys.cpp
+++ b/zenhttp/httpsys.cpp
@@ -129,16 +129,18 @@ GetAcceptType(const HTTP_REQUEST* HttpRequest)
class HttpSysRequestHandler
{
public:
- explicit HttpSysRequestHandler(HttpSysTransaction& InRequest) : m_Request(InRequest) {}
+ explicit HttpSysRequestHandler(HttpSysTransaction& Transaction) : m_Transaction(Transaction) {}
virtual ~HttpSysRequestHandler() = default;
virtual void IssueRequest(std::error_code& ErrorCode) = 0;
virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) = 0;
+ HttpSysTransaction& Transaction() { return m_Transaction; }
- HttpSysTransaction& Transaction() { return m_Request; }
+ HttpSysRequestHandler(const HttpSysRequestHandler&) = delete;
+ HttpSysRequestHandler& operator=(const HttpSysRequestHandler&) = delete;
private:
- HttpSysTransaction& m_Request; // Related HTTP transaction object
+ HttpSysTransaction& m_Transaction;
};
/**
@@ -184,12 +186,16 @@ public:
virtual void WriteResponse(HttpResponseCode ResponseCode) override;
virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs) override;
virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override;
+ virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override;
using HttpServerRequest::WriteResponse;
- HttpSysTransaction& m_HttpTx;
- HttpMessageResponseRequest* m_Response = nullptr; // TODO: make this more general
- IoBuffer m_PayloadBuffer;
+ HttpSysServerRequest(const HttpSysServerRequest&) = delete;
+ HttpSysServerRequest& operator=(const HttpSysServerRequest&) = delete;
+
+ HttpSysTransaction& m_HttpTx;
+ HttpSysRequestHandler* m_NextCompletionHandler = nullptr;
+ IoBuffer m_PayloadBuffer;
};
/** HTTP transaction
@@ -218,7 +224,9 @@ public:
ULONG_PTR NumberOfBytesTransferred,
PTP_IO Io);
- void IssueInitialRequest(std::error_code& ErrorCode);
+ void IssueInitialRequest(std::error_code& ErrorCode);
+ bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler);
+
PTP_IO Iocp();
HANDLE RequestQueueHandle();
inline OVERLAPPED* Overlapped() { return &m_HttpOverlapped; }
@@ -227,6 +235,8 @@ public:
HttpSysServerRequest& InvokeRequestHandler(HttpService& Service, IoBuffer Payload);
+ HttpSysServerRequest& ServerRequest() { return m_HandlerRequest.value(); }
+
private:
OVERLAPPED m_HttpOverlapped{};
HttpSysServer& m_HttpServer;
@@ -239,8 +249,6 @@ private:
Ref<IHttpPackageHandler> m_PackageHandler;
};
-//////////////////////////////////////////////////////////////////////////
-
/**
* @brief HTTP request response I/O request handler
*
@@ -588,6 +596,108 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
}
}
+/** HTTP completion handler for async work
+
+ This is used to allow work to be taken off the request handler threads
+ and to support posting responses asynchronously.
+ */
+
+class HttpAsyncWorkRequest : public HttpSysRequestHandler
+{
+public:
+ HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response);
+ ~HttpAsyncWorkRequest();
+
+ virtual void IssueRequest(std::error_code& ErrorCode) override final;
+ virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) override;
+
+private:
+ struct AsyncWorkItem : public IWork
+ {
+ virtual void Execute() override;
+
+ AsyncWorkItem(HttpSysTransaction& InTx, std::function<void(HttpServerRequest&)>&& InHandler)
+ : Tx(InTx)
+ , Handler(std::move(InHandler))
+ {
+ }
+
+ HttpSysTransaction& Tx;
+ std::function<void(HttpServerRequest&)> Handler;
+ };
+
+ Ref<AsyncWorkItem> m_WorkItem;
+};
+
+HttpAsyncWorkRequest::HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response)
+: HttpSysRequestHandler(Tx)
+{
+ m_WorkItem = new AsyncWorkItem(Tx, std::move(Response));
+}
+
+HttpAsyncWorkRequest::~HttpAsyncWorkRequest()
+{
+}
+
+void
+HttpAsyncWorkRequest::IssueRequest(std::error_code& ErrorCode)
+{
+ ErrorCode.clear();
+
+ Transaction().Server().WorkPool().ScheduleWork(m_WorkItem);
+}
+
+HttpSysRequestHandler*
+HttpAsyncWorkRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred)
+{
+ // This ought to not be called since there should be no outstanding I/O request
+ // when this completion handler is active
+
+ ZEN_UNUSED(IoResult, NumberOfBytesTransferred);
+
+ ZEN_WARN("Unexpected I/O completion during async work! IoResult: {}, NumberOfBytesTransferred: {}", IoResult, NumberOfBytesTransferred);
+
+ return this;
+}
+
+void
+HttpAsyncWorkRequest::AsyncWorkItem::Execute()
+{
+ using namespace fmt::literals;
+
+ try
+ {
+ HttpSysServerRequest& ThisRequest = Tx.ServerRequest();
+
+ ThisRequest.m_NextCompletionHandler = nullptr;
+
+ Handler(ThisRequest);
+
+ // TODO: should Handler be destroyed at this point to ensure there
+ // are no outstanding references into state which could be
+ // deleted asynchronously as a result of issuing the response?
+
+ if (HttpSysRequestHandler* NextHandler = ThisRequest.m_NextCompletionHandler)
+ {
+ return (void)Tx.IssueNextRequest(NextHandler);
+ }
+ else if (!ThisRequest.IsHandled())
+ {
+ return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 404, "Not found"sv));
+ }
+ else
+ {
+ // "Handled" but no request handler? Shouldn't ever happen
+ return (void)Tx.IssueNextRequest(
+ new HttpMessageResponseRequest(Tx, 500, "Response generated but no request handler scheduled"sv));
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 500, "Exception thrown in async work: '{}'"_format(Ex.what())));
+ }
+}
+
/**
_________
/ _____/ ______________ __ ___________
@@ -597,10 +707,11 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
\/ \/ \/
*/
-HttpSysServer::HttpSysServer(unsigned int ThreadCount)
+HttpSysServer::HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount)
: m_Log(logging::Get("http"))
, m_RequestLog(logging::Get("http_requests"))
, m_ThreadPool(ThreadCount)
+, m_AsyncWorkPool(AsyncWorkThreadCount)
{
ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr);
@@ -611,6 +722,8 @@ HttpSysServer::HttpSysServer(unsigned int ThreadCount)
m_IsHttpInitialized = true;
m_IsOk = true;
+
+ ZEN_INFO("http.sys server started, using {} I/O threads and {} async worker threads", ThreadCount, AsyncWorkThreadCount);
}
HttpSysServer::~HttpSysServer()
@@ -915,6 +1028,47 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance,
}
}
+bool
+HttpSysTransaction::IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler)
+{
+ HttpSysRequestHandler* CurrentHandler = m_CompletionHandler;
+ m_CompletionHandler = NewCompletionHandler;
+
+ auto _ = MakeGuard([this, CurrentHandler] {
+ if ((CurrentHandler != &m_InitialHttpHandler) && (CurrentHandler != m_CompletionHandler))
+ {
+ delete CurrentHandler;
+ }
+ });
+
+ if (NewCompletionHandler == nullptr)
+ {
+ return false;
+ }
+
+ try
+ {
+ std::error_code ErrorCode;
+ m_CompletionHandler->IssueRequest(ErrorCode);
+
+ if (!ErrorCode)
+ {
+ return true;
+ }
+
+ ZEN_ERROR("IssueRequest() failed: '{}'", ErrorCode.message());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("exception caught in IssueNextRequest(): '{}'", Ex.what());
+ }
+
+ // something went wrong, no request is pending
+ m_CompletionHandler = nullptr;
+
+ return false;
+}
+
HttpSysTransaction::Status
HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred)
{
@@ -934,38 +1088,9 @@ HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTran
m_HttpServer.OnHandlingRequest();
}
- m_CompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred);
+ auto NewCompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred);
- if (m_CompletionHandler)
- {
- try
- {
- std::error_code ErrorCode;
- m_CompletionHandler->IssueRequest(ErrorCode);
-
- if (ErrorCode)
- {
- ZEN_ERROR("IssueRequest() failed {}", ErrorCode.message());
- }
- else
- {
- IsRequestPending = true;
- }
- }
- catch (std::exception& Ex)
- {
- ZEN_ERROR("exception caught from IssueRequest(): {}", Ex.what());
-
- // something went wrong, no request is pending
- }
- }
- else
- {
- if (CurrentHandler != &m_InitialHttpHandler)
- {
- delete CurrentHandler;
- }
- }
+ IsRequestPending = IssueNextRequest(NewCompletionHandler);
}
// Ensure new requests are enqueued as necessary
@@ -1086,23 +1211,48 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService&
const int PrefixLength = Service.UriPrefixLength();
const int AbsPathLength = HttpRequestPtr->CookedUrl.AbsPathLength / sizeof(char16_t);
+ HttpContentType AcceptContentType = HttpContentType::kUnknownContentType;
+
if (AbsPathLength >= PrefixLength)
{
// We convert the URI immediately because most of the code involved prefers to deal
- // with utf8. This has some performance impact which I'd prefer to avoid but for now
- // we just have to live with it
+ // with utf8. This is overhead which I'd prefer to avoid but for now we just have
+ // to live with it
WideToUtf8({(char16_t*)HttpRequestPtr->CookedUrl.pAbsPath + PrefixLength, gsl::narrow<size_t>(AbsPathLength - PrefixLength)},
m_UriUtf8);
+
+ std::string_view UriSuffix8{m_UriUtf8};
+
+ const size_t LastComponentIndex = UriSuffix8.find_last_of('/');
+
+ if (LastComponentIndex != std::string_view::npos)
+ {
+ UriSuffix8.remove_prefix(LastComponentIndex);
+ }
+
+ const size_t LastDotIndex = UriSuffix8.find_last_of('.');
+
+ if (LastDotIndex != std::string_view::npos)
+ {
+ UriSuffix8.remove_prefix(LastDotIndex + 1);
+
+ AcceptContentType = ParseContentType(UriSuffix8);
+
+ if (AcceptContentType != HttpContentType::kUnknownContentType)
+ {
+ m_UriUtf8.RemoveSuffix(uint32_t(m_UriUtf8.Size() - LastComponentIndex - LastDotIndex - 1));
+ }
+ }
}
else
{
m_UriUtf8.Reset();
}
- if (auto QueryStringLength = HttpRequestPtr->CookedUrl.QueryStringLength)
+ if (uint16_t QueryStringLength = HttpRequestPtr->CookedUrl.QueryStringLength)
{
- --QueryStringLength;
+ --QueryStringLength; // We skip the leading question mark
WideToUtf8({(char16_t*)(HttpRequestPtr->CookedUrl.pQueryString) + 1, QueryStringLength / sizeof(char16_t)}, m_QueryStringUtf8);
}
@@ -1114,7 +1264,23 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService&
m_Verb = TranslateHttpVerb(HttpRequestPtr->Verb);
m_ContentLength = GetContentLength(HttpRequestPtr);
m_ContentType = GetContentType(HttpRequestPtr);
- m_AcceptType = GetAcceptType(HttpRequestPtr);
+
+ // It an explicit content type extension was specified then we'll use that over any
+ // Accept: header value that may be present
+
+ if (AcceptContentType != HttpContentType::kUnknownContentType)
+ {
+ m_AcceptType = AcceptContentType;
+ }
+ else
+ {
+ m_AcceptType = GetAcceptType(HttpRequestPtr);
+ }
+
+ if (m_Verb == HttpVerb::kHead)
+ {
+ SetSuppressResponseBody();
+ }
}
Oid
@@ -1172,13 +1338,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode)
{
ZEN_ASSERT(IsHandled() == false);
- m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode);
+ auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode);
if (SuppressBody())
{
- m_Response->SuppressResponseBody();
+ Response->SuppressResponseBody();
}
+ m_NextCompletionHandler = Response;
+
SetIsHandled();
}
@@ -1187,13 +1355,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy
{
ZEN_ASSERT(IsHandled() == false);
- m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs);
+ auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs);
if (SuppressBody())
{
- m_Response->SuppressResponseBody();
+ Response->SuppressResponseBody();
}
+ m_NextCompletionHandler = Response;
+
SetIsHandled();
}
@@ -1202,17 +1372,32 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy
{
ZEN_ASSERT(IsHandled() == false);
- m_Response =
+ auto Response =
new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, ResponseString.data(), ResponseString.size());
if (SuppressBody())
{
- m_Response->SuppressResponseBody();
+ Response->SuppressResponseBody();
}
+ m_NextCompletionHandler = Response;
+
SetIsHandled();
}
+void
+HttpSysServerRequest::WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler)
+{
+ if (m_HttpTx.Server().IsAsyncResponseEnabled())
+ {
+ ContinuationHandler(m_HttpTx.ServerRequest());
+ }
+ else
+ {
+ m_NextCompletionHandler = new HttpAsyncWorkRequest(m_HttpTx, std::move(ContinuationHandler));
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
InitialRequestHandler::InitialRequestHandler(HttpSysTransaction& InRequest) : HttpSysRequestHandler(InRequest)
@@ -1411,14 +1596,14 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
HttpSysServerRequest& ThisRequest = Transaction().InvokeRequestHandler(*Service, m_PayloadBuffer);
- if (!ThisRequest.IsHandled())
+ if (HttpSysRequestHandler* Response = ThisRequest.m_NextCompletionHandler)
{
- return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv);
+ return Response;
}
- if (HttpMessageResponseRequest* Response = ThisRequest.m_Response)
+ if (!ThisRequest.IsHandled())
{
- return Response;
+ return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv);
}
}
@@ -1462,4 +1647,4 @@ HttpSysServer::RegisterService(HttpService& Service)
}
} // namespace zen
-#endif \ No newline at end of file
+#endif
diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h
index a8395b283..46ba122cc 100644
--- a/zenhttp/httpsys.h
+++ b/zenhttp/httpsys.h
@@ -16,6 +16,7 @@
# define _WINSOCKAPI_
# include <zencore/windows.h>
# include "iothreadpool.h"
+# include "workthreadpool.h"
# include <http.h>
@@ -35,7 +36,7 @@ class HttpSysServer : public HttpServer
friend class HttpSysTransaction;
public:
- explicit HttpSysServer(unsigned int ThreadCount);
+ explicit HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount);
~HttpSysServer();
// HttpServer interface implementation
@@ -45,6 +46,11 @@ public:
virtual void RequestExit() override;
virtual void RegisterService(HttpService& Service) override;
+ WorkerThreadPool& WorkPool() { return m_AsyncWorkPool; }
+
+ inline bool IsOk() const { return m_IsOk; }
+ inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; }
+
private:
void Initialize(const wchar_t* UrlPath);
void Cleanup();
@@ -53,8 +59,6 @@ private:
void OnHandlingRequest();
void IssueNewRequestMaybe();
- inline bool IsOk() const { return m_IsOk; }
-
void RegisterService(const char* Endpoint, HttpService& Service);
void UnregisterService(const char* Endpoint, HttpService& Service);
@@ -63,10 +67,13 @@ private:
spdlog::logger& m_RequestLog;
spdlog::logger& Log() { return m_Log; }
- bool m_IsOk = false;
- bool m_IsHttpInitialized = false;
- bool m_IsRequestLoggingEnabled = false;
- WinIoThreadPool m_ThreadPool;
+ bool m_IsOk = false;
+ bool m_IsHttpInitialized = false;
+ bool m_IsRequestLoggingEnabled = false;
+ bool m_IsAsyncResponseEnabled = true;
+
+ WinIoThreadPool m_ThreadPool;
+ WorkerThreadPool m_AsyncWorkPool;
std::wstring m_BaseUri; // http://*:nnnn/
HTTP_SERVER_SESSION_ID m_HttpSessionId = 0;
diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h
index 6a7dc8a70..3e6608f11 100644
--- a/zenhttp/include/zenhttp/httpserver.h
+++ b/zenhttp/include/zenhttp/httpserver.h
@@ -97,6 +97,8 @@ public:
void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::string_view ResponseString);
void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, IoBuffer Blob);
+ virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) = 0;
+
protected:
enum
{
diff --git a/zenhttp/iothreadpool.cpp b/zenhttp/iothreadpool.cpp
index 4f1a6642b..6087e69ec 100644
--- a/zenhttp/iothreadpool.cpp
+++ b/zenhttp/iothreadpool.cpp
@@ -4,6 +4,8 @@
#include <zencore/except.h>
+#if ZEN_PLATFORM_WINDOWS
+
namespace zen {
WinIoThreadPool::WinIoThreadPool(int InThreadCount)
@@ -32,6 +34,8 @@ WinIoThreadPool::~WinIoThreadPool()
void
WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode)
{
+ ZEN_ASSERT(!m_ThreadPoolIo);
+
m_ThreadPoolIo = CreateThreadpoolIo(IoHandle, Callback, Context, &m_CallbackEnvironment);
if (!m_ThreadPoolIo)
@@ -41,3 +45,5 @@ WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, voi
}
} // namespace zen
+
+#endif
diff --git a/zenhttp/iothreadpool.h b/zenhttp/iothreadpool.h
index 4418b940b..8333964c3 100644
--- a/zenhttp/iothreadpool.h
+++ b/zenhttp/iothreadpool.h
@@ -2,9 +2,12 @@
#pragma once
-#include <zencore/windows.h>
+#include <zencore/zencore.h>
-#include <system_error>
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+
+# include <system_error>
namespace zen {
@@ -31,3 +34,4 @@ private:
};
} // namespace zen
+#endif
diff --git a/zenhttp/workthreadpool.cpp b/zenhttp/workthreadpool.cpp
new file mode 100644
index 000000000..41eaaae94
--- /dev/null
+++ b/zenhttp/workthreadpool.cpp
@@ -0,0 +1,77 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "workthreadpool.h"
+
+#include <zencore/logging.h>
+
+namespace zen {
+
+namespace detail {
+ struct LambdaWork : IWork
+ {
+ LambdaWork(auto Work) : WorkFunction(Work) {}
+ virtual void Execute() override { WorkFunction(); }
+
+ std::function<void()> WorkFunction;
+ };
+} // namespace detail
+
+WorkerThreadPool::WorkerThreadPool(int InThreadCount)
+{
+ for (int i = 0; i < InThreadCount; ++i)
+ {
+ m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this);
+ }
+}
+
+WorkerThreadPool::~WorkerThreadPool()
+{
+ m_WorkQueue.CompleteAdding();
+
+ for (std::thread& Thread : m_WorkerThreads)
+ {
+ Thread.join();
+ }
+
+ m_WorkerThreads.clear();
+}
+
+void
+WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
+{
+ m_WorkQueue.Enqueue(std::move(Work));
+}
+
+void
+WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
+{
+ m_WorkQueue.Enqueue(new detail::LambdaWork(Work));
+}
+
+void
+WorkerThreadPool::WorkerThreadFunction()
+{
+ do
+ {
+ Ref<IWork> Work;
+ if (m_WorkQueue.WaitAndDequeue(Work))
+ {
+ try
+ {
+ Work->Execute();
+ }
+ catch (std::exception& e)
+ {
+ Work->m_Exception = std::current_exception();
+
+ ZEN_WARN("Caught exception in worker thread: {}", e.what());
+ }
+ }
+ else
+ {
+ return;
+ }
+ } while (true);
+}
+
+} // namespace zen \ No newline at end of file
diff --git a/zenhttp/workthreadpool.h b/zenhttp/workthreadpool.h
new file mode 100644
index 000000000..6581cc08f
--- /dev/null
+++ b/zenhttp/workthreadpool.h
@@ -0,0 +1,47 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#include <zencore/blockingqueue.h>
+#include <zencore/refcount.h>
+#include <zencore/windows.h>
+
+#include <exception>
+#include <functional>
+#include <system_error>
+#include <thread>
+#include <vector>
+
+namespace zen {
+
+struct IWork : public RefCounted
+{
+ virtual void Execute() = 0;
+
+ inline std::exception_ptr GetException() { return m_Exception; }
+
+private:
+ std::exception_ptr m_Exception;
+
+ friend class WorkerThreadPool;
+};
+
+class WorkerThreadPool
+{
+public:
+ WorkerThreadPool(int InThreadCount);
+ ~WorkerThreadPool();
+
+ void ScheduleWork(Ref<IWork> Work);
+ void ScheduleWork(std::function<void()>&& Work);
+
+ void WorkerThreadFunction();
+
+private:
+ std::vector<std::thread> m_WorkerThreads;
+ BlockingQueue<Ref<IWork>> m_WorkQueue;
+};
+
+} // namespace zen
diff --git a/zenhttp/zenhttp.vcxproj b/zenhttp/zenhttp.vcxproj
index 899cf4bd1..1fc64bfc2 100644
--- a/zenhttp/zenhttp.vcxproj
+++ b/zenhttp/zenhttp.vcxproj
@@ -100,6 +100,7 @@
<ClCompile Include="httpsys.cpp" />
<ClCompile Include="httpuws.cpp" />
<ClCompile Include="iothreadpool.cpp" />
+ <ClCompile Include="workthreadpool.cpp" />
<ClCompile Include="zenhttp.cpp" />
</ItemGroup>
<ItemGroup>
@@ -112,6 +113,7 @@
<ClInclude Include="include\zenhttp\httpshared.h" />
<ClInclude Include="include\zenhttp\zenhttp.h" />
<ClInclude Include="iothreadpool.h" />
+ <ClInclude Include="workthreadpool.h" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\zencore\zencore.vcxproj">
diff --git a/zenhttp/zenhttp.vcxproj.filters b/zenhttp/zenhttp.vcxproj.filters
index 2e968055c..e57e7a712 100644
--- a/zenhttp/zenhttp.vcxproj.filters
+++ b/zenhttp/zenhttp.vcxproj.filters
@@ -9,6 +9,7 @@
<ClCompile Include="httpuws.cpp" />
<ClCompile Include="httpshared.cpp" />
<ClCompile Include="zenhttp.cpp" />
+ <ClCompile Include="workthreadpool.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="include\zenhttp\httpclient.h" />
@@ -20,6 +21,7 @@
<ClInclude Include="httpuws.h" />
<ClInclude Include="include\zenhttp\httpcommon.h" />
<ClInclude Include="include\zenhttp\httpshared.h" />
+ <ClInclude Include="workthreadpool.h" />
</ItemGroup>
<ItemGroup>
<None Include="xmake.lua" />
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 0e5e73ffc..fe21aa834 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1448,16 +1448,21 @@ TEST_CASE("zcache.policy")
return Buf;
};
- auto GeneratePackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
+ auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
auto CompressedData = zen::CompressedBuffer::Compress(Data);
OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash());
- zen::CbWriter Obj;
- Obj.BeginObject("obj"sv);
- Obj.AddBinaryAttachment("data", OutAttachmentKey);
- Obj.EndObject();
+
+ zen::CbWriter Writer;
+ Writer.BeginObject("obj"sv);
+ Writer.AddBinaryAttachment("data", OutAttachmentKey);
+ Writer.EndObject();
+ CbObject CacheRecord = Writer.Save().AsObject();
+
+ OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView());
+
zen::CbPackage Package;
- Package.SetObject(Obj.Save().AsObject());
+ Package.SetObject(CacheRecord);
Package.AddAttachment(zen::CbAttachment(CompressedData));
return Package;
@@ -1587,7 +1592,8 @@ TEST_CASE("zcache.policy")
LocalCfg.Spawn(LocalInst);
zen::IoHash Key;
- zen::CbPackage Package = GeneratePackage(Key);
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
auto Buf = ToBuffer(Package);
// Store package upstream
@@ -1623,7 +1629,8 @@ TEST_CASE("zcache.policy")
LocalCfg.Spawn(LocalInst);
zen::IoHash Key;
- zen::CbPackage Package = GeneratePackage(Key);
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
auto Buf = ToBuffer(Package);
// Store packge locally
@@ -1659,7 +1666,8 @@ TEST_CASE("zcache.policy")
LocalCfg.Spawn(LocalInst);
zen::IoHash Key;
- zen::CbPackage Package = GeneratePackage(Key);
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
auto Buf = ToBuffer(Package);
// Store package locally and upstream
@@ -1692,7 +1700,8 @@ TEST_CASE("zcache.policy")
LocalCfg.Spawn(LocalInst);
zen::IoHash Key;
- zen::CbPackage Package = GeneratePackage(Key);
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
auto Buf = ToBuffer(Package);
// Store package locally
@@ -1748,7 +1757,8 @@ TEST_CASE("zcache.policy")
LocalCfg.Spawn(LocalInst);
zen::IoHash Key;
- zen::CbPackage Package = GeneratePackage(Key);
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
auto Buf = ToBuffer(Package);
// Store package upstream
@@ -1791,6 +1801,80 @@ TEST_CASE("zcache.policy")
CHECK(Package.GetAttachments().size() != 0);
}
}
+
+ SUBCASE("skip - 'data' returns empty cache record/payload")
+ {
+ ZenConfig Cfg = ZenConfig::New();
+ ZenServerInstance Instance(TestEnv);
+ const auto Bucket = "test"sv;
+
+ Cfg.Spawn(Instance);
+
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ auto Buf = ToBuffer(Package);
+
+ // Store package
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(Cfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ // Get package
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?skip=data"_format(Cfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(Result.status_code == 200);
+ CHECK(Result.text.size() == 0);
+ }
+
+ // Get record
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?skip=data"_format(Cfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/x-ue-cbobject"}});
+ CHECK(Result.status_code == 200);
+ CHECK(Result.text.size() == 0);
+ }
+
+ // Get payload
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}/{}?skip=data"_format(Cfg.BaseUri, Bucket, Key, PayloadId)},
+ cpr::Header{{"Accept", "application/x-ue-comp"}});
+ CHECK(Result.status_code == 200);
+ CHECK(Result.text.size() == 0);
+ }
+ }
+
+ SUBCASE("skip - 'data' returns empty binary value")
+ {
+ ZenConfig Cfg = ZenConfig::New();
+ ZenServerInstance Instance(TestEnv);
+ const auto Bucket = "test"sv;
+
+ Cfg.Spawn(Instance);
+
+ zen::IoHash Key;
+ auto BinaryValue = GenerateData(1024, Key);
+
+ // Store binary cache value
+ {
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(Cfg.BaseUri, Bucket, Key)},
+ cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
+ cpr::Header{{"Content-Type", "application/octet-stream"}});
+ CHECK(Result.status_code == 201);
+ }
+
+ // Get package
+ {
+ cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?skip=data"_format(Cfg.BaseUri, Bucket, Key)},
+ cpr::Header{{"Accept", "application/octet-stream"}});
+ CHECK(Result.status_code == 200);
+ CHECK(Result.text.size() == 0);
+ }
+ }
}
struct RemoteExecutionRequest
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index dc96aecae..4a2a3748a 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -12,6 +12,7 @@
#include <zenhttp/httpserver.h>
#include <zenstore/CAS.h>
+#include "monitoring/httpstats.h"
#include "structuredcache.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
@@ -149,13 +150,19 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
CasStore& InStore,
CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
std::unique_ptr<UpstreamCache> UpstreamCache)
: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
+, m_StatsService(StatsService)
+, m_StatusService(StatusService)
, m_CasStore(InStore)
, m_CidStore(InCidStore)
, m_UpstreamCache(std::move(UpstreamCache))
{
+ StatsService.RegisterHandler("z$", *this);
+ StatusService.RegisterHandler("z$", *this);
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -200,11 +207,6 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
std::string_view Key = Request.RelativeUri();
- if (Key.empty())
- {
- return HandleStatusRequest(Request);
- }
-
if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); }))
{
// Bucket reference
@@ -270,10 +272,6 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
case kHead:
case kGet:
{
- if (Verb == kHead)
- {
- Request.SetSuppressResponseBody();
- }
HandleGetCacheRecord(Request, Ref, Policy);
}
break;
@@ -288,28 +286,104 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
void
HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
{
- const ZenContentType AcceptType = Request.AcceptContentType();
+ const ZenContentType AcceptType = Request.AcceptContentType();
+ const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
+ const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
+ const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote);
- ZenCacheValue Value;
- bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
- bool InUpstreamCache = false;
+ bool Success = false;
+ ZenCacheValue LocalCacheValue;
- const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote));
+ if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue))
+ {
+ Success = true;
- if (QueryUpstream)
+ if (!SkipData && AcceptType == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+ CbObjectView CacheRecord(LocalCacheValue.Value.Data());
+ uint32_t AttachmentCount = 0;
+ uint32_t ValidCount = 0;
+
+ if (!SkipAttachments)
+ {
+ CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ ValidCount++;
+ }
+ AttachmentCount++;
+ });
+
+ if (ValidCount != AttachmentCount)
+ {
+ Success = false;
+ ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType),
+ ValidCount,
+ AttachmentCount);
+ }
+ }
+
+ Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value));
+
+ BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage);
+ }
+ }
+
+ if (Success)
{
- const ZenContentType CacheRecordType = AcceptType;
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(LocalCacheValue.Value.Size()),
+ ToString(LocalCacheValue.Value.GetContentType()));
+
+ m_CacheStats.HitCount++;
+
+ if (SkipData)
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value);
+ }
+ }
+ else if (!QueryUpstream)
+ {
+ ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
+ m_CacheStats.MissCount++;
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ // Issue upstream query asynchronously in order to keep requests flowing without
+ // hogging I/O servicing threads with blocking work
+
+ Request.WriteResponseAsync([this, AcceptType, SkipData, SkipAttachments, Ref](HttpServerRequest& AsyncRequest) {
+ bool Success = false;
+ ZenCacheValue UpstreamCacheValue;
- if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
+ metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
+
+ if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType);
UpstreamResult.Success)
{
- Value.Value = UpstreamResult.Value;
- Success = true;
- InUpstreamCache = true;
+ Success = true;
+ UpstreamCacheValue.Value = UpstreamResult.Value;
+
+ UpstreamCacheValue.Value.SetContentType(AcceptType);
- if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject)
+ if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject)
{
- if (CacheRecordType == ZenContentType::kCbObject)
+ if (AcceptType == ZenContentType::kCbObject)
{
const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
@@ -322,7 +396,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
IndexData.EndArray();
- Value.IndexData = IndexData.Save();
+ UpstreamCacheValue.IndexData = IndexData.Save();
}
else
{
@@ -336,19 +410,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (Success)
{
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue);
}
}
- else
+ else if (AcceptType == ZenContentType::kCbPackage)
{
- ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage);
-
CbPackage Package;
- if (Package.TryLoad(UpstreamResult.Value))
+ if (Package.TryLoad(UpstreamCacheValue.Value))
{
+ CbObject CacheRecord = Package.GetObject();
uint32_t AttachmentCount = 0;
uint32_t ValidCount = 0;
- CbObject CacheRecord = Package.GetObject();
CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) {
if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
@@ -373,7 +445,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
- if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments))
+ if (SkipAttachments)
{
CbPackage PackageWithoutAttachments;
PackageWithoutAttachments.SetObject(CacheRecord);
@@ -381,7 +453,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
BinaryWriter MemStream;
PackageWithoutAttachments.Save(MemStream);
- Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
}
}
else
@@ -400,86 +472,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
}
}
- }
-
- if (!Success)
- {
- ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache)
- {
- CbObjectView CacheRecord(Value.Value.Data());
-
- const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All);
- if (ValidationResult != CbValidateError::None)
+ if (Success)
{
- ZEN_WARN("GET - '{}/{}' '{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
- return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
- }
-
- const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments);
- uint32_t AttachmentCount = 0;
- uint32_t ValidCount = 0;
- uint64_t AttachmentBytes = 0ull;
-
- CbPackage Package;
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(UpstreamCacheValue.Value.Size()),
+ ToString(UpstreamCacheValue.Value.GetContentType()));
- if (!SkipAttachments)
- {
- CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- AttachmentBytes += Chunk.Size();
- ValidCount++;
- }
- AttachmentCount++;
- });
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
- if (ValidCount != AttachmentCount)
+ if (SkipData)
{
- ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- ValidCount,
- AttachmentCount);
-
- return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ AsyncRequest.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value);
}
}
-
- Package.SetObject(LoadCompactBinaryObject(Value.Value));
-
- ZEN_DEBUG("HIT - '{}/{}' {} '{}', {} attachments (LOCAL)",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(AttachmentBytes + Value.Value.Size()),
- ToString(HttpContentType::kCbPackage),
- AttachmentCount);
-
- BinaryWriter MemStream;
- Package.Save(MemStream);
-
- IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
-
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
- }
- else
- {
- ZEN_DEBUG("HIT - '{}/{}' {} '{}' ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Value.Value.Size()),
- ToString(Value.Value.GetContentType()),
- InUpstreamCache ? "UPSTREAM" : "LOCAL");
-
- Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
- }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
+ m_CacheStats.MissCount++;
+ AsyncRequest.WriteResponse(HttpResponseCode::NotFound);
+ }
+ });
}
void
@@ -668,10 +688,6 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request
case kGet:
{
HandleGetCachePayload(Request, Ref, Policy);
- if (Verb == kHead)
- {
- Request.SetSuppressResponseBody();
- }
}
break;
case kPut:
@@ -712,7 +728,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
if (!Payload)
{
- ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId);
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType()));
+ m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -724,7 +741,20 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
ToString(Payload.GetContentType()),
InUpstreamCache ? "UPSTREAM" : "LOCAL");
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
+ m_CacheStats.HitCount++;
+ if (InUpstreamCache)
+ {
+ m_CacheStats.UpstreamHitCount++;
+ }
+
+ if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData)
+ {
+ Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
+ }
}
void
@@ -839,12 +869,25 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
void
-HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
+HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
{
CbObjectWriter Cbo;
- Cbo << "ok" << true;
EmitSnapshot("requests", m_HttpRequests, Cbo);
+ EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo);
+
+ const uint64_t HitCount = m_CacheStats.HitCount;
+ const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
+ const uint64_t MissCount = m_CacheStats.MissCount;
+ const uint64_t TotalCount = HitCount + MissCount;
+
+ Cbo.BeginObject("cache");
+ Cbo << "hits" << HitCount << "misses" << MissCount;
+ Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount) * 100.0) : 0.0);
+ Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount;
+ Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) * 100.0 : 0.0);
+ Cbo.EndObject();
+
if (m_UpstreamCache)
{
Cbo.BeginObject("upstream");
@@ -855,4 +898,12 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
+void
+HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
} // namespace zen
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 47fc173e9..ad7253f79 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -5,6 +5,9 @@
#include <zencore/stats.h>
#include <zenhttp/httpserver.h>
+#include "monitoring/httpstats.h"
+#include "monitoring/httpstatus.h"
+
#include <memory>
namespace spdlog {
@@ -27,12 +30,12 @@ enum class CachePolicy : uint8_t;
*
* {BucketId}/{KeyHash}
*
- * Where BucketId is an alphanumeric string, and KeyHash is a 40-character hexadecimal
- * sequence. The hash value may be derived in any number of ways, it's up to the
- * application to pick an approach.
+ * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character
+ * hexadecimal sequence. The hash value may be derived in any number of ways, it's
+ * up to the application to pick an approach.
*
* Values may be structured or unstructured. Structured values are encoded using Unreal
- * Engine's compact binary encoding
+ * Engine's compact binary encoding (see CbObject)
*
* Additionally, attachments may be addressed as:
*
@@ -47,18 +50,19 @@ enum class CachePolicy : uint8_t;
*
*/
-class HttpStructuredCacheService : public zen::HttpService
+class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
{
public:
HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- zen::CasStore& InCasStore,
- zen::CidStore& InCidStore,
+ CasStore& InCasStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
std::unique_ptr<UpstreamCache> UpstreamCache);
~HttpStructuredCacheService();
virtual const char* BaseUri() const override;
-
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
void Flush();
void Scrub(ScrubContext& Ctx);
@@ -71,6 +75,13 @@ private:
IoHash PayloadId;
};
+ struct CacheStats
+ {
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t UpstreamHitCount{};
+ std::atomic_uint64_t MissCount{};
+ };
+
[[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef);
void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
@@ -79,16 +90,21 @@ private:
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
- void HandleStatusRequest(zen::HttpServerRequest& Request);
+ virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- zen::ZenCacheStore& m_CacheStore;
- zen::CasStore& m_CasStore;
- zen::CidStore& m_CidStore;
+ ZenCacheStore& m_CacheStore;
+ HttpStatsService& m_StatsService;
+ HttpStatusService& m_StatusService;
+ CasStore& m_CasStore;
+ CidStore& m_CidStore;
std::unique_ptr<UpstreamCache> m_UpstreamCache;
uint64_t m_LastScrubTime = 0;
metrics::OperationTiming m_HttpRequests;
+ metrics::OperationTiming m_UpstreamGetRequestTiming;
+ CacheStats m_CacheStats;
};
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 5e93ebaa9..580446473 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -32,6 +32,8 @@ ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
CreateDirectories(RootDir);
+
+ m_DiskLayer.DiscoverBuckets();
}
ZenCacheStore::~ZenCacheStore()
@@ -116,6 +118,13 @@ ZenCacheStore::Scrub(ScrubContext& Ctx)
m_DiskLayer.Scrub(Ctx);
m_MemLayer.Scrub(Ctx);
}
+
+void
+ZenCacheStore::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
//////////////////////////////////////////////////////////////////////////
ZenCacheMemoryLayer::ZenCacheMemoryLayer()
@@ -142,6 +151,10 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
_.ReleaseNow();
+ // There's a race here. Since the lock is released early to allow
+ // inserts, the bucket delete path could end up deleting the
+ // underlying data structure
+
return Bucket->Get(HashKey, OutValue);
}
@@ -195,13 +208,21 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
}
void
+ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
+void
ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
+ RwLock::SharedLockScope _(m_bucketLock);
+
std::vector<IoHash> BadHashes;
for (auto& Kv : m_cacheMap)
{
- if (Kv.first != IoHash::HashBuffer(Kv.second))
+ if (Kv.first != IoHash::HashBuffer(Kv.second.Payload))
{
BadHashes.push_back(Kv.first);
}
@@ -209,10 +230,16 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- Ctx.ReportBadChunks(BadHashes);
+ Ctx.ReportBadCasChunks(BadHashes);
}
}
+void
+ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
bool
ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -224,18 +251,26 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
}
else
{
- OutValue.Value = bucketIt->second;
+ BucketValue& Value = bucketIt.value();
+ OutValue.Value = Value.Payload;
+ Value.LastAccess = GetCurrentTimeStamp();
return true;
}
}
+uint64_t
+ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp()
+{
+ return GetLofreqTimerValue();
+}
+
void
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
RwLock::ExclusiveLockScope _(m_bucketLock);
- m_cacheMap[HashKey] = Value.Value;
+ m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value});
}
//////////////////////////////////////////////////////////////////////////
@@ -245,11 +280,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
struct DiskLocation
{
- uint64_t OffsetAndFlags;
- uint32_t Size;
- uint32_t IndexDataSize;
+ inline DiskLocation() = default;
+
+ inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+ : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+ , LowerSize(ValueSize & 0xFFFFffff)
+ , IndexDataSize(IndexSize)
+ {
+ }
- static const uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull;
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
@@ -257,6 +298,7 @@ struct DiskLocation
static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ inline uint64_t Size() const { return LowerSize; }
inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
inline ZenContentType GetContentType() const
{
@@ -269,6 +311,11 @@ struct DiskLocation
return ContentType;
}
+
+private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
};
struct DiskIndexEntry
@@ -286,7 +333,7 @@ struct ZenCacheDiskLayer::CacheBucket
CacheBucket(CasStore& Cas);
~CacheBucket();
- void OpenOrCreate(std::filesystem::path BucketDir);
+ void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
static bool Delete(std::filesystem::path BucketDir);
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -294,14 +341,15 @@ struct ZenCacheDiskLayer::CacheBucket
void Drop();
void Flush();
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
- inline bool IsOk() const { return m_Ok; }
+ inline bool IsOk() const { return m_IsOk; }
private:
CasStore& m_CasStore;
std::filesystem::path m_BucketDir;
Oid m_BucketId;
- bool m_Ok = false;
+ bool m_IsOk = false;
uint64_t m_LargeObjectThreshold = 64 * 1024;
// These files are used to manage storage of small objects for this bucket
@@ -314,9 +362,19 @@ private:
uint64_t m_WriteCursor = 0;
void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
- void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value);
- bool GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
+
+ // These locks are here to avoid contention on file creation, therefore it's sufficient
+ // that we take the same lock for the same hash
+ //
+ // These locks are small and should really be spaced out so they don't share cache lines,
+ // but we don't currently access them at particularly high frequency so it should not be
+ // an issue in practice
+
+ RwLock m_ShardedLocks[256];
+ inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; }
};
ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas)
@@ -341,7 +399,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir)
}
void
-ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
+ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
{
CreateDirectories(BucketDir);
@@ -368,17 +426,23 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
{
ManifestFile.Read(&m_BucketId, sizeof(Oid), 0);
- m_Ok = true;
+ m_IsOk = true;
}
- if (!m_Ok)
+ if (!m_IsOk)
{
ManifestFile.Close();
}
}
- if (!m_Ok)
+ if (!m_IsOk)
{
+ if (AllowCreate == false)
+ {
+ // Invalid bucket
+ return;
+ }
+
// No manifest file found, this is a new bucket
ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec);
@@ -410,13 +474,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
m_SlogFile.Replay([&](const DiskIndexEntry& Record) {
m_Index[Record.Key] = Record.Location;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size);
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size());
});
m_WriteCursor = (MaxFileOffset + 15) & ~15;
}
- m_Ok = true;
+ m_IsOk = true;
}
void
@@ -437,23 +501,25 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoH
bool
ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue)
{
- if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size);
- OutValue.Value.SetContentType(Loc.GetContentType());
-
- return true;
+ return false;
}
- return false;
+ OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size());
+ OutValue.Value.SetContentType(Loc.GetContentType());
+
+ return true;
}
bool
-ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc)
+ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue)
{
WideStringBuilder<128> DataFilePath;
BuildPath(DataFilePath, HashKey);
+ RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+
if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()))
{
OutValue.Value = Data;
@@ -468,7 +534,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, Z
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
- if (!m_Ok)
+ if (!m_IsOk)
{
return false;
}
@@ -486,7 +552,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
_.ReleaseNow();
- return GetStandaloneCacheValue(HashKey, OutValue, Loc);
+ return GetStandaloneCacheValue(Loc, HashKey, OutValue);
}
return false;
@@ -495,14 +561,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
- if (!m_Ok)
+ if (!m_IsOk)
{
return;
}
if (Value.Value.Size() >= m_LargeObjectThreshold)
{
- return PutLargeObject(HashKey, Value);
+ return PutStandaloneCacheValue(HashKey, Value);
}
else
{
@@ -517,10 +583,9 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags),
- .Size = gsl::narrow<uint32_t>(Value.Value.Size())};
+ DiskLocation Loc(m_WriteCursor, Value.Value.Size(), 0, EntryFlags);
- m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16);
+ m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16);
if (auto it = m_Index.find(HashKey); it == m_Index.end())
{
@@ -530,11 +595,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
else
{
// TODO: should check if write is idempotent and bail out if it is?
+ // this would requiring comparing contents on disk unless we add a
+ // content hash to the index entry
it.value() = Loc;
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset());
+ m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset());
}
}
@@ -558,61 +625,69 @@ ZenCacheDiskLayer::CacheBucket::Flush()
void
ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
- std::vector<DiskIndexEntry> StandaloneFiles;
+ std::atomic<uint64_t> ScrubbedChunks{0}, ScrubbedBytes{0};
- std::vector<IoHash> BadChunks;
- std::vector<IoBuffer> BadStandaloneChunks;
+ std::vector<IoHash> BadChunks;
{
RwLock::SharedLockScope _(m_IndexLock);
for (auto& Kv : m_Index)
{
- const IoHash& Hash = Kv.first;
- const DiskLocation& Loc = Kv.second;
+ const IoHash& HashKey = Kv.first;
+ const DiskLocation& Loc = Kv.second;
ZenCacheValue Value;
- if (!GetInlineCacheValue(Loc, Value))
+ if (GetInlineCacheValue(Loc, Value))
{
- ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile));
- StandaloneFiles.push_back({.Key = Hash, .Location = Loc});
+ // Validate contents
}
else
{
- if (GetStandaloneCacheValue(Hash, Value, Loc))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- // Hash contents
-
- const IoHash ComputedHash = HashBuffer(Value.Value);
-
- if (ComputedHash != Hash)
+ if (GetStandaloneCacheValue(Loc, HashKey, Value))
{
- BadChunks.push_back(Hash);
+ // Note: we cannot currently validate contents since we don't
+ // have a content hash!
+ }
+ else
+ {
+ // Value not found
+ BadChunks.push_back(HashKey);
}
- }
- else
- {
- // Non-existent
}
}
}
}
- if (Ctx.RunRecovery())
+ Ctx.ReportScrubbed(ScrubbedChunks, ScrubbedBytes);
+
+ if (BadChunks.empty())
{
- // Clean out bad chunks
+ return;
}
- if (!BadChunks.empty())
+ Ctx.ReportBadCasChunks(BadChunks);
+
+ if (Ctx.RunRecovery())
{
- Ctx.ReportBadChunks(BadChunks);
+ // Clean out bad data
}
}
void
-ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx)
{
+ ZEN_UNUSED(GcCtx);
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
+
WideStringBuilder<128> DataFilePath;
BuildPath(DataFilePath, HashKey);
@@ -661,7 +736,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenC
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0};
+ DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags);
if (auto it = m_Index.find(HashKey); it == m_Index.end())
{
@@ -719,7 +794,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= std::string(InBucket);
- Bucket->OpenOrCreate(BucketPath.c_str());
+ Bucket->OpenOrCreate(BucketPath);
}
}
@@ -762,7 +837,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
std::filesystem::path bucketPath = m_RootDir;
bucketPath /= std::string(InBucket);
- Bucket->OpenOrCreate(bucketPath.c_str());
+ Bucket->OpenOrCreate(bucketPath);
}
}
@@ -774,6 +849,63 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
}
+void
+ZenCacheDiskLayer::DiscoverBuckets()
+{
+ FileSystemTraversal Traversal;
+ struct Visitor : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
+ [[maybe_unused]] const path_view& File,
+ [[maybe_unused]] uint64_t FileSize) override
+ {
+ }
+
+ virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
+ {
+ Dirs.push_back(std::wstring(DirectoryName));
+ return false;
+ }
+
+ std::vector<std::wstring> Dirs;
+ } Visit;
+
+ Traversal.TraverseFileSystem(m_RootDir, Visit);
+
+ // Initialize buckets
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ for (const std::wstring& BucketName : Visit.Dirs)
+ {
+ // New bucket needs to be created
+
+ std::string BucketName8 = WideToUtf8(BucketName);
+
+ if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end())
+ {
+ }
+ else
+ {
+ auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore);
+
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName8;
+
+ CacheBucket& Bucket = InsertResult.first->second;
+
+ Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false);
+
+ if (!Bucket.IsOk())
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir);
+
+ m_Buckets.erase(InsertResult.first);
+ }
+ }
+ }
+}
+
bool
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
@@ -830,27 +962,10 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
}
}
-//////////////////////////////////////////////////////////////////////////
-
-ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore)
-{
- ZEN_UNUSED(CacheStore);
-}
-
-ZenCacheTracker::~ZenCacheTracker()
-{
-}
-
-void
-ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey)
-{
- ZEN_UNUSED(Bucket);
- ZEN_UNUSED(HashKey);
-}
-
void
-ZenCacheTracker::Flush()
+ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx)
{
+ ZEN_UNUSED(GcCtx);
}
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index f96757409..4753af627 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -46,6 +46,11 @@ struct ZenCacheValue
CbObject IndexData;
};
+/** In-memory cache storage
+
+ Intended for small values which are frequently accessed
+
+ */
class ZenCacheMemoryLayer
{
public:
@@ -56,20 +61,41 @@ public:
void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
bool DropBucket(std::string_view Bucket);
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ struct Configuration
+ {
+ uint64_t TargetFootprintBytes = 16 * 1024 * 1024;
+ uint64_t ScavengeThreshold = 4 * 1024 * 1024;
+ };
+
+ const Configuration& GetConfiguration() const { return m_Configuration; }
+ void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; }
private:
struct CacheBucket
{
- RwLock m_bucketLock;
- tsl::robin_map<IoHash, IoBuffer> m_cacheMap;
+ struct BucketValue
+ {
+ uint64_t LastAccess = 0;
+ IoBuffer Payload;
+ };
+
+ RwLock m_bucketLock;
+ tsl::robin_map<IoHash, BucketValue> m_cacheMap;
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(const IoHash& HashKey, const ZenCacheValue& Value);
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ private:
+ uint64_t GetCurrentTimeStamp();
};
RwLock m_Lock;
std::unordered_map<std::string, CacheBucket> m_Buckets;
+ Configuration m_Configuration;
};
class ZenCacheDiskLayer
@@ -83,6 +109,9 @@ public:
bool DropBucket(std::string_view Bucket);
void Flush();
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ void DiscoverBuckets();
private:
/** A cache bucket manages a single directory containing
@@ -107,6 +136,7 @@ public:
bool DropBucket(std::string_view Bucket);
void Flush();
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
private:
std::filesystem::path m_RootDir;
@@ -116,18 +146,4 @@ private:
uint64_t m_LastScrubTime = 0;
};
-/** Tracks cache entry access, stats and orchestrates cleanup activities
- */
-class ZenCacheTracker
-{
-public:
- ZenCacheTracker(ZenCacheStore& CacheStore);
- ~ZenCacheTracker();
-
- void TrackAccess(std::string_view Bucket, const IoHash& HashKey);
- void Flush();
-
-private:
-};
-
} // namespace zen
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 254032226..df3259542 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -90,6 +90,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(GlobalOptions.IsTest)->default_value("false"));
options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(GlobalOptions.LogId));
options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::filesystem::path>(GlobalOptions.DataDir));
+ options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::filesystem::path>(GlobalOptions.ContentDir));
+ options.add_options()("abslog", "Path to log file", cxxopts::value<std::filesystem::path>(GlobalOptions.AbsLogFile));
options
.add_option("lifetime", "", "owner-pid", "Specify owning process id", cxxopts::value<int>(GlobalOptions.OwnerPid), "<identifier>");
@@ -212,8 +214,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_option("cache",
"",
"upstream-zen-url",
- "URL to a remote Zen server instance",
- cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Url)->default_value(""),
+ "URL to remote Zen server. Use a comma separated list to choose the one with the best latency.",
+ cxxopts::value<std::vector<std::string>>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Urls)->default_value(""),
"");
options.add_option("cache",
@@ -227,7 +229,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
"",
"upstream-stats",
"Collect performance metrics for upstream endpoints",
- cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("true"),
+ cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"),
"");
try
@@ -366,9 +368,17 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv
if (auto ZenConfig = UpstreamConfig->get<sol::optional<sol::table>>("zen"))
{
- UpdateStringValueFromConfig(ZenConfig.value(),
- std::string_view("url"),
- ServiceConfig.UpstreamCacheConfig.ZenConfig.Url);
+ if (auto Url = ZenConfig.value().get<sol::optional<std::string>>("url"))
+ {
+ ServiceConfig.UpstreamCacheConfig.ZenConfig.Urls.push_back(Url.value());
+ }
+ else if (auto Urls = ZenConfig.value().get<sol::optional<sol::table>>("url"))
+ {
+ for (const auto& Kv : Urls.value())
+ {
+ ServiceConfig.UpstreamCacheConfig.ZenConfig.Urls.push_back(Kv.second.as<std::string>());
+ }
+ }
}
}
}
diff --git a/zenserver/config.h b/zenserver/config.h
index 75c19d690..405e22739 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -17,6 +17,8 @@ struct ZenServerOptions
bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
std::string LogId; // Id for tagging log output
std::filesystem::path DataDir; // Root directory for state (used for testing)
+ std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
+ std::filesystem::path AbsLogFile;
};
struct ZenUpstreamJupiterConfig
@@ -34,7 +36,7 @@ struct ZenUpstreamJupiterConfig
struct ZenUpstreamZenConfig
{
- std::string Url;
+ std::vector<std::string> Urls;
};
enum class UpstreamCachePolicy : uint8_t
diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp
index bc7b883b5..7a7773cba 100644
--- a/zenserver/diag/logging.cpp
+++ b/zenserver/diag/logging.cpp
@@ -196,7 +196,8 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
EnableVTMode();
- std::filesystem::path LogPath = GlobalOptions.DataDir / "logs/zenserver.log";
+ std::filesystem::path LogPath =
+ !GlobalOptions.AbsLogFile.empty() ? GlobalOptions.AbsLogFile : GlobalOptions.DataDir / "logs/zenserver.log";
bool IsAsync = true;
spdlog::level::level_enum LogLevel = spdlog::level::info;
@@ -250,7 +251,7 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
Sinks.push_back(FileSink);
#if ZEN_PLATFORM_WINDOWS
- if (zen::IsDebuggerPresent())
+ if (zen::IsDebuggerPresent() && GlobalOptions.IsDebug)
{
auto DebugSink = std::make_shared<spdlog::sinks::msvc_sink_mt>();
DebugSink->set_level(spdlog::level::debug);
diff --git a/zenserver/experimental/frontend.cpp b/zenserver/experimental/frontend.cpp
new file mode 100644
index 000000000..98d570cfe
--- /dev/null
+++ b/zenserver/experimental/frontend.cpp
@@ -0,0 +1,119 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "frontend.h"
+
+#include <zencore/filesystem.h>
+#include <zencore/string.h>
+
+namespace zen {
+
+namespace html {
+
+ constexpr std::string_view Index = R"(
+<!DOCTYPE html>
+<html>
+<head>
+<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" integrity="sha384-F3w7mX95PdgyTmZZMECAngseQB83DfGTowi0iMjiWaeVhAn4FJkqJByhZMI3AhiU" crossorigin="anonymous">
+<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js" integrity="sha384-skAcpIdS7UcVUC05LJ9Dxay8AXcDYfBJqt1CJ85S/CFujBsIzCIv+l9liuYLaMQ/" crossorigin="anonymous"></script>
+<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/font/bootstrap-icons.css">
+<style type="text/css">
+body {
+ background-color: #fafafa;
+}
+</style>
+<script type="text/javascript">
+ const getCacheStats = () => {
+ const opts = { headers: { "Accept": "application/json" } };
+ fetch("/stats/z$", opts)
+ .then(response => {
+ if (!response.ok) {
+ throw Error(response.statusText);
+ }
+ return response.json();
+ })
+ .then(json => {
+ document.getElementById("status").innerHTML = "connected"
+ document.getElementById("stats").innerHTML = JSON.stringify(json, null, 4);
+ })
+ .catch(error => {
+ document.getElementById("status").innerHTML = "disconnected"
+ document.getElementById("stats").innerHTML = ""
+ console.log(error);
+ })
+ .finally(() => {
+ window.setTimeout(getCacheStats, 1000);
+ });
+ };
+ getCacheStats();
+</script>
+</head>
+<body>
+ <div class="container">
+ <div class="row">
+ <div class="text-center mt-5">
+ <pre>
+__________ _________ __
+\____ / ____ ____ / _____/_/ |_ ____ _______ ____
+ / / _/ __ \ / \ \_____ \ \ __\ / _ \ \_ __ \_/ __ \
+ / /_ \ ___/ | | \ / \ | | ( <_> ) | | \/\ ___/
+/_______ \ \___ >|___| //_______ / |__| \____/ |__| \___ >
+ \/ \/ \/ \/ \/
+ </pre>
+ <pre id="status"/>
+ </div>
+ </div>
+ <div class="row">
+ <pre class="mb-0">Z$:</pre>
+ <pre id="stats"></pre>
+ <div>
+ </div>
+</body>
+</html>
+)";
+
+} // namespace html
+
+HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Directory(Directory)
+{
+}
+
+HttpFrontendService::~HttpFrontendService()
+{
+}
+
+const char*
+HttpFrontendService::BaseUri() const
+{
+ return "/dashboard"; // in order to use the root path we need to remove HttpAddUrlToUrlGroup in HttpSys.cpp
+}
+
+void
+HttpFrontendService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ if (m_Directory.empty())
+ {
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, html::Index);
+ }
+ else
+ {
+ std::string_view Uri = Request.RelativeUri();
+ std::filesystem::path RelPath{Uri.empty() ? "index.html" : Uri};
+ std::filesystem::path AbsPath = m_Directory / RelPath;
+
+ FileContents File = ReadFile(AbsPath);
+
+ if (!File.ErrorCode)
+ {
+ // TODO: Map file extension to MIME type
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, File.Data[0]);
+ }
+ else
+ {
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Ooops!"sv);
+ }
+ }
+}
+
+} // namespace zen
diff --git a/zenserver/experimental/frontend.h b/zenserver/experimental/frontend.h
new file mode 100644
index 000000000..2ae20e940
--- /dev/null
+++ b/zenserver/experimental/frontend.h
@@ -0,0 +1,24 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+
+#include <filesystem>
+
+namespace zen {
+
+class HttpFrontendService final : public zen::HttpService
+{
+public:
+ HttpFrontendService(std::filesystem::path Directory);
+ virtual ~HttpFrontendService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+private:
+ std::filesystem::path m_Directory;
+};
+
+} // namespace zen
diff --git a/zenserver/monitoring/httpstats.cpp b/zenserver/monitoring/httpstats.cpp
new file mode 100644
index 000000000..de04294d0
--- /dev/null
+++ b/zenserver/monitoring/httpstats.cpp
@@ -0,0 +1,50 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpstats.h"
+
+namespace zen {
+
+HttpStatsService::HttpStatsService() : m_Log(logging::Get("stats"))
+{
+}
+
+HttpStatsService::~HttpStatsService()
+{
+}
+
+const char*
+HttpStatsService::BaseUri() const
+{
+ return "/stats/";
+}
+
+void
+HttpStatsService::RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Providers.insert_or_assign(std::string(Id), &Provider);
+}
+
+void
+HttpStatsService::HandleRequest(HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ std::string_view Key = Request.RelativeUri();
+
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers))
+ {
+ return It->second->HandleStatsRequest(Request);
+ }
+
+ [[fallthrough]];
+ default:
+ return;
+ }
+}
+
+} // namespace zen
diff --git a/zenserver/monitoring/httpstats.h b/zenserver/monitoring/httpstats.h
new file mode 100644
index 000000000..1c3c79dd0
--- /dev/null
+++ b/zenserver/monitoring/httpstats.h
@@ -0,0 +1,37 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zenhttp/httpserver.h>
+
+#include <map>
+
+namespace zen {
+
+struct IHttpStatsProvider
+{
+ virtual void HandleStatsRequest(HttpServerRequest& Request) = 0;
+};
+
+class HttpStatsService : public HttpService
+{
+public:
+ HttpStatsService();
+ ~HttpStatsService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+ void RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider);
+
+private:
+ spdlog::logger& m_Log;
+ HttpRequestRouter m_Router;
+
+ inline spdlog::logger& Log() { return m_Log; }
+
+ RwLock m_Lock;
+ std::map<std::string, IHttpStatsProvider*> m_Providers;
+};
+
+} // namespace zen \ No newline at end of file
diff --git a/zenserver/monitoring/httpstatus.cpp b/zenserver/monitoring/httpstatus.cpp
new file mode 100644
index 000000000..e12662b1c
--- /dev/null
+++ b/zenserver/monitoring/httpstatus.cpp
@@ -0,0 +1,50 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpstatus.h"
+
+namespace zen {
+
+HttpStatusService::HttpStatusService() : m_Log(logging::Get("status"))
+{
+}
+
+HttpStatusService::~HttpStatusService()
+{
+}
+
+const char*
+HttpStatusService::BaseUri() const
+{
+ return "/status/";
+}
+
+void
+HttpStatusService::RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Providers.insert_or_assign(std::string(Id), &Provider);
+}
+
+void
+HttpStatusService::HandleRequest(HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ std::string_view Key = Request.RelativeUri();
+
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers))
+ {
+ return It->second->HandleStatusRequest(Request);
+ }
+
+ [[fallthrough]];
+ default:
+ return;
+ }
+}
+
+} // namespace zen
diff --git a/zenserver/monitoring/httpstatus.h b/zenserver/monitoring/httpstatus.h
new file mode 100644
index 000000000..8f069f760
--- /dev/null
+++ b/zenserver/monitoring/httpstatus.h
@@ -0,0 +1,37 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zenhttp/httpserver.h>
+
+#include <map>
+
+namespace zen {
+
+struct IHttpStatusProvider
+{
+ virtual void HandleStatusRequest(HttpServerRequest& Request) = 0;
+};
+
+class HttpStatusService : public HttpService
+{
+public:
+ HttpStatusService();
+ ~HttpStatusService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+ void RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider);
+
+private:
+ spdlog::logger& m_Log;
+ HttpRequestRouter m_Router;
+
+ RwLock m_Lock;
+ std::map<std::string, IHttpStatusProvider*> m_Providers;
+
+ inline spdlog::logger& Log() { return m_Log; }
+};
+
+} // namespace zen \ No newline at end of file
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 7870f9559..5c4983472 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -1200,11 +1200,6 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
- if (Verb == HttpVerb::kHead)
- {
- HttpReq.SetSuppressResponseBody();
- }
-
if (IsOffset)
{
if (Offset > Value.Size())
@@ -1425,7 +1420,12 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver))
{
- ZEN_ERROR("Received malformed package!");
+ std::filesystem::path BadPackagePath =
+ Oplog.TempPath() / "bad_packages" / "session{}_request{}"_format(HttpReq.SessionId(), HttpReq.RequestId());
+
+ ZEN_ERROR("Received malformed package! Saving payload to '{}'", BadPackagePath);
+
+ zen::WriteFile(BadPackagePath, Payload);
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
}
diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp
index 01866a63b..924546762 100644
--- a/zenserver/testing/httptest.cpp
+++ b/zenserver/testing/httptest.cpp
@@ -4,9 +4,12 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
+#include <zencore/timer.h>
namespace zen {
+using namespace fmt::literals;
+
HttpTestingService::HttpTestingService()
{
m_Router.RegisterRoute(
@@ -15,6 +18,44 @@ HttpTestingService::HttpTestingService()
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "hello_slow",
+ [this](HttpRouterRequest& Req) {
+ Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) {
+ Stopwatch Timer;
+ Sleep(1000);
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kText,
+ "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
+ });
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "hello_veryslow",
+ [this](HttpRouterRequest& Req) {
+ Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) {
+ Stopwatch Timer;
+ Sleep(60000);
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kText,
+ "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
+ });
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "hello_throw",
+ [this](HttpRouterRequest& Req) {
+ Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) { throw std::runtime_error("intentional error"); });
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "hello_noresponse",
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) {}); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"metrics",
[this](HttpRouterRequest& Req) {
metrics::OperationTiming::Scope _(m_TimingStats);
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 9573a1631..1de417008 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -47,9 +47,9 @@ struct CloudCacheAccessToken
struct CloudCacheResult
{
IoBuffer Response;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- int32_t ErrorCode = {};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ int32_t ErrorCode{};
std::string Reason;
bool Success = false;
};
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 03054b542..5b2629f72 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -4,6 +4,7 @@
#include "jupiter.h"
#include "zen.h"
+#include <zencore/blockingqueue.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
@@ -23,7 +24,6 @@
#include <algorithm>
#include <atomic>
-#include <deque>
#include <thread>
#include <unordered_map>
@@ -33,70 +33,6 @@ using namespace std::literals;
namespace detail {
- template<typename T>
- class BlockingQueue
- {
- public:
- BlockingQueue() = default;
-
- ~BlockingQueue() { CompleteAdding(); }
-
- void Enqueue(T&& Item)
- {
- {
- std::lock_guard Lock(m_Lock);
- m_Queue.emplace_back(std::move(Item));
- m_Size++;
- }
-
- m_NewItemSignal.notify_one();
- }
-
- bool WaitAndDequeue(T& Item)
- {
- if (m_CompleteAdding.load())
- {
- return false;
- }
-
- std::unique_lock Lock(m_Lock);
- m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
-
- if (!m_Queue.empty())
- {
- Item = std::move(m_Queue.front());
- m_Queue.pop_front();
- m_Size--;
-
- return true;
- }
-
- return false;
- }
-
- void CompleteAdding()
- {
- if (!m_CompleteAdding.load())
- {
- m_CompleteAdding.store(true);
- m_NewItemSignal.notify_all();
- }
- }
-
- std::size_t Size() const
- {
- std::unique_lock Lock(m_Lock);
- return m_Queue.size();
- }
-
- private:
- mutable std::mutex m_Lock;
- std::condition_variable m_NewItemSignal;
- std::deque<T> m_Queue;
- std::atomic_bool m_CompleteAdding{false};
- std::atomic_uint32_t m_Size;
- };
-
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
@@ -105,12 +41,14 @@ namespace detail {
, m_UseLegacyDdc(Options.UseLegacyDdc)
{
using namespace fmt::literals;
- m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
+ m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl);
m_Client = new CloudCacheClient(Options);
}
virtual ~JupiterUpstreamEndpoint() = default;
+ virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
+
virtual bool IsHealthy() const override { return m_HealthOk.load(); }
virtual UpstreamEndpointHealth CheckHealth() override
@@ -186,16 +124,23 @@ namespace detail {
}
}
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -206,16 +151,23 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -386,22 +338,70 @@ namespace detail {
class ZenUpstreamEndpoint final : public UpstreamEndpoint
{
+ struct ZenEndpoint
+ {
+ std::string Url;
+ std::string Reason;
+ double Latency{};
+ bool Ok = false;
+
+ bool operator<(const ZenEndpoint& RHS) const { return Ok && RHS.Ok ? Latency < RHS.Latency : Ok; }
+ };
+
public:
- ZenUpstreamEndpoint(std::string_view ServiceUrl)
+ ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN")
{
- using namespace fmt::literals;
- m_DisplayName = "Zen - {}"_format(ServiceUrl);
- m_Client = new ZenStructuredCacheClient(ServiceUrl);
+ for (const auto& Url : Urls)
+ {
+ m_Endpoints.push_back({.Url = Url});
+ }
}
~ZenUpstreamEndpoint() = default;
+ virtual UpstreamEndpointHealth Initialize() override
+ {
+ using namespace fmt::literals;
+
+ const ZenEndpoint& Ep = GetEndpoint();
+ if (Ep.Ok)
+ {
+ m_ServiceUrl = Ep.Url;
+ m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
+ m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+
+ m_HealthOk = true;
+ return {.Ok = true};
+ }
+
+ m_HealthOk = false;
+ return {.Reason = Ep.Reason};
+ }
+
virtual bool IsHealthy() const override { return m_HealthOk; }
virtual UpstreamEndpointHealth CheckHealth() override
{
+ using namespace fmt::literals;
+
try
{
+ if (m_Client.IsNull())
+ {
+ const ZenEndpoint& Ep = GetEndpoint();
+ if (Ep.Ok)
+ {
+ m_ServiceUrl = Ep.Url;
+ m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
+ m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+
+ m_HealthOk = true;
+ return {.Ok = true};
+ }
+
+ return {.Reason = Ep.Reason};
+ }
+
ZenStructuredCacheSession Session(*m_Client);
ZenCacheResult Result;
@@ -429,16 +429,23 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -450,16 +457,23 @@ namespace detail {
const ZenCacheResult Result =
Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -563,6 +577,42 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ const ZenEndpoint& GetEndpoint()
+ {
+ for (ZenEndpoint& Ep : m_Endpoints)
+ {
+ ZenStructuredCacheClient Client(Ep.Url);
+ ZenStructuredCacheSession Session(Client);
+ const int32_t SampleCount = 2;
+
+ Ep.Ok = false;
+ Ep.Latency = {};
+
+ for (int32_t Sample = 0; Sample < SampleCount; ++Sample)
+ {
+ ZenCacheResult Result = Session.CheckHealth();
+ Ep.Ok = Result.Success;
+ Ep.Reason = std::move(Result.Reason);
+ Ep.Latency += Result.ElapsedSeconds;
+ }
+ Ep.Latency /= double(SampleCount);
+ }
+
+ std::sort(std::begin(m_Endpoints), std::end(m_Endpoints));
+
+ for (const auto& Ep : m_Endpoints)
+ {
+ ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason);
+ }
+
+ return m_Endpoints.front();
+ }
+
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
+ std::vector<ZenEndpoint> m_Endpoints;
std::string m_DisplayName;
RefPtr<ZenStructuredCacheClient> m_Client;
UpstreamEndpointStats m_Stats;
@@ -575,7 +625,7 @@ namespace detail {
struct UpstreamStats
{
- static constexpr uint64_t MaxSampleCount = 100ull;
+ static constexpr uint64_t MaxSampleCount = 1000ull;
UpstreamStats(bool Enabled) : m_Enabled(Enabled) {}
@@ -584,11 +634,6 @@ struct UpstreamStats
const GetUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
- {
- return;
- }
-
UpstreamEndpointStats& Stats = Endpoint.Stats();
if (Result.Error)
@@ -606,7 +651,7 @@ struct UpstreamStats
Stats.MissCount++;
}
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -617,11 +662,6 @@ struct UpstreamStats
const PutUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
- {
- return;
- }
-
UpstreamEndpointStats& Stats = Endpoint.Stats();
if (Result.Success)
{
@@ -634,7 +674,7 @@ struct UpstreamStats
Stats.ErrorCount++;
}
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -693,7 +733,7 @@ public:
{
for (auto& Endpoint : m_Endpoints)
{
- const UpstreamEndpointHealth Health = Endpoint->CheckHealth();
+ const UpstreamEndpointHealth Health = Endpoint->Initialize();
if (Health.Ok)
{
ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName());
@@ -925,7 +965,7 @@ private:
spdlog::logger& Log() { return m_Log; }
- using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
+ using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>;
struct RunState
{
@@ -975,9 +1015,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
}
std::unique_ptr<UpstreamEndpoint>
-MakeZenUpstreamEndpoint(std::string_view Url)
+MakeZenUpstreamEndpoint(std::span<std::string const> Urls)
{
- return std::make_unique<detail::ZenUpstreamEndpoint>(Url);
+ return std::make_unique<detail::ZenUpstreamEndpoint>(Urls);
}
} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index a6b1e9784..edc995da6 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -45,35 +45,29 @@ struct UpstreamCacheOptions
bool StatsEnabled = false;
};
-enum class UpstreamStatusCode : uint8_t
-{
- Ok,
- Error
-};
-
struct UpstreamError
{
- UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok;
- std::string Reason;
+ int32_t ErrorCode{};
+ std::string Reason{};
- explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; }
+ explicit operator bool() const { return ErrorCode != 0; }
};
struct GetUpstreamCacheResult
{
IoBuffer Value;
- UpstreamError Error;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ UpstreamError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
};
struct PutUpstreamCacheResult
{
std::string Reason;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
};
struct UpstreamEndpointHealth
@@ -84,14 +78,14 @@ struct UpstreamEndpointHealth
struct UpstreamEndpointStats
{
- std::atomic_uint64_t HitCount = {};
- std::atomic_uint64_t MissCount = {};
- std::atomic_uint64_t UpCount = {};
- std::atomic_uint64_t ErrorCount = {};
- std::atomic<double> UpBytes = {};
- std::atomic<double> DownBytes = {};
- std::atomic<double> SecondsUp = {};
- std::atomic<double> SecondsDown = {};
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t MissCount{};
+ std::atomic_uint64_t UpCount{};
+ std::atomic_uint64_t ErrorCount{};
+ std::atomic<double> UpBytes{};
+ std::atomic<double> DownBytes{};
+ std::atomic<double> SecondsUp{};
+ std::atomic<double> SecondsDown{};
};
/**
@@ -102,6 +96,8 @@ class UpstreamEndpoint
public:
virtual ~UpstreamEndpoint() = default;
+ virtual UpstreamEndpointHealth Initialize() = 0;
+
virtual bool IsHealthy() const = 0;
virtual UpstreamEndpointHealth CheckHealth() = 0;
@@ -149,6 +145,6 @@ std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Opt
std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options);
-std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::string_view Url);
+std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::span<std::string const> Urls);
} // namespace zen
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index c988a6b0b..6141fd397 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -328,7 +328,9 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
-ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) : m_ServiceUrl(ServiceUrl)
+ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl)
+: m_Log(logging::Get(std::string_view("zenclient")))
+, m_ServiceUrl(ServiceUrl)
{
}
@@ -369,7 +371,7 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
using namespace std::literals;
ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient)
-: m_Log(logging::Get("zenclient"sv))
+: m_Log(OuterClient.Log())
, m_Client(OuterClient)
{
m_SessionState = m_Client.AllocSessionState();
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 158be668a..12e46bd8d 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -138,8 +138,11 @@ public:
std::string_view ServiceUrl() const { return m_ServiceUrl; }
+ inline spdlog::logger& Log() { return m_Log; }
+
private:
- std::string m_ServiceUrl;
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
RwLock m_SessionStateLock;
std::list<detail::ZenCacheSessionState*> m_SessionStateCache;
diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua
index 7a6981fcd..fb1ba651d 100644
--- a/zenserver/xmake.lua
+++ b/zenserver/xmake.lua
@@ -32,3 +32,18 @@ target("zenserver")
add_packages(
"vcpkg::cxxopts",
"vcpkg::mimalloc")
+
+ on_load(function(target)
+ local commit, err = os.iorun("git log -1 --format=\"%h-%cI\"")
+ if commit ~= nil then
+ commit = commit:gsub("%s+", "")
+ commit = commit:gsub("\n", "")
+ if is_mode("release") then
+ commit = "rel-" .. commit
+ else
+ commit = "dbg-" .. commit
+ end
+ target:add("defines","BUILD_VERSION=\"" .. commit .. "\"")
+ print("build version " .. commit)
+ end
+ end)
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index b45df9fef..18c59636d 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -1,5 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinarybuilder.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
@@ -31,6 +32,10 @@
#include <set>
#include <unordered_map>
+#if !defined(BUILD_VERSION)
+# define BUILD_VERSION ("dev-build")
+#endif
+
//////////////////////////////////////////////////////////////////////////
// We don't have any doctest code in this file but this is needed to bring
// in some shared code into the executable
@@ -81,7 +86,10 @@
#include "cache/structuredcachestore.h"
#include "compute/apply.h"
#include "diag/diagsvcs.h"
+#include "experimental/frontend.h"
#include "experimental/usnjournal.h"
+#include "monitoring/httpstats.h"
+#include "monitoring/httpstatus.h"
#include "projectstore.h"
#include "testing/httptest.h"
#include "testing/launch.h"
@@ -95,17 +103,16 @@
namespace zen {
-class ZenServer
-{
- ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
+using namespace std::literals;
+class ZenServer : public IHttpStatusProvider
+{
public:
void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry)
{
- m_ServerEntry = ServerEntry;
using namespace fmt::literals;
- ZEN_INFO(ZEN_APP_NAME " initializing");
+ m_ServerEntry = ServerEntry;
m_DebugOptionForcedCrash = ServiceConfig.ShouldCrash;
if (ParentPid)
@@ -139,6 +146,15 @@ public:
// Ok so now we're configured, let's kick things off
+ m_Http = zen::CreateHttpServer();
+ m_Http->Initialize(BasePort);
+ m_Http->RegisterService(m_HealthService);
+ m_Http->RegisterService(m_StatsService);
+ m_Http->RegisterService(m_StatusService);
+ m_StatusService.RegisterHandler("status", *this);
+
+ // Initialize storage and services
+
ZEN_INFO("initializing storage");
zen::CasStoreConfiguration Config;
@@ -166,95 +182,7 @@ public:
if (ServiceConfig.StructuredCacheEnabled)
{
- using namespace std::literals;
- auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
-
- ZEN_INFO("instantiating structured cache service");
- m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
-
- std::unique_ptr<zen::UpstreamCache> UpstreamCache;
- if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
- {
- const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
-
- zen::UpstreamCacheOptions UpstreamOptions;
- UpstreamOptions.ReadUpstream =
- (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
- UpstreamOptions.WriteUpstream =
- (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
-
- if (UpstreamConfig.UpstreamThreadCount < 32)
- {
- UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
- }
-
- UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
-
- UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
-
- if (!UpstreamConfig.ZenConfig.Url.empty())
- {
- std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Url);
- UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
- }
-
- {
- zen::CloudCacheClientOptions Options;
- if (UpstreamConfig.JupiterConfig.UseProductionSettings)
- {
- Options = zen::CloudCacheClientOptions{
- .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
- .DdcNamespace = "ue.ddc"sv,
- .BlobStoreNamespace = "ue.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
- }
- else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
- {
- Options = zen::CloudCacheClientOptions{
- .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
- .DdcNamespace = "ue4.ddc"sv,
- .BlobStoreNamespace = "test.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
- }
-
- Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
- Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
- Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
- Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
- Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
- Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
- Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc;
-
- if (!Options.ServiceUrl.empty())
- {
- std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
- UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
- }
- }
-
- if (UpstreamCache->Initialize())
- {
- ZEN_INFO("upstream cache active ({})",
- UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
- : UpstreamOptions.ReadUpstream ? "READONLY"
- : UpstreamOptions.WriteUpstream ? "WRITEONLY"
- : "DISABLED");
- }
- else
- {
- UpstreamCache.reset();
- ZEN_INFO("NOT using upstream cache");
- }
- }
-
- m_StructuredCacheService.reset(
- new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache)));
+ InitializeStructuredCache(ServiceConfig);
}
else
{
@@ -272,13 +200,8 @@ public:
}
#endif
- m_Http = zen::CreateHttpServer();
- m_Http->Initialize(BasePort);
- m_Http->RegisterService(m_HealthService);
-
m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
m_Http->RegisterService(m_TestingService);
-
m_Http->RegisterService(m_AdminService);
if (m_HttpProjectService)
@@ -302,8 +225,17 @@ public:
{
m_Http->RegisterService(*m_HttpFunctionService);
}
+
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot);
+
+ if (m_FrontendService)
+ {
+ m_Http->RegisterService(*m_FrontendService);
+ }
}
+ void InitializeStructuredCache(ZenServiceConfig& ServiceConfig);
+
#if ZEN_ENABLE_MESH
void StartMesh(int BasePort)
{
@@ -344,8 +276,12 @@ public:
const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode;
+ m_CurrentState = kRunning;
+
m_Http->Run(IsInteractiveMode);
+ m_CurrentState = kShuttingDown;
+
ZEN_INFO(ZEN_APP_NAME " exiting");
m_IoContext.stop();
@@ -364,6 +300,7 @@ public:
void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; }
void SetTestMode(bool State) { m_TestMode = State; }
void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; }
+ void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
void EnsureIoRunner()
{
@@ -417,6 +354,7 @@ public:
void Scrub()
{
+ Stopwatch Timer;
ZEN_INFO("Storage validation STARTING");
ScrubContext Ctx;
@@ -425,7 +363,13 @@ public:
m_ProjectStore->Scrub(Ctx);
m_StructuredCacheService->Scrub(Ctx);
- ZEN_INFO("Storage validation DONE");
+ const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
+
+ ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})",
+ NiceTimeSpanMs(ElapsedTimeMs),
+ NiceBytes(Ctx.ScrubbedBytes()),
+ Ctx.ScrubbedChunks(),
+ NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs));
}
void Flush()
@@ -443,17 +387,51 @@ public:
m_ProjectStore->Flush();
}
+ virtual void HandleStatusRequest(HttpServerRequest& Request) override
+ {
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Cbo << "state" << ToString(m_CurrentState);
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
private:
- bool m_IsDedicatedMode = false;
- bool m_TestMode = false;
- std::filesystem::path m_DataRoot;
- std::jthread m_IoRunner;
- asio::io_context m_IoContext;
- asio::steady_timer m_PidCheckTimer{m_IoContext};
- zen::ProcessMonitor m_ProcessMonitor;
- zen::NamedMutex m_ServerMutex;
+ ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
+ bool m_IsDedicatedMode = false;
+ bool m_TestMode = false;
+ std::filesystem::path m_DataRoot;
+ std::filesystem::path m_ContentRoot;
+ std::jthread m_IoRunner;
+ asio::io_context m_IoContext;
+ asio::steady_timer m_PidCheckTimer{m_IoContext};
+ zen::ProcessMonitor m_ProcessMonitor;
+ zen::NamedMutex m_ServerMutex;
+
+ enum ServerState
+ {
+ kInitializing,
+ kRunning,
+ kShuttingDown
+ } m_CurrentState = kInitializing;
+
+ std::string_view ToString(ServerState Value)
+ {
+ switch (Value)
+ {
+ case kInitializing:
+ return "initializing"sv;
+ case kRunning:
+ return "running"sv;
+ case kShuttingDown:
+ return "shutdown"sv;
+ default:
+ return "unknown"sv;
+ }
+ }
zen::Ref<zen::HttpServer> m_Http;
+ zen::HttpStatusService m_StatusService;
+ zen::HttpStatsService m_StatsService;
std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()};
std::unique_ptr<zen::CidStore> m_CidStore;
std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
@@ -471,10 +449,105 @@ private:
zen::HttpHealthService m_HealthService;
zen::Mesh m_ZenMesh{m_IoContext};
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
+ std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
bool m_DebugOptionForcedCrash = false;
};
+void
+ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
+{
+ using namespace std::literals;
+ auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
+
+ ZEN_INFO("instantiating structured cache service");
+ m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
+
+ std::unique_ptr<zen::UpstreamCache> UpstreamCache;
+ if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
+ {
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
+
+ zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
+
+ if (UpstreamConfig.UpstreamThreadCount < 32)
+ {
+ UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
+ }
+
+ UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
+
+ UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+
+ if (!UpstreamConfig.ZenConfig.Urls.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls);
+ UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
+ }
+
+ {
+ zen::CloudCacheClientOptions Options;
+ if (UpstreamConfig.JupiterConfig.UseProductionSettings)
+ {
+ Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
+ .DdcNamespace = "ue.ddc"sv,
+ .BlobStoreNamespace = "ue.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .UseLegacyDdc = false};
+ }
+ else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
+ {
+ Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
+ .DdcNamespace = "ue4.ddc"sv,
+ .BlobStoreNamespace = "test.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .UseLegacyDdc = false};
+ }
+
+ Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
+ Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
+ Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
+ Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
+ Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
+ Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
+ Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc;
+
+ if (!Options.ServiceUrl.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
+ UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
+ }
+ }
+
+ if (UpstreamCache->Initialize())
+ {
+ ZEN_INFO("upstream cache active ({})",
+ UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
+ : UpstreamOptions.ReadUpstream ? "READONLY"
+ : UpstreamOptions.WriteUpstream ? "WRITEONLY"
+ : "DISABLED");
+ }
+ else
+ {
+ UpstreamCache.reset();
+ ZEN_INFO("NOT using upstream cache");
+ }
+ }
+
+ m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore,
+ *m_CasStore,
+ *m_CidStore,
+ m_StatsService,
+ m_StatusService,
+ std::move(UpstreamCache)));
+}
+
} // namespace zen
class ZenWindowsService : public WindowsService
@@ -522,7 +595,7 @@ ZenWindowsService::Run()
ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig);
- ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort);
+ ZEN_INFO(ZEN_APP_NAME " - starting on port {}, build '{}'", GlobalOptions.BasePort, BUILD_VERSION);
ZenServerState ServerState;
ServerState.Initialize();
@@ -560,6 +633,7 @@ ZenWindowsService::Run()
ZenServer Server;
Server.SetDataRoot(GlobalOptions.DataDir);
+ Server.SetContentRoot(GlobalOptions.ContentDir);
Server.SetTestMode(GlobalOptions.IsTest);
Server.SetDedicatedMode(GlobalOptions.IsDedicated);
Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry);
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index bcb7ea028..7fad477a1 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -108,7 +108,12 @@
<ClInclude Include="cache\structuredcachestore.h" />
<ClInclude Include="compute\apply.h" />
<ClInclude Include="config.h" />
+ <ClInclude Include="diag\formatters.h" />
<ClInclude Include="diag\logging.h" />
+ <ClInclude Include="experimental\frontend.h" />
+ <ClInclude Include="experimental\vfs.h" />
+ <ClInclude Include="monitoring\httpstats.h" />
+ <ClInclude Include="monitoring\httpstatus.h" />
<ClInclude Include="resource.h" />
<ClInclude Include="sos\sos.h" />
<ClInclude Include="testing\httptest.h" />
@@ -132,6 +137,10 @@
<ClCompile Include="compute\apply.cpp" />
<ClCompile Include="config.cpp" />
<ClCompile Include="diag\logging.cpp" />
+ <ClCompile Include="experimental\frontend.cpp" />
+ <ClCompile Include="experimental\vfs.cpp" />
+ <ClCompile Include="monitoring\httpstats.cpp" />
+ <ClCompile Include="monitoring\httpstatus.cpp" />
<ClCompile Include="projectstore.cpp" />
<ClCompile Include="cache\cacheagent.cpp" />
<ClCompile Include="sos\sos.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index 6b99ca8d7..04e639a33 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -38,6 +38,13 @@
<ClInclude Include="testing\httptest.h" />
<ClInclude Include="windows\service.h" />
<ClInclude Include="resource.h" />
+ <ClInclude Include="experimental\frontend.h">
+ <Filter>experimental</Filter>
+ </ClInclude>
+ <ClInclude Include="diag\formatters.h" />
+ <ClInclude Include="experimental\vfs.h" />
+ <ClInclude Include="monitoring\httpstats.h" />
+ <ClInclude Include="monitoring\httpstatus.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -70,6 +77,13 @@
</ClCompile>
<ClCompile Include="testing\httptest.cpp" />
<ClCompile Include="windows\service.cpp" />
+ <ClCompile Include="admin\admin.cpp" />
+ <ClCompile Include="experimental\frontend.cpp">
+ <Filter>experimental</Filter>
+ </ClCompile>
+ <ClCompile Include="experimental\vfs.cpp" />
+ <ClCompile Include="monitoring\httpstats.cpp" />
+ <ClCompile Include="monitoring\httpstatus.cpp" />
</ItemGroup>
<ItemGroup>
<Filter Include="cache">
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index 1db2b50bf..a4bbfa340 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -32,6 +32,15 @@ CasChunkSet::AddChunkToSet(const IoHash& HashToAdd)
}
void
+CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd)
+{
+ for (const IoHash& Hash : HashesToAdd)
+ {
+ m_ChunkSet.insert(Hash);
+ }
+}
+
+void
CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
{
for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;)
@@ -58,10 +67,45 @@ CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callba
//////////////////////////////////////////////////////////////////////////
+struct GcContext::GcState
+{
+ CasChunkSet m_CasChunks;
+ CasChunkSet m_CidChunks;
+};
+
+GcContext::GcContext() : m_State(std::make_unique<GcState>())
+{
+}
+
+GcContext::~GcContext()
+{
+}
+
+void
+GcContext::ContributeCids(std::span<const IoHash> Cids)
+{
+ m_State->m_CidChunks.AddChunksToSet(Cids);
+}
+
+void
+GcContext::ContributeCas(std::span<const IoHash> Cas)
+{
+ m_State->m_CasChunks.AddChunksToSet(Cas);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks)
+{
+ m_BadCas.AddChunksToSet(BadCasChunks);
+}
+
void
-ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks)
+ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
{
- ZEN_UNUSED(BadChunks);
+ m_ChunkCount.fetch_add(ChunkCount);
+ m_ByteCount.fetch_add(ChunkBytes);
}
/**
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index df5c32d25..7a5d7bcf4 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -204,7 +204,7 @@ struct CidStore::Impl
// TODO: Should compute a snapshot index here
- Ctx.ReportBadChunks(BadChunks);
+ Ctx.ReportBadCasChunks(BadChunks);
}
uint64_t m_LastScrubTime = 0;
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 5fc3ac356..612f87c7c 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -254,7 +254,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadChunks(BadChunkHashes);
+ Ctx.ReportBadCasChunks(BadChunkHashes);
}
void
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 0b18848d5..ee641b80a 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -394,7 +394,8 @@ FileCasStrategy::Flush()
void
FileCasStrategy::Scrub(ScrubContext& Ctx)
{
- std::vector<IoHash> BadHashes;
+ std::vector<IoHash> BadHashes;
+ std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
IoHashStream Hasher;
@@ -405,8 +406,13 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
{
BadHashes.push_back(Hash);
}
+
+ ++ChunkCount;
+ ChunkBytes.fetch_add(Payload.FileSize());
});
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
+
if (!BadHashes.empty())
{
ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size());
@@ -428,7 +434,9 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
}
- Ctx.ReportBadChunks(BadHashes);
+ Ctx.ReportBadCasChunks(BadHashes);
+
+ ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
}
void
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index 93454ca6f..d0698df7f 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -31,13 +31,41 @@ struct CasStoreConfiguration
uint64_t HugeValueThreshold = 1024 * 1024;
};
+/** Manage a set of IoHash values
+ */
+
+class CasChunkSet
+{
+public:
+ void AddChunkToSet(const IoHash& HashToAdd);
+ void AddChunksToSet(std::span<const IoHash> HashesToAdd);
+ void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
+ void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback);
+ inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); }
+ inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); }
+ inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); }
+
+private:
+ // Q: should we protect this with a lock, or is that a higher level concern?
+ std::unordered_set<IoHash> m_ChunkSet;
+};
+
/** Garbage Collection context object
*/
class GcContext
{
public:
+ GcContext();
+ ~GcContext();
+
+ void ContributeCids(std::span<const IoHash> Cid);
+ void ContributeCas(std::span<const IoHash> Hash);
+
private:
+ struct GcState;
+
+ std::unique_ptr<GcState> m_State;
};
/** Context object for data scrubbing
@@ -49,28 +77,26 @@ private:
class ScrubContext
{
public:
- virtual void ReportBadChunks(std::span<IoHash> BadChunks);
+ virtual void ReportBadCasChunks(std::span<IoHash> BadCasChunks);
inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
inline bool RunRecovery() const { return m_Recover; }
+ void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes);
+
+ inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
+ inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
private:
- uint64_t m_ScrubTime = GetHifreqTimerValue();
- bool m_Recover = true;
+ uint64_t m_ScrubTime = GetHifreqTimerValue();
+ bool m_Recover = true;
+ std::atomic<uint64_t> m_ChunkCount{0};
+ std::atomic<uint64_t> m_ByteCount{0};
+ CasChunkSet m_BadCas;
+ CasChunkSet m_BadCid;
};
-class CasChunkSet
-{
-public:
- void AddChunkToSet(const IoHash& HashToAdd);
- void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
- void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback);
- inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); }
- inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); }
- inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); }
+/** Content Addressable Storage interface
-private:
- std::unordered_set<IoHash> m_ChunkSet;
-};
+ */
class CasStore
{