diff options
| author | Dan Engelbrecht <[email protected]> | 2026-05-05 14:59:21 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-05-05 14:59:21 +0200 |
| commit | 46f456ffd4d0717a035253ff9076ca6ee664e536 (patch) | |
| tree | 69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zenutil/cloud/s3client.cpp | |
| parent | watchdog ephemeral port exhaust (#1022) (diff) | |
| download | archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.tar.xz archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.zip | |
hub async s3 client (#1024)
- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window
- Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads
- `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true)
- `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`)
- Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa)
- New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn
- `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs)
- Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction).
- Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
Diffstat (limited to 'src/zenutil/cloud/s3client.cpp')
| -rw-r--r-- | src/zenutil/cloud/s3client.cpp | 592 |
1 files changed, 181 insertions, 411 deletions
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp index ab80cfcc7..83443b98b 100644 --- a/src/zenutil/cloud/s3client.cpp +++ b/src/zenutil/cloud/s3client.cpp @@ -4,6 +4,7 @@ #include <zenutil/cloud/imdscredentials.h> #include <zenutil/cloud/minioprocess.h> +#include <zenutil/cloud/s3response.h> #include <zencore/except_fmt.h> #include <zencore/iobuffer.h> @@ -19,210 +20,21 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -namespace { - - /// The SHA-256 hash of an empty payload, precomputed - constexpr std::string_view EmptyPayloadHash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; - - /// Simple XML value extractor. Finds the text content between <Tag> and </Tag>. - /// This is intentionally minimal - we only need to parse ListBucketResult responses. - /// Returns a string_view into the original XML when no entity decoding is needed. - std::string_view ExtractXmlValue(std::string_view Xml, std::string_view Tag) - { - std::string OpenTag = fmt::format("<{}>", Tag); - std::string CloseTag = fmt::format("</{}>", Tag); - - size_t Start = Xml.find(OpenTag); - if (Start == std::string_view::npos) - { - return {}; - } - Start += OpenTag.size(); - - size_t End = Xml.find(CloseTag, Start); - if (End == std::string_view::npos) - { - return {}; - } - - return Xml.substr(Start, End - Start); - } - - /// Decode the five standard XML entities (& < > " ') into a StringBuilderBase. 
- void DecodeXmlEntities(std::string_view Input, StringBuilderBase& Out) - { - if (Input.find('&') == std::string_view::npos) - { - Out.Append(Input); - return; - } - - for (size_t i = 0; i < Input.size(); ++i) - { - if (Input[i] == '&') - { - std::string_view Remaining = Input.substr(i); - if (Remaining.starts_with("&amp;")) - { - Out.Append('&'); - i += 4; - } - else if (Remaining.starts_with("&lt;")) - { - Out.Append('<'); - i += 3; - } - else if (Remaining.starts_with("&gt;")) - { - Out.Append('>'); - i += 3; - } - else if (Remaining.starts_with("&quot;")) - { - Out.Append('"'); - i += 5; - } - else if (Remaining.starts_with("&apos;")) - { - Out.Append('\''); - i += 5; - } - else - { - Out.Append(Input[i]); - } - } - else - { - Out.Append(Input[i]); - } - } - } - - /// Convenience: decode XML entities and return as std::string. - std::string DecodeXmlEntities(std::string_view Input) - { - if (Input.find('&') == std::string_view::npos) - { - return std::string(Input); - } - - ExtendableStringBuilder<256> Sb; - DecodeXmlEntities(Input, Sb); - return Sb.ToString(); - } - - /// Join a path and canonical query string into a full request path for the HTTP client. - std::string BuildRequestPath(std::string_view Path, std::string_view CanonicalQS) - { - if (CanonicalQS.empty()) - { - return std::string(Path); - } - return fmt::format("{}?{}", Path, CanonicalQS); - } - - /// Case-insensitive header lookup in an HttpClient response header map. - const std::string* FindResponseHeader(const HttpClient::KeyValueMap& Headers, std::string_view Name) - { - for (const auto& [K, V] : *Headers) - { - if (StrCaseCompare(K, Name) == 0) - { - return &V; - } - } - return nullptr; - } - - /// Extract Code/Message from an S3 XML error body. Returns true if an <Error> element was - /// found, even if Code/Message are empty. 
- bool ExtractS3Error(std::string_view Body, std::string_view& OutCode, std::string_view& OutMessage) - { - if (Body.find("<Error>") == std::string_view::npos) - { - return false; - } - OutCode = ExtractXmlValue(Body, "Code"); - OutMessage = ExtractXmlValue(Body, "Message"); - return true; - } - - /// True if the response indicates S3 throttling (503 SlowDown / ServiceUnavailable / 429). - /// Code is checked on both the HTTP status and the XML error code so we catch proxies that - /// return 200 with a SlowDown body. - bool IsS3Throttled(const HttpClient::Response& Response, std::string_view ErrorCode) - { - const int Status = static_cast<int>(Response.StatusCode); - if (Status == 503 || Status == 429) - { - return true; - } - if (ErrorCode == "SlowDown" || ErrorCode == "ServiceUnavailable" || ErrorCode == "ThrottlingException" || - ErrorCode == "RequestLimitExceeded" || ErrorCode == "TooManyRequests") - { - return true; - } - return false; - } - - /// Build a human-readable error message for a failed S3 response. When the response body - /// contains an S3 `<Error>` element, the Code and Message fields are included in the string - /// so transient 4xx/5xx failures (SignatureDoesNotMatch, AuthorizationHeaderMalformed, etc.) - /// show up in logs instead of being swallowed. Falls back to the generic HTTP/transport - /// message when no XML body is available (HEAD responses, transport errors). - /// Also emits a distinct `S3 THROTTLED` warning when the response indicates throttling so - /// callers can grep for it without parsing combined error text. 
- std::string S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response) - { - if (!Response.Error.has_value() && Response.ResponsePayload) - { - std::string_view Body(reinterpret_cast<const char*>(Response.ResponsePayload.GetData()), Response.ResponsePayload.GetSize()); - std::string_view Code; - std::string_view Message; - if (ExtractS3Error(Body, Code, Message) && (!Code.empty() || !Message.empty())) - { - ExtendableStringBuilder<256> Decoded; - DecodeXmlEntities(Message, Decoded); - if (IsS3Throttled(Response, Code)) - { - ZEN_WARN("S3 THROTTLED [{}] status={} code='{}' message='{}'", - Prefix, - static_cast<int>(Response.StatusCode), - Code, - Decoded.ToView()); - } - return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView()); - } - } - if (IsS3Throttled(Response, {})) - { - ZEN_WARN("S3 THROTTLED [{}] status={} (no XML body)", Prefix, static_cast<int>(Response.StatusCode)); - } - return Response.ErrorMessage(Prefix); - } - -} // namespace - std::string_view S3GetObjectResult::NotFoundErrorText = "Not found"; S3Client::S3Client(const S3ClientOptions& Options) : m_Log(logging::Get("s3")) -, m_BucketName(Options.BucketName) -, m_Region(Options.Region) -, m_Endpoint(Options.Endpoint) -, m_PathStyle(Options.PathStyle) +, m_Builder(Options.Region, Options.BucketName, Options.Endpoint, Options.PathStyle) , m_Credentials(Options.Credentials) , m_CredentialProvider(Options.CredentialProvider) -, m_HttpClient(BuildEndpoint(), Options.HttpSettings) +, m_HttpClient(std::string(m_Builder.Endpoint()), Options.HttpSettings) , m_Verbose(Options.HttpSettings.Verbose) { - m_Host = BuildHostHeader(); ZEN_INFO("S3 client configured for bucket '{}' in region '{}' (endpoint: {}, {})", - m_BucketName, - m_Region, + m_Builder.BucketName(), + m_Builder.Region(), m_HttpClient.GetBaseUri(), - m_PathStyle ? "path-style" : "virtual-hosted"); + m_Builder.PathStyle() ? 
"path-style" : "virtual-hosted"); } S3Client::~S3Client() = default; @@ -235,19 +47,15 @@ S3Client::GetCurrentCredentials() SigV4Credentials Creds = m_CredentialProvider->GetCredentials(); if (!Creds.AccessKeyId.empty()) { - // Invalidate the signing key cache when the access key changes, and update stored - // credentials atomically under the same lock so callers see a consistent snapshot. - RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock); - if (Creds.AccessKeyId != m_Credentials.AccessKeyId) - { - m_CachedDateStamp.clear(); - } + // Update stored credentials atomically so callers see a consistent snapshot. + // The builder's signing-key cache is keyed on (DateStamp, AccessKeyId), so it + // self-invalidates on the next Sign() call when either rotates. + RwLock::ExclusiveLockScope _(m_CredentialsLock); m_Credentials = Creds; - // Return Creds directly - avoids reading m_Credentials after releasing the lock, - // which would race with another concurrent write. return Creds; } // IMDS returned empty credentials; fall back to the last known-good credentials. 
+ RwLock::SharedLockScope _(m_CredentialsLock); return m_Credentials; } return m_Credentials; @@ -261,153 +69,6 @@ S3Client::BuildNoCredentialsError(std::string Context) return Err; } -std::string -S3Client::BuildEndpoint() const -{ - if (!m_Endpoint.empty()) - { - return m_Endpoint; - } - - if (m_PathStyle) - { - // Path-style: https://s3.region.amazonaws.com - return fmt::format("https://s3.{}.amazonaws.com", m_Region); - } - - // Virtual-hosted style: https://bucket.s3.region.amazonaws.com - return fmt::format("https://{}.s3.{}.amazonaws.com", m_BucketName, m_Region); -} - -std::string -S3Client::BuildHostHeader() const -{ - if (!m_Endpoint.empty()) - { - // Extract host from custom endpoint URL (strip scheme) - std::string_view Ep = m_Endpoint; - if (size_t Pos = Ep.find("://"); Pos != std::string_view::npos) - { - Ep = Ep.substr(Pos + 3); - } - // Strip trailing slash - if (!Ep.empty() && Ep.back() == '/') - { - Ep = Ep.substr(0, Ep.size() - 1); - } - return std::string(Ep); - } - - if (m_PathStyle) - { - return fmt::format("s3.{}.amazonaws.com", m_Region); - } - - return fmt::format("{}.s3.{}.amazonaws.com", m_BucketName, m_Region); -} - -std::string -S3Client::KeyToPath(std::string_view Key) const -{ - if (m_PathStyle) - { - return fmt::format("/{}/{}", m_BucketName, Key); - } - return fmt::format("/{}", Key); -} - -std::string -S3Client::BucketRootPath() const -{ - if (m_PathStyle) - { - return fmt::format("/{}/", m_BucketName); - } - return "/"; -} - -Sha256Digest -S3Client::GetSigningKey(std::string_view DateStamp) -{ - // Fast path: shared lock for cache hit (common case - key only changes once per day) - { - RwLock::SharedLockScope SharedLock(m_SigningKeyLock); - if (m_CachedDateStamp == DateStamp) - { - return m_CachedSigningKey; - } - } - - // Slow path: exclusive lock to recompute the signing key - RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock); - - // Double-check after acquiring exclusive lock (another thread may have updated it) - if 
(m_CachedDateStamp == DateStamp) - { - return m_CachedSigningKey; - } - - std::string SecretPrefix = fmt::format("AWS4{}", m_Credentials.SecretAccessKey); - - Sha256Digest DateKey = ComputeHmacSha256(SecretPrefix.data(), SecretPrefix.size(), DateStamp.data(), DateStamp.size()); - SecureZeroSecret(SecretPrefix.data(), SecretPrefix.size()); - - Sha256Digest RegionKey = ComputeHmacSha256(DateKey, m_Region); - Sha256Digest ServiceKey = ComputeHmacSha256(RegionKey, "s3"); - m_CachedSigningKey = ComputeHmacSha256(ServiceKey, "aws4_request"); - m_CachedDateStamp = std::string(DateStamp); - - return m_CachedSigningKey; -} - -HttpClient::KeyValueMap -S3Client::SignRequest(const SigV4Credentials& Credentials, - std::string_view Method, - std::string_view Path, - std::string_view CanonicalQueryString, - std::string_view PayloadHash, - std::span<const std::pair<std::string, std::string>> ExtraSignedHeaders) -{ - std::string AmzDate = GetAmzTimestamp(); - - // Build sorted headers to sign (must be sorted by lowercase name) - std::vector<std::pair<std::string, std::string>> HeadersToSign; - HeadersToSign.reserve(4 + ExtraSignedHeaders.size()); - HeadersToSign.emplace_back("host", m_Host); - HeadersToSign.emplace_back("x-amz-content-sha256", std::string(PayloadHash)); - HeadersToSign.emplace_back("x-amz-date", AmzDate); - if (!Credentials.SessionToken.empty()) - { - HeadersToSign.emplace_back("x-amz-security-token", Credentials.SessionToken); - } - for (const auto& [K, V] : ExtraSignedHeaders) - { - HeadersToSign.emplace_back(K, V); - } - std::sort(HeadersToSign.begin(), HeadersToSign.end()); - - std::string_view DateStamp(AmzDate.data(), 8); - Sha256Digest SigningKey = GetSigningKey(DateStamp); - - SigV4SignedHeaders Signed = - SignRequestV4(Credentials, Method, Path, CanonicalQueryString, m_Region, "s3", AmzDate, HeadersToSign, PayloadHash, &SigningKey); - - HttpClient::KeyValueMap Result; - Result->emplace("Authorization", std::move(Signed.Authorization)); - 
Result->emplace("x-amz-date", std::move(Signed.AmzDate)); - Result->emplace("x-amz-content-sha256", std::move(Signed.PayloadHash)); - if (!Credentials.SessionToken.empty()) - { - Result->emplace("x-amz-security-token", Credentials.SessionToken); - } - for (const auto& [K, V] : ExtraSignedHeaders) - { - Result->emplace(K, V); - } - - return Result; -} - S3Result S3Client::PutObject(std::string_view Key, IoBuffer Content) { @@ -417,12 +78,12 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content) return S3Result{std::move(Err)}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); // Hash the payload std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize())); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", PayloadHash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "PUT", Path, "", PayloadHash); HttpClient::Response Response = m_HttpClient.Put(Path, Content, Headers); if (!Response.IsSuccess()) @@ -448,9 +109,9 @@ S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFileP return S3GetObjectResult{S3Result{std::move(Err)}, {}}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "GET", Path, "", S3EmptyPayloadHash); HttpClient::Response Response = m_HttpClient.Download(Path, TempFilePath, Headers); if (!Response.IsSuccess()) @@ -484,9 +145,9 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran return S3GetObjectResult{S3Result{std::move(Err)}, {}}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash); + HttpClient::KeyValueMap Headers = 
m_Builder.SignRequest(Credentials, "GET", Path, "", S3EmptyPayloadHash); Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1)); HttpClient::Response Response = m_HttpClient.Get(Path, Headers); @@ -537,9 +198,9 @@ S3Client::DeleteObject(std::string_view Key) return S3Result{std::move(Err)}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, "", EmptyPayloadHash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "DELETE", Path, "", S3EmptyPayloadHash); HttpClient::Response Response = m_HttpClient.Delete(Path, Headers); if (!Response.IsSuccess()) @@ -565,17 +226,17 @@ S3Client::Touch(std::string_view Key) return S3Result{std::move(Err)}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); // x-amz-copy-source is always "/bucket/key" regardless of addressing style. // Key must be URI-encoded except for '/' separators. When source and destination // are identical, REPLACE is required; COPY is rejected with InvalidRequest. 
const std::array<std::pair<std::string, std::string>, 2> ExtraSigned{{ - {"x-amz-copy-source", fmt::format("/{}/{}", m_BucketName, AwsUriEncode(Key, false))}, + {"x-amz-copy-source", fmt::format("/{}/{}", m_Builder.BucketName(), AwsUriEncode(Key, false))}, {"x-amz-metadata-directive", "REPLACE"}, }}; - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", EmptyPayloadHash, ExtraSigned); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "PUT", Path, "", S3EmptyPayloadHash, ExtraSigned); HttpClient::Response Response = m_HttpClient.Put(Path, IoBuffer{}, Headers); if (!Response.IsSuccess()) @@ -589,7 +250,7 @@ S3Client::Touch(std::string_view Key) std::string_view ResponseBody = Response.AsText(); std::string_view ErrorCode; std::string_view ErrorMessage; - if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage)) + if (S3ExtractError(ResponseBody, ErrorCode, ErrorMessage)) { std::string Err = fmt::format("S3 Touch '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage); ZEN_WARN("{}", Err); @@ -612,9 +273,9 @@ S3Client::HeadObject(std::string_view Key) return S3HeadObjectResult{S3Result{std::move(Err)}, {}, HeadObjectResult::Error}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "HEAD", Path, "", EmptyPayloadHash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "HEAD", Path, "", S3EmptyPayloadHash); HttpClient::Response Response = m_HttpClient.Head(Path, Headers); if (!Response.IsSuccess()) @@ -632,17 +293,17 @@ S3Client::HeadObject(std::string_view Key) S3ObjectInfo Info; Info.Key = std::string(Key); - if (const std::string* V = FindResponseHeader(Response.Header, "content-length")) + if (const std::string* V = S3FindResponseHeader(Response.Header, "content-length")) { Info.Size = ParseInt<uint64_t>(*V).value_or(0); } - if (const std::string* V = FindResponseHeader(Response.Header, "etag")) + if 
(const std::string* V = S3FindResponseHeader(Response.Header, "etag")) { Info.ETag = *V; } - if (const std::string* V = FindResponseHeader(Response.Header, "last-modified")) + if (const std::string* V = S3FindResponseHeader(Response.Header, "last-modified")) { Info.LastModified = *V; } @@ -687,10 +348,10 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys) } std::string CanonicalQS = BuildCanonicalQueryString(std::move(QueryParams)); - std::string RootPath = BucketRootPath(); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", RootPath, CanonicalQS, EmptyPayloadHash); + std::string RootPath = m_Builder.BucketRootPath(); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "GET", RootPath, CanonicalQS, S3EmptyPayloadHash); - std::string FullPath = BuildRequestPath(RootPath, CanonicalQS); + std::string FullPath = S3BuildRequestPath(RootPath, CanonicalQS); HttpClient::Response Response = m_HttpClient.Get(FullPath, Headers); if (!Response.IsSuccess()) { @@ -722,11 +383,11 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys) std::string_view ContentsXml = Remaining.substr(ContentsStart, ContentsEnd - ContentsStart + 11); S3ObjectInfo Info; - Info.Key = DecodeXmlEntities(ExtractXmlValue(ContentsXml, "Key")); - Info.ETag = DecodeXmlEntities(ExtractXmlValue(ContentsXml, "ETag")); - Info.LastModified = std::string(ExtractXmlValue(ContentsXml, "LastModified")); + Info.Key = S3DecodeXmlEntities(S3ExtractXmlValue(ContentsXml, "Key")); + Info.ETag = S3DecodeXmlEntities(S3ExtractXmlValue(ContentsXml, "ETag")); + Info.LastModified = std::string(S3ExtractXmlValue(ContentsXml, "LastModified")); - std::string_view SizeStr = ExtractXmlValue(ContentsXml, "Size"); + std::string_view SizeStr = S3ExtractXmlValue(ContentsXml, "Size"); if (!SizeStr.empty()) { Info.Size = ParseInt<uint64_t>(SizeStr).value_or(0); @@ -741,13 +402,13 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys) } // Check if there are more 
pages - std::string_view IsTruncated = ExtractXmlValue(ResponseBody, "IsTruncated"); + std::string_view IsTruncated = S3ExtractXmlValue(ResponseBody, "IsTruncated"); if (IsTruncated != "true") { break; } - std::string_view NextToken = ExtractXmlValue(ResponseBody, "NextContinuationToken"); + std::string_view NextToken = S3ExtractXmlValue(ResponseBody, "NextContinuationToken"); if (NextToken.empty()) { break; @@ -779,12 +440,12 @@ S3Client::CreateMultipartUpload(std::string_view Key) return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); std::string CanonicalQS = BuildCanonicalQueryString({{"uploads", ""}}); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, EmptyPayloadHash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "POST", Path, CanonicalQS, S3EmptyPayloadHash); - std::string FullPath = BuildRequestPath(Path, CanonicalQS); + std::string FullPath = S3BuildRequestPath(Path, CanonicalQS); HttpClient::Response Response = m_HttpClient.Post(FullPath, Headers); if (!Response.IsSuccess()) { @@ -800,7 +461,7 @@ S3Client::CreateMultipartUpload(std::string_view Key) // <UploadId>...</UploadId> // </InitiateMultipartUploadResult> std::string_view ResponseBody = Response.AsText(); - std::string_view UploadId = ExtractXmlValue(ResponseBody, "UploadId"); + std::string_view UploadId = S3ExtractXmlValue(ResponseBody, "UploadId"); if (UploadId.empty()) { std::string Err = "failed to parse UploadId from CreateMultipartUpload response"; @@ -824,7 +485,7 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P return S3UploadPartResult{S3Result{std::move(Err)}, {}}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); std::string CanonicalQS = BuildCanonicalQueryString({ {"partNumber", fmt::format("{}", PartNumber)}, {"uploadId", std::string(UploadId)}, @@ -832,9 
+493,9 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize())); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, CanonicalQS, PayloadHash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "PUT", Path, CanonicalQS, PayloadHash); - std::string FullPath = BuildRequestPath(Path, CanonicalQS); + std::string FullPath = S3BuildRequestPath(Path, CanonicalQS); HttpClient::Response Response = m_HttpClient.Put(FullPath, Content, Headers); if (!Response.IsSuccess()) { @@ -844,7 +505,7 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P } // Extract ETag from response headers - const std::string* ETag = FindResponseHeader(Response.Header, "etag"); + const std::string* ETag = S3FindResponseHeader(Response.Header, "etag"); if (!ETag) { std::string Err = "S3 UploadPart response missing ETag header"; @@ -870,7 +531,7 @@ S3Client::CompleteMultipartUpload(std::string_view Key, return S3Result{std::move(Err)}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}}); // Build the CompleteMultipartUpload XML payload @@ -885,12 +546,12 @@ S3Client::CompleteMultipartUpload(std::string_view Key, std::string_view XmlView = XmlBody.ToView(); std::string PayloadHash = Sha256ToHex(ComputeSha256(XmlView)); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, PayloadHash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "POST", Path, CanonicalQS, PayloadHash); Headers->emplace("Content-Type", "application/xml"); IoBuffer Payload(IoBuffer::Clone, XmlView.data(), XmlView.size()); - std::string FullPath = BuildRequestPath(Path, CanonicalQS); + std::string FullPath = S3BuildRequestPath(Path, CanonicalQS); HttpClient::Response 
Response = m_HttpClient.Post(FullPath, Payload, Headers); if (!Response.IsSuccess()) { @@ -903,7 +564,7 @@ S3Client::CompleteMultipartUpload(std::string_view Key, std::string_view ResponseBody = Response.AsText(); std::string_view ErrorCode; std::string_view ErrorMessage; - if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage)) + if (S3ExtractError(ResponseBody, ErrorCode, ErrorMessage)) { std::string Err = fmt::format("S3 CompleteMultipartUpload '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage); ZEN_WARN("{}", Err); @@ -926,12 +587,12 @@ S3Client::AbortMultipartUpload(std::string_view Key, std::string_view UploadId) return S3Result{std::move(Err)}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}}); - HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, CanonicalQS, EmptyPayloadHash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Credentials, "DELETE", Path, CanonicalQS, S3EmptyPayloadHash); - std::string FullPath = BuildRequestPath(Path, CanonicalQS); + std::string FullPath = S3BuildRequestPath(Path, CanonicalQS); HttpClient::Response Response = m_HttpClient.Delete(FullPath, Headers); if (!Response.IsSuccess()) { @@ -968,15 +629,15 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M return {}; } - std::string Path = KeyToPath(Key); + std::string Path = m_Builder.KeyToPath(Key); std::string Scheme = "https"; - if (!m_Endpoint.empty() && m_Endpoint.starts_with("http://")) + if (!m_Builder.Endpoint().empty() && m_Builder.Endpoint().starts_with("http://")) { Scheme = "http"; } - return GeneratePresignedUrl(Credentials, Method, Scheme, m_Host, Path, m_Region, "s3", ExpiresIn); + return GeneratePresignedUrl(Credentials, Method, Scheme, m_Builder.Host(), Path, m_Builder.Region(), "s3", ExpiresIn); } S3Result @@ -1004,8 +665,26 @@ 
S3Client::PutObjectMultipart(std::string_view Key, const std::string& UploadId = InitResult.UploadId; - // Upload parts sequentially - // TODO: upload parts in parallel for improved throughput on large uploads + // Cleanup helper: AbortMultipartUpload itself can throw on transport failure; + // inside the catch (...) below that would replace the original exception with + // a less actionable transport one. Swallow + log. + auto SafeAbort = [this, &Key, &UploadId]() noexcept { + try + { + AbortMultipartUpload(Key, UploadId); + } + catch (const std::exception& Ex) + { + ZEN_WARN("S3 AbortMultipartUpload '{}' threw during cleanup: {}", Key, Ex.what()); + } + catch (...) + { + ZEN_WARN("S3 AbortMultipartUpload '{}' threw during cleanup", Key); + } + }; + + // Sequential upload by design; for parallel multipart use S3AsyncStorage::PutMultipart + // in the hub hydration path. std::vector<std::pair<uint32_t, std::string>> PartETags; uint64_t Offset = 0; @@ -1020,7 +699,7 @@ S3Client::PutObjectMultipart(std::string_view Key, S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, std::move(PartContent)); if (!PartResult) { - AbortMultipartUpload(Key, UploadId); + SafeAbort(); return S3Result{std::move(PartResult.Error)}; } @@ -1032,13 +711,13 @@ S3Client::PutObjectMultipart(std::string_view Key, S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags); if (!CompleteResult) { - AbortMultipartUpload(Key, UploadId); + SafeAbort(); return CompleteResult; } } catch (...) 
{ - AbortMultipartUpload(Key, UploadId); + SafeAbort(); throw; } @@ -1077,25 +756,25 @@ TEST_CASE("s3client.xml_extract") "<Contents><Key>test/file.txt</Key><Size>1234</Size>" "<ETag>\"abc123\"</ETag><LastModified>2024-01-01T00:00:00Z</LastModified></Contents>"; - CHECK(ExtractXmlValue(Xml, "Key") == "test/file.txt"); - CHECK(ExtractXmlValue(Xml, "Size") == "1234"); - CHECK(ExtractXmlValue(Xml, "ETag") == "\"abc123\""); - CHECK(ExtractXmlValue(Xml, "LastModified") == "2024-01-01T00:00:00Z"); - CHECK(ExtractXmlValue(Xml, "NonExistent") == ""); + CHECK(S3ExtractXmlValue(Xml, "Key") == "test/file.txt"); + CHECK(S3ExtractXmlValue(Xml, "Size") == "1234"); + CHECK(S3ExtractXmlValue(Xml, "ETag") == "\"abc123\""); + CHECK(S3ExtractXmlValue(Xml, "LastModified") == "2024-01-01T00:00:00Z"); + CHECK(S3ExtractXmlValue(Xml, "NonExistent") == ""); } TEST_CASE("s3client.xml_entity_decode") { - CHECK(DecodeXmlEntities("no entities") == "no entities"); - CHECK(DecodeXmlEntities("a&amp;b") == "a&b"); - CHECK(DecodeXmlEntities("&lt;tag&gt;") == "<tag>"); - CHECK(DecodeXmlEntities("&quot;hello&apos;") == "\"hello'"); - CHECK(DecodeXmlEntities("&amp;&amp;") == "&&"); - CHECK(DecodeXmlEntities("") == ""); + CHECK(S3DecodeXmlEntities("no entities") == "no entities"); + CHECK(S3DecodeXmlEntities("a&amp;b") == "a&b"); + CHECK(S3DecodeXmlEntities("&lt;tag&gt;") == "<tag>"); + CHECK(S3DecodeXmlEntities("&quot;hello&apos;") == "\"hello'"); + CHECK(S3DecodeXmlEntities("&amp;&amp;") == "&&"); + CHECK(S3DecodeXmlEntities("") == ""); // Key with entities as S3 would return it std::string_view Xml = "<Key>path/file&amp;name&lt;1&gt;.txt</Key>"; - CHECK(DecodeXmlEntities(ExtractXmlValue(Xml, "Key")) == "path/file&name<1>.txt"); + CHECK(S3DecodeXmlEntities(S3ExtractXmlValue(Xml, "Key")) == "path/file&name<1>.txt"); } TEST_CASE("s3client.path_style_addressing") @@ -1129,6 +808,97 @@ TEST_CASE("s3client.virtual_hosted_addressing") CHECK(Client.Region() == "eu-west-1"); } +TEST_CASE("s3requestbuilder.path_style_paths") +{ + S3RequestBuilder Builder("us-east-1", 
"test-bucket", "http://localhost:9000", /*PathStyle*/ true); + + CHECK(Builder.Region() == "us-east-1"); + CHECK(Builder.BucketName() == "test-bucket"); + CHECK(Builder.PathStyle()); + CHECK(Builder.Endpoint() == "http://localhost:9000"); + CHECK(Builder.Host() == "localhost:9000"); + CHECK(Builder.KeyToPath("foo/bar") == "/test-bucket/foo/bar"); + CHECK(Builder.BucketRootPath() == "/test-bucket/"); +} + +TEST_CASE("s3requestbuilder.virtual_hosted_paths") +{ + S3RequestBuilder Builder("eu-west-1", "my-bucket", /*Endpoint*/ "", /*PathStyle*/ false); + + CHECK(Builder.Endpoint() == "https://my-bucket.s3.eu-west-1.amazonaws.com"); + CHECK(Builder.Host() == "my-bucket.s3.eu-west-1.amazonaws.com"); + CHECK(Builder.KeyToPath("foo/bar") == "/foo/bar"); + CHECK(Builder.BucketRootPath() == "/"); +} + +TEST_CASE("s3requestbuilder.derived_endpoint_path_style") +{ + S3RequestBuilder Builder("us-west-2", "another-bucket", /*Endpoint*/ "", /*PathStyle*/ true); + CHECK(Builder.Endpoint() == "https://s3.us-west-2.amazonaws.com"); + CHECK(Builder.Host() == "s3.us-west-2.amazonaws.com"); +} + +TEST_CASE("s3requestbuilder.sign_request_headers") +{ + S3RequestBuilder Builder("us-east-1", "bucket", "http://localhost:9000", /*PathStyle*/ true); + + SigV4Credentials Creds; + Creds.AccessKeyId = "AKIDEXAMPLE"; + Creds.SecretAccessKey = "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY"; + + HttpClient::KeyValueMap Headers = Builder.SignRequest(Creds, "GET", "/bucket/foo", "", S3EmptyPayloadHash); + + // All four required SigV4 headers present. + CHECK(Headers->find("Authorization") != Headers->end()); + CHECK(Headers->find("x-amz-date") != Headers->end()); + CHECK(Headers->find("x-amz-content-sha256") != Headers->end()); + + // SessionToken absent -> no x-amz-security-token header. 
+ CHECK(Headers->find("x-amz-security-token") == Headers->end()); + + const std::string& Auth = Headers->find("Authorization")->second; + CHECK(Auth.starts_with("AWS4-HMAC-SHA256 ")); + CHECK(Auth.find("Credential=AKIDEXAMPLE/") != std::string::npos); + CHECK(Auth.find("/us-east-1/s3/aws4_request") != std::string::npos); + CHECK(Auth.find("Signature=") != std::string::npos); +} + +TEST_CASE("s3requestbuilder.session_token_emits_security_header") +{ + S3RequestBuilder Builder("us-east-1", "bucket", "http://localhost:9000", true); + + SigV4Credentials Creds; + Creds.AccessKeyId = "ASIA-tmp"; + Creds.SecretAccessKey = "secret"; + Creds.SessionToken = "sts-session-token-value"; + + HttpClient::KeyValueMap Headers = Builder.SignRequest(Creds, "PUT", "/bucket/key", "", S3EmptyPayloadHash); + + auto It = Headers->find("x-amz-security-token"); + REQUIRE(It != Headers->end()); + CHECK(It->second == "sts-session-token-value"); +} + +TEST_CASE("s3requestbuilder.signing_key_cache_invalidates_on_key_rotate") +{ + // Two consecutive Sign() calls with the same date but different AccessKeyId + // must produce different Authorization signatures, proving the cache is keyed + // on (DateStamp, AccessKeyId) and not date alone. + S3RequestBuilder Builder("us-east-1", "bucket", "http://localhost:9000", true); + + SigV4Credentials A; + A.AccessKeyId = "AKIDEXAMPLE"; + A.SecretAccessKey = "secretA"; + HttpClient::KeyValueMap HA = Builder.SignRequest(A, "GET", "/bucket/foo", "", S3EmptyPayloadHash); + + SigV4Credentials B; + B.AccessKeyId = "AKIDEXAMPLE2"; + B.SecretAccessKey = "secretB"; + HttpClient::KeyValueMap HB = Builder.SignRequest(B, "GET", "/bucket/foo", "", S3EmptyPayloadHash); + + CHECK(HA->find("Authorization")->second != HB->find("Authorization")->second); +} + TEST_CASE("s3client.minio_integration") { using namespace std::literals; |