// Copyright Epic Games, Inc. All Rights Reserved. #include "hydration.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include #endif // ZEN_WITH_TESTS namespace zen { namespace hydration_impl { /// UTC time decomposed to calendar fields with sub-second milliseconds. struct UtcTime { std::tm Tm{}; int Ms = 0; // sub-second milliseconds [0, 999] static UtcTime Now() { std::chrono::system_clock::time_point TimePoint = std::chrono::system_clock::now(); std::time_t TimeT = std::chrono::system_clock::to_time_t(TimePoint); int SubSecMs = static_cast((std::chrono::duration_cast(TimePoint.time_since_epoch()) % 1000).count()); UtcTime Result; Result.Ms = SubSecMs; #if ZEN_PLATFORM_WINDOWS gmtime_s(&Result.Tm, &TimeT); #else gmtime_r(&TimeT, &Result.Tm); #endif return Result; } }; std::filesystem::path FastRelativePath(const std::filesystem::path& Root, const std::filesystem::path& Abs) { auto [_, ItAbs] = std::mismatch(Root.begin(), Root.end(), Abs.begin(), Abs.end()); std::filesystem::path RelativePath; for (auto I = ItAbs; I != Abs.end(); I++) { RelativePath = RelativePath / *I; } return RelativePath; } void CleanDirectory(WorkerThreadPool& WorkerPool, std::atomic& AbortFlag, std::atomic& PauseFlag, const std::filesystem::path& Path) { CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector{}, {}, 0); } /////////////////////////////////////////////////////////////////////// // Hydration / dehydration statistics. Atomics so they are safe to update // from parallel worker lambdas. Summary is emitted once after the operation // completes (success or failure). 
struct PhaseStats { std::atomic Files{0}; // host-side: count of work scheduled in this phase std::atomic Bytes{0}; // lambda-side: bytes transferred on successful completion std::atomic ElapsedUs{0}; // wall time around Work.Wait() RwLock ThreadIdsLock; std::unordered_set ThreadIds; void RecordThread() { int Tid = zen::GetCurrentThreadId(); ThreadIdsLock.WithExclusiveLock([&] { ThreadIds.insert(Tid); }); } }; struct DehydrateStatistics { PhaseStats Hash; PhaseStats Upload; PhaseStats Touch; // Touch shares Upload's ParallelWork / ElapsedUs std::atomic LoadStateUs{0}; std::atomic DirScanUs{0}; std::atomic ListExistingUs{0}; std::atomic MetadataSaveUs{0}; std::atomic CleanUs{0}; std::atomic TotalFiles{0}; std::atomic TotalBytes{0}; std::atomic TotalUs{0}; }; struct HydrateStatistics { PhaseStats Download; std::atomic LoadMetadataUs{0}; std::atomic CleanUs{0}; std::atomic RenameOrCopyUs{0}; std::atomic VerifyScanUs{0}; std::atomic TotalFiles{0}; std::atomic TotalBytes{0}; std::atomic TotalUs{0}; }; // Bits-per-second rate computed at microsecond precision. Zero-safe. inline uint64_t BitsPerSecond(uint64_t Bytes, uint64_t ElapsedUs) { if (ElapsedUs == 0) { return 0; } return Bytes * 8 * 1'000'000ull / ElapsedUs; } /////////////////////////////////////////////////////////////////////// // Per-module storage interface driven by IncrementalHydrator. 
class StorageBase { public: virtual ~StorageBase() = default; virtual std::string Describe() const = 0; virtual void SaveMetadata(const CbObject& Data) = 0; virtual CbObject LoadMetadata() = 0; virtual CbObject GetSettings() = 0; virtual void ParseSettings(const CbObjectView& Settings) = 0; virtual std::vector List() = 0; virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats) = 0; virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats) = 0; virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) = 0; virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0; }; class FileStorage : public StorageBase { public: static constexpr std::string_view Prefix = "file://"; static constexpr std::string_view Type = "file"; explicit FileStorage(std::filesystem::path ModulePath); virtual std::string Describe() const override { return fmt::format("file://{}", m_StoragePath.generic_string()); } virtual void SaveMetadata(const CbObject& Data) override; virtual CbObject LoadMetadata() override; virtual CbObject GetSettings() override { return {}; } virtual void ParseSettings(const CbObjectView&) override {} virtual std::vector List() override; virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats) override; virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats) override; virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&, PhaseStats&) override {} virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override; private: std::filesystem::path m_StoragePath; 
std::filesystem::path m_StatePathName; std::filesystem::path m_CASPath; }; class S3Storage : public StorageBase { public: static constexpr std::string_view Prefix = "s3://"; static constexpr std::string_view Type = "s3"; static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u; S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize); virtual std::string Describe() const override { return fmt::format("s3://{}/{}", m_Client.BucketName(), m_KeyPrefix); } virtual void SaveMetadata(const CbObject& Data) override; virtual CbObject LoadMetadata() override; virtual CbObject GetSettings() override; virtual void ParseSettings(const CbObjectView& Settings) override; virtual std::vector List() override; virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats) override; virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats) override; virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) override; virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override; private: S3Client& m_Client; std::string m_KeyPrefix; std::filesystem::path m_TempDir; uint64_t m_MultipartChunkSize; }; /////////////////////////////////////////////////////////////////////// // FileStorage implementations FileStorage::FileStorage(std::filesystem::path ModulePath) : m_StoragePath(std::move(ModulePath)) { MakeSafeAbsolutePathInPlace(m_StoragePath); m_StatePathName = m_StoragePath / "current-state.cbo"; m_CASPath = m_StoragePath / "cas"; CreateDirectories(m_CASPath); } void FileStorage::SaveMetadata(const CbObject& Data) { BinaryWriter Output; SaveCompactBinary(Output, Data); WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), 
Output.GetSize())); } CbObject FileStorage::LoadMetadata() { if (!IsFile(m_StatePathName)) { return {}; } FileContents Content = ReadFile(m_StatePathName); if (Content.ErrorCode) { ThrowSystemError(Content.ErrorCode.value(), "Failed to read state file"); } IoBuffer Payload = Content.Flatten(); CbValidateError Error; CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error); if (Error != CbValidateError::None) { throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error))); } return Result; } std::vector FileStorage::List() { DirectoryContent DirContent; GetDirectoryContent(m_CASPath, DirectoryContentFlags::IncludeFiles, DirContent); std::vector Result; Result.reserve(DirContent.Files.size()); for (const std::filesystem::path& Path : DirContent.Files) { IoHash Hash; if (IoHash::TryParse(Path.filename().string(), Hash)) { Result.push_back(Hash); } } return Result; } void FileStorage::Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats) { Work.ScheduleWork( WorkerPool, [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic& AbortFlag) { Stats.RecordThread(); if (!AbortFlag.load()) { std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash); if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec) { throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath)); } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); } }); } void FileStorage::Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats) { Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats]( std::atomic& AbortFlag) { 
Stats.RecordThread(); if (!AbortFlag.load()) { std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash); if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec) { throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath)); } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); } }); } void FileStorage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) { ZEN_UNUSED(Work); ZEN_UNUSED(WorkerPool); DeleteDirectories(m_StoragePath); } /////////////////////////////////////////////////////////////////////// // S3Storage implementations S3Storage::S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize) : m_Client(Client) , m_KeyPrefix(std::move(KeyPrefix)) , m_TempDir(std::move(TempDir)) , m_MultipartChunkSize(MultipartChunkSize) { } void S3Storage::SaveMetadata(const CbObject& Data) { BinaryWriter Output; SaveCompactBinary(Output, Data); IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); std::string Key = m_KeyPrefix + "/incremental-state.cbo"; S3Result Result = m_Client.PutObject(Key, std::move(Payload)); if (!Result.IsSuccess()) { throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error); } } CbObject S3Storage::LoadMetadata() { std::string Key = m_KeyPrefix + "/incremental-state.cbo"; S3GetObjectResult Result = m_Client.GetObject(Key); if (!Result.IsSuccess()) { if (Result.Error == S3GetObjectResult::NotFoundErrorText) { return {}; } throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error); } CbValidateError Error; CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error); if (Error != CbValidateError::None) { throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error)); } return Meta; } CbObject S3Storage::GetSettings() { CbObjectWriter 
Writer; Writer << "MultipartChunkSize" << m_MultipartChunkSize; return Writer.Save(); } void S3Storage::ParseSettings(const CbObjectView& Settings) { m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); } std::vector S3Storage::List() { std::string CasPrefix = m_KeyPrefix + "/cas/"; S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix); if (!Result.IsSuccess()) { throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error); } std::vector Hashes; Hashes.reserve(Result.Objects.size()); for (const S3ObjectInfo& Obj : Result.Objects) { size_t LastSlash = Obj.Key.rfind('/'); if (LastSlash == std::string::npos) { continue; } IoHash Hash; if (IoHash::TryParse(Obj.Key.substr(LastSlash + 1), Hash)) { Hashes.push_back(Hash); } } return Hashes; } void S3Storage::Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, PhaseStats& Stats) { Work.ScheduleWork( WorkerPool, [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic& AbortFlag) { Stats.RecordThread(); if (AbortFlag.load()) { return; } S3Client& Client = m_Client; std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) { BasicFile File(SourcePath, BasicFile::Mode::kRead); S3Result Result = Client.PutObjectMultipart( Key, Size, [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); }, m_MultipartChunkSize); if (!Result.IsSuccess()) { throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); } } else { BasicFile File(SourcePath, BasicFile::Mode::kRead); S3Result Result = Client.PutObject(Key, File.ReadAll()); if (!Result.IsSuccess()) { throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); } } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); }); } void 
S3Storage::Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, PhaseStats& Stats) { std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) { class WorkData { public: WorkData(const std::filesystem::path& DestPath, uint64_t Size) : m_DestFile(DestPath, BasicFile::Mode::kTruncate) { PrepareFileForScatteredWrite(m_DestFile.Handle(), Size); } ~WorkData() { m_DestFile.Flush(); } void Write(const void* Data, uint64_t Size, uint64_t Offset) { m_DestFile.Write(Data, Size, Offset); } private: BasicFile m_DestFile; }; std::shared_ptr Data = std::make_shared(DestinationPath, Size); uint64_t Offset = 0; while (Offset < Size) { uint64_t ChunkSize = std::min(m_MultipartChunkSize, Size - Offset); Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data, &Stats](std::atomic& AbortFlag) { Stats.RecordThread(); if (AbortFlag) { return; } S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize); if (!Chunk.IsSuccess()) { throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", Key, Offset, Offset + ChunkSize - 1, Chunk.Error); } Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset); Stats.Bytes.fetch_add(ChunkSize, std::memory_order_relaxed); }); Offset += ChunkSize; } } else { Work.ScheduleWork( WorkerPool, [this, Key = Key, Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](std::atomic& AbortFlag) { Stats.RecordThread(); if (AbortFlag) { return; } S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir); if (!Chunk.IsSuccess()) { throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error); } if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) { std::error_code Ec; std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec); if (Ec) { WriteFile(DestinationPath, 
Chunk.Content); } else { Chunk.Content.SetDeleteOnClose(false); Chunk.Content = {}; RenameFile(ChunkPath, DestinationPath, Ec); if (Ec) { Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath); Chunk.Content.SetDeleteOnClose(true); WriteFile(DestinationPath, Chunk.Content); } } } else { WriteFile(DestinationPath, Chunk.Content); } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); }); } } void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) { Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), &Stats](std::atomic& AbortFlag) { Stats.RecordThread(); if (AbortFlag.load()) { return; } std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); S3Result Result = m_Client.Touch(Key); if (!Result.IsSuccess()) { throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error); } }); } void S3Storage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) { std::string ModulePrefix = m_KeyPrefix + "/"; S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix); if (!ListResult.IsSuccess()) { throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error); } for (const S3ObjectInfo& Obj : ListResult.Objects) { Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic& AbortFlag) { if (AbortFlag.load()) { return; } S3Result DelResult = m_Client.DeleteObject(Key); if (!DelResult.IsSuccess()) { throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error); } }); } } /////////////////////////////////////////////////////////////////////// // IncrementalHydrator: the only HydrationStrategyBase implementation. // Summary emission for hydrate/dehydrate operations. 
void LogDehydrateSummary(std::string_view Prefix, const DehydrateStatistics& Stats, std::string_view ModuleId, const std::filesystem::path& Source, std::string_view Target) { const uint64_t HashUs = Stats.Hash.ElapsedUs.load(); const uint64_t UploadUs = Stats.Upload.ElapsedUs.load(); ZEN_INFO( "{} module '{}': {} files ({}) in {}\n" " Source: {}\n" " Target: {}\n" " Load state: {}\n" " Dir scan: {}\n" " Hash phase: {} {}/{} ({}) hashed, {}bits/s, {} threads\n" " List existing: {}\n" " Upload phase: {} {}/{} ({}) uploaded, {} ({}) touched, {}bits/s, {} threads\n" " Metadata save: {}\n" " Clean: {}", Prefix, ModuleId, ThousandsNum(Stats.TotalFiles.load()), NiceBytes(Stats.TotalBytes.load()), NiceLatencyNs(Stats.TotalUs.load() * 1000), Source.generic_string(), Target, NiceLatencyNs(Stats.LoadStateUs.load() * 1000), NiceLatencyNs(Stats.DirScanUs.load() * 1000), NiceLatencyNs(HashUs * 1000), ThousandsNum(Stats.Hash.Files.load()), ThousandsNum(Stats.TotalFiles.load()), NiceBytes(Stats.Hash.Bytes.load()), NiceNum(BitsPerSecond(Stats.Hash.Bytes.load(), HashUs)), Stats.Hash.ThreadIds.size(), NiceLatencyNs(Stats.ListExistingUs.load() * 1000), NiceLatencyNs(UploadUs * 1000), ThousandsNum(Stats.Upload.Files.load()), ThousandsNum(Stats.TotalFiles.load()), NiceBytes(Stats.Upload.Bytes.load()), ThousandsNum(Stats.Touch.Files.load()), NiceBytes(Stats.Touch.Bytes.load()), NiceNum(BitsPerSecond(Stats.Upload.Bytes.load(), UploadUs)), Stats.Upload.ThreadIds.size(), NiceLatencyNs(Stats.MetadataSaveUs.load() * 1000), NiceLatencyNs(Stats.CleanUs.load() * 1000)); } void LogHydrateSummary(std::string_view Prefix, const HydrateStatistics& Stats, std::string_view ModuleId, std::string_view Source, const std::filesystem::path& Target) { const uint64_t DownloadUs = Stats.Download.ElapsedUs.load(); ZEN_INFO( "{} module '{}': {} files ({}) in {}\n" " Source: {}\n" " Target: {}\n" " Load metadata: {}\n" " Download phase: {} {}/{} ({}) downloaded, {}bits/s, {} threads\n" " Clean: {}\n" " 
Rename/copy: {}\n" " Verify scan: {}", Prefix, ModuleId, ThousandsNum(Stats.TotalFiles.load()), NiceBytes(Stats.TotalBytes.load()), NiceLatencyNs(Stats.TotalUs.load() * 1000), Source, Target.generic_string(), NiceLatencyNs(Stats.LoadMetadataUs.load() * 1000), NiceLatencyNs(DownloadUs * 1000), ThousandsNum(Stats.Download.Files.load()), ThousandsNum(Stats.TotalFiles.load()), NiceBytes(Stats.Download.Bytes.load()), NiceNum(BitsPerSecond(Stats.Download.Bytes.load(), DownloadUs)), Stats.Download.ThreadIds.size(), NiceLatencyNs(Stats.CleanUs.load() * 1000), NiceLatencyNs(Stats.RenameOrCopyUs.load() * 1000), NiceLatencyNs(Stats.VerifyScanUs.load() * 1000)); } /////////////////////////////////////////////////////////////////////// // Holds a per-module StorageBase and threading context; drives the // hydrate/dehydrate algorithm. class IncrementalHydrator : public HydrationStrategyBase { public: IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr Storage); virtual ~IncrementalHydrator() override; virtual void Dehydrate(const CbObject& CachedState) override; virtual CbObject Hydrate() override; virtual void Obliterate() override; private: struct Entry { std::filesystem::path RelativePath; uint64_t Size; uint64_t ModTick; IoHash Hash; }; std::unique_ptr m_Storage; HydrationConfig m_Config; WorkerThreadPool m_FallbackWorkPool; std::atomic m_FallbackAbortFlag{false}; std::atomic m_FallbackPauseFlag{false}; HydrationConfig::ThreadingOptions m_Threading{.WorkerPool = &m_FallbackWorkPool, .AbortFlag = &m_FallbackAbortFlag, .PauseFlag = &m_FallbackPauseFlag}; }; /////////////////////////////////////////////////////////////////////// // IncrementalHydrator implementations IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr Storage) : m_Storage(std::move(Storage)) , m_Config(Config) , m_FallbackWorkPool(0) { if (Config.Threading) { m_Threading = *Config.Threading; } } IncrementalHydrator::~IncrementalHydrator() { m_Storage.reset(); } 
void IncrementalHydrator::Dehydrate(const CbObject& CachedState) { Stopwatch TotalTimer; DehydrateStatistics Stats; const std::string StorageTarget = m_Storage->Describe(); const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); try { std::unordered_map StateEntryLookup; std::vector StateEntries; { Stopwatch LoadStateTimer; for (CbFieldView FieldView : CachedState["Files"].AsArrayView()) { CbObjectView EntryView = FieldView.AsObjectView(); std::filesystem::path RelativePath(EntryView["Path"].AsString()); uint64_t Size = EntryView["Size"].AsUInt64(); uint64_t ModTick = EntryView["ModTick"].AsUInt64(); IoHash Hash = EntryView["Hash"].AsHash(); StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size()); StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash}); } Stats.LoadStateUs = LoadStateTimer.GetElapsedTimeUs(); } DirectoryContent DirContent; { Stopwatch DirScanTimer; GetDirectoryContent(*m_Threading.WorkerPool, ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, DirContent); Stats.DirScanUs = DirScanTimer.GetElapsedTimeUs(); } ZEN_INFO("Dehydrating module '{}' from folder '{}'. 
{} ({}) files", m_Config.ModuleId, m_Config.ServerStateDir, DirContent.Files.size(), NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); std::vector Entries; Entries.resize(DirContent.Files.size()); uint64_t TotalBytes = 0; uint64_t TotalFiles = 0; std::unordered_set ExistsLookup; { Stopwatch HashTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); if (AbsPath.filename() == "reserve.gc") { continue; } const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); if (*RelativePath.begin() == ".sentry-native") { continue; } if (RelativePath == ".lock") { continue; } Entry& CurrentEntry = Entries[TotalFiles]; CurrentEntry.RelativePath = RelativePath; CurrentEntry.Size = DirContent.FileSizes[FileIndex]; CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; bool FoundHash = false; if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end()) { const Entry& StateEntry = StateEntries[KnownIt->second]; if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) { CurrentEntry.Hash = StateEntry.Hash; FoundHash = true; } } if (!FoundHash) { Work.ScheduleWork(*m_Threading.WorkerPool, [AbsPath, EntryIndex = TotalFiles, &Entries, &Stats](std::atomic& AbortFlag) { Stats.Hash.RecordThread(); if (AbortFlag) { return; } Entry& CurrentEntry = Entries[EntryIndex]; bool FoundHash = false; if (AbsPath.extension().empty()) { auto It = CurrentEntry.RelativePath.begin(); if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas")) { IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed( 
SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), RawHash, RawSize); if (Compressed) { // We compose a meta-hash since taking the RawHash might collide with an // existing non-compressed file with the same content The collision is // unlikely except if the compressed data is zero bytes causing RawHash // to be the same as an empty file. IoHashStream Hasher; Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size)); CurrentEntry.Hash = Hasher.GetHash(); FoundHash = true; } } } if (!FoundHash) { CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath)); } Stats.Hash.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); }); Stats.Hash.Files.fetch_add(1, std::memory_order_relaxed); } TotalFiles++; TotalBytes += CurrentEntry.Size; } { Stopwatch ListTimer; std::vector ExistingEntries = m_Storage->List(); ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end()); Stats.ListExistingUs = ListTimer.GetElapsedTimeUs(); } Work.Wait(); Entries.resize(TotalFiles); Stats.Hash.ElapsedUs = HashTimer.GetElapsedTimeUs(); Stats.TotalFiles = TotalFiles; Stats.TotalBytes = TotalBytes; } uint64_t UploadDurationMs = 0; { Stopwatch UploadTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (const Entry& CurrentEntry : Entries) { if (!ExistsLookup.contains(CurrentEntry.Hash)) { m_Storage->Put(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath), Stats.Upload); Stats.Upload.Files.fetch_add(1, std::memory_order_relaxed); } else { // Refresh the backend's modification time so lifecycle-expiration policies // do not evict CAS entries that are still referenced by this module. 
m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, Stats.Touch); Stats.Touch.Files.fetch_add(1, std::memory_order_relaxed); Stats.Touch.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); } } Work.Wait(); Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs(); UploadDurationMs = TotalTimer.GetElapsedTimeMs(); Stopwatch MetadataTimer; UtcTime Now = UtcTime::Now(); std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", Now.Tm.tm_year + 1900, Now.Tm.tm_mon + 1, Now.Tm.tm_mday, Now.Tm.tm_hour, Now.Tm.tm_min, Now.Tm.tm_sec, Now.Ms); CbObjectWriter Meta; Meta << "SourceFolder" << ServerStateDir.generic_string(); Meta << "ModuleId" << m_Config.ModuleId; Meta << "HostName" << GetMachineName(); Meta << "UploadTimeUtc" << UploadTimeUtc; Meta << "UploadDurationMs" << UploadDurationMs; Meta << "TotalSizeBytes" << TotalBytes; Meta << "StorageSettings" << m_Storage->GetSettings(); Meta.BeginArray("Files"); for (const Entry& CurrentEntry : Entries) { Meta.BeginObject(); { Meta << "Path" << CurrentEntry.RelativePath.generic_string(); Meta << "Size" << CurrentEntry.Size; Meta << "ModTick" << CurrentEntry.ModTick; Meta << "Hash" << CurrentEntry.Hash; } Meta.EndObject(); } Meta.EndArray(); m_Storage->SaveMetadata(Meta.Save()); Stats.MetadataSaveUs = MetadataTimer.GetElapsedTimeUs(); } ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); { Stopwatch CleanTimer; CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); } Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); LogDehydrateSummary("Dehydration complete", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } catch (const std::exception& Ex) { ZEN_WARN("Dehydration of module '{}' failed: {}. 
Leaving server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); LogDehydrateSummary("Dehydration failed", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } } CbObject IncrementalHydrator::Hydrate() { Stopwatch TotalTimer; HydrateStatistics Stats; const std::string StorageSource = m_Storage->Describe(); const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); try { CbObject Meta; { Stopwatch LoadTimer; Meta = m_Storage->LoadMetadata(); Stats.LoadMetadataUs = LoadTimer.GetElapsedTimeUs(); } if (!Meta) { ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'", m_Config.ModuleId, m_Config.ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); return CbObject(); } std::unordered_map EntryLookup; std::vector Entries; uint64_t TotalSize = 0; for (CbFieldView FieldView : Meta["Files"]) { CbObjectView EntryView = FieldView.AsObjectView(); if (EntryView) { Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()), .Size = EntryView["Size"].AsUInt64(), .ModTick = EntryView["ModTick"].AsUInt64(), .Hash = EntryView["Hash"].AsHash()}; TotalSize += NewEntry.Size; EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size()); Entries.emplace_back(std::move(NewEntry)); } } Stats.TotalFiles = Entries.size(); Stats.TotalBytes = TotalSize; ZEN_INFO("Hydrating module '{}' to folder '{}'. 
{} ({}) files", m_Config.ModuleId, m_Config.ServerStateDir, Entries.size(), NiceBytes(TotalSize)); m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView()); { Stopwatch DownloadTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (const Entry& CurrentEntry : Entries) { std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath); CreateDirectories(Path.parent_path()); m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path, Stats.Download); Stats.Download.Files.fetch_add(1, std::memory_order_relaxed); } Work.Wait(); Stats.Download.ElapsedUs = DownloadTimer.GetElapsedTimeUs(); } // Downloaded successfully - swap into ServerStateDir ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); { Stopwatch CleanTimer; CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); } { Stopwatch RenameTimer; // If the two paths share at least one common component they are on the same drive/volume // and atomic renames will succeed. Otherwise fall back to a full copy. 
auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end()); if (ItTmp != TempDir.begin()) { DirectoryContent DirContent; GetDirectoryContent(*m_Threading.WorkerPool, TempDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, DirContent); for (const std::filesystem::path& AbsPath : DirContent.Directories) { std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest)); } } for (const std::filesystem::path& AbsPath : DirContent.Files) { std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest)); } } ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } else { // Slow path: TempDir and ServerStateDir are on different filesystems, so rename // would fail. Copy the tree instead and clean up the temp files afterwards. 
ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); CopyTree(TempDir, ServerStateDir, {.EnableClone = true}); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } Stats.RenameOrCopyUs = RenameTimer.GetElapsedTimeUs(); } CbObject StateObject; { Stopwatch VerifyTimer; DirectoryContent DirContent; GetDirectoryContent(*m_Threading.WorkerPool, ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, DirContent); CbObjectWriter HydrateState; HydrateState.BeginArray("Files"); for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) { std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end()) { HydrateState.BeginObject(); { HydrateState << "Path" << RelativePath.generic_string(); HydrateState << "Size" << DirContent.FileSizes[FileIndex]; HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex]; HydrateState << "Hash" << Entries[It->second].Hash; } HydrateState.EndObject(); } else { ZEN_ASSERT(false); } } HydrateState.EndArray(); StateObject = HydrateState.Save(); Stats.VerifyScanUs = VerifyTimer.GetElapsedTimeUs(); } Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); LogHydrateSummary("Hydration complete", Stats, m_Config.ModuleId, StorageSource, ServerStateDir); return StateObject; } catch (const std::exception& Ex) { ZEN_WARN("Hydration of module '{}' failed: {}. 
Cleaning server state '{}'", m_Config.ModuleId, Ex.what(), m_Config.ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); LogHydrateSummary("Hydration failed", Stats, m_Config.ModuleId, StorageSource, ServerStateDir); return {}; } } void IncrementalHydrator::Obliterate() { const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); try { ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); m_Storage->Delete(Work, *m_Threading.WorkerPool); Work.Wait(); } catch (const std::exception& Ex) { ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what()); } CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); } } // namespace hydration_impl /////////////////////////////////////////////////////////////////////////// // HydrationBase subclasses - own hub-wide backend state, hand per-module // storages the exact inputs they need in CreateHydrator. 
class FileHydration : public HydrationBase { public: explicit FileHydration(const Configuration& Config); virtual std::unique_ptr CreateHydrator(const HydrationConfig& Config) override; private: std::filesystem::path m_StorageRoot; }; class S3Hydration : public HydrationBase { public: explicit S3Hydration(const Configuration& Config); virtual std::unique_ptr CreateHydrator(const HydrationConfig& Config) override; private: std::string m_Bucket; std::string m_Region; std::string m_Endpoint; bool m_PathStyle = false; std::string m_KeyPrefixRoot; SigV4Credentials m_Credentials; Ref m_CredentialProvider; std::unique_ptr m_Client; uint64_t m_DefaultMultipartChunkSize; }; /////////////////////////////////////////////////////////////////////////// // Implementations FileHydration::FileHydration(const Configuration& Config) { if (!Config.TargetSpecification.empty()) { m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length())); if (m_StorageRoot.empty()) { throw zen::runtime_error("Hydration config 'file' type requires a directory path"); } } else { CbObjectView Settings = Config.Options["settings"].AsObjectView(); std::string_view Path = Settings["path"].AsString(); if (Path.empty()) { throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); } m_StorageRoot = Utf8ToWide(std::string(Path)); } MakeSafeAbsolutePathInPlace(m_StorageRoot); } std::unique_ptr FileHydration::CreateHydrator(const HydrationConfig& Config) { using namespace hydration_impl; return std::make_unique(Config, std::make_unique(m_StorageRoot / Config.ModuleId)); } S3Hydration::S3Hydration(const Configuration& Config) { using namespace hydration_impl; CbObjectView Settings = Config.Options["settings"].AsObjectView(); std::string_view Spec; if (!Config.TargetSpecification.empty()) { Spec = Config.TargetSpecification; Spec.remove_prefix(S3Storage::Prefix.size()); } else { std::string_view Uri = Settings["uri"].AsString(); if 
(Uri.empty()) { throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"); } Spec = Uri; Spec.remove_prefix(S3Storage::Prefix.size()); } size_t SlashPos = Spec.find('/'); m_Bucket = std::string(SlashPos != std::string_view::npos ? Spec.substr(0, SlashPos) : Spec); m_KeyPrefixRoot = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{}; if (m_Bucket.empty()) { throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"); } std::string Region = std::string(Settings["region"].AsString()); if (Region.empty()) { Region = GetEnvVariable("AWS_DEFAULT_REGION"); } if (Region.empty()) { Region = GetEnvVariable("AWS_REGION"); } if (Region.empty()) { Region = "us-east-1"; } m_Region = std::move(Region); std::string_view Endpoint = Settings["endpoint"].AsString(); if (!Endpoint.empty()) { m_Endpoint = std::string(Endpoint); m_PathStyle = Settings["path-style"].AsBool(); } std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); if (AccessKeyId.empty()) { m_CredentialProvider = Ref(new ImdsCredentialProvider({})); } else { m_Credentials.AccessKeyId = std::move(AccessKeyId); m_Credentials.SecretAccessKey = GetEnvVariable("AWS_SECRET_ACCESS_KEY"); m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); } m_DefaultMultipartChunkSize = Settings["chunksize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); S3ClientOptions ClientOptions; ClientOptions.BucketName = m_Bucket; ClientOptions.Region = m_Region; ClientOptions.Endpoint = m_Endpoint; ClientOptions.PathStyle = m_PathStyle; if (m_CredentialProvider) { ClientOptions.CredentialProvider = m_CredentialProvider; } else { ClientOptions.Credentials = m_Credentials; } ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; m_Client = std::make_unique(ClientOptions); } std::unique_ptr S3Hydration::CreateHydrator(const HydrationConfig& Config) { using namespace hydration_impl; std::string KeyPrefix = m_KeyPrefixRoot.empty() 
? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId); return std::make_unique( Config, std::make_unique(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize)); } std::unique_ptr InitHydration(const HydrationBase::Configuration& Config) { using namespace hydration_impl; if (!Config.TargetSpecification.empty()) { if (StrCaseCompare(Config.TargetSpecification.substr(0, FileStorage::Prefix.length()), FileStorage::Prefix) == 0) { return std::make_unique(Config); } if (StrCaseCompare(Config.TargetSpecification.substr(0, S3Storage::Prefix.length()), S3Storage::Prefix) == 0) { return std::make_unique(Config); } throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification); } std::string_view Type = Config.Options["type"].AsString(); if (Type == FileStorage::Type) { return std::make_unique(Config); } if (Type == S3Storage::Type) { return std::make_unique(Config); } if (!Type.empty()) { throw zen::runtime_error("Unknown hydration target type '{}'", Type); } throw zen::runtime_error("No hydration target configured"); } #if ZEN_WITH_TESTS namespace { struct TestThreading { WorkerThreadPool WorkerPool; std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; HydrationConfig::ThreadingOptions Options{.WorkerPool = &WorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}; explicit TestThreading(int ThreadCount) : WorkerPool(ThreadCount) {} }; /// Create a small file hierarchy under BaseDir: /// file_a.bin /// subdir/file_b.bin /// subdir/nested/file_c.bin /// Returns a vector of (relative path, content) pairs for later verification. 
    typedef std::vector> TestFileList;

    /// Writes the six small fixture files below BaseDir, creating parent directories as needed,
    /// and appends one (relative path, content) entry per file to Files.
    /// The return value is a copy of Files after the additions.
    TestFileList AddTestFiles(const std::filesystem::path& BaseDir, TestFileList& Files)
    {
        // Writes one file and records what was written so VerifyTree can compare later.
        auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) {
            std::filesystem::path FullPath = BaseDir / RelPath;
            CreateDirectories(FullPath.parent_path());
            WriteFile(FullPath, Content);
            Files.emplace_back(std::move(RelPath), std::move(Content));
        };
        AddFile("file_a.bin", CreateSemiRandomBlob(1024));
        AddFile("subdir/file_b.bin", CreateSemiRandomBlob(2048));
        AddFile("subdir/nested/file_c.bin", CreateSemiRandomBlob(512));
        AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512));
        AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512));
        AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512));
        return Files;
    }

    /// Creates only the small fixture files under BaseDir and returns the list of what was written.
    TestFileList CreateSmallTestTree(const std::filesystem::path& BaseDir)
    {
        TestFileList Files;
        AddTestFiles(BaseDir, Files);
        return Files;
    }

    /// Creates the small fixture files plus four larger ".bulk" files (256 KiB, 512 KiB, 9 MiB
    /// and 63 MiB) under BaseDir and returns the list of everything written.
    TestFileList CreateTestTree(const std::filesystem::path& BaseDir)
    {
        TestFileList Files;
        AddTestFiles(BaseDir, Files);
        // Same helper as in AddTestFiles: write the file and record it in Files.
        auto AddFile = [&](std::filesystem::path RelPath, IoBuffer Content) {
            std::filesystem::path FullPath = BaseDir / RelPath;
            CreateDirectories(FullPath.parent_path());
            WriteFile(FullPath, Content);
            Files.emplace_back(std::move(RelPath), std::move(Content));
        };
        AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u));
        AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u));
        AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u));
        AddFile("subdir/nested/biggest.bulk", CreateSemiRandomBlob(63u * 1024u * 1024u));
        return Files;
    }

    /// Checks that every expected file exists below Dir and has exactly the expected bytes.
    /// Files present in Dir but missing from Expected are not detected by this helper.
    void VerifyTree(const std::filesystem::path& Dir, const std::vector>& Expected)
    {
        for (const auto& [RelPath, Content] : Expected)
        {
            std::filesystem::path FullPath = Dir / RelPath;
            REQUIRE_MESSAGE(std::filesystem::exists(FullPath), FullPath.string());
            BasicFile ReadBack(FullPath, BasicFile::Mode::kRead);
            IoBuffer  ReadContent = ReadBack.ReadRange(0, ReadBack.FileSize());
            // Sizes must match before the byte comparison, which reads Content.GetSize() bytes
            // from both buffers.
            REQUIRE_EQ(ReadContent.GetSize(), Content.GetSize());
            CHECK(std::memcmp(ReadContent.GetData(), Content.GetData(), Content.GetSize()) == 0);
        }
    }

}  // namespace

