// Copyright Epic Games, Inc. All Rights Reserved. #include "hydration.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include # include #endif // ZEN_WITH_TESTS namespace zen { namespace hydration_impl { /// UTC time decomposed to calendar fields with sub-second milliseconds. struct UtcTime { std::tm Tm{}; int Ms = 0; // sub-second milliseconds [0, 999] static UtcTime Now() { std::chrono::system_clock::time_point TimePoint = std::chrono::system_clock::now(); std::time_t TimeT = std::chrono::system_clock::to_time_t(TimePoint); int SubSecMs = static_cast((std::chrono::duration_cast(TimePoint.time_since_epoch()) % 1000).count()); UtcTime Result; Result.Ms = SubSecMs; #if ZEN_PLATFORM_WINDOWS gmtime_s(&Result.Tm, &TimeT); #else gmtime_r(&TimeT, &Result.Tm); #endif return Result; } }; std::filesystem::path FastRelativePath(const std::filesystem::path& Root, const std::filesystem::path& Abs) { auto [_, ItAbs] = std::mismatch(Root.begin(), Root.end(), Abs.begin(), Abs.end()); std::filesystem::path RelativePath; for (auto I = ItAbs; I != Abs.end(); I++) { RelativePath = RelativePath / *I; } return RelativePath; } void CleanDirectory(WorkerThreadPool& WorkerPool, std::atomic& AbortFlag, std::atomic& PauseFlag, const std::filesystem::path& Path) { CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector{}, {}, 0); } class StorageBase { public: virtual ~StorageBase() {} virtual void Configure(std::string_view ModuleId, const std::filesystem::path& TempDir, std::string_view TargetSpecification, const CbObject& Options) = 0; virtual void SaveMetadata(const CbObject& Data) = 0; virtual CbObject LoadMetadata() = 0; virtual CbObject GetSettings() = 0; virtual void ParseSettings(const CbObjectView& Settings) = 0; virtual std::vector List() = 0; virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath) = 0; virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath) = 0; virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0; }; constexpr std::string_view FileHydratorPrefix = "file://"; constexpr std::string_view FileHydratorType = "file"; constexpr std::string_view S3HydratorPrefix = "s3://"; constexpr std::string_view S3HydratorType = "s3"; class FileStorage : public StorageBase { public: FileStorage() {} virtual void Configure(std::string_view ModuleId, const std::filesystem::path& TempDir, std::string_view TargetSpecification, const CbObject& Options) { ZEN_UNUSED(TempDir); if (!TargetSpecification.empty()) { m_StoragePath = Utf8ToWide(TargetSpecification.substr(FileHydratorPrefix.length())); if (m_StoragePath.empty()) { throw zen::runtime_error("Hydration config 'file' type requires a directory path"); } } else { CbObjectView Settings = Options["settings"].AsObjectView(); std::string_view Path = Settings["path"].AsString(); if (Path.empty()) { throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); } m_StoragePath = Utf8ToWide(std::string(Path)); } m_StoragePath = m_StoragePath / ModuleId; MakeSafeAbsolutePathInPlace(m_StoragePath); m_StatePathName = m_StoragePath / "current-state.cbo"; m_CASPath = m_StoragePath / "cas"; CreateDirectories(m_CASPath); } virtual void SaveMetadata(const CbObject& Data) { BinaryWriter Output; SaveCompactBinary(Output, Data); WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize())); } virtual CbObject LoadMetadata() { if (!IsFile(m_StatePathName)) { return {}; } FileContents Content = ReadFile(m_StatePathName); if (Content.ErrorCode) { ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file"); } IoBuffer Payload = Content.Flatten(); CbValidateError Error; CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error); if (Error != CbValidateError::None) { throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error))); } return Result; } virtual CbObject GetSettings() override { return {}; } virtual void ParseSettings(const CbObjectView& Settings) { ZEN_UNUSED(Settings); } virtual std::vector List() { DirectoryContent DirContent; GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent); std::vector Result; Result.reserve(DirContent.Files.size()); for (const std::filesystem::path& Path : DirContent.Files) { IoHash Hash; if (IoHash::TryParse(Path.filename().string(), Hash)) { Result.push_back(Hash); } } return Result; } virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath) { ZEN_UNUSED(Size); Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic& AbortFlag) { if (!AbortFlag.load()) { CopyFile(SourcePath, m_CASPath / fmt::format("{}", Hash), CopyFileOptions{.EnableClone = true}); } }); } virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath) { ZEN_UNUSED(Size); Work.ScheduleWork( WorkerPool, [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic& AbortFlag) { if (!AbortFlag.load()) { CopyFile(m_CASPath / fmt::format("{}", Hash), DestinationPath, CopyFileOptions{.EnableClone = true}); } }); } virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override { ZEN_UNUSED(Work); ZEN_UNUSED(WorkerPool); DeleteDirectories(m_StoragePath); } private: std::filesystem::path m_StoragePath; std::filesystem::path m_StatePathName; std::filesystem::path m_CASPath; }; class S3Storage : public StorageBase { public: S3Storage() {} virtual void Configure(std::string_view ModuleId, const std::filesystem::path& TempDir, std::string_view TargetSpecification, const CbObject& Options) { m_Options = Options; CbObjectView Settings = m_Options["settings"].AsObjectView(); std::string_view Spec; if (!TargetSpecification.empty()) { Spec = TargetSpecification; Spec.remove_prefix(S3HydratorPrefix.size()); } else { std::string_view Uri = Settings["uri"].AsString(); if (Uri.empty()) { throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"); } Spec = Uri; Spec.remove_prefix(S3HydratorPrefix.size()); } size_t SlashPos = Spec.find('/'); std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); m_KeyPrefix = UserPrefix.empty() ? std::string(ModuleId) : UserPrefix + "/" + std::string(ModuleId); ZEN_ASSERT(!m_Bucket.empty()); std::string Region = std::string(Settings["region"].AsString()); if (Region.empty()) { Region = GetEnvVariable("AWS_DEFAULT_REGION"); } if (Region.empty()) { Region = GetEnvVariable("AWS_REGION"); } if (Region.empty()) { Region = "us-east-1"; } m_Region = std::move(Region); std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); if (AccessKeyId.empty()) { m_CredentialProvider = Ref(new ImdsCredentialProvider({})); } else { m_Credentials.AccessKeyId = std::move(AccessKeyId); m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); } m_TempDir = TempDir; m_Client = CreateS3Client(); } virtual void SaveMetadata(const CbObject& Data) { S3Client& Client = *m_Client; BinaryWriter Output; SaveCompactBinary(Output, Data); IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); std::string Key = m_KeyPrefix + "/incremental-state.cbo"; S3Result Result = Client.PutObject(Key, std::move(Payload)); if (!Result.IsSuccess()) { throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error); } } virtual CbObject LoadMetadata() { S3Client& Client = *m_Client; std::string Key = m_KeyPrefix + "/incremental-state.cbo"; S3GetObjectResult Result = Client.GetObject(Key); if (!Result.IsSuccess()) { if (Result.Error == S3GetObjectResult::NotFoundErrorText) { return {}; } throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error); } CbValidateError Error; CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error); if (Error != CbValidateError::None) { throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error)); } return Meta; } virtual CbObject GetSettings() override { CbObjectWriter Writer; Writer << "MultipartChunkSize" << m_MultipartChunkSize; return Writer.Save(); } virtual void ParseSettings(const CbObjectView& Settings) { m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(DefaultMultipartChunkSize); } virtual std::vector List() { S3Client& Client = *m_Client; std::string Prefix = m_KeyPrefix + "/cas/"; S3ListObjectsResult Result = Client.ListObjects(Prefix); if (!Result.IsSuccess()) { throw zen::runtime_error("Failed to list S3 objects under '{}': {}", Prefix, Result.Error); } std::vector Hashes; Hashes.reserve(Result.Objects.size()); for (const S3ObjectInfo& Obj : Result.Objects) { size_t LastSlash = Obj.Key.rfind('/'); if (LastSlash == std::string::npos) { continue; } IoHash Hash; if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash)) { Hashes.push_back(Hash); } } return Hashes; } virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath) { Work.ScheduleWork( WorkerPool, [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic& AbortFlag) { if (AbortFlag.load()) { return; } S3Client& Client = *m_Client; std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) { BasicFile File(SourcePath, BasicFile::Mode::kRead); S3Result Result = Client.PutObjectMultipart( Key, Size, [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); }, m_MultipartChunkSize); if (!Result.IsSuccess()) { throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); } } else { BasicFile File(SourcePath, BasicFile::Mode::kRead); S3Result Result = Client.PutObject(Key, File.ReadAll()); if (!Result.IsSuccess()) { throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); } } }); } virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath) { std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) { class WorkData { public: WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate) { PrepareFileForScatteredWrite(m_DestFile.Handle(), Size); } ~WorkData() { m_DestFile.Flush(); } void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); } private: BasicFile m_DestFile; }; std::shared_ptr Data = std::make_shared(DestinationPath, Size); uint64_t Offset = 0; while (Offset < Size) { uint64_t ChunkSize = std::min(m_MultipartChunkSize, Size - Offset); Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic& AbortFlag) { if (AbortFlag) { return; } S3GetObjectResult Chunk = m_Client->GetObjectRange(Key, Offset, ChunkSize); if (!Chunk.IsSuccess()) { throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", Key, Offset, Offset + ChunkSize - 1, Chunk.Error); } Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); }); Offset += ChunkSize; } } else { Work.ScheduleWork( WorkerPool, [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic& AbortFlag) { if (AbortFlag) { return; } S3GetObjectResult Chunk = m_Client->GetObject(Key, m_TempDir); if (!Chunk.IsSuccess()) { throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error); } if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) { std::error_code Ec; std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); if (Ec) { WriteFile(DestinationPath, Chunk.Content); } else { Chunk.Content.SetDeleteOnClose(false); Chunk.Content = {}; RenameFile(ChunkPath, DestinationPath, Ec); if (Ec) { Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath); Chunk.Content.SetDeleteOnClose(true); WriteFile(DestinationPath, Chunk.Content); } } } else { WriteFile(DestinationPath, Chunk.Content); } }); } } virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override { std::string Prefix = m_KeyPrefix + "/"; S3ListObjectsResult ListResult = m_Client->ListObjects(Prefix); if (!ListResult.IsSuccess()) { throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", Prefix, ListResult.Error); } for (const S3ObjectInfo& Obj : ListResult.Objects) { Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic& AbortFlag) { if (AbortFlag.load()) { return; } S3Result DelResult = m_Client->DeleteObject(Key); if (!DelResult.IsSuccess()) { throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error); } }); } } private: std::unique_ptr CreateS3Client() const { S3ClientOptions Options; Options.BucketName = m_Bucket; Options.Region = m_Region; CbObjectView Settings = m_Options["settings"].AsObjectView(); std::string_view Endpoint = Settings["endpoint"].AsString(); if (!Endpoint.empty()) { Options.Endpoint = std::string(Endpoint); Options.PathStyle = Settings["path-style"].AsBool(); } if (m_CredentialProvider) { Options.CredentialProvider = m_CredentialProvider; } else { Options.Credentials = m_Credentials; } Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; return std::make_unique(Options); } static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u; std::string m_KeyPrefix; CbObject m_Options; std::string m_Bucket; std::string m_Region; SigV4Credentials m_Credentials; Ref m_CredentialProvider; std::unique_ptr m_Client; std::filesystem::path m_TempDir; uint64_t m_MultipartChunkSize = DefaultMultipartChunkSize; }; } // namespace hydration_impl using namespace hydration_impl; /////////////////////////////////////////////////////////////////////////// class IncrementalHydrator : public HydrationStrategyBase { public: IncrementalHydrator(std::unique_ptr&& Storage); virtual ~IncrementalHydrator() override; virtual void Configure(const HydrationConfig& Config) override; virtual void Dehydrate(const CbObject& CachedState) override; virtual CbObject Hydrate() override; virtual void Obliterate() override; private: struct Entry { std::filesystem::path RelativePath; uint64_t Size; uint64_t ModTick; IoHash Hash; }; std::unique_ptr m_Storage; HydrationConfig m_Config; WorkerThreadPool m_FallbackWorkPool; std::atomic m_FallbackAbortFlag{false}; std::atomic m_FallbackPauseFlag{false}; HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool, .AbortFlag = &m_FallbackAbortFlag, .PauseFlag = &m_FallbackPauseFlag}; }; IncrementalHydrator::IncrementalHydrator(std::unique_ptr&& Storage) : m_Storage(std::move(Storage)), m_FallbackWorkPool(0) { } IncrementalHydrator::~IncrementalHydrator() { m_Storage.reset(); } void IncrementalHydrator::Configure(const HydrationConfig& Config) { m_Config = Config; m_Storage->Configure(Config.ModuleId, Config.TempDir, Config.TargetSpecification, Config.Options); if (Config.Threading) { m_Threading = *Config.Threading; } } void IncrementalHydrator::Dehydrate(const CbObject& CachedState) { Stopwatch Timer; const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); try { std::unordered_map StateEntryLookup; std::vector StateEntries; for (CbFieldView FieldView : CachedState["Files"].AsArrayView()) { CbObjectView EntryView = FieldView.AsObjectView(); std::filesystem::path RelativePath(EntryView["Path"].AsString()); uint64_t Size = EntryView["Size"].AsUInt64(); uint64_t ModTick = EntryView["ModTick"].AsUInt64(); IoHash Hash = EntryView["Hash"].AsHash(); StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size()); StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash}); } DirectoryContent DirContent; GetDirectoryContent(*m_Threading.WorkerPool, ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, DirContent); ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files", m_Config.ModuleId, m_Config.ServerStateDir, DirContent.Files.size(), NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); std::vector Entries; Entries.resize(DirContent.Files.size()); uint64_t TotalBytes = 0; uint64_t TotalFiles = 0; uint64_t HashedFiles = 0; uint64_t HashedBytes = 0; std::unordered_set ExistsLookup; { ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); if (AbsPath.filename() == "reserve.gc") { continue; } const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); if (*RelativePath.begin() == ".sentry-native") { continue; } if (RelativePath == ".lock") { continue; } Entry& CurrentEntry = Entries[TotalFiles]; CurrentEntry.RelativePath = RelativePath; CurrentEntry.Size = DirContent.FileSizes[FileIndex]; CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; bool FoundHash = false; if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end()) { const Entry& StateEntry = StateEntries[KnownIt->second]; if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) { CurrentEntry.Hash = StateEntry.Hash; FoundHash = true; } } if (!FoundHash) { Work.ScheduleWork(*m_Threading.WorkerPool, [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic& AbortFlag) { if (AbortFlag) { return; } Entry& CurrentEntry = Entries[EntryIndex]; bool FoundHash = false; if (AbsPath.extension().empty()) { auto It = CurrentEntry.RelativePath.begin(); if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas")) { IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), RawHash, RawSize); if (Compressed) { // We compose a meta-hash since taking the RawHash might collide with an existing // non-compressed file with the same content The collision is unlikely except if the // compressed data is zero bytes causing RawHash to be the same as an empty file. IoHashStream Hasher; Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size)); CurrentEntry.Hash = Hasher.GetHash(); FoundHash = true; } } } if (!FoundHash) { CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath)); } }); HashedFiles++; HashedBytes += CurrentEntry.Size; } TotalFiles++; TotalBytes += CurrentEntry.Size; } std::vector ExistingEntries = m_Storage->List(); ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end()); Work.Wait(); Entries.resize(TotalFiles); } uint64_t UploadedFiles = 0; uint64_t UploadedBytes = 0; { ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog); for (const Entry& CurrentEntry : Entries) { if (!ExistsLookup.contains(CurrentEntry.Hash)) { m_Storage->Put(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath)); UploadedFiles++; UploadedBytes += CurrentEntry.Size; } } Work.Wait(); uint64_t UploadTimeMs = Timer.GetElapsedTimeMs(); UtcTime Now = UtcTime::Now(); std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", Now.Tm.tm_year + 1900, Now.Tm.tm_mon + 1, Now.Tm.tm_mday, Now.Tm.tm_hour, Now.Tm.tm_min, Now.Tm.tm_sec, Now.Ms); CbObjectWriter Meta; Meta << "SourceFolder" << ServerStateDir.generic_string(); Meta << "ModuleId" << m_Config.ModuleId; Meta << "HostName" << GetMachineName(); Meta << "UploadTimeUtc" << UploadTimeUtc; Meta << "UploadDurationMs" << UploadTimeMs; Meta << "TotalSizeBytes" << TotalBytes; Meta << "StorageSettings" << m_Storage->GetSettings(); Meta.BeginArray("Files"); for (const Entry& CurrentEntry : Entries) { Meta.BeginObject(); { Meta << "Path" << CurrentEntry.RelativePath.generic_string(); Meta << "Size" << CurrentEntry.Size; Meta << "ModTick" << CurrentEntry.ModTick; Meta << "Hash" << CurrentEntry.Hash; } Meta.EndObject(); } Meta.EndArray(); m_Storage->SaveMetadata(Meta.Save()); } ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); ZEN_INFO("Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Total {} ({}) in {}", m_Config.ModuleId, m_Config.ServerStateDir, HashedFiles, NiceBytes(HashedBytes), UploadedFiles, NiceBytes(UploadedBytes), TotalFiles, NiceBytes(TotalBytes), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } catch (const std::exception& Ex) { ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); } } CbObject IncrementalHydrator::Hydrate() { Stopwatch Timer; const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); try { CbObject Meta = m_Storage->LoadMetadata(); if (!Meta) { ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", m_Config.ModuleId, m_Config.ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); return CbObject(); } std::unordered_map EntryLookup; std::vector Entries; uint64_t TotalSize = 0; for (CbFieldView FieldView : Meta["Files"]) { CbObjectView EntryView = FieldView.AsObjectView(); if (EntryView) { Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()), .Size = EntryView["Size"].AsUInt64(), .ModTick = EntryView["ModTick"].AsUInt64(), .Hash = EntryView["Hash"].AsHash()}; TotalSize += NewEntry.Size; EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size()); Entries.emplace_back(std::move(NewEntry)); } } ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files", m_Config.ModuleId, m_Config.ServerStateDir, Entries.size(), NiceBytes(TotalSize)); m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView()); uint64_t DownloadedBytes = 0; uint64_t DownloadedFiles = 0; { ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (const Entry& CurrentEntry : Entries) { std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath); CreateDirectories(Path.parent_path()); m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path); DownloadedBytes += CurrentEntry.Size; DownloadedFiles++; } Work.Wait(); } // Downloaded successfully - swap into ServerStateDir ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); // If the two paths share at least one common component they are on the same drive/volume // and atomic renames will succeed. Otherwise fall back to a full copy. auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end()); if (ItTmp != TempDir.begin()) { DirectoryContent DirContent; GetDirectoryContent(*m_Threading.WorkerPool, TempDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, DirContent); for (const std::filesystem::path& AbsPath : DirContent.Directories) { std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest)); } } for (const std::filesystem::path& AbsPath : DirContent.Files) { std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest)); } } ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } else { // Slow path: TempDir and ServerStateDir are on different filesystems, so rename // would fail. Copy the tree instead and clean up the temp files afterwards. ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); CopyTree(TempDir, ServerStateDir, {.EnableClone = true}); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } // TODO: This could perhaps be done more efficently, but ok for now DirectoryContent DirContent; GetDirectoryContent(*m_Threading.WorkerPool, ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, DirContent); CbObjectWriter HydrateState; HydrateState.BeginArray("Files"); for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end()) { HydrateState.BeginObject(); { HydrateState << "Path" << RelativePath.generic_string(); HydrateState << "Size" << DirContent.FileSizes[FileIndex]; HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex]; HydrateState << "Hash" << Entries[It->second].Hash; } HydrateState.EndObject(); } else { ZEN_ASSERT(false); } } HydrateState.EndArray(); CbObject StateObject = HydrateState.Save(); ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}", m_Config.ModuleId, m_Config.ServerStateDir, DownloadedFiles, NiceBytes(DownloadedBytes), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return StateObject; } catch (const std::exception& Ex) { ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); return {}; } } void IncrementalHydrator::Obliterate() { const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); try { ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); m_Storage->Delete(Work, *m_Threading.WorkerPool); Work.Wait(); } catch (const std::exception& Ex) { ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what()); } CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } std::unique_ptr CreateHydrator(const HydrationConfig& Config) { std::unique_ptr Storage; if (!Config.TargetSpecification.empty()) { if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0) { Storage = std::make_unique(); } else if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0) { Storage = std::make_unique(); } else { throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification)); } } else { std::string_view Type = Config.Options["type"].AsString(); if (Type == FileHydratorType) { Storage = std::make_unique(); } else if (Type == S3HydratorType) { Storage = std::make_unique(); } else if (!Type.empty()) { throw zen::runtime_error("Unknown hydration target type '{}'", Type); } else { throw zen::runtime_error("No hydration target configured"); } } auto Hydrator = std::make_unique(std::move(Storage)); Hydrator->Configure(Config); return Hydrator; } #if ZEN_WITH_TESTS namespace { struct TestThreading { WorkerThreadPool WorkerPool; std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; HydrationConfig::ThreadingOptions Options{.WorkerPool = &WorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}; explicit TestThreading(int ThreadCount) : WorkerPool(ThreadCount) {} }; /// Scoped RAII helper to set/restore a single environment variable within a test. /// Used to configure AWS credentials for each S3 test's MinIO instance /// without polluting the global environment. struct ScopedEnvVar { std::string m_Name; std::optional m_OldValue; // nullopt = was not set; "" = was set to empty string ScopedEnvVar(std::string_view Name, std::string_view Value) : m_Name(Name) { # if ZEN_PLATFORM_WINDOWS // Use the raw API so we can distinguish "not set" (ERROR_ENVVAR_NOT_FOUND) // from "set to empty string" (returns 0 with no error). char Buf[1]; DWORD Len = GetEnvironmentVariableA(m_Name.c_str(), Buf, sizeof(Buf)); if (Len == 0 && GetLastError() == ERROR_ENVVAR_NOT_FOUND) { m_OldValue = std::nullopt; } else { // Len == 0 with no error: variable exists but is empty. // Len > sizeof(Buf): value is non-empty; Len is the required buffer size // (including null terminator) - allocate and re-read. std::string Old(Len == 0 ? 0 : Len - 1, '\0'); if (Len > sizeof(Buf)) { GetEnvironmentVariableA(m_Name.c_str(), Old.data(), Len); } m_OldValue = std::move(Old); } SetEnvironmentVariableA(m_Name.c_str(), std::string(Value).c_str()); # else // getenv returns nullptr when not set, "" when set to empty string. const char* Existing = getenv(m_Name.c_str()); m_OldValue = Existing ? std::optional(Existing) : std::nullopt; setenv(m_Name.c_str(), std::string(Value).c_str(), 1); # endif } ~ScopedEnvVar() { # if ZEN_PLATFORM_WINDOWS SetEnvironmentVariableA(m_Name.c_str(), m_OldValue.has_value() ? m_OldValue->c_str() : nullptr); # else if (m_OldValue.has_value()) { setenv(m_Name.c_str(), m_OldValue->c_str(), 1); } else { unsetenv(m_Name.c_str()); } # endif } }; /// Create a small file hierarchy under BaseDir: /// file_a.bin /// subdir/file_b.bin /// subdir/nested/file_c.bin /// Returns a vector of (relative path, content) pairs for later verification. typedef std::vector> TestFileList; TestFileList AddTestFiles(const std::filesystem::path& BaseDir, TestFileList& Files) { auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { std::filesystem::path FullPath = BaseDir / RelPath; CreateDirectories(FullPath.parent_path()); WriteFile(FullPath, Content); Files.emplace_back(std::move(RelPath), std::move(Content)); }; AddFile("file_a.bin", CreateSemiRandomBlob(1024)); AddFile("subdir/file_b.bin", CreateSemiRandomBlob(2048)); AddFile("subdir/nested/file_c.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512)); AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512)); return Files; } TestFileList CreateSmallTestTree(const std::filesystem::path& BaseDir) { TestFileList Files; AddTestFiles(BaseDir, Files); return Files; } TestFileList CreateTestTree(const std::filesystem::path& BaseDir) { TestFileList Files; AddTestFiles(BaseDir, Files); auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) { std::filesystem::path FullPath = BaseDir / RelPath; CreateDirectories(FullPath.parent_path()); WriteFile(FullPath, Content); Files.emplace_back(std::move(RelPath), std::move(Content)); }; AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u)); AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u)); AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u)); AddFile("subdir/nested/biggest.bulk", CreateSemiRandomBlob(63u * 1024u * 1024u)); return Files; } void VerifyTree(const std::filesystem::path& Dir, const std::vector>& Expected) { for (const auto& [RelPath, Content] : Expected) { std::filesystem::path FullPath = Dir / RelPath; REQUIRE_MESSAGE(std::filesystem::exists(FullPath), FullPath.string()); BasicFile ReadBack(FullPath, BasicFile::Mode::kRead); IoBuffer ReadContent = ReadBack.ReadRange(0, ReadBack.FileSize()); REQUIRE_EQ(ReadContent.GetSize(), Content.GetSize()); CHECK(std::memcmp(ReadContent.GetData(), Content.GetData(), Content.GetSize()) == 0); } } } // namespace TEST_SUITE_BEGIN("server.hydration"); // --------------------------------------------------------------------------- // FileHydrator tests // --------------------------------------------------------------------------- TEST_CASE("hydration.file.dehydrate_hydrate") { ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); const std::string ModuleId = "testmodule"; auto TestFiles = CreateSmallTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = ModuleId; Config.TargetSpecification = "file://" + HydrationStore.string(); // Dehydrate: copy server state to file store { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } // Verify the module folder exists in the store and ServerStateDir was wiped CHECK(std::filesystem::exists(HydrationStore / ModuleId)); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore server state from file store { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } // Verify restored contents match the original VerifyTree(ServerStateDir, TestFiles); } TEST_CASE("hydration.file.hydrate_overwrites_existing_state") { ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); auto TestFiles = CreateSmallTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = "testmodule"; Config.TargetSpecification = "file://" + HydrationStore.string(); // Dehydrate the original state { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } // Put a stale file in ServerStateDir to simulate leftover state WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); // Hydrate - must wipe stale file and restore original { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); VerifyTree(ServerStateDir, TestFiles); } TEST_CASE("hydration.file.excluded_files_not_dehydrated") { ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); auto TestFiles = CreateSmallTestTree(ServerStateDir); // Add files that the dehydrator should skip WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64)); CreateDirectories(ServerStateDir / ".sentry-native"); WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = "testmodule_excl"; Config.TargetSpecification = "file://" + HydrationStore.string(); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } // Hydrate into a clean directory CleanDirectory(ServerStateDir, true); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } // Normal files must be restored VerifyTree(ServerStateDir, TestFiles); // Excluded files must NOT be restored CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc")); CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native")); } // --------------------------------------------------------------------------- // FileHydrator obliterate test // --------------------------------------------------------------------------- TEST_CASE("hydration.file.obliterate") { ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); const std::string ModuleId = "obliterate_test"; CreateSmallTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = ModuleId; Config.TargetSpecification = "file://" + HydrationStore.string(); // Dehydrate so the backend store has data { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } CHECK(std::filesystem::exists(HydrationStore / ModuleId)); // Put some files back in ServerStateDir and TempDir to verify cleanup CreateSmallTestTree(ServerStateDir); WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); // Obliterate { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Obliterate(); } // Backend store directory deleted CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId)); // ServerStateDir cleaned CHECK(std::filesystem::is_empty(ServerStateDir)); // TempDir cleaned CHECK(std::filesystem::is_empty(HydrationTemp)); } // --------------------------------------------------------------------------- // FileHydrator concurrent test // --------------------------------------------------------------------------- TEST_CASE("hydration.file.concurrent") { // N modules dehydrate and hydrate concurrently via ParallelWork. // Each module operates in its own directory - tests for global/static state races. constexpr int kModuleCount = 4; ScopedTemporaryDirectory TempDir; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; CreateDirectories(HydrationStore); TestThreading Threading(8); struct ModuleData { HydrationConfig Config; std::vector> Files; }; std::vector Modules(kModuleCount); for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("file_concurrent_{}", I); std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); CreateDirectories(TempPath); Modules[I].Config.ServerStateDir = StateDir; Modules[I].Config.TempDir = TempPath; Modules[I].Config.ModuleId = ModuleId; Modules[I].Config.TargetSpecification = "file://" + HydrationStore.string(); Modules[I].Config.Threading = Threading.Options; Modules[I].Files = CreateSmallTestTree(StateDir); } // Concurrent dehydrate { WorkerThreadPool Pool(kModuleCount, "hydration_file_dehy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic&) { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Concurrent hydrate { WorkerThreadPool Pool(kModuleCount, "hydration_file_hy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic&) { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Verify all modules restored correctly for (int I = 0; I < kModuleCount; ++I) { VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files); } } // --------------------------------------------------------------------------- // S3Hydrator tests // // Each test case spawns a local MinIO instance (self-contained, no external setup needed). // The MinIO binary must be present in the same directory as the test executable (copied by xmake). // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.dehydrate_hydrate") { MinioProcessOptions MinioOpts; MinioOpts.Port = 19011; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = "s3test_roundtrip"; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); Config.Options = std::move(Root).AsObject(); } // Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir // with a stale file to confirm the cleanup branch wipes it. WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } CHECK(std::filesystem::is_empty(ServerStateDir)); // v1: dehydrate without a marker file CreateSmallTestTree(ServerStateDir); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } // v2: dehydrate WITH a marker file that only v2 has CreateSmallTestTree(ServerStateDir); WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64)); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } // Hydrate must restore v2 (the latest dehydrated state) CleanDirectory(ServerStateDir, true); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } // v2 marker must be present - confirms the second dehydration overwrote the first CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin")); CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin")); CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin")); } TEST_CASE("hydration.s3.concurrent") { // N modules dehydrate and hydrate concurrently against MinIO. // Each module has a distinct ModuleId, so S3 key prefixes don't overlap. MinioProcessOptions MinioOpts; MinioOpts.Port = 19013; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); constexpr int kModuleCount = 6; constexpr int kThreadCount = 4; TestThreading Threading(kThreadCount); ScopedTemporaryDirectory TempDir; struct ModuleData { HydrationConfig Config; std::vector> Files; }; std::vector Modules(kModuleCount); for (int I = 0; I < kModuleCount; ++I) { std::string ModuleId = fmt::format("s3_concurrent_{}", I); std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); CreateDirectories(TempPath); Modules[I].Config.ServerStateDir = StateDir; Modules[I].Config.TempDir = TempPath; Modules[I].Config.ModuleId = ModuleId; Modules[I].Config.Threading = Threading.Options; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); Modules[I].Config.Options = std::move(Root).AsObject(); } Modules[I].Files = CreateTestTree(StateDir); } // Concurrent dehydrate { WorkerThreadPool Pool(kThreadCount, "hydration_s3_dehy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic&) { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Concurrent hydrate { WorkerThreadPool Pool(kThreadCount, "hydration_s3_hy"); std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (int I = 0; I < kModuleCount; ++I) { Work.ScheduleWork(Pool, [&Config = Modules[I].Config](std::atomic&) { CleanDirectory(Config.ServerStateDir, true); std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); }); } Work.Wait(); CHECK_FALSE(Work.IsAborted()); } // Verify all modules restored correctly for (int I = 0; I < kModuleCount; ++I) { VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files); } } TEST_CASE("hydration.s3.obliterate") { MinioProcessOptions MinioOpts; MinioOpts.Port = 19019; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); const std::string ModuleId = "s3test_obliterate"; HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = ModuleId; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); Config.Options = std::move(Root).AsObject(); } // Dehydrate to populate backend CreateSmallTestTree(ServerStateDir); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } auto ListModuleObjects = [&]() { S3ClientOptions Opts; Opts.BucketName = "zen-hydration-test"; Opts.Endpoint = Minio.Endpoint(); Opts.PathStyle = true; Opts.Credentials.AccessKeyId = Minio.RootUser(); Opts.Credentials.SecretAccessKey = Minio.RootPassword(); S3Client Client(Opts); return Client.ListObjects(ModuleId + "/"); }; // Verify objects exist in S3 CHECK(!ListModuleObjects().Objects.empty()); // Re-populate ServerStateDir and TempDir for cleanup verification CreateSmallTestTree(ServerStateDir); WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); // Obliterate { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Obliterate(); } // Verify S3 objects deleted CHECK(ListModuleObjects().Objects.empty()); // Local directories cleaned CHECK(std::filesystem::is_empty(ServerStateDir)); CHECK(std::filesystem::is_empty(HydrationTemp)); } TEST_CASE("hydration.s3.config_overrides") { MinioProcessOptions MinioOpts; MinioOpts.Port = 19015; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); // Path prefix: "s3://bucket/some/prefix" stores objects under // "some/prefix//..." rather than directly under "/...". { auto TestFiles = CreateSmallTestTree(ServerStateDir); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = "s3test_prefix"; { std::string ConfigJson = fmt::format( R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); Config.Options = std::move(Root).AsObject(); } { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } CleanDirectory(ServerStateDir, true); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } VerifyTree(ServerStateDir, TestFiles); } // Region override: 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION. // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options. { CleanDirectory(ServerStateDir, true); auto TestFiles = CreateSmallTestTree(ServerStateDir); ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region"); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = "s3test_region_override"; { std::string ConfigJson = fmt::format( R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); Config.Options = std::move(Root).AsObject(); } { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } CleanDirectory(ServerStateDir, true); { std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } VerifyTree(ServerStateDir, TestFiles); } } TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) { MinioProcessOptions MinioOpts; MinioOpts.Port = 19010; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); ScopedTemporaryDirectory TempDir; std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); const std::string ModuleId = "s3test_performance"; CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true}); // auto TestFiles = CreateTestTree(ServerStateDir); TestThreading Threading(4); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = ModuleId; Config.Threading = Threading.Options; std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); Config.Options = std::move(Root).AsObject(); // Dehydrate: upload server state to MinIO { ZEN_INFO("============== DEHYDRATE =============="); std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Dehydrate(CbObject()); } for (size_t I = 0; I < 1; I++) { // Wipe server state CleanDirectory(ServerStateDir, true); CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: download from MinIO back to server state { ZEN_INFO("=============== HYDRATE ==============="); std::unique_ptr Hydrator = CreateHydrator(Config); Hydrator->Hydrate(); } } } //#define REAL_DATA_PATH "E:\\Dev\\hub\\zenddc\\Zen" //#define REAL_DATA_PATH "E:\\Dev\\hub\\brainrot\\20260402-225355-508" TEST_CASE("hydration.file.incremental") { std::filesystem::path TmpPath; # ifdef REAL_DATA_PATH TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub"; # endif ScopedTemporaryDirectory TempDir(TmpPath); std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); const std::string ModuleId = "testmodule"; // auto TestFiles = CreateTestTree(ServerStateDir); TestThreading Threading(4); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = ModuleId; Config.TargetSpecification = "file://" + HydrationStore.string(); Config.Threading = Threading.Options; std::unique_ptr Storage = std::make_unique(); std::unique_ptr Hydrator = std::make_unique(std::move(Storage)); // Hydrate with no prior state CbObject HydrationState; { Hydrator->Configure(Config); HydrationState = Hydrator->Hydrate(); CHECK_FALSE(HydrationState); } # ifdef REAL_DATA_PATH ZEN_INFO("Writing state data..."); CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); ZEN_INFO("Writing state data complete"); # else // Create test files and dehydrate auto TestFiles = CreateTestTree(ServerStateDir); # endif { Hydrator->Configure(Config); Hydrator->Dehydrate(HydrationState); } CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore from S3 { Hydrator->Configure(Config); HydrationState = Hydrator->Hydrate(); } # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // Dehydrate again with cached state (should skip re-uploading unchanged files) { Hydrator->Configure(Config); Hydrator->Dehydrate(HydrationState); } CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate one more time to confirm second dehydrate produced valid state { Hydrator->Configure(Config); HydrationState = Hydrator->Hydrate(); } // Replace files and dehydrate TestFiles = CreateTestTree(ServerStateDir); { Hydrator->Configure(Config); Hydrator->Dehydrate(HydrationState); } // Hydrate one more time to confirm second dehydrate produced valid state { Hydrator->Configure(Config); HydrationState = Hydrator->Hydrate(); } # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // 0 // Dehydrate, nothing touched - no hashing, no upload { Hydrator->Configure(Config); Hydrator->Dehydrate(HydrationState); } } // --------------------------------------------------------------------------- // S3Storage test // --------------------------------------------------------------------------- TEST_CASE("hydration.s3.incremental") { MinioProcessOptions MinioOpts; MinioOpts.Port = 19017; MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); std::filesystem::path TmpPath; # ifdef REAL_DATA_PATH TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub"; # endif ScopedTemporaryDirectory TempDir(TmpPath); std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); const std::string ModuleId = "s3test_incremental"; TestThreading Threading(8); HydrationConfig Config; Config.ServerStateDir = ServerStateDir; Config.TempDir = HydrationTemp; Config.ModuleId = ModuleId; Config.Threading = Threading.Options; { std::string ConfigJson = fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", Minio.Endpoint()); std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); Config.Options = std::move(Root).AsObject(); } std::unique_ptr Storage = std::make_unique(); std::unique_ptr Hydrator = std::make_unique(std::move(Storage)); // Hydrate with no prior state CbObject HydrationState; { Hydrator->Configure(Config); HydrationState = Hydrator->Hydrate(); CHECK_FALSE(HydrationState); } # ifdef REAL_DATA_PATH ZEN_INFO("Writing state data..."); CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true}); ZEN_INFO("Writing state data complete"); # else // Create test files and dehydrate auto TestFiles = CreateTestTree(ServerStateDir); # endif { Hydrator->Configure(Config); Hydrator->Dehydrate(HydrationState); } CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate: restore from S3 { Hydrator->Configure(Config); HydrationState = Hydrator->Hydrate(); } # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // Dehydrate again with cached state (should skip re-uploading unchanged files) { Hydrator->Configure(Config); Hydrator->Dehydrate(HydrationState); } CHECK(std::filesystem::is_empty(ServerStateDir)); // Hydrate one more time to confirm second dehydrate produced valid state { Hydrator->Configure(Config); HydrationState = Hydrator->Hydrate(); } // Replace files and dehydrate TestFiles = CreateTestTree(ServerStateDir); { Hydrator->Configure(Config); Hydrator->Dehydrate(HydrationState); } // Hydrate one more time to confirm second dehydrate produced valid state { Hydrator->Configure(Config); HydrationState = Hydrator->Hydrate(); } # ifndef REAL_DATA_PATH VerifyTree(ServerStateDir, TestFiles); # endif // 0 // Dehydrate, nothing touched - no hashing, no upload { Hydrator->Configure(Config); Hydrator->Dehydrate(HydrationState); } } TEST_CASE("hydration.create_hydrator_rejects_invalid_config") { ScopedTemporaryDirectory TempDir; HydrationConfig Config; Config.ServerStateDir = TempDir.Path() / "state"; Config.TempDir = TempDir.Path() / "temp"; Config.ModuleId = "invalid_test"; // Unknown TargetSpecification prefix Config.TargetSpecification = "ftp://somewhere"; CHECK_THROWS(CreateHydrator(Config)); // Unknown Options type Config.TargetSpecification.clear(); { std::string ParseError; CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError); ZEN_ASSERT(ParseError.empty() && Root.IsObject()); Config.Options = std::move(Root).AsObject(); } CHECK_THROWS(CreateHydrator(Config)); // Empty Options (no type field) Config.Options = CbObject(); CHECK_THROWS(CreateHydrator(Config)); } TEST_SUITE_END(); void hydration_forcelink() { } #endif // ZEN_WITH_TESTS } // namespace zen