diff options
Diffstat (limited to 'src/zenutil/cloud/s3client.cpp')
| -rw-r--r-- | src/zenutil/cloud/s3client.cpp | 159 |
1 files changed, 110 insertions, 49 deletions
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp index 88d844b61..d9fde05d9 100644 --- a/src/zenutil/cloud/s3client.cpp +++ b/src/zenutil/cloud/s3client.cpp @@ -137,6 +137,8 @@ namespace { } // namespace +std::string_view S3GetObjectResult::NotFoundErrorText = "Not found"; + S3Client::S3Client(const S3ClientOptions& Options) : m_Log(logging::Get("s3")) , m_BucketName(Options.BucketName) @@ -145,13 +147,7 @@ S3Client::S3Client(const S3ClientOptions& Options) , m_PathStyle(Options.PathStyle) , m_Credentials(Options.Credentials) , m_CredentialProvider(Options.CredentialProvider) -, m_HttpClient(BuildEndpoint(), - HttpClientSettings{ - .LogCategory = "s3", - .ConnectTimeout = Options.ConnectTimeout, - .Timeout = Options.Timeout, - .RetryCount = Options.RetryCount, - }) +, m_HttpClient(BuildEndpoint(), Options.HttpSettings) { m_Host = BuildHostHeader(); ZEN_INFO("S3 client configured for bucket '{}' in region '{}' (endpoint: {}, {})", @@ -171,14 +167,19 @@ S3Client::GetCurrentCredentials() SigV4Credentials Creds = m_CredentialProvider->GetCredentials(); if (!Creds.AccessKeyId.empty()) { - // Invalidate the signing key cache when the access key changes + // Invalidate the signing key cache when the access key changes, and update stored + // credentials atomically under the same lock so callers see a consistent snapshot. + RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock); if (Creds.AccessKeyId != m_Credentials.AccessKeyId) { - RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock); m_CachedDateStamp.clear(); } m_Credentials = Creds; + // Return Creds directly - avoids reading m_Credentials after releasing the lock, + // which would race with another concurrent write. + return Creds; } + // IMDS returned empty credentials; fall back to the last known-good credentials. return m_Credentials; } return m_Credentials; @@ -252,7 +253,7 @@ S3Client::BucketRootPath() const Sha256Digest S3Client::GetSigningKey(std::string_view DateStamp) { - // Fast path: shared lock for cache hit (common case — key only changes once per day) + // Fast path: shared lock for cache hit (common case - key only changes once per day) { RwLock::SharedLockScope SharedLock(m_SigningKeyLock); if (m_CachedDateStamp == DateStamp) @@ -342,15 +343,20 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content) } S3GetObjectResult -S3Client::GetObject(std::string_view Key) +S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFilePath) { std::string Path = KeyToPath(Key); HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash); - HttpClient::Response Response = m_HttpClient.Get(Path, Headers); + HttpClient::Response Response = m_HttpClient.Download(Path, TempFilePath, Headers); if (!Response.IsSuccess()) { + if (Response.StatusCode == HttpResponseCode::NotFound) + { + return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}}; + } + std::string Err = Response.ErrorMessage("S3 GET failed"); ZEN_WARN("S3 GET '{}' failed: {}", Key, Err); return S3GetObjectResult{S3Result{std::move(Err)}, {}}; @@ -360,6 +366,51 @@ S3Client::GetObject(std::string_view Key) return S3GetObjectResult{{}, std::move(Response.ResponsePayload)}; } +S3GetObjectResult +S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize) +{ + ZEN_ASSERT(RangeSize > 0); + std::string Path = KeyToPath(Key); + + HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash); + Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1)); + + HttpClient::Response Response = m_HttpClient.Get(Path, Headers); + if (!Response.IsSuccess()) + { + if (Response.StatusCode == HttpResponseCode::NotFound) + { + return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}}; + } + + std::string Err = Response.ErrorMessage("S3 GET range failed"); + ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err); + return S3GetObjectResult{S3Result{std::move(Err)}, {}}; + } + + // Callers are expected to request only ranges that lie within the known object size (e.g. + // by calling HeadObject first). Treat a short read as an error rather than silently + // returning a truncated buffer - a partial write is more dangerous than a hard failure. + if (Response.ResponsePayload.GetSize() != RangeSize) + { + std::string Err = fmt::format("S3 GET range '{}' [{}-{}] returned {} bytes, expected {}", + Key, + RangeStart, + RangeStart + RangeSize - 1, + Response.ResponsePayload.GetSize(), + RangeSize); + ZEN_WARN("{}", Err); + return S3GetObjectResult{S3Result{std::move(Err)}, {}}; + } + + ZEN_DEBUG("S3 GET range '{}' [{}-{}] succeeded ({} bytes)", + Key, + RangeStart, + RangeStart + RangeSize - 1, + Response.ResponsePayload.GetSize()); + return S3GetObjectResult{{}, std::move(Response.ResponsePayload)}; +} + S3Result S3Client::DeleteObject(std::string_view Key) { @@ -693,19 +744,18 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M } S3Result -S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize) +S3Client::PutObjectMultipart(std::string_view Key, + uint64_t TotalSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange, + uint64_t PartSize) { - const uint64_t ContentSize = Content.GetSize(); - // If the content fits in a single part, just use PutObject - if (ContentSize <= PartSize) + if (TotalSize <= PartSize) { - return PutObject(Key, Content); + return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{}); } - ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, ContentSize, (ContentSize + PartSize - 1) / PartSize); - - // Initiate multipart upload + ZEN_DEBUG("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize); S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key); if (!InitResult) @@ -722,38 +772,51 @@ S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t Pa uint64_t Offset = 0; uint32_t PartNumber = 1; - while (Offset < ContentSize) + try { - uint64_t ThisPartSize = std::min(PartSize, ContentSize - Offset); + while (Offset < TotalSize) + { + uint64_t ThisPartSize = std::min(PartSize, TotalSize - Offset); + IoBuffer PartContent = FetchRange(Offset, ThisPartSize); + S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, std::move(PartContent)); + if (!PartResult) + { + AbortMultipartUpload(Key, UploadId); + return S3Result{std::move(PartResult.Error)}; + } - // Create a sub-buffer referencing the part data within the original content - IoBuffer PartContent(Content, Offset, ThisPartSize); + PartETags.emplace_back(PartNumber, std::move(PartResult.ETag)); + Offset += ThisPartSize; + PartNumber++; + } - S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, PartContent); - if (!PartResult) + S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags); + if (!CompleteResult) { - // Attempt to abort the multipart upload on failure AbortMultipartUpload(Key, UploadId); - return S3Result{std::move(PartResult.Error)}; + return CompleteResult; } - - PartETags.emplace_back(PartNumber, std::move(PartResult.ETag)); - Offset += ThisPartSize; - PartNumber++; } - - // Complete multipart upload - S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags); - if (!CompleteResult) + catch (...) { AbortMultipartUpload(Key, UploadId); - return CompleteResult; + throw; } - ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), ContentSize); + ZEN_DEBUG("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize); return {}; } +S3Result +S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize) +{ + return PutObjectMultipart( + Key, + Content.GetSize(), + [&Content](uint64_t Offset, uint64_t Size) { return IoBuffer(Content, Offset, Size); }, + PartSize); +} + ////////////////////////////////////////////////////////////////////////// // Tests @@ -828,7 +891,10 @@ TEST_CASE("s3client.minio_integration") { using namespace std::literals; - // Spawn a local MinIO server + // Spawn a single MinIO server for the entire test case. Previously each SUBCASE re-entered + // the TEST_CASE from the top, spawning and killing MinIO per subcase — slow and flaky on + // macOS CI. Sequential sections avoid the re-entry while still sharing one MinIO instance + // that is torn down via RAII at scope exit. MinioProcessOptions MinioOpts; MinioOpts.Port = 19000; MinioOpts.RootUser = "testuser"; @@ -836,11 +902,8 @@ TEST_CASE("s3client.minio_integration") MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); - - // Pre-create the test bucket (creates a subdirectory in MinIO's data dir) Minio.CreateBucket("integration-test"); - // Configure S3Client for the test bucket S3ClientOptions Opts; Opts.BucketName = "integration-test"; Opts.Region = "us-east-1"; @@ -851,7 +914,7 @@ TEST_CASE("s3client.minio_integration") S3Client Client(Opts); - SUBCASE("put_get_delete") + // -- put_get_delete ------------------------------------------------------- { // PUT std::string_view TestData = "hello, minio integration test!"sv; @@ -880,14 +943,14 @@ TEST_CASE("s3client.minio_integration") CHECK(HeadRes2.Status == HeadObjectResult::NotFound); } - SUBCASE("head_not_found") + // -- head_not_found ------------------------------------------------------- { S3HeadObjectResult Res = Client.HeadObject("nonexistent/key.dat"); CHECK(Res.IsSuccess()); CHECK(Res.Status == HeadObjectResult::NotFound); } - SUBCASE("list_objects") + // -- list_objects --------------------------------------------------------- { // Upload several objects with a common prefix for (int i = 0; i < 3; ++i) @@ -922,7 +985,7 @@ TEST_CASE("s3client.minio_integration") } } - SUBCASE("multipart_upload") + // -- multipart_upload ----------------------------------------------------- { // Create a payload large enough to exercise multipart (use minimum part size) constexpr uint64_t PartSize = 5 * 1024 * 1024; // 5 MB minimum @@ -949,7 +1012,7 @@ TEST_CASE("s3client.minio_integration") Client.DeleteObject("multipart/large.bin"); } - SUBCASE("presigned_urls") + // -- presigned_urls ------------------------------------------------------- { // Upload an object std::string_view TestData = "presigned-url-test-data"sv; @@ -975,8 +1038,6 @@ TEST_CASE("s3client.minio_integration") // Cleanup Client.DeleteObject("presigned/test.txt"); } - - Minio.StopMinioServer(); } TEST_SUITE_END(); |