// Copyright Epic Games, Inc. All Rights Reserved. #include "httpasio.h" #include "httptracer.h" #include #include #include #include #include #include #include #include #include "httpparser.h" #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #if ZEN_PLATFORM_WINDOWS # include # include #else # include # include # include #endif #include #include ZEN_THIRD_PARTY_INCLUDES_END #define ASIO_VERBOSE_TRACE 0 #if ASIO_VERBOSE_TRACE # define ZEN_TRACE_VERBOSE ZEN_TRACE #else # define ZEN_TRACE_VERBOSE(fmtstr, ...) #endif #if ZEN_PLATFORM_LINUX static bool IsIPv6Available() { int fd = ::socket(AF_INET6, SOCK_DGRAM, 0); if (fd < 0) { // errors when IPv6 is unavailable/disabled: EAFNOSUPPORT, EPROTONOSUPPORT, ... return false; } ::close(fd); return true; } #elif ZEN_PLATFORM_WINDOWS static bool IsIPv6Available() { SOCKET s = ::socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); if (s == INVALID_SOCKET) { int e = WSAGetLastError(); // WSAEAFNOSUPPORT means the IPv6 stack is unavailable return e != WSAEAFNOSUPPORT; } closesocket(s); return true; } #else static bool IsIPv6Available() { return true; } #endif #if ZEN_PLATFORM_LINUX static bool IsIPv6AvailableSysctl(void) { int val = 0; const char* path = "/proc/sys/net/ipv6/conf/all/disable_ipv6"; if (FILE* f = fopen(path, "r")) { char buf[16]; if (fgets(buf, sizeof(buf), f)) { fclose(f); // 0 means IPv6 enabled, 1 means disabled val = atoi(buf); } } return val == 0; } #endif // ZEN_PLATFORM_LINUX namespace zen { #if ZEN_PLATFORM_LINUX bool IsIPv6Capable() { static bool CachedCaps = IsIPv6Available() && IsIPv6AvailableSysctl(); return CachedCaps; } #else bool IsIPv6Capable() { static bool CachedCaps = IsIPv6Available(); // On Windows it's possible to disable IPv6 via `HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip6\Parameters` // and setting `DisabledComponents` to 0xFF. This should act like Linux does with the setting above but I have not tested // that. 
// On MacOS you can disable IPv6 but allegedly loopback is always still available - this might be worth investigating at // some point for completeness. return CachedCaps; } #endif const FLLMTag& GetHttpasioTag() { static FLLMTag _("httpasio"); return _; } } // namespace zen namespace zen::asio_http { using namespace std::literals; struct HttpAcceptor; struct HttpResponse; struct HttpServerConnection; inline LoggerRef InitLogger() { LoggerRef Logger = logging::Get("asio"); // Logger.SetLogLevel(logging::level::Trace); return Logger; } inline LoggerRef Log() { static LoggerRef g_Logger = InitLogger(); return g_Logger; } ////////////////////////////////////////////////////////////////////////// #if !defined(ASIO_HAS_FILE) # define ASIO_HAS_FILE 0 #endif #if defined(ASIO_HAS_WINDOWS_OVERLAPPED_PTR) # define ZEN_USE_TRANSMITFILE 1 # define ZEN_USE_ASYNC_SENDFILE 0 #else # define ZEN_USE_TRANSMITFILE 0 # define ZEN_USE_ASYNC_SENDFILE 0 #endif #if ZEN_USE_TRANSMITFILE template void TransmitFileAsync(asio::ip::tcp::socket& Socket, HANDLE FileHandle, uint64_t ByteOffset, uint32_t ByteSize, Handler&& Cb) { # if ZEN_BUILD_DEBUG const uint64_t FileSize = FileSizeFromHandle(FileHandle); const uint64_t SendEndOffset = ByteOffset + ByteSize; if (SendEndOffset > FileSize) { std::error_code DummyEc; ZEN_WARN("TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) tries to transmit {} bytes too many", ByteOffset, ByteSize, PathFromHandle(FileHandle, DummyEc), FileSizeFromHandle(FileHandle), SendEndOffset - FileSize); } # endif // ZEN_BUILD_DEBUG asio::windows::overlapped_ptr OverlappedPtr( Socket.get_executor(), [WrappedCb = std::move(Cb), ExpectedBytes = ByteSize, FileHandle, ByteOffset, ByteSize](const std::error_code& Ec, std::size_t ActualBytesTransferred) { if (Ec) { std::error_code DummyEc; ZEN_WARN("NOTE: TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) error '{}', transmitted {} bytes", ByteOffset, ByteSize, PathFromHandle(FileHandle, 
DummyEc), FileSizeFromHandle(FileHandle), Ec.message(), ActualBytesTransferred); } WrappedCb(Ec, ActualBytesTransferred); }); OVERLAPPED* RawOverlapped = OverlappedPtr.get(); RawOverlapped->Offset = uint32_t(ByteOffset & 0xffffFFFFull); RawOverlapped->OffsetHigh = uint32_t(ByteOffset >> 32); const DWORD NumberOfBytesPerSend = 0; // let TransmitFile decide const BOOL Ok = ::TransmitFile(Socket.native_handle(), FileHandle, ByteSize, NumberOfBytesPerSend, RawOverlapped, nullptr, /* dwReserved */ 0); const DWORD LastError = ::GetLastError(); // Check if the operation completed immediately. if (!Ok && LastError != ERROR_IO_PENDING) { // The operation completed immediately, so a completion notification needs // to be posted. When complete() is called, ownership of the OVERLAPPED- // derived object passes to the io_context. asio::error_code ec(LastError, asio::error::get_system_category()); OverlappedPtr.complete(ec, 0); } else { // The operation was successfully initiated, so ownership of the // OVERLAPPED-derived object has passed to the io_context. OverlappedPtr.release(); } } #endif // ZEN_USE_TRANSMITFILE #if ZEN_USE_ASYNC_SENDFILE // Pipelined file sender that reads from a file and writes to a socket using two buffers // to pipeline reads and writes. Unfortunately this strategy can't currently be used on // non-Windows platforms as they don't currently support async file reads. We'll have // to build a mechanism using a thread pool for that, perhaps with optional support // for io_uring where available since that should be the most efficient. // // In other words, this is not super useful as Windows already has the TransmitFile // version above, but it's here for completeness and potential future use on other platforms. 
template class PipelinedFileSender : public std::enable_shared_from_this> { public: PipelinedFileSender(AsyncWriteStream& WriteSocket, asio::stream_file&& FileToReadFrom, uint64_t ByteSize, std::size_t BufferSize, CompletionHandler&& CompletionToken) : m_WriteSocket(WriteSocket) , m_SourceFile(std::move(FileToReadFrom)) , m_Strand(asio::make_strand(m_WriteSocket.get_executor())) , m_Buffers{std::vector(BufferSize), std::vector(BufferSize)} , m_CompletionHandler(std::move(CompletionToken)) , m_TotalBytesToRead(ByteSize) , m_RemainingBytesToRead(ByteSize) { } void Start() { EnqueueRead(); } private: struct Slot { std::size_t Size = 0; // valid bytes in buffer bool Ready = false; // has unwritten data bool InUse = false; // being written }; void OnSendCompleted(const std::error_code& Ec) { // TODO: ensure this behaves properly for instance if the write fails while a read is pending if (m_SendCompleted) { return; } m_SendCompleted = true; // Ensure completion runs on the strand/executor. asio::dispatch(m_Strand, [CompletionHandler = std::move(m_CompletionHandler), Ec, TotalBytesWritten = m_TotalBytesWritten]() { CompletionHandler(Ec, TotalBytesWritten); }); } void EnqueueRead() { asio::dispatch(m_Strand, [self = this->shared_from_this()] { self->TryPostRead(); self->PumpWrites(); }); } void TryPostRead() { if (m_IsEof || m_ReadInFlight || m_RemainingBytesToRead == 0) { return; } const int ReadSlotIndex = GetFreeSlotIndex(); if (ReadSlotIndex < 0) { // no free slot; wait for writer to free one (not meant to ever happen) return; } m_ReadInFlight = true; auto ReadBuffer = asio::buffer(m_Buffers[ReadSlotIndex].data(), zen::Min(m_Buffers[ReadSlotIndex].size(), m_RemainingBytesToRead)); asio::async_read( m_SourceFile, ReadBuffer, asio::bind_executor(m_Strand, [self = this->shared_from_this(), ReadSlotIndex](const std::error_code& Ec, std::size_t ActualBytesRead) { self->m_ReadInFlight = false; self->m_RemainingBytesToRead -= ActualBytesRead; if (Ec) { if (Ec == 
asio::error::eof) { ZEN_ASSERT(self->m_RemainingBytesToRead == 0); self->m_IsEof = true; // No data produced on EOF; just try to pump whatever is left self->PumpWrites(); } else { // read error, cancel everything and let outer completion handler know self->OnSendCompleted(Ec); } } else { // Mark slot as ready with ActualBytesRead valid bytes of data in buffer self->m_Slots[ReadSlotIndex].Size = ActualBytesRead; self->m_Slots[ReadSlotIndex].Ready = true; self->PumpWrites(); self->TryPostRead(); } })); } void PumpWrites() { if (m_WriteInFlight) { return; } const int WriteSlotIndex = GetReadySlotIndex(); if (WriteSlotIndex < 0) { // No ready data. We're done if EOF/no more data to read and no reads in flight and nothing ready if (!m_ReadInFlight && (m_IsEof || m_RemainingBytesToRead == 0)) { // all done return OnSendCompleted({}); } return; } m_WriteInFlight = true; m_Slots[WriteSlotIndex].InUse = true; asio::async_write( m_WriteSocket, asio::buffer(m_Buffers[WriteSlotIndex].data(), m_Slots[WriteSlotIndex].Size), asio::bind_executor(m_Strand, [self = this->shared_from_this(), WriteSlotIndex](const std::error_code& Ec, std::size_t BytesWritten) { self->m_TotalBytesWritten += BytesWritten; self->m_WriteInFlight = false; if (Ec) { self->OnSendCompleted(Ec); return; } else { // Free the slot self->m_Slots[WriteSlotIndex].Ready = false; self->m_Slots[WriteSlotIndex].InUse = false; self->m_Slots[WriteSlotIndex].Size = 0; self->TryPostRead(); self->PumpWrites(); } })); } int GetFreeSlotIndex() const { for (int i = 0; i < 2; ++i) { if (!m_Slots[i].Ready && !m_Slots[i].InUse) { return i; } } return -1; } int GetReadySlotIndex() const { for (int i = 0; i < 2; ++i) { if (m_Slots[i].Ready && !m_Slots[i].InUse) { return i; } } return -1; } AsyncWriteStream& m_WriteSocket; asio::stream_file m_SourceFile; asio::strand m_Strand; // There's no synchronization needed for these as all access is via the strand std::vector m_Buffers[2]; Slot m_Slots[2]; bool m_IsEof = false; bool 
m_ReadInFlight = false; bool m_WriteInFlight = false; bool m_SendCompleted = false; const uint64_t m_TotalBytesToRead = 0; uint64_t m_RemainingBytesToRead = 0; uint64_t m_TotalBytesWritten = 0; CompletionHandler m_CompletionHandler; }; template void SendFileAsync(AsyncWriteStream& WriteSocket, const auto FileHandle, uint64_t ByteOffset, uint64_t ByteSize, std::size_t BufferSize, CompletionHandler&& CompletionToken) { HANDLE hReopenedFile = ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED); // Note that this assumes ownership of the handle asio::stream_file SourceFile(WriteSocket.get_executor(), hReopenedFile); // TODO: handle any error properly here SourceFile.seek(ByteOffset, asio::stream_file::seek_set); if (BufferSize > ByteSize) { BufferSize = ByteSize; } auto op = std::make_shared>>( WriteSocket, std::move(SourceFile), ByteSize, BufferSize, std::forward(CompletionToken)); // Start the pipeline op->Start(); } #endif // ZEN_USE_ASYNC_SENDFILE ////////////////////////////////////////////////////////////////////////// struct HttpAsioServerImpl { public: HttpAsioServerImpl(); ~HttpAsioServerImpl(); void Initialize(std::filesystem::path DataDir); int Start(uint16_t Port, const AsioConfig& Config); void Stop(); void RegisterService(const char* UrlPath, HttpService& Service); void SetHttpRequestFilter(IHttpRequestFilter* RequestFilter); HttpService* RouteRequest(std::string_view Url); IHttpRequestFilter::Result FilterRequest(HttpServerRequest& Request); asio::io_service m_IoService; asio::io_service::work m_Work{m_IoService}; std::unique_ptr m_Acceptor; std::vector m_ThreadPool; std::atomic m_HttpRequestFilter = nullptr; LoggerRef m_RequestLog; HttpServerTracer m_RequestTracer; struct ServiceEntry { std::string ServiceUrlPath; HttpService* Service; }; RwLock m_Lock; std::vector m_UriHandlers; }; /** * This is the class which request handlers use to interact with the server instance */ class 
HttpAsioServerRequest : public HttpServerRequest { public: HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer, uint32_t RequestNumber, bool IsLocalMachineRequest); ~HttpAsioServerRequest(); virtual Oid ParseSessionId() const override; virtual uint32_t ParseRequestId() const override; virtual bool IsLocalMachineRequest() const override; virtual std::string_view GetAuthorizationHeader() const override; virtual IoBuffer ReadPayload() override; virtual void WriteResponse(HttpResponseCode ResponseCode) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span Blobs) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override; virtual void WriteResponseAsync(std::function&& ContinuationHandler) override; virtual bool TryGetRanges(HttpRanges& Ranges) override; using HttpServerRequest::WriteResponse; HttpAsioServerRequest(const HttpAsioServerRequest&) = delete; HttpAsioServerRequest& operator=(const HttpAsioServerRequest&) = delete; HttpRequestParser& m_Request; uint32_t m_RequestNumber = 0; // Note: different to request ID which is derived from headers IoBuffer m_PayloadBuffer; bool m_IsLocalMachineRequest; std::unique_ptr m_Response; }; /** * HTTP Response representation used internally by the ASIO server * * This is used to build up the response headers and payload prior to sending * it over the network. It's also responsible for managing the send operation itself, * including ownership of the source buffers until the operation completes. 
* */ struct HttpResponse { public: HttpResponse() = default; explicit HttpResponse(HttpContentType ContentType, uint32_t RequestNumber) : m_RequestNumber(RequestNumber), m_ContentType(ContentType) { } ~HttpResponse() = default; /** * Initialize the response for sending a payload made up of multiple blobs * * This builds the necessary headers and IO vectors for sending the response * and also makes sure all buffers are owned for the duration of the * operation. * */ void InitializeForPayload(uint16_t ResponseCode, std::span BlobList) { ZEN_ASSERT(m_State == State::kUninitialized); ZEN_MEMSCOPE(GetHttpasioTag()); ZEN_TRACE_CPU("asio::InitializeForPayload"); m_ResponseCode = ResponseCode; const uint32_t ChunkCount = gsl::narrow(BlobList.size()); m_DataBuffers.reserve(ChunkCount); m_IoVecs.reserve(ChunkCount + 1); m_IoVecs.emplace_back(); // header IoVec m_IoVecCursor = 0; uint64_t LocalDataSize = 0; for (IoBuffer& Buffer : BlobList) { const uint64_t BufferDataSize = Buffer.Size(); ZEN_ASSERT(BufferDataSize); LocalDataSize += BufferDataSize; IoBuffer OwnedBuffer = std::move(Buffer); bool ChunkHandled = false; #if ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE if (OwnedBuffer.IsWholeFile()) { if (IoBufferFileReference FileRef; OwnedBuffer.GetFileReference(/* out */ FileRef)) { # if ZEN_USE_TRANSMITFILE // We establish a new handle here to add the FILE_FLAG_OVERLAPPED flag for use during TransmitFile HANDLE WinFileHandle = ReOpenFile(FileRef.FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED); if (WinFileHandle == INVALID_HANDLE_VALUE) { HRESULT hRes = HRESULT_FROM_WIN32(GetLastError()); std::error_code DummyEc; ThrowSystemException(hRes, fmt::format("Failed to ReOpenFile file {}", PathFromHandle(FileRef.FileHandle, DummyEc))); } void* FileHandle = (void*)WinFileHandle; OwnedBuffer = IoBufferBuilder::MakeFromFileHandle(FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize); # else // ZEN_USE_TRANSMITFILE void* 
FileHandle = FileRef.FileHandle; OwnedBuffer.MakeOwned(); # endif // ZEN_USE_TRANSMITFILE // Since there's a limit to how much data TransmitFile can send in one go, // we may need to split this into multiple IoVec entries. In this case we'll // end up reallocating the IoVec array, but this should be rare. uint64_t RemainingChunkBytes = FileRef.FileChunkSize; uint64_t ChunkOffset = FileRef.FileChunkOffset; const uint32_t MaxTransmitSize = 1 * 1024 * 1024 * 1024; // 1 GB while (RemainingChunkBytes) { IoVec Io{.IsFileRef = true}; Io.Ref.FileRef.FileHandle = FileHandle; Io.Ref.FileRef.FileChunkOffset = ChunkOffset; if (RemainingChunkBytes > MaxTransmitSize) { Io.Ref.FileRef.FileChunkSize = MaxTransmitSize; RemainingChunkBytes -= MaxTransmitSize; } else { Io.Ref.FileRef.FileChunkSize = gsl::narrow(RemainingChunkBytes); RemainingChunkBytes = 0; } ChunkOffset += Io.Ref.FileRef.FileChunkSize; m_IoVecs.push_back(Io); } ChunkHandled = true; } } #endif // ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE if (!ChunkHandled) { OwnedBuffer.MakeOwned(); IoVec Io{.IsFileRef = false}; Io.Ref.MemoryRef = {.Data = OwnedBuffer.Data(), .Size = OwnedBuffer.Size()}; m_IoVecs.push_back(Io); } m_DataBuffers.push_back(std::move(OwnedBuffer)); } // Now that we know the full data size, we can build the headers m_ContentLength = LocalDataSize; std::string_view Headers = GetHeaders(); IoVec& HeaderIo = m_IoVecs[0]; HeaderIo.IsFileRef = false; HeaderIo.Ref.MemoryRef = {.Data = Headers.data(), .Size = Headers.size()}; m_State = State::kInitialized; } uint16_t ResponseCode() const { return m_ResponseCode; } uint64_t ContentLength() const { return m_ContentLength; } std::string_view GetHeaders() { ZEN_MEMSCOPE(GetHttpasioTag()); m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n" << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n" << "Content-Length: " << ContentLength() << "\r\n"sv; if (!m_IsKeepAlive) { m_Headers << 
"Connection: close\r\n"sv; } m_Headers << "\r\n"sv; return m_Headers; } void SendResponse(asio::ip::tcp::socket& TcpSocket, std::function&& Token) { ZEN_ASSERT(m_State == State::kInitialized); ZEN_MEMSCOPE(GetHttpasioTag()); ZEN_TRACE_CPU("asio::SendResponse"); m_SendCb = std::move(Token); m_State = State::kSending; SendNextChunk(TcpSocket); } void SendNextChunk(asio::ip::tcp::socket& TcpSocket) { ZEN_ASSERT(m_State == State::kSending); ZEN_MEMSCOPE(GetHttpasioTag()); ZEN_TRACE_CPU("asio::SendNextChunk"); if (m_IoVecCursor == m_IoVecs.size()) { // All data sent, complete the operation ZEN_ASSERT(m_SendCb); m_State = State::kSent; auto CompletionToken = [Self = this, Token = std::move(m_SendCb), TotalBytes = m_TotalBytesSent] { Token({}, TotalBytes); }; asio::defer(TcpSocket.get_executor(), std::move(CompletionToken)); return; } auto OnCompletion = [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { ZEN_ASSERT(m_State == State::kSending); m_TotalBytesSent += ByteCount; if (Ec) { m_State = State::kFailed; m_SendCb(Ec, m_TotalBytesSent); } else { SendNextChunk(TcpSocket); } }; const IoVec& Io = m_IoVecs[m_IoVecCursor++]; if (Io.IsFileRef) { ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, offset: {}, bytes: {}", zen::GetCurrentThreadId(), Io.Ref.FileRef.FileChunkOffset, Io.Ref.FileRef.FileChunkSize); #if ZEN_USE_TRANSMITFILE TransmitFileAsync(TcpSocket, Io.Ref.FileRef.FileHandle, Io.Ref.FileRef.FileChunkOffset, gsl::narrow_cast(Io.Ref.FileRef.FileChunkSize), OnCompletion); #elif ZEN_USE_ASYNC_SENDFILE SendFileAsync(TcpSocket, Io.Ref.FileRef.FileHandle, Io.Ref.FileRef.FileChunkOffset, Io.Ref.FileRef.FileChunkSize, 64 * 1024, OnCompletion); #else // This should never occur unless we compile with one // of the options above ZEN_WARN("invalid file reference in response"); #endif return; } // Send as many consecutive non-file references as possible in one asio operation std::vector AsioBuffers; 
AsioBuffers.push_back(asio::const_buffer{Io.Ref.MemoryRef.Data, Io.Ref.MemoryRef.Size}); while (m_IoVecCursor != m_IoVecs.size()) { const IoVec& Io2 = m_IoVecs[m_IoVecCursor]; if (Io2.IsFileRef) { break; } AsioBuffers.push_back(asio::const_buffer{Io2.Ref.MemoryRef.Data, Io2.Ref.MemoryRef.Size}); ++m_IoVecCursor; } asio::async_write(TcpSocket, std::move(AsioBuffers), asio::transfer_all(), OnCompletion); } private: enum class State : uint8_t { kUninitialized, kInitialized, kSending, kSent, kFailed }; uint32_t m_RequestNumber = 0; uint16_t m_ResponseCode = 0; bool m_IsKeepAlive = true; State m_State = State::kUninitialized; HttpContentType m_ContentType = HttpContentType::kBinary; uint64_t m_ContentLength = 0; eastl::fixed_vector m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive ExtendableStringBuilder<160> m_Headers; struct IoVec { bool IsFileRef; union { struct MemoryBuffer { const void* Data; uint64_t Size; } MemoryRef; IoBufferFileReference FileRef; } Ref; }; eastl::fixed_vector m_IoVecs; unsigned int m_IoVecCursor = 0; std::function m_SendCb; uint64_t m_TotalBytesSent = 0; }; ////////////////////////////////////////////////////////////////////////// struct HttpServerConnection : public HttpRequestParserCallbacks, std::enable_shared_from_this { HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr&& Socket); ~HttpServerConnection(); std::shared_ptr AsSharedPtr() { return shared_from_this(); } // HttpConnectionBase implementation virtual void TerminateConnection() override; virtual void HandleRequest() override; void HandleNewRequest(); private: enum class RequestState { kInitialState, kInitialRead, kReadingMore, kWriting, kWritingFinal, kDone, kTerminated }; const char* StateToString(RequestState State) { switch (State) { case RequestState::kInitialState: return "InitialState"; case RequestState::kInitialRead: return "InitialRead"; case RequestState::kReadingMore: return "ReadingMore"; case RequestState::kWriting: return 
"Writing"; case RequestState::kWritingFinal: return "WritingFinal"; case RequestState::kDone: return "Done"; case RequestState::kTerminated: return "Terminated"; default: return "Unknown"; } } RequestState m_RequestState = RequestState::kInitialState; HttpRequestParser m_RequestData{*this}; void EnqueueRead(); void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, uint32_t RequestNumber, HttpResponse* ResponseToPop); void CloseConnection(); HttpAsioServerImpl& m_Server; asio::streambuf m_RequestBuffer; std::atomic m_RequestCounter{0}; uint32_t m_ConnectionId = 0; Ref m_PackageHandler; RwLock m_ActiveResponsesLock; std::deque> m_ActiveResponses; std::unique_ptr m_Socket; }; std::atomic g_ConnectionIdCounter{0}; HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr&& Socket) : m_Server(Server) , m_ConnectionId(g_ConnectionIdCounter.fetch_add(1)) , m_Socket(std::move(Socket)) { ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId); } HttpServerConnection::~HttpServerConnection() { RwLock::ExclusiveLockScope _(m_ActiveResponsesLock); ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId); } void HttpServerConnection::HandleNewRequest() { EnqueueRead(); } void HttpServerConnection::TerminateConnection() { if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated) { return; } m_RequestState = RequestState::kTerminated; ZEN_ASSERT(m_Socket); // Terminating, we don't care about any errors when closing socket std::error_code Ec; m_Socket->shutdown(asio::socket_base::shutdown_both, Ec); m_Socket->close(Ec); } void HttpServerConnection::EnqueueRead() { ZEN_MEMSCOPE(GetHttpasioTag()); if ((m_RequestState == RequestState::kInitialRead) || (m_RequestState == RequestState::kReadingMore)) { m_RequestState = RequestState::kReadingMore; } else { m_RequestState = RequestState::kInitialRead; } m_RequestBuffer.prepare(64 * 
1024); asio::async_read(*m_Socket.get(), m_RequestBuffer, asio::transfer_at_least(1), [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnDataReceived(Ec, ByteCount); }); } void HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount) { ZEN_MEMSCOPE(GetHttpasioTag()); if (Ec) { switch (m_RequestState) { case RequestState::kDone: case RequestState::kInitialRead: case RequestState::kTerminated: ZEN_TRACE_VERBOSE("on data received ERROR (EXPECTED), connection: {}, reason: '{}'", m_ConnectionId, Ec.message()); return; default: ZEN_WARN("on data received ERROR, connection: {} (state: {}), reason '{}'", m_ConnectionId, StateToString(m_RequestState), Ec.message()); return TerminateConnection(); } } ZEN_TRACE_VERBOSE("on data received, connection: {}, request: {}, thread: {}, bytes: {}", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed), zen::GetCurrentThreadId(), NiceBytes(ByteCount)); while (m_RequestBuffer.size()) { const asio::const_buffer& InputBuffer = m_RequestBuffer.data(); size_t Result = m_RequestData.ConsumeData((const char*)InputBuffer.data(), InputBuffer.size()); if (Result == ~0ull) { return TerminateConnection(); } m_RequestBuffer.consume(Result); } switch (m_RequestState) { case RequestState::kDone: case RequestState::kWritingFinal: case RequestState::kTerminated: break; default: EnqueueRead(); break; } } void HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, [[maybe_unused]] uint32_t RequestNumber, HttpResponse* ResponseToPop) { ZEN_MEMSCOPE(GetHttpasioTag()); if (Ec) { ZEN_WARN("on data sent ERROR, connection: {} (state: {}), reason: '{}' (bytes: {})", m_ConnectionId, StateToString(m_RequestState), Ec.message(), ByteCount); TerminateConnection(); return; } ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}", m_ConnectionId, RequestNumber, zen::GetCurrentThreadId(), 
NiceBytes(ByteCount)); if (ResponseToPop) { m_ActiveResponsesLock.WithExclusiveLock([&] { // Once a response is sent we can release any referenced resources // // completion callbacks may be issued out-of-order so we need to // remove the relevant entry from our active response list, it may // not be the first if (auto It = find_if(begin(m_ActiveResponses), end(m_ActiveResponses), [ResponseToPop](const auto& Item) { return Item.get() == ResponseToPop; }); It != end(m_ActiveResponses)) { m_ActiveResponses.erase(It); } else { ZEN_WARN("response not found"); } }); } if (!m_RequestData.IsKeepAlive()) { CloseConnection(); } } void HttpServerConnection::CloseConnection() { ZEN_MEMSCOPE(GetHttpasioTag()); if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated) { return; } ZEN_ASSERT(m_Socket); m_RequestState = RequestState::kDone; std::error_code Ec; m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec); if (Ec) { ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message()); } m_Socket->close(Ec); if (Ec) { ZEN_WARN("socket close ERROR, reason '{}'", Ec.message()); } } void HttpServerConnection::HandleRequest() { ZEN_MEMSCOPE(GetHttpasioTag()); if (!m_RequestData.IsKeepAlive()) { m_RequestState = RequestState::kWritingFinal; std::error_code Ec; m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec); if (Ec) { ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message()); } } else { m_RequestState = RequestState::kWriting; } const uint32_t RequestNumber = m_RequestCounter.fetch_add(1); if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url())) { ZEN_TRACE_CPU("asio::HandleRequest"); bool IsLocalConnection = m_Socket->local_endpoint().address() == m_Socket->remote_endpoint().address(); HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body(), RequestNumber, IsLocalConnection); ZEN_TRACE_VERBOSE("handle request, connection: {}, request: {}'", m_ConnectionId, RequestNumber); const HttpVerb RequestVerb 
= Request.RequestVerb(); const std::string_view Uri = Request.RelativeUri(); if (m_Server.m_RequestLog.ShouldLog(logging::level::Trace)) { ZEN_LOG_TRACE(m_Server.m_RequestLog, "connection #{} Handling Request: {} {} ({} bytes ({}), accept: {})", m_ConnectionId, ToString(RequestVerb), Uri, Request.ContentLength(), ToString(Request.RequestContentType()), ToString(Request.AcceptContentType())); m_Server.m_RequestTracer.WriteDebugPayload(fmt::format("request_{}_{}.bin", m_ConnectionId, RequestNumber), std::vector{Request.ReadPayload()}); } IHttpRequestFilter::Result FilterResult = m_Server.FilterRequest(Request); if (FilterResult == IHttpRequestFilter::Result::Accepted) { if (!HandlePackageOffers(*Service, Request, m_PackageHandler)) { try { Service->HandleRequest(Request); } catch (const AssertException& AssertEx) { // Drop any partially formatted response Request.m_Response.reset(); ZEN_ERROR("Caught assert exception while handling request: {}", AssertEx.FullDescription()); Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, AssertEx.FullDescription()); } catch (const std::system_error& SystemError) { // Drop any partially formatted response Request.m_Response.reset(); if (IsOOM(SystemError.code()) || IsOOD(SystemError.code())) { Request.WriteResponse(HttpResponseCode::InsufficientStorage, HttpContentType::kText, SystemError.what()); } else { ZEN_WARN("Caught system error exception while handling request: {}. 
({})", SystemError.what(), SystemError.code().value()); Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, SystemError.what()); } } catch (const std::bad_alloc& BadAlloc) { // Drop any partially formatted response Request.m_Response.reset(); Request.WriteResponse(HttpResponseCode::InsufficientStorage, HttpContentType::kText, BadAlloc.what()); } catch (const std::exception& ex) { // Drop any partially formatted response Request.m_Response.reset(); ZEN_WARN("Caught exception while handling request: {}", ex.what()); Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, ex.what()); } } } else if (FilterResult == IHttpRequestFilter::Result::Forbidden) { Request.WriteResponse(HttpResponseCode::Forbidden); } else { ZEN_ASSERT(FilterResult == IHttpRequestFilter::Result::ResponseSent); } if (std::unique_ptr Response = std::move(Request.m_Response)) { if (Request.ShouldLogRequest()) { ZEN_INFO("{} {} {} -> {}", ToString(RequestVerb), Uri, Response->ResponseCode(), NiceBytes(Response->ContentLength())); } // Transmit the response if (m_RequestData.RequestVerb() == HttpVerb::kHead) { ZEN_TRACE_CPU("asio::async_write"); std::string_view Headers = Response->GetHeaders(); std::vector AsioBuffers; AsioBuffers.push_back(asio::const_buffer(Headers.data(), Headers.size())); asio::async_write(*m_Socket.get(), AsioBuffers, asio::transfer_all(), [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); }); } else { ZEN_TRACE_CPU("asio::async_write"); HttpResponse* ResponseRaw = Response.get(); m_ActiveResponsesLock.WithExclusiveLock([&] { // Keep referenced resources alive m_ActiveResponses.push_back(std::move(Response)); }); ResponseRaw->SendResponse( *m_Socket, [Conn = AsSharedPtr(), ResponseRaw, RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount, 
RequestNumber, /* ResponseToPop */ ResponseRaw); }); } return; } } if (m_RequestData.RequestVerb() == HttpVerb::kHead) { std::string_view Response = "HTTP/1.1 404 NOT FOUND\r\n" "\r\n"sv; if (!m_RequestData.IsKeepAlive()) { Response = "HTTP/1.1 404 NOT FOUND\r\n" "Connection: close\r\n" "\r\n"sv; } asio::async_write(*m_Socket.get(), asio::buffer(Response), [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); }); } else { std::string_view Response = "HTTP/1.1 404 NOT FOUND\r\n" "Content-Length: 23\r\n" "Content-Type: text/plain\r\n" "\r\n" "No suitable route found"sv; if (!m_RequestData.IsKeepAlive()) { Response = "HTTP/1.1 404 NOT FOUND\r\n" "Content-Length: 23\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n" "\r\n" "No suitable route found"sv; } asio::async_write(*m_Socket.get(), asio::buffer(Response), [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); }); } } ////////////////////////////////////////////////////////////////////////// struct HttpAcceptor { HttpAcceptor(HttpAsioServerImpl& Server, asio::io_service& IoService, uint16_t BasePort, bool ForceLoopback, bool AllowPortProbing) : m_Server(Server) , m_IoService(IoService) , m_Acceptor(m_IoService, asio::ip::tcp::v6()) , m_AlternateProtocolAcceptor(m_IoService, asio::ip::tcp::v4()) { const bool IsUsingIPv6 = IsIPv6Capable(); if (!IsUsingIPv6) { m_Acceptor = asio::ip::tcp::acceptor(m_IoService, asio::ip::tcp::v4()); } #if ZEN_PLATFORM_WINDOWS // Special option for Windows settings as !asio::socket_base::reuse_address is not the same as exclusive access on Windows platforms typedef asio::detail::socket_option::boolean exclusive_address; m_Acceptor.set_option(exclusive_address(true)); m_AlternateProtocolAcceptor.set_option(exclusive_address(true)); #else // 
ZEN_PLATFORM_WINDOWS m_Acceptor.set_option(asio::socket_base::reuse_address(false)); m_AlternateProtocolAcceptor.set_option(asio::socket_base::reuse_address(false)); #endif // ZEN_PLATFORM_WINDOWS m_Acceptor.set_option(asio::ip::tcp::no_delay(true)); m_Acceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024)); m_Acceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024)); m_AlternateProtocolAcceptor.set_option(asio::ip::tcp::no_delay(true)); m_AlternateProtocolAcceptor.set_option(asio::socket_base::receive_buffer_size(128 * 1024)); m_AlternateProtocolAcceptor.set_option(asio::socket_base::send_buffer_size(256 * 1024)); std::string BoundBaseUrl; if (IsUsingIPv6) { BoundBaseUrl = BindAcceptor(BasePort, ForceLoopback, AllowPortProbing); } else { ZEN_INFO("NOTE: ipv6 support is disabled, binding to ipv4 only"); BoundBaseUrl = BindAcceptor(BasePort, ForceLoopback, AllowPortProbing); } if (!IsValid()) { return; } #if ZEN_PLATFORM_WINDOWS // On Windows, loopback connections can take advantage of a faster code path optionally with this flag. // This must be used by both the client and server side, and is only effective in the absence of // Windows Filtering Platform (WFP) callouts which can be installed by security software. 
// https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path SOCKET NativeSocket = m_Acceptor.native_handle(); int LoopbackOptionValue = 1; DWORD OptionNumberOfBytesReturned = 0; WSAIoctl(NativeSocket, SIO_LOOPBACK_FAST_PATH, &LoopbackOptionValue, sizeof(LoopbackOptionValue), NULL, 0, &OptionNumberOfBytesReturned, 0, 0); if (m_UseAlternateProtocolAcceptor) { NativeSocket = m_AlternateProtocolAcceptor.native_handle(); WSAIoctl(NativeSocket, SIO_LOOPBACK_FAST_PATH, &LoopbackOptionValue, sizeof(LoopbackOptionValue), NULL, 0, &OptionNumberOfBytesReturned, 0, 0); } #endif m_Acceptor.listen(); if (m_UseAlternateProtocolAcceptor) { m_AlternateProtocolAcceptor.listen(); } ZEN_INFO("Started asio server at '{}", BoundBaseUrl); } ~HttpAcceptor() { m_Acceptor.close(); if (m_UseAlternateProtocolAcceptor) { m_AlternateProtocolAcceptor.close(); } } template std::string BindAcceptor(uint16_t BasePort, bool ForceLoopback, bool AllowPortProbing) { uint16_t EffectivePort = BasePort; AddressType BindAddress; if (ForceLoopback) { BindAddress = AddressType::loopback(); if constexpr (std::is_same_v) { m_Acceptor.set_option(asio::ip::v6_only(true)); } } else { BindAddress = AddressType::any(); } asio::error_code BindErrorCode; BindErrorCode = asio::error::access_denied; m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); if (BindErrorCode == asio::error::access_denied && !BindAddress.is_loopback()) { ZEN_INFO("Access denied for public port {}, falling back to loopback", BasePort); BindAddress = AddressType::loopback(); if constexpr (std::is_same_v) { m_Acceptor.set_option(asio::ip::v6_only(true)); } m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } if (BindErrorCode == asio::error::address_in_use) { ZEN_INFO("Desired port {} is in use (bind returned '{}'), retrying", EffectivePort, BindErrorCode.message()); Sleep(500); m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), 
BindErrorCode); } if (AllowPortProbing) { // Try some alternative ports for (uint16_t PortOffset = 1; BindErrorCode && (PortOffset < 10); ++PortOffset) { EffectivePort = BasePort + (PortOffset * 100); m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } if (BindErrorCode) { ZEN_INFO("Unable to bind to preferred port range, falling back to automatic assignment (bind returned '{}')", BindErrorCode.message()); EffectivePort = 0; m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); if (!BindErrorCode) { EffectivePort = m_Acceptor.local_endpoint().port(); } } } else { for (uint32_t Retries = 0; (BindErrorCode == asio::error::address_in_use) && (Retries < 3); Retries++) { ZEN_INFO("Desired port {} is in use (bind returned '{}'), retrying", EffectivePort, BindErrorCode.message()); Sleep(500); m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } } if (BindErrorCode) { ZEN_WARN("Unable to initialize asio service, (bind returned '{}')", BindErrorCode.message()); return 0; } if (EffectivePort != BasePort) { ZEN_WARN("Desired port {} is in use, remapped to port {}", BasePort, EffectivePort); } if constexpr (std::is_same_v) { if (BindAddress.is_loopback()) { // IPv6 loopback will only respond on the IPv6 loopback address. Not everyone does // IPv6 though so we also bind to IPv4 loopback (localhost/127.0.0.1) m_AlternateProtocolAcceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), EffectivePort), BindErrorCode); if (BindErrorCode) { ZEN_WARN("Failed to register secondary IPv4 local-only handler 'http://{}:{}/'", "localhost", EffectivePort); } else { m_UseAlternateProtocolAcceptor = true; ZEN_INFO("Registered local-only handler 'http://{}:{}/' - this is not accessible from remote hosts", "localhost", EffectivePort); } } } m_IsValid = true; if constexpr (std::is_same_v) { return fmt::format("http://{}:{}'", BindAddress.is_loopback() ? 
"[::1]" : "*", EffectivePort); } else { return fmt::format("http://{}:{}'", BindAddress.is_loopback() ? "127.0.0.1" : "*", EffectivePort); } } void Start() { ZEN_MEMSCOPE(GetHttpasioTag()); ZEN_ASSERT(!m_IsStopped); InitAcceptInternal(m_Acceptor); if (m_UseAlternateProtocolAcceptor) { InitAcceptInternal(m_AlternateProtocolAcceptor); } } void StopAccepting() { m_IsStopped = true; } int GetAcceptPort() { return m_Acceptor.local_endpoint().port(); } bool IsValid() const { return m_IsValid; } private: void InitAcceptInternal(asio::ip::tcp::acceptor& Acceptor) { auto SocketPtr = std::make_unique(m_IoService); asio::ip::tcp::socket& SocketRef = *SocketPtr.get(); Acceptor.async_accept(SocketRef, [this, &Acceptor, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable { if (Ec) { ZEN_WARN("asio async_accept, connection failed to '{}:{}' reason '{}'", Acceptor.local_endpoint().address().to_string(), Acceptor.local_endpoint().port(), Ec.message()); } else { // New connection established, pass socket ownership into connection object // and initiate request handling loop. The connection lifetime is // managed by the async read/write loop by passing the shared // reference to the callbacks. 
                Socket->set_option(asio::ip::tcp::no_delay(true));
                Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024));
                Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024));
                // NOTE(review): template argument appears stripped in this copy
                // (presumably std::make_shared<HttpServerConnection>) -- confirm against the original file.
                auto Conn = std::make_shared(m_Server, std::move(Socket));
                Conn->HandleNewRequest();
            }
            if (!m_IsStopped.load())
            {
                // Re-arm the accept loop for the next incoming connection.
                InitAcceptInternal(Acceptor);
            }
            else
            {
                std::error_code CloseEc;
                Acceptor.close(CloseEc);
                if (CloseEc)
                {
                    ZEN_WARN("acceptor close ERROR, reason '{}'", CloseEc.message());
                }
            }
        });
    }

    HttpAsioServerImpl& m_Server;
    asio::io_service&   m_IoService;
    asio::ip::tcp::acceptor m_Acceptor;                  // primary (IPv6 or IPv4) listener
    asio::ip::tcp::acceptor m_AlternateProtocolAcceptor; // IPv4 loopback companion to an IPv6 loopback bind
    bool m_UseAlternateProtocolAcceptor{false};
    bool m_IsValid{false};
    // NOTE(review): template argument appears stripped (presumably std::atomic<bool>).
    std::atomic m_IsStopped{false};
};