TEST_SUITE_BEGIN("server.hydration");

// ---------------------------------------------------------------------------
// FileHydrator tests
// ---------------------------------------------------------------------------

// Round trip through the file backend: dehydrate wipes ServerStateDir, hydrate restores it.
TEST_CASE("hydration.file.dehydrate_hydrate")
{
    ScopedTemporaryDirectory TempDir;

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationStore);
    CreateDirectories(HydrationTemp);

    const std::string ModuleId  = "testmodule";
    auto              TestFiles = CreateSmallTestTree(ServerStateDir);

    auto            Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
    HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};

    // Dehydrate: copy server state to file store
    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());

    // Verify the module folder exists in the store and ServerStateDir was wiped
    CHECK(std::filesystem::exists(HydrationStore / ModuleId));
    CHECK(std::filesystem::is_empty(ServerStateDir));

    // Hydrate: restore server state from file store
    Hydration->CreateHydrator(Config)->Hydrate();

    // Verify restored contents match the original
    VerifyTree(ServerStateDir, TestFiles);
}

// Hydrating into a non-empty ServerStateDir must remove files that are not part of the stored state.
TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
{
    ScopedTemporaryDirectory TempDir;

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationStore);
    CreateDirectories(HydrationTemp);

    auto TestFiles = CreateSmallTestTree(ServerStateDir);

    auto            Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
    HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"};

    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());

    // Put a stale file in ServerStateDir to simulate leftover state
    WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));

    // Hydrate - must wipe stale file and restore original
    Hydration->CreateHydrator(Config)->Hydrate();

    CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
    VerifyTree(ServerStateDir, TestFiles);
}

