diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-20 17:07:47 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-20 17:07:47 +0100 |
| commit | bcecb25578149e999c3710aeb6948eebe4b3e1b7 (patch) | |
| tree | b09951a9de718b40211e0a58bc4b53209d21224d /src | |
| parent | auth fail no cache (#871) (diff) | |
| download | zen-de/s3-hub-hydration.tar.xz zen-de/s3-hub-hydration.zip | |
S3 hydration backend for hub modede/s3-hub-hydration
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 379 | ||||
| -rw-r--r-- | src/zenutil/cloud/s3client.cpp | 115 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cloud/s3client.h | 26 |
3 files changed, 486 insertions, 34 deletions
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index e56be3934..4998ef1af 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -2,12 +2,51 @@ #include "hydration.h" +#include <zencore/basicfile.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/system.h> +#include <zenutil/cloud/imdscredentials.h> +#include <zenutil/cloud/s3client.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +namespace { + + /// UTC time decomposed to calendar fields with sub-second milliseconds. + struct UtcTime + { + std::tm Tm{}; + int Ms = 0; // sub-second milliseconds [0, 999] + + static UtcTime Now() + { + std::chrono::system_clock::time_point TimePoint = std::chrono::system_clock::now(); + std::time_t TimeT = std::chrono::system_clock::to_time_t(TimePoint); + int SubSecMs = + static_cast<int>((std::chrono::duration_cast<std::chrono::milliseconds>(TimePoint.time_since_epoch()) % 1000).count()); + + UtcTime Result; + Result.Ms = SubSecMs; +#if ZEN_PLATFORM_WINDOWS + gmtime_s(&Result.Tm, &TimeT); +#else + gmtime_r(&TimeT, &Result.Tm); +#endif + return Result; + } + }; + +} // namespace + /////////////////////////////////////////////////////////////////////////// constexpr std::string_view FileHydratorPrefix = "file://"; @@ -114,6 +153,340 @@ FileHydrator::Dehydrate() CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); } +/////////////////////////////////////////////////////////////////////////// + +constexpr std::string_view S3HydratorPrefix = "s3://"; + +struct S3Hydrator : public HydrationStrategyBase +{ + void Configure(const HydrationConfig& Config) override; + void Dehydrate() override; + void Hydrate() override; + +private: + S3Client CreateS3Client() const; + std::string BuildTimestampFolderName() const; + std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const; + + HydrationConfig m_Config; + std::string m_Bucket; + std::string m_KeyPrefix; // "<user-prefix>/<ModuleId>" or just "<ModuleId>" - no trailing slash + std::string m_Region; + SigV4Credentials m_Credentials; + Ref<ImdsCredentialProvider> m_CredentialProvider; +}; + +void +S3Hydrator::Configure(const HydrationConfig& Config) +{ + m_Config = Config; + + std::string_view Spec = m_Config.TargetSpecification; + Spec.remove_prefix(S3HydratorPrefix.size()); + + size_t SlashPos = Spec.find('/'); + std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; + m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); + m_KeyPrefix = UserPrefix.empty() ? m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId; + + ZEN_ASSERT(!m_Bucket.empty()); + + std::string Region = GetEnvVariable("AWS_DEFAULT_REGION"); + if (Region.empty()) + { + Region = GetEnvVariable("AWS_REGION"); + } + if (Region.empty()) + { + Region = "us-east-1"; + } + m_Region = std::move(Region); + + std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); + if (AccessKeyId.empty()) + { + m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); + } + else + { + m_Credentials.AccessKeyId = std::move(AccessKeyId); + m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); + m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); + } +} + +S3Client +S3Hydrator::CreateS3Client() const +{ + S3ClientOptions Options; + Options.BucketName = m_Bucket; + Options.Region = m_Region; + + if (m_CredentialProvider) + { + Options.CredentialProvider = m_CredentialProvider; + } + else + { + Options.Credentials = m_Credentials; + } + + return S3Client(Options); +} + +std::string +S3Hydrator::BuildTimestampFolderName() const +{ + UtcTime Now = UtcTime::Now(); + return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}", + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); +} + +std::string +S3Hydrator::MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const +{ + return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string(); +} + +void +S3Hydrator::Dehydrate() +{ + ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix); + + try + { + S3Client Client = CreateS3Client(); + std::string FolderName = BuildTimestampFolderName(); + uint64_t TotalBytes = 0; + uint32_t FileCount = 0; + std::chrono::steady_clock::time_point UploadStart = std::chrono::steady_clock::now(); + + DirectoryContent DirContent; + GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent); + + for (const std::filesystem::path& RelPath : DirContent.Files) + { + std::string Key = MakeObjectKey(FolderName, RelPath); + + BasicFile File(MakeSafeAbsolutePath(m_Config.ServerStateDir / RelPath), BasicFile::Mode::kRead); + uint64_t FileSize = File.FileSize(); + + S3Result UploadResult = + Client.PutObjectMultipart(Key, FileSize, [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); }); + if (!UploadResult.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error); + } + + TotalBytes += FileSize; + ++FileCount; + } + + // Write current-state.json + int64_t UploadDurationMs = + std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - UploadStart).count(); + + UtcTime Now = UtcTime::Now(); + std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); + + CbObjectWriter Meta; + Meta << "FolderName" << FolderName; + Meta << "ModuleId" << m_Config.ModuleId; + Meta << "HostName" << GetMachineName(); + Meta << "UploadTimeUtc" << UploadTimeUtc; + Meta << "UploadDurationMs" << UploadDurationMs; + Meta << "TotalSizeBytes" << TotalBytes; + Meta << "FileCount" << FileCount; + + ExtendableStringBuilder<1024> JsonBuilder; + Meta.Save().ToJson(JsonBuilder); + + std::string MetaKey = m_KeyPrefix + "/current-state.json"; + std::string_view JsonText = JsonBuilder.ToView(); + IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size()); + S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf)); + if (!MetaUploadResult.IsSuccess()) + { + throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error); + } + + ZEN_INFO("Dehydration complete: {} files, {} bytes, {} ms", FileCount, TotalBytes, UploadDurationMs); + } + catch (std::exception& Ex) + { + // Any in-progress multipart upload has already been aborted by PutObjectMultipart. + // current-state.json is only written on success, so the previous S3 state remains valid. + ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what()); + } +} + +void +S3Hydrator::Hydrate() +{ + ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir); + + const bool ForceRemoveReadOnlyFiles = true; + + // Clean temp dir before starting in case of leftover state from a previous failed hydration + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + + bool WipeServerState = false; + + try + { + S3Client Client = CreateS3Client(); + std::string MetaKey = m_KeyPrefix + "/current-state.json"; + + S3HeadObjectResult HeadResult = Client.HeadObject(MetaKey); + if (HeadResult.Status == HeadObjectResult::NotFound) + { + throw zen::runtime_error("No state found in S3 at '{}'", MetaKey); + } + if (!HeadResult.IsSuccess()) + { + throw zen::runtime_error("Failed to check for state in S3 at '{}': {}", MetaKey, HeadResult.Error); + } + + S3GetObjectResult MetaResult = Client.GetObject(MetaKey); + if (!MetaResult.IsSuccess()) + { + throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error); + } + + std::string ParseError; + json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError); + if (!ParseError.empty()) + { + throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError); + } + + std::string FolderName = MetaJson["FolderName"].string_value(); + if (FolderName.empty()) + { + throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey); + } + + std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/"; + S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix); + if (!ListResult.IsSuccess()) + { + throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error); + } + + for (const S3ObjectInfo& Obj : ListResult.Objects) + { + if (!Obj.Key.starts_with(FolderPrefix)) + { + ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix); + continue; + } + + std::string RelKey = Obj.Key.substr(FolderPrefix.size()); + if (RelKey.empty()) + { + continue; + } + std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey)); + CreateDirectories(DestPath.parent_path()); + + BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate); + DestFile.SetFileSize(Obj.Size); + + if (Obj.Size > 0) + { + BasicFileWriter Writer(DestFile, 64 * 1024); + + uint64_t Offset = 0; + while (Offset < Obj.Size) + { + uint64_t ChunkSize = std::min<uint64_t>(8 * 1024 * 1024, Obj.Size - Offset); + S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", + Obj.Key, + Offset, + Offset + ChunkSize - 1, + Chunk.Error); + } + + Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); + Offset += ChunkSize; + } + + Writer.Flush(); + } + } + + // Downloaded successfully - swap into ServerStateDir + ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + + // If the two paths share at least one common component they are on the same drive/volume + // and atomic renames will succeed. Otherwise fall back to a full copy. + auto [ItTmp, ItState] = + std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end()); + if (ItTmp != m_Config.TempDir.begin()) + { + // Fast path: atomic renames - no data copying needed + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.TempDir)) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / Entry.path().filename()); + if (Entry.is_directory()) + { + RenameDirectory(Entry.path(), Dest); + } + else + { + RenameFile(Entry.path(), Dest); + } + } + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + } + else + { + // Slow path: TempDir and ServerStateDir are on different filesystems, so rename + // would fail. Copy the tree instead and clean up the temp files afterwards. + ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); + CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true}); + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + } + + ZEN_INFO("Hydration complete from folder '{}'", FolderName); + } + catch (std::exception& Ex) + { + ZEN_WARN("S3 hydration failed: {}. Will wipe any partially installed state.", Ex.what()); + + // We don't do the clean right here to avoid potentially running into double-throws + WipeServerState = true; + } + + if (WipeServerState) + { + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + } +} + std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) { @@ -123,6 +496,12 @@ CreateHydrator(const HydrationConfig& Config) Hydrator->Configure(Config); return Hydrator; } + if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) + { + std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>(); + Hydrator->Configure(Config); + return Hydrator; + } throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); } diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp index 88d844b61..26d1023f4 100644 --- a/src/zenutil/cloud/s3client.cpp +++ b/src/zenutil/cloud/s3client.cpp @@ -171,14 +171,19 @@ S3Client::GetCurrentCredentials() SigV4Credentials Creds = m_CredentialProvider->GetCredentials(); if (!Creds.AccessKeyId.empty()) { - // Invalidate the signing key cache when the access key changes + // Invalidate the signing key cache when the access key changes, and update stored + // credentials atomically under the same lock so callers see a consistent snapshot. + RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock); if (Creds.AccessKeyId != m_Credentials.AccessKeyId) { - RwLock::ExclusiveLockScope ExclusiveLock(m_SigningKeyLock); m_CachedDateStamp.clear(); } m_Credentials = Creds; + // Return Creds directly - avoids reading m_Credentials after releasing the lock, + // which would race with another concurrent write. + return Creds; } + // IMDS returned empty credentials; fall back to the last known-good credentials. return m_Credentials; } return m_Credentials; @@ -252,7 +257,7 @@ S3Client::BucketRootPath() const Sha256Digest S3Client::GetSigningKey(std::string_view DateStamp) { - // Fast path: shared lock for cache hit (common case — key only changes once per day) + // Fast path: shared lock for cache hit (common case - key only changes once per day) { RwLock::SharedLockScope SharedLock(m_SigningKeyLock); if (m_CachedDateStamp == DateStamp) @@ -360,6 +365,46 @@ S3Client::GetObject(std::string_view Key) return S3GetObjectResult{{}, std::move(Response.ResponsePayload)}; } +S3GetObjectResult +S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize) +{ + ZEN_ASSERT(RangeSize > 0); + std::string Path = KeyToPath(Key); + + HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash); + Headers->emplace("Range", fmt::format("bytes={}-{}", RangeStart, RangeStart + RangeSize - 1)); + + HttpClient::Response Response = m_HttpClient.Get(Path, Headers); + if (!Response.IsSuccess()) + { + std::string Err = Response.ErrorMessage("S3 GET range failed"); + ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err); + return S3GetObjectResult{S3Result{std::move(Err)}, {}}; + } + + // Callers are expected to request only ranges that lie within the known object size (e.g. + // by calling HeadObject first). Treat a short read as an error rather than silently + // returning a truncated buffer - a partial write is more dangerous than a hard failure. + if (Response.ResponsePayload.GetSize() != RangeSize) + { + std::string Err = fmt::format("S3 GET range '{}' [{}-{}] returned {} bytes, expected {}", + Key, + RangeStart, + RangeStart + RangeSize - 1, + Response.ResponsePayload.GetSize(), + RangeSize); + ZEN_WARN("{}", Err); + return S3GetObjectResult{S3Result{std::move(Err)}, {}}; + } + + ZEN_DEBUG("S3 GET range '{}' [{}-{}] succeeded ({} bytes)", + Key, + RangeStart, + RangeStart + RangeSize - 1, + Response.ResponsePayload.GetSize()); + return S3GetObjectResult{{}, std::move(Response.ResponsePayload)}; +} + S3Result S3Client::DeleteObject(std::string_view Key) { @@ -693,19 +738,18 @@ S3Client::GeneratePresignedUrlForMethod(std::string_view Key, std::string_view M } S3Result -S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize) +S3Client::PutObjectMultipart(std::string_view Key, + uint64_t TotalSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange, + uint64_t PartSize) { - const uint64_t ContentSize = Content.GetSize(); - // If the content fits in a single part, just use PutObject - if (ContentSize <= PartSize) + if (TotalSize <= PartSize) { - return PutObject(Key, Content); + return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{}); } - ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, ContentSize, (ContentSize + PartSize - 1) / PartSize); - - // Initiate multipart upload + ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize); S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key); if (!InitResult) @@ -722,38 +766,51 @@ S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t Pa uint64_t Offset = 0; uint32_t PartNumber = 1; - while (Offset < ContentSize) + try { - uint64_t ThisPartSize = std::min(PartSize, ContentSize - Offset); + while (Offset < TotalSize) + { + uint64_t ThisPartSize = std::min(PartSize, TotalSize - Offset); + IoBuffer PartContent = FetchRange(Offset, ThisPartSize); + S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, std::move(PartContent)); + if (!PartResult) + { + AbortMultipartUpload(Key, UploadId); + return S3Result{std::move(PartResult.Error)}; + } - // Create a sub-buffer referencing the part data within the original content - IoBuffer PartContent(Content, Offset, ThisPartSize); + PartETags.emplace_back(PartNumber, std::move(PartResult.ETag)); + Offset += ThisPartSize; + PartNumber++; + } - S3UploadPartResult PartResult = UploadPart(Key, UploadId, PartNumber, PartContent); - if (!PartResult) + S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags); + if (!CompleteResult) { - // Attempt to abort the multipart upload on failure AbortMultipartUpload(Key, UploadId); - return S3Result{std::move(PartResult.Error)}; + return CompleteResult; } - - PartETags.emplace_back(PartNumber, std::move(PartResult.ETag)); - Offset += ThisPartSize; - PartNumber++; } - - // Complete multipart upload - S3Result CompleteResult = CompleteMultipartUpload(Key, UploadId, PartETags); - if (!CompleteResult) + catch (...) { AbortMultipartUpload(Key, UploadId); - return CompleteResult; + throw; } - ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), ContentSize); + ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize); return {}; } +S3Result +S3Client::PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize) +{ + return PutObjectMultipart( + Key, + Content.GetSize(), + [&Content](uint64_t Offset, uint64_t Size) { return IoBuffer(Content, Offset, Size); }, + PartSize); +} + ////////////////////////////////////////////////////////////////////////// // Tests diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h index 47501c5b5..bd30aa8a2 100644 --- a/src/zenutil/include/zenutil/cloud/s3client.h +++ b/src/zenutil/include/zenutil/cloud/s3client.h @@ -11,6 +11,7 @@ #include <zencore/thread.h> +#include <functional> #include <string> #include <string_view> #include <vector> @@ -63,7 +64,7 @@ enum class HeadObjectResult Error, }; -/// Result of GetObject — carries the downloaded content. +/// Result of GetObject - carries the downloaded content. struct S3GetObjectResult : S3Result { IoBuffer Content; @@ -71,26 +72,26 @@ struct S3GetObjectResult : S3Result std::string_view AsText() const { return std::string_view(reinterpret_cast<const char*>(Content.GetData()), Content.GetSize()); } }; -/// Result of HeadObject — carries object metadata and existence status. +/// Result of HeadObject - carries object metadata and existence status. struct S3HeadObjectResult : S3Result { S3ObjectInfo Info; HeadObjectResult Status = HeadObjectResult::NotFound; }; -/// Result of ListObjects — carries the list of matching objects. +/// Result of ListObjects - carries the list of matching objects. struct S3ListObjectsResult : S3Result { std::vector<S3ObjectInfo> Objects; }; -/// Result of CreateMultipartUpload — carries the upload ID. +/// Result of CreateMultipartUpload - carries the upload ID. struct S3CreateMultipartUploadResult : S3Result { std::string UploadId; }; -/// Result of UploadPart — carries the part ETag. +/// Result of UploadPart - carries the part ETag. struct S3UploadPartResult : S3Result { std::string ETag; @@ -120,6 +121,11 @@ public: /// Download an object from S3 S3GetObjectResult GetObject(std::string_view Key); + /// Download a byte range of an object from S3 + /// @param RangeStart First byte offset (inclusive) + /// @param RangeSize Number of bytes to download + S3GetObjectResult GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t RangeSize); + /// Delete an object from S3 S3Result DeleteObject(std::string_view Key); @@ -151,6 +157,16 @@ public: /// @param PartSize Size of each part in bytes (minimum 5 MB, default 8 MB) S3Result PutObjectMultipart(std::string_view Key, IoBuffer Content, uint64_t PartSize = 8 * 1024 * 1024); + /// High-level multipart upload: calls FetchRange(Offset, Size) to read each part on demand, + /// avoiding loading the full content into memory. + /// @param TotalSize Total object size in bytes + /// @param FetchRange Callback invoked once per part; must return exactly Size bytes + /// @param PartSize Size of each part in bytes (minimum 5 MB, default 8 MB) + S3Result PutObjectMultipart(std::string_view Key, + uint64_t TotalSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)> FetchRange, + uint64_t PartSize = 8 * 1024 * 1024); + /// Generate a pre-signed URL for downloading an object (GET) /// @param Key The object key /// @param ExpiresIn URL validity duration (default 1 hour, max 7 days) |