//////////////////////////////////////////////////////////////////////////

// Builds the server-side view of a parsed request: strips the owning
// service's URI prefix, extracts verb/query/content metadata, and resolves
// the accepted response content type (URL extension wins over Accept header).
HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer, uint32_t RequestNumber, bool IsLocalMachineRequest)
: HttpServerRequest(Service)
, m_Request(Request)
, m_RequestNumber(RequestNumber)
, m_PayloadBuffer(std::move(PayloadBuffer))
, m_IsLocalMachineRequest(IsLocalMachineRequest)
{
    // Make m_Uri relative to the service root by dropping the registered prefix.
    const int PrefixLength = Service.UriPrefixLength();
    std::string_view Uri = Request.Url();
    // NOTE(review): static_cast's target type was stripped in this copy (presumably static_cast<int>).
    Uri.remove_prefix(std::min(PrefixLength, static_cast(Uri.size())));
    m_Uri = Uri;
    m_UriWithExtension = Uri;
    m_QueryString = Request.QueryString();
    m_Verb = Request.RequestVerb();
    m_ContentLength = Request.Body().Size();
    m_ContentType = Request.ContentType();
    HttpContentType AcceptContentType = HttpContentType::kUnknownContentType;
    // Parse any extension, to allow requesting a particular response encoding via the URL
    {
        std::string_view UriSuffix8{m_Uri};
        // Only look for a '.' within the last path component, not the whole URI.
        const size_t LastComponentIndex = UriSuffix8.find_last_of('/');
        if (LastComponentIndex != std::string_view::npos)
        {
            UriSuffix8.remove_prefix(LastComponentIndex);
        }
        const size_t LastDotIndex = UriSuffix8.find_last_of('.');
        if (LastDotIndex != std::string_view::npos)
        {
            UriSuffix8.remove_prefix(LastDotIndex + 1);
            AcceptContentType = ParseContentType(UriSuffix8);
            if (AcceptContentType != HttpContentType::kUnknownContentType)
            {
                // Drop ".ext" from the routed URI when the extension named a known content type.
                m_Uri.remove_suffix(uint32_t(UriSuffix8.size() + 1));
            }
        }
    }
    // If an explicit content type extension was specified then we'll use that over any
    // Accept: header value that may be present
    if (AcceptContentType != HttpContentType::kUnknownContentType)
    {
        m_AcceptType = AcceptContentType;
    }
    else
    {
        m_AcceptType = Request.AcceptType();
    }
}