// Files the dehydrator is expected to skip ("reserve.gc" and the ".sentry-native" directory)
// must not come back after a dehydrate / hydrate round trip.
TEST_CASE("hydration.file.excluded_files_not_dehydrated")
{
    ScopedTemporaryDirectory TempDir;

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationStore);
    CreateDirectories(HydrationTemp);

    auto TestFiles = CreateSmallTestTree(ServerStateDir);

    // Add files that the dehydrator should skip
    WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64));
    CreateDirectories(ServerStateDir / ".sentry-native");
    WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32));
    WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128));

    auto            Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
    HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"};

    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());

    // Hydrate into a clean directory
    CleanDirectory(ServerStateDir, true);
    Hydration->CreateHydrator(Config)->Hydrate();

    // Normal files must be restored
    VerifyTree(ServerStateDir, TestFiles);

    // Excluded files must NOT be restored
    CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc"));
    CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native"));
}

// ---------------------------------------------------------------------------
// FileHydrator obliterate test
// ---------------------------------------------------------------------------

// Obliterate must delete the module's folder in the store and empty both local directories.
TEST_CASE("hydration.file.obliterate")
{
    ScopedTemporaryDirectory TempDir;

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationStore);
    CreateDirectories(HydrationTemp);

    const std::string ModuleId = "obliterate_test";
    CreateSmallTestTree(ServerStateDir);

    auto            Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
    HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};

    // Dehydrate so the backend store has data
    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
    CHECK(std::filesystem::exists(HydrationStore / ModuleId));

    // Put some files back in ServerStateDir and TempDir to verify cleanup
    CreateSmallTestTree(ServerStateDir);
    WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));

    // Obliterate
    Hydration->CreateHydrator(Config)->Obliterate();

    // Backend store directory deleted
    CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId));
    // ServerStateDir cleaned
    CHECK(std::filesystem::is_empty(ServerStateDir));
    // TempDir cleaned
    CHECK(std::filesystem::is_empty(HydrationTemp));
}

