diff options
| author | Dan Engelbrecht <[email protected]> | 2026-05-05 14:59:21 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-05-05 14:59:21 +0200 |
| commit | 46f456ffd4d0717a035253ff9076ca6ee664e536 (patch) | |
| tree | 69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zenutil/cloud/s3response.cpp | |
| parent | watchdog ephemeral port exhaust (#1022) (diff) | |
| download | archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.tar.xz archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.zip | |
hub async s3 client (#1024)
- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window
- Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads
- `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true)
- `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`)
- Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa)
- New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn
- `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs)
- Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction).
- Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
Diffstat (limited to 'src/zenutil/cloud/s3response.cpp')
| -rw-r--r-- | src/zenutil/cloud/s3response.cpp | 181 |
1 files changed, 181 insertions, 0 deletions
diff --git a/src/zenutil/cloud/s3response.cpp b/src/zenutil/cloud/s3response.cpp new file mode 100644 index 000000000..a9e7f0208 --- /dev/null +++ b/src/zenutil/cloud/s3response.cpp @@ -0,0 +1,181 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/cloud/s3response.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +namespace zen { + +std::string_view +S3ExtractXmlValue(std::string_view Xml, std::string_view Tag) +{ + std::string OpenTag = fmt::format("<{}>", Tag); + std::string CloseTag = fmt::format("</{}>", Tag); + + size_t Start = Xml.find(OpenTag); + if (Start == std::string_view::npos) + { + return {}; + } + Start += OpenTag.size(); + + size_t End = Xml.find(CloseTag, Start); + if (End == std::string_view::npos) + { + return {}; + } + + return Xml.substr(Start, End - Start); +} + +void +S3DecodeXmlEntities(std::string_view Input, StringBuilderBase& Out) +{ + if (Input.find('&') == std::string_view::npos) + { + Out.Append(Input); + return; + } + + for (size_t i = 0; i < Input.size(); ++i) + { + if (Input[i] == '&') + { + std::string_view Remaining = Input.substr(i); + if (Remaining.starts_with("&")) + { + Out.Append('&'); + i += 4; + } + else if (Remaining.starts_with("<")) + { + Out.Append('<'); + i += 3; + } + else if (Remaining.starts_with(">")) + { + Out.Append('>'); + i += 3; + } + else if (Remaining.starts_with(""")) + { + Out.Append('"'); + i += 5; + } + else if (Remaining.starts_with("'")) + { + Out.Append('\''); + i += 5; + } + else + { + Out.Append(Input[i]); + } + } + else + { + Out.Append(Input[i]); + } + } +} + +std::string +S3DecodeXmlEntities(std::string_view Input) +{ + if (Input.find('&') == std::string_view::npos) + { + return std::string(Input); + } + ExtendableStringBuilder<256> Sb; + S3DecodeXmlEntities(Input, Sb); + return Sb.ToString(); +} + +std::string +S3BuildRequestPath(std::string_view Path, std::string_view CanonicalQS) +{ + if (CanonicalQS.empty()) + { + return std::string(Path); + } + return fmt::format("{}?{}", Path, CanonicalQS); +} + +const std::string* +S3FindResponseHeader(const HttpClient::KeyValueMap& Headers, std::string_view Name) +{ + for (const auto& [K, V] : *Headers) + { + if (StrCaseCompare(K, Name) == 0) + { + return &V; + } + } + return nullptr; +} + +bool +S3ExtractError(std::string_view Body, std::string_view& OutCode, std::string_view& OutMessage) +{ + if (Body.find("<Error>") == std::string_view::npos) + { + return false; + } + OutCode = S3ExtractXmlValue(Body, "Code"); + OutMessage = S3ExtractXmlValue(Body, "Message"); + // Treat malformed bodies (Error tag present but no parseable Code/Message) + // as a parse miss; callers format "<prefix>: <Code> - <Message>" and an + // empty render is indistinguishable from "no error". S3IsThrottled with + // empty ErrorCode + S3ErrorMessage's Response.ErrorMessage fallback path + // covers status-only triage. + return !OutCode.empty() || !OutMessage.empty(); +} + +bool +S3IsThrottled(const HttpClient::Response& Response, std::string_view ErrorCode) +{ + const int Status = static_cast<int>(Response.StatusCode); + if (Status == 503 || Status == 429) + { + return true; + } + if (ErrorCode == "SlowDown" || ErrorCode == "ServiceUnavailable" || ErrorCode == "ThrottlingException" || + ErrorCode == "RequestLimitExceeded" || ErrorCode == "TooManyRequests") + { + return true; + } + return false; +} + +std::string +S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response) +{ + if (!Response.Error.has_value() && Response.ResponsePayload) + { + std::string_view Body(reinterpret_cast<const char*>(Response.ResponsePayload.GetData()), Response.ResponsePayload.GetSize()); + std::string_view Code; + std::string_view Message; + if (S3ExtractError(Body, Code, Message)) + { + ExtendableStringBuilder<256> Decoded; + S3DecodeXmlEntities(Message, Decoded); + if (S3IsThrottled(Response, Code)) + { + ZEN_WARN("S3 THROTTLED [{}] status={} code='{}' message='{}'", + Prefix, + static_cast<int>(Response.StatusCode), + Code, + Decoded.ToView()); + } + return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView()); + } + } + if (S3IsThrottled(Response, {})) + { + ZEN_WARN("S3 THROTTLED [{}] status={} (no XML body)", Prefix, static_cast<int>(Response.StatusCode)); + } + return Response.ErrorMessage(Prefix); +} + +} // namespace zen |