HttpAsioServerRequest::~HttpAsioServerRequest()
{
}

Oid HttpAsioServerRequest::ParseSessionId() const
{
    return m_Request.SessionId();
}

uint32_t HttpAsioServerRequest::ParseRequestId() const
{
    return m_Request.RequestId();
}

bool HttpAsioServerRequest::IsLocalMachineRequest() const
{
    return m_IsLocalMachineRequest;
}

std::string_view HttpAsioServerRequest::GetAuthorizationHeader() const
{
    return m_Request.AuthorizationHeader();
}

IoBuffer HttpAsioServerRequest::ReadPayload()
{
    return m_PayloadBuffer;
}

// Respond with a status code only (no payload).
void HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode)
{
    ZEN_MEMSCOPE(GetHttpasioTag());
    ZEN_ASSERT(!m_Response); // only one response per request
    m_Response.reset(new HttpResponse(HttpContentType::kBinary, m_RequestNumber));
    // NOTE(review): std::array's template arguments were stripped in this copy
    // (presumably an empty IoBuffer array) -- confirm against the original file.
    std::array Empty;
    m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty);
}

// Respond with a status code and a list of payload blobs.
void HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span Blobs)
{
    ZEN_MEMSCOPE(GetHttpasioTag());
    ZEN_ASSERT(!m_Response);
    m_Response.reset(new HttpResponse(ContentType, m_RequestNumber));
    m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs);
}