// ---------------------------------------------------------------------------
// FileHydrator concurrent test
// ---------------------------------------------------------------------------

TEST_CASE("hydration.file.concurrent")
{
    // N modules dehydrate and hydrate concurrently via ParallelWork.
    // Each module operates in its own directory - tests for global/static state races.
    constexpr int kModuleCount = 4;

    ScopedTemporaryDirectory TempDir;
    std::filesystem::path    HydrationStore = TempDir.Path() / "hydration_store";
    CreateDirectories(HydrationStore);

    // Worker pool shared by all hydrators (passed through Config.Threading). The pools created
    // further down only run the per-module driver lambdas.
    TestThreading Threading(8);

    struct ModuleData
    {
        HydrationConfig Config;
        std::vector>    Files;
    };
    std::vector Modules(kModuleCount);

    auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});

    for (int I = 0; I < kModuleCount; ++I)
    {
        std::string           ModuleId = fmt::format("file_concurrent_{}", I);
        std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
        std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
        CreateDirectories(StateDir);
        CreateDirectories(TempPath);

        Modules[I].Config.ServerStateDir = StateDir;
        Modules[I].Config.TempDir        = TempPath;
        Modules[I].Config.ModuleId       = ModuleId;
        Modules[I].Config.Threading      = Threading.Options;
        Modules[I].Files                 = CreateSmallTestTree(StateDir);
    }

    // Concurrent dehydrate
    {
        WorkerThreadPool Pool(kModuleCount, "hydration_file_dehy");
        std::atomic      AbortFlag{false};
        std::atomic      PauseFlag{false};
        ParallelWork     Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);

        for (int I = 0; I < kModuleCount; ++I)
        {
            Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic&) {
                Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
            });
        }
        Work.Wait();
        CHECK_FALSE(Work.IsAborted());
    }

    // Concurrent hydrate
    {
        WorkerThreadPool Pool(kModuleCount, "hydration_file_hy");
        std::atomic      AbortFlag{false};
        std::atomic      PauseFlag{false};
        ParallelWork     Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);

        for (int I = 0; I < kModuleCount; ++I)
        {
            Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic&) {
                Hydration->CreateHydrator(Config)->Hydrate();
            });
        }
        Work.Wait();
        CHECK_FALSE(Work.IsAborted());
    }

    // Verify all modules restored correctly
    for (int I = 0; I < kModuleCount; ++I)
    {
        VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files);
    }
}

