aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzousar <[email protected]>2023-12-07 08:48:04 -0700
committerGitHub <[email protected]>2023-12-07 08:48:04 -0700
commit6229149482f00893afa6874cc75d5e5ed0c438a9 (patch)
tree531317314903da569eea099c4a07e721de659b93
parentChange naming to ChunkInfos instead of Chunks (diff)
parentUpdate CHANGELOG.md (diff)
downloadzen-zs/get-all-chunk-infos.tar.xz
zen-zs/get-all-chunk-infos.zip
Merge branch 'main' into zs/get-all-chunk-infoszs/get-all-chunk-infos
-rw-r--r--CHANGELOG.md11
-rw-r--r--VERSION.txt2
-rw-r--r--src/transports/transport-sdk/include/transportplugin.h18
-rw-r--r--src/transports/winsock/winsock.cpp32
-rw-r--r--src/transports/winsock/xmake.lua4
-rw-r--r--src/transports/xmake.lua4
-rw-r--r--src/zencore/include/zencore/logbase.h3
-rw-r--r--src/zencore/include/zencore/logging.h4
-rw-r--r--src/zencore/logging.cpp89
-rw-r--r--src/zenhttp/include/zenhttp/httpserver.h10
-rw-r--r--src/zenhttp/servers/httpasio.cpp5
-rw-r--r--src/zenhttp/servers/httpmulti.cpp5
-rw-r--r--src/zenhttp/servers/httpmulti.h2
-rw-r--r--src/zenhttp/servers/httpnull.cpp3
-rw-r--r--src/zenhttp/servers/httpnull.h2
-rw-r--r--src/zenhttp/servers/httpplugin.cpp177
-rw-r--r--src/zenhttp/servers/httpsys.cpp5
-rw-r--r--src/zenhttp/transports/asiotransport.cpp20
-rw-r--r--src/zenhttp/transports/dlltransport.cpp82
-rw-r--r--src/zenhttp/transports/winsocktransport.cpp32
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp63
-rw-r--r--src/zenserver/config.cpp25
-rw-r--r--src/zenserver/config.h2
-rw-r--r--src/zenserver/diag/logging.cpp4
-rw-r--r--src/zenserver/zenserver.cpp7
-rw-r--r--src/zenstore/blockstore.cpp31
-rw-r--r--src/zenstore/compactcas.cpp25
-rw-r--r--src/zenstore/filecas.cpp6
-rw-r--r--src/zenstore/filecas.h4
-rw-r--r--src/zenstore/include/zenstore/blockstore.h38
-rw-r--r--src/zenstore/include/zenstore/cidstore.h4
-rw-r--r--src/zenutil/logging.cpp9
-rw-r--r--src/zenutil/workerpools.cpp6
33 files changed, 523 insertions, 211 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 64b53f31c..d159cf2c3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,13 +2,17 @@
- Feature: Added xmake task `updatefrontend` which updates the zip file containing the frontend html (`/src/zenserver/frontend/html.zip`)
- Feature: Added `--powercycle` option to zenserver which causes it do shut down immediately after initialization is completed. This is useful for profiling startup/shutdown primarily but could also be useful for some kinds of validation/state upgrade scenarios
- Feature: New endpoint `/admin/gc-stop` to cancel a running garbage collect operation
-- Feature: Added `zen gc-stop` command to cancel a running garbage collect operation`
+- Feature: Added `zen gc-stop` command to cancel a running garbage collect operation
+- Feature: Added ability to configure logger verbosity on the command line. You can now use `--log-debug=http_requests` to configure the `http_requests` logger to DEBUG level. The provided options are `--log-trace`, `--log-debug`, `--log-info`, `--log-warn`, `--log-error`, `--log-critical`, `--log-off` and each accepts a comma-separated list of logger names to apply the threshold to.
- Bugfix: Fix sentry host name where last character of name was being truncated
- Bugfix: GCv2 - make sure to discover all projects and oplogs before checking for expired data
- Bugfix: Fix sync of log position and state log when writing cas index snapshot
- Bugfix: Make sure we can override flags to "false" when running `zen gc` commmand
- `smallobjects`, `skipcid`, `skipdelete`, `verbose`
- Bugfix: fixed file log timestamp format so the milliseconds are appended after the time not the date
+- Bugfix: Shut down thread pools earlier so worker threads have a chance to terminate before main thread calls `atexit()`
+- Bugfix: Use correct lookup index when checking for memcached buffer when finding references in diskcache GC
+- Bugfix: CasContainerStrategy::ReadIndexFile issue could cause CAS items to not be found after a shutdown/restart cycle
- Improvement: The frontend html content is no longer appended at the end of the executable which prevented signing, instead it is compiled in from the `/src/zenserver/frontend/html.zip` archive
- Improvement: MacOS now does ad-hoc code signing by default when issuing `xmake bundle`, signing with proper cert is done on CI builds
- Improvement: Updated branding to be consistent with current working name ("Unreal Zen Storage Server" etc)
@@ -24,6 +28,11 @@
- Improvement: GCv2: Exit as soon as no more unreferenced items are left
- Improvement: Reduce memory usage in GC and diskbucket flush
- Improvement: Added a `{project}/oplog/{log}/chunkinfos` endpoint that can be used for getting all chunk info within an oplog in batch
+- Improvement: Reserve vector sizes in GCv2 to reduce reallocations
+- Improvement: Set min/max load factor for cachedisk/compactcas/filecas indexes to reduce memory footprint
+- Improvement: Added context (upstream host name) to Zen upstream resolve error message
+- Improvement: Make a more accurate estimation of memory usage for in-memory cache values
+- Improvement: Added detailed debug logging for pluggable transports
## 0.2.35
- Bugfix: Fix timeout calculation for semtimedop call
diff --git a/VERSION.txt b/VERSION.txt
index 83a18a49a..48a7bba93 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1 +1 @@
-0.2.36-pre1 \ No newline at end of file
+0.2.36-pre3 \ No newline at end of file
diff --git a/src/transports/transport-sdk/include/transportplugin.h b/src/transports/transport-sdk/include/transportplugin.h
index 2a3b8075f..4347868e6 100644
--- a/src/transports/transport-sdk/include/transportplugin.h
+++ b/src/transports/transport-sdk/include/transportplugin.h
@@ -77,11 +77,12 @@ public:
class TransportPlugin
{
public:
- virtual uint32_t AddRef() const = 0;
- virtual uint32_t Release() const = 0;
- virtual void Configure(const char* OptionTag, const char* OptionValue) = 0;
- virtual void Initialize(TransportServer* ServerInterface) = 0;
- virtual void Shutdown() = 0;
+ virtual uint32_t AddRef() const = 0;
+ virtual uint32_t Release() const = 0;
+ virtual void Configure(const char* OptionTag, const char* OptionValue) = 0;
+ virtual void Initialize(TransportServer* ServerInterface) = 0;
+ virtual void Shutdown() = 0;
+ virtual const char* GetDebugName() = 0;
/** Check whether this transport is usable.
*/
@@ -99,9 +100,10 @@ public:
class TransportConnection
{
public:
- virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) = 0;
- virtual void Shutdown(bool Receive, bool Transmit) = 0;
- virtual void CloseConnection() = 0;
+ virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) = 0;
+ virtual void Shutdown(bool Receive, bool Transmit) = 0;
+ virtual void CloseConnection() = 0;
+ virtual const char* GetDebugName() = 0;
};
} // namespace zen
diff --git a/src/transports/winsock/winsock.cpp b/src/transports/winsock/winsock.cpp
index 28ac10ec1..7ee2b5ed1 100644
--- a/src/transports/winsock/winsock.cpp
+++ b/src/transports/winsock/winsock.cpp
@@ -51,12 +51,13 @@ public:
// TransportPlugin implementation
- virtual uint32_t AddRef() const override;
- virtual uint32_t Release() const override;
- virtual void Configure(const char* OptionTag, const char* OptionValue) override;
- virtual void Initialize(TransportServer* ServerInterface) override;
- virtual void Shutdown() override;
- virtual bool IsAvailable() override;
+ virtual uint32_t AddRef() const override;
+ virtual uint32_t Release() const override;
+ virtual void Configure(const char* OptionTag, const char* OptionValue) override;
+ virtual void Initialize(TransportServer* ServerInterface) override;
+ virtual void Shutdown() override;
+ virtual const char* GetDebugName() override;
+ virtual bool IsAvailable() override;
private:
TransportServer* m_ServerInterface = nullptr;
@@ -80,9 +81,10 @@ public:
// TransportConnection implementation
- virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override;
- virtual void Shutdown(bool Receive, bool Transmit) override;
- virtual void CloseConnection() override;
+ virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override;
+ virtual void Shutdown(bool Receive, bool Transmit) override;
+ virtual void CloseConnection() override;
+ virtual const char* GetDebugName() override;
private:
zen::Ref<TransportServerConnection> m_ConnectionHandler;
@@ -153,6 +155,12 @@ WinsockTransportConnection::CloseConnection()
m_ClientSocket = 0;
}
+const char*
+WinsockTransportConnection::GetDebugName()
+{
+ return nullptr;
+}
+
int64_t
WinsockTransportConnection::WriteBytes(const void* Buffer, size_t DataSize)
{
@@ -342,6 +350,12 @@ WinsockTransportPlugin::Shutdown()
}
}
+const char*
+WinsockTransportPlugin::GetDebugName()
+{
+ return nullptr;
+}
+
bool
WinsockTransportPlugin::IsAvailable()
{
diff --git a/src/transports/winsock/xmake.lua b/src/transports/winsock/xmake.lua
index 552a62702..c14283546 100644
--- a/src/transports/winsock/xmake.lua
+++ b/src/transports/winsock/xmake.lua
@@ -6,9 +6,9 @@ target("winsock")
add_headerfiles("**.h")
add_files("**.cpp")
add_links("Ws2_32")
- add_includedirs(".", "../../zenbase/include")
+ add_includedirs(".")
set_symbols("debug")
- add_deps("transport-sdk")
+ add_deps("zenbase", "transport-sdk")
if is_mode("release") then
set_optimize("fastest")
diff --git a/src/transports/xmake.lua b/src/transports/xmake.lua
index 44800a8af..78d637d85 100644
--- a/src/transports/xmake.lua
+++ b/src/transports/xmake.lua
@@ -5,6 +5,10 @@ set_languages("cxx20")
includes('transport-sdk')
+if os.isdir('zenbase') then
+ includes('zenbase')
+end
+
if is_plat("windows") then
includes("winsock")
end
diff --git a/src/zencore/include/zencore/logbase.h b/src/zencore/include/zencore/logbase.h
index ad873aa51..00af68b0a 100644
--- a/src/zencore/include/zencore/logbase.h
+++ b/src/zencore/include/zencore/logbase.h
@@ -90,6 +90,9 @@ struct LoggerRef
bool ShouldLog(int Level) const;
inline operator bool() const { return SpdLogger != nullptr; }
+ void SetLogLevel(logging::level::LogLevel NewLogLevel);
+ logging::level::LogLevel GetLogLevel();
+
spdlog::logger* SpdLogger = nullptr;
};
diff --git a/src/zencore/include/zencore/logging.h b/src/zencore/include/zencore/logging.h
index d14d1ab8d..8b76d754c 100644
--- a/src/zencore/include/zencore/logging.h
+++ b/src/zencore/include/zencore/logging.h
@@ -35,6 +35,10 @@ LoggerRef ErrorLog();
void SetErrorLog(std::string_view LoggerId);
LoggerRef Get(std::string_view Name);
+void ConfigureLogLevels(level::LogLevel Level, std::string_view Loggers);
+void RefreshLogLevels();
+void RefreshLogLevels(level::LogLevel DefaultLevel);
+
struct LogCategory
{
inline LogCategory(std::string_view InCategory) : CategoryName(InCategory) {}
diff --git a/src/zencore/logging.cpp b/src/zencore/logging.cpp
index 434c461ae..0bf07affd 100644
--- a/src/zencore/logging.cpp
+++ b/src/zencore/logging.cpp
@@ -4,10 +4,14 @@
#include <zencore/string.h>
#include <zencore/testing.h>
+#include <zencore/thread.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <spdlog/details/registry.h>
#include <spdlog/sinks/null_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
+ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_PLATFORM_WINDOWS
# pragma section(".zlog$a", read)
@@ -46,6 +50,8 @@ LoggingContext::~LoggingContext()
{
}
+//////////////////////////////////////////////////////////////////////////
+
static inline bool
IsErrorLevel(int LogLevel)
{
@@ -176,8 +182,77 @@ ToStringView(level::LogLevel Level)
} // namespace zen::logging::level
+//////////////////////////////////////////////////////////////////////////
+
namespace zen::logging {
+RwLock LogLevelsLock;
+std::string LogLevels[level::LogLevelCount];
+
+void
+ConfigureLogLevels(level::LogLevel Level, std::string_view Loggers)
+{
+ RwLock::ExclusiveLockScope _(LogLevelsLock);
+ LogLevels[Level] = Loggers;
+}
+
+void
+RefreshLogLevels(level::LogLevel* DefaultLevel)
+{
+ spdlog::details::registry::log_levels Levels;
+
+ {
+ RwLock::SharedLockScope _(LogLevelsLock);
+
+ for (int i = 0; i < level::LogLevelCount; ++i)
+ {
+ level::LogLevel CurrentLevel{i};
+
+ std::string_view Spec = LogLevels[i];
+
+ while (!Spec.empty())
+ {
+ std::string LoggerName;
+
+ if (auto CommaPos = Spec.find_first_of(','); CommaPos != std::string_view::npos)
+ {
+ LoggerName = Spec.substr(CommaPos + 1);
+ Spec.remove_prefix(CommaPos + 1);
+ }
+ else
+ {
+ LoggerName = Spec;
+ Spec = {};
+ }
+
+ Levels[LoggerName] = to_spdlog_level(CurrentLevel);
+ }
+ }
+ }
+
+ if (DefaultLevel)
+ {
+ spdlog::level::level_enum SpdDefaultLevel = to_spdlog_level(*DefaultLevel);
+ spdlog::details::registry::instance().set_levels(Levels, &SpdDefaultLevel);
+ }
+ else
+ {
+ spdlog::details::registry::instance().set_levels(Levels, nullptr);
+ }
+}
+
+void
+RefreshLogLevels(level::LogLevel DefaultLevel)
+{
+ RefreshLogLevels(&DefaultLevel);
+}
+
+void
+RefreshLogLevels()
+{
+ RefreshLogLevels(nullptr);
+}
+
void
SetLogLevel(level::LogLevel NewLogLevel)
{
@@ -240,6 +315,7 @@ Get(std::string_view Name)
if (!Logger)
{
Logger = Default().SpdLogger->clone(std::string(Name));
+ spdlog::apply_logger_env_levels(Logger);
spdlog::register_logger(Logger);
}
@@ -262,6 +338,7 @@ ConsoleLog()
if (!ConLogger)
{
ConLogger = spdlog::stdout_color_mt("console");
+ spdlog::apply_logger_env_levels(ConLogger);
ConLogger->set_pattern("%v");
}
@@ -320,6 +397,18 @@ LoggerRef::ShouldLog(int Level) const
return SpdLogger->should_log(static_cast<spdlog::level::level_enum>(Level));
}
+void
+LoggerRef::SetLogLevel(logging::level::LogLevel NewLogLevel)
+{
+ SpdLogger->set_level(to_spdlog_level(NewLogLevel));
+}
+
+logging::level::LogLevel
+LoggerRef::GetLogLevel()
+{
+ return logging::level::to_logging_level(SpdLogger->level());
+}
+
thread_local ScopedActivityBase* t_ScopeStack = nullptr;
ScopedActivityBase*
diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h
index eabad4728..1089dd221 100644
--- a/src/zenhttp/include/zenhttp/httpserver.h
+++ b/src/zenhttp/include/zenhttp/httpserver.h
@@ -175,11 +175,11 @@ private:
class HttpServer : public RefCounted
{
public:
- virtual void RegisterService(HttpService& Service) = 0;
- virtual int Initialize(int BasePort) = 0;
- virtual void Run(bool IsInteractiveSession) = 0;
- virtual void RequestExit() = 0;
- virtual void Close() = 0;
+ virtual void RegisterService(HttpService& Service) = 0;
+ virtual int Initialize(int BasePort, std::filesystem::path DataDir) = 0;
+ virtual void Run(bool IsInteractiveSession) = 0;
+ virtual void RequestExit() = 0;
+ virtual void Close() = 0;
};
struct HttpServerConfig
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp
index c62aca001..9fca314b3 100644
--- a/src/zenhttp/servers/httpasio.cpp
+++ b/src/zenhttp/servers/httpasio.cpp
@@ -941,7 +941,7 @@ public:
~HttpAsioServer();
virtual void RegisterService(HttpService& Service) override;
- virtual int Initialize(int BasePort) override;
+ virtual int Initialize(int BasePort, std::filesystem::path DataDir) override;
virtual void Run(bool IsInteractiveSession) override;
virtual void RequestExit() override;
virtual void Close() override;
@@ -992,8 +992,9 @@ HttpAsioServer::RegisterService(HttpService& Service)
}
int
-HttpAsioServer::Initialize(int BasePort)
+HttpAsioServer::Initialize(int BasePort, std::filesystem::path DataDir)
{
+ ZEN_UNUSED(DataDir);
m_BasePort = m_Impl->Start(gsl::narrow<uint16_t>(BasePort), m_ForceLoopback, m_ThreadCount);
return m_BasePort;
}
diff --git a/src/zenhttp/servers/httpmulti.cpp b/src/zenhttp/servers/httpmulti.cpp
index d8ebdc9c0..2a6a90d2e 100644
--- a/src/zenhttp/servers/httpmulti.cpp
+++ b/src/zenhttp/servers/httpmulti.cpp
@@ -28,15 +28,16 @@ HttpMultiServer::RegisterService(HttpService& Service)
}
int
-HttpMultiServer::Initialize(int BasePort)
+HttpMultiServer::Initialize(int BasePort, std::filesystem::path DataDir)
{
+ ZEN_UNUSED(DataDir);
ZEN_ASSERT(!m_IsInitialized);
int EffectivePort = 0;
for (auto& Server : m_Servers)
{
- const int InitializeResult = Server->Initialize(BasePort);
+ const int InitializeResult = Server->Initialize(BasePort, DataDir);
if (!EffectivePort)
{
diff --git a/src/zenhttp/servers/httpmulti.h b/src/zenhttp/servers/httpmulti.h
index d5b21d3c3..53cf57568 100644
--- a/src/zenhttp/servers/httpmulti.h
+++ b/src/zenhttp/servers/httpmulti.h
@@ -16,7 +16,7 @@ public:
~HttpMultiServer();
virtual void RegisterService(HttpService& Service) override;
- virtual int Initialize(int BasePort) override;
+ virtual int Initialize(int BasePort, std::filesystem::path DataDir) override;
virtual void Run(bool IsInteractiveSession) override;
virtual void RequestExit() override;
virtual void Close() override;
diff --git a/src/zenhttp/servers/httpnull.cpp b/src/zenhttp/servers/httpnull.cpp
index 7d3e9079a..9ac1c61ce 100644
--- a/src/zenhttp/servers/httpnull.cpp
+++ b/src/zenhttp/servers/httpnull.cpp
@@ -25,8 +25,9 @@ HttpNullServer::RegisterService(HttpService& Service)
}
int
-HttpNullServer::Initialize(int BasePort)
+HttpNullServer::Initialize(int BasePort, std::filesystem::path DataDir)
{
+ ZEN_UNUSED(DataDir);
return BasePort;
}
diff --git a/src/zenhttp/servers/httpnull.h b/src/zenhttp/servers/httpnull.h
index 965e729f7..818020604 100644
--- a/src/zenhttp/servers/httpnull.h
+++ b/src/zenhttp/servers/httpnull.h
@@ -18,7 +18,7 @@ public:
~HttpNullServer();
virtual void RegisterService(HttpService& Service) override;
- virtual int Initialize(int BasePort) override;
+ virtual int Initialize(int BasePort, std::filesystem::path DataDir) override;
virtual void Run(bool IsInteractiveSession) override;
virtual void RequestExit() override;
virtual void Close() override;
diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp
index 4ae7cd87a..3eed9db8f 100644
--- a/src/zenhttp/servers/httpplugin.cpp
+++ b/src/zenhttp/servers/httpplugin.cpp
@@ -7,7 +7,11 @@
# include "httpparser.h"
# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
# include <zencore/logging.h>
+# include <zencore/scopeguard.h>
+# include <zencore/session.h>
# include <zencore/thread.h>
# include <zencore/trace.h>
# include <zenhttp/httpserver.h>
@@ -15,6 +19,8 @@
# include <memory>
# include <string_view>
+# include <fmt/format.h>
+
# if ZEN_PLATFORM_WINDOWS
# include <conio.h>
# endif
@@ -38,17 +44,21 @@ using namespace std::literals;
struct HttpPluginConnectionHandler : public TransportServerConnection, public HttpRequestParserCallbacks, RefCounted
{
+ HttpPluginConnectionHandler();
+ ~HttpPluginConnectionHandler();
+
+ // TransportServerConnection
+
virtual uint32_t AddRef() const override;
virtual uint32_t Release() const override;
-
- virtual void OnBytesRead(const void* Buffer, size_t DataSize) override;
+ virtual void OnBytesRead(const void* Buffer, size_t DataSize) override;
// HttpRequestParserCallbacks
virtual void HandleRequest() override;
virtual void TerminateConnection() override;
- void Initialize(TransportConnection* Transport, HttpPluginServerImpl& Server);
+ void Initialize(TransportConnection* Transport, HttpPluginServerImpl& Server, uint32_t ConnectionId);
private:
enum class RequestState
@@ -65,7 +75,8 @@ private:
RequestState m_RequestState = RequestState::kInitialState;
HttpRequestParser m_RequestParser{*this};
- uint32_t m_ConnectionId = 0;
+ uint32_t m_ConnectionId = 0;
+ std::atomic_uint32_t m_RequestCounter = 0;
Ref<IHttpPackageHandler> m_PackageHandler;
TransportConnection* m_TransportConnection = nullptr;
@@ -82,7 +93,7 @@ struct HttpPluginServerImpl : public HttpPluginServer, TransportServer
// HttpPluginServer
virtual void RegisterService(HttpService& Service) override;
- virtual int Initialize(int BasePort) override;
+ virtual int Initialize(int BasePort, std::filesystem::path DataDir) override;
virtual void Run(bool IsInteractiveSession) override;
virtual void RequestExit() override;
virtual void Close() override;
@@ -92,6 +103,8 @@ struct HttpPluginServerImpl : public HttpPluginServer, TransportServer
HttpService* RouteRequest(std::string_view Url);
+ void WriteDebugPayload(std::string_view Filename, const std::span<const IoBuffer> Payload);
+
struct ServiceEntry
{
std::string ServiceUrlPath;
@@ -103,6 +116,11 @@ struct HttpPluginServerImpl : public HttpPluginServer, TransportServer
std::vector<ServiceEntry> m_UriHandlers;
std::vector<Ref<TransportPlugin>> m_Plugins;
Event m_ShutdownEvent;
+ bool m_IsRequestLoggingEnabled = false;
+ LoggerRef m_RequestLog;
+ std::atomic_uint32_t m_ConnectionIdCounter{0};
+ std::filesystem::path m_DataDir; // Application data directory
+ std::filesystem::path m_PayloadDir; // Request debugging payload directory
// TransportServer
@@ -147,14 +165,20 @@ public:
HttpPluginResponse() = default;
explicit HttpPluginResponse(HttpContentType ContentType) : m_ContentType(ContentType) {}
+ HttpPluginResponse(const HttpPluginResponse&) = delete;
+ HttpPluginResponse& operator=(const HttpPluginResponse&) = delete;
+
void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList);
- inline uint16_t ResponseCode() const { return m_ResponseCode; }
- inline uint64_t ContentLength() const { return m_ContentLength; }
+ inline uint16_t ResponseCode() const { return m_ResponseCode; }
+ inline uint64_t ContentLength() const { return m_ContentLength; }
+ inline HttpContentType ContentType() const { return m_ContentType; }
const std::vector<IoBuffer>& ResponseBuffers() const { return m_ResponseBuffers; }
void SuppressPayload() { m_ResponseBuffers.resize(1); }
+ std::string_view GetHeaders();
+
private:
uint16_t m_ResponseCode = 0;
bool m_IsKeepAlive = true;
@@ -162,8 +186,6 @@ private:
uint64_t m_ContentLength = 0;
std::vector<IoBuffer> m_ResponseBuffers;
ExtendableStringBuilder<160> m_Headers;
-
- std::string_view GetHeaders();
};
void
@@ -210,27 +232,55 @@ HttpPluginResponse::InitializeForPayload(uint16_t ResponseCode, std::span<IoBuff
std::string_view
HttpPluginResponse::GetHeaders()
{
- m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n"
- << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n"
- << "Content-Length: " << ContentLength() << "\r\n"sv;
-
- if (!m_IsKeepAlive)
+ if (m_Headers.Size() == 0)
{
- m_Headers << "Connection: close\r\n"sv;
- }
+ m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n"
+ << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n"
+ << "Content-Length: " << ContentLength() << "\r\n"sv;
+
+ if (!m_IsKeepAlive)
+ {
+ m_Headers << "Connection: close\r\n"sv;
+ }
- m_Headers << "\r\n"sv;
+ m_Headers << "\r\n"sv;
+ }
return m_Headers;
}
//////////////////////////////////////////////////////////////////////////
+HttpPluginConnectionHandler::HttpPluginConnectionHandler()
+{
+}
+
+HttpPluginConnectionHandler::~HttpPluginConnectionHandler()
+{
+ if (m_Server)
+ {
+ ZEN_LOG_TRACE(m_Server->m_RequestLog, "END connection #{}", m_ConnectionId);
+ }
+}
+
void
-HttpPluginConnectionHandler::Initialize(TransportConnection* Transport, HttpPluginServerImpl& Server)
+HttpPluginConnectionHandler::Initialize(TransportConnection* Transport, HttpPluginServerImpl& Server, uint32_t ConnectionId)
{
m_TransportConnection = Transport;
m_Server = &Server;
+ m_ConnectionId = ConnectionId;
+
+ std::string_view ConnectionName;
+ if (const char* Name = Transport->GetDebugName())
+ {
+ ConnectionName = Name;
+ }
+ else
+ {
+ ConnectionName = "anonymous";
+ }
+
+ ZEN_LOG_TRACE(m_Server->m_RequestLog, "NEW connection #{} ('')", m_ConnectionId, ConnectionName);
}
uint32_t
@@ -248,13 +298,19 @@ HttpPluginConnectionHandler::Release() const
void
HttpPluginConnectionHandler::OnBytesRead(const void* Buffer, size_t AvailableBytes)
{
+ ZEN_ASSERT(m_Server);
+
+ ZEN_LOG_TRACE(m_Server->m_RequestLog, "connection #{} OnBytesRead: {}", m_ConnectionId, AvailableBytes);
+
while (AvailableBytes)
{
const size_t ConsumedBytes = m_RequestParser.ConsumeData((const char*)Buffer, AvailableBytes);
if (ConsumedBytes == ~0ull)
{
- // terminate connection
+ // request parser error -- terminate connection
+
+ ZEN_LOG_TRACE(m_Server->m_RequestLog, "connection #{} terminating due to request parsing error", m_ConnectionId);
return TerminateConnection();
}
@@ -269,15 +325,21 @@ HttpPluginConnectionHandler::OnBytesRead(const void* Buffer, size_t AvailableByt
void
HttpPluginConnectionHandler::HandleRequest()
{
+ ZEN_ASSERT(m_Server);
+
+ const uint32_t RequestNumber = m_RequestCounter.fetch_add(1);
+
+ ZEN_LOG_TRACE(m_Server->m_RequestLog, "connection #{} ENTER HandleRequest #{}", m_ConnectionId, RequestNumber);
+ auto $Exit =
+ MakeGuard([&] { ZEN_LOG_TRACE(m_Server->m_RequestLog, "connection #{} EXIT HandleRequest #{}", m_ConnectionId, RequestNumber); });
+
if (!m_RequestParser.IsKeepAlive())
{
// Once response has been written, connection is done
m_RequestState = RequestState::kWritingFinal;
- // We're not going to read any more data from this socket
-
- const bool Receive = true;
- const bool Transmit = false;
+ const bool Receive = true; // We're not going to read any more data from this socket
+ const bool Transmit = false; // We will write more data however
m_TransportConnection->Shutdown(Receive, Transmit);
}
else
@@ -300,6 +362,24 @@ HttpPluginConnectionHandler::HandleRequest()
HttpPluginServerRequest Request(m_RequestParser, *Service, m_RequestParser.Body());
+ const HttpVerb RequestVerb = Request.RequestVerb();
+ const std::string_view Uri = Request.RelativeUri();
+
+ if (m_Server->m_RequestLog.ShouldLog(logging::level::Trace))
+ {
+ ZEN_LOG_TRACE(m_Server->m_RequestLog,
+ "connection #{} Handling Request: {} {} ({} bytes ({}), accept: {})",
+ m_ConnectionId,
+ ToString(RequestVerb),
+ Uri,
+ Request.ContentLength(),
+ ToString(Request.RequestContentType()),
+ ToString(Request.AcceptContentType()));
+
+ m_Server->WriteDebugPayload(fmt::format("request_{}_{}.bin", m_ConnectionId, RequestNumber),
+ std::vector<IoBuffer>{Request.ReadPayload()});
+ }
+
if (!HandlePackageOffers(*Service, Request, m_PackageHandler))
{
try
@@ -340,6 +420,17 @@ HttpPluginConnectionHandler::HandleRequest()
if (std::unique_ptr<HttpPluginResponse> Response = std::move(Request.m_Response))
{
+ {
+ const uint16_t ResponseCode = Response->ResponseCode();
+ ZEN_LOG_TRACE(m_Server->m_RequestLog,
+ "connection #{} Response: {} {} ({} bytes, {})",
+ m_ConnectionId,
+ ResponseCode,
+ ToString(HttpResponseCode(ResponseCode)),
+ Response->ContentLength(),
+ ToString(Response->ContentType()));
+ }
+
// Transmit the response
if (m_RequestParser.RequestVerb() == HttpVerb::kHead)
@@ -349,10 +440,19 @@ HttpPluginConnectionHandler::HandleRequest()
const std::vector<IoBuffer>& ResponseBuffers = Response->ResponseBuffers();
- //// TODO: should cork/uncork for Linux?
+ if (m_Server->m_RequestLog.ShouldLog(logging::level::Trace))
+ {
+ m_Server->WriteDebugPayload(fmt::format("response_{}_{}.bin", m_ConnectionId, RequestNumber), ResponseBuffers);
+ }
for (const IoBuffer& Buffer : ResponseBuffers)
{
+ ZEN_LOG_TRACE(m_Server->m_RequestLog,
+ "connection #{} SEND: {} bytes, {}",
+ m_ConnectionId,
+ Buffer.GetSize(),
+ ToString(Buffer.GetContentType()));
+
int64_t SentBytes = SendBuffer(Buffer);
if (SentBytes < 0)
@@ -558,7 +658,7 @@ HttpPluginServerRequest::TryGetRanges(HttpRanges& Ranges)
//////////////////////////////////////////////////////////////////////////
-HttpPluginServerImpl::HttpPluginServerImpl()
+HttpPluginServerImpl::HttpPluginServerImpl() : m_RequestLog(logging::Get("http_requests"))
{
}
@@ -570,13 +670,19 @@ TransportServerConnection*
HttpPluginServerImpl::CreateConnectionHandler(TransportConnection* Connection)
{
HttpPluginConnectionHandler* Handler{new HttpPluginConnectionHandler()};
- Handler->Initialize(Connection, *this);
+ const uint32_t ConnectionId = m_ConnectionIdCounter.fetch_add(1);
+ Handler->Initialize(Connection, *this, ConnectionId);
return Handler;
}
int
-HttpPluginServerImpl::Initialize(int BasePort)
+HttpPluginServerImpl::Initialize(int BasePort, std::filesystem::path DataDir)
{
+ m_DataDir = DataDir;
+ m_PayloadDir = DataDir / "debug" / GetSessionIdString();
+
+ ZEN_INFO("any debug payloads will be written to '{}'", m_PayloadDir);
+
try
{
RwLock::ExclusiveLockScope _(m_Lock);
@@ -742,6 +848,23 @@ HttpPluginServerImpl::RouteRequest(std::string_view Url)
return CandidateService;
}
+void
+HttpPluginServerImpl::WriteDebugPayload(std::string_view Filename, const std::span<const IoBuffer> Payload)
+{
+ uint64_t PayloadSize = 0;
+ std::vector<const IoBuffer*> Buffers;
+ for (auto& Io : Payload)
+ {
+ Buffers.push_back(&Io);
+ PayloadSize += Io.GetSize();
+ }
+
+ if (PayloadSize)
+ {
+ WriteFile(m_PayloadDir / Filename, Buffers.data(), Buffers.size());
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
struct HttpPluginServerImpl;
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index 0b11d396b..d2cb63cd7 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -43,7 +43,7 @@ public:
// HttpServer interface implementation
- virtual int Initialize(int BasePort) override;
+ virtual int Initialize(int BasePort, std::filesystem::path DataDir) override;
virtual void Run(bool TestMode) override;
virtual void RequestExit() override;
virtual void RegisterService(HttpService& Service) override;
@@ -2012,8 +2012,9 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
//
int
-HttpSysServer::Initialize(int BasePort)
+HttpSysServer::Initialize(int BasePort, std::filesystem::path DataDir)
{
+ ZEN_UNUSED(DataDir);
if (int EffectivePort = InitializeServer(BasePort))
{
StartServer();
diff --git a/src/zenhttp/transports/asiotransport.cpp b/src/zenhttp/transports/asiotransport.cpp
index ab053a748..a9a782821 100644
--- a/src/zenhttp/transports/asiotransport.cpp
+++ b/src/zenhttp/transports/asiotransport.cpp
@@ -34,12 +34,13 @@ public:
AsioTransportPlugin();
~AsioTransportPlugin();
- virtual uint32_t AddRef() const override;
- virtual uint32_t Release() const override;
- virtual void Configure(const char* OptionTag, const char* OptionValue) override;
- virtual void Initialize(TransportServer* ServerInterface) override;
- virtual void Shutdown() override;
- virtual bool IsAvailable() override;
+ virtual uint32_t AddRef() const override;
+ virtual uint32_t Release() const override;
+ virtual void Configure(const char* OptionTag, const char* OptionValue) override;
+ virtual void Initialize(TransportServer* ServerInterface) override;
+ virtual void Shutdown() override;
+ virtual const char* GetDebugName() override { return nullptr; }
+ virtual bool IsAvailable() override;
private:
bool m_IsOk = true;
@@ -63,9 +64,10 @@ struct AsioTransportConnection : public TransportConnection, std::enable_shared_
// TransportConnectionInterface
- virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override;
- virtual void Shutdown(bool Receive, bool Transmit) override;
- virtual void CloseConnection() override;
+ virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override;
+ virtual void Shutdown(bool Receive, bool Transmit) override;
+ virtual void CloseConnection() override;
+ virtual const char* GetDebugName() override { return nullptr; }
private:
void EnqueueRead();
diff --git a/src/zenhttp/transports/dlltransport.cpp b/src/zenhttp/transports/dlltransport.cpp
index dd4479e39..e09e62ec5 100644
--- a/src/zenhttp/transports/dlltransport.cpp
+++ b/src/zenhttp/transports/dlltransport.cpp
@@ -19,69 +19,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
-struct DllTransportConnection : public TransportConnection
-{
-public:
- DllTransportConnection();
- ~DllTransportConnection();
-
- void Initialize(TransportServerConnection& ServerConnection);
- void HandleConnection();
-
- // TransportConnection
-
- virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override;
- virtual void Shutdown(bool Receive, bool Transmit) override;
- virtual void CloseConnection() override;
-
-private:
- Ref<TransportServerConnection> m_ConnectionHandler;
- bool m_IsTerminated = false;
-};
-
-DllTransportConnection::DllTransportConnection()
-{
-}
-
-DllTransportConnection::~DllTransportConnection()
-{
-}
-
-void
-DllTransportConnection::Initialize(TransportServerConnection& ServerConnection)
-{
- m_ConnectionHandler = &ServerConnection; // TODO: this is awkward
-}
-
-void
-DllTransportConnection::HandleConnection()
-{
-}
-
-void
-DllTransportConnection::CloseConnection()
-{
- if (m_IsTerminated)
- {
- return;
- }
-
- m_IsTerminated = true;
-}
-
-int64_t
-DllTransportConnection::WriteBytes(const void* Buffer, size_t DataSize)
-{
- ZEN_UNUSED(Buffer, DataSize);
- return DataSize;
-}
-
-void
-DllTransportConnection::Shutdown(bool Receive, bool Transmit)
-{
- ZEN_UNUSED(Receive, Transmit);
-}
-
//////////////////////////////////////////////////////////////////////////
struct LoadedDll
@@ -97,12 +34,13 @@ public:
DllTransportPluginImpl();
~DllTransportPluginImpl();
- virtual uint32_t AddRef() const override;
- virtual uint32_t Release() const override;
- virtual void Configure(const char* OptionTag, const char* OptionValue) override;
- virtual void Initialize(TransportServer* ServerInterface) override;
- virtual void Shutdown() override;
- virtual bool IsAvailable() override;
+ virtual uint32_t AddRef() const override;
+ virtual uint32_t Release() const override;
+ virtual void Configure(const char* OptionTag, const char* OptionValue) override;
+ virtual void Initialize(TransportServer* ServerInterface) override;
+ virtual void Shutdown() override;
+ virtual const char* GetDebugName() override;
+ virtual bool IsAvailable() override;
virtual void LoadDll(std::string_view Name) override;
virtual void ConfigureDll(std::string_view Name, const char* OptionTag, const char* OptionValue) override;
@@ -179,6 +117,12 @@ DllTransportPluginImpl::Shutdown()
}
}
+const char*
+DllTransportPluginImpl::GetDebugName()
+{
+ return nullptr;
+}
+
bool
DllTransportPluginImpl::IsAvailable()
{
diff --git a/src/zenhttp/transports/winsocktransport.cpp b/src/zenhttp/transports/winsocktransport.cpp
index 2397dd7cf..7407c55dd 100644
--- a/src/zenhttp/transports/winsocktransport.cpp
+++ b/src/zenhttp/transports/winsocktransport.cpp
@@ -31,9 +31,10 @@ public:
// TransportConnection
- virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override;
- virtual void Shutdown(bool Receive, bool Transmit) override;
- virtual void CloseConnection() override;
+ virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override;
+ virtual void Shutdown(bool Receive, bool Transmit) override;
+ virtual void CloseConnection() override;
+ virtual const char* GetDebugName() override;
private:
Ref<TransportServerConnection> m_ConnectionHandler;
@@ -103,6 +104,12 @@ SocketTransportConnection::CloseConnection()
m_ClientSocket = 0;
}
+const char*
+SocketTransportConnection::GetDebugName()
+{
+ return nullptr;
+}
+
int64_t
SocketTransportConnection::WriteBytes(const void* Buffer, size_t DataSize)
{
@@ -157,12 +164,13 @@ public:
SocketTransportPluginImpl();
~SocketTransportPluginImpl();
- virtual uint32_t AddRef() const override;
- virtual uint32_t Release() const override;
- virtual void Configure(const char* OptionTag, const char* OptionValue) override;
- virtual void Initialize(TransportServer* ServerInterface) override;
- virtual void Shutdown() override;
- virtual bool IsAvailable() override;
+ virtual uint32_t AddRef() const override;
+ virtual uint32_t Release() const override;
+ virtual void Configure(const char* OptionTag, const char* OptionValue) override;
+ virtual void Initialize(TransportServer* ServerInterface) override;
+ virtual void Shutdown() override;
+ virtual const char* GetDebugName() override;
+ virtual bool IsAvailable() override;
private:
TransportServer* m_ServerInterface = nullptr;
@@ -337,6 +345,12 @@ SocketTransportPluginImpl::Shutdown()
}
}
+const char*
+SocketTransportPluginImpl::GetDebugName()
+{
+ return nullptr;
+}
+
//////////////////////////////////////////////////////////////////////////
TransportPlugin*
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 2c344dd1d..700529443 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -209,6 +209,9 @@ namespace {
zen::Sleep(100);
} while (true);
}
+
+ uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return 8u + 32u + RoundUp(PayloadSize, 8u); }
+
} // namespace
namespace fs = std::filesystem;
@@ -655,6 +658,9 @@ BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path&
//////////////////////////////////////////////////////////////////////////
+static const float IndexMinLoadFactor = 0.2f;
+static const float IndexMaxLoadFactor = 0.7f;
+
ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
std::atomic_uint64_t& OuterCacheMemoryUsage,
std::string BucketName,
@@ -665,6 +671,9 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
+ m_Index.min_load_factor(IndexMinLoadFactor);
+ m_Index.max_load_factor(IndexMaxLoadFactor);
+
if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
{
const uint64_t LegacyOverrideSize = 16 * 1024 * 1024;
@@ -1289,14 +1298,26 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (m_MemCachedPayloads.empty())
+ {
+ return;
+ }
for (const auto& Kv : m_Index)
{
- if (m_AccessTimes[Kv.second] < ExpireTicks)
+ size_t Index = Kv.second;
+ BucketPayload& Payload = m_Payloads[Index];
+ if (!Payload.MemCached)
+ {
+ continue;
+ }
+ if (m_AccessTimes[Index] < ExpireTicks)
{
- BucketPayload& Payload = m_Payloads[Kv.second];
RemoveMemCachedData(Payload);
}
}
+ m_MemCachedPayloads.shrink_to_fit();
+ m_FreeMemCachedPayloads.shrink_to_fit();
+ m_FreeMetaDatas.shrink_to_fit();
}
void
@@ -1305,6 +1326,10 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
std::vector<uint64_t>& InOutUsageSlots)
{
RwLock::SharedLockScope _(m_IndexLock);
+ if (m_MemCachedPayloads.empty())
+ {
+ return;
+ }
for (const auto& It : m_Index)
{
size_t Index = It.second;
@@ -1929,10 +1954,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
for (const auto& Entry : StructuredItemsWithUnknownAttachments)
{
- const IoHash& Key = Entry.first;
- size_t PayloadIndex = Entry.second;
- BucketPayload& Payload = Payloads[PayloadIndex];
- const DiskLocation& Loc = Payload.Location;
+ const IoHash& Key = Entry.first;
+ BucketPayload& Payload = Payloads[Entry.second];
+ const DiskLocation& Loc = Payload.Location;
{
IoBuffer Buffer;
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
@@ -1955,7 +1979,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
#endif // CALCULATE_BLOCKING_TIME
if (auto It = m_Index.find(Key); It != m_Index.end())
{
- const BucketPayload& CachedPayload = Payloads[PayloadIndex];
+ const BucketPayload& CachedPayload = Payloads[It->second];
if (CachedPayload.MemCached)
{
Buffer = m_MemCachedPayloads[CachedPayload.MemCached];
@@ -2630,7 +2654,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffe
{
Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size()));
m_MemCachedPayloads.push_back(MemCachedData);
- AddMemCacheUsage(PayloadSize);
+ AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
}
@@ -2639,7 +2663,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffe
Payload.MemCached = m_FreeMemCachedPayloads.back();
m_FreeMemCachedPayloads.pop_back();
m_MemCachedPayloads[Payload.MemCached] = MemCachedData;
- AddMemCacheUsage(PayloadSize);
+ AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
}
@@ -2650,7 +2674,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload)
if (Payload.MemCached)
{
size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize();
- RemoveMemCacheUsage(PayloadSize);
+ RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemCachedPayloads[Payload.MemCached] = IoBuffer{};
m_FreeMemCachedPayloads.push_back(Payload.MemCached);
Payload.MemCached = {};
@@ -2844,7 +2868,8 @@ public:
m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique<HashSet>(); });
auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); });
- std::unordered_map<uint32_t, uint64_t> BlockUsage;
+ size_t InlineEntryCount = 0;
+ BlockStore::BlockUsageMap BlockUsage;
{
RwLock::SharedLockScope ___(m_Bucket.m_IndexLock);
for (const auto& Entry : m_Bucket.m_Index)
@@ -2857,15 +2882,17 @@ public:
{
continue;
}
+ InlineEntryCount++;
uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex();
uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment);
if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end())
{
- It->second += ChunkSize;
+ It->second.EntryCount++;
+ It->second.DiskUsage += ChunkSize;
}
else
{
- BlockUsage.insert_or_assign(BlockIndex, ChunkSize);
+ BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
}
}
}
@@ -2873,8 +2900,9 @@ public:
{
BlockStoreCompactState BlockCompactState;
std::vector<IoHash> BlockCompactStateKeys;
+ BlockCompactStateKeys.reserve(InlineEntryCount);
- std::vector<uint32_t> BlocksToCompact =
+ BlockStore::BlockEntryCountMap BlocksToCompact =
m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent);
BlockCompactState.IncludeBlocks(BlocksToCompact);
@@ -3149,7 +3177,7 @@ public:
uint32_t Size;
};
std::vector<std::vector<InlineEntry>> EntriesPerBlock;
-
+ size_t UpdateCount = 0;
{
RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
for (const auto& Entry : m_CacheBucket.m_Index)
@@ -3174,6 +3202,7 @@ public:
{
continue;
}
+ UpdateCount++;
const IoHash& Key = Entry.first;
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
@@ -3200,6 +3229,8 @@ public:
}
}
+ UpdateKeys.reserve(UpdateCount);
+
for (auto It : BlockIndexToEntriesPerBlockIndex)
{
uint32_t BlockIndex = It.first;
@@ -3652,6 +3683,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa
FirstReferenceIndex.reserve(EntryCount);
}
Index.reserve(EntryCount);
+ Index.min_load_factor(IndexMinLoadFactor);
+ Index.max_load_factor(IndexMaxLoadFactor);
for (auto It : m_Index)
{
PayloadIndex EntryIndex = PayloadIndex(Payloads.size());
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index fe92613f4..3fe0b0c63 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -9,6 +9,7 @@
#include <zencore/except.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
+#include <zencore/logging.h>
#include <zencore/string.h>
#include <zenhttp/zenhttp.h>
#include <zenutil/basicfile.h>
@@ -519,7 +520,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<bool>(ServerOptions.IsCleanStart)->default_value("false"));
options.add_options()("help", "Show command line help");
options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(ServerOptions.IsTest)->default_value("false"));
- options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(ServerOptions.LogId));
options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::string>(DataDir));
options.add_options()("snapshot-dir",
"Specify a snapshot of server state to mirror into the persistence root at startup",
@@ -528,7 +528,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_options()("powercycle",
"Exit immediately after initialization is complete",
cxxopts::value<bool>(ServerOptions.IsPowerCycle));
- options.add_options()("abslog", "Path to log file", cxxopts::value<std::string>(AbsLogFile));
options.add_options()("config", "Path to Lua config file", cxxopts::value<std::string>(ConfigFile));
options.add_options()("write-config", "Path to output Lua config file", cxxopts::value<std::string>(OutputConfigFile));
options.add_options()("no-sentry",
@@ -537,7 +536,21 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_options()("sentry-allow-personal-info",
"Allow personally identifiable information in sentry crash reports",
cxxopts::value<bool>(ServerOptions.SentryAllowPII)->default_value("false"));
- options.add_options()("quiet", "Disable console logging", cxxopts::value<bool>(ServerOptions.NoConsoleOutput)->default_value("false"));
+
+ // clang-format off
+ options.add_options("logging")
+ ("abslog", "Path to log file", cxxopts::value<std::string>(AbsLogFile))
+ ("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(ServerOptions.LogId))
+ ("quiet", "Disable console logging", cxxopts::value<bool>(ServerOptions.NoConsoleOutput)->default_value("false"))
+ ("log-trace", "Change selected loggers to level TRACE", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Trace]))
+ ("log-debug", "Change selected loggers to level DEBUG", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Debug]))
+ ("log-info", "Change selected loggers to level INFO", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Info]))
+ ("log-warn", "Change selected loggers to level WARN", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Warn]))
+ ("log-error", "Change selected loggers to level ERROR", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Err]))
+ ("log-critical", "Change selected loggers to level CRITICAL", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Critical]))
+ ("log-off", "Change selected loggers to level OFF", cxxopts::value<std::string>(ServerOptions.Loggers[logging::level::Off]))
+ ;
+ // clang-format on
options.add_option("security",
"",
@@ -952,6 +965,12 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
exit(0);
}
+ for (int i = 0; i < logging::level::LogLevelCount; ++i)
+ {
+ logging::ConfigureLogLevels(logging::level::LogLevel(i), ServerOptions.Loggers[i]);
+ }
+ logging::RefreshLogLevels();
+
ServerOptions.DataDir = MakeSafePath(DataDir);
ServerOptions.BaseSnapshotDir = MakeSafePath(BaseSnapshotDir);
ServerOptions.ContentDir = MakeSafePath(ContentDir);
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index 8135bf8f0..11311f9d8 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/logbase.h>
#include <zencore/zencore.h>
#include <zenhttp/httpserver.h>
#include <filesystem>
@@ -151,6 +152,7 @@ struct ZenServerOptions
bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports
bool ObjectStoreEnabled = false;
bool NoConsoleOutput = false; // Control default use of stdout for diagnostics
+ std::string Loggers[zen::logging::level::LogLevelCount];
#if ZEN_WITH_TRACE
std::string TraceHost; // Host name or IP address to send trace data to
std::string TraceFile; // Path of a file to write a trace
diff --git a/src/zenserver/diag/logging.cpp b/src/zenserver/diag/logging.cpp
index e2d57b840..dc1675819 100644
--- a/src/zenserver/diag/logging.cpp
+++ b/src/zenserver/diag/logging.cpp
@@ -42,6 +42,7 @@ InitializeServerLogging(const ZenServerOptions& InOptions)
/* max files */ 16,
/* rotate on open */ true);
auto HttpLogger = std::make_shared<spdlog::logger>("http_requests", HttpSink);
+ spdlog::apply_logger_env_levels(HttpLogger);
spdlog::register_logger(HttpLogger);
// Cache request logging
@@ -53,16 +54,19 @@ InitializeServerLogging(const ZenServerOptions& InOptions)
/* max files */ 16,
/* rotate on open */ false);
auto CacheLogger = std::make_shared<spdlog::logger>("z$", CacheSink);
+ spdlog::apply_logger_env_levels(CacheLogger);
spdlog::register_logger(CacheLogger);
// Jupiter - only log upstream HTTP traffic to file
auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink);
+ spdlog::apply_logger_env_levels(JupiterLogger);
spdlog::register_logger(JupiterLogger);
// Zen - only log upstream HTTP traffic to file
auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink);
+ spdlog::apply_logger_env_levels(ZenClientLogger);
spdlog::register_logger(ZenClientLogger);
FinishInitializeLogging(LogOptions);
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 2430267c1..2aeb6a4d5 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -162,7 +162,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
// Ok so now we're configured, let's kick things off
m_Http = CreateHttpServer(ServerOptions.HttpServerConfig);
- int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort);
+ int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort, ServerOptions.DataDir);
// Setup authentication manager
{
@@ -491,7 +491,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
const asio::error_code Err = utils::ResolveHostname(m_IoContext, Dns, "8558"sv, ZenUrls);
if (Err)
{
- ZEN_ERROR("resolve FAILED, reason '{}'", Err.message());
+ ZEN_ERROR("resolve of '{}' FAILED, reason '{}'", Dns, Err.message());
}
}
}
@@ -643,6 +643,8 @@ ZenServer::Cleanup()
Flush();
+ ShutdownWorkerPools();
+
m_AdminService.reset();
m_VfsService.reset();
m_ObjStoreService.reset();
@@ -660,7 +662,6 @@ ZenServer::Cleanup()
m_AuthMgr.reset();
m_Http = {};
m_JobQueue.reset();
- ShutdownWorkerPools();
}
catch (std::exception& Ex)
{
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 918f464ac..71e306eca 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -282,10 +282,10 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations)
}
}
-std::vector<uint32_t>
-BlockStore::GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& BlockUsage, uint32_t BlockUsageThresholdPercent)
+BlockStore::BlockEntryCountMap
+BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent)
{
- std::unordered_set<uint32_t> Result;
+ BlockEntryCountMap Result;
{
RwLock::SharedLockScope InsertLock(m_InsertLock);
for (const auto& It : m_ChunkBlocks)
@@ -299,31 +299,34 @@ BlockStore::GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& Blo
{
continue;
}
- uint64_t BlockSize = It.second ? It.second->FileSize() : 0u;
- if (BlockSize == 0)
+
+ uint64_t UsedSize = 0;
+ uint32_t UsedCount = 0;
+ if (auto UsageIt = BlockUsage.find(BlockIndex); UsageIt != BlockUsage.end())
{
- Result.insert(BlockIndex);
- continue;
+ UsedSize = UsageIt->second.DiskUsage;
+ UsedCount = UsageIt->second.EntryCount;
}
- uint64_t UsedSize = 0;
- if (auto UsageIt = BlockUsage.find(BlockIndex); UsageIt != BlockUsage.end())
+ uint64_t BlockSize = It.second ? It.second->FileSize() : 0u;
+ if (BlockSize == 0)
{
- UsedSize = UsageIt->second;
+ Result.insert_or_assign(BlockIndex, UsedCount);
+ continue;
}
if (BlockUsageThresholdPercent == 100)
{
if (UsedSize < BlockSize)
{
- Result.insert(BlockIndex);
+ Result.insert_or_assign(BlockIndex, UsedCount);
}
}
else if (BlockUsageThresholdPercent == 0)
{
if (UsedSize == 0)
{
- Result.insert(BlockIndex);
+ Result.insert_or_assign(BlockIndex, UsedCount);
}
}
else
@@ -331,12 +334,12 @@ BlockStore::GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& Blo
const uint32_t UsedPercent = UsedSize < BlockSize ? gsl::narrow<uint32_t>((100 * UsedSize) / BlockSize) : 100u;
if (UsedPercent < BlockUsageThresholdPercent)
{
- Result.insert(BlockIndex);
+ Result.insert_or_assign(BlockIndex, UsedCount);
}
}
}
}
- return std::vector<uint32_t>(Result.begin(), Result.end());
+ return Result;
}
void
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 5de82f219..96ab65a5f 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -25,6 +25,9 @@
# include <zenstore/cidstore.h>
# include <algorithm>
# include <random>
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
#endif
//////////////////////////////////////////////////////////////////////////
@@ -114,8 +117,14 @@ namespace {
//////////////////////////////////////////////////////////////////////////
+static const float IndexMinLoadFactor = 0.2f;
+static const float IndexMaxLoadFactor = 0.7f;
+
CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("containercas")), m_Gc(Gc)
{
+ m_LocationMap.min_load_factor(IndexMinLoadFactor);
+ m_LocationMap.max_load_factor(IndexMaxLoadFactor);
+
m_Gc.AddGcStorage(this);
m_Gc.AddGcReferenceStore(*this);
}
@@ -576,7 +585,7 @@ public:
if (Ctx.Settings.CollectSmallObjects)
{
- std::unordered_map<uint32_t, uint64_t> BlockUsage;
+ BlockStore::BlockUsageMap BlockUsage;
{
RwLock::SharedLockScope __(m_CasContainerStrategy.m_LocationMapLock);
if (Ctx.IsCancelledFlag.load())
@@ -591,14 +600,14 @@ public:
uint32_t BlockIndex = Loc.GetBlockIndex();
uint64_t ChunkSize = RoundUp(Loc.GetSize(), m_CasContainerStrategy.m_PayloadAlignment);
- auto It = BlockUsage.find(BlockIndex);
- if (It == BlockUsage.end())
+ if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end())
{
- BlockUsage.insert_or_assign(BlockIndex, ChunkSize);
+ It->second.EntryCount++;
+ It->second.DiskUsage += ChunkSize;
}
else
{
- It->second += ChunkSize;
+ BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
}
}
}
@@ -607,7 +616,7 @@ public:
BlockStoreCompactState BlockCompactState;
std::vector<IoHash> BlockCompactStateKeys;
- std::vector<uint32_t> BlocksToCompact =
+ BlockStore::BlockEntryCountMap BlocksToCompact =
m_CasContainerStrategy.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent);
BlockCompactState.IncludeBlocks(BlocksToCompact);
@@ -980,13 +989,14 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint
Entries.resize(128 * 1024 / sizeof(CasDiskIndexEntry));
uint64_t RemainingEntries = Header.EntryCount;
+ uint64_t ReadOffset = sizeof(CasDiskIndexHeader);
do
{
const uint64_t NumToRead = Min(RemainingEntries, Entries.size());
Entries.resize(NumToRead);
- ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+ ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), ReadOffset);
std::string InvalidEntryReason;
for (const CasDiskIndexEntry& Entry : Entries)
@@ -1002,6 +1012,7 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint
}
RemainingEntries -= NumToRead;
+ ReadOffset += NumToRead * sizeof(CasDiskIndexEntry);
} while (RemainingEntries);
OutVersion = CasDiskIndexHeader::CurrentVersion;
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index aeca01dd1..5da612e30 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -128,8 +128,14 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo
//////////////////////////////////////////////////////////////////////////
+static const float IndexMinLoadFactor = 0.2f;
+static const float IndexMaxLoadFactor = 0.7f;
+
FileCasStrategy::FileCasStrategy(GcManager& Gc) : m_Log(logging::Get("filecas")), m_Gc(Gc)
{
+ m_Index.min_load_factor(IndexMinLoadFactor);
+ m_Index.max_load_factor(IndexMaxLoadFactor);
+
m_Gc.AddGcStorage(this);
m_Gc.AddGcReferenceStore(*this);
}
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
index cb1347580..70cd4ef5a 100644
--- a/src/zenstore/filecas.h
+++ b/src/zenstore/filecas.h
@@ -16,6 +16,10 @@
#include <atomic>
#include <functional>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace zen {
class BasicFile;
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 919684e41..786780b5e 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -132,6 +132,14 @@ public:
typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback;
+ struct BlockUsageInfo
+ {
+ uint64_t DiskUsage;
+ uint32_t EntryCount;
+ };
+ typedef std::unordered_map<uint32_t, BlockUsageInfo> BlockUsageMap;
+ typedef std::unordered_map<uint32_t, uint32_t> BlockEntryCountMap;
+
void Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount);
struct BlockIndexSet
@@ -145,8 +153,8 @@ public:
// Ask the store to create empty blocks for all locations that does not have a block
// Remove any block that is not referenced
- void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations);
- std::vector<uint32_t> GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& BlockUsage, uint32_t BlockUsageThresholdPercent);
+ void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations);
+ BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent);
void Close();
@@ -205,23 +213,29 @@ class BlockStoreCompactState
public:
BlockStoreCompactState() = default;
- void IncludeBlocks(const std::span<const uint32_t> BlockIndexes)
+ void IncludeBlocks(const BlockStore::BlockEntryCountMap& BlockEntryCountMap)
{
- for (uint32_t BlockIndex : BlockIndexes)
+ size_t EntryCountTotal = 0;
+ for (auto& BlockUsageIt : BlockEntryCountMap)
{
- auto It = m_BlockIndexToChunkMapIndex.find(BlockIndex);
- if (It == m_BlockIndexToChunkMapIndex.end())
- {
- m_KeepChunks.emplace_back(std::vector<size_t>());
- m_BlockIndexToChunkMapIndex.insert_or_assign(BlockIndex, m_KeepChunks.size() - 1);
- }
+ uint32_t BlockIndex = BlockUsageIt.first;
+ ZEN_ASSERT(m_BlockIndexToChunkMapIndex.find(BlockIndex) == m_BlockIndexToChunkMapIndex.end());
+
+ m_KeepChunks.emplace_back(std::vector<size_t>());
+ m_KeepChunks.back().reserve(BlockUsageIt.second);
+ m_BlockIndexToChunkMapIndex.insert_or_assign(BlockIndex, m_KeepChunks.size() - 1);
+ EntryCountTotal += BlockUsageIt.second;
}
+ m_ChunkLocations.reserve(EntryCountTotal);
}
void IncludeBlock(uint32_t BlockIndex)
{
- const uint32_t Blocks[1] = {BlockIndex};
- IncludeBlocks(Blocks);
+ if (m_BlockIndexToChunkMapIndex.find(BlockIndex) == m_BlockIndexToChunkMapIndex.end())
+ {
+ m_KeepChunks.emplace_back(std::vector<size_t>());
+ m_BlockIndexToChunkMapIndex.insert_or_assign(BlockIndex, m_KeepChunks.size() - 1);
+ }
}
bool AddKeepLocation(const BlockStoreLocation& Location)
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index 319683dcb..4c9f30608 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -9,10 +9,6 @@
#include <zenstore/hashkeyset.h>
#include <zenutil/statsreporter.h>
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <tsl/robin_map.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
#include <filesystem>
namespace zen {
diff --git a/src/zenutil/logging.cpp b/src/zenutil/logging.cpp
index d0a6ac0b4..d82789e42 100644
--- a/src/zenutil/logging.cpp
+++ b/src/zenutil/logging.cpp
@@ -12,6 +12,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <zencore/compactbinary.h>
#include <zencore/filesystem.h>
+#include <zencore/logging.h>
#include <zencore/string.h>
#include <zenutil/logging/fullformatter.h>
#include <zenutil/logging/jsonformatter.h>
@@ -152,21 +153,21 @@ BeginInitializeLogging(const LoggingOptions& LogOptions)
void
FinishInitializeLogging(const LoggingOptions& LogOptions)
{
- spdlog::level::level_enum LogLevel = spdlog::level::info;
+ logging::level::LogLevel LogLevel = logging::level::Info;
if (LogOptions.IsDebug)
{
- LogLevel = spdlog::level::debug;
+ LogLevel = logging::level::Debug;
}
if (LogOptions.IsTest)
{
- LogLevel = spdlog::level::trace;
+ LogLevel = logging::level::Trace;
}
// Configure all registered loggers according to settings
- spdlog::set_level(LogLevel);
+ logging::RefreshLogLevels(LogLevel);
spdlog::flush_on(spdlog::level::err);
spdlog::flush_every(std::chrono::seconds{2});
spdlog::set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId, std::chrono::system_clock::now()));
diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp
index b511b0c5c..3ae302064 100644
--- a/src/zenutil/workerpools.cpp
+++ b/src/zenutil/workerpools.cpp
@@ -14,6 +14,8 @@ namespace {
const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency());
const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 1u));
+ static bool IsShutDown = false;
+
RwLock PoolLock;
std::unique_ptr<WorkerThreadPool> LargeWorkerPool;
@@ -32,6 +34,7 @@ GetLargeWorkerPool()
}
}
RwLock::ExclusiveLockScope _(PoolLock);
+ ZEN_ASSERT(!IsShutDown);
if (LargeWorkerPool)
{
return *LargeWorkerPool;
@@ -51,6 +54,7 @@ GetSmallWorkerPool()
}
}
RwLock::ExclusiveLockScope _(PoolLock);
+ ZEN_ASSERT(!IsShutDown);
if (SmallWorkerPool)
{
return *SmallWorkerPool;
@@ -70,6 +74,7 @@ GetSyncWorkerPool()
}
}
RwLock::ExclusiveLockScope _(PoolLock);
+ ZEN_ASSERT(!IsShutDown);
if (SyncWorkerPool)
{
return *SyncWorkerPool;
@@ -82,6 +87,7 @@ void
ShutdownWorkerPools()
{
RwLock::ExclusiveLockScope _(PoolLock);
+ IsShutDown = true;
LargeWorkerPool.reset();
SmallWorkerPool.reset();
SyncWorkerPool.reset();