aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/clients
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
committerLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
commitd1abc50ee9d4fb72efc646e17decafea741caa34 (patch)
treee4288e00f2f7ca0391b83d986efcb69d3ba66a83 /src/zenhttp/clients
parentAllow requests with invalid content-types unless specified in command line or... (diff)
parentupdated chunk–block analyser (#818) (diff)
downloadzen-d1abc50ee9d4fb72efc646e17decafea741caa34.tar.xz
zen-d1abc50ee9d4fb72efc646e17decafea741caa34.zip
Merge branch 'main' into lm/restrict-content-type
Diffstat (limited to 'src/zenhttp/clients')
-rw-r--r--src/zenhttp/clients/httpclientcommon.cpp323
-rw-r--r--src/zenhttp/clients/httpclientcommon.h115
-rw-r--r--src/zenhttp/clients/httpclientcpr.cpp579
-rw-r--r--src/zenhttp/clients/httpclientcpr.h15
-rw-r--r--src/zenhttp/clients/httpwsclient.cpp566
5 files changed, 1344 insertions, 254 deletions
diff --git a/src/zenhttp/clients/httpclientcommon.cpp b/src/zenhttp/clients/httpclientcommon.cpp
index 47425e014..6f4c67dd0 100644
--- a/src/zenhttp/clients/httpclientcommon.cpp
+++ b/src/zenhttp/clients/httpclientcommon.cpp
@@ -142,7 +142,10 @@ namespace detail {
DataSize -= CopySize;
if (m_CacheBufferOffset == CacheBufferSize)
{
- AppendData(m_CacheBuffer, CacheBufferSize);
+ if (std::error_code Ec = AppendData(m_CacheBuffer, CacheBufferSize))
+ {
+ return Ec;
+ }
if (DataSize > 0)
{
ZEN_ASSERT(DataSize < CacheBufferSize);
@@ -382,6 +385,177 @@ namespace detail {
return Result;
}
+ MultipartBoundaryParser::MultipartBoundaryParser() : BoundaryEndMatcher("--"), HeaderEndMatcher("\r\n\r\n") {}
+
+ bool MultipartBoundaryParser::Init(const std::string_view ContentTypeHeaderValue)
+ {
+ std::string LowerCaseValue = ToLower(ContentTypeHeaderValue);
+ if (LowerCaseValue.starts_with("multipart/byteranges"))
+ {
+ size_t BoundaryPos = LowerCaseValue.find("boundary=");
+ if (BoundaryPos != std::string::npos)
+ {
+ // Yes, we do a substring of the non-lowercase value string as we want the exact boundary string
+ std::string_view BoundaryName = std::string_view(ContentTypeHeaderValue).substr(BoundaryPos + 9);
+ size_t BoundaryEnd = std::string::npos;
+ while (!BoundaryName.empty() && BoundaryName[0] == ' ')
+ {
+ BoundaryName = BoundaryName.substr(1);
+ }
+ if (!BoundaryName.empty())
+ {
+ if (BoundaryName.size() > 2 && BoundaryName.front() == '"' && BoundaryName.back() == '"')
+ {
+ BoundaryEnd = BoundaryName.find('"', 1);
+ if (BoundaryEnd != std::string::npos)
+ {
+ BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(1, BoundaryEnd - 1)));
+ return true;
+ }
+ }
+ else
+ {
+ BoundaryEnd = BoundaryName.find_first_of(" \r\n");
+ BoundaryBeginMatcher.Init(fmt::format("\r\n--{}", BoundaryName.substr(0, BoundaryEnd)));
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ void MultipartBoundaryParser::ParseInput(std::string_view data)
+ {
+ const char* InputPtr = data.data();
+ size_t InputLength = data.length();
+ size_t ScanPos = 0;
+ while (ScanPos < InputLength)
+ {
+ const char ScanChar = InputPtr[ScanPos];
+ if (BoundaryBeginMatcher.MatchState == IncrementalStringMatcher::EMatchState::Complete)
+ {
+ if (PayloadOffset + ScanPos < (BoundaryBeginMatcher.GetMatchEndOffset() + BoundaryEndMatcher.GetMatchString().length()))
+ {
+ BoundaryEndMatcher.Match(PayloadOffset + ScanPos, ScanChar);
+ if (BoundaryEndMatcher.MatchState == IncrementalStringMatcher::EMatchState::Complete)
+ {
+ BoundaryBeginMatcher.Reset();
+ HeaderEndMatcher.Reset();
+ BoundaryEndMatcher.Reset();
+ BoundaryHeader.Reset();
+ break;
+ }
+ }
+
+ BoundaryHeader.Append(ScanChar);
+
+ HeaderEndMatcher.Match(PayloadOffset + ScanPos, ScanChar);
+
+ if (HeaderEndMatcher.MatchState == IncrementalStringMatcher::EMatchState::Complete)
+ {
+ const uint64_t HeaderStartOffset = BoundaryBeginMatcher.GetMatchEndOffset();
+ const uint64_t HeaderEndOffset = HeaderEndMatcher.GetMatchStartOffset();
+ const uint64_t HeaderLength = HeaderEndOffset - HeaderStartOffset;
+ std::string_view HeaderText(BoundaryHeader.ToView().substr(0, HeaderLength));
+
+ uint64_t OffsetInPayload = PayloadOffset + ScanPos + 1;
+
+ uint64_t RangeOffset = 0;
+ uint64_t RangeLength = 0;
+ HttpContentType ContentType = HttpContentType::kBinary;
+
+ ForEachStrTok(HeaderText, "\r\n", [&](std::string_view Line) {
+ const std::pair<std::string_view, std::string_view> KeyAndValue = GetHeaderKeyAndValue(Line);
+ const std::string_view Key = KeyAndValue.first;
+ const std::string_view Value = KeyAndValue.second;
+ if (Key == "Content-Range")
+ {
+ std::pair<uint64_t, uint64_t> ContentRange = ParseContentRange(Value);
+ if (ContentRange.second != 0)
+ {
+ RangeOffset = ContentRange.first;
+ RangeLength = ContentRange.second;
+ }
+ }
+ else if (Key == "Content-Type")
+ {
+ ContentType = ParseContentType(Value);
+ }
+
+ return true;
+ });
+
+ if (RangeLength > 0)
+ {
+ Boundaries.push_back(HttpClient::Response::MultipartBoundary{.OffsetInPayload = OffsetInPayload,
+ .RangeOffset = RangeOffset,
+ .RangeLength = RangeLength,
+ .ContentType = ContentType});
+ }
+
+ BoundaryBeginMatcher.Reset();
+ HeaderEndMatcher.Reset();
+ BoundaryEndMatcher.Reset();
+ BoundaryHeader.Reset();
+ }
+ }
+ else
+ {
+ BoundaryBeginMatcher.Match(PayloadOffset + ScanPos, ScanChar);
+ }
+ ScanPos++;
+ }
+ PayloadOffset += InputLength;
+ }
+
+ std::pair<std::string_view, std::string_view> GetHeaderKeyAndValue(std::string_view HeaderString)
+ {
+ size_t DelimiterPos = HeaderString.find(':');
+ if (DelimiterPos != std::string::npos)
+ {
+ std::string_view Key = HeaderString.substr(0, DelimiterPos);
+ constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n");
+ Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters);
+ Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters);
+
+ std::string_view Value = HeaderString.substr(DelimiterPos + 1);
+ Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters);
+ Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters);
+ return std::make_pair(Key, Value);
+ }
+ return std::make_pair(HeaderString, std::string_view{});
+ }
+
+ std::pair<uint64_t, uint64_t> ParseContentRange(std::string_view Value)
+ {
+ if (Value.starts_with("bytes "))
+ {
+ size_t RangeSplitPos = Value.find('-', 6);
+ if (RangeSplitPos != std::string::npos)
+ {
+ size_t RangeEndLength = Value.find('/', RangeSplitPos + 1);
+ if (RangeEndLength == std::string::npos)
+ {
+ RangeEndLength = Value.length() - (RangeSplitPos + 1);
+ }
+ else
+ {
+ RangeEndLength = RangeEndLength - (RangeSplitPos + 1);
+ }
+ std::optional<size_t> RequestedRangeStart = ParseInt<size_t>(Value.substr(6, RangeSplitPos - 6));
+ std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(Value.substr(RangeSplitPos + 1, RangeEndLength));
+ if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value())
+ {
+ uint64_t RangeOffset = RequestedRangeStart.value();
+ uint64_t RangeLength = RequestedRangeEnd.value() - RangeOffset + 1;
+ return std::make_pair(RangeOffset, RangeLength);
+ }
+ }
+ }
+ return {0, 0};
+ }
+
} // namespace detail
} // namespace zen
@@ -423,6 +597,8 @@ namespace testutil {
} // namespace testutil
+TEST_SUITE_BEGIN("http.httpclientcommon");
+
TEST_CASE("BufferedReadFileStream")
{
ScopedTemporaryDirectory TmpDir;
@@ -470,5 +646,150 @@ TEST_CASE("CompositeBufferReadStream")
CHECK_EQ(IoHash::HashBuffer(Data), testutil::HashComposite(Data));
}
+TEST_CASE("MultipartBoundaryParser")
+{
+ uint64_t Range1Offset = 2638;
+ uint64_t Range1Length = (5111437 - Range1Offset) + 1;
+
+ uint64_t Range2Offset = 5118199;
+ uint64_t Range2Length = (9147741 - Range2Offset) + 1;
+
+ std::string_view ContentTypeHeaderValue1 = "multipart/byteranges; boundary=00000000000000019229";
+ std::string_view ContentTypeHeaderValue2 = "multipart/byteranges; boundary=\"00000000000000019229\"";
+
+ {
+ std::string_view Example1 =
+ "\r\n--00000000000000019229\r\n"
+ "Content-Type: application/x-ue-comp\r\n"
+ "Content-Range: bytes 2638-5111437/44369878\r\n"
+ "\r\n"
+ "datadatadatadata"
+ "\r\n--00000000000000019229\r\n"
+ "Content-Type: application/x-ue-comp\r\n"
+ "Content-Range: bytes 5118199-9147741/44369878\r\n"
+ "\r\n"
+ "ditaditadita"
+ "\r\n--00000000000000019229--";
+
+ detail::MultipartBoundaryParser ParserExample1;
+ ParserExample1.Init(ContentTypeHeaderValue1);
+
+ const size_t InputWindow = 7;
+ for (size_t Offset = 0; Offset < Example1.length(); Offset += InputWindow)
+ {
+ ParserExample1.ParseInput(Example1.substr(Offset, Min(Example1.length() - Offset, InputWindow)));
+ }
+
+ CHECK(ParserExample1.Boundaries.size() == 2);
+
+ CHECK(ParserExample1.Boundaries[0].RangeOffset == Range1Offset);
+ CHECK(ParserExample1.Boundaries[0].RangeLength == Range1Length);
+ CHECK(ParserExample1.Boundaries[1].RangeOffset == Range2Offset);
+ CHECK(ParserExample1.Boundaries[1].RangeLength == Range2Length);
+ }
+
+ {
+ std::string_view Example2 =
+ "\r\n--00000000000000019229\r\n"
+ "Content-Type: application/x-ue-comp\r\n"
+ "Content-Range: bytes 2638-5111437/*\r\n"
+ "\r\n"
+ "datadatadatadata"
+ "\r\n--00000000000000019229\r\n"
+ "Content-Type: application/x-ue-comp\r\n"
+ "Content-Range: bytes 5118199-9147741/44369878\r\n"
+ "\r\n"
+ "ditaditadita"
+ "\r\n--00000000000000019229--";
+
+ detail::MultipartBoundaryParser ParserExample2;
+ ParserExample2.Init(ContentTypeHeaderValue1);
+
+ const size_t InputWindow = 3;
+ for (size_t Offset = 0; Offset < Example2.length(); Offset += InputWindow)
+ {
+ std::string_view Window = Example2.substr(Offset, Min(Example2.length() - Offset, InputWindow));
+ ParserExample2.ParseInput(Window);
+ }
+
+ CHECK(ParserExample2.Boundaries.size() == 2);
+
+ CHECK(ParserExample2.Boundaries[0].RangeOffset == Range1Offset);
+ CHECK(ParserExample2.Boundaries[0].RangeLength == Range1Length);
+ CHECK(ParserExample2.Boundaries[1].RangeOffset == Range2Offset);
+ CHECK(ParserExample2.Boundaries[1].RangeLength == Range2Length);
+ }
+
+ {
+ std::string_view Example3 =
+ "\r\n--00000000000000019229\r\n"
+ "Content-Type: application/x-ue-comp\r\n"
+ "Content-Range: bytes 2638-5111437/*\r\n"
+ "\r\n"
+ "datadatadatadata"
+ "\r\n--00000000000000019229\r\n"
+ "Content-Type: application/x-ue-comp\r\n"
+ "Content-Range: bytes 5118199-9147741/44369878\r\n"
+ "\r\n"
+ "ditaditadita";
+
+ detail::MultipartBoundaryParser ParserExample3;
+ ParserExample3.Init(ContentTypeHeaderValue2);
+
+ const size_t InputWindow = 31;
+ for (size_t Offset = 0; Offset < Example3.length(); Offset += InputWindow)
+ {
+ ParserExample3.ParseInput(Example3.substr(Offset, Min(Example3.length() - Offset, InputWindow)));
+ }
+
+ CHECK(ParserExample3.Boundaries.size() == 2);
+
+ CHECK(ParserExample3.Boundaries[0].RangeOffset == Range1Offset);
+ CHECK(ParserExample3.Boundaries[0].RangeLength == Range1Length);
+ CHECK(ParserExample3.Boundaries[1].RangeOffset == Range2Offset);
+ CHECK(ParserExample3.Boundaries[1].RangeLength == Range2Length);
+ }
+
+ {
+ std::string_view Example4 =
+ "\r\n--00000000000000019229\r\n"
+ "Content-Type: application/x-ue-comp\r\n"
+ "Content-Range: bytes 2638-5111437/*\r\n"
+ "Not: really\r\n"
+ "\r\n"
+ "datadatadatadata"
+ "\r\n--000000000bait0019229\r\n"
+ "\r\n--00\r\n--000000000bait001922\r\n"
+ "\r\n\r\n\r\r\n--00000000000000019229\r\n"
+ "Content-Type: application/x-ue-comp\r\n"
+ "Content-Range: bytes 5118199-9147741/44369878\r\n"
+ "\r\n"
+ "ditaditadita"
+ "Content-Type: application/x-ue-comp\r\n"
+ "ditaditadita"
+ "Content-Range: bytes 5118199-9147741/44369878\r\n"
+ "\r\n---\r\n--00000000000000019229--";
+
+ detail::MultipartBoundaryParser ParserExample4;
+ ParserExample4.Init(ContentTypeHeaderValue1);
+
+ const size_t InputWindow = 3;
+ for (size_t Offset = 0; Offset < Example4.length(); Offset += InputWindow)
+ {
+ std::string_view Window = Example4.substr(Offset, Min(Example4.length() - Offset, InputWindow));
+ ParserExample4.ParseInput(Window);
+ }
+
+ CHECK(ParserExample4.Boundaries.size() == 2);
+
+ CHECK(ParserExample4.Boundaries[0].RangeOffset == Range1Offset);
+ CHECK(ParserExample4.Boundaries[0].RangeLength == Range1Length);
+ CHECK(ParserExample4.Boundaries[1].RangeOffset == Range2Offset);
+ CHECK(ParserExample4.Boundaries[1].RangeLength == Range2Length);
+ }
+}
+
+TEST_SUITE_END();
+
} // namespace zen
#endif
diff --git a/src/zenhttp/clients/httpclientcommon.h b/src/zenhttp/clients/httpclientcommon.h
index 1d0b7f9ea..5ed946541 100644
--- a/src/zenhttp/clients/httpclientcommon.h
+++ b/src/zenhttp/clients/httpclientcommon.h
@@ -3,6 +3,7 @@
#pragma once
#include <zencore/compositebuffer.h>
+#include <zencore/string.h>
#include <zencore/trace.h>
#include <zenhttp/httpclient.h>
@@ -87,7 +88,7 @@ namespace detail {
std::error_code Write(std::string_view DataString);
IoBuffer DetachToIoBuffer();
IoBuffer BorrowIoBuffer();
- inline uint64_t GetSize() const { return m_WriteOffset; }
+ inline uint64_t GetSize() const { return m_WriteOffset + m_CacheBufferOffset; }
void ResetWritePos(uint64_t WriteOffset);
private:
@@ -143,6 +144,118 @@ namespace detail {
uint64_t m_BytesLeftInSegment;
};
+ class IncrementalStringMatcher
+ {
+ public:
+ enum class EMatchState
+ {
+ None,
+ Partial,
+ Complete
+ };
+
+ EMatchState MatchState = EMatchState::None;
+
+ IncrementalStringMatcher() {}
+
+ IncrementalStringMatcher(std::string&& InMatchString) : MatchString(std::move(InMatchString))
+ {
+ RawMatchString = MatchString.data();
+ }
+
+ void Init(std::string&& InMatchString)
+ {
+ MatchString = std::move(InMatchString);
+ RawMatchString = MatchString.data();
+ }
+
+ inline void Reset()
+ {
+ MatchLength = 0;
+ MatchStartOffset = 0;
+ MatchState = EMatchState::None;
+ }
+
+ inline uint64_t GetMatchEndOffset() const
+ {
+ if (MatchState == EMatchState::Complete)
+ {
+ return MatchStartOffset + MatchString.length();
+ }
+ return 0;
+ }
+
+ inline uint64_t GetMatchStartOffset() const
+ {
+ ZEN_ASSERT(MatchState == EMatchState::Complete);
+ return MatchStartOffset;
+ }
+
+ void Match(uint64_t Offset, char C)
+ {
+ ZEN_ASSERT_SLOW(RawMatchString != nullptr);
+
+ if (MatchState == EMatchState::Complete)
+ {
+ Reset();
+ }
+ if (C == RawMatchString[MatchLength])
+ {
+ if (MatchLength == 0)
+ {
+ MatchStartOffset = Offset;
+ }
+ MatchLength++;
+ if (MatchLength == MatchString.length())
+ {
+ MatchState = EMatchState::Complete;
+ }
+ else
+ {
+ MatchState = EMatchState::Partial;
+ }
+ }
+ else if (MatchLength != 0)
+ {
+ Reset();
+ Match(Offset, C);
+ }
+ else
+ {
+ Reset();
+ }
+ }
+ inline const std::string& GetMatchString() const { return MatchString; }
+
+ private:
+ std::string MatchString;
+ const char* RawMatchString = nullptr;
+ uint64_t MatchLength = 0;
+
+ uint64_t MatchStartOffset = 0;
+ };
+
+ class MultipartBoundaryParser
+ {
+ public:
+ std::vector<HttpClient::Response::MultipartBoundary> Boundaries;
+
+ MultipartBoundaryParser();
+ bool Init(const std::string_view ContentTypeHeaderValue);
+ void ParseInput(std::string_view data);
+
+ private:
+ IncrementalStringMatcher BoundaryBeginMatcher;
+ IncrementalStringMatcher BoundaryEndMatcher;
+ IncrementalStringMatcher HeaderEndMatcher;
+
+ ExtendableStringBuilder<64> BoundaryHeader;
+ uint64_t PayloadOffset = 0;
+ };
+
+ std::pair<std::string_view, std::string_view> GetHeaderKeyAndValue(std::string_view HeaderString);
+ std::pair<uint64_t, uint64_t> ParseContentRange(std::string_view Value);
+
} // namespace detail
} // namespace zen
diff --git a/src/zenhttp/clients/httpclientcpr.cpp b/src/zenhttp/clients/httpclientcpr.cpp
index 5d92b3b6b..14e40b02a 100644
--- a/src/zenhttp/clients/httpclientcpr.cpp
+++ b/src/zenhttp/clients/httpclientcpr.cpp
@@ -12,6 +12,7 @@
#include <zencore/session.h>
#include <zencore/stream.h>
#include <zenhttp/packageformat.h>
+#include <algorithm>
namespace zen {
@@ -23,6 +24,21 @@ CreateCprHttpClient(std::string_view BaseUri, const HttpClientSettings& Connecti
static std::atomic<uint32_t> HttpClientRequestIdCounter{0};
+bool
+HttpClient::ErrorContext::IsConnectionError() const
+{
+ switch (static_cast<cpr::ErrorCode>(ErrorCode))
+ {
+ case cpr::ErrorCode::CONNECTION_FAILURE:
+ case cpr::ErrorCode::OPERATION_TIMEDOUT:
+ case cpr::ErrorCode::HOST_RESOLUTION_FAILURE:
+ case cpr::ErrorCode::PROXY_RESOLUTION_FAILURE:
+ return true;
+ default:
+ return false;
+ }
+}
+
// If we want to support different HTTP client implementations then we'll need to make this more abstract
HttpClientError::ResponseClass
@@ -149,6 +165,18 @@ CprHttpClient::CprHttpClient(std::string_view BaseUri,
{
}
+bool
+CprHttpClient::ShouldLogErrorCode(HttpResponseCode ResponseCode) const
+{
+ if (m_CheckIfAbortFunction && m_CheckIfAbortFunction())
+ {
+ // Quiet
+ return false;
+ }
+ const auto& Expected = m_ConnectionSettings.ExpectedErrorCodes;
+ return std::find(Expected.begin(), Expected.end(), ResponseCode) == Expected.end();
+}
+
CprHttpClient::~CprHttpClient()
{
ZEN_TRACE_CPU("CprHttpClient::~CprHttpClient");
@@ -162,10 +190,11 @@ CprHttpClient::~CprHttpClient()
}
HttpClient::Response
-CprHttpClient::ResponseWithPayload(std::string_view SessionId,
- cpr::Response&& HttpResponse,
- const HttpResponseCode WorkResponseCode,
- IoBuffer&& Payload)
+CprHttpClient::ResponseWithPayload(std::string_view SessionId,
+ cpr::Response&& HttpResponse,
+ const HttpResponseCode WorkResponseCode,
+ IoBuffer&& Payload,
+ std::vector<HttpClient::Response::MultipartBoundary>&& BoundaryPositions)
{
// This ends up doing a memcpy, would be good to get rid of it by streaming results
// into buffer directly
@@ -174,30 +203,37 @@ CprHttpClient::ResponseWithPayload(std::string_view SessionId,
if (auto It = HttpResponse.header.find("Content-Type"); It != HttpResponse.header.end())
{
const HttpContentType ContentType = ParseContentType(It->second);
-
ResponseBuffer.SetContentType(ContentType);
}
- const bool Quiet = m_CheckIfAbortFunction && m_CheckIfAbortFunction();
-
- if (!Quiet)
+ if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound)
{
- if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound)
+ if (ShouldLogErrorCode(WorkResponseCode))
{
ZEN_WARN("HttpClient request failed (session: {}): {}", SessionId, HttpResponse);
}
}
+ std::sort(BoundaryPositions.begin(),
+ BoundaryPositions.end(),
+ [](const HttpClient::Response::MultipartBoundary& Lhs, const HttpClient::Response::MultipartBoundary& Rhs) {
+ return Lhs.RangeOffset < Rhs.RangeOffset;
+ });
+
return HttpClient::Response{.StatusCode = WorkResponseCode,
.ResponsePayload = std::move(ResponseBuffer),
.Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()),
.UploadedBytes = gsl::narrow<int64_t>(HttpResponse.uploaded_bytes),
.DownloadedBytes = gsl::narrow<int64_t>(HttpResponse.downloaded_bytes),
- .ElapsedSeconds = HttpResponse.elapsed};
+ .ElapsedSeconds = HttpResponse.elapsed,
+ .Ranges = std::move(BoundaryPositions)};
}
HttpClient::Response
-CprHttpClient::CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload)
+CprHttpClient::CommonResponse(std::string_view SessionId,
+ cpr::Response&& HttpResponse,
+ IoBuffer&& Payload,
+ std::vector<HttpClient::Response::MultipartBoundary>&& BoundaryPositions)
{
const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code);
if (HttpResponse.error)
@@ -235,7 +271,7 @@ CprHttpClient::CommonResponse(std::string_view SessionId, cpr::Response&& HttpRe
}
else
{
- return ResponseWithPayload(SessionId, std::move(HttpResponse), WorkResponseCode, std::move(Payload));
+ return ResponseWithPayload(SessionId, std::move(HttpResponse), WorkResponseCode, std::move(Payload), std::move(BoundaryPositions));
}
}
@@ -346,8 +382,7 @@ CprHttpClient::DoWithRetry(std::string_view SessionId,
}
Sleep(100 * (Attempt + 1));
Attempt++;
- const bool Quiet = m_CheckIfAbortFunction && m_CheckIfAbortFunction();
- if (!Quiet)
+ if (ShouldLogErrorCode(HttpResponseCode(Result.status_code)))
{
ZEN_INFO("{} Attempt {}/{}",
CommonResponse(SessionId, std::move(Result), {}).ErrorMessage("Retry"),
@@ -385,8 +420,7 @@ CprHttpClient::DoWithRetry(std::string_view SessionId,
}
Sleep(100 * (Attempt + 1));
Attempt++;
- const bool Quiet = m_CheckIfAbortFunction && m_CheckIfAbortFunction();
- if (!Quiet)
+ if (ShouldLogErrorCode(HttpResponseCode(Result.status_code)))
{
ZEN_INFO("{} Attempt {}/{}",
CommonResponse(SessionId, std::move(Result), {}).ErrorMessage("Retry"),
@@ -621,7 +655,7 @@ CprHttpClient::TransactPackage(std::string_view Url, CbPackage Package, const Ke
ResponseBuffer.SetContentType(ContentType);
}
- return {.StatusCode = HttpResponseCode(FilterResponse.status_code), .ResponsePayload = ResponseBuffer};
+ return {.StatusCode = HttpResponseCode(FilterResponse.status_code), .ResponsePayload = std::move(ResponseBuffer)};
}
//////////////////////////////////////////////////////////////////////////
@@ -896,236 +930,287 @@ CprHttpClient::Download(std::string_view Url, const std::filesystem::path& TempF
std::string PayloadString;
std::unique_ptr<detail::TempPayloadFile> PayloadFile;
- cpr::Response Response = DoWithRetry(
- m_SessionId,
- [&]() {
- auto GetHeader = [&](std::string header) -> std::pair<std::string, std::string> {
- size_t DelimiterPos = header.find(':');
- if (DelimiterPos != std::string::npos)
- {
- std::string Key = header.substr(0, DelimiterPos);
- constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n");
- Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters);
- Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters);
-
- std::string Value = header.substr(DelimiterPos + 1);
- Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters);
- Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters);
-
- return std::make_pair(Key, Value);
- }
- return std::make_pair(header, "");
- };
-
- auto DownloadCallback = [&](std::string data, intptr_t) {
- if (m_CheckIfAbortFunction && m_CheckIfAbortFunction())
- {
- return false;
- }
- if (PayloadFile)
- {
- ZEN_ASSERT(PayloadString.empty());
- std::error_code Ec = PayloadFile->Write(data);
- if (Ec)
- {
- ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
- TempFolderPath.string(),
- Ec.message());
- return false;
- }
- }
- else
- {
- PayloadString.append(data);
- }
- return true;
- };
-
- uint64_t RequestedContentLength = (uint64_t)-1;
- if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end())
- {
- if (RangeIt->second.starts_with("bytes"))
- {
- size_t RangeStartPos = RangeIt->second.find('=', 5);
- if (RangeStartPos != std::string::npos)
- {
- RangeStartPos++;
- size_t RangeSplitPos = RangeIt->second.find('-', RangeStartPos);
- if (RangeSplitPos != std::string::npos)
- {
- std::optional<size_t> RequestedRangeStart =
- ParseInt<size_t>(RangeIt->second.substr(RangeStartPos, RangeSplitPos - RangeStartPos));
- std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeIt->second.substr(RangeStartPos + 1));
- if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value())
- {
- RequestedContentLength = RequestedRangeEnd.value() - 1;
- }
- }
- }
- }
- }
-
- cpr::Response Response;
- {
- std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
- auto HeaderCallback = [&](std::string header, intptr_t) {
- std::pair<std::string, std::string> Header = GetHeader(header);
- if (Header.first == "Content-Length"sv)
- {
- std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second);
- if (ContentLength.has_value())
- {
- if (ContentLength.value() > m_ConnectionSettings.MaximumInMemoryDownloadSize)
- {
- PayloadFile = std::make_unique<detail::TempPayloadFile>();
- std::error_code Ec = PayloadFile->Open(TempFolderPath, ContentLength.value());
- if (Ec)
- {
- ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
- TempFolderPath.string(),
- Ec.message());
- PayloadFile.reset();
- }
- }
- else
- {
- PayloadString.reserve(ContentLength.value());
- }
- }
- }
- if (!Header.first.empty())
- {
- ReceivedHeaders.emplace_back(std::move(Header));
- }
- return 1;
- };
-
- Session Sess = AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
- for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
- {
- Response.header.insert_or_assign(H.first, H.second);
- }
- }
- if (m_ConnectionSettings.AllowResume)
- {
- auto SupportsRanges = [](const cpr::Response& Response) -> bool {
- if (Response.header.find("Content-Range") != Response.header.end())
- {
- return true;
- }
- if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end())
- {
- return It->second == "bytes"sv;
- }
- return false;
- };
-
- auto ShouldResume = [&SupportsRanges](const cpr::Response& Response) -> bool {
- if (ShouldRetry(Response))
- {
- return SupportsRanges(Response);
- }
- return false;
- };
-
- if (ShouldResume(Response))
- {
- auto It = Response.header.find("Content-Length");
- if (It != Response.header.end())
- {
- uint64_t ContentLength = RequestedContentLength;
- if (ContentLength == uint64_t(-1))
- {
- if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value())
- {
- ContentLength = ParsedContentLength.value();
- }
- }
-
- std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
-
- auto HeaderCallback = [&](std::string header, intptr_t) {
- std::pair<std::string, std::string> Header = GetHeader(header);
- if (!Header.first.empty())
- {
- ReceivedHeaders.emplace_back(std::move(Header));
- }
-
- if (Header.first == "Content-Range"sv)
- {
- if (Header.second.starts_with("bytes "sv))
- {
- size_t RangeStartEnd = Header.second.find('-', 6);
- if (RangeStartEnd != std::string::npos)
- {
- const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
- if (Start)
- {
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
- if (Start.value() == DownloadedSize)
- {
- return 1;
- }
- else if (Start.value() > DownloadedSize)
- {
- return 0;
- }
- if (PayloadFile)
- {
- PayloadFile->ResetWritePos(Start.value());
- }
- else
- {
- PayloadString = PayloadString.substr(0, Start.value());
- }
- return 1;
- }
- }
- }
- return 0;
- }
- return 1;
- };
-
- KeyValueMap HeadersWithRange(AdditionalHeader);
- do
- {
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
-
- std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1);
- if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
- {
- if (RangeIt->second == Range)
- {
- // If we didn't make any progress, abort
- break;
- }
- }
- HeadersWithRange.Entries.insert_or_assign("Range", Range);
-
- Session Sess =
- AllocSession(m_BaseUri, Url, m_ConnectionSettings, HeadersWithRange, {}, m_SessionId, GetAccessToken());
- Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
- for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
- {
- Response.header.insert_or_assign(H.first, H.second);
- }
- ReceivedHeaders.clear();
- } while (ShouldResume(Response));
- }
- }
- }
-
- if (!PayloadString.empty())
- {
- Response.text = std::move(PayloadString);
- }
- return Response;
- },
- PayloadFile);
-
- return CommonResponse(m_SessionId, std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{});
+
+ HttpContentType ContentType = HttpContentType::kUnknownContentType;
+ detail::MultipartBoundaryParser BoundaryParser;
+ bool IsMultiRangeResponse = false;
+
+ cpr::Response Response = DoWithRetry(
+ m_SessionId,
+ [&]() {
+ // Reset state from any previous attempt
+ PayloadString.clear();
+ PayloadFile.reset();
+ BoundaryParser.Boundaries.clear();
+ ContentType = HttpContentType::kUnknownContentType;
+ IsMultiRangeResponse = false;
+
+ auto DownloadCallback = [&](std::string data, intptr_t) {
+ if (m_CheckIfAbortFunction && m_CheckIfAbortFunction())
+ {
+ return false;
+ }
+
+ if (IsMultiRangeResponse)
+ {
+ BoundaryParser.ParseInput(data);
+ }
+
+ if (PayloadFile)
+ {
+ ZEN_ASSERT(PayloadString.empty());
+ std::error_code Ec = PayloadFile->Write(data);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ return false;
+ }
+ }
+ else
+ {
+ PayloadString.append(data);
+ }
+ return true;
+ };
+
+ uint64_t RequestedContentLength = (uint64_t)-1;
+ if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end())
+ {
+ if (RangeIt->second.starts_with("bytes"))
+ {
+ std::string_view RangeValue(RangeIt->second);
+ size_t RangeStartPos = RangeValue.find('=', 5);
+ if (RangeStartPos != std::string::npos)
+ {
+ RangeStartPos++;
+ while (RangeStartPos < RangeValue.length() && RangeValue[RangeStartPos] == ' ')
+ {
+ RangeStartPos++;
+ }
+ RequestedContentLength = 0;
+
+ while (RangeStartPos < RangeValue.length())
+ {
+ size_t RangeEnd = RangeValue.find_first_of(", \r\n", RangeStartPos);
+ if (RangeEnd == std::string::npos)
+ {
+ RangeEnd = RangeValue.length();
+ }
+
+ std::string_view RangeString = RangeValue.substr(RangeStartPos, RangeEnd - RangeStartPos);
+ size_t RangeSplitPos = RangeString.find('-');
+ if (RangeSplitPos != std::string::npos)
+ {
+ std::optional<size_t> RequestedRangeStart = ParseInt<size_t>(RangeString.substr(0, RangeSplitPos));
+ std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeString.substr(RangeSplitPos + 1));
+ if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value())
+ {
+ RequestedContentLength += RequestedRangeEnd.value() - RequestedRangeStart.value() + 1;
+ }
+ }
+ RangeStartPos = RangeEnd;
+ while (RangeStartPos != RangeValue.length() &&
+ (RangeValue[RangeStartPos] == ',' || RangeValue[RangeStartPos] == ' '))
+ {
+ RangeStartPos++;
+ }
+ }
+ }
+ }
+ }
+
+ cpr::Response Response;
+ {
+ std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ const std::pair<std::string_view, std::string_view> Header = detail::GetHeaderKeyAndValue(header);
+ if (Header.first == "Content-Length"sv)
+ {
+ std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second);
+ if (ContentLength.has_value())
+ {
+ if (ContentLength.value() > m_ConnectionSettings.MaximumInMemoryDownloadSize)
+ {
+ PayloadFile = std::make_unique<detail::TempPayloadFile>();
+ std::error_code Ec = PayloadFile->Open(TempFolderPath, ContentLength.value());
+ if (Ec)
+ {
+ ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ PayloadFile.reset();
+ }
+ }
+ else
+ {
+ PayloadString.reserve(ContentLength.value());
+ }
+ }
+ }
+ else if (Header.first == "Content-Type")
+ {
+ IsMultiRangeResponse = BoundaryParser.Init(Header.second);
+ if (!IsMultiRangeResponse)
+ {
+ ContentType = ParseContentType(Header.second);
+ }
+ }
+ else if (Header.first == "Content-Range")
+ {
+ if (!IsMultiRangeResponse)
+ {
+ std::pair<uint64_t, uint64_t> Range = detail::ParseContentRange(Header.second);
+ if (Range.second != 0)
+ {
+ BoundaryParser.Boundaries.push_back(HttpClient::Response::MultipartBoundary{.OffsetInPayload = 0,
+ .RangeOffset = Range.first,
+ .RangeLength = Range.second,
+ .ContentType = ContentType});
+ }
+ }
+ }
+ if (!Header.first.empty())
+ {
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
+ return 1;
+ };
+
+ Session Sess = AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ {
+ Response.header.insert_or_assign(H.first, H.second);
+ }
+ }
+ if (m_ConnectionSettings.AllowResume)
+ {
+ auto SupportsRanges = [](const cpr::Response& Response) -> bool {
+ if (Response.header.find("Content-Range") != Response.header.end())
+ {
+ return true;
+ }
+ if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end())
+ {
+ return It->second == "bytes"sv;
+ }
+ return false;
+ };
+
+ auto ShouldResume = [&SupportsRanges, &IsMultiRangeResponse](const cpr::Response& Response) -> bool {
+ if (IsMultiRangeResponse)
+ {
+ return false;
+ }
+ if (ShouldRetry(Response))
+ {
+ return SupportsRanges(Response);
+ }
+ return false;
+ };
+
+ if (ShouldResume(Response))
+ {
+ auto It = Response.header.find("Content-Length");
+ if (It != Response.header.end())
+ {
+ uint64_t ContentLength = RequestedContentLength;
+ if (ContentLength == uint64_t(-1))
+ {
+ if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value())
+ {
+ ContentLength = ParsedContentLength.value();
+ }
+ }
+
+ std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
+
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ const std::pair<std::string_view, std::string_view> Header = detail::GetHeaderKeyAndValue(header);
+ if (!Header.first.empty())
+ {
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
+
+ if (Header.first == "Content-Range"sv)
+ {
+ if (Header.second.starts_with("bytes "sv))
+ {
+ size_t RangeStartEnd = Header.second.find('-', 6);
+ if (RangeStartEnd != std::string::npos)
+ {
+ const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
+ if (Start)
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ if (Start.value() == DownloadedSize)
+ {
+ return 1;
+ }
+ else if (Start.value() > DownloadedSize)
+ {
+ return 0;
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->ResetWritePos(Start.value());
+ }
+ else
+ {
+ PayloadString = PayloadString.substr(0, Start.value());
+ }
+ return 1;
+ }
+ }
+ }
+ return 0;
+ }
+ return 1;
+ };
+
+ KeyValueMap HeadersWithRange(AdditionalHeader);
+ do
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+
+ std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1);
+ if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
+ {
+ if (RangeIt->second == Range)
+ {
+ // If we didn't make any progress, abort
+ break;
+ }
+ }
+ HeadersWithRange.Entries.insert_or_assign("Range", Range);
+
+ Session Sess =
+ AllocSession(m_BaseUri, Url, m_ConnectionSettings, HeadersWithRange, {}, m_SessionId, GetAccessToken());
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ {
+ Response.header.insert_or_assign(H.first, H.second);
+ }
+ ReceivedHeaders.clear();
+ } while (ShouldResume(Response));
+ }
+ }
+ }
+
+ if (!PayloadString.empty())
+ {
+ Response.text = std::move(PayloadString);
+ }
+ return Response;
+ },
+ PayloadFile);
+
+ return CommonResponse(m_SessionId,
+ std::move(Response),
+ PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{},
+ std::move(BoundaryParser.Boundaries));
}
} // namespace zen
diff --git a/src/zenhttp/clients/httpclientcpr.h b/src/zenhttp/clients/httpclientcpr.h
index 40af53b5d..752d91add 100644
--- a/src/zenhttp/clients/httpclientcpr.h
+++ b/src/zenhttp/clients/httpclientcpr.h
@@ -155,14 +155,19 @@ private:
std::function<cpr::Response()>&& Func,
std::function<bool(cpr::Response& Result)>&& Validate = [](cpr::Response&) { return true; });
+ bool ShouldLogErrorCode(HttpResponseCode ResponseCode) const;
bool ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile>& PayloadFile);
- HttpClient::Response CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload);
+ HttpClient::Response CommonResponse(std::string_view SessionId,
+ cpr::Response&& HttpResponse,
+ IoBuffer&& Payload,
+ std::vector<HttpClient::Response::MultipartBoundary>&& BoundaryPositions = {});
- HttpClient::Response ResponseWithPayload(std::string_view SessionId,
- cpr::Response&& HttpResponse,
- const HttpResponseCode WorkResponseCode,
- IoBuffer&& Payload);
+ HttpClient::Response ResponseWithPayload(std::string_view SessionId,
+ cpr::Response&& HttpResponse,
+ const HttpResponseCode WorkResponseCode,
+ IoBuffer&& Payload,
+ std::vector<HttpClient::Response::MultipartBoundary>&& BoundaryPositions);
};
} // namespace zen
diff --git a/src/zenhttp/clients/httpwsclient.cpp b/src/zenhttp/clients/httpwsclient.cpp
new file mode 100644
index 000000000..9497dadb8
--- /dev/null
+++ b/src/zenhttp/clients/httpwsclient.cpp
@@ -0,0 +1,566 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenhttp/httpwsclient.h>
+
+#include "../servers/wsframecodec.h"
+
+#include <zencore/base64.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <deque>
+#include <random>
+#include <thread>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+struct HttpWsClient::Impl
+{
+ Impl(std::string_view Url, IWsClientHandler& Handler, const HttpWsClientSettings& Settings)
+ : m_Handler(Handler)
+ , m_Settings(Settings)
+ , m_Log(logging::Get(Settings.LogCategory))
+ , m_OwnedIoContext(std::make_unique<asio::io_context>())
+ , m_IoContext(*m_OwnedIoContext)
+ {
+ ParseUrl(Url);
+ }
+
+ Impl(std::string_view Url, IWsClientHandler& Handler, asio::io_context& IoContext, const HttpWsClientSettings& Settings)
+ : m_Handler(Handler)
+ , m_Settings(Settings)
+ , m_Log(logging::Get(Settings.LogCategory))
+ , m_IoContext(IoContext)
+ {
+ ParseUrl(Url);
+ }
+
+ ~Impl()
+ {
+ // Release work guard so io_context::run() can return
+ m_WorkGuard.reset();
+
+ // Close the socket to cancel pending async ops
+ if (m_Socket)
+ {
+ asio::error_code Ec;
+ m_Socket->close(Ec);
+ }
+
+ if (m_IoThread.joinable())
+ {
+ m_IoThread.join();
+ }
+ }
+
+ void ParseUrl(std::string_view Url)
+ {
+ // Expected format: ws://host:port/path
+ if (Url.substr(0, 5) == "ws://")
+ {
+ Url.remove_prefix(5);
+ }
+
+ auto SlashPos = Url.find('/');
+ std::string_view HostPort;
+ if (SlashPos != std::string_view::npos)
+ {
+ HostPort = Url.substr(0, SlashPos);
+ m_Path = std::string(Url.substr(SlashPos));
+ }
+ else
+ {
+ HostPort = Url;
+ m_Path = "/";
+ }
+
+ auto ColonPos = HostPort.find(':');
+ if (ColonPos != std::string_view::npos)
+ {
+ m_Host = std::string(HostPort.substr(0, ColonPos));
+ m_Port = std::string(HostPort.substr(ColonPos + 1));
+ }
+ else
+ {
+ m_Host = std::string(HostPort);
+ m_Port = "80";
+ }
+ }
+
+ void Connect()
+ {
+ if (m_OwnedIoContext)
+ {
+ m_WorkGuard = std::make_unique<asio::io_context::work>(m_IoContext);
+ m_IoThread = std::thread([this] { m_IoContext.run(); });
+ }
+
+ asio::post(m_IoContext, [this] { DoResolve(); });
+ }
+
+ void DoResolve()
+ {
+ m_Resolver = std::make_unique<asio::ip::tcp::resolver>(m_IoContext);
+
+ m_Resolver->async_resolve(m_Host, m_Port, [this](const asio::error_code& Ec, asio::ip::tcp::resolver::results_type Results) {
+ if (Ec)
+ {
+ ZEN_LOG_DEBUG(m_Log, "WebSocket resolve failed for {}:{}: {}", m_Host, m_Port, Ec.message());
+ m_Handler.OnWsClose(1006, "resolve failed");
+ return;
+ }
+
+ DoConnect(Results);
+ });
+ }
+
+ void DoConnect(const asio::ip::tcp::resolver::results_type& Endpoints)
+ {
+ m_Socket = std::make_unique<asio::ip::tcp::socket>(m_IoContext);
+
+ // Start connect timeout timer
+ m_Timer = std::make_unique<asio::steady_timer>(m_IoContext, m_Settings.ConnectTimeout);
+ m_Timer->async_wait([this](const asio::error_code& Ec) {
+ if (!Ec && !m_IsOpen.load(std::memory_order_relaxed))
+ {
+ ZEN_LOG_DEBUG(m_Log, "WebSocket connect timeout for {}:{}", m_Host, m_Port);
+ if (m_Socket)
+ {
+ asio::error_code CloseEc;
+ m_Socket->close(CloseEc);
+ }
+ }
+ });
+
+ asio::async_connect(*m_Socket, Endpoints, [this](const asio::error_code& Ec, const asio::ip::tcp::endpoint&) {
+ if (Ec)
+ {
+ m_Timer->cancel();
+ ZEN_LOG_DEBUG(m_Log, "WebSocket connect failed for {}:{}: {}", m_Host, m_Port, Ec.message());
+ m_Handler.OnWsClose(1006, "connect failed");
+ return;
+ }
+
+ DoHandshake();
+ });
+ }
+
+ void DoHandshake()
+ {
+ // Generate random Sec-WebSocket-Key (16 random bytes, base64 encoded)
+ uint8_t KeyBytes[16];
+ {
+ static thread_local std::mt19937 s_Rng(std::random_device{}());
+ for (int i = 0; i < 4; ++i)
+ {
+ uint32_t Val = s_Rng();
+ std::memcpy(KeyBytes + i * 4, &Val, 4);
+ }
+ }
+
+ char KeyBase64[Base64::GetEncodedDataSize(16) + 1];
+ uint32_t KeyLen = Base64::Encode(KeyBytes, 16, KeyBase64);
+ KeyBase64[KeyLen] = '\0';
+ m_WebSocketKey = std::string(KeyBase64, KeyLen);
+
+ // Build the HTTP upgrade request
+ ExtendableStringBuilder<512> Request;
+ Request << "GET " << m_Path << " HTTP/1.1\r\n"
+ << "Host: " << m_Host << ":" << m_Port << "\r\n"
+ << "Upgrade: websocket\r\n"
+ << "Connection: Upgrade\r\n"
+ << "Sec-WebSocket-Key: " << m_WebSocketKey << "\r\n"
+ << "Sec-WebSocket-Version: 13\r\n";
+
+ // Add Authorization header if access token provider is set
+ if (m_Settings.AccessTokenProvider)
+ {
+ HttpClientAccessToken Token = (*m_Settings.AccessTokenProvider)();
+ if (Token.IsValid())
+ {
+ Request << "Authorization: Bearer " << Token.Value << "\r\n";
+ }
+ }
+
+ Request << "\r\n";
+
+ std::string_view ReqStr = Request.ToView();
+
+ m_HandshakeBuffer = std::make_shared<std::string>(ReqStr);
+
+ asio::async_write(*m_Socket,
+ asio::buffer(m_HandshakeBuffer->data(), m_HandshakeBuffer->size()),
+ [this](const asio::error_code& Ec, std::size_t) {
+ if (Ec)
+ {
+ m_Timer->cancel();
+ ZEN_LOG_DEBUG(m_Log, "WebSocket handshake write failed: {}", Ec.message());
+ m_Handler.OnWsClose(1006, "handshake write failed");
+ return;
+ }
+
+ DoReadHandshakeResponse();
+ });
+ }
+
+ void DoReadHandshakeResponse()
+ {
+ asio::async_read_until(*m_Socket, m_ReadBuffer, "\r\n\r\n", [this](const asio::error_code& Ec, std::size_t) {
+ m_Timer->cancel();
+
+ if (Ec)
+ {
+ ZEN_LOG_DEBUG(m_Log, "WebSocket handshake read failed: {}", Ec.message());
+ m_Handler.OnWsClose(1006, "handshake read failed");
+ return;
+ }
+
+ // Parse the response
+ const auto& Data = m_ReadBuffer.data();
+ std::string Response(asio::buffers_begin(Data), asio::buffers_end(Data));
+
+ // Consume the headers from the read buffer (any extra data stays for frame parsing)
+ auto HeaderEnd = Response.find("\r\n\r\n");
+ if (HeaderEnd != std::string::npos)
+ {
+ m_ReadBuffer.consume(HeaderEnd + 4);
+ }
+
+ // Validate 101 response
+ if (Response.find("101") == std::string::npos)
+ {
+ ZEN_LOG_DEBUG(m_Log, "WebSocket handshake rejected (no 101): {}", Response.substr(0, 80));
+ m_Handler.OnWsClose(1006, "handshake rejected");
+ return;
+ }
+
+ // Validate Sec-WebSocket-Accept
+ std::string ExpectedAccept = WsFrameCodec::ComputeAcceptKey(m_WebSocketKey);
+ if (Response.find(ExpectedAccept) == std::string::npos)
+ {
+ ZEN_LOG_DEBUG(m_Log, "WebSocket handshake: invalid Sec-WebSocket-Accept");
+ m_Handler.OnWsClose(1006, "invalid accept key");
+ return;
+ }
+
+ m_IsOpen.store(true);
+ m_Handler.OnWsOpen();
+ EnqueueRead();
+ });
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Read loop
+ //
+
+ void EnqueueRead()
+ {
+ if (!m_IsOpen.load(std::memory_order_relaxed))
+ {
+ return;
+ }
+
+ asio::async_read(*m_Socket, m_ReadBuffer, asio::transfer_at_least(1), [this](const asio::error_code& Ec, std::size_t) {
+ OnDataReceived(Ec);
+ });
+ }
+
+ void OnDataReceived(const asio::error_code& Ec)
+ {
+ if (Ec)
+ {
+ if (Ec != asio::error::eof && Ec != asio::error::operation_aborted)
+ {
+ ZEN_LOG_DEBUG(m_Log, "WebSocket read error: {}", Ec.message());
+ }
+
+ if (m_IsOpen.exchange(false))
+ {
+ m_Handler.OnWsClose(1006, "connection lost");
+ }
+ return;
+ }
+
+ ProcessReceivedData();
+
+ if (m_IsOpen.load(std::memory_order_relaxed))
+ {
+ EnqueueRead();
+ }
+ }
+
+ void ProcessReceivedData()
+ {
+ while (m_ReadBuffer.size() > 0)
+ {
+ const auto& InputBuffer = m_ReadBuffer.data();
+ const auto* RawData = static_cast<const uint8_t*>(InputBuffer.data());
+ const auto Size = InputBuffer.size();
+
+ WsFrameParseResult Frame = WsFrameCodec::TryParseFrame(RawData, Size);
+ if (!Frame.IsValid)
+ {
+ break;
+ }
+
+ m_ReadBuffer.consume(Frame.BytesConsumed);
+
+ switch (Frame.Opcode)
+ {
+ case WebSocketOpcode::kText:
+ case WebSocketOpcode::kBinary:
+ {
+ WebSocketMessage Msg;
+ Msg.Opcode = Frame.Opcode;
+ Msg.Payload = IoBuffer(IoBuffer::Clone, Frame.Payload.data(), Frame.Payload.size());
+ m_Handler.OnWsMessage(Msg);
+ break;
+ }
+
+ case WebSocketOpcode::kPing:
+ {
+ // Auto-respond with masked pong
+ std::vector<uint8_t> PongFrame = WsFrameCodec::BuildMaskedFrame(WebSocketOpcode::kPong, Frame.Payload);
+ EnqueueWrite(std::move(PongFrame));
+ break;
+ }
+
+ case WebSocketOpcode::kPong:
+ break;
+
+ case WebSocketOpcode::kClose:
+ {
+ uint16_t Code = 1000;
+ std::string_view Reason;
+
+ if (Frame.Payload.size() >= 2)
+ {
+ Code = (uint16_t(Frame.Payload[0]) << 8) | uint16_t(Frame.Payload[1]);
+ if (Frame.Payload.size() > 2)
+ {
+ Reason =
+ std::string_view(reinterpret_cast<const char*>(Frame.Payload.data() + 2), Frame.Payload.size() - 2);
+ }
+ }
+
+ // Echo masked close frame if we haven't sent one yet
+ if (!m_CloseSent.exchange(true))
+ {
+ std::vector<uint8_t> CloseFrame = WsFrameCodec::BuildMaskedCloseFrame(Code);
+ EnqueueWrite(std::move(CloseFrame));
+ }
+
+ m_IsOpen.store(false);
+ m_Handler.OnWsClose(Code, Reason);
+ return;
+ }
+
+ default:
+ ZEN_LOG_WARN(m_Log, "Unknown WebSocket opcode: {:#x}", static_cast<uint8_t>(Frame.Opcode));
+ break;
+ }
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Write queue
+ //
+
+ void EnqueueWrite(std::vector<uint8_t> Frame)
+ {
+ bool ShouldFlush = false;
+
+ m_WriteLock.WithExclusiveLock([&] {
+ m_WriteQueue.push_back(std::move(Frame));
+ if (!m_IsWriting)
+ {
+ m_IsWriting = true;
+ ShouldFlush = true;
+ }
+ });
+
+ if (ShouldFlush)
+ {
+ FlushWriteQueue();
+ }
+ }
+
+ void FlushWriteQueue()
+ {
+ std::vector<uint8_t> Frame;
+
+ m_WriteLock.WithExclusiveLock([&] {
+ if (m_WriteQueue.empty())
+ {
+ m_IsWriting = false;
+ return;
+ }
+ Frame = std::move(m_WriteQueue.front());
+ m_WriteQueue.pop_front();
+ });
+
+ if (Frame.empty())
+ {
+ return;
+ }
+
+ auto OwnedFrame = std::make_shared<std::vector<uint8_t>>(std::move(Frame));
+
+ asio::async_write(*m_Socket,
+ asio::buffer(OwnedFrame->data(), OwnedFrame->size()),
+ [this, OwnedFrame](const asio::error_code& Ec, std::size_t) { OnWriteComplete(Ec); });
+ }
+
+ void OnWriteComplete(const asio::error_code& Ec)
+ {
+ if (Ec)
+ {
+ if (Ec != asio::error::operation_aborted)
+ {
+ ZEN_LOG_DEBUG(m_Log, "WebSocket write error: {}", Ec.message());
+ }
+
+ m_WriteLock.WithExclusiveLock([&] {
+ m_IsWriting = false;
+ m_WriteQueue.clear();
+ });
+
+ if (m_IsOpen.exchange(false))
+ {
+ m_Handler.OnWsClose(1006, "write error");
+ }
+ return;
+ }
+
+ FlushWriteQueue();
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ //
+ // Public operations
+ //
+
+ void SendText(std::string_view Text)
+ {
+ if (!m_IsOpen.load(std::memory_order_relaxed))
+ {
+ return;
+ }
+
+ std::span<const uint8_t> Payload(reinterpret_cast<const uint8_t*>(Text.data()), Text.size());
+ std::vector<uint8_t> Frame = WsFrameCodec::BuildMaskedFrame(WebSocketOpcode::kText, Payload);
+ EnqueueWrite(std::move(Frame));
+ }
+
+ void SendBinary(std::span<const uint8_t> Data)
+ {
+ if (!m_IsOpen.load(std::memory_order_relaxed))
+ {
+ return;
+ }
+
+ std::vector<uint8_t> Frame = WsFrameCodec::BuildMaskedFrame(WebSocketOpcode::kBinary, Data);
+ EnqueueWrite(std::move(Frame));
+ }
+
+ void DoClose(uint16_t Code, std::string_view Reason)
+ {
+ if (!m_IsOpen.exchange(false))
+ {
+ return;
+ }
+
+ if (!m_CloseSent.exchange(true))
+ {
+ std::vector<uint8_t> CloseFrame = WsFrameCodec::BuildMaskedCloseFrame(Code, Reason);
+ EnqueueWrite(std::move(CloseFrame));
+ }
+ }
+
+ IWsClientHandler& m_Handler;
+ HttpWsClientSettings m_Settings;
+ LoggerRef m_Log;
+
+ std::string m_Host;
+ std::string m_Port;
+ std::string m_Path;
+
+ // io_context: owned (standalone) or external (shared)
+ std::unique_ptr<asio::io_context> m_OwnedIoContext;
+ asio::io_context& m_IoContext;
+ std::unique_ptr<asio::io_context::work> m_WorkGuard;
+ std::thread m_IoThread;
+
+ // Connection state
+ std::unique_ptr<asio::ip::tcp::resolver> m_Resolver;
+ std::unique_ptr<asio::ip::tcp::socket> m_Socket;
+ std::unique_ptr<asio::steady_timer> m_Timer;
+ asio::streambuf m_ReadBuffer;
+ std::string m_WebSocketKey;
+ std::shared_ptr<std::string> m_HandshakeBuffer;
+
+ // Write queue
+ RwLock m_WriteLock;
+ std::deque<std::vector<uint8_t>> m_WriteQueue;
+ bool m_IsWriting = false;
+
+ std::atomic<bool> m_IsOpen{false};
+ std::atomic<bool> m_CloseSent{false};
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+HttpWsClient::HttpWsClient(std::string_view Url, IWsClientHandler& Handler, const HttpWsClientSettings& Settings)
+: m_Impl(std::make_unique<Impl>(Url, Handler, Settings))
+{
+}
+
+HttpWsClient::HttpWsClient(std::string_view Url,
+ IWsClientHandler& Handler,
+ asio::io_context& IoContext,
+ const HttpWsClientSettings& Settings)
+: m_Impl(std::make_unique<Impl>(Url, Handler, IoContext, Settings))
+{
+}
+
+HttpWsClient::~HttpWsClient() = default;
+
+void
+HttpWsClient::Connect()
+{
+ m_Impl->Connect();
+}
+
+void
+HttpWsClient::SendText(std::string_view Text)
+{
+ m_Impl->SendText(Text);
+}
+
+void
+HttpWsClient::SendBinary(std::span<const uint8_t> Data)
+{
+ m_Impl->SendBinary(Data);
+}
+
+void
+HttpWsClient::Close(uint16_t Code, std::string_view Reason)
+{
+ m_Impl->DoClose(Code, Reason);
+}
+
+bool
+HttpWsClient::IsOpen() const
+{
+ return m_Impl->m_IsOpen.load(std::memory_order_relaxed);
+}
+
+} // namespace zen