// ---------------------------------------------------------------------------
// S3Hydrator tests
//
// Each test case spawns a local MinIO instance (self-contained, no external setup needed).
// The MinIO binary must be present in the same directory as the test executable (copied by xmake).
// Every test case uses its own port number.
// ---------------------------------------------------------------------------

// First-boot hydrate with no stored state, then two dehydrations; the final hydrate must
// restore the state of the second one.
TEST_CASE("hydration.s3.dehydrate_hydrate")
{
    MinioProcessOptions MinioOpts;
    MinioOpts.Port = 19011;
    MinioProcess Minio(MinioOpts);
    Minio.SpawnMinioServer();
    Minio.CreateBucket("zen-hydration-test");

    // S3Hydration reads its static credentials from these environment variables.
    ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
    ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());

    ScopedTemporaryDirectory TempDir;

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationTemp);

    HydrationBase::Configuration BaseConfig;
    {
        std::string ConfigJson =
            fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
                        Minio.Endpoint());
        std::string     ParseError;
        CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
        ZEN_ASSERT(ParseError.empty() && Root.IsObject());
        BaseConfig.Options = std::move(Root).AsObject();
    }
    auto Hydration = InitHydration(BaseConfig);

    HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_roundtrip"};

    // Hydrate with no prior S3 state (first-boot path). Pre-populate ServerStateDir
    // with a stale file to confirm the cleanup branch wipes it.
    WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
    Hydration->CreateHydrator(Config)->Hydrate();
    CHECK(std::filesystem::is_empty(ServerStateDir));

    // v1: dehydrate without a marker file
    CreateSmallTestTree(ServerStateDir);
    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());

    // v2: dehydrate WITH a marker file that only v2 has
    CreateSmallTestTree(ServerStateDir);
    WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64));
    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());

    // Hydrate must restore v2 (the latest dehydrated state)
    CleanDirectory(ServerStateDir, true);
    Hydration->CreateHydrator(Config)->Hydrate();

    // v2 marker must be present - confirms the second dehydration overwrote the first
    CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin"));
    CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin"));
    CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin"));
}