// Respond with a status code and a UTF-8 string payload.
void HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString)
{
    ZEN_MEMSCOPE(GetHttpasioTag());
    ZEN_ASSERT(!m_Response);
    m_Response.reset(new HttpResponse(ContentType, m_RequestNumber));
    // Wrap: the buffer references ResponseString's memory rather than copying it.
    IoBuffer MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size());
    // NOTE(review): std::array template arguments stripped in this copy.
    std::array SingleBufferList({MessageBuffer});
    m_Response->InitializeForPayload((uint16_t)ResponseCode, SingleBufferList);
}

void HttpAsioServerRequest::WriteResponseAsync(std::function&& ContinuationHandler)
{
    ZEN_MEMSCOPE(GetHttpasioTag());
    ZEN_ASSERT(!m_Response);
    // Not one bit async, innit -- the continuation is invoked synchronously here.
    ContinuationHandler(*this);
}

// Parses the request's Range: header, if present, into Ranges.
bool HttpAsioServerRequest::TryGetRanges(HttpRanges& Ranges)
{
    return TryParseHttpRangeHeader(m_Request.RangeHeader(), Ranges);
}

//////////////////////////////////////////////////////////////////////////

HttpAsioServerImpl::HttpAsioServerImpl()
: m_RequestLog(logging::Get("http_requests"))
{
}

