// Copyright Epic Games, Inc. All Rights Reserved. #include "hydration.h" #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include # include # include # include # include # include #endif // ZEN_WITH_TESTS namespace zen { namespace { /// UTC time decomposed to calendar fields with sub-second milliseconds. struct UtcTime { std::tm Tm{}; int Ms = 0; // sub-second milliseconds [0, 999] static UtcTime Now() { std::chrono::system_clock::time_point TimePoint = std::chrono::system_clock::now(); std::time_t TimeT = std::chrono::system_clock::to_time_t(TimePoint); int SubSecMs = static_cast((std::chrono::duration_cast(TimePoint.time_since_epoch()) % 1000).count()); UtcTime Result; Result.Ms = SubSecMs; #if ZEN_PLATFORM_WINDOWS gmtime_s(&Result.Tm, &TimeT); #else gmtime_r(&TimeT, &Result.Tm); #endif return Result; } }; } // namespace /////////////////////////////////////////////////////////////////////////// constexpr std::string_view FileHydratorPrefix = "file://"; struct FileHydrator : public HydrationStrategyBase { virtual void Configure(const HydrationConfig& Config) override; virtual void Hydrate() override; virtual void Dehydrate() override; private: HydrationConfig m_Config; std::filesystem::path m_StorageModuleRootDir; }; void FileHydrator::Configure(const HydrationConfig& Config) { m_Config = Config; std::filesystem::path ConfigPath(Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length()))); MakeSafeAbsolutePathInPlace(ConfigPath); if (!std::filesystem::exists(ConfigPath)) { throw std::invalid_argument(fmt::format("Target does not exist: '{}'", ConfigPath.string())); } m_StorageModuleRootDir = ConfigPath / m_Config.ModuleId; CreateDirectories(m_StorageModuleRootDir); } void FileHydrator::Hydrate() { ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); // Ensure target is clean ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir); const bool ForceRemoveReadOnlyFiles = true; CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); bool WipeServerState = false; try { ZEN_DEBUG("Copying '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir); CopyTree(m_StorageModuleRootDir, m_Config.ServerStateDir, {.EnableClone = true}); } catch (std::exception& Ex) { ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_Config.ServerStateDir); // We don't do the clean right here to avoid potentially running into double-throws WipeServerState = true; } if (WipeServerState) { ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); } } void FileHydrator::Dehydrate() { ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir); const std::filesystem::path TargetDir = m_StorageModuleRootDir; // Ensure target is clean. This could be replaced with an atomic copy at a later date // (i.e copy into a temporary directory name and rename it once complete) ZEN_DEBUG("Cleaning storage root '{}'", TargetDir); const bool ForceRemoveReadOnlyFiles = true; CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); bool CopySuccess = true; try { ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir); CopyTree(m_Config.ServerStateDir, TargetDir, {.EnableClone = true}); } catch (std::exception& Ex) { ZEN_WARN("Copy failed: {}. Will wipe any partially copied state from '{}'", Ex.what(), m_StorageModuleRootDir); // We don't do the clean right here to avoid potentially running into double-throws CopySuccess = false; } if (!CopySuccess) { ZEN_DEBUG("Removing partially copied state from '{}'", TargetDir); CleanDirectory(TargetDir, ForceRemoveReadOnlyFiles); } ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); } /////////////////////////////////////////////////////////////////////////// constexpr std::string_view S3HydratorPrefix = "s3://"; struct S3Hydrator : public HydrationStrategyBase { void Configure(const HydrationConfig& Config) override; void Dehydrate() override; void Hydrate() override; private: S3Client CreateS3Client() const; std::string BuildTimestampFolderName() const; std::string MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const; HydrationConfig m_Config; std::string m_Bucket; std::string m_KeyPrefix; // "/" or just "" - no trailing slash std::string m_Region; SigV4Credentials m_Credentials; Ref m_CredentialProvider; }; void S3Hydrator::Configure(const HydrationConfig& Config) { m_Config = Config; std::string_view Spec = m_Config.TargetSpecification; Spec.remove_prefix(S3HydratorPrefix.size()); size_t SlashPos = Spec.find('/'); std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); m_KeyPrefix = UserPrefix.empty() ? m_Config.ModuleId : UserPrefix + "/" + m_Config.ModuleId; ZEN_ASSERT(!m_Bucket.empty()); std::string Region = GetEnvVariable("AWS_DEFAULT_REGION"); if (Region.empty()) { Region = GetEnvVariable("AWS_REGION"); } if (Region.empty()) { Region = "us-east-1"; } m_Region = std::move(Region); std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); if (AccessKeyId.empty()) { m_CredentialProvider = Ref(new ImdsCredentialProvider({})); } else { m_Credentials.AccessKeyId = std::move(AccessKeyId); m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); } } S3Client S3Hydrator::CreateS3Client() const { S3ClientOptions Options; Options.BucketName = m_Bucket; Options.Region = m_Region; if (!m_Config.S3Endpoint.empty()) { Options.Endpoint = m_Config.S3Endpoint; Options.PathStyle = m_Config.S3PathStyle; } if (m_CredentialProvider) { Options.CredentialProvider = m_CredentialProvider; } else { Options.Credentials = m_Credentials; } return S3Client(Options); } std::string S3Hydrator::BuildTimestampFolderName() const { UtcTime Now = UtcTime::Now(); return fmt::format("{:04d}{:02d}{:02d}-{:02d}{:02d}{:02d}-{:03d}", Now.Tm.tm_year + 1900, Now.Tm.tm_mon + 1, Now.Tm.tm_mday, Now.Tm.tm_hour, Now.Tm.tm_min, Now.Tm.tm_sec, Now.Ms); } std::string S3Hydrator::MakeObjectKey(std::string_view FolderName, const std::filesystem::path& RelPath) const { return m_KeyPrefix + "/" + std::string(FolderName) + "/" + RelPath.generic_string(); } void S3Hydrator::Dehydrate() { ZEN_INFO("Dehydrating state from '{}' to s3://{}/{}", m_Config.ServerStateDir, m_Bucket, m_KeyPrefix); try { S3Client Client = CreateS3Client(); std::string FolderName = BuildTimestampFolderName(); uint64_t TotalBytes = 0; uint32_t FileCount = 0; std::chrono::steady_clock::time_point UploadStart = std::chrono::steady_clock::now(); DirectoryContent DirContent; GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent); for (const std::filesystem::path& AbsPath : DirContent.Files) { std::filesystem::path RelPath = AbsPath.lexically_relative(m_Config.ServerStateDir); if (RelPath.empty() || *RelPath.begin() == "..") { throw zen::runtime_error( "lexically_relative produced a '..'-escape path for '{}' relative to '{}' - " "path form mismatch (e.g. \\\\?\\ prefix on one but not the other)", AbsPath.string(), m_Config.ServerStateDir.string()); } std::string Key = MakeObjectKey(FolderName, RelPath); BasicFile File(AbsPath, BasicFile::Mode::kRead); uint64_t FileSize = File.FileSize(); S3Result UploadResult = Client.PutObjectMultipart(Key, FileSize, [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); }); if (!UploadResult.IsSuccess()) { throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error); } TotalBytes += FileSize; ++FileCount; } // Write current-state.json int64_t UploadDurationMs = std::chrono::duration_cast(std::chrono::steady_clock::now() - UploadStart).count(); UtcTime Now = UtcTime::Now(); std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", Now.Tm.tm_year + 1900, Now.Tm.tm_mon + 1, Now.Tm.tm_mday, Now.Tm.tm_hour, Now.Tm.tm_min, Now.Tm.tm_sec, Now.Ms); CbObjectWriter Meta; Meta << "FolderName" << FolderName; Meta << "ModuleId" << m_Config.ModuleId; Meta << "HostName" << GetMachineName(); Meta << "UploadTimeUtc" << UploadTimeUtc; Meta << "UploadDurationMs" << UploadDurationMs; Meta << "TotalSizeBytes" << TotalBytes; Meta << "FileCount" << FileCount; ExtendableStringBuilder<1024> JsonBuilder; Meta.Save().ToJson(JsonBuilder); std::string MetaKey = m_KeyPrefix + "/current-state.json"; std::string_view JsonText = JsonBuilder.ToView(); IoBuffer MetaBuf(IoBuffer::Clone, JsonText.data(), JsonText.size()); S3Result MetaUploadResult = Client.PutObject(MetaKey, std::move(MetaBuf)); if (!MetaUploadResult.IsSuccess()) { throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error); } ZEN_INFO("Dehydration complete: {} files, {} bytes, {} ms", FileCount, TotalBytes, UploadDurationMs); } catch (std::exception& Ex) { // Any in-progress multipart upload has already been aborted by PutObjectMultipart. // current-state.json is only written on success, so the previous S3 state remains valid. ZEN_WARN("S3 dehydration failed: {}. S3 state not updated.", Ex.what()); } } void S3Hydrator::Hydrate() { ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir); const bool ForceRemoveReadOnlyFiles = true; // Clean temp dir before starting in case of leftover state from a previous failed hydration ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); bool WipeServerState = false; try { S3Client Client = CreateS3Client(); std::string MetaKey = m_KeyPrefix + "/current-state.json"; S3HeadObjectResult HeadResult = Client.HeadObject(MetaKey); if (HeadResult.Status == HeadObjectResult::NotFound) { throw zen::runtime_error("No state found in S3 at '{}'", MetaKey); } if (!HeadResult.IsSuccess()) { throw zen::runtime_error("Failed to check for state in S3 at '{}': {}", MetaKey, HeadResult.Error); } S3GetObjectResult MetaResult = Client.GetObject(MetaKey); if (!MetaResult.IsSuccess()) { throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error); } std::string ParseError; json11::Json MetaJson = json11::Json::parse(std::string(MetaResult.AsText()), ParseError); if (!ParseError.empty()) { throw zen::runtime_error("Failed to parse current-state.json from '{}': {}", MetaKey, ParseError); } std::string FolderName = MetaJson["FolderName"].string_value(); if (FolderName.empty()) { throw zen::runtime_error("current-state.json from '{}' has missing or empty FolderName", MetaKey); } std::string FolderPrefix = m_KeyPrefix + "/" + FolderName + "/"; S3ListObjectsResult ListResult = Client.ListObjects(FolderPrefix); if (!ListResult.IsSuccess()) { throw zen::runtime_error("Failed to list S3 objects under '{}': {}", FolderPrefix, ListResult.Error); } for (const S3ObjectInfo& Obj : ListResult.Objects) { if (!Obj.Key.starts_with(FolderPrefix)) { ZEN_WARN("Skipping unexpected S3 key '{}' (expected prefix '{}')", Obj.Key, FolderPrefix); continue; } std::string RelKey = Obj.Key.substr(FolderPrefix.size()); if (RelKey.empty()) { continue; } std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey)); CreateDirectories(DestPath.parent_path()); BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate); DestFile.SetFileSize(Obj.Size); if (Obj.Size > 0) { BasicFileWriter Writer(DestFile, 64 * 1024); uint64_t Offset = 0; while (Offset < Obj.Size) { uint64_t ChunkSize = std::min(8 * 1024 * 1024, Obj.Size - Offset); S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize); if (!Chunk.IsSuccess()) { throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", Obj.Key, Offset, Offset + ChunkSize - 1, Chunk.Error); } Writer.Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); Offset += ChunkSize; } Writer.Flush(); } } // Downloaded successfully - swap into ServerStateDir ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir); CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); // If the two paths share at least one common component they are on the same drive/volume // and atomic renames will succeed. Otherwise fall back to a full copy. auto [ItTmp, ItState] = std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end()); if (ItTmp != m_Config.TempDir.begin()) { // Fast path: atomic renames - no data copying needed for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.TempDir)) { std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / Entry.path().filename()); if (Entry.is_directory()) { RenameDirectory(Entry.path(), Dest); } else { RenameFile(Entry.path(), Dest); } } ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); } else { // Slow path: TempDir and ServerStateDir are on different filesystems, so rename // would fail. Copy the tree instead and clean up the temp files afterwards. ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); CopyTree(m_Config.TempDir, m_Config.ServerStateDir, {.EnableClone = true}); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); } ZEN_INFO("Hydration complete from folder '{}'", FolderName); } catch (std::exception& Ex) { ZEN_WARN("S3 hydration failed: {}. Will wipe any partially installed state.", Ex.what()); // We don't do the clean right here to avoid potentially running into double-throws WipeServerState = true; } if (WipeServerState) { ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles); } } std::unique_ptr CreateHydrator(const HydrationConfig& Config) { if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0) { std::unique_ptr Hydrator = std::make_unique(); Hydrator->Configure(Config); return Hydrator; } if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) { std::unique_ptr Hydrator = std::make_unique(); Hydrator->Configure(Config); return Hydrator; } throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); } #if ZEN_WITH_TESTS namespace { /// Scoped RAII helper to set/restore a single environment variable within a test. /// Used to configure AWS credentials for each S3 test's MinIO instance /// without polluting the global environment. struct ScopedEnvVar { std::string m_Name; std::optional m_OldValue; // nullopt = was not set; "" = was set to empty string ScopedEnvVar(std::string_view Name, std::string_view Value) : m_Name(Name) { # if ZEN_PLATFORM_WINDOWS // Use the raw API so we can distinguish "not set" (ERROR_ENVVAR_NOT_FOUND) // from "set to empty string" (returns 0 with no error). char Buf[1]; DWORD Len = GetEnvironmentVariableA(m_Name.c_str(), Buf, sizeof(Buf)); if (Len == 0 && GetLastError() == ERROR_ENVVAR_NOT_FOUND) { m_OldValue = std::nullopt; } else { // Len == 0 with no error: variable exists but is empty. // Len > sizeof(Buf): value is non-empty; Len is the required buffer size // (including null terminator) - allocate and re-read. std::string Old(Len == 0 ? 0 : Len - 1, '\0'); if (Len > sizeof(Buf)) { GetEnvironmentVariableA(m_Name.c_str(), Old.data(), Len); } m_OldValue = std::move(Old); } SetEnvironmentVariableA(m_Name.c_str(), std::string(Value).c_str()); # else // getenv returns nullptr when not set, "" when set to empty string. const char* Existing = getenv(m_Name.c_str()); m_OldValue = Existing ? std::optional(Existing) : std::nullopt; setenv(m_Name.c_str(), std::string(Value).c_str(), 1); # endif } ~ScopedEnvVar() { # if ZEN_PLATFORM_WINDOWS SetEnvironmentVariableA(m_Name.c_str(), m_OldValue.has_value() ? m_OldValue->c_str() : nullptr); # else if (m_OldValue.has_value()) { setenv(m_Name.c_str(), m_OldValue->c_str(), 1); } else { unsetenv(m_Name.c_str()); } # endif } }; /// Create a small file hierarchy under BaseDir: /// file_a.bin /// subdir/file_b.bin /// subdir/nested/file_c.bin /// Returns a vector of (relative path, content) pairs for later verification. std::vector> CreateTestTree(const std::filesystem::path& BaseDir) { std::vector> Files; auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { std::filesystem::path FullPath = BaseDir / RelPath; CreateDirectories(FullPath.parent_path()); WriteFile(FullPath, Content); Files.emplace_back(std::move(RelPath), std::move(Content)); }; AddFile("file_a.bin", CreateSemiRandomBlob(1024)); AddFile("subdir/file_b.bin", CreateSemiRandomBlob(2048)); AddFile("subdir/nested/file_c.bin", CreateSemiRandomBlob(512)); return Files; } void VerifyTree(const std::filesystem::path& Dir, const std::vector>& Expected) { for (const auto& [RelPath, Content] : Expected) { std::filesystem::path FullPath = Dir / RelPath; REQUIRE_MESSAGE(std::filesystem::exists(FullPath), FullPath.string()); BasicFile ReadBack(FullPath, BasicFile::Mode::kRead); IoBuffer ReadContent = ReadBack.ReadRange(0, ReadBack.FileSize()); REQUIRE_EQ(ReadContent.GetSize(), Content.GetSize()); CHECK(std::memcmp(ReadContent.GetData(), Content.GetData(), Content.GetSize()) == 0); } } } // namespace TEST_SUITE_BEGIN("server.hydration"); // --------------------------------------------------------------------------- // FileHydrator tests // --------------------------------------------------------------------------- TEST_CASE("hydration.file.dehydrate_hydrate") { ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); const std::string ModuleId = "testmodule"; auto TestFiles = CreateTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = ModuleId; Config.TargetSpecification = "file://" + HydrationStore.string(); // Dehydrate: copy server state to file store { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); } // Verify the module folder exists in the store and ServerStateDir was wiped CHECK(std::filesystem::exists(HydrationStore / ModuleId)); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore server state from file store { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } // Verify restored contents match the original VerifyTree(ServerStateDir, TestFiles); } TEST_CASE("hydration.file.dehydrate_cleans_server_state") { ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); CreateTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = "testmodule"; Config.TargetSpecification = "file://" + HydrationStore.string(); std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); // FileHydrator::Dehydrate() must wipe ServerStateDir when done CHECK(std::filesystem::is_empty(ServerStateDir)); } TEST_CASE("hydration.file.hydrate_overwrites_existing_state") { ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); auto TestFiles = CreateTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = "testmodule"; Config.TargetSpecification = "file://" + HydrationStore.string(); // Dehydrate the original state { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); } // Put a stale file in ServerStateDir to simulate leftover state WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); // Hydrate - must wipe stale file and restore original { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); VerifyTree(ServerStateDir, TestFiles); } // --------------------------------------------------------------------------- // FileHydrator concurrent test // --------------------------------------------------------------------------- TEST_CASE("hydration.file.concurrent") { // N modules dehydrate and hydrate concurrently via ParallelWork. // Each module operates in its own directory - tests for global/static state races. constexpr int kModuleCount = 4; ScopedTemporaryDirectory TempDir; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; CreateDirectories(HydrationStore); struct ModuleData { HydrationConfig Config; std::vector> Files; }; std::vector Modules(kModuleCount); for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("file_concurrent_{}", I); std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); CreateDirectories(TempPath); Modules[I].Config.ServerStateDir = StateDir; Modules[I].Config.TempDir = TempPath; Modules[I].Config.ModuleId = ModuleId; Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string(); Modules[I].Files = CreateTestTree(StateDir); } // Concurrent dehydrate { WorkerThreadPool Pool(kModuleCount, "hydration_file_dehy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic&) { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Concurrent hydrate { WorkerThreadPool Pool(kModuleCount, "hydration_file_hy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic&) { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Verify all modules restored correctly for (int I = 0; I < kModuleCount; ++I) { VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files); } } // --------------------------------------------------------------------------- // S3Hydrator tests // // Each test case spawns its own local MinIO instance (self-contained, no external setup needed). // The MinIO binary must be present in the same directory as the test executable (copied by xmake). // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.dehydrate_hydrate") { MinioProcessOptions MinioOpts; MinioOpts.Port = 19010; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); const std::string ModuleId = "s3test_roundtrip"; auto TestFiles = CreateTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = ModuleId; Config.TargetSpecification = "s3://zen-hydration-test"; Config.S3Endpoint = Minio.Endpoint(); Config.S3PathStyle = true; // Dehydrate: upload server state to MinIO { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); } // Wipe server state CleanDirectory(ServerStateDir, true); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: download from MinIO back to server state { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } // Verify restored contents match the original VerifyTree(ServerStateDir, TestFiles); } TEST_CASE("hydration.s3.current_state_json_selects_latest_folder") { // Each Dehydrate() uploads files to a new timestamp-named folder and then overwrites // current-state.json to point at that folder. Old folders are NOT deleted. // Hydrate() must read current-state.json to determine which folder to restore from. // // This test verifies that: // 1. After two dehydrations, Hydrate() restores from the second snapshot, not the first, // confirming that current-state.json was updated between dehydrations. // 2. current-state.json is updated to point at the second (latest) folder. // 3. Hydrate() restores the v2 snapshot (identified by v2marker.bin), NOT the v1 snapshot. MinioProcessOptions MinioOpts; MinioOpts.Port = 19011; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); const std::string ModuleId = "s3test_folder_select"; HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = ModuleId; Config.TargetSpecification = "s3://zen-hydration-test"; Config.S3Endpoint = Minio.Endpoint(); Config.S3PathStyle = true; // v1: dehydrate without a marker file CreateTestTree(ServerStateDir); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); } // ServerStateDir is now empty. Wait so the v2 timestamp folder name is strictly later // (timestamp resolution is 1 ms, but macOS scheduler granularity requires a larger margin). Sleep(100); // v2: dehydrate WITH a marker file that only v2 has CreateTestTree(ServerStateDir); WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64)); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); } // Hydrate must restore v2 (current-state.json points to the v2 folder) CleanDirectory(ServerStateDir, true); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } // v2 marker must be present - confirms current-state.json pointed to the v2 folder CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin")); // Subdirectory hierarchy must also be intact CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin")); CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin")); } TEST_CASE("hydration.s3.module_isolation") { // Two independent modules dehydrate/hydrate without interfering with each other. // Uses VerifyTree with per-module byte content to detect cross-module data mixing. MinioProcessOptions MinioOpts; MinioOpts.Port = 19012; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; struct ModuleData { HydrationConfig Config; std::vector> Files; }; std::vector Modules; for (const char* ModuleId : {"s3test_iso_a", "s3test_iso_b"}) { std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); CreateDirectories(TempPath); ModuleData Data; Data.Config.ServerStateDir = StateDir; Data.Config.TempDir = TempPath; Data.Config.ModuleId = ModuleId; Data.Config.TargetSpecification = "s3://zen-hydration-test"; Data.Config.S3Endpoint = Minio.Endpoint(); Data.Config.S3PathStyle = true; Data.Files = CreateTestTree(StateDir); std::unique_ptr Hydrator = CreateHydrator(Data.Config); Hydrator->Dehydrate(); Modules.push_back(std::move(Data)); } for (ModuleData& Module : Modules) { CleanDirectory(Module.Config.ServerStateDir, true); std::unique_ptr Hydrator = CreateHydrator(Module.Config); Hydrator->Hydrate(); // Each module's files must be independently restorable with correct byte content. // If S3 key prefixes were mixed up, CreateSemiRandomBlob content would differ. VerifyTree(Module.Config.ServerStateDir, Module.Files); } } // --------------------------------------------------------------------------- // S3Hydrator concurrent test // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.concurrent") { // N modules dehydrate and hydrate concurrently against MinIO. // Each module has a distinct ModuleId, so S3 key prefixes don't overlap. MinioProcessOptions MinioOpts; MinioOpts.Port = 19013; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); constexpr int kModuleCount = 4; ScopedTemporaryDirectory TempDir; struct ModuleData { HydrationConfig Config; std::vector> Files; }; std::vector Modules(kModuleCount); for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("s3_concurrent_{}", I); std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); CreateDirectories(TempPath); Modules[I].Config.ServerStateDir = StateDir; Modules[I].Config.TempDir = TempPath; Modules[I].Config.ModuleId = ModuleId; Modules[I].Config.TargetSpecification = "s3://zen-hydration-test"; Modules[I].Config.S3Endpoint = Minio.Endpoint(); Modules[I].Config.S3PathStyle = true; Modules[I].Files = CreateTestTree(StateDir); } // Concurrent dehydrate { WorkerThreadPool Pool(kModuleCount, "hydration_s3_dehy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic&) { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Concurrent hydrate { WorkerThreadPool Pool(kModuleCount, "hydration_s3_hy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic&) { CleanDirectory(Config.ServerStateDir, true); std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Verify all modules restored correctly for (int I = 0; I < kModuleCount; ++I) { VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files); } } // --------------------------------------------------------------------------- // S3Hydrator: no prior state (first-boot path) // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.no_prior_state") { // Hydrate() against an empty bucket (first-boot scenario) must leave ServerStateDir empty. // The "No state found in S3" path goes through the error-cleanup branch, which wipes // ServerStateDir to ensure no partial or stale content is left for the server to start on. MinioProcessOptions MinioOpts; MinioOpts.Port = 19014; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); // Pre-populate ServerStateDir to confirm the wipe actually runs. WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = "s3test_no_prior"; Config.TargetSpecification = "s3://zen-hydration-test"; Config.S3Endpoint = Minio.Endpoint(); Config.S3PathStyle = true; std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); // ServerStateDir must be empty: the error path wipes it to prevent a server start // against stale or partially-installed content. CHECK(std::filesystem::is_empty(ServerStateDir)); } // --------------------------------------------------------------------------- // S3Hydrator: bucket path prefix in TargetSpecification // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.path_prefix") { // TargetSpecification of the form "s3://bucket/some/prefix" stores objects under // "some/prefix//..." rather than directly under "/...". // Tests the second branch of the m_KeyPrefix calculation in S3Hydrator::Configure(). MinioProcessOptions MinioOpts; MinioOpts.Port = 19015; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); std::vector> TestFiles = CreateTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = "s3test_prefix"; Config.TargetSpecification = "s3://zen-hydration-test/team/project"; Config.S3Endpoint = Minio.Endpoint(); Config.S3PathStyle = true; { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(); } CleanDirectory(ServerStateDir, true); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } VerifyTree(ServerStateDir, TestFiles); } TEST_SUITE_END(); void hydration_forcelink() { } #endif // ZEN_WITH_TESTS } // namespace zen