diff options
| author | zousar <[email protected]> | 2023-12-07 08:48:04 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-07 08:48:04 -0700 |
| commit | 6229149482f00893afa6874cc75d5e5ed0c438a9 (patch) | |
| tree | 531317314903da569eea099c4a07e721de659b93 | |
| parent | Change naming to ChunkInfos instead of Chunks (diff) | |
| parent | Update CHANGELOG.md (diff) | |
| download | zen-zs/get-all-chunk-infos.tar.xz zen-zs/get-all-chunk-infos.zip | |
Merge branch 'main' into zs/get-all-chunk-infoszs/get-all-chunk-infos
33 files changed, 523 insertions, 211 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 64b53f31c..d159cf2c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,13 +2,17 @@ - Feature: Added xmake task `updatefrontend` which updates the zip file containing the frontend html (`/src/zenserver/frontend/html.zip`) - Feature: Added `--powercycle` option to zenserver which causes it do shut down immediately after initialization is completed. This is useful for profiling startup/shutdown primarily but could also be useful for some kinds of validation/state upgrade scenarios - Feature: New endpoint `/admin/gc-stop` to cancel a running garbage collect operation -- Feature: Added `zen gc-stop` command to cancel a running garbage collect operation` +- Feature: Added `zen gc-stop` command to cancel a running garbage collect operation +- Feature: Added ability to configure logger verbosity on the command line. You can now use `--log-debug=http_requests` to configure the `http_requests` logger to DEBUG level. The provided options are `--log-trace`, `--log-debug`, `--log-info`, `--log-warn`, `--log-error`, `--log-critical`, `--log-off` and each accepts a comma-separated list of logger names to apply the threshold to. - Bugfix: Fix sentry host name where last character of name was being truncated - Bugfix: GCv2 - make sure to discover all projects and oplogs before checking for expired data - Bugfix: Fix sync of log position and state log when writing cas index snapshot - Bugfix: Make sure we can override flags to "false" when running `zen gc` commmand - `smallobjects`, `skipcid`, `skipdelete`, `verbose` - Bugfix: fixed file log timestamp format so the milliseconds are appended after the time not the date +- Bugfix: Shut down thread pools earlier so worker threads have a chance to terminate before main thread calls `atexit()` +- Bugfix: Use correct lookup index when checking for memcached buffer when finding references in diskcache GC +- Bugfix: CasContainerStrategy::ReadIndexFile issue could cause CAS items to not be found after a shutdown/restart cycle - Improvement: The frontend html content is no longer appended at the end of the executable which prevented signing, instead it is compiled in from the `/src/zenserver/frontend/html.zip` archive - Improvement: MacOS now does ad-hoc code signing by default when issuing `xmake bundle`, signing with proper cert is done on CI builds - Improvement: Updated branding to be consistent with current working name ("Unreal Zen Storage Server" etc) @@ -24,6 +28,11 @@ - Improvement: GCv2: Exit as soon as no more unreferenced items are left - Improvement: Reduce memory usage in GC and diskbucket flush - Improvement: Added a `{project}/oplog/{log}/chunkinfos` endpoint that can be used for getting all chunk info within an oplog in batch +- Improvement: Reserve vector sizes in GCv2 to reduce reallocations +- Improvement: Set min/max load factor for cachedisk/compactcas/filecas indexes to reduce memory footprint +- Improvement: Added context (upstream host name) to Zen upstream resolve error message +- Improvement: Make a more accurate estimation of memory usage for in-memory cache values +- Improvement: Added detailed debug logging for pluggable transports ## 0.2.35 - Bugfix: Fix timeout calculation for semtimedop call diff --git a/VERSION.txt b/VERSION.txt index 83a18a49a..48a7bba93 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -0.2.36-pre1
\ No newline at end of file +0.2.36-pre3
\ No newline at end of file diff --git a/src/transports/transport-sdk/include/transportplugin.h b/src/transports/transport-sdk/include/transportplugin.h index 2a3b8075f..4347868e6 100644 --- a/src/transports/transport-sdk/include/transportplugin.h +++ b/src/transports/transport-sdk/include/transportplugin.h @@ -77,11 +77,12 @@ public: class TransportPlugin { public: - virtual uint32_t AddRef() const = 0; - virtual uint32_t Release() const = 0; - virtual void Configure(const char* OptionTag, const char* OptionValue) = 0; - virtual void Initialize(TransportServer* ServerInterface) = 0; - virtual void Shutdown() = 0; + virtual uint32_t AddRef() const = 0; + virtual uint32_t Release() const = 0; + virtual void Configure(const char* OptionTag, const char* OptionValue) = 0; + virtual void Initialize(TransportServer* ServerInterface) = 0; + virtual void Shutdown() = 0; + virtual const char* GetDebugName() = 0; /** Check whether this transport is usable. */ @@ -99,9 +100,10 @@ public: class TransportConnection { public: - virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) = 0; - virtual void Shutdown(bool Receive, bool Transmit) = 0; - virtual void CloseConnection() = 0; + virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) = 0; + virtual void Shutdown(bool Receive, bool Transmit) = 0; + virtual void CloseConnection() = 0; + virtual const char* GetDebugName() = 0; }; } // namespace zen diff --git a/src/transports/winsock/winsock.cpp b/src/transports/winsock/winsock.cpp index 28ac10ec1..7ee2b5ed1 100644 --- a/src/transports/winsock/winsock.cpp +++ b/src/transports/winsock/winsock.cpp @@ -51,12 +51,13 @@ public: // TransportPlugin implementation - virtual uint32_t AddRef() const override; - virtual uint32_t Release() const override; - virtual void Configure(const char* OptionTag, const char* OptionValue) override; - virtual void Initialize(TransportServer* ServerInterface) override; - virtual void Shutdown() override; - virtual bool IsAvailable() override; + virtual uint32_t AddRef() const override; + virtual uint32_t Release() const override; + virtual void Configure(const char* OptionTag, const char* OptionValue) override; + virtual void Initialize(TransportServer* ServerInterface) override; + virtual void Shutdown() override; + virtual const char* GetDebugName() override; + virtual bool IsAvailable() override; private: TransportServer* m_ServerInterface = nullptr; @@ -80,9 +81,10 @@ public: // TransportConnection implementation - virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; - virtual void Shutdown(bool Receive, bool Transmit) override; - virtual void CloseConnection() override; + virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; + virtual void Shutdown(bool Receive, bool Transmit) override; + virtual void CloseConnection() override; + virtual const char* GetDebugName() override; private: zen::Ref<TransportServerConnection> m_ConnectionHandler; @@ -153,6 +155,12 @@ WinsockTransportConnection::CloseConnection() m_ClientSocket = 0; } +const char* +WinsockTransportConnection::GetDebugName() +{ + return nullptr; +} + int64_t WinsockTransportConnection::WriteBytes(const void* Buffer, size_t DataSize) { @@ -342,6 +350,12 @@ WinsockTransportPlugin::Shutdown() } } +const char* +WinsockTransportPlugin::GetDebugName() +{ + return nullptr; +} + bool WinsockTransportPlugin::IsAvailable() { diff --git a/src/transports/winsock/xmake.lua b/src/transports/winsock/xmake.lua index 552a62702..c14283546 100644 --- a/src/transports/winsock/xmake.lua +++ b/src/transports/winsock/xmake.lua @@ -6,9 +6,9 @@ target("winsock") add_headerfiles("**.h") add_files("**.cpp") add_links("Ws2_32") - add_includedirs(".", "../../zenbase/include") + add_includedirs(".") set_symbols("debug") - add_deps("transport-sdk") + add_deps("zenbase", "transport-sdk") if is_mode("release") then set_optimize("fastest") diff --git a/src/transports/xmake.lua b/src/transports/xmake.lua index 44800a8af..78d637d85 100644 --- a/src/transports/xmake.lua +++ b/src/transports/xmake.lua @@ -5,6 +5,10 @@ set_languages("cxx20") includes('transport-sdk') +if os.isdir('zenbase') then + includes('zenbase') +end + if is_plat("windows") then includes("winsock") end diff --git a/src/zencore/include/zencore/logbase.h b/src/zencore/include/zencore/logbase.h index ad873aa51..00af68b0a 100644 --- a/src/zencore/include/zencore/logbase.h +++ b/src/zencore/include/zencore/logbase.h @@ -90,6 +90,9 @@ struct LoggerRef bool ShouldLog(int Level) const; inline operator bool() const { return SpdLogger != nullptr; } + void SetLogLevel(logging::level::LogLevel NewLogLevel); + logging::level::LogLevel GetLogLevel(); + spdlog::logger* SpdLogger = nullptr; }; diff --git a/src/zencore/include/zencore/logging.h b/src/zencore/include/zencore/logging.h index d14d1ab8d..8b76d754c 100644 --- a/src/zencore/include/zencore/logging.h +++ b/src/zencore/include/zencore/logging.h @@ -35,6 +35,10 @@ LoggerRef ErrorLog(); void SetErrorLog(std::string_view LoggerId); LoggerRef Get(std::string_view Name); +void ConfigureLogLevels(level::LogLevel Level, std::string_view Loggers); +void RefreshLogLevels(); +void RefreshLogLevels(level::LogLevel DefaultLevel); + struct LogCategory { inline LogCategory(std::string_view InCategory) : CategoryName(InCategory) {} diff --git a/src/zencore/logging.cpp b/src/zencore/logging.cpp index 434c461ae..0bf07affd 100644 --- a/src/zencore/logging.cpp +++ b/src/zencore/logging.cpp @@ -4,10 +4,14 @@ #include <zencore/string.h> #include <zencore/testing.h> +#include <zencore/thread.h> +ZEN_THIRD_PARTY_INCLUDES_START +#include <spdlog/details/registry.h> #include <spdlog/sinks/null_sink.h> #include <spdlog/sinks/stdout_color_sinks.h> #include <spdlog/spdlog.h> +ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_WINDOWS # pragma section(".zlog$a", read) @@ -46,6 +50,8 @@ LoggingContext::~LoggingContext() { } +////////////////////////////////////////////////////////////////////////// + static inline bool IsErrorLevel(int LogLevel) { @@ -176,8 +182,77 @@ ToStringView(level::LogLevel Level) } // namespace zen::logging::level +////////////////////////////////////////////////////////////////////////// + namespace zen::logging { +RwLock LogLevelsLock; +std::string LogLevels[level::LogLevelCount]; + +void +ConfigureLogLevels(level::LogLevel Level, std::string_view Loggers) +{ + RwLock::ExclusiveLockScope _(LogLevelsLock); + LogLevels[Level] = Loggers; +} + +void +RefreshLogLevels(level::LogLevel* DefaultLevel) +{ + spdlog::details::registry::log_levels Levels; + + { + RwLock::SharedLockScope _(LogLevelsLock); + + for (int i = 0; i < level::LogLevelCount; ++i) + { + level::LogLevel CurrentLevel{i}; + + std::string_view Spec = LogLevels[i]; + + while (!Spec.empty()) + { + std::string LoggerName; + + if (auto CommaPos = Spec.find_first_of(','); CommaPos != std::string_view::npos) + { + LoggerName = Spec.substr(CommaPos + 1); + Spec.remove_prefix(CommaPos + 1); + } + else + { + LoggerName = Spec; + Spec = {}; + } + + Levels[LoggerName] = to_spdlog_level(CurrentLevel); + } + } + } + + if (DefaultLevel) + { + spdlog::level::level_enum SpdDefaultLevel = to_spdlog_level(*DefaultLevel); + spdlog::details::registry::instance().set_levels(Levels, &SpdDefaultLevel); + } + else + { + spdlog::details::registry::instance().set_levels(Levels, nullptr); + } +} + +void +RefreshLogLevels(level::LogLevel DefaultLevel) +{ + RefreshLogLevels(&DefaultLevel); +} + +void +RefreshLogLevels() +{ + RefreshLogLevels(nullptr); +} + void SetLogLevel(level::LogLevel NewLogLevel) { @@ -240,6 +315,7 @@ Get(std::string_view Name) if (!Logger) { Logger = Default().SpdLogger->clone(std::string(Name)); + spdlog::apply_logger_env_levels(Logger); spdlog::register_logger(Logger); } @@ -262,6 +338,7 @@ ConsoleLog() if (!ConLogger) { ConLogger = spdlog::stdout_color_mt("console"); + spdlog::apply_logger_env_levels(ConLogger); ConLogger->set_pattern("%v"); } @@ -320,6 +397,18 @@ LoggerRef::ShouldLog(int Level) const return SpdLogger->should_log(static_cast<spdlog::level::level_enum>(Level)); } +void +LoggerRef::SetLogLevel(logging::level::LogLevel NewLogLevel) +{ + SpdLogger->set_level(to_spdlog_level(NewLogLevel)); +} + +logging::level::LogLevel +LoggerRef::GetLogLevel() +{ + return logging::level::to_logging_level(SpdLogger->level()); +} + thread_local ScopedActivityBase* t_ScopeStack = nullptr; ScopedActivityBase* diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index eabad4728..1089dd221 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -175,11 +175,11 @@ private: class HttpServer : public RefCounted { public: - virtual void RegisterService(HttpService& Service) = 0; - virtual int Initialize(int BasePort) = 0; - virtual void Run(bool IsInteractiveSession) = 0; - virtual void RequestExit() = 0; - virtual void Close() = 0; + virtual void RegisterService(HttpService& Service) = 0; + virtual int Initialize(int BasePort, std::filesystem::path DataDir) = 0; + virtual void Run(bool IsInteractiveSession) = 0; + virtual void RequestExit() = 0; + virtual void Close() = 0; }; struct HttpServerConfig diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index c62aca001..9fca314b3 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -941,7 +941,7 @@ public: ~HttpAsioServer(); virtual void RegisterService(HttpService& Service) override; - virtual int Initialize(int BasePort) override; + virtual int Initialize(int BasePort, std::filesystem::path DataDir) override; virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; virtual void Close() override; @@ -992,8 +992,9 @@ HttpAsioServer::RegisterService(HttpService& Service) } int -HttpAsioServer::Initialize(int BasePort) +HttpAsioServer::Initialize(int BasePort, std::filesystem::path DataDir) { + ZEN_UNUSED(DataDir); m_BasePort = m_Impl->Start(gsl::narrow<uint16_t>(BasePort), m_ForceLoopback, m_ThreadCount); return m_BasePort; } diff --git a/src/zenhttp/servers/httpmulti.cpp b/src/zenhttp/servers/httpmulti.cpp index d8ebdc9c0..2a6a90d2e 100644 --- a/src/zenhttp/servers/httpmulti.cpp +++ b/src/zenhttp/servers/httpmulti.cpp @@ -28,15 +28,16 @@ HttpMultiServer::RegisterService(HttpService& Service) } int -HttpMultiServer::Initialize(int BasePort) +HttpMultiServer::Initialize(int BasePort, std::filesystem::path DataDir) { + ZEN_UNUSED(DataDir); ZEN_ASSERT(!m_IsInitialized); int EffectivePort = 0; for (auto& Server : m_Servers) { - const int InitializeResult = Server->Initialize(BasePort); + const int InitializeResult = Server->Initialize(BasePort, DataDir); if (!EffectivePort) { diff --git a/src/zenhttp/servers/httpmulti.h b/src/zenhttp/servers/httpmulti.h index d5b21d3c3..53cf57568 100644 --- a/src/zenhttp/servers/httpmulti.h +++ b/src/zenhttp/servers/httpmulti.h @@ -16,7 +16,7 @@ public: ~HttpMultiServer(); virtual void RegisterService(HttpService& Service) override; - virtual int Initialize(int BasePort) override; + virtual int Initialize(int BasePort, std::filesystem::path DataDir) override; virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; virtual void Close() override; diff --git a/src/zenhttp/servers/httpnull.cpp b/src/zenhttp/servers/httpnull.cpp index 7d3e9079a..9ac1c61ce 100644 --- a/src/zenhttp/servers/httpnull.cpp +++ b/src/zenhttp/servers/httpnull.cpp @@ -25,8 +25,9 @@ HttpNullServer::RegisterService(HttpService& Service) } int -HttpNullServer::Initialize(int BasePort) +HttpNullServer::Initialize(int BasePort, std::filesystem::path DataDir) { + ZEN_UNUSED(DataDir); return BasePort; } diff --git a/src/zenhttp/servers/httpnull.h b/src/zenhttp/servers/httpnull.h index 965e729f7..818020604 100644 --- a/src/zenhttp/servers/httpnull.h +++ b/src/zenhttp/servers/httpnull.h @@ -18,7 +18,7 @@ public: ~HttpNullServer(); virtual void RegisterService(HttpService& Service) override; - virtual int Initialize(int BasePort) override; + virtual int Initialize(int BasePort, std::filesystem::path DataDir) override; virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; virtual void Close() override; diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp index 4ae7cd87a..3eed9db8f 100644 --- a/src/zenhttp/servers/httpplugin.cpp +++ b/src/zenhttp/servers/httpplugin.cpp @@ -7,7 +7,11 @@ # include "httpparser.h" # include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> # include <zencore/logging.h> +# include <zencore/scopeguard.h> +# include <zencore/session.h> # include <zencore/thread.h> # include <zencore/trace.h> # include <zenhttp/httpserver.h> @@ -15,6 +19,8 @@ # include <memory> # include <string_view> +# include <fmt/format.h> + # if ZEN_PLATFORM_WINDOWS # include <conio.h> # endif @@ -38,17 +44,21 @@ using namespace std::literals; struct HttpPluginConnectionHandler : public TransportServerConnection, public HttpRequestParserCallbacks, RefCounted { + HttpPluginConnectionHandler(); + ~HttpPluginConnectionHandler(); + + // TransportServerConnection + virtual uint32_t AddRef() const override; virtual uint32_t Release() const override; - - virtual void OnBytesRead(const void* Buffer, size_t DataSize) override; + virtual void OnBytesRead(const void* Buffer, size_t DataSize) override; // HttpRequestParserCallbacks virtual void HandleRequest() override; virtual void TerminateConnection() override; - void Initialize(TransportConnection* Transport, HttpPluginServerImpl& Server); + void Initialize(TransportConnection* Transport, HttpPluginServerImpl& Server, uint32_t ConnectionId); private: enum class RequestState @@ -65,7 +75,8 @@ private: RequestState m_RequestState = RequestState::kInitialState; HttpRequestParser m_RequestParser{*this}; - uint32_t m_ConnectionId = 0; + uint32_t m_ConnectionId = 0; + std::atomic_uint32_t m_RequestCounter = 0; Ref<IHttpPackageHandler> m_PackageHandler; TransportConnection* m_TransportConnection = nullptr; @@ -82,7 +93,7 @@ struct HttpPluginServerImpl : public HttpPluginServer, TransportServer // HttpPluginServer virtual void RegisterService(HttpService& Service) override; - virtual int Initialize(int BasePort) override; + virtual int Initialize(int BasePort, std::filesystem::path DataDir) override; virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; virtual void Close() override; @@ -92,6 +103,8 @@ struct HttpPluginServerImpl : public HttpPluginServer, TransportServer HttpService* RouteRequest(std::string_view Url); + void WriteDebugPayload(std::string_view Filename, const std::span<const IoBuffer> Payload); + struct ServiceEntry { std::string ServiceUrlPath; @@ -103,6 +116,11 @@ struct HttpPluginServerImpl : public HttpPluginServer, TransportServer std::vector<ServiceEntry> m_UriHandlers; std::vector<Ref<TransportPlugin>> m_Plugins; Event m_ShutdownEvent; + bool m_IsRequestLoggingEnabled = false; + LoggerRef m_RequestLog; + std::atomic_uint32_t m_ConnectionIdCounter{0}; + std::filesystem::path m_DataDir; // Application data directory + std::filesystem::path m_PayloadDir; // Request debugging payload directory // TransportServer @@ -147,14 +165,20 @@ public: HttpPluginResponse() = default; explicit HttpPluginResponse(HttpContentType ContentType) : m_ContentType(ContentType) {} + HttpPluginResponse(const HttpPluginResponse&) = delete; + HttpPluginResponse& operator=(const HttpPluginResponse&) = delete; + void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList); - inline uint16_t ResponseCode() const { return m_ResponseCode; } - inline uint64_t ContentLength() const { return m_ContentLength; } + inline uint16_t ResponseCode() const { return m_ResponseCode; } + inline uint64_t ContentLength() const { return m_ContentLength; } + inline HttpContentType ContentType() const { return m_ContentType; } const std::vector<IoBuffer>& ResponseBuffers() const { return m_ResponseBuffers; } void SuppressPayload() { m_ResponseBuffers.resize(1); } + std::string_view GetHeaders(); + private: uint16_t m_ResponseCode = 0; bool m_IsKeepAlive = true; @@ -162,8 +186,6 @@ private: uint64_t m_ContentLength = 0; std::vector<IoBuffer> m_ResponseBuffers; ExtendableStringBuilder<160> m_Headers; - - std::string_view GetHeaders(); }; void @@ -210,27 +232,55 @@ HttpPluginResponse::InitializeForPayload(uint16_t ResponseCode, std::span<IoBuff std::string_view HttpPluginResponse::GetHeaders() { - m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n" - << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n" - << "Content-Length: " << ContentLength() << "\r\n"sv; - - if (!m_IsKeepAlive) + if (m_Headers.Size() == 0) { - m_Headers << "Connection: close\r\n"sv; - } + m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n" + << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n" + << "Content-Length: " << ContentLength() << "\r\n"sv; + + if (!m_IsKeepAlive) + { + m_Headers << "Connection: close\r\n"sv; + } - m_Headers << "\r\n"sv; + m_Headers << "\r\n"sv; + } return m_Headers; } ////////////////////////////////////////////////////////////////////////// +HttpPluginConnectionHandler::HttpPluginConnectionHandler() +{ +} + +HttpPluginConnectionHandler::~HttpPluginConnectionHandler() +{ + if (m_Server) + { + ZEN_LOG_TRACE(m_Server->m_RequestLog, "END connection #{}", m_ConnectionId); + } +} + void -HttpPluginConnectionHandler::Initialize(TransportConnection* Transport, HttpPluginServerImpl& Server) +HttpPluginConnectionHandler::Initialize(TransportConnection* Transport, HttpPluginServerImpl& Server, uint32_t ConnectionId) { m_TransportConnection = Transport; m_Server = &Server; + m_ConnectionId = ConnectionId; + + std::string_view ConnectionName; + if (const char* Name = Transport->GetDebugName()) + { + ConnectionName = Name; + } + else + { + ConnectionName = "anonymous"; + } + + ZEN_LOG_TRACE(m_Server->m_RequestLog, "NEW connection #{} ('')", m_ConnectionId, ConnectionName); } uint32_t @@ -248,13 +298,19 @@ HttpPluginConnectionHandler::Release() const void HttpPluginConnectionHandler::OnBytesRead(const void* Buffer, size_t AvailableBytes) { + ZEN_ASSERT(m_Server); + + ZEN_LOG_TRACE(m_Server->m_RequestLog, "connection #{} OnBytesRead: {}", m_ConnectionId, AvailableBytes); + while (AvailableBytes) { const size_t ConsumedBytes = m_RequestParser.ConsumeData((const char*)Buffer, AvailableBytes); if (ConsumedBytes == ~0ull) { - // terminate connection + // request parser error -- terminate connection + + ZEN_LOG_TRACE(m_Server->m_RequestLog, "connection #{} terminating due to request parsing error", m_ConnectionId); return TerminateConnection(); } @@ -269,15 +325,21 @@ HttpPluginConnectionHandler::OnBytesRead(const void* Buffer, size_t AvailableByt void HttpPluginConnectionHandler::HandleRequest() { + ZEN_ASSERT(m_Server); + + const uint32_t RequestNumber = m_RequestCounter.fetch_add(1); + + ZEN_LOG_TRACE(m_Server->m_RequestLog, "connection #{} ENTER HandleRequest #{}", m_ConnectionId, RequestNumber); + auto $Exit = + MakeGuard([&] { ZEN_LOG_TRACE(m_Server->m_RequestLog, "connection #{} EXIT HandleRequest #{}", m_ConnectionId, RequestNumber); }); + if (!m_RequestParser.IsKeepAlive()) { // Once response has been written, connection is done m_RequestState = RequestState::kWritingFinal; - // We're not going to read any more data from this socket - - const bool Receive = true; - const bool Transmit = false; + const bool Receive = true; // We're not going to read any more data from this socket + const bool Transmit = false; // We will write more data however m_TransportConnection->Shutdown(Receive, Transmit); } else @@ -300,6 +362,24 @@ HttpPluginConnectionHandler::HandleRequest() HttpPluginServerRequest Request(m_RequestParser, *Service, m_RequestParser.Body()); + const HttpVerb RequestVerb = Request.RequestVerb(); + const std::string_view Uri = Request.RelativeUri(); + + if (m_Server->m_RequestLog.ShouldLog(logging::level::Trace)) + { + ZEN_LOG_TRACE(m_Server->m_RequestLog, + "connection #{} Handling Request: {} {} ({} bytes ({}), accept: {})", + m_ConnectionId, + ToString(RequestVerb), + Uri, + Request.ContentLength(), + ToString(Request.RequestContentType()), + ToString(Request.AcceptContentType())); + + m_Server->WriteDebugPayload(fmt::format("request_{}_{}.bin", m_ConnectionId, RequestNumber), + std::vector<IoBuffer>{Request.ReadPayload()}); + } + if (!HandlePackageOffers(*Service, Request, m_PackageHandler)) { try @@ -340,6 +420,17 @@ HttpPluginConnectionHandler::HandleRequest() if (std::unique_ptr<HttpPluginResponse> Response = std::move(Request.m_Response)) { + { + const uint16_t ResponseCode = Response->ResponseCode(); + ZEN_LOG_TRACE(m_Server->m_RequestLog, + "connection #{} Response: {} {} ({} bytes, {})", + m_ConnectionId, + ResponseCode, + ToString(HttpResponseCode(ResponseCode)), + Response->ContentLength(), + ToString(Response->ContentType())); + } + // Transmit the response if (m_RequestParser.RequestVerb() == HttpVerb::kHead) @@ -349,10 +440,19 @@ HttpPluginConnectionHandler::HandleRequest() const std::vector<IoBuffer>& ResponseBuffers = Response->ResponseBuffers(); - //// TODO: should cork/uncork for Linux? + if (m_Server->m_RequestLog.ShouldLog(logging::level::Trace)) + { + m_Server->WriteDebugPayload(fmt::format("response_{}_{}.bin", m_ConnectionId, RequestNumber), ResponseBuffers); + } for (const IoBuffer& Buffer : ResponseBuffers) { + ZEN_LOG_TRACE(m_Server->m_RequestLog, + "connection #{} SEND: {} bytes, {}", + m_ConnectionId, + Buffer.GetSize(), + ToString(Buffer.GetContentType())); + int64_t SentBytes = SendBuffer(Buffer); if (SentBytes < 0) @@ -558,7 +658,7 @@ HttpPluginServerRequest::TryGetRanges(HttpRanges& Ranges) ////////////////////////////////////////////////////////////////////////// -HttpPluginServerImpl::HttpPluginServerImpl() +HttpPluginServerImpl::HttpPluginServerImpl() : m_RequestLog(logging::Get("http_requests")) { } @@ -570,13 +670,19 @@ TransportServerConnection* HttpPluginServerImpl::CreateConnectionHandler(TransportConnection* Connection) { HttpPluginConnectionHandler* Handler{new HttpPluginConnectionHandler()}; - Handler->Initialize(Connection, *this); + const uint32_t ConnectionId = m_ConnectionIdCounter.fetch_add(1); + Handler->Initialize(Connection, *this, ConnectionId); return Handler; } int -HttpPluginServerImpl::Initialize(int BasePort) +HttpPluginServerImpl::Initialize(int BasePort, std::filesystem::path DataDir) { + m_DataDir = DataDir; + m_PayloadDir = DataDir / "debug" / GetSessionIdString(); + + ZEN_INFO("any debug payloads will be written to '{}'", m_PayloadDir); + try { RwLock::ExclusiveLockScope _(m_Lock); @@ -742,6 +848,23 @@ HttpPluginServerImpl::RouteRequest(std::string_view Url) return CandidateService; } +void +HttpPluginServerImpl::WriteDebugPayload(std::string_view Filename, const std::span<const IoBuffer> Payload) +{ + uint64_t PayloadSize = 0; + std::vector<const IoBuffer*> Buffers; + for (auto& Io : Payload) + { + Buffers.push_back(&Io); + PayloadSize += Io.GetSize(); + } + + if (PayloadSize) + { + WriteFile(m_PayloadDir / Filename, Buffers.data(), Buffers.size()); + } +} + ////////////////////////////////////////////////////////////////////////// struct HttpPluginServerImpl; diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 0b11d396b..d2cb63cd7 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -43,7 +43,7 @@ public: // HttpServer interface implementation - virtual int Initialize(int BasePort) override; + virtual int Initialize(int BasePort, std::filesystem::path DataDir) override; virtual void Run(bool TestMode) override; virtual void RequestExit() override; virtual void RegisterService(HttpService& Service) override; @@ -2012,8 +2012,9 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT // int -HttpSysServer::Initialize(int BasePort) +HttpSysServer::Initialize(int BasePort, std::filesystem::path DataDir) { + ZEN_UNUSED(DataDir); if (int EffectivePort = InitializeServer(BasePort)) { StartServer(); diff --git a/src/zenhttp/transports/asiotransport.cpp b/src/zenhttp/transports/asiotransport.cpp index ab053a748..a9a782821 100644 --- a/src/zenhttp/transports/asiotransport.cpp +++ b/src/zenhttp/transports/asiotransport.cpp @@ -34,12 +34,13 @@ public: AsioTransportPlugin(); ~AsioTransportPlugin(); - virtual uint32_t AddRef() const override; - virtual uint32_t Release() const override; - virtual void Configure(const char* OptionTag, const char* OptionValue) override; - virtual void Initialize(TransportServer* ServerInterface) override; - virtual void Shutdown() override; - virtual bool IsAvailable() override; + virtual uint32_t AddRef() const override; + virtual uint32_t Release() const override; + virtual void Configure(const char* OptionTag, const char* OptionValue) override; + virtual void Initialize(TransportServer* ServerInterface) override; + virtual void Shutdown() override; + virtual const char* GetDebugName() override { return nullptr; } + virtual bool IsAvailable() override; private: bool m_IsOk = true; @@ -63,9 +64,10 @@ struct AsioTransportConnection : public TransportConnection, std::enable_shared_ // TransportConnectionInterface - virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; - virtual void Shutdown(bool Receive, bool Transmit) override; - virtual void CloseConnection() override; + virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; + virtual void Shutdown(bool Receive, bool Transmit) override; + virtual void CloseConnection() override; + virtual const char* GetDebugName() override { return nullptr; } private: void EnqueueRead(); diff --git a/src/zenhttp/transports/dlltransport.cpp b/src/zenhttp/transports/dlltransport.cpp index dd4479e39..e09e62ec5 100644 --- a/src/zenhttp/transports/dlltransport.cpp +++ b/src/zenhttp/transports/dlltransport.cpp @@ -19,69 +19,6 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -struct DllTransportConnection : public TransportConnection -{ -public: - DllTransportConnection(); - ~DllTransportConnection(); - - void Initialize(TransportServerConnection& ServerConnection); - void HandleConnection(); - - // TransportConnection - - virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; - virtual void Shutdown(bool Receive, bool Transmit) override; - virtual void CloseConnection() override; - -private: - Ref<TransportServerConnection> m_ConnectionHandler; - bool m_IsTerminated = false; -}; - -DllTransportConnection::DllTransportConnection() -{ -} - -DllTransportConnection::~DllTransportConnection() -{ -} - -void -DllTransportConnection::Initialize(TransportServerConnection& ServerConnection) -{ - m_ConnectionHandler = &ServerConnection; // TODO: this is awkward -} - -void -DllTransportConnection::HandleConnection() -{ -} - -void -DllTransportConnection::CloseConnection() -{ - if (m_IsTerminated) - { - return; - } - - m_IsTerminated = true; -} - -int64_t -DllTransportConnection::WriteBytes(const void* Buffer, size_t DataSize) -{ - ZEN_UNUSED(Buffer, DataSize); - return DataSize; -} - -void -DllTransportConnection::Shutdown(bool Receive, bool Transmit) -{ - ZEN_UNUSED(Receive, Transmit); -} - ////////////////////////////////////////////////////////////////////////// struct LoadedDll @@ -97,12 +34,13 @@ public: DllTransportPluginImpl(); ~DllTransportPluginImpl(); - virtual uint32_t AddRef() const override; - virtual uint32_t Release() const override; - virtual void Configure(const char* OptionTag, const char* OptionValue) override; - virtual void Initialize(TransportServer* ServerInterface) override; - virtual void Shutdown() override; - virtual bool IsAvailable() override; + virtual uint32_t AddRef() const override; + virtual uint32_t Release() const override; + virtual void Configure(const char* OptionTag, const char* OptionValue) override; + virtual void Initialize(TransportServer* ServerInterface) override; + virtual void Shutdown() override; + virtual const char* GetDebugName() override; + virtual bool IsAvailable() override; virtual void LoadDll(std::string_view Name) override; virtual void ConfigureDll(std::string_view Name, const char* OptionTag, const char* OptionValue) override; @@ -179,6 +117,12 @@ DllTransportPluginImpl::Shutdown() } } +const char* +DllTransportPluginImpl::GetDebugName() +{ + return nullptr; +} + bool DllTransportPluginImpl::IsAvailable() { diff --git a/src/zenhttp/transports/winsocktransport.cpp b/src/zenhttp/transports/winsocktransport.cpp index 2397dd7cf..7407c55dd 100644 --- a/src/zenhttp/transports/winsocktransport.cpp +++ b/src/zenhttp/transports/winsocktransport.cpp @@ -31,9 +31,10 @@ public: // TransportConnection - virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; - virtual void Shutdown(bool Receive, bool Transmit) override; - virtual void CloseConnection() override; + virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; + virtual void Shutdown(bool Receive, bool Transmit) override; + virtual void CloseConnection() override; + virtual const char* GetDebugName() override; private: Ref<TransportServerConnection> m_ConnectionHandler; @@ -103,6 +104,12 @@ SocketTransportConnection::CloseConnection() m_ClientSocket = 0; } +const char* +SocketTransportConnection::GetDebugName() +{ + return nullptr; +} + int64_t SocketTransportConnection::WriteBytes(const void* Buffer, size_t DataSize) { @@ -157,12 +164,13 @@ public: SocketTransportPluginImpl(); ~SocketTransportPluginImpl(); - virtual uint32_t AddRef() const override; - virtual uint32_t Release() const override; - virtual void Configure(const char* OptionTag, const char* OptionValue) override; - virtual void Initialize(TransportServer* ServerInterface) override; - virtual void Shutdown() override; - virtual bool IsAvailable() override; + virtual uint32_t AddRef() const override; + virtual uint32_t Release() const override; + virtual void Configure(const char* OptionTag, const char* OptionValue) override; + virtual void Initialize(TransportServer* ServerInterface) override; + virtual void Shutdown() override; + virtual const char* GetDebugName() override; + virtual bool IsAvailable() override; private: TransportServer* m_ServerInterface = nullptr; @@ -337,6 +345,12 @@ SocketTransportPluginImpl::Shutdown() } } +const char* +SocketTransportPluginImpl::GetDebugName() +{ + return nullptr; +} + ////////////////////////////////////////////////////////////////////////// TransportPlugin* diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 2c344dd1d..700529443 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -209,6 +209,9 @@ namespace { zen::Sleep(100); } while (true); } + + uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return 8u + 32u + RoundUp(PayloadSize, 8u); } + } // namespace namespace fs = std::filesystem; @@ -655,6 +658,9 @@ BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& ////////////////////////////////////////////////////////////////////////// +static const float IndexMinLoadFactor = 0.2f; +static const float IndexMaxLoadFactor = 0.7f; + ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, @@ -665,6 +671,9 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, , m_Configuration(Config) , m_BucketId(Oid::Zero) { + m_Index.min_load_factor(IndexMinLoadFactor); + m_Index.max_load_factor(IndexMaxLoadFactor); + if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { const uint64_t LegacyOverrideSize = 16 * 1024 * 1024; @@ -1289,14 +1298,26 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::ExclusiveLockScope _(m_IndexLock); + if (m_MemCachedPayloads.empty()) + { + return; + } for (const auto& Kv : m_Index) { - if (m_AccessTimes[Kv.second] < ExpireTicks) + size_t Index = Kv.second; + BucketPayload& Payload = m_Payloads[Index]; + if (!Payload.MemCached) + { + continue; + } + if (m_AccessTimes[Index] < ExpireTicks) { - BucketPayload& Payload = m_Payloads[Kv.second]; RemoveMemCachedData(Payload); } } + m_MemCachedPayloads.shrink_to_fit(); + m_FreeMemCachedPayloads.shrink_to_fit(); + m_FreeMetaDatas.shrink_to_fit(); } void @@ -1305,6 +1326,10 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, std::vector<uint64_t>& InOutUsageSlots) { RwLock::SharedLockScope _(m_IndexLock); + if (m_MemCachedPayloads.empty()) + { + return; + } for (const auto& It : m_Index) { size_t Index = It.second; @@ -1929,10 +1954,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) for (const auto& Entry : StructuredItemsWithUnknownAttachments) { - const IoHash& Key = Entry.first; - size_t PayloadIndex = Entry.second; - BucketPayload& Payload = Payloads[PayloadIndex]; - const DiskLocation& Loc = Payload.Location; + const IoHash& Key = Entry.first; + BucketPayload& Payload = Payloads[Entry.second]; + const DiskLocation& Loc = Payload.Location; { IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) @@ -1955,7 +1979,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Key); It != m_Index.end()) { - const BucketPayload& CachedPayload = Payloads[PayloadIndex]; + const BucketPayload& CachedPayload = Payloads[It->second]; if (CachedPayload.MemCached) { Buffer = m_MemCachedPayloads[CachedPayload.MemCached]; @@ -2630,7 +2654,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffe { Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size())); m_MemCachedPayloads.push_back(MemCachedData); - AddMemCacheUsage(PayloadSize); + AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } @@ -2639,7 +2663,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffe Payload.MemCached = m_FreeMemCachedPayloads.back(); m_FreeMemCachedPayloads.pop_back(); m_MemCachedPayloads[Payload.MemCached] = MemCachedData; - AddMemCacheUsage(PayloadSize); + AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } @@ -2650,7 +2674,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload) if (Payload.MemCached) { size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize(); - RemoveMemCacheUsage(PayloadSize); + RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemCachedPayloads[Payload.MemCached] = IoBuffer{}; m_FreeMemCachedPayloads.push_back(Payload.MemCached); Payload.MemCached = {}; @@ -2844,7 +2868,8 @@ public: m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique<HashSet>(); }); auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); }); - std::unordered_map<uint32_t, uint64_t> BlockUsage; + size_t InlineEntryCount = 0; + BlockStore::BlockUsageMap BlockUsage; { RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); for (const auto& Entry : m_Bucket.m_Index) @@ -2857,15 +2882,17 @@ public: { continue; } + InlineEntryCount++; uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex(); uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment); if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) { - It->second += ChunkSize; + It->second.EntryCount++; + It->second.DiskUsage += ChunkSize; } else { - BlockUsage.insert_or_assign(BlockIndex, ChunkSize); + BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); } } } @@ -2873,8 +2900,9 @@ public: { BlockStoreCompactState BlockCompactState; std::vector<IoHash> BlockCompactStateKeys; + BlockCompactStateKeys.reserve(InlineEntryCount); - std::vector<uint32_t> BlocksToCompact = + BlockStore::BlockEntryCountMap BlocksToCompact = m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); BlockCompactState.IncludeBlocks(BlocksToCompact); @@ -3149,7 +3177,7 @@ public: uint32_t Size; }; std::vector<std::vector<InlineEntry>> EntriesPerBlock; - + size_t UpdateCount = 0; { RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); for (const auto& Entry : m_CacheBucket.m_Index) @@ -3174,6 +3202,7 @@ public: { continue; } + UpdateCount++; const IoHash& Key = Entry.first; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { @@ -3200,6 +3229,8 @@ public: } } + UpdateKeys.reserve(UpdateCount); + for (auto It : BlockIndexToEntriesPerBlockIndex) { uint32_t BlockIndex = It.first; @@ -3652,6 +3683,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa FirstReferenceIndex.reserve(EntryCount); } Index.reserve(EntryCount); + Index.min_load_factor(IndexMinLoadFactor); + Index.max_load_factor(IndexMaxLoadFactor); for (auto It : m_Index) { PayloadIndex EntryIndex = PayloadIndex(Payloads.size()); diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index fe92613f4..3fe0b0c63 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -9,6 +9,7 @@ #include <zencore/except.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> +#include <zencore/logging.h> #include <zencore/string.h> #include <zenhttp/zenhttp.h> #include <zenutil/basicfile.h> @@ -519,7 +520,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<bool>(ServerOptions.IsCleanStart)->default_value("false")); options.add_options()("help", "Show command line help"); options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(ServerOptions.IsTest)->default_value("false")); - options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(ServerOptions.LogId)); options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::string>(DataDir)); options.add_options()("snapshot-dir", "Specify a snapshot of server state to mirror into the persistence root at startup", @@ -528,7 +528,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_options()("powercycle", "Exit immediately after initialization is complete", cxxopts::value<bool>(ServerOptions.IsPowerCycle)); - options.add_options()("abslog", "Path to log file", cxxopts::value<std::string>(AbsLogFile)); options.add_options()("config", "Path to Lua config file", cxxopts::value<std::string>(ConfigFile)); options.add_options()("write-config", "Path to output Lua config file", cxxopts::value<std::string>(OutputConfigFile)); options.add_options()("no-sentry", @@ -537,7 +536,21 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_options()("sentry-allow-personal-info", "Allow personally identifiable information in sentry crash reports", cxxopts::value<bool>(ServerOptions.SentryAllowPII)->default_value("false")); - options.add_options()("quiet", "Disable console logging", cxxopts::value<bool>(ServerOptions.NoConsoleOutput)->default_value("false")); + + // clang-format off + options.add_options("logging") + ("abslog", "Path to log file", cxxopts::value<std::string>(AbsLogFile)) + ("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(ServerOptions.LogId)) + ("quiet", "Disable console logging", cxxopts::value<bool>(ServerOptions.NoConsoleOutput)->default_value("false")) + ("log-trace", "Change selected loggers to level TRACE", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Trace])) + ("log-debug", "Change selected loggers to level DEBUG", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Debug])) + ("log-info", "Change selected loggers to level INFO", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Info])) + ("log-warn", "Change selected loggers to level WARN", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Warn])) + ("log-error", "Change selected loggers to level ERROR", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Err])) + ("log-critical", "Change selected loggers to level CRITICAL", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Critical])) + ("log-off", "Change selected loggers to level OFF", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Off])) + ; + // clang-format on options.add_option("security", "", @@ -952,6 +965,12 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) exit(0); } + for (int i = 0; i < logging::level::LogLevelCount; ++i) + { + logging::ConfigureLogLevels(logging::level::LogLevel(i), ServerOptions.Loggers[i]); + } + logging::RefreshLogLevels(); + ServerOptions.DataDir = MakeSafePath(DataDir); ServerOptions.BaseSnapshotDir = MakeSafePath(BaseSnapshotDir); ServerOptions.ContentDir = MakeSafePath(ContentDir); diff --git a/src/zenserver/config.h b/src/zenserver/config.h index 8135bf8f0..11311f9d8 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/logbase.h> #include <zencore/zencore.h> #include <zenhttp/httpserver.h> #include <filesystem> @@ -151,6 +152,7 @@ struct ZenServerOptions bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports bool ObjectStoreEnabled = false; bool NoConsoleOutput = false; // Control default use of stdout for diagnostics + std::string Loggers[zen::logging::level::LogLevelCount]; #if ZEN_WITH_TRACE std::string TraceHost; // Host name or IP address to send trace data to std::string TraceFile; // Path of a file to write a trace diff --git a/src/zenserver/diag/logging.cpp b/src/zenserver/diag/logging.cpp index e2d57b840..dc1675819 100644 --- a/src/zenserver/diag/logging.cpp +++ b/src/zenserver/diag/logging.cpp @@ -42,6 +42,7 @@ InitializeServerLogging(const ZenServerOptions& InOptions) /* max files */ 16, /* rotate on open */ true); auto HttpLogger = std::make_shared<spdlog::logger>("http_requests", HttpSink); + spdlog::apply_logger_env_levels(HttpLogger); spdlog::register_logger(HttpLogger); // Cache request logging @@ -53,16 +54,19 @@ InitializeServerLogging(const ZenServerOptions& InOptions) /* max files */ 16, /* rotate on open */ false); auto CacheLogger = std::make_shared<spdlog::logger>("z$", CacheSink); + spdlog::apply_logger_env_levels(CacheLogger); spdlog::register_logger(CacheLogger); // Jupiter - only log upstream HTTP traffic to file auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink); + spdlog::apply_logger_env_levels(JupiterLogger); spdlog::register_logger(JupiterLogger); // Zen - only log upstream HTTP traffic to file auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink); + spdlog::apply_logger_env_levels(ZenClientLogger); spdlog::register_logger(ZenClientLogger); FinishInitializeLogging(LogOptions); diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 2430267c1..2aeb6a4d5 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -162,7 +162,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen // Ok so now we're configured, let's kick things off m_Http = CreateHttpServer(ServerOptions.HttpServerConfig); - int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort); + int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort, ServerOptions.DataDir); // Setup authentication manager { @@ -491,7 +491,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) const asio::error_code Err = utils::ResolveHostname(m_IoContext, Dns, "8558"sv, ZenUrls); if (Err) { - ZEN_ERROR("resolve FAILED, reason '{}'", Err.message()); + ZEN_ERROR("resolve of '{}' FAILED, reason '{}'", Dns, Err.message()); } } } @@ -643,6 +643,8 @@ ZenServer::Cleanup() Flush(); + ShutdownWorkerPools(); + m_AdminService.reset(); m_VfsService.reset(); m_ObjStoreService.reset(); @@ -660,7 +662,6 @@ ZenServer::Cleanup() m_AuthMgr.reset(); m_Http = {}; m_JobQueue.reset(); - ShutdownWorkerPools(); } catch (std::exception& Ex) { diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 918f464ac..71e306eca 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -282,10 +282,10 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations) } } -std::vector<uint32_t> -BlockStore::GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& BlockUsage, uint32_t BlockUsageThresholdPercent) +BlockStore::BlockEntryCountMap +BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent) { - std::unordered_set<uint32_t> Result; + BlockEntryCountMap Result; { RwLock::SharedLockScope InsertLock(m_InsertLock); for (const auto& It : m_ChunkBlocks) @@ -299,31 +299,34 @@ BlockStore::GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& Blo { continue; } - uint64_t BlockSize = It.second ? It.second->FileSize() : 0u; - if (BlockSize == 0) + + uint64_t UsedSize = 0; + uint32_t UsedCount = 0; + if (auto UsageIt = BlockUsage.find(BlockIndex); UsageIt != BlockUsage.end()) { - Result.insert(BlockIndex); - continue; + UsedSize = UsageIt->second.DiskUsage; + UsedCount = UsageIt->second.EntryCount; } - uint64_t UsedSize = 0; - if (auto UsageIt = BlockUsage.find(BlockIndex); UsageIt != BlockUsage.end()) + uint64_t BlockSize = It.second ? It.second->FileSize() : 0u; + if (BlockSize == 0) { - UsedSize = UsageIt->second; + Result.insert_or_assign(BlockIndex, UsedCount); + continue; } if (BlockUsageThresholdPercent == 100) { if (UsedSize < BlockSize) { - Result.insert(BlockIndex); + Result.insert_or_assign(BlockIndex, UsedCount); } } else if (BlockUsageThresholdPercent == 0) { if (UsedSize == 0) { - Result.insert(BlockIndex); + Result.insert_or_assign(BlockIndex, UsedCount); } } else @@ -331,12 +334,12 @@ BlockStore::GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& Blo const uint32_t UsedPercent = UsedSize < BlockSize ? gsl::narrow<uint32_t>((100 * UsedSize) / BlockSize) : 100u; if (UsedPercent < BlockUsageThresholdPercent) { - Result.insert(BlockIndex); + Result.insert_or_assign(BlockIndex, UsedCount); } } } } - return std::vector<uint32_t>(Result.begin(), Result.end()); + return Result; } void diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 5de82f219..96ab65a5f 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -25,6 +25,9 @@ # include <zenstore/cidstore.h> # include <algorithm> # include <random> +ZEN_THIRD_PARTY_INCLUDES_START +# include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END #endif ////////////////////////////////////////////////////////////////////////// @@ -114,8 +117,14 @@ namespace { ////////////////////////////////////////////////////////////////////////// +static const float IndexMinLoadFactor = 0.2f; +static const float IndexMaxLoadFactor = 0.7f; + CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("containercas")), m_Gc(Gc) { + m_LocationMap.min_load_factor(IndexMinLoadFactor); + m_LocationMap.max_load_factor(IndexMaxLoadFactor); + m_Gc.AddGcStorage(this); m_Gc.AddGcReferenceStore(*this); } @@ -576,7 +585,7 @@ public: if (Ctx.Settings.CollectSmallObjects) { - std::unordered_map<uint32_t, uint64_t> BlockUsage; + BlockStore::BlockUsageMap BlockUsage; { RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock); if (Ctx.IsCancelledFlag.load()) @@ -591,14 +600,14 @@ public: uint32_t BlockIndex = Loc.GetBlockIndex(); uint64_t ChunkSize = RoundUp(Loc.GetSize(), m_CasContainerStrategy.m_PayloadAlignment); - auto It = BlockUsage.find(BlockIndex); - if (It == BlockUsage.end()) + if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) { - BlockUsage.insert_or_assign(BlockIndex, ChunkSize); + It->second.EntryCount++; + It->second.DiskUsage += ChunkSize; } else { - It->second += ChunkSize; + BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); } } } @@ -607,7 +616,7 @@ public: BlockStoreCompactState BlockCompactState; std::vector<IoHash> BlockCompactStateKeys; - std::vector<uint32_t> BlocksToCompact = + BlockStore::BlockEntryCountMap BlocksToCompact = m_CasContainerStrategy.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); BlockCompactState.IncludeBlocks(BlocksToCompact); @@ -980,13 +989,14 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint Entries.resize(128 * 1024 / sizeof(CasDiskIndexEntry)); uint64_t RemainingEntries = Header.EntryCount; + uint64_t ReadOffset = sizeof(CasDiskIndexHeader); do { const uint64_t NumToRead = Min(RemainingEntries, Entries.size()); Entries.resize(NumToRead); - ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), ReadOffset); std::string InvalidEntryReason; for (const CasDiskIndexEntry& Entry : Entries) @@ -1002,6 +1012,7 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint } RemainingEntries -= NumToRead; + ReadOffset += NumToRead * sizeof(CasDiskIndexEntry); } while (RemainingEntries); OutVersion = CasDiskIndexHeader::CurrentVersion; diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index aeca01dd1..5da612e30 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -128,8 +128,14 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo ////////////////////////////////////////////////////////////////////////// +static const float IndexMinLoadFactor = 0.2f; +static const float IndexMaxLoadFactor = 0.7f; + FileCasStrategy::FileCasStrategy(GcManager& Gc) : m_Log(logging::Get("filecas")), m_Gc(Gc) { + m_Index.min_load_factor(IndexMinLoadFactor); + m_Index.max_load_factor(IndexMaxLoadFactor); + m_Gc.AddGcStorage(this); m_Gc.AddGcReferenceStore(*this); } diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h index cb1347580..70cd4ef5a 100644 --- a/src/zenstore/filecas.h +++ b/src/zenstore/filecas.h @@ -16,6 +16,10 @@ #include <atomic> #include <functional> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + namespace zen { class BasicFile; diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 919684e41..786780b5e 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -132,6 +132,14 @@ public: typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback; + struct BlockUsageInfo + { + uint64_t DiskUsage; + uint32_t EntryCount; + }; + typedef std::unordered_map<uint32_t, BlockUsageInfo> BlockUsageMap; + typedef std::unordered_map<uint32_t, uint32_t> BlockEntryCountMap; + void Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount); struct BlockIndexSet @@ -145,8 +153,8 @@ public: // Ask the store to create empty blocks for all locations that does not have a block // Remove any block that is not referenced - void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations); - std::vector<uint32_t> GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& BlockUsage, uint32_t BlockUsageThresholdPercent); + void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations); + BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent); void Close(); @@ -205,23 +213,29 @@ class BlockStoreCompactState public: BlockStoreCompactState() = default; - void IncludeBlocks(const std::span<const uint32_t> BlockIndexes) + void IncludeBlocks(const BlockStore::BlockEntryCountMap& BlockEntryCountMap) { - for (uint32_t BlockIndex : BlockIndexes) + size_t EntryCountTotal = 0; + for (auto& BlockUsageIt : BlockEntryCountMap) { - auto It = m_BlockIndexToChunkMapIndex.find(BlockIndex); - if (It == m_BlockIndexToChunkMapIndex.end()) - { - m_KeepChunks.emplace_back(std::vector<size_t>()); - m_BlockIndexToChunkMapIndex.insert_or_assign(BlockIndex, m_KeepChunks.size() - 1); - } + uint32_t BlockIndex = BlockUsageIt.first; + ZEN_ASSERT(m_BlockIndexToChunkMapIndex.find(BlockIndex) == m_BlockIndexToChunkMapIndex.end()); + + m_KeepChunks.emplace_back(std::vector<size_t>()); + m_KeepChunks.back().reserve(BlockUsageIt.second); + m_BlockIndexToChunkMapIndex.insert_or_assign(BlockIndex, m_KeepChunks.size() - 1); + EntryCountTotal += BlockUsageIt.second; } + m_ChunkLocations.reserve(EntryCountTotal); } void IncludeBlock(uint32_t BlockIndex) { - const uint32_t Blocks[1] = {BlockIndex}; - IncludeBlocks(Blocks); + if (m_BlockIndexToChunkMapIndex.find(BlockIndex) == m_BlockIndexToChunkMapIndex.end()) + { + m_KeepChunks.emplace_back(std::vector<size_t>()); + m_BlockIndexToChunkMapIndex.insert_or_assign(BlockIndex, m_KeepChunks.size() - 1); + } } bool AddKeepLocation(const BlockStoreLocation& Location) diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index 319683dcb..4c9f30608 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -9,10 +9,6 @@ #include <zenstore/hashkeyset.h> #include <zenutil/statsreporter.h> -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - #include <filesystem> namespace zen { diff --git a/src/zenutil/logging.cpp b/src/zenutil/logging.cpp index d0a6ac0b4..d82789e42 100644 --- a/src/zenutil/logging.cpp +++ b/src/zenutil/logging.cpp @@ -12,6 +12,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <zencore/compactbinary.h> #include <zencore/filesystem.h> +#include <zencore/logging.h> #include <zencore/string.h> #include <zenutil/logging/fullformatter.h> #include <zenutil/logging/jsonformatter.h> @@ -152,21 +153,21 @@ BeginInitializeLogging(const LoggingOptions& LogOptions) void FinishInitializeLogging(const LoggingOptions& LogOptions) { - spdlog::level::level_enum LogLevel = spdlog::level::info; + logging::level::LogLevel LogLevel = logging::level::Info; if (LogOptions.IsDebug) { - LogLevel = spdlog::level::debug; + LogLevel = logging::level::Debug; } if (LogOptions.IsTest) { - LogLevel = spdlog::level::trace; + LogLevel = logging::level::Trace; } // Configure all registered loggers according to settings - spdlog::set_level(LogLevel); + logging::RefreshLogLevels(LogLevel); spdlog::flush_on(spdlog::level::err); spdlog::flush_every(std::chrono::seconds{2}); spdlog::set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId, std::chrono::system_clock::now())); diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp index b511b0c5c..3ae302064 100644 --- a/src/zenutil/workerpools.cpp +++ b/src/zenutil/workerpools.cpp @@ -14,6 +14,8 @@ namespace { const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency()); const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 1u)); + static bool IsShutDown = false; + RwLock PoolLock; std::unique_ptr<WorkerThreadPool> LargeWorkerPool; @@ -32,6 +34,7 @@ GetLargeWorkerPool() } } RwLock::ExclusiveLockScope _(PoolLock); + ZEN_ASSERT(!IsShutDown); if (LargeWorkerPool) { return *LargeWorkerPool; @@ -51,6 +54,7 @@ GetSmallWorkerPool() } } RwLock::ExclusiveLockScope _(PoolLock); + ZEN_ASSERT(!IsShutDown); if (SmallWorkerPool) { return *SmallWorkerPool; @@ -70,6 +74,7 @@ GetSyncWorkerPool() } } RwLock::ExclusiveLockScope _(PoolLock); + ZEN_ASSERT(!IsShutDown); if (SyncWorkerPool) { return *SyncWorkerPool; @@ -82,6 +87,7 @@ void ShutdownWorkerPools() { RwLock::ExclusiveLockScope _(PoolLock); + IsShutDown = true; LargeWorkerPool.reset(); SmallWorkerPool.reset(); SyncWorkerPool.reset(); |