HttpAsioServerImpl::~HttpAsioServerImpl()
{
}

void HttpAsioServerImpl::Initialize(std::filesystem::path DataDir)
{
    m_RequestTracer.Initialize(DataDir);
}

// Creates and starts the acceptor plus the io_service worker thread pool.
// Returns the port actually bound, or 0 when binding failed.
int HttpAsioServerImpl::Start(uint16_t Port, const AsioConfig& Config)
{
    ZEN_MEMSCOPE(GetHttpasioTag());
    ZEN_ASSERT(Config.ThreadCount > 0);
    ZEN_INFO("starting asio http with {} service threads", Config.ThreadCount);
    // Port probing is disabled for dedicated servers -- their port is a contract.
    m_Acceptor.reset(
        new asio_http::HttpAcceptor(*this, m_IoService, Port, Config.ForceLoopback, /*AllowPortProbing */ !Config.IsDedicatedServer));
    if (!m_Acceptor->IsValid())
    {
        return 0;
    }
    m_Acceptor->Start();
    // This should consist of a set of minimum threads and grow on demand to
    // meet concurrency needs? Right now we end up allocating a large number
    // of threads even if we never end up using all of them, which seems
    // wasteful. It's also not clear how the demand for concurrency should
    // be balanced with the engine side - ideally we'd have some kind of
    // global scheduling to prevent one side from preventing the other side
    // from making progress. Or at the very least, thread priorities should
    // be considered.
    for (unsigned int i = 0; i < Config.ThreadCount; ++i)
    {
        m_ThreadPool.emplace_back([this, Index = i + 1] {
            ZEN_MEMSCOPE(GetHttpasioTag());
            SetCurrentThreadName(fmt::format("asio_io_{}", Index));
            try
            {
                // Each worker runs the shared io_service event loop until Stop().
                m_IoService.run();
            }
            catch (const AssertException& AssertEx)
            {
                ZEN_ERROR("Assert caught in asio event loop: {}", AssertEx.FullDescription());
            }
            catch (const std::exception& e)
            {
                ZEN_ERROR("Exception caught in asio event loop: '{}'", e.what());
            }
        });
    }
    ZEN_INFO("asio http started in {} mode, using {} threads on port {}",
             Config.IsDedicatedServer ? "DEDICATED" : "NORMAL",
             Config.ThreadCount,
             m_Acceptor->GetAcceptPort());
    return m_Acceptor->GetAcceptPort();
}

