// Copyright Epic Games, Inc. All Rights Reserved. #include "httpasio.h" #include "httptracer.h" #include #include #include #include #include #include "httpparser.h" #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #if ZEN_PLATFORM_WINDOWS # include # include #endif #include ZEN_THIRD_PARTY_INCLUDES_END #define ASIO_VERBOSE_TRACE 0 #if ASIO_VERBOSE_TRACE # define ZEN_TRACE_VERBOSE ZEN_TRACE #else # define ZEN_TRACE_VERBOSE(fmtstr, ...) #endif namespace zen::asio_http { using namespace std::literals; struct HttpAcceptor; struct HttpResponse; struct HttpServerConnection; inline LoggerRef InitLogger() { LoggerRef Logger = logging::Get("asio"); // Logger.set_level(spdlog::level::trace); return Logger; } inline LoggerRef Log() { static LoggerRef g_Logger = InitLogger(); return g_Logger; } ////////////////////////////////////////////////////////////////////////// struct HttpAsioServerImpl { public: HttpAsioServerImpl(); ~HttpAsioServerImpl(); void Initialize(std::filesystem::path DataDir); int Start(uint16_t Port, bool ForceLooopback, int ThreadCount); void Stop(); void RegisterService(const char* UrlPath, HttpService& Service); HttpService* RouteRequest(std::string_view Url); asio::io_service m_IoService; asio::io_service::work m_Work{m_IoService}; std::unique_ptr m_Acceptor; std::vector m_ThreadPool; LoggerRef m_RequestLog; HttpServerTracer m_RequestTracer; struct ServiceEntry { std::string ServiceUrlPath; HttpService* Service; }; RwLock m_Lock; std::vector m_UriHandlers; }; /** * This is the class which request handlers use to interact with the server instance */ class HttpAsioServerRequest : public HttpServerRequest { public: HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer); ~HttpAsioServerRequest(); virtual Oid ParseSessionId() const override; virtual uint32_t ParseRequestId() const override; virtual IoBuffer ReadPayload() override; virtual void WriteResponse(HttpResponseCode ResponseCode) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span Blobs) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override; virtual void WriteResponseAsync(std::function&& ContinuationHandler) override; virtual bool TryGetRanges(HttpRanges& Ranges) override; using HttpServerRequest::WriteResponse; HttpAsioServerRequest(const HttpAsioServerRequest&) = delete; HttpAsioServerRequest& operator=(const HttpAsioServerRequest&) = delete; HttpRequestParser& m_Request; IoBuffer m_PayloadBuffer; std::unique_ptr m_Response; }; struct HttpResponse { public: HttpResponse() = default; explicit HttpResponse(HttpContentType ContentType) : m_ContentType(ContentType) {} void InitializeForPayload(uint16_t ResponseCode, std::span BlobList) { ZEN_TRACE_CPU("asio::InitializeForPayload"); m_ResponseCode = ResponseCode; const uint32_t ChunkCount = gsl::narrow(BlobList.size()); m_DataBuffers.reserve(ChunkCount); for (IoBuffer& Buffer : BlobList) { #if 1 m_DataBuffers.emplace_back(std::move(Buffer)).MakeOwned(); #else IoBuffer TempBuffer = std::move(Buffer); TempBuffer.MakeOwned(); m_DataBuffers.emplace_back(IoBufferBuilder::ReadFromFileMaybe(TempBuffer)); #endif } uint64_t LocalDataSize = 0; m_AsioBuffers.push_back({}); // Placeholder for header for (IoBuffer& Buffer : m_DataBuffers) { uint64_t BufferDataSize = Buffer.Size(); ZEN_ASSERT(BufferDataSize); LocalDataSize += BufferDataSize; IoBufferFileReference FileRef; if (Buffer.GetFileReference(/* out */ FileRef)) { // TODO: Use direct file transfer, via TransmitFile/sendfile // // this looks like it requires some custom asio plumbing however m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); } else { // Send from memory m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); } } m_ContentLength = LocalDataSize; auto Headers = GetHeaders(); m_AsioBuffers[0] = asio::const_buffer(Headers.data(), Headers.size()); } uint16_t ResponseCode() const { return m_ResponseCode; } uint64_t ContentLength() const { return m_ContentLength; } const std::vector& AsioBuffers() const { return m_AsioBuffers; } std::string_view GetHeaders() { m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n" << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n" << "Content-Length: " << ContentLength() << "\r\n"sv; if (!m_IsKeepAlive) { m_Headers << "Connection: close\r\n"sv; } m_Headers << "\r\n"sv; return m_Headers; } void SuppressPayload() { m_AsioBuffers.resize(1); } private: uint16_t m_ResponseCode = 0; bool m_IsKeepAlive = true; HttpContentType m_ContentType = HttpContentType::kBinary; uint64_t m_ContentLength = 0; std::vector m_DataBuffers; std::vector m_AsioBuffers; ExtendableStringBuilder<160> m_Headers; }; ////////////////////////////////////////////////////////////////////////// struct HttpServerConnection : public HttpRequestParserCallbacks, std::enable_shared_from_this { HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr&& Socket); ~HttpServerConnection(); std::shared_ptr AsSharedPtr() { return shared_from_this(); } // HttpConnectionBase implementation virtual void TerminateConnection() override; virtual void HandleRequest() override; void HandleNewRequest(); private: enum class RequestState { kInitialState, kInitialRead, kReadingMore, kWriting, kWritingFinal, kDone, kTerminated }; RequestState m_RequestState = RequestState::kInitialState; HttpRequestParser m_RequestData{*this}; void EnqueueRead(); void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false); void CloseConnection(); HttpAsioServerImpl& m_Server; asio::streambuf m_RequestBuffer; std::unique_ptr m_Socket; std::atomic m_RequestCounter{0}; uint32_t m_ConnectionId = 0; Ref m_PackageHandler; RwLock m_ResponsesLock; std::deque> m_Responses; }; std::atomic g_ConnectionIdCounter{0}; HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr&& Socket) : m_Server(Server) , m_Socket(std::move(Socket)) , m_ConnectionId(g_ConnectionIdCounter.fetch_add(1)) { ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId); } HttpServerConnection::~HttpServerConnection() { ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId); } void HttpServerConnection::HandleNewRequest() { EnqueueRead(); } void HttpServerConnection::TerminateConnection() { if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated) { return; } m_RequestState = RequestState::kTerminated; ZEN_ASSERT(m_Socket); // Terminating, we don't care about any errors when closing socket std::error_code Ec; m_Socket->shutdown(asio::socket_base::shutdown_both, Ec); m_Socket->close(Ec); } void HttpServerConnection::EnqueueRead() { if ((m_RequestState == RequestState::kInitialRead) || (m_RequestState == RequestState::kReadingMore)) { m_RequestState = RequestState::kReadingMore; } else { m_RequestState = RequestState::kInitialRead; } m_RequestBuffer.prepare(64 * 1024); asio::async_read(*m_Socket.get(), m_RequestBuffer, asio::transfer_at_least(1), [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnDataReceived(Ec, ByteCount); }); } void HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount) { if (Ec) { switch (m_RequestState) { case RequestState::kDone: case RequestState::kInitialRead: case RequestState::kTerminated: ZEN_TRACE_VERBOSE("on data received ERROR (EXPECTED), connection: {}, reason: '{}'", m_ConnectionId, Ec.message()); return; default: ZEN_WARN("on data received ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message()); return TerminateConnection(); } } ZEN_TRACE_VERBOSE("on data received, connection: {}, request: {}, thread: {}, bytes: {}", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed), zen::GetCurrentThreadId(), NiceBytes(ByteCount)); while (m_RequestBuffer.size()) { const asio::const_buffer& InputBuffer = m_RequestBuffer.data(); size_t Result = m_RequestData.ConsumeData((const char*)InputBuffer.data(), InputBuffer.size()); if (Result == ~0ull) { return TerminateConnection(); } m_RequestBuffer.consume(Result); } switch (m_RequestState) { case RequestState::kDone: case RequestState::kWritingFinal: case RequestState::kTerminated: break; default: EnqueueRead(); break; } } void HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, bool Pop) { if (Ec) { ZEN_WARN("on data sent ERROR, connection: {}, reason: '{}'", m_ConnectionId, Ec.message()); TerminateConnection(); } else { ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed), zen::GetCurrentThreadId(), NiceBytes(ByteCount)); if (!m_RequestData.IsKeepAlive()) { CloseConnection(); } else { if (Pop) { RwLock::ExclusiveLockScope _(m_ResponsesLock); m_Responses.pop_front(); } m_RequestCounter.fetch_add(1); } } } void HttpServerConnection::CloseConnection() { if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated) { return; } ZEN_ASSERT(m_Socket); m_RequestState = RequestState::kDone; std::error_code Ec; m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec); if (Ec) { ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message()); } m_Socket->close(Ec); if (Ec) { ZEN_WARN("socket close ERROR, reason '{}'", Ec.message()); } } void HttpServerConnection::HandleRequest() { if (!m_RequestData.IsKeepAlive()) { m_RequestState = RequestState::kWritingFinal; std::error_code Ec; m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec); if (Ec) { ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message()); } } else { m_RequestState = RequestState::kWriting; } if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url())) { ZEN_TRACE_CPU("asio::HandleRequest"); const uint32_t RequestNumber = m_RequestCounter.load(std::memory_order_relaxed); HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body()); ZEN_TRACE_VERBOSE("handle request, connection: {}, request: {}'", m_ConnectionId, RequestNumber); const HttpVerb RequestVerb = Request.RequestVerb(); const std::string_view Uri = Request.RelativeUri(); if (m_Server.m_RequestLog.ShouldLog(logging::level::Trace)) { ZEN_LOG_TRACE(m_Server.m_RequestLog, "connection #{} Handling Request: {} {} ({} bytes ({}), accept: {})", m_ConnectionId, ToString(RequestVerb), Uri, Request.ContentLength(), ToString(Request.RequestContentType()), ToString(Request.AcceptContentType())); m_Server.m_RequestTracer.WriteDebugPayload(fmt::format("request_{}_{}.bin", m_ConnectionId, RequestNumber), std::vector{Request.ReadPayload()}); } if (!HandlePackageOffers(*Service, Request, m_PackageHandler)) { try { Service->HandleRequest(Request); } catch (const AssertException& AssertEx) { // Drop any partially formatted response Request.m_Response.reset(); ZEN_ERROR("Caught assert exception while handling request: {}", AssertEx.FullDescription()); Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, AssertEx.FullDescription()); } catch (const std::system_error& SystemError) { // Drop any partially formatted response Request.m_Response.reset(); if (IsOOM(SystemError.code()) || IsOOD(SystemError.code())) { Request.WriteResponse(HttpResponseCode::InsufficientStorage, HttpContentType::kText, SystemError.what()); } else { ZEN_ERROR("Caught system error exception while handling request: {}", SystemError.what()); Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, SystemError.what()); } } catch (const std::bad_alloc& BadAlloc) { // Drop any partially formatted response Request.m_Response.reset(); Request.WriteResponse(HttpResponseCode::InsufficientStorage, HttpContentType::kText, BadAlloc.what()); } catch (const std::exception& ex) { // Drop any partially formatted response Request.m_Response.reset(); ZEN_ERROR("Caught exception while handling request: {}", ex.what()); Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, ex.what()); } } if (std::unique_ptr Response = std::move(Request.m_Response)) { // Transmit the response if (m_RequestData.RequestVerb() == HttpVerb::kHead) { Response->SuppressPayload(); } auto ResponseBuffers = Response->AsioBuffers(); uint64_t ResponseLength = 0; for (auto& Buffer : ResponseBuffers) { ResponseLength += Buffer.size(); } { RwLock::ExclusiveLockScope _(m_ResponsesLock); m_Responses.push_back(std::move(Response)); } // TODO: should cork/uncork for Linux? { ZEN_TRACE_CPU("asio::async_write"); asio::async_write(*m_Socket.get(), ResponseBuffers, asio::transfer_exactly(ResponseLength), [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount, true); }); } return; } } if (m_RequestData.RequestVerb() == HttpVerb::kHead) { std::string_view Response = "HTTP/1.1 404 NOT FOUND\r\n" "\r\n"sv; if (!m_RequestData.IsKeepAlive()) { Response = "HTTP/1.1 404 NOT FOUND\r\n" "Connection: close\r\n" "\r\n"sv; } asio::async_write( *m_Socket.get(), asio::buffer(Response), [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); } else { std::string_view Response = "HTTP/1.1 404 NOT FOUND\r\n" "Content-Length: 23\r\n" "Content-Type: text/plain\r\n" "\r\n" "No suitable route found"sv; if (!m_RequestData.IsKeepAlive()) { Response = "HTTP/1.1 404 NOT FOUND\r\n" "Content-Length: 23\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n" "\r\n" "No suitable route found"sv; } asio::async_write( *m_Socket.get(), asio::buffer(Response), [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); } } ////////////////////////////////////////////////////////////////////////// struct HttpAcceptor { HttpAcceptor(HttpAsioServerImpl& Server, asio::io_service& IoService, uint16_t BasePort, bool ForceLoopback) : m_Server(Server) , m_IoService(IoService) , m_Acceptor(m_IoService, asio::ip::tcp::v6()) , m_AlternateProtocolAcceptor(m_IoService, asio::ip::tcp::v4()) { m_Acceptor.set_option(asio::ip::v6_only(false)); #if ZEN_PLATFORM_WINDOWS // Special option for Windows settings as !asio::socket_base::reuse_address is not the same as exclusive access on Windows platforms typedef asio::detail::socket_option::boolean exclusive_address; m_Acceptor.set_option(exclusive_address(true)); m_AlternateProtocolAcceptor.set_option(exclusive_address(true)); #else // ZEN_PLATFORM_WINDOWS m_Acceptor.set_option(asio::socket_base::reuse_address(false)); m_AlternateProtocolAcceptor.set_option(asio::socket_base::reuse_address(false)); #endif // ZEN_PLATFORM_WINDOWS m_Acceptor.set_option(asio::ip::tcp::no_delay(true)); m_Acceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024)); m_Acceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024)); m_AlternateProtocolAcceptor.set_option(asio::ip::tcp::no_delay(true)); m_AlternateProtocolAcceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024)); m_AlternateProtocolAcceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024)); asio::ip::address_v6 BindAddress = ForceLoopback ? asio::ip::address_v6::loopback() : asio::ip::address_v6::any(); uint16_t EffectivePort = BasePort; if (BindAddress.is_loopback()) { m_Acceptor.set_option(asio::ip::v6_only(true)); } asio::error_code BindErrorCode; m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); if (BindErrorCode == asio::error::access_denied && !BindAddress.is_loopback()) { // Access denied for a public port - lets try fall back to local port only BindAddress = asio::ip::address_v6::loopback(); m_Acceptor.set_option(asio::ip::v6_only(true)); m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } if (BindErrorCode == asio::error::address_in_use) { // Do a retry after a short sleep on same port just to be sure ZEN_INFO("Desired port %d is in use, retrying", BasePort); Sleep(100); m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } // Sharing violation implies the port is being used by another process for (uint16_t PortOffset = 1; (BindErrorCode == asio::error::address_in_use) && (PortOffset < 10); ++PortOffset) { EffectivePort = BasePort + (PortOffset * 100); m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } if (BindErrorCode == asio::error::access_denied) { EffectivePort = 0; m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } if (BindErrorCode) { ZEN_ERROR("Unable open asio service, error '{}'", BindErrorCode.message()); } else if (BindAddress.is_loopback()) { m_AlternateProtocolAcceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), EffectivePort), BindErrorCode); m_UseAlternateProtocolAcceptor = true; ZEN_INFO("Registered local-only handler 'http://{}:{}/' - this is not accessible from remote hosts", "localhost", EffectivePort); } #if ZEN_PLATFORM_WINDOWS // On Windows, loopback connections can take advantage of a faster code path optionally with this flag. // This must be used by both the client and server side, and is only effective in the absence of // Windows Filtering Platform (WFP) callouts which can be installed by security software. // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path SOCKET NativeSocket = m_Acceptor.native_handle(); int LoopbackOptionValue = 1; DWORD OptionNumberOfBytesReturned = 0; WSAIoctl(NativeSocket, SIO_LOOPBACK_FAST_PATH, &LoopbackOptionValue, sizeof(LoopbackOptionValue), NULL, 0, &OptionNumberOfBytesReturned, 0, 0); if (m_UseAlternateProtocolAcceptor) { NativeSocket = m_AlternateProtocolAcceptor.native_handle(); WSAIoctl(NativeSocket, SIO_LOOPBACK_FAST_PATH, &LoopbackOptionValue, sizeof(LoopbackOptionValue), NULL, 0, &OptionNumberOfBytesReturned, 0, 0); } #endif m_Acceptor.listen(); if (m_UseAlternateProtocolAcceptor) { m_AlternateProtocolAcceptor.listen(); } ZEN_INFO("Started asio server at 'http://{}:{}'", BindAddress.is_loopback() ? "[::1]" : "*", EffectivePort); } ~HttpAcceptor() { m_Acceptor.close(); if (m_UseAlternateProtocolAcceptor) { m_AlternateProtocolAcceptor.close(); } } void Start() { ZEN_ASSERT(!m_IsStopped); InitAcceptInternal(m_Acceptor); if (m_UseAlternateProtocolAcceptor) { InitAcceptInternal(m_AlternateProtocolAcceptor); } } void StopAccepting() { m_IsStopped = true; } int GetAcceptPort() { return m_Acceptor.local_endpoint().port(); } private: void InitAcceptInternal(asio::ip::tcp::acceptor& Acceptor) { auto SocketPtr = std::make_unique(m_IoService); asio::ip::tcp::socket& SocketRef = *SocketPtr.get(); Acceptor.async_accept(SocketRef, [this, &Acceptor, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable { if (Ec) { ZEN_WARN("asio async_accept, connection failed to '{}:{}' reason '{}'", Acceptor.local_endpoint().address().to_string(), Acceptor.local_endpoint().port(), Ec.message()); } else { // New connection established, pass socket ownership into connection object // and initiate request handling loop. The connection lifetime is // managed by the async read/write loop by passing the shared // reference to the callbacks. Socket->set_option(asio::ip::tcp::no_delay(true)); Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024)); Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024)); auto Conn = std::make_shared(m_Server, std::move(Socket)); Conn->HandleNewRequest(); } if (!m_IsStopped.load()) { InitAcceptInternal(Acceptor); } else { std::error_code CloseEc; Acceptor.close(CloseEc); if (CloseEc) { ZEN_WARN("acceptor close ERROR, reason '{}'", CloseEc.message()); } } }); } HttpAsioServerImpl& m_Server; asio::io_service& m_IoService; asio::ip::tcp::acceptor m_Acceptor; asio::ip::tcp::acceptor m_AlternateProtocolAcceptor; bool m_UseAlternateProtocolAcceptor{false}; std::atomic m_IsStopped{false}; }; ////////////////////////////////////////////////////////////////////////// HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer) : m_Request(Request) , m_PayloadBuffer(std::move(PayloadBuffer)) { const int PrefixLength = Service.UriPrefixLength(); std::string_view Uri = Request.Url(); Uri.remove_prefix(std::min(PrefixLength, static_cast(Uri.size()))); m_Uri = Uri; m_UriWithExtension = Uri; m_QueryString = Request.QueryString(); m_Verb = Request.RequestVerb(); m_ContentLength = Request.Body().Size(); m_ContentType = Request.ContentType(); HttpContentType AcceptContentType = HttpContentType::kUnknownContentType; // Parse any extension, to allow requesting a particular response encoding via the URL { std::string_view UriSuffix8{m_Uri}; const size_t LastComponentIndex = UriSuffix8.find_last_of('/'); if (LastComponentIndex != std::string_view::npos) { UriSuffix8.remove_prefix(LastComponentIndex); } const size_t LastDotIndex = UriSuffix8.find_last_of('.'); if (LastDotIndex != std::string_view::npos) { UriSuffix8.remove_prefix(LastDotIndex + 1); AcceptContentType = ParseContentType(UriSuffix8); if (AcceptContentType != HttpContentType::kUnknownContentType) { m_Uri.remove_suffix(uint32_t(UriSuffix8.size() + 1)); } } } // It an explicit content type extension was specified then we'll use that over any // Accept: header value that may be present if (AcceptContentType != HttpContentType::kUnknownContentType) { m_AcceptType = AcceptContentType; } else { m_AcceptType = Request.AcceptType(); } } HttpAsioServerRequest::~HttpAsioServerRequest() { } Oid HttpAsioServerRequest::ParseSessionId() const { return m_Request.SessionId(); } uint32_t HttpAsioServerRequest::ParseRequestId() const { return m_Request.RequestId(); } IoBuffer HttpAsioServerRequest::ReadPayload() { return m_PayloadBuffer; } void HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode) { ZEN_ASSERT(!m_Response); m_Response.reset(new HttpResponse(HttpContentType::kBinary)); std::array Empty; m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty); } void HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span Blobs) { ZEN_ASSERT(!m_Response); m_Response.reset(new HttpResponse(ContentType)); m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs); } void HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) { ZEN_ASSERT(!m_Response); m_Response.reset(new HttpResponse(ContentType)); IoBuffer MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size()); std::array SingleBufferList({MessageBuffer}); m_Response->InitializeForPayload((uint16_t)ResponseCode, SingleBufferList); } void HttpAsioServerRequest::WriteResponseAsync(std::function&& ContinuationHandler) { ZEN_ASSERT(!m_Response); // Not one bit async, innit ContinuationHandler(*this); } bool HttpAsioServerRequest::TryGetRanges(HttpRanges& Ranges) { return TryParseHttpRangeHeader(m_Request.RangeHeader(), Ranges); } ////////////////////////////////////////////////////////////////////////// HttpAsioServerImpl::HttpAsioServerImpl() : m_RequestLog(logging::Get("http_requests")) { } HttpAsioServerImpl::~HttpAsioServerImpl() { } void HttpAsioServerImpl::Initialize(std::filesystem::path DataDir) { m_RequestTracer.Initialize(DataDir); } int HttpAsioServerImpl::Start(uint16_t Port, bool ForceLooopback, int ThreadCount) { ZEN_ASSERT(ThreadCount > 0); ZEN_INFO("starting asio http with {} service threads", ThreadCount); m_Acceptor.reset(new asio_http::HttpAcceptor(*this, m_IoService, Port, ForceLooopback)); m_Acceptor->Start(); // This should consist of a set of minimum threads and grow on demand to // meet concurrency needs? Right now we end up allocating a large number // of threads even if we never end up using all of them, which seems // wasteful. It's also not clear how the demand for concurrency should // be balanced with the engine side - ideally we'd have some kind of // global scheduling to prevent one side from preventing the other side // from making progress. Or at the very least, thread priorities should // be considered. for (int i = 0; i < ThreadCount; ++i) { m_ThreadPool.emplace_back([this, Index = i + 1] { SetCurrentThreadName(fmt::format("asio_io_{}", Index)); try { m_IoService.run(); } catch (const AssertException& AssertEx) { ZEN_ERROR("Assert caught in asio event loop: {}", AssertEx.FullDescription()); } catch (const std::exception& e) { ZEN_ERROR("Exception caught in asio event loop: '{}'", e.what()); } }); } ZEN_INFO("asio http started (port {})", m_Acceptor->GetAcceptPort()); return m_Acceptor->GetAcceptPort(); } void HttpAsioServerImpl::Stop() { if (m_Acceptor) { m_Acceptor->StopAccepting(); } m_IoService.stop(); for (auto& Thread : m_ThreadPool) { if (Thread.joinable()) { Thread.join(); } } m_ThreadPool.clear(); m_Acceptor.reset(); } void HttpAsioServerImpl::RegisterService(const char* InUrlPath, HttpService& Service) { std::string_view UrlPath(InUrlPath); Service.SetUriPrefixLength(UrlPath.size()); if (!UrlPath.empty() && UrlPath.back() == '/') { UrlPath.remove_suffix(1); } RwLock::ExclusiveLockScope _(m_Lock); m_UriHandlers.push_back({std::string(UrlPath), &Service}); } HttpService* HttpAsioServerImpl::RouteRequest(std::string_view Url) { RwLock::SharedLockScope _(m_Lock); HttpService* CandidateService = nullptr; std::string::size_type CandidateMatchSize = 0; for (const ServiceEntry& SvcEntry : m_UriHandlers) { const std::string& SvcUrl = SvcEntry.ServiceUrlPath; const std::string::size_type SvcUrlSize = SvcUrl.size(); if ((SvcUrlSize >= CandidateMatchSize) && Url.compare(0, SvcUrlSize, SvcUrl) == 0 && ((SvcUrlSize == Url.size()) || (Url[SvcUrlSize] == '/'))) { CandidateMatchSize = SvcUrl.size(); CandidateService = SvcEntry.Service; } } return CandidateService; } } // namespace zen::asio_http ////////////////////////////////////////////////////////////////////////// namespace zen { class HttpAsioServer : public HttpServer { public: HttpAsioServer(bool ForceLoopback, unsigned int ThreadCount); ~HttpAsioServer(); virtual void RegisterService(HttpService& Service) override; virtual int Initialize(int BasePort, std::filesystem::path DataDir) override; virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; virtual void Close() override; private: Event m_ShutdownEvent; int m_BasePort = 0; bool m_ForceLoopback = false; unsigned int m_ThreadCount = 0; std::unique_ptr m_Impl; }; HttpAsioServer::HttpAsioServer(bool ForceLoopback, unsigned int ThreadCount) : m_ForceLoopback(ForceLoopback) , m_ThreadCount(ThreadCount != 0 ? ThreadCount : Max(std::thread::hardware_concurrency(), 8u)) , m_Impl(std::make_unique()) { ZEN_DEBUG("Request object size: {} ({:#x})", sizeof(HttpRequestParser), sizeof(HttpRequestParser)); } HttpAsioServer::~HttpAsioServer() { if (m_Impl) { ZEN_ERROR("~HttpAsioServer() called without calling Close() first"); } } void HttpAsioServer::Close() { try { m_Impl->Stop(); } catch (const std::exception& ex) { ZEN_WARN("Caught exception stopping http asio server: {}", ex.what()); } m_Impl.reset(); } void HttpAsioServer::RegisterService(HttpService& Service) { m_Impl->RegisterService(Service.BaseUri(), Service); } int HttpAsioServer::Initialize(int BasePort, std::filesystem::path DataDir) { m_Impl->Initialize(DataDir); m_BasePort = m_Impl->Start(gsl::narrow(BasePort), m_ForceLoopback, m_ThreadCount); return m_BasePort; } void HttpAsioServer::Run(bool IsInteractive) { const bool TestMode = !IsInteractive; int WaitTimeout = -1; if (!TestMode) { WaitTimeout = 1000; } #if ZEN_PLATFORM_WINDOWS if (TestMode == false) { ZEN_CONSOLE("Zen Server running (asio HTTP). Press ESC or Q to quit"); } do { if (!TestMode && _kbhit() != 0) { char c = (char)_getch(); if (c == 27 || c == 'Q' || c == 'q') { RequestApplicationExit(0); } } m_ShutdownEvent.Wait(WaitTimeout); } while (!IsApplicationExitRequested()); #else if (TestMode == false) { ZEN_CONSOLE("Zen Server running (asio HTTP). Ctrl-C to quit"); } do { m_ShutdownEvent.Wait(WaitTimeout); } while (!IsApplicationExitRequested()); #endif } void HttpAsioServer::RequestExit() { m_ShutdownEvent.Set(); } Ref CreateHttpAsioServer(bool ForceLoopback, unsigned int ThreadCount) { return Ref{new HttpAsioServer(ForceLoopback, ThreadCount)}; } } // namespace zen