From 4b97a66d2ea8e75bcf8a93b321514e9050a9ecdd Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 12 Oct 2023 10:17:17 +0200 Subject: adjust resource usage for dedicated servers (#466) when dedicated mode is enabled via `--dedicated` or `server.dedicated` then we tune http.sys server settings to be more suitable for a shared server initially we tune two things * the thread pool used to service I/O requests allows a larger number of threads to be created when needed. The minimum thread count is unchanged but in dedicated server mode we double the maximum number of threads allowed * the http.sys request queue length (`HttpServerQueueLengthProperty`) is increased to 50,000 in dedicated mode. The regular default is 1,000 --- src/zenhttp/httpserver.cpp | 3 +- src/zenhttp/httpsys.cpp | 82 ++++++++++++++++++++++++++------ src/zenhttp/httpsys.h | 1 + src/zenhttp/include/zenhttp/httpserver.h | 3 +- src/zenhttp/iothreadpool.cpp | 11 +++-- src/zenhttp/iothreadpool.h | 8 +--- src/zenserver/config.cpp | 2 + 7 files changed, 84 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index a2ea4cff8..eb13f77fc 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -792,7 +792,8 @@ CreateHttpServer(const HttpServerConfig& Config) return Ref(CreateHttpSysServer({.ThreadCount = Config.ThreadCount, .AsyncWorkThreadCount = Config.HttpSys.AsyncWorkThreadCount, .IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled, - .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled})); + .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled, + .IsDedicatedServer = Config.IsDedicatedServer})); #endif case HttpServerClass::kHttpNull: diff --git a/src/zenhttp/httpsys.cpp b/src/zenhttp/httpsys.cpp index 8401dcf83..75d8bfe7a 100644 --- a/src/zenhttp/httpsys.cpp +++ b/src/zenhttp/httpsys.cpp @@ -32,7 +32,10 @@ namespace zen { /** * @brief Windows implementation of HTTP server based on http.sys * - * This requires elevation to function + * This requires elevation to function by default but system configuration + * can soften this requirement. + * + * See README.md for details. */ class HttpSysServer : public HttpServer { @@ -76,7 +79,8 @@ private: bool m_IsRequestLoggingEnabled = false; bool m_IsAsyncResponseEnabled = true; - WinIoThreadPool m_ThreadPool; + std::unique_ptr m_IoThreadPool; + RwLock m_AsyncWorkPoolInitLock; WorkerThreadPool* m_AsyncWorkPool = nullptr; @@ -89,7 +93,7 @@ private: int32_t m_MinPendingRequests = 16; int32_t m_MaxPendingRequests = 128; Event m_ShutdownEvent; - const HttpSysConfig m_InitialConfig; + HttpSysConfig m_InitialConfig; }; } // namespace zen @@ -889,14 +893,45 @@ HttpAsyncWorkRequest::AsyncWorkItem::Execute() \/ \/ \/ */ -HttpSysServer::HttpSysServer(const HttpSysConfig& Config) +HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig) : m_Log(logging::Get("http")) , m_RequestLog(logging::Get("http_requests")) -, m_IsRequestLoggingEnabled(Config.IsRequestLoggingEnabled) -, m_IsAsyncResponseEnabled(Config.IsAsyncResponseEnabled) -, m_ThreadPool(Config.ThreadCount != 0 ? Config.ThreadCount : std::thread::hardware_concurrency()) -, m_InitialConfig(Config) +, m_IsRequestLoggingEnabled(InConfig.IsRequestLoggingEnabled) +, m_IsAsyncResponseEnabled(InConfig.IsAsyncResponseEnabled) +, m_InitialConfig(InConfig) { + // Initialize thread pool + + int MinThreadCount; + int MaxThreadCount; + + if (m_InitialConfig.ThreadCount == 0) + { + MinThreadCount = Max(8u, std::thread::hardware_concurrency()); + } + else + { + MinThreadCount = m_InitialConfig.ThreadCount; + } + + MaxThreadCount = MinThreadCount * 2; + + if (m_InitialConfig.IsDedicatedServer) + { + // In order to limit the potential impact of threads stuck + // in locks we allow the thread pool to be oversubscribed + // by a fair amount + + MaxThreadCount *= 2; + } + + m_IoThreadPool = std::make_unique(MinThreadCount, MaxThreadCount); + + if (m_InitialConfig.AsyncWorkThreadCount == 0) + { + m_InitialConfig.AsyncWorkThreadCount = 16; + } + ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr); if (Result != NO_ERROR) @@ -907,7 +942,11 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& Config) m_IsHttpInitialized = true; m_IsOk = true; - ZEN_INFO("http.sys server started, using {} I/O threads and {} async worker threads", Config.ThreadCount, Config.AsyncWorkThreadCount); + ZEN_INFO("http.sys server started in {} mode, using {}-{} I/O threads and {} async worker threads", + m_InitialConfig.IsDedicatedServer ? "DEDICATED" : "NORMAL", + MinThreadCount, + MaxThreadCount, + m_InitialConfig.AsyncWorkThreadCount); } HttpSysServer::~HttpSysServer() @@ -1070,10 +1109,27 @@ HttpSysServer::InitializeServer(int BasePort) 0); } + // Tune the maximum number of pending requests in the http.sys request queue. By default + // the value is 1000 which is plenty for single user machines but for dedicated servers + // serving many users it makes sense to increase this to a higher number to help smooth + // out intermittent stalls like we might experience when GC is triggered + + if (m_InitialConfig.IsDedicatedServer) + { + ULONG QueueLength = 50000; + + Result = HttpSetRequestQueueProperty(m_RequestQueueHandle, HttpServerQueueLengthProperty, &QueueLength, sizeof QueueLength, 0, 0); + + if (Result != NO_ERROR) + { + ZEN_WARN("changing request queue length to {} failed: {}", QueueLength, Result); + } + } + // Create I/O completion port std::error_code ErrorCode; - m_ThreadPool.CreateIocp(m_RequestQueueHandle, HttpSysTransaction::IoCompletionCallback, /* Context */ this, /* out */ ErrorCode); + m_IoThreadPool->CreateIocp(m_RequestQueueHandle, HttpSysTransaction::IoCompletionCallback, /* Context */ this, /* out */ ErrorCode); if (ErrorCode) { @@ -1137,9 +1193,7 @@ HttpSysServer::WorkPool() if (!m_AsyncWorkPool) { - unsigned int WorkerThreadCount = m_InitialConfig.AsyncWorkThreadCount != 0 ? m_InitialConfig.AsyncWorkThreadCount : 16; - - m_AsyncWorkPool = new WorkerThreadPool(WorkerThreadCount, "http_async"); + m_AsyncWorkPool = new WorkerThreadPool(m_InitialConfig.AsyncWorkThreadCount, "http_async"); } } @@ -1305,7 +1359,7 @@ HttpSysTransaction::~HttpSysTransaction() PTP_IO HttpSysTransaction::Iocp() { - return m_HttpServer.m_ThreadPool.Iocp(); + return m_HttpServer.m_IoThreadPool->Iocp(); } HANDLE diff --git a/src/zenhttp/httpsys.h b/src/zenhttp/httpsys.h index 1553d56ef..6a6b16525 100644 --- a/src/zenhttp/httpsys.h +++ b/src/zenhttp/httpsys.h @@ -20,6 +20,7 @@ struct HttpSysConfig unsigned int AsyncWorkThreadCount = 0; bool IsAsyncResponseEnabled = true; bool IsRequestLoggingEnabled = false; + bool IsDedicatedServer = false; }; Ref CreateHttpSysServer(HttpSysConfig Config); diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index c233075be..3cbe05dd6 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -184,7 +184,8 @@ public: struct HttpServerConfig { - std::string ServerClass; // Choice of HTTP server implementation + bool IsDedicatedServer = false; // Should be set to true for shared servers + std::string ServerClass; // Choice of HTTP server implementation unsigned int ThreadCount = 0; struct diff --git a/src/zenhttp/iothreadpool.cpp b/src/zenhttp/iothreadpool.cpp index 6087e69ec..da4b42e28 100644 --- a/src/zenhttp/iothreadpool.cpp +++ b/src/zenhttp/iothreadpool.cpp @@ -8,14 +8,19 @@ namespace zen { -WinIoThreadPool::WinIoThreadPool(int InThreadCount) +WinIoThreadPool::WinIoThreadPool(int InThreadCount, int InMaxThreadCount) { - // Thread pool setup + ZEN_ASSERT(InThreadCount); + + if (InMaxThreadCount < InThreadCount) + { + InMaxThreadCount = InThreadCount; + } m_ThreadPool = CreateThreadpool(NULL); SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount); - SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2); + SetThreadpoolThreadMaximum(m_ThreadPool, InMaxThreadCount); InitializeThreadpoolEnvironment(&m_CallbackEnvironment); diff --git a/src/zenhttp/iothreadpool.h b/src/zenhttp/iothreadpool.h index 8333964c3..e75e95e58 100644 --- a/src/zenhttp/iothreadpool.h +++ b/src/zenhttp/iothreadpool.h @@ -11,16 +11,10 @@ namespace zen { -////////////////////////////////////////////////////////////////////////// -// -// Thread pool. Implemented in terms of Windows thread pool right now, will -// need a cross-platform implementation eventually -// - class WinIoThreadPool { public: - WinIoThreadPool(int InThreadCount); + WinIoThreadPool(int InThreadCount, int InMaxThreadCount); ~WinIoThreadPool(); void CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode); diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 85db7bade..cdd1fb031 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -1353,6 +1353,8 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) { ServerOptions.AbsLogFile = ServerOptions.DataDir / "logs" / "zenserver.log"; } + + ServerOptions.HttpServerConfig.IsDedicatedServer = ServerOptions.IsDedicated; } } // namespace zen -- cgit v1.2.3 From 6ab44e95d77f711e43f9e1465facd16082442c6a Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 12 Oct 2023 10:18:53 +0200 Subject: skip lightweight GC if full GC is due soon (#467) GC will now skip a lightweight GC if a full GC is due to run within the next lightweight GC interval also fixed some minor typos --- src/zenstore/gc.cpp | 50 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index bddc3a42a..0f3f178d6 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -793,7 +793,7 @@ GcScheduler::SchedulerThread() bool DoDelete = true; bool CollectSmallObjects = m_Config.CollectSmallObjects; std::chrono::seconds GcInterval = m_Config.Interval; - std::chrono::seconds LightwightGcInterval = m_Config.LightweightInterval; + std::chrono::seconds LightweightGcInterval = m_Config.LightweightInterval; std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration; std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration; uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit; @@ -914,38 +914,48 @@ GcScheduler::SchedulerThread() } } - std::chrono::seconds RemaingTimeUntilGc = + std::chrono::seconds RemainingTimeUntilGc = GcInterval.count() == 0 ? std::chrono::seconds::max() : std::chrono::duration_cast(m_LastGcTime + GcInterval - GcClock::Now()); - if (RemaingTimeUntilGc < std::chrono::seconds::zero()) + + if (RemainingTimeUntilGc < std::chrono::seconds::zero()) + { + RemainingTimeUntilGc = std::chrono::seconds::zero(); + } + + std::chrono::seconds RemainingTimeUntilLightweightGc = + LightweightGcInterval.count() == 0 ? std::chrono::seconds::max() + : std::chrono::duration_cast( + m_LastLightweightGcTime + LightweightGcInterval - GcClock::Now()); + + if (RemainingTimeUntilLightweightGc < std::chrono::seconds::zero()) { - RemaingTimeUntilGc = std::chrono::seconds::zero(); + RemainingTimeUntilLightweightGc = std::chrono::seconds::zero(); } - std::chrono::seconds RemaingTimeUntilLightweightGc = - LightwightGcInterval.count() == 0 - ? std::chrono::seconds::max() - : std::chrono::duration_cast(m_LastLightweightGcTime + LightwightGcInterval - GcClock::Now()); - if (RemaingTimeUntilLightweightGc < std::chrono::seconds::zero()) + + // Don't schedule a lightweight GC if a full GC is + // due quite soon anyway + if (RemainingTimeUntilGc < LightweightGcInterval) { - RemaingTimeUntilLightweightGc = std::chrono::seconds::zero(); + RemainingTimeUntilLightweightGc = RemainingTimeUntilGc; } if (GcDiskSpaceGoal > 0) { DiskSpaceGCTriggered = true; } - else if (RemaingTimeUntilGc.count() == 0) + else if (RemainingTimeUntilGc.count() == 0) { TimeBasedGCTriggered = true; } - else if (RemaingTimeUntilLightweightGc.count() == 0) + else if (RemainingTimeUntilLightweightGc.count() == 0) { TimeBasedGCTriggered = true; SkipCid = true; } std::string NextTriggerStatus; - if (GcInterval.count() != 0 || LightwightGcInterval.count() != 0 || DiskSizeSoftLimit != 0) + if (GcInterval.count() != 0 || LightweightGcInterval.count() != 0 || DiskSizeSoftLimit != 0) { ExtendableStringBuilder<256> Sb; if (DiskSpaceGCTriggered) @@ -970,13 +980,13 @@ GcScheduler::SchedulerThread() if (GcInterval.count() != 0) { Sb.Append(fmt::format(" Full GC in {}.", - NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemaingTimeUntilGc).count())))); + NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemainingTimeUntilGc).count())))); } - if (LightwightGcInterval.count() != 0) + if (LightweightGcInterval.count() != 0) { Sb.Append( fmt::format(" Lightweight GC in {}.", - NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemaingTimeUntilLightweightGc).count())))); + NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemainingTimeUntilLightweightGc).count())))); } if (DiskSizeSoftLimit != 0 && DiskSizeSoftLimit > TotalSize.DiskSize) { @@ -1002,13 +1012,13 @@ GcScheduler::SchedulerThread() if (!DiskSpaceGCTriggered && !TimeBasedGCTriggered) { WaitTime = m_Config.MonitorInterval; - if (RemaingTimeUntilGc < WaitTime) + if (RemainingTimeUntilGc < WaitTime) { - WaitTime = RemaingTimeUntilGc; + WaitTime = RemainingTimeUntilGc; } - if (RemaingTimeUntilLightweightGc < WaitTime) + if (RemainingTimeUntilLightweightGc < WaitTime) { - WaitTime = RemaingTimeUntilLightweightGc; + WaitTime = RemainingTimeUntilLightweightGc; } continue; } -- cgit v1.2.3 From 767b66c7c974877e994e6574705a623be3c14a08 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 12 Oct 2023 13:44:48 +0200 Subject: added logging utility functions (from sb/proto) (#469) these allow standalone programs to share much of the logging setup from zenserver --- src/zenutil/include/zenutil/logging.h | 33 +++ src/zenutil/logging.cpp | 516 ++++++++++++++++++++++++++++++++++ 2 files changed, 549 insertions(+) create mode 100644 src/zenutil/include/zenutil/logging.h create mode 100644 src/zenutil/logging.cpp (limited to 'src') diff --git a/src/zenutil/include/zenutil/logging.h b/src/zenutil/include/zenutil/logging.h new file mode 100644 index 000000000..6f408f512 --- /dev/null +++ b/src/zenutil/include/zenutil/logging.h @@ -0,0 +1,33 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include + +#include +#include + +////////////////////////////////////////////////////////////////////////// +// +// Logging utilities +// +// These functions extend the basic logging functionality by setting up +// console and file logging, as well as colored output where available, +// for sharing across different executables +// + +namespace zen { + +struct LoggingOptions +{ + bool IsDebug = false; + bool IsVerbose = false; + bool IsTest = false; + std::filesystem::path AbsLogFile; // Absolute path to main log file + std::string LogId; +}; + +void InitializeLogging(const LoggingOptions& LoggingOptions); +void ShutdownLogging(); + +} // namespace zen diff --git a/src/zenutil/logging.cpp b/src/zenutil/logging.cpp new file mode 100644 index 000000000..c0a63cc71 --- /dev/null +++ b/src/zenutil/logging.cpp @@ -0,0 +1,516 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenutil/logging.h" + +#undef ZEN_USE_SENTRY +#define ZEN_USE_SENTRY 0 + +#if !defined(ZEN_USE_SENTRY) +# if ZEN_PLATFORM_MAC && ZEN_ARCH_ARM64 +// vcpkg's sentry-native port does not support Arm on Mac. +# define ZEN_USE_SENTRY 0 +# else +# define ZEN_USE_SENTRY 1 +# endif +#endif + +ZEN_THIRD_PARTY_INCLUDES_START +#include +#include +#include +#include +#include +#include +#include +#include +#include +#if ZEN_USE_SENTRY +# include +#endif +ZEN_THIRD_PARTY_INCLUDES_END + +#include +#include + +#include +#include + +// Custom logging -- test code, this should be tweaked + +namespace zen::logging { + +using namespace spdlog; +using namespace spdlog::details; +using namespace std::literals; + +class full_formatter final : public spdlog::formatter +{ +public: + full_formatter(std::string_view LogId, std::chrono::time_point Epoch) : m_Epoch(Epoch), m_LogId(LogId) {} + + virtual std::unique_ptr clone() const override { return std::make_unique(m_LogId, m_Epoch); } + + static constexpr bool UseDate = false; + + virtual void format(const details::log_msg& msg, memory_buf_t& dest) override + { + using std::chrono::duration_cast; + using std::chrono::milliseconds; + using std::chrono::seconds; + + if constexpr (UseDate) + { + auto secs = std::chrono::duration_cast(msg.time.time_since_epoch()); + if (secs != m_LastLogSecs) + { + m_CachedTm = os::localtime(log_clock::to_time_t(msg.time)); + m_LastLogSecs = secs; + } + } + + const auto& tm_time = m_CachedTm; + + // cache the date/time part for the next second. + auto duration = msg.time - m_Epoch; + auto secs = duration_cast(duration); + + if (m_CacheTimestamp != secs || m_CachedDatetime.size() == 0) + { + m_CachedDatetime.clear(); + m_CachedDatetime.push_back('['); + + if constexpr (UseDate) + { + fmt_helper::append_int(tm_time.tm_year + 1900, m_CachedDatetime); + m_CachedDatetime.push_back('-'); + + fmt_helper::pad2(tm_time.tm_mon + 1, m_CachedDatetime); + m_CachedDatetime.push_back('-'); + + fmt_helper::pad2(tm_time.tm_mday, m_CachedDatetime); + m_CachedDatetime.push_back(' '); + + fmt_helper::pad2(tm_time.tm_hour, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + + fmt_helper::pad2(tm_time.tm_min, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + + fmt_helper::pad2(tm_time.tm_sec, m_CachedDatetime); + } + else + { + int Count = int(secs.count()); + + const int LogSecs = Count % 60; + Count /= 60; + + const int LogMins = Count % 60; + Count /= 60; + + const int LogHours = Count; + + fmt_helper::pad2(LogHours, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + fmt_helper::pad2(LogMins, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + fmt_helper::pad2(LogSecs, m_CachedDatetime); + } + + m_CachedDatetime.push_back('.'); + + m_CacheTimestamp = secs; + } + + dest.append(m_CachedDatetime.begin(), m_CachedDatetime.end()); + + auto millis = fmt_helper::time_fraction(msg.time); + fmt_helper::pad3(static_cast(millis.count()), dest); + dest.push_back(']'); + dest.push_back(' '); + + if (!m_LogId.empty()) + { + dest.push_back('['); + fmt_helper::append_string_view(m_LogId, dest); + dest.push_back(']'); + dest.push_back(' '); + } + + // append logger name if exists + if (msg.logger_name.size() > 0) + { + dest.push_back('['); + fmt_helper::append_string_view(msg.logger_name, dest); + dest.push_back(']'); + dest.push_back(' '); + } + + dest.push_back('['); + // wrap the level name with color + msg.color_range_start = dest.size(); + fmt_helper::append_string_view(level::to_string_view(msg.level), dest); + msg.color_range_end = dest.size(); + dest.push_back(']'); + dest.push_back(' '); + + // add source location if present + if (!msg.source.empty()) + { + dest.push_back('['); + const char* filename = details::short_filename_formatter::basename(msg.source.filename); + fmt_helper::append_string_view(filename, dest); + dest.push_back(':'); + fmt_helper::append_int(msg.source.line, dest); + dest.push_back(']'); + dest.push_back(' '); + } + + fmt_helper::append_string_view(msg.payload, dest); + fmt_helper::append_string_view("\n"sv, dest); + } + +private: + std::chrono::time_point m_Epoch; + std::tm m_CachedTm; + std::chrono::seconds m_LastLogSecs; + std::chrono::seconds m_CacheTimestamp{0}; + memory_buf_t m_CachedDatetime; + std::string m_LogId; +}; + +class json_formatter final : public spdlog::formatter +{ +public: + json_formatter(std::string_view LogId) : m_LogId(LogId) {} + + virtual std::unique_ptr clone() const override { return std::make_unique(m_LogId); } + + virtual void format(const details::log_msg& msg, memory_buf_t& dest) override + { + using std::chrono::duration_cast; + using std::chrono::milliseconds; + using std::chrono::seconds; + + auto secs = std::chrono::duration_cast(msg.time.time_since_epoch()); + if (secs != m_LastLogSecs) + { + m_CachedTm = os::localtime(log_clock::to_time_t(msg.time)); + m_LastLogSecs = secs; + } + + const auto& tm_time = m_CachedTm; + + // cache the date/time part for the next second. + + if (m_CacheTimestamp != secs || m_CachedDatetime.size() == 0) + { + m_CachedDatetime.clear(); + + fmt_helper::append_int(tm_time.tm_year + 1900, m_CachedDatetime); + m_CachedDatetime.push_back('-'); + + fmt_helper::pad2(tm_time.tm_mon + 1, m_CachedDatetime); + m_CachedDatetime.push_back('-'); + + fmt_helper::pad2(tm_time.tm_mday, m_CachedDatetime); + m_CachedDatetime.push_back(' '); + + fmt_helper::pad2(tm_time.tm_hour, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + + fmt_helper::pad2(tm_time.tm_min, m_CachedDatetime); + m_CachedDatetime.push_back(':'); + + fmt_helper::pad2(tm_time.tm_sec, m_CachedDatetime); + + m_CachedDatetime.push_back('.'); + + m_CacheTimestamp = secs; + } + dest.append("{"sv); + dest.append("\"time\": \""sv); + dest.append(m_CachedDatetime.begin(), m_CachedDatetime.end()); + auto millis = fmt_helper::time_fraction(msg.time); + fmt_helper::pad3(static_cast(millis.count()), dest); + dest.append("\", "sv); + + dest.append("\"status\": \""sv); + dest.append(level::to_string_view(msg.level)); + dest.append("\", "sv); + + dest.append("\"source\": \""sv); + dest.append("zenserver"sv); + dest.append("\", "sv); + + dest.append("\"service\": \""sv); + dest.append("zencache"sv); + dest.append("\", "sv); + + if (!m_LogId.empty()) + { + dest.append("\"id\": \""sv); + dest.append(m_LogId); + dest.append("\", "sv); + } + + if (msg.logger_name.size() > 0) + { + dest.append("\"logger.name\": \""sv); + dest.append(msg.logger_name); + dest.append("\", "sv); + } + + if (msg.thread_id != 0) + { + dest.append("\"logger.thread_name\": \""sv); + fmt_helper::pad_uint(msg.thread_id, 0, dest); + dest.append("\", "sv); + } + + if (!msg.source.empty()) + { + dest.append("\"file\": \""sv); + WriteEscapedString(dest, details::short_filename_formatter::basename(msg.source.filename)); + dest.append("\","sv); + + dest.append("\"line\": \""sv); + dest.append(fmt::format("{}", msg.source.line)); + dest.append("\","sv); + + dest.append("\"logger.method_name\": \""sv); + WriteEscapedString(dest, msg.source.funcname); + dest.append("\", "sv); + } + + dest.append("\"message\": \""sv); + WriteEscapedString(dest, msg.payload); + dest.append("\""sv); + + dest.append("}\n"sv); + } + +private: + static inline const std::unordered_map SpecialCharacterMap{{'\b', "\\b"sv}, + {'\f', "\\f"sv}, + {'\n', "\\n"sv}, + {'\r', "\\r"sv}, + {'\t', "\\t"sv}, + {'"', "\\\""sv}, + {'\\', "\\\\"sv}}; + + static void WriteEscapedString(memory_buf_t& dest, const spdlog::string_view_t& payload) + { + const char* RangeStart = payload.begin(); + for (const char* It = RangeStart; It != payload.end(); ++It) + { + if (auto SpecialIt = SpecialCharacterMap.find(*It); SpecialIt != SpecialCharacterMap.end()) + { + if (RangeStart != It) + { + dest.append(RangeStart, It); + } + dest.append(SpecialIt->second); + RangeStart = It + 1; + } + } + if (RangeStart != payload.end()) + { + dest.append(RangeStart, payload.end()); + } + }; + + std::tm m_CachedTm{0, 0, 0, 0, 0, 0, 0, 0, 0}; + std::chrono::seconds m_LastLogSecs{0}; + std::chrono::seconds m_CacheTimestamp{0}; + memory_buf_t m_CachedDatetime; + std::string m_LogId; +}; +} // namespace zen::logging + +bool +EnableVTMode() +{ +#if ZEN_PLATFORM_WINDOWS + // Set output mode to handle virtual terminal sequences + HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE); + if (hOut == INVALID_HANDLE_VALUE) + { + return false; + } + + DWORD dwMode = 0; + if (!GetConsoleMode(hOut, &dwMode)) + { + return false; + } + + dwMode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING; + if (!SetConsoleMode(hOut, dwMode)) + { + return false; + } +#endif + + return true; +} + +#if ZEN_USE_SENTRY + +class sentry_sink final : public spdlog::sinks::base_sink +{ +public: + sentry_sink() {} + +protected: + static constexpr sentry_level_t MapToSentryLevel[spdlog::level::level_enum::n_levels] = {SENTRY_LEVEL_DEBUG, + SENTRY_LEVEL_DEBUG, + SENTRY_LEVEL_INFO, + SENTRY_LEVEL_WARNING, + SENTRY_LEVEL_ERROR, + SENTRY_LEVEL_FATAL, + SENTRY_LEVEL_DEBUG}; + + void sink_it_(const spdlog::details::log_msg& msg) override + { + if (msg.level >= SPDLOG_LEVEL_ERROR && msg.level <= SPDLOG_LEVEL_CRITICAL) + { + sentry_value_t event = sentry_value_new_message_event( + /* level */ MapToSentryLevel[msg.level], + /* logger */ nullptr, + /* message */ std::string(msg.payload.data(), msg.payload.size()).c_str()); + sentry_event_value_add_stacktrace(event, NULL, 0); + sentry_capture_event(event); + } + } + void flush_() override {} +}; +#endif + +namespace zen { + +void +InitializeLogging(const LoggingOptions& LogOptions) +{ + zen::logging::InitializeLogging(); + + EnableVTMode(); + + bool IsAsync = true; + spdlog::level::level_enum LogLevel = spdlog::level::info; + + if (LogOptions.IsDebug) + { + LogLevel = spdlog::level::debug; + IsAsync = false; + } + + if (LogOptions.IsTest) + { + LogLevel = spdlog::level::trace; + IsAsync = false; + } + + if (IsAsync) + { + const int QueueSize = 8192; + const int ThreadCount = 1; + spdlog::init_thread_pool(QueueSize, ThreadCount); + + auto AsyncLogger = spdlog::create_async("main"); + zen::logging::SetDefault(AsyncLogger); + } + + // Sinks + + auto ConsoleSink = std::make_shared(); + + spdlog::sink_ptr FileSink; + spdlog::sink_ptr SentrySink; + + // spdlog can't create directories that starts with `\\?\` so we make sure the folder exists before creating the logger instance + + if (!LogOptions.AbsLogFile.empty()) + { + if (LogOptions.AbsLogFile.has_parent_path()) + { + zen::CreateDirectories(LogOptions.AbsLogFile.parent_path()); + } + +#if 0 + FileSink = std::make_shared( zen::PathToUtf8(GlobalOptions.AbsLogFile), + 0, + 0, + /* truncate */ false, + uint16_t(/* max files */ 14)); +#else + FileSink = std::make_shared(zen::PathToUtf8(LogOptions.AbsLogFile), + /* max size */ 128 * 1024 * 1024, + /* max files */ 16, + /* rotate on open */ true); +#endif + } + +#if ZEN_USE_SENTRY + SentrySink = std::make_shared(); +#endif + + std::set_terminate([]() { ZEN_CRITICAL("Program exited abnormally via std::terminate()"); }); + + // Default + + auto& DefaultLogger = zen::logging::Default(); + auto& Sinks = DefaultLogger.sinks(); + + Sinks.clear(); + Sinks.push_back(ConsoleSink); + + if (FileSink) + { + Sinks.push_back(FileSink); + } + + if (SentrySink) + { + Sinks.push_back(SentrySink); + } + +#if ZEN_PLATFORM_WINDOWS + if (zen::IsDebuggerPresent() && LogOptions.IsDebug) + { + auto DebugSink = std::make_shared(); + DebugSink->set_level(spdlog::level::debug); + Sinks.push_back(DebugSink); + } +#endif + + // Configure all registered loggers according to settings + + spdlog::set_level(LogLevel); + spdlog::flush_on(spdlog::level::err); + spdlog::flush_every(std::chrono::seconds{2}); + spdlog::set_formatter(std::make_unique(LogOptions.LogId, std::chrono::system_clock::now())); + + if (FileSink) + { + if (LogOptions.AbsLogFile.extension() == ".json") + { + FileSink->set_formatter(std::make_unique(LogOptions.LogId)); + } + else + { + FileSink->set_pattern("[%C-%m-%d.%e %T] [%n] [%l] %v"); + } + } + DefaultLogger.info("log starting at {:%FT%T%z}", std::chrono::system_clock::now()); +} + +void +ShutdownLogging() +{ + auto& DefaultLogger = zen::logging::Default(); + DefaultLogger.info("log ending at {:%FT%T%z}", std::chrono::system_clock::now()); + zen::logging::ShutdownLogging(); +} + +} // namespace zen -- cgit v1.2.3 From 91c43f3f3790302639d57a717f996ee133deb523 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 12 Oct 2023 13:45:43 +0200 Subject: restructured transports SDK for easier UE integration (#470) --- src/plugins/include/transportplugin.h | 111 ------- src/plugins/winsock/winsock.cpp | 352 --------------------- src/plugins/winsock/xmake.lua | 14 - src/plugins/xmake.lua | 7 - .../transport-sdk/include/transportplugin.h | 111 +++++++ src/transports/transport-sdk/xmake.lua | 7 + src/transports/winsock/winsock.cpp | 352 +++++++++++++++++++++ src/transports/winsock/xmake.lua | 14 + src/transports/xmake.lua | 10 + src/zenhttp/xmake.lua | 2 +- 10 files changed, 495 insertions(+), 485 deletions(-) delete mode 100644 src/plugins/include/transportplugin.h delete mode 100644 src/plugins/winsock/winsock.cpp delete mode 100644 src/plugins/winsock/xmake.lua delete mode 100644 src/plugins/xmake.lua create mode 100644 src/transports/transport-sdk/include/transportplugin.h create mode 100644 src/transports/transport-sdk/xmake.lua create mode 100644 src/transports/winsock/winsock.cpp create mode 100644 src/transports/winsock/xmake.lua create mode 100644 src/transports/xmake.lua (limited to 'src') diff --git a/src/plugins/include/transportplugin.h b/src/plugins/include/transportplugin.h deleted file mode 100644 index aee5b2e7a..000000000 --- a/src/plugins/include/transportplugin.h +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include - -// Important note: this header is meant to compile standalone -// and should therefore not depend on anything from the Zen codebase - -namespace zen { - -class TransportConnection; -class TransportPlugin; -class TransportServerConnection; -class TransportServer; - -/************************************************************************* - - The following interfaces are implemented on the server side, and instances - are provided to the plugins. - -*************************************************************************/ - -/** Plugin-server interface for connection - - This is returned by a call to TransportServer::CreateConnectionHandler - and there should be one instance created per established connection - - The plugin uses this interface to feed data into the server side - protocol implementation which will parse the incoming messages and - dispatch to appropriate request handlers and ultimately call into - TransportConnection functions which write data back to the client - */ -class TransportServerConnection -{ -public: - virtual uint32_t AddRef() const = 0; - virtual uint32_t Release() const = 0; - virtual void OnBytesRead(const void* Buffer, size_t DataSize) = 0; -}; - -/** Server interface - - There will be one instance of this provided by the system to the transport plugin - - The plugin can use this to register new connections - - */ -class TransportServer -{ -public: - virtual TransportServerConnection* CreateConnectionHandler(TransportConnection* Connection) = 0; -}; - -/************************************************************************* - - The following interfaces are to be implemented by transport plugins. - -*************************************************************************/ - -/** Interface which needs to be implemented by a transport plugin - - This is responsible for setting up and running the communication - for a given transport. - - Once initialized, the plugin should be ready to accept connections - using its own execution resources (threads, thread pools etc) - */ -class TransportPlugin -{ -public: - virtual uint32_t AddRef() const = 0; - virtual uint32_t Release() const = 0; - virtual void Initialize(TransportServer* ServerInterface) = 0; - virtual void Shutdown() = 0; - - /** Check whether this transport is usable. - */ - virtual bool IsAvailable() = 0; -}; - -/** A transport plugin provider needs to implement this interface - - The plugin should create one instance of this per established - connection and register it with the TransportServer instance - CreateConnectionHandler() function. The server will subsequently - use this interface to write response data back to the client and - to manage the connection life cycle in general -*/ -class TransportConnection -{ -public: - virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) = 0; - virtual void Shutdown(bool Receive, bool Transmit) = 0; - virtual void CloseConnection() = 0; -}; - -} // namespace zen - -#if defined(_MSC_VER) -# define DLL_TRANSPORT_API __declspec(dllexport) -#else -# define DLL_TRANSPORT_API -#endif - -extern "C" -{ - DLL_TRANSPORT_API zen::TransportPlugin* CreateTransportPlugin(); -} - -typedef zen::TransportPlugin* (*PfnCreateTransportPlugin)(); diff --git a/src/plugins/winsock/winsock.cpp b/src/plugins/winsock/winsock.cpp deleted file mode 100644 index a6cfed1e3..000000000 --- a/src/plugins/winsock/winsock.cpp +++ /dev/null @@ -1,352 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include -#include -#include -#include -#include -#include - -#include -#include - -#ifndef _WIN32_WINNT -# define _WIN32_WINNT 0x0A00 -#endif - -ZEN_THIRD_PARTY_INCLUDES_START -#include -#include -#include -ZEN_THIRD_PARTY_INCLUDES_END - -#include - -////////////////////////////////////////////////////////////////////////// - -using namespace zen; - -class SocketTransportPlugin : public TransportPlugin, zen::RefCounted -{ -public: - SocketTransportPlugin(uint16_t BasePort, unsigned int ThreadCount); - ~SocketTransportPlugin(); - - // TransportPlugin implementation - - virtual uint32_t AddRef() const override; - virtual uint32_t Release() const override; - virtual void Initialize(TransportServer* ServerInterface) override; - virtual void Shutdown() override; - virtual bool IsAvailable() override; - -private: - TransportServer* m_ServerInterface = nullptr; - bool m_IsOk = true; - uint16_t m_BasePort = 0; - int m_ThreadCount = 0; - - SOCKET m_ListenSocket{}; - std::thread m_AcceptThread; - std::atomic_flag m_KeepRunning; - std::vector> m_Connections; -}; - -struct SocketTransportConnection : public TransportConnection -{ -public: - SocketTransportConnection(); - ~SocketTransportConnection(); - - void Initialize(TransportServerConnection* ServerConnection, SOCKET ClientSocket); - void HandleConnection(); - - // TransportConnection implementation - - virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; - virtual void Shutdown(bool Receive, bool Transmit) override; - virtual void CloseConnection() override; - -private: - zen::Ref m_ConnectionHandler; - SOCKET m_ClientSocket{}; - bool m_IsTerminated = false; -}; - -////////////////////////////////////////////////////////////////////////// - -SocketTransportConnection::SocketTransportConnection() -{ -} - -SocketTransportConnection::~SocketTransportConnection() -{ -} - -void -SocketTransportConnection::Initialize(TransportServerConnection* ServerConnection, SOCKET ClientSocket) -{ - // ZEN_ASSERT(!m_ConnectionHandler); - - m_ConnectionHandler = ServerConnection; - m_ClientSocket = ClientSocket; -} - -void -SocketTransportConnection::HandleConnection() -{ - // ZEN_ASSERT(m_ConnectionHandler); - - const int InputBufferSize = 64 * 1024; - std::unique_ptr InputBuffer{new uint8_t[64 * 1024]}; - - do - { - const int RecvBytes = recv(m_ClientSocket, (char*)InputBuffer.get(), InputBufferSize, /* flags */ 0); - - if (RecvBytes == 0) - { - // Connection closed - return CloseConnection(); - } - else if (RecvBytes < 0) - { - // Error - return CloseConnection(); - } - - m_ConnectionHandler->OnBytesRead(InputBuffer.get(), RecvBytes); - } while (m_ClientSocket); -} - -void -SocketTransportConnection::CloseConnection() -{ - if (m_IsTerminated) - { - return; - } - - // ZEN_ASSERT(m_ClientSocket); - m_IsTerminated = true; - - shutdown(m_ClientSocket, SD_BOTH); // We won't be sending or receiving any more data - - closesocket(m_ClientSocket); - m_ClientSocket = 0; -} - -int64_t -SocketTransportConnection::WriteBytes(const void* Buffer, size_t DataSize) -{ - const uint8_t* BufferCursor = reinterpret_cast(Buffer); - int64_t TotalBytesSent = 0; - - while (DataSize) - { - const int MaxBlockSize = 128 * 1024; - const int SendBlockSize = (DataSize > MaxBlockSize) ? MaxBlockSize : (int)DataSize; - const int SentBytes = send(m_ClientSocket, (const char*)BufferCursor, SendBlockSize, /* flags */ 0); - - if (SentBytes < 0) - { - // Error - return SentBytes; - } - - BufferCursor += SentBytes; - DataSize -= SentBytes; - TotalBytesSent += SentBytes; - } - - return TotalBytesSent; -} - -void -SocketTransportConnection::Shutdown(bool Receive, bool Transmit) -{ - if (Receive) - { - if (Transmit) - { - shutdown(m_ClientSocket, SD_BOTH); - } - else - { - shutdown(m_ClientSocket, SD_RECEIVE); - } - } - else if (Transmit) - { - shutdown(m_ClientSocket, SD_SEND); - } -} - -////////////////////////////////////////////////////////////////////////// - -SocketTransportPlugin::SocketTransportPlugin(uint16_t BasePort, unsigned int ThreadCount) -: m_BasePort(BasePort) -, m_ThreadCount(ThreadCount != 0 ? ThreadCount : std::max(std::thread::hardware_concurrency(), 8u)) -{ -#if ZEN_PLATFORM_WINDOWS - WSADATA wsaData; - if (int Result = WSAStartup(0x202, &wsaData); Result != 0) - { - m_IsOk = false; - WSACleanup(); - } -#endif -} - -SocketTransportPlugin::~SocketTransportPlugin() -{ - Shutdown(); - -#if ZEN_PLATFORM_WINDOWS - if (m_IsOk) - { - WSACleanup(); - } -#endif -} - -uint32_t -SocketTransportPlugin::AddRef() const -{ - return RefCounted::AddRef(); -} - -uint32_t -SocketTransportPlugin::Release() const -{ - return RefCounted::Release(); -} - -void -SocketTransportPlugin::Initialize(TransportServer* ServerInterface) -{ - uint16_t Port = m_BasePort; - - m_ServerInterface = ServerInterface; - m_ListenSocket = socket(AF_INET6, SOCK_STREAM, 0); - - if (m_ListenSocket == SOCKET_ERROR || m_ListenSocket == INVALID_SOCKET) - { - throw std::system_error(std::error_code(WSAGetLastError(), std::system_category()), - "socket creation failed in HTTP plugin server init"); - } - - sockaddr_in6 Server{}; - Server.sin6_family = AF_INET6; - Server.sin6_port = htons(Port); - Server.sin6_addr = in6addr_any; - - if (int Result = bind(m_ListenSocket, (sockaddr*)&Server, sizeof(Server)); Result == SOCKET_ERROR) - { - throw std::system_error(std::error_code(WSAGetLastError(), std::system_category()), "bind call failed in HTTP plugin server init"); - } - - if (int Result = listen(m_ListenSocket, AF_INET6); Result == SOCKET_ERROR) - { - throw std::system_error(std::error_code(WSAGetLastError(), std::system_category()), - "listen call failed in HTTP plugin server init"); - } - - m_KeepRunning.test_and_set(); - - m_AcceptThread = std::thread([&] { - // SetCurrentThreadName("http_plugin_acceptor"); - - // ZEN_INFO("HTTP plugin server waiting for connections"); - - do - { - if (SOCKET ClientSocket = accept(m_ListenSocket, NULL, NULL); ClientSocket != SOCKET_ERROR) - { - int Flag = 1; - setsockopt(ClientSocket, IPPROTO_TCP, TCP_NODELAY, (char*)&Flag, sizeof(Flag)); - - // Handle new connection - SocketTransportConnection* Connection = new SocketTransportConnection(); - TransportServerConnection* ConnectionInterface{m_ServerInterface->CreateConnectionHandler(Connection)}; - Connection->Initialize(ConnectionInterface, ClientSocket); - - m_Connections.push_back(std::async(std::launch::async, [Connection] { - try - { - Connection->HandleConnection(); - } - catch (std::exception&) - { - // ZEN_WARN("exception caught in connection loop: {}", Ex.what()); - } - - delete Connection; - })); - } - else - { - } - } while (m_KeepRunning.test()); - - // ZEN_INFO("HTTP plugin server accept thread exit"); - }); -} - -void -SocketTransportPlugin::Shutdown() -{ - // TODO: all pending/ongoing work should be drained here as well - - m_KeepRunning.clear(); - - closesocket(m_ListenSocket); - m_ListenSocket = 0; - - if (m_AcceptThread.joinable()) - { - m_AcceptThread.join(); - } -} - -bool -SocketTransportPlugin::IsAvailable() -{ - return true; -} - -////////////////////////////////////////////////////////////////////////// - -TransportPlugin* -CreateTransportPlugin() -{ - return new SocketTransportPlugin(1337, 8); -} - -BOOL WINAPI -DllMain([[maybe_unused]] HINSTANCE hinstDLL, // handle to DLL module - DWORD fdwReason, // reason for calling function - LPVOID lpvReserved) // reserved -{ - // Perform actions based on the reason for calling. - switch (fdwReason) - { - case DLL_PROCESS_ATTACH: - break; - - case DLL_THREAD_ATTACH: - break; - - case DLL_THREAD_DETACH: - break; - - case DLL_PROCESS_DETACH: - if (lpvReserved != nullptr) - { - break; // do not do cleanup if process termination scenario - } - break; - } - - return TRUE; -} diff --git a/src/plugins/winsock/xmake.lua b/src/plugins/winsock/xmake.lua deleted file mode 100644 index 408a248b1..000000000 --- a/src/plugins/winsock/xmake.lua +++ /dev/null @@ -1,14 +0,0 @@ --- Copyright Epic Games, Inc. All Rights Reserved. - -target("winsock") - set_kind("shared") - add_headerfiles("**.h") - add_files("**.cpp") - add_links("Ws2_32") - add_includedirs(".", "../../zencore/include") - set_symbols("debug") - add_deps("plugins") - - if is_mode("release") then - set_optimize("fastest") - end diff --git a/src/plugins/xmake.lua b/src/plugins/xmake.lua deleted file mode 100644 index 9e4d49685..000000000 --- a/src/plugins/xmake.lua +++ /dev/null @@ -1,7 +0,0 @@ --- Copyright Epic Games, Inc. All Rights Reserved. - -target('plugins') - set_kind("headeronly") - set_group("plugins") - add_headerfiles("**.h") - add_includedirs("include", {public=true}) diff --git a/src/transports/transport-sdk/include/transportplugin.h b/src/transports/transport-sdk/include/transportplugin.h new file mode 100644 index 000000000..aee5b2e7a --- /dev/null +++ b/src/transports/transport-sdk/include/transportplugin.h @@ -0,0 +1,111 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include + +// Important note: this header is meant to compile standalone +// and should therefore not depend on anything from the Zen codebase + +namespace zen { + +class TransportConnection; +class TransportPlugin; +class TransportServerConnection; +class TransportServer; + +/************************************************************************* + + The following interfaces are implemented on the server side, and instances + are provided to the plugins. + +*************************************************************************/ + +/** Plugin-server interface for connection + + This is returned by a call to TransportServer::CreateConnectionHandler + and there should be one instance created per established connection + + The plugin uses this interface to feed data into the server side + protocol implementation which will parse the incoming messages and + dispatch to appropriate request handlers and ultimately call into + TransportConnection functions which write data back to the client + */ +class TransportServerConnection +{ +public: + virtual uint32_t AddRef() const = 0; + virtual uint32_t Release() const = 0; + virtual void OnBytesRead(const void* Buffer, size_t DataSize) = 0; +}; + +/** Server interface + + There will be one instance of this provided by the system to the transport plugin + + The plugin can use this to register new connections + + */ +class TransportServer +{ +public: + virtual TransportServerConnection* CreateConnectionHandler(TransportConnection* Connection) = 0; +}; + +/************************************************************************* + + The following interfaces are to be implemented by transport plugins. + +*************************************************************************/ + +/** Interface which needs to be implemented by a transport plugin + + This is responsible for setting up and running the communication + for a given transport. + + Once initialized, the plugin should be ready to accept connections + using its own execution resources (threads, thread pools etc) + */ +class TransportPlugin +{ +public: + virtual uint32_t AddRef() const = 0; + virtual uint32_t Release() const = 0; + virtual void Initialize(TransportServer* ServerInterface) = 0; + virtual void Shutdown() = 0; + + /** Check whether this transport is usable. + */ + virtual bool IsAvailable() = 0; +}; + +/** A transport plugin provider needs to implement this interface + + The plugin should create one instance of this per established + connection and register it with the TransportServer instance + CreateConnectionHandler() function. The server will subsequently + use this interface to write response data back to the client and + to manage the connection life cycle in general +*/ +class TransportConnection +{ +public: + virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) = 0; + virtual void Shutdown(bool Receive, bool Transmit) = 0; + virtual void CloseConnection() = 0; +}; + +} // namespace zen + +#if defined(_MSC_VER) +# define DLL_TRANSPORT_API __declspec(dllexport) +#else +# define DLL_TRANSPORT_API +#endif + +extern "C" +{ + DLL_TRANSPORT_API zen::TransportPlugin* CreateTransportPlugin(); +} + +typedef zen::TransportPlugin* (*PfnCreateTransportPlugin)(); diff --git a/src/transports/transport-sdk/xmake.lua b/src/transports/transport-sdk/xmake.lua new file mode 100644 index 000000000..60387c26f --- /dev/null +++ b/src/transports/transport-sdk/xmake.lua @@ -0,0 +1,7 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +target('transport-sdk') + set_kind("headeronly") + set_group("transports") + add_headerfiles("**.h") + add_includedirs("include", {public=true}) diff --git a/src/transports/winsock/winsock.cpp b/src/transports/winsock/winsock.cpp new file mode 100644 index 000000000..a6cfed1e3 --- /dev/null +++ b/src/transports/winsock/winsock.cpp @@ -0,0 +1,352 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include +#include +#include +#include +#include +#include + +#include +#include + +#ifndef _WIN32_WINNT +# define _WIN32_WINNT 0x0A00 +#endif + +ZEN_THIRD_PARTY_INCLUDES_START +#include +#include +#include +ZEN_THIRD_PARTY_INCLUDES_END + +#include + +////////////////////////////////////////////////////////////////////////// + +using namespace zen; + +class SocketTransportPlugin : public TransportPlugin, zen::RefCounted +{ +public: + SocketTransportPlugin(uint16_t BasePort, unsigned int ThreadCount); + ~SocketTransportPlugin(); + + // TransportPlugin implementation + + virtual uint32_t AddRef() const override; + virtual uint32_t Release() const override; + virtual void Initialize(TransportServer* ServerInterface) override; + virtual void Shutdown() override; + virtual bool IsAvailable() override; + +private: + TransportServer* m_ServerInterface = nullptr; + bool m_IsOk = true; + uint16_t m_BasePort = 0; + int m_ThreadCount = 0; + + SOCKET m_ListenSocket{}; + std::thread m_AcceptThread; + std::atomic_flag m_KeepRunning; + std::vector> m_Connections; +}; + +struct SocketTransportConnection : public TransportConnection +{ +public: + SocketTransportConnection(); + ~SocketTransportConnection(); + + void Initialize(TransportServerConnection* ServerConnection, SOCKET ClientSocket); + void HandleConnection(); + + // TransportConnection implementation + + virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; + virtual void Shutdown(bool Receive, bool Transmit) override; + virtual void CloseConnection() override; + +private: + zen::Ref m_ConnectionHandler; + SOCKET m_ClientSocket{}; + bool m_IsTerminated = false; +}; + +////////////////////////////////////////////////////////////////////////// + +SocketTransportConnection::SocketTransportConnection() +{ +} + +SocketTransportConnection::~SocketTransportConnection() +{ +} + +void +SocketTransportConnection::Initialize(TransportServerConnection* ServerConnection, SOCKET ClientSocket) +{ + // ZEN_ASSERT(!m_ConnectionHandler); + + m_ConnectionHandler = ServerConnection; + m_ClientSocket = ClientSocket; +} + +void +SocketTransportConnection::HandleConnection() +{ + // ZEN_ASSERT(m_ConnectionHandler); + + const int InputBufferSize = 64 * 1024; + std::unique_ptr InputBuffer{new uint8_t[64 * 1024]}; + + do + { + const int RecvBytes = recv(m_ClientSocket, (char*)InputBuffer.get(), InputBufferSize, /* flags */ 0); + + if (RecvBytes == 0) + { + // Connection closed + return CloseConnection(); + } + else if (RecvBytes < 0) + { + // Error + return CloseConnection(); + } + + m_ConnectionHandler->OnBytesRead(InputBuffer.get(), RecvBytes); + } while (m_ClientSocket); +} + +void +SocketTransportConnection::CloseConnection() +{ + if (m_IsTerminated) + { + return; + } + + // ZEN_ASSERT(m_ClientSocket); + m_IsTerminated = true; + + shutdown(m_ClientSocket, SD_BOTH); // We won't be sending or receiving any more data + + closesocket(m_ClientSocket); + m_ClientSocket = 0; +} + +int64_t +SocketTransportConnection::WriteBytes(const void* Buffer, size_t DataSize) +{ + const uint8_t* BufferCursor = reinterpret_cast(Buffer); + int64_t TotalBytesSent = 0; + + while (DataSize) + { + const int MaxBlockSize = 128 * 1024; + const int SendBlockSize = (DataSize > MaxBlockSize) ? MaxBlockSize : (int)DataSize; + const int SentBytes = send(m_ClientSocket, (const char*)BufferCursor, SendBlockSize, /* flags */ 0); + + if (SentBytes < 0) + { + // Error + return SentBytes; + } + + BufferCursor += SentBytes; + DataSize -= SentBytes; + TotalBytesSent += SentBytes; + } + + return TotalBytesSent; +} + +void +SocketTransportConnection::Shutdown(bool Receive, bool Transmit) +{ + if (Receive) + { + if (Transmit) + { + shutdown(m_ClientSocket, SD_BOTH); + } + else + { + shutdown(m_ClientSocket, SD_RECEIVE); + } + } + else if (Transmit) + { + shutdown(m_ClientSocket, SD_SEND); + } +} + +////////////////////////////////////////////////////////////////////////// + +SocketTransportPlugin::SocketTransportPlugin(uint16_t BasePort, unsigned int ThreadCount) +: m_BasePort(BasePort) +, m_ThreadCount(ThreadCount != 0 ? ThreadCount : std::max(std::thread::hardware_concurrency(), 8u)) +{ +#if ZEN_PLATFORM_WINDOWS + WSADATA wsaData; + if (int Result = WSAStartup(0x202, &wsaData); Result != 0) + { + m_IsOk = false; + WSACleanup(); + } +#endif +} + +SocketTransportPlugin::~SocketTransportPlugin() +{ + Shutdown(); + +#if ZEN_PLATFORM_WINDOWS + if (m_IsOk) + { + WSACleanup(); + } +#endif +} + +uint32_t +SocketTransportPlugin::AddRef() const +{ + return RefCounted::AddRef(); +} + +uint32_t +SocketTransportPlugin::Release() const +{ + return RefCounted::Release(); +} + +void +SocketTransportPlugin::Initialize(TransportServer* ServerInterface) +{ + uint16_t Port = m_BasePort; + + m_ServerInterface = ServerInterface; + m_ListenSocket = socket(AF_INET6, SOCK_STREAM, 0); + + if (m_ListenSocket == SOCKET_ERROR || m_ListenSocket == INVALID_SOCKET) + { + throw std::system_error(std::error_code(WSAGetLastError(), std::system_category()), + "socket creation failed in HTTP plugin server init"); + } + + sockaddr_in6 Server{}; + Server.sin6_family = AF_INET6; + Server.sin6_port = htons(Port); + Server.sin6_addr = in6addr_any; + + if (int Result = bind(m_ListenSocket, (sockaddr*)&Server, sizeof(Server)); Result == SOCKET_ERROR) + { + throw std::system_error(std::error_code(WSAGetLastError(), std::system_category()), "bind call failed in HTTP plugin server init"); + } + + if (int Result = listen(m_ListenSocket, AF_INET6); Result == SOCKET_ERROR) + { + throw std::system_error(std::error_code(WSAGetLastError(), std::system_category()), + "listen call failed in HTTP plugin server init"); + } + + m_KeepRunning.test_and_set(); + + m_AcceptThread = std::thread([&] { + // SetCurrentThreadName("http_plugin_acceptor"); + + // ZEN_INFO("HTTP plugin server waiting for connections"); + + do + { + if (SOCKET ClientSocket = accept(m_ListenSocket, NULL, NULL); ClientSocket != SOCKET_ERROR) + { + int Flag = 1; + setsockopt(ClientSocket, IPPROTO_TCP, TCP_NODELAY, (char*)&Flag, sizeof(Flag)); + + // Handle new connection + SocketTransportConnection* Connection = new SocketTransportConnection(); + TransportServerConnection* ConnectionInterface{m_ServerInterface->CreateConnectionHandler(Connection)}; + Connection->Initialize(ConnectionInterface, ClientSocket); + + m_Connections.push_back(std::async(std::launch::async, [Connection] { + try + { + Connection->HandleConnection(); + } + catch (std::exception&) + { + // ZEN_WARN("exception caught in connection loop: {}", Ex.what()); + } + + delete Connection; + })); + } + else + { + } + } while (m_KeepRunning.test()); + + // ZEN_INFO("HTTP plugin server accept thread exit"); + }); +} + +void +SocketTransportPlugin::Shutdown() +{ + // TODO: all pending/ongoing work should be drained here as well + + m_KeepRunning.clear(); + + closesocket(m_ListenSocket); + m_ListenSocket = 0; + + if (m_AcceptThread.joinable()) + { + m_AcceptThread.join(); + } +} + +bool +SocketTransportPlugin::IsAvailable() +{ + return true; +} + +////////////////////////////////////////////////////////////////////////// + +TransportPlugin* +CreateTransportPlugin() +{ + return new SocketTransportPlugin(1337, 8); +} + +BOOL WINAPI +DllMain([[maybe_unused]] HINSTANCE hinstDLL, // handle to DLL module + DWORD fdwReason, // reason for calling function + LPVOID lpvReserved) // reserved +{ + // Perform actions based on the reason for calling. + switch (fdwReason) + { + case DLL_PROCESS_ATTACH: + break; + + case DLL_THREAD_ATTACH: + break; + + case DLL_THREAD_DETACH: + break; + + case DLL_PROCESS_DETACH: + if (lpvReserved != nullptr) + { + break; // do not do cleanup if process termination scenario + } + break; + } + + return TRUE; +} diff --git a/src/transports/winsock/xmake.lua b/src/transports/winsock/xmake.lua new file mode 100644 index 000000000..9f9a32daf --- /dev/null +++ b/src/transports/winsock/xmake.lua @@ -0,0 +1,14 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +target("winsock") + set_kind("shared") + add_headerfiles("**.h") + add_files("**.cpp") + add_links("Ws2_32") + add_includedirs(".", "../../zencore/include") + set_symbols("debug") + add_deps("transport-sdk") + + if is_mode("release") then + set_optimize("fastest") + end diff --git a/src/transports/xmake.lua b/src/transports/xmake.lua new file mode 100644 index 000000000..44800a8af --- /dev/null +++ b/src/transports/xmake.lua @@ -0,0 +1,10 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +set_warnings("allextra", "error") +set_languages("cxx20") + +includes('transport-sdk') + +if is_plat("windows") then + includes("winsock") +end diff --git a/src/zenhttp/xmake.lua b/src/zenhttp/xmake.lua index 9c3869911..588fd8b87 100644 --- a/src/zenhttp/xmake.lua +++ b/src/zenhttp/xmake.lua @@ -7,7 +7,7 @@ target('zenhttp') add_files("**.cpp") add_files("httpsys.cpp", {unity_ignored=true}) add_includedirs("include", {public=true}) - add_deps("zencore", "plugins") + add_deps("zencore", "transport-sdk") add_packages( "vcpkg::cpr", "vcpkg::curl", -- required by cpr -- cgit v1.2.3 From b26281d23ace307e26fa5262986c7485474b1543 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 12 Oct 2023 13:51:38 +0200 Subject: added initial README.md --- src/transports/README.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 src/transports/README.md (limited to 'src') diff --git a/src/transports/README.md b/src/transports/README.md new file mode 100644 index 000000000..ec6015703 --- /dev/null +++ b/src/transports/README.md @@ -0,0 +1,5 @@ +This code corresponds to the code in [UE5/Engine/Source/Developer/ZenPluggableTransport](https://github.com/EpicGames/UnrealEngine/tree/release/Engine/Source/Developer), +and provides the API definitions for creating transpor plug-ins for use with the +Zen server. Pluggable transports allow us to support a variety of transport mechanisms, +including some which are not possible to share with a general audience. These are +developed and maintained in the UE tree or elsewhere for logistical and legal reasons. -- cgit v1.2.3 From 9230a19e52f52d26772710b601ab23b89aa7cc0d Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 12 Oct 2023 13:52:41 +0200 Subject: Update README.md --- src/transports/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/transports/README.md b/src/transports/README.md index ec6015703..a4079f178 100644 --- a/src/transports/README.md +++ b/src/transports/README.md @@ -1,5 +1,5 @@ This code corresponds to the code in [UE5/Engine/Source/Developer/ZenPluggableTransport](https://github.com/EpicGames/UnrealEngine/tree/release/Engine/Source/Developer), -and provides the API definitions for creating transpor plug-ins for use with the +and provides the API definitions for creating transport plug-ins for use with the Zen server. Pluggable transports allow us to support a variety of transport mechanisms, including some which are not possible to share with a general audience. These are developed and maintained in the UE tree or elsewhere for logistical and legal reasons. -- cgit v1.2.3