aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-10-05 22:25:53 +0200
committerStefan Boberg <[email protected]>2021-10-05 22:25:53 +0200
commit20ac7384f8ca558f1fb933eda846604792240ea0 (patch)
treee5c95b422b847af50b77807af916e389fcaf83aa
parentstats: Mean returns zero when the count is zero (diff)
downloadzen-20ac7384f8ca558f1fb933eda846604792240ea0.tar.xz
zen-20ac7384f8ca558f1fb933eda846604792240ea0.zip
Merged from upstream
-rw-r--r--scripts/deploybuild.py37
-rw-r--r--zencore/compactbinary.cpp51
-rw-r--r--zencore/filesystem.cpp9
-rw-r--r--zencore/include/zencore/blockingqueue.h73
-rw-r--r--zencore/include/zencore/refcount.h9
-rw-r--r--zencore/include/zencore/timer.h15
-rw-r--r--zencore/timer.cpp16
-rw-r--r--zenhttp/httpsys.cpp23
-rw-r--r--zenhttp/iothreadpool.cpp6
-rw-r--r--zenhttp/iothreadpool.h8
-rw-r--r--zenserver/cache/structuredcache.cpp25
-rw-r--r--zenserver/cache/structuredcache.h22
-rw-r--r--zenserver/cache/structuredcachestore.cpp194
-rw-r--r--zenserver/cache/structuredcachestore.h31
-rw-r--r--zenserver/monitoring/httpstats.cpp50
-rw-r--r--zenserver/monitoring/httpstats.h37
-rw-r--r--zenserver/monitoring/httpstatus.cpp50
-rw-r--r--zenserver/monitoring/httpstatus.h37
-rw-r--r--zenserver/upstream/upstreamcache.cpp68
-rw-r--r--zenserver/upstream/zen.cpp6
-rw-r--r--zenserver/upstream/zen.h5
-rw-r--r--zenserver/xmake.lua15
-rw-r--r--zenserver/zenserver.cpp283
-rw-r--r--zenserver/zenserver.vcxproj7
-rw-r--r--zenserver/zenserver.vcxproj.filters7
-rw-r--r--zenstore/CAS.cpp11
-rw-r--r--zenstore/cidstore.cpp2
-rw-r--r--zenstore/compactcas.cpp2
-rw-r--r--zenstore/filecas.cpp12
-rw-r--r--zenstore/include/zenstore/CAS.h52
30 files changed, 862 insertions, 301 deletions
diff --git a/scripts/deploybuild.py b/scripts/deploybuild.py
index 971f34ff9..c81235a8c 100644
--- a/scripts/deploybuild.py
+++ b/scripts/deploybuild.py
@@ -17,6 +17,10 @@ def jazz_print(tag, detail = ""):
def jazz_fail(tag, detail = ""):
print(f"{Fore.RED}{Style.BRIGHT}||> {tag}{Style.RESET_ALL} {detail}")
+def copy_file(src, dst):
+ print(f"{Fore.WHITE}{Style.BRIGHT}||> COPY {Style.RESET_ALL} {src} -> {Fore.GREEN}{Style.BRIGHT}{dst}")
+ shutil.copy(src, dst)
+
colorama.init()
origcwd = os.getcwd()
@@ -25,9 +29,13 @@ origcwd = os.getcwd()
parser = argparse.ArgumentParser(description='Deploy a zen build to an UE tree')
parser.add_argument("root", help="Path to an UE5 root directory")
+parser.add_argument("--sentry", action="store_true", help="Whether to upload symobls to Sentry")
+parser.add_argument("--xmake", action="store_true", help="Build with XMake")
args = parser.parse_args()
engineroot = args.root
+upload_symbols = args.sentry
+use_xmake = args.xmake
if not os.path.isfile(os.path.join(engineroot, "RunUAT.bat")):
print(f"{Fore.RED}Not a valid UE5 engine root directory: '{engineroot}'")
@@ -45,20 +53,29 @@ jazz_print("Zen root:", zenroot)
# Build fresh binaries
-vs_path = vswhere.get_latest_path() # can also specify prerelease=True
-jazz_print("BUILDING CODE", f"using VS root: {vs_path}")
-devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com")
+if use_xmake:
+ build_cmd = ["xmake", "-b", "zenserver"]
+ build_output_dir = r'build\windows\x64\release'
+else:
+ vs_path = vswhere.get_latest_path() # can also specify prerelease=True
+ jazz_print("BUILDING CODE", f"using VS root: {vs_path}")
+ devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com")
+ build_cmd = [devenv_path, "/build", "Release", "zen.sln"]
+ build_output_dir = r'x64\Release'
try:
- subprocess.run([devenv_path, "/build", "Release", "zen.sln"], check=True)
+ subprocess.run(build_cmd, check=True)
except:
jazz_fail("Build failed!")
exit(1)
-# Upload symbols etc to Sentry
+build_output_binary_path = os.path.join(zenroot, build_output_dir, "zenserver.exe")
+build_output_binary_pdb_path = os.path.join(zenroot, build_output_dir, "zenserver.pdb")
-jazz_print("Uploading symbols", "to Sentry")
-subprocess.run(["scripts\sentry-cli.exe", "upload-dif", "--org", "to", "--project", "zen-server", "x64\\Release\\zenserver.exe", "x64\\Release\\zenserver.pdb"])
+# Upload symbols etc to Sentry
+if upload_symbols:
+ jazz_print("Uploading symbols", "to Sentry")
+ subprocess.run(["scripts\sentry-cli.exe", "upload-dif", "--org", "to", "--project", "zen-server", build_output_binary_path, build_output_binary_pdb_path])
# Change into root directory to pick up Perforce environment
@@ -105,9 +122,9 @@ jazz_print("Placing zenserver", f"executables into tree at '{target_bin_dir}'")
crashpadtarget = os.path.join(target_bin_dir, "crashpad_handler.exe")
try:
- shutil.copy(os.path.join(zenroot, "x64\Release\zenserver.exe"), os.path.join(target_bin_dir, "zenserver.exe"))
- shutil.copy(os.path.join(zenroot, "x64\Release\zenserver.pdb"), os.path.join(target_bin_dir, "zenserver.pdb"))
- shutil.copy(os.path.join(zenroot, r'vcpkg_installed\x64-windows-static\x64-windows-static\tools\sentry-native\crashpad_handler.exe'), crashpadtarget)
+ copy_file(build_output_binary_path, os.path.join(target_bin_dir, "zenserver.exe"))
+ copy_file(build_output_binary_pdb_path, os.path.join(target_bin_dir, "zenserver.pdb"))
+ copy_file(os.path.join(zenroot, r'vcpkg_installed\x64-windows-static\tools\sentry-native\crashpad_handler.exe'), crashpadtarget)
P4.add(crashpadtarget).run()
except Exception as e:
print(f"Caught exception while copying: {e.args}")
diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp
index 4072ab5cf..c4694733c 100644
--- a/zencore/compactbinary.cpp
+++ b/zencore/compactbinary.cpp
@@ -1474,10 +1474,30 @@ public:
Builder << Accessor.AsIntegerNegative();
break;
case CbFieldType::Float32:
- Builder.Append("{:.9g}"_format(Accessor.AsFloat32()));
+ {
+ const float Value = Accessor.AsFloat32();
+ if (std::isfinite(Value))
+ {
+ Builder.Append("{:.9g}"_format(Value));
+ }
+ else
+ {
+ Builder << "null"sv;
+ }
+ }
break;
case CbFieldType::Float64:
- Builder.Append("{:.17g}"_format(Accessor.AsFloat64()));
+ {
+ const double Value = Accessor.AsFloat64();
+ if (std::isfinite(Value))
+ {
+ Builder.Append("{:.17g}"_format(Value));
+ }
+ else
+ {
+ Builder << "null"sv;
+ }
+ }
break;
case CbFieldType::BoolFalse:
Builder << "false"sv;
@@ -1834,7 +1854,7 @@ TEST_CASE("uson.json")
SUBCASE("number")
{
- const double ExpectedFloatValue = 21.21f;
+ const float ExpectedFloatValue = 21.21f;
const double ExpectedDoubleValue = 42.42;
CbObjectWriter Writer;
@@ -1856,6 +1876,31 @@ TEST_CASE("uson.json")
CHECK(FloatValue == doctest::Approx(ExpectedFloatValue));
CHECK(DoubleValue == doctest::Approx(ExpectedDoubleValue));
}
+
+ SUBCASE("number.nan")
+ {
+ const float FloatNan = std::numeric_limits<float>::quiet_NaN();
+ const double DoubleNan = std::numeric_limits<double>::quiet_NaN();
+
+ CbObjectWriter Writer;
+ Writer << "FloatNan" << FloatNan;
+ Writer << "DoubleNan" << DoubleNan;
+
+ CbObject Obj = Writer.Save();
+
+ StringBuilder<128> Sb;
+ const std::string_view JsonText = Obj.ToJson(Sb).ToView();
+
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(JsonText.data(), JsonError);
+
+ const double FloatValue = Json["FloatNan"].number_value();
+ const double DoubleValue = Json["DoubleNan"].number_value();
+
+ CHECK(JsonError.empty());
+ CHECK(FloatValue == 0);
+ CHECK(DoubleValue == 0);
+ }
}
#endif
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index f6ba92f98..745fe0779 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -63,6 +63,13 @@ DeleteReparsePoint(const wchar_t* Path, DWORD dwReparseTag)
bool
CreateDirectories(const wchar_t* Dir)
{
+ // This may be suboptimal, in that it appears to try and create directories
+ // from the root on up instead of from some directory which is known to
+ // be present
+ //
+ // We should implement a smarter version at some point since this can be
+ // pretty expensive in aggregate
+
return std::filesystem::create_directories(Dir);
}
@@ -522,7 +529,7 @@ WriteFile(std::filesystem::path Path, IoBuffer Data)
WriteFile(Path, &DataPtr, 1);
}
-IoBuffer
+IoBuffer
FileContents::Flatten()
{
if (Data.size() == 1)
diff --git a/zencore/include/zencore/blockingqueue.h b/zencore/include/zencore/blockingqueue.h
new file mode 100644
index 000000000..fbb4480a1
--- /dev/null
+++ b/zencore/include/zencore/blockingqueue.h
@@ -0,0 +1,73 @@
+#pragma once
+
+#include <atomic>
+#include <deque>
+#include <mutex>
+
+namespace zen {
+
+template<typename T>
+class BlockingQueue
+{
+public:
+ BlockingQueue() = default;
+
+ ~BlockingQueue() { CompleteAdding(); }
+
+ void Enqueue(T&& Item)
+ {
+ {
+ std::lock_guard Lock(m_Lock);
+ m_Queue.emplace_back(std::move(Item));
+ m_Size++;
+ }
+
+ m_NewItemSignal.notify_one();
+ }
+
+ bool WaitAndDequeue(T& Item)
+ {
+ if (m_CompleteAdding.load())
+ {
+ return false;
+ }
+
+ std::unique_lock Lock(m_Lock);
+ m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
+
+ if (!m_Queue.empty())
+ {
+ Item = std::move(m_Queue.front());
+ m_Queue.pop_front();
+ m_Size--;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ void CompleteAdding()
+ {
+ if (!m_CompleteAdding.load())
+ {
+ m_CompleteAdding.store(true);
+ m_NewItemSignal.notify_all();
+ }
+ }
+
+ std::size_t Size() const
+ {
+ std::unique_lock Lock(m_Lock);
+ return m_Queue.size();
+ }
+
+private:
+ mutable std::mutex m_Lock;
+ std::condition_variable m_NewItemSignal;
+ std::deque<T> m_Queue;
+ std::atomic_bool m_CompleteAdding{false};
+ std::atomic_uint32_t m_Size;
+};
+
+} // namespace zen
diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h
index 320718f5b..7167ab3b5 100644
--- a/zencore/include/zencore/refcount.h
+++ b/zencore/include/zencore/refcount.h
@@ -4,6 +4,8 @@
#include "atomic.h"
#include "zencore.h"
+#include <concepts>
+
namespace zen {
/**
@@ -114,6 +116,10 @@ public:
inline Ref(T* Ptr) : m_Ref(Ptr) { m_Ref && m_Ref->AddRef(); }
inline ~Ref() { m_Ref && m_Ref->Release(); }
+ template<typename DerivedType>
+ requires std::derived_from<DerivedType, T>
+ inline Ref(const Ref<DerivedType>& Rhs) : Ref(Rhs.m_Ref) {}
+
[[nodiscard]] inline bool IsNull() const { return m_Ref == nullptr; }
inline explicit operator bool() const { return m_Ref != nullptr; }
inline T* operator->() const { return m_Ref; }
@@ -152,6 +158,9 @@ public:
private:
T* m_Ref = nullptr;
+
+ template<class T>
+ friend class Ref;
};
void refcount_forcelink();
diff --git a/zencore/include/zencore/timer.h b/zencore/include/zencore/timer.h
index 693b6daaa..e4ddc3505 100644
--- a/zencore/include/zencore/timer.h
+++ b/zencore/include/zencore/timer.h
@@ -38,6 +38,21 @@ private:
uint64_t m_StartValue;
};
+// Low frequency timers
+
+namespace detail {
+ extern ZENCORE_API uint64_t g_LofreqTimerValue;
+} // namespace detail
+
+inline uint64_t
+GetLofreqTimerValue()
+{
+ return detail::g_LofreqTimerValue;
+}
+
+ZENCORE_API void UpdateLofreqTimerValue();
+ZENCORE_API uint64_t GetLofreqTimerFrequency();
+
void timer_forcelink(); // internal
} // namespace zen
diff --git a/zencore/timer.cpp b/zencore/timer.cpp
index 5d30d9b29..9180519bd 100644
--- a/zencore/timer.cpp
+++ b/zencore/timer.cpp
@@ -69,6 +69,22 @@ GetHifreqTimerFrequencySafe()
}
//////////////////////////////////////////////////////////////////////////
+
+uint64_t detail::g_LofreqTimerValue = GetHifreqTimerValue();
+
+void
+UpdateLofreqTimerValue()
+{
+ detail::g_LofreqTimerValue = GetHifreqTimerValue();
+}
+
+uint64_t
+GetLofreqTimerFrequency()
+{
+ return GetHifreqTimerFrequencySafe();
+}
+
+//////////////////////////////////////////////////////////////////////////
//
// Testing related code follows...
//
diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp
index fedaf282e..de3069bb8 100644
--- a/zenhttp/httpsys.cpp
+++ b/zenhttp/httpsys.cpp
@@ -1091,29 +1091,34 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService&
if (AbsPathLength >= PrefixLength)
{
// We convert the URI immediately because most of the code involved prefers to deal
- // with utf8. This has some performance impact which I'd prefer to avoid but for now
- // we just have to live with it
+ // with utf8. This is overhead which I'd prefer to avoid but for now we just have
+ // to live with it
WideToUtf8({(char16_t*)HttpRequestPtr->CookedUrl.pAbsPath + PrefixLength, gsl::narrow<size_t>(AbsPathLength - PrefixLength)},
m_UriUtf8);
- std::string_view Uri8{m_UriUtf8};
+ std::string_view UriSuffix8{m_UriUtf8};
- const size_t LastComponentIndex = Uri8.find_last_of('/');
+ const size_t LastComponentIndex = UriSuffix8.find_last_of('/');
if (LastComponentIndex != std::string_view::npos)
{
- Uri8.remove_prefix(LastComponentIndex);
+ UriSuffix8.remove_prefix(LastComponentIndex);
}
- const size_t LastDotIndex = Uri8.find_last_of('.');
+ const size_t LastDotIndex = UriSuffix8.find_last_of('.');
if (LastDotIndex != std::string_view::npos)
{
- Uri8.remove_prefix(LastDotIndex + 1);
- }
+ UriSuffix8.remove_prefix(LastDotIndex + 1);
+
+ AcceptContentType = ParseContentType(UriSuffix8);
- AcceptContentType = ParseContentType(Uri8);
+ if (AcceptContentType != HttpContentType::kUnknownContentType)
+ {
+ m_UriUtf8.RemoveSuffix(uint32_t(m_UriUtf8.Size() - LastComponentIndex - LastDotIndex - 1));
+ }
+ }
}
else
{
diff --git a/zenhttp/iothreadpool.cpp b/zenhttp/iothreadpool.cpp
index 4f1a6642b..6087e69ec 100644
--- a/zenhttp/iothreadpool.cpp
+++ b/zenhttp/iothreadpool.cpp
@@ -4,6 +4,8 @@
#include <zencore/except.h>
+#if ZEN_PLATFORM_WINDOWS
+
namespace zen {
WinIoThreadPool::WinIoThreadPool(int InThreadCount)
@@ -32,6 +34,8 @@ WinIoThreadPool::~WinIoThreadPool()
void
WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode)
{
+ ZEN_ASSERT(!m_ThreadPoolIo);
+
m_ThreadPoolIo = CreateThreadpoolIo(IoHandle, Callback, Context, &m_CallbackEnvironment);
if (!m_ThreadPoolIo)
@@ -41,3 +45,5 @@ WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, voi
}
} // namespace zen
+
+#endif
diff --git a/zenhttp/iothreadpool.h b/zenhttp/iothreadpool.h
index 4418b940b..8333964c3 100644
--- a/zenhttp/iothreadpool.h
+++ b/zenhttp/iothreadpool.h
@@ -2,9 +2,12 @@
#pragma once
-#include <zencore/windows.h>
+#include <zencore/zencore.h>
-#include <system_error>
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+
+# include <system_error>
namespace zen {
@@ -31,3 +34,4 @@ private:
};
} // namespace zen
+#endif
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index d6174caf6..27bb1c5cd 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -12,6 +12,7 @@
#include <zenhttp/httpserver.h>
#include <zenstore/CAS.h>
+#include "monitoring/httpstats.h"
#include "structuredcache.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
@@ -149,13 +150,19 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
CasStore& InStore,
CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
std::unique_ptr<UpstreamCache> UpstreamCache)
: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
+, m_StatsService(StatsService)
+, m_StatusService(StatusService)
, m_CasStore(InStore)
, m_CidStore(InCidStore)
, m_UpstreamCache(std::move(UpstreamCache))
{
+ StatsService.RegisterHandler("z$", *this);
+ StatusService.RegisterHandler("z$", *this);
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -200,13 +207,6 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
std::string_view Key = Request.RelativeUri();
- if (Key.empty() || Key == "stats.json")
- {
- $.Cancel();
-
- return HandleStatusRequest(Request);
- }
-
if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); }))
{
// Bucket reference
@@ -870,10 +870,9 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
void
-HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
+HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
{
CbObjectWriter Cbo;
- Cbo << "ok" << true;
EmitSnapshot("requests", m_HttpRequests, Cbo);
@@ -899,4 +898,12 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
+void
+HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
} // namespace zen
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index a360878bd..703e24ed3 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -5,6 +5,9 @@
#include <zencore/stats.h>
#include <zenhttp/httpserver.h>
+#include "monitoring/httpstats.h"
+#include "monitoring/httpstatus.h"
+
#include <memory>
namespace spdlog {
@@ -47,12 +50,14 @@ enum class CachePolicy : uint8_t;
*
*/
-class HttpStructuredCacheService : public zen::HttpService
+class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
{
public:
HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- zen::CasStore& InCasStore,
- zen::CidStore& InCidStore,
+ CasStore& InCasStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
std::unique_ptr<UpstreamCache> UpstreamCache);
~HttpStructuredCacheService();
@@ -86,13 +91,16 @@ private:
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
- void HandleStatusRequest(zen::HttpServerRequest& Request);
+ virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- zen::ZenCacheStore& m_CacheStore;
- zen::CasStore& m_CasStore;
- zen::CidStore& m_CidStore;
+ ZenCacheStore& m_CacheStore;
+ HttpStatsService& m_StatsService;
+ HttpStatusService& m_StatusService;
+ CasStore& m_CasStore;
+ CidStore& m_CidStore;
std::unique_ptr<UpstreamCache> m_UpstreamCache;
uint64_t m_LastScrubTime = 0;
metrics::OperationTiming m_HttpRequests;
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index b97f0830f..83a21f87a 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -32,6 +32,8 @@ ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
CreateDirectories(RootDir);
+
+ m_DiskLayer.DiscoverBuckets();
}
ZenCacheStore::~ZenCacheStore()
@@ -149,6 +151,10 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
_.ReleaseNow();
+ // There's a race here. Since the lock is released early to allow
+ // inserts, the bucket delete path could end up deleting the
+ // underlying data structure
+
return Bucket->Get(HashKey, OutValue);
}
@@ -210,11 +216,13 @@ ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx)
void
ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
+ RwLock::SharedLockScope _(m_bucketLock);
+
std::vector<IoHash> BadHashes;
for (auto& Kv : m_cacheMap)
{
- if (Kv.first != IoHash::HashBuffer(Kv.second))
+ if (Kv.first != IoHash::HashBuffer(Kv.second.Payload))
{
BadHashes.push_back(Kv.first);
}
@@ -222,10 +230,16 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- Ctx.ReportBadChunks(BadHashes);
+ Ctx.ReportBadCasChunks(BadHashes);
}
}
+void
+ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
bool
ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -237,18 +251,26 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
}
else
{
- OutValue.Value = bucketIt->second;
+ BucketValue& Value = bucketIt.value();
+ OutValue.Value = Value.Payload;
+ Value.LastAccess = GetCurrentTimeStamp();
return true;
}
}
+uint64_t
+ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp()
+{
+ return GetLofreqTimerValue();
+}
+
void
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
RwLock::ExclusiveLockScope _(m_bucketLock);
- m_cacheMap[HashKey] = Value.Value;
+ m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value});
}
//////////////////////////////////////////////////////////////////////////
@@ -258,11 +280,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
struct DiskLocation
{
- uint64_t OffsetAndFlags;
- uint32_t Size;
- uint32_t IndexDataSize;
+ inline DiskLocation() = default;
+
+ inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+ : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+ , LowerSize(ValueSize & 0xFFFFffff)
+ , IndexDataSize(IndexSize)
+ {
+ }
- static const uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull;
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
@@ -270,6 +298,7 @@ struct DiskLocation
static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ inline uint64_t Size() const { return LowerSize; }
inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
inline ZenContentType GetContentType() const
{
@@ -282,6 +311,11 @@ struct DiskLocation
return ContentType;
}
+
+private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
};
struct DiskIndexEntry
@@ -299,7 +333,7 @@ struct ZenCacheDiskLayer::CacheBucket
CacheBucket(CasStore& Cas);
~CacheBucket();
- void OpenOrCreate(std::filesystem::path BucketDir);
+ void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
static bool Delete(std::filesystem::path BucketDir);
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -309,13 +343,13 @@ struct ZenCacheDiskLayer::CacheBucket
void Scrub(ScrubContext& Ctx);
void GarbageCollect(GcContext& GcCtx);
- inline bool IsOk() const { return m_Ok; }
+ inline bool IsOk() const { return m_IsOk; }
private:
CasStore& m_CasStore;
std::filesystem::path m_BucketDir;
Oid m_BucketId;
- bool m_Ok = false;
+ bool m_IsOk = false;
uint64_t m_LargeObjectThreshold = 64 * 1024;
// These files are used to manage storage of small objects for this bucket
@@ -355,7 +389,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir)
}
void
-ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
+ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
{
CreateDirectories(BucketDir);
@@ -382,17 +416,23 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
{
ManifestFile.Read(&m_BucketId, sizeof(Oid), 0);
- m_Ok = true;
+ m_IsOk = true;
}
- if (!m_Ok)
+ if (!m_IsOk)
{
ManifestFile.Close();
}
}
- if (!m_Ok)
+ if (!m_IsOk)
{
+ if (AllowCreate == false)
+ {
+ // Invalid bucket
+ return;
+ }
+
// No manifest file found, this is a new bucket
ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec);
@@ -424,13 +464,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
m_SlogFile.Replay([&](const DiskIndexEntry& Record) {
m_Index[Record.Key] = Record.Location;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size);
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size());
});
m_WriteCursor = (MaxFileOffset + 15) & ~15;
}
- m_Ok = true;
+ m_IsOk = true;
}
void
@@ -453,7 +493,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen
{
if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size);
+ OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size());
OutValue.Value.SetContentType(Loc.GetContentType());
return true;
@@ -482,7 +522,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, Z
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
- if (!m_Ok)
+ if (!m_IsOk)
{
return false;
}
@@ -509,7 +549,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
- if (!m_Ok)
+ if (!m_IsOk)
{
return;
}
@@ -531,10 +571,9 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags),
- .Size = gsl::narrow<uint32_t>(Value.Value.Size())};
+ DiskLocation Loc(m_WriteCursor, Value.Value.Size(), 0, EntryFlags);
- m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16);
+ m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16);
if (auto it = m_Index.find(HashKey); it == m_Index.end())
{
@@ -544,11 +583,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
else
{
// TODO: should check if write is idempotent and bail out if it is?
+ // this would requiring comparing contents on disk unless we add a
+ // content hash to the index entry
it.value() = Loc;
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset());
+ m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset());
}
}
@@ -572,55 +613,45 @@ ZenCacheDiskLayer::CacheBucket::Flush()
void
ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
- std::vector<DiskIndexEntry> StandaloneFiles;
-
- std::vector<IoHash> BadChunks;
- std::vector<IoBuffer> BadStandaloneChunks;
+ std::atomic<uint64_t> ScrubbedChunks{0}, ScrubbedBytes{0};
{
RwLock::SharedLockScope _(m_IndexLock);
for (auto& Kv : m_Index)
{
- const IoHash& Hash = Kv.first;
- const DiskLocation& Loc = Kv.second;
+ const IoHash& HashKey = Kv.first;
+ const DiskLocation& Loc = Kv.second;
ZenCacheValue Value;
- if (!GetInlineCacheValue(Loc, Value))
+ if (GetInlineCacheValue(Loc, Value))
{
- ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile));
- StandaloneFiles.push_back({.Key = Hash, .Location = Loc});
+ // Validate contents
}
else
{
- if (GetStandaloneCacheValue(Hash, Value, Loc))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- // Hash contents
-
- const IoHash ComputedHash = HashBuffer(Value.Value);
-
- if (ComputedHash != Hash)
+ if (GetStandaloneCacheValue(HashKey, Value, Loc))
{
- BadChunks.push_back(Hash);
+ // Note: we cannot currently validate contents since we don't
+ // have a content hash!
+ }
+ else
+ {
+ // Value not found
}
- }
- else
- {
- // Non-existent
}
}
}
}
- if (Ctx.RunRecovery())
- {
- // Clean out bad chunks
- }
+ Ctx.ReportScrubbed(ScrubbedChunks, ScrubbedBytes);
- if (!BadChunks.empty())
+ if (Ctx.RunRecovery())
{
- Ctx.ReportBadChunks(BadChunks);
+ // Clean out bad data
}
}
@@ -681,7 +712,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenC
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0};
+ DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags);
if (auto it = m_Index.find(HashKey); it == m_Index.end())
{
@@ -739,7 +770,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= std::string(InBucket);
- Bucket->OpenOrCreate(BucketPath.c_str());
+ Bucket->OpenOrCreate(BucketPath);
}
}
@@ -782,7 +813,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
std::filesystem::path bucketPath = m_RootDir;
bucketPath /= std::string(InBucket);
- Bucket->OpenOrCreate(bucketPath.c_str());
+ Bucket->OpenOrCreate(bucketPath);
}
}
@@ -794,6 +825,63 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
}
+void
+ZenCacheDiskLayer::DiscoverBuckets()
+{
+ FileSystemTraversal Traversal;
+ struct Visitor : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
+ [[maybe_unused]] const path_view& File,
+ [[maybe_unused]] uint64_t FileSize) override
+ {
+ }
+
+ virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
+ {
+ Dirs.push_back(std::wstring(DirectoryName));
+ return false;
+ }
+
+ std::vector<std::wstring> Dirs;
+ } Visit;
+
+ Traversal.TraverseFileSystem(m_RootDir, Visit);
+
+ // Initialize buckets
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ for (const std::wstring& BucketName : Visit.Dirs)
+ {
+ // New bucket needs to be created
+
+ std::string BucketName8 = WideToUtf8(BucketName);
+
+ if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end())
+ {
+ }
+ else
+ {
+ auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore);
+
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName8;
+
+ CacheBucket& Bucket = InsertResult.first->second;
+
+ Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false);
+
+ if (!Bucket.IsOk())
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir);
+
+ m_Buckets.erase(InsertResult.first);
+ }
+ }
+ }
+}
+
bool
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 011f13323..4753af627 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -46,6 +46,11 @@ struct ZenCacheValue
CbObject IndexData;
};
+/** In-memory cache storage
+
+ Intended for small values which are frequently accessed
+
+ */
class ZenCacheMemoryLayer
{
public:
@@ -58,19 +63,39 @@ public:
void Scrub(ScrubContext& Ctx);
void GarbageCollect(GcContext& GcCtx);
+ struct Configuration
+ {
+ uint64_t TargetFootprintBytes = 16 * 1024 * 1024;
+ uint64_t ScavengeThreshold = 4 * 1024 * 1024;
+ };
+
+ const Configuration& GetConfiguration() const { return m_Configuration; }
+ void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; }
+
private:
struct CacheBucket
{
- RwLock m_bucketLock;
- tsl::robin_map<IoHash, IoBuffer> m_cacheMap;
+ struct BucketValue
+ {
+ uint64_t LastAccess = 0;
+ IoBuffer Payload;
+ };
+
+ RwLock m_bucketLock;
+ tsl::robin_map<IoHash, BucketValue> m_cacheMap;
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(const IoHash& HashKey, const ZenCacheValue& Value);
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ private:
+ uint64_t GetCurrentTimeStamp();
};
RwLock m_Lock;
std::unordered_map<std::string, CacheBucket> m_Buckets;
+ Configuration m_Configuration;
};
class ZenCacheDiskLayer
@@ -86,6 +111,8 @@ public:
void Scrub(ScrubContext& Ctx);
void GarbageCollect(GcContext& GcCtx);
+ void DiscoverBuckets();
+
private:
/** A cache bucket manages a single directory containing
metadata and data for that bucket
diff --git a/zenserver/monitoring/httpstats.cpp b/zenserver/monitoring/httpstats.cpp
new file mode 100644
index 000000000..de04294d0
--- /dev/null
+++ b/zenserver/monitoring/httpstats.cpp
@@ -0,0 +1,50 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpstats.h"
+
+namespace zen {
+
+HttpStatsService::HttpStatsService() : m_Log(logging::Get("stats"))
+{
+}
+
+HttpStatsService::~HttpStatsService()
+{
+}
+
+const char*
+HttpStatsService::BaseUri() const
+{
+ return "/stats/";
+}
+
+void
+HttpStatsService::RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Providers.insert_or_assign(std::string(Id), &Provider);
+}
+
+void
+HttpStatsService::HandleRequest(HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ std::string_view Key = Request.RelativeUri();
+
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers))
+ {
+ return It->second->HandleStatsRequest(Request);
+ }
+
+ [[fallthrough]];
+ default:
+ return;
+ }
+}
+
+} // namespace zen
diff --git a/zenserver/monitoring/httpstats.h b/zenserver/monitoring/httpstats.h
new file mode 100644
index 000000000..1c3c79dd0
--- /dev/null
+++ b/zenserver/monitoring/httpstats.h
@@ -0,0 +1,37 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zenhttp/httpserver.h>
+
+#include <map>
+
+namespace zen {
+
+struct IHttpStatsProvider
+{
+ virtual void HandleStatsRequest(HttpServerRequest& Request) = 0;
+};
+
+class HttpStatsService : public HttpService
+{
+public:
+ HttpStatsService();
+ ~HttpStatsService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+ void RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider);
+
+private:
+ spdlog::logger& m_Log;
+ HttpRequestRouter m_Router;
+
+ inline spdlog::logger& Log() { return m_Log; }
+
+ RwLock m_Lock;
+ std::map<std::string, IHttpStatsProvider*> m_Providers;
+};
+
+} // namespace zen \ No newline at end of file
diff --git a/zenserver/monitoring/httpstatus.cpp b/zenserver/monitoring/httpstatus.cpp
new file mode 100644
index 000000000..e12662b1c
--- /dev/null
+++ b/zenserver/monitoring/httpstatus.cpp
@@ -0,0 +1,50 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpstatus.h"
+
+namespace zen {
+
+HttpStatusService::HttpStatusService() : m_Log(logging::Get("status"))
+{
+}
+
+HttpStatusService::~HttpStatusService()
+{
+}
+
+const char*
+HttpStatusService::BaseUri() const
+{
+ return "/status/";
+}
+
+void
+HttpStatusService::RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Providers.insert_or_assign(std::string(Id), &Provider);
+}
+
+void
+HttpStatusService::HandleRequest(HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ std::string_view Key = Request.RelativeUri();
+
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers))
+ {
+ return It->second->HandleStatusRequest(Request);
+ }
+
+ [[fallthrough]];
+ default:
+ return;
+ }
+}
+
+} // namespace zen
diff --git a/zenserver/monitoring/httpstatus.h b/zenserver/monitoring/httpstatus.h
new file mode 100644
index 000000000..8f069f760
--- /dev/null
+++ b/zenserver/monitoring/httpstatus.h
@@ -0,0 +1,37 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+#include <zenhttp/httpserver.h>
+
+#include <map>
+
+namespace zen {
+
+struct IHttpStatusProvider
+{
+ virtual void HandleStatusRequest(HttpServerRequest& Request) = 0;
+};
+
+class HttpStatusService : public HttpService
+{
+public:
+ HttpStatusService();
+ ~HttpStatusService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+ void RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider);
+
+private:
+ spdlog::logger& m_Log;
+ HttpRequestRouter m_Router;
+
+ RwLock m_Lock;
+ std::map<std::string, IHttpStatusProvider*> m_Providers;
+
+ inline spdlog::logger& Log() { return m_Log; }
+};
+
+} // namespace zen \ No newline at end of file
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index a183e7ebf..ba5991b3f 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -8,6 +8,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
+#include <zencore/blockingqueue.h>
#include <zencore/fmtutils.h>
#include <zencore/stats.h>
#include <zencore/stream.h>
@@ -23,7 +24,6 @@
#include <algorithm>
#include <atomic>
-#include <deque>
#include <thread>
#include <unordered_map>
@@ -33,70 +33,6 @@ using namespace std::literals;
namespace detail {
- template<typename T>
- class BlockingQueue
- {
- public:
- BlockingQueue() = default;
-
- ~BlockingQueue() { CompleteAdding(); }
-
- void Enqueue(T&& Item)
- {
- {
- std::lock_guard Lock(m_Lock);
- m_Queue.emplace_back(std::move(Item));
- m_Size++;
- }
-
- m_NewItemSignal.notify_one();
- }
-
- bool WaitAndDequeue(T& Item)
- {
- if (m_CompleteAdding.load())
- {
- return false;
- }
-
- std::unique_lock Lock(m_Lock);
- m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
-
- if (!m_Queue.empty())
- {
- Item = std::move(m_Queue.front());
- m_Queue.pop_front();
- m_Size--;
-
- return true;
- }
-
- return false;
- }
-
- void CompleteAdding()
- {
- if (!m_CompleteAdding.load())
- {
- m_CompleteAdding.store(true);
- m_NewItemSignal.notify_all();
- }
- }
-
- std::size_t Size() const
- {
- std::unique_lock Lock(m_Lock);
- return m_Queue.size();
- }
-
- private:
- mutable std::mutex m_Lock;
- std::condition_variable m_NewItemSignal;
- std::deque<T> m_Queue;
- std::atomic_bool m_CompleteAdding{false};
- std::atomic_uint32_t m_Size;
- };
-
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
@@ -1029,7 +965,7 @@ private:
spdlog::logger& Log() { return m_Log; }
- using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
+ using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>;
struct RunState
{
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index c988a6b0b..6141fd397 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -328,7 +328,9 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
-ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) : m_ServiceUrl(ServiceUrl)
+ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl)
+: m_Log(logging::Get(std::string_view("zenclient")))
+, m_ServiceUrl(ServiceUrl)
{
}
@@ -369,7 +371,7 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
using namespace std::literals;
ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient)
-: m_Log(logging::Get("zenclient"sv))
+: m_Log(OuterClient.Log())
, m_Client(OuterClient)
{
m_SessionState = m_Client.AllocSessionState();
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 158be668a..12e46bd8d 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -138,8 +138,11 @@ public:
std::string_view ServiceUrl() const { return m_ServiceUrl; }
+ inline spdlog::logger& Log() { return m_Log; }
+
private:
- std::string m_ServiceUrl;
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
RwLock m_SessionStateLock;
std::list<detail::ZenCacheSessionState*> m_SessionStateCache;
diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua
index 7a6981fcd..fb1ba651d 100644
--- a/zenserver/xmake.lua
+++ b/zenserver/xmake.lua
@@ -32,3 +32,18 @@ target("zenserver")
add_packages(
"vcpkg::cxxopts",
"vcpkg::mimalloc")
+
+ on_load(function(target)
+ local commit, err = os.iorun("git log -1 --format=\"%h-%cI\"")
+ if commit ~= nil then
+ commit = commit:gsub("%s+", "")
+ commit = commit:gsub("\n", "")
+ if is_mode("release") then
+ commit = "rel-" .. commit
+ else
+ commit = "dbg-" .. commit
+ end
+ target:add("defines","BUILD_VERSION=\"" .. commit .. "\"")
+ print("build version " .. commit)
+ end
+ end)
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index f9b4d5677..18c59636d 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -1,5 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinarybuilder.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
@@ -31,6 +32,10 @@
#include <set>
#include <unordered_map>
+#if !defined(BUILD_VERSION)
+# define BUILD_VERSION ("dev-build")
+#endif
+
//////////////////////////////////////////////////////////////////////////
// We don't have any doctest code in this file but this is needed to bring
// in some shared code into the executable
@@ -83,6 +88,8 @@
#include "diag/diagsvcs.h"
#include "experimental/frontend.h"
#include "experimental/usnjournal.h"
+#include "monitoring/httpstats.h"
+#include "monitoring/httpstatus.h"
#include "projectstore.h"
#include "testing/httptest.h"
#include "testing/launch.h"
@@ -96,17 +103,16 @@
namespace zen {
-class ZenServer
-{
- ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
+using namespace std::literals;
+class ZenServer : public IHttpStatusProvider
+{
public:
void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry)
{
- m_ServerEntry = ServerEntry;
using namespace fmt::literals;
- ZEN_INFO(ZEN_APP_NAME " initializing");
+ m_ServerEntry = ServerEntry;
m_DebugOptionForcedCrash = ServiceConfig.ShouldCrash;
if (ParentPid)
@@ -140,6 +146,15 @@ public:
// Ok so now we're configured, let's kick things off
+ m_Http = zen::CreateHttpServer();
+ m_Http->Initialize(BasePort);
+ m_Http->RegisterService(m_HealthService);
+ m_Http->RegisterService(m_StatsService);
+ m_Http->RegisterService(m_StatusService);
+ m_StatusService.RegisterHandler("status", *this);
+
+ // Initialize storage and services
+
ZEN_INFO("initializing storage");
zen::CasStoreConfiguration Config;
@@ -167,95 +182,7 @@ public:
if (ServiceConfig.StructuredCacheEnabled)
{
- using namespace std::literals;
- auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
-
- ZEN_INFO("instantiating structured cache service");
- m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
-
- std::unique_ptr<zen::UpstreamCache> UpstreamCache;
- if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
- {
- const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
-
- zen::UpstreamCacheOptions UpstreamOptions;
- UpstreamOptions.ReadUpstream =
- (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
- UpstreamOptions.WriteUpstream =
- (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
-
- if (UpstreamConfig.UpstreamThreadCount < 32)
- {
- UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
- }
-
- UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
-
- UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
-
- if (!UpstreamConfig.ZenConfig.Urls.empty())
- {
- std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls);
- UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
- }
-
- {
- zen::CloudCacheClientOptions Options;
- if (UpstreamConfig.JupiterConfig.UseProductionSettings)
- {
- Options = zen::CloudCacheClientOptions{
- .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
- .DdcNamespace = "ue.ddc"sv,
- .BlobStoreNamespace = "ue.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
- }
- else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
- {
- Options = zen::CloudCacheClientOptions{
- .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
- .DdcNamespace = "ue4.ddc"sv,
- .BlobStoreNamespace = "test.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
- }
-
- Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
- Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
- Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
- Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
- Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
- Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
- Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc;
-
- if (!Options.ServiceUrl.empty())
- {
- std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
- UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
- }
- }
-
- if (UpstreamCache->Initialize())
- {
- ZEN_INFO("upstream cache active ({})",
- UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
- : UpstreamOptions.ReadUpstream ? "READONLY"
- : UpstreamOptions.WriteUpstream ? "WRITEONLY"
- : "DISABLED");
- }
- else
- {
- UpstreamCache.reset();
- ZEN_INFO("NOT using upstream cache");
- }
- }
-
- m_StructuredCacheService.reset(
- new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache)));
+ InitializeStructuredCache(ServiceConfig);
}
else
{
@@ -273,13 +200,8 @@ public:
}
#endif
- m_Http = zen::CreateHttpServer();
- m_Http->Initialize(BasePort);
- m_Http->RegisterService(m_HealthService);
-
m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
m_Http->RegisterService(m_TestingService);
-
m_Http->RegisterService(m_AdminService);
if (m_HttpProjectService)
@@ -305,12 +227,15 @@ public:
}
m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot);
+
if (m_FrontendService)
{
m_Http->RegisterService(*m_FrontendService);
}
}
+ void InitializeStructuredCache(ZenServiceConfig& ServiceConfig);
+
#if ZEN_ENABLE_MESH
void StartMesh(int BasePort)
{
@@ -351,8 +276,12 @@ public:
const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode;
+ m_CurrentState = kRunning;
+
m_Http->Run(IsInteractiveMode);
+ m_CurrentState = kShuttingDown;
+
ZEN_INFO(ZEN_APP_NAME " exiting");
m_IoContext.stop();
@@ -425,6 +354,7 @@ public:
void Scrub()
{
+ Stopwatch Timer;
ZEN_INFO("Storage validation STARTING");
ScrubContext Ctx;
@@ -433,7 +363,13 @@ public:
m_ProjectStore->Scrub(Ctx);
m_StructuredCacheService->Scrub(Ctx);
- ZEN_INFO("Storage validation DONE");
+ const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
+
+ ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})",
+ NiceTimeSpanMs(ElapsedTimeMs),
+ NiceBytes(Ctx.ScrubbedBytes()),
+ Ctx.ScrubbedChunks(),
+ NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs));
}
void Flush()
@@ -451,18 +387,51 @@ public:
m_ProjectStore->Flush();
}
+ virtual void HandleStatusRequest(HttpServerRequest& Request) override
+ {
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Cbo << "state" << ToString(m_CurrentState);
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
private:
- bool m_IsDedicatedMode = false;
- bool m_TestMode = false;
- std::filesystem::path m_DataRoot;
- std::filesystem::path m_ContentRoot;
- std::jthread m_IoRunner;
- asio::io_context m_IoContext;
- asio::steady_timer m_PidCheckTimer{m_IoContext};
- zen::ProcessMonitor m_ProcessMonitor;
- zen::NamedMutex m_ServerMutex;
+ ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
+ bool m_IsDedicatedMode = false;
+ bool m_TestMode = false;
+ std::filesystem::path m_DataRoot;
+ std::filesystem::path m_ContentRoot;
+ std::jthread m_IoRunner;
+ asio::io_context m_IoContext;
+ asio::steady_timer m_PidCheckTimer{m_IoContext};
+ zen::ProcessMonitor m_ProcessMonitor;
+ zen::NamedMutex m_ServerMutex;
+
+ enum ServerState
+ {
+ kInitializing,
+ kRunning,
+ kShuttingDown
+ } m_CurrentState = kInitializing;
+
+ std::string_view ToString(ServerState Value)
+ {
+ switch (Value)
+ {
+ case kInitializing:
+ return "initializing"sv;
+ case kRunning:
+ return "running"sv;
+ case kShuttingDown:
+ return "shutdown"sv;
+ default:
+ return "unknown"sv;
+ }
+ }
zen::Ref<zen::HttpServer> m_Http;
+ zen::HttpStatusService m_StatusService;
+ zen::HttpStatsService m_StatsService;
std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()};
std::unique_ptr<zen::CidStore> m_CidStore;
std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
@@ -485,6 +454,100 @@ private:
bool m_DebugOptionForcedCrash = false;
};
+void
+ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
+{
+ using namespace std::literals;
+ auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
+
+ ZEN_INFO("instantiating structured cache service");
+ m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
+
+ std::unique_ptr<zen::UpstreamCache> UpstreamCache;
+ if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
+ {
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
+
+ zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
+
+ if (UpstreamConfig.UpstreamThreadCount < 32)
+ {
+ UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
+ }
+
+ UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
+
+ UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+
+ if (!UpstreamConfig.ZenConfig.Urls.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls);
+ UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
+ }
+
+ {
+ zen::CloudCacheClientOptions Options;
+ if (UpstreamConfig.JupiterConfig.UseProductionSettings)
+ {
+ Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
+ .DdcNamespace = "ue.ddc"sv,
+ .BlobStoreNamespace = "ue.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .UseLegacyDdc = false};
+ }
+ else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
+ {
+ Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
+ .DdcNamespace = "ue4.ddc"sv,
+ .BlobStoreNamespace = "test.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .UseLegacyDdc = false};
+ }
+
+ Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
+ Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
+ Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
+ Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
+ Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
+ Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
+ Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc;
+
+ if (!Options.ServiceUrl.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
+ UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
+ }
+ }
+
+ if (UpstreamCache->Initialize())
+ {
+ ZEN_INFO("upstream cache active ({})",
+ UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
+ : UpstreamOptions.ReadUpstream ? "READONLY"
+ : UpstreamOptions.WriteUpstream ? "WRITEONLY"
+ : "DISABLED");
+ }
+ else
+ {
+ UpstreamCache.reset();
+ ZEN_INFO("NOT using upstream cache");
+ }
+ }
+
+ m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore,
+ *m_CasStore,
+ *m_CidStore,
+ m_StatsService,
+ m_StatusService,
+ std::move(UpstreamCache)));
+}
+
} // namespace zen
class ZenWindowsService : public WindowsService
@@ -532,7 +595,7 @@ ZenWindowsService::Run()
ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig);
- ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort);
+ ZEN_INFO(ZEN_APP_NAME " - starting on port {}, build '{}'", GlobalOptions.BasePort, BUILD_VERSION);
ZenServerState ServerState;
ServerState.Initialize();
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index 335786fbf..7fad477a1 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -108,8 +108,12 @@
<ClInclude Include="cache\structuredcachestore.h" />
<ClInclude Include="compute\apply.h" />
<ClInclude Include="config.h" />
+ <ClInclude Include="diag\formatters.h" />
<ClInclude Include="diag\logging.h" />
<ClInclude Include="experimental\frontend.h" />
+ <ClInclude Include="experimental\vfs.h" />
+ <ClInclude Include="monitoring\httpstats.h" />
+ <ClInclude Include="monitoring\httpstatus.h" />
<ClInclude Include="resource.h" />
<ClInclude Include="sos\sos.h" />
<ClInclude Include="testing\httptest.h" />
@@ -134,6 +138,9 @@
<ClCompile Include="config.cpp" />
<ClCompile Include="diag\logging.cpp" />
<ClCompile Include="experimental\frontend.cpp" />
+ <ClCompile Include="experimental\vfs.cpp" />
+ <ClCompile Include="monitoring\httpstats.cpp" />
+ <ClCompile Include="monitoring\httpstatus.cpp" />
<ClCompile Include="projectstore.cpp" />
<ClCompile Include="cache\cacheagent.cpp" />
<ClCompile Include="sos\sos.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index 1c5b17fee..04e639a33 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -41,6 +41,10 @@
<ClInclude Include="experimental\frontend.h">
<Filter>experimental</Filter>
</ClInclude>
+ <ClInclude Include="diag\formatters.h" />
+ <ClInclude Include="experimental\vfs.h" />
+ <ClInclude Include="monitoring\httpstats.h" />
+ <ClInclude Include="monitoring\httpstatus.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -77,6 +81,9 @@
<ClCompile Include="experimental\frontend.cpp">
<Filter>experimental</Filter>
</ClCompile>
+ <ClCompile Include="experimental\vfs.cpp" />
+ <ClCompile Include="monitoring\httpstats.cpp" />
+ <ClCompile Include="monitoring\httpstatus.cpp" />
</ItemGroup>
<ItemGroup>
<Filter Include="cache">
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index 808fc8fb3..a4bbfa340 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -96,9 +96,16 @@ GcContext::ContributeCas(std::span<const IoHash> Cas)
//////////////////////////////////////////////////////////////////////////
void
-ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks)
+ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks)
{
- ZEN_UNUSED(BadChunks);
+ m_BadCas.AddChunksToSet(BadCasChunks);
+}
+
+void
+ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
+{
+ m_ChunkCount.fetch_add(ChunkCount);
+ m_ByteCount.fetch_add(ChunkBytes);
}
/**
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index df5c32d25..7a5d7bcf4 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -204,7 +204,7 @@ struct CidStore::Impl
// TODO: Should compute a snapshot index here
- Ctx.ReportBadChunks(BadChunks);
+ Ctx.ReportBadCasChunks(BadChunks);
}
uint64_t m_LastScrubTime = 0;
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 5fc3ac356..612f87c7c 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -254,7 +254,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadChunks(BadChunkHashes);
+ Ctx.ReportBadCasChunks(BadChunkHashes);
}
void
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 0b18848d5..ee641b80a 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -394,7 +394,8 @@ FileCasStrategy::Flush()
void
FileCasStrategy::Scrub(ScrubContext& Ctx)
{
- std::vector<IoHash> BadHashes;
+ std::vector<IoHash> BadHashes;
+ std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
IoHashStream Hasher;
@@ -405,8 +406,13 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
{
BadHashes.push_back(Hash);
}
+
+ ++ChunkCount;
+ ChunkBytes.fetch_add(Payload.FileSize());
});
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
+
if (!BadHashes.empty())
{
ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size());
@@ -428,7 +434,9 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
}
- Ctx.ReportBadChunks(BadHashes);
+ Ctx.ReportBadCasChunks(BadHashes);
+
+ ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
}
void
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index 1425845a0..d0698df7f 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -31,6 +31,25 @@ struct CasStoreConfiguration
uint64_t HugeValueThreshold = 1024 * 1024;
};
+/** Manage a set of IoHash values
+ */
+
+class CasChunkSet
+{
+public:
+ void AddChunkToSet(const IoHash& HashToAdd);
+ void AddChunksToSet(std::span<const IoHash> HashesToAdd);
+ void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
+ void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback);
+ inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); }
+ inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); }
+ inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); }
+
+private:
+ // Q: should we protect this with a lock, or is that a higher level concern?
+ std::unordered_set<IoHash> m_ChunkSet;
+};
+
/** Garbage Collection context object
*/
@@ -58,33 +77,26 @@ private:
class ScrubContext
{
public:
- virtual void ReportBadChunks(std::span<IoHash> BadChunks);
+ virtual void ReportBadCasChunks(std::span<IoHash> BadCasChunks);
inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
inline bool RunRecovery() const { return m_Recover; }
+ void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes);
+
+ inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
+ inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
private:
- uint64_t m_ScrubTime = GetHifreqTimerValue();
- bool m_Recover = true;
+ uint64_t m_ScrubTime = GetHifreqTimerValue();
+ bool m_Recover = true;
+ std::atomic<uint64_t> m_ChunkCount{0};
+ std::atomic<uint64_t> m_ByteCount{0};
+ CasChunkSet m_BadCas;
+ CasChunkSet m_BadCid;
};
-/** Manage a set of IoHash values
- */
-
-class CasChunkSet
-{
-public:
- void AddChunkToSet(const IoHash& HashToAdd);
- void AddChunksToSet(std::span<const IoHash> HashesToAdd);
- void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
- void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback);
- inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); }
- inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); }
- inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); }
+/** Content Addressable Storage interface
-private:
- // Q: should we protect this with a lock, or is that a higher level concern?
- std::unordered_set<IoHash> m_ChunkSet;
-};
+ */
class CasStore
{