aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-20 17:07:47 +0100
committerDan Engelbrecht <[email protected]>2026-03-20 17:07:47 +0100
commitbcecb25578149e999c3710aeb6948eebe4b3e1b7 (patch)
treeb09951a9de718b40211e0a58bc4b53209d21224d
parentauth fail no cache (#871) (diff)
downloadzen-de/s3-hub-hydration.tar.xz
zen-de/s3-hub-hydration.zip
S3 hydration backend for hub mode (branch: de/s3-hub-hydration)
-rw-r--r--CHANGELOG.md4
-rw-r--r--src/zenserver/hub/hydration.cpp379
-rw-r--r--src/zenutil/cloud/s3client.cpp115
-rw-r--r--src/zenutil/include/zenutil/cloud/s3client.h26
4 files changed, 490 insertions, 34 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7b072923e..d85e65fb3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
##
+- Feature: Added S3 hydration backend for hub mode (`--hub-hydration-target-spec s3://<bucket>[/<prefix>]`)
+ - Credentials resolved from `AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY` env vars, falling back to EC2 instance profile via IMDS
+ - Each dehydration uploads to a new timestamped folder and commits a `current-state.json` pointer on success, so a failed upload never invalidates the previous state
+ - Hydration downloads to a temp directory first and only replaces the server state on full success; failures leave the existing state intact
- Feature: Added `zen bench disk <path>` command to gather basic disk performance numbers for diagnostics. Measures sequential/random/clone and disk operation (create,delete) timings
- Feature: Added transparent TCP proxy mode (`zenserver proxy`) for inspecting and monitoring HTTP/1.x traffic
between clients and upstream Zen servers in real time. Useful for observing multi-server/client interactions
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index e56be3934..4998ef1af 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -2,12 +2,51 @@
#include "hydration.h"
+#include <zencore/basicfile.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/except_fmt.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/system.h>
+#include <zenutil/cloud/imdscredentials.h>
+#include <zenutil/cloud/s3client.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+namespace {
+
+ /// UTC time decomposed to calendar fields with sub-second milliseconds.
+ struct UtcTime
+ {
+ std::tm Tm{};
+ int Ms = 0; // sub-second milliseconds [0, 999]
+
+ static UtcTime Now()
+ {
+ std::chrono::system_clock::time_point TimePoint = std::chrono::system_clock::now();
+ std::time_t TimeT = std::chrono::system_clock::to_time_t(TimePoint);
+ int SubSecMs =
+ static_cast<int>((std::chrono::duration_cast<std::chrono::milliseconds>(TimePoint.time_since_epoch()) % 1000).count());
+
+ UtcTime Result;
+ Result.Ms = SubSecMs;
+#if ZEN_PLATFORM_WINDOWS
+ gmtime_s(&Result.Tm, &TimeT);
+#else
+ gmtime_r(&TimeT, &Result.Tm);
+#endif
+ return Result;
+ }
+ };
+
+} // namespace
+
///////////////////////////////////////////////////////////////////////////
constexpr std::string_view FileHydratorPrefix = "file://";
@@ -114,6 +153,340 @@ FileHydrator::Dehydrate()
CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
}
+///////////////////////////////////////////////////////////////////////////
+
constexpr std::string_view S3HydratorPrefix = "s3://";

/// Hydration strategy backed by an S3 bucket.
/// Target specification: "s3://<bucket>[/<prefix>]". Objects are stored under
/// "<prefix>/<ModuleId>/<timestamp-folder>/...", with a "current-state.json"
/// pointer object committed only after a fully successful dehydration.
struct S3Hydrator : public HydrationStrategyBase
{
    void Configure(const HydrationConfig& Config) override;
    void Dehydrate() override;
    void Hydrate() override;

private:
    /// Build an S3 client bound to m_Bucket/m_Region with the credentials resolved in Configure().
    S3Client CreateS3Client() const;
    /// Lexicographically sortable "YYYYMMDD-HHMMSS-mmm" folder name from the current UTC time.
    std::string BuildTimestampFolderName() const;
    /// Full object key "<m_KeyPrefix>/<FolderName>/<RelPath>" (forward slashes).
    std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const;

