diff options
| author | Stefan Boberg <[email protected]> | 2026-03-10 17:27:26 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-10 17:27:26 +0100 |
| commit | d0a07e555577dcd4a8f55f1b45d9e8e4e6366ab7 (patch) | |
| tree | 2dfe1e3e0b620043d358e0b7f8bdf8320d985491 /src/zenhttp/servers/httpasio.cpp | |
| parent | changelog entry which was inadvertently omitted from PR merge (diff) | |
| download | zen-d0a07e555577dcd4a8f55f1b45d9e8e4e6366ab7.tar.xz zen-d0a07e555577dcd4a8f55f1b45d9e8e4e6366ab7.zip | |
HttpClient using libcurl, Unix Sockets for HTTP. HTTPS support (#770)
The main goal of this change is to eliminate the cpr back-end altogether and replace it with the curl implementation. I would expect to drop cpr as soon as we feel happy with the libcurl back-end. That would leave us with a direct dependency on libcurl only, and cpr can be eliminated as a dependency.
### HttpClient Backend Overhaul
- Implemented a new **libcurl-based HttpClient** backend (`httpclientcurl.cpp`, ~2000 lines)
as an alternative to the cpr-based one
- Made HttpClient backend **configurable at runtime** via constructor arguments
and `-httpclient=...` CLI option (for zen, zenserver, and tests)
- Extended HttpClient test suite to cover multipart/content-range scenarios
### Unix Domain Socket Support
- Added Unix domain socket support to **httpasio** (server side)
- Added Unix domain socket support to **HttpClient**
- Added Unix domain socket support to **HttpWsClient** (WebSocket client)
- Templatized `HttpServerConnectionT<SocketType>` and `WsAsioConnectionT<SocketType>`
to handle TCP, Unix, and SSL sockets uniformly via `if constexpr` dispatch
### HTTPS Support
- Added **preliminary HTTPS support to httpasio** (for Mac/Linux via OpenSSL)
- Added **basic HTTPS support for http.sys** (Windows)
- Implemented HTTPS test for httpasio
- Split `InitializeServer` into smaller sub-functions for http.sys
### Other Notable Changes
- Improved **zenhttp-test stability** with dynamic port allocation
- Enhanced port retry logic in http.sys (handles ERROR_ACCESS_DENIED)
- Fatal signal/exception handlers for backtrace generation in tests
- Added `zen bench http` subcommand to exercise network + HTTP client/server communication stack
Diffstat (limited to 'src/zenhttp/servers/httpasio.cpp')
| -rw-r--r-- | src/zenhttp/servers/httpasio.cpp | 687 |
1 files changed, 496 insertions, 191 deletions
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index f5178ebe8..ee8e71256 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -1,6 +1,7 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include "httpasio.h" +#include "asio_socket_traits.h" #include "httptracer.h" #include <zencore/except.h> @@ -35,6 +36,12 @@ ZEN_THIRD_PARTY_INCLUDES_START #endif #include <asio.hpp> #include <asio/stream_file.hpp> +#if defined(ASIO_HAS_LOCAL_SOCKETS) +# include <asio/local/stream_protocol.hpp> +#endif +#if ZEN_USE_OPENSSL +# include <asio/ssl.hpp> +#endif ZEN_THIRD_PARTY_INCLUDES_END #define ASIO_VERBOSE_TRACE 0 @@ -144,7 +151,17 @@ using namespace std::literals; struct HttpAcceptor; struct HttpResponse; -struct HttpServerConnection; +template<typename SocketType> +struct HttpServerConnectionT; +using HttpServerConnection = HttpServerConnectionT<asio::ip::tcp::socket>; +#if defined(ASIO_HAS_LOCAL_SOCKETS) +struct UnixAcceptor; +using UnixServerConnection = HttpServerConnectionT<asio::local::stream_protocol::socket>; +#endif +#if ZEN_USE_OPENSSL +struct HttpsAcceptor; +using HttpsSslServerConnection = HttpServerConnectionT<SslSocket>; +#endif inline LoggerRef InitLogger() @@ -176,9 +193,9 @@ Log() #endif #if ZEN_USE_TRANSMITFILE -template<typename Handler> +template<typename Handler, typename SocketType> void -TransmitFileAsync(asio::ip::tcp::socket& Socket, HANDLE FileHandle, uint64_t ByteOffset, uint32_t ByteSize, Handler&& Cb) +TransmitFileAsync(SocketType& Socket, HANDLE FileHandle, uint64_t ByteOffset, uint32_t ByteSize, Handler&& Cb) { # if ZEN_BUILD_DEBUG const uint64_t FileSize = FileSizeFromHandle(FileHandle); @@ -511,11 +528,20 @@ public: bool IsLoopbackOnly() const; + int GetEffectiveHttpsPort() const; + asio::io_service m_IoService; asio::io_service::work m_Work{m_IoService}; std::unique_ptr<asio_http::HttpAcceptor> m_Acceptor; - std::vector<std::thread> m_ThreadPool; - std::atomic<IHttpRequestFilter*> m_HttpRequestFilter = nullptr; +#if defined(ASIO_HAS_LOCAL_SOCKETS) + std::unique_ptr<asio_http::UnixAcceptor> m_UnixAcceptor; +#endif +#if ZEN_USE_OPENSSL + std::unique_ptr<asio::ssl::context> m_SslContext; + std::unique_ptr<asio_http::HttpsAcceptor> m_HttpsAcceptor; +#endif + std::vector<std::thread> m_ThreadPool; + std::atomic<IHttpRequestFilter*> m_HttpRequestFilter = nullptr; LoggerRef m_RequestLog; HttpServerTracer m_RequestTracer; @@ -573,6 +599,7 @@ public: uint32_t m_RequestNumber = 0; // Note: different to request ID which is derived from headers IoBuffer m_PayloadBuffer; bool m_IsLocalMachineRequest; + bool m_AllowZeroCopyFileSend = true; std::string m_RemoteAddress; std::unique_ptr<HttpResponse> m_Response; }; @@ -595,6 +622,8 @@ public: ~HttpResponse() = default; + void SetAllowZeroCopyFileSend(bool Allow) { m_AllowZeroCopyFileSend = Allow; } + /** * Initialize the response for sending a payload made up of multiple blobs * @@ -636,7 +665,7 @@ public: bool ChunkHandled = false; #if ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE - if (OwnedBuffer.IsWholeFile()) + if (m_AllowZeroCopyFileSend && OwnedBuffer.IsWholeFile()) { if (IoBufferFileReference FileRef; OwnedBuffer.GetFileReference(/* out */ FileRef)) { @@ -751,7 +780,8 @@ public: return m_Headers; } - void SendResponse(asio::ip::tcp::socket& TcpSocket, std::function<void(const asio::error_code& Ec, std::size_t ByteCount)>&& Token) + template<typename SocketType> + void SendResponse(SocketType& Socket, std::function<void(const asio::error_code& Ec, std::size_t ByteCount)>&& Token) { ZEN_ASSERT(m_State == State::kInitialized); @@ -761,10 +791,11 @@ public: m_SendCb = std::move(Token); m_State = State::kSending; - SendNextChunk(TcpSocket); + SendNextChunk(Socket); } - void SendNextChunk(asio::ip::tcp::socket& TcpSocket) + template<typename SocketType> + void SendNextChunk(SocketType& Socket) { ZEN_ASSERT(m_State == State::kSending); @@ -781,12 +812,12 @@ public: auto CompletionToken = [Self = this, Token = std::move(m_SendCb), TotalBytes = m_TotalBytesSent] { Token({}, TotalBytes); }; - asio::defer(TcpSocket.get_executor(), std::move(CompletionToken)); + asio::defer(Socket.get_executor(), std::move(CompletionToken)); return; } - auto OnCompletion = [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { + auto OnCompletion = [this, &Socket](const asio::error_code& Ec, std::size_t ByteCount) { ZEN_ASSERT(m_State == State::kSending); m_TotalBytesSent += ByteCount; @@ -797,7 +828,7 @@ public: } else { - SendNextChunk(TcpSocket); + SendNextChunk(Socket); } }; @@ -811,25 +842,21 @@ public: Io.Ref.FileRef.FileChunkSize); #if ZEN_USE_TRANSMITFILE - TransmitFileAsync(TcpSocket, + TransmitFileAsync(Socket, Io.Ref.FileRef.FileHandle, Io.Ref.FileRef.FileChunkOffset, gsl::narrow_cast<uint32_t>(Io.Ref.FileRef.FileChunkSize), OnCompletion); + return; #elif ZEN_USE_ASYNC_SENDFILE - SendFileAsync(TcpSocket, + SendFileAsync(Socket, Io.Ref.FileRef.FileHandle, Io.Ref.FileRef.FileChunkOffset, Io.Ref.FileRef.FileChunkSize, 64 * 1024, OnCompletion); -#else - // This should never occur unless we compile with one - // of the options above - ZEN_WARN("invalid file reference in response"); -#endif - return; +#endif } // Send as many consecutive non-file references as possible in one asio operation @@ -850,7 +877,7 @@ public: ++m_IoVecCursor; } - asio::async_write(TcpSocket, std::move(AsioBuffers), asio::transfer_all(), OnCompletion); + asio::async_write(Socket, std::move(AsioBuffers), asio::transfer_all(), OnCompletion); } private: @@ -863,12 +890,13 @@ private: kFailed }; - uint32_t m_RequestNumber = 0; - uint16_t m_ResponseCode = 0; - bool m_IsKeepAlive = true; - State m_State = State::kUninitialized; - HttpContentType m_ContentType = HttpContentType::kBinary; - uint64_t m_ContentLength = 0; + uint32_t m_RequestNumber = 0; + uint16_t m_ResponseCode = 0; + bool m_IsKeepAlive = true; + bool m_AllowZeroCopyFileSend = true; + State m_State = State::kUninitialized; + HttpContentType m_ContentType = HttpContentType::kBinary; + uint64_t m_ContentLength = 0; eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive ExtendableStringBuilder<160> m_Headers; @@ -895,12 +923,13 @@ private: ////////////////////////////////////////////////////////////////////////// -struct HttpServerConnection : public HttpRequestParserCallbacks, std::enable_shared_from_this<HttpServerConnection> +template<typename SocketType> +struct HttpServerConnectionT : public HttpRequestParserCallbacks, std::enable_shared_from_this<HttpServerConnectionT<SocketType>> { - HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket); - ~HttpServerConnection(); + HttpServerConnectionT(HttpAsioServerImpl& Server, std::unique_ptr<SocketType>&& Socket); + ~HttpServerConnectionT(); - std::shared_ptr<HttpServerConnection> AsSharedPtr() { return shared_from_this(); } + std::shared_ptr<HttpServerConnectionT> AsSharedPtr() { return this->shared_from_this(); } // HttpConnectionBase implementation @@ -962,12 +991,13 @@ private: RwLock m_ActiveResponsesLock; std::deque<std::unique_ptr<HttpResponse>> m_ActiveResponses; - std::unique_ptr<asio::ip::tcp::socket> m_Socket; + std::unique_ptr<SocketType> m_Socket; }; std::atomic<uint32_t> g_ConnectionIdCounter{0}; -HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket) +template<typename SocketType> +HttpServerConnectionT<SocketType>::HttpServerConnectionT(HttpAsioServerImpl& Server, std::unique_ptr<SocketType>&& Socket) : m_Server(Server) , m_ConnectionId(g_ConnectionIdCounter.fetch_add(1)) , m_Socket(std::move(Socket)) @@ -975,21 +1005,24 @@ HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::uniq ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId); } -HttpServerConnection::~HttpServerConnection() +template<typename SocketType> +HttpServerConnectionT<SocketType>::~HttpServerConnectionT() { RwLock::ExclusiveLockScope _(m_ActiveResponsesLock); ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId); } +template<typename SocketType> void -HttpServerConnection::HandleNewRequest() +HttpServerConnectionT<SocketType>::HandleNewRequest() { EnqueueRead(); } +template<typename SocketType> void -HttpServerConnection::TerminateConnection() +HttpServerConnectionT<SocketType>::TerminateConnection() { if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated) { @@ -1001,12 +1034,13 @@ HttpServerConnection::TerminateConnection() // Terminating, we don't care about any errors when closing socket std::error_code Ec; - m_Socket->shutdown(asio::socket_base::shutdown_both, Ec); - m_Socket->close(Ec); + SocketTraits<SocketType>::ShutdownBoth(*m_Socket, Ec); + SocketTraits<SocketType>::Close(*m_Socket, Ec); } +template<typename SocketType> void -HttpServerConnection::EnqueueRead() +HttpServerConnectionT<SocketType>::EnqueueRead() { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -1027,8 +1061,9 @@ HttpServerConnection::EnqueueRead() [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnDataReceived(Ec, ByteCount); }); } +template<typename SocketType> void -HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount) +HttpServerConnectionT<SocketType>::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount) { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -1086,11 +1121,12 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] } } +template<typename SocketType> void -HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, - [[maybe_unused]] std::size_t ByteCount, - [[maybe_unused]] uint32_t RequestNumber, - HttpResponse* ResponseToPop) +HttpServerConnectionT<SocketType>::OnResponseDataSent(const asio::error_code& Ec, + [[maybe_unused]] std::size_t ByteCount, + [[maybe_unused]] uint32_t RequestNumber, + HttpResponse* ResponseToPop) { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -1144,8 +1180,9 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, } } +template<typename SocketType> void -HttpServerConnection::CloseConnection() +HttpServerConnectionT<SocketType>::CloseConnection() { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -1157,23 +1194,24 @@ HttpServerConnection::CloseConnection() m_RequestState = RequestState::kDone; std::error_code Ec; - m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec); + SocketTraits<SocketType>::ShutdownReceive(*m_Socket, Ec); if (Ec) { ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message()); } - m_Socket->close(Ec); + SocketTraits<SocketType>::Close(*m_Socket, Ec); if (Ec) { ZEN_WARN("socket close ERROR, reason '{}'", Ec.message()); } } +template<typename SocketType> void -HttpServerConnection::SendInlineResponse(uint32_t RequestNumber, - std::string_view StatusLine, - std::string_view Headers, - std::string_view Body) +HttpServerConnectionT<SocketType>::SendInlineResponse(uint32_t RequestNumber, + std::string_view StatusLine, + std::string_view Headers, + std::string_view Body) { ExtendableStringBuilder<256> ResponseBuilder; ResponseBuilder << "HTTP/1.1 " << StatusLine << "\r\n"; @@ -1194,15 +1232,16 @@ HttpServerConnection::SendInlineResponse(uint32_t RequestNumber, IoBuffer ResponseData(IoBuffer::Clone, ResponseView.data(), ResponseView.size()); auto Buffer = asio::buffer(ResponseData.GetData(), ResponseData.GetSize()); asio::async_write( - *m_Socket.get(), + *m_Socket, Buffer, [Conn = AsSharedPtr(), RequestNumber, Response = std::move(ResponseData)](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); }); } +template<typename SocketType> void -HttpServerConnection::HandleRequest() +HttpServerConnectionT<SocketType>::HandleRequest() { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -1229,24 +1268,25 @@ HttpServerConnection::HandleRequest() ResponseStr->append("\r\n\r\n"); // Send the 101 response on the current socket, then hand the socket off - // to a WsAsioConnection for the WebSocket protocol. - asio::async_write(*m_Socket, - asio::buffer(ResponseStr->data(), ResponseStr->size()), - [Conn = AsSharedPtr(), WsHandler, OwnedResponse = ResponseStr](const asio::error_code& Ec, std::size_t) { - if (Ec) - { - ZEN_WARN("WebSocket 101 send failed: {}", Ec.message()); - return; - } - - Conn->m_Server.m_HttpServer->OnWebSocketConnectionOpened(); - Ref<WsAsioConnection> WsConn( - new WsAsioConnection(std::move(Conn->m_Socket), *WsHandler, Conn->m_Server.m_HttpServer)); - Ref<WebSocketConnection> WsConnRef(WsConn.Get()); - - WsHandler->OnWebSocketOpen(std::move(WsConnRef)); - WsConn->Start(); - }); + // to a WsAsioConnectionT for the WebSocket protocol. + asio::async_write( + *m_Socket, + asio::buffer(ResponseStr->data(), ResponseStr->size()), + [Conn = AsSharedPtr(), WsHandler, OwnedResponse = ResponseStr](const asio::error_code& Ec, std::size_t) { + if (Ec) + { + ZEN_WARN("WebSocket 101 send failed: {}", Ec.message()); + return; + } + + Conn->m_Server.m_HttpServer->OnWebSocketConnectionOpened(); + using WsConnType = WsAsioConnectionT<SocketType>; + Ref<WsConnType> WsConn(new WsConnType(std::move(Conn->m_Socket), *WsHandler, Conn->m_Server.m_HttpServer)); + Ref<WebSocketConnection> WsConnRef(WsConn.Get()); + + WsHandler->OnWebSocketOpen(std::move(WsConnRef)); + WsConn->Start(); + }); m_RequestState = RequestState::kDone; return; @@ -1260,7 +1300,7 @@ HttpServerConnection::HandleRequest() m_RequestState = RequestState::kWritingFinal; std::error_code Ec; - m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec); + SocketTraits<SocketType>::ShutdownReceive(*m_Socket, Ec); if (Ec) { @@ -1280,15 +1320,36 @@ HttpServerConnection::HandleRequest() m_Server.m_HttpServer->MarkRequest(); - auto RemoteEndpoint = m_Socket->remote_endpoint(); - bool IsLocalConnection = m_Socket->local_endpoint().address() == RemoteEndpoint.address(); + bool IsLocalConnection = true; + std::string RemoteAddress; + + if constexpr (std::is_same_v<SocketType, asio::ip::tcp::socket>) + { + auto RemoteEndpoint = m_Socket->remote_endpoint(); + IsLocalConnection = m_Socket->local_endpoint().address() == RemoteEndpoint.address(); + RemoteAddress = RemoteEndpoint.address().to_string(); + } +#if ZEN_USE_OPENSSL + else if constexpr (std::is_same_v<SocketType, SslSocket>) + { + auto RemoteEndpoint = m_Socket->lowest_layer().remote_endpoint(); + IsLocalConnection = m_Socket->lowest_layer().local_endpoint().address() == RemoteEndpoint.address(); + RemoteAddress = RemoteEndpoint.address().to_string(); + } +#endif + else + { + RemoteAddress = "unix"; + } HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body(), RequestNumber, IsLocalConnection, - RemoteEndpoint.address().to_string()); + std::move(RemoteAddress)); + + Request.m_AllowZeroCopyFileSend = !SocketTraits<SocketType>::IsSslSocket; ZEN_TRACE_VERBOSE("handle request, connection: {}, request: {}'", m_ConnectionId, RequestNumber); @@ -1439,14 +1500,23 @@ HttpServerConnection::HandleRequest() } ////////////////////////////////////////////////////////////////////////// +// Base class for TCP acceptors that handles socket setup, port binding +// with probing/retry, and dual-stack (IPv6+IPv4 loopback) support. +// Subclasses only need to implement OnAccept() to handle new connections. -struct HttpAcceptor +struct TcpAcceptorBase { - HttpAcceptor(HttpAsioServerImpl& Server, asio::io_service& IoService, uint16_t BasePort, bool ForceLoopback, bool AllowPortProbing) + TcpAcceptorBase(HttpAsioServerImpl& Server, + asio::io_service& IoService, + uint16_t BasePort, + bool ForceLoopback, + bool AllowPortProbing, + std::string_view Label) : m_Server(Server) , m_IoService(IoService) , m_Acceptor(m_IoService, asio::ip::tcp::v6()) , m_AlternateProtocolAcceptor(m_IoService, asio::ip::tcp::v4()) + , m_Label(Label) { const bool IsUsingIPv6 = IsIPv6Capable(); if (!IsUsingIPv6) @@ -1455,7 +1525,6 @@ struct HttpAcceptor } #if ZEN_PLATFORM_WINDOWS - // Special option for Windows settings as !asio::socket_base::reuse_address is not the same as exclusive access on Windows platforms typedef asio::detail::socket_option::boolean<ASIO_OS_DEF(SOL_SOCKET), SO_EXCLUSIVEADDRUSE> exclusive_address; m_Acceptor.set_option(exclusive_address(true)); m_AlternateProtocolAcceptor.set_option(exclusive_address(true)); @@ -1468,83 +1537,54 @@ struct HttpAcceptor #endif // ZEN_PLATFORM_WINDOWS m_Acceptor.set_option(asio::ip::tcp::no_delay(true)); - m_Acceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024)); - m_Acceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024)); - m_AlternateProtocolAcceptor.set_option(asio::ip::tcp::no_delay(true)); - m_AlternateProtocolAcceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024)); - m_AlternateProtocolAcceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024)); - - std::string BoundBaseUrl; if (IsUsingIPv6) { - BoundBaseUrl = BindAcceptor<asio::ip::address_v6>(BasePort, ForceLoopback, AllowPortProbing); + BindAcceptor<asio::ip::address_v6>(BasePort, ForceLoopback, AllowPortProbing); } else { - ZEN_INFO("NOTE: ipv6 support is disabled, binding to ipv4 only"); - - BoundBaseUrl = BindAcceptor<asio::ip::address_v4>(BasePort, ForceLoopback, AllowPortProbing); + ZEN_INFO("{}: ipv6 support is disabled, binding to ipv4 only", m_Label); + BindAcceptor<asio::ip::address_v4>(BasePort, ForceLoopback, AllowPortProbing); } + } - if (!IsValid()) - { - return; - } - -#if ZEN_PLATFORM_WINDOWS - // On Windows, loopback connections can take advantage of a faster code path optionally with this flag. - // This must be used by both the client and server side, and is only effective in the absence of - // Windows Filtering Platform (WFP) callouts which can be installed by security software. - // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path - SOCKET NativeSocket = m_Acceptor.native_handle(); - int LoopbackOptionValue = 1; - DWORD OptionNumberOfBytesReturned = 0; - WSAIoctl(NativeSocket, - SIO_LOOPBACK_FAST_PATH, - &LoopbackOptionValue, - sizeof(LoopbackOptionValue), - NULL, - 0, - &OptionNumberOfBytesReturned, - 0, - 0); - - if (m_UseAlternateProtocolAcceptor) - { - NativeSocket = m_AlternateProtocolAcceptor.native_handle(); - WSAIoctl(NativeSocket, - SIO_LOOPBACK_FAST_PATH, - &LoopbackOptionValue, - sizeof(LoopbackOptionValue), - NULL, - 0, - &OptionNumberOfBytesReturned, - 0, - 0); - } -#endif - m_Acceptor.listen(); + virtual ~TcpAcceptorBase() + { + m_Acceptor.close(); if (m_UseAlternateProtocolAcceptor) { - m_AlternateProtocolAcceptor.listen(); + m_AlternateProtocolAcceptor.close(); } - - ZEN_INFO("Started asio server at '{}", BoundBaseUrl); } - ~HttpAcceptor() + void Start() { - m_Acceptor.close(); + ZEN_ASSERT(!m_IsStopped); + InitAcceptLoop(m_Acceptor); if (m_UseAlternateProtocolAcceptor) { - m_AlternateProtocolAcceptor.close(); + InitAcceptLoop(m_AlternateProtocolAcceptor); } } + void StopAccepting() { m_IsStopped = true; } + + uint16_t GetPort() const { return m_Acceptor.local_endpoint().port(); } + bool IsLoopbackOnly() const { return m_Acceptor.local_endpoint().address().is_loopback(); } + bool IsValid() const { return m_IsValid; } + +protected: + /// Called for each accepted TCP socket. Subclasses create the appropriate connection type. + virtual void OnAccept(std::unique_ptr<asio::ip::tcp::socket> Socket) = 0; + + HttpAsioServerImpl& m_Server; + asio::io_service& m_IoService; + +private: template<typename AddressType> - std::string BindAcceptor(uint16_t BasePort, bool ForceLoopback, bool AllowPortProbing) + void BindAcceptor(uint16_t BasePort, bool ForceLoopback, bool AllowPortProbing) { uint16_t EffectivePort = BasePort; @@ -1571,7 +1611,7 @@ struct HttpAcceptor if (BindErrorCode == asio::error::access_denied && !BindAddress.is_loopback()) { - ZEN_INFO("Access denied for public port {}, falling back to loopback", BasePort); + ZEN_INFO("{}: Access denied for public port {}, falling back to loopback", m_Label, BasePort); BindAddress = AddressType::loopback(); @@ -1585,7 +1625,7 @@ struct HttpAcceptor if (BindErrorCode == asio::error::address_in_use) { - ZEN_INFO("Desired port {} is in use (bind returned '{}'), retrying", EffectivePort, BindErrorCode.message()); + ZEN_INFO("{}: Desired port {} is in use (bind returned '{}'), retrying", m_Label, EffectivePort, BindErrorCode.message()); Sleep(500); m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } @@ -1601,7 +1641,8 @@ struct HttpAcceptor if (BindErrorCode) { - ZEN_INFO("Unable to bind to preferred port range, falling back to automatic assignment (bind returned '{}')", + ZEN_INFO("{}: Unable to bind to preferred port range, falling back to automatic assignment (bind returned '{}')", + m_Label, BindErrorCode.message()); EffectivePort = 0; @@ -1617,7 +1658,7 @@ struct HttpAcceptor { for (uint32_t Retries = 0; (BindErrorCode == asio::error::address_in_use) && (Retries < 3); Retries++) { - ZEN_INFO("Desired port {} is in use (bind returned '{}'), retrying", EffectivePort, BindErrorCode.message()); + ZEN_INFO("{}: Desired port {} is in use (bind returned '{}'), retrying", m_Label, EffectivePort, BindErrorCode.message()); Sleep(500); m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } @@ -1625,14 +1666,13 @@ struct HttpAcceptor if (BindErrorCode) { - ZEN_WARN("Unable to initialize asio service, (bind returned '{}')", BindErrorCode.message()); - - return {}; + ZEN_WARN("{}: Unable to bind on port {} (bind returned '{}')", m_Label, BasePort, BindErrorCode.message()); + return; } if (EffectivePort != BasePort) { - ZEN_WARN("Desired port {} is in use, remapped to port {}", BasePort, EffectivePort); + ZEN_WARN("{}: Desired port {} is in use, remapped to port {}", m_Label, BasePort, EffectivePort); } if constexpr (std::is_same_v<asio::ip::address_v6, AddressType>) @@ -1642,55 +1682,64 @@ struct HttpAcceptor // IPv6 loopback will only respond on the IPv6 loopback address. Not everyone does // IPv6 though so we also bind to IPv4 loopback (localhost/127.0.0.1) - m_AlternateProtocolAcceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), EffectivePort), BindErrorCode); + asio::error_code AltEc; + m_AlternateProtocolAcceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), EffectivePort), AltEc); - if (BindErrorCode) + if (AltEc) { - ZEN_WARN("Failed to register secondary IPv4 local-only handler 'http://{}:{}/'", "localhost", EffectivePort); + ZEN_WARN("{}: Failed to register secondary IPv4 local-only handler on port {}", m_Label, EffectivePort); } else { m_UseAlternateProtocolAcceptor = true; - ZEN_INFO("Registered local-only handler 'http://{}:{}/' - this is not accessible from remote hosts", - "localhost", - EffectivePort); } } } - m_IsValid = true; +#if ZEN_PLATFORM_WINDOWS + // On Windows, loopback connections can take advantage of a faster code path optionally with this flag. + // This must be used by both the client and server side, and is only effective in the absence of + // Windows Filtering Platform (WFP) callouts which can be installed by security software. + // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path + SOCKET NativeSocket = m_Acceptor.native_handle(); + int LoopbackOptionValue = 1; + DWORD OptionNumberOfBytesReturned = 0; + WSAIoctl(NativeSocket, + SIO_LOOPBACK_FAST_PATH, + &LoopbackOptionValue, + sizeof(LoopbackOptionValue), + NULL, + 0, + &OptionNumberOfBytesReturned, + 0, + 0); - if constexpr (std::is_same_v<asio::ip::address_v6, AddressType>) - { - return fmt::format("http://{}:{}'", BindAddress.is_loopback() ? "[::1]" : "*", EffectivePort); - } - else + if (m_UseAlternateProtocolAcceptor) { - return fmt::format("http://{}:{}'", BindAddress.is_loopback() ? "127.0.0.1" : "*", EffectivePort); + NativeSocket = m_AlternateProtocolAcceptor.native_handle(); + WSAIoctl(NativeSocket, + SIO_LOOPBACK_FAST_PATH, + &LoopbackOptionValue, + sizeof(LoopbackOptionValue), + NULL, + 0, + &OptionNumberOfBytesReturned, + 0, + 0); } - } - - void Start() - { - ZEN_MEMSCOPE(GetHttpasioTag()); +#endif - ZEN_ASSERT(!m_IsStopped); - InitAcceptInternal(m_Acceptor); + m_Acceptor.listen(); if (m_UseAlternateProtocolAcceptor) { - InitAcceptInternal(m_AlternateProtocolAcceptor); + m_AlternateProtocolAcceptor.listen(); } - } - void StopAccepting() { m_IsStopped = true; } - - int GetAcceptPort() const { return m_Acceptor.local_endpoint().port(); } - bool IsLoopbackOnly() const { return m_Acceptor.local_endpoint().address().is_loopback(); } - - bool IsValid() const { return m_IsValid; } + m_IsValid = true; + ZEN_INFO("{}: Listening on port {}", m_Label, m_Acceptor.local_endpoint().port()); + } -private: - void InitAcceptInternal(asio::ip::tcp::acceptor& Acceptor) + void InitAcceptLoop(asio::ip::tcp::acceptor& Acceptor) { auto SocketPtr = std::make_unique<asio::ip::tcp::socket>(m_IoService); asio::ip::tcp::socket& SocketRef = *SocketPtr.get(); @@ -1698,29 +1747,19 @@ private: Acceptor.async_accept(SocketRef, [this, &Acceptor, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable { if (Ec) { - ZEN_WARN("asio async_accept, connection failed to '{}:{}' reason '{}'", - Acceptor.local_endpoint().address().to_string(), - Acceptor.local_endpoint().port(), - Ec.message()); + if (!m_IsStopped.load()) + { + ZEN_WARN("{}: async_accept failed: '{}'", m_Label, Ec.message()); + } } else { - // New connection established, pass socket ownership into connection object - // and initiate request handling loop. The connection lifetime is - // managed by the async read/write loop by passing the shared - // reference to the callbacks. - - Socket->set_option(asio::ip::tcp::no_delay(true)); - Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024)); - Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024)); - - auto Conn = std::make_shared<HttpServerConnection>(m_Server, std::move(Socket)); - Conn->HandleNewRequest(); + OnAccept(std::move(Socket)); } if (!m_IsStopped.load()) { - InitAcceptInternal(Acceptor); + InitAcceptLoop(Acceptor); } else { @@ -1728,21 +1767,204 @@ private: Acceptor.close(CloseEc); if (CloseEc) { - ZEN_WARN("acceptor close ERROR, reason '{}'", CloseEc.message()); + ZEN_WARN("{}: acceptor close error: '{}'", m_Label, CloseEc.message()); } } }); } - HttpAsioServerImpl& m_Server; - asio::io_service& m_IoService; asio::ip::tcp::acceptor m_Acceptor; asio::ip::tcp::acceptor m_AlternateProtocolAcceptor; bool m_UseAlternateProtocolAcceptor{false}; bool m_IsValid{false}; std::atomic<bool> m_IsStopped{false}; + std::string_view m_Label; +}; + +////////////////////////////////////////////////////////////////////////// + +struct HttpAcceptor final : TcpAcceptorBase +{ + HttpAcceptor(HttpAsioServerImpl& Server, asio::io_service& IoService, uint16_t BasePort, bool ForceLoopback, bool AllowPortProbing) + : TcpAcceptorBase(Server, IoService, BasePort, ForceLoopback, AllowPortProbing, "HTTP") + { + } + + int GetAcceptPort() const { return GetPort(); } + +protected: + void OnAccept(std::unique_ptr<asio::ip::tcp::socket> Socket) override + { + Socket->set_option(asio::ip::tcp::no_delay(true)); + Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024)); + Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024)); + + auto Conn = std::make_shared<HttpServerConnection>(m_Server, std::move(Socket)); + Conn->HandleNewRequest(); + } }; +#if defined(ASIO_HAS_LOCAL_SOCKETS) + +////////////////////////////////////////////////////////////////////////// + +struct UnixAcceptor +{ + UnixAcceptor(HttpAsioServerImpl& Server, asio::io_service& IoService, const std::string& SocketPath) + : m_Server(Server) + , m_IoService(IoService) + , m_Acceptor(m_IoService) + , m_SocketPath(SocketPath) + { + // Remove any stale socket file from a previous run + std::filesystem::remove(m_SocketPath); + + asio::local::stream_protocol::endpoint Endpoint(m_SocketPath); + + asio::error_code Ec; + m_Acceptor.open(Endpoint.protocol(), Ec); + if (Ec) + { + ZEN_WARN("failed to open unix domain socket: {}", Ec.message()); + return; + } + + m_Acceptor.bind(Endpoint, Ec); + if (Ec) + { + ZEN_WARN("failed to bind unix domain socket at '{}': {}", m_SocketPath, Ec.message()); + return; + } + + m_Acceptor.listen(asio::socket_base::max_listen_connections, Ec); + if (Ec) + { + ZEN_WARN("failed to listen on unix domain socket at '{}': {}", m_SocketPath, Ec.message()); + return; + } + + m_IsValid = true; + ZEN_INFO("Started unix domain socket listener at '{}'", m_SocketPath); + } + + ~UnixAcceptor() + { + asio::error_code Ec; + m_Acceptor.close(Ec); + std::filesystem::remove(m_SocketPath); + } + + void Start() + { + ZEN_ASSERT(!m_IsStopped); + InitAccept(); + } + + void StopAccepting() { m_IsStopped = true; } + + bool IsValid() const { return m_IsValid; } + +private: + void InitAccept() + { + auto SocketPtr = std::make_unique<asio::local::stream_protocol::socket>(m_IoService); + asio::local::stream_protocol::socket& SocketRef = *SocketPtr.get(); + + m_Acceptor.async_accept(SocketRef, [this, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable { + if (Ec) + { + if (!m_IsStopped.load()) + { + ZEN_WARN("unix domain socket async_accept failed: '{}'", Ec.message()); + } + } + else + { + auto Conn = std::make_shared<UnixServerConnection>(m_Server, std::move(Socket)); + Conn->HandleNewRequest(); + } + + if (!m_IsStopped.load()) + { + InitAccept(); + } + else + { + std::error_code CloseEc; + m_Acceptor.close(CloseEc); + } + }); + } + + HttpAsioServerImpl& m_Server; + asio::io_service& m_IoService; + asio::local::stream_protocol::acceptor m_Acceptor; + std::string m_SocketPath; + bool m_IsValid{false}; + std::atomic<bool> m_IsStopped{false}; +}; + +#endif // ASIO_HAS_LOCAL_SOCKETS + +#if ZEN_USE_OPENSSL + +////////////////////////////////////////////////////////////////////////// + +struct HttpsAcceptor final : TcpAcceptorBase +{ + HttpsAcceptor(HttpAsioServerImpl& Server, + asio::io_service& IoService, + asio::ssl::context& SslContext, + uint16_t Port, + bool ForceLoopback, + bool AllowPortProbing) + : TcpAcceptorBase(Server, IoService, Port, ForceLoopback, AllowPortProbing, "HTTPS") + , m_SslContext(SslContext) + { + } + +protected: + void OnAccept(std::unique_ptr<asio::ip::tcp::socket> Socket) override + { + Socket->set_option(asio::ip::tcp::no_delay(true)); + Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024)); + Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024)); + + // Wrap accepted TCP socket in an SSL stream and perform the handshake + auto SslSocketPtr = std::make_unique<SslSocket>(std::move(*Socket), m_SslContext); + + SslSocket& SslRef = *SslSocketPtr; + SslRef.async_handshake(asio::ssl::stream_base::server, + [this, SslSocket = std::move(SslSocketPtr)](const asio::error_code& HandshakeEc) mutable { + if (HandshakeEc) + { + ZEN_WARN("SSL handshake failed: '{}'", HandshakeEc.message()); + std::error_code Ec; + SslSocket->lowest_layer().close(Ec); + return; + } + + auto Conn = std::make_shared<HttpsSslServerConnection>(m_Server, std::move(SslSocket)); + Conn->HandleNewRequest(); + }); + } + +private: + asio::ssl::context& m_SslContext; +}; + +#endif // ZEN_USE_OPENSSL + +int +HttpAsioServerImpl::GetEffectiveHttpsPort() const +{ +#if ZEN_USE_OPENSSL + return m_HttpsAcceptor ? m_HttpsAcceptor->GetPort() : 0; +#else + return 0; +#endif +} + ////////////////////////////////////////////////////////////////////////// HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, @@ -1860,6 +2082,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode) ZEN_ASSERT(!m_Response); m_Response.reset(new HttpResponse(HttpContentType::kBinary, m_RequestNumber)); + m_Response->SetAllowZeroCopyFileSend(m_AllowZeroCopyFileSend); std::array<IoBuffer, 0> Empty; m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty); @@ -1873,6 +2096,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT ZEN_ASSERT(!m_Response); m_Response.reset(new HttpResponse(ContentType, m_RequestNumber)); + m_Response->SetAllowZeroCopyFileSend(m_AllowZeroCopyFileSend); m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs); } @@ -1883,6 +2107,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT ZEN_ASSERT(!m_Response); m_Response.reset(new HttpResponse(ContentType, m_RequestNumber)); + m_Response->SetAllowZeroCopyFileSend(m_AllowZeroCopyFileSend); IoBuffer MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size()); std::array<IoBuffer, 1> SingleBufferList({MessageBuffer}); @@ -1942,6 +2167,51 @@ HttpAsioServerImpl::Start(uint16_t Port, const AsioConfig& Config) m_Acceptor->Start(); +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (!Config.UnixSocketPath.empty()) + { + m_UnixAcceptor.reset(new asio_http::UnixAcceptor(*this, m_IoService, Config.UnixSocketPath)); + + if (m_UnixAcceptor->IsValid()) + { + m_UnixAcceptor->Start(); + } + else + { + m_UnixAcceptor.reset(); + } + } +#endif + +#if ZEN_USE_OPENSSL + if (!Config.CertFile.empty() && !Config.KeyFile.empty()) + { + m_SslContext = std::make_unique<asio::ssl::context>(asio::ssl::context::tlsv12_server); + m_SslContext->set_options(asio::ssl::context::default_workarounds | asio::ssl::context::no_sslv2 | asio::ssl::context::no_sslv3 | + asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1); + m_SslContext->use_certificate_chain_file(Config.CertFile); + m_SslContext->use_private_key_file(Config.KeyFile, asio::ssl::context::pem); + + ZEN_INFO("SSL context initialized (cert: '{}', key: '{}')", Config.CertFile, Config.KeyFile); + + m_HttpsAcceptor.reset(new asio_http::HttpsAcceptor(*this, + m_IoService, + *m_SslContext, + gsl::narrow<uint16_t>(Config.HttpsPort), + Config.ForceLoopback, + /*AllowPortProbing*/ !Config.IsDedicatedServer)); + + if (m_HttpsAcceptor->IsValid()) + { + m_HttpsAcceptor->Start(); + } + else + { + m_HttpsAcceptor.reset(); + } + } +#endif + // This should consist of a set of minimum threads and grow on demand to // meet concurrency needs? Right now we end up allocating a large number // of threads even if we never end up using all of them, which seems @@ -1990,6 +2260,18 @@ HttpAsioServerImpl::Stop() { m_Acceptor->StopAccepting(); } +#if defined(ASIO_HAS_LOCAL_SOCKETS) + if (m_UnixAcceptor) + { + m_UnixAcceptor->StopAccepting(); + } +#endif +#if ZEN_USE_OPENSSL + if (m_HttpsAcceptor) + { + m_HttpsAcceptor->StopAccepting(); + } +#endif m_IoService.stop(); for (auto& Thread : m_ThreadPool) { @@ -1999,7 +2281,23 @@ HttpAsioServerImpl::Stop() } } m_ThreadPool.clear(); + + // Drain remaining handlers (e.g. cancellation callbacks from active WebSocket + // connections) so that their captured Ref<> pointers are released while the + // io_service and its epoll reactor are still alive. Without this, sockets + // held by external code (e.g. IWebSocketHandler connection lists) can outlive + // the reactor and crash during deregistration. + m_IoService.restart(); + m_IoService.poll(); + m_Acceptor.reset(); +#if defined(ASIO_HAS_LOCAL_SOCKETS) + m_UnixAcceptor.reset(); +#endif +#if ZEN_USE_OPENSSL + m_HttpsAcceptor.reset(); + m_SslContext.reset(); +#endif } void @@ -2166,6 +2464,13 @@ HttpAsioServer::OnInitialize(int BasePort, std::filesystem::path DataDir) m_BasePort = m_Impl->Start(gsl::narrow<uint16_t>(BasePort), Config); +#if ZEN_USE_OPENSSL + if (int EffectiveHttpsPort = m_Impl->GetEffectiveHttpsPort(); EffectiveHttpsPort > 0) + { + SetEffectiveHttpsPort(EffectiveHttpsPort); + } +#endif + return m_BasePort; } |