diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-20 17:07:47 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-20 17:07:47 +0100 |
| commit | bcecb25578149e999c3710aeb6948eebe4b3e1b7 (patch) | |
| tree | b09951a9de718b40211e0a58bc4b53209d21224d /src/zenutil | |
| parent | auth fail no cache (#871) (diff) | |
| download | zen-de/s3-hub-hydration.tar.xz zen-de/s3-hub-hydration.zip | |
S3 hydration backend for hub modede/s3-hub-hydration
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/cloud/s3client.cpp | 115 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cloud/s3client.h | 26 |
2 files changed, 107 insertions, 34 deletions
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp index 88d844b61..26d1023f4 100644 --- a/src/zenutil/cloud/s3client.cpp +++ b/src/zenutil/cloud/s3client.cpp @@ -171,14 +171,19 @@ S3Client::GetCurrentCredentials() SigV4Credentials Creds = m_CredentialProvider->GetCredentials(); if (!Creds.AccessKeyId.empty()) { - // Invalidate the signing key cache when the access key changes + // Invalidate the signing key cache when the access key changes, and update stored + // credentials atomically under the same lock so callers see a consistent snapshot. + RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock); if (Creds.AccessKeyId != m_Credentials.AccessKeyId) { - RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock); m_CachedDateStamp.clear(); } m_Credentials = Creds; + // Return Creds directly - avoids reading m_Credentials after releasing the lock, + // which would race with another concurrent write. + return Creds; } + // IMDS returned empty credentials; fall back to the last known-good credentials. return m_Credentials; } return m_Credentials; @@ -252,7 +257,7 @@ S3Client::BucketRootPath() const Sha256Digest S3Client::GetSigningKey(std::string_view DateStamp) { - // Fast path: shared lock for cache hit (common case — key only changes once per day) + // Fast path: shared lock for cache hit (common case - key only changes once per day) { RwLock::SharedLockScope SharedLock(m_SigningKeyLock); if (m_CachedDateStamp == DateStamp) @@ -360,6 +365,46 @@ S3Client::GetObject(std::string_view Key) return S3GetObjectResult{{}, std::move(Response.ResponsePayload)}; } +S3GetObjectResult +S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize) +{ + ZEN_ASSERT(RangeSize > 0); + std::string Path = KeyToPath(Key); + + HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash); + Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1)); + + HttpClient::Response Response = m_HttpClient.Get(Path, Headers); + if (!Response.IsSuccess()) + { + std::string Err = Response.ErrorMessage("S3 GET range failed"); + ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err); + return S3GetObjectResult{S3Result{std::move(Err)}, {}}; + } + + // Callers are expected to request only ranges that lie within the known object size (e.g. + // by calling HeadObject first). Treat a short read as an error rather than silently + // returning a truncated buffer - a partial write is more dangerous than a hard failure. + if (Response.ResponsePayload.GetSize() != RangeSize) + { + std::string Err = fmt::format("S3 GET range '{}' [{}-{}] returned {} bytes, expected {}", + Key, + RangeStart, + RangeStart + RangeSize - 1, + Response.ResponsePayload.GetSize(), + RangeSize); + ZEN_WARN("{}", Err); + return S3GetObjectResult{S3Result{std::move(Err)}, {}}; + } + + ZEN_DEBUG("S3 GET range '{}' [{}-{}] succeeded ({} bytes)", + Key, + RangeStart, + RangeStart + RangeSize - 1, + Response.ResponsePayload.GetSize()); + return S3GetObjectResult{{}, std::move(Response.ResponsePayload)}; +} + S3Result S3Client::DeleteObject(std::string_view Key) { @@ -693,19 +738,18 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M } S3Result -S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize) +S3Client::PutObjectMultipart(std::string_view Key, + uint64_t TotalSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange, + uint64_t PartSize) { - const uint64_t ContentSize = Content.GetSize(); - // If the content fits in a single part, just use PutObject - if (ContentSize <= PartSize) + if (TotalSize <= PartSize) { - return PutObject(Key, Content); + return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{}); } - ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, ContentSize, (ContentSize + PartSize - 1) / PartSize); - - // Initiate multipart upload + ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize); S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key); if (!InitResult) @@ -722,38 +766,51 @@ S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t Pa uint64_t Offset = 0; uint32_t PartNumber = 1; - while (Offset < ContentSize) + try { - uint64_t ThisPartSize = std::min(PartSize, ContentSize - Offset); + while (Offset < TotalSize) + { + uint64_t ThisPartSize = std::min(PartSize, TotalSize - Offset); + IoBuffer PartContent = FetchRange(Offset, ThisPartSize); + S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, std::move(PartContent)); + if (!PartResult) + { + AbortMultipartUpload(Key, UploadId); + return S3Result{std::move(PartResult.Error)}; + } - // Create a sub-buffer referencing the part data within the original content - IoBuffer PartContent(Content, Offset, ThisPartSize); + PartETags.emplace_back(PartNumber, std::move(PartResult.ETag)); + Offset += ThisPartSize; + PartNumber++; + } - S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, PartContent); - if (!PartResult) + S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags); + if (!CompleteResult) { - // Attempt to abort the multipart upload on failure AbortMultipartUpload(Key, UploadId); - return S3Result{std::move(PartResult.Error)}; + return CompleteResult; } - - PartETags.emplace_back(PartNumber, std::move(PartResult.ETag)); - Offset += ThisPartSize; - PartNumber++; } - - // Complete multipart upload - S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags); - if (!CompleteResult) + catch (...) { AbortMultipartUpload(Key, UploadId); - return CompleteResult; + throw; } - ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), ContentSize); + ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize); return {}; } +S3Result +S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize) +{ + return PutObjectMultipart( + Key, + Content.GetSize(), + [&Content](uint64_t Offset, uint64_t Size) { return IoBuffer(Content, Offset, Size); }, + PartSize); +} + ////////////////////////////////////////////////////////////////////////// // Tests diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h index 47501c5b5..bd30aa8a2 100644 --- a/src/zenutil/include/zenutil/cloud/s3client.h +++ b/src/zenutil/include/zenutil/cloud/s3client.h @@ -11,6 +11,7 @@ #include <zencore/thread.h> +#include <functional> #include <string> #include <string_view> #include <vector> @@ -63,7 +64,7 @@ enum class HeadObjectResult Error, }; -/// Result of GetObject — carries the downloaded content. +/// Result of GetObject - carries the downloaded content. struct S3GetObjectResult : S3Result { IoBuffer Content; @@ -71,26 +72,26 @@ struct S3GetObjectResult : S3Result std::string_view AsText() const { return std::string_view(reinterpret_cast<const char*>(Content.GetData()), Content.GetSize()); } }; -/// Result of HeadObject — carries object metadata and existence status. +/// Result of HeadObject - carries object metadata and existence status. struct S3HeadObjectResult : S3Result { S3ObjectInfo Info; HeadObjectResult Status = HeadObjectResult::NotFound; }; -/// Result of ListObjects — carries the list of matching objects. +/// Result of ListObjects - carries the list of matching objects. struct S3ListObjectsResult : S3Result { std::vector<S3ObjectInfo> Objects; }; -/// Result of CreateMultipartUpload — carries the upload ID. +/// Result of CreateMultipartUpload - carries the upload ID. struct S3CreateMultipartUploadResult : S3Result { std::string UploadId; }; -/// Result of UploadPart — carries the part ETag. +/// Result of UploadPart - carries the part ETag. struct S3UploadPartResult : S3Result { std::string ETag; @@ -120,6 +121,11 @@ public: /// Download an object from S3 S3GetObjectResult GetObject(std::string_view Key); + /// Download a byte range of an object from S3 + /// @param RangeStart First byte offset (inclusive) + /// @param RangeSize Number of bytes to download + S3GetObjectResult GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize); + /// Delete an object from S3 S3Result DeleteObject(std::string_view Key); @@ -151,6 +157,16 @@ public: /// @param PartSize Size of each part in bytes (minimum 5 MB, default 8 MB) S3Result PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize = 8 * 1024 * 1024); + /// High-level multipart upload: calls FetchRange(Offset, Size) to read each part on demand, + /// avoiding loading the full content into memory. + /// @param TotalSize Total object size in bytes + /// @param FetchRange Callback invoked once per part; must return exactly Size bytes + /// @param PartSize Size of each part in bytes (minimum 5 MB, default 8 MB) + S3Result PutObjectMultipart(std::string_view Key, + uint64_t TotalSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange, + uint64_t PartSize = 8 * 1024 * 1024); + /// Generate a pre-signed URL for downloading an object (GET) /// @param Key The object key /// @param ExpiresIn URL validity duration (default 1 hour, max 7 days) |