// Stops accepting new connections, halts the io_service and joins all workers.
void HttpAsioServerImpl::Stop()
{
    ZEN_MEMSCOPE(GetHttpasioTag());
    if (m_Acceptor)
    {
        m_Acceptor->StopAccepting();
    }
    m_IoService.stop();
    for (auto& Thread : m_ThreadPool)
    {
        if (Thread.joinable())
        {
            Thread.join();
        }
    }
    m_ThreadPool.clear();
    m_Acceptor.reset();
}

// Registers a service under a URL prefix. The recorded prefix length includes
// any trailing '/', while the stored routing path has it removed so that
// RouteRequest can match on component boundaries.
void HttpAsioServerImpl::RegisterService(const char* InUrlPath, HttpService& Service)
{
    ZEN_MEMSCOPE(GetHttpasioTag());
    std::string_view UrlPath(InUrlPath);
    Service.SetUriPrefixLength(UrlPath.size());
    if (!UrlPath.empty() && UrlPath.back() == '/')
    {
        UrlPath.remove_suffix(1);
    }
    RwLock::ExclusiveLockScope _(m_Lock);
    m_UriHandlers.push_back({std::string(UrlPath), &Service});
}

// Longest-prefix match of Url against the registered service paths. A prefix
// only matches on a path-component boundary (exact match, or followed by '/').
HttpService* HttpAsioServerImpl::RouteRequest(std::string_view Url)
{
    ZEN_MEMSCOPE(GetHttpasioTag());
    RwLock::SharedLockScope _(m_Lock);
    HttpService* CandidateService = nullptr;
    std::string::size_type CandidateMatchSize = 0;
    for (const ServiceEntry& SvcEntry : m_UriHandlers)
    {
        const std::string& SvcUrl = SvcEntry.ServiceUrlPath;
        const std::string::size_type SvcUrlSize = SvcUrl.size();
        if ((SvcUrlSize >= CandidateMatchSize) && Url.compare(0, SvcUrlSize, SvcUrl) == 0 && ((SvcUrlSize == Url.size()) || (Url[SvcUrlSize] == '/')))
        {
            CandidateMatchSize = SvcUrl.size();
            CandidateService = SvcEntry.Service;
        }
    }
    return CandidateService;
}

