diff options
| author | Liam Mitchell <[email protected]> | 2026-03-09 18:25:30 -0700 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2026-03-09 18:25:30 -0700 |
| commit | 57c1683b2935c834250b73eb506319ed67946160 (patch) | |
| tree | 1fc8f237010b26e65659b731fe6f6eae30422f5c /src/zenhttp/servers | |
| parent | Allow external OidcToken executable to be specified unless disabled via comma... (diff) | |
| parent | reduce lock time for project store gc precache and gc validate (#750) (diff) | |
| download | zen-57c1683b2935c834250b73eb506319ed67946160.tar.xz zen-57c1683b2935c834250b73eb506319ed67946160.zip | |
Merge branch 'main' into lm/oidctoken-exe-path
Diffstat (limited to 'src/zenhttp/servers')
| -rw-r--r-- | src/zenhttp/servers/httpasio.cpp | 835 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpmulti.cpp | 14 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpnull.cpp | 14 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpplugin.cpp | 14 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.cpp | 10 |
5 files changed, 741 insertions, 146 deletions
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index a0431b0b1..18a0f6a40 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -4,6 +4,7 @@ #include "httptracer.h" #include <zencore/except.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory/llm.h> #include <zencore/thread.h> @@ -13,6 +14,8 @@ #include "httpparser.h" +#include <EASTL/fixed_vector.h> + #include <deque> #include <memory> #include <string_view> @@ -28,6 +31,7 @@ ZEN_THIRD_PARTY_INCLUDES_START # include <errno.h> #endif #include <asio.hpp> +#include <asio/stream_file.hpp> ZEN_THIRD_PARTY_INCLUDES_END #define ASIO_VERBOSE_TRACE 0 @@ -154,6 +158,338 @@ Log() ////////////////////////////////////////////////////////////////////////// +#if !defined(ASIO_HAS_FILE) +# define ASIO_HAS_FILE 0 +#endif + +#if defined(ASIO_HAS_WINDOWS_OVERLAPPED_PTR) +# define ZEN_USE_TRANSMITFILE 1 +# define ZEN_USE_ASYNC_SENDFILE 0 +#else +# define ZEN_USE_TRANSMITFILE 0 +# define ZEN_USE_ASYNC_SENDFILE 0 +#endif + +#if ZEN_USE_TRANSMITFILE +template<typename Handler> +void +TransmitFileAsync(asio::ip::tcp::socket& Socket, HANDLE FileHandle, uint64_t ByteOffset, uint32_t ByteSize, Handler&& Cb) +{ +# if ZEN_BUILD_DEBUG + const uint64_t FileSize = FileSizeFromHandle(FileHandle); + const uint64_t SendEndOffset = ByteOffset + ByteSize; + + if (SendEndOffset > FileSize) + { + std::error_code DummyEc; + + ZEN_WARN("TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) tries to transmit {} bytes too many", + ByteOffset, + ByteSize, + PathFromHandle(FileHandle, DummyEc), + FileSizeFromHandle(FileHandle), + SendEndOffset - FileSize); + } +# endif // ZEN_BUILD_DEBUG + + asio::windows::overlapped_ptr OverlappedPtr( + Socket.get_executor(), + [WrappedCb = std::move(Cb), ExpectedBytes = ByteSize, FileHandle, ByteOffset, ByteSize](const std::error_code& Ec, + std::size_t ActualBytesTransferred) { + if (Ec) + { + std::error_code DummyEc; + ZEN_WARN("NOTE: TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) error '{}', transmitted {} bytes", + ByteOffset, + ByteSize, + PathFromHandle(FileHandle, DummyEc), + FileSizeFromHandle(FileHandle), + Ec.message(), + ActualBytesTransferred); + } + WrappedCb(Ec, ActualBytesTransferred); + }); + + OVERLAPPED* RawOverlapped = OverlappedPtr.get(); + RawOverlapped->Offset = uint32_t(ByteOffset & 0xffffFFFFull); + RawOverlapped->OffsetHigh = uint32_t(ByteOffset >> 32); + + const DWORD NumberOfBytesPerSend = 0; // let TransmitFile decide + + const BOOL Ok = + ::TransmitFile(Socket.native_handle(), FileHandle, ByteSize, NumberOfBytesPerSend, RawOverlapped, nullptr, /* dwReserved */ 0); + const DWORD LastError = ::GetLastError(); + + // Check if the operation completed immediately. + if (!Ok && LastError != ERROR_IO_PENDING) + { + // The operation completed immediately, so a completion notification needs + // to be posted. When complete() is called, ownership of the OVERLAPPED- + // derived object passes to the io_context. + + asio::error_code ec(LastError, asio::error::get_system_category()); + OverlappedPtr.complete(ec, 0); + } + else + { + // The operation was successfully initiated, so ownership of the + // OVERLAPPED-derived object has passed to the io_context. + + OverlappedPtr.release(); + } +} +#endif // ZEN_USE_TRANSMITFILE + +#if ZEN_USE_ASYNC_SENDFILE + +// Pipelined file sender that reads from a file and writes to a socket using two buffers +// to pipeline reads and writes. Unfortunately this strategy can't currently be used on +// non-Windows platforms as they don't currently support async file reads. We'll have +// to build a mechanism using a thread pool for that, perhaps with optional support +// for io_uring where available since that should be the most efficient. +// +// In other words, this is not super useful as Windows already has the TransmitFile +// version above, but it's here for completeness and potential future use on other platforms. + +template<class AsyncWriteStream, class CompletionHandler> +class PipelinedFileSender : public std::enable_shared_from_this<PipelinedFileSender<AsyncWriteStream, CompletionHandler>> +{ +public: + PipelinedFileSender(AsyncWriteStream& WriteSocket, + asio::stream_file&& FileToReadFrom, + uint64_t ByteSize, + std::size_t BufferSize, + CompletionHandler&& CompletionToken) + : m_WriteSocket(WriteSocket) + , m_SourceFile(std::move(FileToReadFrom)) + , m_Strand(asio::make_strand(m_WriteSocket.get_executor())) + , m_Buffers{std::vector<char>(BufferSize), std::vector<char>(BufferSize)} + , m_CompletionHandler(std::move(CompletionToken)) + , m_TotalBytesToRead(ByteSize) + , m_RemainingBytesToRead(ByteSize) + { + } + + void Start() { EnqueueRead(); } + +private: + struct Slot + { + std::size_t Size = 0; // valid bytes in buffer + bool Ready = false; // has unwritten data + bool InUse = false; // being written + }; + + void OnSendCompleted(const std::error_code& Ec) + { + // TODO: ensure this behaves properly for instance if the write fails while a read is pending + + if (m_SendCompleted) + { + return; + } + + m_SendCompleted = true; + + // Ensure completion runs on the strand/executor. + + asio::dispatch(m_Strand, [CompletionHandler = std::move(m_CompletionHandler), Ec, TotalBytesWritten = m_TotalBytesWritten]() { + CompletionHandler(Ec, TotalBytesWritten); + }); + } + + void EnqueueRead() + { + asio::dispatch(m_Strand, [self = this->shared_from_this()] { + self->TryPostRead(); + self->PumpWrites(); + }); + } + + void TryPostRead() + { + if (m_IsEof || m_ReadInFlight || m_RemainingBytesToRead == 0) + { + return; + } + + const int ReadSlotIndex = GetFreeSlotIndex(); + if (ReadSlotIndex < 0) + { + // no free slot; wait for writer to free one (not meant to ever happen) + return; + } + + m_ReadInFlight = true; + auto ReadBuffer = asio::buffer(m_Buffers[ReadSlotIndex].data(), zen::Min(m_Buffers[ReadSlotIndex].size(), m_RemainingBytesToRead)); + + asio::async_read( + m_SourceFile, + ReadBuffer, + asio::bind_executor(m_Strand, + [self = this->shared_from_this(), ReadSlotIndex](const std::error_code& Ec, std::size_t ActualBytesRead) { + self->m_ReadInFlight = false; + self->m_RemainingBytesToRead -= ActualBytesRead; + + if (Ec) + { + if (Ec == asio::error::eof) + { + ZEN_ASSERT(self->m_RemainingBytesToRead == 0); + + self->m_IsEof = true; + + // No data produced on EOF; just try to pump whatever is left + self->PumpWrites(); + } + else + { + // read error, cancel everything and let outer completion handler know + self->OnSendCompleted(Ec); + } + } + else + { + // Mark slot as ready with ActualBytesRead valid bytes of data in buffer + self->m_Slots[ReadSlotIndex].Size = ActualBytesRead; + self->m_Slots[ReadSlotIndex].Ready = true; + self->PumpWrites(); + self->TryPostRead(); + } + })); + } + + void PumpWrites() + { + if (m_WriteInFlight) + { + return; + } + + const int WriteSlotIndex = GetReadySlotIndex(); + if (WriteSlotIndex < 0) + { + // No ready data. We're done if EOF/no more data to read and no reads in flight and nothing ready + if (!m_ReadInFlight && (m_IsEof || m_RemainingBytesToRead == 0)) + { + // all done + return OnSendCompleted({}); + } + + return; + } + + m_WriteInFlight = true; + m_Slots[WriteSlotIndex].InUse = true; + + asio::async_write( + m_WriteSocket, + asio::buffer(m_Buffers[WriteSlotIndex].data(), m_Slots[WriteSlotIndex].Size), + asio::bind_executor(m_Strand, + [self = this->shared_from_this(), WriteSlotIndex](const std::error_code& Ec, std::size_t BytesWritten) { + self->m_TotalBytesWritten += BytesWritten; + self->m_WriteInFlight = false; + + if (Ec) + { + self->OnSendCompleted(Ec); + + return; + } + else + { + // Free the slot + self->m_Slots[WriteSlotIndex].Ready = false; + self->m_Slots[WriteSlotIndex].InUse = false; + self->m_Slots[WriteSlotIndex].Size = 0; + + self->TryPostRead(); + self->PumpWrites(); + } + })); + } + + int GetFreeSlotIndex() const + { + for (int i = 0; i < 2; ++i) + { + if (!m_Slots[i].Ready && !m_Slots[i].InUse) + { + return i; + } + } + return -1; + } + + int GetReadySlotIndex() const + { + for (int i = 0; i < 2; ++i) + { + if (m_Slots[i].Ready && !m_Slots[i].InUse) + { + return i; + } + } + return -1; + } + + AsyncWriteStream& m_WriteSocket; + asio::stream_file m_SourceFile; + asio::strand<asio::any_io_executor> m_Strand; + + // There's no synchronization needed for these as all access is via the strand + std::vector<char> m_Buffers[2]; + Slot m_Slots[2]; + + bool m_IsEof = false; + bool m_ReadInFlight = false; + bool m_WriteInFlight = false; + bool m_SendCompleted = false; + + const uint64_t m_TotalBytesToRead = 0; + uint64_t m_RemainingBytesToRead = 0; + uint64_t m_TotalBytesWritten = 0; + + CompletionHandler m_CompletionHandler; +}; + +template<class AsyncWriteStream, class CompletionHandler> +void +SendFileAsync(AsyncWriteStream& WriteSocket, + const auto FileHandle, + uint64_t ByteOffset, + uint64_t ByteSize, + std::size_t BufferSize, + CompletionHandler&& CompletionToken) +{ + HANDLE hReopenedFile = + ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED); + + // Note that this assumes ownership of the handle + asio::stream_file SourceFile(WriteSocket.get_executor(), hReopenedFile); + + // TODO: handle any error properly here + SourceFile.seek(ByteOffset, asio::stream_file::seek_set); + + if (BufferSize > ByteSize) + { + BufferSize = ByteSize; + } + + auto op = std::make_shared<PipelinedFileSender<AsyncWriteStream, std::decay_t<CompletionHandler>>>( + WriteSocket, + std::move(SourceFile), + ByteSize, + BufferSize, + std::forward<CompletionHandler>(CompletionToken)); + + // Start the pipeline + op->Start(); +} +#endif // ZEN_USE_ASYNC_SENDFILE + +////////////////////////////////////////////////////////////////////////// + struct HttpAsioServerImpl { public: @@ -191,7 +527,7 @@ public: class HttpAsioServerRequest : public HttpServerRequest { public: - HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer); + HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer, uint32_t RequestNumber); ~HttpAsioServerRequest(); virtual Oid ParseSessionId() const override; @@ -210,18 +546,41 @@ public: HttpAsioServerRequest& operator=(const HttpAsioServerRequest&) = delete; HttpRequestParser& m_Request; + uint32_t m_RequestNumber = 0; // Note: different to request ID which is derived from headers IoBuffer m_PayloadBuffer; std::unique_ptr<HttpResponse> m_Response; }; +/** + * HTTP Response representation used internally by the ASIO server + * + * This is used to build up the response headers and payload prior to sending + * it over the network. It's also responsible for managing the send operation itself, + * including ownership of the source buffers until the operation completes. + * + */ struct HttpResponse { public: HttpResponse() = default; - explicit HttpResponse(HttpContentType ContentType) : m_ContentType(ContentType) {} + explicit HttpResponse(HttpContentType ContentType, uint32_t RequestNumber) : m_RequestNumber(RequestNumber), m_ContentType(ContentType) + { + } + + ~HttpResponse() = default; + /** + * Initialize the response for sending a payload made up of multiple blobs + * + * This builds the necessary headers and IO vectors for sending the response + * and also makes sure all buffers are owned for the duration of the + * operation. + * + */ void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList) { + ZEN_ASSERT(m_State == State::kUninitialized); + ZEN_MEMSCOPE(GetHttpasioTag()); ZEN_TRACE_CPU("asio::InitializeForPayload"); @@ -230,57 +589,124 @@ public: const uint32_t ChunkCount = gsl::narrow<uint32_t>(BlobList.size()); m_DataBuffers.reserve(ChunkCount); + m_IoVecs.reserve(ChunkCount + 1); - for (IoBuffer& Buffer : BlobList) - { -#if 1 - m_DataBuffers.emplace_back(std::move(Buffer)).MakeOwned(); -#else - IoBuffer TempBuffer = std::move(Buffer); - TempBuffer.MakeOwned(); - m_DataBuffers.emplace_back(IoBufferBuilder::ReadFromFileMaybe(TempBuffer)); -#endif - } + m_IoVecs.emplace_back(); // header IoVec - uint64_t LocalDataSize = 0; + m_IoVecCursor = 0; - m_AsioBuffers.push_back({}); // Placeholder for header + uint64_t LocalDataSize = 0; - for (IoBuffer& Buffer : m_DataBuffers) + for (IoBuffer& Buffer : BlobList) { - uint64_t BufferDataSize = Buffer.Size(); + const uint64_t BufferDataSize = Buffer.Size(); ZEN_ASSERT(BufferDataSize); LocalDataSize += BufferDataSize; - IoBufferFileReference FileRef; - if (Buffer.GetFileReference(/* out */ FileRef)) + IoBuffer OwnedBuffer = std::move(Buffer); + + bool ChunkHandled = false; + +#if ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE + if (OwnedBuffer.IsWholeFile()) { - // TODO: Use direct file transfer, via TransmitFile/sendfile - // - // this looks like it requires some custom asio plumbing however + if (IoBufferFileReference FileRef; OwnedBuffer.GetFileReference(/* out */ FileRef)) + { +# if ZEN_USE_TRANSMITFILE + // We establish a new handle here to add the FILE_FLAG_OVERLAPPED flag for use during TransmitFile + + HANDLE WinFileHandle = ReOpenFile(FileRef.FileHandle, + FILE_GENERIC_READ, + FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, + FILE_FLAG_OVERLAPPED); + + if (WinFileHandle == INVALID_HANDLE_VALUE) + { + HRESULT hRes = HRESULT_FROM_WIN32(GetLastError()); + std::error_code DummyEc; + ThrowSystemException(hRes, + fmt::format("Failed to ReOpenFile file {}", PathFromHandle(FileRef.FileHandle, DummyEc))); + } + + void* FileHandle = (void*)WinFileHandle; + + OwnedBuffer = IoBufferBuilder::MakeFromFileHandle(FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize); +# else // ZEN_USE_TRANSMITFILE + void* FileHandle = FileRef.FileHandle; + + OwnedBuffer.MakeOwned(); +# endif // ZEN_USE_TRANSMITFILE + + // Since there's a limit to how much data TransmitFile can send in one go, + // we may need to split this into multiple IoVec entries. In this case we'll + // end up reallocating the IoVec array, but this should be rare. - m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); + uint64_t RemainingChunkBytes = FileRef.FileChunkSize; + uint64_t ChunkOffset = FileRef.FileChunkOffset; + + const uint32_t MaxTransmitSize = 1 * 1024 * 1024 * 1024; // 1 GB + + while (RemainingChunkBytes) + { + IoVec Io{.IsFileRef = true}; + + Io.Ref.FileRef.FileHandle = FileHandle; + Io.Ref.FileRef.FileChunkOffset = ChunkOffset; + + if (RemainingChunkBytes > MaxTransmitSize) + { + Io.Ref.FileRef.FileChunkSize = MaxTransmitSize; + RemainingChunkBytes -= MaxTransmitSize; + } + else + { + Io.Ref.FileRef.FileChunkSize = gsl::narrow<uint32_t>(RemainingChunkBytes); + RemainingChunkBytes = 0; + } + + ChunkOffset += Io.Ref.FileRef.FileChunkSize; + + m_IoVecs.push_back(Io); + } + + ChunkHandled = true; + } } - else +#endif // ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE + + if (!ChunkHandled) { - // Send from memory + OwnedBuffer.MakeOwned(); + + IoVec Io{.IsFileRef = false}; + + Io.Ref.MemoryRef = {.Data = OwnedBuffer.Data(), .Size = OwnedBuffer.Size()}; - m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); + m_IoVecs.push_back(Io); } + + m_DataBuffers.push_back(std::move(OwnedBuffer)); } + + // Now that we know the full data size, we can build the headers + m_ContentLength = LocalDataSize; std::string_view Headers = GetHeaders(); - m_AsioBuffers[0] = asio::const_buffer(Headers.data(), Headers.size()); + + IoVec& HeaderIo = m_IoVecs[0]; + + HeaderIo.IsFileRef = false; + HeaderIo.Ref.MemoryRef = {.Data = Headers.data(), .Size = Headers.size()}; + + m_State = State::kInitialized; } uint16_t ResponseCode() const { return m_ResponseCode; } uint64_t ContentLength() const { return m_ContentLength; } - const std::vector<asio::const_buffer>& AsioBuffers() const { return m_AsioBuffers; } - std::string_view GetHeaders() { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -299,16 +725,146 @@ public: return m_Headers; } - void SuppressPayload() { m_AsioBuffers.resize(1); } + void SendResponse(asio::ip::tcp::socket& TcpSocket, std::function<void(const asio::error_code& Ec, std::size_t ByteCount)>&& Token) + { + ZEN_ASSERT(m_State == State::kInitialized); + + ZEN_MEMSCOPE(GetHttpasioTag()); + ZEN_TRACE_CPU("asio::SendResponse"); + + m_SendCb = std::move(Token); + m_State = State::kSending; + + SendNextChunk(TcpSocket); + } + + void SendNextChunk(asio::ip::tcp::socket& TcpSocket) + { + ZEN_ASSERT(m_State == State::kSending); + + ZEN_MEMSCOPE(GetHttpasioTag()); + ZEN_TRACE_CPU("asio::SendNextChunk"); + + if (m_IoVecCursor == m_IoVecs.size()) + { + // All data sent, complete the operation + + ZEN_ASSERT(m_SendCb); + + m_State = State::kSent; + + auto CompletionToken = [Self = this, Token = std::move(m_SendCb), TotalBytes = m_TotalBytesSent] { Token({}, TotalBytes); }; + + asio::defer(TcpSocket.get_executor(), std::move(CompletionToken)); + + return; + } + + auto OnCompletion = [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { + ZEN_ASSERT(m_State == State::kSending); + + m_TotalBytesSent += ByteCount; + if (Ec) + { + m_State = State::kFailed; + m_SendCb(Ec, m_TotalBytesSent); + } + else + { + SendNextChunk(TcpSocket); + } + }; + + const IoVec& Io = m_IoVecs[m_IoVecCursor++]; + + if (Io.IsFileRef) + { + ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, offset: {}, bytes: {}", + zen::GetCurrentThreadId(), + Io.Ref.FileRef.FileChunkOffset, + Io.Ref.FileRef.FileChunkSize); + +#if ZEN_USE_TRANSMITFILE + TransmitFileAsync(TcpSocket, + Io.Ref.FileRef.FileHandle, + Io.Ref.FileRef.FileChunkOffset, + gsl::narrow_cast<uint32_t>(Io.Ref.FileRef.FileChunkSize), + OnCompletion); +#elif ZEN_USE_ASYNC_SENDFILE + SendFileAsync(TcpSocket, + Io.Ref.FileRef.FileHandle, + Io.Ref.FileRef.FileChunkOffset, + Io.Ref.FileRef.FileChunkSize, + 64 * 1024, + OnCompletion); +#else + // This should never occur unless we compile with one + // of the options above + ZEN_WARN("invalid file reference in response"); +#endif + + return; + } + + // Send as many consecutive non-file references as possible in one asio operation + + std::vector<asio::const_buffer> AsioBuffers; + AsioBuffers.push_back(asio::const_buffer{Io.Ref.MemoryRef.Data, Io.Ref.MemoryRef.Size}); + + while (m_IoVecCursor != m_IoVecs.size()) + { + const IoVec& Io2 = m_IoVecs[m_IoVecCursor]; + + if (Io2.IsFileRef) + { + break; + } + + AsioBuffers.push_back(asio::const_buffer{Io2.Ref.MemoryRef.Data, Io2.Ref.MemoryRef.Size}); + ++m_IoVecCursor; + } + + asio::async_write(TcpSocket, std::move(AsioBuffers), asio::transfer_all(), OnCompletion); + } private: - uint16_t m_ResponseCode = 0; - bool m_IsKeepAlive = true; - HttpContentType m_ContentType = HttpContentType::kBinary; - uint64_t m_ContentLength = 0; - std::vector<IoBuffer> m_DataBuffers; - std::vector<asio::const_buffer> m_AsioBuffers; - ExtendableStringBuilder<160> m_Headers; + enum class State : uint8_t + { + kUninitialized, + kInitialized, + kSending, + kSent, + kFailed + }; + + uint32_t m_RequestNumber = 0; + uint16_t m_ResponseCode = 0; + bool m_IsKeepAlive = true; + State m_State = State::kUninitialized; + HttpContentType m_ContentType = HttpContentType::kBinary; + uint64_t m_ContentLength = 0; + eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive + ExtendableStringBuilder<160> m_Headers; + + struct IoVec + { + bool IsFileRef; + union + { + struct MemoryBuffer + { + const void* Data; + uint64_t Size; + } MemoryRef; + IoBufferFileReference FileRef; + } Ref; + }; + + eastl::fixed_vector<IoVec, 8> m_IoVecs; + unsigned int m_IoVecCursor = 0; + + std::function<void(const asio::error_code& Ec, std::size_t ByteCount)> m_SendCb; + uint64_t m_TotalBytesSent = 0; }; ////////////////////////////////////////////////////////////////////////// @@ -339,37 +895,63 @@ private: kTerminated }; + const char* StateToString(RequestState State) + { + switch (State) + { + case RequestState::kInitialState: + return "InitialState"; + case RequestState::kInitialRead: + return "InitialRead"; + case RequestState::kReadingMore: + return "ReadingMore"; + case RequestState::kWriting: + return "Writing"; + case RequestState::kWritingFinal: + return "WritingFinal"; + case RequestState::kDone: + return "Done"; + case RequestState::kTerminated: + return "Terminated"; + default: + return "Unknown"; + } + } + RequestState m_RequestState = RequestState::kInitialState; HttpRequestParser m_RequestData{*this}; void EnqueueRead(); void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); - void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false); + void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, uint32_t RequestNumber, HttpResponse* ResponseToPop); void CloseConnection(); - HttpAsioServerImpl& m_Server; - asio::streambuf m_RequestBuffer; - std::unique_ptr<asio::ip::tcp::socket> m_Socket; - std::atomic<uint32_t> m_RequestCounter{0}; - uint32_t m_ConnectionId = 0; - Ref<IHttpPackageHandler> m_PackageHandler; + HttpAsioServerImpl& m_Server; + asio::streambuf m_RequestBuffer; + std::atomic<uint32_t> m_RequestCounter{0}; + uint32_t m_ConnectionId = 0; + Ref<IHttpPackageHandler> m_PackageHandler; - RwLock m_ResponsesLock; - std::deque<std::unique_ptr<HttpResponse>> m_Responses; + RwLock m_ActiveResponsesLock; + std::deque<std::unique_ptr<HttpResponse>> m_ActiveResponses; + + std::unique_ptr<asio::ip::tcp::socket> m_Socket; }; std::atomic<uint32_t> g_ConnectionIdCounter{0}; HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket) : m_Server(Server) -, m_Socket(std::move(Socket)) , m_ConnectionId(g_ConnectionIdCounter.fetch_add(1)) +, m_Socket(std::move(Socket)) { ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId); } HttpServerConnection::~HttpServerConnection() { + RwLock::ExclusiveLockScope _(m_ActiveResponsesLock); + ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId); } @@ -434,7 +1016,11 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] return; default: - ZEN_WARN("on data received ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message()); + ZEN_WARN("on data received ERROR, connection: {} (state: {}), reason '{}'", + m_ConnectionId, + StateToString(m_RequestState), + Ec.message()); + return TerminateConnection(); } } @@ -472,37 +1058,58 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] } void -HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, bool Pop) +HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, + [[maybe_unused]] std::size_t ByteCount, + [[maybe_unused]] uint32_t RequestNumber, + HttpResponse* ResponseToPop) { ZEN_MEMSCOPE(GetHttpasioTag()); if (Ec) { - ZEN_WARN("on data sent ERROR, connection: {}, reason: '{}'", m_ConnectionId, Ec.message()); + ZEN_WARN("on data sent ERROR, connection: {} (state: {}), reason: '{}' (bytes: {})", + m_ConnectionId, + StateToString(m_RequestState), + Ec.message(), + ByteCount); + TerminateConnection(); + + return; } - else - { - ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}", - m_ConnectionId, - m_RequestCounter.load(std::memory_order_relaxed), - zen::GetCurrentThreadId(), - NiceBytes(ByteCount)); - if (!m_RequestData.IsKeepAlive()) - { - CloseConnection(); - } - else - { - if (Pop) + ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}", + m_ConnectionId, + RequestNumber, + zen::GetCurrentThreadId(), + NiceBytes(ByteCount)); + + if (ResponseToPop) + { + m_ActiveResponsesLock.WithExclusiveLock([&] { + // Once a response is sent we can release any referenced resources + // + // completion callbacks may be issued out-of-order so we need to + // remove the relevant entry from our active response list, it may + // not be the first + + if (auto It = find_if(begin(m_ActiveResponses), + end(m_ActiveResponses), + [ResponseToPop](const auto& Item) { return Item.get() == ResponseToPop; }); + It != end(m_ActiveResponses)) { - RwLock::ExclusiveLockScope _(m_ResponsesLock); - m_Responses.pop_front(); + m_ActiveResponses.erase(It); } + else + { + ZEN_WARN("response not found"); + } + }); + } - m_RequestCounter.fetch_add(1); - } + if (!m_RequestData.IsKeepAlive()) + { + CloseConnection(); } } @@ -553,13 +1160,13 @@ HttpServerConnection::HandleRequest() m_RequestState = RequestState::kWriting; } + const uint32_t RequestNumber = m_RequestCounter.fetch_add(1); + if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url())) { ZEN_TRACE_CPU("asio::HandleRequest"); - const uint32_t RequestNumber = m_RequestCounter.load(std::memory_order_relaxed); - - HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body()); + HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body(), RequestNumber); ZEN_TRACE_VERBOSE("handle request, connection: {}, request: {}'", m_ConnectionId, RequestNumber); @@ -635,34 +1242,38 @@ HttpServerConnection::HandleRequest() if (m_RequestData.RequestVerb() == HttpVerb::kHead) { - Response->SuppressPayload(); - } + ZEN_TRACE_CPU("asio::async_write"); - const std::vector<asio::const_buffer>& ResponseBuffers = Response->AsioBuffers(); + std::string_view Headers = Response->GetHeaders(); - uint64_t ResponseLength = 0; + std::vector<asio::const_buffer> AsioBuffers; + AsioBuffers.push_back(asio::const_buffer(Headers.data(), Headers.size())); - for (const asio::const_buffer& Buffer : ResponseBuffers) - { - ResponseLength += Buffer.size(); + asio::async_write(*m_Socket.get(), + AsioBuffers, + asio::transfer_all(), + [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); } - + else { - RwLock::ExclusiveLockScope _(m_ResponsesLock); - m_Responses.push_back(std::move(Response)); - } + ZEN_TRACE_CPU("asio::async_write"); - // TODO: should cork/uncork for Linux? + HttpResponse* ResponseRaw = Response.get(); - { - ZEN_TRACE_CPU("asio::async_write"); - asio::async_write(*m_Socket.get(), - ResponseBuffers, - asio::transfer_exactly(ResponseLength), - [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { - Conn->OnResponseDataSent(Ec, ByteCount, true); - }); + m_ActiveResponsesLock.WithExclusiveLock([&] { + // Keep referenced resources alive + m_ActiveResponses.push_back(std::move(Response)); + }); + + ResponseRaw->SendResponse( + *m_Socket, + [Conn = AsSharedPtr(), ResponseRaw, RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ ResponseRaw); + }); } + return; } } @@ -681,10 +1292,11 @@ HttpServerConnection::HandleRequest() "\r\n"sv; } - asio::async_write( - *m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); + asio::async_write(*m_Socket.get(), + asio::buffer(Response), + [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); } else { @@ -706,10 +1318,11 @@ HttpServerConnection::HandleRequest() "No suitable route found"sv; } - asio::async_write( - *m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); + asio::async_write(*m_Socket.get(), + asio::buffer(Response), + [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); } } @@ -1016,9 +1629,13 @@ private: ////////////////////////////////////////////////////////////////////////// -HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer) +HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, + HttpService& Service, + IoBuffer PayloadBuffer, + uint32_t RequestNumber) : HttpServerRequest(Service) , m_Request(Request) +, m_RequestNumber(RequestNumber) , m_PayloadBuffer(std::move(PayloadBuffer)) { const int PrefixLength = Service.UriPrefixLength(); @@ -1104,7 +1721,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode) ZEN_ASSERT(!m_Response); - m_Response.reset(new HttpResponse(HttpContentType::kBinary)); + m_Response.reset(new HttpResponse(HttpContentType::kBinary, m_RequestNumber)); std::array<IoBuffer, 0> Empty; m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty); @@ -1117,7 +1734,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT ZEN_ASSERT(!m_Response); - m_Response.reset(new HttpResponse(ContentType)); + m_Response.reset(new HttpResponse(ContentType, m_RequestNumber)); m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs); } @@ -1127,7 +1744,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT ZEN_MEMSCOPE(GetHttpasioTag()); ZEN_ASSERT(!m_Response); - m_Response.reset(new HttpResponse(ContentType)); + m_Response.reset(new HttpResponse(ContentType, m_RequestNumber)); IoBuffer MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size()); std::array<IoBuffer, 1> SingleBufferList({MessageBuffer}); @@ -1375,23 +1992,17 @@ HttpAsioServer::OnInitialize(int BasePort, std::filesystem::path DataDir) void HttpAsioServer::OnRun(bool IsInteractive) { - const bool TestMode = !IsInteractive; - - int WaitTimeout = -1; - if (!TestMode) - { - WaitTimeout = 1000; - } + const int WaitTimeout = 1000; #if ZEN_PLATFORM_WINDOWS - if (TestMode == false) + if (IsInteractive) { ZEN_CONSOLE("Zen Server running (asio HTTP). Press ESC or Q to quit"); } do { - if (!TestMode && _kbhit() != 0) + if (IsInteractive && _kbhit() != 0) { char c = (char)_getch(); @@ -1404,7 +2015,7 @@ HttpAsioServer::OnRun(bool IsInteractive) m_ShutdownEvent.Wait(WaitTimeout); } while (!IsApplicationExitRequested()); #else - if (TestMode == false) + if (IsInteractive) { ZEN_CONSOLE("Zen Server running (asio HTTP). Ctrl-C to quit"); } diff --git a/src/zenhttp/servers/httpmulti.cpp b/src/zenhttp/servers/httpmulti.cpp index 6541a1c48..31cb04be5 100644 --- a/src/zenhttp/servers/httpmulti.cpp +++ b/src/zenhttp/servers/httpmulti.cpp @@ -56,23 +56,17 @@ HttpMultiServer::OnInitialize(int BasePort, std::filesystem::path DataDir) void HttpMultiServer::OnRun(bool IsInteractiveSession) { - const bool TestMode = !IsInteractiveSession; - - int WaitTimeout = -1; - if (!TestMode) - { - WaitTimeout = 1000; - } + const int WaitTimeout = 1000; #if ZEN_PLATFORM_WINDOWS - if (TestMode == false) + if (IsInteractiveSession) { ZEN_CONSOLE("Zen Server running (multi server). Press ESC or Q to quit"); } do { - if (!TestMode && _kbhit() != 0) + if (IsInteractiveSession && _kbhit() != 0) { char c = (char)_getch(); @@ -85,7 +79,7 @@ HttpMultiServer::OnRun(bool IsInteractiveSession) m_ShutdownEvent.Wait(WaitTimeout); } while (!IsApplicationExitRequested()); #else - if (TestMode == false) + if (IsInteractiveSession) { ZEN_CONSOLE("Zen Server running (null HTTP). Ctrl-C to quit"); } diff --git a/src/zenhttp/servers/httpnull.cpp b/src/zenhttp/servers/httpnull.cpp index 06838a0ed..0ec1cb3c4 100644 --- a/src/zenhttp/servers/httpnull.cpp +++ b/src/zenhttp/servers/httpnull.cpp @@ -34,23 +34,17 @@ HttpNullServer::OnInitialize(int BasePort, std::filesystem::path DataDir) void HttpNullServer::OnRun(bool IsInteractiveSession) { - const bool TestMode = !IsInteractiveSession; - - int WaitTimeout = -1; - if (!TestMode) - { - WaitTimeout = 1000; - } + const int WaitTimeout = 1000; #if ZEN_PLATFORM_WINDOWS - if (TestMode == false) + if (IsInteractiveSession) { ZEN_CONSOLE("Zen Server running (null HTTP). Press ESC or Q to quit"); } do { - if (!TestMode && _kbhit() != 0) + if (IsInteractiveSession && _kbhit() != 0) { char c = (char)_getch(); @@ -63,7 +57,7 @@ HttpNullServer::OnRun(bool IsInteractiveSession) m_ShutdownEvent.Wait(WaitTimeout); } while (!IsApplicationExitRequested()); #else - if (TestMode == false) + if (IsInteractiveSession) { ZEN_CONSOLE("Zen Server running (null HTTP). Ctrl-C to quit"); } diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp index 7ee4c6e62..b9217ed87 100644 --- a/src/zenhttp/servers/httpplugin.cpp +++ b/src/zenhttp/servers/httpplugin.cpp @@ -790,23 +790,17 @@ HttpPluginServerImpl::OnRun(bool IsInteractive) { ZEN_MEMSCOPE(GetHttppluginTag()); - const bool TestMode = !IsInteractive; - - int WaitTimeout = -1; - if (!TestMode) - { - WaitTimeout = 1000; - } + const int WaitTimeout = 1000; # if ZEN_PLATFORM_WINDOWS - if (TestMode == false) + if (IsInteractive) { ZEN_CONSOLE("Zen Server running (plugin HTTP). Press ESC or Q to quit"); } do { - if (!TestMode && _kbhit() != 0) + if (IsInteractive && _kbhit() != 0) { char c = (char)_getch(); @@ -819,7 +813,7 @@ HttpPluginServerImpl::OnRun(bool IsInteractive) m_ShutdownEvent.Wait(WaitTimeout); } while (!IsApplicationExitRequested()); # else - if (TestMode == false) + if (IsInteractive) { ZEN_CONSOLE("Zen Server running (plugin HTTP). Ctrl-C to quit"); } diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index c555a39b6..54cc0c22d 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -1647,9 +1647,9 @@ HttpSysTransaction::InvokeRequestHandler(HttpService& Service, IoBuffer Payload) std::string_view Verb = ToString(ThisRequest.RequestVerb()); std::string_view Uri = ThisRequest.m_UriUtf8.ToView(); - ExtendableStringBuilder<64> SpanName; - SpanName << Verb << " " << Uri; - otel::ScopedSpan HttpSpan(SpanName.ToView(), [&](otel::Span& Span) { + auto SpanNamer = [&](StringBuilderBase& SpanName) { SpanName << Verb << " " << Uri; }; + + auto SpanAnnotator = [&](otel::Span& Span) { Span.AddAttribute("http.request.method"sv, Verb); Span.AddAttribute("url.path"sv, Uri); // FIXME: should be total size including headers etc according to spec @@ -1661,7 +1661,9 @@ HttpSysTransaction::InvokeRequestHandler(HttpService& Service, IoBuffer Payload) ExtendableStringBuilder<64> ClientAddr; GetAddressString(ClientAddr, SockAddr, /* IncludePort */ false); Span.AddAttribute("client.address"sv, ClientAddr.ToView()); - }); + }; + + otel::ScopedSpan HttpSpan(SpanNamer, SpanAnnotator); # endif if (!HandlePackageOffers(Service, ThisRequest, m_PackageHandler)) |