TEST_CASE("hydration.s3.concurrent")
{
    // N modules dehydrate and hydrate concurrently against MinIO.
    // Each module has a distinct ModuleId, so S3 key prefixes don't overlap.
    MinioProcessOptions MinioOpts;
    MinioOpts.Port = 19013;
    MinioProcess Minio(MinioOpts);
    Minio.SpawnMinioServer();
    Minio.CreateBucket("zen-hydration-test");

    ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
    ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());

    // More modules than driver threads, so some modules queue behind others.
    constexpr int kModuleCount = 6;
    constexpr int kThreadCount = 4;

    TestThreading Threading(kThreadCount);

    ScopedTemporaryDirectory TempDir;

    struct ModuleData
    {
        HydrationConfig Config;
        std::vector>    Files;
    };
    std::vector Modules(kModuleCount);

    HydrationBase::Configuration BaseConfig;
    {
        std::string ConfigJson =
            fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
                        Minio.Endpoint());
        std::string     ParseError;
        CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
        ZEN_ASSERT(ParseError.empty() && Root.IsObject());
        BaseConfig.Options = std::move(Root).AsObject();
    }
    auto Hydration = InitHydration(BaseConfig);

    for (int I = 0; I < kModuleCount; ++I)
    {
        std::string           ModuleId = fmt::format("s3_concurrent_{}", I);
        std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
        std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
        CreateDirectories(StateDir);
        CreateDirectories(TempPath);

        Modules[I].Config.ServerStateDir = StateDir;
        Modules[I].Config.TempDir        = TempPath;
        Modules[I].Config.ModuleId       = ModuleId;
        Modules[I].Config.Threading      = Threading.Options;
        // Full tree including the large ".bulk" files.
        Modules[I].Files = CreateTestTree(StateDir);
    }

    // Concurrent dehydrate
    {
        WorkerThreadPool Pool(kThreadCount, "hydration_s3_dehy");
        std::atomic      AbortFlag{false};
        std::atomic      PauseFlag{false};
        ParallelWork     Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);

        for (int I = 0; I < kModuleCount; ++I)
        {
            Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic&) {
                Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
            });
        }
        Work.Wait();
        CHECK_FALSE(Work.IsAborted());
    }

    // Concurrent hydrate
    {
        WorkerThreadPool Pool(kThreadCount, "hydration_s3_hy");
        std::atomic      AbortFlag{false};
        std::atomic      PauseFlag{false};
        ParallelWork     Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);

        for (int I = 0; I < kModuleCount; ++I)
        {
            Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic&) {
                CleanDirectory(Config.ServerStateDir, true);
                Hydration->CreateHydrator(Config)->Hydrate();
            });
        }
        Work.Wait();
        CHECK_FALSE(Work.IsAborted());
    }

    // Verify all modules restored correctly
    for (int I = 0; I < kModuleCount; ++I)
    {
        VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files);
    }
}

