aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/cloud/s3client.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-05-05 14:59:21 +0200
committerGitHub Enterprise <[email protected]>2026-05-05 14:59:21 +0200
commit46f456ffd4d0717a035253ff9076ca6ee664e536 (patch)
tree69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zenutil/cloud/s3client.cpp
parentwatchdog ephemeral port exhaust (#1022) (diff)
downloadarchived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.tar.xz
archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.zip
hub async s3 client (#1024)
- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window - Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads - `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true) - `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`) - Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa) - New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn - `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs) - Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction). - Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
Diffstat (limited to 'src/zenutil/cloud/s3client.cpp')
-rw-r--r--src/zenutil/cloud/s3client.cpp592
1 files changed, 181 insertions, 411 deletions
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index ab80cfcc7..83443b98b 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -4,6 +4,7 @@
#include <zenutil/cloud/imdscredentials.h>
#include <zenutil/cloud/minioprocess.h>
+#include <zenutil/cloud/s3response.h>
#include <zencore/except_fmt.h>
#include <zencore/iobuffer.h>
@@ -19,210 +20,21 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
-namespace {
-
- /// The SHA-256 hash of an empty payload, precomputed
- constexpr std::string_view EmptyPayloadHash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
-
- /// Simple XML value extractor. Finds the text content between <Tag> and </Tag>.
- /// This is intentionally minimal - we only need to parse ListBucketResult responses.
- /// Returns a string_view into the original XML when no entity decoding is needed.
- std::string_view ExtractXmlValue(std::string_view Xml, std::string_view Tag)
- {
- std::string OpenTag = fmt::format("<{}>", Tag);
- std::string CloseTag = fmt::format("</{}>", Tag);
-
- size_t Start = Xml.find(OpenTag);
- if (Start == std::string_view::npos)
- {
- return {};
- }
- Start += OpenTag.size();
-
- size_t End = Xml.find(CloseTag, Start);
- if (End == std::string_view::npos)
- {
- return {};
- }
-
- return Xml.substr(Start, End - Start);
- }
-
- /// Decode the five standard XML entities (&amp; &lt; &gt; &quot; &apos;) into a StringBuilderBase.
- void DecodeXmlEntities(std::string_view Input, StringBuilderBase& Out)
- {
- if (Input.find('&') == std::string_view::npos)
- {
- Out.Append(Input);
- return;
- }
-
- for (size_t i = 0; i < Input.size(); ++i)
- {
- if (Input[i] == '&')
- {
- std::string_view Remaining = Input.substr(i);
- if (Remaining.starts_with("&amp;"))
- {
- Out.Append('&');
- i += 4;
- }
- else if (Remaining.starts_with("&lt;"))
- {
- Out.Append('<');
- i += 3;
- }
- else if (Remaining.starts_with("&gt;"))
- {
- Out.Append('>');
- i += 3;
- }
- else if (Remaining.starts_with("&quot;"))
- {
- Out.Append('"');
- i += 5;
- }
- else if (Remaining.starts_with("&apos;"))
- {
- Out.Append('\'');
- i += 5;
- }
- else
- {
- Out.Append(Input[i]);
- }
- }
- else
- {
- Out.Append(Input[i]);
- }
- }
- }
-
- /// Convenience: decode XML entities and return as std::string.
- std::string DecodeXmlEntities(std::string_view Input)
- {
- if (Input.find('&') == std::string_view::npos)
- {
- return std::string(Input);
- }
-
- ExtendableStringBuilder<256> Sb;
- DecodeXmlEntities(Input, Sb);
- return Sb.ToString();
- }
-
- /// Join a path and canonical query string into a full request path for the HTTP client.
- std::string BuildRequestPath(std::string_view Path, std::string_view CanonicalQS)
- {
- if (CanonicalQS.empty())
- {
- return std::string(Path);
- }
- return fmt::format("{}?{}", Path, CanonicalQS);
- }
-
- /// Case-insensitive header lookup in an HttpClient response header map.
- const std::string* FindResponseHeader(const HttpClient::KeyValueMap& Headers, std::string_view Name)
- {
- for (const auto& [K, V] : *Headers)
- {
- if (StrCaseCompare(K, Name) == 0)
- {
- return &V;
- }
- }
- return nullptr;
- }
-
- /// Extract Code/Message from an S3 XML error body. Returns true if an <Error> element was
- /// found, even if Code/Message are empty.
- bool ExtractS3Error(std::string_view Body, std::string_view& OutCode, std::string_view& OutMessage)
- {
- if (Body.find("<Error>") == std::string_view::npos)
- {
- return false;
- }
- OutCode = ExtractXmlValue(Body, "Code");
- OutMessage = ExtractXmlValue(Body, "Message");
- return true;
- }
-
- /// True if the response indicates S3 throttling (503 SlowDown / ServiceUnavailable / 429).
- /// Code is checked on both the HTTP status and the XML error code so we catch proxies that
- /// return 200 with a SlowDown body.
- bool IsS3Throttled(const HttpClient::Response& Response, std::string_view ErrorCode)
- {
- const int Status = static_cast<int>(Response.StatusCode);
- if (Status == 503 || Status == 429)
- {
- return true;
- }
- if (ErrorCode == "SlowDown" || ErrorCode == "ServiceUnavailable" || ErrorCode == "ThrottlingException" ||
- ErrorCode == "RequestLimitExceeded" || ErrorCode == "TooManyRequests")
- {
- return true;
- }
- return false;
- }
-
- /// Build a human-readable error message for a failed S3 response. When the response body
- /// contains an S3 `<Error>` element, the Code and Message fields are included in the string
- /// so transient 4xx/5xx failures (SignatureDoesNotMatch, AuthorizationHeaderMalformed, etc.)
- /// show up in logs instead of being swallowed. Falls back to the generic HTTP/transport
- /// message when no XML body is available (HEAD responses, transport errors).
- /// Also emits a distinct `S3 THROTTLED` warning when the response indicates throttling so
- /// callers can grep for it without parsing combined error text.
- std::string S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response)
- {
- if (!Response.Error.has_value() && Response.ResponsePayload)
- {
- std::string_view Body(reinterpret_cast<const char*>(Response.ResponsePayload.GetData()), Response.ResponsePayload.GetSize());
- std::string_view Code;
- std::string_view Message;
- if (ExtractS3Error(Body, Code, Message) && (!Code.empty() || !Message.empty()))
- {
- ExtendableStringBuilder<256> Decoded;
- DecodeXmlEntities(Message, Decoded);
- if (IsS3Throttled(Response, Code))
- {
- ZEN_WARN("S3 THROTTLED [{}] status={} code='{}' message='{}'",
- Prefix,
- static_cast<int>(Response.StatusCode),
- Code,
- Decoded.ToView());
- }
- return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView());
- }
- }
- if (IsS3Throttled(Response, {}))
- {
- ZEN_WARN("S3 THROTTLED [{}] status={} (no XML body)", Prefix, static_cast<int>(Response.StatusCode));
- }
- return Response.ErrorMessage(Prefix);
- }
-
-} // namespace
-
std::string_view S3GetObjectResult::NotFoundErrorText = "Not found";
S3Client::S3Client(const S3ClientOptions& Options)
: m_Log(logging::Get("s3"))
-, m_BucketName(Options.BucketName)
-, m_Region(Options.Region)
-, m_Endpoint(Options.Endpoint)
-, m_PathStyle(Options.PathStyle)
+, m_Builder(Options.Region, Options.BucketName, Options.Endpoint, Options.PathStyle)
, m_Credentials(Options.Credentials)
, m_CredentialProvider(Options.CredentialProvider)
-, m_HttpClient(BuildEndpoint(), Options.HttpSettings)
+, m_HttpClient(std::string(m_Builder.Endpoint()), Options.HttpSettings)
, m_Verbose(Options.HttpSettings.Verbose)
{
- m_Host = BuildHostHeader();
ZEN_INFO("S3 client configured for bucket '{}' in region '{}' (endpoint: {}, {})",
- m_BucketName,
- m_Region,
+ m_Builder.BucketName(),
+ m_Builder.Region(),
m_HttpClient.GetBaseUri(),
- m_PathStyle ? "path-style" : "virtual-hosted");
+ m_Builder.PathStyle() ? "path-style" : "virtual-hosted");
}
S3Client::~S3Client() = default;
@@ -235,19 +47,15 @@ S3Client::GetCurrentCredentials()
SigV4Credentials Creds = m_CredentialProvider->GetCredentials();
if (!Creds.AccessKeyId.empty())
{
- // Invalidate the signing key cache when the access key changes, and update stored
- // credentials atomically under the same lock so callers see a consistent snapshot.
- RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
- if (Creds.AccessKeyId != m_Credentials.AccessKeyId)
- {
- m_CachedDateStamp.clear();
- }
+ // Update stored credentials atomically so callers see a consistent snapshot.
+ // The builder's signing-key cache is keyed on (DateStamp, AccessKeyId), so it
+ // self-invalidates on the next Sign() call when either rotates.
+ RwLock::ExclusiveLockScope _(m_CredentialsLock);
m_Credentials = Creds;
- // Return Creds directly - avoids reading m_Credentials after releasing the lock,
- // which would race with another concurrent write.
return Creds;
}
// IMDS returned empty credentials; fall back to the last known-good credentials.
+ RwLock::SharedLockScope _(m_CredentialsLock);
return m_Credentials;
}
return m_Credentials;
@@ -261,153 +69,6 @@ S3Client::BuildNoCredentialsError(std::string Context)
return Err;
}
-std::string
-S3Client::BuildEndpoint() const
-{
- if (!m_Endpoint.empty())
- {
- return m_Endpoint;
- }
-
- if (m_PathStyle)
- {
- // Path-style: https://s3.region.amazonaws.com
- return fmt::format("https://s3.{}.amazonaws.com", m_Region);
- }
-
- // Virtual-hosted style: https://bucket.s3.region.amazonaws.com
- return fmt::format("https://{}.s3.{}.amazonaws.com", m_BucketName, m_Region);
-}
-
-std::string
-S3Client::BuildHostHeader() const
-{
- if (!m_Endpoint.empty())
- {
- // Extract host from custom endpoint URL (strip scheme)
- std::string_view Ep = m_Endpoint;
- if (size_t Pos = Ep.find("://"); Pos != std::string_view::npos)
- {
- Ep = Ep.substr(Pos + 3);
- }
- // Strip trailing slash
- if (!Ep.empty() && Ep.back() == '/')
- {
- Ep = Ep.substr(0, Ep.size() - 1);
- }
- return std::string(Ep);
- }
-
- if (m_PathStyle)
- {
- return fmt::format("s3.{}.amazonaws.com", m_Region);
- }
-
- return fmt::format("{}.s3.{}.amazonaws.com", m_BucketName, m_Region);
-}
-
-std::string
-S3Client::KeyToPath(std::string_view Key) const
-{
- if (m_PathStyle)
- {
- return fmt::format("/{}/{}", m_BucketName, Key);
- }
- return fmt::format("/{}", Key);
-}
-
-std::string
-S3Client::BucketRootPath() const
-{
- if (m_PathStyle)
- {
- return fmt::format("/{}/", m_BucketName);
- }
- return "/";
-}
-
-Sha256Digest
-S3Client::GetSigningKey(std::string_view DateStamp)
-{
- // Fast path: shared lock for cache hit (common case - key only changes once per day)
- {
- RwLock::SharedLockScope SharedLock(m_SigningKeyLock);
- if (m_CachedDateStamp == DateStamp)
- {
- return m_CachedSigningKey;
- }
- }
-
- // Slow path: exclusive lock to recompute the signing key
- RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
-
- // Double-check after acquiring exclusive lock (another thread may have updated it)
- if (m_CachedDateStamp == DateStamp)
- {
- return m_CachedSigningKey;
- }
-
- std::string SecretPrefix = fmt::format("AWS4{}", m_Credentials.SecretAccessKey);
-
- Sha256Digest DateKey = ComputeHmacSha256(SecretPrefix.data(), SecretPrefix.size(), DateStamp.data(), DateStamp.size());
- SecureZeroSecret(SecretPrefix.data(), SecretPrefix.size());
-
- Sha256Digest RegionKey = ComputeHmacSha256(DateKey, m_Region);
- Sha256Digest ServiceKey = ComputeHmacSha256(RegionKey, "s3");
- m_CachedSigningKey = ComputeHmacSha256(ServiceKey, "aws4_request");
- m_CachedDateStamp = std::string(DateStamp);
-
- return m_CachedSigningKey;
-}
-
-HttpClient::KeyValueMap
-S3Client::SignRequest(const SigV4Credentials& Credentials,
- std::string_view Method,
- std::string_view Path,
- std::string_view CanonicalQueryString,
- std::string_view PayloadHash,
- std::span<const std::pair<std::string, std::string>> ExtraSignedHeaders)
-{
- std::string AmzDate = GetAmzTimestamp();
-
- // Build sorted headers to sign (must be sorted by lowercase name)
- std::vector<std::pair<std::string, std::string>> HeadersToSign;
- HeadersToSign.reserve(4 + ExtraSignedHeaders.size());
- HeadersToSign.emplace_back("host", m_Host);
- HeadersToSign.emplace_back("x-amz-content-sha256", std::string(PayloadHash));
- HeadersToSign.emplace_back("x-amz-date", AmzDate);
- if (!Credentials.SessionToken.empty())
- {
- HeadersToSign.emplace_back("x-amz-security-token", Credentials.SessionToken);
- }
- for (const auto& [K, V] : ExtraSignedHeaders)
- {
- HeadersToSign.emplace_back(K, V);
- }
- std::sort(HeadersToSign.begin(), HeadersToSign.end());
-
- std::string_view DateStamp(AmzDate.data(), 8);
- Sha256Digest SigningKey = GetSigningKey(DateStamp);
-
- SigV4SignedHeaders Signed =
- SignRequestV4(Credentials, Method, Path, CanonicalQueryString, m_Region, "s3", AmzDate, HeadersToSign, PayloadHash, &SigningKey);
-
- HttpClient::KeyValueMap Result;
- Result->emplace("Authorization", std::move(Signed.Authorization));
- Result->emplace("x-amz-date", std::move(Signed.AmzDate));
- Result->emplace("x-amz-content-sha256", std::move(Signed.PayloadHash));
- if (!Credentials.SessionToken.empty())
- {
- Result->emplace("x-amz-security-token", Credentials.SessionToken);
- }
- for (const auto& [K, V] : ExtraSignedHeaders)
- {
- Result->emplace(K, V);
- }
-
- return Result;
-}
-
S3Result
S3Client::PutObject(std::string_view Key, IoBuffer Content)
{
@@ -417,12 +78,12 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content)
return S3Result{std::move(Err)};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
// Hash the payload
std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize()));
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", PayloadHash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "PUT", Path, "", PayloadHash);
HttpClient::Response Response = m_HttpClient.Put(Path, Content, Headers);
if (!Response.IsSuccess())
@@ -448,9 +109,9 @@ S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFileP
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "GET", Path, "", S3EmptyPayloadHash);
HttpClient::Response Response = m_HttpClient.Download(Path, TempFilePath, Headers);
if (!Response.IsSuccess())
@@ -484,9 +145,9 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "GET", Path, "", S3EmptyPayloadHash);
Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1));
HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
@@ -537,9 +198,9 @@ S3Client::DeleteObject(std::string_view Key)
return S3Result{std::move(Err)};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "DELETE", Path, "", S3EmptyPayloadHash);
HttpClient::Response Response = m_HttpClient.Delete(Path, Headers);
if (!Response.IsSuccess())
@@ -565,17 +226,17 @@ S3Client::Touch(std::string_view Key)
return S3Result{std::move(Err)};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
// x-amz-copy-source is always "/bucket/key" regardless of addressing style.
// Key must be URI-encoded except for '/' separators. When source and destination
// are identical, REPLACE is required; COPY is rejected with InvalidRequest.
const std::array<std::pair<std::string, std::string>, 2> ExtraSigned{{
- {"x-amz-copy-source", fmt::format("/{}/{}", m_BucketName, AwsUriEncode(Key, false))},
+ {"x-amz-copy-source", fmt::format("/{}/{}", m_Builder.BucketName(), AwsUriEncode(Key, false))},
{"x-amz-metadata-directive", "REPLACE"},
}};
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", EmptyPayloadHash, ExtraSigned);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "PUT", Path, "", S3EmptyPayloadHash, ExtraSigned);
HttpClient::Response Response = m_HttpClient.Put(Path, IoBuffer{}, Headers);
if (!Response.IsSuccess())
@@ -589,7 +250,7 @@ S3Client::Touch(std::string_view Key)
std::string_view ResponseBody = Response.AsText();
std::string_view ErrorCode;
std::string_view ErrorMessage;
- if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage))
+ if (S3ExtractError(ResponseBody, ErrorCode, ErrorMessage))
{
std::string Err = fmt::format("S3 Touch '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage);
ZEN_WARN("{}", Err);
@@ -612,9 +273,9 @@ S3Client::HeadObject(std::string_view Key)
return S3HeadObjectResult{S3Result{std::move(Err)}, {}, HeadObjectResult::Error};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "HEAD", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "HEAD", Path, "", S3EmptyPayloadHash);
HttpClient::Response Response = m_HttpClient.Head(Path, Headers);
if (!Response.IsSuccess())
@@ -632,17 +293,17 @@ S3Client::HeadObject(std::string_view Key)
S3ObjectInfo Info;
Info.Key = std::string(Key);
- if (const std::string* V = FindResponseHeader(Response.Header, "content-length"))
+ if (const std::string* V = S3FindResponseHeader(Response.Header, "content-length"))
{
Info.Size = ParseInt<uint64_t>(*V).value_or(0);
}
- if (const std::string* V = FindResponseHeader(Response.Header, "etag"))
+ if (const std::string* V = S3FindResponseHeader(Response.Header, "etag"))
{
Info.ETag = *V;
}
- if (const std::string* V = FindResponseHeader(Response.Header, "last-modified"))
+ if (const std::string* V = S3FindResponseHeader(Response.Header, "last-modified"))
{
Info.LastModified = *V;
}
@@ -687,10 +348,10 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
}
std::string CanonicalQS = BuildCanonicalQueryString(std::move(QueryParams));
- std::string RootPath = BucketRootPath();
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", RootPath, CanonicalQS, EmptyPayloadHash);
+ std::string RootPath = m_Builder.BucketRootPath();
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "GET", RootPath, CanonicalQS, S3EmptyPayloadHash);
- std::string FullPath = BuildRequestPath(RootPath, CanonicalQS);
+ std::string FullPath = S3BuildRequestPath(RootPath, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Get(FullPath, Headers);
if (!Response.IsSuccess())
{
@@ -722,11 +383,11 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
std::string_view ContentsXml = Remaining.substr(ContentsStart, ContentsEnd - ContentsStart + 11);
S3ObjectInfo Info;
- Info.Key = DecodeXmlEntities(ExtractXmlValue(ContentsXml, "Key"));
- Info.ETag = DecodeXmlEntities(ExtractXmlValue(ContentsXml, "ETag"));
- Info.LastModified = std::string(ExtractXmlValue(ContentsXml, "LastModified"));
+ Info.Key = S3DecodeXmlEntities(S3ExtractXmlValue(ContentsXml, "Key"));
+ Info.ETag = S3DecodeXmlEntities(S3ExtractXmlValue(ContentsXml, "ETag"));
+ Info.LastModified = std::string(S3ExtractXmlValue(ContentsXml, "LastModified"));
- std::string_view SizeStr = ExtractXmlValue(ContentsXml, "Size");
+ std::string_view SizeStr = S3ExtractXmlValue(ContentsXml, "Size");
if (!SizeStr.empty())
{
Info.Size = ParseInt<uint64_t>(SizeStr).value_or(0);
@@ -741,13 +402,13 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
}
// Check if there are more pages
- std::string_view IsTruncated = ExtractXmlValue(ResponseBody, "IsTruncated");
+ std::string_view IsTruncated = S3ExtractXmlValue(ResponseBody, "IsTruncated");
if (IsTruncated != "true")
{
break;
}
- std::string_view NextToken = ExtractXmlValue(ResponseBody, "NextContinuationToken");
+ std::string_view NextToken = S3ExtractXmlValue(ResponseBody, "NextContinuationToken");
if (NextToken.empty())
{
break;
@@ -779,12 +440,12 @@ S3Client::CreateMultipartUpload(std::string_view Key)
return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({{"uploads", ""}});
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "POST", Path, CanonicalQS, S3EmptyPayloadHash);
- std::string FullPath = BuildRequestPath(Path, CanonicalQS);
+ std::string FullPath = S3BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Post(FullPath, Headers);
if (!Response.IsSuccess())
{
@@ -800,7 +461,7 @@ S3Client::CreateMultipartUpload(std::string_view Key)
// <UploadId>...</UploadId>
// </InitiateMultipartUploadResult>
std::string_view ResponseBody = Response.AsText();
- std::string_view UploadId = ExtractXmlValue(ResponseBody, "UploadId");
+ std::string_view UploadId = S3ExtractXmlValue(ResponseBody, "UploadId");
if (UploadId.empty())
{
std::string Err = "failed to parse UploadId from CreateMultipartUpload response";
@@ -824,7 +485,7 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P
return S3UploadPartResult{S3Result{std::move(Err)}, {}};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({
{"partNumber", fmt::format("{}", PartNumber)},
{"uploadId", std::string(UploadId)},
@@ -832,9 +493,9 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P
std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize()));
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, CanonicalQS, PayloadHash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "PUT", Path, CanonicalQS, PayloadHash);
- std::string FullPath = BuildRequestPath(Path, CanonicalQS);
+ std::string FullPath = S3BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Put(FullPath, Content, Headers);
if (!Response.IsSuccess())
{
@@ -844,7 +505,7 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P
}
// Extract ETag from response headers
- const std::string* ETag = FindResponseHeader(Response.Header, "etag");
+ const std::string* ETag = S3FindResponseHeader(Response.Header, "etag");
if (!ETag)
{
std::string Err = "S3 UploadPart response missing ETag header";
@@ -870,7 +531,7 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
return S3Result{std::move(Err)};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}});
// Build the CompleteMultipartUpload XML payload
@@ -885,12 +546,12 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
std::string_view XmlView = XmlBody.ToView();
std::string PayloadHash = Sha256ToHex(ComputeSha256(XmlView));
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, PayloadHash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "POST", Path, CanonicalQS, PayloadHash);
Headers->emplace("Content-Type", "application/xml");
IoBuffer Payload(IoBuffer::Clone, XmlView.data(), XmlView.size());
- std::string FullPath = BuildRequestPath(Path, CanonicalQS);
+ std::string FullPath = S3BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Post(FullPath, Payload, Headers);
if (!Response.IsSuccess())
{
@@ -903,7 +564,7 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
std::string_view ResponseBody = Response.AsText();
std::string_view ErrorCode;
std::string_view ErrorMessage;
- if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage))
+ if (S3ExtractError(ResponseBody, ErrorCode, ErrorMessage))
{
std::string Err = fmt::format("S3 CompleteMultipartUpload '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage);
ZEN_WARN("{}", Err);
@@ -926,12 +587,12 @@ S3Client::AbortMultipartUpload(std::string_view Key, std::string_view UploadId)
return S3Result{std::move(Err)};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}});
- HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, CanonicalQS, EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "DELETE", Path, CanonicalQS, S3EmptyPayloadHash);
- std::string FullPath = BuildRequestPath(Path, CanonicalQS);
+ std::string FullPath = S3BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Delete(FullPath, Headers);
if (!Response.IsSuccess())
{
@@ -968,15 +629,15 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M
return {};
}
- std::string Path = KeyToPath(Key);
+ std::string Path = m_Builder.KeyToPath(Key);
std::string Scheme = "https";
- if (!m_Endpoint.empty() && m_Endpoint.starts_with("http://"))
+ if (!m_Builder.Endpoint().empty() && m_Builder.Endpoint().starts_with("http://"))
{
Scheme = "http";
}
- return GeneratePresignedUrl(Credentials, Method, Scheme, m_Host, Path, m_Region, "s3", ExpiresIn);
+ return GeneratePresignedUrl(Credentials, Method, Scheme, m_Builder.Host(), Path, m_Builder.Region(), "s3", ExpiresIn);
}
S3Result
@@ -1004,8 +665,26 @@ S3Client::PutObjectMultipart(std::string_view Key,
const std::string& UploadId = InitResult.UploadId;
- // Upload parts sequentially
- // TODO: upload parts in parallel for improved throughput on large uploads
+ // Cleanup helper: AbortMultipartUpload itself can throw on transport failure;
+ // inside the catch (...) below that would replace the original exception with
+ // a less actionable transport one. Swallow + log.
+ auto SafeAbort = [this, &Key, &UploadId]() noexcept {
+ try
+ {
+ AbortMultipartUpload(Key, UploadId);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("S3 AbortMultipartUpload '{}' threw during cleanup: {}", Key, Ex.what());
+ }
+ catch (...)
+ {
+ ZEN_WARN("S3 AbortMultipartUpload '{}' threw during cleanup", Key);
+ }
+ };
+
+ // Sequential upload by design; for parallel multipart use S3AsyncStorage::PutMultipart
+ // in the hub hydration path.
std::vector<std::pair<uint32_t, std::string>> PartETags;
uint64_t Offset = 0;
@@ -1020,7 +699,7 @@ S3Client::PutObjectMultipart(std::string_view Key,
S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, std::move(PartContent));
if (!PartResult)
{
- AbortMultipartUpload(Key, UploadId);
+ SafeAbort();
return S3Result{std::move(PartResult.Error)};
}
@@ -1032,13 +711,13 @@ S3Client::PutObjectMultipart(std::string_view Key,
S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags);
if (!CompleteResult)
{
- AbortMultipartUpload(Key, UploadId);
+ SafeAbort();
return CompleteResult;
}
}
catch (...)
{
- AbortMultipartUpload(Key, UploadId);
+ SafeAbort();
throw;
}
@@ -1077,25 +756,25 @@ TEST_CASE("s3client.xml_extract")
"<Contents><Key>test/file.txt</Key><Size>1234</Size>"
"<ETag>\"abc123\"</ETag><LastModified>2024-01-01T00:00:00Z</LastModified></Contents>";
- CHECK(ExtractXmlValue(Xml, "Key") == "test/file.txt");
- CHECK(ExtractXmlValue(Xml, "Size") == "1234");
- CHECK(ExtractXmlValue(Xml, "ETag") == "\"abc123\"");
- CHECK(ExtractXmlValue(Xml, "LastModified") == "2024-01-01T00:00:00Z");
- CHECK(ExtractXmlValue(Xml, "NonExistent") == "");
+ CHECK(S3ExtractXmlValue(Xml, "Key") == "test/file.txt");
+ CHECK(S3ExtractXmlValue(Xml, "Size") == "1234");
+ CHECK(S3ExtractXmlValue(Xml, "ETag") == "\"abc123\"");
+ CHECK(S3ExtractXmlValue(Xml, "LastModified") == "2024-01-01T00:00:00Z");
+ CHECK(S3ExtractXmlValue(Xml, "NonExistent") == "");
}
TEST_CASE("s3client.xml_entity_decode")
{
- CHECK(DecodeXmlEntities("no entities") == "no entities");
- CHECK(DecodeXmlEntities("a&amp;b") == "a&b");
- CHECK(DecodeXmlEntities("&lt;tag&gt;") == "<tag>");
- CHECK(DecodeXmlEntities("&quot;hello&apos;") == "\"hello'");
- CHECK(DecodeXmlEntities("&amp;&amp;") == "&&");
- CHECK(DecodeXmlEntities("") == "");
+ CHECK(S3DecodeXmlEntities("no entities") == "no entities");
+ CHECK(S3DecodeXmlEntities("a&amp;b") == "a&b");
+ CHECK(S3DecodeXmlEntities("&lt;tag&gt;") == "<tag>");
+ CHECK(S3DecodeXmlEntities("&quot;hello&apos;") == "\"hello'");
+ CHECK(S3DecodeXmlEntities("&amp;&amp;") == "&&");
+ CHECK(S3DecodeXmlEntities("") == "");
// Key with entities as S3 would return it
std::string_view Xml = "<Key>path/file&amp;name&lt;1&gt;.txt</Key>";
- CHECK(DecodeXmlEntities(ExtractXmlValue(Xml, "Key")) == "path/file&name<1>.txt");
+ CHECK(S3DecodeXmlEntities(S3ExtractXmlValue(Xml, "Key")) == "path/file&name<1>.txt");
}
TEST_CASE("s3client.path_style_addressing")
@@ -1129,6 +808,97 @@ TEST_CASE("s3client.virtual_hosted_addressing")
CHECK(Client.Region() == "eu-west-1");
}
+TEST_CASE("s3requestbuilder.path_style_paths")
+{
+ S3RequestBuilder Builder("us-east-1", "test-bucket", "http://localhost:9000", /*PathStyle*/ true);
+
+ CHECK(Builder.Region() == "us-east-1");
+ CHECK(Builder.BucketName() == "test-bucket");
+ CHECK(Builder.PathStyle());
+ CHECK(Builder.Endpoint() == "http://localhost:9000");
+ CHECK(Builder.Host() == "localhost:9000");
+ CHECK(Builder.KeyToPath("foo/bar") == "/test-bucket/foo/bar");
+ CHECK(Builder.BucketRootPath() == "/test-bucket/");
+}
+
+TEST_CASE("s3requestbuilder.virtual_hosted_paths")
+{
+ S3RequestBuilder Builder("eu-west-1", "my-bucket", /*Endpoint*/ "", /*PathStyle*/ false);
+
+ CHECK(Builder.Endpoint() == "https://my-bucket.s3.eu-west-1.amazonaws.com");
+ CHECK(Builder.Host() == "my-bucket.s3.eu-west-1.amazonaws.com");
+ CHECK(Builder.KeyToPath("foo/bar") == "/foo/bar");
+ CHECK(Builder.BucketRootPath() == "/");
+}
+
+TEST_CASE("s3requestbuilder.derived_endpoint_path_style")
+{
+ S3RequestBuilder Builder("us-west-2", "another-bucket", /*Endpoint*/ "", /*PathStyle*/ true);
+ CHECK(Builder.Endpoint() == "https://s3.us-west-2.amazonaws.com");
+ CHECK(Builder.Host() == "s3.us-west-2.amazonaws.com");
+}
+
+TEST_CASE("s3requestbuilder.sign_request_headers")
+{
+ S3RequestBuilder Builder("us-east-1", "bucket", "http://localhost:9000", /*PathStyle*/ true);
+
+ SigV4Credentials Creds;
+ Creds.AccessKeyId = "AKIDEXAMPLE";
+ Creds.SecretAccessKey = "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY";
+
+ HttpClient::KeyValueMap Headers = Builder.SignRequest(Creds, "GET", "/bucket/foo", "", S3EmptyPayloadHash);
+
+ // All four required SigV4 headers present.
+ CHECK(Headers->find("Authorization") != Headers->end());
+ CHECK(Headers->find("x-amz-date") != Headers->end());
+ CHECK(Headers->find("x-amz-content-sha256") != Headers->end());
+
+ // SessionToken absent -> no x-amz-security-token header.
+ CHECK(Headers->find("x-amz-security-token") == Headers->end());
+
+ const std::string& Auth = Headers->find("Authorization")->second;
+ CHECK(Auth.starts_with("AWS4-HMAC-SHA256 "));
+ CHECK(Auth.find("Credential=AKIDEXAMPLE/") != std::string::npos);
+ CHECK(Auth.find("/us-east-1/s3/aws4_request") != std::string::npos);
+ CHECK(Auth.find("Signature=") != std::string::npos);
+}
+
+TEST_CASE("s3requestbuilder.session_token_emits_security_header")
+{
+ S3RequestBuilder Builder("us-east-1", "bucket", "http://localhost:9000", true);
+
+ SigV4Credentials Creds;
+ Creds.AccessKeyId = "ASIA-tmp";
+ Creds.SecretAccessKey = "secret";
+ Creds.SessionToken = "sts-session-token-value";
+
+ HttpClient::KeyValueMap Headers = Builder.SignRequest(Creds, "PUT", "/bucket/key", "", S3EmptyPayloadHash);
+
+ auto It = Headers->find("x-amz-security-token");
+ REQUIRE(It != Headers->end());
+ CHECK(It->second == "sts-session-token-value");
+}
+
+TEST_CASE("s3requestbuilder.signing_key_cache_invalidates_on_key_rotate")
+{
+ // Two consecutive Sign() calls with the same date but different AccessKeyId
+ // must produce different Authorization signatures, proving the cache is keyed
+ // on (DateStamp, AccessKeyId) and not date alone.
+ S3RequestBuilder Builder("us-east-1", "bucket", "http://localhost:9000", true);
+
+ SigV4Credentials A;
+ A.AccessKeyId = "AKIDEXAMPLE";
+ A.SecretAccessKey = "secretA";
+ HttpClient::KeyValueMap HA = Builder.SignRequest(A, "GET", "/bucket/foo", "", S3EmptyPayloadHash);
+
+ SigV4Credentials B;
+ B.AccessKeyId = "AKIDEXAMPLE2";
+ B.SecretAccessKey = "secretB";
+ HttpClient::KeyValueMap HB = Builder.SignRequest(B, "GET", "/bucket/foo", "", S3EmptyPayloadHash);
+
+ CHECK(HA->find("Authorization")->second != HB->find("Authorization")->second);
+}
+
TEST_CASE("s3client.minio_integration")
{
using namespace std::literals;