void HttpAsioServerImpl::SetHttpRequestFilter(IHttpRequestFilter* RequestFilter)
{
    ZEN_MEMSCOPE(GetHttpasioTag());
    RwLock::ExclusiveLockScope _(m_Lock);
    m_HttpRequestFilter.store(RequestFilter);
}

// Applies the installed request filter, if any. The lock-free pre-check keeps
// the common no-filter path cheap; the pointer is re-read under the shared
// lock before use in case it was cleared concurrently.
IHttpRequestFilter::Result HttpAsioServerImpl::FilterRequest(HttpServerRequest& Request)
{
    if (!m_HttpRequestFilter.load())
    {
        return IHttpRequestFilter::Result::Accepted;
    }
    RwLock::SharedLockScope _(m_Lock);
    IHttpRequestFilter* RequestFilter = m_HttpRequestFilter.load();
    if (!RequestFilter)
    {
        return IHttpRequestFilter::Result::Accepted;
    }
    return RequestFilter->FilterRequest(Request);
}

} // namespace zen::asio_http

//////////////////////////////////////////////////////////////////////////

namespace zen {

// Public HttpServer implementation backed by the asio transport. Owns the
// impl object and the shutdown event used by OnRun()/OnRequestExit().
class HttpAsioServer : public HttpServer
{
public:
    HttpAsioServer(const AsioConfig& Config);
    ~HttpAsioServer();
    virtual void OnRegisterService(HttpService& Service) override;
    virtual int OnInitialize(int BasePort, std::filesystem::path DataDir) override;
    virtual void OnSetHttpRequestFilter(IHttpRequestFilter* RequestFilter) override;
    virtual void OnRun(bool IsInteractiveSession) override;
    virtual void OnRequestExit() override;
    virtual void OnClose() override;

private:
    Event m_ShutdownEvent; // signalled by OnRequestExit() to unblock OnRun()
    int m_BasePort = 0;
    const AsioConfig m_InitialConfig;
    // NOTE(review): template argument appears stripped in this copy
    // (presumably std::unique_ptr<asio_http::HttpAsioServerImpl>).
    std::unique_ptr m_Impl;
};

HttpAsioServer::HttpAsioServer(const AsioConfig& Config)
: m_InitialConfig(Config)
, m_Impl(std::make_unique())
{
    ZEN_DEBUG("Request object size: {} ({:#x})", sizeof(HttpRequestParser), sizeof(HttpRequestParser));
}

HttpAsioServer::~HttpAsioServer()
{
    // OnClose() resets m_Impl; a live impl here means shutdown order was wrong.
    if (m_Impl)
    {
        ZEN_ERROR("~HttpAsioServer() called without calling Close() first");
    }
}

void HttpAsioServer::OnClose()
{
    try
    {
        m_Impl->Stop();
    }
    catch (const std::exception& ex)
    {
        ZEN_WARN("Caught exception stopping http asio server: {}", ex.what());
    }
    m_Impl.reset();
}

void HttpAsioServer::OnRegisterService(HttpService& Service)
{
    m_Impl->RegisterService(Service.BaseUri(), Service);
}

void HttpAsioServer::OnSetHttpRequestFilter(IHttpRequestFilter* RequestFilter)
{
    m_Impl->SetHttpRequestFilter(RequestFilter);
}

// Sizes the worker pool (hardware concurrency, min 8, doubled for dedicated
// servers with an unspecified count) and starts the impl on the requested port.
int HttpAsioServer::OnInitialize(int BasePort, std::filesystem::path DataDir)
{
    ZEN_TRACE_CPU("HttpAsioServer::Initialize");
    m_Impl->Initialize(DataDir);
    AsioConfig Config = m_InitialConfig;
    Config.ThreadCount = m_InitialConfig.ThreadCount != 0 ? m_InitialConfig.ThreadCount : Max(GetHardwareConcurrency(), 8u);
    if (Config.IsDedicatedServer && m_InitialConfig.ThreadCount == 0)
    {
        // In order to limit the potential impact of threads stuck
        // in locks we allow the thread pool to be oversubscribed
        // by a fair amount
        Config.ThreadCount *= 2;
    }
    // NOTE(review): gsl::narrow's template argument was stripped in this copy (presumably gsl::narrow<uint16_t>).
    m_BasePort = m_Impl->Start(gsl::narrow(BasePort), Config);
    return m_BasePort;
}

// Blocks until shutdown is requested. On Windows interactive sessions the
// loop also polls the keyboard for ESC/Q between event waits.
void HttpAsioServer::OnRun(bool IsInteractive)
{
    const int WaitTimeout = 1000; // ms between wakeups / keyboard polls
    bool ShutdownRequested = false;
#if ZEN_PLATFORM_WINDOWS
    if (IsInteractive)
    {
        ZEN_CONSOLE("Zen Server running (asio HTTP). Press ESC or Q to quit");
    }
    do
    {
        if (IsInteractive && _kbhit() != 0)
        {
            char c = (char)_getch();
            if (c == 27 || c == 'Q' || c == 'q')
            {
                m_ShutdownEvent.Set();
                RequestApplicationExit(0);
            }
        }
        ShutdownRequested = m_ShutdownEvent.Wait(WaitTimeout);
    } while (!ShutdownRequested);
#else
    if (IsInteractive)
    {
        ZEN_CONSOLE("Zen Server running (asio HTTP). Ctrl-C to quit");
    }
    do
    {
        ShutdownRequested = m_ShutdownEvent.Wait(WaitTimeout);
    } while (!ShutdownRequested);
#endif
}

void HttpAsioServer::OnRequestExit()
{
    m_ShutdownEvent.Set();
}

// NOTE(review): Ref's template argument appears stripped in this copy (presumably Ref<HttpServer>).
Ref CreateHttpAsioServer(const AsioConfig& Config)
{
    ZEN_TRACE_CPU("CreateHttpAsioServer");
    ZEN_MEMSCOPE(GetHttpasioTag());
    return Ref{new HttpAsioServer(Config)};
}

} // namespace zen