aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/cloud/s3client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/cloud/s3client.cpp')
-rw-r--r--src/zenutil/cloud/s3client.cpp159
1 files changed, 110 insertions, 49 deletions
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index 88d844b61..d9fde05d9 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -137,6 +137,8 @@ namespace {
} // namespace
+std::string_view S3GetObjectResult::NotFoundErrorText = "Not found";
+
S3Client::S3Client(const S3ClientOptions& Options)
: m_Log(logging::Get("s3"))
, m_BucketName(Options.BucketName)
@@ -145,13 +147,7 @@ S3Client::S3Client(const S3ClientOptions& Options)
, m_PathStyle(Options.PathStyle)
, m_Credentials(Options.Credentials)
, m_CredentialProvider(Options.CredentialProvider)
-, m_HttpClient(BuildEndpoint(),
- HttpClientSettings{
- .LogCategory = "s3",
- .ConnectTimeout = Options.ConnectTimeout,
- .Timeout = Options.Timeout,
- .RetryCount = Options.RetryCount,
- })
+, m_HttpClient(BuildEndpoint(), Options.HttpSettings)
{
m_Host = BuildHostHeader();
ZEN_INFO("S3 client configured for bucket '{}' in region '{}' (endpoint: {}, {})",
@@ -171,14 +167,19 @@ S3Client::GetCurrentCredentials()
SigV4Credentials Creds = m_CredentialProvider->GetCredentials();
if (!Creds.AccessKeyId.empty())
{
- // Invalidate the signing key cache when the access key changes
+ // Invalidate the signing key cache when the access key changes, and update stored
+ // credentials atomically under the same lock so callers see a consistent snapshot.
+ RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
if (Creds.AccessKeyId != m_Credentials.AccessKeyId)
{
- RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
m_CachedDateStamp.clear();
}
m_Credentials = Creds;
+ // Return Creds directly - avoids reading m_Credentials after releasing the lock,
+ // which would race with another concurrent write.
+ return Creds;
}
+ // IMDS returned empty credentials; fall back to the last known-good credentials.
return m_Credentials;
}
return m_Credentials;
@@ -252,7 +253,7 @@ S3Client::BucketRootPath() const
Sha256Digest
S3Client::GetSigningKey(std::string_view DateStamp)
{
- // Fast path: shared lock for cache hit (common case — key only changes once per day)
+ // Fast path: shared lock for cache hit (common case - key only changes once per day)
{
RwLock::SharedLockScope SharedLock(m_SigningKeyLock);
if (m_CachedDateStamp == DateStamp)
@@ -342,15 +343,20 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content)
}
S3GetObjectResult
-S3Client::GetObject(std::string_view Key)
+S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFilePath)
{
std::string Path = KeyToPath(Key);
HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash);
- HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
+ HttpClient::Response Response = m_HttpClient.Download(Path, TempFilePath, Headers);
if (!Response.IsSuccess())
{
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}};
+ }
+
std::string Err = Response.ErrorMessage("S3 GET failed");
ZEN_WARN("S3 GET '{}' failed: {}", Key, Err);
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
@@ -360,6 +366,51 @@ S3Client::GetObject(std::string_view Key)
return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
}
+S3GetObjectResult
+S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize)
+{
+ ZEN_ASSERT(RangeSize > 0);
+ std::string Path = KeyToPath(Key);
+
+ HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash);
+ Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1));
+
+ HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
+ if (!Response.IsSuccess())
+ {
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}};
+ }
+
+ std::string Err = Response.ErrorMessage("S3 GET range failed");
+ ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err);
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
+ // Callers are expected to request only ranges that lie within the known object size (e.g.
+ // by calling HeadObject first). Treat a short read as an error rather than silently
+ // returning a truncated buffer - a partial write is more dangerous than a hard failure.
+ if (Response.ResponsePayload.GetSize() != RangeSize)
+ {
+ std::string Err = fmt::format("S3 GET range '{}' [{}-{}] returned {} bytes, expected {}",
+ Key,
+ RangeStart,
+ RangeStart + RangeSize - 1,
+ Response.ResponsePayload.GetSize(),
+ RangeSize);
+ ZEN_WARN("{}", Err);
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
+ ZEN_DEBUG("S3 GET range '{}' [{}-{}] succeeded ({} bytes)",
+ Key,
+ RangeStart,
+ RangeStart + RangeSize - 1,
+ Response.ResponsePayload.GetSize());
+ return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
+}
+
S3Result
S3Client::DeleteObject(std::string_view Key)
{
@@ -693,19 +744,18 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M
}
S3Result
-S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize)
+S3Client::PutObjectMultipart(std::string_view Key,
+ uint64_t TotalSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange,
+ uint64_t PartSize)
{
- const uint64_t ContentSize = Content.GetSize();
-
// If the content fits in a single part, just use PutObject
- if (ContentSize <= PartSize)
+ if (TotalSize <= PartSize)
{
- return PutObject(Key, Content);
+ return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{});
}
- ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, ContentSize, (ContentSize + PartSize - 1) / PartSize);
-
- // Initiate multipart upload
+ ZEN_DEBUG("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize);
S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key);
if (!InitResult)
@@ -722,38 +772,51 @@ S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t Pa
uint64_t Offset = 0;
uint32_t PartNumber = 1;
- while (Offset < ContentSize)
+ try
{
- uint64_t ThisPartSize = std::min(PartSize, ContentSize - Offset);
+ while (Offset < TotalSize)
+ {
+ uint64_t ThisPartSize = std::min(PartSize, TotalSize - Offset);
+ IoBuffer PartContent = FetchRange(Offset, ThisPartSize);
+ S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, std::move(PartContent));
+ if (!PartResult)
+ {
+ AbortMultipartUpload(Key, UploadId);
+ return S3Result{std::move(PartResult.Error)};
+ }
- // Create a sub-buffer referencing the part data within the original content
- IoBuffer PartContent(Content, Offset, ThisPartSize);
+ PartETags.emplace_back(PartNumber, std::move(PartResult.ETag));
+ Offset += ThisPartSize;
+ PartNumber++;
+ }
- S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, PartContent);
- if (!PartResult)
+ S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags);
+ if (!CompleteResult)
{
- // Attempt to abort the multipart upload on failure
AbortMultipartUpload(Key, UploadId);
- return S3Result{std::move(PartResult.Error)};
+ return CompleteResult;
}
-
- PartETags.emplace_back(PartNumber, std::move(PartResult.ETag));
- Offset += ThisPartSize;
- PartNumber++;
}
-
- // Complete multipart upload
- S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags);
- if (!CompleteResult)
+ catch (...)
{
AbortMultipartUpload(Key, UploadId);
- return CompleteResult;
+ throw;
}
- ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), ContentSize);
+ ZEN_DEBUG("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize);
return {};
}
+S3Result
+S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize)
+{
+ return PutObjectMultipart(
+ Key,
+ Content.GetSize(),
+ [&Content](uint64_t Offset, uint64_t Size) { return IoBuffer(Content, Offset, Size); },
+ PartSize);
+}
+
//////////////////////////////////////////////////////////////////////////
// Tests
@@ -828,7 +891,10 @@ TEST_CASE("s3client.minio_integration")
{
using namespace std::literals;
- // Spawn a local MinIO server
+ // Spawn a single MinIO server for the entire test case. Previously each SUBCASE re-entered
+ // the TEST_CASE from the top, spawning and killing MinIO per subcase — slow and flaky on
+ // macOS CI. Sequential sections avoid the re-entry while still sharing one MinIO instance
+ // that is torn down via RAII at scope exit.
MinioProcessOptions MinioOpts;
MinioOpts.Port = 19000;
MinioOpts.RootUser = "testuser";
@@ -836,11 +902,8 @@ TEST_CASE("s3client.minio_integration")
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
-
- // Pre-create the test bucket (creates a subdirectory in MinIO's data dir)
Minio.CreateBucket("integration-test");
- // Configure S3Client for the test bucket
S3ClientOptions Opts;
Opts.BucketName = "integration-test";
Opts.Region = "us-east-1";
@@ -851,7 +914,7 @@ TEST_CASE("s3client.minio_integration")
S3Client Client(Opts);
- SUBCASE("put_get_delete")
+ // -- put_get_delete -------------------------------------------------------
{
// PUT
std::string_view TestData = "hello, minio integration test!"sv;
@@ -880,14 +943,14 @@ TEST_CASE("s3client.minio_integration")
CHECK(HeadRes2.Status == HeadObjectResult::NotFound);
}
- SUBCASE("head_not_found")
+ // -- head_not_found -------------------------------------------------------
{
S3HeadObjectResult Res = Client.HeadObject("nonexistent/key.dat");
CHECK(Res.IsSuccess());
CHECK(Res.Status == HeadObjectResult::NotFound);
}
- SUBCASE("list_objects")
+ // -- list_objects ---------------------------------------------------------
{
// Upload several objects with a common prefix
for (int i = 0; i < 3; ++i)
@@ -922,7 +985,7 @@ TEST_CASE("s3client.minio_integration")
}
}
- SUBCASE("multipart_upload")
+ // -- multipart_upload -----------------------------------------------------
{
// Create a payload large enough to exercise multipart (use minimum part size)
constexpr uint64_t PartSize = 5 * 1024 * 1024; // 5 MB minimum
@@ -949,7 +1012,7 @@ TEST_CASE("s3client.minio_integration")
Client.DeleteObject("multipart/large.bin");
}
- SUBCASE("presigned_urls")
+ // -- presigned_urls -------------------------------------------------------
{
// Upload an object
std::string_view TestData = "presigned-url-test-data"sv;
@@ -975,8 +1038,6 @@ TEST_CASE("s3client.minio_integration")
// Cleanup
Client.DeleteObject("presigned/test.txt");
}
-
- Minio.StopMinioServer();
}
TEST_SUITE_END();