// Obliterate must remove every object under "<ModuleId>/" from the bucket and empty both
// local directories.
TEST_CASE("hydration.s3.obliterate")
{
    MinioProcessOptions MinioOpts;
    MinioOpts.Port = 19019;
    MinioProcess Minio(MinioOpts);
    Minio.SpawnMinioServer();
    Minio.CreateBucket("zen-hydration-test");

    ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
    ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());

    ScopedTemporaryDirectory TempDir;

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationTemp);

    const std::string ModuleId = "s3test_obliterate";

    HydrationBase::Configuration BaseConfig;
    {
        std::string ConfigJson =
            fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
                        Minio.Endpoint());
        std::string     ParseError;
        CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
        ZEN_ASSERT(ParseError.empty() && Root.IsObject());
        BaseConfig.Options = std::move(Root).AsObject();
    }
    auto Hydration = InitHydration(BaseConfig);

    HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};

    // Dehydrate to populate backend
    CreateSmallTestTree(ServerStateDir);
    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());

    // Lists the module's objects with a separate S3 client, independent of the client owned
    // by the hydration backend under test.
    auto ListModuleObjects = [&]() {
        S3ClientOptions Opts;
        Opts.BucketName                  = "zen-hydration-test";
        Opts.Endpoint                    = Minio.Endpoint();
        Opts.PathStyle                   = true;
        Opts.Credentials.AccessKeyId     = Minio.RootUser();
        Opts.Credentials.SecretAccessKey = Minio.RootPassword();
        S3Client Client(Opts);
        return Client.ListObjects(ModuleId + "/");
    };

    // Verify objects exist in S3
    CHECK(!ListModuleObjects().Objects.empty());

    // Re-populate ServerStateDir and TempDir for cleanup verification
    CreateSmallTestTree(ServerStateDir);
    WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));

    // Obliterate
    Hydration->CreateHydrator(Config)->Obliterate();

    // Verify S3 objects deleted
    CHECK(ListModuleObjects().Objects.empty());
    // Local directories cleaned
    CHECK(std::filesystem::is_empty(ServerStateDir));
    CHECK(std::filesystem::is_empty(HydrationTemp));
}

// Exercises two configuration variations against the same MinIO instance: a key prefix in
// the URI and an explicit region in the settings.
TEST_CASE("hydration.s3.config_overrides")
{
    MinioProcessOptions MinioOpts;
    MinioOpts.Port = 19015;
    MinioProcess Minio(MinioOpts);
    Minio.SpawnMinioServer();
    Minio.CreateBucket("zen-hydration-test");

    ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
    ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());

    ScopedTemporaryDirectory TempDir;

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationTemp);

    // Path prefix: "s3://bucket/some/prefix" stores objects under
    // "some/prefix/<ModuleId>/..." rather than directly under "<ModuleId>/...".
    // NOTE(review): this scope only checks the round trip; it does not inspect the object keys.
    {
        auto TestFiles = CreateSmallTestTree(ServerStateDir);

        HydrationBase::Configuration BaseConfig;
        {
            std::string ConfigJson = fmt::format(
                R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})",
                Minio.Endpoint());
            std::string     ParseError;
            CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
            ZEN_ASSERT(ParseError.empty() && Root.IsObject());
            BaseConfig.Options = std::move(Root).AsObject();
        }
        auto Hydration = InitHydration(BaseConfig);

        HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_prefix"};

        Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
        CleanDirectory(ServerStateDir, true);
        Hydration->CreateHydrator(Config)->Hydrate();

        VerifyTree(ServerStateDir, TestFiles);
    }

    // Region override: 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION.
    // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options.
    {
        CleanDirectory(ServerStateDir, true);
        auto TestFiles = CreateSmallTestTree(ServerStateDir);

        ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region");

        HydrationBase::Configuration BaseConfig;
        {
            std::string ConfigJson = fmt::format(
                R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})",
                Minio.Endpoint());
            std::string     ParseError;
            CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
            ZEN_ASSERT(ParseError.empty() && Root.IsObject());
            BaseConfig.Options = std::move(Root).AsObject();
        }
        auto Hydration = InitHydration(BaseConfig);

        HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3test_region_override"};

        Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
        CleanDirectory(ServerStateDir, true);
        Hydration->CreateHydrator(Config)->Hydrate();

        VerifyTree(ServerStateDir, TestFiles);
    }
}

// Manual performance run; excluded from normal runs via doctest::skip().
// NOTE(review): the CopyTree source below is a hard-coded, developer-local Windows path, so
// this test can only run on a machine where that directory exists.
TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
{
    MinioProcessOptions MinioOpts;
    MinioOpts.Port = 19010;
    MinioProcess Minio(MinioOpts);
    Minio.SpawnMinioServer();
    Minio.CreateBucket("zen-hydration-test");

    ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
    ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());

    ScopedTemporaryDirectory TempDir;

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationTemp);

    const std::string ModuleId = "s3test_performance";
    CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true});
    // auto              TestFiles = CreateTestTree(ServerStateDir);

    TestThreading Threading(4);

    HydrationBase::Configuration BaseConfig;
    {
        std::string ConfigJson =
            fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
                        Minio.Endpoint());
        std::string     ParseError;
        CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
        ZEN_ASSERT(ParseError.empty() && Root.IsObject());
        BaseConfig.Options = std::move(Root).AsObject();
    }
    auto Hydration = InitHydration(BaseConfig);

    HydrationConfig Config{.ServerStateDir = ServerStateDir,
                           .TempDir        = HydrationTemp,
                           .ModuleId       = ModuleId,
                           .Threading      = Threading.Options};

    // Dehydrate: upload server state to MinIO
    ZEN_INFO("============== DEHYDRATE ==============");
    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());

    // Single iteration; raise the bound to repeat the hydrate measurement.
    for (size_t I = 0; I < 1; I++)
    {
        // Wipe server state
        CleanDirectory(ServerStateDir, true);
        CHECK(std::filesystem::is_empty(ServerStateDir));

        // Hydrate: download from MinIO back to server state
        ZEN_INFO("=============== HYDRATE ===============");
        Hydration->CreateHydrator(Config)->Hydrate();
    }
}

