aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-02-04 10:23:36 +0100
committerGitHub Enterprise <[email protected]>2026-02-04 10:23:36 +0100
commit2ab9d9f70ff2db69f2e8b731fbea4ec825c26dbb (patch)
tree90a82170358d75bcd8bc2c83849d61782f3c3167
parentMerge pull request #724 from ue-foundation/lm/restrict-reads-to-project (diff)
downloadzen-2ab9d9f70ff2db69f2e8b731fbea4ec825c26dbb.tar.xz
zen-2ab9d9f70ff2db69f2e8b731fbea4ec825c26dbb.zip
implemented chunking for TransmitFile path (#736)
* Implemented chunking for the TransmitFile path, to ensure payloads exceeding the TransmitFile API limit of 4GB can be handled. * Also fixes a similar issue with the memory path.
-rw-r--r--CHANGELOG.md2
-rw-r--r--src/zenhttp/servers/httpasio.cpp177
2 files changed, 121 insertions, 58 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index da3e9c81c..187c35ddf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,11 +3,13 @@
- Improvement: Reduce maximum size per chunk to read to reduce disk contention
- Improvement: Reduce default size for block store chunk read window when iterating chunks
- Improvement: Increase timeout before warning on slow shut down of zenserver
+- Improvement: On Windows, `TransmitFile` is used to transfer data directly from files when possible
- Improvement: Revise logic for enabling work backlog during `zen builds download`
- Improvement: Added `--maxtimeslice` option to the `zen scrub` command to control how long a scrub operation may run
- Improvement: Increased the default scrub timeslice from 1 min 40 sec to 5 min
- Improvement: Reduce lock contention when performing a scrub operation
- Bugfix: Restore `/health/log` and `/health/info` endpoint functionality
+- Bugfix: Fixed 32-bit truncation of transmission chunk sizes when using the asio http path
## 5.7.19
- Feature: `zen builds upload` now supports structure manifest input for `--manifest-path` when the path has a `.json` extension, enabling multi-part upload
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp
index b11eebc1a..9666a1d28 100644
--- a/src/zenhttp/servers/httpasio.cpp
+++ b/src/zenhttp/servers/httpasio.cpp
@@ -163,7 +163,7 @@ Log()
#endif
#if defined(ASIO_HAS_WINDOWS_OVERLAPPED_PTR)
-# define ZEN_USE_TRANSMITFILE 0
+# define ZEN_USE_TRANSMITFILE 1
# define ZEN_USE_ASYNC_SENDFILE 0
#else
# define ZEN_USE_TRANSMITFILE 0
@@ -558,6 +558,14 @@ public:
std::unique_ptr<HttpResponse> m_Response;
};
+/**
+ * HTTP Response representation used internally by the ASIO server
+ *
+ * This is used to build up the response headers and payload prior to sending
+ * it over the network. It's also responsible for managing the send operation itself,
+ * including ownership of the source buffers until the operation completes.
+ *
+ */
struct HttpResponse
{
public:
@@ -568,8 +576,18 @@ public:
~HttpResponse() = default;
+ /**
+ * Initialize the response for sending a payload made up of multiple blobs
+ *
+ * This builds the necessary headers and IO vectors for sending the response
+ * and also makes sure all buffers are owned for the duration of the
+ * operation.
+ *
+ */
void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList)
{
+ ZEN_ASSERT(m_State == State::kUninitialized);
+
ZEN_MEMSCOPE(GetHttpasioTag());
ZEN_TRACE_CPU("asio::InitializeForPayload");
@@ -578,13 +596,13 @@ public:
const uint32_t ChunkCount = gsl::narrow<uint32_t>(BlobList.size());
m_DataBuffers.reserve(ChunkCount);
- m_IoVecCount = ChunkCount + 1 /* one extra buffer for headers */;
- m_IoVecs.resize(m_IoVecCount);
+ m_IoVecs.reserve(ChunkCount + 1);
+
+ m_IoVecs.emplace_back(); // header IoVec
m_IoVecCursor = 0;
uint64_t LocalDataSize = 0;
- int Index = 1;
for (IoBuffer& Buffer : BlobList)
{
@@ -597,37 +615,71 @@ public:
IoBuffer OwnedBuffer = std::move(Buffer);
OwnedBuffer.MakeOwned();
- IoVec& Io = m_IoVecs[Index++];
-
bool ChunkHandled = false;
#if ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE
if (IoBufferFileReference FileRef; OwnedBuffer.GetFileReference(/* out */ FileRef))
{
- Io.IsFileRef = true;
- Io.Ref.FileRef = FileRef;
- ChunkHandled = true;
+ // Since there's a limit to how much data TransmitFile can send in one go,
+ // we may need to split this into multiple IoVec entries. In this case we'll
+ // end up reallocating the IoVec array, but this should be rare.
+
+ uint64_t RemainingChunkBytes = FileRef.FileChunkSize;
+ uint64_t ChunkOffset = FileRef.FileChunkOffset;
+
+ const uint32_t MaxTransmitSize = 1 * 1024 * 1024 * 1024; // 1 GB
+
+ while (RemainingChunkBytes)
+ {
+ IoVec Io{.IsFileRef = true};
+
+ Io.Ref.FileRef.FileHandle = FileRef.FileHandle;
+ Io.Ref.FileRef.FileChunkOffset = ChunkOffset;
+
+ if (RemainingChunkBytes > MaxTransmitSize)
+ {
+ Io.Ref.FileRef.FileChunkSize = MaxTransmitSize;
+ RemainingChunkBytes -= MaxTransmitSize;
+ }
+ else
+ {
+ Io.Ref.FileRef.FileChunkSize = gsl::narrow<uint32_t>(RemainingChunkBytes);
+ RemainingChunkBytes = 0;
+ }
+
+ ChunkOffset += Io.Ref.FileRef.FileChunkSize;
+
+ m_IoVecs.push_back(Io);
+ }
+
+ ChunkHandled = true;
}
#endif
if (!ChunkHandled)
{
- Io.IsFileRef = false;
- uint32_t Size = gsl::narrow<uint32_t>(OwnedBuffer.Size());
- Io.Ref.MemoryRef = {OwnedBuffer.Data(), Size};
+ IoVec Io{.IsFileRef = false};
+
+ Io.Ref.MemoryRef = {.Data = OwnedBuffer.Data(), .Size = OwnedBuffer.Size()};
+
+ m_IoVecs.push_back(Io);
}
- m_DataBuffers.emplace_back(OwnedBuffer);
+ m_DataBuffers.push_back(std::move(OwnedBuffer));
}
+ // Now that we know the full data size, we can build the headers
+
m_ContentLength = LocalDataSize;
std::string_view Headers = GetHeaders();
- IoVec& Io = m_IoVecs[0];
+ IoVec& HeaderIo = m_IoVecs[0];
- Io.IsFileRef = false;
- Io.Ref.MemoryRef = {.Data = Headers.data(), .Size = gsl::narrow_cast<uint32_t>(Headers.size())};
+ HeaderIo.IsFileRef = false;
+ HeaderIo.Ref.MemoryRef = {.Data = Headers.data(), .Size = Headers.size()};
+
+ m_State = State::kInitialized;
}
uint16_t ResponseCode() const { return m_ResponseCode; }
@@ -653,17 +705,32 @@ public:
void SendResponse(asio::ip::tcp::socket& TcpSocket, std::function<void(const asio::error_code& Ec, std::size_t ByteCount)>&& Token)
{
+ ZEN_ASSERT(m_State == State::kInitialized);
+
+ ZEN_MEMSCOPE(GetHttpasioTag());
+ ZEN_TRACE_CPU("asio::SendResponse");
+
m_SendCb = std::move(Token);
+ m_State = State::kSending;
SendNextChunk(TcpSocket);
}
void SendNextChunk(asio::ip::tcp::socket& TcpSocket)
{
- if (m_IoVecCursor == m_IoVecCount)
+ ZEN_ASSERT(m_State == State::kSending);
+
+ ZEN_MEMSCOPE(GetHttpasioTag());
+ ZEN_TRACE_CPU("asio::SendNextChunk");
+
+ if (m_IoVecCursor == m_IoVecs.size())
{
+ // All data sent, complete the operation
+
ZEN_ASSERT(m_SendCb);
+ m_State = State::kSent;
+
auto CompletionToken = [Self = this, Token = std::move(m_SendCb), TotalBytes = m_TotalBytesSent] { Token({}, TotalBytes); };
asio::defer(TcpSocket.get_executor(), std::move(CompletionToken));
@@ -671,45 +738,43 @@ public:
return;
}
+ auto OnCompletion = [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
+ ZEN_ASSERT(m_State == State::kSending);
+
+ m_TotalBytesSent += ByteCount;
+ if (Ec)
+ {
+ m_State = State::kFailed;
+ m_SendCb(Ec, m_TotalBytesSent);
+ }
+ else
+ {
+ SendNextChunk(TcpSocket);
+ }
+ };
+
const IoVec& Io = m_IoVecs[m_IoVecCursor++];
if (Io.IsFileRef)
{
- ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, bytes: {}", zen::GetCurrentThreadId(), Io.Ref.FileRef.FileChunkSize);
+ ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, offset: {}, bytes: {}",
+ zen::GetCurrentThreadId(),
+ Io.Ref.FileRef.FileChunkOffset,
+ Io.Ref.FileRef.FileChunkSize);
#if ZEN_USE_TRANSMITFILE
TransmitFileAsync(TcpSocket,
Io.Ref.FileRef.FileHandle,
Io.Ref.FileRef.FileChunkOffset,
gsl::narrow_cast<uint32_t>(Io.Ref.FileRef.FileChunkSize),
- [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
- m_TotalBytesSent += ByteCount;
- if (Ec)
- {
- m_SendCb(Ec, m_TotalBytesSent);
- }
- else
- {
- SendNextChunk(TcpSocket);
- }
- });
+ OnCompletion);
#elif ZEN_USE_ASYNC_SENDFILE
SendFileAsync(TcpSocket,
Io.Ref.FileRef.FileHandle,
Io.Ref.FileRef.FileChunkOffset,
Io.Ref.FileRef.FileChunkSize,
64 * 1024,
- [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
- m_TotalBytesSent += ByteCount;
- if (Ec)
- {
- m_SendCb(Ec, m_TotalBytesSent);
- }
- else
- {
- SendNextChunk(TcpSocket);
- }
- });
+ OnCompletion);
#else
// This should never occur unless we compile with one
// of the options above
@@ -724,7 +789,7 @@ public:
std::vector<asio::const_buffer> AsioBuffers;
AsioBuffers.push_back(asio::const_buffer{Io.Ref.MemoryRef.Data, Io.Ref.MemoryRef.Size});
- while (m_IoVecCursor != m_IoVecCount)
+ while (m_IoVecCursor != m_IoVecs.size())
{
const IoVec& Io2 = m_IoVecs[m_IoVecCursor];
@@ -737,26 +802,23 @@ public:
++m_IoVecCursor;
}
- asio::async_write(TcpSocket,
- std::move(AsioBuffers),
- asio::transfer_all(),
- [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
- m_TotalBytesSent += ByteCount;
- if (Ec)
- {
- m_SendCb(Ec, m_TotalBytesSent);
- }
- else
- {
- SendNextChunk(TcpSocket);
- }
- });
+ asio::async_write(TcpSocket, std::move(AsioBuffers), asio::transfer_all(), OnCompletion);
}
private:
+ enum class State : uint8_t
+ {
+ kUninitialized,
+ kInitialized,
+ kSending,
+ kSent,
+ kFailed
+ };
+
uint32_t m_RequestNumber = 0;
uint16_t m_ResponseCode = 0;
bool m_IsKeepAlive = true;
+ State m_State = State::kUninitialized;
HttpContentType m_ContentType = HttpContentType::kBinary;
uint64_t m_ContentLength = 0;
eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive
@@ -770,15 +832,14 @@ private:
struct MemoryBuffer
{
const void* Data;
- uint32_t Size;
+ uint64_t Size;
} MemoryRef;
IoBufferFileReference FileRef;
} Ref;
};
eastl::fixed_vector<IoVec, 8> m_IoVecs;
- int m_IoVecCursor = 0;
- int m_IoVecCount = 0;
+ unsigned int m_IoVecCursor = 0;
std::function<void(const asio::error_code& Ec, std::size_t ByteCount)> m_SendCb;
uint64_t m_TotalBytesSent = 0;