aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhttp')
-rw-r--r--src/zenhttp/httpasio.cpp1
-rw-r--r--src/zenhttp/httpasio.h3
-rw-r--r--src/zenhttp/httpserver.cpp18
-rw-r--r--src/zenhttp/httpsys.cpp88
-rw-r--r--src/zenhttp/httpsys.h87
-rw-r--r--src/zenhttp/include/zenhttp/transportplugin.h97
-rw-r--r--src/zenhttp/transports/asiotransport.cpp439
-rw-r--r--src/zenhttp/transports/asiotransport.h15
-rw-r--r--src/zenhttp/transports/dlltransport.cpp (renamed from src/zenhttp/dlltransport.cpp)0
-rw-r--r--src/zenhttp/transports/dlltransport.h (renamed from src/zenhttp/dlltransport.h)0
-rw-r--r--src/zenhttp/transports/winsocktransport.cpp (renamed from src/zenhttp/winsocktransport.cpp)0
-rw-r--r--src/zenhttp/transports/winsocktransport.h (renamed from src/zenhttp/winsocktransport.h)0
-rw-r--r--src/zenhttp/xmake.lua2
13 files changed, 562 insertions, 188 deletions
diff --git a/src/zenhttp/httpasio.cpp b/src/zenhttp/httpasio.cpp
index f23e0edb1..0c6b189f9 100644
--- a/src/zenhttp/httpasio.cpp
+++ b/src/zenhttp/httpasio.cpp
@@ -4,6 +4,7 @@
#include <zencore/except.h>
#include <zencore/logging.h>
+#include <zencore/thread.h>
#include <zencore/trace.h>
#include <zenhttp/httpserver.h>
diff --git a/src/zenhttp/httpasio.h b/src/zenhttp/httpasio.h
index 57068f7c5..2366f3437 100644
--- a/src/zenhttp/httpasio.h
+++ b/src/zenhttp/httpasio.h
@@ -2,11 +2,8 @@
#pragma once
-#include <zencore/thread.h>
#include <zenhttp/httpserver.h>
-#include <memory>
-
namespace zen {
Ref<HttpServer> CreateHttpAsioServer(unsigned int ThreadCount);
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp
index f22438a58..a2ea4cff8 100644
--- a/src/zenhttp/httpserver.cpp
+++ b/src/zenhttp/httpserver.cpp
@@ -9,8 +9,9 @@
#include "zenhttp/httpplugin.h"
#if ZEN_WITH_PLUGINS
-# include "dlltransport.h"
-# include "winsocktransport.h"
+# include "transports/asiotransport.h"
+# include "transports/dlltransport.h"
+# include "transports/winsocktransport.h"
#endif
#include <zencore/compactbinary.h>
@@ -771,6 +772,11 @@ CreateHttpServer(const HttpServerConfig& Config)
# endif
# if 0
+ Ref<TransportPlugin> AsioPlugin{CreateAsioTransportPlugin(1337, Config.ThreadCount)};
+ Server->AddPlugin(AsioPlugin);
+# endif
+
+# if 0
Ref<DllTransportPlugin> DllPlugin{new DllTransportPlugin(1337, Config.ThreadCount)};
DllPlugin->LoadDll("winsock");
Server->AddPlugin(DllPlugin);
@@ -783,10 +789,10 @@ CreateHttpServer(const HttpServerConfig& Config)
#if ZEN_WITH_HTTPSYS
case HttpServerClass::kHttpSys:
ZEN_INFO("using http.sys server implementation");
- return Ref<HttpServer>(new HttpSysServer({.ThreadCount = Config.ThreadCount,
- .AsyncWorkThreadCount = Config.HttpSys.AsyncWorkThreadCount,
- .IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled,
- .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled}));
+ return Ref<HttpServer>(CreateHttpSysServer({.ThreadCount = Config.ThreadCount,
+ .AsyncWorkThreadCount = Config.HttpSys.AsyncWorkThreadCount,
+ .IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled,
+ .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled}));
#endif
case HttpServerClass::kHttpNull:
diff --git a/src/zenhttp/httpsys.cpp b/src/zenhttp/httpsys.cpp
index c7ed0bb2f..8401dcf83 100644
--- a/src/zenhttp/httpsys.cpp
+++ b/src/zenhttp/httpsys.cpp
@@ -16,6 +16,86 @@
#include <zenhttp/httpshared.h>
#if ZEN_WITH_HTTPSYS
+# define _WINSOCKAPI_
+# include <zencore/windows.h>
+# include <zencore/workthreadpool.h>
+# include "iothreadpool.h"
+
+# include <http.h>
+
+namespace spdlog {
+class logger;
+}
+
+namespace zen {
+
+/**
+ * @brief Windows implementation of HTTP server based on http.sys
+ *
+ * This requires elevation to function
+ */
+class HttpSysServer : public HttpServer
+{
+ friend class HttpSysTransaction;
+
+public:
+ explicit HttpSysServer(const HttpSysConfig& Config);
+ ~HttpSysServer();
+
+ // HttpServer interface implementation
+
+ virtual int Initialize(int BasePort) override;
+ virtual void Run(bool TestMode) override;
+ virtual void RequestExit() override;
+ virtual void RegisterService(HttpService& Service) override;
+ virtual void Close() override;
+
+ WorkerThreadPool& WorkPool();
+
+ inline bool IsOk() const { return m_IsOk; }
+ inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; }
+
+private:
+ int InitializeServer(int BasePort);
+ void Cleanup();
+
+ void StartServer();
+ void OnHandlingNewRequest();
+ void IssueNewRequestMaybe();
+
+ void RegisterService(const char* Endpoint, HttpService& Service);
+ void UnregisterService(const char* Endpoint, HttpService& Service);
+
+private:
+ spdlog::logger& m_Log;
+ spdlog::logger& m_RequestLog;
+ spdlog::logger& Log() { return m_Log; }
+
+ bool m_IsOk = false;
+ bool m_IsHttpInitialized = false;
+ bool m_IsRequestLoggingEnabled = false;
+ bool m_IsAsyncResponseEnabled = true;
+
+ WinIoThreadPool m_ThreadPool;
+ RwLock m_AsyncWorkPoolInitLock;
+ WorkerThreadPool* m_AsyncWorkPool = nullptr;
+
+ std::vector<std::wstring> m_BaseUris; // eg: http://*:nnnn/
+ HTTP_SERVER_SESSION_ID m_HttpSessionId = 0;
+ HTTP_URL_GROUP_ID m_HttpUrlGroupId = 0;
+ HANDLE m_RequestQueueHandle = 0;
+ std::atomic_int32_t m_PendingRequests{0};
+ std::atomic_int32_t m_IsShuttingDown{0};
+ int32_t m_MinPendingRequests = 16;
+ int32_t m_MaxPendingRequests = 128;
+ Event m_ShutdownEvent;
+ const HttpSysConfig m_InitialConfig;
+};
+
+} // namespace zen
+#endif
+
+#if ZEN_WITH_HTTPSYS
# include <conio.h>
# include <mstcpip.h>
@@ -809,7 +889,7 @@ HttpAsyncWorkRequest::AsyncWorkItem::Execute()
\/ \/ \/
*/
-HttpSysServer::HttpSysServer(const Config& Config)
+HttpSysServer::HttpSysServer(const HttpSysConfig& Config)
: m_Log(logging::Get("http"))
, m_RequestLog(logging::Get("http_requests"))
, m_IsRequestLoggingEnabled(Config.IsRequestLoggingEnabled)
@@ -1868,5 +1948,11 @@ HttpSysServer::RegisterService(HttpService& Service)
RegisterService(Service.BaseUri(), Service);
}
+Ref<HttpServer>
+CreateHttpSysServer(HttpSysConfig Config)
+{
+ return Ref<HttpServer>(new HttpSysServer(Config));
+}
+
} // namespace zen
#endif
diff --git a/src/zenhttp/httpsys.h b/src/zenhttp/httpsys.h
index 65239bae7..1553d56ef 100644
--- a/src/zenhttp/httpsys.h
+++ b/src/zenhttp/httpsys.h
@@ -12,89 +12,16 @@
# endif
#endif
-#if ZEN_WITH_HTTPSYS
-# define _WINSOCKAPI_
-# include <zencore/windows.h>
-# include <zencore/workthreadpool.h>
-# include "iothreadpool.h"
-
-# include <http.h>
-
-namespace spdlog {
-class logger;
-}
-
namespace zen {
-/**
- * @brief Windows implementation of HTTP server based on http.sys
- *
- * This requires elevation to function
- */
-class HttpSysServer : public HttpServer
+struct HttpSysConfig
{
- friend class HttpSysTransaction;
-
-public:
- struct Config
- {
- unsigned int ThreadCount = 0;
- unsigned int AsyncWorkThreadCount = 0;
- bool IsAsyncResponseEnabled = true;
- bool IsRequestLoggingEnabled = false;
- };
- explicit HttpSysServer(const Config& Config);
- ~HttpSysServer();
-
- // HttpServer interface implementation
-
- virtual int Initialize(int BasePort) override;
- virtual void Run(bool TestMode) override;
- virtual void RequestExit() override;
- virtual void RegisterService(HttpService& Service) override;
- virtual void Close() override;
-
- WorkerThreadPool& WorkPool();
-
- inline bool IsOk() const { return m_IsOk; }
- inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; }
-
-private:
- int InitializeServer(int BasePort);
- void Cleanup();
-
- void StartServer();
- void OnHandlingNewRequest();
- void IssueNewRequestMaybe();
-
- void RegisterService(const char* Endpoint, HttpService& Service);
- void UnregisterService(const char* Endpoint, HttpService& Service);
-
-private:
- spdlog::logger& m_Log;
- spdlog::logger& m_RequestLog;
- spdlog::logger& Log() { return m_Log; }
-
- bool m_IsOk = false;
- bool m_IsHttpInitialized = false;
- bool m_IsRequestLoggingEnabled = false;
- bool m_IsAsyncResponseEnabled = true;
-
- WinIoThreadPool m_ThreadPool;
- RwLock m_AsyncWorkPoolInitLock;
- WorkerThreadPool* m_AsyncWorkPool = nullptr;
-
- std::vector<std::wstring> m_BaseUris; // eg: http://*:nnnn/
- HTTP_SERVER_SESSION_ID m_HttpSessionId = 0;
- HTTP_URL_GROUP_ID m_HttpUrlGroupId = 0;
- HANDLE m_RequestQueueHandle = 0;
- std::atomic_int32_t m_PendingRequests{0};
- std::atomic_int32_t m_IsShuttingDown{0};
- int32_t m_MinPendingRequests = 16;
- int32_t m_MaxPendingRequests = 128;
- Event m_ShutdownEvent;
- const Config m_InitialConfig;
+ unsigned int ThreadCount = 0;
+ unsigned int AsyncWorkThreadCount = 0;
+ bool IsAsyncResponseEnabled = true;
+ bool IsRequestLoggingEnabled = false;
};
+Ref<HttpServer> CreateHttpSysServer(HttpSysConfig Config);
+
} // namespace zen
-#endif
diff --git a/src/zenhttp/include/zenhttp/transportplugin.h b/src/zenhttp/include/zenhttp/transportplugin.h
deleted file mode 100644
index fe17680de..000000000
--- a/src/zenhttp/include/zenhttp/transportplugin.h
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <stdint.h>
-
-// Important note: this header is meant to compile standalone
-// and should therefore not depend on anything from the Zen codebase
-
-class TransportConnection;
-class TransportPlugin;
-class TransportServerConnection;
-class TransportServer;
-
-/*************************************************************************
-
- The following interfaces are implemented on the server side, and instances
- are provided to the plugins.
-
-*************************************************************************/
-
-/** Plugin-server interface for connection
- *
- * This is how the transport feeds data to the connection handler
- * which will parse the incoming messages and dispatch to
- * appropriate request handlers and ultimately call into functions
- * which write data back to the client.
- */
-class TransportServerConnection
-{
-public:
- virtual uint32_t AddRef() const = 0;
- virtual uint32_t Release() const = 0;
- virtual void OnBytesRead(const void* Buffer, size_t DataSize) = 0;
-};
-
-/** Plugin-server interface
- *
- * There will be one instance of this per plugin, and the plugin
- * should use this to manage lifetimes of connections and any
- * other resources.
- */
-class TransportServer
-{
-public:
- virtual TransportServerConnection* CreateConnectionHandler(TransportConnection* Connection) = 0;
-};
-
-/*************************************************************************
-
- The following interfaces are to be implemented by transport plugins.
-
-*************************************************************************/
-
-/** Interface which needs to be implemented by a transport plugin
- *
- * This is responsible for setting up and running the communication
- * for a given transport.
- */
-class TransportPlugin
-{
-public:
- virtual uint32_t AddRef() const = 0;
- virtual uint32_t Release() const = 0;
- virtual void Initialize(TransportServer* ServerInterface) = 0;
- virtual void Shutdown() = 0;
-
- /** Check whether this transport is usable.
- */
- virtual bool IsAvailable() = 0;
-};
-
-/** A transport plugin provider needs to implement this interface
- *
- * There will be one instance of this per established connection and
- * this interface is used to write response data back to the client.
- */
-class TransportConnection
-{
-public:
- virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) = 0;
- virtual void Shutdown(bool Receive, bool Transmit) = 0;
- virtual void CloseConnection() = 0;
-};
-
-#if defined(_MSC_VER)
-# define DLL_TRANSPORT_API __declspec(dllexport)
-#else
-# define DLL_TRANSPORT_API
-#endif
-
-extern "C"
-{
- DLL_TRANSPORT_API TransportPlugin* CreateTransportPlugin();
-}
-
-typedef TransportPlugin* (*PfnCreateTransportPlugin)();
diff --git a/src/zenhttp/transports/asiotransport.cpp b/src/zenhttp/transports/asiotransport.cpp
new file mode 100644
index 000000000..b8fef8f5f
--- /dev/null
+++ b/src/zenhttp/transports/asiotransport.cpp
@@ -0,0 +1,439 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "winsocktransport.h"
+
+#if ZEN_WITH_PLUGINS
+
+# include <zencore/logging.h>
+# include <zencore/scopeguard.h>
+# include <zencore/workthreadpool.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+# if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <mstcpip.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+# endif
+
+# include <fmt/format.h>
+
+# include <memory>
+# include <thread>
+
+namespace zen {
+
+struct AsioTransportAcceptor;
+
+class AsioTransportPlugin : public TransportPlugin, RefCounted
+{
+public:
+ AsioTransportPlugin(uint16_t BasePort, unsigned int ThreadCount);
+ ~AsioTransportPlugin();
+
+ virtual uint32_t AddRef() const override;
+ virtual uint32_t Release() const override;
+ virtual void Initialize(TransportServer* ServerInterface) override;
+ virtual void Shutdown() override;
+ virtual bool IsAvailable() override;
+
+private:
+ bool m_IsOk = true;
+ uint16_t m_BasePort = 0;
+ int m_ThreadCount = 0;
+
+ asio::io_service m_IoService;
+ asio::io_service::work m_Work{m_IoService};
+ std::unique_ptr<AsioTransportAcceptor> m_Acceptor;
+ std::vector<std::thread> m_ThreadPool;
+};
+
+struct AsioTransportConnection : public TransportConnection, std::enable_shared_from_this<AsioTransportConnection>
+{
+ AsioTransportConnection(std::unique_ptr<asio::ip::tcp::socket>&& Socket);
+ ~AsioTransportConnection();
+
+ void Initialize(TransportServerConnection* ConnectionHandler);
+
+ std::shared_ptr<AsioTransportConnection> AsSharedPtr() { return shared_from_this(); }
+
+ // TransportConnectionInterface
+
+ virtual int64_t WriteBytes(const void* Buffer, size_t DataSize) override;
+ virtual void Shutdown(bool Receive, bool Transmit) override;
+ virtual void CloseConnection() override;
+
+private:
+ void EnqueueRead();
+ void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount);
+ void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount);
+
+ Ref<TransportServerConnection> m_ConnectionHandler;
+ asio::streambuf m_RequestBuffer;
+ std::unique_ptr<asio::ip::tcp::socket> m_Socket;
+ uint32_t m_ConnectionId = 0;
+ std::atomic_flag m_IsTerminated{};
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct AsioTransportAcceptor
+{
+ AsioTransportAcceptor(TransportServer* ServerInterface, asio::io_service& IoService, uint16_t BasePort);
+ ~AsioTransportAcceptor();
+
+ void Start();
+ void RequestStop();
+
+ inline int GetAcceptPort() { return m_Acceptor.local_endpoint().port(); }
+
+private:
+ TransportServer* m_ServerInterface = nullptr;
+ asio::io_service& m_IoService;
+ asio::ip::tcp::acceptor m_Acceptor;
+ std::atomic<bool> m_IsStopped{false};
+
+ void EnqueueAccept();
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+AsioTransportAcceptor::AsioTransportAcceptor(TransportServer* ServerInterface, asio::io_service& IoService, uint16_t BasePort)
+: m_ServerInterface(ServerInterface)
+, m_IoService(IoService)
+, m_Acceptor(m_IoService, asio::ip::tcp::v6())
+{
+ m_Acceptor.set_option(asio::ip::v6_only(false));
+
+# if ZEN_PLATFORM_WINDOWS
+ // Special option for Windows settings as !asio::socket_base::reuse_address is not the same as exclusive access on Windows platforms
+ typedef asio::detail::socket_option::boolean<ASIO_OS_DEF(SOL_SOCKET), SO_EXCLUSIVEADDRUSE> exclusive_address;
+ m_Acceptor.set_option(exclusive_address(true));
+# else
+ m_Acceptor.set_option(asio::socket_base::reuse_address(false));
+# endif
+
+ m_Acceptor.set_option(asio::ip::tcp::no_delay(true));
+ m_Acceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024));
+ m_Acceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024));
+
+ uint16_t EffectivePort = BasePort;
+
+ asio::error_code BindErrorCode;
+ m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), EffectivePort), BindErrorCode);
+ // Sharing violation implies the port is being used by another process
+ for (uint16_t PortOffset = 1; (BindErrorCode == asio::error::address_in_use) && (PortOffset < 10); ++PortOffset)
+ {
+ EffectivePort = BasePort + (PortOffset * 100);
+ m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), EffectivePort), BindErrorCode);
+ }
+ if (BindErrorCode == asio::error::access_denied)
+ {
+ EffectivePort = 0;
+ m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), EffectivePort), BindErrorCode);
+ }
+ if (BindErrorCode)
+ {
+ ZEN_ERROR("Unable open asio service, error '{}'", BindErrorCode.message());
+ }
+
+# if ZEN_PLATFORM_WINDOWS
+ // On Windows, loopback connections can take advantage of a faster code path optionally with this flag.
+ // This must be used by both the client and server side, and is only effective in the absence of
+ // Windows Filtering Platform (WFP) callouts which can be installed by security software.
+ // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path
+ SOCKET NativeSocket = m_Acceptor.native_handle();
+ int LoopbackOptionValue = 1;
+ DWORD OptionNumberOfBytesReturned = 0;
+ WSAIoctl(NativeSocket,
+ SIO_LOOPBACK_FAST_PATH,
+ &LoopbackOptionValue,
+ sizeof(LoopbackOptionValue),
+ NULL,
+ 0,
+ &OptionNumberOfBytesReturned,
+ 0,
+ 0);
+# endif
+
+ ZEN_INFO("started asio transport at port: {}", EffectivePort);
+}
+
+AsioTransportAcceptor::~AsioTransportAcceptor()
+{
+}
+
+void
+AsioTransportAcceptor::Start()
+{
+ m_Acceptor.listen();
+
+ EnqueueAccept();
+}
+
+void
+AsioTransportAcceptor::RequestStop()
+{
+ m_IsStopped = true;
+}
+
+void
+AsioTransportAcceptor::EnqueueAccept()
+{
+ auto SocketPtr = std::make_unique<asio::ip::tcp::socket>(m_IoService);
+ asio::ip::tcp::socket& SocketRef = *SocketPtr.get();
+
+ m_Acceptor.async_accept(SocketRef, [this, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable {
+ if (Ec)
+ {
+ ZEN_WARN("asio async_accept error ({}:{}): {}",
+ m_Acceptor.local_endpoint().address().to_string(),
+ m_Acceptor.local_endpoint().port(),
+ Ec.message());
+ }
+ else
+ {
+ // New connection established, pass socket ownership into connection object
+ // and initiate request handling loop. The connection lifetime is
+ // managed by the async read/write loop by passing the shared
+ // reference to the callbacks.
+
+ Socket->set_option(asio::ip::tcp::no_delay(true));
+ Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024));
+ Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024));
+
+ auto Conn = std::make_shared<AsioTransportConnection>(std::move(Socket));
+ Conn->Initialize(m_ServerInterface->CreateConnectionHandler(Conn.get()));
+ }
+
+ if (!m_IsStopped.load())
+ {
+ EnqueueAccept();
+ }
+ else
+ {
+ std::error_code CloseEc;
+ m_Acceptor.close(CloseEc);
+
+ if (CloseEc)
+ {
+ ZEN_WARN("acceptor close ERROR, reason '{}'", CloseEc.message());
+ }
+ }
+ });
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+AsioTransportConnection::AsioTransportConnection(std::unique_ptr<asio::ip::tcp::socket>&& Socket) : m_Socket(std::move(Socket))
+{
+}
+
+AsioTransportConnection::~AsioTransportConnection()
+{
+}
+
+void
+AsioTransportConnection::Initialize(TransportServerConnection* ConnectionHandler)
+{
+ m_ConnectionHandler = ConnectionHandler;
+
+ EnqueueRead();
+}
+
+int64_t
+AsioTransportConnection::WriteBytes(const void* Buffer, size_t DataSize)
+{
+ size_t WrittenBytes = asio::write(*m_Socket.get(), asio::const_buffer(Buffer, DataSize), asio::transfer_exactly(DataSize));
+
+ return WrittenBytes;
+}
+
+void
+AsioTransportConnection::Shutdown(bool Receive, bool Transmit)
+{
+ std::error_code Ec;
+ if (Receive)
+ {
+ if (Transmit)
+ {
+ m_Socket->shutdown(asio::socket_base::shutdown_both, Ec);
+ }
+ else
+ {
+ m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec);
+ }
+ }
+ else if (Transmit)
+ {
+ m_Socket->shutdown(asio::socket_base::shutdown_send, Ec);
+ }
+}
+
+void
+AsioTransportConnection::CloseConnection()
+{
+ if (m_IsTerminated.test())
+ {
+ return;
+ }
+
+ if (m_IsTerminated.test_and_set() == false)
+ {
+ Shutdown(true, true);
+
+ std::error_code Ec;
+ m_Socket->close(Ec);
+ }
+}
+
+void
+AsioTransportConnection::EnqueueRead()
+{
+ if (m_IsTerminated.test() == false)
+ {
+ m_RequestBuffer.prepare(64 * 1024);
+
+ asio::async_read(
+ *m_Socket.get(),
+ m_RequestBuffer,
+ asio::transfer_at_least(1),
+ [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnDataReceived(Ec, ByteCount); });
+ }
+}
+
+void
+AsioTransportConnection::OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount)
+{
+ ZEN_UNUSED(ByteCount);
+
+ if (Ec)
+ {
+ if (!m_IsTerminated.test())
+ {
+ ZEN_WARN("on data received ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message());
+ }
+
+ const bool Receive = true;
+ const bool Transmit = true;
+ return Shutdown(Receive, Transmit);
+ }
+
+ while (m_RequestBuffer.size())
+ {
+ const asio::const_buffer& InputBuffer = m_RequestBuffer.data();
+ m_ConnectionHandler->OnBytesRead(InputBuffer.data(), InputBuffer.size());
+ m_RequestBuffer.consume(InputBuffer.size());
+ }
+
+ EnqueueRead();
+}
+
+void
+AsioTransportConnection::OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount)
+{
+ ZEN_UNUSED(ByteCount);
+
+ if (Ec)
+ {
+ ZEN_WARN("on data sent ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message());
+
+ const bool Receive = true;
+ const bool Transmit = true;
+ return Shutdown(Receive, Transmit);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+AsioTransportPlugin::AsioTransportPlugin(uint16_t BasePort, unsigned int ThreadCount)
+: m_BasePort(BasePort)
+, m_ThreadCount(ThreadCount != 0 ? ThreadCount : Max(std::thread::hardware_concurrency(), 8u))
+{
+}
+
+AsioTransportPlugin::~AsioTransportPlugin()
+{
+}
+
+uint32_t
+AsioTransportPlugin::AddRef() const
+{
+ return RefCounted::AddRef();
+}
+
+uint32_t
+AsioTransportPlugin::Release() const
+{
+ return RefCounted::Release();
+}
+
+void
+AsioTransportPlugin::Initialize(TransportServer* ServerInterface)
+{
+ ZEN_ASSERT(m_ThreadCount > 0);
+ ZEN_ASSERT(ServerInterface);
+
+ ZEN_INFO("starting asio http with {} service threads", m_ThreadCount);
+
+ m_Acceptor.reset(new AsioTransportAcceptor(ServerInterface, m_IoService, m_BasePort));
+ m_Acceptor->Start();
+
+ // This should consist of a set of minimum threads and grow on demand to
+ // meet concurrency needs? Right now we end up allocating a large number
+ // of threads even if we never end up using all of them, which seems
+ // wasteful. It's also not clear how the demand for concurrency should
+ // be balanced with the engine side - ideally we'd have some kind of
+ // global scheduling to prevent one side from starving the other side
+ // and thus preventing progress. Or at the very least, thread priorities
+ // should be considered
+
+ for (int i = 0; i < m_ThreadCount; ++i)
+ {
+ m_ThreadPool.emplace_back([this, ThreadNumber = i + 1] {
+ SetCurrentThreadName(fmt::format("asio_thr_{}", ThreadNumber));
+
+ try
+ {
+ m_IoService.run();
+ }
+ catch (std::exception& e)
+ {
+ ZEN_ERROR("exception caught in asio event loop: {}", e.what());
+ }
+ });
+ }
+
+ ZEN_INFO("asio http transport started (port {})", m_Acceptor->GetAcceptPort());
+}
+
+void
+AsioTransportPlugin::Shutdown()
+{
+ m_Acceptor->RequestStop();
+ m_IoService.stop();
+
+ for (auto& Thread : m_ThreadPool)
+ {
+ Thread.join();
+ }
+}
+
+bool
+AsioTransportPlugin::IsAvailable()
+{
+ return true;
+}
+
+TransportPlugin*
+CreateAsioTransportPlugin(uint16_t BasePort, unsigned int ThreadCount)
+{
+ return new AsioTransportPlugin(BasePort, ThreadCount);
+}
+
+} // namespace zen
+
+#endif
diff --git a/src/zenhttp/transports/asiotransport.h b/src/zenhttp/transports/asiotransport.h
new file mode 100644
index 000000000..b10174b85
--- /dev/null
+++ b/src/zenhttp/transports/asiotransport.h
@@ -0,0 +1,15 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpplugin.h>
+
+#if ZEN_WITH_PLUGINS
+
+namespace zen {
+
+TransportPlugin* CreateAsioTransportPlugin(uint16_t BasePort, unsigned int ThreadCount);
+
+} // namespace zen
+
+#endif
diff --git a/src/zenhttp/dlltransport.cpp b/src/zenhttp/transports/dlltransport.cpp
index 04fb6caaa..04fb6caaa 100644
--- a/src/zenhttp/dlltransport.cpp
+++ b/src/zenhttp/transports/dlltransport.cpp
diff --git a/src/zenhttp/dlltransport.h b/src/zenhttp/transports/dlltransport.h
index 2dccdd0f9..2dccdd0f9 100644
--- a/src/zenhttp/dlltransport.h
+++ b/src/zenhttp/transports/dlltransport.h
diff --git a/src/zenhttp/winsocktransport.cpp b/src/zenhttp/transports/winsocktransport.cpp
index ad3302550..ad3302550 100644
--- a/src/zenhttp/winsocktransport.cpp
+++ b/src/zenhttp/transports/winsocktransport.cpp
diff --git a/src/zenhttp/winsocktransport.h b/src/zenhttp/transports/winsocktransport.h
index 2b2a55aef..2b2a55aef 100644
--- a/src/zenhttp/winsocktransport.h
+++ b/src/zenhttp/transports/winsocktransport.h
diff --git a/src/zenhttp/xmake.lua b/src/zenhttp/xmake.lua
index 411436b16..9c3869911 100644
--- a/src/zenhttp/xmake.lua
+++ b/src/zenhttp/xmake.lua
@@ -7,7 +7,7 @@ target('zenhttp')
add_files("**.cpp")
add_files("httpsys.cpp", {unity_ignored=true})
add_includedirs("include", {public=true})
- add_deps("zencore")
+ add_deps("zencore", "plugins")
add_packages(
"vcpkg::cpr",
"vcpkg::curl", -- required by cpr