diff options
| author | Stefan Boberg <[email protected]> | 2026-02-04 10:23:36 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-02-04 10:23:36 +0100 |
| commit | 2ab9d9f70ff2db69f2e8b731fbea4ec825c26dbb (patch) | |
| tree | 90a82170358d75bcd8bc2c83849d61782f3c3167 /src/zenhttp/servers/httpasio.cpp | |
| parent | Merge pull request #724 from ue-foundation/lm/restrict-reads-to-project (diff) | |
| download | zen-2ab9d9f70ff2db69f2e8b731fbea4ec825c26dbb.tar.xz zen-2ab9d9f70ff2db69f2e8b731fbea4ec825c26dbb.zip | |
implemented chunking for TransmitFile path (#736)
* implemented chunking for TransmitFile path, to ensure payloads exceeding the TransmitFile API limit of 4GB can be handled
* also fixes similar issue with memory path
Diffstat (limited to 'src/zenhttp/servers/httpasio.cpp')
| -rw-r--r-- | src/zenhttp/servers/httpasio.cpp | 177 |
1 files changed, 119 insertions, 58 deletions
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index b11eebc1a..9666a1d28 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -163,7 +163,7 @@ Log() #endif #if defined(ASIO_HAS_WINDOWS_OVERLAPPED_PTR) -# define ZEN_USE_TRANSMITFILE 0 +# define ZEN_USE_TRANSMITFILE 1 # define ZEN_USE_ASYNC_SENDFILE 0 #else # define ZEN_USE_TRANSMITFILE 0 @@ -558,6 +558,14 @@ public: std::unique_ptr<HttpResponse> m_Response; }; +/** + * HTTP Response representation used internally by the ASIO server + * + * This is used to build up the response headers and payload prior to sending + * it over the network. It's also responsible for managing the send operation itself, + * including ownership of the source buffers until the operation completes. + * + */ struct HttpResponse { public: @@ -568,8 +576,18 @@ public: ~HttpResponse() = default; + /** + * Initialize the response for sending a payload made up of multiple blobs + * + * This builds the necessary headers and IO vectors for sending the response + * and also makes sure all buffers are owned for the duration of the + * operation. + * + */ void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList) { + ZEN_ASSERT(m_State == State::kUninitialized); + ZEN_MEMSCOPE(GetHttpasioTag()); ZEN_TRACE_CPU("asio::InitializeForPayload"); @@ -578,13 +596,13 @@ public: const uint32_t ChunkCount = gsl::narrow<uint32_t>(BlobList.size()); m_DataBuffers.reserve(ChunkCount); - m_IoVecCount = ChunkCount + 1 /* one extra buffer for headers */; - m_IoVecs.resize(m_IoVecCount); + m_IoVecs.reserve(ChunkCount + 1); + + m_IoVecs.emplace_back(); // header IoVec m_IoVecCursor = 0; uint64_t LocalDataSize = 0; - int Index = 1; for (IoBuffer& Buffer : BlobList) { @@ -597,37 +615,71 @@ public: IoBuffer OwnedBuffer = std::move(Buffer); OwnedBuffer.MakeOwned(); - IoVec& Io = m_IoVecs[Index++]; - bool ChunkHandled = false; #if ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE if (IoBufferFileReference FileRef; OwnedBuffer.GetFileReference(/* out */ FileRef)) { - Io.IsFileRef = true; - Io.Ref.FileRef = FileRef; - ChunkHandled = true; + // Since there's a limit to how much data TransmitFile can send in one go, + // we may need to split this into multiple IoVec entries. In this case we'll + // end up reallocating the IoVec array, but this should be rare. + + uint64_t RemainingChunkBytes = FileRef.FileChunkSize; + uint64_t ChunkOffset = FileRef.FileChunkOffset; + + const uint32_t MaxTransmitSize = 1 * 1024 * 1024 * 1024; // 1 GB + + while (RemainingChunkBytes) + { + IoVec Io{.IsFileRef = true}; + + Io.Ref.FileRef.FileHandle = FileRef.FileHandle; + Io.Ref.FileRef.FileChunkOffset = ChunkOffset; + + if (RemainingChunkBytes > MaxTransmitSize) + { + Io.Ref.FileRef.FileChunkSize = MaxTransmitSize; + RemainingChunkBytes -= MaxTransmitSize; + } + else + { + Io.Ref.FileRef.FileChunkSize = gsl::narrow<uint32_t>(RemainingChunkBytes); + RemainingChunkBytes = 0; + } + + ChunkOffset += Io.Ref.FileRef.FileChunkSize; + + m_IoVecs.push_back(Io); + } + + ChunkHandled = true; } #endif if (!ChunkHandled) { - Io.IsFileRef = false; - uint32_t Size = gsl::narrow<uint32_t>(OwnedBuffer.Size()); - Io.Ref.MemoryRef = {OwnedBuffer.Data(), Size}; + IoVec Io{.IsFileRef = false}; + + Io.Ref.MemoryRef = {.Data = OwnedBuffer.Data(), .Size = OwnedBuffer.Size()}; + + m_IoVecs.push_back(Io); } - m_DataBuffers.emplace_back(OwnedBuffer); + m_DataBuffers.push_back(std::move(OwnedBuffer)); } + // Now that we know the full data size, we can build the headers + m_ContentLength = LocalDataSize; std::string_view Headers = GetHeaders(); - IoVec& Io = m_IoVecs[0]; + IoVec& HeaderIo = m_IoVecs[0]; - Io.IsFileRef = false; - Io.Ref.MemoryRef = {.Data = Headers.data(), .Size = gsl::narrow_cast<uint32_t>(Headers.size())}; + HeaderIo.IsFileRef = false; + HeaderIo.Ref.MemoryRef = {.Data = Headers.data(), .Size = Headers.size()}; + + m_State = State::kInitialized; } uint16_t ResponseCode() const { return m_ResponseCode; } @@ -653,17 +705,32 @@ public: void SendResponse(asio::ip::tcp::socket& TcpSocket, std::function<void(const asio::error_code& Ec, std::size_t ByteCount)>&& Token) { + ZEN_ASSERT(m_State == State::kInitialized); + + ZEN_MEMSCOPE(GetHttpasioTag()); + ZEN_TRACE_CPU("asio::SendResponse"); + m_SendCb = std::move(Token); + m_State = State::kSending; SendNextChunk(TcpSocket); } void SendNextChunk(asio::ip::tcp::socket& TcpSocket) { - if (m_IoVecCursor == m_IoVecCount) + ZEN_ASSERT(m_State == State::kSending); + + ZEN_MEMSCOPE(GetHttpasioTag()); + ZEN_TRACE_CPU("asio::SendNextChunk"); + + if (m_IoVecCursor == m_IoVecs.size()) { + // All data sent, complete the operation + ZEN_ASSERT(m_SendCb); + m_State = State::kSent; + auto CompletionToken = [Self = this, Token = std::move(m_SendCb), TotalBytes = m_TotalBytesSent] { Token({}, TotalBytes); }; asio::defer(TcpSocket.get_executor(), std::move(CompletionToken)); @@ -671,45 +738,43 @@ public: return; } + auto OnCompletion = [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { + ZEN_ASSERT(m_State == State::kSending); + + m_TotalBytesSent += ByteCount; + if (Ec) + { + m_State = State::kFailed; + m_SendCb(Ec, m_TotalBytesSent); + } + else + { + SendNextChunk(TcpSocket); + } + }; + const IoVec& Io = m_IoVecs[m_IoVecCursor++]; if (Io.IsFileRef) { - ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, bytes: {}", zen::GetCurrentThreadId(), Io.Ref.FileRef.FileChunkSize); + ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, offset: {}, bytes: {}", + zen::GetCurrentThreadId(), + Io.Ref.FileRef.FileChunkOffset, + Io.Ref.FileRef.FileChunkSize); #if ZEN_USE_TRANSMITFILE TransmitFileAsync(TcpSocket, Io.Ref.FileRef.FileHandle, Io.Ref.FileRef.FileChunkOffset, gsl::narrow_cast<uint32_t>(Io.Ref.FileRef.FileChunkSize), - [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { - m_TotalBytesSent += ByteCount; - if (Ec) - { - m_SendCb(Ec, m_TotalBytesSent); - } - else - { - SendNextChunk(TcpSocket); - } - }); + OnCompletion); #elif ZEN_USE_ASYNC_SENDFILE SendFileAsync(TcpSocket, Io.Ref.FileRef.FileHandle, Io.Ref.FileRef.FileChunkOffset, Io.Ref.FileRef.FileChunkSize, 64 * 1024, - [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { - m_TotalBytesSent += ByteCount; - if (Ec) - { - m_SendCb(Ec, m_TotalBytesSent); - } - else - { - SendNextChunk(TcpSocket); - } - }); + OnCompletion); #else // This should never occur unless we compile with one // of the options above @@ -724,7 +789,7 @@ public: std::vector<asio::const_buffer> AsioBuffers; AsioBuffers.push_back(asio::const_buffer{Io.Ref.MemoryRef.Data, Io.Ref.MemoryRef.Size}); - while (m_IoVecCursor != m_IoVecCount) + while (m_IoVecCursor != m_IoVecs.size()) { const IoVec& Io2 = m_IoVecs[m_IoVecCursor]; @@ -737,26 +802,23 @@ public: ++m_IoVecCursor; } - asio::async_write(TcpSocket, - std::move(AsioBuffers), - asio::transfer_all(), - [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { - m_TotalBytesSent += ByteCount; - if (Ec) - { - m_SendCb(Ec, m_TotalBytesSent); - } - else - { - SendNextChunk(TcpSocket); - } - }); + asio::async_write(TcpSocket, std::move(AsioBuffers), asio::transfer_all(), OnCompletion); } private: + enum class State : uint8_t + { + kUninitialized, + kInitialized, + kSending, + kSent, + kFailed + }; + uint32_t m_RequestNumber = 0; uint16_t m_ResponseCode = 0; bool m_IsKeepAlive = true; + State m_State = State::kUninitialized; HttpContentType m_ContentType = HttpContentType::kBinary; uint64_t m_ContentLength = 0; eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive @@ -770,15 +832,14 @@ private: struct MemoryBuffer { const void* Data; - uint32_t Size; + uint64_t Size; } MemoryRef; IoBufferFileReference FileRef; } Ref; }; eastl::fixed_vector<IoVec, 8> m_IoVecs; - int m_IoVecCursor = 0; - int m_IoVecCount = 0; + unsigned int m_IoVecCursor = 0; std::function<void(const asio::error_code& Ec, std::size_t ByteCount)> m_SendCb; uint64_t m_TotalBytesSent = 0; |