aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/cloud/s3client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/cloud/s3client.cpp')
-rw-r--r--src/zenutil/cloud/s3client.cpp509
1 files changed, 424 insertions, 85 deletions
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index 88d844b61..ab80cfcc7 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -135,8 +135,77 @@ namespace {
return nullptr;
}
+ /// Extract Code/Message from an S3 XML error body. Returns true if an <Error> element was
+ /// found, even if Code/Message are empty.
+ bool ExtractS3Error(std::string_view Body, std::string_view& OutCode, std::string_view& OutMessage)
+ {
+ if (Body.find("<Error>") == std::string_view::npos)
+ {
+ return false;
+ }
+ OutCode = ExtractXmlValue(Body, "Code");
+ OutMessage = ExtractXmlValue(Body, "Message");
+ return true;
+ }
+
+ /// True if the response indicates S3 throttling (503 SlowDown / ServiceUnavailable / 429).
+ /// Code is checked on both the HTTP status and the XML error code so we catch proxies that
+ /// return 200 with a SlowDown body.
+ bool IsS3Throttled(const HttpClient::Response& Response, std::string_view ErrorCode)
+ {
+ const int Status = static_cast<int>(Response.StatusCode);
+ if (Status == 503 || Status == 429)
+ {
+ return true;
+ }
+ if (ErrorCode == "SlowDown" || ErrorCode == "ServiceUnavailable" || ErrorCode == "ThrottlingException" ||
+ ErrorCode == "RequestLimitExceeded" || ErrorCode == "TooManyRequests")
+ {
+ return true;
+ }
+ return false;
+ }
+
+ /// Build a human-readable error message for a failed S3 response. When the response body
+ /// contains an S3 `<Error>` element, the Code and Message fields are included in the string
+ /// so transient 4xx/5xx failures (SignatureDoesNotMatch, AuthorizationHeaderMalformed, etc.)
+ /// show up in logs instead of being swallowed. Falls back to the generic HTTP/transport
+ /// message when no XML body is available (HEAD responses, transport errors).
+ /// Also emits a distinct `S3 THROTTLED` warning when the response indicates throttling so
+ /// callers can grep for it without parsing combined error text.
+ std::string S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response)
+ {
+ if (!Response.Error.has_value() && Response.ResponsePayload)
+ {
+ std::string_view Body(reinterpret_cast<const char*>(Response.ResponsePayload.GetData()), Response.ResponsePayload.GetSize());
+ std::string_view Code;
+ std::string_view Message;
+ if (ExtractS3Error(Body, Code, Message) && (!Code.empty() || !Message.empty()))
+ {
+ ExtendableStringBuilder<256> Decoded;
+ DecodeXmlEntities(Message, Decoded);
+ if (IsS3Throttled(Response, Code))
+ {
+ ZEN_WARN("S3 THROTTLED [{}] status={} code='{}' message='{}'",
+ Prefix,
+ static_cast<int>(Response.StatusCode),
+ Code,
+ Decoded.ToView());
+ }
+ return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView());
+ }
+ }
+ if (IsS3Throttled(Response, {}))
+ {
+ ZEN_WARN("S3 THROTTLED [{}] status={} (no XML body)", Prefix, static_cast<int>(Response.StatusCode));
+ }
+ return Response.ErrorMessage(Prefix);
+ }
+
} // namespace
+std::string_view S3GetObjectResult::NotFoundErrorText = "Not found";
+
S3Client::S3Client(const S3ClientOptions& Options)
: m_Log(logging::Get("s3"))
, m_BucketName(Options.BucketName)
@@ -145,13 +214,8 @@ S3Client::S3Client(const S3ClientOptions& Options)
, m_PathStyle(Options.PathStyle)
, m_Credentials(Options.Credentials)
, m_CredentialProvider(Options.CredentialProvider)
-, m_HttpClient(BuildEndpoint(),
- HttpClientSettings{
- .LogCategory = "s3",
- .ConnectTimeout = Options.ConnectTimeout,
- .Timeout = Options.Timeout,
- .RetryCount = Options.RetryCount,
- })
+, m_HttpClient(BuildEndpoint(), Options.HttpSettings)
+, m_Verbose(Options.HttpSettings.Verbose)
{
m_Host = BuildHostHeader();
ZEN_INFO("S3 client configured for bucket '{}' in region '{}' (endpoint: {}, {})",
@@ -171,20 +235,33 @@ S3Client::GetCurrentCredentials()
SigV4Credentials Creds = m_CredentialProvider->GetCredentials();
if (!Creds.AccessKeyId.empty())
{
- // Invalidate the signing key cache when the access key changes
+ // Invalidate the signing key cache when the access key changes, and update stored
+ // credentials atomically under the same lock so callers see a consistent snapshot.
+ RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
if (Creds.AccessKeyId != m_Credentials.AccessKeyId)
{
- RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
m_CachedDateStamp.clear();
}
m_Credentials = Creds;
+ // Return Creds directly - avoids reading m_Credentials after releasing the lock,
+ // which would race with another concurrent write.
+ return Creds;
}
+ // IMDS returned empty credentials; fall back to the last known-good credentials.
return m_Credentials;
}
return m_Credentials;
}
std::string
+S3Client::BuildNoCredentialsError(std::string Context)
+{
+ std::string Err = fmt::format("{}: no credentials available", Context);
+ ZEN_WARN("{}", Err);
+ return Err;
+}
+
+std::string
S3Client::BuildEndpoint() const
{
if (!m_Endpoint.empty())
@@ -252,7 +329,7 @@ S3Client::BucketRootPath() const
Sha256Digest
S3Client::GetSigningKey(std::string_view DateStamp)
{
- // Fast path: shared lock for cache hit (common case — key only changes once per day)
+ // Fast path: shared lock for cache hit (common case - key only changes once per day)
{
RwLock::SharedLockScope SharedLock(m_SigningKeyLock);
if (m_CachedDateStamp == DateStamp)
@@ -284,14 +361,18 @@ S3Client::GetSigningKey(std::string_view DateStamp)
}
HttpClient::KeyValueMap
-S3Client::SignRequest(std::string_view Method, std::string_view Path, std::string_view CanonicalQueryString, std::string_view PayloadHash)
+S3Client::SignRequest(const SigV4Credentials& Credentials,
+ std::string_view Method,
+ std::string_view Path,
+ std::string_view CanonicalQueryString,
+ std::string_view PayloadHash,
+ std::span<const std::pair<std::string, std::string>> ExtraSignedHeaders)
{
- SigV4Credentials Credentials = GetCurrentCredentials();
-
std::string AmzDate = GetAmzTimestamp();
// Build sorted headers to sign (must be sorted by lowercase name)
std::vector<std::pair<std::string, std::string>> HeadersToSign;
+ HeadersToSign.reserve(4 + ExtraSignedHeaders.size());
HeadersToSign.emplace_back("host", m_Host);
HeadersToSign.emplace_back("x-amz-content-sha256", std::string(PayloadHash));
HeadersToSign.emplace_back("x-amz-date", AmzDate);
@@ -299,6 +380,10 @@ S3Client::SignRequest(std::string_view Method, std::string_view Path, std::strin
{
HeadersToSign.emplace_back("x-amz-security-token", Credentials.SessionToken);
}
+ for (const auto& [K, V] : ExtraSignedHeaders)
+ {
+ HeadersToSign.emplace_back(K, V);
+ }
std::sort(HeadersToSign.begin(), HeadersToSign.end());
std::string_view DateStamp(AmzDate.data(), 8);
@@ -315,6 +400,10 @@ S3Client::SignRequest(std::string_view Method, std::string_view Path, std::strin
{
Result->emplace("x-amz-security-token", Credentials.SessionToken);
}
+ for (const auto& [K, V] : ExtraSignedHeaders)
+ {
+ Result->emplace(K, V);
+ }
return Result;
}
@@ -322,69 +411,210 @@ S3Client::SignRequest(std::string_view Method, std::string_view Path, std::strin
S3Result
S3Client::PutObject(std::string_view Key, IoBuffer Content)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 PUT '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
std::string Path = KeyToPath(Key);
// Hash the payload
std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize()));
- HttpClient::KeyValueMap Headers = SignRequest("PUT", Path, "", PayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", PayloadHash);
HttpClient::Response Response = m_HttpClient.Put(Path, Content, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 PUT failed");
+ std::string Err = S3ErrorMessage("S3 PUT failed", Response);
ZEN_WARN("S3 PUT '{}' failed: {}", Key, Err);
return S3Result{std::move(Err)};
}
- ZEN_DEBUG("S3 PUT '{}' succeeded ({} bytes)", Key, Content.GetSize());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 PUT '{}' succeeded ({} bytes)", Key, Content.GetSize());
+ }
return {};
}
S3GetObjectResult
-S3Client::GetObject(std::string_view Key)
+S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFilePath)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 GET '{}' failed", Key); !Err.empty())
+ {
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
std::string Path = KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash);
- HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
+ HttpClient::Response Response = m_HttpClient.Download(Path, TempFilePath, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 GET failed");
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}};
+ }
+
+ std::string Err = S3ErrorMessage("S3 GET failed", Response);
ZEN_WARN("S3 GET '{}' failed: {}", Key, Err);
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
}
- ZEN_DEBUG("S3 GET '{}' succeeded ({} bytes)", Key, Response.ResponsePayload.GetSize());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 GET '{}' succeeded ({} bytes)", Key, Response.ResponsePayload.GetSize());
+ }
+ return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
+}
+
+S3GetObjectResult
+S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize)
+{
+ ZEN_ASSERT(RangeSize > 0);
+
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 GET range '{}' [{}-{}] failed", Key, RangeStart, RangeStart + RangeSize - 1);
+ !Err.empty())
+ {
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
+ std::string Path = KeyToPath(Key);
+
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", Path, "", EmptyPayloadHash);
+ Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1));
+
+ HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
+ if (!Response.IsSuccess())
+ {
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}};
+ }
+
+ std::string Err = S3ErrorMessage("S3 GET range failed", Response);
+ ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err);
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
+ // Callers are expected to request only ranges that lie within the known object size (e.g.
+ // by calling HeadObject first). Treat a short read as an error rather than silently
+ // returning a truncated buffer - a partial write is more dangerous than a hard failure.
+ if (Response.ResponsePayload.GetSize() != RangeSize)
+ {
+ std::string Err = fmt::format("S3 GET range '{}' [{}-{}] returned {} bytes, expected {}",
+ Key,
+ RangeStart,
+ RangeStart + RangeSize - 1,
+ Response.ResponsePayload.GetSize(),
+ RangeSize);
+ ZEN_WARN("{}", Err);
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 GET range '{}' [{}-{}] succeeded ({} bytes)",
+ Key,
+ RangeStart,
+ RangeStart + RangeSize - 1,
+ Response.ResponsePayload.GetSize());
+ }
return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
}
S3Result
S3Client::DeleteObject(std::string_view Key)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 DELETE '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
std::string Path = KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest("DELETE", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, "", EmptyPayloadHash);
HttpClient::Response Response = m_HttpClient.Delete(Path, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 DELETE failed");
+ std::string Err = S3ErrorMessage("S3 DELETE failed", Response);
ZEN_WARN("S3 DELETE '{}' failed: {}", Key, Err);
return S3Result{std::move(Err)};
}
- ZEN_DEBUG("S3 DELETE '{}' succeeded", Key);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 DELETE '{}' succeeded", Key);
+ }
+ return {};
+}
+
+S3Result
+S3Client::Touch(std::string_view Key)
+{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 Touch '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
+ std::string Path = KeyToPath(Key);
+
+ // x-amz-copy-source is always "/bucket/key" regardless of addressing style.
+ // Key must be URI-encoded except for '/' separators. When source and destination
+ // are identical, REPLACE is required; COPY is rejected with InvalidRequest.
+ const std::array<std::pair<std::string, std::string>, 2> ExtraSigned{{
+ {"x-amz-copy-source", fmt::format("/{}/{}", m_BucketName, AwsUriEncode(Key, false))},
+ {"x-amz-metadata-directive", "REPLACE"},
+ }};
+
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, "", EmptyPayloadHash, ExtraSigned);
+
+ HttpClient::Response Response = m_HttpClient.Put(Path, IoBuffer{}, Headers);
+ if (!Response.IsSuccess())
+ {
+ std::string Err = S3ErrorMessage("S3 Touch failed", Response);
+ ZEN_WARN("S3 Touch '{}' failed: {}", Key, Err);
+ return S3Result{std::move(Err)};
+ }
+
+ // Copy operations can return HTTP 200 with an error in the XML body.
+ std::string_view ResponseBody = Response.AsText();
+ std::string_view ErrorCode;
+ std::string_view ErrorMessage;
+ if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage))
+ {
+ std::string Err = fmt::format("S3 Touch '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage);
+ ZEN_WARN("{}", Err);
+ return S3Result{std::move(Err)};
+ }
+
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 Touch '{}' succeeded", Key);
+ }
return {};
}
S3HeadObjectResult
S3Client::HeadObject(std::string_view Key)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 HEAD '{}' failed", Key); !Err.empty())
+ {
+ return S3HeadObjectResult{S3Result{std::move(Err)}, {}, HeadObjectResult::Error};
+ }
+
std::string Path = KeyToPath(Key);
- HttpClient::KeyValueMap Headers = SignRequest("HEAD", Path, "", EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "HEAD", Path, "", EmptyPayloadHash);
HttpClient::Response Response = m_HttpClient.Head(Path, Headers);
if (!Response.IsSuccess())
@@ -394,7 +624,7 @@ S3Client::HeadObject(std::string_view Key)
return S3HeadObjectResult{{}, {}, HeadObjectResult::NotFound};
}
- std::string Err = Response.ErrorMessage("S3 HEAD failed");
+ std::string Err = S3ErrorMessage("S3 HEAD failed", Response);
ZEN_WARN("S3 HEAD '{}' failed: {}", Key, Err);
return S3HeadObjectResult{S3Result{std::move(Err)}, {}, HeadObjectResult::Error};
}
@@ -417,7 +647,10 @@ S3Client::HeadObject(std::string_view Key)
Info.LastModified = *V;
}
- ZEN_DEBUG("S3 HEAD '{}' succeeded (size={})", Key, Info.Size);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 HEAD '{}' succeeded (size={})", Key, Info.Size);
+ }
return S3HeadObjectResult{{}, std::move(Info), HeadObjectResult::Found};
}
@@ -430,6 +663,13 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
for (;;)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 ListObjectsV2 prefix='{}' failed", Prefix); !Err.empty())
+ {
+ Result.Error = std::move(Err);
+ return Result;
+ }
+
// Build query parameters for ListObjectsV2
std::vector<std::pair<std::string, std::string>> QueryParams;
QueryParams.emplace_back("list-type", "2");
@@ -448,13 +688,13 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
std::string CanonicalQS = BuildCanonicalQueryString(std::move(QueryParams));
std::string RootPath = BucketRootPath();
- HttpClient::KeyValueMap Headers = SignRequest("GET", RootPath, CanonicalQS, EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "GET", RootPath, CanonicalQS, EmptyPayloadHash);
std::string FullPath = BuildRequestPath(RootPath, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Get(FullPath, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 ListObjectsV2 failed");
+ std::string Err = S3ErrorMessage("S3 ListObjectsV2 failed", Response);
ZEN_WARN("S3 ListObjectsV2 prefix='{}' failed: {}", Prefix, Err);
Result.Error = std::move(Err);
return Result;
@@ -514,10 +754,16 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
}
ContinuationToken = std::string(NextToken);
- ZEN_DEBUG("S3 ListObjectsV2 prefix='{}' fetching next page ({} objects so far)", Prefix, Result.Objects.size());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 ListObjectsV2 prefix='{}' fetching next page ({} objects so far)", Prefix, Result.Objects.size());
+ }
}
- ZEN_DEBUG("S3 ListObjectsV2 prefix='{}' returned {} objects", Prefix, Result.Objects.size());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 ListObjectsV2 prefix='{}' returned {} objects", Prefix, Result.Objects.size());
+ }
return Result;
}
@@ -527,16 +773,22 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
S3CreateMultipartUploadResult
S3Client::CreateMultipartUpload(std::string_view Key)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 CreateMultipartUpload '{}' failed", Key); !Err.empty())
+ {
+ return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}};
+ }
+
std::string Path = KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({{"uploads", ""}});
- HttpClient::KeyValueMap Headers = SignRequest("POST", Path, CanonicalQS, EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, EmptyPayloadHash);
std::string FullPath = BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Post(FullPath, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 CreateMultipartUpload failed");
+ std::string Err = S3ErrorMessage("S3 CreateMultipartUpload failed", Response);
ZEN_WARN("S3 CreateMultipartUpload '{}' failed: {}", Key, Err);
return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}};
}
@@ -556,13 +808,22 @@ S3Client::CreateMultipartUpload(std::string_view Key)
return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}};
}
- ZEN_DEBUG("S3 CreateMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 CreateMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId);
+ }
return S3CreateMultipartUploadResult{{}, std::string(UploadId)};
}
S3UploadPartResult
S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t PartNumber, IoBuffer Content)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 UploadPart '{}' part {} failed", Key, PartNumber); !Err.empty())
+ {
+ return S3UploadPartResult{S3Result{std::move(Err)}, {}};
+ }
+
std::string Path = KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({
{"partNumber", fmt::format("{}", PartNumber)},
@@ -571,13 +832,13 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P
std::string PayloadHash = Sha256ToHex(ComputeSha256(Content.GetData(), Content.GetSize()));
- HttpClient::KeyValueMap Headers = SignRequest("PUT", Path, CanonicalQS, PayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "PUT", Path, CanonicalQS, PayloadHash);
std::string FullPath = BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Put(FullPath, Content, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage(fmt::format("S3 UploadPart {} failed", PartNumber));
+ std::string Err = S3ErrorMessage(fmt::format("S3 UploadPart {} failed", PartNumber), Response);
ZEN_WARN("S3 UploadPart '{}' part {} failed: {}", Key, PartNumber, Err);
return S3UploadPartResult{S3Result{std::move(Err)}, {}};
}
@@ -591,7 +852,10 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P
return S3UploadPartResult{S3Result{std::move(Err)}, {}};
}
- ZEN_DEBUG("S3 UploadPart '{}' part {} succeeded ({} bytes, etag={})", Key, PartNumber, Content.GetSize(), *ETag);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 UploadPart '{}' part {} succeeded ({} bytes, etag={})", Key, PartNumber, Content.GetSize(), *ETag);
+ }
return S3UploadPartResult{{}, *ETag};
}
@@ -600,6 +864,12 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
std::string_view UploadId,
const std::vector<std::pair<uint32_t, std::string>>& PartETags)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 CompleteMultipartUpload '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
std::string Path = KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}});
@@ -615,7 +885,7 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
std::string_view XmlView = XmlBody.ToView();
std::string PayloadHash = Sha256ToHex(ComputeSha256(XmlView));
- HttpClient::KeyValueMap Headers = SignRequest("POST", Path, CanonicalQS, PayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "POST", Path, CanonicalQS, PayloadHash);
Headers->emplace("Content-Type", "application/xml");
IoBuffer Payload(IoBuffer::Clone, XmlView.data(), XmlView.size());
@@ -624,44 +894,56 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
HttpClient::Response Response = m_HttpClient.Post(FullPath, Payload, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 CompleteMultipartUpload failed");
+ std::string Err = S3ErrorMessage("S3 CompleteMultipartUpload failed", Response);
ZEN_WARN("S3 CompleteMultipartUpload '{}' failed: {}", Key, Err);
return S3Result{std::move(Err)};
}
// Check for error in response body - S3 can return 200 with an error in the XML body
std::string_view ResponseBody = Response.AsText();
- if (ResponseBody.find("<Error>") != std::string_view::npos)
+ std::string_view ErrorCode;
+ std::string_view ErrorMessage;
+ if (ExtractS3Error(ResponseBody, ErrorCode, ErrorMessage))
{
- std::string_view ErrorCode = ExtractXmlValue(ResponseBody, "Code");
- std::string_view ErrorMessage = ExtractXmlValue(ResponseBody, "Message");
- std::string Err = fmt::format("S3 CompleteMultipartUpload '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage);
+ std::string Err = fmt::format("S3 CompleteMultipartUpload '{}' returned error: {} - {}", Key, ErrorCode, ErrorMessage);
ZEN_WARN("{}", Err);
return S3Result{std::move(Err)};
}
- ZEN_DEBUG("S3 CompleteMultipartUpload '{}' succeeded ({} parts)", Key, PartETags.size());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 CompleteMultipartUpload '{}' succeeded ({} parts)", Key, PartETags.size());
+ }
return {};
}
S3Result
S3Client::AbortMultipartUpload(std::string_view Key, std::string_view UploadId)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 AbortMultipartUpload '{}' failed", Key); !Err.empty())
+ {
+ return S3Result{std::move(Err)};
+ }
+
std::string Path = KeyToPath(Key);
std::string CanonicalQS = BuildCanonicalQueryString({{"uploadId", std::string(UploadId)}});
- HttpClient::KeyValueMap Headers = SignRequest("DELETE", Path, CanonicalQS, EmptyPayloadHash);
+ HttpClient::KeyValueMap Headers = SignRequest(Credentials, "DELETE", Path, CanonicalQS, EmptyPayloadHash);
std::string FullPath = BuildRequestPath(Path, CanonicalQS);
HttpClient::Response Response = m_HttpClient.Delete(FullPath, Headers);
if (!Response.IsSuccess())
{
- std::string Err = Response.ErrorMessage("S3 AbortMultipartUpload failed");
+ std::string Err = S3ErrorMessage("S3 AbortMultipartUpload failed", Response);
ZEN_WARN("S3 AbortMultipartUpload '{}' failed: {}", Key, Err);
return S3Result{std::move(Err)};
}
- ZEN_DEBUG("S3 AbortMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 AbortMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId);
+ }
return {};
}
@@ -680,6 +962,12 @@ S3Client::GeneratePresignedPutUrl(std::string_view Key, std::chrono::seconds Exp
std::string
S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view Method, std::chrono::seconds ExpiresIn)
{
+ SigV4Credentials Credentials;
+ if (std::string Err = RequireCredentials(Credentials, "S3 GeneratePresignedUrl '{}' {} failed", Key, Method); !Err.empty())
+ {
+ return {};
+ }
+
std::string Path = KeyToPath(Key);
std::string Scheme = "https";
@@ -688,24 +976,25 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M
Scheme = "http";
}
- SigV4Credentials Credentials = GetCurrentCredentials();
return GeneratePresignedUrl(Credentials, Method, Scheme, m_Host, Path, m_Region, "s3", ExpiresIn);
}
S3Result
-S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize)
+S3Client::PutObjectMultipart(std::string_view Key,
+ uint64_t TotalSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange,
+ uint64_t PartSize)
{
- const uint64_t ContentSize = Content.GetSize();
-
// If the content fits in a single part, just use PutObject
- if (ContentSize <= PartSize)
+ if (TotalSize <= PartSize)
{
- return PutObject(Key, Content);
+ return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{});
}
- ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, ContentSize, (ContentSize + PartSize - 1) / PartSize);
-
- // Initiate multipart upload
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize);
+ }
S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key);
if (!InitResult)
@@ -722,38 +1011,54 @@ S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t Pa
uint64_t Offset = 0;
uint32_t PartNumber = 1;
- while (Offset < ContentSize)
+ try
{
- uint64_t ThisPartSize = std::min(PartSize, ContentSize - Offset);
+ while (Offset < TotalSize)
+ {
+ uint64_t ThisPartSize = std::min(PartSize, TotalSize - Offset);
+ IoBuffer PartContent = FetchRange(Offset, ThisPartSize);
+ S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, std::move(PartContent));
+ if (!PartResult)
+ {
+ AbortMultipartUpload(Key, UploadId);
+ return S3Result{std::move(PartResult.Error)};
+ }
- // Create a sub-buffer referencing the part data within the original content
- IoBuffer PartContent(Content, Offset, ThisPartSize);
+ PartETags.emplace_back(PartNumber, std::move(PartResult.ETag));
+ Offset += ThisPartSize;
+ PartNumber++;
+ }
- S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, PartContent);
- if (!PartResult)
+ S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags);
+ if (!CompleteResult)
{
- // Attempt to abort the multipart upload on failure
AbortMultipartUpload(Key, UploadId);
- return S3Result{std::move(PartResult.Error)};
+ return CompleteResult;
}
-
- PartETags.emplace_back(PartNumber, std::move(PartResult.ETag));
- Offset += ThisPartSize;
- PartNumber++;
}
-
- // Complete multipart upload
- S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags);
- if (!CompleteResult)
+ catch (...)
{
AbortMultipartUpload(Key, UploadId);
- return CompleteResult;
+ throw;
}
- ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), ContentSize);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize);
+ }
return {};
}
+S3Result
+S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize)
+{
+ return PutObjectMultipart(
+ Key,
+ Content.GetSize(),
+ [&Content](uint64_t Offset, uint64_t Size) { return IoBuffer(Content, Offset, Size); },
+ PartSize);
+}
+
//////////////////////////////////////////////////////////////////////////
// Tests
@@ -828,7 +1133,10 @@ TEST_CASE("s3client.minio_integration")
{
using namespace std::literals;
- // Spawn a local MinIO server
+ // Spawn a single MinIO server for the entire test case. Previously each SUBCASE re-entered
+ // the TEST_CASE from the top, spawning and killing MinIO per subcase - slow and flaky on
+ // macOS CI. Sequential sections avoid the re-entry while still sharing one MinIO instance
+ // that is torn down via RAII at scope exit.
MinioProcessOptions MinioOpts;
MinioOpts.Port = 19000;
MinioOpts.RootUser = "testuser";
@@ -836,11 +1144,8 @@ TEST_CASE("s3client.minio_integration")
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
-
- // Pre-create the test bucket (creates a subdirectory in MinIO's data dir)
Minio.CreateBucket("integration-test");
- // Configure S3Client for the test bucket
S3ClientOptions Opts;
Opts.BucketName = "integration-test";
Opts.Region = "us-east-1";
@@ -851,7 +1156,7 @@ TEST_CASE("s3client.minio_integration")
S3Client Client(Opts);
- SUBCASE("put_get_delete")
+ // -- put_get_delete -------------------------------------------------------
{
// PUT
std::string_view TestData = "hello, minio integration test!"sv;
@@ -880,14 +1185,50 @@ TEST_CASE("s3client.minio_integration")
CHECK(HeadRes2.Status == HeadObjectResult::NotFound);
}
- SUBCASE("head_not_found")
+ // -- touch ----------------------------------------------------------------
+ {
+ std::string_view TestData = "touch-me"sv;
+ IoBuffer Content = IoBufferBuilder::MakeFromMemory(MakeMemoryView(TestData));
+ S3Result PutRes = Client.PutObject("touch/obj.txt", std::move(Content));
+ REQUIRE(PutRes.IsSuccess());
+
+ S3HeadObjectResult Before = Client.HeadObject("touch/obj.txt");
+ REQUIRE(Before.IsSuccess());
+ REQUIRE(Before.Status == HeadObjectResult::Found);
+
+ // S3 LastModified has second precision; sleep past the second boundary so
+ // the touched timestamp is strictly greater.
+ Sleep(1100);
+
+ S3Result TouchRes = Client.Touch("touch/obj.txt");
+ REQUIRE(TouchRes.IsSuccess());
+
+ S3HeadObjectResult After = Client.HeadObject("touch/obj.txt");
+ REQUIRE(After.IsSuccess());
+ REQUIRE(After.Status == HeadObjectResult::Found);
+ CHECK(After.Info.Size == Before.Info.Size);
+ CHECK(After.Info.LastModified != Before.Info.LastModified);
+
+ // Content must be unchanged by a self-copy.
+ S3GetObjectResult GetRes = Client.GetObject("touch/obj.txt");
+ REQUIRE(GetRes.IsSuccess());
+ CHECK(GetRes.AsText() == TestData);
+
+ // Touching a missing key must fail.
+ S3Result MissRes = Client.Touch("touch/does-not-exist.txt");
+ CHECK_FALSE(MissRes.IsSuccess());
+
+ Client.DeleteObject("touch/obj.txt");
+ }
+
+ // -- head_not_found -------------------------------------------------------
{
S3HeadObjectResult Res = Client.HeadObject("nonexistent/key.dat");
CHECK(Res.IsSuccess());
CHECK(Res.Status == HeadObjectResult::NotFound);
}
- SUBCASE("list_objects")
+ // -- list_objects ---------------------------------------------------------
{
// Upload several objects with a common prefix
for (int i = 0; i < 3; ++i)
@@ -922,7 +1263,7 @@ TEST_CASE("s3client.minio_integration")
}
}
- SUBCASE("multipart_upload")
+ // -- multipart_upload -----------------------------------------------------
{
// Create a payload large enough to exercise multipart (use minimum part size)
constexpr uint64_t PartSize = 5 * 1024 * 1024; // 5 MB minimum
@@ -949,7 +1290,7 @@ TEST_CASE("s3client.minio_integration")
Client.DeleteObject("multipart/large.bin");
}
- SUBCASE("presigned_urls")
+ // -- presigned_urls -------------------------------------------------------
{
// Upload an object
std::string_view TestData = "presigned-url-test-data"sv;
@@ -975,8 +1316,6 @@ TEST_CASE("s3client.minio_integration")
// Cleanup
Client.DeleteObject("presigned/test.txt");
}
-
- Minio.StopMinioServer();
}
TEST_SUITE_END();