diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-20 17:07:47 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-20 17:07:47 +0100 |
| commit | bcecb25578149e999c3710aeb6948eebe4b3e1b7 (patch) | |
| tree | b09951a9de718b40211e0a58bc4b53209d21224d /src/zenserver/hub/hydration.cpp | |
| parent | auth fail no cache (#871) (diff) | |
| download | zen-de/s3-hub-hydration.tar.xz zen-de/s3-hub-hydration.zip | |
S3 hydration backend for hub modede/s3-hub-hydration
Diffstat (limited to 'src/zenserver/hub/hydration.cpp')
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 379 |
1 files changed, 379 insertions, 0 deletions
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index e56be3934..4998ef1af 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -2,12 +2,51 @@ #include "hydration.h" +#include <zencore/basicfile.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/system.h> +#include <zenutil/cloud/imdscredentials.h> +#include <zenutil/cloud/s3client.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +namespace { + + /// UTC time decomposed to calendar fields with sub-second milliseconds. + struct UtcTime + { + std::tm Tm{}; + int Ms = 0; // sub-second milliseconds [0, 999] + + static UtcTime Now() + { + std::chrono::system_clock::time_point TimePoint = std::chrono::system_clock::now(); + std::time_t TimeT = std::chrono::system_clock::to_time_t(TimePoint); + int SubSecMs = + static_cast<int>((std::chrono::duration_cast<std::chrono::milliseconds>(TimePoint.time_since_epoch()) % 1000).count()); + + UtcTime Result; + Result.Ms = SubSecMs; +#if ZEN_PLATFORM_WINDOWS + gmtime_s(&Result.Tm, &TimeT); +#else + gmtime_r(&TimeT, &Result.Tm); +#endif + return Result; + } + }; + +} // namespace + /////////////////////////////////////////////////////////////////////////// constexpr std::string_view FileHydratorPrefix = "file://"; @@ -114,6 +153,340 @@ FileHydrator::Dehydrate() CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); } +/////////////////////////////////////////////////////////////////////////// + +constexpr std::string_view S3HydratorPrefix = "s3://"; + +struct S3Hydrator : public HydrationStrategyBase +{ + void Configure(const HydrationConfig& Config) override; + void Dehydrate() override; + void Hydrate() override; + +private: + S3Client CreateS3Client() const; + std::string BuildTimestampFolderName() const; + std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const; + + HydrationConfig m_Config; + std::string m_Bucket; + std::string m_KeyPrefix; // "<user-prefix>/<ModuleId>" or just "<ModuleId>" - no trailing slash + std::string m_Region; + SigV4Credentials m_Credentials; + Ref<ImdsCredentialProvider> m_CredentialProvider; +}; + +void +S3Hydrator::Configure(const HydrationConfig& Config) +{ + m_Config = Config; + + std::string_view Spec = m_Config.TargetSpecification; + Spec.remove_prefix(S3HydratorPrefix.size()); + + size_t SlashPos = Spec.find('/'); + std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; + m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); + m_KeyPrefix = UserPrefix.empty() ? m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId; + + ZEN_ASSERT(!m_Bucket.empty()); + + std::string Region = GetEnvVariable("AWS_DEFAULT_REGION"); + if (Region.empty()) + { + Region = GetEnvVariable("AWS_REGION"); + } + if (Region.empty()) + { + Region = "us-east-1"; + } + m_Region = std::move(Region); + + std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); + if (AccessKeyId.empty()) + { + m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({})); + } + else + { + m_Credentials.AccessKeyId = std::move(AccessKeyId); + m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); + m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); + } +} + +S3Client +S3Hydrator::CreateS3Client() const +{ + S3ClientOptions Options; + Options.BucketName = m_Bucket; + Options.Region = m_Region; + + if (m_CredentialProvider) + { + Options.CredentialProvider = m_CredentialProvider; + } + else + { + Options.Credentials = m_Credentials; + } + + return S3Client(Options); +} + +std::string +S3Hydrator::BuildTimestampFolderName() const +{ + UtcTime Now = UtcTime::Now(); + return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}", + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); +} + +std::string +S3Hydrator::MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const +{ + return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string(); +} + +void +S3Hydrator::Dehydrate() +{ + ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix); + + try + { + S3Client Client = CreateS3Client(); + std::string FolderName = BuildTimestampFolderName(); + uint64_t TotalBytes = 0; + uint32_t FileCount = 0; + std::chrono::steady_clock::time_point UploadStart = std::chrono::steady_clock::now(); + + DirectoryContent DirContent; + GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent); + + for (const std::filesystem::path& RelPath : DirContent.Files) + { + std::string Key = MakeObjectKey(FolderName, RelPath); + + BasicFile File(MakeSafeAbsolutePath(m_Config.ServerStateDir / RelPath), BasicFile::Mode::kRead); + uint64_t FileSize = File.FileSize(); + + S3Result UploadResult = + Client.PutObjectMultipart(Key, FileSize, [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); }); + if (!UploadResult.IsSuccess()) + { + throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error); + } + + TotalBytes += FileSize; + ++FileCount; + } + + // Write current-state.json + int64_t UploadDurationMs = + std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - UploadStart).count(); + + UtcTime Now = UtcTime::Now(); + std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); + + CbObjectWriter Meta; + Meta << "FolderName" << FolderName; + Meta << "ModuleId" << m_Config.ModuleId; + Meta << "HostName" << GetMachineName(); + Meta << "UploadTimeUtc" << UploadTimeUtc; + Meta << "UploadDurationMs" << UploadDurationMs; + Meta << "TotalSizeBytes" << TotalBytes; + Meta << "FileCount" << FileCount; + + ExtendableStringBuilder<1024> JsonBuilder; + Meta.Save().ToJson(JsonBuilder); + + std::string MetaKey = m_KeyPrefix + "/current-state.json"; + std::string_view JsonText = JsonBuilder.ToView(); + IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size()); + S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf)); + if (!MetaUploadResult.IsSuccess()) + { + throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error); + } + + ZEN_INFO("Dehydration complete: {} files, {} bytes, {} ms", FileCount, TotalBytes, UploadDurationMs); + } + catch (std::exception& Ex) + { + // Any in-progress multipart upload has already been aborted by PutObjectMultipart. + // current-state.json is only written on success, so the previous S3 state remains valid. + ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what()); + } +} + +void +S3Hydrator::Hydrate() +{ + ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir); + + const bool ForceRemoveReadOnlyFiles = true; + + // Clean temp dir before starting in case of leftover state from a previous failed hydration + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + + bool WipeServerState = false; + + try + { + S3Client Client = CreateS3Client(); + std::string MetaKey = m_KeyPrefix + "/current-state.json"; + + S3HeadObjectResult HeadResult = Client.HeadObject(MetaKey); + if (HeadResult.Status == HeadObjectResult::NotFound) + { + throw zen::runtime_error("No state found in S3 at '{}'", MetaKey); + } + if (!HeadResult.IsSuccess()) + { + throw zen::runtime_error("Failed to check for state in S3 at '{}': {}", MetaKey, HeadResult.Error); + } + + S3GetObjectResult MetaResult = Client.GetObject(MetaKey); + if (!MetaResult.IsSuccess()) + { + throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error); + } + + std::string ParseError; + json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError); + if (!ParseError.empty()) + { + throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError); + } + + std::string FolderName = MetaJson["FolderName"].string_value(); + if (FolderName.empty()) + { + throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey); + } + + std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/"; + S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix); + if (!ListResult.IsSuccess()) + { + throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error); + } + + for (const S3ObjectInfo& Obj : ListResult.Objects) + { + if (!Obj.Key.starts_with(FolderPrefix)) + { + ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix); + continue; + } + + std::string RelKey = Obj.Key.substr(FolderPrefix.size()); + if (RelKey.empty()) + { + continue; + } + std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey)); + CreateDirectories(DestPath.parent_path()); + + BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate); + DestFile.SetFileSize(Obj.Size); + + if (Obj.Size > 0) + { + BasicFileWriter Writer(DestFile, 64 * 1024); + + uint64_t Offset = 0; + while (Offset < Obj.Size) + { + uint64_t ChunkSize = std::min<uint64_t>(8 * 1024 * 1024, Obj.Size - Offset); + S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize); + if (!Chunk.IsSuccess()) + { + throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", + Obj.Key, + Offset, + Offset + ChunkSize - 1, + Chunk.Error); + } + + Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); + Offset += ChunkSize; + } + + Writer.Flush(); + } + } + + // Downloaded successfully - swap into ServerStateDir + ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + + // If the two paths share at least one common component they are on the same drive/volume + // and atomic renames will succeed. Otherwise fall back to a full copy. + auto [ItTmp, ItState] = + std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end()); + if (ItTmp != m_Config.TempDir.begin()) + { + // Fast path: atomic renames - no data copying needed + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.TempDir)) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / Entry.path().filename()); + if (Entry.is_directory()) + { + RenameDirectory(Entry.path(), Dest); + } + else + { + RenameFile(Entry.path(), Dest); + } + } + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + } + else + { + // Slow path: TempDir and ServerStateDir are on different filesystems, so rename + // would fail. Copy the tree instead and clean up the temp files afterwards. + ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); + CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true}); + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + } + + ZEN_INFO("Hydration complete from folder '{}'", FolderName); + } + catch (std::exception& Ex) + { + ZEN_WARN("S3 hydration failed: {}. Will wipe any partially installed state.", Ex.what()); + + // We don't do the clean right here to avoid potentially running into double-throws + WipeServerState = true; + } + + if (WipeServerState) + { + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); + ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); + CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); + } +} + std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) { @@ -123,6 +496,12 @@ CreateHydrator(const HydrationConfig& Config) Hydrator->Configure(Config); return Hydrator; } + if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) + { + std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>(); + Hydrator->Configure(Config); + return Hydrator; + } throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); } |