diff options
Diffstat (limited to 'src/zenhttp')
| -rw-r--r-- | src/zenhttp/httpasio.cpp | 1 | ||||
| -rw-r--r-- | src/zenhttp/httpasio.h | 3 | ||||
| -rw-r--r-- | src/zenhttp/httpserver.cpp | 18 | ||||
| -rw-r--r-- | src/zenhttp/httpsys.cpp | 88 | ||||
| -rw-r--r-- | src/zenhttp/httpsys.h | 87 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/transportplugin.h | 97 | ||||
| -rw-r--r-- | src/zenhttp/transports/asiotransport.cpp | 439 | ||||
| -rw-r--r-- | src/zenhttp/transports/asiotransport.h | 15 | ||||
| -rw-r--r-- | src/zenhttp/transports/dlltransport.cpp (renamed from src/zenhttp/dlltransport.cpp) | 0 | ||||
| -rw-r--r-- | src/zenhttp/transports/dlltransport.h (renamed from src/zenhttp/dlltransport.h) | 0 | ||||
| -rw-r--r-- | src/zenhttp/transports/winsocktransport.cpp (renamed from src/zenhttp/winsocktransport.cpp) | 0 | ||||
| -rw-r--r-- | src/zenhttp/transports/winsocktransport.h (renamed from src/zenhttp/winsocktransport.h) | 0 | ||||
| -rw-r--r-- | src/zenhttp/xmake.lua | 2 |
13 files changed, 562 insertions, 188 deletions
diff --git a/src/zenhttp/httpasio.cpp b/src/zenhttp/httpasio.cpp index f23e0edb1..0c6b189f9 100644 --- a/src/zenhttp/httpasio.cpp +++ b/src/zenhttp/httpasio.cpp @@ -4,6 +4,7 @@ #include <zencore/except.h> #include <zencore/logging.h> +#include <zencore/thread.h> #include <zencore/trace.h> #include <zenhttp/httpserver.h> diff --git a/src/zenhttp/httpasio.h b/src/zenhttp/httpasio.h index 57068f7c5..2366f3437 100644 --- a/src/zenhttp/httpasio.h +++ b/src/zenhttp/httpasio.h @@ -2,11 +2,8 @@ #pragma once -#include <zencore/thread.h> #include <zenhttp/httpserver.h> -#include <memory> - namespace zen { Ref<HttpServer> CreateHttpAsioServer(unsigned int ThreadCount); diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index f22438a58..a2ea4cff8 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -9,8 +9,9 @@ #include "zenhttp/httpplugin.h" #if ZEN_WITH_PLUGINS -# include "dlltransport.h" -# include "winsocktransport.h" +# include "transports/asiotransport.h" +# include "transports/dlltransport.h" +# include "transports/winsocktransport.h" #endif #include <zencore/compactbinary.h> @@ -771,6 +772,11 @@ CreateHttpServer(const HttpServerConfig& Config) # endif # if 0 + Ref<TransportPlugin> AsioPlugin{CreateAsioTransportPlugin(1337, Config.ThreadCount)}; + Server->AddPlugin(AsioPlugin); +# endif + +# if 0 Ref<DllTransportPlugin> DllPlugin{new DllTransportPlugin(1337, Config.ThreadCount)}; DllPlugin->LoadDll("winsock"); Server->AddPlugin(DllPlugin); @@ -783,10 +789,10 @@ CreateHttpServer(const HttpServerConfig& Config) #if ZEN_WITH_HTTPSYS case HttpServerClass::kHttpSys: ZEN_INFO("using http.sys server implementation"); - return Ref<HttpServer>(new HttpSysServer({.ThreadCount = Config.ThreadCount, - .AsyncWorkThreadCount = Config.HttpSys.AsyncWorkThreadCount, - .IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled, - .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled})); + return Ref<HttpServer>(CreateHttpSysServer({.ThreadCount = Config.ThreadCount, + .AsyncWorkThreadCount = Config.HttpSys.AsyncWorkThreadCount, + .IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled, + .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled})); #endif case HttpServerClass::kHttpNull: diff --git a/src/zenhttp/httpsys.cpp b/src/zenhttp/httpsys.cpp index c7ed0bb2f..8401dcf83 100644 --- a/src/zenhttp/httpsys.cpp +++ b/src/zenhttp/httpsys.cpp @@ -16,6 +16,86 @@ #include <zenhttp/httpshared.h> #if ZEN_WITH_HTTPSYS +# define _WINSOCKAPI_ +# include <zencore/windows.h> +# include <zencore/workthreadpool.h> +# include "iothreadpool.h" + +# include <http.h> + +namespace spdlog { +class logger; +} + +namespace zen { + +/** + * @brief Windows implementation of HTTP server based on http.sys + * + * This requires elevation to function + */ +class HttpSysServer : public HttpServer +{ + friend class HttpSysTransaction; + +public: + explicit HttpSysServer(const HttpSysConfig& Config); + ~HttpSysServer(); + + // HttpServer interface implementation + + virtual int Initialize(int BasePort) override; + virtual void Run(bool TestMode) override; + virtual void RequestExit() override; + virtual void RegisterService(HttpService& Service) override; + virtual void Close() override; + + WorkerThreadPool& WorkPool(); + + inline bool IsOk() const { return m_IsOk; } + inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; } + +private: + int InitializeServer(int BasePort); + void Cleanup(); + + void StartServer(); + void OnHandlingNewRequest(); + void IssueNewRequestMaybe(); + + void RegisterService(const char* Endpoint, HttpService& Service); + void UnregisterService(const char* Endpoint, HttpService& Service); + +private: + spdlog::logger& m_Log; + spdlog::logger& m_RequestLog; + spdlog::logger& Log() { return m_Log; } + + bool m_IsOk = false; + bool m_IsHttpInitialized = false; + bool m_IsRequestLoggingEnabled = false; + bool m_IsAsyncResponseEnabled = true; + + WinIoThreadPool m_ThreadPool; + RwLock m_AsyncWorkPoolInitLock; + WorkerThreadPool* m_AsyncWorkPool = nullptr; + + std::vector<std::wstring> m_BaseUris; // eg: http://*:nnnn/ + HTTP_SERVER_SESSION_ID m_HttpSessionId = 0; + HTTP_URL_GROUP_ID m_HttpUrlGroupId = 0; + HANDLE m_RequestQueueHandle = 0; + std::atomic_int32_t m_PendingRequests{0}; + std::atomic_int32_t m_IsShuttingDown{0}; + int32_t m_MinPendingRequests = 16; + int32_t m_MaxPendingRequests = 128; + Event m_ShutdownEvent; + const HttpSysConfig m_InitialConfig; +}; + +} // namespace zen +#endif + +#if ZEN_WITH_HTTPSYS # include <conio.h> # include <mstcpip.h> @@ -809,7 +889,7 @@ HttpAsyncWorkRequest::AsyncWorkItem::Execute() \/ \/ \/ */ -HttpSysServer::HttpSysServer(const Config& Config) +HttpSysServer::HttpSysServer(const HttpSysConfig& Config) : m_Log(logging::Get("http")) , m_RequestLog(logging::Get("http_requests")) , m_IsRequestLoggingEnabled(Config.IsRequestLoggingEnabled) @@ -1868,5 +1948,11 @@ HttpSysServer::RegisterService(HttpService& Service) RegisterService(Service.BaseUri(), Service); } +Ref<HttpServer> +CreateHttpSysServer(HttpSysConfig Config) +{ + return Ref<HttpServer>(new HttpSysServer(Config)); +} + } // namespace zen #endif diff --git a/src/zenhttp/httpsys.h b/src/zenhttp/httpsys.h index 65239bae7..1553d56ef 100644 --- a/src/zenhttp/httpsys.h +++ b/src/zenhttp/httpsys.h @@ -12,89 +12,16 @@ # endif #endif -#if ZEN_WITH_HTTPSYS -# define _WINSOCKAPI_ -# include <zencore/windows.h> -# include <zencore/workthreadpool.h> -# include "iothreadpool.h" - -# include <http.h> - -namespace spdlog { -class logger; -} - namespace zen { -/** - * @brief Windows implementation of HTTP server based on http.sys - * - * This requires elevation to function - */ -class HttpSysServer : public HttpServer +struct HttpSysConfig { - friend class HttpSysTransaction; - -public: - struct Config - { - unsigned int ThreadCount = 0; - unsigned int AsyncWorkThreadCount = 0; - bool IsAsyncResponseEnabled = true; - bool IsRequestLoggingEnabled = false; - }; - explicit HttpSysServer(const Config& Config); - ~HttpSysServer(); - - // HttpServer interface implementation - - virtual int Initialize(int BasePort) override; - virtual void Run(bool TestMode) override; - virtual void RequestExit() override; - virtual void RegisterService(HttpService& Service) override; - virtual void Close() override; - - WorkerThreadPool& WorkPool(); - - inline bool IsOk() const { return m_IsOk; } - inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; } - -private: - int InitializeServer(int BasePort); - void Cleanup(); - - void StartServer(); - void OnHandlingNewRequest(); - void IssueNewRequestMaybe(); - - void RegisterService(const char* Endpoint, HttpService& Service); - void UnregisterService(const char* Endpoint, HttpService& Service); - -private: - spdlog::logger& m_Log; - spdlog::logger& m_RequestLog; - spdlog::logger& Log() { return m_Log; } - - bool m_IsOk = false; - bool m_IsHttpInitialized = false; - bool m_IsRequestLoggingEnabled = false; - bool m_IsAsyncResponseEnabled = true; - - WinIoThreadPool m_ThreadPool; - RwLock m_AsyncWorkPoolInitLock; - WorkerThreadPool* m_AsyncWorkPool = nullptr; - - std::vector<std::wstring> m_BaseUris; // eg: http://*:nnnn/ - HTTP_SERVER_SESSION_ID m_HttpSessionId = 0; - HTTP_URL_GROUP_ID m_HttpUrlGroupId = 0; - HANDLE m_RequestQueueHandle = 0; - std::atomic_int32_t m_PendingRequests{0}; - std::atomic_int32_t m_IsShuttingDown{0}; - int32_t m_MinPendingRequests = 16; - int32_t m_MaxPendingRequests = 128; - Event m_ShutdownEvent; - const Config m_InitialConfig; + unsigned int ThreadCount = 0; + unsigned int AsyncWorkThreadCount = 0; + bool IsAsyncResponseEnabled = true; + bool IsRequestLoggingEnabled = false; }; +Ref<HttpServer> CreateHttpSysServer(HttpSysConfig Config); + } // namespace zen -#endif diff --git a/src/zenhttp/include/zenhttp/transportplugin.h b/src/zenhttp/include/zenhttp/transportplugin.h deleted file mode 100644 index fe17680de..000000000 --- a/src/zenhttp/include/zenhttp/transportplugin.h +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <stdint.h> - -// Important note: this header is meant to compile standalone -// and should therefore not depend on anything from the Zen codebase - -class TransportConnection; -class TransportPlugin; -class TransportServerConnection; -class TransportServer; - -/************************************************************************* - - The following interfaces are implemented on the server side, and instances - are provided to the plugins. - -*************************************************************************/ - -/** Plugin-server interface for connection - * - * This is how the transport feeds data to the connection handler - * which will parse the incoming messages and dispatch to - * appropriate request handlers and ultimately call into functions - * which write data back to the client. - */ -class TransportServerConnection -{ -public: - virtual uint32_t AddRef() const = 0; - virtual uint32_t Release() const = 0; - virtual void OnBytesRead(const void* Buffer, size_t DataSize) = 0; -}; - -/** Plugin-server interface - * - * There will be one instance of this per plugin, and the plugin - * should use this to manage lifetimes of connections and any - * other resources. - */ -class TransportServer -{ -public: - virtual TransportServerConnection* CreateConnectionHandler(TransportConnection* Connection) = 0; -}; - -/************************************************************************* - - The following interfaces are to be implemented by transport plugins. - -*************************************************************************/ - -/** Interface which needs to be implemented by a transport plugin - * - * This is responsible for setting up and running the communication - * for a given transport. - */ -class TransportPlugin -{ -public: - virtual uint32_t AddRef() const = 0; - virtual uint32_t Release() const = 0; - virtual void Initialize(TransportServer* ServerInterface) = 0; - virtual void Shutdown() = 0; - - /** Check whether this transport is usable. - */ - virtual bool IsAvailable() = 0; -}; - -/** A transport plugin provider needs to implement this interface - * - * There will be one instance of this per established connection and - * this interface is used to write response data back to the client. - */ -class TransportConnection -{ -public: - virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) = 0; - virtual void Shutdown(bool Receive, bool Transmit) = 0; - virtual void CloseConnection() = 0; -}; - -#if defined(_MSC_VER) -# define DLL_TRANSPORT_API __declspec(dllexport) -#else -# define DLL_TRANSPORT_API -#endif - -extern "C" -{ - DLL_TRANSPORT_API TransportPlugin* CreateTransportPlugin(); -} - -typedef TransportPlugin* (*PfnCreateTransportPlugin)(); diff --git a/src/zenhttp/transports/asiotransport.cpp b/src/zenhttp/transports/asiotransport.cpp new file mode 100644 index 000000000..b8fef8f5f --- /dev/null +++ b/src/zenhttp/transports/asiotransport.cpp @@ -0,0 +1,439 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "winsocktransport.h" + +#if ZEN_WITH_PLUGINS + +# include <zencore/logging.h> +# include <zencore/scopeguard.h> +# include <zencore/workthreadpool.h> + +ZEN_THIRD_PARTY_INCLUDES_START +# include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +# if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +ZEN_THIRD_PARTY_INCLUDES_START +# include <mstcpip.h> +ZEN_THIRD_PARTY_INCLUDES_END +# endif + +# include <fmt/format.h> + +# include <memory> +# include <thread> + +namespace zen { + +struct AsioTransportAcceptor; + +class AsioTransportPlugin : public TransportPlugin, RefCounted +{ +public: + AsioTransportPlugin(uint16_t BasePort, unsigned int ThreadCount); + ~AsioTransportPlugin(); + + virtual uint32_t AddRef() const override; + virtual uint32_t Release() const override; + virtual void Initialize(TransportServer* ServerInterface) override; + virtual void Shutdown() override; + virtual bool IsAvailable() override; + +private: + bool m_IsOk = true; + uint16_t m_BasePort = 0; + int m_ThreadCount = 0; + + asio::io_service m_IoService; + asio::io_service::work m_Work{m_IoService}; + std::unique_ptr<AsioTransportAcceptor> m_Acceptor; + std::vector<std::thread> m_ThreadPool; +}; + +struct AsioTransportConnection : public TransportConnection, std::enable_shared_from_this<AsioTransportConnection> +{ + AsioTransportConnection(std::unique_ptr<asio::ip::tcp::socket>&& Socket); + ~AsioTransportConnection(); + + void Initialize(TransportServerConnection* ConnectionHandler); + + std::shared_ptr<AsioTransportConnection> AsSharedPtr() { return shared_from_this(); } + + // TransportConnectionInterface + + virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override; + virtual void Shutdown(bool Receive, bool Transmit) override; + virtual void CloseConnection() override; + +private: + void EnqueueRead(); + void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); + void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount); + + Ref<TransportServerConnection> m_ConnectionHandler; + asio::streambuf m_RequestBuffer; + std::unique_ptr<asio::ip::tcp::socket> m_Socket; + uint32_t m_ConnectionId = 0; + std::atomic_flag m_IsTerminated{}; +}; + +////////////////////////////////////////////////////////////////////////// + +struct AsioTransportAcceptor +{ + AsioTransportAcceptor(TransportServer* ServerInterface, asio::io_service& IoService, uint16_t BasePort); + ~AsioTransportAcceptor(); + + void Start(); + void RequestStop(); + + inline int GetAcceptPort() { return m_Acceptor.local_endpoint().port(); } + +private: + TransportServer* m_ServerInterface = nullptr; + asio::io_service& m_IoService; + asio::ip::tcp::acceptor m_Acceptor; + std::atomic<bool> m_IsStopped{false}; + + void EnqueueAccept(); +}; + +////////////////////////////////////////////////////////////////////////// + +AsioTransportAcceptor::AsioTransportAcceptor(TransportServer* ServerInterface, asio::io_service& IoService, uint16_t BasePort) +: m_ServerInterface(ServerInterface) +, m_IoService(IoService) +, m_Acceptor(m_IoService, asio::ip::tcp::v6()) +{ + m_Acceptor.set_option(asio::ip::v6_only(false)); + +# if ZEN_PLATFORM_WINDOWS + // Special option for Windows settings as !asio::socket_base::reuse_address is not the same as exclusive access on Windows platforms + typedef asio::detail::socket_option::boolean<ASIO_OS_DEF(SOL_SOCKET), SO_EXCLUSIVEADDRUSE> exclusive_address; + m_Acceptor.set_option(exclusive_address(true)); +# else + m_Acceptor.set_option(asio::socket_base::reuse_address(false)); +# endif + + m_Acceptor.set_option(asio::ip::tcp::no_delay(true)); + m_Acceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024)); + m_Acceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024)); + + uint16_t EffectivePort = BasePort; + + asio::error_code BindErrorCode; + m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), EffectivePort), BindErrorCode); + // Sharing violation implies the port is being used by another process + for (uint16_t PortOffset = 1; (BindErrorCode == asio::error::address_in_use) && (PortOffset < 10); ++PortOffset) + { + EffectivePort = BasePort + (PortOffset * 100); + m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), EffectivePort), BindErrorCode); + } + if (BindErrorCode == asio::error::access_denied) + { + EffectivePort = 0; + m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), EffectivePort), BindErrorCode); + } + if (BindErrorCode) + { + ZEN_ERROR("Unable open asio service, error '{}'", BindErrorCode.message()); + } + +# if ZEN_PLATFORM_WINDOWS + // On Windows, loopback connections can take advantage of a faster code path optionally with this flag. + // This must be used by both the client and server side, and is only effective in the absence of + // Windows Filtering Platform (WFP) callouts which can be installed by security software. + // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path + SOCKET NativeSocket = m_Acceptor.native_handle(); + int LoopbackOptionValue = 1; + DWORD OptionNumberOfBytesReturned = 0; + WSAIoctl(NativeSocket, + SIO_LOOPBACK_FAST_PATH, + &LoopbackOptionValue, + sizeof(LoopbackOptionValue), + NULL, + 0, + &OptionNumberOfBytesReturned, + 0, + 0); +# endif + + ZEN_INFO("started asio transport at port: {}", EffectivePort); +} + +AsioTransportAcceptor::~AsioTransportAcceptor() +{ +} + +void +AsioTransportAcceptor::Start() +{ + m_Acceptor.listen(); + + EnqueueAccept(); +} + +void +AsioTransportAcceptor::RequestStop() +{ + m_IsStopped = true; +} + +void +AsioTransportAcceptor::EnqueueAccept() +{ + auto SocketPtr = std::make_unique<asio::ip::tcp::socket>(m_IoService); + asio::ip::tcp::socket& SocketRef = *SocketPtr.get(); + + m_Acceptor.async_accept(SocketRef, [this, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable { + if (Ec) + { + ZEN_WARN("asio async_accept error ({}:{}): {}", + m_Acceptor.local_endpoint().address().to_string(), + m_Acceptor.local_endpoint().port(), + Ec.message()); + } + else + { + // New connection established, pass socket ownership into connection object + // and initiate request handling loop. The connection lifetime is + // managed by the async read/write loop by passing the shared + // reference to the callbacks. + + Socket->set_option(asio::ip::tcp::no_delay(true)); + Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024)); + Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024)); + + auto Conn = std::make_shared<AsioTransportConnection>(std::move(Socket)); + Conn->Initialize(m_ServerInterface->CreateConnectionHandler(Conn.get())); + } + + if (!m_IsStopped.load()) + { + EnqueueAccept(); + } + else + { + std::error_code CloseEc; + m_Acceptor.close(CloseEc); + + if (CloseEc) + { + ZEN_WARN("acceptor close ERROR, reason '{}'", CloseEc.message()); + } + } + }); +} + +////////////////////////////////////////////////////////////////////////// + +AsioTransportConnection::AsioTransportConnection(std::unique_ptr<asio::ip::tcp::socket>&& Socket) : m_Socket(std::move(Socket)) +{ +} + +AsioTransportConnection::~AsioTransportConnection() +{ +} + +void +AsioTransportConnection::Initialize(TransportServerConnection* ConnectionHandler) +{ + m_ConnectionHandler = ConnectionHandler; + + EnqueueRead(); +} + +int64_t +AsioTransportConnection::WriteBytes(const void* Buffer, size_t DataSize) +{ + size_t WrittenBytes = asio::write(*m_Socket.get(), asio::const_buffer(Buffer, DataSize), asio::transfer_exactly(DataSize)); + + return WrittenBytes; +} + +void +AsioTransportConnection::Shutdown(bool Receive, bool Transmit) +{ + std::error_code Ec; + if (Receive) + { + if (Transmit) + { + m_Socket->shutdown(asio::socket_base::shutdown_both, Ec); + } + else + { + m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec); + } + } + else if (Transmit) + { + m_Socket->shutdown(asio::socket_base::shutdown_send, Ec); + } +} + +void +AsioTransportConnection::CloseConnection() +{ + if (m_IsTerminated.test()) + { + return; + } + + if (m_IsTerminated.test_and_set() == false) + { + Shutdown(true, true); + + std::error_code Ec; + m_Socket->close(Ec); + } +} + +void +AsioTransportConnection::EnqueueRead() +{ + if (m_IsTerminated.test() == false) + { + m_RequestBuffer.prepare(64 * 1024); + + asio::async_read( + *m_Socket.get(), + m_RequestBuffer, + asio::transfer_at_least(1), + [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnDataReceived(Ec, ByteCount); }); + } +} + +void +AsioTransportConnection::OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount) +{ + ZEN_UNUSED(ByteCount); + + if (Ec) + { + if (!m_IsTerminated.test()) + { + ZEN_WARN("on data received ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message()); + } + + const bool Receive = true; + const bool Transmit = true; + return Shutdown(Receive, Transmit); + } + + while (m_RequestBuffer.size()) + { + const asio::const_buffer& InputBuffer = m_RequestBuffer.data(); + m_ConnectionHandler->OnBytesRead(InputBuffer.data(), InputBuffer.size()); + m_RequestBuffer.consume(InputBuffer.size()); + } + + EnqueueRead(); +} + +void +AsioTransportConnection::OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount) +{ + ZEN_UNUSED(ByteCount); + + if (Ec) + { + ZEN_WARN("on data sent ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message()); + + const bool Receive = true; + const bool Transmit = true; + return Shutdown(Receive, Transmit); + } +} + +////////////////////////////////////////////////////////////////////////// + +AsioTransportPlugin::AsioTransportPlugin(uint16_t BasePort, unsigned int ThreadCount) +: m_BasePort(BasePort) +, m_ThreadCount(ThreadCount != 0 ? ThreadCount : Max(std::thread::hardware_concurrency(), 8u)) +{ +} + +AsioTransportPlugin::~AsioTransportPlugin() +{ +} + +uint32_t +AsioTransportPlugin::AddRef() const +{ + return RefCounted::AddRef(); +} + +uint32_t +AsioTransportPlugin::Release() const +{ + return RefCounted::Release(); +} + +void +AsioTransportPlugin::Initialize(TransportServer* ServerInterface) +{ + ZEN_ASSERT(m_ThreadCount > 0); + ZEN_ASSERT(ServerInterface); + + ZEN_INFO("starting asio http with {} service threads", m_ThreadCount); + + m_Acceptor.reset(new AsioTransportAcceptor(ServerInterface, m_IoService, m_BasePort)); + m_Acceptor->Start(); + + // This should consist of a set of minimum threads and grow on demand to + // meet concurrency needs? Right now we end up allocating a large number + // of threads even if we never end up using all of them, which seems + // wasteful. It's also not clear how the demand for concurrency should + // be balanced with the engine side - ideally we'd have some kind of + // global scheduling to prevent one side from starving the other side + // and thus preventing progress. Or at the very least, thread priorities + // should be considered + + for (int i = 0; i < m_ThreadCount; ++i) + { + m_ThreadPool.emplace_back([this, ThreadNumber = i + 1] { + SetCurrentThreadName(fmt::format("asio_thr_{}", ThreadNumber)); + + try + { + m_IoService.run(); + } + catch (std::exception& e) + { + ZEN_ERROR("exception caught in asio event loop: {}", e.what()); + } + }); + } + + ZEN_INFO("asio http transport started (port {})", m_Acceptor->GetAcceptPort()); +} + +void +AsioTransportPlugin::Shutdown() +{ + m_Acceptor->RequestStop(); + m_IoService.stop(); + + for (auto& Thread : m_ThreadPool) + { + Thread.join(); + } +} + +bool +AsioTransportPlugin::IsAvailable() +{ + return true; +} + +TransportPlugin* +CreateAsioTransportPlugin(uint16_t BasePort, unsigned int ThreadCount) +{ + return new AsioTransportPlugin(BasePort, ThreadCount); +} + +} // namespace zen + +#endif diff --git a/src/zenhttp/transports/asiotransport.h b/src/zenhttp/transports/asiotransport.h new file mode 100644 index 000000000..b10174b85 --- /dev/null +++ b/src/zenhttp/transports/asiotransport.h @@ -0,0 +1,15 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpplugin.h> + +#if ZEN_WITH_PLUGINS + +namespace zen { + +TransportPlugin* CreateAsioTransportPlugin(uint16_t BasePort, unsigned int ThreadCount); + +} // namespace zen + +#endif diff --git a/src/zenhttp/dlltransport.cpp b/src/zenhttp/transports/dlltransport.cpp index 04fb6caaa..04fb6caaa 100644 --- a/src/zenhttp/dlltransport.cpp +++ b/src/zenhttp/transports/dlltransport.cpp diff --git a/src/zenhttp/dlltransport.h b/src/zenhttp/transports/dlltransport.h index 2dccdd0f9..2dccdd0f9 100644 --- a/src/zenhttp/dlltransport.h +++ b/src/zenhttp/transports/dlltransport.h diff --git a/src/zenhttp/winsocktransport.cpp b/src/zenhttp/transports/winsocktransport.cpp index ad3302550..ad3302550 100644 --- a/src/zenhttp/winsocktransport.cpp +++ b/src/zenhttp/transports/winsocktransport.cpp diff --git a/src/zenhttp/winsocktransport.h b/src/zenhttp/transports/winsocktransport.h index 2b2a55aef..2b2a55aef 100644 --- a/src/zenhttp/winsocktransport.h +++ b/src/zenhttp/transports/winsocktransport.h diff --git a/src/zenhttp/xmake.lua b/src/zenhttp/xmake.lua index 411436b16..9c3869911 100644 --- a/src/zenhttp/xmake.lua +++ b/src/zenhttp/xmake.lua @@ -7,7 +7,7 @@ target('zenhttp') add_files("**.cpp") add_files("httpsys.cpp", {unity_ignored=true}) add_includedirs("include", {public=true}) - add_deps("zencore") + add_deps("zencore", "plugins") add_packages( "vcpkg::cpr", "vcpkg::curl", -- required by cpr |