// Uncomment one of the following to run the incremental tests against real data instead of
// the generated test tree.
// NOTE(review): with REAL_DATA_PATH defined, the incremental tests below assign to TestFiles
// although it is only declared in the '# else' branch, so that configuration does not
// compile as written.
//#define REAL_DATA_PATH "E:\\Dev\\hub\\zenddc\\Zen"
//#define REAL_DATA_PATH "E:\\Dev\\hub\\brainrot\\20260402-225355-508"

// Incremental flow against the file backend: the state object returned by Hydrate is fed into
// the next Dehydrate.
TEST_CASE("hydration.file.incremental")
{
    std::filesystem::path TmpPath;
#    ifdef REAL_DATA_PATH
    TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub";
#    endif
    ScopedTemporaryDirectory TempDir(TmpPath);

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationStore);
    CreateDirectories(HydrationTemp);

    const std::string ModuleId = "testmodule";
    // auto              TestFiles = CreateTestTree(ServerStateDir);

    TestThreading Threading(4);

    auto            Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
    HydrationConfig Config{.ServerStateDir = ServerStateDir,
                           .TempDir        = HydrationTemp,
                           .ModuleId       = ModuleId,
                           .Threading      = Threading.Options};

    // Hydrate with no prior state
    CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
    CHECK_FALSE(HydrationState);

#    ifdef REAL_DATA_PATH
    ZEN_INFO("Writing state data...");
    CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true});
    ZEN_INFO("Writing state data complete");
#    else
    // Create test files and dehydrate
    auto TestFiles = CreateTestTree(ServerStateDir);
#    endif
    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
    CHECK(std::filesystem::is_empty(ServerStateDir));

    // Hydrate: restore from file store
    HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
#    ifndef REAL_DATA_PATH
    VerifyTree(ServerStateDir, TestFiles);
#    endif
    // Dehydrate again with cached state (should skip re-uploading unchanged files)
    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
    CHECK(std::filesystem::is_empty(ServerStateDir));

    // Hydrate one more time to confirm second dehydrate produced valid state
    HydrationState = Hydration->CreateHydrator(Config)->Hydrate();

    // Replace files and dehydrate
    TestFiles = CreateTestTree(ServerStateDir);
    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);

    // Hydrate again to confirm the dehydrate of the replaced files produced valid state
    HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
#    ifndef REAL_DATA_PATH
    VerifyTree(ServerStateDir, TestFiles);
#    endif  // !REAL_DATA_PATH

    // Dehydrate, nothing touched - no hashing, no upload
    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
}

// ---------------------------------------------------------------------------
// S3Storage test
// ---------------------------------------------------------------------------

// Same incremental flow as "hydration.file.incremental", run against a local MinIO instance.
TEST_CASE("hydration.s3.incremental")
{
    MinioProcessOptions MinioOpts;
    MinioOpts.Port = 19017;
    MinioProcess Minio(MinioOpts);
    Minio.SpawnMinioServer();
    Minio.CreateBucket("zen-hydration-test");

    ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
    ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());

    std::filesystem::path TmpPath;
#    ifdef REAL_DATA_PATH
    TmpPath = std::filesystem::path(REAL_DATA_PATH).parent_path() / "hub";
#    endif
    ScopedTemporaryDirectory TempDir(TmpPath);

    std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
    std::filesystem::path HydrationTemp  = TempDir.Path() / "hydration_temp";
    CreateDirectories(ServerStateDir);
    CreateDirectories(HydrationTemp);

    const std::string ModuleId = "s3test_incremental";

    TestThreading Threading(8);

    HydrationBase::Configuration BaseConfig;
    {
        std::string ConfigJson =
            fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
                        Minio.Endpoint());
        std::string     ParseError;
        CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
        ZEN_ASSERT(ParseError.empty() && Root.IsObject());
        BaseConfig.Options = std::move(Root).AsObject();
    }
    auto Hydration = InitHydration(BaseConfig);

    HydrationConfig Config{.ServerStateDir = ServerStateDir,
                           .TempDir        = HydrationTemp,
                           .ModuleId       = ModuleId,
                           .Threading      = Threading.Options};

    // Hydrate with no prior state
    CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
    CHECK_FALSE(HydrationState);

#    ifdef REAL_DATA_PATH
    ZEN_INFO("Writing state data...");
    CopyTree(REAL_DATA_PATH, ServerStateDir, {.EnableClone = true});
    ZEN_INFO("Writing state data complete");
#    else
    // Create test files and dehydrate
    auto TestFiles = CreateTestTree(ServerStateDir);
#    endif
    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
    CHECK(std::filesystem::is_empty(ServerStateDir));

    // Hydrate: restore from S3
    HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
#    ifndef REAL_DATA_PATH
    VerifyTree(ServerStateDir, TestFiles);
#    endif
    // Dehydrate again with cached state (should skip re-uploading unchanged files)
    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
    CHECK(std::filesystem::is_empty(ServerStateDir));

    // Hydrate one more time to confirm second dehydrate produced valid state
    HydrationState = Hydration->CreateHydrator(Config)->Hydrate();

    // Replace files and dehydrate
    TestFiles = CreateTestTree(ServerStateDir);
    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);

    // Hydrate again to confirm the dehydrate of the replaced files produced valid state
    HydrationState = Hydration->CreateHydrator(Config)->Hydrate();

#    ifndef REAL_DATA_PATH
    VerifyTree(ServerStateDir, TestFiles);
#    endif  // !REAL_DATA_PATH

    // Dehydrate, nothing touched - no hashing, no upload
    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
}

// InitHydration must throw for every configuration that does not select a known backend.
TEST_CASE("hydration.create_hydrator_rejects_invalid_config")
{
    // Unknown TargetSpecification prefix
    CHECK_THROWS(InitHydration({.TargetSpecification = "ftp://somewhere"}));

    // Unknown Options type
    {
        std::string     ParseError;
        CbFieldIterator Root = LoadCompactBinaryFromJson(R"({"type":"dynamodb"})", ParseError);
        ZEN_ASSERT(ParseError.empty() && Root.IsObject());
        CHECK_THROWS(InitHydration({.Options = std::move(Root).AsObject()}));
    }

    // Empty Options (no type field)
    CHECK_THROWS(InitHydration({}));
}

TEST_SUITE_END();

// Intentionally empty. NOTE(review): presumably referenced from elsewhere so the linker keeps
// this translation unit and its tests - confirm against the caller.
void
hydration_forcelink()
{
}

#endif  // ZEN_WITH_TESTS

}  // namespace zen