aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/cloud/s3response.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-05-05 14:59:21 +0200
committerGitHub Enterprise <[email protected]>2026-05-05 14:59:21 +0200
commit46f456ffd4d0717a035253ff9076ca6ee664e536 (patch)
tree69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zenutil/cloud/s3response.cpp
parentwatchdog ephemeral port exhaust (#1022) (diff)
downloadarchived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.tar.xz
archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.zip
hub async s3 client (#1024)
- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window - Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads - `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true) - `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`) - Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa) - New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn - `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs) - Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction). - Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
Diffstat (limited to 'src/zenutil/cloud/s3response.cpp')
-rw-r--r--src/zenutil/cloud/s3response.cpp181
1 files changed, 181 insertions, 0 deletions
diff --git a/src/zenutil/cloud/s3response.cpp b/src/zenutil/cloud/s3response.cpp
new file mode 100644
index 000000000..a9e7f0208
--- /dev/null
+++ b/src/zenutil/cloud/s3response.cpp
@@ -0,0 +1,181 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/cloud/s3response.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+namespace zen {
+
+std::string_view
+S3ExtractXmlValue(std::string_view Xml, std::string_view Tag)
+{
+ std::string OpenTag = fmt::format("<{}>", Tag);
+ std::string CloseTag = fmt::format("</{}>", Tag);
+
+ size_t Start = Xml.find(OpenTag);
+ if (Start == std::string_view::npos)
+ {
+ return {};
+ }
+ Start += OpenTag.size();
+
+ size_t End = Xml.find(CloseTag, Start);
+ if (End == std::string_view::npos)
+ {
+ return {};
+ }
+
+ return Xml.substr(Start, End - Start);
+}
+
+void
+S3DecodeXmlEntities(std::string_view Input, StringBuilderBase& Out)
+{
+ if (Input.find('&') == std::string_view::npos)
+ {
+ Out.Append(Input);
+ return;
+ }
+
+ for (size_t i = 0; i < Input.size(); ++i)
+ {
+ if (Input[i] == '&')
+ {
+ std::string_view Remaining = Input.substr(i);
+ if (Remaining.starts_with("&amp;"))
+ {
+ Out.Append('&');
+ i += 4;
+ }
+ else if (Remaining.starts_with("&lt;"))
+ {
+ Out.Append('<');
+ i += 3;
+ }
+ else if (Remaining.starts_with("&gt;"))
+ {
+ Out.Append('>');
+ i += 3;
+ }
+ else if (Remaining.starts_with("&quot;"))
+ {
+ Out.Append('"');
+ i += 5;
+ }
+ else if (Remaining.starts_with("&apos;"))
+ {
+ Out.Append('\'');
+ i += 5;
+ }
+ else
+ {
+ Out.Append(Input[i]);
+ }
+ }
+ else
+ {
+ Out.Append(Input[i]);
+ }
+ }
+}
+
+std::string
+S3DecodeXmlEntities(std::string_view Input)
+{
+ if (Input.find('&') == std::string_view::npos)
+ {
+ return std::string(Input);
+ }
+ ExtendableStringBuilder<256> Sb;
+ S3DecodeXmlEntities(Input, Sb);
+ return Sb.ToString();
+}
+
+std::string
+S3BuildRequestPath(std::string_view Path, std::string_view CanonicalQS)
+{
+ if (CanonicalQS.empty())
+ {
+ return std::string(Path);
+ }
+ return fmt::format("{}?{}", Path, CanonicalQS);
+}
+
+const std::string*
+S3FindResponseHeader(const HttpClient::KeyValueMap& Headers, std::string_view Name)
+{
+ for (const auto& [K, V] : *Headers)
+ {
+ if (StrCaseCompare(K, Name) == 0)
+ {
+ return &V;
+ }
+ }
+ return nullptr;
+}
+
+bool
+S3ExtractError(std::string_view Body, std::string_view& OutCode, std::string_view& OutMessage)
+{
+ if (Body.find("<Error>") == std::string_view::npos)
+ {
+ return false;
+ }
+ OutCode = S3ExtractXmlValue(Body, "Code");
+ OutMessage = S3ExtractXmlValue(Body, "Message");
+ // Treat malformed bodies (Error tag present but no parseable Code/Message)
+ // as a parse miss; callers format "<prefix>: <Code> - <Message>" and an
+ // empty render is indistinguishable from "no error". S3IsThrottled with
+ // empty ErrorCode + S3ErrorMessage's Response.ErrorMessage fallback path
+ // covers status-only triage.
+ return !OutCode.empty() || !OutMessage.empty();
+}
+
+bool
+S3IsThrottled(const HttpClient::Response& Response, std::string_view ErrorCode)
+{
+ const int Status = static_cast<int>(Response.StatusCode);
+ if (Status == 503 || Status == 429)
+ {
+ return true;
+ }
+ if (ErrorCode == "SlowDown" || ErrorCode == "ServiceUnavailable" || ErrorCode == "ThrottlingException" ||
+ ErrorCode == "RequestLimitExceeded" || ErrorCode == "TooManyRequests")
+ {
+ return true;
+ }
+ return false;
+}
+
+std::string
+S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response)
+{
+ if (!Response.Error.has_value() && Response.ResponsePayload)
+ {
+ std::string_view Body(reinterpret_cast<const char*>(Response.ResponsePayload.GetData()), Response.ResponsePayload.GetSize());
+ std::string_view Code;
+ std::string_view Message;
+ if (S3ExtractError(Body, Code, Message))
+ {
+ ExtendableStringBuilder<256> Decoded;
+ S3DecodeXmlEntities(Message, Decoded);
+ if (S3IsThrottled(Response, Code))
+ {
+ ZEN_WARN("S3 THROTTLED [{}] status={} code='{}' message='{}'",
+ Prefix,
+ static_cast<int>(Response.StatusCode),
+ Code,
+ Decoded.ToView());
+ }
+ return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView());
+ }
+ }
+ if (S3IsThrottled(Response, {}))
+ {
+ ZEN_WARN("S3 THROTTLED [{}] status={} (no XML body)", Prefix, static_cast<int>(Response.StatusCode));
+ }
+ return Response.ErrorMessage(Prefix);
+}
+
+} // namespace zen