aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/cloud/s3client.cpp115
-rw-r--r--src/zenutil/include/zenutil/cloud/s3client.h26
2 files changed, 107 insertions, 34 deletions
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index 88d844b61..26d1023f4 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -171,14 +171,19 @@ S3Client::GetCurrentCredentials()
SigV4Credentials Creds = m_CredentialProvider->GetCredentials();
if (!Creds.AccessKeyId.empty())
{
- // Invalidate the signing key cache when the access key changes
+ // Invalidate the signing key cache when the access key changes, and update stored
+ // credentials atomically under the same lock so callers see a consistent snapshot.
+ RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
if (Creds.AccessKeyId != m_Credentials.AccessKeyId)
{
- RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
m_CachedDateStamp.clear();
}
m_Credentials = Creds;
+ // Return Creds directly - avoids reading m_Credentials after releasing the lock,
+ // which would race with another concurrent write.
+ return Creds;
}
+ // IMDS returned empty credentials; fall back to the last known-good credentials.
return m_Credentials;
}
return m_Credentials;
@@ -252,7 +257,7 @@ S3Client::BucketRootPath() const
Sha256Digest
S3Client::GetSigningKey(std::string_view DateStamp)
{
- // Fast path: shared lock for cache hit (common case — key only changes once per day)
+ // Fast path: shared lock for cache hit (common case - key only changes once per day)
{
RwLock::SharedLockScope SharedLock(m_SigningKeyLock);
if (m_CachedDateStamp == DateStamp)
@@ -360,6 +365,46 @@ S3Client::GetObject(std::string_view Key)
return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
}
+S3GetObjectResult
+S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize)
+{
+ ZEN_ASSERT(RangeSize > 0);
+ std::string Path = KeyToPath(Key);
+
+ HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash);
+ Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1));
+
+ HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
+ if (!Response.IsSuccess())
+ {
+ std::string Err = Response.ErrorMessage("S3 GET range failed");
+ ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err);
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
+ // Callers are expected to request only ranges that lie within the known object size (e.g.
+ // by calling HeadObject first). Treat a short read as an error rather than silently
+ // returning a truncated buffer - a partial write is more dangerous than a hard failure.
+ if (Response.ResponsePayload.GetSize() != RangeSize)
+ {
+ std::string Err = fmt::format("S3 GET range '{}' [{}-{}] returned {} bytes, expected {}",
+ Key,
+ RangeStart,
+ RangeStart + RangeSize - 1,
+ Response.ResponsePayload.GetSize(),
+ RangeSize);
+ ZEN_WARN("{}", Err);
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
+ ZEN_DEBUG("S3 GET range '{}' [{}-{}] succeeded ({} bytes)",
+ Key,
+ RangeStart,
+ RangeStart + RangeSize - 1,
+ Response.ResponsePayload.GetSize());
+ return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
+}
+
S3Result
S3Client::DeleteObject(std::string_view Key)
{
@@ -693,19 +738,18 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M
}
S3Result
-S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize)
+S3Client::PutObjectMultipart(std::string_view Key,
+ uint64_t TotalSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange,
+ uint64_t PartSize)
{
- const uint64_t ContentSize = Content.GetSize();
-
// If the content fits in a single part, just use PutObject
- if (ContentSize <= PartSize)
+ if (TotalSize <= PartSize)
{
- return PutObject(Key, Content);
+ return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{});
}
- ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, ContentSize, (ContentSize + PartSize - 1) / PartSize);
-
- // Initiate multipart upload
+ ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize);
S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key);
if (!InitResult)
@@ -722,38 +766,51 @@ S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t Pa
uint64_t Offset = 0;
uint32_t PartNumber = 1;
- while (Offset < ContentSize)
+ try
{
- uint64_t ThisPartSize = std::min(PartSize, ContentSize - Offset);
+ while (Offset < TotalSize)
+ {
+ uint64_t ThisPartSize = std::min(PartSize, TotalSize - Offset);
+ IoBuffer PartContent = FetchRange(Offset, ThisPartSize);
+ S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, std::move(PartContent));
+ if (!PartResult)
+ {
+ AbortMultipartUpload(Key, UploadId);
+ return S3Result{std::move(PartResult.Error)};
+ }
- // Create a sub-buffer referencing the part data within the original content
- IoBuffer PartContent(Content, Offset, ThisPartSize);
+ PartETags.emplace_back(PartNumber, std::move(PartResult.ETag));
+ Offset += ThisPartSize;
+ PartNumber++;
+ }
- S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, PartContent);
- if (!PartResult)
+ S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags);
+ if (!CompleteResult)
{
- // Attempt to abort the multipart upload on failure
AbortMultipartUpload(Key, UploadId);
- return S3Result{std::move(PartResult.Error)};
+ return CompleteResult;
}
-
- PartETags.emplace_back(PartNumber, std::move(PartResult.ETag));
- Offset += ThisPartSize;
- PartNumber++;
}
-
- // Complete multipart upload
- S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags);
- if (!CompleteResult)
+ catch (...)
{
AbortMultipartUpload(Key, UploadId);
- return CompleteResult;
+ throw;
}
- ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), ContentSize);
+ ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize);
return {};
}
+S3Result
+S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize)
+{
+ return PutObjectMultipart(
+ Key,
+ Content.GetSize(),
+ [&Content](uint64_t Offset, uint64_t Size) { return IoBuffer(Content, Offset, Size); },
+ PartSize);
+}
+
//////////////////////////////////////////////////////////////////////////
// Tests
diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h
index 47501c5b5..bd30aa8a2 100644
--- a/src/zenutil/include/zenutil/cloud/s3client.h
+++ b/src/zenutil/include/zenutil/cloud/s3client.h
@@ -11,6 +11,7 @@
#include <zencore/thread.h>
+#include <functional>
#include <string>
#include <string_view>
#include <vector>
@@ -63,7 +64,7 @@ enum class HeadObjectResult
Error,
};
-/// Result of GetObject — carries the downloaded content.
+/// Result of GetObject - carries the downloaded content.
struct S3GetObjectResult : S3Result
{
IoBuffer Content;
@@ -71,26 +72,26 @@ struct S3GetObjectResult : S3Result
std::string_view AsText() const { return std::string_view(reinterpret_cast<const char*>(Content.GetData()), Content.GetSize()); }
};
-/// Result of HeadObject — carries object metadata and existence status.
+/// Result of HeadObject - carries object metadata and existence status.
struct S3HeadObjectResult : S3Result
{
S3ObjectInfo Info;
HeadObjectResult Status = HeadObjectResult::NotFound;
};
-/// Result of ListObjects — carries the list of matching objects.
+/// Result of ListObjects - carries the list of matching objects.
struct S3ListObjectsResult : S3Result
{
std::vector<S3ObjectInfo> Objects;
};
-/// Result of CreateMultipartUpload — carries the upload ID.
+/// Result of CreateMultipartUpload - carries the upload ID.
struct S3CreateMultipartUploadResult : S3Result
{
std::string UploadId;
};
-/// Result of UploadPart — carries the part ETag.
+/// Result of UploadPart - carries the part ETag.
struct S3UploadPartResult : S3Result
{
std::string ETag;
@@ -120,6 +121,11 @@ public:
/// Download an object from S3
S3GetObjectResult GetObject(std::string_view Key);
+ /// Download a byte range of an object from S3
+ /// @param RangeStart First byte offset (inclusive)
+ /// @param RangeSize Number of bytes to download
+ S3GetObjectResult GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize);
+
/// Delete an object from S3
S3Result DeleteObject(std::string_view Key);
@@ -151,6 +157,16 @@ public:
/// @param PartSize Size of each part in bytes (minimum 5 MB, default 8 MB)
S3Result PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize = 8 * 1024 * 1024);
+ /// High-level multipart upload: calls FetchRange(Offset, Size) to read each part on demand,
+ /// avoiding loading the full content into memory.
+ /// @param TotalSize Total object size in bytes
+ /// @param FetchRange Callback invoked once per part; must return exactly Size bytes
+ /// @param PartSize Size of each part in bytes (minimum 5 MB, default 8 MB)
+ S3Result PutObjectMultipart(std::string_view Key,
+ uint64_t TotalSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange,
+ uint64_t PartSize = 8 * 1024 * 1024);
+
/// Generate a pre-signed URL for downloading an object (GET)
/// @param Key The object key
/// @param ExpiresIn URL validity duration (default 1 hour, max 7 days)