diff options
| author | Stefan Boberg <[email protected]> | 2021-10-05 22:25:53 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-10-05 22:25:53 +0200 |
| commit | 20ac7384f8ca558f1fb933eda846604792240ea0 (patch) | |
| tree | e5c95b422b847af50b77807af916e389fcaf83aa | |
| parent | stats: Mean returns zero when the count is zero (diff) | |
| download | zen-20ac7384f8ca558f1fb933eda846604792240ea0.tar.xz zen-20ac7384f8ca558f1fb933eda846604792240ea0.zip | |
Merged from upstream
30 files changed, 862 insertions, 301 deletions
diff --git a/scripts/deploybuild.py b/scripts/deploybuild.py index 971f34ff9..c81235a8c 100644 --- a/scripts/deploybuild.py +++ b/scripts/deploybuild.py @@ -17,6 +17,10 @@ def jazz_print(tag, detail = ""): def jazz_fail(tag, detail = ""): print(f"{Fore.RED}{Style.BRIGHT}||> {tag}{Style.RESET_ALL} {detail}") +def copy_file(src, dst): + print(f"{Fore.WHITE}{Style.BRIGHT}||> COPY {Style.RESET_ALL} {src} -> {Fore.GREEN}{Style.BRIGHT}{dst}") + shutil.copy(src, dst) + colorama.init() origcwd = os.getcwd() @@ -25,9 +29,13 @@ origcwd = os.getcwd() parser = argparse.ArgumentParser(description='Deploy a zen build to an UE tree') parser.add_argument("root", help="Path to an UE5 root directory") +parser.add_argument("--sentry", action="store_true", help="Whether to upload symobls to Sentry") +parser.add_argument("--xmake", action="store_true", help="Build with XMake") args = parser.parse_args() engineroot = args.root +upload_symbols = args.sentry +use_xmake = args.xmake if not os.path.isfile(os.path.join(engineroot, "RunUAT.bat")): print(f"{Fore.RED}Not a valid UE5 engine root directory: '{engineroot}'") @@ -45,20 +53,29 @@ jazz_print("Zen root:", zenroot) # Build fresh binaries -vs_path = vswhere.get_latest_path() # can also specify prerelease=True -jazz_print("BUILDING CODE", f"using VS root: {vs_path}") -devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com") +if use_xmake: + build_cmd = ["xmake", "-b", "zenserver"] + build_output_dir = r'build\windows\x64\release' +else: + vs_path = vswhere.get_latest_path() # can also specify prerelease=True + jazz_print("BUILDING CODE", f"using VS root: {vs_path}") + devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com") + build_cmd = [devenv_path, "/build", "Release", "zen.sln"] + build_output_dir = r'x64\Release' try: - subprocess.run([devenv_path, "/build", "Release", "zen.sln"], check=True) + subprocess.run(build_cmd, check=True) except: jazz_fail("Build failed!") exit(1) -# Upload symbols etc to Sentry +build_output_binary_path = os.path.join(zenroot, build_output_dir, "zenserver.exe") +build_output_binary_pdb_path = os.path.join(zenroot, build_output_dir, "zenserver.pdb") -jazz_print("Uploading symbols", "to Sentry") -subprocess.run(["scripts\sentry-cli.exe", "upload-dif", "--org", "to", "--project", "zen-server", "x64\\Release\\zenserver.exe", "x64\\Release\\zenserver.pdb"]) +# Upload symbols etc to Sentry +if upload_symbols: + jazz_print("Uploading symbols", "to Sentry") + subprocess.run(["scripts\sentry-cli.exe", "upload-dif", "--org", "to", "--project", "zen-server", build_output_binary_path, build_output_binary_pdb_path]) # Change into root directory to pick up Perforce environment @@ -105,9 +122,9 @@ jazz_print("Placing zenserver", f"executables into tree at '{target_bin_dir}'") crashpadtarget = os.path.join(target_bin_dir, "crashpad_handler.exe") try: - shutil.copy(os.path.join(zenroot, "x64\Release\zenserver.exe"), os.path.join(target_bin_dir, "zenserver.exe")) - shutil.copy(os.path.join(zenroot, "x64\Release\zenserver.pdb"), os.path.join(target_bin_dir, "zenserver.pdb")) - shutil.copy(os.path.join(zenroot, r'vcpkg_installed\x64-windows-static\x64-windows-static\tools\sentry-native\crashpad_handler.exe'), crashpadtarget) + copy_file(build_output_binary_path, os.path.join(target_bin_dir, "zenserver.exe")) + copy_file(build_output_binary_pdb_path, os.path.join(target_bin_dir, "zenserver.pdb")) + copy_file(os.path.join(zenroot, r'vcpkg_installed\x64-windows-static\tools\sentry-native\crashpad_handler.exe'), crashpadtarget) P4.add(crashpadtarget).run() except Exception as e: print(f"Caught exception while copying: {e.args}") diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp index 4072ab5cf..c4694733c 100644 --- a/zencore/compactbinary.cpp +++ b/zencore/compactbinary.cpp @@ -1474,10 +1474,30 @@ public: Builder << Accessor.AsIntegerNegative(); break; case CbFieldType::Float32: - Builder.Append("{:.9g}"_format(Accessor.AsFloat32())); + { + const float Value = Accessor.AsFloat32(); + if (std::isfinite(Value)) + { + Builder.Append("{:.9g}"_format(Value)); + } + else + { + Builder << "null"sv; + } + } break; case CbFieldType::Float64: - Builder.Append("{:.17g}"_format(Accessor.AsFloat64())); + { + const double Value = Accessor.AsFloat64(); + if (std::isfinite(Value)) + { + Builder.Append("{:.17g}"_format(Value)); + } + else + { + Builder << "null"sv; + } + } break; case CbFieldType::BoolFalse: Builder << "false"sv; @@ -1834,7 +1854,7 @@ TEST_CASE("uson.json") SUBCASE("number") { - const double ExpectedFloatValue = 21.21f; + const float ExpectedFloatValue = 21.21f; const double ExpectedDoubleValue = 42.42; CbObjectWriter Writer; @@ -1856,6 +1876,31 @@ TEST_CASE("uson.json") CHECK(FloatValue == doctest::Approx(ExpectedFloatValue)); CHECK(DoubleValue == doctest::Approx(ExpectedDoubleValue)); } + + SUBCASE("number.nan") + { + const float FloatNan = std::numeric_limits<float>::quiet_NaN(); + const double DoubleNan = std::numeric_limits<double>::quiet_NaN(); + + CbObjectWriter Writer; + Writer << "FloatNan" << FloatNan; + Writer << "DoubleNan" << DoubleNan; + + CbObject Obj = Writer.Save(); + + StringBuilder<128> Sb; + const std::string_view JsonText = Obj.ToJson(Sb).ToView(); + + std::string JsonError; + json11::Json Json = json11::Json::parse(JsonText.data(), JsonError); + + const double FloatValue = Json["FloatNan"].number_value(); + const double DoubleValue = Json["DoubleNan"].number_value(); + + CHECK(JsonError.empty()); + CHECK(FloatValue == 0); + CHECK(DoubleValue == 0); + } } #endif diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index f6ba92f98..745fe0779 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -63,6 +63,13 @@ DeleteReparsePoint(const wchar_t* Path, DWORD dwReparseTag) bool CreateDirectories(const wchar_t* Dir) { + // This may be suboptimal, in that it appears to try and create directories + // from the root on up instead of from some directory which is known to + // be present + // + // We should implement a smarter version at some point since this can be + // pretty expensive in aggregate + return std::filesystem::create_directories(Dir); } @@ -522,7 +529,7 @@ WriteFile(std::filesystem::path Path, IoBuffer Data) WriteFile(Path, &DataPtr, 1); } -IoBuffer +IoBuffer FileContents::Flatten() { if (Data.size() == 1) diff --git a/zencore/include/zencore/blockingqueue.h b/zencore/include/zencore/blockingqueue.h new file mode 100644 index 000000000..fbb4480a1 --- /dev/null +++ b/zencore/include/zencore/blockingqueue.h @@ -0,0 +1,73 @@ +#pragma once + +#include <atomic> +#include <deque> +#include <mutex> + +namespace zen { + +template<typename T> +class BlockingQueue +{ +public: + BlockingQueue() = default; + + ~BlockingQueue() { CompleteAdding(); } + + void Enqueue(T&& Item) + { + { + std::lock_guard Lock(m_Lock); + m_Queue.emplace_back(std::move(Item)); + m_Size++; + } + + m_NewItemSignal.notify_one(); + } + + bool WaitAndDequeue(T& Item) + { + if (m_CompleteAdding.load()) + { + return false; + } + + std::unique_lock Lock(m_Lock); + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); + + if (!m_Queue.empty()) + { + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + m_Size--; + + return true; + } + + return false; + } + + void CompleteAdding() + { + if (!m_CompleteAdding.load()) + { + m_CompleteAdding.store(true); + m_NewItemSignal.notify_all(); + } + } + + std::size_t Size() const + { + std::unique_lock Lock(m_Lock); + return m_Queue.size(); + } + +private: + mutable std::mutex m_Lock; + std::condition_variable m_NewItemSignal; + std::deque<T> m_Queue; + std::atomic_bool m_CompleteAdding{false}; + std::atomic_uint32_t m_Size; +}; + +} // namespace zen diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h index 320718f5b..7167ab3b5 100644 --- a/zencore/include/zencore/refcount.h +++ b/zencore/include/zencore/refcount.h @@ -4,6 +4,8 @@ #include "atomic.h" #include "zencore.h" +#include <concepts> + namespace zen { /** @@ -114,6 +116,10 @@ public: inline Ref(T* Ptr) : m_Ref(Ptr) { m_Ref && m_Ref->AddRef(); } inline ~Ref() { m_Ref && m_Ref->Release(); } + template<typename DerivedType> + requires std::derived_from<DerivedType, T> + inline Ref(const Ref<DerivedType>& Rhs) : Ref(Rhs.m_Ref) {} + [[nodiscard]] inline bool IsNull() const { return m_Ref == nullptr; } inline explicit operator bool() const { return m_Ref != nullptr; } inline T* operator->() const { return m_Ref; } @@ -152,6 +158,9 @@ public: private: T* m_Ref = nullptr; + + template<class T> + friend class Ref; }; void refcount_forcelink(); diff --git a/zencore/include/zencore/timer.h b/zencore/include/zencore/timer.h index 693b6daaa..e4ddc3505 100644 --- a/zencore/include/zencore/timer.h +++ b/zencore/include/zencore/timer.h @@ -38,6 +38,21 @@ private: uint64_t m_StartValue; }; +// Low frequency timers + +namespace detail { + extern ZENCORE_API uint64_t g_LofreqTimerValue; +} // namespace detail + +inline uint64_t +GetLofreqTimerValue() +{ + return detail::g_LofreqTimerValue; +} + +ZENCORE_API void UpdateLofreqTimerValue(); +ZENCORE_API uint64_t GetLofreqTimerFrequency(); + void timer_forcelink(); // internal } // namespace zen diff --git a/zencore/timer.cpp b/zencore/timer.cpp index 5d30d9b29..9180519bd 100644 --- a/zencore/timer.cpp +++ b/zencore/timer.cpp @@ -69,6 +69,22 @@ GetHifreqTimerFrequencySafe() } ////////////////////////////////////////////////////////////////////////// + +uint64_t detail::g_LofreqTimerValue = GetHifreqTimerValue(); + +void +UpdateLofreqTimerValue() +{ + detail::g_LofreqTimerValue = GetHifreqTimerValue(); +} + +uint64_t +GetLofreqTimerFrequency() +{ + return GetHifreqTimerFrequencySafe(); +} + +////////////////////////////////////////////////////////////////////////// // // Testing related code follows... // diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index fedaf282e..de3069bb8 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -1091,29 +1091,34 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& if (AbsPathLength >= PrefixLength) { // We convert the URI immediately because most of the code involved prefers to deal - // with utf8. This has some performance impact which I'd prefer to avoid but for now - // we just have to live with it + // with utf8. This is overhead which I'd prefer to avoid but for now we just have + // to live with it WideToUtf8({(char16_t*)HttpRequestPtr->CookedUrl.pAbsPath + PrefixLength, gsl::narrow<size_t>(AbsPathLength - PrefixLength)}, m_UriUtf8); - std::string_view Uri8{m_UriUtf8}; + std::string_view UriSuffix8{m_UriUtf8}; - const size_t LastComponentIndex = Uri8.find_last_of('/'); + const size_t LastComponentIndex = UriSuffix8.find_last_of('/'); if (LastComponentIndex != std::string_view::npos) { - Uri8.remove_prefix(LastComponentIndex); + UriSuffix8.remove_prefix(LastComponentIndex); } - const size_t LastDotIndex = Uri8.find_last_of('.'); + const size_t LastDotIndex = UriSuffix8.find_last_of('.'); if (LastDotIndex != std::string_view::npos) { - Uri8.remove_prefix(LastDotIndex + 1); - } + UriSuffix8.remove_prefix(LastDotIndex + 1); + + AcceptContentType = ParseContentType(UriSuffix8); - AcceptContentType = ParseContentType(Uri8); + if (AcceptContentType != HttpContentType::kUnknownContentType) + { + m_UriUtf8.RemoveSuffix(uint32_t(m_UriUtf8.Size() - LastComponentIndex - LastDotIndex - 1)); + } + } } else { diff --git a/zenhttp/iothreadpool.cpp b/zenhttp/iothreadpool.cpp index 4f1a6642b..6087e69ec 100644 --- a/zenhttp/iothreadpool.cpp +++ b/zenhttp/iothreadpool.cpp @@ -4,6 +4,8 @@ #include <zencore/except.h> +#if ZEN_PLATFORM_WINDOWS + namespace zen { WinIoThreadPool::WinIoThreadPool(int InThreadCount) @@ -32,6 +34,8 @@ WinIoThreadPool::~WinIoThreadPool() void WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) { + ZEN_ASSERT(!m_ThreadPoolIo); + m_ThreadPoolIo = CreateThreadpoolIo(IoHandle, Callback, Context, &m_CallbackEnvironment); if (!m_ThreadPoolIo) @@ -41,3 +45,5 @@ WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, voi } } // namespace zen + +#endif diff --git a/zenhttp/iothreadpool.h b/zenhttp/iothreadpool.h index 4418b940b..8333964c3 100644 --- a/zenhttp/iothreadpool.h +++ b/zenhttp/iothreadpool.h @@ -2,9 +2,12 @@ #pragma once -#include <zencore/windows.h> +#include <zencore/zencore.h> -#include <system_error> +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> + +# include <system_error> namespace zen { @@ -31,3 +34,4 @@ private: }; } // namespace zen +#endif diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index d6174caf6..27bb1c5cd 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,6 +12,7 @@ #include <zenhttp/httpserver.h> #include <zenstore/CAS.h> +#include "monitoring/httpstats.h" #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" @@ -149,13 +150,19 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, CasStore& InStore, CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, std::unique_ptr<UpstreamCache> UpstreamCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) +, m_StatsService(StatsService) +, m_StatusService(StatusService) , m_CasStore(InStore) , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { + StatsService.RegisterHandler("z$", *this); + StatusService.RegisterHandler("z$", *this); } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -200,13 +207,6 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { std::string_view Key = Request.RelativeUri(); - if (Key.empty() || Key == "stats.json") - { - $.Cancel(); - - return HandleStatusRequest(Request); - } - if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference @@ -870,10 +870,9 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } void -HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) +HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; - Cbo << "ok" << true; EmitSnapshot("requests", m_HttpRequests, Cbo); @@ -899,4 +898,12 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } +void +HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + } // namespace zen diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index a360878bd..703e24ed3 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -5,6 +5,9 @@ #include <zencore/stats.h> #include <zenhttp/httpserver.h> +#include "monitoring/httpstats.h" +#include "monitoring/httpstatus.h" + #include <memory> namespace spdlog { @@ -47,12 +50,14 @@ enum class CachePolicy : uint8_t; * */ -class HttpStructuredCacheService : public zen::HttpService +class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider { public: HttpStructuredCacheService(ZenCacheStore& InCacheStore, - zen::CasStore& InCasStore, - zen::CidStore& InCidStore, + CasStore& InCasStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, std::unique_ptr<UpstreamCache> UpstreamCache); ~HttpStructuredCacheService(); @@ -86,13 +91,16 @@ private: void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); - void HandleStatusRequest(zen::HttpServerRequest& Request); + virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; + virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - zen::ZenCacheStore& m_CacheStore; - zen::CasStore& m_CasStore; - zen::CidStore& m_CidStore; + ZenCacheStore& m_CacheStore; + HttpStatsService& m_StatsService; + HttpStatusService& m_StatusService; + CasStore& m_CasStore; + CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; metrics::OperationTiming m_HttpRequests; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index b97f0830f..83a21f87a 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -32,6 +32,8 @@ ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); + + m_DiskLayer.DiscoverBuckets(); } ZenCacheStore::~ZenCacheStore() @@ -149,6 +151,10 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa _.ReleaseNow(); + // There's a race here. Since the lock is released early to allow + // inserts, the bucket delete path could end up deleting the + // underlying data structure + return Bucket->Get(HashKey, OutValue); } @@ -210,11 +216,13 @@ ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx) void ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) { + RwLock::SharedLockScope _(m_bucketLock); + std::vector<IoHash> BadHashes; for (auto& Kv : m_cacheMap) { - if (Kv.first != IoHash::HashBuffer(Kv.second)) + if (Kv.first != IoHash::HashBuffer(Kv.second.Payload)) { BadHashes.push_back(Kv.first); } @@ -222,10 +230,16 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) if (!BadHashes.empty()) { - Ctx.ReportBadChunks(BadHashes); + Ctx.ReportBadCasChunks(BadHashes); } } +void +ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + bool ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -237,18 +251,26 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV } else { - OutValue.Value = bucketIt->second; + BucketValue& Value = bucketIt.value(); + OutValue.Value = Value.Payload; + Value.LastAccess = GetCurrentTimeStamp(); return true; } } +uint64_t +ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp() +{ + return GetLofreqTimerValue(); +} + void ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { RwLock::ExclusiveLockScope _(m_bucketLock); - m_cacheMap[HashKey] = Value.Value; + m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value}); } ////////////////////////////////////////////////////////////////////////// @@ -258,11 +280,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue struct DiskLocation { - uint64_t OffsetAndFlags; - uint32_t Size; - uint32_t IndexDataSize; + inline DiskLocation() = default; + + inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) + : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) + , LowerSize(ValueSize & 0xFFFFffff) + , IndexDataSize(IndexSize) + { + } - static const uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull; + static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; static const uint64_t kStructured = 0x4000'0000'0000'0000ull; @@ -270,6 +298,7 @@ struct DiskLocation static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t Size() const { return LowerSize; } inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } inline ZenContentType GetContentType() const { @@ -282,6 +311,11 @@ struct DiskLocation return ContentType; } + +private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; }; struct DiskIndexEntry @@ -299,7 +333,7 @@ struct ZenCacheDiskLayer::CacheBucket CacheBucket(CasStore& Cas); ~CacheBucket(); - void OpenOrCreate(std::filesystem::path BucketDir); + void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); static bool Delete(std::filesystem::path BucketDir); bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); @@ -309,13 +343,13 @@ struct ZenCacheDiskLayer::CacheBucket void Scrub(ScrubContext& Ctx); void GarbageCollect(GcContext& GcCtx); - inline bool IsOk() const { return m_Ok; } + inline bool IsOk() const { return m_IsOk; } private: CasStore& m_CasStore; std::filesystem::path m_BucketDir; Oid m_BucketId; - bool m_Ok = false; + bool m_IsOk = false; uint64_t m_LargeObjectThreshold = 64 * 1024; // These files are used to manage storage of small objects for this bucket @@ -355,7 +389,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) } void -ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) +ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { CreateDirectories(BucketDir); @@ -382,17 +416,23 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) { ManifestFile.Read(&m_BucketId, sizeof(Oid), 0); - m_Ok = true; + m_IsOk = true; } - if (!m_Ok) + if (!m_IsOk) { ManifestFile.Close(); } } - if (!m_Ok) + if (!m_IsOk) { + if (AllowCreate == false) + { + // Invalid bucket + return; + } + // No manifest file found, this is a new bucket ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec); @@ -424,13 +464,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) m_SlogFile.Replay([&](const DiskIndexEntry& Record) { m_Index[Record.Key] = Record.Location; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size); + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size()); }); m_WriteCursor = (MaxFileOffset + 15) & ~15; } - m_Ok = true; + m_IsOk = true; } void @@ -453,7 +493,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen { if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size); + OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size()); OutValue.Value.SetContentType(Loc.GetContentType()); return true; @@ -482,7 +522,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, Z bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { - if (!m_Ok) + if (!m_IsOk) { return false; } @@ -509,7 +549,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { - if (!m_Ok) + if (!m_IsOk) { return; } @@ -531,10 +571,9 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags), - .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; + DiskLocation Loc(m_WriteCursor, Value.Value.Size(), 0, EntryFlags); - m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16); + m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { @@ -544,11 +583,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& else { // TODO: should check if write is idempotent and bail out if it is? + // this would requiring comparing contents on disk unless we add a + // content hash to the index entry it.value() = Loc; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset()); + m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); } } @@ -572,55 +613,45 @@ ZenCacheDiskLayer::CacheBucket::Flush() void ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { - std::vector<DiskIndexEntry> StandaloneFiles; - - std::vector<IoHash> BadChunks; - std::vector<IoBuffer> BadStandaloneChunks; + std::atomic<uint64_t> ScrubbedChunks{0}, ScrubbedBytes{0}; { RwLock::SharedLockScope _(m_IndexLock); for (auto& Kv : m_Index) { - const IoHash& Hash = Kv.first; - const DiskLocation& Loc = Kv.second; + const IoHash& HashKey = Kv.first; + const DiskLocation& Loc = Kv.second; ZenCacheValue Value; - if (!GetInlineCacheValue(Loc, Value)) + if (GetInlineCacheValue(Loc, Value)) { - ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile)); - StandaloneFiles.push_back({.Key = Hash, .Location = Loc}); + // Validate contents } else { - if (GetStandaloneCacheValue(Hash, Value, Loc)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - // Hash contents - - const IoHash ComputedHash = HashBuffer(Value.Value); - - if (ComputedHash != Hash) + if (GetStandaloneCacheValue(HashKey, Value, Loc)) { - BadChunks.push_back(Hash); + // Note: we cannot currently validate contents since we don't + // have a content hash! + } + else + { + // Value not found } - } - else - { - // Non-existent } } } } - if (Ctx.RunRecovery()) - { - // Clean out bad chunks - } + Ctx.ReportScrubbed(ScrubbedChunks, ScrubbedBytes); - if (!BadChunks.empty()) + if (Ctx.RunRecovery()) { - Ctx.ReportBadChunks(BadChunks); + // Clean out bad data } } @@ -681,7 +712,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenC RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0}; + DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { @@ -739,7 +770,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); - Bucket->OpenOrCreate(BucketPath.c_str()); + Bucket->OpenOrCreate(BucketPath); } } @@ -782,7 +813,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z std::filesystem::path bucketPath = m_RootDir; bucketPath /= std::string(InBucket); - Bucket->OpenOrCreate(bucketPath.c_str()); + Bucket->OpenOrCreate(bucketPath); } } @@ -794,6 +825,63 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z } } +void +ZenCacheDiskLayer::DiscoverBuckets() +{ + FileSystemTraversal Traversal; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& File, + [[maybe_unused]] uint64_t FileSize) override + { + } + + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override + { + Dirs.push_back(std::wstring(DirectoryName)); + return false; + } + + std::vector<std::wstring> Dirs; + } Visit; + + Traversal.TraverseFileSystem(m_RootDir, Visit); + + // Initialize buckets + + RwLock::ExclusiveLockScope _(m_Lock); + + for (const std::wstring& BucketName : Visit.Dirs) + { + // New bucket needs to be created + + std::string BucketName8 = WideToUtf8(BucketName); + + if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end()) + { + } + else + { + auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore); + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName8; + + CacheBucket& Bucket = InsertResult.first->second; + + Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false); + + if (!Bucket.IsOk()) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir); + + m_Buckets.erase(InsertResult.first); + } + } + } +} + bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 011f13323..4753af627 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -46,6 +46,11 @@ struct ZenCacheValue CbObject IndexData; }; +/** In-memory cache storage + + Intended for small values which are frequently accessed + + */ class ZenCacheMemoryLayer { public: @@ -58,19 +63,39 @@ public: void Scrub(ScrubContext& Ctx); void GarbageCollect(GcContext& GcCtx); + struct Configuration + { + uint64_t TargetFootprintBytes = 16 * 1024 * 1024; + uint64_t ScavengeThreshold = 4 * 1024 * 1024; + }; + + const Configuration& GetConfiguration() const { return m_Configuration; } + void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; } + private: struct CacheBucket { - RwLock m_bucketLock; - tsl::robin_map<IoHash, IoBuffer> m_cacheMap; + struct BucketValue + { + uint64_t LastAccess = 0; + IoBuffer Payload; + }; + + RwLock m_bucketLock; + tsl::robin_map<IoHash, BucketValue> m_cacheMap; bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); + + private: + uint64_t GetCurrentTimeStamp(); }; RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; + Configuration m_Configuration; }; class ZenCacheDiskLayer @@ -86,6 +111,8 @@ public: void Scrub(ScrubContext& Ctx); void GarbageCollect(GcContext& GcCtx); + void DiscoverBuckets(); + private: /** A cache bucket manages a single directory containing metadata and data for that bucket diff --git a/zenserver/monitoring/httpstats.cpp b/zenserver/monitoring/httpstats.cpp new file mode 100644 index 000000000..de04294d0 --- /dev/null +++ b/zenserver/monitoring/httpstats.cpp @@ -0,0 +1,50 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpstats.h" + +namespace zen { + +HttpStatsService::HttpStatsService() : m_Log(logging::Get("stats")) +{ +} + +HttpStatsService::~HttpStatsService() +{ +} + +const char* +HttpStatsService::BaseUri() const +{ + return "/stats/"; +} + +void +HttpStatsService::RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_Providers.insert_or_assign(std::string(Id), &Provider); +} + +void +HttpStatsService::HandleRequest(HttpServerRequest& Request) +{ + using namespace std::literals; + + std::string_view Key = Request.RelativeUri(); + + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers)) + { + return It->second->HandleStatsRequest(Request); + } + + [[fallthrough]]; + default: + return; + } +} + +} // namespace zen diff --git a/zenserver/monitoring/httpstats.h b/zenserver/monitoring/httpstats.h new file mode 100644 index 000000000..1c3c79dd0 --- /dev/null +++ b/zenserver/monitoring/httpstats.h @@ -0,0 +1,37 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> +#include <zenhttp/httpserver.h> + +#include <map> + +namespace zen { + +struct IHttpStatsProvider +{ + virtual void HandleStatsRequest(HttpServerRequest& Request) = 0; +}; + +class HttpStatsService : public HttpService +{ +public: + HttpStatsService(); + ~HttpStatsService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + void RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider); + +private: + spdlog::logger& m_Log; + HttpRequestRouter m_Router; + + inline spdlog::logger& Log() { return m_Log; } + + RwLock m_Lock; + std::map<std::string, IHttpStatsProvider*> m_Providers; +}; + +} // namespace zen
\ No newline at end of file diff --git a/zenserver/monitoring/httpstatus.cpp b/zenserver/monitoring/httpstatus.cpp new file mode 100644 index 000000000..e12662b1c --- /dev/null +++ b/zenserver/monitoring/httpstatus.cpp @@ -0,0 +1,50 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpstatus.h" + +namespace zen { + +HttpStatusService::HttpStatusService() : m_Log(logging::Get("status")) +{ +} + +HttpStatusService::~HttpStatusService() +{ +} + +const char* +HttpStatusService::BaseUri() const +{ + return "/status/"; +} + +void +HttpStatusService::RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_Providers.insert_or_assign(std::string(Id), &Provider); +} + +void +HttpStatusService::HandleRequest(HttpServerRequest& Request) +{ + using namespace std::literals; + + std::string_view Key = Request.RelativeUri(); + + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers)) + { + return It->second->HandleStatusRequest(Request); + } + + [[fallthrough]]; + default: + return; + } +} + +} // namespace zen diff --git a/zenserver/monitoring/httpstatus.h b/zenserver/monitoring/httpstatus.h new file mode 100644 index 000000000..8f069f760 --- /dev/null +++ b/zenserver/monitoring/httpstatus.h @@ -0,0 +1,37 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> +#include <zenhttp/httpserver.h> + +#include <map> + +namespace zen { + +struct IHttpStatusProvider +{ + virtual void HandleStatusRequest(HttpServerRequest& Request) = 0; +}; + +class HttpStatusService : public HttpService +{ +public: + HttpStatusService(); + ~HttpStatusService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + void RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider); + +private: + spdlog::logger& m_Log; + HttpRequestRouter m_Router; + + RwLock m_Lock; + std::map<std::string, IHttpStatusProvider*> m_Providers; + + inline spdlog::logger& Log() { return m_Log; } +}; + +} // namespace zen
\ No newline at end of file diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index a183e7ebf..ba5991b3f 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -8,6 +8,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> +#include <zencore/blockingqueue.h> #include <zencore/fmtutils.h> #include <zencore/stats.h> #include <zencore/stream.h> @@ -23,7 +24,6 @@ #include <algorithm> #include <atomic> -#include <deque> #include <thread> #include <unordered_map> @@ -33,70 +33,6 @@ using namespace std::literals; namespace detail { - template<typename T> - class BlockingQueue - { - public: - BlockingQueue() = default; - - ~BlockingQueue() { CompleteAdding(); } - - void Enqueue(T&& Item) - { - { - std::lock_guard Lock(m_Lock); - m_Queue.emplace_back(std::move(Item)); - m_Size++; - } - - m_NewItemSignal.notify_one(); - } - - bool WaitAndDequeue(T& Item) - { - if (m_CompleteAdding.load()) - { - return false; - } - - std::unique_lock Lock(m_Lock); - m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); - - if (!m_Queue.empty()) - { - Item = std::move(m_Queue.front()); - m_Queue.pop_front(); - m_Size--; - - return true; - } - - return false; - } - - void CompleteAdding() - { - if (!m_CompleteAdding.load()) - { - m_CompleteAdding.store(true); - m_NewItemSignal.notify_all(); - } - } - - std::size_t Size() const - { - std::unique_lock Lock(m_Lock); - return m_Queue.size(); - } - - private: - mutable std::mutex m_Lock; - std::condition_variable m_NewItemSignal; - std::deque<T> m_Queue; - std::atomic_bool m_CompleteAdding{false}; - std::atomic_uint32_t m_Size; - }; - class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: @@ -1029,7 +965,7 @@ private: spdlog::logger& Log() { return m_Log; } - using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; + using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; struct RunState { diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index c988a6b0b..6141fd397 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -328,7 +328,9 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// -ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) : m_ServiceUrl(ServiceUrl) +ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) +: m_Log(logging::Get(std::string_view("zenclient"))) +, m_ServiceUrl(ServiceUrl) { } @@ -369,7 +371,7 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) using namespace std::literals; ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) -: m_Log(logging::Get("zenclient"sv)) +: m_Log(OuterClient.Log()) , m_Client(OuterClient) { m_SessionState = m_Client.AllocSessionState(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 158be668a..12e46bd8d 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -138,8 +138,11 @@ public: std::string_view ServiceUrl() const { return m_ServiceUrl; } + inline spdlog::logger& Log() { return m_Log; } + private: - std::string m_ServiceUrl; + spdlog::logger& m_Log; + std::string m_ServiceUrl; RwLock m_SessionStateLock; std::list<detail::ZenCacheSessionState*> m_SessionStateCache; diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua index 7a6981fcd..fb1ba651d 100644 --- a/zenserver/xmake.lua +++ b/zenserver/xmake.lua @@ -32,3 +32,18 @@ target("zenserver") add_packages( "vcpkg::cxxopts", "vcpkg::mimalloc") + + on_load(function(target) + local commit, err = os.iorun("git log -1 --format=\"%h-%cI\"") + if commit ~= nil then + commit = commit:gsub("%s+", "") + commit = commit:gsub("\n", "") + if is_mode("release") then + commit = "rel-" .. commit + else + commit = "dbg-" .. commit + end + target:add("defines","BUILD_VERSION=\"" .. commit .. "\"") + print("build version " .. commit) + end + end) diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index f9b4d5677..18c59636d 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -1,5 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/compactbinarybuilder.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> @@ -31,6 +32,10 @@ #include <set> #include <unordered_map> +#if !defined(BUILD_VERSION) +# define BUILD_VERSION ("dev-build") +#endif + ////////////////////////////////////////////////////////////////////////// // We don't have any doctest code in this file but this is needed to bring // in some shared code into the executable @@ -83,6 +88,8 @@ #include "diag/diagsvcs.h" #include "experimental/frontend.h" #include "experimental/usnjournal.h" +#include "monitoring/httpstats.h" +#include "monitoring/httpstatus.h" #include "projectstore.h" #include "testing/httptest.h" #include "testing/launch.h" @@ -96,17 +103,16 @@ namespace zen { -class ZenServer -{ - ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; +using namespace std::literals; +class ZenServer : public IHttpStatusProvider +{ public: void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry) { - m_ServerEntry = ServerEntry; using namespace fmt::literals; - ZEN_INFO(ZEN_APP_NAME " initializing"); + m_ServerEntry = ServerEntry; m_DebugOptionForcedCrash = ServiceConfig.ShouldCrash; if (ParentPid) @@ -140,6 +146,15 @@ public: // Ok so now we're configured, let's kick things off + m_Http = zen::CreateHttpServer(); + m_Http->Initialize(BasePort); + m_Http->RegisterService(m_HealthService); + m_Http->RegisterService(m_StatsService); + m_Http->RegisterService(m_StatusService); + m_StatusService.RegisterHandler("status", *this); + + // Initialize storage and services + ZEN_INFO("initializing storage"); zen::CasStoreConfiguration Config; @@ -167,95 +182,7 @@ public: if (ServiceConfig.StructuredCacheEnabled) { - using namespace std::literals; - auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; - - ZEN_INFO("instantiating structured cache service"); - m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); - - std::unique_ptr<zen::UpstreamCache> UpstreamCache; - if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) - { - const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; - - zen::UpstreamCacheOptions UpstreamOptions; - UpstreamOptions.ReadUpstream = - (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; - UpstreamOptions.WriteUpstream = - (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; - - if (UpstreamConfig.UpstreamThreadCount < 32) - { - UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); - } - - UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled; - - UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); - - if (!UpstreamConfig.ZenConfig.Urls.empty()) - { - std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls); - UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); - } - - { - zen::CloudCacheClientOptions Options; - if (UpstreamConfig.JupiterConfig.UseProductionSettings) - { - Options = zen::CloudCacheClientOptions{ - .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, - .DdcNamespace = "ue.ddc"sv, - .BlobStoreNamespace = "ue.ddc"sv, - .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, - .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, - .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, - .UseLegacyDdc = false}; - } - else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) - { - Options = zen::CloudCacheClientOptions{ - .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, - .DdcNamespace = "ue4.ddc"sv, - .BlobStoreNamespace = "test.ddc"sv, - .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, - .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, - .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, - .UseLegacyDdc = false}; - } - - Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); - Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace); - Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace); - Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider); - Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId); - Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret); - Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc; - - if (!Options.ServiceUrl.empty()) - { - std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); - UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); - } - } - - if (UpstreamCache->Initialize()) - { - ZEN_INFO("upstream cache active ({})", - UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" - : UpstreamOptions.ReadUpstream ? "READONLY" - : UpstreamOptions.WriteUpstream ? "WRITEONLY" - : "DISABLED"); - } - else - { - UpstreamCache.reset(); - ZEN_INFO("NOT using upstream cache"); - } - } - - m_StructuredCacheService.reset( - new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache))); + InitializeStructuredCache(ServiceConfig); } else { @@ -273,13 +200,8 @@ public: } #endif - m_Http = zen::CreateHttpServer(); - m_Http->Initialize(BasePort); - m_Http->RegisterService(m_HealthService); - m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics m_Http->RegisterService(m_TestingService); - m_Http->RegisterService(m_AdminService); if (m_HttpProjectService) @@ -305,12 +227,15 @@ public: } m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); + if (m_FrontendService) { m_Http->RegisterService(*m_FrontendService); } } + void InitializeStructuredCache(ZenServiceConfig& ServiceConfig); + #if ZEN_ENABLE_MESH void StartMesh(int BasePort) { @@ -351,8 +276,12 @@ public: const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; + m_CurrentState = kRunning; + m_Http->Run(IsInteractiveMode); + m_CurrentState = kShuttingDown; + ZEN_INFO(ZEN_APP_NAME " exiting"); m_IoContext.stop(); @@ -425,6 +354,7 @@ public: void Scrub() { + Stopwatch Timer; ZEN_INFO("Storage validation STARTING"); ScrubContext Ctx; @@ -433,7 +363,13 @@ public: m_ProjectStore->Scrub(Ctx); m_StructuredCacheService->Scrub(Ctx); - ZEN_INFO("Storage validation DONE"); + const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); + + ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", + NiceTimeSpanMs(ElapsedTimeMs), + NiceBytes(Ctx.ScrubbedBytes()), + Ctx.ScrubbedChunks(), + NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } void Flush() @@ -451,18 +387,51 @@ public: m_ProjectStore->Flush(); } + virtual void HandleStatusRequest(HttpServerRequest& Request) override + { + CbObjectWriter Cbo; + Cbo << "ok" << true; + Cbo << "state" << ToString(m_CurrentState); + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + private: - bool m_IsDedicatedMode = false; - bool m_TestMode = false; - std::filesystem::path m_DataRoot; - std::filesystem::path m_ContentRoot; - std::jthread m_IoRunner; - asio::io_context m_IoContext; - asio::steady_timer m_PidCheckTimer{m_IoContext}; - zen::ProcessMonitor m_ProcessMonitor; - zen::NamedMutex m_ServerMutex; + ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; + bool m_IsDedicatedMode = false; + bool m_TestMode = false; + std::filesystem::path m_DataRoot; + std::filesystem::path m_ContentRoot; + std::jthread m_IoRunner; + asio::io_context m_IoContext; + asio::steady_timer m_PidCheckTimer{m_IoContext}; + zen::ProcessMonitor m_ProcessMonitor; + zen::NamedMutex m_ServerMutex; + + enum ServerState + { + kInitializing, + kRunning, + kShuttingDown + } m_CurrentState = kInitializing; + + std::string_view ToString(ServerState Value) + { + switch (Value) + { + case kInitializing: + return "initializing"sv; + case kRunning: + return "running"sv; + case kShuttingDown: + return "shutdown"sv; + default: + return "unknown"sv; + } + } zen::Ref<zen::HttpServer> m_Http; + zen::HttpStatusService m_StatusService; + zen::HttpStatsService m_StatsService; std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()}; std::unique_ptr<zen::CidStore> m_CidStore; std::unique_ptr<zen::ZenCacheStore> m_CacheStore; @@ -485,6 +454,100 @@ private: bool m_DebugOptionForcedCrash = false; }; +void +ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) +{ + using namespace std::literals; + auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; + + ZEN_INFO("instantiating structured cache service"); + m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); + + std::unique_ptr<zen::UpstreamCache> UpstreamCache; + if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) + { + const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; + + zen::UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ReadUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; + UpstreamOptions.WriteUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; + + if (UpstreamConfig.UpstreamThreadCount < 32) + { + UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); + } + + UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled; + + UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); + + if (!UpstreamConfig.ZenConfig.Urls.empty()) + { + std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls); + UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); + } + + { + zen::CloudCacheClientOptions Options; + if (UpstreamConfig.JupiterConfig.UseProductionSettings) + { + Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, + .DdcNamespace = "ue.ddc"sv, + .BlobStoreNamespace = "ue.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + .UseLegacyDdc = false}; + } + else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) + { + Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, + .DdcNamespace = "ue4.ddc"sv, + .BlobStoreNamespace = "test.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + .UseLegacyDdc = false}; + } + + Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); + Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace); + Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace); + Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider); + Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId); + Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret); + Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc; + + if (!Options.ServiceUrl.empty()) + { + std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); + UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); + } + } + + if (UpstreamCache->Initialize()) + { + ZEN_INFO("upstream cache active ({})", + UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" + : UpstreamOptions.ReadUpstream ? "READONLY" + : UpstreamOptions.WriteUpstream ? "WRITEONLY" + : "DISABLED"); + } + else + { + UpstreamCache.reset(); + ZEN_INFO("NOT using upstream cache"); + } + } + + m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore, + *m_CasStore, + *m_CidStore, + m_StatsService, + m_StatusService, + std::move(UpstreamCache))); +} + } // namespace zen class ZenWindowsService : public WindowsService @@ -532,7 +595,7 @@ ZenWindowsService::Run() ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig); - ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort); + ZEN_INFO(ZEN_APP_NAME " - starting on port {}, build '{}'", GlobalOptions.BasePort, BUILD_VERSION); ZenServerState ServerState; ServerState.Initialize(); diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index 335786fbf..7fad477a1 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -108,8 +108,12 @@ <ClInclude Include="cache\structuredcachestore.h" /> <ClInclude Include="compute\apply.h" /> <ClInclude Include="config.h" /> + <ClInclude Include="diag\formatters.h" /> <ClInclude Include="diag\logging.h" /> <ClInclude Include="experimental\frontend.h" /> + <ClInclude Include="experimental\vfs.h" /> + <ClInclude Include="monitoring\httpstats.h" /> + <ClInclude Include="monitoring\httpstatus.h" /> <ClInclude Include="resource.h" /> <ClInclude Include="sos\sos.h" /> <ClInclude Include="testing\httptest.h" /> @@ -134,6 +138,9 @@ <ClCompile Include="config.cpp" /> <ClCompile Include="diag\logging.cpp" /> <ClCompile Include="experimental\frontend.cpp" /> + <ClCompile Include="experimental\vfs.cpp" /> + <ClCompile Include="monitoring\httpstats.cpp" /> + <ClCompile Include="monitoring\httpstatus.cpp" /> <ClCompile Include="projectstore.cpp" /> <ClCompile Include="cache\cacheagent.cpp" /> <ClCompile Include="sos\sos.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 1c5b17fee..04e639a33 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -41,6 +41,10 @@ <ClInclude Include="experimental\frontend.h"> <Filter>experimental</Filter> </ClInclude> + <ClInclude Include="diag\formatters.h" /> + <ClInclude Include="experimental\vfs.h" /> + <ClInclude Include="monitoring\httpstats.h" /> + <ClInclude Include="monitoring\httpstatus.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -77,6 +81,9 @@ <ClCompile Include="experimental\frontend.cpp"> <Filter>experimental</Filter> </ClCompile> + <ClCompile Include="experimental\vfs.cpp" /> + <ClCompile Include="monitoring\httpstats.cpp" /> + <ClCompile Include="monitoring\httpstatus.cpp" /> </ItemGroup> <ItemGroup> <Filter Include="cache"> diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index 808fc8fb3..a4bbfa340 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -96,9 +96,16 @@ GcContext::ContributeCas(std::span<const IoHash> Cas) ////////////////////////////////////////////////////////////////////////// void -ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks) +ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks) { - ZEN_UNUSED(BadChunks); + m_BadCas.AddChunksToSet(BadCasChunks); +} + +void +ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) +{ + m_ChunkCount.fetch_add(ChunkCount); + m_ByteCount.fetch_add(ChunkBytes); } /** diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index df5c32d25..7a5d7bcf4 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -204,7 +204,7 @@ struct CidStore::Impl // TODO: Should compute a snapshot index here - Ctx.ReportBadChunks(BadChunks); + Ctx.ReportBadCasChunks(BadChunks); } uint64_t m_LastScrubTime = 0; diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 5fc3ac356..612f87c7c 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -254,7 +254,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadChunks(BadChunkHashes); + Ctx.ReportBadCasChunks(BadChunkHashes); } void diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 0b18848d5..ee641b80a 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -394,7 +394,8 @@ FileCasStrategy::Flush() void FileCasStrategy::Scrub(ScrubContext& Ctx) { - std::vector<IoHash> BadHashes; + std::vector<IoHash> BadHashes; + std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { IoHashStream Hasher; @@ -405,8 +406,13 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) { BadHashes.push_back(Hash); } + + ++ChunkCount; + ChunkBytes.fetch_add(Payload.FileSize()); }); + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + if (!BadHashes.empty()) { ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size()); @@ -428,7 +434,9 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } } - Ctx.ReportBadChunks(BadHashes); + Ctx.ReportBadCasChunks(BadHashes); + + ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); } void diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index 1425845a0..d0698df7f 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -31,6 +31,25 @@ struct CasStoreConfiguration uint64_t HugeValueThreshold = 1024 * 1024; }; +/** Manage a set of IoHash values + */ + +class CasChunkSet +{ +public: + void AddChunkToSet(const IoHash& HashToAdd); + void AddChunksToSet(std::span<const IoHash> HashesToAdd); + void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); + void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback); + inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); } + inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); } + inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } + +private: + // Q: should we protect this with a lock, or is that a higher level concern? + std::unordered_set<IoHash> m_ChunkSet; +}; + /** Garbage Collection context object */ @@ -58,33 +77,26 @@ private: class ScrubContext { public: - virtual void ReportBadChunks(std::span<IoHash> BadChunks); + virtual void ReportBadCasChunks(std::span<IoHash> BadCasChunks); inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } inline bool RunRecovery() const { return m_Recover; } + void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes); + + inline uint64_t ScrubbedChunks() const { return m_ChunkCount; } + inline uint64_t ScrubbedBytes() const { return m_ByteCount; } private: - uint64_t m_ScrubTime = GetHifreqTimerValue(); - bool m_Recover = true; + uint64_t m_ScrubTime = GetHifreqTimerValue(); + bool m_Recover = true; + std::atomic<uint64_t> m_ChunkCount{0}; + std::atomic<uint64_t> m_ByteCount{0}; + CasChunkSet m_BadCas; + CasChunkSet m_BadCid; }; -/** Manage a set of IoHash values - */ - -class CasChunkSet -{ -public: - void AddChunkToSet(const IoHash& HashToAdd); - void AddChunksToSet(std::span<const IoHash> HashesToAdd); - void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); - void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback); - inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); } - inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); } - inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } +/** Content Addressable Storage interface -private: - // Q: should we protect this with a lock, or is that a higher level concern? - std::unordered_set<IoHash> m_ChunkSet; -}; + */ class CasStore { |