aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-01-14 10:32:37 +0100
committerGitHub Enterprise <[email protected]>2026-01-14 10:32:37 +0100
commitf99a4687ff78e2823332e09d0e35cb16f361ff93 (patch)
tree58020dafaad9e1c9664698e3d36884667cb5c465 /src
parent5.7.16-pre2 (diff)
downloadzen-f99a4687ff78e2823332e09d0e35cb16f361ff93.tar.xz
zen-f99a4687ff78e2823332e09d0e35cb16f361ff93.zip
asio/http optimizations (#449)
This change primarily introduces improved logic for dealing with sending data from file references. This is intended to reduce the amount of memory-mapping we end up doing when sending data from files. Windows now uses `TransmitFile` to send file data more efficiently using kernel-side I/O, but Linux/Mac basically behaves as before since they don't offer any true async file I/O support via asio. This should be implemented separately using a background I/O thread pool. This PR also includes improved memory management for http/asio with reduced allocation counts, and a fix for a potential use-after-free in very high load scenarios.
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/servers/httpasio.cpp742
-rw-r--r--src/zenserver/diag/diagsvcs.cpp29
2 files changed, 640 insertions, 131 deletions
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp
index a0431b0b1..be4e73576 100644
--- a/src/zenhttp/servers/httpasio.cpp
+++ b/src/zenhttp/servers/httpasio.cpp
@@ -4,6 +4,7 @@
#include "httptracer.h"
#include <zencore/except.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory/llm.h>
#include <zencore/thread.h>
@@ -13,6 +14,8 @@
#include "httpparser.h"
+#include <EASTL/fixed_vector.h>
+
#include <deque>
#include <memory>
#include <string_view>
@@ -28,6 +31,7 @@ ZEN_THIRD_PARTY_INCLUDES_START
# include <errno.h>
#endif
#include <asio.hpp>
+#include <asio/stream_file.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
#define ASIO_VERBOSE_TRACE 0
@@ -154,6 +158,345 @@ Log()
//////////////////////////////////////////////////////////////////////////
+#if !defined(ASIO_HAS_FILE)
+# define ASIO_HAS_FILE 0
+#endif
+
+#if defined(ASIO_HAS_WINDOWS_OVERLAPPED_PTR)
+# define ZEN_USE_TRANSMITFILE 1
+# define ZEN_USE_ASYNC_SENDFILE ASIO_HAS_FILE
+#else
+# define ZEN_USE_TRANSMITFILE 0
+# define ZEN_USE_ASYNC_SENDFILE 0
+#endif
+
+#if ZEN_USE_TRANSMITFILE
+template<typename Handler>
+void
+TransmitFileAsync(asio::ip::tcp::socket& Socket, HANDLE FileHandle, uint64_t ByteOffset, uint32_t ByteSize, Handler&& Cb)
+{
+ // We need to establish a new handle here to avoid running into random errors
+ // during TransmitFile. I'm not entirely sure why it's necessary yet.
+
+ HANDLE hReopenedFile =
+ ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED);
+
+ const uint64_t FileSize = FileSizeFromHandle(FileHandle);
+ const uint64_t SendEndOffset = ByteOffset + ByteSize;
+
+ if (SendEndOffset > FileSize)
+ {
+ std::error_code DummyEc;
+
+ ZEN_WARN("TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) tries to transmit {} bytes too many",
+ ByteOffset,
+ ByteSize,
+ PathFromHandle(FileHandle, DummyEc),
+ FileSizeFromHandle(FileHandle),
+ SendEndOffset - FileSize);
+ }
+
+ asio::windows::overlapped_ptr OverlappedPtr(
+ Socket.get_executor(),
+ [WrappedCb = std::move(Cb), ExpectedBytes = ByteSize, FileHandle, ByteOffset, ByteSize, hReopenedFile](
+ const std::error_code& Ec,
+ std::size_t ActualBytesTransferred) {
+ if (Ec)
+ {
+ std::error_code DummyEc;
+ ZEN_WARN("NOTE: TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) error '{}', transmitted {} bytes",
+ ByteOffset,
+ ByteSize,
+ PathFromHandle(hReopenedFile, DummyEc),
+ FileSizeFromHandle(hReopenedFile),
+ Ec.message(),
+ ActualBytesTransferred);
+ }
+
+ CloseHandle(hReopenedFile);
+ WrappedCb(Ec, ActualBytesTransferred);
+ });
+
+ OVERLAPPED* RawOverlapped = OverlappedPtr.get();
+ RawOverlapped->Offset = uint32_t(ByteOffset & 0xffffFFFFull);
+ RawOverlapped->OffsetHigh = uint32_t(ByteOffset >> 32);
+
+ const DWORD NumberOfBytesPerSend = 0; // let TransmitFile decide
+
+ const BOOL Ok =
+ ::TransmitFile(Socket.native_handle(), hReopenedFile, ByteSize, NumberOfBytesPerSend, RawOverlapped, nullptr, /* dwReserved */ 0);
+ const DWORD LastError = ::GetLastError();
+
+ // Check if the operation completed immediately.
+ if (!Ok && LastError != ERROR_IO_PENDING)
+ {
+ // The operation completed immediately, so a completion notification needs
+ // to be posted. When complete() is called, ownership of the OVERLAPPED-
+ // derived object passes to the io_context.
+
+ asio::error_code ec(LastError, asio::error::get_system_category());
+ OverlappedPtr.complete(ec, 0);
+ }
+ else
+ {
+ // The operation was successfully initiated, so ownership of the
+ // OVERLAPPED-derived object has passed to the io_context.
+
+ OverlappedPtr.release();
+ }
+}
+#endif // ZEN_USE_TRANSMITFILE
+
+#if ZEN_USE_ASYNC_SENDFILE
+
+// Pipelined file sender that reads from a file and writes to a socket using two buffers
+// to pipeline reads and writes. Unfortunately this strategy can't currently be used on
+// non-Windows platforms as they don't currently support async file reads. We'll have
+// to build a mechanism using a thread pool for that, perhaps with optional support
+// for io_uring where available since that should be the most efficient.
+//
+// In other words, this is not super useful as Windows already has the TransmitFile
+// version above, but it's here for completeness and potential future use on other platforms.
+
+template<class AsyncWriteStream, class CompletionHandler>
+class PipelinedFileSender : public std::enable_shared_from_this<PipelinedFileSender<AsyncWriteStream, CompletionHandler>>
+{
+public:
+ PipelinedFileSender(AsyncWriteStream& WriteSocket,
+ asio::stream_file&& FileToReadFrom,
+ uint64_t ByteSize,
+ std::size_t BufferSize,
+ CompletionHandler&& CompletionToken)
+ : m_WriteSocket(WriteSocket)
+ , m_SourceFile(std::move(FileToReadFrom))
+ , m_Strand(asio::make_strand(m_WriteSocket.get_executor()))
+ , m_Buffers{std::vector<char>(BufferSize), std::vector<char>(BufferSize)}
+ , m_CompletionHandler(std::move(CompletionToken))
+ , m_TotalBytesToRead(ByteSize)
+ , m_RemainingBytesToRead(ByteSize)
+ {
+ }
+
+ void Start() { EnqueueRead(); }
+
+private:
+ struct Slot
+ {
+ std::size_t Size = 0; // valid bytes in buffer
+ bool Ready = false; // has unwritten data
+ bool InUse = false; // being written
+ };
+
+ void OnSendCompleted(const std::error_code& Ec)
+ {
+ // TODO: ensure this behaves properly for instance if the write fails while a read is pending
+
+ if (m_SendCompleted)
+ {
+ return;
+ }
+
+ m_SendCompleted = true;
+
+ // Ensure completion runs on the strand/executor.
+
+ asio::dispatch(m_Strand, [CompletionHandler = std::move(m_CompletionHandler), Ec, TotalBytesWritten = m_TotalBytesWritten]() {
+ CompletionHandler(Ec, TotalBytesWritten);
+ });
+ }
+
+ void EnqueueRead()
+ {
+ asio::dispatch(m_Strand, [self = this->shared_from_this()] {
+ self->TryPostRead();
+ self->PumpWrites();
+ });
+ }
+
+ void TryPostRead()
+ {
+ if (m_IsEof || m_ReadInFlight || m_RemainingBytesToRead == 0)
+ {
+ return;
+ }
+
+ const int ReadSlotIndex = GetFreeSlotIndex();
+ if (ReadSlotIndex < 0)
+ {
+ // no free slot; wait for writer to free one (not meant to ever happen)
+ return;
+ }
+
+ m_ReadInFlight = true;
+ auto ReadBuffer = asio::buffer(m_Buffers[ReadSlotIndex].data(), zen::Min(m_Buffers[ReadSlotIndex].size(), m_RemainingBytesToRead));
+
+ asio::async_read(
+ m_SourceFile,
+ ReadBuffer,
+ asio::bind_executor(m_Strand,
+ [self = this->shared_from_this(), ReadSlotIndex](const std::error_code& Ec, std::size_t ActualBytesRead) {
+ self->m_ReadInFlight = false;
+ self->m_RemainingBytesToRead -= ActualBytesRead;
+
+ if (Ec)
+ {
+ if (Ec == asio::error::eof)
+ {
+ ZEN_ASSERT(self->m_RemainingBytesToRead == 0);
+
+ self->m_IsEof = true;
+
+ // No data produced on EOF; just try to pump whatever is left
+ self->PumpWrites();
+ }
+ else
+ {
+ // read error, cancel everything and let outer completion handler know
+ self->OnSendCompleted(Ec);
+ }
+ }
+ else
+ {
+ // Mark slot as ready with ActualBytesRead valid bytes of data in buffer
+ self->m_Slots[ReadSlotIndex].Size = ActualBytesRead;
+ self->m_Slots[ReadSlotIndex].Ready = true;
+ self->PumpWrites();
+ self->TryPostRead();
+ }
+ }));
+ }
+
+ void PumpWrites()
+ {
+ if (m_WriteInFlight)
+ {
+ return;
+ }
+
+ const int WriteSlotIndex = GetReadySlotIndex();
+ if (WriteSlotIndex < 0)
+ {
+ // No ready data. We're done if EOF/no more data to read and no reads in flight and nothing ready
+ if (!m_ReadInFlight && (m_IsEof || m_RemainingBytesToRead == 0))
+ {
+ // all done
+ return OnSendCompleted({});
+ }
+
+ return;
+ }
+
+ m_WriteInFlight = true;
+ m_Slots[WriteSlotIndex].InUse = true;
+
+ asio::async_write(
+ m_WriteSocket,
+ asio::buffer(m_Buffers[WriteSlotIndex].data(), m_Slots[WriteSlotIndex].Size),
+ asio::bind_executor(m_Strand,
+ [self = this->shared_from_this(), WriteSlotIndex](const std::error_code& Ec, std::size_t BytesWritten) {
+ self->m_TotalBytesWritten += BytesWritten;
+ self->m_WriteInFlight = false;
+
+ if (Ec)
+ {
+ self->OnSendCompleted(Ec);
+
+ return;
+ }
+ else
+ {
+ // Free the slot
+ self->m_Slots[WriteSlotIndex].Ready = false;
+ self->m_Slots[WriteSlotIndex].InUse = false;
+ self->m_Slots[WriteSlotIndex].Size = 0;
+
+ self->TryPostRead();
+ self->PumpWrites();
+ }
+ }));
+ }
+
+ int GetFreeSlotIndex() const
+ {
+ for (int i = 0; i < 2; ++i)
+ {
+ if (!m_Slots[i].Ready && !m_Slots[i].InUse)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ int GetReadySlotIndex() const
+ {
+ for (int i = 0; i < 2; ++i)
+ {
+ if (m_Slots[i].Ready && !m_Slots[i].InUse)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ AsyncWriteStream& m_WriteSocket;
+ asio::stream_file m_SourceFile;
+ asio::strand<asio::any_io_executor> m_Strand;
+
+ // There's no synchronization needed for these as all access is via the strand
+ std::vector<char> m_Buffers[2];
+ Slot m_Slots[2];
+
+ bool m_IsEof = false;
+ bool m_ReadInFlight = false;
+ bool m_WriteInFlight = false;
+ bool m_SendCompleted = false;
+
+ const uint64_t m_TotalBytesToRead = 0;
+ uint64_t m_RemainingBytesToRead = 0;
+ uint64_t m_TotalBytesWritten = 0;
+
+ CompletionHandler m_CompletionHandler;
+};
+
+template<class AsyncWriteStream, class CompletionHandler>
+void
+SendFileAsync(AsyncWriteStream& WriteSocket,
+ const auto FileHandle,
+ uint64_t ByteOffset,
+ uint64_t ByteSize,
+ std::size_t BufferSize,
+ CompletionHandler&& CompletionToken)
+{
+ HANDLE hReopenedFile =
+ ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED);
+
+ // Note that this assumes ownership of the handle
+ asio::stream_file SourceFile(WriteSocket.get_executor(), hReopenedFile);
+
+ // TODO: handle any error properly here
+ SourceFile.seek(ByteOffset, asio::stream_file::seek_set);
+
+ if (BufferSize > ByteSize)
+ {
+ BufferSize = ByteSize;
+ }
+
+ auto op = std::make_shared<PipelinedFileSender<AsyncWriteStream, std::decay_t<CompletionHandler>>>(
+ WriteSocket,
+ std::move(SourceFile),
+ ByteSize,
+ BufferSize,
+ std::forward<CompletionHandler>(CompletionToken));
+
+ // Start the pipeline
+ op->Start();
+}
+#endif // ZEN_USE_ASYNC_SENDFILE
+
+//////////////////////////////////////////////////////////////////////////
+
struct HttpAsioServerImpl
{
public:
@@ -191,7 +534,7 @@ public:
class HttpAsioServerRequest : public HttpServerRequest
{
public:
- HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer);
+ HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer, uint32_t RequestNumber);
~HttpAsioServerRequest();
virtual Oid ParseSessionId() const override;
@@ -210,6 +553,7 @@ public:
HttpAsioServerRequest& operator=(const HttpAsioServerRequest&) = delete;
HttpRequestParser& m_Request;
+ uint32_t m_RequestNumber = 0; // Note: different to request ID which is derived from headers
IoBuffer m_PayloadBuffer;
std::unique_ptr<HttpResponse> m_Response;
};
@@ -218,7 +562,11 @@ struct HttpResponse
{
public:
HttpResponse() = default;
- explicit HttpResponse(HttpContentType ContentType) : m_ContentType(ContentType) {}
+ explicit HttpResponse(HttpContentType ContentType, uint32_t RequestNumber) : m_RequestNumber(RequestNumber), m_ContentType(ContentType)
+ {
+ }
+
+ ~HttpResponse() = default;
void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList)
{
@@ -230,57 +578,61 @@ public:
const uint32_t ChunkCount = gsl::narrow<uint32_t>(BlobList.size());
m_DataBuffers.reserve(ChunkCount);
+ m_IoVecCount = ChunkCount + 1 /* one extra buffer for headers */;
+ m_IoVecs.resize(m_IoVecCount);
- for (IoBuffer& Buffer : BlobList)
- {
-#if 1
- m_DataBuffers.emplace_back(std::move(Buffer)).MakeOwned();
-#else
- IoBuffer TempBuffer = std::move(Buffer);
- TempBuffer.MakeOwned();
- m_DataBuffers.emplace_back(IoBufferBuilder::ReadFromFileMaybe(TempBuffer));
-#endif
- }
+ m_IoVecCursor = 0;
uint64_t LocalDataSize = 0;
+ int Index = 1;
- m_AsioBuffers.push_back({}); // Placeholder for header
-
- for (IoBuffer& Buffer : m_DataBuffers)
+ for (IoBuffer& Buffer : BlobList)
{
- uint64_t BufferDataSize = Buffer.Size();
+ const uint64_t BufferDataSize = Buffer.Size();
ZEN_ASSERT(BufferDataSize);
LocalDataSize += BufferDataSize;
- IoBufferFileReference FileRef;
- if (Buffer.GetFileReference(/* out */ FileRef))
- {
- // TODO: Use direct file transfer, via TransmitFile/sendfile
- //
- // this looks like it requires some custom asio plumbing however
+ IoBuffer OwnedBuffer = std::move(Buffer);
+ OwnedBuffer.MakeOwned();
- m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()});
- }
- else
+ IoVec& Io = m_IoVecs[Index++];
+
+ bool ChunkHandled = false;
+
+#if ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE
+ if (IoBufferFileReference FileRef; OwnedBuffer.GetFileReference(/* out */ FileRef))
{
- // Send from memory
+ Io.IsFileRef = true;
+ Io.Ref.FileRef = FileRef;
+ ChunkHandled = true;
+ }
+#endif
- m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()});
+ if (!ChunkHandled)
+ {
+ Io.IsFileRef = false;
+ uint32_t Size = gsl::narrow<uint32_t>(OwnedBuffer.Size());
+ Io.Ref.MemoryRef = {OwnedBuffer.Data(), Size};
}
+
+ m_DataBuffers.emplace_back(OwnedBuffer);
}
+
m_ContentLength = LocalDataSize;
std::string_view Headers = GetHeaders();
- m_AsioBuffers[0] = asio::const_buffer(Headers.data(), Headers.size());
+
+ IoVec& Io = m_IoVecs[0];
+
+ Io.IsFileRef = false;
+ Io.Ref.MemoryRef = {.Data = Headers.data(), .Size = gsl::narrow_cast<uint32_t>(Headers.size())};
}
uint16_t ResponseCode() const { return m_ResponseCode; }
uint64_t ContentLength() const { return m_ContentLength; }
- const std::vector<asio::const_buffer>& AsioBuffers() const { return m_AsioBuffers; }
-
std::string_view GetHeaders()
{
ZEN_MEMSCOPE(GetHttpasioTag());
@@ -299,16 +651,137 @@ public:
return m_Headers;
}
- void SuppressPayload() { m_AsioBuffers.resize(1); }
+ void SendResponse(asio::ip::tcp::socket& TcpSocket, std::function<void(const asio::error_code& Ec, std::size_t ByteCount)>&& Token)
+ {
+ m_SendCb = std::move(Token);
+
+ SendNextChunk(TcpSocket);
+ }
+
+ void SendNextChunk(asio::ip::tcp::socket& TcpSocket)
+ {
+ if (m_IoVecCursor == m_IoVecCount)
+ {
+ ZEN_ASSERT(m_SendCb);
+
+ auto CompletionToken = [Self = this, Token = std::move(m_SendCb), TotalBytes = m_TotalBytesSent] { Token({}, TotalBytes); };
+
+ asio::defer(TcpSocket.get_executor(), std::move(CompletionToken));
+
+ return;
+ }
+
+ const IoVec& Io = m_IoVecs[m_IoVecCursor++];
+
+ if (Io.IsFileRef)
+ {
+ ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, bytes: {}", zen::GetCurrentThreadId(), Io.Ref.FileRef.FileChunkSize);
+
+#if ZEN_USE_TRANSMITFILE
+ TransmitFileAsync(TcpSocket,
+ Io.Ref.FileRef.FileHandle,
+ Io.Ref.FileRef.FileChunkOffset,
+ gsl::narrow_cast<uint32_t>(Io.Ref.FileRef.FileChunkSize),
+ [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
+ m_TotalBytesSent += ByteCount;
+ if (Ec)
+ {
+ m_SendCb(Ec, m_TotalBytesSent);
+ }
+ else
+ {
+ SendNextChunk(TcpSocket);
+ }
+ });
+#elif ZEN_USE_ASYNC_SENDFILE
+ SendFileAsync(TcpSocket,
+ Io.Ref.FileRef.FileHandle,
+ Io.Ref.FileRef.FileChunkOffset,
+ Io.Ref.FileRef.FileChunkSize,
+ 64 * 1024,
+ [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
+ m_TotalBytesSent += ByteCount;
+ if (Ec)
+ {
+ m_SendCb(Ec, m_TotalBytesSent);
+ }
+ else
+ {
+ SendNextChunk(TcpSocket);
+ }
+ });
+#else
+ // This should never occur unless we compile with one
+ // of the options above
+ ZEN_ASSERT("invalid file reference in response");
+#endif
+
+ return;
+ }
+
+ // Send as many consecutive non-file references as possible in one asio operation
+
+ std::vector<asio::const_buffer> AsioBuffers;
+ AsioBuffers.push_back(asio::const_buffer{Io.Ref.MemoryRef.Data, Io.Ref.MemoryRef.Size});
+
+ while (m_IoVecCursor != m_IoVecCount)
+ {
+ const IoVec& Io2 = m_IoVecs[m_IoVecCursor];
+
+ if (Io2.IsFileRef)
+ {
+ break;
+ }
+
+ AsioBuffers.push_back(asio::const_buffer{Io2.Ref.MemoryRef.Data, Io2.Ref.MemoryRef.Size});
+ ++m_IoVecCursor;
+ }
+
+ asio::async_write(TcpSocket,
+ std::move(AsioBuffers),
+ asio::transfer_all(),
+ [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
+ m_TotalBytesSent += ByteCount;
+ if (Ec)
+ {
+ m_SendCb(Ec, m_TotalBytesSent);
+ }
+ else
+ {
+ SendNextChunk(TcpSocket);
+ }
+ });
+ }
private:
- uint16_t m_ResponseCode = 0;
- bool m_IsKeepAlive = true;
- HttpContentType m_ContentType = HttpContentType::kBinary;
- uint64_t m_ContentLength = 0;
- std::vector<IoBuffer> m_DataBuffers;
- std::vector<asio::const_buffer> m_AsioBuffers;
- ExtendableStringBuilder<160> m_Headers;
+ uint32_t m_RequestNumber = 0;
+ uint16_t m_ResponseCode = 0;
+ bool m_IsKeepAlive = true;
+ HttpContentType m_ContentType = HttpContentType::kBinary;
+ uint64_t m_ContentLength = 0;
+ eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive
+ ExtendableStringBuilder<160> m_Headers;
+
+ struct IoVec
+ {
+ bool IsFileRef;
+ union
+ {
+ struct MemoryBuffer
+ {
+ const void* Data;
+ uint32_t Size;
+ } MemoryRef;
+ IoBufferFileReference FileRef;
+ } Ref;
+ };
+
+ eastl::fixed_vector<IoVec, 8> m_IoVecs;
+ int m_IoVecCursor = 0;
+ int m_IoVecCount = 0;
+
+ std::function<void(const asio::error_code& Ec, std::size_t ByteCount)> m_SendCb;
+ uint64_t m_TotalBytesSent = 0;
};
//////////////////////////////////////////////////////////////////////////
@@ -339,37 +812,63 @@ private:
kTerminated
};
+ const char* StateToString(RequestState State)
+ {
+ switch (State)
+ {
+ case RequestState::kInitialState:
+ return "InitialState";
+ case RequestState::kInitialRead:
+ return "InitialRead";
+ case RequestState::kReadingMore:
+ return "ReadingMore";
+ case RequestState::kWriting:
+ return "Writing";
+ case RequestState::kWritingFinal:
+ return "WritingFinal";
+ case RequestState::kDone:
+ return "Done";
+ case RequestState::kTerminated:
+ return "Terminated";
+ default:
+ return "Unknown";
+ }
+ }
+
RequestState m_RequestState = RequestState::kInitialState;
HttpRequestParser m_RequestData{*this};
void EnqueueRead();
void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount);
- void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false);
+ void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, uint32_t RequestNumber, HttpResponse* ResponseToPop);
void CloseConnection();
- HttpAsioServerImpl& m_Server;
- asio::streambuf m_RequestBuffer;
- std::unique_ptr<asio::ip::tcp::socket> m_Socket;
- std::atomic<uint32_t> m_RequestCounter{0};
- uint32_t m_ConnectionId = 0;
- Ref<IHttpPackageHandler> m_PackageHandler;
+ HttpAsioServerImpl& m_Server;
+ asio::streambuf m_RequestBuffer;
+ std::atomic<uint32_t> m_RequestCounter{0};
+ uint32_t m_ConnectionId = 0;
+ Ref<IHttpPackageHandler> m_PackageHandler;
- RwLock m_ResponsesLock;
- std::deque<std::unique_ptr<HttpResponse>> m_Responses;
+ RwLock m_ActiveResponsesLock;
+ std::deque<std::unique_ptr<HttpResponse>> m_ActiveResponses;
+
+ std::unique_ptr<asio::ip::tcp::socket> m_Socket;
};
std::atomic<uint32_t> g_ConnectionIdCounter{0};
HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket)
: m_Server(Server)
-, m_Socket(std::move(Socket))
, m_ConnectionId(g_ConnectionIdCounter.fetch_add(1))
+, m_Socket(std::move(Socket))
{
ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId);
}
HttpServerConnection::~HttpServerConnection()
{
+ RwLock::ExclusiveLockScope _(m_ActiveResponsesLock);
+
ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId);
}
@@ -434,7 +933,11 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]
return;
default:
- ZEN_WARN("on data received ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message());
+ ZEN_WARN("on data received ERROR, connection: {} (state: {}), reason '{}'",
+ m_ConnectionId,
+ StateToString(m_RequestState),
+ Ec.message());
+
return TerminateConnection();
}
}
@@ -472,37 +975,58 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]
}
void
-HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, bool Pop)
+HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec,
+ [[maybe_unused]] std::size_t ByteCount,
+ [[maybe_unused]] uint32_t RequestNumber,
+ HttpResponse* ResponseToPop)
{
ZEN_MEMSCOPE(GetHttpasioTag());
if (Ec)
{
- ZEN_WARN("on data sent ERROR, connection: {}, reason: '{}'", m_ConnectionId, Ec.message());
+ ZEN_WARN("on data sent ERROR, connection: {} (state: {}), reason: '{}' (bytes: {})",
+ m_ConnectionId,
+ StateToString(m_RequestState),
+ Ec.message(),
+ ByteCount);
+
TerminateConnection();
+
+ return;
}
- else
- {
- ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}",
- m_ConnectionId,
- m_RequestCounter.load(std::memory_order_relaxed),
- zen::GetCurrentThreadId(),
- NiceBytes(ByteCount));
- if (!m_RequestData.IsKeepAlive())
- {
- CloseConnection();
- }
- else
- {
- if (Pop)
+ ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}",
+ m_ConnectionId,
+ RequestNumber,
+ zen::GetCurrentThreadId(),
+ NiceBytes(ByteCount));
+
+ if (ResponseToPop)
+ {
+ m_ActiveResponsesLock.WithExclusiveLock([&] {
+ // Once a response is sent we can release any referenced resources
+ //
+ // completion callbacks may be issued out-of-order so we need to
+ // remove the relevant entry from our active response list, it may
+ // not be the first
+
+ if (auto It = find_if(begin(m_ActiveResponses),
+ end(m_ActiveResponses),
+ [ResponseToPop](const auto& Item) { return Item.get() == ResponseToPop; });
+ It != end(m_ActiveResponses))
+ {
+ m_ActiveResponses.erase(It);
+ }
+ else
{
- RwLock::ExclusiveLockScope _(m_ResponsesLock);
- m_Responses.pop_front();
+ ZEN_WARN("response not found");
}
+ });
+ }
- m_RequestCounter.fetch_add(1);
- }
+ if (!m_RequestData.IsKeepAlive())
+ {
+ CloseConnection();
}
}
@@ -553,13 +1077,13 @@ HttpServerConnection::HandleRequest()
m_RequestState = RequestState::kWriting;
}
+ const uint32_t RequestNumber = m_RequestCounter.fetch_add(1);
+
if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url()))
{
ZEN_TRACE_CPU("asio::HandleRequest");
- const uint32_t RequestNumber = m_RequestCounter.load(std::memory_order_relaxed);
-
- HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body());
+ HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body(), RequestNumber);
ZEN_TRACE_VERBOSE("handle request, connection: {}, request: {}'", m_ConnectionId, RequestNumber);
@@ -635,34 +1159,38 @@ HttpServerConnection::HandleRequest()
if (m_RequestData.RequestVerb() == HttpVerb::kHead)
{
- Response->SuppressPayload();
- }
+ ZEN_TRACE_CPU("asio::async_write");
- const std::vector<asio::const_buffer>& ResponseBuffers = Response->AsioBuffers();
+ std::string_view Headers = Response->GetHeaders();
- uint64_t ResponseLength = 0;
+ std::vector<asio::const_buffer> AsioBuffers;
+ AsioBuffers.push_back(asio::const_buffer(Headers.data(), Headers.size()));
- for (const asio::const_buffer& Buffer : ResponseBuffers)
- {
- ResponseLength += Buffer.size();
+ asio::async_write(*m_Socket.get(),
+ AsioBuffers,
+ asio::transfer_all(),
+ [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
+ Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr);
+ });
}
-
+ else
{
- RwLock::ExclusiveLockScope _(m_ResponsesLock);
- m_Responses.push_back(std::move(Response));
- }
+ ZEN_TRACE_CPU("asio::async_write");
- // TODO: should cork/uncork for Linux?
+ HttpResponse* ResponseRaw = Response.get();
- {
- ZEN_TRACE_CPU("asio::async_write");
- asio::async_write(*m_Socket.get(),
- ResponseBuffers,
- asio::transfer_exactly(ResponseLength),
- [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) {
- Conn->OnResponseDataSent(Ec, ByteCount, true);
- });
+ m_ActiveResponsesLock.WithExclusiveLock([&] {
+ // Keep referenced resources alive
+ m_ActiveResponses.push_back(std::move(Response));
+ });
+
+ ResponseRaw->SendResponse(
+ *m_Socket,
+ [Conn = AsSharedPtr(), ResponseRaw, RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
+ Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ ResponseRaw);
+ });
}
+
return;
}
}
@@ -681,10 +1209,11 @@ HttpServerConnection::HandleRequest()
"\r\n"sv;
}
- asio::async_write(
- *m_Socket.get(),
- asio::buffer(Response),
- [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); });
+ asio::async_write(*m_Socket.get(),
+ asio::buffer(Response),
+ [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
+ Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr);
+ });
}
else
{
@@ -706,10 +1235,11 @@ HttpServerConnection::HandleRequest()
"No suitable route found"sv;
}
- asio::async_write(
- *m_Socket.get(),
- asio::buffer(Response),
- [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); });
+ asio::async_write(*m_Socket.get(),
+ asio::buffer(Response),
+ [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
+ Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr);
+ });
}
}
@@ -1016,9 +1546,13 @@ private:
//////////////////////////////////////////////////////////////////////////
-HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer)
+HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request,
+ HttpService& Service,
+ IoBuffer PayloadBuffer,
+ uint32_t RequestNumber)
: HttpServerRequest(Service)
, m_Request(Request)
+, m_RequestNumber(RequestNumber)
, m_PayloadBuffer(std::move(PayloadBuffer))
{
const int PrefixLength = Service.UriPrefixLength();
@@ -1104,7 +1638,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode)
ZEN_ASSERT(!m_Response);
- m_Response.reset(new HttpResponse(HttpContentType::kBinary));
+ m_Response.reset(new HttpResponse(HttpContentType::kBinary, m_RequestNumber));
std::array<IoBuffer, 0> Empty;
m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty);
@@ -1117,7 +1651,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT
ZEN_ASSERT(!m_Response);
- m_Response.reset(new HttpResponse(ContentType));
+ m_Response.reset(new HttpResponse(ContentType, m_RequestNumber));
m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs);
}
@@ -1127,7 +1661,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT
ZEN_MEMSCOPE(GetHttpasioTag());
ZEN_ASSERT(!m_Response);
- m_Response.reset(new HttpResponse(ContentType));
+ m_Response.reset(new HttpResponse(ContentType, m_RequestNumber));
IoBuffer MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size());
std::array<IoBuffer, 1> SingleBufferList({MessageBuffer});
diff --git a/src/zenserver/diag/diagsvcs.cpp b/src/zenserver/diag/diagsvcs.cpp
index 8abf6e8a3..d8d53b0e3 100644
--- a/src/zenserver/diag/diagsvcs.cpp
+++ b/src/zenserver/diag/diagsvcs.cpp
@@ -28,30 +28,6 @@ GetHealthTag()
using namespace std::literals;
-static bool
-ReadLogFile(const std::string& Path, StringBuilderBase& Out)
-{
- try
- {
- constexpr auto ReadSize = std::size_t{4096};
- auto FileStream = std::ifstream{Path};
-
- std::string Buf(ReadSize, '\0');
- while (FileStream.read(&Buf[0], ReadSize))
- {
- Out.Append(std::string_view(&Buf[0], FileStream.gcount()));
- }
- Out.Append(std::string_view(&Buf[0], FileStream.gcount()));
-
- return true;
- }
- catch (const std::exception&)
- {
- Out.Reset();
- return false;
- }
-}
-
HttpHealthService::HttpHealthService()
{
ZEN_MEMSCOPE(GetHealthTag());
@@ -95,10 +71,9 @@ HttpHealthService::HttpHealthService()
return m_HealthInfo.AbsLogPath.empty() ? m_HealthInfo.DataRoot / "logs/zenserver.log" : m_HealthInfo.AbsLogPath;
}();
- ExtendableStringBuilder<4096> Sb;
- if (ReadLogFile(Path.string(), Sb) && Sb.Size() > 0)
+ if (IoBuffer LogBuffer = IoBufferBuilder::MakeFromFile(Path))
{
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, Sb.ToView());
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, LogBuffer);
}
else
{