aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hydration.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/hydration.cpp')
-rw-r--r--src/zenserver/hub/hydration.cpp1306
1 files changed, 1300 insertions, 6 deletions
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index 0e78f8545..ed16bfe56 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -2,14 +2,67 @@
#include "hydration.h"
+#include <zencore/basicfile.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/except_fmt.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/system.h>
+#include <zencore/timer.h>
+#include <zenutil/cloud/imdscredentials.h>
+#include <zenutil/cloud/s3client.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_WITH_TESTS
+# include <zencore/parallelwork.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/thread.h>
+# include <zencore/workthreadpool.h>
+# include <zenutil/cloud/minioprocess.h>
+# include <cstring>
+#endif // ZEN_WITH_TESTS
namespace zen {
+namespace {
+
+ /// UTC time decomposed to calendar fields with sub-second milliseconds.
+ struct UtcTime
+ {
+ std::tm Tm{};
+ int Ms = 0; // sub-second milliseconds [0, 999]
+
+ static UtcTime Now()
+ {
+ std::chrono::system_clock::time_point TimePoint = std::chrono::system_clock::now();
+ std::time_t TimeT = std::chrono::system_clock::to_time_t(TimePoint);
+ int SubSecMs =
+ static_cast<int>((std::chrono::duration_cast<std::chrono::milliseconds>(TimePoint.time_since_epoch()) % 1000).count());
+
+ UtcTime Result;
+ Result.Ms = SubSecMs;
+#if ZEN_PLATFORM_WINDOWS
+ gmtime_s(&Result.Tm, &TimeT);
+#else
+ gmtime_r(&TimeT, &Result.Tm);
+#endif
+ return Result;
+ }
+ };
+
+} // namespace
+
///////////////////////////////////////////////////////////////////////////
+constexpr std::string_view FileHydratorPrefix = "file://";
+constexpr std::string_view FileHydratorType = "file";
+
struct FileHydrator : public HydrationStrategyBase
{
virtual void Configure(const HydrationConfig& Config) override;
@@ -26,7 +79,22 @@ FileHydrator::Configure(const HydrationConfig& Config)
{
m_Config = Config;
- std::filesystem::path ConfigPath(Utf8ToWide(m_Config.TargetSpecification));
+ std::filesystem::path ConfigPath;
+ if (!m_Config.TargetSpecification.empty())
+ {
+ ConfigPath = Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length()));
+ }
+ else
+ {
+ CbObjectView Settings = m_Config.Options["settings"].AsObjectView();
+ std::string_view Path = Settings["path"].AsString();
+ if (Path.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
+ }
+ ConfigPath = Utf8ToWide(std::string(Path));
+ }
+ MakeSafeAbsolutePathInPlace(ConfigPath);
if (!std::filesystem::exists(ConfigPath))
{
@@ -43,6 +111,8 @@ FileHydrator::Hydrate()
{
ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir);
+ Stopwatch Timer;
+
// Ensure target is clean
ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir);
const bool ForceRemoveReadOnlyFiles = true;
@@ -68,8 +138,10 @@ FileHydrator::Hydrate()
ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
}
-
- // Note that we leave the storage state intact until next dehydration replaces the content
+ else
+ {
+ ZEN_INFO("Hydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
}
void
@@ -77,6 +149,8 @@ FileHydrator::Dehydrate()
{
ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir);
+ Stopwatch Timer;
+
const std::filesystem::path TargetDir = m_StorageModuleRootDir;
// Ensure target is clean. This could be replaced with an atomic copy at a later date
@@ -91,7 +165,23 @@ FileHydrator::Dehydrate()
try
{
ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir);
- CopyTree(m_Config.ServerStateDir, TargetDir, {.EnableClone = true});
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.ServerStateDir))
+ {
+ if (Entry.path().filename() == ".sentry-native")
+ {
+ continue;
+ }
+ std::filesystem::path Dest = TargetDir / Entry.path().filename();
+ if (Entry.is_directory())
+ {
+ CreateDirectories(Dest);
+ CopyTree(Entry.path(), Dest, {.EnableClone = true});
+ }
+ else
+ {
+ CopyFile(Entry.path(), Dest, {.EnableClone = true});
+ }
+ }
}
catch (std::exception& Ex)
{
@@ -109,12 +199,1216 @@ FileHydrator::Dehydrate()
ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+
+ if (CopySuccess)
+ {
+ ZEN_INFO("Dehydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+constexpr std::string_view S3HydratorPrefix = "s3://";
+constexpr std::string_view S3HydratorType = "s3";
+
+struct S3Hydrator : public HydrationStrategyBase
+{
+ void Configure(const HydrationConfig& Config) override;
+ void Dehydrate() override;
+ void Hydrate() override;
+
+private:
+ S3Client CreateS3Client() const;
+ std::string BuildTimestampFolderName() const;
+ std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const;
+
+ HydrationConfig m_Config;
+ std::string m_Bucket;
+ std::string m_KeyPrefix; // "<user-prefix>/<ModuleId>" or just "<ModuleId>" - no trailing slash
+ std::string m_Region;
+ SigV4Credentials m_Credentials;
+ Ref<ImdsCredentialProvider> m_CredentialProvider;
+
+ static constexpr uint64_t MultipartChunkSize = 8 * 1024 * 1024;
+};
+
+void
+S3Hydrator::Configure(const HydrationConfig& Config)
+{
+ m_Config = Config;
+
+ CbObjectView Settings = m_Config.Options["settings"].AsObjectView();
+ std::string_view Spec;
+ if (!m_Config.TargetSpecification.empty())
+ {
+ Spec = m_Config.TargetSpecification;
+ Spec.remove_prefix(S3HydratorPrefix.size());
+ }
+ else
+ {
+ std::string_view Uri = Settings["uri"].AsString();
+ if (Uri.empty())
+ {
+ throw zen::runtime_error("Hydration config 's3' type requires 'settings.uri'");
+ }
+ Spec = Uri;
+ Spec.remove_prefix(S3HydratorPrefix.size());
+ }
+
+ size_t SlashPos = Spec.find('/');
+ std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
+ m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec);
+ m_KeyPrefix = UserPrefix.empty() ? m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId;
+
+ ZEN_ASSERT(!m_Bucket.empty());
+
+ std::string Region = std::string(Settings["region"].AsString());
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_DEFAULT_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_REGION");
+ }
+ if (Region.empty())
+ {
+ Region = "us-east-1";
+ }
+ m_Region = std::move(Region);
+
+ std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
+ if (AccessKeyId.empty())
+ {
+ m_CredentialProvider = Ref<ImdsCredentialProvider>(new ImdsCredentialProvider({}));
+ }
+ else
+ {
+ m_Credentials.AccessKeyId = std::move(AccessKeyId);
+ m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY");
+ m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
+ }
+}
+
+S3Client
+S3Hydrator::CreateS3Client() const
+{
+ S3ClientOptions Options;
+ Options.BucketName = m_Bucket;
+ Options.Region = m_Region;
+
+ CbObjectView Settings = m_Config.Options["settings"].AsObjectView();
+ std::string_view Endpoint = Settings["endpoint"].AsString();
+ if (!Endpoint.empty())
+ {
+ Options.Endpoint = std::string(Endpoint);
+ Options.PathStyle = Settings["path-style"].AsBool();
+ }
+
+ if (m_CredentialProvider)
+ {
+ Options.CredentialProvider = m_CredentialProvider;
+ }
+ else
+ {
+ Options.Credentials = m_Credentials;
+ }
+
+ Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
+
+ return S3Client(Options);
+}
+
+std::string
+S3Hydrator::BuildTimestampFolderName() const
+{
+ UtcTime Now = UtcTime::Now();
+ return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}",
+ Now.Tm.tm_year + 1900,
+ Now.Tm.tm_mon + 1,
+ Now.Tm.tm_mday,
+ Now.Tm.tm_hour,
+ Now.Tm.tm_min,
+ Now.Tm.tm_sec,
+ Now.Ms);
+}
+
+std::string
+S3Hydrator::MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const
+{
+ return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string();
+}
+
+void
+S3Hydrator::Dehydrate()
+{
+ ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix);
+
+ try
+ {
+ S3Client Client = CreateS3Client();
+ std::string FolderName = BuildTimestampFolderName();
+ uint64_t TotalBytes = 0;
+ uint32_t FileCount = 0;
+ Stopwatch Timer;
+
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent);
+
+ for (const std::filesystem::path& AbsPath : DirContent.Files)
+ {
+ std::filesystem::path RelPath = AbsPath.lexically_relative(m_Config.ServerStateDir);
+ if (RelPath.empty() || *RelPath.begin() == "..")
+ {
+ throw zen::runtime_error(
+ "lexically_relative produced a '..'-escape path for '{}' relative to '{}' - "
+ "path form mismatch (e.g. \\\\?\\ prefix on one but not the other)",
+ AbsPath.string(),
+ m_Config.ServerStateDir.string());
+ }
+ if (*RelPath.begin() == ".sentry-native")
+ {
+ continue;
+ }
+ std::string Key = MakeObjectKey(FolderName, RelPath);
+
+ BasicFile File(AbsPath, BasicFile::Mode::kRead);
+ uint64_t FileSize = File.FileSize();
+
+ S3Result UploadResult = Client.PutObjectMultipart(
+ Key,
+ FileSize,
+ [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); },
+ MultipartChunkSize);
+ if (!UploadResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error);
+ }
+
+ TotalBytes += FileSize;
+ ++FileCount;
+ }
+
+ // Write current-state.json
+ uint64_t UploadDurationMs = Timer.GetElapsedTimeMs();
+
+ UtcTime Now = UtcTime::Now();
+ std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
+ Now.Tm.tm_year + 1900,
+ Now.Tm.tm_mon + 1,
+ Now.Tm.tm_mday,
+ Now.Tm.tm_hour,
+ Now.Tm.tm_min,
+ Now.Tm.tm_sec,
+ Now.Ms);
+
+ CbObjectWriter Meta;
+ Meta << "FolderName" << FolderName;
+ Meta << "ModuleId" << m_Config.ModuleId;
+ Meta << "HostName" << GetMachineName();
+ Meta << "UploadTimeUtc" << UploadTimeUtc;
+ Meta << "UploadDurationMs" << UploadDurationMs;
+ Meta << "TotalSizeBytes" << TotalBytes;
+ Meta << "FileCount" << FileCount;
+
+ ExtendableStringBuilder<1024> JsonBuilder;
+ Meta.Save().ToJson(JsonBuilder);
+
+ std::string MetaKey = m_KeyPrefix + "/current-state.json";
+ std::string_view JsonText = JsonBuilder.ToView();
+ IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size());
+ S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf));
+ if (!MetaUploadResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error);
+ }
+
+ ZEN_INFO("Dehydration complete: {} files, {}, {}", FileCount, NiceBytes(TotalBytes), NiceTimeSpanMs(UploadDurationMs));
+ }
+ catch (std::exception& Ex)
+ {
+ // Any in-progress multipart upload has already been aborted by PutObjectMultipart.
+ // current-state.json is only written on success, so the previous S3 state remains valid.
+ ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what());
+ }
+}
+
+void
+S3Hydrator::Hydrate()
+{
+ ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir);
+
+ Stopwatch Timer;
+ const bool ForceRemoveReadOnlyFiles = true;
+
+ // Clean temp dir before starting in case of leftover state from a previous failed hydration
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+
+ bool WipeServerState = false;
+
+ try
+ {
+ S3Client Client = CreateS3Client();
+ std::string MetaKey = m_KeyPrefix + "/current-state.json";
+
+ S3GetObjectResult MetaResult = Client.GetObject(MetaKey);
+ if (!MetaResult.IsSuccess())
+ {
+ if (MetaResult.Error == S3GetObjectResult::NotFoundErrorText)
+ {
+ ZEN_INFO("No state found in S3 at {}", MetaKey);
+
+ ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ return;
+ }
+ throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error);
+ }
+
+ std::string ParseError;
+ json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError);
+ if (!ParseError.empty())
+ {
+ throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError);
+ }
+
+ std::string FolderName = MetaJson["FolderName"].string_value();
+ if (FolderName.empty())
+ {
+ throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey);
+ }
+
+ std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/";
+ S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix);
+ if (!ListResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error);
+ }
+
+ for (const S3ObjectInfo& Obj : ListResult.Objects)
+ {
+ if (!Obj.Key.starts_with(FolderPrefix))
+ {
+ ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix);
+ continue;
+ }
+
+ std::string RelKey = Obj.Key.substr(FolderPrefix.size());
+ if (RelKey.empty())
+ {
+ continue;
+ }
+ std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey));
+ CreateDirectories(DestPath.parent_path());
+
+ if (Obj.Size > MultipartChunkSize)
+ {
+ BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate);
+ DestFile.SetFileSize(Obj.Size);
+
+ BasicFileWriter Writer(DestFile, 64 * 1024);
+
+ uint64_t Offset = 0;
+ while (Offset < Obj.Size)
+ {
+ uint64_t ChunkSize = std::min<uint64_t>(MultipartChunkSize, Obj.Size - Offset);
+ S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
+ Obj.Key,
+ Offset,
+ Offset + ChunkSize - 1,
+ Chunk.Error);
+ }
+
+ Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
+ Offset += ChunkSize;
+ }
+
+ Writer.Flush();
+ }
+ else
+ {
+ S3GetObjectResult Chunk = Client.GetObject(Obj.Key, m_Config.TempDir);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' from S3: {}", Obj.Key, Chunk.Error);
+ }
+
+ if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
+ {
+ std::error_code Ec;
+ std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (Ec)
+ {
+ WriteFile(DestPath, Chunk.Content);
+ }
+ else
+ {
+ Chunk.Content.SetDeleteOnClose(false);
+ Chunk.Content = {};
+ RenameFile(ChunkPath, DestPath, Ec);
+ }
+ }
+ else
+ {
+ WriteFile(DestPath, Chunk.Content);
+ }
+ }
+ }
+
+ // Downloaded successfully - swap into ServerStateDir
+ ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+
+ // If the two paths share at least one common component they are on the same drive/volume
+ // and atomic renames will succeed. Otherwise fall back to a full copy.
+ auto [ItTmp, ItState] =
+ std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end());
+ if (ItTmp != m_Config.TempDir.begin())
+ {
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_Config.TempDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, DirContent);
+
+ for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename());
+ RenameDirectory(AbsPath, Dest);
+ }
+ for (const std::filesystem::path& AbsPath : DirContent.Files)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename());
+ RenameFile(AbsPath, Dest);
+ }
+
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ }
+ else
+ {
+ // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
+ // would fail. Copy the tree instead and clean up the temp files afterwards.
+ ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
+ CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true});
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ }
+
+ ZEN_INFO("Hydration complete from folder '{}' in {}", FolderName, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("S3 hydration failed: {}. Will wipe any partially installed state.", Ex.what());
+
+ // We don't do the clean right here to avoid potentially running into double-throws
+ WipeServerState = true;
+ }
+
+ if (WipeServerState)
+ {
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
+ }
}
std::unique_ptr<HydrationStrategyBase>
-CreateFileHydrator()
+CreateHydrator(const HydrationConfig& Config)
{
- return std::make_unique<FileHydrator>();
+ if (!Config.TargetSpecification.empty())
+ {
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0)
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>();
+ Hydrator->Configure(Config);
+ return Hydrator;
+ }
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0)
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>();
+ Hydrator->Configure(Config);
+ return Hydrator;
+ }
+ throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification));
+ }
+
+ std::string_view Type = Config.Options["type"].AsString();
+ if (Type == FileHydratorType)
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>();
+ Hydrator->Configure(Config);
+ return Hydrator;
+ }
+ if (Type == S3HydratorType)
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>();
+ Hydrator->Configure(Config);
+ return Hydrator;
+ }
+ if (!Type.empty())
+ {
+ throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ }
+ throw zen::runtime_error("No hydration target configured");
}
+#if ZEN_WITH_TESTS
+
+namespace {
+
+ /// Scoped RAII helper to set/restore a single environment variable within a test.
+ /// Used to configure AWS credentials for each S3 test's MinIO instance
+ /// without polluting the global environment.
+ struct ScopedEnvVar
+ {
+ std::string m_Name;
+ std::optional<std::string> m_OldValue; // nullopt = was not set; "" = was set to empty string
+
+ ScopedEnvVar(std::string_view Name, std::string_view Value) : m_Name(Name)
+ {
+# if ZEN_PLATFORM_WINDOWS
+ // Use the raw API so we can distinguish "not set" (ERROR_ENVVAR_NOT_FOUND)
+ // from "set to empty string" (returns 0 with no error).
+ char Buf[1];
+ DWORD Len = GetEnvironmentVariableA(m_Name.c_str(), Buf, sizeof(Buf));
+ if (Len == 0 && GetLastError() == ERROR_ENVVAR_NOT_FOUND)
+ {
+ m_OldValue = std::nullopt;
+ }
+ else
+ {
+ // Len == 0 with no error: variable exists but is empty.
+ // Len > sizeof(Buf): value is non-empty; Len is the required buffer size
+ // (including null terminator) - allocate and re-read.
+ std::string Old(Len == 0 ? 0 : Len - 1, '\0');
+ if (Len > sizeof(Buf))
+ {
+ GetEnvironmentVariableA(m_Name.c_str(), Old.data(), Len);
+ }
+ m_OldValue = std::move(Old);
+ }
+ SetEnvironmentVariableA(m_Name.c_str(), std::string(Value).c_str());
+# else
+ // getenv returns nullptr when not set, "" when set to empty string.
+ const char* Existing = getenv(m_Name.c_str());
+ m_OldValue = Existing ? std::optional<std::string>(Existing) : std::nullopt;
+ setenv(m_Name.c_str(), std::string(Value).c_str(), 1);
+# endif
+ }
+ ~ScopedEnvVar()
+ {
+# if ZEN_PLATFORM_WINDOWS
+ SetEnvironmentVariableA(m_Name.c_str(), m_OldValue.has_value() ? m_OldValue->c_str() : nullptr);
+# else
+ if (m_OldValue.has_value())
+ {
+ setenv(m_Name.c_str(), m_OldValue->c_str(), 1);
+ }
+ else
+ {
+ unsetenv(m_Name.c_str());
+ }
+# endif
+ }
+ };
+
+ /// Create a small file hierarchy under BaseDir:
+ /// file_a.bin
+ /// subdir/file_b.bin
+ /// subdir/nested/file_c.bin
+ /// Returns a vector of (relative path, content) pairs for later verification.
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> CreateTestTree(const std::filesystem::path& BaseDir)
+ {
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+
+ auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) {
+ std::filesystem::path FullPath = BaseDir / RelPath;
+ CreateDirectories(FullPath.parent_path());
+ WriteFile(FullPath, Content);
+ Files.emplace_back(std::move(RelPath), std::move(Content));
+ };
+
+ AddFile("file_a.bin", CreateSemiRandomBlob(1024));
+ AddFile("subdir/file_b.bin", CreateSemiRandomBlob(2048));
+ AddFile("subdir/nested/file_c.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u));
+ AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u));
+ AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u));
+
+ return Files;
+ }
+
+ void VerifyTree(const std::filesystem::path& Dir, const std::vector<std::pair<std::filesystem::path, IoBuffer>>& Expected)
+ {
+ for (const auto& [RelPath, Content] : Expected)
+ {
+ std::filesystem::path FullPath = Dir / RelPath;
+ REQUIRE_MESSAGE(std::filesystem::exists(FullPath), FullPath.string());
+ BasicFile ReadBack(FullPath, BasicFile::Mode::kRead);
+ IoBuffer ReadContent = ReadBack.ReadRange(0, ReadBack.FileSize());
+ REQUIRE_EQ(ReadContent.GetSize(), Content.GetSize());
+ CHECK(std::memcmp(ReadContent.GetData(), Content.GetData(), Content.GetSize()) == 0);
+ }
+ }
+
+} // namespace
+
+TEST_SUITE_BEGIN("server.hydration");
+
+// ---------------------------------------------------------------------------
+// FileHydrator tests
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.file.dehydrate_hydrate")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationStore);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "testmodule";
+ auto TestFiles = CreateTestTree(ServerStateDir);
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = ModuleId;
+ Config.TargetSpecification = "file://" + HydrationStore.string();
+
+ // Dehydrate: copy server state to file store
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ }
+
+ // Verify the module folder exists in the store and ServerStateDir was wiped
+ CHECK(std::filesystem::exists(HydrationStore / ModuleId));
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate: restore server state from file store
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ // Verify restored contents match the original
+ VerifyTree(ServerStateDir, TestFiles);
+}
+
+TEST_CASE("hydration.file.dehydrate_cleans_server_state")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationStore);
+ CreateDirectories(HydrationTemp);
+
+ CreateTestTree(ServerStateDir);
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "testmodule";
+ Config.TargetSpecification = "file://" + HydrationStore.string();
+
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+
+ // FileHydrator::Dehydrate() must wipe ServerStateDir when done
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+}
+
+TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationStore);
+ CreateDirectories(HydrationTemp);
+
+ auto TestFiles = CreateTestTree(ServerStateDir);
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "testmodule";
+ Config.TargetSpecification = "file://" + HydrationStore.string();
+
+ // Dehydrate the original state
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ }
+
+ // Put a stale file in ServerStateDir to simulate leftover state
+ WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+
+ // Hydrate - must wipe stale file and restore original
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
+ VerifyTree(ServerStateDir, TestFiles);
+}
+
+// ---------------------------------------------------------------------------
+// FileHydrator concurrent test
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.file.concurrent")
+{
+ // N modules dehydrate and hydrate concurrently via ParallelWork.
+ // Each module operates in its own directory - tests for global/static state races.
+ constexpr int kModuleCount = 4;
+
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ CreateDirectories(HydrationStore);
+
+ struct ModuleData
+ {
+ HydrationConfig Config;
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+ };
+ std::vector<ModuleData> Modules(kModuleCount);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ std::string ModuleId = fmt::format("file_concurrent_{}", I);
+ std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
+ std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
+ CreateDirectories(StateDir);
+ CreateDirectories(TempPath);
+
+ Modules[I].Config.ServerStateDir = StateDir;
+ Modules[I].Config.TempDir = TempPath;
+ Modules[I].Config.ModuleId = ModuleId;
+ Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string();
+ Modules[I].Files = CreateTestTree(StateDir);
+ }
+
+ // Concurrent dehydrate
+ {
+ WorkerThreadPool Pool(kModuleCount, "hydration_file_dehy");
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ });
+ }
+ Work.Wait();
+ CHECK_FALSE(Work.IsAborted());
+ }
+
+ // Concurrent hydrate
+ {
+ WorkerThreadPool Pool(kModuleCount, "hydration_file_hy");
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ });
+ }
+ Work.Wait();
+ CHECK_FALSE(Work.IsAborted());
+ }
+
+ // Verify all modules restored correctly
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files);
+ }
+}
+
+// ---------------------------------------------------------------------------
+// S3Hydrator tests
+//
+// Each test case spawns its own local MinIO instance (self-contained, no external setup needed).
+// The MinIO binary must be present in the same directory as the test executable (copied by xmake).
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.s3.dehydrate_hydrate")
+{
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19010;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "s3test_roundtrip";
+ auto TestFiles = CreateTestTree(ServerStateDir);
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = ModuleId;
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+
+ // Dehydrate: upload server state to MinIO
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ }
+
+ // Wipe server state
+ CleanDirectory(ServerStateDir, true);
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ // Hydrate: download from MinIO back to server state
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ // Verify restored contents match the original
+ VerifyTree(ServerStateDir, TestFiles);
+}
+
+TEST_CASE("hydration.s3.current_state_json_selects_latest_folder")
+{
+ // Each Dehydrate() uploads files to a new timestamp-named folder and then overwrites
+ // current-state.json to point at that folder. Old folders are NOT deleted.
+ // Hydrate() must read current-state.json to determine which folder to restore from.
+ //
+ // This test verifies that:
+ // 1. After two dehydrations, Hydrate() restores from the second snapshot, not the first,
+ // confirming that current-state.json was updated between dehydrations.
+ // 2. current-state.json is updated to point at the second (latest) folder.
+ // 3. Hydrate() restores the v2 snapshot (identified by v2marker.bin), NOT the v1 snapshot.
+
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19011;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "s3test_folder_select";
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = ModuleId;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
+
+ // v1: dehydrate without a marker file
+ CreateTestTree(ServerStateDir);
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ }
+
+ // ServerStateDir is now empty. Wait so the v2 timestamp folder name is strictly later
+ // (timestamp resolution is 1 ms, but macOS scheduler granularity requires a larger margin).
+ Sleep(100);
+
+ // v2: dehydrate WITH a marker file that only v2 has
+ CreateTestTree(ServerStateDir);
+ WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64));
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ }
+
+ // Hydrate must restore v2 (current-state.json points to the v2 folder)
+ CleanDirectory(ServerStateDir, true);
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ // v2 marker must be present - confirms current-state.json pointed to the v2 folder
+ CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin"));
+ // Subdirectory hierarchy must also be intact
+ CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin"));
+ CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin"));
+}
+
+TEST_CASE("hydration.s3.module_isolation")
+{
+ // Two independent modules dehydrate/hydrate without interfering with each other.
+ // Uses VerifyTree with per-module byte content to detect cross-module data mixing.
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19012;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+
+ struct ModuleData
+ {
+ HydrationConfig Config;
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+ };
+
+ std::vector<ModuleData> Modules;
+ for (const char* ModuleId : {"s3test_iso_a", "s3test_iso_b"})
+ {
+ std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
+ std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
+ CreateDirectories(StateDir);
+ CreateDirectories(TempPath);
+
+ ModuleData Data;
+ Data.Config.ServerStateDir = StateDir;
+ Data.Config.TempDir = TempPath;
+ Data.Config.ModuleId = ModuleId;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Data.Config.Options = std::move(Root).AsObject();
+ }
+ Data.Files = CreateTestTree(StateDir);
+
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Data.Config);
+ Hydrator->Dehydrate();
+
+ Modules.push_back(std::move(Data));
+ }
+
+ for (ModuleData& Module : Modules)
+ {
+ CleanDirectory(Module.Config.ServerStateDir, true);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Module.Config);
+ Hydrator->Hydrate();
+
+ // Each module's files must be independently restorable with correct byte content.
+ // If S3 key prefixes were mixed up, CreateSemiRandomBlob content would differ.
+ VerifyTree(Module.Config.ServerStateDir, Module.Files);
+ }
+}
+
+// ---------------------------------------------------------------------------
+// S3Hydrator concurrent test
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.s3.concurrent")
+{
+ // N modules dehydrate and hydrate concurrently against MinIO.
+ // Each module has a distinct ModuleId, so S3 key prefixes don't overlap.
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19013;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ constexpr int kModuleCount = 16;
+ constexpr int kThreadCount = 4;
+
+ ScopedTemporaryDirectory TempDir;
+
+ struct ModuleData
+ {
+ HydrationConfig Config;
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+ };
+ std::vector<ModuleData> Modules(kModuleCount);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ std::string ModuleId = fmt::format("s3_concurrent_{}", I);
+ std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
+ std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
+ CreateDirectories(StateDir);
+ CreateDirectories(TempPath);
+
+ Modules[I].Config.ServerStateDir = StateDir;
+ Modules[I].Config.TempDir = TempPath;
+ Modules[I].Config.ModuleId = ModuleId;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Modules[I].Config.Options = std::move(Root).AsObject();
+ }
+ Modules[I].Files = CreateTestTree(StateDir);
+ }
+
+ // Concurrent dehydrate
+ {
+ WorkerThreadPool Pool(kThreadCount, "hydration_s3_dehy");
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ });
+ }
+ Work.Wait();
+ CHECK_FALSE(Work.IsAborted());
+ }
+
+ // Concurrent hydrate
+ {
+ WorkerThreadPool Pool(kThreadCount, "hydration_s3_hy");
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic<bool>&) {
+ CleanDirectory(Config.ServerStateDir, true);
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ });
+ }
+ Work.Wait();
+ CHECK_FALSE(Work.IsAborted());
+ }
+
+ // Verify all modules restored correctly
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files);
+ }
+}
+
+// ---------------------------------------------------------------------------
+// S3Hydrator: no prior state (first-boot path)
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.s3.no_prior_state")
+{
+ // Hydrate() against an empty bucket (first-boot scenario) must leave ServerStateDir empty.
+ // The "No state found in S3" path goes through the error-cleanup branch, which wipes
+ // ServerStateDir to ensure no partial or stale content is left for the server to start on.
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19014;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ // Pre-populate ServerStateDir to confirm the wipe actually runs.
+ WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "s3test_no_prior";
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
+
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+
+ // ServerStateDir must be empty: the error path wipes it to prevent a server start
+ // against stale or partially-installed content.
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+}
+
+// ---------------------------------------------------------------------------
+// S3Hydrator: bucket path prefix in TargetSpecification
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.s3.path_prefix")
+{
+ // TargetSpecification of the form "s3://bucket/some/prefix" stores objects under
+ // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...".
+ // Tests the second branch of the m_KeyPrefix calculation in S3Hydrator::Configure().
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19015;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles = CreateTestTree(ServerStateDir);
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "s3test_prefix";
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
+
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ }
+
+ CleanDirectory(ServerStateDir, true);
+
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ VerifyTree(ServerStateDir, TestFiles);
+}
+
+TEST_CASE("hydration.s3.options_region_override")
+{
+ // Verify that 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION env var.
+ // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options.
+
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19016;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+ ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region");
+
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ auto TestFiles = CreateTestTree(ServerStateDir);
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "s3test_region_override";
+ {
+ std::string ConfigJson = fmt::format(
+ R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
+
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ }
+
+ CleanDirectory(ServerStateDir, true);
+
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ VerifyTree(ServerStateDir, TestFiles);
+}
+
+TEST_SUITE_END();
+
+void
+hydration_forcelink()
+{
+}
+
+#endif // ZEN_WITH_TESTS
+
} // namespace zen