diff options
| author | Stefan Boberg <[email protected]> | 2026-01-14 10:32:37 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-01-14 10:32:37 +0100 |
| commit | f99a4687ff78e2823332e09d0e35cb16f361ff93 (patch) | |
| tree | 58020dafaad9e1c9664698e3d36884667cb5c465 /src/zenhttp/servers/httpasio.cpp | |
| parent | 5.7.16-pre2 (diff) | |
| download | zen-f99a4687ff78e2823332e09d0e35cb16f361ff93.tar.xz zen-f99a4687ff78e2823332e09d0e35cb16f361ff93.zip | |
asio/http optimizations (#449)
This change primarily introduces improved logic for dealing with sending data from file references.
This is intended to reduce the amount of memory-mapping we end up doing when sending data from files. Windows now uses `TransmitFile` to send file data more efficiently using kernel-side I/O, but Linux/Mac basically behave as before since they don't offer any true async file I/O support via asio. This should be implemented separately using a background I/O thread pool.
This PR also includes improved memory management for http/asio with reduced allocation counts, and a fix for a potential use-after-free in very high load scenarios.
Diffstat (limited to 'src/zenhttp/servers/httpasio.cpp')
| -rw-r--r-- | src/zenhttp/servers/httpasio.cpp | 742 |
1 file changed, 638 insertions, 104 deletions
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index a0431b0b1..be4e73576 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -4,6 +4,7 @@ #include "httptracer.h" #include <zencore/except.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory/llm.h> #include <zencore/thread.h> @@ -13,6 +14,8 @@ #include "httpparser.h" +#include <EASTL/fixed_vector.h> + #include <deque> #include <memory> #include <string_view> @@ -28,6 +31,7 @@ ZEN_THIRD_PARTY_INCLUDES_START # include <errno.h> #endif #include <asio.hpp> +#include <asio/stream_file.hpp> ZEN_THIRD_PARTY_INCLUDES_END #define ASIO_VERBOSE_TRACE 0 @@ -154,6 +158,345 @@ Log() ////////////////////////////////////////////////////////////////////////// +#if !defined(ASIO_HAS_FILE) +# define ASIO_HAS_FILE 0 +#endif + +#if defined(ASIO_HAS_WINDOWS_OVERLAPPED_PTR) +# define ZEN_USE_TRANSMITFILE 1 +# define ZEN_USE_ASYNC_SENDFILE ASIO_HAS_FILE +#else +# define ZEN_USE_TRANSMITFILE 0 +# define ZEN_USE_ASYNC_SENDFILE 0 +#endif + +#if ZEN_USE_TRANSMITFILE +template<typename Handler> +void +TransmitFileAsync(asio::ip::tcp::socket& Socket, HANDLE FileHandle, uint64_t ByteOffset, uint32_t ByteSize, Handler&& Cb) +{ + // We need to establish a new handle here to avoid running into random errors + // during TransmitFile. I'm not entirely sure why it's necessary yet. 
+ + HANDLE hReopenedFile = + ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED); + + const uint64_t FileSize = FileSizeFromHandle(FileHandle); + const uint64_t SendEndOffset = ByteOffset + ByteSize; + + if (SendEndOffset > FileSize) + { + std::error_code DummyEc; + + ZEN_WARN("TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) tries to transmit {} bytes too many", + ByteOffset, + ByteSize, + PathFromHandle(FileHandle, DummyEc), + FileSizeFromHandle(FileHandle), + SendEndOffset - FileSize); + } + + asio::windows::overlapped_ptr OverlappedPtr( + Socket.get_executor(), + [WrappedCb = std::move(Cb), ExpectedBytes = ByteSize, FileHandle, ByteOffset, ByteSize, hReopenedFile]( + const std::error_code& Ec, + std::size_t ActualBytesTransferred) { + if (Ec) + { + std::error_code DummyEc; + ZEN_WARN("NOTE: TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) error '{}', transmitted {} bytes", + ByteOffset, + ByteSize, + PathFromHandle(hReopenedFile, DummyEc), + FileSizeFromHandle(hReopenedFile), + Ec.message(), + ActualBytesTransferred); + } + + CloseHandle(hReopenedFile); + WrappedCb(Ec, ActualBytesTransferred); + }); + + OVERLAPPED* RawOverlapped = OverlappedPtr.get(); + RawOverlapped->Offset = uint32_t(ByteOffset & 0xffffFFFFull); + RawOverlapped->OffsetHigh = uint32_t(ByteOffset >> 32); + + const DWORD NumberOfBytesPerSend = 0; // let TransmitFile decide + + const BOOL Ok = + ::TransmitFile(Socket.native_handle(), hReopenedFile, ByteSize, NumberOfBytesPerSend, RawOverlapped, nullptr, /* dwReserved */ 0); + const DWORD LastError = ::GetLastError(); + + // Check if the operation completed immediately. + if (!Ok && LastError != ERROR_IO_PENDING) + { + // The operation completed immediately, so a completion notification needs + // to be posted. When complete() is called, ownership of the OVERLAPPED- + // derived object passes to the io_context. 
+ + asio::error_code ec(LastError, asio::error::get_system_category()); + OverlappedPtr.complete(ec, 0); + } + else + { + // The operation was successfully initiated, so ownership of the + // OVERLAPPED-derived object has passed to the io_context. + + OverlappedPtr.release(); + } +} +#endif // ZEN_USE_TRANSMITFILE + +#if ZEN_USE_ASYNC_SENDFILE + +// Pipelined file sender that reads from a file and writes to a socket using two buffers +// to pipeline reads and writes. Unfortunately this strategy can't currently be used on +// non-Windows platforms as they don't currently support async file reads. We'll have +// to build a mechanism using a thread pool for that, perhaps with optional support +// for io_uring where available since that should be the most efficient. +// +// In other words, this is not super useful as Windows already has the TransmitFile +// version above, but it's here for completeness and potential future use on other platforms. + +template<class AsyncWriteStream, class CompletionHandler> +class PipelinedFileSender : public std::enable_shared_from_this<PipelinedFileSender<AsyncWriteStream, CompletionHandler>> +{ +public: + PipelinedFileSender(AsyncWriteStream& WriteSocket, + asio::stream_file&& FileToReadFrom, + uint64_t ByteSize, + std::size_t BufferSize, + CompletionHandler&& CompletionToken) + : m_WriteSocket(WriteSocket) + , m_SourceFile(std::move(FileToReadFrom)) + , m_Strand(asio::make_strand(m_WriteSocket.get_executor())) + , m_Buffers{std::vector<char>(BufferSize), std::vector<char>(BufferSize)} + , m_CompletionHandler(std::move(CompletionToken)) + , m_TotalBytesToRead(ByteSize) + , m_RemainingBytesToRead(ByteSize) + { + } + + void Start() { EnqueueRead(); } + +private: + struct Slot + { + std::size_t Size = 0; // valid bytes in buffer + bool Ready = false; // has unwritten data + bool InUse = false; // being written + }; + + void OnSendCompleted(const std::error_code& Ec) + { + // TODO: ensure this behaves properly for instance if the write 
fails while a read is pending + + if (m_SendCompleted) + { + return; + } + + m_SendCompleted = true; + + // Ensure completion runs on the strand/executor. + + asio::dispatch(m_Strand, [CompletionHandler = std::move(m_CompletionHandler), Ec, TotalBytesWritten = m_TotalBytesWritten]() { + CompletionHandler(Ec, TotalBytesWritten); + }); + } + + void EnqueueRead() + { + asio::dispatch(m_Strand, [self = this->shared_from_this()] { + self->TryPostRead(); + self->PumpWrites(); + }); + } + + void TryPostRead() + { + if (m_IsEof || m_ReadInFlight || m_RemainingBytesToRead == 0) + { + return; + } + + const int ReadSlotIndex = GetFreeSlotIndex(); + if (ReadSlotIndex < 0) + { + // no free slot; wait for writer to free one (not meant to ever happen) + return; + } + + m_ReadInFlight = true; + auto ReadBuffer = asio::buffer(m_Buffers[ReadSlotIndex].data(), zen::Min(m_Buffers[ReadSlotIndex].size(), m_RemainingBytesToRead)); + + asio::async_read( + m_SourceFile, + ReadBuffer, + asio::bind_executor(m_Strand, + [self = this->shared_from_this(), ReadSlotIndex](const std::error_code& Ec, std::size_t ActualBytesRead) { + self->m_ReadInFlight = false; + self->m_RemainingBytesToRead -= ActualBytesRead; + + if (Ec) + { + if (Ec == asio::error::eof) + { + ZEN_ASSERT(self->m_RemainingBytesToRead == 0); + + self->m_IsEof = true; + + // No data produced on EOF; just try to pump whatever is left + self->PumpWrites(); + } + else + { + // read error, cancel everything and let outer completion handler know + self->OnSendCompleted(Ec); + } + } + else + { + // Mark slot as ready with ActualBytesRead valid bytes of data in buffer + self->m_Slots[ReadSlotIndex].Size = ActualBytesRead; + self->m_Slots[ReadSlotIndex].Ready = true; + self->PumpWrites(); + self->TryPostRead(); + } + })); + } + + void PumpWrites() + { + if (m_WriteInFlight) + { + return; + } + + const int WriteSlotIndex = GetReadySlotIndex(); + if (WriteSlotIndex < 0) + { + // No ready data. 
We're done if EOF/no more data to read and no reads in flight and nothing ready + if (!m_ReadInFlight && (m_IsEof || m_RemainingBytesToRead == 0)) + { + // all done + return OnSendCompleted({}); + } + + return; + } + + m_WriteInFlight = true; + m_Slots[WriteSlotIndex].InUse = true; + + asio::async_write( + m_WriteSocket, + asio::buffer(m_Buffers[WriteSlotIndex].data(), m_Slots[WriteSlotIndex].Size), + asio::bind_executor(m_Strand, + [self = this->shared_from_this(), WriteSlotIndex](const std::error_code& Ec, std::size_t BytesWritten) { + self->m_TotalBytesWritten += BytesWritten; + self->m_WriteInFlight = false; + + if (Ec) + { + self->OnSendCompleted(Ec); + + return; + } + else + { + // Free the slot + self->m_Slots[WriteSlotIndex].Ready = false; + self->m_Slots[WriteSlotIndex].InUse = false; + self->m_Slots[WriteSlotIndex].Size = 0; + + self->TryPostRead(); + self->PumpWrites(); + } + })); + } + + int GetFreeSlotIndex() const + { + for (int i = 0; i < 2; ++i) + { + if (!m_Slots[i].Ready && !m_Slots[i].InUse) + { + return i; + } + } + return -1; + } + + int GetReadySlotIndex() const + { + for (int i = 0; i < 2; ++i) + { + if (m_Slots[i].Ready && !m_Slots[i].InUse) + { + return i; + } + } + return -1; + } + + AsyncWriteStream& m_WriteSocket; + asio::stream_file m_SourceFile; + asio::strand<asio::any_io_executor> m_Strand; + + // There's no synchronization needed for these as all access is via the strand + std::vector<char> m_Buffers[2]; + Slot m_Slots[2]; + + bool m_IsEof = false; + bool m_ReadInFlight = false; + bool m_WriteInFlight = false; + bool m_SendCompleted = false; + + const uint64_t m_TotalBytesToRead = 0; + uint64_t m_RemainingBytesToRead = 0; + uint64_t m_TotalBytesWritten = 0; + + CompletionHandler m_CompletionHandler; +}; + +template<class AsyncWriteStream, class CompletionHandler> +void +SendFileAsync(AsyncWriteStream& WriteSocket, + const auto FileHandle, + uint64_t ByteOffset, + uint64_t ByteSize, + std::size_t BufferSize, + CompletionHandler&& 
CompletionToken) +{ + HANDLE hReopenedFile = + ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED); + + // Note that this assumes ownership of the handle + asio::stream_file SourceFile(WriteSocket.get_executor(), hReopenedFile); + + // TODO: handle any error properly here + SourceFile.seek(ByteOffset, asio::stream_file::seek_set); + + if (BufferSize > ByteSize) + { + BufferSize = ByteSize; + } + + auto op = std::make_shared<PipelinedFileSender<AsyncWriteStream, std::decay_t<CompletionHandler>>>( + WriteSocket, + std::move(SourceFile), + ByteSize, + BufferSize, + std::forward<CompletionHandler>(CompletionToken)); + + // Start the pipeline + op->Start(); +} +#endif // ZEN_USE_ASYNC_SENDFILE + +////////////////////////////////////////////////////////////////////////// + struct HttpAsioServerImpl { public: @@ -191,7 +534,7 @@ public: class HttpAsioServerRequest : public HttpServerRequest { public: - HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer); + HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer, uint32_t RequestNumber); ~HttpAsioServerRequest(); virtual Oid ParseSessionId() const override; @@ -210,6 +553,7 @@ public: HttpAsioServerRequest& operator=(const HttpAsioServerRequest&) = delete; HttpRequestParser& m_Request; + uint32_t m_RequestNumber = 0; // Note: different to request ID which is derived from headers IoBuffer m_PayloadBuffer; std::unique_ptr<HttpResponse> m_Response; }; @@ -218,7 +562,11 @@ struct HttpResponse { public: HttpResponse() = default; - explicit HttpResponse(HttpContentType ContentType) : m_ContentType(ContentType) {} + explicit HttpResponse(HttpContentType ContentType, uint32_t RequestNumber) : m_RequestNumber(RequestNumber), m_ContentType(ContentType) + { + } + + ~HttpResponse() = default; void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList) { @@ -230,57 
+578,61 @@ public: const uint32_t ChunkCount = gsl::narrow<uint32_t>(BlobList.size()); m_DataBuffers.reserve(ChunkCount); + m_IoVecCount = ChunkCount + 1 /* one extra buffer for headers */; + m_IoVecs.resize(m_IoVecCount); - for (IoBuffer& Buffer : BlobList) - { -#if 1 - m_DataBuffers.emplace_back(std::move(Buffer)).MakeOwned(); -#else - IoBuffer TempBuffer = std::move(Buffer); - TempBuffer.MakeOwned(); - m_DataBuffers.emplace_back(IoBufferBuilder::ReadFromFileMaybe(TempBuffer)); -#endif - } + m_IoVecCursor = 0; uint64_t LocalDataSize = 0; + int Index = 1; - m_AsioBuffers.push_back({}); // Placeholder for header - - for (IoBuffer& Buffer : m_DataBuffers) + for (IoBuffer& Buffer : BlobList) { - uint64_t BufferDataSize = Buffer.Size(); + const uint64_t BufferDataSize = Buffer.Size(); ZEN_ASSERT(BufferDataSize); LocalDataSize += BufferDataSize; - IoBufferFileReference FileRef; - if (Buffer.GetFileReference(/* out */ FileRef)) - { - // TODO: Use direct file transfer, via TransmitFile/sendfile - // - // this looks like it requires some custom asio plumbing however + IoBuffer OwnedBuffer = std::move(Buffer); + OwnedBuffer.MakeOwned(); - m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); - } - else + IoVec& Io = m_IoVecs[Index++]; + + bool ChunkHandled = false; + +#if ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE + if (IoBufferFileReference FileRef; OwnedBuffer.GetFileReference(/* out */ FileRef)) { - // Send from memory + Io.IsFileRef = true; + Io.Ref.FileRef = FileRef; + ChunkHandled = true; + } +#endif - m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); + if (!ChunkHandled) + { + Io.IsFileRef = false; + uint32_t Size = gsl::narrow<uint32_t>(OwnedBuffer.Size()); + Io.Ref.MemoryRef = {OwnedBuffer.Data(), Size}; } + + m_DataBuffers.emplace_back(OwnedBuffer); } + m_ContentLength = LocalDataSize; std::string_view Headers = GetHeaders(); - m_AsioBuffers[0] = asio::const_buffer(Headers.data(), Headers.size()); + + IoVec& Io = m_IoVecs[0]; + + Io.IsFileRef = 
false; + Io.Ref.MemoryRef = {.Data = Headers.data(), .Size = gsl::narrow_cast<uint32_t>(Headers.size())}; } uint16_t ResponseCode() const { return m_ResponseCode; } uint64_t ContentLength() const { return m_ContentLength; } - const std::vector<asio::const_buffer>& AsioBuffers() const { return m_AsioBuffers; } - std::string_view GetHeaders() { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -299,16 +651,137 @@ public: return m_Headers; } - void SuppressPayload() { m_AsioBuffers.resize(1); } + void SendResponse(asio::ip::tcp::socket& TcpSocket, std::function<void(const asio::error_code& Ec, std::size_t ByteCount)>&& Token) + { + m_SendCb = std::move(Token); + + SendNextChunk(TcpSocket); + } + + void SendNextChunk(asio::ip::tcp::socket& TcpSocket) + { + if (m_IoVecCursor == m_IoVecCount) + { + ZEN_ASSERT(m_SendCb); + + auto CompletionToken = [Self = this, Token = std::move(m_SendCb), TotalBytes = m_TotalBytesSent] { Token({}, TotalBytes); }; + + asio::defer(TcpSocket.get_executor(), std::move(CompletionToken)); + + return; + } + + const IoVec& Io = m_IoVecs[m_IoVecCursor++]; + + if (Io.IsFileRef) + { + ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, bytes: {}", zen::GetCurrentThreadId(), Io.Ref.FileRef.FileChunkSize); + +#if ZEN_USE_TRANSMITFILE + TransmitFileAsync(TcpSocket, + Io.Ref.FileRef.FileHandle, + Io.Ref.FileRef.FileChunkOffset, + gsl::narrow_cast<uint32_t>(Io.Ref.FileRef.FileChunkSize), + [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { + m_TotalBytesSent += ByteCount; + if (Ec) + { + m_SendCb(Ec, m_TotalBytesSent); + } + else + { + SendNextChunk(TcpSocket); + } + }); +#elif ZEN_USE_ASYNC_SENDFILE + SendFileAsync(TcpSocket, + Io.Ref.FileRef.FileHandle, + Io.Ref.FileRef.FileChunkOffset, + Io.Ref.FileRef.FileChunkSize, + 64 * 1024, + [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { + m_TotalBytesSent += ByteCount; + if (Ec) + { + m_SendCb(Ec, m_TotalBytesSent); + } + else + { + SendNextChunk(TcpSocket); + } + }); +#else 
+ // This should never occur unless we compile with one + // of the options above + ZEN_ASSERT("invalid file reference in response"); +#endif + + return; + } + + // Send as many consecutive non-file references as possible in one asio operation + + std::vector<asio::const_buffer> AsioBuffers; + AsioBuffers.push_back(asio::const_buffer{Io.Ref.MemoryRef.Data, Io.Ref.MemoryRef.Size}); + + while (m_IoVecCursor != m_IoVecCount) + { + const IoVec& Io2 = m_IoVecs[m_IoVecCursor]; + + if (Io2.IsFileRef) + { + break; + } + + AsioBuffers.push_back(asio::const_buffer{Io2.Ref.MemoryRef.Data, Io2.Ref.MemoryRef.Size}); + ++m_IoVecCursor; + } + + asio::async_write(TcpSocket, + std::move(AsioBuffers), + asio::transfer_all(), + [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { + m_TotalBytesSent += ByteCount; + if (Ec) + { + m_SendCb(Ec, m_TotalBytesSent); + } + else + { + SendNextChunk(TcpSocket); + } + }); + } private: - uint16_t m_ResponseCode = 0; - bool m_IsKeepAlive = true; - HttpContentType m_ContentType = HttpContentType::kBinary; - uint64_t m_ContentLength = 0; - std::vector<IoBuffer> m_DataBuffers; - std::vector<asio::const_buffer> m_AsioBuffers; - ExtendableStringBuilder<160> m_Headers; + uint32_t m_RequestNumber = 0; + uint16_t m_ResponseCode = 0; + bool m_IsKeepAlive = true; + HttpContentType m_ContentType = HttpContentType::kBinary; + uint64_t m_ContentLength = 0; + eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive + ExtendableStringBuilder<160> m_Headers; + + struct IoVec + { + bool IsFileRef; + union + { + struct MemoryBuffer + { + const void* Data; + uint32_t Size; + } MemoryRef; + IoBufferFileReference FileRef; + } Ref; + }; + + eastl::fixed_vector<IoVec, 8> m_IoVecs; + int m_IoVecCursor = 0; + int m_IoVecCount = 0; + + std::function<void(const asio::error_code& Ec, std::size_t ByteCount)> m_SendCb; + uint64_t m_TotalBytesSent = 0; }; 
////////////////////////////////////////////////////////////////////////// @@ -339,37 +812,63 @@ private: kTerminated }; + const char* StateToString(RequestState State) + { + switch (State) + { + case RequestState::kInitialState: + return "InitialState"; + case RequestState::kInitialRead: + return "InitialRead"; + case RequestState::kReadingMore: + return "ReadingMore"; + case RequestState::kWriting: + return "Writing"; + case RequestState::kWritingFinal: + return "WritingFinal"; + case RequestState::kDone: + return "Done"; + case RequestState::kTerminated: + return "Terminated"; + default: + return "Unknown"; + } + } + RequestState m_RequestState = RequestState::kInitialState; HttpRequestParser m_RequestData{*this}; void EnqueueRead(); void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); - void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false); + void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, uint32_t RequestNumber, HttpResponse* ResponseToPop); void CloseConnection(); - HttpAsioServerImpl& m_Server; - asio::streambuf m_RequestBuffer; - std::unique_ptr<asio::ip::tcp::socket> m_Socket; - std::atomic<uint32_t> m_RequestCounter{0}; - uint32_t m_ConnectionId = 0; - Ref<IHttpPackageHandler> m_PackageHandler; + HttpAsioServerImpl& m_Server; + asio::streambuf m_RequestBuffer; + std::atomic<uint32_t> m_RequestCounter{0}; + uint32_t m_ConnectionId = 0; + Ref<IHttpPackageHandler> m_PackageHandler; - RwLock m_ResponsesLock; - std::deque<std::unique_ptr<HttpResponse>> m_Responses; + RwLock m_ActiveResponsesLock; + std::deque<std::unique_ptr<HttpResponse>> m_ActiveResponses; + + std::unique_ptr<asio::ip::tcp::socket> m_Socket; }; std::atomic<uint32_t> g_ConnectionIdCounter{0}; HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket) : m_Server(Server) -, m_Socket(std::move(Socket)) , 
m_ConnectionId(g_ConnectionIdCounter.fetch_add(1)) +, m_Socket(std::move(Socket)) { ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId); } HttpServerConnection::~HttpServerConnection() { + RwLock::ExclusiveLockScope _(m_ActiveResponsesLock); + ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId); } @@ -434,7 +933,11 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] return; default: - ZEN_WARN("on data received ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message()); + ZEN_WARN("on data received ERROR, connection: {} (state: {}), reason '{}'", + m_ConnectionId, + StateToString(m_RequestState), + Ec.message()); + return TerminateConnection(); } } @@ -472,37 +975,58 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] } void -HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, bool Pop) +HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, + [[maybe_unused]] std::size_t ByteCount, + [[maybe_unused]] uint32_t RequestNumber, + HttpResponse* ResponseToPop) { ZEN_MEMSCOPE(GetHttpasioTag()); if (Ec) { - ZEN_WARN("on data sent ERROR, connection: {}, reason: '{}'", m_ConnectionId, Ec.message()); + ZEN_WARN("on data sent ERROR, connection: {} (state: {}), reason: '{}' (bytes: {})", + m_ConnectionId, + StateToString(m_RequestState), + Ec.message(), + ByteCount); + TerminateConnection(); + + return; } - else - { - ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}", - m_ConnectionId, - m_RequestCounter.load(std::memory_order_relaxed), - zen::GetCurrentThreadId(), - NiceBytes(ByteCount)); - if (!m_RequestData.IsKeepAlive()) - { - CloseConnection(); - } - else - { - if (Pop) + ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}", + m_ConnectionId, + RequestNumber, + zen::GetCurrentThreadId(), + NiceBytes(ByteCount)); + + if (ResponseToPop) + { + 
m_ActiveResponsesLock.WithExclusiveLock([&] { + // Once a response is sent we can release any referenced resources + // + // completion callbacks may be issued out-of-order so we need to + // remove the relevant entry from our active response list, it may + // not be the first + + if (auto It = find_if(begin(m_ActiveResponses), + end(m_ActiveResponses), + [ResponseToPop](const auto& Item) { return Item.get() == ResponseToPop; }); + It != end(m_ActiveResponses)) + { + m_ActiveResponses.erase(It); + } + else { - RwLock::ExclusiveLockScope _(m_ResponsesLock); - m_Responses.pop_front(); + ZEN_WARN("response not found"); } + }); + } - m_RequestCounter.fetch_add(1); - } + if (!m_RequestData.IsKeepAlive()) + { + CloseConnection(); } } @@ -553,13 +1077,13 @@ HttpServerConnection::HandleRequest() m_RequestState = RequestState::kWriting; } + const uint32_t RequestNumber = m_RequestCounter.fetch_add(1); + if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url())) { ZEN_TRACE_CPU("asio::HandleRequest"); - const uint32_t RequestNumber = m_RequestCounter.load(std::memory_order_relaxed); - - HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body()); + HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body(), RequestNumber); ZEN_TRACE_VERBOSE("handle request, connection: {}, request: {}'", m_ConnectionId, RequestNumber); @@ -635,34 +1159,38 @@ HttpServerConnection::HandleRequest() if (m_RequestData.RequestVerb() == HttpVerb::kHead) { - Response->SuppressPayload(); - } + ZEN_TRACE_CPU("asio::async_write"); - const std::vector<asio::const_buffer>& ResponseBuffers = Response->AsioBuffers(); + std::string_view Headers = Response->GetHeaders(); - uint64_t ResponseLength = 0; + std::vector<asio::const_buffer> AsioBuffers; + AsioBuffers.push_back(asio::const_buffer(Headers.data(), Headers.size())); - for (const asio::const_buffer& Buffer : ResponseBuffers) - { - ResponseLength += Buffer.size(); + asio::async_write(*m_Socket.get(), + 
AsioBuffers, + asio::transfer_all(), + [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); } - + else { - RwLock::ExclusiveLockScope _(m_ResponsesLock); - m_Responses.push_back(std::move(Response)); - } + ZEN_TRACE_CPU("asio::async_write"); - // TODO: should cork/uncork for Linux? + HttpResponse* ResponseRaw = Response.get(); - { - ZEN_TRACE_CPU("asio::async_write"); - asio::async_write(*m_Socket.get(), - ResponseBuffers, - asio::transfer_exactly(ResponseLength), - [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { - Conn->OnResponseDataSent(Ec, ByteCount, true); - }); + m_ActiveResponsesLock.WithExclusiveLock([&] { + // Keep referenced resources alive + m_ActiveResponses.push_back(std::move(Response)); + }); + + ResponseRaw->SendResponse( + *m_Socket, + [Conn = AsSharedPtr(), ResponseRaw, RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ ResponseRaw); + }); } + return; } } @@ -681,10 +1209,11 @@ HttpServerConnection::HandleRequest() "\r\n"sv; } - asio::async_write( - *m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); + asio::async_write(*m_Socket.get(), + asio::buffer(Response), + [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); } else { @@ -706,10 +1235,11 @@ HttpServerConnection::HandleRequest() "No suitable route found"sv; } - asio::async_write( - *m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); + asio::async_write(*m_Socket.get(), + 
asio::buffer(Response), + [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); } } @@ -1016,9 +1546,13 @@ private: ////////////////////////////////////////////////////////////////////////// -HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer) +HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, + HttpService& Service, + IoBuffer PayloadBuffer, + uint32_t RequestNumber) : HttpServerRequest(Service) , m_Request(Request) +, m_RequestNumber(RequestNumber) , m_PayloadBuffer(std::move(PayloadBuffer)) { const int PrefixLength = Service.UriPrefixLength(); @@ -1104,7 +1638,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode) ZEN_ASSERT(!m_Response); - m_Response.reset(new HttpResponse(HttpContentType::kBinary)); + m_Response.reset(new HttpResponse(HttpContentType::kBinary, m_RequestNumber)); std::array<IoBuffer, 0> Empty; m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty); @@ -1117,7 +1651,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT ZEN_ASSERT(!m_Response); - m_Response.reset(new HttpResponse(ContentType)); + m_Response.reset(new HttpResponse(ContentType, m_RequestNumber)); m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs); } @@ -1127,7 +1661,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT ZEN_MEMSCOPE(GetHttpasioTag()); ZEN_ASSERT(!m_Response); - m_Response.reset(new HttpResponse(ContentType)); + m_Response.reset(new HttpResponse(ContentType, m_RequestNumber)); IoBuffer MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size()); std::array<IoBuffer, 1> SingleBufferList({MessageBuffer}); |