    HydrationConfig m_Config;
    std::string m_Bucket;
    std::string m_KeyPrefix; // "<user-prefix>/<ModuleId>" or just "<ModuleId>" - no trailing slash
    std::string m_Region; // from AWS_DEFAULT_REGION / AWS_REGION, defaulting to "us-east-1"
    SigV4Credentials m_Credentials; // static credentials from env vars (used when no provider is set)
    Ref<ImdsCredentialProvider> m_CredentialProvider; // set when AWS_ACCESS_KEY_ID is absent
};
+
+void
+S3Hydrator::Configure(const HydrationConfig& Config)
+{
+ m_Config = Config;
+
+ std::string_view Spec = m_Config.TargetSpecification;
+ Spec.remove_prefix(S3HydratorPrefix.size());
+
+ size_t SlashPos = Spec.find('/');
+ std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
+ m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
+ m_KeyPrefix = UserPrefix.empty() ? m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId;
+
+ ZEN_ASSERT(!m_Bucket.empty());
+
+ std::string Region = GetEnvVariable("AWS_DEFAULT_REGION");
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = "us-east-1";
+ }
+ m_Region = std::move(Region);
+
+ std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
+ if (AccessKeyId.empty())
+ {
+ m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
+ }
+ else
+ {
+ m_Credentials.AccessKeyId = std::move(AccessKeyId);
+ m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
+ m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
+ }
+}
+
+S3Client
+S3Hydrator::CreateS3Client() const
+{
+ S3ClientOptions Options;
+ Options.BucketName = m_Bucket;
+ Options.Region = m_Region;
+
+ if (m_CredentialProvider)
+ {
+ Options.CredentialProvider = m_CredentialProvider;
+ }
+ else
+ {
+ Options.Credentials = m_Credentials;
+ }
+
+ return S3Client(Options);
+}
+
+std::string
+S3Hydrator::BuildTimestampFolderName() const
+{
+ UtcTime Now = UtcTime::Now();
+ return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}",
+ Now.Tm.tm_year + 1900,
+ Now.Tm.tm_mon + 1,
+ Now.Tm.tm_mday,
+ Now.Tm.tm_hour,
+ Now.Tm.tm_min,
+ Now.Tm.tm_sec,
+ Now.Ms);
+}
+
+std::string
+S3Hydrator::MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const
+{
+ return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string();
+}
+
/// Upload the entire server state directory to a fresh timestamped S3 folder, then
/// commit "current-state.json" pointing at it. The pointer is written last, so any
/// failure mid-upload leaves the previously committed state fully valid.
void
S3Hydrator::Dehydrate()
{
    ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix);

    try
    {
        S3Client Client = CreateS3Client();
        // A new folder per dehydration; earlier folders are never touched.
        std::string FolderName = BuildTimestampFolderName();
        uint64_t TotalBytes = 0;
        uint32_t FileCount = 0;
        std::chrono::steady_clock::time_point UploadStart = std::chrono::steady_clock::now();

        // Enumerate every file under the state directory, recursively.
        DirectoryContent DirContent;
        GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent);

        for (const std::filesystem::path& RelPath : DirContent.Files)
        {
            std::string Key = MakeObjectKey(FolderName, RelPath);

            // Stream each file part-by-part through the FetchRange callback so large
            // files never need to be fully resident in memory.
            BasicFile File(MakeSafeAbsolutePath(m_Config.ServerStateDir / RelPath), BasicFile::Mode::kRead);
            uint64_t FileSize = File.FileSize();

            S3Result UploadResult =
                Client.PutObjectMultipart(Key, FileSize, [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); });
            if (!UploadResult.IsSuccess())
            {
                throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error);
            }

            TotalBytes += FileSize;
            ++FileCount;
        }

        // Write current-state.json
        int64_t UploadDurationMs =
            std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - UploadStart).count();

        // ISO-8601 UTC timestamp for the metadata document (informational only).
        UtcTime Now = UtcTime::Now();
        std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
                                                Now.Tm.tm_year + 1900,
                                                Now.Tm.tm_mon + 1,
                                                Now.Tm.tm_mday,
                                                Now.Tm.tm_hour,
                                                Now.Tm.tm_min,
                                                Now.Tm.tm_sec,
                                                Now.Ms);

        // Metadata describing this upload; FolderName is the pointer Hydrate() follows.
        CbObjectWriter Meta;
        Meta << "FolderName" << FolderName;
        Meta << "ModuleId" << m_Config.ModuleId;
        Meta << "HostName" << GetMachineName();
        Meta << "UploadTimeUtc" << UploadTimeUtc;
        Meta << "UploadDurationMs" << UploadDurationMs;
        Meta << "TotalSizeBytes" << TotalBytes;
        Meta << "FileCount" << FileCount;

        ExtendableStringBuilder<1024> JsonBuilder;
        Meta.Save().ToJson(JsonBuilder);

        // Committing current-state.json is deliberately the final step: everything
        // above can fail without invalidating the previously committed state.
        std::string MetaKey = m_KeyPrefix + "/current-state.json";
        std::string_view JsonText = JsonBuilder.ToView();
        IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size());
        S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf));
        if (!MetaUploadResult.IsSuccess())
        {
            throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error);
        }

        ZEN_INFO("Dehydration complete: {} files, {} bytes, {} ms", FileCount, TotalBytes, UploadDurationMs);
    }
    catch (std::exception& Ex)
    {
        // Any in-progress multipart upload has already been aborted by PutObjectMultipart.
        // current-state.json is only written on success, so the previous S3 state remains valid.
        ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what());
    }
}
+
/// Restore server state from S3: download the folder referenced by
/// "current-state.json" into TempDir, and only on full success swap it into
/// ServerStateDir (rename when possible, copy otherwise). On any failure the
/// server state and temp directories are wiped so the caller starts clean.
void
S3Hydrator::Hydrate()
{
    ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir);

    const bool ForceRemoveReadOnlyFiles = true;

    // Clean temp dir before starting in case of leftover state from a previous failed hydration
    ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
    CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);

    // Deferred-cleanup flag: set inside the catch handler, acted on after it, so the
    // cleanup itself can never throw out of the handler.
    bool WipeServerState = false;

    try
    {
        S3Client Client = CreateS3Client();
        std::string MetaKey = m_KeyPrefix + "/current-state.json";

        // HeadObject first to distinguish "no state committed yet" from a transient S3 error.
        S3HeadObjectResult HeadResult = Client.HeadObject(MetaKey);
        if (HeadResult.Status == HeadObjectResult::NotFound)
        {
            throw zen::runtime_error("No state found in S3 at '{}'", MetaKey);
        }
        if (!HeadResult.IsSuccess())
        {
            throw zen::runtime_error("Failed to check for state in S3 at '{}': {}", MetaKey, HeadResult.Error);
        }

        S3GetObjectResult MetaResult = Client.GetObject(MetaKey);
        if (!MetaResult.IsSuccess())
        {
            throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error);
        }

        std::string ParseError;
        json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError);
        if (!ParseError.empty())
        {
            throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError);
        }

        // FolderName points at the timestamped folder written by the last successful Dehydrate().
        std::string FolderName = MetaJson["FolderName"].string_value();
        if (FolderName.empty())
        {
            throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey);
        }

        std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/";
        S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix);
        if (!ListResult.IsSuccess())
        {
            throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error);
        }

        // Download every listed object into TempDir, mirroring the key path below FolderPrefix.
        for (const S3ObjectInfo& Obj : ListResult.Objects)
        {
            if (!Obj.Key.starts_with(FolderPrefix))
            {
                ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix);
                continue;
            }

            std::string RelKey = Obj.Key.substr(FolderPrefix.size());
            if (RelKey.empty())
            {
                // Key equals the folder prefix itself (e.g. a zero-byte folder marker) - nothing to write.
                continue;
            }
            std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey));
            CreateDirectories(DestPath.parent_path());

            // Pre-size the destination so the ranged writes below never need to extend it.
            BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate);
            DestFile.SetFileSize(Obj.Size);

            if (Obj.Size > 0)
            {
                BasicFileWriter Writer(DestFile, 64 * 1024);

                // Download in 8 MiB ranges. GetObjectRange reports a short read as an
                // error, so each successful chunk is guaranteed to hold ChunkSize bytes.
                uint64_t Offset = 0;
                while (Offset < Obj.Size)
                {
                    uint64_t ChunkSize = std::min<uint64_t>(8 * 1024 * 1024, Obj.Size - Offset);
                    S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize);
                    if (!Chunk.IsSuccess())
                    {
                        throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
                                                 Obj.Key,
                                                 Offset,
                                                 Offset + ChunkSize - 1,
                                                 Chunk.Error);
                    }

                    Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
                    Offset += ChunkSize;
                }

                Writer.Flush();
            }
        }

        // Downloaded successfully - swap into ServerStateDir
        ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
        CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);

        // If the two paths share at least one common component they are on the same drive/volume
        // and atomic renames will succeed. Otherwise fall back to a full copy.
        // NOTE(review): on POSIX every absolute path shares the root component, so this
        // heuristic always takes the rename path even across mount points - confirm that
        // RenameFile/RenameDirectory handle EXDEV, or compare actual mount points instead.
        auto [ItTmp, ItState] =
            std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end());
        if (ItTmp != m_Config.TempDir.begin())
        {
            // Fast path: atomic renames - no data copying needed
            for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.TempDir))
            {
                std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / Entry.path().filename());
                if (Entry.is_directory())
                {
                    RenameDirectory(Entry.path(), Dest);
                }
                else
                {
                    RenameFile(Entry.path(), Dest);
                }
            }
            ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
            CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
        }
        else
        {
            // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
            // would fail. Copy the tree instead and clean up the temp files afterwards.
            ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
            CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true});
            ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
            CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
        }

        ZEN_INFO("Hydration complete from folder '{}'", FolderName);
    }
    catch (std::exception& Ex)
    {
        ZEN_WARN("S3 hydration failed: {}. Will wipe any partially installed state.", Ex.what());

        // We don't do the clean right here to avoid potentially running into double-throws
        WipeServerState = true;
    }

    if (WipeServerState)
    {
        // ServerStateDir may have been wiped or partially repopulated above; leave both
        // directories empty so the server starts from a clean, unambiguous state.
        ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
        CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
        ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
        CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
    }
}
+
std::unique_ptr<HydrationStrategyBase>
CreateHydrator(const HydrationConfig& Config)
{
@@ -123,6 +496,12 @@ CreateHydrator(const HydrationConfig& Config)
Hydrator->Configure(Config);
return Hydrator;
}
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0)
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>();
+ Hydrator->Configure(Config);
+ return Hydrator;
+ }
throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification));
}
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index 88d844b61..26d1023f4 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -171,14 +171,19 @@ S3Client::GetCurrentCredentials()
SigV4Credentials Creds = m_CredentialProvider->GetCredentials();
if (!Creds.AccessKeyId.empty())
{
- // Invalidate the signing key cache when the access key changes
+ // Invalidate the signing key cache when the access key changes, and update stored
+ // credentials atomically under the same lock so callers see a consistent snapshot.
+ RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
if (Creds.AccessKeyId != m_Credentials.AccessKeyId)
{
- RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock);
m_CachedDateStamp.clear();
}
m_Credentials = Creds;
+ // Return Creds directly - avoids reading m_Credentials after releasing the lock,
+ // which would race with another concurrent write.
+ return Creds;
}
+ // IMDS returned empty credentials; fall back to the last known-good credentials.
return m_Credentials;
}
return m_Credentials;
@@ -252,7 +257,7 @@ S3Client::BucketRootPath() const
Sha256Digest
S3Client::GetSigningKey(std::string_view DateStamp)
{
- // Fast path: shared lock for cache hit (common case — key only changes once per day)
+ // Fast path: shared lock for cache hit (common case - key only changes once per day)
{
RwLock::SharedLockScope SharedLock(m_SigningKeyLock);
if (m_CachedDateStamp == DateStamp)
@@ -360,6 +365,46 @@ S3Client::GetObject(std::string_view Key)
return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
}
+S3GetObjectResult
+S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize)
+{
+ ZEN_ASSERT(RangeSize > 0);
+ std::string Path = KeyToPath(Key);
+
+ HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash);
+ Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1));
+
+ HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
+ if (!Response.IsSuccess())
+ {
+ std::string Err = Response.ErrorMessage("S3 GET range failed");
+ ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err);
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
+ // Callers are expected to request only ranges that lie within the known object size (e.g.
+ // by calling HeadObject first). Treat a short read as an error rather than silently
+ // returning a truncated buffer - a partial write is more dangerous than a hard failure.
+ if (Response.ResponsePayload.GetSize() != RangeSize)
+ {
+ std::string Err = fmt::format("S3 GET range '{}' [{}-{}] returned {} bytes, expected {}",
+ Key,
+ RangeStart,
+ RangeStart + RangeSize - 1,
+ Response.ResponsePayload.GetSize(),
+ RangeSize);
+ ZEN_WARN("{}", Err);
+ return S3GetObjectResult{S3Result{std::move(Err)}, {}};
+ }
+
+ ZEN_DEBUG("S3 GET range '{}' [{}-{}] succeeded ({} bytes)",
+ Key,
+ RangeStart,
+ RangeStart + RangeSize - 1,
+ Response.ResponsePayload.GetSize());
+ return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
+}
+
S3Result
S3Client::DeleteObject(std::string_view Key)
{
@@ -693,19 +738,18 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M
}
S3Result
-S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize)
+S3Client::PutObjectMultipart(std::string_view Key,
+ uint64_t TotalSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange,
+ uint64_t PartSize)
{
- const uint64_t ContentSize = Content.GetSize();
-
// If the content fits in a single part, just use PutObject
- if (ContentSize <= PartSize)
+ if (TotalSize <= PartSize)
{
- return PutObject(Key, Content);
+ return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{});
}
- ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, ContentSize, (ContentSize + PartSize - 1) / PartSize);
-
- // Initiate multipart upload
+ ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize);
S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key);
if (!InitResult)
@@ -722,38 +766,51 @@ S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t Pa
uint64_t Offset = 0;
uint32_t PartNumber = 1;
- while (Offset < ContentSize)
+ try
{
- uint64_t ThisPartSize = std::min(PartSize, ContentSize - Offset);
+ while (Offset < TotalSize)
+ {
+ uint64_t ThisPartSize = std::min(PartSize, TotalSize - Offset);
+ IoBuffer PartContent = FetchRange(Offset, ThisPartSize);
+ S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, std::move(PartContent));
+ if (!PartResult)
+ {
+ AbortMultipartUpload(Key, UploadId);
+ return S3Result{std::move(PartResult.Error)};
+ }
- // Create a sub-buffer referencing the part data within the original content
- IoBuffer PartContent(Content, Offset, ThisPartSize);
+ PartETags.emplace_back(PartNumber, std::move(PartResult.ETag));
+ Offset += ThisPartSize;
+ PartNumber++;
+ }
- S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, PartContent);
- if (!PartResult)
+ S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags);
+ if (!CompleteResult)
{
- // Attempt to abort the multipart upload on failure
AbortMultipartUpload(Key, UploadId);
- return S3Result{std::move(PartResult.Error)};
+ return CompleteResult;
}
-
- PartETags.emplace_back(PartNumber, std::move(PartResult.ETag));
- Offset += ThisPartSize;
- PartNumber++;
}
-
- // Complete multipart upload
- S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags);
- if (!CompleteResult)
+ catch (...)
{
AbortMultipartUpload(Key, UploadId);
- return CompleteResult;
+ throw;
}
- ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), ContentSize);
+ ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize);
return {};
}
+S3Result
+S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize)
+{
+ return PutObjectMultipart(
+ Key,
+ Content.GetSize(),
+ [&Content](uint64_t Offset, uint64_t Size) { return IoBuffer(Content, Offset, Size); },
+ PartSize);
+}
+
//////////////////////////////////////////////////////////////////////////
// Tests
diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h
index 47501c5b5..bd30aa8a2 100644
--- a/src/zenutil/include/zenutil/cloud/s3client.h
+++ b/src/zenutil/include/zenutil/cloud/s3client.h
@@ -11,6 +11,7 @@
#include <zencore/thread.h>
+#include <functional>
#include <string>
#include <string_view>
#include <vector>
@@ -63,7 +64,7 @@ enum class HeadObjectResult
Error,
};
-/// Result of GetObject — carries the downloaded content.
+/// Result of GetObject - carries the downloaded content.
struct S3GetObjectResult : S3Result
{
IoBuffer Content;
@@ -71,26 +72,26 @@ struct S3GetObjectResult : S3Result
std::string_view AsText() const { return std::string_view(reinterpret_cast<const char*>(Content.GetData()), Content.GetSize()); }
};
-/// Result of HeadObject — carries object metadata and existence status.
+/// Result of HeadObject - carries object metadata and existence status.
struct S3HeadObjectResult : S3Result
{
S3ObjectInfo Info;
HeadObjectResult Status = HeadObjectResult::NotFound;
};
-/// Result of ListObjects — carries the list of matching objects.
+/// Result of ListObjects - carries the list of matching objects.
struct S3ListObjectsResult : S3Result
{
std::vector<S3ObjectInfo> Objects;
};
-/// Result of CreateMultipartUpload — carries the upload ID.
+/// Result of CreateMultipartUpload - carries the upload ID.
struct S3CreateMultipartUploadResult : S3Result
{
std::string UploadId;
};
-/// Result of UploadPart — carries the part ETag.
+/// Result of UploadPart - carries the part ETag.
struct S3UploadPartResult : S3Result
{
std::string ETag;
@@ -120,6 +121,11 @@ public:
/// Download an object from S3
S3GetObjectResult GetObject(std::string_view Key);
+ /// Download a byte range of an object from S3
+ /// @param RangeStart First byte offset (inclusive)
+ /// @param RangeSize Number of bytes to download
+ S3GetObjectResult GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize);
+
/// Delete an object from S3
S3Result DeleteObject(std::string_view Key);
@@ -151,6 +157,16 @@ public:
/// @param PartSize Size of each part in bytes (minimum 5 MB, default 8 MB)
S3Result PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize = 8 * 1024 * 1024);
+ /// High-level multipart upload: calls FetchRange(Offset, Size) to read each part on demand,
+ /// avoiding loading the full content into memory.
+ /// @param TotalSize Total object size in bytes
+ /// @param FetchRange Callback invoked once per part; must return exactly Size bytes
+ /// @param PartSize Size of each part in bytes (minimum 5 MB, default 8 MB)
+ S3Result PutObjectMultipart(std::string_view Key,
+ uint64_t TotalSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange,
+ uint64_t PartSize = 8 * 1024 * 1024);
+
/// Generate a pre-signed URL for downloading an object (GET)
/// @param Key The object key
/// @param ExpiresIn URL validity duration (default 1 hour, max 7 days)